From 5cc232a5c8aac2520965e1636beb11f61b4463b7 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 21:10:45 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat(storage):=20MemTable=20=E8=8C=83?= =?UTF-8?q?=E5=9B=B4=E6=89=AB=E6=8F=8F=20ScanRange=E2=80=94=E2=80=94?= =?UTF-8?q?=E9=97=AD=E5=8C=BA=E9=97=B4=E5=8D=87=E5=BA=8F=E3=80=81active=20?= =?UTF-8?q?=E8=A6=86=E7=9B=96=20dirty=E3=80=81=E8=B7=B3=E5=A2=93=E7=A2=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为边缘查询提供存储层基础: - SkipList.firstGTE 定位起点,level-0 有序遍历 - ScanRange 全程持读锁,2 路归并 active(最新)+dirty,新值优先、墓碑遮蔽 - start/end 空表示该侧不限;fn 返回 false 提前停止 - Engine.Scan 转发;当前仅覆盖未刷盘热数据,SSTable 历史待 Phase 2 Co-Authored-By: Claude Opus 4.8 (1M context) --- storage/engine.go | 6 +++ storage/istorage/IMemTable.go | 3 ++ storage/zstorage/memtable.go | 60 +++++++++++++++++++++++ storage/zstorage/scan_test.go | 90 +++++++++++++++++++++++++++++++++++ 4 files changed, 159 insertions(+) create mode 100644 storage/zstorage/scan_test.go diff --git a/storage/engine.go b/storage/engine.go index 982e308..5948808 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -41,6 +41,12 @@ func (e *Engine) Delete(key []byte) error { return e.memTable.Delete(key) } +// Scan 在 [start,end] 闭区间升序遍历最新可见键值,跳过墓碑;fn 返回 false 提前停止。 +// 当前仅覆盖 MemTable(未刷盘热数据),已刷盘到 SSTable 的历史数据待后续接入。 +func (e *Engine) Scan(start, end []byte, fn func(key, value []byte) bool) { + e.memTable.ScanRange(start, end, fn) +} + func (e *Engine) Apply(cmd StorageCommand) error { e.applyCh <- cmd return nil diff --git a/storage/istorage/IMemTable.go b/storage/istorage/IMemTable.go index 157e41b..bbfcc48 100644 --- a/storage/istorage/IMemTable.go +++ b/storage/istorage/IMemTable.go @@ -4,6 +4,9 @@ type IMemTable interface { Get(key []byte) ([]byte, error) Put(key []byte, value []byte) error Delete(key []byte) error + // ScanRange 在 [start,end] 闭区间升序遍历最新可见键值,跳过墓碑; + // fn 返回 false 提前停止。start/end 为空表示该侧不限。 + ScanRange(start, end 
[]byte, fn func(key, value []byte) bool) Size() int StartFlush() // FlushToSSTable 将 entries 写入临时表并立即 Flush 到 SSTable diff --git a/storage/zstorage/memtable.go b/storage/zstorage/memtable.go index 0cc7f34..76b0e03 100644 --- a/storage/zstorage/memtable.go +++ b/storage/zstorage/memtable.go @@ -148,6 +148,66 @@ func (m *MemTable) Get(key []byte) ([]byte, error) { return nil, errors.New("Key not found") } +// firstGTE 返回第一个 Key >= key 的节点;key 为空表示从头开始。无则返回 nil。 +func (sl *SkipList) firstGTE(key []byte) *SkipNode { + p := sl.head + if p == nil { + return nil + } + for i := sl.level - 1; i >= 0; i-- { + for p.Next[i] != nil && bytes.Compare(p.Next[i].Key, key) < 0 { + p = p.Next[i] + } + } + return p.Next[0] +} + +// ScanRange 在 [start,end] 闭区间内升序遍历 active+dirty 合并后的最新可见键值, +// 跳过墓碑(value==nil),对每条命中调用 fn;fn 返回 false 可提前停止。 +// start/end 为空分别表示下界/上界不限。 +// +// 全程持读锁:热窗口范围扫描有界且短,期间写入与刷盘会等待。fn 内若需在调用 +// 返回后继续持有 key/value,应自行拷贝——底层切片归 MemTable 所有。 +func (m *MemTable) ScanRange(start, end []byte, fn func(key, value []byte) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + var a, d *SkipNode + if m.active != nil { + a = m.active.firstGTE(start) + } + if m.dirty != nil { + d = m.dirty.firstGTE(start) + } + + for a != nil || d != nil { + // 选更小的 key;相等时 active 为最新版本,用 active 值并同步推进 dirty。 + var key, val []byte + switch { + case d == nil || (a != nil && bytes.Compare(a.Key, d.Key) < 0): + key, val = a.Key, a.Value + a = a.Next[0] + case a == nil || bytes.Compare(d.Key, a.Key) < 0: + key, val = d.Key, d.Value + d = d.Next[0] + default: // a.Key == d.Key:active 覆盖 dirty + key, val = a.Key, a.Value + a = a.Next[0] + d = d.Next[0] + } + + if len(end) > 0 && bytes.Compare(key, end) > 0 { + return // 升序遍历已越过上界,可停 + } + if val == nil { + continue // 墓碑:跳过 + } + if !fn(key, val) { + return + } + } +} + // search 在跳表中查找指定 key,返回值和是否找到 func (sl *SkipList) search(key []byte) ([]byte, bool) { p := sl.head diff --git a/storage/zstorage/scan_test.go b/storage/zstorage/scan_test.go new file 
mode 100644 index 0000000..2d764e9 --- /dev/null +++ b/storage/zstorage/scan_test.go @@ -0,0 +1,90 @@ +package zstorage + +import ( + "reflect" + "testing" +) + +// collect 把 [start,end] 扫描结果收成 key:value 串对,便于断言顺序与内容。 +func collect(m *MemTable, start, end []byte) [][2]string { + var out [][2]string + m.ScanRange(start, end, func(k, v []byte) bool { + out = append(out, [2]string{string(k), string(v)}) + return true + }) + return out +} + +func newMemWith(active, dirty *SkipList) *MemTable { + return &MemTable{active: active, dirty: dirty} +} + +func sl(pairs ...[2]string) *SkipList { + s := newSkipList() + for _, p := range pairs { + var val []byte + if p[1] != "" { + val = []byte(p[1]) + } // 空串表示墓碑(nil value) + s.insert([]byte(p[0]), val) + } + return s +} + +func TestScanRange_Bounds(t *testing.T) { + m := newMemWith(sl( + [2]string{"k1", "a"}, [2]string{"k2", "b"}, [2]string{"k3", "c"}, + [2]string{"k4", "d"}, [2]string{"k5", "e"}, + ), nil) + + got := collect(m, []byte("k2"), []byte("k4")) + want := [][2]string{{"k2", "b"}, {"k3", "c"}, {"k4", "d"}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("闭区间扫描错误\n got=%v\nwant=%v", got, want) + } +} + +func TestScanRange_Unbounded(t *testing.T) { + m := newMemWith(sl([2]string{"a", "1"}, [2]string{"b", "2"}, [2]string{"c", "3"}), nil) + got := collect(m, nil, nil) + want := [][2]string{{"a", "1"}, {"b", "2"}, {"c", "3"}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("无界扫描应返回全部升序\n got=%v\nwant=%v", got, want) + } +} + +func TestScanRange_SkipTombstone(t *testing.T) { + m := newMemWith(sl( + [2]string{"k1", "a"}, [2]string{"k2", ""}, [2]string{"k3", "c"}, + ), nil) + got := collect(m, nil, nil) + want := [][2]string{{"k1", "a"}, {"k3", "c"}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("墓碑应被跳过\n got=%v\nwant=%v", got, want) + } +} + +func TestScanRange_EarlyStop(t *testing.T) { + m := newMemWith(sl([2]string{"k1", "a"}, [2]string{"k2", "b"}, [2]string{"k3", "c"}), nil) + var seen int + m.ScanRange(nil, nil, 
func(k, v []byte) bool { + seen++ + return false // 立即停止 + }) + if seen != 1 { + t.Fatalf("fn 返回 false 应在第一条后停止,实际遍历 %d 条", seen) + } +} + +func TestScanRange_ActiveOverridesDirty(t *testing.T) { + // active 最新:k2 覆盖 dirty 的旧值,k4 的墓碑遮蔽 dirty 的值。 + active := sl([2]string{"k2", "A"}, [2]string{"k4", ""}) + dirty := sl([2]string{"k2", "old"}, [2]string{"k3", "C"}, [2]string{"k4", "D"}) + m := newMemWith(active, dirty) + + got := collect(m, nil, nil) + want := [][2]string{{"k2", "A"}, {"k3", "C"}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("active 应覆盖 dirty 且墓碑遮蔽\n got=%v\nwant=%v", got, want) + } +} From 72d91db1821a0edd87d24da619b6591aca3841de Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 21:11:34 +0800 Subject: [PATCH 2/5] =?UTF-8?q?feat(predicate):=20=E6=9C=80=E5=B0=8F?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E8=B0=93=E8=AF=8D=E4=B8=8B=E6=8E=A8=EF=BC=88?= =?UTF-8?q?field=20op=20value=EF=BC=8CJSON=20=E5=80=BC=E6=AF=94=E8=BE=83?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支持 > >= < <= == !=,数值/字符串自动判别;无谓词全匹配。 覆盖具身采集常见查询(如 az>9.9 / status==ok),不引入完整表达式语言。 纯函数、零业务依赖,供边缘查询服务端筛选、只回传命中切片。 Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/predicate/predicate.go | 99 +++++++++++++++++++++++++++++++++ pkg/predicate/predicate_test.go | 33 +++++++++++ 2 files changed, 132 insertions(+) create mode 100644 pkg/predicate/predicate.go create mode 100644 pkg/predicate/predicate_test.go diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go new file mode 100644 index 0000000..245f08e --- /dev/null +++ b/pkg/predicate/predicate.go @@ -0,0 +1,99 @@ +// Package predicate 提供一个最小的字段谓词,用于边缘查询的服务端下推: +// 对 JSON 值取出指定字段,与操作数按算子比较,只让命中的行通过—— +// 从而「只回传命中切片」而非整段原始流。 +// +// 设计刻意从简:单字段、单算子、单操作数(如 az > 9.9 / status == ok), +// 覆盖具身采集的常见查询,不引入完整表达式语言。 +package predicate + +import ( + "encoding/json" + "strconv" +) + +// Op 是谓词算子。 +type Op uint8 + +const ( + OpNone Op = iota // 
无谓词:全匹配 + OpGT // > + OpGTE // >= + OpLT // < + OpLTE // <= + OpEQ // == + OpNE // != +) + +// Predicate 表示「Field Op Operand」。Op==OpNone 时 Eval 恒为真。 +type Predicate struct { + Field string + Op Op + Operand string // 文本操作数:有序比较按 float64 解析,相等比较按字符串 +} + +// Eval 报告 value(JSON 对象)是否满足谓词。 +// 无谓词恒真;value 非 JSON、字段缺失、或类型无法比较时返回 false。 +func (p Predicate) Eval(value []byte) bool { + if p.Op == OpNone { + return true + } + + var obj map[string]json.RawMessage + if err := json.Unmarshal(value, &obj); err != nil { + return false + } + raw, ok := obj[p.Field] + if !ok { + return false + } + + switch p.Op { + case OpGT, OpGTE, OpLT, OpLTE: + fv, ok1 := asFloat(raw) + ov, err := strconv.ParseFloat(p.Operand, 64) + if !ok1 || err != nil { + return false + } + switch p.Op { + case OpGT: + return fv > ov + case OpGTE: + return fv >= ov + case OpLT: + return fv < ov + default: // OpLTE + return fv <= ov + } + case OpEQ: + return asScalar(raw) == p.Operand + case OpNE: + return asScalar(raw) != p.Operand + default: + return false + } +} + +// asFloat 把字段原始 JSON 解析为 float64:直接数字或带引号的数字串都接受。 +func asFloat(raw json.RawMessage) (float64, bool) { + var f float64 + if err := json.Unmarshal(raw, &f); err == nil { + return f, true + } + var s string + if err := json.Unmarshal(raw, &s); err == nil { + if f, err := strconv.ParseFloat(s, 64); err == nil { + return f, true + } + } + return 0, false +} + +// asScalar 把字段原始 JSON 归一成可比较的标量字符串: +// JSON 字符串去引号,其余(数字/布尔)按原文。 +func asScalar(raw json.RawMessage) string { + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s + } + return string(raw) +} diff --git a/pkg/predicate/predicate_test.go b/pkg/predicate/predicate_test.go new file mode 100644 index 0000000..f7c0a5a --- /dev/null +++ b/pkg/predicate/predicate_test.go @@ -0,0 +1,33 @@ +package predicate + +import "testing" + +func TestEval(t *testing.T) { + cases := []struct { + name string + pred Predicate + value string + want bool + }{ + {"无谓词恒真", Predicate{Op: 
OpNone}, `{"az":1}`, true}, + {"数值大于命中", Predicate{"az", OpGT, "9.9"}, `{"az":9.91}`, true}, + {"数值大于未命中", Predicate{"az", OpGT, "9.9"}, `{"az":9.8}`, false}, + {"数值大于等于边界", Predicate{"az", OpGTE, "9.9"}, `{"az":9.9}`, true}, + {"数值小于命中", Predicate{"az", OpLT, "0"}, `{"az":-0.5}`, true}, + {"数值小于等于边界", Predicate{"az", OpLTE, "9.9"}, `{"az":9.9}`, true}, + {"字符串相等命中", Predicate{"status", OpEQ, "ok"}, `{"status":"ok"}`, true}, + {"字符串相等未命中", Predicate{"status", OpEQ, "ok"}, `{"status":"err"}`, false}, + {"不等命中", Predicate{"status", OpNE, "ok"}, `{"status":"err"}`, true}, + {"带引号数字串可比", Predicate{"az", OpGT, "9.9"}, `{"az":"9.91"}`, true}, + {"字段缺失不命中", Predicate{"az", OpGT, "9.9"}, `{"ax":1}`, false}, + {"非 JSON 不命中", Predicate{"az", OpGT, "9.9"}, `not-json`, false}, + {"操作数非数值不命中", Predicate{"az", OpGT, "abc"}, `{"az":1}`, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := c.pred.Eval([]byte(c.value)); got != c.want { + t.Fatalf("Eval(%s) = %v, want %v", c.value, got, c.want) + } + }) + } +} From 6a979a42e4371a26f71854180229c6c2dbfb250c Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 21:14:59 +0800 Subject: [PATCH 3/5] =?UTF-8?q?feat(service):=20SCAN=20=E8=BE=B9=E7=BC=98?= =?UTF-8?q?=E8=8C=83=E5=9B=B4=E6=9F=A5=E8=AF=A2=E7=AB=AF=E5=88=B0=E7=AB=AF?= =?UTF-8?q?=E2=80=94=E2=80=94=E5=8D=8F=E8=AE=AE=E7=BC=96=E8=A7=A3=E7=A0=81?= =?UTF-8?q?+=E8=B0=93=E8=AF=8D=E4=B8=8B=E6=8E=A8+=E5=8F=AA=E5=9B=9E?= =?UTF-8?q?=E4=BC=A0=E5=91=BD=E4=B8=AD=E5=88=87=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pkg/proto: SCAN msgID 与请求/响应 wire 编解码(server/client 共用) - KVServer.Scan: 扫 MemTable 热数据,谓词下推筛选,命中拷贝返回,10000 条封顶并告警 - router.handleScan + 两个 Server 注册 SCAN 路由 - metrics 增 Scans 计数并入快照 谓词由钩子放行(仅 PUT 过滤),SCAN 原样直达。 Co-Authored-By: Claude Opus 4.8 (1M context) --- Server/server.go | 1 + Server/server_pprof.go | 1 + pkg/metrics/metrics.go | 4 ++ pkg/proto/codes.go | 1 + 
pkg/proto/scan.go | 136 +++++++++++++++++++++++++++++++++++++++++ pkg/proto/scan_test.go | 64 +++++++++++++++++++ service/fsm.go | 27 ++++++++ service/router.go | 16 +++++ 8 files changed, 250 insertions(+) create mode 100644 pkg/proto/scan.go create mode 100644 pkg/proto/scan_test.go diff --git a/Server/server.go b/Server/server.go index e044c06..0a7914c 100644 --- a/Server/server.go +++ b/Server/server.go @@ -37,6 +37,7 @@ func main() { server.AddRouter(proto.MsgPut, router) server.AddRouter(proto.MsgGet, router) server.AddRouter(proto.MsgDelete, router) + server.AddRouter(proto.MsgScan, router) // 注册连接生命周期回调 server.SetConnStartFunc(router.OnConnStart) diff --git a/Server/server_pprof.go b/Server/server_pprof.go index ed08997..f3a3022 100644 --- a/Server/server_pprof.go +++ b/Server/server_pprof.go @@ -47,6 +47,7 @@ func main() { server.AddRouter(proto.MsgPut, router) server.AddRouter(proto.MsgGet, router) server.AddRouter(proto.MsgDelete, router) + server.AddRouter(proto.MsgScan, router) // 注册连接生命周期回调 server.SetConnStartFunc(router.OnConnStart) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d29d654..41402f9 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -20,6 +20,7 @@ var ( FramesDroppedNonMonotonic atomic.Int64 // 被钩子按「时间戳回退/重放」丢弃 Writes atomic.Int64 // 成功写入(PUT)次数 Reads atomic.Int64 // 读取(GET)次数 + Scans atomic.Int64 // 边缘范围查询(SCAN)次数 Deletes atomic.Int64 // 成功删除(DEL)次数 WriteErrors atomic.Int64 // 写入/删除失败次数 BackpressureStalls atomic.Int64 // 写入被字节信用背压阻塞的次数 @@ -54,6 +55,7 @@ type Snapshot struct { DroppedNonMonotonic int64 Writes int64 Reads int64 + Scans int64 Deletes int64 WriteErrors int64 BackpressureStalls int64 @@ -69,6 +71,7 @@ func Take() Snapshot { DroppedNonMonotonic: FramesDroppedNonMonotonic.Load(), Writes: Writes.Load(), Reads: Reads.Load(), + Scans: Scans.Load(), Deletes: Deletes.Load(), WriteErrors: WriteErrors.Load(), BackpressureStalls: BackpressureStalls.Load(), @@ -86,6 +89,7 @@ func LogSnapshot() { 
"dropped_non_monotonic", s.DroppedNonMonotonic, "writes", s.Writes, "reads", s.Reads, + "scans", s.Scans, "deletes", s.Deletes, "write_errors", s.WriteErrors, "backpressure_stalls", s.BackpressureStalls, diff --git a/pkg/proto/codes.go b/pkg/proto/codes.go index e81ad1d..e489cfa 100644 --- a/pkg/proto/codes.go +++ b/pkg/proto/codes.go @@ -13,6 +13,7 @@ const ( MsgPut = "PUT" MsgGet = "GET" MsgDelete = "DEL" + MsgScan = "SCAN" MsgRespOK = "OK" MsgRespErr = "ERR" MsgHello = "HELLO" diff --git a/pkg/proto/scan.go b/pkg/proto/scan.go new file mode 100644 index 0000000..c602192 --- /dev/null +++ b/pkg/proto/scan.go @@ -0,0 +1,136 @@ +package proto + +import ( + "encoding/binary" + "fmt" + + "github.com/NeverENG/BanDB/pkg/predicate" +) + +// SCAN 请求负载布局(小端): +// +// [startLen u32][endLen u32][fieldLen u32][op u8][start][end][field][operand=剩余] +// +// SCAN 响应负载布局: +// +// [statusLen u8][status][count u32]{ [keyLen u32][valueLen u32][key][value] }×count +const scanReqHeaderLen = 13 // 3×u32 + 1×u8 + +// ScanRequest 是一次边缘范围查询的参数。Start/End 为空表示该侧不限。 +type ScanRequest struct { + Start []byte + End []byte + Pred predicate.Predicate +} + +// ScanEntry 是一条命中结果。 +type ScanEntry struct { + Key []byte + Value []byte +} + +// EncodeScanRequest 编码 SCAN 请求负载。 +func EncodeScanRequest(r ScanRequest) []byte { + field := []byte(r.Pred.Field) + operand := []byte(r.Pred.Operand) + + buf := make([]byte, scanReqHeaderLen+len(r.Start)+len(r.End)+len(field)+len(operand)) + binary.LittleEndian.PutUint32(buf[0:4], uint32(len(r.Start))) + binary.LittleEndian.PutUint32(buf[4:8], uint32(len(r.End))) + binary.LittleEndian.PutUint32(buf[8:12], uint32(len(field))) + buf[12] = byte(r.Pred.Op) + + off := scanReqHeaderLen + off += copy(buf[off:], r.Start) + off += copy(buf[off:], r.End) + off += copy(buf[off:], field) + copy(buf[off:], operand) + return buf +} + +// DecodeScanRequest 解码 SCAN 请求负载。 +func DecodeScanRequest(data []byte) (ScanRequest, error) { + if len(data) < scanReqHeaderLen { + return
ScanRequest{}, fmt.Errorf("scan request too short: %d", len(data)) + } + startLen := int(binary.LittleEndian.Uint32(data[0:4])) + endLen := int(binary.LittleEndian.Uint32(data[4:8])) + fieldLen := int(binary.LittleEndian.Uint32(data[8:12])) + op := predicate.Op(data[12]) + + off := scanReqHeaderLen + if startLen < 0 || endLen < 0 || fieldLen < 0 || off+startLen+endLen+fieldLen > len(data) { + return ScanRequest{}, fmt.Errorf("scan request length fields exceed payload") + } + start := data[off : off+startLen] + off += startLen + end := data[off : off+endLen] + off += endLen + field := data[off : off+fieldLen] + off += fieldLen + operand := data[off:] + + return ScanRequest{ + Start: start, + End: end, + Pred: predicate.Predicate{Field: string(field), Op: op, Operand: string(operand)}, + }, nil +} + +// EncodeScanResponse 编码 SCAN 响应负载。 +func EncodeScanResponse(status string, entries []ScanEntry) []byte { + size := 1 + len(status) + 4 + for _, e := range entries { + size += 8 + len(e.Key) + len(e.Value) + } + buf := make([]byte, size) + buf[0] = byte(len(status)) + off := 1 + off += copy(buf[off:], status) + binary.LittleEndian.PutUint32(buf[off:off+4], uint32(len(entries))) + off += 4 + for _, e := range entries { + binary.LittleEndian.PutUint32(buf[off:off+4], uint32(len(e.Key))) + off += 4 + binary.LittleEndian.PutUint32(buf[off:off+4], uint32(len(e.Value))) + off += 4 + off += copy(buf[off:], e.Key) + off += copy(buf[off:], e.Value) + } + return buf +} + +// DecodeScanResponse 解码 SCAN 响应负载,返回状态与命中条目。 +func DecodeScanResponse(payload []byte) (status string, entries []ScanEntry, err error) { + if len(payload) < 1 { + return "", nil, fmt.Errorf("scan response empty") + } + statusLen := int(payload[0]) + off := 1 + if off+statusLen+4 > len(payload) { + return "", nil, fmt.Errorf("scan response too short for status+count") + } + status = string(payload[off : off+statusLen]) + off += statusLen + count := int(binary.LittleEndian.Uint32(payload[off : off+4])) + off += 4 + 
+ entries = make([]ScanEntry, 0, count) + for i := 0; i < count; i++ { + if off+8 > len(payload) { + return "", nil, fmt.Errorf("scan response truncated at entry %d header", i) + } + keyLen := int(binary.LittleEndian.Uint32(payload[off : off+4])) + valueLen := int(binary.LittleEndian.Uint32(payload[off+4 : off+8])) + off += 8 + if keyLen < 0 || valueLen < 0 || off+keyLen+valueLen > len(payload) { + return "", nil, fmt.Errorf("scan response truncated at entry %d body", i) + } + key := payload[off : off+keyLen] + off += keyLen + value := payload[off : off+valueLen] + off += valueLen + entries = append(entries, ScanEntry{Key: key, Value: value}) + } + return status, entries, nil +} diff --git a/pkg/proto/scan_test.go b/pkg/proto/scan_test.go new file mode 100644 index 0000000..372e658 --- /dev/null +++ b/pkg/proto/scan_test.go @@ -0,0 +1,64 @@ +package proto + +import ( + "bytes" + "testing" + + "github.com/NeverENG/BanDB/pkg/predicate" +) + +func TestScanRequestRoundTrip(t *testing.T) { + cases := []ScanRequest{ + {Start: []byte("imu:dev0:100"), End: []byte("imu:dev0:200"), + Pred: predicate.Predicate{Field: "az", Op: predicate.OpGT, Operand: "9.9"}}, + {Start: nil, End: nil, Pred: predicate.Predicate{Op: predicate.OpNone}}, // 全量无谓词 + {Start: []byte("a"), End: nil, Pred: predicate.Predicate{Field: "s", Op: predicate.OpEQ, Operand: "ok"}}, + } + for _, want := range cases { + got, err := DecodeScanRequest(EncodeScanRequest(want)) + if err != nil { + t.Fatalf("decode 失败: %v", err) + } + if !bytes.Equal(got.Start, want.Start) || !bytes.Equal(got.End, want.End) { + t.Fatalf("范围不符: got start=%q end=%q", got.Start, got.End) + } + if got.Pred != want.Pred { + t.Fatalf("谓词不符: got %+v want %+v", got.Pred, want.Pred) + } + } +} + +func TestDecodeScanRequest_Truncated(t *testing.T) { + if _, err := DecodeScanRequest([]byte{1, 2, 3}); err == nil { + t.Fatal("过短负载应报错") + } +} + +func TestScanResponseRoundTrip(t *testing.T) { + want := []ScanEntry{ + {Key: []byte("imu:dev0:101"), 
Value: []byte(`{"az":9.91}`)}, + {Key: []byte("imu:dev0:150"), Value: []byte(`{"az":10.2}`)}, + } + status, got, err := DecodeScanResponse(EncodeScanResponse(StatusOK, want)) + if err != nil { + t.Fatalf("decode 失败: %v", err) + } + if status != StatusOK { + t.Fatalf("status=%q", status) + } + if len(got) != len(want) { + t.Fatalf("条目数 %d,期望 %d", len(got), len(want)) + } + for i := range want { + if !bytes.Equal(got[i].Key, want[i].Key) || !bytes.Equal(got[i].Value, want[i].Value) { + t.Fatalf("条目 %d 不符: %q=%q", i, got[i].Key, got[i].Value) + } + } +} + +func TestScanResponseEmpty(t *testing.T) { + status, got, err := DecodeScanResponse(EncodeScanResponse(StatusOK, nil)) + if err != nil || status != StatusOK || len(got) != 0 { + t.Fatalf("空结果应正常: status=%q n=%d err=%v", status, len(got), err) + } +} diff --git a/service/fsm.go b/service/fsm.go index a32b4d3..2f55ea5 100644 --- a/service/fsm.go +++ b/service/fsm.go @@ -8,6 +8,8 @@ import ( "github.com/NeverENG/BanDB/Raft" "github.com/NeverENG/BanDB/config" + "github.com/NeverENG/BanDB/pkg/predicate" + "github.com/NeverENG/BanDB/pkg/proto" "github.com/NeverENG/BanDB/storage" "github.com/NeverENG/BanDB/storage/istorage" "github.com/NeverENG/BanDB/storage/zstorage" @@ -168,6 +170,31 @@ func (k *KVServer) Get(key []byte) ([]byte, error) { return value, err } +// maxScanResults 限制单次扫描返回条目数,防止无谓词大范围扫描撑爆内存。 +const maxScanResults = 10000 + +// Scan 在 [start,end] 闭区间扫描 MemTable 热数据,对满足谓词的条目收集 key/value +// 拷贝后返回(只回传命中切片)。底层切片归 MemTable 所有,故必须拷贝。 +// 达到上限时截断并告警。 +func (k *KVServer) Scan(start, end []byte, pred predicate.Predicate) []proto.ScanEntry { + out := make([]proto.ScanEntry, 0) + k.storage.Scan(start, end, func(key, value []byte) bool { + if !pred.Eval(value) { + return true + } + out = append(out, proto.ScanEntry{ + Key: append([]byte(nil), key...), + Value: append([]byte(nil), value...), + }) + if len(out) >= maxScanResults { + slog.Warn("[WARN] scan: 结果达到上限,已截断", "cap", maxScanResults) + return false + } + return 
true + }) + return out +} + /* Put 直接写入存储(仅用于测试,生产环境应通过 Raft 写入) func (k *KVServer) Put(key []byte, value []byte) error { return k.storage.Put(key, value) diff --git a/service/router.go b/service/router.go index e781c75..88fe210 100644 --- a/service/router.go +++ b/service/router.go @@ -61,6 +61,8 @@ func (r *Router) Handle(request banIface.IRequest) { r.handleGet(data, request) case proto.MsgDelete: r.handleDelete(data, request) + case proto.MsgScan: + r.handleScan(data, request) } } @@ -184,6 +186,20 @@ func (r *Router) handleDelete(data []byte, request banIface.IRequest) { sendOK(request) } +// handleScan 处理 SCAN 边缘范围查询:解码范围+谓词,服务端筛选后只回传命中切片。 +func (r *Router) handleScan(data []byte, request banIface.IRequest) { + req, err := proto.DecodeScanRequest(data) + if err != nil { + slog.Warn("[WARN] handleScan: decode failed", "error", err) + sendErr(request) + return + } + + metrics.Scans.Add(1) + entries := r.kv.Scan(req.Start, req.End, req.Pred) + request.GetConnection().SendBuffMsg(proto.MsgRespOK, proto.EncodeScanResponse(proto.StatusOK, entries)) +} + // PostHandle 后置处理 func (r *Router) PostHandle(request banIface.IRequest) { if r.postHandleFunc != nil { From a500d1f5c3071e7a99f43f0c0a7ea8e7ec2094d4 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 21:18:03 +0800 Subject: [PATCH 4/5] =?UTF-8?q?feat(client):=20scan=20=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E2=80=94=E2=80=94=E4=BA=A4=E4=BA=92=E5=BC=8F=E8=BE=B9=E7=BC=98?= =?UTF-8?q?=E8=8C=83=E5=9B=B4=E6=9F=A5=E8=AF=A2=20+=20=E7=AB=AF=E5=88=B0?= =?UTF-8?q?=E7=AB=AF=E9=9B=86=E6=88=90=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - client.SendScan 编解码 SCAN 请求/响应 - 交互式 scan [field op operand],- 表示该侧不限 - service 集成测试:写 IMU 帧→范围+谓词扫描→只回命中、越界设备排除 Co-Authored-By: Claude Opus 4.8 (1M context) --- client/client.go | 23 ++++++++++ client/interactive.go | 73 +++++++++++++++++++++++++++++++- service/scan_integration_test.go | 66 
+++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 service/scan_integration_test.go diff --git a/client/client.go b/client/client.go index a961230..5d5b478 100644 --- a/client/client.go +++ b/client/client.go @@ -132,6 +132,29 @@ func (c *Client) SendDelete(key []byte) error { return nil } +// SendScan 发送 SCAN 范围查询,返回命中条目。 +func (c *Client) SendScan(req proto.ScanRequest) ([]proto.ScanEntry, error) { + msg := utils.NewMessage2(proto.MsgScan, proto.EncodeScanRequest(req)) + + if err := c.send(msg); err != nil { + return nil, fmt.Errorf("failed to send SCAN request: %v", err) + } + + _, payload, err := c.readResponse() + if err != nil { + return nil, fmt.Errorf("failed to read response: %v", err) + } + + status, entries, err := proto.DecodeScanResponse(payload) + if err != nil { + return nil, err + } + if status != proto.StatusOK { + return nil, fmt.Errorf("server error") + } + return entries, nil +} + // send 打包并写出一条消息 func (c *Client) send(msg *utils.Message) error { dp := banNet.NewDataPack() diff --git a/client/interactive.go b/client/interactive.go index 635fda8..4cb1210 100644 --- a/client/interactive.go +++ b/client/interactive.go @@ -5,6 +5,9 @@ import ( "fmt" "os" "strings" + + "github.com/NeverENG/BanDB/pkg/predicate" + "github.com/NeverENG/BanDB/pkg/proto" ) // InteractiveClient 交互式客户端 @@ -25,7 +28,7 @@ func NewInteractiveClient(addr string) (*InteractiveClient, error) { fmt.Printf("已连接到 %s\n", addr) fmt.Println("输入命令进行操作,输入 'quit' 或 'exit' 退出") - fmt.Println("支持命令: put , get , delete ") + fmt.Println("支持命令: put , get , delete , scan [field op operand]") fmt.Println() return &InteractiveClient{ @@ -88,6 +91,8 @@ func (ic *InteractiveClient) executeCommand(line string) { ic.handleGet(parts) case "delete": ic.handleDelete(parts) + case "scan": + ic.handleScan(parts) case "help": ic.showHelp() default: @@ -151,6 +156,71 @@ func (ic *InteractiveClient) handleDelete(parts []string) { fmt.Println("✅ OK") } +// 
handleScan 处理 SCAN 命令: scan [field op operand] +func (ic *InteractiveClient) handleScan(parts []string) { + if len(parts) < 3 { + fmt.Println("用法: scan [field op operand]") + fmt.Println("示例: scan imu:dev0:100 imu:dev0:200 az > 9.9") + return + } + + req := proto.ScanRequest{ + Start: dashToEmpty(parts[1]), + End: dashToEmpty(parts[2]), + Pred: predicate.Predicate{Op: predicate.OpNone}, + } + + if len(parts) >= 6 { + op, ok := parseOp(parts[4]) + if !ok { + fmt.Printf("不支持的算子: %s (支持 > >= < <= == !=)\n", parts[4]) + return + } + req.Pred = predicate.Predicate{Field: parts[3], Op: op, Operand: strings.Join(parts[5:], " ")} + } else if len(parts) > 3 { + fmt.Println("谓词需 3 段: field op operand,例: az > 9.9") + return + } + + entries, err := ic.client.SendScan(req) + if err != nil { + fmt.Printf("❌ 错误: %v\n", err) + return + } + + fmt.Printf("命中 %d 条:\n", len(entries)) + for _, e := range entries { + fmt.Printf(" %s = %s\n", string(e.Key), string(e.Value)) + } +} + +// dashToEmpty 把 "-" 视为不限边界(空),否则返回原值字节。 +func dashToEmpty(s string) []byte { + if s == "-" { + return nil + } + return []byte(s) +} + +// parseOp 把算子符号映射为 predicate.Op。 +func parseOp(s string) (predicate.Op, bool) { + switch s { + case ">": + return predicate.OpGT, true + case ">=": + return predicate.OpGTE, true + case "<": + return predicate.OpLT, true + case "<=": + return predicate.OpLTE, true + case "==": + return predicate.OpEQ, true + case "!=": + return predicate.OpNE, true + } + return predicate.OpNone, false +} + // showHelp 显示帮助信息 func (ic *InteractiveClient) showHelp() { fmt.Println("\n=== BanKV 客户端帮助 ===") @@ -158,6 +228,7 @@ func (ic *InteractiveClient) showHelp() { fmt.Println(" put - 存储键值对") fmt.Println(" get - 获取值") fmt.Println(" delete - 删除键") + fmt.Println(" scan [field op operand] - 范围查询(只回传命中切片)") fmt.Println(" help - 显示此帮助信息") fmt.Println(" quit/exit - 退出客户端") fmt.Println() diff --git a/service/scan_integration_test.go b/service/scan_integration_test.go new file mode 100644 index 
0000000..e0f07d6 --- /dev/null +++ b/service/scan_integration_test.go @@ -0,0 +1,66 @@ +package service + +import ( + "path/filepath" + "testing" + + "github.com/NeverENG/BanDB/config" + "github.com/NeverENG/BanDB/pkg/predicate" +) + +// TestKVServer_Scan 端到端验证边缘查询:写入若干 IMU 帧后,按时间范围 + 谓词扫描, +// 只返回命中切片(谓词下推),且越界设备帧被范围排除。 +func TestKVServer_Scan(t *testing.T) { + oldWALPath := config.G.WALPath + oldSSTablePath := config.G.SSTablePath + oldMode := config.G.Mode + oldMaxSize := config.G.MaxMemTableSize + + dir := t.TempDir() + config.G.Mode = config.ModeStandalone + config.G.WALPath = filepath.Join(dir, "wal.log") + config.G.SSTablePath = dir + config.G.MaxMemTableSize = 1 << 20 // 足够大,确保数据留在 MemTable 热窗口 + defer func() { + config.G.WALPath = oldWALPath + config.G.SSTablePath = oldSSTablePath + config.G.Mode = oldMode + config.G.MaxMemTableSize = oldMaxSize + }() + + kv := NewKVServer() + defer kv.wal.Close() + + frames := []Command{ + {Type: "Put", Key: []byte("imu:dev0:100"), Value: []byte(`{"az":9.8}`)}, + {Type: "Put", Key: []byte("imu:dev0:150"), Value: []byte(`{"az":9.95}`)}, // 命中 + {Type: "Put", Key: []byte("imu:dev0:200"), Value: []byte(`{"az":10.2}`)}, // 命中 + {Type: "Put", Key: []byte("imu:dev0:250"), Value: []byte(`{"az":9.0}`)}, + {Type: "Put", Key: []byte("imu:dev1:150"), Value: []byte(`{"az":11}`)}, // 设备越界 + } + for _, c := range frames { + if err := kv.Write(c); err != nil { + t.Fatalf("write %s: %v", c.Key, err) + } + } + + pred := predicate.Predicate{Field: "az", Op: predicate.OpGT, Operand: "9.9"} + got := kv.Scan([]byte("imu:dev0:100"), []byte("imu:dev0:299"), pred) + + want := map[string]string{ + "imu:dev0:150": `{"az":9.95}`, + "imu:dev0:200": `{"az":10.2}`, + } + if len(got) != len(want) { + t.Fatalf("命中数 %d,期望 %d: %+v", len(got), len(want), got) + } + for _, e := range got { + w, ok := want[string(e.Key)] + if !ok { + t.Fatalf("意外命中 %s(谓词或范围未正确下推)", e.Key) + } + if string(e.Value) != w { + t.Fatalf("%s 值不符: %s", e.Key, e.Value) + } + } +} 
From 83ad15be996656af602f22eb79bed06bd9452e6a Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 21:32:42 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix(proto):=20MaxPackageSize=201024?= =?UTF-8?q?=E2=86=9216MiB=EF=BC=8C=E8=A7=A3=E9=94=81=E5=A4=9A=E6=9D=A1=20S?= =?UTF-8?q?CAN=20=E5=93=8D=E5=BA=94=E4=B8=8E=E5=A4=A7=E5=80=BC=20PUT=20?= =?UTF-8?q?=E8=B7=A8=E7=BD=91=E7=BB=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1024B 上限会让客户端 UnPack 拒收多条 SCAN 响应,也使 >1KB 的相机帧无法 PUT (UnPack 对收发双方都生效)。提到 16MiB 容纳多模态大值与范围查询结果。 新增 banNet 网络缝回归测试:200 条命中的响应需完整通过 Pack→UnPack→解码。 Co-Authored-By: Claude Opus 4.8 (1M context) --- config/config.json | 2 +- config/global.go | 2 +- config/global_test.go | 2 +- network/banNet/wire_scan_test.go | 56 ++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 network/banNet/wire_scan_test.go diff --git a/config/config.json b/config/config.json index 053ef3d..c5fff4f 100644 --- a/config/config.json +++ b/config/config.json @@ -7,7 +7,7 @@ "Port": 8080, "Name": "BanDB", "MaxConn": 100, - "MaxPackageSize": 1024, + "MaxPackageSize": 16777216, "WorkPoolSize": 100, "MaxWorkPoolTaskLen": 100, "MaxMsgChanLen": 100, diff --git a/config/global.go b/config/global.go index 309faf6..06d7293 100644 --- a/config/global.go +++ b/config/global.go @@ -100,7 +100,7 @@ func NewGlobalConfig() *GlobalConfig { Host: "localhost", Version: "1.0.0", MaxConn: 1000, - MaxPackageSize: 1024, + MaxPackageSize: 16 << 20, // 16MiB:容纳多模态大值(相机帧)与多条 SCAN 响应 WorkerPoolSize: 10, MaxWorkerTaskLen: 10000, MaxMsgChanLen: 100, diff --git a/config/global_test.go b/config/global_test.go index d9b0dd1..6a9434f 100644 --- a/config/global_test.go +++ b/config/global_test.go @@ -111,7 +111,7 @@ func TestDefaultConfigValues(t *testing.T) { value interface{} }{ {"Version", "1.0.0"}, - {"MaxPackageSize", uint32(1024)}, + {"MaxPackageSize", uint32(16 << 20)}, {"WorkerPoolSize", 
uint32(10)}, {"MaxWorkerTaskLen", uint32(10000)}, {"MaxMsgChanLen", uint32(100)}, diff --git a/network/banNet/wire_scan_test.go b/network/banNet/wire_scan_test.go new file mode 100644 index 0000000..f37c624 --- /dev/null +++ b/network/banNet/wire_scan_test.go @@ -0,0 +1,56 @@ +package banNet_test + +import ( + "fmt" + "testing" + + "github.com/NeverENG/BanDB/network/banNet" + "github.com/NeverENG/BanDB/pkg/proto" +) + +// TestScanResponseSurvivesWire 复现并守护 SCAN 的网络缝:一个多条命中的响应 +// 必须能完整通过「服务端 Pack → 客户端 UnPack(受 MaxPackageSize 限制)→ 解码」, +// 不再被旧的 1024B 上限拒绝。该路径正是单元测试绕过、却最易出错的地方。 +func TestScanResponseSurvivesWire(t *testing.T) { + entries := make([]proto.ScanEntry, 0, 200) + for i := 0; i < 200; i++ { + entries = append(entries, proto.ScanEntry{ + Key: []byte(fmt.Sprintf("imu:dev0:%06d", i)), + Value: []byte(`{"az":9.95,"ax":0.01,"ay":9.8}`), + }) + } + payload := proto.EncodeScanResponse(proto.StatusOK, entries) + if len(payload) <= 1024 { + t.Fatalf("测试前提失效:响应应远超 1024B,实际 %d", len(payload)) + } + + dp := banNet.NewDataPack() + + // 服务端 SendBuffMsg 的打包路径。 + packet, err := dp.Pack(banNet.NewMessage(proto.MsgRespOK, payload)) + if err != nil { + t.Fatalf("Pack 失败: %v", err) + } + + // 客户端 readResponse 的读取路径:先解头部(此处触发 MaxPackageSize 检查)。 + headLen := int(dp.GetHeadLen()) + if len(packet) < headLen { + t.Fatalf("packet 过短") + } + tempMsg, err := dp.UnPack(packet[:headLen]) + if err != nil { + t.Fatalf("UnPack 拒绝了大响应(MaxPackageSize 太小?): %v", err) + } + m := tempMsg.(*banNet.Message) + + off := headLen + int(m.IDLen) + data := packet[off : off+int(tempMsg.GetMsgLen())] + + status, got, err := proto.DecodeScanResponse(data) + if err != nil { + t.Fatalf("DecodeScanResponse 失败: %v", err) + } + if status != proto.StatusOK || len(got) != len(entries) { + t.Fatalf("跨网络往返不一致: status=%q n=%d 期望 %d", status, len(got), len(entries)) + } +}