Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/registry/file/containerprofile_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *ContainerProfileStorageImpl) GetStorageImpl() *StorageImpl {
// GetTsContainerProfile loads the ContainerProfile stored under key and
// returns it together with any error reported by the underlying get call.
//
// The SQLite connection is read from ctx under connKey. The type assertion is
// unchecked, so it panics if ctx does not carry a *sqlite.Conn for that key.
func (c *ContainerProfileStorageImpl) GetTsContainerProfile(ctx context.Context, key string) (softwarecomposition.ContainerProfile, error) {
conn := ctx.Value(connKey).(*sqlite.Conn)
tsProfile := softwarecomposition.ContainerProfile{}
err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile) // get instead of GetWithConn to bypass locking
err := c.storageImpl.get(ctx, conn, key, storage.GetOptions{}, &tsProfile, noLock) // get instead of GetWithConn to bypass locking
return tsProfile, err
}

Expand Down
209 changes: 138 additions & 71 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,21 @@ func (s *StorageImpl) GetWithConn(ctx context.Context, conn *sqlite.Conn, key st
if lockDuration > time.Second {
logger.L().Debug("Get", helpers.String("key", key), helpers.String("lockDuration", lockDuration.String()))
}
return s.get(ctx, conn, key, opts, objPtr)
return s.get(ctx, conn, key, opts, objPtr, hasReadLock)
}

// getLockState tells get() which lock, if any, its caller already holds, so
// that the migration path can manage locks correctly without crashing or
// deadlocking.
type getLockState int

const (
	// noLock: the caller holds no lock; migration acquires a temporary write
	// lock.
	noLock getLockState = iota

	// hasReadLock: the caller holds a read lock; migration upgrades to a write
	// lock and then restores the read lock.
	hasReadLock

	// hasWriteLock: the caller already holds the write lock; migration runs
	// without any lock changes.
	hasWriteLock
)

// get is a helper function for Get to allow calls without locks from other methods that already have them
func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object) error {
func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, opts storage.GetOptions, objPtr runtime.Object, lockState getLockState) error {
p := filepath.Join(s.root, key)
if opts.ResourceVersion == softwarecomposition.ResourceVersionMetadata {
// get metadata from SQLite
Expand All @@ -455,6 +465,25 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
}
return json.Unmarshal(metadata, objPtr)
}

// noLock callers perform unsynchronized file I/O by default. Acquire a
// temporary read lock so that a concurrent saveObject cannot truncate or
// overwrite the file while we are decoding it. The lock is released via
// the deferred cleanup below; the migration path clears ownedRLock before
// calling RUnlock itself so the defer becomes a no-op.
ownedRLock := false
if lockState == noLock {
if err := s.locks.RLock(ctx, key); err != nil {
return apierrors.NewTimeoutError(fmt.Sprintf("rlock: %v", err), 0)
}
ownedRLock = true
defer func() {
if ownedRLock {
s.locks.RUnlock(key)
}
}()
}

payloadFile, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0)
if err != nil {
if errors.Is(err, afero.ErrFileNotFound) {
Expand Down Expand Up @@ -484,78 +513,51 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
if strings.Contains(err.Error(), "gob: wrong type") || strings.Contains(err.Error(), "extra fields") {
logger.L().Ctx(ctx).Info("Get - detected gob type mismatch, attempting external migration", helpers.Error(err), helpers.String("key", key))

// Rewrite the object in the modern format to complete the migration
// We upgrade to a write lock BEFORE running the migration tool to prevent concurrent migration attempts
s.locks.RUnlock(key)
// re-acquire read lock if anything fails or when we are done
defer s.locks.RLock(ctx, key)

if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
}
defer s.locks.Unlock(key)

// Re-check if the file still needs migration now that we have the write lock
// Another thread might have finished the migration while we were waiting for the lock
payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(p), os.O_RDONLY, 0)
if err == nil {
decoderRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry))
errRetry := decoderRetry.Decode(objPtr)
_ = payloadFileRetry.Close()
if errRetry == nil {
logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key))
return nil
}
}

typeName := "ApplicationProfile"
if _, ok := objPtr.(*softwarecomposition.ContainerProfile); ok {
typeName = "ContainerProfile"
} else if _, ok := objPtr.(*softwarecomposition.SeccompProfile); ok {
typeName = "SeccompProfile"
}

