fix: avoid false distributor 503 on stale channel cache #3402
MisonL wants to merge 6 commits into QuantumNous:main
Conversation
Walkthrough

This PR consolidates channel and ability persistence into transactions, adds atomic coordination and async refresh for channel cache rebuilds, derives the cache from enabled Ability records, improves cache-miss fallbacks to the DB, tightens ChannelInfo.Scan type handling, and extends tests for these behaviors.
Sequence Diagram(s)

sequenceDiagram
autonumber
participant Caller as Caller
participant Cache as Channel Cache
participant DB as Database
participant Lock as Rebuild Lock
Caller->>Cache: UpdateChannelStatus(id, status)
rect rgba(100,150,200,0.5)
Note over Cache,DB: New transactional flow
Cache->>DB: Begin Transaction
DB->>DB: Save Channel (tx.Omit("key").Save)
alt shouldUpdateAbilities
DB->>DB: Update Ability.enabled rows (WHERE channel_id)
end
DB-->>Cache: Commit/Abort
alt Commit
Cache->>Lock: InitChannelCache() (if MemoryCacheEnabled)
Lock->>Cache: Acquire CAS guard, rebuild snapshot
Cache->>DB: Read Ability rows -> build cache
Lock->>Cache: Release
Cache-->>Caller: return true
else Abort
Cache-->>Caller: return false
end
end
sequenceDiagram
autonumber
participant Client as Client
participant Cache as Channel Cache
participant DB as Database
participant Atomic as Refresh Flag
Client->>Cache: GetRandomSatisfiedChannel(group, model)
Cache->>Cache: Acquire RLock
alt Cache hit
Cache->>Cache: getRandomSatisfiedChannelFromCache (returns channel, cacheHit)
Cache->>Cache: RUnlock
    Cache-->>Client: return channel
else Cache miss or error
Cache->>Cache: RUnlock
Cache->>DB: GetChannel fallback (DB query)
alt DB returns channel
    Cache-->>Client: return channel
Cache->>Atomic: requestChannelCacheRefreshAsync (set pending/CAS)
else DB no channel
Cache-->>Client: return "no channel found" error
end
end
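The cache-miss fallback in the second diagram can be sketched with in-memory stand-ins for the cache and database; every name below (getFromCache, refreshRequested, the map-backed db) is illustrative, not the actual model package API:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// In-memory stand-ins for the real cache and database.
var (
	channelSyncLock  sync.RWMutex
	cache            = map[string]int{}                    // "group/model" -> channel id
	db               = map[string]int{"default/gpt-4": 42} // authoritative source
	refreshRequested atomic.Bool                           // stand-in for requestChannelCacheRefreshAsync
)

// getFromCache returns (id, cacheHit) under the read lock, mirroring
// getRandomSatisfiedChannelFromCache in the diagram.
func getFromCache(group, model string) (int, bool) {
	channelSyncLock.RLock()
	defer channelSyncLock.RUnlock()
	id, ok := cache[group+"/"+model]
	return id, ok
}

// GetRandomSatisfiedChannel tries the cache first; on a miss it falls back
// to the DB and schedules an async cache refresh instead of returning a
// false "no channel found" (the spurious 503 this PR targets).
func GetRandomSatisfiedChannel(group, model string) (int, error) {
	if id, hit := getFromCache(group, model); hit {
		return id, nil
	}
	if id, ok := db[group+"/"+model]; ok {
		refreshRequested.Store(true)
		return id, nil
	}
	return 0, errors.New("no channel found")
}

func main() {
	id, err := GetRandomSatisfiedChannel("default", "gpt-4")
	fmt.Println(id, err, refreshRequested.Load()) // prints: 42 <nil> true
}
```

The key property is that a stale cache degrades to one extra DB query plus a background refresh request, rather than a user-visible error.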
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 2 passed
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model/channel_cache_test.go`:
- Around line 156-167: The test uses a SQLite-specific raw INSERT
(backtick-quoted `group` and invalid JSON "{") so it won't fail the same way on
MySQL/Postgres; update the DB.Exec in channel_cache_test.go to use the shared
identifier variables (commonGroupCol and commonKeyCol) instead of backticks and
replace the hardcoded invalid JSON with a backend-agnostic value that forces
ChannelInfo.Scan to fail (e.g., NULL or a type-mismatched value) so
InitChannelCache() exercises the Scan error path across SQLite/MySQL/Postgres;
keep the same values for id, type, key, status, name, models but ensure the SQL
and bound parameters are compatible with all supported backends.
In `@model/channel_cache.go`:
- Line 20: The cache rebuild gating is incomplete: channelCacheRefreshInFlight
only protects one async path while SyncChannelCache() and UpdateChannelStatus()
call InitChannelCache() directly and InitChannelCache() only takes
channelSyncLock at publish time, allowing stale builds to overwrite newer
snapshots. Fix by moving the in-flight coordination into the single shared
refresh entry (e.g., create or update the existing RefreshChannelCache()
wrapper) and have SyncChannelCache() and UpdateChannelStatus() call that shared
refresh path instead of calling InitChannelCache() directly; alternatively,
ensure InitChannelCache() itself acquires channelCacheRefreshInFlight (or the
same gate) for the entire DB read/build and also holds channelSyncLock
appropriately so no older rebuild can finish after a newer publish (reference
symbols: channelCacheRefreshInFlight, SyncChannelCache, UpdateChannelStatus,
InitChannelCache, channelSyncLock).
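One way to build the single shared refresh entry this comment asks for is a CAS-guarded loop over a pending flag: losers of the CAS return immediately, and the winner keeps rebuilding until no request remains. This is a minimal, self-contained sketch with hypothetical names (RefreshChannelCache, refreshInFlight, refreshPending), not the repository's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// refreshPending records that *some* rebuild must happen after this point;
// refreshInFlight ensures only one goroutine rebuilds at a time.
var (
	refreshInFlight atomic.Bool
	refreshPending  atomic.Bool
	rebuilds        atomic.Int64 // counts snapshot rebuilds, for demonstration
)

func rebuildSnapshot() { rebuilds.Add(1) }

// RefreshChannelCache is the shared entry point all callers would use
// instead of invoking InitChannelCache directly.
func RefreshChannelCache() {
	refreshPending.Store(true)
	for refreshInFlight.CompareAndSwap(false, true) {
		for refreshPending.CompareAndSwap(true, false) {
			rebuildSnapshot()
		}
		refreshInFlight.Store(false)
		// Re-check: a request may have arrived between the inner loop
		// ending and the in-flight flag being cleared.
		if !refreshPending.Load() {
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); RefreshChannelCache() }()
	}
	wg.Wait()
	// Concurrent requests coalesce: at least 1 rebuild, at most 100.
	fmt.Println(rebuilds.Load() >= 1 && rebuilds.Load() <= 100)
}
```

Because pending is set before the CAS attempt, no request can be dropped: either the caller wins the flag and rebuilds, or the current holder observes the pending flag and performs a follow-up rebuild, so a stale build can never be the last one to publish.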
In `@model/channel_satisfy.go`:
- Around line 18-19: The DB fallback currently invoked from the cache miss (the
call to isChannelEnabledForGroupModelDB(group, modelName, channelID) after
channelSyncLock.RUnlock()) only checks abilities.enabled; update the DB query
inside isChannelEnabledForGroupModelDB to include the channel status by joining
the channels table (or adding channels.enabled = true) so the fallback enforces
the same "channel row is enabled" rule that InitChannelCache() uses; apply the
same change to the other similar fallback query referenced in the file (the
second occurrence around the other cache miss) so both DB paths require
channels.enabled && abilities.enabled.
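For illustration, a query of the shape the comment describes might look like the constant below; the table/column names, the MySQL-style backtick quoting of the reserved word `group`, and the status placeholder are all assumptions, not taken from the repository schema:

```go
package main

import "fmt"

// Hypothetical SQL for isChannelEnabledForGroupModelDB after the fix: the
// fallback joins channels so it enforces the same "channel row is enabled"
// rule as InitChannelCache(). The final status parameter would be bound to
// something like common.ChannelStatusEnabled.
const enabledFallbackSQL = "SELECT COUNT(*) FROM abilities" +
	" JOIN channels ON channels.id = abilities.channel_id" +
	" WHERE abilities.`group` = ? AND abilities.model = ?" +
	" AND abilities.channel_id = ? AND abilities.enabled = TRUE" +
	" AND channels.status = ?"

func main() {
	fmt.Println(enabledFallbackSQL)
}
```

With the join in place, both DB fallback paths and the cache build answer the same question, so a cache miss can no longer report a channel as available that the cache would have excluded.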
In `@model/channel.go`:
- Around line 666-674: The code currently continues as success when
UpdateAbilityStatus(channelId, status == common.ChannelStatusEnabled) fails,
then calls InitChannelCache(), causing cache rebuild from a partial update and
possible channels/abilities desync; change the flow in the block that checks
shouldUpdateAbilities so that if UpdateAbilityStatus returns an error you do not
proceed to success or call InitChannelCache — instead return or propagate the
error (or perform a rollback of the channel row save) so channel and ability
updates are atomic; specifically ensure the code around UpdateAbilityStatus,
shouldUpdateAbilities, channelId and status either aborts/returns the error or
rolls back before calling InitChannelCache when common.MemoryCacheEnabled.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: db00f116-864b-48a8-ba33-cd6325bd11f7
📒 Files selected for processing (4)
model/channel.go, model/channel_cache.go, model/channel_cache_test.go, model/channel_satisfy.go
🧹 Nitpick comments (2)
model/channel_cache.go (2)
132-147: Consider setting pending before attempting CompareAndSwap, for consistency.

Unlike InitChannelCache (line 28), this function doesn't set channelCacheRefreshPending before checking CompareAndSwap. If a refresh is already in flight, the request is silently dropped rather than queued for the next loop iteration. This inconsistency means async refresh requests during an active refresh may not trigger a follow-up rebuild. Since the periodic SyncChannelCache will eventually catch up, this is low risk, but aligning the behavior would be more robust.

♻️ Suggested fix

```diff
 func requestChannelCacheRefreshAsync() {
 	if !common.MemoryCacheEnabled {
 		return
 	}
+	channelCacheRefreshPending.Store(true)
 	if !channelCacheRefreshInFlight.CompareAndSwap(false, true) {
 		return
 	}
 	go func() {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model/channel_cache.go` around lines 132 - 147, The async refresh path in requestChannelCacheRefreshAsync currently only checks channelCacheRefreshInFlight.CompareAndSwap and drops requests if a refresh is already running; to match InitChannelCache behavior and ensure follow-up rebuilds are scheduled, set channelCacheRefreshPending (use its Store/atomic setter) to true before attempting the CompareAndSwap on channelCacheRefreshInFlight so the running loop will see the pending flag and perform another rebuild; reference requestChannelCacheRefreshAsync, channelCacheRefreshPending, channelCacheRefreshInFlight, runChannelCacheRefreshLoop (and InitChannelCache for the desired behavior) and ensure any existing logic that clears channelCacheRefreshPending inside runChannelCacheRefreshLoop remains correct.
204-204: Use fmt.Errorf instead of errors.New(fmt.Sprintf(...)).

The idiomatic Go pattern for formatted error messages is fmt.Errorf.

♻️ Suggested fix

```diff
- return nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)), true
+ return nil, fmt.Errorf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority), true
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model/channel_cache.go` at line 204, Replace the non-idiomatic errors.New(fmt.Sprintf(...)) return with fmt.Errorf(...) at the return that currently uses variables group, model and targetPriority (the line returning nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)), true). Use fmt.Errorf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority) and add an import for "fmt" if it's not already imported in channel_cache.go.
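For illustration, the two forms produce byte-identical messages, and fmt.Errorf additionally supports %w wrapping; the function names below are made up for the demo:

```go
package main

import (
	"errors"
	"fmt"
)

// oldStyle and newStyle build the same error message; fmt.Errorf is the
// idiomatic form.
func oldStyle(group, model string, p int64) error {
	return errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, p))
}

func newStyle(group, model string, p int64) error {
	return fmt.Errorf("no channel found, group: %s, model: %s, priority: %d", group, model, p)
}

func main() {
	fmt.Println(oldStyle("default", "gpt-4", 0).Error() == newStyle("default", "gpt-4", 0).Error()) // prints: true
}
```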
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@model/channel_cache.go`:
- Around line 132-147: The async refresh path in requestChannelCacheRefreshAsync
currently only checks channelCacheRefreshInFlight.CompareAndSwap and drops
requests if a refresh is already running; to match InitChannelCache behavior and
ensure follow-up rebuilds are scheduled, set channelCacheRefreshPending (use its
Store/atomic setter) to true before attempting the CompareAndSwap on
channelCacheRefreshInFlight so the running loop will see the pending flag and
perform another rebuild; reference requestChannelCacheRefreshAsync,
channelCacheRefreshPending, channelCacheRefreshInFlight,
runChannelCacheRefreshLoop (and InitChannelCache for the desired behavior) and
ensure any existing logic that clears channelCacheRefreshPending inside
runChannelCacheRefreshLoop remains correct.
- Line 204: Replace the non-idiomatic errors.New(fmt.Sprintf(...)) return with
fmt.Errorf(...) at the return that currently uses variables group, model and
targetPriority (the line returning nil, errors.New(fmt.Sprintf("no channel
found, group: %s, model: %s, priority: %d", group, model, targetPriority)),
true). Use fmt.Errorf("no channel found, group: %s, model: %s, priority: %d",
group, model, targetPriority) and add an import for "fmt" if it's not already
imported in channel_cache.go.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d19a2ffc-edd7-40d4-906c-279c707f706b
📒 Files selected for processing (4)
model/channel.go, model/channel_cache.go, model/channel_cache_test.go, model/channel_satisfy.go
✅ Files skipped from review due to trivial changes (1)
- model/channel_cache_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- model/channel.go
Force-pushed from 8067a66 to c2d6870
Actionable comments posted: 1
🧹 Nitpick comments (2)
model/channel.go (1)
685-687: Synchronous InitChannelCache() may cause latency spikes under concurrent status updates.

Per model/channel_cache.go lines 24-36, InitChannelCache() either runs the full cache rebuild synchronously or busy-waits (10ms sleeps) until an in-flight rebuild completes. Call sites in service/channel.go (DisableChannelByError, EnableChannel) and relay/mjproxy_handler.go will block on every status change. Under load (e.g., multiple channels failing simultaneously), this can cause cascading request latency as each caller waits for the cache rebuild. Consider whether the cache refresh could be made fully asynchronous (fire-and-forget with coalescing) to avoid blocking the status-update callers, accepting slightly stale reads until the rebuild completes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model/channel.go` around lines 685 - 687, The synchronous InitChannelCache() causes blocking on status updates; change it to a non-blocking, fire-and-forget refresh with coalescing: add a new async entrypoint (e.g., InitChannelCacheAsync or EnqueueChannelCacheRefresh) that returns immediately and schedules a single background rebuild (use a package-level goroutine, mutex/atomic flag or a single buffered channel to coalesce concurrent requests and avoid duplicate rebuilds), update call sites (DisableChannelByError, EnableChannel and the call in relay/mjproxy_handler.go) to call the new async enqueuer instead of invoking InitChannelCache() directly, and keep the original InitChannelCache implementation intact for synchronous use/testing if needed.

model/channel_cache_test.go (1)
12-24: Missing reset for channelCacheRefreshPending.

The helper resets channelCacheRefreshInFlight but not channelCacheRefreshPending. Per the package-level variables in model/channel_cache.go (lines 17-21), both flags coordinate cache rebuilds. A stale pending=true from a prior test could cause InitChannelCache() to trigger an extra rebuild loop, potentially making tests flaky.

🔧 Proposed fix

```diff
 	channelSyncLock.Unlock()
 	channelCacheRefreshInFlight.Store(false)
+	channelCacheRefreshPending.Store(false)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model/channel_cache_test.go` around lines 12 - 24, prepareChannelCacheTest resets channelCacheRefreshInFlight but not channelCacheRefreshPending, leaving a stale pending=true that can trigger extra cache rebuilds; update the prepareChannelCacheTest helper to also clear the pending flag by calling channelCacheRefreshPending.Store(false) (or the appropriate reset method) in the same place where channelCacheRefreshInFlight is reset so both coordination flags are cleared before each test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model/channel_cache_test.go`:
- Around line 131-180: The test
TestInitChannelCacheKeepsPreviousSnapshotOnScanError is inserting valid JSON
into channel_info so ChannelInfo.Scan() will not error, meaning
buildChannelCacheSnapshot() won't exercise the scan-error path; update the
inserted channel_info payload to be invalid/corrupt (e.g., "{invalid") so
ChannelInfo.Scan() (which calls common.Unmarshal()) returns an error and the
snapshot-preservation behavior is tested, or if you intend to test exclusion due
to missing abilities instead, rename the test to reflect that behavior and keep
valid JSON; check channelsIDM and group2model2channels assertions remain correct
after the change.
---
Nitpick comments:
In `@model/channel_cache_test.go`:
- Around line 12-24: prepareChannelCacheTest resets channelCacheRefreshInFlight
but not channelCacheRefreshPending, leaving a stale pending=true that can
trigger extra cache rebuilds; update the prepareChannelCacheTest helper to also
clear the pending flag by calling channelCacheRefreshPending.Store(false) (or
the appropriate reset method) in the same place where
channelCacheRefreshInFlight is reset so both coordination flags are cleared
before each test.
In `@model/channel.go`:
- Around line 685-687: The synchronous InitChannelCache() causes blocking on
status updates; change it to a non-blocking, fire-and-forget refresh with
coalescing: add a new async entrypoint (e.g., InitChannelCacheAsync or
EnqueueChannelCacheRefresh) that returns immediately and schedules a single
background rebuild (use a package-level goroutine, mutex/atomic flag or a single
buffered channel to coalesce concurrent requests and avoid duplicate rebuilds),
update call sites (DisableChannelByError, EnableChannel and the call in
relay/mjproxy_handler.go) to call the new async enqueuer instead of invoking
InitChannelCache() directly, and keep the original InitChannelCache
implementation intact for synchronous use/testing if needed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4601f80d-7244-4460-8630-1f817b6de027
📒 Files selected for processing (4)
model/channel.go, model/channel_cache.go, model/channel_cache_test.go, model/channel_satisfy.go
🚧 Files skipped from review as they are similar to previous changes (2)
- model/channel_satisfy.go
- model/channel_cache.go
Actionable comments posted: 1
🧹 Nitpick comments (1)
model/channel_cache_test.go (1)
16-17: Assert cleanup SQL errors in test setup.

Ignoring DB.Exec(...).Error can mask fixture setup failures and produce misleading test results.

🔧 Proposed fix

```diff
- DB.Exec("DELETE FROM abilities")
- DB.Exec("DELETE FROM channels")
+ require.NoError(t, DB.Exec("DELETE FROM abilities").Error)
+ require.NoError(t, DB.Exec("DELETE FROM channels").Error)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model/channel_cache_test.go` around lines 16 - 17, The test setup is ignoring SQL errors from the cleanup calls; update the two DB.Exec("DELETE FROM abilities") and DB.Exec("DELETE FROM channels") invocations in channel_cache_test.go to capture their returned result and assert no error (e.g., check result.Error and call t.Fatalf or use require.NoError(t, result.Error)) so any fixture cleanup failure fails the test immediately.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model/channel_cache_test.go`:
- Around line 12-24: prepareChannelCacheTest resets channelCacheRefreshInFlight
but omits resetting channelCacheRefreshPending, which can leak state between
tests; update prepareChannelCacheTest to also clear channelCacheRefreshPending
(similar to channelCacheRefreshInFlight) so the pending flag is reset before
each test run, keeping test state isolated from previous runs (see
prepareChannelCacheTest, channelCacheRefreshPending,
channelCacheRefreshInFlight).
---
Nitpick comments:
In `@model/channel_cache_test.go`:
- Around line 16-17: The test setup is ignoring SQL errors from the cleanup
calls; update the two DB.Exec("DELETE FROM abilities") and DB.Exec("DELETE FROM
channels") invocations in channel_cache_test.go to capture their returned result
and assert no error (e.g., check result.Error and call t.Fatalf or use
require.NoError(t, result.Error)) so any fixture cleanup failure fails the test
immediately.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 08d5c012-c212-4def-b114-cdd8fded33e9
📒 Files selected for processing (1)
model/channel_cache_test.go
📝 Description

Fixes spurious `503 No available channel ... (distributor)` errors: the channels and abilities tables are two sources of truth that could drift out of sync.

🚀 Type of change

🔗 Related Issue

✅ Checklist

📸 Proof of Work
```shell
go test ./model -run 'Test(GetRandomSatisfiedChannelFallsBackToDatabaseOnCacheMiss|UpdateChannelStatusRefreshesMemoryCacheAfterEnable|IsChannelEnabledForGroupModelFallsBackToDatabaseOnCacheMiss|InitChannelCacheKeepsPreviousSnapshotOnScanError|ChannelInfoScanSupportsStringValue|UpdateWithStatus_.*|Snapshot.*)' -count=1
go test ./service ./controller ./middleware -count=1
git diff --check
```