From 098d2532c4576e399dacd91a41b08f59ea0c9dae Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Thu, 2 Apr 2026 13:11:07 +0200 Subject: [PATCH] fix buggy locking for migration path Signed-off-by: Matthias Bertschy --- pkg/registry/file/containerprofile_storage.go | 2 +- pkg/registry/file/storage.go | 209 ++++++++++++------ 2 files changed, 139 insertions(+), 72 deletions(-) diff --git a/pkg/registry/file/containerprofile_storage.go b/pkg/registry/file/containerprofile_storage.go index b8275874a..7c4b64c6d 100644 --- a/pkg/registry/file/containerprofile_storage.go +++ b/pkg/registry/file/containerprofile_storage.go @@ -90,7 +90,7 @@ func (c *ContainerProfileStorageImpl) GetStorageImpl() *StorageImpl { func (c *ContainerProfileStorageImpl) GetTsContainerProfile(ctx context.Context, key string) (softwarecomposition.ContainerProfile, error) { conn := ctx.Value(connKey).(*sqlite.Conn) tsProfile := softwarecomposition.ContainerProfile{} - err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile) // get instead of GetWithConn to bypass locking + err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile, noLock) // get instead of GetWithConn to bypass locking return tsProfile, err } diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go index 4bf1046ef..506b07584 100644 --- a/pkg/registry/file/storage.go +++ b/pkg/registry/file/storage.go @@ -433,11 +433,21 @@ func (s *StorageImpl) GetWithConn(ctx context.Context, conn *sqlite.Conn, key st if lockDuration > time.Second { logger.L().Debug("Get", helpers.String("key", key), helpers.String("lockDuration", lockDuration.String())) } - return s.get(ctx, conn, key, opts, objPtr) + return s.get(ctx, conn, key, opts, objPtr, hasReadLock) } +// getLockState describes what lock the caller holds when invoking get(), so the +// migration path can manage locks correctly without crashing or deadlocking. +type getLockState int + +const ( + noLock getLockState = iota // caller holds no lock; migration acquires a temporary write lock + hasReadLock // caller holds a read lock; migration upgrades to write lock then restores read lock + hasWriteLock // caller already holds the write lock; migration runs without lock changes +) + // get is a helper function for Get to allow calls without locks from other methods that already have them -func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object) error { +func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object, lockState getLockState) error { p := filepath.Join(s.root, key) if opts.ResourceVersion == softwarecomposition.ResourceVersionMetadata { // get metadata from SQLite @@ -455,6 +465,25 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op } return json.Unmarshal(metadata, objPtr) } + + // noLock callers perform unsynchronized file I/O by default. Acquire a + // temporary read lock so that a concurrent saveObject cannot truncate or + // overwrite the file while we are decoding it. The lock is released via + // the deferred cleanup below; the migration path clears ownedRLock before + // calling RUnlock itself so the defer becomes a no-op. + ownedRLock := false + if lockState == noLock { + if err := s.locks.RLock(ctx, key); err != nil { + return apierrors.NewTimeoutError(fmt.Sprintf("rlock: %v", err), 0) + } + ownedRLock = true + defer func() { + if ownedRLock { + s.locks.RUnlock(key) + } + }() + } + payloadFile, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0) if err != nil { if errors.Is(err, afero.ErrFileNotFound) { @@ -484,78 +513,51 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op if strings.Contains(err.Error(), "gob: wrong type") || strings.Contains(err.Error(), "extra fields") { logger.L().Ctx(ctx).Info("Get - detected gob type mismatch, attempting external migration", helpers.Error(err), helpers.String("key", key)) - // Rewrite the object in the modern format to complete the migration - // We upgrade to a write lock BEFORE running the migration tool to prevent concurrent migration attempts - s.locks.RUnlock(key) - // re-acquire read lock if anything fails or when we are done - defer s.locks.RLock(ctx, key) - - if lockErr := s.locks.Lock(ctx, key); lockErr != nil { - logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key)) - return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr) - } - defer s.locks.Unlock(key) - - // Re-check if the file still needs migration now that we have the write lock - // Another thread might have finished the migration while we were waiting for the lock - payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0) - if err == nil { - decoderRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry)) - errRetry := decoderRetry.Decode(objPtr) - _ = payloadFileRetry.Close() - if errRetry == nil { - logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key)) - return nil - } - } - - typeName := "ApplicationProfile" - if _, ok := objPtr.(*softwarecomposition.ContainerProfile); ok { - typeName = "ContainerProfile" - } else if _, ok := objPtr.(*softwarecomposition.SeccompProfile); ok { - typeName = "SeccompProfile" - } - - // Run migration tool: /usr/bin/migration -file -type - migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second) - defer migrationCancel() - - cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(p), "-type", typeName) - var out bytes.Buffer - var stderr bytes.Buffer - cmd.Stdout = &out - cmd.Stderr = &stderr - if runErr := cmd.Run(); runErr != nil { - if errors.Is(migrationCtx.Err(), context.DeadlineExceeded) { - logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key)) - return fmt.Errorf("migration tool timed out: %w", migrationCtx.Err()) + // Acquire a write lock before running the migration tool to prevent + // concurrent migration attempts. All lock transitions are explicit + // (no deferred lock calls) so that errors can be checked and the lock + // state is always well-defined on every return path. + var migErr error + switch lockState { + case hasReadLock: + // Drop the caller's read lock and upgrade to write lock. + // After migration, restore the read lock so the caller's deferred + // RUnlock (in GetWithConn) has a matching acquire. + s.locks.RUnlock(key) + if lockErr := s.locks.Lock(ctx, key); lockErr != nil { + logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key)) + // Best-effort: restore the read lock so the caller's deferred + // RUnlock is not left unmatched. + if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil { + logger.L().Ctx(ctx).Error("Get - failed to restore read lock after write lock failure", helpers.Error(rlockErr), helpers.String("key", key)) + } + return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr) } - logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key)) - // If migration tool fails, treat as corrupted and delete - _ = DeleteMetadata(conn, key, nil) - _ = s.appFs.Remove(makePayloadPath(p)) - if opts.IgnoreNotFound { - return runtime.SetZeroValue(objPtr) - } else { - return storage.NewKeyNotFoundError(key, 0) + migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr) + s.locks.Unlock(key) + if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil { + logger.L().Ctx(ctx).Error("Get - failed to restore read lock after migration", helpers.Error(rlockErr), helpers.String("key", key)) + return fmt.Errorf("failed to restore read lock after migration: %w", rlockErr) } - } - // Migration tool outputted JSON, unmarshal it into objPtr - if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil { - logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key)) - return unmarshalErr - } - - logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key)) + case noLock: + // We hold a temporary read lock (ownedRLock=true); release it and + // upgrade to write lock. Do not re-acquire afterwards — there is + // no outer caller expecting a read lock to be held. + ownedRLock = false + s.locks.RUnlock(key) + if lockErr := s.locks.Lock(ctx, key); lockErr != nil { + logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key)) + return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr) + } + migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr) + s.locks.Unlock(key) - if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil { - logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key)) - } else { - logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key)) + case hasWriteLock: + // Already holding the write lock — just migrate. + migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr) } - - return nil + return migErr } if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) { @@ -573,6 +575,71 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op return err } +// migrateObject runs the external migration tool and unmarshals the output into objPtr. +// It is used by get() to migrate objects that need external migration. +// The caller must hold the write lock for key before calling this. +func (s *StorageImpl) migrateObject(ctx context.Context, conn *sqlite.Conn, path, key string, opts storage.GetOptions, objPtr runtime.Object) error { + // Re-check if the file still needs migration — another goroutine may have + // already migrated it while we were waiting for the write lock. + payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(path), os.O_RDONLY, 0) + if err == nil { + decoderRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry)) + errRetry := decoderRetry.Decode(objPtr) + _ = payloadFileRetry.Close() + if errRetry == nil { + logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key)) + return nil + } + } + + typeName := "ApplicationProfile" + if _, ok := objPtr.(*softwarecomposition.ContainerProfile); ok { + typeName = "ContainerProfile" + } else if _, ok := objPtr.(*softwarecomposition.SeccompProfile); ok { + typeName = "SeccompProfile" + } + + migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second) + defer migrationCancel() + + cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(path), "-type", typeName) + var out bytes.Buffer + var stderr bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &stderr + if runErr := cmd.Run(); runErr != nil { + if errors.Is(migrationCtx.Err(), context.DeadlineExceeded) { + logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key)) + return fmt.Errorf("migration tool timed out: %w", migrationCtx.Err()) + } + logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key)) + // If migration tool fails, treat as corrupted and delete + _ = DeleteMetadata(conn, key, nil) + _ = s.appFs.Remove(makePayloadPath(path)) + if opts.IgnoreNotFound { + return runtime.SetZeroValue(objPtr) + } else { + return storage.NewKeyNotFoundError(key, 0) + } + } + + // Migration tool outputted JSON, unmarshal it into objPtr + if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil { + logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key)) + return unmarshalErr + } + + logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key)) + + if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil { + logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key)) + } else { + logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key)) + } + + return nil +} + // GetList unmarshalls objects found at key into a *List api object (an object // that satisfies runtime.IsList definition). // If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive` @@ -623,7 +690,7 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke v.Set(reflect.MakeSlice(v.Type(), 0, len(list))) for _, k := range list { obj := reflect.New(elem).Interface().(runtime.Object) - if err := s.get(ctx, conn, k, storage.GetOptions{}, obj); err != nil { + if err := s.get(ctx, conn, k, storage.GetOptions{}, obj, noLock); err != nil { logger.L().Ctx(ctx).Error("GetList - get object failed", helpers.Error(err), helpers.String("key", k)) } v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) @@ -807,7 +874,7 @@ func (s *StorageImpl) GuaranteedUpdateWithConn( getCurrentState := func() (*objState, error) { objPtr := reflect.New(v.Type()).Interface().(runtime.Object) - err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr) + err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr, hasWriteLock) if err != nil { logger.L().Ctx(ctx).Error("GuaranteedUpdate - get failed", helpers.Error(err), helpers.String("key", key)) return nil, err