Skip to content

take方法,增加根据数据年龄获取数据的策略 #41

@muverystrong

Description

@muverystrong

队列取任务支持最小等待时间过滤(minAgeSeconds)

背景

当前队列取任务(take)操作会返回任意可用任务,不考虑任务在队列中等待了多长时间。某些业务场景需要任务在队列中"沉淀"一段时间后才能被调度,例如:

  • 给上游留出取消窗口期
  • 定时批处理场景,任务需积压到一定时间后统一处理
  • 防止任务刚入队就被立即消费

需求描述

Take 请求中新增 minAgeSeconds 参数。当该值为正数时,只有入队时间距当前时刻超过
minAgeSeconds 秒的任务才会被取出。默认值为 0,表示不限制,保持现有行为不变。

接口示例:

```json
{
"queues": ["my-queue:1"],
"size": 10,
"strategy": "fifo",
"minAgeSeconds": 30
}
```

取队列中入队超过 30 秒的任务,未到 30 秒的任务继续留在队列中等待。

实现方案

策略层:四种策略均透传 minAgeSeconds

策略 过滤方式
active_passive RedisBlockingQueue.poll(minAgeSeconds)
round_robin RedisBlockingQueue.poll(minAgeSeconds)
fifo Lua 脚本:候选任务筛选从 ZRANGE 改为 ZRANGEBYSCORE -inf {maxScore} LIMIT 0 1
sequential Lua 脚本:ZPOPMIN 前增加 ZRANGEBYSCORE 检查

Redis 层RedisBlockingQueue

新增 poll(long minAgeSeconds) 重载,计算 maxScore = now - minAgeSeconds * 1000
作为 ARGV[2] 传入 Lua 脚本;minAgeSeconds <= 0 时传空字符串,Lua 跳过过滤。

Lua 脚本变更

  • offline/dequeue.lua / online/dequeue.lua:已支持可选 ARGV[2]=maxScore
  • fifo.lua:新增 ARGV[3]=maxScore,候选队列头部筛选改用 ZRANGEBYSCORE
  • sequential.lua:新增 ARGV[1]=maxScoreZPOPMIN 前增加 ZRANGEBYSCORE 过滤

兼容性

minAgeSeconds 不传或传 0 时行为与原来完全一致,无破坏性变更。

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions