Redis 扣库存成功 → 发奖(MySQL 用户余额 + num)
这是一个典型的异构数据源分布式事务问题:Redis(库存)和 MySQL(余额)无法参与同一个本地事务。本项目用 Go 语言完整实现了三种主流的分布式事务解决方案,每种方案均包含生产级的容错设计。
客户端请求
│
▼
┌──────────┐ ┌──────────┐
│ Redis │ │ MySQL │
│ 库存 -N │ ──→ │ 余额 +N │
└──────────┘ └──────────┘
异构数据源,无法共享本地事务
distributed-tx/
├── go.mod
│
├── common/ # 公共模块
│ ├── model.go # DeductRewardRequest 请求模型
│ └── lua.go # Redis Lua 脚本(扣库存、TCC 冻结/恢复)
│
├── barrier/ # 事务屏障(TCC + SAGA 共用)
│ ├── schema.sql # branch_tx_status 建表 DDL
│ └── barrier.go # BranchBarrier:防悬挂 + 空回滚
│
├── local_msg_mq/ # 方案一:本地消息表 + 可靠消息队列
│ ├── schema.sql # local_messages + users 建表 DDL
│ ├── model.go # LocalMessage / RewardMessage 模型
│ ├── stock_service.go # 核心:Redis 扣库存 + MySQL 写本地消息
│ ├── consumer.go # MQ 消费者:幂等更新余额
│ └── compensation.go # 补偿任务:定时扫描重发未完成消息
│
├── tcc/ # 方案二:TCC(Try-Confirm-Cancel)
│ ├── schema.sql # balance_freeze + global_tx_log 建表 DDL
│ ├── coordinator.go # TCC 协调器(含全局事务状态持久化)
│ ├── stock_branch.go # Redis 库存分支(Barrier 保护)
│ ├── balance_branch.go # MySQL 余额分支(Barrier 保护)
│ ├── recovery.go # 协调器宕机恢复任务
│ └── example.go # 使用示例 + BranchRebuilder
│
└── saga/ # 方案三:SAGA(编排式)
├── schema.sql # saga_log 建表 DDL
├── orchestrator.go # SAGA 编排器(含步骤状态持久化)
├── stock_step.go # Redis 库存步骤(Barrier 保护)
├── balance_step.go # MySQL 余额步骤(Barrier 保护)
├── recovery.go # 补偿恢复任务(无限重试 + 告警)
└── example.go # 使用示例 + StepRebuilder
| 依赖 | 版本 | 用途 |
|---|---|---|
| Go | >= 1.21 | 编译运行 |
| Redis | >= 6.0 | 库存存储,支持 Lua 脚本 |
| MySQL | >= 5.7 | 用户余额、事务日志、消息表 |
| MQ(Kafka/RocketMQ) | - | 仅方案一需要,代码中通过接口抽象 |
按需执行所选方案的 SQL 文件:
# 公共表(三个方案都需要)
mysql -u root -p your_db < local_msg_mq/schema.sql # 包含 users 表
# 事务屏障表(TCC 和 SAGA 需要)
mysql -u root -p your_db < barrier/schema.sql
# 方案一:本地消息表
mysql -u root -p your_db < local_msg_mq/schema.sql
# 方案二:TCC
mysql -u root -p your_db < tcc/schema.sql
# 方案三:SAGA
mysql -u root -p your_db < saga/schema.sql| 表名 | 所属方案 | 用途 |
|---|---|---|
users |
公共 | 用户余额 |
local_messages |
本地消息表 | 本地消息持久化 |
branch_tx_status |
TCC / SAGA | 分支事务状态(防悬挂 + 空回滚) |
balance_freeze |
TCC | 余额冻结记录 |
global_tx_log |
TCC | 全局事务日志(协调器宕机恢复) |
saga_log |
SAGA | SAGA 事务日志(编排器宕机恢复) |
一致性级别:最终一致性
┌─────────┐ ① Lua 原子扣库存 ┌─────────┐
│ Redis │ ◄────────────────── │ 调用方 │
└─────────┘ └────┬────┘
│ ② MySQL 事务:写 local_messages
▼
┌───────────┐ ③ 异步投递 ┌─────┐
│ MySQL │ ─────────────→ │ MQ │
└───────────┘ └──┬──┘
│ ④ 消费
▼
┌───────────┐
│ 消费者 │
│ 幂等更新 │
│ 用户余额 │
└───────────┘
- Redis Lua 原子扣库存 —
stock_service.go - MySQL 本地事务写消息 — 与扣库存在同一个请求中,失败则回滚 Redis
- 异步 MQ 投递 — 最佳努力,失败不影响主流程
- MQ 消费者幂等处理 —
consumer.go,单事务内更新余额 + 标记消息完成 - 补偿任务兜底 —
compensation.go,定时扫描 5 分钟内未完成的消息重发
svc := local_msg_mq.NewStockService(rdb, db, mqProducer)
err := svc.DeductStockAndReward(ctx, &common.DeductRewardRequest{
BizID: "order_12345",
UserID: 100,
Num: 10,
})- 生产端:
biz_idUNIQUE KEY 防止重复写入消息 - 消费端:检查
local_messages.status,已完成则跳过
一致性级别:准强一致性
┌──────────────────┐
│ TCC 协调器 │
│ (状态持久化到 │
│ global_tx_log) │
└───┬──────────┬───┘
Try │ │ Try
┌───────────────┘ └───────────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Redis 库存分支 │ │ MySQL 余额分支 │
│ │ │ │
│ Try: │ │ Try: │
│ DECRBY 扣库存 │ │ INSERT 冻结记录 │
│ HSET 写冻结 │ │ status=trying │
│ │ │ │
│ Confirm: │ │ Confirm: │
│ HDEL 删冻结 │ │ 余额 += num │
│ │ │ status=confirmed│
│ Cancel: │ │ │
│ INCRBY 恢复 │ │ Cancel: │
│ HDEL 删冻结 │ │ status=cancelled│
└───────────────┘ └───────────────┘
- Try 阶段:预留资源(Redis 冻结库存 + MySQL 插入冻结记录),不做最终变更
- Confirm 阶段:所有 Try 成功后,正式提交(删除 Redis 冻结、MySQL 余额 += num)
- Cancel 阶段:任一 Try 失败,反序回滚已预留的资源
协调器在每个阶段转换时将状态写入 global_tx_log 表:
| 阶段 | phase 值 | 含义 |
|---|---|---|
| Try 中 | trying |
正在逐个调用分支 Try |
| Confirm 中 | confirming |
所有 Try 成功,正在逐个 Confirm |
| Cancel 中 | cancelling |
某个 Try 失败,正在逐个 Cancel |
| 最终成功 | committed |
所有 Confirm 完成 |
| 最终回滚 | rolled_back |
所有 Cancel 完成 |
| 恢复失败 | failed |
超过最大重试次数 |
RecoveryTask 定时扫描卡在中间态超过 1 分钟的事务:
- trying / cancelling → Cancel 所有分支(Barrier 处理空回滚)
- confirming → Confirm 所有分支(Barrier 处理幂等)
// 执行 TCC 事务
err := tcc.ExecuteRewardTCC(ctx, rdb, db, &common.DeductRewardRequest{
BizID: "order_12345",
UserID: 100,
Num: 10,
})
// 启动恢复任务(应用启动时)
recovery := tcc.NewRecoveryTask(db, alerter)
recovery.RegisterRebuilder(tcc.BizTypeReward, &tcc.RewardBranchRebuilder{RDB: rdb, DB: db})
go func() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
recovery.Run(ctx)
}
}()一致性级别:最终一致性
┌──────────────────┐
│ SAGA 编排器 │
│ (状态持久化到 │
│ saga_log) │
└───┬──────────┬───┘
Forward │ │ Forward
┌───────────────┘ └───────────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Redis 库存步骤 │ │ MySQL 余额步骤 │
│ │ │ │
│ Forward: │ │ Forward: │
│ DECRBY 扣库存 │ │ 余额 += num │
│ (直接生效) │ │ (直接生效) │
│ │ │ │
│ Compensate: │ │ Compensate: │
│ INCRBY 恢复 │ │ 余额 -= num │
│ (语义回滚) │ │ (语义回滚) │
└───────────────┘ └───────────────┘
- Forward:直接提交业务操作(存在中间状态)
- Compensate:通过反向操作实现语义回滚(非数据库 Rollback)
编排器将执行状态持久化到 saga_log 表:
| 阶段 | phase 值 | 含义 |
|---|---|---|
| 执行中 | executing |
正在逐步执行 Forward |
| 补偿中 | compensating |
某步 Forward 失败,正在补偿 |
| 全部成功 | completed |
所有 Forward 完成 |
| 补偿完成 | compensated |
所有补偿完成 |
与 TCC 恢复的关键区别:SAGA 补偿永不放弃。 因为 Forward 已直接提交(无冻结机制),补偿是恢复一致性的唯一手段。RecoveryTask 无限重试,超过告警阈值(默认 3 次)后每次重试都触发告警。
// 执行 SAGA 事务
err := saga.ExecuteRewardSAGA(ctx, rdb, db, &common.DeductRewardRequest{
BizID: "order_12345",
UserID: 100,
Num: 10,
})
// 启动恢复任务(应用启动时)
recovery := saga.NewRecoveryTask(db, alerter)
recovery.RegisterRebuilder(saga.BizTypeReward, &saga.RewardStepRebuilder{RDB: rdb, DB: db})
go func() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
recovery.Run(ctx)
}
}()TCC 和 SAGA 共用 barrier/barrier.go,通过 branch_tx_status 表的 SELECT ... FOR UPDATE 行级锁解决两个经典问题:
问题:网络乱序导致 Cancel/Compensate 比 Try/Forward 先到达,Try/Forward 后执行会导致资源泄漏。
解决:
Cancel 先到 → 插入 status="cancelled"
Try 后到 → 发现 status="cancelled" → 拒绝执行 → 返回 ErrSuspension
问题:Try/Forward 从未执行(超时或网络丢失),协调器/编排器直接触发 Cancel/Compensate,实际无需回滚。
解决:
Cancel 到达 → 查无记录(Try 未执行)→ 仅插入 status="cancelled" → 不执行业务回滚
| 方法 | 正常执行 | 防悬挂 | 空回滚 | 幂等 |
|---|---|---|---|---|
TCCTry |
插入 trying → 执行业务 | 发现 cancelled → 拒绝 | - | 发现 trying → 跳过 |
TCCConfirm |
trying → confirmed → 执行业务 | - | - | 非 trying → 跳过 |
TCCCancel |
trying → cancelled → 执行业务 | - | 无记录 → 插入 cancelled | cancelled → 跳过 |
SAGAForward |
插入 forwarded → 执行业务 | 发现 compensated → 拒绝 | - | 发现 forwarded → 跳过 |
SAGACompensate |
forwarded → compensated → 执行业务 | - | 无记录 → 插入 compensated | compensated → 跳过 |
| 维度 | 本地消息表 + MQ | TCC | SAGA |
|---|---|---|---|
| 一致性级别 | 最终一致性 | 准强一致性 | 最终一致性 |
| 中间状态 | 有(异步窗口期) | 无(冻结隔离) | 有(Forward 直接提交) |
| 实现复杂度 | ★★☆☆ | ★★★★ | ★★★☆ |
| 性能开销 | 低(异步解耦) | 高(3 阶段 + 冻结锁) | 中(补偿开销) |
| 额外依赖 | MQ(Kafka/RMQ) | 冻结表 + 协调器 | 编排器 + 状态表 |
| 资源锁定 | 无 | 有(Try 到 Confirm 期间) | 无 |
| 防悬挂/空回滚 | 不涉及 | 需要(Barrier) | 需要(Barrier) |
| 宕机恢复 | 补偿任务重发消息 | global_tx_log + RecoveryTask | saga_log + RecoveryTask |
| 恢复策略 | 有限重试,超限告警 | 有限重试,超限标记 failed | 无限重试,超限告警 |
| 适用场景 | 大多数异步业务 | 强一致 + 短事务 | 长事务 + 多步骤编排 |
- 大多数场景优先选「本地消息表 + MQ」— 实现简单、性能好、依赖成熟,能接受秒级延迟即可
- 需要准强一致性时选「TCC」— 如金融转账、库存超卖零容忍场景,但侵入性较高
- 长流程多步骤编排选「SAGA」— 如订单 → 支付 → 物流 → 积分等多服务协调,补偿逻辑清晰
# 1. 克隆并进入项目
cd distributed-tx
# 2. 下载依赖
go mod tidy
# 3. 编译验证
go build ./...
# 4. 初始化数据库(根据选择的方案执行对应 SQL)
# 5. 在业务代码中引用对应包即可三种方案均通过接口抽象支持扩展:
| 方案 | 扩展接口 | 说明 |
|---|---|---|
| 本地消息表 | MQProducer / Alerter |
实现消息投递和告警接口 |
| TCC | TCCBranch + BranchRebuilder |
新增分支实现 + 恢复重建器 |
| SAGA | SAGAStep + StepRebuilder |
新增步骤实现 + 恢复重建器 |
以 TCC 为例,扩展一个新的分支:
// 1. 实现 TCCBranch 接口
type MyNewBranch struct { /* ... */ }
func (b *MyNewBranch) Try(ctx context.Context) error { /* ... */ }
func (b *MyNewBranch) Confirm(ctx context.Context) error { /* ... */ }
func (b *MyNewBranch) Cancel(ctx context.Context) error { /* ... */ }
// 2. 在 BranchRebuilder 中添加重建逻辑
func (r *MyRebuilder) RebuildBranches(xid string, payload []byte) ([]tcc.TCCBranch, error) {
return []tcc.TCCBranch{
NewStockTCCBranch(/* ... */),
&MyNewBranch{/* ... */},
}, nil
}
// 3. 注册到 RecoveryTask
recovery.RegisterRebuilder("my_biz_type", &MyRebuilder{})本项目为学习用途的八股文代码实现,仅供参考。