fix(bub-schedule): route scheduled jobs through live runtime#34
Conversation
|
@frostming Could you please review this? |
|
I understand your intention and don't oppose it, but I think the current implementation is not the best way. Because the function parameters of the scheduler must be serializable, we could use a global variable to achieve this. |
|
The design idea is to maintain a global queue at the channel level, ensuring that the triggered scheduling functions do not introduce side effects. |
| chat_id=chat_id, | ||
| ) | ||
| await framework.process_inbound(payload) | ||
| await ScheduleChannel.enqueue_current(payload) |
There was a problem hiding this comment.
Why not just run process_inbound using the bound framework, rather than synchronizing over a queue?
There was a problem hiding this comment.
Your PR doesn't mention the rationale for why a queue is needed.
There was a problem hiding this comment.
Great question — there are actually two layers to why the queue exists here.
Layer 1: Why the old design was broken (detached framework)
The previous implementation created a detached BubFramework() per reminder run inside the job callback. That detached runtime could execute a turn, but it did not carry the live outbound router / channel-manager binding from the active gateway process. As a result, outbound messages were produced but were not reliably delivered back to the original chat.
This PR fixes that by routing scheduled reminders into the live runtime.
Layer 2: Why the live runtime still uses a queue (not direct call)
Once we route into the live runtime, the persisted APScheduler job still cannot hold a reference to BubFramework as a job argument — it must remain serializable and recoverable across process restarts. The queue bridges this:
- Minimal job payload — the persisted job reconstructs only a small
ChannelMessagedict, no framework reference - Single ingress boundary — the long-lived schedule worker is the only place that calls
framework.process_inbound(), keeping the scheduler callback from becoming a direct runtime execution point - Lifecycle isolation — this separation gives us a clean place to handle errors, backpressure, and graceful shutdown without the scheduler needing to know about runtime internals
In short: the detached framework explains why the old design was broken; the queue explains why the new live-runtime design still needs an intermediate worker rather than a direct call.
There was a problem hiding this comment.
The previous implementation was wrong because it created a detached BubFramework() for each reminder. That detached runtime could run a turn, but it did not have the live outbound router/channel-manager binding from the active gateway process, so replies were not reliably delivered back to the original chat.
This PR first fixes that by routing scheduled reminders into the live runtime. Separately, the queue is the mechanism we use after that: the persisted job reconstructs only a minimal ChannelMessage, while the long-lived schedule worker is the only place that calls framework.process_inbound(). That keeps the job payload serializable and preserves a single ingress boundary for runtime/lifecycle/error handling
There was a problem hiding this comment.
I mean, we could use a global framework stored somewhere so it doesn't need to be passed to the job handler. In the handler we can still run framework.process_inbound.
There is no strong necessity to keep only one place to process the messages. And this would make the diff much smaller.
There was a problem hiding this comment.
I mean, we could use a global
frameworkstored somewhere so it doesn't need to be passed to the job handler. In the handler we can still runframework.process_inbound.There is no strong necessity to keep only one place to process the messages. And this would make the diff much smaller.
A new version has been submitted, which directly invokes the global live framework in run_scheduled_reminder. Does this meet expectations? The previous use of the queue was somewhat redundant.
b8b0e3e to
ac220ce
Compare
CI was failing on async test functions because pytest-asyncio needed explicit markers or auto mode. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
LGTM, thanks for spending time on this |
Summary
This fixes
bub-schedulereminders that executed a turn but failed to reliably deliver the resulting reply back to the original chat.Problem
Before this change,
run_scheduled_reminder()created a brand newBubFramework()on every trigger and calledprocess_inbound()on that detached runtime.That meant the scheduled reminder could:
ChannelMessageBut it did not have the live
ChannelManager/ outbound router that the active gateway process had already bound to its runtime.In practice this caused the scheduled job to look like it ran, while the actual reminder reply was not reliably delivered back to the original Feishu chat.
Fix
This PR changes
bub-scheduleto route scheduled payloads through the live runtime instead of a detached runtime:bub_schedule.plugin:ScheduleImplso the plugin can receive the liveframeworkframeworkintoScheduleChannelScheduleChannelScheduleChannel.enqueue_current(payload)run_scheduled_reminder()build the minimalChannelMessageand enqueue it into the live schedule workerawait framework.process_inbound(payload)so outbound delivery continues through Bub's normal live router pathThis keeps the schedule job payload minimal and avoids persisting live runtime objects into the job store.
Tests
Added coverage for:
framework.process_inbound(payload)executionschedule.triggerbehavior regression coverageLocally verified with:
cd packages/bub-schedule uv run pytest tests/test_scheduled_delivery.py tests/test_trigger.py -qResult:
16 passedManual validation
The fix was also validated end-to-end from
philipby overridingbub-scheduleto the local editable package, starting the gateway, and confirming that scheduled reminder output was delivered back into the original chat.