From 7d2fbafb9c3e695b877db5a32453f9d0a89a29e8 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 7 Jun 2026 17:36:32 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat(config):=20=E6=96=B0=E5=A2=9E=20Mode?= =?UTF-8?q?=20=E8=BF=90=E8=A1=8C=E6=A8=A1=E5=BC=8F=EF=BC=88standalone/raft?= =?UTF-8?q?=EF=BC=89=EF=BC=8Cstandalone=20=E8=B7=B3=E8=BF=87=20Raft=20?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- config/config.json | 1 + config/global.go | 37 ++++++++++++++++++++++++++++++++++++- config/global_test.go | 10 +++++++--- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/config/config.json b/config/config.json index afec85c..053ef3d 100644 --- a/config/config.json +++ b/config/config.json @@ -1,4 +1,5 @@ { + "Mode": "standalone", "MaxMemTableSize": 10000, "WALPath": "../../log/wal.log", "SSTablePath": "../../log", diff --git a/config/global.go b/config/global.go index fc3fa01..309faf6 100644 --- a/config/global.go +++ b/config/global.go @@ -12,6 +12,12 @@ import ( "github.com/NeverENG/BanDB/network/banIface" ) +// 运行模式取值 +const ( + ModeStandalone = "standalone" // 单机:写经存储层 WAL,不启动 Raft + ModeRaft = "raft" // 集群:写经 Raft 日志 +) + type GlobalConfig struct { Name string Port int @@ -38,6 +44,10 @@ type GlobalConfig struct { MaxMemTableP float64 MaxMemTableLevel int + // Mode 运行模式:"standalone"(单机 WAL,不启动 Raft)或 "raft"(集群,写经 Raft 日志)。 + // 留空时按 len(Peers) 推断:1→standalone,>1→raft。 + Mode string + // Raft 集群配置 Peers []string // 集群中所有节点的地址 Me int // 当前节点在 Peers 中的索引(0-based) @@ -162,13 +172,38 @@ func (g *GlobalConfig) ParseFlags() { } } + g.resolveMode() + + // standalone 不启动 Raft,无需 Me/Peers 校验 + if g.Mode == ModeStandalone { + slog.Info("config finalized", "mode", g.Mode) + return + } + // 验证配置 if g.Me < 0 || g.Me >= len(g.Peers) { slog.Error("invalid me value", "me", g.Me, "peers_len", len(g.Peers)) panic("invalid me value") } - slog.Info("config finalized", "peers", g.Peers, "me", g.Me) + slog.Info("config finalized", "mode", g.Mode, "peers", g.Peers, "me", g.Me) +} + +// resolveMode 归一化运行模式:显式取值优先,留空时按 Peers 数量推断。 +func (g *GlobalConfig) resolveMode() { + switch g.Mode { + case ModeStandalone, ModeRaft: + // 显式指定,尊重之 + case "": + if len(g.Peers) > 1 { + g.Mode = ModeRaft + } else { + g.Mode = ModeStandalone + } + default: + slog.Error("invalid mode", "mode", g.Mode) + panic("invalid mode: " + g.Mode) + } } func meFlagArgs(args []string) []string { diff --git a/config/global_test.go b/config/global_test.go index aa223a1..d9b0dd1 100644 --- a/config/global_test.go +++ b/config/global_test.go @@ -188,13 +188,16 @@ func TestConfigInitWithFile(t *testing.T) { func TestPeerValidation(t *testing.T) { testCases := []struct { name string + mode string peers []string me int wantErr bool }{ - {"valid single node", []string{"localhost:8080"}, 0, false}, - {"valid multi node", []string{"localhost:8080", "localhost:8081", "localhost:8082"}, 1, false}, - {"invalid out of range", []string{"localhost:8080"}, 1, true}, + {"valid single node", ModeRaft, []string{"localhost:8080"}, 0, false}, + {"valid multi node", ModeRaft, []string{"localhost:8080", "localhost:8081", "localhost:8082"}, 1, false}, + {"invalid out of range", ModeRaft, []string{"localhost:8080"}, 1, true}, + // standalone 不启动 Raft,越界的 Me 不再触发校验 + {"standalone skips validation", ModeStandalone, []string{"localhost:8080"}, 1, false}, } for _, tc := range testCases { @@ -205,6 +208,7 @@ func TestPeerValidation(t *testing.T) { os.Args = []string{"test", "-me", strconv.Itoa(tc.me)} g := &GlobalConfig{ + Mode: tc.mode, Peers: tc.peers, } From ce49e3c409d1bed96c0a5ce829a521ff8db81519 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 7 Jun 2026 17:43:53 +0800 Subject: [PATCH 2/5] =?UTF-8?q?feat(storage):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E5=B1=82=20WAL=EF=BC=88append+fsync=E3=80=81?= =?UTF-8?q?=E5=B9=82=E7=AD=89=E9=87=8D=E6=94=BE=E3=80=81=E6=AE=8B=E7=BC=BA?= =?UTF-8?q?=E5=B0=BE=E9=83=A8=E6=8C=89=20EOF=20=E5=81=9C=E6=AD=A2=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- storage/wal.go | 103 +++++++++++++++++++++++++++++++++++ storage/wal_test.go | 128 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 storage/wal.go create mode 100644 storage/wal_test.go diff --git a/storage/wal.go b/storage/wal.go new file mode 100644 index 0000000..47506da --- /dev/null +++ b/storage/wal.go @@ -0,0 +1,103 @@ +package storage + +import ( + "bufio" + "encoding/binary" + "io" + "os" + "path/filepath" +) + +// WAL 操作码 +const ( + WALOpPut uint8 = 1 + WALOpDelete uint8 = 2 +) + +// WAL 存储层预写日志:standalone 模式下,写先 append + fsync 到此处再进 memtable, +// 提供单机崩溃恢复。记录格式 [op u8][klen u32][vlen u32][key][value](BigEndian)。 +// 与 Raft/raft_wal.go 一致:重放读到残缺尾部记录时直接停止(撕裂的尾写按 EOF 处理), +// 不使用 CRC。重放是幂等盲写(Put/Delete),未截断的 WAL 反复重放也安全。 +type WAL struct { + file *os.File + path string +} + +// NewWAL 打开(或创建)WAL 文件,以追加模式准备写入。 +func NewWAL(path string) (*WAL, error) { + if dir := filepath.Dir(path); dir != "" { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + } + f, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + return &WAL{file: f, path: path}, nil +} + +// Close 关闭底层文件。 +func (w *WAL) Close() error { + if w.file != nil { + return w.file.Close() + } + return nil +} + +// Append 追加一条记录并 fsync,确保返回时数据已落盘。 +// 删除墓碑用 op=WALOpDelete、value 传 nil。 +func (w *WAL) Append(op uint8, key, value []byte) error { + buf := make([]byte, 9, 9+len(key)+len(value)) + buf[0] = op + binary.BigEndian.PutUint32(buf[1:5], uint32(len(key))) + binary.BigEndian.PutUint32(buf[5:9], uint32(len(value))) + buf = append(buf, key...) + buf = append(buf, value...) + + if _, err := w.file.Write(buf); err != nil { + return err + } + return w.file.Sync() +} + +// Replay 从头读取全部记录,对每条调用 fn。读到残缺记录(撕裂尾写)即停止重放, +// 返回 nil;底层 IO 错误或 fn 返回错误则向上抛出。 +func (w *WAL) Replay(fn func(op uint8, key, value []byte) error) error { + f, err := os.Open(w.path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + r := bufio.NewReader(f) + var hdr [9]byte + for { + if _, err := io.ReadFull(r, hdr[:]); err != nil { + break // EOF 或残缺头部:正常结束 + } + op := hdr[0] + klen := binary.BigEndian.Uint32(hdr[1:5]) + vlen := binary.BigEndian.Uint32(hdr[5:9]) + + key := make([]byte, klen) + if _, err := io.ReadFull(r, key); err != nil { + break // 残缺尾写 + } + var value []byte + if vlen > 0 { + value = make([]byte, vlen) + if _, err := io.ReadFull(r, value); err != nil { + break // 残缺尾写 + } + } + + if err := fn(op, key, value); err != nil { + return err + } + } + return nil +} diff --git a/storage/wal_test.go b/storage/wal_test.go new file mode 100644 index 0000000..cbc1fc3 --- /dev/null +++ b/storage/wal_test.go @@ -0,0 +1,128 @@ +package storage + +import ( + "bytes" + "os" + "path/filepath" + "testing" +) + +type walRec struct { + op uint8 + key []byte + value []byte +} + +func replayAll(t *testing.T, w *WAL) []walRec { + t.Helper() + var recs []walRec + if err := w.Replay(func(op uint8, key, value []byte) error { + recs = append(recs, walRec{op: op, key: append([]byte(nil), key...), value: append([]byte(nil), value...)}) + return nil + }); err != nil { + t.Fatalf("replay: %v", err) + } + return recs +} + +func TestWALAppendReplay(t *testing.T) { + path := filepath.Join(t.TempDir(), "wal.log") + w, err := NewWAL(path) + if err != nil { + t.Fatalf("new wal: %v", err) + } + + if err := w.Append(WALOpPut, []byte("k1"), []byte("v1")); err != nil { + t.Fatalf("append: %v", err) + } + if err := w.Append(WALOpDelete, []byte("k2"), nil); err != nil { + t.Fatalf("append: %v", err) + } + if err := w.Append(WALOpPut, []byte("k3"), []byte{}); err != nil { + t.Fatalf("append empty value: %v", err) + } + + recs := replayAll(t, w) + if len(recs) != 3 { + t.Fatalf("want 3 records, got %d", len(recs)) + } + if recs[0].op != WALOpPut || !bytes.Equal(recs[0].key, []byte("k1")) || !bytes.Equal(recs[0].value, []byte("v1")) { + t.Errorf("rec0 mismatch: %+v", recs[0]) + } + if recs[1].op != WALOpDelete || !bytes.Equal(recs[1].key, []byte("k2")) || len(recs[1].value) != 0 { + t.Errorf("rec1 mismatch: %+v", recs[1]) + } + if recs[2].op != WALOpPut || !bytes.Equal(recs[2].key, []byte("k3")) || len(recs[2].value) != 0 { + t.Errorf("rec2 (empty-value put) mismatch: %+v", recs[2]) + } + _ = w.Close() +} + +func TestWALReopenPersists(t *testing.T) { + path := filepath.Join(t.TempDir(), "wal.log") + + w, err := NewWAL(path) + if err != nil { + t.Fatalf("new wal: %v", err) + } + if err := w.Append(WALOpPut, []byte("a"), []byte("1")); err != nil { + t.Fatalf("append: %v", err) + } + _ = w.Close() + + // 重新打开应追加而非截断,且能重放出旧记录 + w2, err := NewWAL(path) + if err != nil { + t.Fatalf("reopen wal: %v", err) + } + if err := w2.Append(WALOpPut, []byte("b"), []byte("2")); err != nil { + t.Fatalf("append after reopen: %v", err) + } + recs := replayAll(t, w2) + if len(recs) != 2 || !bytes.Equal(recs[0].key, []byte("a")) || !bytes.Equal(recs[1].key, []byte("b")) { + t.Fatalf("reopen replay mismatch: %+v", recs) + } + _ = w2.Close() +} + +func TestWALTornTailStops(t *testing.T) { + path := filepath.Join(t.TempDir(), "wal.log") + w, err := NewWAL(path) + if err != nil { + t.Fatalf("new wal: %v", err) + } + if err := w.Append(WALOpPut, []byte("k1"), []byte("v1")); err != nil { + t.Fatalf("append: %v", err) + } + _ = w.Close() + + // 追加一段残缺记录(只有半个头部),模拟崩溃时的撕裂尾写 + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatalf("open for torn write: %v", err) + } + if _, err := f.Write([]byte{WALOpPut, 0x00, 0x00}); err != nil { + t.Fatalf("torn write: %v", err) + } + _ = f.Close() + + w2, err := NewWAL(path) + if err != nil { + t.Fatalf("reopen: %v", err) + } + recs := replayAll(t, w2) + if len(recs) != 1 || !bytes.Equal(recs[0].key, []byte("k1")) { + t.Fatalf("torn tail should yield exactly the 1 intact record, got %+v", recs) + } + _ = w2.Close() +} + +func TestWALReplayMissingFile(t *testing.T) { + path := filepath.Join(t.TempDir(), "absent", "wal.log") + // 不创建文件,直接构造一个指向不存在路径的 WAL 实例做重放 + w := &WAL{path: path} + recs := replayAll(t, w) + if len(recs) != 0 { + t.Fatalf("missing file should replay 0 records, got %d", len(recs)) + } +} From f0fb1795e01007f57798aa523a94f14405516e46 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 7 Jun 2026 17:47:33 +0800 Subject: [PATCH 3/5] =?UTF-8?q?feat(service):=20standalone=20=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E5=86=99=E7=9B=B4=E8=BF=9E=20WAL+=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E7=BB=95=E8=BF=87=20Raft=EF=BC=8C=E7=BB=9F=E4=B8=80=20Write=20?= =?UTF-8?q?=E5=85=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- service/fsm.go | 83 ++++++++++++++++++++++++++++++++++++++------- service/fsm_test.go | 52 ++++++++++++++++++++++++++++ service/ha.go | 10 +++--- service/router.go | 19 ++--------- 4 files changed, 132 insertions(+), 32 deletions(-) diff --git a/service/fsm.go b/service/fsm.go index 8985746..a32b4d3 100644 --- a/service/fsm.go +++ b/service/fsm.go @@ -22,37 +22,93 @@ type Command struct { type KVServer struct { raft *Raft.Raft storage *storage.Engine + wal *storage.WAL // standalone 模式的存储层 WAL;raft 模式为 nil } -// NewFSM 创建 FSM,自动从全局配置初始化 Raft 和存储 +// NewFSM 创建 FSM,按运行模式初始化存储与持久化路径。 +// standalone:构建存储层 WAL 并重放到 memtable,不启动 Raft。 +// raft:启动 Raft,写经其日志,不使用存储层 WAL。 func NewKVServer() *KVServer { - // 从全局配置获取集群信息 - peers := config.G.Peers - me := config.G.Me - - // 初始化 Raft - raft := Raft.NewRaft(peers, me) - // 初始化存储 memTable := zstorage.NewMemTable() store := storage.NewEngine(memTable) - KVServer := &KVServer{ - raft: raft, + kv := &KVServer{ storage: store, } - return KVServer + if config.G.Mode == config.ModeStandalone { + wal, err := storage.NewWAL(config.G.WALPath) + if err != nil { + slog.Error("failed to open storage WAL", "path", config.G.WALPath, "error", err) + panic("failed to open storage WAL: " + err.Error()) + } + kv.wal = wal + kv.replayWAL() + return kv + } + + // raft 模式:写经 Raft 日志 + kv.raft = Raft.NewRaft(config.G.Peers, config.G.Me) + return kv } -// Run 运行 FSM +// replayWAL 启动时把 WAL 中的记录重放进 memtable(幂等盲写)。 +func (k *KVServer) replayWAL() { + if err := k.wal.Replay(func(op uint8, key, value []byte) error { + switch op { + case storage.WALOpPut: + return k.storage.Put(key, value) + case storage.WALOpDelete: + return k.storage.Delete(key) + } + return nil + }); err != nil { + slog.Error("WAL replay failed", "error", err) + } +} + +// Run 运行 FSM。standalone 模式下写已在 Write 中直接落 WAL+存储,无需 apply 循环。 func (k *KVServer) Run() { + if k.raft == nil { + slog.Info("KVServer started in standalone mode (no Raft apply loop)") + return + } slog.Info("KVServer started, waiting for Raft entries") for entry := range k.raft.GetApplyCh() { k.Apply(entry) } } +// Write 统一写入入口:standalone 直接落 WAL+存储;raft 经日志提交后由 apply 循环落盘。 +func (k *KVServer) Write(cmd Command) error { + if k.raft == nil { + return k.writeStandalone(cmd) + } + index, err := k.AppendEntry(cmd) + if err != nil { + return err + } + return k.WaitForCommit(index) +} + +// writeStandalone 先 append+fsync WAL,再写 memtable,提供单机崩溃恢复。 +func (k *KVServer) writeStandalone(cmd Command) error { + switch cmd.Type { + case "Put": + if err := k.wal.Append(storage.WALOpPut, cmd.Key, cmd.Value); err != nil { + return err + } + return k.storage.Put(cmd.Key, cmd.Value) + case "Delete": + if err := k.wal.Append(storage.WALOpDelete, cmd.Key, nil); err != nil { + return err + } + return k.storage.Delete(cmd.Key) + } + return nil +} + // Apply 应用日志到存储 func (k *KVServer) Apply(entry Raft.LogEntry) { if entry.IsSnapshot { @@ -156,6 +212,9 @@ func (k *KVServer) WaitForCommit(index int) error { // 成为 Leader,且其端口需立即开放以提供本地读;多节点选主窗口内的写失败需由客户端 // 重试关闭,超出本修复范围。 func (k *KVServer) WaitUntilReady() { + if k.raft == nil { + return // standalone:无选主,端口可立即开放 + } if len(config.G.Peers) != 1 { return } diff --git a/service/fsm_test.go b/service/fsm_test.go index 9529b10..04a5f1a 100644 --- a/service/fsm_test.go +++ b/service/fsm_test.go @@ -2,6 +2,7 @@ package service import ( "os" + "path/filepath" "testing" "time" @@ -14,10 +15,13 @@ func setupTest(t *testing.T) (*KVServer, func()) { oldMaxSize := config.G.MaxMemTableSize oldPeers := config.G.Peers oldMe := config.G.Me + oldMode := config.G.Mode // 每个测试用唯一的文件名,避免测试间干扰 testWALPath := "test_service_wal_" + time.Now().Format("20060102150405.000000") + ".log" + // 这些测试覆盖 Raft FSM 路径(Apply/AppendEntry/WaitUntilReady),固定 raft 模式 + config.G.Mode = config.ModeRaft config.G.WALPath = testWALPath config.G.MaxMemTableSize = 100 config.G.Peers = []string{"localhost:9000"} @@ -35,6 +39,7 @@ func setupTest(t *testing.T) (*KVServer, func()) { config.G.MaxMemTableSize = oldMaxSize config.G.Peers = oldPeers config.G.Me = oldMe + config.G.Mode = oldMode } return fsm, cleanup @@ -143,6 +148,53 @@ func TestFSM_UpdateOperation(t *testing.T) { } } +// TestStandalone_WriteAndRecover 验证 standalone 模式:写经 WAL+存储且不启动 Raft, +// 重启后新实例能从同一 WAL 重放恢复(含删除墓碑)。 +func TestStandalone_WriteAndRecover(t *testing.T) { + oldWALPath := config.G.WALPath + oldSSTablePath := config.G.SSTablePath + oldMode := config.G.Mode + oldMaxSize := config.G.MaxMemTableSize + + dir := t.TempDir() + config.G.Mode = config.ModeStandalone + config.G.WALPath = filepath.Join(dir, "wal.log") + config.G.SSTablePath = dir + config.G.MaxMemTableSize = 1 << 20 // 足够大,避免本测试触发刷盘 + defer func() { + config.G.WALPath = oldWALPath + config.G.SSTablePath = oldSSTablePath + config.G.Mode = oldMode + config.G.MaxMemTableSize = oldMaxSize + }() + + kv := NewKVServer() + if kv.raft != nil { + t.Fatal("standalone mode must not start Raft") + } + for _, c := range []Command{ + {Type: "Put", Key: []byte("k1"), Value: []byte("v1")}, + {Type: "Put", Key: []byte("k2"), Value: []byte("v2")}, + {Type: "Delete", Key: []byte("k1")}, + } { + if err := kv.Write(c); err != nil { + t.Fatalf("write %+v: %v", c, err) + } + } + _ = kv.wal.Close() + + // 模拟重启:新实例从同一 WAL 恢复 + kv2 := NewKVServer() + defer kv2.wal.Close() + + if v, err := kv2.Get([]byte("k2")); err != nil || string(v) != "v2" { + t.Fatalf("recover k2: v=%q err=%v", v, err) + } + if _, err := kv2.Get([]byte("k1")); err == nil { + t.Fatal("k1 should remain deleted after recovery") + } +} + // TestWaitUntilReady_SingleNodeAcceptsImmediateWrite 验证 #86 的修复: // 新建的单节点在选主前不是 Leader(写会被拒),WaitUntilReady 返回后必为 Leader, // 此时立即写入应当成功。 diff --git a/service/ha.go b/service/ha.go index 331c05f..6f42f91 100644 --- a/service/ha.go +++ b/service/ha.go @@ -38,10 +38,12 @@ func (h *HA) healthCheckLoop() { // checkHealth 检查健康状态 func (h *HA) checkHealth() { - // 检查 Raft 状态 - state, _ := h.kv.GetRaft().GetState() - if state == 0 { // Follower - // 可以添加更多健康检查逻辑 + // 检查 Raft 状态(standalone 模式无 Raft,直接视为健康) + if raft := h.kv.GetRaft(); raft != nil { + state, _ := raft.GetState() + if state == 0 { // Follower + // 可以添加更多健康检查逻辑 + } } // 简单的健康检查:只要 Raft 状态不是错误状态,就认为是健康的 diff --git a/service/router.go b/service/router.go index 2c8d066..cbcc642 100644 --- a/service/router.go +++ b/service/router.go @@ -100,15 +100,8 @@ func (r *Router) handlePut(data []byte, request banIface.IRequest) { Value: value, } - index, err := r.kv.AppendEntry(cmd) - if err != nil { - slog.Error("[ERROR] handlePut: AppendEntry failed", "error", err) - sendErr(request) - return - } - - if err := r.kv.WaitForCommit(index); err != nil { - slog.Error("[ERROR] handlePut: WaitForCommit failed", "error", err) + if err := r.kv.Write(cmd); err != nil { + slog.Error("[ERROR] handlePut: write failed", "error", err) sendErr(request) return } @@ -166,13 +159,7 @@ func (r *Router) handleDelete(data []byte, request banIface.IRequest) { Key: key, } - index, err := r.kv.AppendEntry(cmd) - if err != nil { - sendErr(request) - return - } - - if err := r.kv.WaitForCommit(index); err != nil { + if err := r.kv.Write(cmd); err != nil { sendErr(request) return } From 1e9fd37b339755b4016cc10300979751b00428c3 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 15:00:01 +0800 Subject: [PATCH 4/5] =?UTF-8?q?feat(net):=20PreHandle=20=E9=92=A9=E5=AD=90?= =?UTF-8?q?=E5=8F=AF=E6=8B=A6=E6=88=AA/=E6=94=B9=E5=86=99=E2=80=94?= =?UTF-8?q?=E2=80=94=E8=BF=94=E5=9B=9E=20HookAction=20=E7=9F=AD=E8=B7=AF?= =?UTF-8?q?=20Handle=EF=BC=8C=E6=96=B0=E5=A2=9E=20SetMsgData=20=E4=B8=8E?= =?UTF-8?q?=20StatusDropped?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 钩子从只读旁观者升级为可丢弃/改写一帧的真实拦截点: - banIface 新增 HookAction(HookPass/HookDrop),IRouter.PreHandle 返回处置决定 - DoMsgHandle 在 HookDrop 时短路 Handle - IRequest.SetMsgData 改写负载并同步 DataLen,供脱敏/裁剪 - service 层 PreHandle 持有「丢弃即回写唯一 StatusDropped 响应」的不变式,避免响应错位 Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/edge-positioning-feasibility.md | 2 +- network/banIface/iRequest.go | 2 ++ network/banIface/iRouter.go | 12 +++++++++++- network/banNet/msgHandle.go | 4 +++- network/banNet/request.go | 6 ++++++ network/banNet/router.go | 2 +- pkg/proto/codes.go | 5 +++-- service/router.go | 25 ++++++++++++++++++------- 8 files changed, 45 insertions(+), 13 deletions(-) diff --git a/docs/edge-positioning-feasibility.md b/docs/edge-positioning-feasibility.md index 2b2ab5a..86d9a72 100644 --- a/docs/edge-positioning-feasibility.md +++ b/docs/edge-positioning-feasibility.md @@ -86,7 +86,7 @@ Multi-Raft 移动设备强一致互备 · 去中心化共识调度 · 解决传 - **Multi-Raft 的工程量**:多 Raft Group + 分片 + 成员变更,是 TiKV/etcd 级别的工程。你现在连**单组多节点**都还没打磨好(README 自承)。 - **诚实的替代解**:要做对等互备,用**异步反熵复制(anti-entropy / gossip)已落盘的数据块**(最终一致),**不要**在高频采集热路径上跑同步强一致 Raft。这样既救了数据,又不拖垮硬件。**建议**:简历上别写"Multi-Raft 边缘强一致互备";写就要能扛住上面三连问。 -### 护城河 2:去中心化"边缘共识调度"(Raft 选个临时指挥官)—— 砍掉 +### 护城河 2:去中心化"边缘共识调度"(Raft 选个临时指挥官) - 这是**产品越界**:存储 + 传输 + **集群编排**,三个产品塞进一个简历条目,每个都是多年工程。 - **技术上也站不住**:Raft 给你的是"一致的日志",**不是一个好的任务分配器**。"谁去采集区域1、谁去清洗区域2"是多机器人任务分配(MRTA)问题,和"对日志达成共识"是两回事。用 Raft 选主 ≠ 解决了调度。 - **建议**:从简历主线删除,最多作为"未来探索"脚注存在。它只会稀释你真正能打的点。 diff --git a/network/banIface/iRequest.go b/network/banIface/iRequest.go index fbcf35b..985d03b 100644 --- a/network/banIface/iRequest.go +++ b/network/banIface/iRequest.go @@ -4,4 +4,6 @@ type IRequest interface { GetConnection() IConnect GetMsgData() []byte GetMsgID() string + // SetMsgData 改写本帧负载,供 PreHandle 钩子做脱敏/裁剪。 + SetMsgData([]byte) } diff --git a/network/banIface/iRouter.go b/network/banIface/iRouter.go index c96f4f5..82285dd 100644 --- a/network/banIface/iRouter.go +++ b/network/banIface/iRouter.go @@ -1,7 +1,17 @@ package banIface +// HookAction 是 PreHandle 钩子对一帧请求的处置决定。 +type HookAction int + +const ( + // HookPass 放行,继续走 Handle。 + HookPass HookAction = iota + // HookDrop 丢弃该帧,跳过 Handle。丢弃方负责回写唯一响应,避免响应错位。 + HookDrop +) + type IRouter interface { - PreHandle(request IRequest) + PreHandle(request IRequest) HookAction Handle(request IRequest) PostHandle(request IRequest) } diff --git a/network/banNet/msgHandle.go b/network/banNet/msgHandle.go index a09338f..f90d199 100644 --- a/network/banNet/msgHandle.go +++ b/network/banNet/msgHandle.go @@ -37,7 +37,9 @@ func (m *MsgHandle) DoMsgHandle(request banIface.IRequest) { fmt.Println("[ERROR] unregistered MsgID:", request.GetMsgID()) return } - handler.PreHandle(request) + if handler.PreHandle(request) == banIface.HookDrop { + return + } handler.Handle(request) handler.PostHandle(request) } diff --git a/network/banNet/request.go b/network/banNet/request.go index 05383ea..e0d0aa6 100644 --- a/network/banNet/request.go +++ b/network/banNet/request.go @@ -19,6 +19,12 @@ func (req *Request) GetMsgData() []byte { return req.msg.GetData() } +// SetMsgData 改写负载并同步长度,避免 DataLen 与实际数据漂移。 +func (req *Request) SetMsgData(data []byte) { + req.msg.SetData(data) + req.msg.SetMsgLen(uint32(len(data))) +} + func (req *Request) GetMsgID() string { return req.msg.GetMsgID() } diff --git a/network/banNet/router.go b/network/banNet/router.go index 24de56e..2307a81 100644 --- a/network/banNet/router.go +++ b/network/banNet/router.go @@ -8,7 +8,7 @@ type BaseRouter struct{} var _ banIface.IRouter = &BaseRouter{} -func (B *BaseRouter) PreHandle(req banIface.IRequest) {} +func (B *BaseRouter) PreHandle(req banIface.IRequest) banIface.HookAction { return banIface.HookPass } func (B *BaseRouter) Handle(req banIface.IRequest) {} diff --git a/pkg/proto/codes.go b/pkg/proto/codes.go index dea99a9..e81ad1d 100644 --- a/pkg/proto/codes.go +++ b/pkg/proto/codes.go @@ -21,6 +21,7 @@ const ( // 响应负载内的状态字段。 const ( - StatusOK = "ok" - StatusError = "error" + StatusOK = "ok" + StatusError = "error" + StatusDropped = "dropped" // 被 PreHandle 钩子按策略丢弃,非传输错误 ) diff --git a/service/router.go b/service/router.go index cbcc642..5796205 100644 --- a/service/router.go +++ b/service/router.go @@ -12,8 +12,8 @@ import ( type Router struct { kv *KVServer - // 前置处理函数 - preHandleFunc func(request banIface.IRequest) + // 前置处理函数;返回 HookDrop 表示丢弃本帧 + preHandleFunc func(request banIface.IRequest) banIface.HookAction // 后置处理函数 postHandleFunc func(request banIface.IRequest) } @@ -26,7 +26,7 @@ func NewRouter(kv *KVServer) *Router { } // SetPreHandle 设置前置处理函数 -func (r *Router) SetPreHandle(f func(request banIface.IRequest)) { +func (r *Router) SetPreHandle(f func(request banIface.IRequest) banIface.HookAction) { r.preHandleFunc = f } @@ -35,11 +35,17 @@ func (r *Router) SetPostHandle(f func(request banIface.IRequest)) { r.postHandleFunc = f } -// PreHandle 前置处理 -func (r *Router) PreHandle(request banIface.IRequest) { - if r.preHandleFunc != nil { - r.preHandleFunc(request) +// PreHandle 前置处理。返回 HookDrop 时由本函数回写唯一的「丢弃」响应, +// 使纯请求-响应协议不发生响应错位(见 OnConnStart 注释)。 +func (r *Router) PreHandle(request banIface.IRequest) banIface.HookAction { + if r.preHandleFunc == nil { + return banIface.HookPass } + action := r.preHandleFunc(request) + if action == banIface.HookDrop { + sendDropped(request) + } + return action } // Handle 处理请求 @@ -75,6 +81,11 @@ func sendOK(req banIface.IRequest) { req.GetConnection().SendBuffMsg(proto.MsgRespOK, statusPayload(proto.StatusOK)) } +// sendDropped 写回「被钩子按策略丢弃」响应;保证每请求恰好一个响应。 +func sendDropped(req banIface.IRequest) { + req.GetConnection().SendBuffMsg(proto.MsgRespErr, statusPayload(proto.StatusDropped)) +} + // handlePut 处理 PUT 操作 func (r *Router) handlePut(data []byte, request banIface.IRequest) { // 解析数据格式:key_len + key + value_len + value From 6f3bbd752f2a53fd40a04a2c4beef178710dea39 Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Sun, 14 Jun 2026 15:03:36 +0800 Subject: [PATCH 5/5] =?UTF-8?q?feat(service):=20=E6=8C=82=E8=BD=BD?= =?UTF-8?q?=E7=9C=9F=E5=AE=9E=E9=87=87=E9=9B=86=E5=85=A5=E5=8F=A3=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E9=92=A9=E5=AD=90=EF=BC=88=E4=B8=A2=E7=95=B8=E5=BD=A2?= =?UTF-8?q?=E5=B8=A7+=E6=97=B6=E9=97=B4=E6=88=B3=E5=8D=95=E8=B0=83?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C+=E5=AD=97=E6=AE=B5=E8=84=B1=E6=95=8F?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 service/ingesthook.Filter 并在两个 Server main 上用 SetPreHandle 挂载, 把「落盘前可编程预处理」从挂载点变成端到端可演示能力: - 丢弃畸形 PUT 帧(长度字段与数据不符 / value 超限) - 按设备 best-effort 时间戳单调校验,丢回退/重放帧(work-stealing 下乱序 非顺序保证,已在注释说明,可经 dropBackward 关闭) - 对 JSON 中 gps/user_id 等敏感字段脱敏,改写时重建 valueLen 前缀 仅针对 PUT,GET/DELETE 原样放行;钩子不触碰连接,丢弃响应由 Router 统一回写。 Co-Authored-By: Claude Opus 4.8 (1M context) --- Server/server.go | 6 ++ Server/server_pprof.go | 6 ++ service/ingesthook/filter.go | 152 ++++++++++++++++++++++++++++++ service/ingesthook/filter_test.go | 130 +++++++++++++++++++++++++ 4 files changed, 294 insertions(+) create mode 100644 service/ingesthook/filter.go create mode 100644 service/ingesthook/filter_test.go diff --git a/Server/server.go b/Server/server.go index c37941e..2a37702 100644 --- a/Server/server.go +++ b/Server/server.go @@ -6,6 +6,7 @@ import ( "github.com/NeverENG/BanDB/network/banNet" "github.com/NeverENG/BanDB/pkg/proto" "github.com/NeverENG/BanDB/service" + "github.com/NeverENG/BanDB/service/ingesthook" ) func main() { @@ -24,6 +25,11 @@ func main() { // 创建路由 router := service.NewRouter(KVServer) + // 挂载采集入口过滤钩子:落盘前丢弃畸形帧、按设备做 best-effort 时间戳 + // 单调校验、对敏感字段脱敏。 + filter := ingesthook.NewFilter([]string{"gps", "user_id"}, 0, true) + router.SetPreHandle(filter.Handle) + // 注册路由 server.AddRouter(proto.MsgPut, router) server.AddRouter(proto.MsgGet, router) diff --git a/Server/server_pprof.go b/Server/server_pprof.go index 0824f8a..31c76ce 100644 --- a/Server/server_pprof.go +++ b/Server/server_pprof.go @@ -10,6 +10,7 @@ import ( "github.com/NeverENG/BanDB/network/banNet" "github.com/NeverENG/BanDB/pkg/proto" "github.com/NeverENG/BanDB/service" + "github.com/NeverENG/BanDB/service/ingesthook" ) func main() { @@ -34,6 +35,11 @@ func main() { // 创建路由 router := service.NewRouter(KVServer) + // 挂载采集入口过滤钩子:落盘前丢弃畸形帧、按设备做 best-effort 时间戳 + // 单调校验、对敏感字段脱敏。 + filter := ingesthook.NewFilter([]string{"gps", "user_id"}, 0, true) + router.SetPreHandle(filter.Handle) + // 注册路由 server.AddRouter(proto.MsgPut, router) server.AddRouter(proto.MsgGet, router) diff --git a/service/ingesthook/filter.go b/service/ingesthook/filter.go new file mode 100644 index 0000000..e437e39 --- /dev/null +++ b/service/ingesthook/filter.go @@ -0,0 +1,152 @@ +// Package ingesthook 提供一个挂在采集入口的真实 PreHandle 过滤钩子示例: +// 在数据落盘前完成「丢弃畸形帧 + 时间戳单调性校验 + 字段脱敏」三件事, +// 把「可编程边缘采集缓冲网关」从挂载点变成有内容的演示。 +// +// 钩子只读取并改写请求负载,绝不向连接写响应——「丢弃即回写唯一响应」的 +// 不变式由 service.Router.PreHandle 统一持有(见 service/router.go)。 +package ingesthook + +import ( + "encoding/binary" + "encoding/json" + "strconv" + "strings" + "sync" + + "github.com/NeverENG/BanDB/network/banIface" + "github.com/NeverENG/BanDB/pkg/proto" +) + +// redactedValue 是脱敏字段被替换成的 JSON 值。 +var redactedValue = json.RawMessage(`"[REDACTED]"`) + +// Filter 是采集入口过滤器。零值不可用,请用 NewFilter 构造。 +type Filter struct { + // redactFields 命中的 JSON 字段会被脱敏改写。 + redactFields []string + // maxValueLen 限制 value 字节数,超过视为畸形丢弃;<=0 表示不限。 + maxValueLen int + // dropBackward 为 true 时,时间戳回退/重放的帧按设备丢弃。 + dropBackward bool + + mu sync.Mutex + // lastTS 记录每个设备最近一次接受的时间戳,用于 best-effort 单调校验。 + lastTS map[string]int64 +} + +// NewFilter 构造过滤器。redactFields 为需脱敏的 JSON 字段名;maxValueLen<=0 +// 表示不限 value 长度;dropBackward 控制是否丢弃时间戳回退帧。 +func NewFilter(redactFields []string, maxValueLen int, dropBackward bool) *Filter { + return &Filter{ + redactFields: redactFields, + maxValueLen: maxValueLen, + dropBackward: dropBackward, + lastTS: make(map[string]int64), + } +} + +// Handle 实现 PreHandle 钩子签名。返回 HookDrop 表示丢弃本帧。 +func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction { + // 钩子只针对写入帧:GET/DELETE 的负载格式不同,放行不动。 + if req.GetMsgID() != proto.MsgPut { + return banIface.HookPass + } + + key, value, ok := parsePut(req.GetMsgData()) + if !ok { + return banIface.HookDrop // 畸形帧:长度字段与实际数据不符 + } + + if f.maxValueLen > 0 && len(value) > f.maxValueLen { + return banIface.HookDrop // 畸形帧:value 超过上限 + } + + // 时间戳单调性校验(best-effort):DoMsgHandle 的 work-stealing 在背压下 + // 可能让同一连接的帧落到不同 worker 而乱序,此处只做尽力而为的回退/重放 + // 拦截,不是顺序保证;DropBackward 关闭时仅放行不校验。 + if f.dropBackward { + if device, ts, ok := parseKey(key); ok { + f.mu.Lock() + last, seen := f.lastTS[device] + if seen && ts <= last { + f.mu.Unlock() + return banIface.HookDrop // 回退/重放帧 + } + f.lastTS[device] = ts + f.mu.Unlock() + } + } + + // 字段脱敏:命中则改写 value 并重建整帧(含新的 valueLen 前缀)。 + if newValue, changed := redact(value, f.redactFields); changed { + req.SetMsgData(encodePut(key, newValue)) + } + + return banIface.HookPass +} + +// parsePut 解析 PUT 负载 keyLen(u32 LE)+valueLen(u32 LE)+key+value。 +func parsePut(data []byte) (key, value []byte, ok bool) { + if len(data) < 8 { + return nil, nil, false + } + keyLen := int(binary.LittleEndian.Uint32(data[0:4])) + valueLen := int(binary.LittleEndian.Uint32(data[4:8])) + if keyLen < 0 || valueLen < 0 || 8+keyLen+valueLen > len(data) { + return nil, nil, false + } + key = data[8 : 8+keyLen] + value = data[8+keyLen : 8+keyLen+valueLen] + return key, value, true +} + +// encodePut 按 PUT 负载格式重建一帧。 +func encodePut(key, value []byte) []byte { + buf := make([]byte, 8+len(key)+len(value)) + binary.LittleEndian.PutUint32(buf[0:4], uint32(len(key))) + binary.LittleEndian.PutUint32(buf[4:8], uint32(len(value))) + copy(buf[8:], key) + copy(buf[8+len(key):], value) + return buf +} + +// parseKey 从形如 "imu:dev0:" 的 key 中切出设备标识(末段之前的全部) +// 与数值时间戳。不符合约定的 key 返回 ok=false(跳过单调校验,不丢弃)。 +func parseKey(key []byte) (device string, ts int64, ok bool) { + s := string(key) + i := strings.LastIndexByte(s, ':') + if i <= 0 || i == len(s)-1 { + return "", 0, false + } + ts, err := strconv.ParseInt(s[i+1:], 10, 64) + if err != nil { + return "", 0, false + } + return s[:i], ts, true +} + +// redact 把 value(JSON 对象)中命中 fields 的字段替换为脱敏占位符; +// 非 JSON 或未命中任何字段时原样返回 changed=false。其余字段保留原始字节。 +func redact(value []byte, fields []string) (newValue []byte, changed bool) { + if len(fields) == 0 { + return value, false + } + var m map[string]json.RawMessage + if err := json.Unmarshal(value, &m); err != nil { + return value, false + } + for _, field := range fields { + if _, present := m[field]; present { + m[field] = redactedValue + changed = true + } + } + if !changed { + return value, false + } + out, err := json.Marshal(m) + if err != nil { + return value, false + } + return out, true +} diff --git a/service/ingesthook/filter_test.go b/service/ingesthook/filter_test.go new file mode 100644 index 0000000..3ac0a48 --- /dev/null +++ b/service/ingesthook/filter_test.go @@ -0,0 +1,130 @@ +package ingesthook + +import ( + "bytes" + "encoding/json" + "testing" + + "github.com/NeverENG/BanDB/network/banIface" + "github.com/NeverENG/BanDB/pkg/proto" +) + +// fakeReq 是 banIface.IRequest 的测试替身。钩子不触碰连接,GetConnection 返回 nil。 +type fakeReq struct { + msgID string + data []byte +} + +func (f *fakeReq) GetConnection() banIface.IConnect { return nil } +func (f *fakeReq) GetMsgData() []byte { return f.data } +func (f *fakeReq) GetMsgID() string { return f.msgID } +func (f *fakeReq) SetMsgData(d []byte) { f.data = d } + +func putReq(key, value string) *fakeReq { + return &fakeReq{msgID: proto.MsgPut, data: encodePut([]byte(key), []byte(value))} +} + +// GET/DELETE 必须原样放行——它们的负载没有 valueLen 字段,误判会丢掉合法读。 +func TestHandle_NonPutPassesThrough(t *testing.T) { + f := NewFilter([]string{"gps"}, 0, true) + // GET 负载是 keyLen+key,前 8 字节是 key 的一部分,绝不能被当 PUT 解析。 + req := &fakeReq{msgID: proto.MsgGet, data: []byte("anything")} + if got := f.Handle(req); got != banIface.HookPass { + t.Fatalf("GET 应放行,得到 %v", got) + } +} + +func TestHandle_MalformedDropped(t *testing.T) { + f := NewFilter(nil, 0, false) + req := &fakeReq{msgID: proto.MsgPut, data: []byte{1, 2, 3}} // 不足 8 字节 + if got := f.Handle(req); got != banIface.HookDrop { + t.Fatalf("畸形帧应丢弃,得到 %v", got) + } +} + +func TestHandle_OversizedDropped(t *testing.T) { + f := NewFilter(nil, 4, false) + if got := f.Handle(putReq("imu:dev0:1", "toolong")); got != banIface.HookDrop { + t.Fatalf("超长 value 应丢弃,得到 %v", got) + } + if got := f.Handle(putReq("imu:dev0:2", "ok")); got != banIface.HookPass { + t.Fatalf("正常 value 应放行,得到 %v", got) + } +} + +func TestHandle_MonotonicDrop(t *testing.T) { + f := NewFilter(nil, 0, true) + if got := f.Handle(putReq("imu:dev0:100", "{}")); got != banIface.HookPass { + t.Fatalf("首帧应放行,得到 %v", got) + } + if got := f.Handle(putReq("imu:dev0:99", "{}")); got != banIface.HookDrop { + t.Fatalf("回退帧应丢弃,得到 %v", got) + } + if got := f.Handle(putReq("imu:dev0:100", "{}")); got != banIface.HookDrop { + t.Fatalf("重放(相等)帧应丢弃,得到 %v", got) + } + if got := f.Handle(putReq("imu:dev0:101", "{}")); got != banIface.HookPass { + t.Fatalf("前进帧应放行,得到 %v", got) + } + // 不同设备各自独立计水位。 + if got := f.Handle(putReq("imu:dev1:1", "{}")); got != banIface.HookPass { + t.Fatalf("另一设备首帧应放行,得到 %v", got) + } +} + +func TestHandle_MonotonicDisabled(t *testing.T) { + f := NewFilter(nil, 0, false) + f.Handle(putReq("imu:dev0:100", "{}")) + if got := f.Handle(putReq("imu:dev0:99", "{}")); got != banIface.HookPass { + t.Fatalf("关闭单调校验后回退帧应放行,得到 %v", got) + } +} + +// 不符合 imu:dev:ts 约定的 key 不参与单调校验,应放行。 +func TestHandle_UnconventionalKeyPasses(t *testing.T) { + f := NewFilter(nil, 0, true) + if got := f.Handle(putReq("plainkey", "{}")); got != banIface.HookPass { + t.Fatalf("无约定 key 应放行,得到 %v", got) + } +} + +func TestHandle_RedactRewritesPayload(t *testing.T) { + f := NewFilter([]string{"gps", "user_id"}, 0, false) + req := putReq("imu:dev0:1", `{"ax":0.01,"gps":"39.9,116.4","user_id":"u123"}`) + if got := f.Handle(req); got != banIface.HookPass { + t.Fatalf("脱敏帧应放行,得到 %v", got) + } + + // 钩子改写后,整帧必须可按 PUT 格式重新解析(valueLen 前缀已重建)。 + key, value, ok := parsePut(req.GetMsgData()) + if !ok { + t.Fatal("改写后的帧无法解析,valueLen 前缀可能未重建") + } + if !bytes.Equal(key, []byte("imu:dev0:1")) { + t.Fatalf("key 不应被改动,得到 %q", key) + } + + var m map[string]any + if err := json.Unmarshal(value, &m); err != nil { + t.Fatalf("改写后的 value 不是合法 JSON: %v", err) + } + if m["gps"] != "[REDACTED]" || m["user_id"] != "[REDACTED]" { + t.Fatalf("敏感字段未脱敏: %v", m) + } + if m["ax"] != 0.01 { + t.Fatalf("非敏感字段应保留: %v", m) + } +} + +// 非 JSON 的 value 配置了脱敏字段时应原样放行,不丢弃、不改写。 +func TestHandle_NonJSONValueUnchanged(t *testing.T) { + f := NewFilter([]string{"gps"}, 0, false) + req := putReq("imu:dev0:1", "rawbinaryblob") + if got := f.Handle(req); got != banIface.HookPass { + t.Fatalf("非 JSON value 应放行,得到 %v", got) + } + _, value, _ := parsePut(req.GetMsgData()) + if !bytes.Equal(value, []byte("rawbinaryblob")) { + t.Fatalf("非 JSON value 不应被改动,得到 %q", value) + } +}