-
Notifications
You must be signed in to change notification settings - Fork 8
Open
Description
队列取任务支持最小等待时间过滤(minAgeSeconds)
背景
当前队列取任务(take)操作会返回任意可用任务,不考虑任务在队列中等待了多长时间。某些业务场景需要任务在队列中"沉淀"一段时间后才能被调度,例如:
- 给上游留出取消窗口期
- 定时批处理场景,任务需积压到一定时间后统一处理
- 防止任务刚入队就被立即消费
需求描述
在 Take 请求中新增 minAgeSeconds 参数。当该值为正数时,只有入队时间距当前时刻超过
minAgeSeconds 秒的任务才会被取出。默认值为 0,表示不限制,保持现有行为不变。
接口示例:
```json
{
"queues": ["my-queue:1"],
"size": 10,
"strategy": "fifo",
"minAgeSeconds": 30
}
```
取队列中入队超过 30 秒的任务,未到 30 秒的任务继续留在队列中等待。
实现方案
策略层:四种策略均透传 minAgeSeconds
| 策略 | 过滤方式 |
|---|---|
active_passive |
RedisBlockingQueue.poll(minAgeSeconds) |
round_robin |
RedisBlockingQueue.poll(minAgeSeconds) |
fifo |
Lua 脚本:候选任务筛选从 ZRANGE 改为 ZRANGEBYSCORE -inf {maxScore} LIMIT 0 1 |
sequential |
Lua 脚本:ZPOPMIN 前增加 ZRANGEBYSCORE 检查 |
Redis 层(RedisBlockingQueue)
新增 poll(long minAgeSeconds) 重载,计算 maxScore = now - minAgeSeconds * 1000
作为 ARGV[2] 传入 Lua 脚本;minAgeSeconds <= 0 时传空字符串,Lua 跳过过滤。
Lua 脚本变更
offline/dequeue.lua/online/dequeue.lua:已支持可选ARGV[2]=maxScorefifo.lua:新增ARGV[3]=maxScore,候选队列头部筛选改用ZRANGEBYSCOREsequential.lua:新增ARGV[1]=maxScore,ZPOPMIN前增加ZRANGEBYSCORE过滤
兼容性
minAgeSeconds 不传或传 0 时行为与原来完全一致,无破坏性变更。
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels