Skip to content

FoBoHuang/distributed-tx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

distributed-tx:异构数据源分布式事务实战(Go)

业务场景

Redis 扣库存成功 → 发奖(MySQL 用户余额 + num)

这是一个典型的异构数据源分布式事务问题:Redis(库存)和 MySQL(余额)无法参与同一个本地事务。本项目用 Go 语言完整实现了三种主流的分布式事务解决方案,每种方案均包含生产级的容错设计。

客户端请求
    │
    ▼
┌──────────┐     ┌──────────┐
│  Redis   │     │  MySQL   │
│  库存 -N  │ ──→ │ 余额 +N  │
└──────────┘     └──────────┘
  异构数据源,无法共享本地事务

项目结构

distributed-tx/
├── go.mod
│
├── common/                         # 公共模块
│   ├── model.go                    #   DeductRewardRequest 请求模型
│   └── lua.go                      #   Redis Lua 脚本(扣库存、TCC 冻结/恢复)
│
├── barrier/                        # 事务屏障(TCC + SAGA 共用)
│   ├── schema.sql                  #   branch_tx_status 建表 DDL
│   └── barrier.go                  #   BranchBarrier:防悬挂 + 空回滚
│
├── local_msg_mq/                   # 方案一:本地消息表 + 可靠消息队列
│   ├── schema.sql                  #   local_messages + users 建表 DDL
│   ├── model.go                    #   LocalMessage / RewardMessage 模型
│   ├── stock_service.go            #   核心:Redis 扣库存 + MySQL 写本地消息
│   ├── consumer.go                 #   MQ 消费者:幂等更新余额
│   └── compensation.go             #   补偿任务:定时扫描重发未完成消息
│
├── tcc/                            # 方案二:TCC(Try-Confirm-Cancel)
│   ├── schema.sql                  #   balance_freeze + global_tx_log 建表 DDL
│   ├── coordinator.go              #   TCC 协调器(含全局事务状态持久化)
│   ├── stock_branch.go             #   Redis 库存分支(Barrier 保护)
│   ├── balance_branch.go           #   MySQL 余额分支(Barrier 保护)
│   ├── recovery.go                 #   协调器宕机恢复任务
│   └── example.go                  #   使用示例 + BranchRebuilder
│
└── saga/                           # 方案三:SAGA(编排式)
    ├── schema.sql                  #   saga_log 建表 DDL
    ├── orchestrator.go             #   SAGA 编排器(含步骤状态持久化)
    ├── stock_step.go               #   Redis 库存步骤(Barrier 保护)
    ├── balance_step.go             #   MySQL 余额步骤(Barrier 保护)
    ├── recovery.go                 #   补偿恢复任务(无限重试 + 告警)
    └── example.go                  #   使用示例 + StepRebuilder

环境依赖

依赖 版本 用途
Go >= 1.21 编译运行
Redis >= 6.0 库存存储,支持 Lua 脚本
MySQL >= 5.7 用户余额、事务日志、消息表
MQ(Kafka/RocketMQ) - 仅方案一需要,代码中通过接口抽象

数据库初始化

按需执行所选方案的 SQL 文件:

# 公共表(三个方案都需要)
mysql -u root -p your_db < local_msg_mq/schema.sql   # 包含 users 表

# 事务屏障表(TCC 和 SAGA 需要)
mysql -u root -p your_db < barrier/schema.sql

# 方案一:本地消息表
mysql -u root -p your_db < local_msg_mq/schema.sql

# 方案二:TCC
mysql -u root -p your_db < tcc/schema.sql

# 方案三:SAGA
mysql -u root -p your_db < saga/schema.sql

完整表清单

表名 所属方案 用途
users 公共 用户余额
local_messages 本地消息表 本地消息持久化
branch_tx_status TCC / SAGA 分支事务状态(防悬挂 + 空回滚)
balance_freeze TCC 余额冻结记录
global_tx_log TCC 全局事务日志(协调器宕机恢复)
saga_log SAGA SAGA 事务日志(编排器宕机恢复)

方案一:本地消息表 + 可靠消息队列

一致性级别:最终一致性

核心流程

┌─────────┐   ① Lua 原子扣库存   ┌─────────┐
│  Redis   │ ◄──────────────────  │  调用方  │
└─────────┘                      └────┬────┘
                                      │ ② MySQL 事务:写 local_messages
                                      ▼
                                ┌───────────┐   ③ 异步投递   ┌─────┐
                                │   MySQL    │ ─────────────→ │ MQ  │
                                └───────────┘                └──┬──┘
                                                                │ ④ 消费
                                                                ▼
                                                          ┌───────────┐
                                                          │ 消费者     │
                                                          │ 幂等更新   │
                                                          │ 用户余额   │
                                                          └───────────┘
  1. Redis Lua 原子扣库存stock_service.go
  2. MySQL 本地事务写消息 — 与扣库存在同一个请求中,失败则回滚 Redis
  3. 异步 MQ 投递 — 最佳努力,失败不影响主流程
  4. MQ 消费者幂等处理consumer.go,单事务内更新余额 + 标记消息完成
  5. 补偿任务兜底compensation.go,定时扫描 5 分钟内未完成的消息重发

