fix: avoid false distributor 503 on stale channel cache #3402

Open
MisonL wants to merge 6 commits into QuantumNous:main from MisonL:fix/distributor-cache-desync

Conversation

MisonL commented Mar 23, 2026

📝 Description

  • Fix the distributor layer falsely reporting 503 No available channel ... (distributor) when the in-memory cache drifts from the database state.
  • Unify cache rebuilds and fallback queries on the database's authoritative ability mapping, avoiding inconsistency between the two sources, channels and abilities.
  • Fall back to a database query when the cache is missing or has drifted, and asynchronously refresh the in-memory cache after a successful hit.
  • Keep the previous usable snapshot when a cache build fails, and add tests covering these behaviors.

🚀 Type of change

  • 🐛 Bug fix
  • ✨ New feature
  • ⚡ Performance / refactor
  • 📝 Documentation

🔗 Related Issue

✅ Checklist

  • Human confirmation: I wrote this description myself and removed redundant raw AI output.
  • Deep understanding: I fully understand how these changes work and their potential impact.
  • Focused scope: This PR contains no code changes unrelated to the current task.
  • Local verification: Tests or manual checks have been run and passed locally.
  • Security compliance: The code contains no sensitive credentials and follows the project's code conventions.

📸 Proof of Work

  • go test ./model -run 'Test(GetRandomSatisfiedChannelFallsBackToDatabaseOnCacheMiss|UpdateChannelStatusRefreshesMemoryCacheAfterEnable|IsChannelEnabledForGroupModelFallsBackToDatabaseOnCacheMiss|InitChannelCacheKeepsPreviousSnapshotOnScanError|ChannelInfoScanSupportsStringValue|UpdateWithStatus_.*|Snapshot.*)' -count=1
  • go test ./service ./controller ./middleware -count=1
  • git diff --check

coderabbitai bot commented Mar 23, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR consolidates channel and ability persistence into transactions, adds atomic coordination and async refresh for channel cache rebuilds, derives cache from enabled Ability records, improves cache-miss fallbacks to DB, tightens ChannelInfo.Scan type handling, and extends tests for these behaviors.

Changes

Transaction & Status Updates (model/channel.go)
  ChannelInfo.Scan now uses a type switch; UpdateChannelStatus removed the cache-guarded early return and deferred ability updates, persists Channel and Ability.enabled inside a single DB.Transaction, and triggers a cache init post-commit when enabled.

Cache Coordination & Derivation (model/channel_cache.go)
  Added an atomic in-flight-refresh flag with CompareAndSwap, an async refresh loop with panic recovery, split the cache build into buildChannelCacheSnapshot(), changed derivation to use enabled Ability rows, added per-(group, model) deduplication, and refined cache error/miss handling in GetRandomSatisfiedChannel.

Cache Tests (model/channel_cache_test.go)
  New tests and helpers that reset DB/cache globals, force common.MemoryCacheEnabled = true, and validate DB fallback behavior, cache refresh on status changes, error handling during cache init, and ChannelInfo.Scan parsing.

Cache Satisfaction Query (model/channel_satisfy.go)
  IsChannelEnabledForGroupModel uses explicit RLock/RUnlock on all returns and falls back to the DB; isChannelEnabledForGroupModelDB now JOINs channels, filters by channels.status, and queries abilities.* fields.
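The Scan change summarized above (a type switch over the raw driver value) can be sketched as follows. This is a minimal illustration, not the repo's actual ChannelInfo definition: the is_multi_key field is a hypothetical stand-in, and the real struct certainly differs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChannelInfo is a hypothetical stand-in; the field is illustrative only.
type ChannelInfo struct {
	IsMultiKey bool `json:"is_multi_key"`
}

// Scan shows the type-switch pattern: SQL drivers may hand back []byte
// (MySQL/SQLite) or string (some Postgres configurations), so both are
// accepted before unmarshalling; anything else is rejected explicitly.
func (c *ChannelInfo) Scan(value any) error {
	var data []byte
	switch v := value.(type) {
	case nil:
		return nil // NULL column: keep the zero value
	case []byte:
		data = v
	case string:
		data = []byte(v)
	default:
		return fmt.Errorf("unsupported type for ChannelInfo: %T", value)
	}
	if len(data) == 0 {
		return nil
	}
	return json.Unmarshal(data, c)
}

func main() {
	var a, b, c ChannelInfo
	fmt.Println(a.Scan([]byte(`{"is_multi_key":true}`)), a.IsMultiKey)
	fmt.Println(b.Scan(`{"is_multi_key":false}`), b.IsMultiKey)
	fmt.Println(c.Scan("{") != nil) // invalid JSON surfaces as an error
}
```

Handling string alongside []byte is what lets the same Scan work across the backends this project supports.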

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Caller as Caller
    participant Cache as Channel Cache
    participant DB as Database
    participant Lock as Rebuild Lock

    Caller->>Cache: UpdateChannelStatus(id, status)

    rect rgba(100,150,200,0.5)
    Note over Cache,DB: New transactional flow
    Cache->>DB: Begin Transaction
    DB->>DB: Save Channel (tx.Omit("key").Save)
    alt shouldUpdateAbilities
        DB->>DB: Update Ability.enabled rows (WHERE channel_id)
    end
    DB-->>Cache: Commit/Abort
    alt Commit
        Cache->>Lock: InitChannelCache() (if MemoryCacheEnabled)
        Lock->>Cache: Acquire CAS guard, rebuild snapshot
        Cache->>DB: Read Ability rows -> build cache
        Lock->>Cache: Release
        Cache-->>Caller: return true
    else Abort
        Cache-->>Caller: return false
    end
    end
sequenceDiagram
    autonumber
    participant Client as Client
    participant Cache as Channel Cache
    participant DB as Database
    participant Atomic as Refresh Flag

    Client->>Cache: GetRandomSatisfiedChannel(group, model)

    Cache->>Cache: Acquire RLock
    alt Cache hit
        Cache->>Cache: getRandomSatisfiedChannelFromCache (returns channel, cacheHit)
        Cache->>Cache: RUnlock
        Cache-->>Client: return channel
    else Cache miss or error
        Cache->>Cache: RUnlock
        Cache->>DB: GetChannel fallback (DB query)
        alt DB returns channel
            Cache-->>Client: return channel
            Cache->>Atomic: requestChannelCacheRefreshAsync (set pending/CAS)
        else DB no channel
            Cache-->>Client: return "no channel found" error
        end
    end
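The pending/in-flight coordination in the diagram above can be sketched with sync/atomic. This is a simplified model, not the repo's code: the flag names only mirror the PR's channelCacheRefreshPending / channelCacheRefreshInFlight, and rebuildSnapshot is a placeholder for the real DB read and cache publish.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	refreshPending  atomic.Bool  // a refresh has been requested
	refreshInFlight atomic.Bool  // a worker goroutine is running
	rebuilds        atomic.Int64 // counts rebuilds, for illustration
	wg              sync.WaitGroup
)

func rebuildSnapshot() { rebuilds.Add(1) } // placeholder for the DB read + publish

// requestRefreshAsync marks a refresh as pending, then tries to become the
// single in-flight worker via CompareAndSwap. Losing the CAS is fine: the
// winner's drain loop observes the pending flag and runs one more rebuild.
func requestRefreshAsync() {
	refreshPending.Store(true)
	if !refreshInFlight.CompareAndSwap(false, true) {
		return // a worker is already running; it will pick up the pending flag
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer refreshInFlight.Store(false)
		// Each Swap(false) claims one coalesced batch of requests.
		// A request can still slip in between the final Swap and the
		// in-flight reset; the PR's periodic SyncChannelCache covers
		// that narrow window.
		for refreshPending.Swap(false) {
			rebuildSnapshot()
		}
	}()
}

func main() {
	for i := 0; i < 100; i++ {
		requestRefreshAsync()
	}
	wg.Wait()
	fmt.Println(rebuilds.Load() >= 1) // bursts coalesce into few rebuilds
}
```

Setting pending before the CAS is the detail the nitpick comment below calls out: it guarantees a request made during an active refresh schedules a follow-up rebuild instead of being dropped.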

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • seefs001

"🐰
I nibble code, then hop along,
Transactions fixed what once was wrong.
Cache wakes smart with single-run flair,
Abilities guide the channels there.
Hooray — no more split-brain hare!"

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 45.45%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Title check ✅ Passed: the title directly relates to the main objective (fixing false distributor 503 errors caused by a stale cache) and accurately summarizes the primary change.
  • Description check ✅ Passed: check skipped; CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model/channel_cache_test.go`:
- Around line 156-167: The test uses a SQLite-specific raw INSERT
(backtick-quoted `group` and invalid JSON "{") so it won't fail the same way on
MySQL/Postgres; update the DB.Exec in channel_cache_test.go to use the shared
identifier variables (commonGroupCol and commonKeyCol) instead of backticks and
replace the hardcoded invalid JSON with a backend-agnostic value that forces
ChannelInfo.Scan to fail (e.g., NULL or a type-mismatched value) so
InitChannelCache() exercises the Scan error path across SQLite/MySQL/Postgres;
keep the same values for id, type, key, status, name, models but ensure the SQL
and bound parameters are compatible with all supported backends.

In `@model/channel_cache.go`:
- Line 20: The cache rebuild gating is incomplete: channelCacheRefreshInFlight
only protects one async path while SyncChannelCache() and UpdateChannelStatus()
call InitChannelCache() directly and InitChannelCache() only takes
channelSyncLock at publish time, allowing stale builds to overwrite newer
snapshots. Fix by moving the in-flight coordination into the single shared
refresh entry (e.g., create or update the existing RefreshChannelCache()
wrapper) and have SyncChannelCache() and UpdateChannelStatus() call that shared
refresh path instead of calling InitChannelCache() directly; alternatively,
ensure InitChannelCache() itself acquires channelCacheRefreshInFlight (or the
same gate) for the entire DB read/build and also holds channelSyncLock
appropriately so no older rebuild can finish after a newer publish (reference
symbols: channelCacheRefreshInFlight, SyncChannelCache, UpdateChannelStatus,
InitChannelCache, channelSyncLock).

In `@model/channel_satisfy.go`:
- Around line 18-19: The DB fallback currently invoked from the cache miss (the
call to isChannelEnabledForGroupModelDB(group, modelName, channelID) after
channelSyncLock.RUnlock()) only checks abilities.enabled; update the DB query
inside isChannelEnabledForGroupModelDB to include the channel status by joining
the channels table (or adding channels.enabled = true) so the fallback enforces
the same "channel row is enabled" rule that InitChannelCache() uses; apply the
same change to the other similar fallback query referenced in the file (the
second occurrence around the other cache miss) so both DB paths require
channels.enabled && abilities.enabled.
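The fallback query shape this comment asks for would look roughly like the following. The table and column names follow the channels/abilities naming used elsewhere in this PR, but the enabled-status literal is an assumption (the real code compares against common.ChannelStatusEnabled), so treat this as a sketch of the JOIN, not the repo's actual SQL:

```sql
-- Hypothetical fallback: require both the ability row and its channel
-- to be enabled, mirroring what InitChannelCache() derives.
SELECT abilities.channel_id
FROM abilities
JOIN channels ON channels.id = abilities.channel_id
WHERE abilities."group" = ?        -- "group" is reserved on some backends
  AND abilities.model = ?
  AND abilities.channel_id = ?
  AND abilities.enabled = TRUE
  AND channels.status = ?          -- bind common.ChannelStatusEnabled here
LIMIT 1;
```

Without the JOIN, a disabled channel whose ability rows were left enabled would pass the DB fallback while the cache path would reject it, which is exactly the desync this PR targets.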

In `@model/channel.go`:
- Around line 666-674: The code currently continues as success when
UpdateAbilityStatus(channelId, status == common.ChannelStatusEnabled) fails,
then calls InitChannelCache(), causing cache rebuild from a partial update and
possible channels/abilities desync; change the flow in the block that checks
shouldUpdateAbilities so that if UpdateAbilityStatus returns an error you do not
proceed to success or call InitChannelCache — instead return or propagate the
error (or perform a rollback of the channel row save) so channel and ability
updates are atomic; specifically ensure the code around UpdateAbilityStatus,
shouldUpdateAbilities, channelId and status either aborts/returns the error or
rolls back before calling InitChannelCache when common.MemoryCacheEnabled.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: db00f116-864b-48a8-ba33-cd6325bd11f7

📥 Commits

Reviewing files that changed from the base of the PR and between 9ae9040 and 8cac144.

📒 Files selected for processing (4)
  • model/channel.go
  • model/channel_cache.go
  • model/channel_cache_test.go
  • model/channel_satisfy.go

coderabbitai bot left a comment

🧹 Nitpick comments (2)
model/channel_cache.go (2)

132-147: Consider setting pending before attempting CompareAndSwap for consistency.

Unlike InitChannelCache (line 28), this function doesn't set channelCacheRefreshPending before checking CompareAndSwap. If a refresh is already in flight, the request is silently dropped rather than queued for the next loop iteration. This inconsistency means async refresh requests during an active refresh may not trigger a follow-up rebuild.

Since the periodic SyncChannelCache will eventually catch up, this is low risk, but aligning the behavior would be more robust.

♻️ Suggested fix
 func requestChannelCacheRefreshAsync() {
 	if !common.MemoryCacheEnabled {
 		return
 	}
+	channelCacheRefreshPending.Store(true)
 	if !channelCacheRefreshInFlight.CompareAndSwap(false, true) {
 		return
 	}
 	go func() {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model/channel_cache.go` around lines 132 - 147, The async refresh path in
requestChannelCacheRefreshAsync currently only checks
channelCacheRefreshInFlight.CompareAndSwap and drops requests if a refresh is
already running; to match InitChannelCache behavior and ensure follow-up
rebuilds are scheduled, set channelCacheRefreshPending (use its Store/atomic
setter) to true before attempting the CompareAndSwap on
channelCacheRefreshInFlight so the running loop will see the pending flag and
perform another rebuild; reference requestChannelCacheRefreshAsync,
channelCacheRefreshPending, channelCacheRefreshInFlight,
runChannelCacheRefreshLoop (and InitChannelCache for the desired behavior) and
ensure any existing logic that clears channelCacheRefreshPending inside
runChannelCacheRefreshLoop remains correct.

204-204: Use fmt.Errorf instead of errors.New(fmt.Sprintf(...)).

The idiomatic Go pattern for formatted error messages is fmt.Errorf.

♻️ Suggested fix
-		return nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)), true
+		return nil, fmt.Errorf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority), true
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model/channel_cache.go` at line 204, Replace the non-idiomatic
errors.New(fmt.Sprintf(...)) return with fmt.Errorf(...) at the return that
currently uses variables group, model and targetPriority (the line returning
nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority:
%d", group, model, targetPriority)), true). Use fmt.Errorf("no channel found,
group: %s, model: %s, priority: %d", group, model, targetPriority) and add an
import for "fmt" if it's not already imported in channel_cache.go.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@model/channel_cache.go`:
- Around line 132-147: The async refresh path in requestChannelCacheRefreshAsync
currently only checks channelCacheRefreshInFlight.CompareAndSwap and drops
requests if a refresh is already running; to match InitChannelCache behavior and
ensure follow-up rebuilds are scheduled, set channelCacheRefreshPending (use its
Store/atomic setter) to true before attempting the CompareAndSwap on
channelCacheRefreshInFlight so the running loop will see the pending flag and
perform another rebuild; reference requestChannelCacheRefreshAsync,
channelCacheRefreshPending, channelCacheRefreshInFlight,
runChannelCacheRefreshLoop (and InitChannelCache for the desired behavior) and
ensure any existing logic that clears channelCacheRefreshPending inside
runChannelCacheRefreshLoop remains correct.
- Line 204: Replace the non-idiomatic errors.New(fmt.Sprintf(...)) return with
fmt.Errorf(...) at the return that currently uses variables group, model and
targetPriority (the line returning nil, errors.New(fmt.Sprintf("no channel
found, group: %s, model: %s, priority: %d", group, model, targetPriority)),
true). Use fmt.Errorf("no channel found, group: %s, model: %s, priority: %d",
group, model, targetPriority) and add an import for "fmt" if it's not already
imported in channel_cache.go.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d19a2ffc-edd7-40d4-906c-279c707f706b

📥 Commits

Reviewing files that changed from the base of the PR and between 8cac144 and 54e494e.

📒 Files selected for processing (4)
  • model/channel.go
  • model/channel_cache.go
  • model/channel_cache_test.go
  • model/channel_satisfy.go
✅ Files skipped from review due to trivial changes (1)
  • model/channel_cache_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • model/channel.go

MisonL force-pushed the fix/distributor-cache-desync branch from 8067a66 to c2d6870 on March 26, 2026 08:59
coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
model/channel.go (1)

685-687: Synchronous InitChannelCache() may cause latency spikes under concurrent status updates.

Per model/channel_cache.go lines 24-36, InitChannelCache() either runs the full cache rebuild synchronously or busy-waits (10ms sleeps) until an in-flight rebuild completes. Call sites in service/channel.go (DisableChannelByError, EnableChannel) and relay/mjproxy_handler.go will block on every status change.

Under load (e.g., multiple channels failing simultaneously), this can cause cascading request latency as each caller waits for the cache rebuild. Consider whether the cache refresh could be made fully asynchronous (fire-and-forget with coalescing) to avoid blocking the status-update callers, accepting slightly stale reads until the rebuild completes.
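The fire-and-forget coalescing suggested here can also be built on a capacity-1 channel plus a single worker goroutine, an alternative to the atomic-flag approach. This is a sketch under the same assumptions as above: rebuild is a placeholder for the repo's InitChannelCache, and the type name is invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// cacheRefresher: a capacity-1 channel gives non-blocking enqueueing
// with automatic coalescing of bursts into at most one queued rebuild.
type cacheRefresher struct {
	kick chan struct{}
	wg   sync.WaitGroup
}

func newCacheRefresher(rebuild func()) *cacheRefresher {
	r := &cacheRefresher{kick: make(chan struct{}, 1)}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for range r.kick { // one rebuild per coalesced batch of requests
			rebuild()
		}
	}()
	return r
}

// Enqueue never blocks: if a kick is already buffered, this request
// coalesces with it and the select falls through immediately.
func (r *cacheRefresher) Enqueue() {
	select {
	case r.kick <- struct{}{}:
	default:
	}
}

// Close drains the worker; callers like a status-update path would
// normally never close it.
func (r *cacheRefresher) Close() {
	close(r.kick)
	r.wg.Wait()
}

func main() {
	count := 0
	r := newCacheRefresher(func() { count++ })
	for i := 0; i < 1000; i++ {
		r.Enqueue() // returns immediately; callers never block on a rebuild
	}
	r.Close()
	fmt.Println(count >= 1 && count <= 1000)
}
```

With this shape, DisableChannelByError-style callers pay only a channel send, accepting slightly stale reads until the background rebuild lands.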

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model/channel.go` around lines 685 - 687, The synchronous InitChannelCache()
causes blocking on status updates; change it to a non-blocking, fire-and-forget
refresh with coalescing: add a new async entrypoint (e.g., InitChannelCacheAsync
or EnqueueChannelCacheRefresh) that returns immediately and schedules a single
background rebuild (use a package-level goroutine, mutex/atomic flag or a single
buffered channel to coalesce concurrent requests and avoid duplicate rebuilds),
update call sites (DisableChannelByError, EnableChannel and the call in
relay/mjproxy_handler.go) to call the new async enqueuer instead of invoking
InitChannelCache() directly, and keep the original InitChannelCache
implementation intact for synchronous use/testing if needed.
model/channel_cache_test.go (1)

12-24: Missing reset for channelCacheRefreshPending.

The helper resets channelCacheRefreshInFlight but not channelCacheRefreshPending. Per the package-level variables in model/channel_cache.go (lines 17-21), both flags coordinate cache rebuilds. A stale pending=true from a prior test could cause InitChannelCache() to trigger an extra rebuild loop, potentially making tests flaky.

🔧 Proposed fix
 	channelSyncLock.Unlock()
 	channelCacheRefreshInFlight.Store(false)
+	channelCacheRefreshPending.Store(false)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model/channel_cache_test.go` around lines 12 - 24, prepareChannelCacheTest
resets channelCacheRefreshInFlight but not channelCacheRefreshPending, leaving a
stale pending=true that can trigger extra cache rebuilds; update the
prepareChannelCacheTest helper to also clear the pending flag by calling
channelCacheRefreshPending.Store(false) (or the appropriate reset method) in the
same place where channelCacheRefreshInFlight is reset so both coordination flags
are cleared before each test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model/channel_cache_test.go`:
- Around line 131-180: The test
TestInitChannelCacheKeepsPreviousSnapshotOnScanError is inserting valid JSON
into channel_info so ChannelInfo.Scan() will not error, meaning
buildChannelCacheSnapshot() won't exercise the scan-error path; update the
inserted channel_info payload to be invalid/corrupt (e.g., "{invalid") so
ChannelInfo.Scan() (which calls common.Unmarshal()) returns an error and the
snapshot-preservation behavior is tested, or if you intend to test exclusion due
to missing abilities instead, rename the test to reflect that behavior and keep
valid JSON; check channelsIDM and group2model2channels assertions remain correct
after the change.

---

Nitpick comments:
In `@model/channel_cache_test.go`:
- Around line 12-24: prepareChannelCacheTest resets channelCacheRefreshInFlight
but not channelCacheRefreshPending, leaving a stale pending=true that can
trigger extra cache rebuilds; update the prepareChannelCacheTest helper to also
clear the pending flag by calling channelCacheRefreshPending.Store(false) (or
the appropriate reset method) in the same place where
channelCacheRefreshInFlight is reset so both coordination flags are cleared
before each test.

In `@model/channel.go`:
- Around line 685-687: The synchronous InitChannelCache() causes blocking on
status updates; change it to a non-blocking, fire-and-forget refresh with
coalescing: add a new async entrypoint (e.g., InitChannelCacheAsync or
EnqueueChannelCacheRefresh) that returns immediately and schedules a single
background rebuild (use a package-level goroutine, mutex/atomic flag or a single
buffered channel to coalesce concurrent requests and avoid duplicate rebuilds),
update call sites (DisableChannelByError, EnableChannel and the call in
relay/mjproxy_handler.go) to call the new async enqueuer instead of invoking
InitChannelCache() directly, and keep the original InitChannelCache
implementation intact for synchronous use/testing if needed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4601f80d-7244-4460-8630-1f817b6de027

📥 Commits

Reviewing files that changed from the base of the PR and between 85181c9 and c2d6870.

📒 Files selected for processing (4)
  • model/channel.go
  • model/channel_cache.go
  • model/channel_cache_test.go
  • model/channel_satisfy.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • model/channel_satisfy.go
  • model/channel_cache.go

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
model/channel_cache_test.go (1)

16-17: Assert cleanup SQL errors in test setup.

Ignoring DB.Exec(...).Error can mask fixture setup failures and produce misleading test results.

🔧 Proposed fix
-	DB.Exec("DELETE FROM abilities")
-	DB.Exec("DELETE FROM channels")
+	require.NoError(t, DB.Exec("DELETE FROM abilities").Error)
+	require.NoError(t, DB.Exec("DELETE FROM channels").Error)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model/channel_cache_test.go` around lines 16 - 17, The test setup is ignoring
SQL errors from the cleanup calls; update the two DB.Exec("DELETE FROM
abilities") and DB.Exec("DELETE FROM channels") invocations in
channel_cache_test.go to capture their returned result and assert no error
(e.g., check result.Error and call t.Fatalf or use require.NoError(t,
result.Error)) so any fixture cleanup failure fails the test immediately.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model/channel_cache_test.go`:
- Around line 12-24: prepareChannelCacheTest resets channelCacheRefreshInFlight
but omits resetting channelCacheRefreshPending, which can leak state between
tests; update prepareChannelCacheTest to also clear channelCacheRefreshPending
(similar to channelCacheRefreshInFlight) so the pending flag is reset before
each test run, keeping test state isolated from previous runs (see
prepareChannelCacheTest, channelCacheRefreshPending,
channelCacheRefreshInFlight).

---

Nitpick comments:
In `@model/channel_cache_test.go`:
- Around line 16-17: The test setup is ignoring SQL errors from the cleanup
calls; update the two DB.Exec("DELETE FROM abilities") and DB.Exec("DELETE FROM
channels") invocations in channel_cache_test.go to capture their returned result
and assert no error (e.g., check result.Error and call t.Fatalf or use
require.NoError(t, result.Error)) so any fixture cleanup failure fails the test
immediately.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 08d5c012-c212-4def-b114-cdd8fded33e9

📥 Commits

Reviewing files that changed from the base of the PR and between c2d6870 and f8d0abb.

📒 Files selected for processing (1)
  • model/channel_cache_test.go