// Run migration tool: /usr/bin/migration -file <path> -type <typeName>
migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second)
defer migrationCancel()

cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(p), "-type", typeName)
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
if runErr := cmd.Run(); runErr != nil {
if errors.Is(migrationCtx.Err(), context.DeadlineExceeded) {
logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key))
return fmt.Errorf("migration tool timed out: %w", migrationCtx.Err())
// Acquire a write lock before running the migration tool to prevent
// concurrent migration attempts. All lock transitions are explicit
// (no deferred lock calls) so that errors can be checked and the lock
// state is always well-defined on every return path.
var migErr error
switch lockState {
case hasReadLock:
// Drop the caller's read lock and upgrade to write lock.
// After migration, restore the read lock so the caller's deferred
// RUnlock (in GetWithConn) has a matching acquire.
s.locks.RUnlock(key)
if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
// Best-effort: restore the read lock so the caller's deferred
// RUnlock is not left unmatched.
if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to restore read lock after write lock failure", helpers.Error(rlockErr), helpers.String("key", key))
}
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
}
logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key))
// If migration tool fails, treat as corrupted and delete
_ = DeleteMetadata(conn, key, nil)
_ = s.appFs.Remove(makePayloadPath(p))
if opts.IgnoreNotFound {
return runtime.SetZeroValue(objPtr)
} else {
return storage.NewKeyNotFoundError(key, 0)
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
s.locks.Unlock(key)
if rlockErr := s.locks.RLock(ctx, key); rlockErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to restore read lock after migration", helpers.Error(rlockErr), helpers.String("key", key))
return fmt.Errorf("failed to restore read lock after migration: %w", rlockErr)
}
}

// Migration tool outputted JSON, unmarshal it into objPtr
if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil {
logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key))
return unmarshalErr
}

logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key))
case noLock:
// We hold a temporary read lock (ownedRLock=true); release it and
// upgrade to write lock. Do not re-acquire afterwards — there is
// no outer caller expecting a read lock to be held.
ownedRLock = false
s.locks.RUnlock(key)
if lockErr := s.locks.Lock(ctx, key); lockErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to acquire write lock for migration", helpers.Error(lockErr), helpers.String("key", key))
return fmt.Errorf("failed to acquire write lock for migration: %w", lockErr)
}
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
s.locks.Unlock(key)

if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil {
logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key))
} else {
logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key))
case hasWriteLock:
// Already holding the write lock — just migrate.
migErr = s.migrateObject(ctx, conn, p, key, opts, objPtr)
}

return nil
return migErr
}

if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
Expand All @@ -573,6 +575,71 @@ func (s *StorageImpl) get(ctx context.Context, conn *sqlite.Conn, key string, op
return err
}

// migrateObject runs the external migration tool for the payload stored under
// key and unmarshals the tool's JSON output into objPtr. It is used by get()
// for objects whose gob encoding no longer matches the current types.
//
// The caller must hold the write lock for key before calling this.
//
// Behavior:
//   - The payload is decoded once more first; if that succeeds (another
//     goroutine may have migrated it while we waited for the write lock),
//     nil is returned without running the tool.
//   - The tool is invoked as /usr/bin/migration -file <payload> -type <type>
//     with a 30 second timeout derived from ctx.
//   - If the migration context ended (timeout, or cancellation of ctx), an
//     error wrapping the context error is returned and the stored data is
//     left untouched, since the tool was interrupted rather than having
//     rejected the file.
//   - Any other tool failure is treated as corruption: the metadata and the
//     payload file are removed, and the result is the zero value when
//     opts.IgnoreNotFound is set, or a key-not-found error otherwise.
//   - After a successful migration the object is rewritten via saveObject;
//     a failure of that rewrite is logged but not returned.
func (s *StorageImpl) migrateObject(ctx context.Context, conn *sqlite.Conn, path, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	// Re-check if the file still needs migration — another goroutine may have
	// already migrated it while we were waiting for the write lock.
	payloadFileRetry, err := s.openPayloadFileWithFallback(makePayloadPath(path), os.O_RDONLY, 0)
	if err == nil {
		errRetry := gob.NewDecoder(NewDirectIOReader(payloadFileRetry)).Decode(objPtr)
		_ = payloadFileRetry.Close() // read-only handle, nothing to flush
		if errRetry == nil {
			logger.L().Ctx(ctx).Info("Get - migration already completed by another thread", helpers.String("key", key))
			return nil
		}
	}

	// The tool needs to know which type to decode; ApplicationProfile is the
	// fallback for anything that is not one of the types below.
	typeName := "ApplicationProfile"
	switch objPtr.(type) {
	case *softwarecomposition.ContainerProfile:
		typeName = "ContainerProfile"
	case *softwarecomposition.SeccompProfile:
		typeName = "SeccompProfile"
	}

	migrationCtx, migrationCancel := context.WithTimeout(ctx, 30*time.Second)
	defer migrationCancel()

	cmd := exec.CommandContext(migrationCtx, "/usr/bin/migration", "-file", makePayloadPath(path), "-type", typeName)
	var out bytes.Buffer
	var stderr bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &stderr
	if runErr := cmd.Run(); runErr != nil {
		// A finished context means the tool was killed, not that the file is
		// bad: never delete data on this path.
		if ctxErr := migrationCtx.Err(); ctxErr != nil {
			if errors.Is(ctxErr, context.DeadlineExceeded) {
				logger.L().Ctx(ctx).Error("Get - migration tool timed out", helpers.String("key", key))
				return fmt.Errorf("migration tool timed out: %w", ctxErr)
			}
			logger.L().Ctx(ctx).Error("Get - migration tool canceled", helpers.Error(ctxErr), helpers.String("key", key))
			return fmt.Errorf("migration tool canceled: %w", ctxErr)
		}
		logger.L().Ctx(ctx).Error("Get - migration tool failed", helpers.Error(runErr), helpers.String("stderr", stderr.String()), helpers.String("key", key))
		// If migration tool fails, treat as corrupted and delete.
		// Cleanup is best-effort; the caller is told the key is gone either way.
		_ = DeleteMetadata(conn, key, nil)
		_ = s.appFs.Remove(makePayloadPath(path))
		if opts.IgnoreNotFound {
			return runtime.SetZeroValue(objPtr)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}

	// Migration tool outputted JSON, unmarshal it into objPtr
	if unmarshalErr := json.Unmarshal(out.Bytes(), objPtr); unmarshalErr != nil {
		logger.L().Ctx(ctx).Error("Get - unmarshal migrated JSON failed", helpers.Error(unmarshalErr), helpers.String("key", key))
		return unmarshalErr
	}

	logger.L().Ctx(ctx).Info("Get - external migration successful", helpers.String("key", key))

	// Persist the migrated object so later reads decode directly. objPtr is
	// already populated, so a failed rewrite is only logged.
	if saveErr := s.saveObject(conn, key, objPtr, nil, ""); saveErr != nil {
		logger.L().Ctx(ctx).Error("Get - failed to rewrite migrated object", helpers.Error(saveErr), helpers.String("key", key))
	} else {
		logger.L().Ctx(ctx).Info("Get - successfully migrated object to modern format", helpers.String("key", key))
	}

	return nil
}

// GetList unmarshalls objects found at key into a *List api object (an object
// that satisfies runtime.IsList definition).
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive`
Expand Down Expand Up @@ -623,7 +690,7 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke
v.Set(reflect.MakeSlice(v.Type(), 0, len(list)))
for _, k := range list {
obj := reflect.New(elem).Interface().(runtime.Object)
if err := s.get(ctx, conn, k, storage.GetOptions{}, obj); err != nil {
if err := s.get(ctx, conn, k, storage.GetOptions{}, obj, noLock); err != nil {
logger.L().Ctx(ctx).Error("GetList - get object failed", helpers.Error(err), helpers.String("key", k))
}
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
Expand Down Expand Up @@ -807,7 +874,7 @@ func (s *StorageImpl) GuaranteedUpdateWithConn(

getCurrentState := func() (*objState, error) {
objPtr := reflect.New(v.Type()).Interface().(runtime.Object)
err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr)
err := s.get(ctx, conn, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, objPtr, hasWriteLock)
if err != nil {
logger.L().Ctx(ctx).Error("GuaranteedUpdate - get failed", helpers.Error(err), helpers.String("key", key))
return nil, err
Expand Down
Loading