使用示例

svc := local_msg_mq.NewStockService(rdb, db, mqProducer)

err := svc.DeductStockAndReward(ctx, &common.DeductRewardRequest{
    BizID:  "order_12345",
    UserID: 100,
    Num:    10,
})

幂等保障

  • 生产端:biz_id UNIQUE KEY 防止重复写入消息
  • 消费端:检查 local_messages.status,已完成则跳过

方案二:TCC(Try-Confirm-Cancel)

一致性级别:准强一致性

核心流程

                    ┌──────────────────┐
                    │   TCC 协调器      │
                    │  (状态持久化到     │
                    │  global_tx_log)   │
                    └───┬──────────┬───┘
           Try          │          │          Try
        ┌───────────────┘          └───────────────┐
        ▼                                          ▼
┌───────────────┐                          ┌───────────────┐
│ Redis 库存分支  │                          │ MySQL 余额分支  │
│               │                          │               │
│ Try:          │                          │ Try:          │
│  DECRBY 扣库存 │                          │  INSERT 冻结记录 │
│  HSET 写冻结   │                          │  status=trying │
│               │                          │               │
│ Confirm:      │                          │ Confirm:      │
│  HDEL 删冻结   │                          │  余额 += num   │
│               │                          │  status=confirmed│
│ Cancel:       │                          │               │
│  INCRBY 恢复   │                          │ Cancel:       │
│  HDEL 删冻结   │                          │  status=cancelled│
└───────────────┘                          └───────────────┘
  • Try 阶段:预留资源(Redis 冻结库存 + MySQL 插入冻结记录),不做最终变更
  • Confirm 阶段:所有 Try 成功后,正式提交(删除 Redis 冻结、MySQL 余额 += num)
  • Cancel 阶段:任一 Try 失败,反序回滚已预留的资源

协调器宕机恢复

协调器在每个阶段转换时将状态写入 global_tx_log 表:

阶段 phase 值 含义
Try 中 trying 正在逐个调用分支 Try
Confirm 中 confirming 所有 Try 成功,正在逐个 Confirm
Cancel 中 cancelling 某个 Try 失败,正在逐个 Cancel
最终成功 committed 所有 Confirm 完成
最终回滚 rolled_back 所有 Cancel 完成
恢复失败 failed 超过最大重试次数

RecoveryTask 定时扫描卡在中间态超过 1 分钟的事务:

  • trying / cancelling → Cancel 所有分支(Barrier 处理空回滚)
  • confirming → Confirm 所有分支(Barrier 处理幂等)

使用示例

// 执行 TCC 事务
err := tcc.ExecuteRewardTCC(ctx, rdb, db, &common.DeductRewardRequest{
    BizID:  "order_12345",
    UserID: 100,
    Num:    10,
})

// 启动恢复任务(应用启动时)
recovery := tcc.NewRecoveryTask(db, alerter)
recovery.RegisterRebuilder(tcc.BizTypeReward, &tcc.RewardBranchRebuilder{RDB: rdb, DB: db})

go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        recovery.Run(ctx)
    }
}()

方案三:SAGA(编排式)

一致性级别:最终一致性

核心流程

                    ┌──────────────────┐
                    │  SAGA 编排器      │
                    │  (状态持久化到     │
                    │   saga_log)       │
                    └───┬──────────┬───┘
        Forward         │          │         Forward
        ┌───────────────┘          └───────────────┐
        ▼                                          ▼
┌───────────────┐                          ┌───────────────┐
│ Redis 库存步骤  │                          │ MySQL 余额步骤  │
│               │                          │               │
│ Forward:      │                          │ Forward:      │
│  DECRBY 扣库存 │                          │  余额 += num   │
│  (直接生效)    │                          │  (直接生效)    │
│               │                          │               │
│ Compensate:   │                          │ Compensate:   │
│  INCRBY 恢复   │                          │  余额 -= num   │
│  (语义回滚)    │                          │  (语义回滚)    │
└───────────────┘                          └───────────────┘
  • Forward:直接提交业务操作(存在中间状态)
  • Compensate:通过反向操作实现语义回滚(非数据库 Rollback)

编排器宕机恢复

编排器将执行状态持久化到 saga_log 表:

阶段 phase 值 含义
执行中 executing 正在逐步执行 Forward
补偿中 compensating 某步 Forward 失败,正在补偿
全部成功 completed 所有 Forward 完成
补偿完成 compensated 所有补偿完成

与 TCC 恢复的关键区别:SAGA 补偿永不放弃。 因为 Forward 已直接提交(无冻结机制),补偿是恢复一致性的唯一手段。RecoveryTask 无限重试,超过告警阈值(默认 3 次)后每次重试都触发告警。

