-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessor.go
More file actions
191 lines (161 loc) · 5.46 KB
/
processor.go
File metadata and controls
191 lines (161 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package batchflow
import (
"context"
"database/sql"
"errors"
"time"
"github.com/redis/go-redis/v9"
)
type Operations []any
// BatchProcessor 批量处理器接口 - SQL数据库的核心处理逻辑
type BatchProcessor interface {
// GenerateOperations 生成批量操作
GenerateOperations(ctx context.Context, schema SchemaInterface, data []map[string]any) (operations Operations, err error)
// ExecuteOperations 执行批量操作
ExecuteOperations(ctx context.Context, operations Operations) error
}
// TimeOutCapable 扩展接口:支持超时设置(自类型泛型)
type TimeOutCapable[T any] interface {
WithTimeout(time.Duration) T
}
// SQLBatchProcessor SQL数据库批量处理器
// 实现 BatchProcessor 接口,专注于SQL数据库的核心处理逻辑
type SQLBatchProcessor struct {
db *sql.DB // 数据库连接
driver SQLDriver // SQL生成器(数据库特定)
timeout time.Duration
}
var _ BatchProcessor = (*SQLBatchProcessor)(nil)
// NewSQLBatchProcessor 创建SQL批量处理器
// 参数:
// - db: 数据库连接(用户管理连接池)
// - driver: 数据库特定的SQL生成器
func NewSQLBatchProcessor(db *sql.DB, driver SQLDriver) *SQLBatchProcessor {
return &SQLBatchProcessor{
db: db,
driver: driver,
}
}
func (bp *SQLBatchProcessor) WithTimeout(timeout time.Duration) *SQLBatchProcessor {
bp.timeout = timeout
return bp
}
func (bp *SQLBatchProcessor) GenerateOperations(ctx context.Context, schema SchemaInterface, data []map[string]any) (operations Operations, err error) {
s, ok := schema.(*SQLSchema)
if !ok {
return nil, errors.New("schema is not a SQLSchema")
}
sql, args, innerErr := bp.driver.GenerateInsertSQL(ctx, s, data)
if innerErr != nil {
return nil, innerErr
}
operations = append(operations, sql)
operations = append(operations, args...)
return operations, nil
}
/*
SQL 执行语义:
- 在设置了 bp.timeout 时,使用 context.WithTimeoutCause 派生子 ctx(具体 cause 如 "execute batch timeout")。
- 当子 ctx 达到超时时,驱动通常返回 context.DeadlineExceeded;本处理器会读取 context.Cause(ctx) 并原样返回该 cause,
以便上层执行器的重试分类器可以区分“处理器内部超时”,按需实施重试与退避。
- 安全性:在执行前校验空 operations,避免越界;不持久化/返回子 ctx,defer cancel() 安全。
*/
func (bp *SQLBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error {
if bp.timeout > 0 {
ctxTimeout, cancel := context.WithTimeoutCause(ctx, bp.timeout, errors.New("execute batch timeout"))
defer cancel()
ctx = ctxTimeout
}
if len(operations) < 1 {
return errors.New("empty operations")
}
if sql, ok := operations[0].(string); ok {
args := operations[1:]
_, err := bp.db.ExecContext(ctx, sql, args...)
// processor 会捕获超时异常, 可以出发重试
if err != nil && errors.Is(err, context.DeadlineExceeded) {
if cause := context.Cause(ctx); cause != nil {
return cause
}
}
return err
}
return errors.New("invalid operation type")
}
// RedisBatchProcessor Redis批量处理器
// 实现 BatchProcessor 接口,专注于Redis的核心处理逻辑
type RedisBatchProcessor struct {
client *redis.Client // Redis客户端连接
driver RedisDriver // Redis操作生成器
timeout time.Duration
}
var _ BatchProcessor = (*RedisBatchProcessor)(nil)
// NewRedisBatchProcessor 创建Redis批量处理器
// 参数:
// - client: Redis客户端连接
// - driver: Redis操作生成器
func NewRedisBatchProcessor(client *redis.Client, driver RedisDriver) *RedisBatchProcessor {
return &RedisBatchProcessor{
client: client,
driver: driver,
}
}
func (rp *RedisBatchProcessor) WithTimeout(timeout time.Duration) *RedisBatchProcessor {
rp.timeout = timeout
return rp
}
// GenerateOperations 执行批量操作
func (rp *RedisBatchProcessor) GenerateOperations(ctx context.Context, schema SchemaInterface, data []map[string]any) (operations Operations, err error) {
s, ok := schema.(*Schema)
if !ok {
return nil, errors.New("schema is not a Schema")
}
cmds, innerErr := rp.driver.GenerateCmds(ctx, s, data)
if innerErr != nil {
return nil, innerErr
}
for _, cmd := range cmds {
operations = append(operations, cmd)
}
return operations, nil
}
/*
Redis 执行与快速退出:
- 在设置了 rp.timeout 时,使用 context.WithTimeoutCause 限定执行时限。
- 大批量 operations 时,在循环内检查 ctx(可每次或每 N 次)以快速响应取消/超时,避免无谓迭代开销。
- Pipeline 在本函数内构建并执行,不跨越函数生命周期,defer cancel() 安全。
*/
func (rp *RedisBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error {
if rp.timeout > 0 {
ctxTimeout, cancel := context.WithTimeoutCause(ctx, rp.timeout, errors.New("execute batch timeout"))
defer cancel()
ctx = ctxTimeout
}
// 使用Pipeline批量执行
pipeline := rp.client.Pipeline()
for _, operation := range operations {
if ctx.Err() != nil {
return ctx.Err()
}
if cmd, ok := operation.(RedisCmd); ok {
pipeline.Do(ctx, cmd...)
}
}
// 执行Pipeline
cmds, err := pipeline.Exec(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
if cause := context.Cause(ctx); cause != nil {
return cause
}
}
return err
}
// 检查每个命令的执行结果
for _, cmd := range cmds {
if cmd.Err() != nil {
err = errors.Join(err, cmd.Err())
}
}
return err
}