Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/NeverENG/BanDB/network/banNet"
"github.com/NeverENG/BanDB/pkg/proto"
"github.com/NeverENG/BanDB/service"
"github.com/NeverENG/BanDB/service/ingesthook"
)

func main() {
Expand All @@ -24,6 +25,11 @@ func main() {
// 创建路由
router := service.NewRouter(KVServer)

// 挂载采集入口过滤钩子:落盘前丢弃畸形帧、按设备做 best-effort 时间戳
// 单调校验、对敏感字段脱敏。
filter := ingesthook.NewFilter([]string{"gps", "user_id"}, 0, true)
router.SetPreHandle(filter.Handle)

// 注册路由
server.AddRouter(proto.MsgPut, router)
server.AddRouter(proto.MsgGet, router)
Expand Down
6 changes: 6 additions & 0 deletions Server/server_pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/NeverENG/BanDB/network/banNet"
"github.com/NeverENG/BanDB/pkg/proto"
"github.com/NeverENG/BanDB/service"
"github.com/NeverENG/BanDB/service/ingesthook"
)

func main() {
Expand All @@ -34,6 +35,11 @@ func main() {
// 创建路由
router := service.NewRouter(KVServer)

// 挂载采集入口过滤钩子:落盘前丢弃畸形帧、按设备做 best-effort 时间戳
// 单调校验、对敏感字段脱敏。
filter := ingesthook.NewFilter([]string{"gps", "user_id"}, 0, true)
router.SetPreHandle(filter.Handle)

// 注册路由
server.AddRouter(proto.MsgPut, router)
server.AddRouter(proto.MsgGet, router)
Expand Down
1 change: 1 addition & 0 deletions config/config.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"Mode": "standalone",
"MaxMemTableSize": 10000,
"WALPath": "../../log/wal.log",
"SSTablePath": "../../log",
Expand Down
37 changes: 36 additions & 1 deletion config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/NeverENG/BanDB/network/banIface"
)

// 运行模式取值
const (
ModeStandalone = "standalone" // 单机:写经存储层 WAL,不启动 Raft
ModeRaft = "raft" // 集群:写经 Raft 日志
)

type GlobalConfig struct {
Name string
Port int
Expand All @@ -38,6 +44,10 @@ type GlobalConfig struct {
MaxMemTableP float64
MaxMemTableLevel int

// Mode 运行模式:"standalone"(单机 WAL,不启动 Raft)或 "raft"(集群,写经 Raft 日志)。
// 留空时按 len(Peers) 推断:1→standalone,>1→raft。
Mode string

// Raft 集群配置
Peers []string // 集群中所有节点的地址
Me int // 当前节点在 Peers 中的索引(0-based)
Expand Down Expand Up @@ -162,13 +172,38 @@ func (g *GlobalConfig) ParseFlags() {
}
}

g.resolveMode()

// standalone 不启动 Raft,无需 Me/Peers 校验
if g.Mode == ModeStandalone {
slog.Info("config finalized", "mode", g.Mode)
return
}

// 验证配置
if g.Me < 0 || g.Me >= len(g.Peers) {
slog.Error("invalid me value", "me", g.Me, "peers_len", len(g.Peers))
panic("invalid me value")
}

slog.Info("config finalized", "peers", g.Peers, "me", g.Me)
slog.Info("config finalized", "mode", g.Mode, "peers", g.Peers, "me", g.Me)
}

// resolveMode 归一化运行模式:显式取值优先,留空时按 Peers 数量推断。
func (g *GlobalConfig) resolveMode() {
switch g.Mode {
case ModeStandalone, ModeRaft:
// 显式指定,尊重之
case "":
if len(g.Peers) > 1 {
g.Mode = ModeRaft
} else {
g.Mode = ModeStandalone
}
default:
slog.Error("invalid mode", "mode", g.Mode)
panic("invalid mode: " + g.Mode)
}
}

func meFlagArgs(args []string) []string {
Expand Down
10 changes: 7 additions & 3 deletions config/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,16 @@ func TestConfigInitWithFile(t *testing.T) {
func TestPeerValidation(t *testing.T) {
testCases := []struct {
name string
mode string
peers []string
me int
wantErr bool
}{
{"valid single node", []string{"localhost:8080"}, 0, false},
{"valid multi node", []string{"localhost:8080", "localhost:8081", "localhost:8082"}, 1, false},
{"invalid out of range", []string{"localhost:8080"}, 1, true},
{"valid single node", ModeRaft, []string{"localhost:8080"}, 0, false},
{"valid multi node", ModeRaft, []string{"localhost:8080", "localhost:8081", "localhost:8082"}, 1, false},
{"invalid out of range", ModeRaft, []string{"localhost:8080"}, 1, true},
// standalone 不启动 Raft,越界的 Me 不再触发校验
{"standalone skips validation", ModeStandalone, []string{"localhost:8080"}, 1, false},
}

for _, tc := range testCases {
Expand All @@ -205,6 +208,7 @@ func TestPeerValidation(t *testing.T) {
os.Args = []string{"test", "-me", strconv.Itoa(tc.me)}

g := &GlobalConfig{
Mode: tc.mode,
Peers: tc.peers,
}

Expand Down
2 changes: 1 addition & 1 deletion docs/edge-positioning-feasibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Multi-Raft 移动设备强一致互备 · 去中心化共识调度 · 解决传
- **Multi-Raft 的工程量**:多 Raft Group + 分片 + 成员变更,是 TiKV/etcd 级别的工程。你现在连**单组多节点**都还没打磨好(README 自承)。
- **诚实的替代解**:要做对等互备,用**异步反熵复制(anti-entropy / gossip)已落盘的数据块**(最终一致),**不要**在高频采集热路径上跑同步强一致 Raft。这样既救了数据,又不拖垮硬件。**建议**:简历上别写"Multi-Raft 边缘强一致互备";写就要能扛住上面三连问。

### 护城河 2:去中心化"边缘共识调度"(Raft 选个临时指挥官)—— 砍掉
### 护城河 2:去中心化"边缘共识调度"(Raft 选个临时指挥官)
- 这是**产品越界**:存储 + 传输 + **集群编排**,三个产品塞进一个简历条目,每个都是多年工程。
- **技术上也站不住**:Raft 给你的是"一致的日志",**不是一个好的任务分配器**。"谁去采集区域1、谁去清洗区域2"是多机器人任务分配(MRTA)问题,和"对日志达成共识"是两回事。用 Raft 选主 ≠ 解决了调度。
- **建议**:从简历主线删除,最多作为"未来探索"脚注存在。它只会稀释你真正能打的点。
Expand Down
2 changes: 2 additions & 0 deletions network/banIface/iRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ type IRequest interface {
GetConnection() IConnect
GetMsgData() []byte
GetMsgID() string
// SetMsgData 改写本帧负载,供 PreHandle 钩子做脱敏/裁剪。
SetMsgData([]byte)
}
12 changes: 11 additions & 1 deletion network/banIface/iRouter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package banIface

// HookAction 是 PreHandle 钩子对一帧请求的处置决定。
type HookAction int

const (
// HookPass 放行,继续走 Handle。
HookPass HookAction = iota
// HookDrop 丢弃该帧,跳过 Handle。丢弃方负责回写唯一响应,避免响应错位。
HookDrop
)

type IRouter interface {
PreHandle(request IRequest)
PreHandle(request IRequest) HookAction
Handle(request IRequest)
PostHandle(request IRequest)
}
4 changes: 3 additions & 1 deletion network/banNet/msgHandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (m *MsgHandle) DoMsgHandle(request banIface.IRequest) {
fmt.Println("[ERROR] unregistered MsgID:", request.GetMsgID())
return
}
handler.PreHandle(request)
if handler.PreHandle(request) == banIface.HookDrop {
return
}
handler.Handle(request)
handler.PostHandle(request)
}
Expand Down
6 changes: 6 additions & 0 deletions network/banNet/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func (req *Request) GetMsgData() []byte {
return req.msg.GetData()
}

// SetMsgData 改写负载并同步长度,避免 DataLen 与实际数据漂移。
func (req *Request) SetMsgData(data []byte) {
req.msg.SetData(data)
req.msg.SetMsgLen(uint32(len(data)))
}

func (req *Request) GetMsgID() string {
return req.msg.GetMsgID()
}
Expand Down
2 changes: 1 addition & 1 deletion network/banNet/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type BaseRouter struct{}

var _ banIface.IRouter = &BaseRouter{}

func (B *BaseRouter) PreHandle(req banIface.IRequest) {}
func (B *BaseRouter) PreHandle(req banIface.IRequest) banIface.HookAction { return banIface.HookPass }

func (B *BaseRouter) Handle(req banIface.IRequest) {}

Expand Down
5 changes: 3 additions & 2 deletions pkg/proto/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (

// 响应负载内的状态字段。
const (
StatusOK = "ok"
StatusError = "error"
StatusOK = "ok"
StatusError = "error"
StatusDropped = "dropped" // 被 PreHandle 钩子按策略丢弃,非传输错误
)
83 changes: 71 additions & 12 deletions service/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,93 @@ type Command struct {
type KVServer struct {
raft *Raft.Raft
storage *storage.Engine
wal *storage.WAL // standalone 模式的存储层 WAL;raft 模式为 nil
}

// NewFSM 创建 FSM,自动从全局配置初始化 Raft 和存储
// NewFSM 创建 FSM,按运行模式初始化存储与持久化路径。
// standalone:构建存储层 WAL 并重放到 memtable,不启动 Raft。
// raft:启动 Raft,写经其日志,不使用存储层 WAL。
func NewKVServer() *KVServer {
// 从全局配置获取集群信息
peers := config.G.Peers
me := config.G.Me

// 初始化 Raft
raft := Raft.NewRaft(peers, me)

// 初始化存储
memTable := zstorage.NewMemTable()
store := storage.NewEngine(memTable)

KVServer := &KVServer{
raft: raft,
kv := &KVServer{
storage: store,
}

return KVServer
if config.G.Mode == config.ModeStandalone {
wal, err := storage.NewWAL(config.G.WALPath)
if err != nil {
slog.Error("failed to open storage WAL", "path", config.G.WALPath, "error", err)
panic("failed to open storage WAL: " + err.Error())
}
kv.wal = wal
kv.replayWAL()
return kv
}

// raft 模式:写经 Raft 日志
kv.raft = Raft.NewRaft(config.G.Peers, config.G.Me)
return kv
}

// Run 运行 FSM
// replayWAL 启动时把 WAL 中的记录重放进 memtable(幂等盲写)。
func (k *KVServer) replayWAL() {
if err := k.wal.Replay(func(op uint8, key, value []byte) error {
switch op {
case storage.WALOpPut:
return k.storage.Put(key, value)
case storage.WALOpDelete:
return k.storage.Delete(key)
}
return nil
}); err != nil {
slog.Error("WAL replay failed", "error", err)
}
}

// Run 运行 FSM。standalone 模式下写已在 Write 中直接落 WAL+存储,无需 apply 循环。
func (k *KVServer) Run() {
if k.raft == nil {
slog.Info("KVServer started in standalone mode (no Raft apply loop)")
return
}
slog.Info("KVServer started, waiting for Raft entries")
for entry := range k.raft.GetApplyCh() {
k.Apply(entry)
}
}

// Write 统一写入入口:standalone 直接落 WAL+存储;raft 经日志提交后由 apply 循环落盘。
func (k *KVServer) Write(cmd Command) error {
if k.raft == nil {
return k.writeStandalone(cmd)
}
index, err := k.AppendEntry(cmd)
if err != nil {
return err
}
return k.WaitForCommit(index)
}

// writeStandalone 先 append+fsync WAL,再写 memtable,提供单机崩溃恢复。
func (k *KVServer) writeStandalone(cmd Command) error {
switch cmd.Type {
case "Put":
if err := k.wal.Append(storage.WALOpPut, cmd.Key, cmd.Value); err != nil {
return err
}
return k.storage.Put(cmd.Key, cmd.Value)
case "Delete":
if err := k.wal.Append(storage.WALOpDelete, cmd.Key, nil); err != nil {
return err
}
return k.storage.Delete(cmd.Key)
}
return nil
}

// Apply 应用日志到存储
func (k *KVServer) Apply(entry Raft.LogEntry) {
if entry.IsSnapshot {
Expand Down Expand Up @@ -156,6 +212,9 @@ func (k *KVServer) WaitForCommit(index int) error {
// 成为 Leader,且其端口需立即开放以提供本地读;多节点选主窗口内的写失败需由客户端
// 重试关闭,超出本修复范围。
func (k *KVServer) WaitUntilReady() {
if k.raft == nil {
return // standalone:无选主,端口可立即开放
}
if len(config.G.Peers) != 1 {
return
}
Expand Down
Loading
Loading