From 9f42f3922f91c7c876d50a0ec28d4f793f66c08f Mon Sep 17 00:00:00 2001 From: zhaoli Date: Tue, 11 Nov 2025 17:56:02 +0800 Subject: [PATCH] [fix][dingoscheduler] Add caching tasks concurrently --- cmd/wire_gen.go | 3 ++- internal/dao/dao.go | 2 +- internal/dao/lock_dao.go | 39 +++++++++++++++++++++++++++ internal/dao/repository_dao.go | 6 +++-- internal/service/cache_job_service.go | 16 ++++++++++- pkg/util/avatar_util.go | 1 - 6 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 internal/dao/lock_dao.go diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 9a5d19e..b602d96 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -47,7 +47,8 @@ func wireApp(configConfig *config.Config) (*app.App, func(), error) { repositoryHandler := handler.NewRepositoryHandler(repositoryService) tagService := service.NewTagService(tagDao) tagHandler := handler.NewTagHandler(tagService) - cacheJobService := service.NewCacheJobService(dingospeedDao, modelFileProcessDao, cacheJobDao, hfTokenDao) + lockDao := dao.NewLockDao(baseData) + cacheJobService := service.NewCacheJobService(dingospeedDao, modelFileProcessDao, cacheJobDao, hfTokenDao, lockDao) cacheJobHandler := handler.NewCacheJobHandler(cacheJobService) httpRouter := router.NewHttpRouter(echo, managerHandler, sysHandler, repositoryHandler, tagHandler, cacheJobHandler) httpServer := server.NewHTTPServer(configConfig, httpRouter) diff --git a/internal/dao/dao.go b/internal/dao/dao.go index bb3dd07..a047e74 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -17,4 +17,4 @@ package dao import "github.com/google/wire" var DaoProvider = wire.NewSet(NewDingospeedDao, NewModelFileRecordDao, NewModelFileProcessDao, NewCacheJobDao, - NewRepositoryDao, NewTagDao, NewRepositoryTagDao, NewOrganizationDao, NewHfTokenDao) + NewRepositoryDao, NewTagDao, NewRepositoryTagDao, NewOrganizationDao, NewHfTokenDao, NewLockDao) diff --git a/internal/dao/lock_dao.go b/internal/dao/lock_dao.go new file mode 100644 index 0000000..5e2557c --- /dev/null +++ b/internal/dao/lock_dao.go @@ -0,0 +1,39 @@ +package dao + +import ( + "fmt" + "sync" + "time" + + "dingoscheduler/internal/data" +) + +type LockDao struct { + baseData *data.BaseData + cacheJobReqMu sync.Mutex + cacheJobReqTimeout time.Duration +} + +func NewLockDao(baseData *data.BaseData) *LockDao { + return &LockDao{baseData: baseData, cacheJobReqTimeout: 60 * time.Second} +} + +func (f *LockDao) GetCacheJobReqLock(orgRepoKey string) *sync.RWMutex { + if val, ok := f.baseData.Cache.Get(orgRepoKey); ok { + f.baseData.Cache.Set(orgRepoKey, val, f.cacheJobReqTimeout) + return val.(*sync.RWMutex) + } + f.cacheJobReqMu.Lock() + defer f.cacheJobReqMu.Unlock() + if val, ok := f.baseData.Cache.Get(orgRepoKey); ok { + f.baseData.Cache.Set(orgRepoKey, val, f.cacheJobReqTimeout) + return val.(*sync.RWMutex) + } + newLock := &sync.RWMutex{} + f.baseData.Cache.Set(orgRepoKey, newLock, f.cacheJobReqTimeout) + return newLock +} + +func GetCacheJobOrgRepoKey(orgRepo string) string { + return fmt.Sprintf("cacheJob/%s", orgRepo) +} diff --git a/internal/dao/repository_dao.go b/internal/dao/repository_dao.go index 0770343..b7b2f53 100644 --- a/internal/dao/repository_dao.go +++ b/internal/dao/repository_dao.go @@ -55,7 +55,7 @@ func NewRepositoryDao(data *data.BaseData, repositoryTagDao *RepositoryTagDao, t } func (r *RepositoryDao) PersistRepo(persistRepoReq *query.PersistRepoReq) error { - zap.S().Debugf("PersistRepo instanceId:%s, org:%s, repo:%s", persistRepoReq.InstanceIds, persistRepoReq.Org, persistRepoReq.Repo) + zap.S().Debugf("PersistRepo start instanceId:%s, org:%s, repo:%s", persistRepoReq.InstanceIds, persistRepoReq.Org, persistRepoReq.Repo) var ( pipelineMap map[string]string err error @@ -93,6 +93,7 @@ func (r *RepositoryDao) PersistRepo(persistRepoReq *query.PersistRepoReq) error } } } + zap.S().Debugf("PersistRepo end instanceId:%s, org:%s, repo:%s", persistRepoReq.InstanceIds, persistRepoReq.Org, persistRepoReq.Repo) return nil } @@ -117,7 +118,8 @@ func (r *RepositoryDao) singleRepositoryPersist(repository *model.Repository, in return err } if !isComplete { - return myerr.New(fmt.Sprintf("repo file unComplete.%s", orgRepo)) + zap.S().Infof("repo file unComplete.%s", orgRepo) + return nil } } // 保存组织图片 diff --git a/internal/service/cache_job_service.go b/internal/service/cache_job_service.go index 6989e66..f2fb68f 100644 --- a/internal/service/cache_job_service.go +++ b/internal/service/cache_job_service.go @@ -34,15 +34,17 @@ type CacheJobService struct { modelFileProcessDao *dao.ModelFileProcessDao cacheJobDao *dao.CacheJobDao hfTokenDao *dao.HfTokenDao + lockDao *dao.LockDao } func NewCacheJobService(dingospeedDao *dao.DingospeedDao, modelFileProcessDao *dao.ModelFileProcessDao, - cacheJobDao *dao.CacheJobDao, hfTokenDao *dao.HfTokenDao) *CacheJobService { + cacheJobDao *dao.CacheJobDao, hfTokenDao *dao.HfTokenDao, lockDao *dao.LockDao) *CacheJobService { return &CacheJobService{ dingospeedDao: dingospeedDao, cacheJobDao: cacheJobDao, modelFileProcessDao: modelFileProcessDao, hfTokenDao: hfTokenDao, + lockDao: lockDao, } } @@ -62,6 +64,9 @@ func (c *CacheJobService) ListCacheJob(instanceId, datatype string, page, pageSi func (c *CacheJobService) CreateCacheJob(createCacheJobReq *query.CreateCacheJobReq) (*common.Response, error) { zap.S().Debugf("Cache instanceId:%s, %s/%s", createCacheJobReq.InstanceId, createCacheJobReq.Org, createCacheJobReq.Repo) + lock := c.lockDao.GetCacheJobReqLock(createCacheJobReq.OrgRepo) + lock.Lock() + defer lock.Unlock() cacheJob, err := c.cacheJobDao.GetCacheJob(&query.CacheJobQuery{InstanceId: createCacheJobReq.InstanceId, Type: createCacheJobReq.Type, Org: createCacheJobReq.Org, Repo: createCacheJobReq.Repo, Datatype: createCacheJobReq.Datatype}) if err != nil { @@ -86,6 +91,9 @@ func (c *CacheJobService) CreateCacheJob(createCacheJobReq *query.CreateCacheJob } func (c *CacheJobService) StopCacheJob(jobStatusReq *query.JobStatusReq) error { + lock := c.lockDao.GetCacheJobReqLock(util.Itoa(jobStatusReq.Id)) + lock.Lock() + defer lock.Unlock() cacheJob, err := c.cacheJobDao.GetCacheJob(&query.CacheJobQuery{Id: jobStatusReq.Id}) if err != nil { return err @@ -120,6 +128,9 @@ func (c *CacheJobService) StopCacheJob(jobStatusReq *query.JobStatusReq) error { } func (c *CacheJobService) ResumeCacheJob(resumeCacheJobReq *query.ResumeCacheJobReq) error { + lock := c.lockDao.GetCacheJobReqLock(util.Itoa(resumeCacheJobReq.Id)) + lock.Lock() + defer lock.Unlock() cacheJob, err := c.cacheJobDao.GetCacheJob(&query.CacheJobQuery{Id: resumeCacheJobReq.Id}) if err != nil { return err @@ -158,6 +169,9 @@ func (c *CacheJobService) ResumeCacheJob(resumeCacheJobReq *query.ResumeCacheJob } func (c *CacheJobService) DeleteCacheJob(id int64) error { + lock := c.lockDao.GetCacheJobReqLock(util.Itoa(id)) + lock.Lock() + defer lock.Unlock() cacheJob, err := c.cacheJobDao.GetCacheJob(&query.CacheJobQuery{Id: id}) if err != nil { return err diff --git a/pkg/util/avatar_util.go b/pkg/util/avatar_util.go index ac723a3..97d43b7 100644 --- a/pkg/util/avatar_util.go +++ b/pkg/util/avatar_util.go @@ -96,7 +96,6 @@ func FetchAvatarURL(orgName string) (string, error) { } findAvatar(doc) if avatarURL == "" { - zap.S().Errorf("在组织页面(%s)中未找到符合特征的头像节点", orgUri) return "", fmt.Errorf("未在组织页面(%s)中找到头像元素", orgUri) } return avatarURL, nil