Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
server.AddRouter(proto.MsgPut, router)
server.AddRouter(proto.MsgGet, router)
server.AddRouter(proto.MsgDelete, router)
server.AddRouter(proto.MsgScan, router)

// 注册连接生命周期回调
server.SetConnStartFunc(router.OnConnStart)
Expand Down
1 change: 1 addition & 0 deletions Server/server_pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func main() {
server.AddRouter(proto.MsgPut, router)
server.AddRouter(proto.MsgGet, router)
server.AddRouter(proto.MsgDelete, router)
server.AddRouter(proto.MsgScan, router)

// 注册连接生命周期回调
server.SetConnStartFunc(router.OnConnStart)
Expand Down
23 changes: 23 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,29 @@ func (c *Client) SendDelete(key []byte) error {
return nil
}

// SendScan 发送 SCAN 范围查询,返回命中条目。
func (c *Client) SendScan(req proto.ScanRequest) ([]proto.ScanEntry, error) {
msg := utils.NewMessage2(proto.MsgScan, proto.EncodeScanRequest(req))

if err := c.send(msg); err != nil {
return nil, fmt.Errorf("failed to send SCAN request: %v", err)
}

_, payload, err := c.readResponse()
if err != nil {
return nil, fmt.Errorf("failed to read response: %v", err)
}

status, entries, err := proto.DecodeScanResponse(payload)
if err != nil {
return nil, err
}
if status != proto.StatusOK {
return nil, fmt.Errorf("server error")
}
return entries, nil
}

// send 打包并写出一条消息
func (c *Client) send(msg *utils.Message) error {
dp := banNet.NewDataPack()
Expand Down
73 changes: 72 additions & 1 deletion client/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"os"
"strings"

"github.com/NeverENG/BanDB/pkg/predicate"
"github.com/NeverENG/BanDB/pkg/proto"
)

// InteractiveClient 交互式客户端
Expand All @@ -25,7 +28,7 @@ func NewInteractiveClient(addr string) (*InteractiveClient, error) {

fmt.Printf("已连接到 %s\n", addr)
fmt.Println("输入命令进行操作,输入 'quit' 或 'exit' 退出")
fmt.Println("支持命令: put <key> <value>, get <key>, delete <key>")
fmt.Println("支持命令: put <key> <value>, get <key>, delete <key>, scan <start|-> <end|-> [field op operand]")
fmt.Println()

return &InteractiveClient{
Expand Down Expand Up @@ -88,6 +91,8 @@ func (ic *InteractiveClient) executeCommand(line string) {
ic.handleGet(parts)
case "delete":
ic.handleDelete(parts)
case "scan":
ic.handleScan(parts)
case "help":
ic.showHelp()
default:
Expand Down Expand Up @@ -151,13 +156,79 @@ func (ic *InteractiveClient) handleDelete(parts []string) {
fmt.Println("✅ OK")
}

// handleScan 处理 SCAN 命令: scan <start|-> <end|-> [field op operand]
func (ic *InteractiveClient) handleScan(parts []string) {
if len(parts) < 3 {
fmt.Println("用法: scan <start|-> <end|-> [field op operand]")
fmt.Println("示例: scan imu:dev0:100 imu:dev0:200 az > 9.9")
return
}

req := proto.ScanRequest{
Start: dashToEmpty(parts[1]),
End: dashToEmpty(parts[2]),
Pred: predicate.Predicate{Op: predicate.OpNone},
}

if len(parts) >= 6 {
op, ok := parseOp(parts[4])
if !ok {
fmt.Printf("不支持的算子: %s (支持 > >= < <= == !=)\n", parts[4])
return
}
req.Pred = predicate.Predicate{Field: parts[3], Op: op, Operand: strings.Join(parts[5:], " ")}
} else if len(parts) > 3 {
fmt.Println("谓词需 3 段: field op operand,例: az > 9.9")
return
}

entries, err := ic.client.SendScan(req)
if err != nil {
fmt.Printf("❌ 错误: %v\n", err)
return
}

fmt.Printf("命中 %d 条:\n", len(entries))
for _, e := range entries {
fmt.Printf(" %s = %s\n", string(e.Key), string(e.Value))
}
}

// dashToEmpty 把 "-" 视为不限边界(空),否则返回原值字节。
func dashToEmpty(s string) []byte {
if s == "-" {
return nil
}
return []byte(s)
}

// parseOp 把算子符号映射为 predicate.Op。
func parseOp(s string) (predicate.Op, bool) {
switch s {
case ">":
return predicate.OpGT, true
case ">=":
return predicate.OpGTE, true
case "<":
return predicate.OpLT, true
case "<=":
return predicate.OpLTE, true
case "==":
return predicate.OpEQ, true
case "!=":
return predicate.OpNE, true
}
return predicate.OpNone, false
}

// showHelp 显示帮助信息
func (ic *InteractiveClient) showHelp() {
fmt.Println("\n=== BanKV 客户端帮助 ===")
fmt.Println("命令:")
fmt.Println(" put <key> <value> - 存储键值对")
fmt.Println(" get <key> - 获取值")
fmt.Println(" delete <key> - 删除键")
fmt.Println(" scan <start|-> <end|-> [field op operand] - 范围查询(只回传命中切片)")
fmt.Println(" help - 显示此帮助信息")
fmt.Println(" quit/exit - 退出客户端")
fmt.Println()
Expand Down
2 changes: 1 addition & 1 deletion config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"Port": 8080,
"Name": "BanDB",
"MaxConn": 100,
"MaxPackageSize": 1024,
"MaxPackageSize": 16777216,
"WorkPoolSize": 100,
"MaxWorkPoolTaskLen": 100,
"MaxMsgChanLen": 100,
Expand Down
2 changes: 1 addition & 1 deletion config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewGlobalConfig() *GlobalConfig {
Host: "localhost",
Version: "1.0.0",
MaxConn: 1000,
MaxPackageSize: 1024,
MaxPackageSize: 16 << 20, // 16MiB:容纳多模态大值(相机帧)与多条 SCAN 响应
WorkerPoolSize: 10,
MaxWorkerTaskLen: 10000,
MaxMsgChanLen: 100,
Expand Down
2 changes: 1 addition & 1 deletion config/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestDefaultConfigValues(t *testing.T) {
value interface{}
}{
{"Version", "1.0.0"},
{"MaxPackageSize", uint32(1024)},
{"MaxPackageSize", uint32(16 << 20)},
{"WorkerPoolSize", uint32(10)},
{"MaxWorkerTaskLen", uint32(10000)},
{"MaxMsgChanLen", uint32(100)},
Expand Down
56 changes: 56 additions & 0 deletions network/banNet/wire_scan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package banNet_test

import (
"fmt"
"testing"

"github.com/NeverENG/BanDB/network/banNet"
"github.com/NeverENG/BanDB/pkg/proto"
)

// TestScanResponseSurvivesWire 复现并守护 SCAN 的网络缝:一个多条命中的响应
// 必须能完整通过「服务端 Pack → 客户端 UnPack(受 MaxPackageSize 限制)→ 解码」,
// 不再被旧的 1024B 上限拒绝。该路径正是单元测试绕过、却最易出错的地方。
func TestScanResponseSurvivesWire(t *testing.T) {
entries := make([]proto.ScanEntry, 0, 200)
for i := 0; i < 200; i++ {
entries = append(entries, proto.ScanEntry{
Key: []byte(fmt.Sprintf("imu:dev0:%06d", i)),
Value: []byte(`{"az":9.95,"ax":0.01,"ay":9.8}`),
})
}
payload := proto.EncodeScanResponse(proto.StatusOK, entries)
if len(payload) <= 1024 {
t.Fatalf("测试前提失效:响应应远超 1024B,实际 %d", len(payload))
}

dp := banNet.NewDataPack()

// 服务端 SendBuffMsg 的打包路径。
packet, err := dp.Pack(banNet.NewMessage(proto.MsgRespOK, payload))
if err != nil {
t.Fatalf("Pack 失败: %v", err)
}

// 客户端 readResponse 的读取路径:先解头部(此处触发 MaxPackageSize 检查)。
headLen := int(dp.GetHeadLen())
if len(packet) < headLen {
t.Fatalf("packet 过短")
}
tempMsg, err := dp.UnPack(packet[:headLen])
if err != nil {
t.Fatalf("UnPack 拒绝了大响应(MaxPackageSize 太小?): %v", err)
}
m := tempMsg.(*banNet.Message)

off := headLen + int(m.IDLen)
data := packet[off : off+int(tempMsg.GetMsgLen())]

status, got, err := proto.DecodeScanResponse(data)
if err != nil {
t.Fatalf("DecodeScanResponse 失败: %v", err)
}
if status != proto.StatusOK || len(got) != len(entries) {
t.Fatalf("跨网络往返不一致: status=%q n=%d 期望 %d", status, len(got), len(entries))
}
}
4 changes: 4 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
FramesDroppedNonMonotonic atomic.Int64 // 被钩子按「时间戳回退/重放」丢弃
Writes atomic.Int64 // 成功写入(PUT)次数
Reads atomic.Int64 // 读取(GET)次数
Scans atomic.Int64 // 边缘范围查询(SCAN)次数
Deletes atomic.Int64 // 成功删除(DEL)次数
WriteErrors atomic.Int64 // 写入/删除失败次数
BackpressureStalls atomic.Int64 // 写入被字节信用背压阻塞的次数
Expand Down Expand Up @@ -54,6 +55,7 @@ type Snapshot struct {
DroppedNonMonotonic int64
Writes int64
Reads int64
Scans int64
Deletes int64
WriteErrors int64
BackpressureStalls int64
Expand All @@ -69,6 +71,7 @@ func Take() Snapshot {
DroppedNonMonotonic: FramesDroppedNonMonotonic.Load(),
Writes: Writes.Load(),
Reads: Reads.Load(),
Scans: Scans.Load(),
Deletes: Deletes.Load(),
WriteErrors: WriteErrors.Load(),
BackpressureStalls: BackpressureStalls.Load(),
Expand All @@ -86,6 +89,7 @@ func LogSnapshot() {
"dropped_non_monotonic", s.DroppedNonMonotonic,
"writes", s.Writes,
"reads", s.Reads,
"scans", s.Scans,
"deletes", s.Deletes,
"write_errors", s.WriteErrors,
"backpressure_stalls", s.BackpressureStalls,
Expand Down
99 changes: 99 additions & 0 deletions pkg/predicate/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Package predicate 提供一个最小的字段谓词,用于边缘查询的服务端下推:
// 对 JSON 值取出指定字段,与操作数按算子比较,只让命中的行通过——
// 从而「只回传命中切片」而非整段原始流。
//
// 设计刻意从简:单字段、单算子、单操作数(如 az > 9.9 / status == ok),
// 覆盖具身采集的常见查询,不引入完整表达式语言。
package predicate

import (
"encoding/json"
"strconv"
)

// Op 是谓词算子。
type Op uint8

const (
OpNone Op = iota // 无谓词:全匹配
OpGT // >
OpGTE // >=
OpLT // <
OpLTE // <=
OpEQ // ==
OpNE // !=
)

// Predicate 表示「Field Op Operand」。Op==OpNone 时 Eval 恒为真。
type Predicate struct {
Field string
Op Op
Operand string // 文本操作数:有序比较按 float64 解析,相等比较按字符串
}

// Eval 报告 value(JSON 对象)是否满足谓词。
// 无谓词恒真;value 非 JSON、字段缺失、或类型无法比较时返回 false。
func (p Predicate) Eval(value []byte) bool {
if p.Op == OpNone {
return true
}

var obj map[string]json.RawMessage
if err := json.Unmarshal(value, &obj); err != nil {
return false
}
raw, ok := obj[p.Field]
if !ok {
return false
}

switch p.Op {
case OpGT, OpGTE, OpLT, OpLTE:
fv, ok1 := asFloat(raw)
ov, err := strconv.ParseFloat(p.Operand, 64)
if !ok1 || err != nil {
return false
}
switch p.Op {
case OpGT:
return fv > ov
case OpGTE:
return fv >= ov
case OpLT:
return fv < ov
default: // OpLTE
return fv <= ov
}
case OpEQ:
return asScalar(raw) == p.Operand
case OpNE:
return asScalar(raw) != p.Operand
default:
return false
}
}

// asFloat 把字段原始 JSON 解析为 float64:直接数字或带引号的数字串都接受。
func asFloat(raw json.RawMessage) (float64, bool) {
var f float64
if err := json.Unmarshal(raw, &f); err == nil {
return f, true
}
var s string
if err := json.Unmarshal(raw, &s); err == nil {
if f, err := strconv.ParseFloat(s, 64); err == nil {
return f, true
}
}
return 0, false
}

// asScalar 把字段原始 JSON 归一成可比较的标量字符串:
// JSON 字符串去引号,其余(数字/布尔)按原文。
func asScalar(raw json.RawMessage) string {
var s string
if err := json.Unmarshal(raw, &s); err == nil {
return s
}
return string(raw)
}
Loading
Loading