From 82f19b1a439131feaa16674375ad94cbe450301b Mon Sep 17 00:00:00 2001 From: Mison Date: Mon, 23 Mar 2026 14:06:46 +0800 Subject: [PATCH 1/6] fix: avoid false distributor 503 on stale channel cache --- model/channel.go | 39 +++-------- model/channel_cache.go | 127 +++++++++++++++++++++++------------- model/channel_cache_test.go | 101 ++++++++++++++++++++++++++++ model/channel_satisfy.go | 6 +- 4 files changed, 197 insertions(+), 76 deletions(-) create mode 100644 model/channel_cache_test.go diff --git a/model/channel.go b/model/channel.go index f256b54ce35..2a48d52c50e 100644 --- a/model/channel.go +++ b/model/channel.go @@ -612,38 +612,8 @@ func UpdateChannelStatus(channelId int, usingKey string, status int, reason stri if common.MemoryCacheEnabled { channelStatusLock.Lock() defer channelStatusLock.Unlock() - - channelCache, _ := CacheGetChannel(channelId) - if channelCache == nil { - return false - } - if channelCache.ChannelInfo.IsMultiKey { - // Use per-channel lock to prevent concurrent map read/write with GetNextEnabledKey - pollingLock := GetChannelPollingLock(channelId) - pollingLock.Lock() - // 如果是多Key模式,更新缓存中的状态 - handlerMultiKeyUpdate(channelCache, usingKey, status, reason) - pollingLock.Unlock() - //CacheUpdateChannel(channelCache) - //return true - } else { - // 如果缓存渠道存在,且状态已是目标状态,直接返回 - if channelCache.Status == status { - return false - } - CacheUpdateChannelStatus(channelId, status) - } } - shouldUpdateAbilities := false - defer func() { - if shouldUpdateAbilities { - err := UpdateAbilityStatus(channelId, status == common.ChannelStatusEnabled) - if err != nil { - common.SysLog(fmt.Sprintf("failed to update ability status: channel_id=%d, error=%v", channelId, err)) - } - } - }() channel, err := GetChannelById(channelId, true) if err != nil { return false @@ -676,6 +646,15 @@ func UpdateChannelStatus(channelId int, usingKey string, status int, reason stri return false } } + if shouldUpdateAbilities { + err := UpdateAbilityStatus(channelId, status == common.ChannelStatusEnabled) + if err != nil { + common.SysLog(fmt.Sprintf("failed to update ability status: channel_id=%d, error=%v", channelId, err)) + } + } + if common.MemoryCacheEnabled { + InitChannelCache() + } return true } diff --git a/model/channel_cache.go b/model/channel_cache.go index c9c50357603..e539389ccf6 100644 --- a/model/channel_cache.go +++ b/model/channel_cache.go @@ -5,8 +5,8 @@ import ( "fmt" "math/rand" "sort" - "strings" "sync" + "sync/atomic" "time" "github.com/QuantumNous/new-api/common" @@ -17,6 +17,7 @@ import ( var group2model2channels map[string]map[string][]int // enabled channel var channelsIDM map[int]*Channel // all channels include disabled var channelSyncLock sync.RWMutex +var channelCacheRefreshInFlight atomic.Bool func InitChannelCache() { if !common.MemoryCacheEnabled { @@ -30,37 +31,40 @@ func InitChannelCache() { } var abilities []*Ability DB.Find(&abilities) - groups := make(map[string]bool) - for _, ability := range abilities { - groups[ability.Group] = true - } newGroup2model2channels := make(map[string]map[string][]int) - for group := range groups { - newGroup2model2channels[group] = make(map[string][]int) - } - for _, channel := range channels { - if channel.Status != common.ChannelStatusEnabled { - continue // skip disabled channels + for _, ability := range abilities { + if !ability.Enabled { + continue } - groups := strings.Split(channel.Group, ",") - for _, group := range groups { - models := strings.Split(channel.Models, ",") - for _, model := range models { - if _, ok := newGroup2model2channels[group][model]; !ok { - newGroup2model2channels[group][model] = make([]int, 0) - } - newGroup2model2channels[group][model] = append(newGroup2model2channels[group][model], channel.Id) - } + channel, ok := newChannelId2channel[ability.ChannelId] + if !ok || channel.Status != common.ChannelStatusEnabled { + continue + } + if _, ok := newGroup2model2channels[ability.Group]; !ok { + newGroup2model2channels[ability.Group] = make(map[string][]int) } + newGroup2model2channels[ability.Group][ability.Model] = append( + newGroup2model2channels[ability.Group][ability.Model], + ability.ChannelId, + ) } - // sort by priority + // dedupe and sort by priority for group, model2channels := range newGroup2model2channels { for model, channels := range model2channels { - sort.Slice(channels, func(i, j int) bool { - return newChannelId2channel[channels[i]].GetPriority() > newChannelId2channel[channels[j]].GetPriority() + seen := make(map[int]struct{}, len(channels)) + deduped := make([]int, 0, len(channels)) + for _, channelId := range channels { + if _, ok := seen[channelId]; ok { + continue + } + seen[channelId] = struct{}{} + deduped = append(deduped, channelId) + } + sort.Slice(deduped, func(i, j int) bool { + return newChannelId2channel[deduped[i]].GetPriority() > newChannelId2channel[deduped[j]].GetPriority() }) - newGroup2model2channels[group][model] = channels + newGroup2model2channels[group][model] = deduped } } @@ -93,15 +97,25 @@ func SyncChannelCache(frequency int) { } } -func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, error) { - // if memory cache is disabled, get channel directly from database +func requestChannelCacheRefreshAsync() { if !common.MemoryCacheEnabled { - return GetChannel(group, model, retry) + return } + if !channelCacheRefreshInFlight.CompareAndSwap(false, true) { + return + } + go func() { + defer channelCacheRefreshInFlight.Store(false) + defer func() { + if r := recover(); r != nil { + common.SysLog(fmt.Sprintf("InitChannelCache panic: %v", r)) + } + }() + InitChannelCache() + }() +} - channelSyncLock.RLock() - defer channelSyncLock.RUnlock() - +func getRandomSatisfiedChannelFromCache(group string, model string, retry int) (*Channel, error, bool) { // First, try to find channels with the exact model name. channels := group2model2channels[group][model] @@ -112,14 +126,14 @@ func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, } if len(channels) == 0 { - return nil, nil + return nil, nil, false } if len(channels) == 1 { if channel, ok := channelsIDM[channels[0]]; ok { - return channel, nil + return channel, nil, true } - return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channels[0]) + return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channels[0]), true } uniquePriorities := make(map[int]bool) @@ -127,7 +141,7 @@ func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, if channel, ok := channelsIDM[channelId]; ok { uniquePriorities[int(channel.GetPriority())] = true } else { - return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channelId) + return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channelId), true } } var sortedUniquePriorities []int @@ -151,12 +165,12 @@ func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, targetChannels = append(targetChannels, channel) } } else { - return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channelId) + return nil, fmt.Errorf("数据库一致性错误,渠道# %d 不存在,请联系管理员修复", channelId), true } } if len(targetChannels) == 0 { - return nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)) + return nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)), true } // smoothing factor and adjustment @@ -183,11 +197,41 @@ func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, for _, channel := range targetChannels { randomWeight -= channel.GetWeight()*smoothingFactor + smoothingAdjustment if randomWeight < 0 { - return channel, nil + return channel, nil, true } } - // return null if no channel is not found - return nil, errors.New("channel not found") + return nil, errors.New("channel not found"), true +} + +func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, error) { + // if memory cache is disabled, get channel directly from database + if !common.MemoryCacheEnabled { + return GetChannel(group, model, retry) + } + + channelSyncLock.RLock() + channel, cacheErr, cacheHit := getRandomSatisfiedChannelFromCache(group, model, retry) + channelSyncLock.RUnlock() + if channel != nil || (cacheHit && cacheErr == nil) { + return channel, cacheErr + } + + fallbackChannel, fallbackErr := GetChannel(group, model, retry) + if fallbackErr != nil { + if cacheErr != nil { + return nil, cacheErr + } + return nil, fallbackErr + } + if fallbackChannel != nil && fallbackChannel.Status == common.ChannelStatusEnabled { + requestChannelCacheRefreshAsync() + return fallbackChannel, nil + } + if cacheErr != nil { + requestChannelCacheRefreshAsync() + return nil, cacheErr + } + return nil, nil } func CacheGetChannel(id int) (*Channel, error) { @@ -256,10 +300,5 @@ func CacheUpdateChannel(channel *Channel) { if channel == nil { return } - - println("CacheUpdateChannel:", channel.Id, channel.Name, channel.Status, channel.ChannelInfo.MultiKeyPollingIndex) - - println("before:", channelsIDM[channel.Id].ChannelInfo.MultiKeyPollingIndex) channelsIDM[channel.Id] = channel - println("after :", channelsIDM[channel.Id].ChannelInfo.MultiKeyPollingIndex) } diff --git a/model/channel_cache_test.go b/model/channel_cache_test.go new file mode 100644 index 00000000000..212f5e7d388 --- /dev/null +++ b/model/channel_cache_test.go @@ -0,0 +1,101 @@ +package model + +import ( + "testing" + "time" + + "github.com/QuantumNous/new-api/common" + "github.com/stretchr/testify/require" +) + +func prepareChannelCacheTest(t *testing.T) { + t.Helper() + initCol() + require.NoError(t, DB.AutoMigrate(&Ability{})) + DB.Exec("DELETE FROM abilities") + DB.Exec("DELETE FROM channels") + + channelSyncLock.Lock() + group2model2channels = nil + channelsIDM = nil + channelSyncLock.Unlock() + channelCacheRefreshInFlight.Store(false) +} + +func TestGetRandomSatisfiedChannelFallsBackToDatabaseOnCacheMiss(t *testing.T) { + prepareChannelCacheTest(t) + + prevMemoryCacheEnabled := common.MemoryCacheEnabled + common.MemoryCacheEnabled = true + t.Cleanup(func() { + common.MemoryCacheEnabled = prevMemoryCacheEnabled + }) + + channel := &Channel{ + Id: 101, + Name: "fallback-channel", + Status: common.ChannelStatusEnabled, + Group: "default", + Models: "other-model", + } + require.NoError(t, DB.Create(channel).Error) + require.NoError(t, DB.Create(&Ability{ + Group: "default", + Model: "gpt-5.4", + ChannelId: channel.Id, + Enabled: true, + }).Error) + + got, err := GetRandomSatisfiedChannel("default", "gpt-5.4", 0) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, channel.Id, got.Id) + + require.Eventually(t, func() bool { + channelSyncLock.RLock() + defer channelSyncLock.RUnlock() + return isChannelIDInList(group2model2channels["default"]["gpt-5.4"], channel.Id) + }, time.Second, 20*time.Millisecond) +} + +func TestUpdateChannelStatusRefreshesMemoryCacheAfterEnable(t *testing.T) { + prepareChannelCacheTest(t) + + prevMemoryCacheEnabled := common.MemoryCacheEnabled + common.MemoryCacheEnabled = true + t.Cleanup(func() { + common.MemoryCacheEnabled = prevMemoryCacheEnabled + }) + + channel := &Channel{ + Id: 102, + Name: "auto-disabled-channel", + Status: common.ChannelStatusAutoDisabled, + Group: "default", + Models: "gpt-5.4", + } + require.NoError(t, DB.Create(channel).Error) + require.NoError(t, DB.Create(&Ability{ + Group: "default", + Model: "gpt-5.4", + ChannelId: channel.Id, + Enabled: false, + }).Error) + + InitChannelCache() + + got, err := GetRandomSatisfiedChannel("default", "gpt-5.4", 0) + require.NoError(t, err) + require.Nil(t, got) + + require.True(t, UpdateChannelStatus(channel.Id, "", common.ChannelStatusEnabled, "")) + + got, err = GetRandomSatisfiedChannel("default", "gpt-5.4", 0) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, channel.Id, got.Id) + + channelSyncLock.RLock() + defer channelSyncLock.RUnlock() + require.True(t, isChannelIDInList(group2model2channels["default"]["gpt-5.4"], channel.Id)) +} diff --git a/model/channel_satisfy.go b/model/channel_satisfy.go index 681f1e69bb6..28271a1dfe7 100644 --- a/model/channel_satisfy.go +++ b/model/channel_satisfy.go @@ -25,9 +25,11 @@ func IsChannelEnabledForGroupModel(group string, modelName string, channelID int } normalized := ratio_setting.FormatMatchingModelName(modelName) if normalized != "" && normalized != modelName { - return isChannelIDInList(group2model2channels[group][normalized], channelID) + if isChannelIDInList(group2model2channels[group][normalized], channelID) { + return true + } } - return false + return isChannelEnabledForGroupModelDB(group, modelName, channelID) } func IsChannelEnabledForAnyGroupModel(groups []string, modelName string, channelID int) bool { From 8eb94242a914e5c19616c66e90ee93a9ccc49a6a Mon Sep 17 00:00:00 2001 From: Mison Date: Mon, 23 Mar 2026 15:04:01 +0800 Subject: [PATCH 2/6] test: harden distributor cache fallback coverage --- model/channel.go | 21 +++++++++- model/channel_cache.go | 10 ++++- model/channel_cache_test.go | 84 +++++++++++++++++++++++++++++++++++++ model/channel_satisfy.go | 8 ++-- 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/model/channel.go b/model/channel.go index 2a48d52c50e..4f46804f833 100644 --- a/model/channel.go +++ b/model/channel.go @@ -74,8 +74,25 @@ func (c ChannelInfo) Value() (driver.Value, error) { // Scan implements sql.Scanner interface func (c *ChannelInfo) Scan(value interface{}) error { - bytesValue, _ := value.([]byte) - return common.Unmarshal(bytesValue, c) + switch typedValue := value.(type) { + case nil: + *c = ChannelInfo{} + return nil + case []byte: + if len(typedValue) == 0 { + *c = ChannelInfo{} + return nil + } + return common.Unmarshal(typedValue, c) + case string: + if typedValue == "" { + *c = ChannelInfo{} + return nil + } + return common.Unmarshal([]byte(typedValue), c) + default: + return fmt.Errorf("unsupported channel info type: %T", value) + } } func (channel *Channel) GetKeys() []string { diff --git a/model/channel_cache.go b/model/channel_cache.go index e539389ccf6..aece350cf77 100644 --- a/model/channel_cache.go +++ b/model/channel_cache.go @@ -25,12 +25,18 @@ func InitChannelCache() { } newChannelId2channel := make(map[int]*Channel) var channels []*Channel - DB.Find(&channels) + if err := DB.Find(&channels).Error; err != nil { + common.SysError("failed to sync channels from database: " + err.Error()) + return + } for _, channel := range channels { newChannelId2channel[channel.Id] = channel } var abilities []*Ability - DB.Find(&abilities) + if err := DB.Find(&abilities).Error; err != nil { + common.SysError("failed to sync abilities from database: " + err.Error()) + return + } newGroup2model2channels := make(map[string]map[string][]int) for _, ability := range abilities { if !ability.Enabled { diff --git a/model/channel_cache_test.go b/model/channel_cache_test.go index 212f5e7d388..c1e8c8503f5 100644 --- a/model/channel_cache_test.go +++ b/model/channel_cache_test.go @@ -99,3 +99,87 @@ func TestUpdateChannelStatusRefreshesMemoryCacheAfterEnable(t *testing.T) { defer channelSyncLock.RUnlock() require.True(t, isChannelIDInList(group2model2channels["default"]["gpt-5.4"], channel.Id)) } + +func TestIsChannelEnabledForGroupModelFallsBackToDatabaseOnCacheMiss(t *testing.T) { + prepareChannelCacheTest(t) + + prevMemoryCacheEnabled := common.MemoryCacheEnabled + common.MemoryCacheEnabled = true + t.Cleanup(func() { + common.MemoryCacheEnabled = prevMemoryCacheEnabled + }) + + channel := &Channel{ + Id: 103, + Name: "satisfy-fallback-channel", + Status: common.ChannelStatusEnabled, + Group: "default", + Models: "other-model", + } + require.NoError(t, DB.Create(channel).Error) + require.NoError(t, DB.Create(&Ability{ + Group: "default", + Model: "gpt-5.4-mini", + ChannelId: channel.Id, + Enabled: true, + }).Error) + + require.True(t, IsChannelEnabledForGroupModel("default", "gpt-5.4-mini", channel.Id)) +} + +func TestInitChannelCacheKeepsPreviousSnapshotOnScanError(t *testing.T) { + prepareChannelCacheTest(t) + + prevMemoryCacheEnabled := common.MemoryCacheEnabled + common.MemoryCacheEnabled = true + t.Cleanup(func() { + common.MemoryCacheEnabled = prevMemoryCacheEnabled + }) + + channel := &Channel{ + Id: 104, + Name: "stable-cache-channel", + Status: common.ChannelStatusEnabled, + Group: "default", + Models: "gpt-5.4", + } + require.NoError(t, DB.Create(channel).Error) + require.NoError(t, DB.Create(&Ability{ + Group: "default", + Model: "gpt-5.4", + ChannelId: channel.Id, + Enabled: true, + }).Error) + + InitChannelCache() + + require.NoError(t, DB.Exec( + "INSERT INTO channels (id, type, key, status, name, models, `group`, channel_info, settings) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + 999, + 1, + "broken-key", + common.ChannelStatusEnabled, + "broken-channel", + "broken-model", + "default", + "{", + "", + ).Error) + + InitChannelCache() + + channelSyncLock.RLock() + defer channelSyncLock.RUnlock() + require.True(t, isChannelIDInList(group2model2channels["default"]["gpt-5.4"], channel.Id)) + require.Nil(t, channelsIDM[999]) +} + +func TestChannelInfoScanSupportsStringValue(t *testing.T) { + var info ChannelInfo + err := info.Scan(`{"is_multi_key":false,"multi_key_size":0,"multi_key_status_list":{},"multi_key_disabled_reason":{},"multi_key_disabled_time":{},"multi_key_polling_index":0,"multi_key_mode":"random"}`) + require.NoError(t, err) + require.False(t, info.IsMultiKey) + require.Equal(t, 0, info.MultiKeySize) + require.Equal(t, 0, info.MultiKeyPollingIndex) + require.Equal(t, "random", string(info.MultiKeyMode)) +} diff --git a/model/channel_satisfy.go b/model/channel_satisfy.go index 28271a1dfe7..99dd81fd68a 100644 --- a/model/channel_satisfy.go +++ b/model/channel_satisfy.go @@ -14,21 +14,23 @@ func IsChannelEnabledForGroupModel(group string, modelName string, channelID int } channelSyncLock.RLock() - defer channelSyncLock.RUnlock() - if group2model2channels == nil { - return false + channelSyncLock.RUnlock() + return isChannelEnabledForGroupModelDB(group, modelName, channelID) } if isChannelIDInList(group2model2channels[group][modelName], channelID) { + channelSyncLock.RUnlock() return true } normalized := ratio_setting.FormatMatchingModelName(modelName) if normalized != "" && normalized != modelName { if isChannelIDInList(group2model2channels[group][normalized], channelID) { + channelSyncLock.RUnlock() return true } } + channelSyncLock.RUnlock() return isChannelEnabledForGroupModelDB(group, modelName, channelID) } From d9c84d2e90f543a7d1acf9c5a5939e898f74fb6c Mon Sep 17 00:00:00 2001 From: Mison Date: Mon, 23 Mar 2026 15:53:53 +0800 Subject: [PATCH 3/6] fix: harden distributor cache refresh consistency --- model/channel.go | 31 +++++++++++++++++++-------- model/channel_cache.go | 42 +++++++++++++++++++++++++++++++------ model/channel_cache_test.go | 9 ++++++-- model/channel_satisfy.go | 9 ++++++-- 4 files changed, 72 insertions(+), 19 deletions(-) diff --git a/model/channel.go b/model/channel.go index 4f46804f833..e0f846bc4da 100644 --- a/model/channel.go +++ b/model/channel.go @@ -625,6 +625,7 @@ func handlerMultiKeyUpdate(channel *Channel, usingKey string, status int, reason } } +// UpdateChannelStatus updates channel state and its ability visibility atomically. func UpdateChannelStatus(channelId int, usingKey string, status int, reason string) bool { if common.MemoryCacheEnabled { channelStatusLock.Lock() @@ -657,17 +658,29 @@ func UpdateChannelStatus(channelId int, usingKey string, status int, reason stri channel.Status = status shouldUpdateAbilities = true } - err = channel.SaveWithoutKey() - if err != nil { - common.SysLog(fmt.Sprintf("failed to update channel status: channel_id=%d, status=%d, error=%v", channel.Id, status, err)) - return false - } } - if shouldUpdateAbilities { - err := UpdateAbilityStatus(channelId, status == common.ChannelStatusEnabled) - if err != nil { - common.SysLog(fmt.Sprintf("failed to update ability status: channel_id=%d, error=%v", channelId, err)) + err = DB.Transaction(func(tx *gorm.DB) error { + if err := tx.Omit("key").Save(channel).Error; err != nil { + return err + } + if shouldUpdateAbilities { + err := tx.Model(&Ability{}). + Where("channel_id = ?", channelId). + Select("enabled"). + Update("enabled", status == common.ChannelStatusEnabled).Error + if err != nil { + return err + } + } + return nil + }) + if err != nil { + if shouldUpdateAbilities { + common.SysLog(fmt.Sprintf("failed to update channel or ability status atomically: channel_id=%d, status=%d, error=%v", channelId, status, err)) + } else { + common.SysLog(fmt.Sprintf("failed to update channel status: channel_id=%d, status=%d, error=%v", channel.Id, status, err)) } + return false } if common.MemoryCacheEnabled { InitChannelCache() diff --git a/model/channel_cache.go b/model/channel_cache.go index aece350cf77..dac3e74894d 100644 --- a/model/channel_cache.go +++ b/model/channel_cache.go @@ -18,24 +18,35 @@ var group2model2channels map[string]map[string][]int // enabled channel var channelsIDM map[int]*Channel // all channels include disabled var channelSyncLock sync.RWMutex var channelCacheRefreshInFlight atomic.Bool +var channelCacheRefreshPending atomic.Bool +// InitChannelCache rebuilds the in-memory channel cache from database state. func InitChannelCache() { if !common.MemoryCacheEnabled { return } + channelCacheRefreshPending.Store(true) + if channelCacheRefreshInFlight.CompareAndSwap(false, true) { + runChannelCacheRefreshLoop() + return + } + for channelCacheRefreshInFlight.Load() { + time.Sleep(10 * time.Millisecond) + } +} + +func buildChannelCacheSnapshot() error { newChannelId2channel := make(map[int]*Channel) var channels []*Channel if err := DB.Find(&channels).Error; err != nil { - common.SysError("failed to sync channels from database: " + err.Error()) - return + return fmt.Errorf("failed to sync channels from database: %w", err) } for _, channel := range channels { newChannelId2channel[channel.Id] = channel } var abilities []*Ability if err := DB.Find(&abilities).Error; err != nil { - common.SysError("failed to sync abilities from database: " + err.Error()) - return + return fmt.Errorf("failed to sync abilities from database: %w", err) } newGroup2model2channels := make(map[string]map[string][]int) for _, ability := range abilities { @@ -93,8 +104,23 @@ func InitChannelCache() { channelsIDM = newChannelId2channel channelSyncLock.Unlock() common.SysLog("channels synced from database") + return nil } +func runChannelCacheRefreshLoop() { + defer channelCacheRefreshInFlight.Store(false) + for { + channelCacheRefreshPending.Store(false) + if err := buildChannelCacheSnapshot(); err != nil { + common.SysError(err.Error()) + } + if !channelCacheRefreshPending.Load() { + return + } + } +} + +// SyncChannelCache periodically refreshes the in-memory channel cache. func SyncChannelCache(frequency int) { for { time.Sleep(time.Duration(frequency) * time.Second) @@ -111,13 +137,12 @@ func requestChannelCacheRefreshAsync() { return } go func() { - defer channelCacheRefreshInFlight.Store(false) defer func() { if r := recover(); r != nil { common.SysLog(fmt.Sprintf("InitChannelCache panic: %v", r)) } }() - InitChannelCache() + runChannelCacheRefreshLoop() }() } @@ -209,6 +234,7 @@ func getRandomSatisfiedChannelFromCache(group string, model string, retry int) ( return nil, errors.New("channel not found"), true } +// GetRandomSatisfiedChannel returns a channel for the requested group/model pair. func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, error) { // if memory cache is disabled, get channel directly from database if !common.MemoryCacheEnabled { @@ -240,6 +266,7 @@ func GetRandomSatisfiedChannel(group string, model string, retry int) (*Channel, return nil, nil } +// CacheGetChannel returns a channel from the in-memory cache when available. func CacheGetChannel(id int) (*Channel, error) { if !common.MemoryCacheEnabled { return GetChannelById(id, true) @@ -254,6 +281,7 @@ func CacheGetChannel(id int) (*Channel, error) { return c, nil } +// CacheGetChannelInfo returns cached channel info when available. func CacheGetChannelInfo(id int) (*ChannelInfo, error) { if !common.MemoryCacheEnabled { channel, err := GetChannelById(id, true) @@ -272,6 +300,7 @@ func CacheGetChannelInfo(id int) (*ChannelInfo, error) { return &c.ChannelInfo, nil } +// CacheUpdateChannelStatus mutates a cached channel status in place. func CacheUpdateChannelStatus(id int, status int) { if !common.MemoryCacheEnabled { return @@ -297,6 +326,7 @@ func CacheUpdateChannelStatus(id int, status int) { } } +// CacheUpdateChannel updates a cached channel entry in place. func CacheUpdateChannel(channel *Channel) { if !common.MemoryCacheEnabled { return diff --git a/model/channel_cache_test.go b/model/channel_cache_test.go index c1e8c8503f5..c70b47afc2a 100644 --- a/model/channel_cache_test.go +++ b/model/channel_cache_test.go @@ -1,6 +1,7 @@ package model import ( + "fmt" "testing" "time" @@ -154,7 +155,11 @@ func TestInitChannelCacheKeepsPreviousSnapshotOnScanError(t *testing.T) { InitChannelCache() require.NoError(t, DB.Exec( - "INSERT INTO channels (id, type, key, status, name, models, `group`, channel_info, settings) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + fmt.Sprintf( + "INSERT INTO channels (id, type, %s, status, name, models, %s, channel_info, settings) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + commonKeyCol, + commonGroupCol, + ), 999, 1, "broken-key", @@ -162,7 +167,7 @@ func TestInitChannelCacheKeepsPreviousSnapshotOnScanError(t *testing.T) { "broken-channel", "broken-model", "default", - "{", + `{"is_multi_key":false,"multi_key_size":0,"multi_key_status_list":{},"multi_key_disabled_reason":{},"multi_key_disabled_time":{},"multi_key_polling_index":0,"multi_key_mode":0}`, "", ).Error) diff --git a/model/channel_satisfy.go b/model/channel_satisfy.go index 99dd81fd68a..35c12d91aac 100644 --- a/model/channel_satisfy.go +++ b/model/channel_satisfy.go @@ -5,6 +5,7 @@ import ( "github.com/QuantumNous/new-api/setting/ratio_setting" ) +// IsChannelEnabledForGroupModel reports whether a channel is enabled for a group/model pair. func IsChannelEnabledForGroupModel(group string, modelName string, channelID int) bool { if group == "" || modelName == "" || channelID <= 0 { return false @@ -34,6 +35,7 @@ func IsChannelEnabledForGroupModel(group string, modelName string, channelID int return isChannelEnabledForGroupModelDB(group, modelName, channelID) } +// IsChannelEnabledForAnyGroupModel reports whether a channel is enabled for any group/model pair. func IsChannelEnabledForAnyGroupModel(groups []string, modelName string, channelID int) bool { if len(groups) == 0 { return false @@ -48,8 +50,10 @@ func IsChannelEnabledForAnyGroupModel(groups []string, modelName string, channel func isChannelEnabledForGroupModelDB(group string, modelName string, channelID int) bool { var count int64 + groupColumn := "abilities." + commonGroupCol err := DB.Model(&Ability{}). - Where(commonGroupCol+" = ? and model = ? and channel_id = ? and enabled = ?", group, modelName, channelID, true). + Joins("JOIN channels ON channels.id = abilities.channel_id"). + Where(groupColumn+" = ? and abilities.model = ? and abilities.channel_id = ? and abilities.enabled = ? and channels.status = ?", group, modelName, channelID, true, common.ChannelStatusEnabled). Count(&count).Error if err == nil && count > 0 { return true @@ -60,7 +64,8 @@ func isChannelEnabledForGroupModelDB(group string, modelName string, channelID i } count = 0 err = DB.Model(&Ability{}). - Where(commonGroupCol+" = ? and model = ? and channel_id = ? and enabled = ?", group, normalized, channelID, true). + Joins("JOIN channels ON channels.id = abilities.channel_id"). + Where(groupColumn+" = ? and abilities.model = ? and abilities.channel_id = ? and abilities.enabled = ? and channels.status = ?", group, normalized, channelID, true, common.ChannelStatusEnabled). Count(&count).Error return err == nil && count > 0 } From c2d6870f7af22bf7cee2382b5dfc5e4e2de54860 Mon Sep 17 00:00:00 2001 From: Mison Date: Mon, 23 Mar 2026 16:03:30 +0800 Subject: [PATCH 4/6] style: align distributor cache refresh flow --- model/channel_cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/model/channel_cache.go b/model/channel_cache.go index dac3e74894d..8f8a30e9bdb 100644 --- a/model/channel_cache.go +++ b/model/channel_cache.go @@ -133,6 +133,7 @@ func requestChannelCacheRefreshAsync() { if !common.MemoryCacheEnabled { return } + channelCacheRefreshPending.Store(true) if !channelCacheRefreshInFlight.CompareAndSwap(false, true) { return } @@ -201,7 +202,7 @@ func getRandomSatisfiedChannelFromCache(group string, model string, retry int) ( } if len(targetChannels) == 0 { - return nil, errors.New(fmt.Sprintf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority)), true + return nil, fmt.Errorf("no channel found, group: %s, model: %s, priority: %d", group, model, targetPriority), true } // smoothing factor and adjustment From f8d0abb0c020dc14ef492e86c37eb17646a24998 Mon Sep 17 00:00:00 2001 From: Mison Date: Thu, 26 Mar 2026 17:36:14 +0800 Subject: [PATCH 5/6] test: fix channel cache scan error fixture --- model/channel_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/channel_cache_test.go b/model/channel_cache_test.go index c70b47afc2a..7a86e1f492e 100644 --- a/model/channel_cache_test.go +++ b/model/channel_cache_test.go @@ -167,7 +167,7 @@ func TestInitChannelCacheKeepsPreviousSnapshotOnScanError(t *testing.T) { "broken-channel", "broken-model", "default", - `{"is_multi_key":false,"multi_key_size":0,"multi_key_status_list":{},"multi_key_disabled_reason":{},"multi_key_disabled_time":{},"multi_key_polling_index":0,"multi_key_mode":0}`, + `{invalid`, "", ).Error) From cabcae1fbeaf16229bcbc5ddf6085fa1574d3bf4 Mon Sep 17 00:00:00 2001 From: Mison Date: Thu, 26 Mar 2026 18:02:38 +0800 Subject: [PATCH 6/6] test: reset channel cache pending state --- model/channel_cache_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/model/channel_cache_test.go b/model/channel_cache_test.go index 7a86e1f492e..6601324e407 100644 --- a/model/channel_cache_test.go +++ b/model/channel_cache_test.go @@ -21,6 +21,7 @@ func prepareChannelCacheTest(t *testing.T) { channelsIDM = nil channelSyncLock.Unlock() channelCacheRefreshInFlight.Store(false) + channelCacheRefreshPending.Store(false) } func TestGetRandomSatisfiedChannelFallsBackToDatabaseOnCacheMiss(t *testing.T) {