使用示例

// 执行 SAGA 事务
err := saga.ExecuteRewardSAGA(ctx, rdb, db, &common.DeductRewardRequest{
    BizID:  "order_12345",
    UserID: 100,
    Num:    10,
})

// 启动恢复任务(应用启动时)
recovery := saga.NewRecoveryTask(db, alerter)
recovery.RegisterRebuilder(saga.BizTypeReward, &saga.RewardStepRebuilder{RDB: rdb, DB: db})

go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        recovery.Run(ctx)
    }
}()

事务屏障(BranchBarrier)

TCC 和 SAGA 共用 barrier/barrier.go,通过 branch_tx_status 表的 SELECT ... FOR UPDATE 行级锁解决两个经典问题:

防悬挂

问题:网络乱序导致 Cancel/Compensate 比 Try/Forward 先到达,Try/Forward 后执行会导致资源泄漏。

解决

Cancel 先到 → 插入 status="cancelled"
Try 后到   → 发现 status="cancelled" → 拒绝执行 → 返回 ErrSuspension

空回滚

问题:Try/Forward 从未执行(超时或网络丢失),协调器/编排器直接触发 Cancel/Compensate,实际无需回滚。

解决

Cancel 到达 → 查无记录(Try 未执行)→ 仅插入 status="cancelled" → 不执行业务回滚

Barrier 支持的完整操作

方法 正常执行 防悬挂 空回滚 幂等
TCCTry 插入 trying → 执行业务 发现 cancelled → 拒绝 - 发现 trying → 跳过
TCCConfirm trying → confirmed → 执行业务 - - 非 trying → 跳过
TCCCancel trying → cancelled → 执行业务 - 无记录 → 插入 cancelled cancelled → 跳过
SAGAForward 插入 forwarded → 执行业务 发现 compensated → 拒绝 - 发现 forwarded → 跳过
SAGACompensate forwarded → compensated → 执行业务 - 无记录 → 插入 compensated compensated → 跳过

三种方案对比

维度 本地消息表 + MQ TCC SAGA
一致性级别 最终一致性 准强一致性 最终一致性
中间状态 有(异步窗口期) 无(冻结隔离) 有(Forward 直接提交)
实现复杂度 ★★☆☆ ★★★★ ★★★☆
性能开销 低(异步解耦) 高(3 阶段 + 冻结锁) 中(补偿开销)
额外依赖 MQ(Kafka/RMQ) 冻结表 + 协调器 编排器 + 状态表
资源锁定 有(Try 到 Confirm 期间)
防悬挂/空回滚 不涉及 需要(Barrier) 需要(Barrier)
宕机恢复 补偿任务重发消息 global_tx_log + RecoveryTask saga_log + RecoveryTask
恢复策略 有限重试,超限告警 有限重试,超限标记 failed 无限重试,超限告警
适用场景 大多数异步业务 强一致 + 短事务 长事务 + 多步骤编排

选型建议

  • 大多数场景优先选「本地消息表 + MQ」— 实现简单、性能好、依赖成熟,能接受秒级延迟即可
  • 需要准强一致性时选「TCC」— 如金融转账、库存超卖零容忍场景,但侵入性较高
  • 长流程多步骤编排选「SAGA」— 如订单 → 支付 → 物流 → 积分等多服务协调,补偿逻辑清晰

快速开始

# 1. 克隆并进入项目
cd distributed-tx

# 2. 下载依赖
go mod tidy

# 3. 编译验证
go build ./...

# 4. 初始化数据库(根据选择的方案执行对应 SQL)
# 5. 在业务代码中引用对应包即可

扩展新的业务场景

三种方案均通过接口抽象支持扩展:

方案 扩展接口 说明
本地消息表 MQProducer / Alerter 实现消息投递和告警接口
TCC TCCBranch + BranchRebuilder 新增分支实现 + 恢复重建器
SAGA SAGAStep + StepRebuilder 新增步骤实现 + 恢复重建器

以 TCC 为例,扩展一个新的分支:

// 1. 实现 TCCBranch 接口
type MyNewBranch struct { /* ... */ }
func (b *MyNewBranch) Try(ctx context.Context) error     { /* ... */ }
func (b *MyNewBranch) Confirm(ctx context.Context) error  { /* ... */ }
func (b *MyNewBranch) Cancel(ctx context.Context) error   { /* ... */ }

// 2. 在 BranchRebuilder 中添加重建逻辑
func (r *MyRebuilder) RebuildBranches(xid string, payload []byte) ([]tcc.TCCBranch, error) {
    return []tcc.TCCBranch{
        NewStockTCCBranch(/* ... */),
        &MyNewBranch{/* ... */},
    }, nil
}

// 3. 注册到 RecoveryTask
recovery.RegisterRebuilder("my_biz_type", &MyRebuilder{})

License

本项目为学习用途的八股文代码实现,仅供参考。

About

分布式事务方案(Redis扣减库存,用户余额增加)

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages