diff --git a/packages/orchestrator/pkg/factories/run.go b/packages/orchestrator/pkg/factories/run.go index 0a07b39e7d..f7d38fa592 100644 --- a/packages/orchestrator/pkg/factories/run.go +++ b/packages/orchestrator/pkg/factories/run.go @@ -37,6 +37,7 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/pkg/metrics" "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy" nfscfg "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" "github.com/e2b-dev/infra/packages/orchestrator/pkg/portmap" "github.com/e2b-dev/infra/packages/orchestrator/pkg/proxy" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox" @@ -99,9 +100,10 @@ type EgressFactory func(ctx context.Context, deps *Deps) (*EgressSetup, error) // Options configures the orchestrator with edition-specific behavior. type Options struct { - Version string - CommitSHA string - EgressFactory EgressFactory + Version string + CommitSHA string + EgressFactory EgressFactory + NFSInterceptors []middleware.Interceptor } type closer struct { @@ -588,7 +590,7 @@ func run(config cfg.Config, opts Options) (success bool) { // nfs proxy server if len(config.PersistentVolumeMounts) > 0 { - nfsClosers, err := startNFSProxy(ctx, config, builder, startService, sandboxes) + nfsClosers, err := startNFSProxy(ctx, config, builder, startService, sandboxes, opts) if err != nil { logger.L().Fatal(ctx, "failed to start nfs proxy", zap.Error(err)) } @@ -787,13 +789,7 @@ func run(config cfg.Config, opts Options) (success bool) { return success } -func startNFSProxy( - ctx context.Context, - config cfg.Config, - builder *chrooted.Builder, - startService func(name string, f func() error), - sandboxes *sandbox.Map, -) ([]closer, error) { +func startNFSProxy(ctx context.Context, config cfg.Config, builder *chrooted.Builder, startService func(name string, f func() error), sandboxes *sandbox.Map, opts Options) ([]closer, error) { var closers []closer // portmapper listener @@ -826,6 +822,7 @@ func startNFSProxy( RecordHandleCalls: config.NFSProxyRecordHandleCalls, RecordStatCalls: config.NFSProxyRecordStatCalls, NFSLogLevel: config.NFSProxyLogLevel, + Interceptors: opts.NFSInterceptors, }) if err != nil { return nil, fmt.Errorf("failed to create nfs proxy: %w", err) diff --git a/packages/orchestrator/pkg/nfsproxy/cfg/model.go b/packages/orchestrator/pkg/nfsproxy/cfg/model.go index 05576a0803..246409d682 100644 --- a/packages/orchestrator/pkg/nfsproxy/cfg/model.go +++ b/packages/orchestrator/pkg/nfsproxy/cfg/model.go @@ -1,6 +1,10 @@ package cfg -import "github.com/willscott/go-nfs" +import ( + "github.com/willscott/go-nfs" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" +) type Config struct { Logging bool @@ -9,4 +13,5 @@ type Config struct { RecordStatCalls bool RecordHandleCalls bool NFSLogLevel nfs.LogLevel + Interceptors []middleware.Interceptor } diff --git a/packages/orchestrator/pkg/nfsproxy/e2e_test.go b/packages/orchestrator/pkg/nfsproxy/e2e_test.go index 535694950d..b257dd2af7 100644 --- a/packages/orchestrator/pkg/nfsproxy/e2e_test.go +++ b/packages/orchestrator/pkg/nfsproxy/e2e_test.go @@ -19,6 +19,7 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/exec" "github.com/testcontainers/testcontainers-go/log" + "github.com/willscott/go-nfs" "go.uber.org/zap" "github.com/e2b-dev/infra/packages/orchestrator/pkg/cfg" @@ -140,7 +141,10 @@ func TestIntegrationTest(t *testing.T) { createVolumeDir(t, builder, volumeType, teamID, volumeID) - s, err := NewProxy(t.Context(), builder, sandboxes, nfscfg.Config{}) + s, err := NewProxy(t.Context(), builder, sandboxes, nfscfg.Config{ + Logging: true, + NFSLogLevel: nfs.DebugLevel, + }) require.NoError(t, err) go func() { err := s.Serve(nfsListener) diff --git a/packages/orchestrator/pkg/nfsproxy/logged/change.go b/packages/orchestrator/pkg/nfsproxy/logged/change.go deleted file mode 100644 index 9912795099..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/logged/change.go +++ /dev/null @@ -1,48 +0,0 @@ -package logged - -import ( - "context" - "os" - "time" - - "github.com/go-git/go-billy/v5" -) - -type loggedChange struct { - ctx context.Context //nolint:containedctx // can't change the API, still need it - inner billy.Change -} - -var _ billy.Change = (*loggedChange)(nil) - -func newChange(ctx context.Context, change billy.Change) loggedChange { - return loggedChange{ctx: ctx, inner: change} -} - -func (s loggedChange) Chmod(name string, mode os.FileMode) (err error) { - finish := logStart(s.ctx, "Change.Chmod", name, mode) - defer func() { finish(s.ctx, err) }() - - return s.inner.Chmod(name, mode) -} - -func (s loggedChange) Lchown(name string, uid, gid int) (err error) { - finish := logStart(s.ctx, "Change.Lchown", name, uid, gid) - defer func() { finish(s.ctx, err) }() - - return s.inner.Lchown(name, uid, gid) -} - -func (s loggedChange) Chown(name string, uid, gid int) (err error) { - finish := logStart(s.ctx, "Change.Chown", name, uid, gid) - defer func() { finish(s.ctx, err) }() - - return s.inner.Chown(name, uid, gid) -} - -func (s loggedChange) Chtimes(name string, atime time.Time, mtime time.Time) (err error) { - finish := logStart(s.ctx, "Change.Chtimes", name, atime, mtime) - defer func() { finish(s.ctx, err) }() - - return s.inner.Chtimes(name, atime, mtime) -} diff --git a/packages/orchestrator/pkg/nfsproxy/logged/file.go b/packages/orchestrator/pkg/nfsproxy/logged/file.go deleted file mode 100644 index 41c9717b27..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/logged/file.go +++ /dev/null @@ -1,78 +0,0 @@ -package logged - -import ( - "context" - - "github.com/go-git/go-billy/v5" -) - -type loggedFile struct { - ctx context.Context //nolint:containedctx // can't change the API, still need it - inner billy.File -} - -var _ billy.File = (*loggedFile)(nil) - -func wrapFile(ctx context.Context, f billy.File) billy.File { - return &loggedFile{ctx: ctx, inner: f} -} - -func (l loggedFile) Name() string { - return l.inner.Name() -} - -func (l loggedFile) Write(p []byte) (n int, err error) { - finish := logStart(l.ctx, "File.Write", len(p)) - defer func() { finish(l.ctx, err, n) }() - - return l.inner.Write(p) -} - -func (l loggedFile) Read(p []byte) (n int, err error) { - finish := logStart(l.ctx, "File.Read", len(p)) - defer func() { finish(l.ctx, err, n) }() - - return l.inner.Read(p) -} - -func (l loggedFile) ReadAt(p []byte, off int64) (n int, err error) { - finish := logStart(l.ctx, "File.ReadAt", len(p), off) - defer func() { finish(l.ctx, err, n) }() - - return l.inner.ReadAt(p, off) -} - -func (l loggedFile) Seek(offset int64, whence int) (n int64, err error) { - finish := logStart(l.ctx, "File.Seek", offset, whence) - defer func() { finish(l.ctx, err, n) }() - - return l.inner.Seek(offset, whence) -} - -func (l loggedFile) Close() (err error) { - finish := logStart(l.ctx, "File.Close") - defer func() { finish(l.ctx, err) }() - - return l.inner.Close() -} - -func (l loggedFile) Lock() (err error) { - finish := logStart(l.ctx, "File.Lock") - defer func() { finish(l.ctx, err) }() - - return l.inner.Lock() -} - -func (l loggedFile) Unlock() (err error) { - finish := logStart(l.ctx, "File.Unlock") - defer func() { finish(l.ctx, err) }() - - return l.inner.Unlock() -} - -func (l loggedFile) Truncate(size int64) (err error) { - finish := logStart(l.ctx, "File.Truncate", size) - defer func() { finish(l.ctx, err) }() - - return l.inner.Truncate(size) -} diff --git a/packages/orchestrator/pkg/nfsproxy/logged/fs.go b/packages/orchestrator/pkg/nfsproxy/logged/fs.go deleted file mode 100644 index 780c555cfa..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/logged/fs.go +++ /dev/null @@ -1,153 +0,0 @@ -package logged - -import ( - "context" - "os" - - "github.com/go-git/go-billy/v5" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type loggedFS struct { - ctx context.Context //nolint:containedctx // can't change the API, still need it - inner billy.Filesystem - config cfg.Config -} - -var _ billy.Filesystem = (*loggedFS)(nil) - -func wrapFS(ctx context.Context, fs billy.Filesystem, config cfg.Config) loggedFS { - return loggedFS{ctx: ctx, inner: fs, config: config} -} - -func (l loggedFS) Create(filename string) (f billy.File, err error) { - finish := logStart(l.ctx, "FS.Create", filename) - defer func() { finish(l.ctx, err, f) }() - - f, err = l.inner.Create(filename) - f = wrapFile(l.ctx, f) - - return -} - -func (l loggedFS) Open(filename string) (f billy.File, err error) { - finish := logStart(l.ctx, "FS.Open", filename) - defer func() { finish(l.ctx, err, f) }() - - f, err = l.inner.Open(filename) - f = wrapFile(l.ctx, f) - - return -} - -func (l loggedFS) OpenFile(filename string, flag int, perm os.FileMode) (f billy.File, err error) { - finish := logStart(l.ctx, "FS.OpenFile", filename, flag, perm) - defer func() { finish(l.ctx, err, f) }() - - f, err = l.inner.OpenFile(filename, flag, perm) - f = wrapFile(l.ctx, f) - - return -} - -func (l loggedFS) Stat(filename string) (fi os.FileInfo, err error) { - if !l.config.RecordStatCalls { - return l.inner.Stat(filename) - } - - finish := logStart(l.ctx, "FS.Stat", filename) - defer func() { finish(l.ctx, err, fi) }() - - return l.inner.Stat(filename) -} - -func (l loggedFS) Rename(oldpath, newpath string) (err error) { - finish := logStart(l.ctx, "FS.Rename", oldpath, newpath) - defer func() { finish(l.ctx, err) }() - - return l.inner.Rename(oldpath, newpath) -} - -func (l loggedFS) Remove(filename string) (err error) { - finish := logStart(l.ctx, "FS.Remove", filename) - defer func() { finish(l.ctx, err) }() - - return l.inner.Remove(filename) -} - -func (l loggedFS) Join(elem ...string) (path string) { - finish := logStart(l.ctx, "FS.Join", elem) - defer func() { finish(l.ctx, nil, path) }() - - return l.inner.Join(elem...) -} - -func (l loggedFS) TempFile(dir, prefix string) (f billy.File, err error) { - finish := logStart(l.ctx, "FS.TempFile", dir, prefix) - defer func() { finish(l.ctx, err, f) }() - - f, err = l.inner.TempFile(dir, prefix) - f = wrapFile(l.ctx, f) - - return -} - -func (l loggedFS) ReadDir(path string) (fi []os.FileInfo, err error) { - finish := logStart(l.ctx, "FS.ReadDir", path) - defer func() { finish(l.ctx, err, fi) }() - - return l.inner.ReadDir(path) -} - -func (l loggedFS) MkdirAll(filename string, perm os.FileMode) (err error) { - finish := logStart(l.ctx, "FS.MkdirAll", filename, perm) - defer func() { finish(l.ctx, err) }() - - return l.inner.MkdirAll(filename, perm) -} - -func (l loggedFS) Lstat(filename string) (fi os.FileInfo, err error) { - if !l.config.RecordStatCalls { - return l.inner.Lstat(filename) - } - - finish := logStart(l.ctx, "FS.Lstat", filename) - defer func() { finish(l.ctx, err, fi) }() - - return l.inner.Lstat(filename) -} - -func (l loggedFS) Symlink(target, link string) (err error) { - finish := logStart(l.ctx, "FS.Symlink", target, link) - defer func() { finish(l.ctx, err) }() - - return l.inner.Symlink(target, link) -} - -func (l loggedFS) Readlink(link string) (target string, err error) { - finish := logStart(l.ctx, "FS.Readlink", link) - defer func() { finish(l.ctx, err, target) }() - - return l.inner.Readlink(link) -} - -func (l loggedFS) Chroot(path string) (fs billy.Filesystem, err error) { - finish := logStart(l.ctx, "FS.Chroot", path) - defer func() { finish(l.ctx, err, fs) }() - - inner, err := l.inner.Chroot(path) - if err != nil { - return nil, err - } - - return wrapFS(l.ctx, inner, l.config), nil -} - -func (l loggedFS) Root() string { - return l.inner.Root() -} - -func (l loggedFS) Unwrap() billy.Filesystem { - return l.inner -} diff --git a/packages/orchestrator/pkg/nfsproxy/logged/handler.go b/packages/orchestrator/pkg/nfsproxy/logged/handler.go deleted file mode 100644 index e73bea5687..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/logged/handler.go +++ /dev/null @@ -1,98 +0,0 @@ -package logged - -import ( - "context" - "fmt" - "net" - - "github.com/go-git/go-billy/v5" - "github.com/willscott/go-nfs" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type loggedHandler struct { - ctx context.Context //nolint:containedctx // can't change the API, still need it - inner nfs.Handler - config cfg.Config -} - -var _ nfs.Handler = (*loggedHandler)(nil) - -func WrapWithLogging(ctx context.Context, handler nfs.Handler, config cfg.Config) nfs.Handler { - return loggedHandler{ctx: ctx, inner: handler, config: config} -} - -func (e loggedHandler) Mount(ctx context.Context, conn net.Conn, request nfs.MountRequest) (s nfs.MountStatus, fs billy.Filesystem, auth []nfs.AuthFlavor) { - finish := logStart(ctx, "Handler.Mount", - fmt.Sprintf("net.Conn{LocalAddr=%q, RemoteAddr=%q}", conn.LocalAddr(), conn.RemoteAddr()), - fmt.Sprintf("nfs.MountRequest{Dirpath=%q}", string(request.Dirpath))) - defer func() { - var err error - if s != nfs.MountStatusOk { - err = fmt.Errorf("mount status = %d", s) - } - finish(ctx, err) - }() - - s, fs, auth = e.inner.Mount(ctx, conn, request) - fs = wrapFS(ctx, fs, e.config) - - return -} - -func (e loggedHandler) Change(ctx context.Context, filesystem billy.Filesystem) billy.Change { - finish := logStart(ctx, "Handler.Change") - defer finish(ctx, nil) - - change := e.inner.Change(ctx, filesystem) - - return newChange(ctx, change) -} - -func (e loggedHandler) FSStat(ctx context.Context, filesystem billy.Filesystem, stat *nfs.FSStat) (err error) { - finish := logStart(ctx, "Handler.FSStat") - defer func() { finish(ctx, err) }() - - return e.inner.FSStat(ctx, filesystem, stat) -} - -func (e loggedHandler) ToHandle(ctx context.Context, fs billy.Filesystem, path []string) (fh []byte) { - if !e.config.RecordHandleCalls { - return e.inner.ToHandle(ctx, fs, path) - } - - finish := logStart(ctx, "Handler.ToHandle", path) - defer func() { finish(ctx, nil, fh) }() - - return e.inner.ToHandle(ctx, fs, path) -} - -func (e loggedHandler) FromHandle(ctx context.Context, fh []byte) (fs billy.Filesystem, paths []string, err error) { - if !e.config.RecordHandleCalls { - return e.inner.FromHandle(ctx, fh) - } - - finish := logStart(ctx, "Handler.FromHandle", fh) - defer func() { finish(ctx, err, paths) }() - - return e.inner.FromHandle(ctx, fh) -} - -func (e loggedHandler) InvalidateHandle(ctx context.Context, filesystem billy.Filesystem, bytes []byte) (err error) { - if !e.config.RecordHandleCalls { - return e.inner.InvalidateHandle(ctx, filesystem, bytes) - } - - finish := logStart(ctx, "Handler.InvalidateHandle") - defer func() { finish(ctx, err) }() - - return e.inner.InvalidateHandle(ctx, filesystem, bytes) -} - -func (e loggedHandler) HandleLimit() int { - finish := logStart(e.ctx, "Handler.HandleLimit") - defer finish(e.ctx, nil) - - return e.inner.HandleLimit() -} diff --git a/packages/orchestrator/pkg/nfsproxy/logged/util.go b/packages/orchestrator/pkg/nfsproxy/logged/util.go deleted file mode 100644 index 70f36f32be..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/logged/util.go +++ /dev/null @@ -1,39 +0,0 @@ -package logged - -import ( - "context" - "fmt" - "time" - - "github.com/google/uuid" - "go.uber.org/zap" - - "github.com/e2b-dev/infra/packages/shared/pkg/logger" -) - -func logStart(ctx context.Context, s string, args ...any) func(context.Context, error, ...any) { - start := time.Now() - requestID := uuid.NewString() - - l := logger.L().With(zap.String("requestID", requestID)) - l.Debug(ctx, fmt.Sprintf("[nfs proxy] %s: start", s), zap.String("operation", s)) - - return func(ctx context.Context, err error, result ...any) { - args := []zap.Field{ - zap.Duration("dur", time.Since(start)), - zap.Any("args", args), - zap.Any("result", result), - } - - var log func(context.Context, string, ...zap.Field) - if err == nil { - log = l.Debug - } else { - log = l.Warn - args = append(args, zap.Error(err)) - // args = append(args, zap.Stack("stack")) - } - - log(ctx, fmt.Sprintf("[nfs proxy] %s: end", s), args...) - } -} diff --git a/packages/orchestrator/pkg/nfsproxy/metrics/change.go b/packages/orchestrator/pkg/nfsproxy/metrics/change.go deleted file mode 100644 index 5fb916b2a4..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/metrics/change.go +++ /dev/null @@ -1,48 +0,0 @@ -package metrics - -import ( - "context" - "os" - "time" - - "github.com/go-git/go-billy/v5" -) - -type metricsChange struct { - ctx context.Context //nolint:containedctx - inner billy.Change -} - -var _ billy.Change = (*metricsChange)(nil) - -func newChange(ctx context.Context, change billy.Change) billy.Change { - return &metricsChange{ctx: ctx, inner: change} -} - -func (m *metricsChange) Chmod(name string, mode os.FileMode) (err error) { - finish := recordCall(m.ctx, "Change.Chmod") - defer func() { finish(err) }() - - return m.inner.Chmod(name, mode) -} - -func (m *metricsChange) Lchown(name string, uid, gid int) (err error) { - finish := recordCall(m.ctx, "Change.Lchown") - defer func() { finish(err) }() - - return m.inner.Lchown(name, uid, gid) -} - -func (m *metricsChange) Chown(name string, uid, gid int) (err error) { - finish := recordCall(m.ctx, "Change.Chown") - defer func() { finish(err) }() - - return m.inner.Chown(name, uid, gid) -} - -func (m *metricsChange) Chtimes(name string, atime time.Time, mtime time.Time) (err error) { - finish := recordCall(m.ctx, "Change.Chtimes") - defer func() { finish(err) }() - - return m.inner.Chtimes(name, atime, mtime) -} diff --git a/packages/orchestrator/pkg/nfsproxy/metrics/file.go b/packages/orchestrator/pkg/nfsproxy/metrics/file.go deleted file mode 100644 index 4a7f254b57..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/metrics/file.go +++ /dev/null @@ -1,85 +0,0 @@ -package metrics - -import ( - "context" - - "github.com/go-git/go-billy/v5" -) - -type metricsFile struct { - ctx context.Context //nolint:containedctx - inner billy.File - finishOpen finishFunc -} - -var _ billy.File = (*metricsFile)(nil) - -func wrapFile(ctx context.Context, f billy.File, finish finishFunc) billy.File { - return &metricsFile{ctx: ctx, inner: f, finishOpen: finish} -} - -func (m *metricsFile) Name() string { - return m.inner.Name() -} - -func (m *metricsFile) Write(p []byte) (n int, err error) { - finish := recordCall(m.ctx, "File.Write") - defer func() { finish(err) }() - - return m.inner.Write(p) -} - -func (m *metricsFile) Read(p []byte) (n int, err error) { - finish := recordCall(m.ctx, "File.Read") - defer func() { finish(err) }() - - return m.inner.Read(p) -} - -func (m *metricsFile) ReadAt(p []byte, off int64) (n int, err error) { - finish := recordCall(m.ctx, "File.ReadAt") - defer func() { finish(err) }() - - return m.inner.ReadAt(p, off) -} - -func (m *metricsFile) Seek(offset int64, whence int) (n int64, err error) { - finish := recordCall(m.ctx, "File.Seek") - defer func() { finish(err) }() - - return m.inner.Seek(offset, whence) -} - -func (m *metricsFile) Close() (err error) { - finish := recordCall(m.ctx, "File.Close") - defer func() { - finish(err) - // End the open metric when the file is closed - if m.finishOpen != nil { - m.finishOpen(err) - } - }() - - return m.inner.Close() -} - -func (m *metricsFile) Lock() (err error) { - finish := recordCall(m.ctx, "File.Lock") - defer func() { finish(err) }() - - return m.inner.Lock() -} - -func (m *metricsFile) Unlock() (err error) { - finish := recordCall(m.ctx, "File.Unlock") - defer func() { finish(err) }() - - return m.inner.Unlock() -} - -func (m *metricsFile) Truncate(size int64) (err error) { - finish := recordCall(m.ctx, "File.Truncate") - defer func() { finish(err) }() - - return m.inner.Truncate(size) -} diff --git a/packages/orchestrator/pkg/nfsproxy/metrics/fs.go b/packages/orchestrator/pkg/nfsproxy/metrics/fs.go deleted file mode 100644 index 29393ec35b..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/metrics/fs.go +++ /dev/null @@ -1,171 +0,0 @@ -package metrics - -import ( - "context" - "os" - - "github.com/go-git/go-billy/v5" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type metricsFS struct { - ctx context.Context //nolint:containedctx - inner billy.Filesystem - - config cfg.Config -} - -var _ billy.Filesystem = (*metricsFS)(nil) - -func wrapFS(ctx context.Context, fs billy.Filesystem, config cfg.Config) billy.Filesystem { - return &metricsFS{ctx: ctx, inner: fs, config: config} -} - -func (m *metricsFS) Create(filename string) (f billy.File, err error) { - finish := recordCall(m.ctx, "FS.Create") - - f, err = m.inner.Create(filename) - if err != nil { - finish(err) - - return - } - - f = wrapFile(m.ctx, f, finish) - - return -} - -func (m *metricsFS) Open(filename string) (f billy.File, err error) { - finish := recordCall(m.ctx, "FS.Open") - - f, err = m.inner.Open(filename) - if err != nil { - finish(err) - - return - } - - f = wrapFile(m.ctx, f, finish) - - return -} - -func (m *metricsFS) OpenFile(filename string, flag int, perm os.FileMode) (f billy.File, err error) { - finish := recordCall(m.ctx, "FS.OpenFile") - - f, err = m.inner.OpenFile(filename, flag, perm) - if err != nil { - finish(err) - - return - } - - f = wrapFile(m.ctx, f, finish) - - return -} - -func (m *metricsFS) Stat(filename string) (fi os.FileInfo, err error) { - if !m.config.RecordStatCalls { - return m.inner.Stat(filename) - } - - finish := recordCall(m.ctx, "FS.Stat") - defer func() { finish(err) }() - - return m.inner.Stat(filename) -} - -func (m *metricsFS) Rename(oldpath, newpath string) (err error) { - finish := recordCall(m.ctx, "FS.Rename") - defer func() { finish(err) }() - - return m.inner.Rename(oldpath, newpath) -} - -func (m *metricsFS) Remove(filename string) (err error) { - finish := recordCall(m.ctx, "FS.Remove") - defer func() { finish(err) }() - - return m.inner.Remove(filename) -} - -func (m *metricsFS) Join(elem ...string) string { - return m.inner.Join(elem...) -} - -func (m *metricsFS) TempFile(dir, prefix string) (f billy.File, err error) { - finish := recordCall(m.ctx, "FS.TempFile") - - f, err = m.inner.TempFile(dir, prefix) - if err != nil { - finish(err) - - return - } - - f = wrapFile(m.ctx, f, finish) - - return -} - -func (m *metricsFS) ReadDir(path string) (fi []os.FileInfo, err error) { - finish := recordCall(m.ctx, "FS.ReadDir") - defer func() { finish(err) }() - - return m.inner.ReadDir(path) -} - -func (m *metricsFS) MkdirAll(filename string, perm os.FileMode) (err error) { - finish := recordCall(m.ctx, "FS.MkdirAll") - defer func() { finish(err) }() - - return m.inner.MkdirAll(filename, perm) -} - -func (m *metricsFS) Lstat(filename string) (fi os.FileInfo, err error) { - if !m.config.RecordStatCalls { - return m.inner.Lstat(filename) - } - - finish := recordCall(m.ctx, "FS.Lstat") - defer func() { finish(err) }() - - return m.inner.Lstat(filename) -} - -func (m *metricsFS) Symlink(target, link string) (err error) { - finish := recordCall(m.ctx, "FS.Symlink") - defer func() { finish(err) }() - - return m.inner.Symlink(target, link) -} - -func (m *metricsFS) Readlink(link string) (target string, err error) { - finish := recordCall(m.ctx, "FS.Readlink") - defer func() { finish(err) }() - - return m.inner.Readlink(link) -} - -func (m *metricsFS) Chroot(path string) (fs billy.Filesystem, err error) { - finish := recordCall(m.ctx, "FS.Chroot") - defer func() { finish(err) }() - - inner, err := m.inner.Chroot(path) - if err != nil { - return nil, err - } - - return wrapFS(m.ctx, inner, m.config), nil -} - -func (m *metricsFS) Root() string { - return m.inner.Root() -} - -func (m *metricsFS) Unwrap() billy.Filesystem { - return m.inner -} diff --git a/packages/orchestrator/pkg/nfsproxy/metrics/handler.go b/packages/orchestrator/pkg/nfsproxy/metrics/handler.go deleted file mode 100644 index 3b59d9f69d..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/metrics/handler.go +++ /dev/null @@ -1,100 +0,0 @@ -package metrics - -import ( - "context" - "fmt" - "net" - - "github.com/go-git/go-billy/v5" - "github.com/willscott/go-nfs" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type metricsHandler struct { - inner nfs.Handler - config cfg.Config -} - -var _ nfs.Handler = (*metricsHandler)(nil) - -func WrapWithMetrics(handler nfs.Handler, config cfg.Config) nfs.Handler { - return &metricsHandler{inner: handler, config: config} -} - -func (m *metricsHandler) Mount(ctx context.Context, conn net.Conn, request nfs.MountRequest) (s nfs.MountStatus, fs billy.Filesystem, auth []nfs.AuthFlavor) { - finish := recordCall(ctx, "NFS.Mount") - - defer func() { - var err error - if s != nfs.MountStatusOk { - err = fmt.Errorf("mount status = %d", s) - } - finish(err) - }() - - s, fs, auth = m.inner.Mount(ctx, conn, request) - if fs != nil { - fs = wrapFS(ctx, fs, m.config) - } - - return -} - -func (m *metricsHandler) Change(ctx context.Context, filesystem billy.Filesystem) billy.Change { - finish := recordCall(ctx, "NFS.Change") - defer finish(nil) - - change := m.inner.Change(ctx, filesystem) - - return newChange(ctx, change) -} - -func (m *metricsHandler) FSStat(ctx context.Context, filesystem billy.Filesystem, stat *nfs.FSStat) (err error) { - finish := recordCall(ctx, "NFS.FSStat") - defer func() { finish(err) }() - - return m.inner.FSStat(ctx, filesystem, stat) -} - -func (m *metricsHandler) ToHandle(ctx context.Context, fs billy.Filesystem, path []string) (fh []byte) { - if !m.config.RecordHandleCalls { - return m.inner.ToHandle(ctx, fs, path) - } - - finish := recordCall(ctx, "NFS.ToHandle") - defer finish(nil) - - return m.inner.ToHandle(ctx, fs, path) -} - -func (m *metricsHandler) FromHandle(ctx context.Context, fh []byte) (fs billy.Filesystem, paths []string, err error) { - if !m.config.RecordHandleCalls { - return m.inner.FromHandle(ctx, fh) - } - - finish := recordCall(ctx, "NFS.FromHandle") - defer func() { finish(err) }() - - fs, paths, err = m.inner.FromHandle(ctx, fh) - if fs != nil { - fs = wrapFS(ctx, fs, m.config) - } - - return -} - -func (m *metricsHandler) InvalidateHandle(ctx context.Context, filesystem billy.Filesystem, bytes []byte) (err error) { - if !m.config.RecordHandleCalls { - return m.inner.InvalidateHandle(ctx, filesystem, bytes) - } - - finish := recordCall(ctx, "NFS.InvalidateHandle") - defer func() { finish(err) }() - - return m.inner.InvalidateHandle(ctx, filesystem, bytes) -} - -func (m *metricsHandler) HandleLimit() int { - return m.inner.HandleLimit() -} diff --git a/packages/orchestrator/pkg/nfsproxy/metrics/util.go b/packages/orchestrator/pkg/nfsproxy/metrics/util.go deleted file mode 100644 index b180a611b2..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/metrics/util.go +++ /dev/null @@ -1,79 +0,0 @@ -package metrics - -import ( - "context" - "errors" - "os" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - - "github.com/e2b-dev/infra/packages/shared/pkg/utils" -) - -var meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/metrics") - -var ( - callsCounter = utils.Must(meter.Int64Counter("orchestrator.nfsproxy.calls.total", - metric.WithDescription("Total number of calls to the NFS proxy"), - metric.WithUnit("1"))) - durationRecorder = utils.Must(meter.Int64Histogram("orchestrator.nfsproxy.call.duration", - metric.WithDescription("Duration of calls to the NFS proxy"), - metric.WithUnit("ms"))) -) - -var ( - operationKey = attribute.Key("operation") - resultKey = attribute.Key("result") -) - -const ( - resultSuccess = "success" - resultClientError = "client_error" - resultOtherError = "other_error" -) - -type finishFunc func(error) - -func recordCall(ctx context.Context, operation string) finishFunc { - start := time.Now() - - return func(err error) { - result := classifyResult(err) - durationMs := time.Since(start).Milliseconds() - - attrs := metric.WithAttributes( - operationKey.String(operation), - resultKey.String(result), - ) - - callsCounter.Add(ctx, 1, attrs) - durationRecorder.Record(ctx, durationMs, attrs) - } -} - -func classifyResult(err error) string { - if err == nil { - return resultSuccess - } - - if isClientError(err) { - return resultClientError - } - - return resultOtherError -} - -func isClientError(err error) bool { - if errors.Is(err, os.ErrNotExist) { - return true - } - - if errors.Is(err, os.ErrExist) { - return true - } - - return false -} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/change.go b/packages/orchestrator/pkg/nfsproxy/middleware/change.go new file mode 100644 index 0000000000..acdc371f01 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/change.go @@ -0,0 +1,54 @@ +package middleware + +import ( + "context" + "os" + "time" + + "github.com/go-git/go-billy/v5" +) + +type wrappedChange struct { + inner billy.Change + chain *Chain + ctx context.Context //nolint:containedctx +} + +var _ billy.Change = (*wrappedChange)(nil) + +// WrapChange wraps a billy.Change with the interceptor chain. +func WrapChange(ctx context.Context, c billy.Change, chain *Chain) billy.Change { + if c == nil { + return nil + } + + return &wrappedChange{inner: c, chain: chain, ctx: ctx} +} + +func (w *wrappedChange) Chmod(name string, mode os.FileMode) error { + return w.chain.Exec(w.ctx, ChangeChmodRequest{Name: name, Mode: mode}, + func(_ context.Context) error { + return w.inner.Chmod(name, mode) + }) +} + +func (w *wrappedChange) Lchown(name string, uid, gid int) error { + return w.chain.Exec(w.ctx, ChangeLchownRequest{Name: name, UID: uid, GID: gid}, + func(_ context.Context) error { + return w.inner.Lchown(name, uid, gid) + }) +} + +func (w *wrappedChange) Chown(name string, uid, gid int) error { + return w.chain.Exec(w.ctx, ChangeChownRequest{Name: name, UID: uid, GID: gid}, + func(_ context.Context) error { + return w.inner.Chown(name, uid, gid) + }) +} + +func (w *wrappedChange) Chtimes(name string, atime, mtime time.Time) error { + return w.chain.Exec(w.ctx, ChangeChtimesRequest{Name: name, ATime: atime, MTime: mtime}, + func(_ context.Context) error { + return w.inner.Chtimes(name, atime, mtime) + }) +} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/file.go b/packages/orchestrator/pkg/nfsproxy/middleware/file.go new file mode 100644 index 0000000000..6b3b906016 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/file.go @@ -0,0 +1,100 @@ +package middleware + +import ( + "context" + + "github.com/go-git/go-billy/v5" +) + +type wrappedFile struct { + inner billy.File + chain *Chain + ctx context.Context //nolint:containedctx +} + +var _ billy.File = (*wrappedFile)(nil) + +// WrapFile wraps a billy.File with the interceptor chain. +func WrapFile(ctx context.Context, f billy.File, chain *Chain) billy.File { + if f == nil { + return nil + } + + return &wrappedFile{inner: f, chain: chain, ctx: ctx} +} + +func (w *wrappedFile) Name() string { + return w.inner.Name() +} + +func (w *wrappedFile) Write(p []byte) (n int, err error) { + err = w.chain.Exec(w.ctx, FileWriteRequest{Data: p}, + func(_ context.Context) error { + n, err = w.inner.Write(p) + + return err + }) + + return n, err +} + +func (w *wrappedFile) Read(p []byte) (n int, err error) { + err = w.chain.Exec(w.ctx, FileReadRequest{Buffer: p}, + func(_ context.Context) error { + n, err = w.inner.Read(p) + + return err + }) + + return n, err +} + +func (w *wrappedFile) ReadAt(p []byte, off int64) (n int, err error) { + err = w.chain.Exec(w.ctx, FileReadAtRequest{Buffer: p, Offset: off}, + func(_ context.Context) error { + n, err = w.inner.ReadAt(p, off) + + return err + }) + + return n, err +} + +func (w *wrappedFile) Seek(offset int64, whence int) (n int64, err error) { + err = w.chain.Exec(w.ctx, FileSeekRequest{Offset: offset, Whence: whence}, + func(_ context.Context) error { + n, err = w.inner.Seek(offset, whence) + + return err + }) + + return n, err +} + +func (w *wrappedFile) Close() error { + return w.chain.Exec(w.ctx, FileCloseRequest{}, + func(_ context.Context) error { + return w.inner.Close() + }) +} + +func (w *wrappedFile) Lock() error { + return w.chain.Exec(w.ctx, FileLockRequest{}, + func(_ context.Context) error { + return w.inner.Lock() + }) +} + +func (w *wrappedFile) Unlock() error { + return w.chain.Exec(w.ctx, FileUnlockRequest{}, + func(_ context.Context) error { + return w.inner.Unlock() + }) +} + +func (w *wrappedFile) Truncate(size int64) error { + return w.chain.Exec(w.ctx, FileTruncateRequest{Size: size}, + func(_ context.Context) error { + return w.inner.Truncate(size) + }) +} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/fs.go b/packages/orchestrator/pkg/nfsproxy/middleware/fs.go new file mode 100644 index 0000000000..9c9aead1a9 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/fs.go @@ -0,0 +1,165 @@ +package middleware + +import ( + "context" + "os" + + "github.com/go-git/go-billy/v5" +) + +type wrappedFS struct { + inner billy.Filesystem + chain *Chain + ctx context.Context //nolint:containedctx +} + +var _ billy.Filesystem = (*wrappedFS)(nil) + +// WrapFilesystem wraps a billy.Filesystem with the interceptor chain. +func WrapFilesystem(ctx context.Context, fs billy.Filesystem, chain *Chain) billy.Filesystem { + if fs == nil { + return nil + } + + return &wrappedFS{inner: fs, chain: chain, ctx: ctx} +} + +func (w *wrappedFS) Create(filename string) (f billy.File, err error) { + err = w.chain.Exec(w.ctx, FSCreateRequest{Filename: filename}, + func(_ context.Context) error { + f, err = w.inner.Create(filename) + + return err + }) + + return WrapFile(w.ctx, f, w.chain), err +} + +func (w *wrappedFS) Open(filename string) (f billy.File, err error) { + err = w.chain.Exec(w.ctx, FSOpenRequest{Filename: filename}, + func(_ context.Context) error { + f, err = w.inner.Open(filename) + + return err + }) + + return WrapFile(w.ctx, f, w.chain), err +} + +func (w *wrappedFS) OpenFile(filename string, flag int, perm os.FileMode) (f billy.File, err error) { + err = w.chain.Exec(w.ctx, FSOpenFileRequest{Filename: filename, Flag: flag, Perm: perm}, + func(_ context.Context) error { + f, err = w.inner.OpenFile(filename, flag, perm) + + return err + }) + + return WrapFile(w.ctx, f, w.chain), err +} + +func (w *wrappedFS) Stat(filename string) (info os.FileInfo, err error) { + err = w.chain.Exec(w.ctx, FSStatRequest{Filename: filename}, + func(_ context.Context) error { + info, err = w.inner.Stat(filename) + + return err + }) + + return info, err +} + +func (w *wrappedFS) Rename(oldpath, newpath string) error { + return w.chain.Exec(w.ctx, FSRenameRequest{OldPath: oldpath, NewPath: newpath}, + func(_ context.Context) error { + return w.inner.Rename(oldpath, newpath) + }) +} + +func (w *wrappedFS) Remove(filename string) error { + return w.chain.Exec(w.ctx, FSRemoveRequest{Filename: filename}, + func(_ context.Context) error { + return w.inner.Remove(filename) + }) +} + +func (w *wrappedFS) Join(elem ...string) string { + return w.inner.Join(elem...) +} + +func (w *wrappedFS) TempFile(dir, prefix string) (f billy.File, err error) { + err = w.chain.Exec(w.ctx, FSTempFileRequest{Dir: dir, Prefix: prefix}, + func(_ context.Context) error { + f, err = w.inner.TempFile(dir, prefix) + + return err + }) + + return WrapFile(w.ctx, f, w.chain), err +} + +func (w *wrappedFS) ReadDir(path string) (infos []os.FileInfo, err error) { + err = w.chain.Exec(w.ctx, FSReadDirRequest{Path: path}, + func(_ context.Context) error { + infos, err = w.inner.ReadDir(path) + + return err + }) + + return infos, err +} + +func (w *wrappedFS) MkdirAll(filename string, perm os.FileMode) error { + return w.chain.Exec(w.ctx, FSMkdirAllRequest{Filename: filename, Perm: perm}, + func(_ context.Context) error { + return w.inner.MkdirAll(filename, perm) + }) +} + +func (w *wrappedFS) Lstat(filename string) (info os.FileInfo, err error) { + err = w.chain.Exec(w.ctx, FSLstatRequest{Filename: filename}, + func(_ context.Context) error { + info, err = w.inner.Lstat(filename) + + return err + }) + + return info, err +} + +func (w *wrappedFS) Symlink(target, link string) error { + return w.chain.Exec(w.ctx, FSSymlinkRequest{Target: target, Link: link}, + func(_ context.Context) error { + return w.inner.Symlink(target, link) + }) +} + +func (w *wrappedFS) Readlink(link string) (target string, err error) { + err = w.chain.Exec(w.ctx, FSReadlinkRequest{Link: link}, + func(_ context.Context) error { + target, err = w.inner.Readlink(link) + + return err + }) + + return target, err +} + +func (w *wrappedFS) Chroot(path string) (fs billy.Filesystem, err error) { + err = w.chain.Exec(w.ctx, FSChrootRequest{Path: path}, + func(_ context.Context) error { + fs, err = w.inner.Chroot(path) + + return err + }) + + return WrapFilesystem(w.ctx, fs, w.chain), err +} + +func (w *wrappedFS) Root() string { + return w.inner.Root() +} + +// Unwrap returns the inner filesystem (used by go-nfs internals). +func (w *wrappedFS) Unwrap() billy.Filesystem { + return w.inner +} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/handler.go b/packages/orchestrator/pkg/nfsproxy/middleware/handler.go new file mode 100644 index 0000000000..c15a770c54 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/handler.go @@ -0,0 +1,127 @@ +package middleware + +import ( + "context" + "fmt" + "net" + + "github.com/go-git/go-billy/v5" + "github.com/willscott/go-nfs" + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +type wrappedHandler struct { + inner nfs.Handler + interceptors *Chain +} + +var _ nfs.Handler = (*wrappedHandler)(nil) + +// WrapHandler wraps an nfs.Handler with the interceptor chain. +func WrapHandler(handler nfs.Handler, interceptors *Chain) nfs.Handler { + return &wrappedHandler{inner: handler, interceptors: interceptors} +} + +func (w *wrappedHandler) Mount(ctx context.Context, conn net.Conn, req nfs.MountRequest) (nfs.MountStatus, billy.Filesystem, []nfs.AuthFlavor) { + var status nfs.MountStatus + var fs billy.Filesystem + var auth []nfs.AuthFlavor + + err := w.interceptors.Exec( + ctx, HandlerMountRequest{RemoteAddr: conn.RemoteAddr().String(), Dirpath: string(req.Dirpath)}, + func(ctx context.Context) error { + status, fs, auth = w.inner.Mount(ctx, conn, req) + if status != nfs.MountStatusOk { + return fmt.Errorf("mount status: %d", status) + } + + return nil + }, + ) + if err != nil { + logger.L().Error(ctx, "Handler.Mount interceptor error", zap.Error(err)) + + // only override the status if the interceptor returns OK + if status == nfs.MountStatusOk { + status = nfs.MountStatusErrServerFault + } + + return status, nil, nil + } + + return status, WrapFilesystem(ctx, fs, w.interceptors), auth +} + +func (w *wrappedHandler) Change(ctx context.Context, fs billy.Filesystem) (change billy.Change) { + err := w.interceptors.Exec(ctx, HandlerChangeRequest{}, + func(ctx context.Context) error { + change = w.inner.Change(ctx, fs) + + return nil + }) + if err != nil { + logger.L().Error(ctx, "Handler.Change interceptor error", zap.Error(err)) + + return nil + } + + return WrapChange(ctx, change, w.interceptors) +} + +func (w *wrappedHandler) FSStat(ctx context.Context, fs billy.Filesystem, stat *nfs.FSStat) error { + return w.interceptors.Exec(ctx, HandlerFSStatRequest{}, + func(ctx context.Context) error { + return w.inner.FSStat(ctx, fs, stat) + }, + ) +} + +func (w *wrappedHandler) ToHandle(ctx context.Context, fs billy.Filesystem, path []string) []byte { + var result []byte + + err := w.interceptors.Exec(ctx, HandlerToHandleRequest{Path: path}, + func(ctx context.Context) error { + result = w.inner.ToHandle(ctx, fs, path) + + return nil + }) + if err != nil { + logger.L().Error(ctx, "Handler.ToHandle interceptor error", + zap.Error(err), + zap.Strings("path", path)) + } + + return result +} + +func (w *wrappedHandler) FromHandle(ctx context.Context, fh []byte) (billy.Filesystem, []string, error) { + var fs billy.Filesystem + var paths []string + + err := w.interceptors.Exec(ctx, HandlerFromHandleRequest{}, + func(ctx context.Context) error { + var err error + fs, paths, err = w.inner.FromHandle(ctx, fh) + + return err + }) + + // Note: We intentionally do NOT wrap the filesystem here. + // The caching handler (inner) returns the already-wrapped filesystem + // that was stored during ToHandle (which received the wrapped fs from Mount). + // Wrapping again would cause double-interception of filesystem operations. + return fs, paths, err +} + +func (w *wrappedHandler) InvalidateHandle(ctx context.Context, fs billy.Filesystem, fh []byte) error { + return w.interceptors.Exec(ctx, HandlerInvalidateHandleRequest{}, + func(ctx context.Context) error { + return w.inner.InvalidateHandle(ctx, fs, fh) + }) +} + +func (w *wrappedHandler) HandleLimit() int { + return w.inner.HandleLimit() +} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/interceptor.go b/packages/orchestrator/pkg/nfsproxy/middleware/interceptor.go new file mode 100644 index 0000000000..d5c0ff10b7 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/interceptor.go @@ -0,0 +1,30 @@ +package middleware + +import "context" + +// Interceptor wraps an operation, calling next() to proceed to the next interceptor or the actual operation. +type Interceptor func(ctx context.Context, req Request, next func(context.Context) error) error + +// Chain holds the interceptor stack. +type Chain struct { + interceptors []Interceptor +} + +// NewChain creates a new interceptor chain. +func NewChain(interceptors ...Interceptor) *Chain { + return &Chain{interceptors: interceptors} +} + +// Exec runs the operation through all interceptors. +func (c *Chain) Exec(ctx context.Context, req Request, fn func(context.Context) error) error { + wrapped := fn + for i := len(c.interceptors) - 1; i >= 0; i-- { + interceptor := c.interceptors[i] + inner := wrapped + wrapped = func(ctx context.Context) error { + return interceptor(ctx, req, inner) + } + } + + return wrapped(ctx) +} diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/middleware_test.go b/packages/orchestrator/pkg/nfsproxy/middleware/middleware_test.go new file mode 100644 index 0000000000..7839bda090 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/middleware_test.go @@ -0,0 +1,952 @@ +package middleware_test + +import ( + "context" + "errors" + "os" + "testing" + "time" + + "github.com/go-git/go-billy/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + nfsproxymocks "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/mocks" +) + +var errTest = errors.New("test error") + +// testRequest is a simple request type for testing the chain. +type testRequest struct { + op string + data string +} + +func (r testRequest) Op() string { return r.op } + +// TestChain_ExecutesInterceptorsInOrder verifies that interceptors are executed +// in the order they were added to the chain. +func TestChain_ExecutesInterceptorsInOrder(t *testing.T) { + t.Parallel() + + var order []int + + interceptor1 := func(ctx context.Context, _ middleware.Request, next func(context.Context) error) error { + order = append(order, 1) + err := next(ctx) + order = append(order, -1) + + return err + } + + interceptor2 := func(ctx context.Context, _ middleware.Request, next func(context.Context) error) error { + order = append(order, 2) + err := next(ctx) + order = append(order, -2) + + return err + } + + chain := middleware.NewChain(interceptor1, interceptor2) + + err := chain.Exec(context.Background(), testRequest{op: "test.op"}, func(_ context.Context) error { + order = append(order, 0) + + return nil + }) + + require.NoError(t, err) + assert.Equal(t, []int{1, 2, 0, -2, -1}, order) +} + +// TestChain_PropagatesErrors verifies that errors from the inner function +// are propagated through all interceptors. +func TestChain_PropagatesErrors(t *testing.T) { + t.Parallel() + + var interceptorSawError bool + + interceptor := func(ctx context.Context, _ middleware.Request, next func(context.Context) error) error { + err := next(ctx) + interceptorSawError = err != nil + + return err + } + + chain := middleware.NewChain(interceptor) + + err := chain.Exec(context.Background(), testRequest{op: "test.op"}, func(_ context.Context) error { + return errTest + }) + + require.ErrorIs(t, err, errTest) + assert.True(t, interceptorSawError) +} + +// TestChain_InterceptorCanModifyError verifies that an interceptor can +// modify or wrap the error returned by the inner function. +func TestChain_InterceptorCanModifyError(t *testing.T) { + t.Parallel() + + wrappedErr := errors.New("wrapped error") + + interceptor := func(ctx context.Context, _ middleware.Request, next func(context.Context) error) error { + err := next(ctx) + if err != nil { + return wrappedErr + } + + return nil + } + + chain := middleware.NewChain(interceptor) + + err := chain.Exec(context.Background(), testRequest{op: "test.op"}, func(_ context.Context) error { + return errTest + }) + + require.ErrorIs(t, err, wrappedErr) +} + +// TestChain_PassesRequest verifies that the request is correctly passed to interceptors. +func TestChain_PassesRequest(t *testing.T) { + t.Parallel() + + var capturedReq middleware.Request + + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + req := testRequest{op: "File.Read", data: "test-data"} + + err := chain.Exec(context.Background(), req, func(_ context.Context) error { + return nil + }) + + require.NoError(t, err) + assert.Equal(t, "File.Read", capturedReq.Op()) + tr, ok := capturedReq.(testRequest) + require.True(t, ok) + assert.Equal(t, "test-data", tr.data) +} + +// TestChain_EmptyChain verifies that an empty chain just executes the function. +func TestChain_EmptyChain(t *testing.T) { + t.Parallel() + + chain := middleware.NewChain() + called := false + + err := chain.Exec(context.Background(), testRequest{op: "test.op"}, func(_ context.Context) error { + called = true + + return nil + }) + + require.NoError(t, err) + assert.True(t, called) +} + +// TestWrapFile_ReturnsNilForNilInput verifies that WrapFile returns nil +// when given a nil file. +func TestWrapFile_ReturnsNilForNilInput(t *testing.T) { + t.Parallel() + + chain := middleware.NewChain() + result := middleware.WrapFile(context.Background(), nil, chain) + + assert.Nil(t, result) +} + +// TestWrappedFile_Write verifies that Write calls the inner file and +// executes interceptors. +func TestWrappedFile_Write(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Write([]byte("hello")).Return(5, nil) + + var interceptorCalled bool + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + interceptorCalled = true + assert.Equal(t, "File.Write", req.Op()) + writeReq, ok := req.(middleware.FileWriteRequest) + assert.True(t, ok) + assert.Equal(t, []byte("hello"), writeReq.Data) + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + n, err := wrapped.Write([]byte("hello")) + + require.NoError(t, err) + assert.Equal(t, 5, n) + assert.True(t, interceptorCalled) +} + +// TestWrappedFile_Write_WithError verifies that Write returns both +// the bytes written and the error. +func TestWrappedFile_Write_WithError(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Write([]byte("hello")).Return(3, errTest) + + chain := middleware.NewChain() + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + n, err := wrapped.Write([]byte("hello")) + + require.ErrorIs(t, err, errTest) + assert.Equal(t, 3, n) +} + +// TestWrappedFile_Read verifies that Read calls the inner file and +// returns the correct values. +func TestWrappedFile_Read(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Read(mock.Anything).Run(func(p []byte) { + copy(p, "hello") + }).Return(5, nil) + + var interceptorCalled bool + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + interceptorCalled = true + assert.Equal(t, "File.Read", req.Op()) + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + buf := make([]byte, 10) + n, err := wrapped.Read(buf) + + require.NoError(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, "hello", string(buf[:n])) + assert.True(t, interceptorCalled) +} + +// TestWrappedFile_ReadAt verifies that ReadAt calls the inner file correctly. +func TestWrappedFile_ReadAt(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().ReadAt(mock.Anything, int64(10)).Run(func(p []byte, _ int64) { + copy(p, "world") + }).Return(5, nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + buf := make([]byte, 10) + n, err := wrapped.ReadAt(buf, 10) + + require.NoError(t, err) + assert.Equal(t, 5, n) + readAtReq, ok := capturedReq.(middleware.FileReadAtRequest) + require.True(t, ok) + assert.Equal(t, int64(10), readAtReq.Offset) +} + +// TestWrappedFile_Seek verifies that Seek calls the inner file correctly. +func TestWrappedFile_Seek(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Seek(int64(100), 0).Return(int64(100), nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + pos, err := wrapped.Seek(100, 0) + + require.NoError(t, err) + assert.Equal(t, int64(100), pos) +} + +// TestWrappedFile_Close verifies that Close calls the inner file. +func TestWrappedFile_Close(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Close().Return(nil) + + var interceptorCalled bool + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + interceptorCalled = true + assert.Equal(t, "File.Close", req.Op()) + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + err := wrapped.Close() + + require.NoError(t, err) + assert.True(t, interceptorCalled) +} + +// TestWrappedFile_Lock verifies that Lock calls the inner file. +func TestWrappedFile_Lock(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Lock().Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + err := wrapped.Lock() + + require.NoError(t, err) +} + +// TestWrappedFile_Unlock verifies that Unlock calls the inner file. +func TestWrappedFile_Unlock(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Unlock().Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + err := wrapped.Unlock() + + require.NoError(t, err) +} + +// TestWrappedFile_Truncate verifies that Truncate calls the inner file. +func TestWrappedFile_Truncate(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Truncate(int64(1024)).Return(nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + err := wrapped.Truncate(1024) + + require.NoError(t, err) + truncReq, ok := capturedReq.(middleware.FileTruncateRequest) + require.True(t, ok) + assert.Equal(t, int64(1024), truncReq.Size) +} + +// TestWrappedFile_Name verifies that Name returns the inner file's name +// without going through the chain. +func TestWrappedFile_Name(t *testing.T) { + t.Parallel() + + mockFile := nfsproxymocks.NewMockFile(t) + mockFile.EXPECT().Name().Return("/path/to/file.txt") + + chain := middleware.NewChain() + wrapped := middleware.WrapFile(context.Background(), mockFile, chain) + + name := wrapped.Name() + + assert.Equal(t, "/path/to/file.txt", name) +} + +// TestWrapFilesystem_ReturnsNilForNilInput verifies that WrapFilesystem +// returns nil when given a nil filesystem. +func TestWrapFilesystem_ReturnsNilForNilInput(t *testing.T) { + t.Parallel() + + chain := middleware.NewChain() + result := middleware.WrapFilesystem(context.Background(), nil, chain) + + assert.Nil(t, result) +} + +// TestWrappedFS_Create verifies that Create calls the inner filesystem +// and wraps the returned file. +func TestWrappedFS_Create(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFile := nfsproxymocks.NewMockFile(t) + mockFS.EXPECT().Create("/test.txt").Return(mockFile, nil) + + var interceptorCalled bool + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + interceptorCalled = true + assert.Equal(t, "FS.Create", req.Op()) + createReq, ok := req.(middleware.FSCreateRequest) + assert.True(t, ok) + assert.Equal(t, "/test.txt", createReq.Filename) + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.Create("/test.txt") + + require.NoError(t, err) + assert.NotNil(t, file) + assert.True(t, interceptorCalled) +} + +// TestWrappedFS_Create_WithError verifies that Create returns both +// the file and error when the inner operation fails. +func TestWrappedFS_Create_WithError(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Create("/test.txt").Return(nil, errTest) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.Create("/test.txt") + + require.ErrorIs(t, err, errTest) + assert.Nil(t, file) +} + +// TestWrappedFS_Open verifies that Open calls the inner filesystem. +func TestWrappedFS_Open(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFile := nfsproxymocks.NewMockFile(t) + mockFS.EXPECT().Open("/test.txt").Return(mockFile, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.Open("/test.txt") + + require.NoError(t, err) + assert.NotNil(t, file) +} + +// TestWrappedFS_OpenFile verifies that OpenFile calls the inner filesystem. +func TestWrappedFS_OpenFile(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFile := nfsproxymocks.NewMockFile(t) + mockFS.EXPECT().OpenFile("/test.txt", os.O_RDWR, os.FileMode(0o644)).Return(mockFile, nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.OpenFile("/test.txt", os.O_RDWR, 0o644) + + require.NoError(t, err) + assert.NotNil(t, file) + openFileReq, ok := capturedReq.(middleware.FSOpenFileRequest) + require.True(t, ok) + assert.Equal(t, "/test.txt", openFileReq.Filename) + assert.Equal(t, os.O_RDWR, openFileReq.Flag) + assert.Equal(t, os.FileMode(0o644), openFileReq.Perm) +} + +// TestWrappedFS_Stat verifies that Stat calls the inner filesystem. +func TestWrappedFS_Stat(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockInfo := &mockFileInfo{name: "test.txt", size: 1024} + mockFS.EXPECT().Stat("/test.txt").Return(mockInfo, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + info, err := wrapped.Stat("/test.txt") + + require.NoError(t, err) + assert.Equal(t, "test.txt", info.Name()) + assert.Equal(t, int64(1024), info.Size()) +} + +// TestWrappedFS_Rename verifies that Rename calls the inner filesystem. +func TestWrappedFS_Rename(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Rename("/old.txt", "/new.txt").Return(nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + err := wrapped.Rename("/old.txt", "/new.txt") + + require.NoError(t, err) + renameReq, ok := capturedReq.(middleware.FSRenameRequest) + require.True(t, ok) + assert.Equal(t, "/old.txt", renameReq.OldPath) + assert.Equal(t, "/new.txt", renameReq.NewPath) +} + +// TestWrappedFS_Remove verifies that Remove calls the inner filesystem. +func TestWrappedFS_Remove(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Remove("/test.txt").Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + err := wrapped.Remove("/test.txt") + + require.NoError(t, err) +} + +// TestWrappedFS_Join verifies that Join calls the inner filesystem +// without going through the chain. +func TestWrappedFS_Join(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Join(mock.Anything).Return("/path/to/file.txt") + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + result := wrapped.Join("path", "to", "file.txt") + + assert.Equal(t, "/path/to/file.txt", result) +} + +// TestWrappedFS_TempFile verifies that TempFile calls the inner filesystem. +func TestWrappedFS_TempFile(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFile := nfsproxymocks.NewMockFile(t) + mockFS.EXPECT().TempFile("/tmp", "prefix").Return(mockFile, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.TempFile("/tmp", "prefix") + + require.NoError(t, err) + assert.NotNil(t, file) +} + +// TestWrappedFS_ReadDir verifies that ReadDir calls the inner filesystem. +func TestWrappedFS_ReadDir(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + infos := []os.FileInfo{ + &mockFileInfo{name: "file1.txt"}, + &mockFileInfo{name: "file2.txt"}, + } + mockFS.EXPECT().ReadDir("/dir").Return(infos, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + result, err := wrapped.ReadDir("/dir") + + require.NoError(t, err) + require.Len(t, result, 2) + assert.Equal(t, "file1.txt", result[0].Name()) + assert.Equal(t, "file2.txt", result[1].Name()) +} + +// TestWrappedFS_MkdirAll verifies that MkdirAll calls the inner filesystem. +func TestWrappedFS_MkdirAll(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().MkdirAll("/path/to/dir", os.FileMode(0o755)).Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + err := wrapped.MkdirAll("/path/to/dir", 0o755) + + require.NoError(t, err) +} + +// TestWrappedFS_Lstat verifies that Lstat calls the inner filesystem. +func TestWrappedFS_Lstat(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockInfo := &mockFileInfo{name: "link.txt"} + mockFS.EXPECT().Lstat("/link.txt").Return(mockInfo, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + info, err := wrapped.Lstat("/link.txt") + + require.NoError(t, err) + assert.Equal(t, "link.txt", info.Name()) +} + +// TestWrappedFS_Symlink verifies that Symlink calls the inner filesystem. +func TestWrappedFS_Symlink(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Symlink("/target", "/link").Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + err := wrapped.Symlink("/target", "/link") + + require.NoError(t, err) +} + +// TestWrappedFS_Readlink verifies that Readlink calls the inner filesystem. +func TestWrappedFS_Readlink(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Readlink("/link").Return("/target", nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + target, err := wrapped.Readlink("/link") + + require.NoError(t, err) + assert.Equal(t, "/target", target) +} + +// TestWrappedFS_Chroot verifies that Chroot calls the inner filesystem +// and wraps the returned filesystem. +func TestWrappedFS_Chroot(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockChrootFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Chroot("/subdir").Return(mockChrootFS, nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + chrootFS, err := wrapped.Chroot("/subdir") + + require.NoError(t, err) + assert.NotNil(t, chrootFS) +} + +// TestWrappedFS_Root verifies that Root returns the inner filesystem's root +// without going through the chain. +func TestWrappedFS_Root(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFS.EXPECT().Root().Return("/root/path") + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + root := wrapped.Root() + + assert.Equal(t, "/root/path", root) +} + +// TestWrapChange_ReturnsNilForNilInput verifies that WrapChange +// returns nil when given a nil change. +func TestWrapChange_ReturnsNilForNilInput(t *testing.T) { + t.Parallel() + + chain := middleware.NewChain() + result := middleware.WrapChange(context.Background(), nil, chain) + + assert.Nil(t, result) +} + +// TestWrappedChange_Chmod verifies that Chmod calls the inner change. +func TestWrappedChange_Chmod(t *testing.T) { + t.Parallel() + + mockChange := nfsproxymocks.NewMockChange(t) + mockChange.EXPECT().Chmod("/test.txt", os.FileMode(0o755)).Return(nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapChange(context.Background(), mockChange, chain) + + err := wrapped.Chmod("/test.txt", 0o755) + + require.NoError(t, err) + chmodReq, ok := capturedReq.(middleware.ChangeChmodRequest) + require.True(t, ok) + assert.Equal(t, "/test.txt", chmodReq.Name) + assert.Equal(t, os.FileMode(0o755), chmodReq.Mode) +} + +// TestWrappedChange_Lchown verifies that Lchown calls the inner change. +func TestWrappedChange_Lchown(t *testing.T) { + t.Parallel() + + mockChange := nfsproxymocks.NewMockChange(t) + mockChange.EXPECT().Lchown("/test.txt", 1000, 1000).Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapChange(context.Background(), mockChange, chain) + + err := wrapped.Lchown("/test.txt", 1000, 1000) + + require.NoError(t, err) +} + +// TestWrappedChange_Chown verifies that Chown calls the inner change. +func TestWrappedChange_Chown(t *testing.T) { + t.Parallel() + + mockChange := nfsproxymocks.NewMockChange(t) + mockChange.EXPECT().Chown("/test.txt", 1000, 1000).Return(nil) + + chain := middleware.NewChain() + wrapped := middleware.WrapChange(context.Background(), mockChange, chain) + + err := wrapped.Chown("/test.txt", 1000, 1000) + + require.NoError(t, err) +} + +// TestWrappedChange_Chtimes verifies that Chtimes calls the inner change. +func TestWrappedChange_Chtimes(t *testing.T) { + t.Parallel() + + mockChange := nfsproxymocks.NewMockChange(t) + atime := time.Now() + mtime := time.Now().Add(-time.Hour) + mockChange.EXPECT().Chtimes("/test.txt", atime, mtime).Return(nil) + + var capturedReq middleware.Request + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + capturedReq = req + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapChange(context.Background(), mockChange, chain) + + err := wrapped.Chtimes("/test.txt", atime, mtime) + + require.NoError(t, err) + chtimesReq, ok := capturedReq.(middleware.ChangeChtimesRequest) + require.True(t, ok) + assert.Equal(t, "/test.txt", chtimesReq.Name) + assert.Equal(t, atime, chtimesReq.ATime) + assert.Equal(t, mtime, chtimesReq.MTime) +} + +// TestWrappedFS_NestedOperations verifies that operations on wrapped files +// returned from wrapped filesystems still go through the interceptor chain. +func TestWrappedFS_NestedOperations(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + mockFile := nfsproxymocks.NewMockFile(t) + mockFS.EXPECT().Create("/test.txt").Return(mockFile, nil) + mockFile.EXPECT().Write([]byte("hello")).Return(5, nil) + mockFile.EXPECT().Close().Return(nil) + + var ops []string + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + ops = append(ops, req.Op()) + + return next(ctx) + } + + chain := middleware.NewChain(interceptor) + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + file, err := wrapped.Create("/test.txt") + require.NoError(t, err) + + _, err = file.Write([]byte("hello")) + require.NoError(t, err) + + err = file.Close() + require.NoError(t, err) + + assert.Equal(t, []string{"FS.Create", "File.Write", "File.Close"}, ops) +} + +// TestWrappedFS_Unwrap verifies that the inner filesystem can be unwrapped. +func TestWrappedFS_Unwrap(t *testing.T) { + t.Parallel() + + mockFS := nfsproxymocks.NewMockFilesystem(t) + + chain := middleware.NewChain() + wrapped := middleware.WrapFilesystem(context.Background(), mockFS, chain) + + // Type assert to get access to the Unwrap method + unwrapper, ok := wrapped.(interface{ Unwrap() billy.Filesystem }) + require.True(t, ok) + + inner := unwrapper.Unwrap() + assert.Equal(t, mockFS, inner) +} + +// TestWrappedHandler_FromHandle_DoesNotDoubleWrap verifies that FromHandle +// does not wrap an already-wrapped filesystem. This is critical because: +// 1. Mount wraps the filesystem before returning to the NFS server +// 2. The NFS server passes the wrapped filesystem to ToHandle +// 3. The caching handler stores the wrapped filesystem +// 4. FromHandle returns the cached (already wrapped) filesystem +// 5. If we wrap again, operations would go through interceptors twice +func TestWrappedHandler_FromHandle_DoesNotDoubleWrap(t *testing.T) { + t.Parallel() + + // Create a mock inner filesystem (simulating what chroot returns) + mockInnerFS := nfsproxymocks.NewMockFilesystem(t) + + // Create a wrapped filesystem (simulating what Mount returns) + chain := middleware.NewChain() + alreadyWrappedFS := middleware.WrapFilesystem(context.Background(), mockInnerFS, chain) + + // Create a mock handler that returns the already-wrapped filesystem + // (simulating what the caching handler would do) + mockHandler := nfsproxymocks.NewMockHandler(t) + mockHandler.EXPECT(). + FromHandle(mock.Anything, []byte("test-handle")). + Return(alreadyWrappedFS, []string{"path", "to", "file"}, nil) + + // Wrap the mock handler with our middleware + wrappedHandler := middleware.WrapHandler(mockHandler, chain) + + // Call FromHandle + resultFS, paths, err := wrappedHandler.FromHandle(context.Background(), []byte("test-handle")) + + // Verify no error + require.NoError(t, err) + assert.Equal(t, []string{"path", "to", "file"}, paths) + + // CRITICAL: Verify the filesystem is the SAME object, not a new wrapper + // If this fails, it means FromHandle is double-wrapping + assert.Same(t, alreadyWrappedFS, resultFS, + "FromHandle should return the filesystem as-is, not wrap it again") +} + +// TestWrappedHandler_FromHandle_PreservesInterceptorChain verifies that +// filesystem operations on the returned filesystem still go through the +// interceptor chain (because the filesystem was wrapped during Mount). +func TestWrappedHandler_FromHandle_PreservesInterceptorChain(t *testing.T) { + t.Parallel() + + // Track interceptor calls + var interceptorOps []string + interceptor := func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + interceptorOps = append(interceptorOps, req.Op()) + + return next(ctx) + } + chain := middleware.NewChain(interceptor) + + // Create a mock inner filesystem + mockInnerFS := nfsproxymocks.NewMockFilesystem(t) + mockInnerFS.EXPECT().Rename("/old", "/new").Return(nil) + + // Wrap it (simulating what Mount does) + wrappedFS := middleware.WrapFilesystem(context.Background(), mockInnerFS, chain) + + // Create a mock handler that returns the wrapped filesystem + mockHandler := nfsproxymocks.NewMockHandler(t) + mockHandler.EXPECT(). + FromHandle(mock.Anything, []byte("handle")). + Return(wrappedFS, []string{}, nil) + + // Wrap the handler + wrappedHandler := middleware.WrapHandler(mockHandler, chain) + + // Get filesystem via FromHandle + resultFS, _, err := wrappedHandler.FromHandle(context.Background(), []byte("handle")) + require.NoError(t, err) + + // Clear the interceptor ops (FromHandle itself calls interceptors) + interceptorOps = nil + + // Perform an operation on the filesystem + err = resultFS.Rename("/old", "/new") + require.NoError(t, err) + + // Verify the interceptor was called exactly ONCE for FS.Rename + // If double-wrapped, we'd see it twice + renameCount := 0 + for _, op := range interceptorOps { + if op == "FS.Rename" { + renameCount++ + } + } + assert.Equal(t, 1, renameCount, + "FS.Rename should be intercepted exactly once, not %d times (double-wrap bug)", renameCount) +} + +// mockFileInfo implements os.FileInfo for testing. +type mockFileInfo struct { + name string + size int64 + mode os.FileMode + modTime time.Time + isDir bool +} + +func (m *mockFileInfo) Name() string { return m.name } +func (m *mockFileInfo) Size() int64 { return m.size } +func (m *mockFileInfo) Mode() os.FileMode { return m.mode } +func (m *mockFileInfo) ModTime() time.Time { return m.modTime } +func (m *mockFileInfo) IsDir() bool { return m.isDir } +func (m *mockFileInfo) Sys() any { return nil } diff --git a/packages/orchestrator/pkg/nfsproxy/middleware/requests.go b/packages/orchestrator/pkg/nfsproxy/middleware/requests.go new file mode 100644 index 0000000000..1197a47111 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/middleware/requests.go @@ -0,0 +1,207 @@ +package middleware + +import ( + "os" + "time" +) + +// Request is the interface for all typed operation requests. +type Request interface { + Op() string +} + +// File operations + +type FileWriteRequest struct { + Data []byte +} + +func (r FileWriteRequest) Op() string { return "File.Write" } + +type FileReadRequest struct { + Buffer []byte +} + +func (r FileReadRequest) Op() string { return "File.Read" } + +type FileReadAtRequest struct { + Buffer []byte + Offset int64 +} + +func (r FileReadAtRequest) Op() string { return "File.ReadAt" } + +type FileSeekRequest struct { + Offset int64 + Whence int +} + +func (r FileSeekRequest) Op() string { return "File.Seek" } + +type FileCloseRequest struct{} + +func (r FileCloseRequest) Op() string { return "File.Close" } + +type FileLockRequest struct{} + +func (r FileLockRequest) Op() string { return "File.Lock" } + +type FileUnlockRequest struct{} + +func (r FileUnlockRequest) Op() string { return "File.Unlock" } + +type FileTruncateRequest struct { + Size int64 +} + +func (r FileTruncateRequest) Op() string { return "File.Truncate" } + +// Filesystem operations + +type FSCreateRequest struct { + Filename string +} + +func (r FSCreateRequest) Op() string { return "FS.Create" } + +type FSOpenRequest struct { + Filename string +} + +func (r FSOpenRequest) Op() string { return "FS.Open" } + +type FSOpenFileRequest struct { + Filename string + Flag int + Perm os.FileMode +} + +func (r FSOpenFileRequest) Op() string { return "FS.OpenFile" } + +type FSStatRequest struct { + Filename string +} + +func (r FSStatRequest) Op() string { return "FS.Stat" } + +type FSRenameRequest struct { + OldPath string + NewPath string +} + +func (r FSRenameRequest) Op() string { return "FS.Rename" } + +type FSRemoveRequest struct { + Filename string +} + +func (r FSRemoveRequest) Op() string { return "FS.Remove" } + +type FSTempFileRequest struct { + Dir string + Prefix string +} + +func (r FSTempFileRequest) Op() string { return "FS.TempFile" } + +type FSReadDirRequest struct { + Path string +} + +func (r FSReadDirRequest) Op() string { return "FS.ReadDir" } + +type FSMkdirAllRequest struct { + Filename string + Perm os.FileMode +} + +func (r FSMkdirAllRequest) Op() string { return "FS.MkdirAll" } + +type FSLstatRequest struct { + Filename string +} + +func (r FSLstatRequest) Op() string { return "FS.Lstat" } + +type FSSymlinkRequest struct { + Target string + Link string +} + +func (r FSSymlinkRequest) Op() string { return "FS.Symlink" } + +type FSReadlinkRequest struct { + Link string +} + +func (r FSReadlinkRequest) Op() string { return "FS.Readlink" } + +type FSChrootRequest struct { + Path string +} + +func (r FSChrootRequest) Op() string { return "FS.Chroot" } + +// Change operations + +type ChangeChmodRequest struct { + Name string + Mode os.FileMode +} + +func (r ChangeChmodRequest) Op() string { return "Change.Chmod" } + +type ChangeLchownRequest struct { + Name string + UID int + GID int +} + +func (r ChangeLchownRequest) Op() string { return "Change.Lchown" } + +type ChangeChownRequest struct { + Name string + UID int + GID int +} + +func (r ChangeChownRequest) Op() string { return "Change.Chown" } + +type ChangeChtimesRequest struct { + Name string + ATime time.Time + MTime time.Time +} + +func (r ChangeChtimesRequest) Op() string { return "Change.Chtimes" } + +// Handler operations + +type HandlerMountRequest struct { + RemoteAddr string + Dirpath string +} + +func (r HandlerMountRequest) Op() string { return "Handler.Mount" } + +type HandlerChangeRequest struct{} + +func (r HandlerChangeRequest) Op() string { return "Handler.Change" } + +type HandlerFSStatRequest struct{} + +func (r HandlerFSStatRequest) Op() string { return "Handler.FSStat" } + +type HandlerToHandleRequest struct { + Path []string +} + +func (r HandlerToHandleRequest) Op() string { return "Handler.ToHandle" } + +type HandlerFromHandleRequest struct{} + +func (r HandlerFromHandleRequest) Op() string { return "Handler.FromHandle" } + +type HandlerInvalidateHandleRequest struct{} + +func (r HandlerInvalidateHandleRequest) Op() string { return "Handler.InvalidateHandle" } diff --git a/packages/orchestrator/pkg/nfsproxy/o11y/logging.go b/packages/orchestrator/pkg/nfsproxy/o11y/logging.go new file mode 100644 index 0000000000..019a2b2b1d --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/o11y/logging.go @@ -0,0 +1,45 @@ +package o11y + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +// Logging logs operation start/end with durations. +func Logging(skipOps map[string]bool) middleware.Interceptor { + return func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + op := req.Op() + if skipOps[op] { + return next(ctx) + } + + start := time.Now() + requestID := uuid.NewString() + + l := logger.L().With(zap.String("requestID", requestID)) + l.Debug(ctx, fmt.Sprintf("[nfs proxy] %s: start", op), zap.String("operation", op)) + + err := next(ctx) + + logFields := []zap.Field{ + zap.Duration("dur", time.Since(start)), + } + logFields = append(logFields, argsToZapFields(req)...) + + if err == nil { + l.Debug(ctx, fmt.Sprintf("[nfs proxy] %s: end", op), logFields...) + } else { + logFields = append(logFields, zap.Error(err)) + l.Warn(ctx, fmt.Sprintf("[nfs proxy] %s: end", op), logFields...) + } + + return err + } +} diff --git a/packages/orchestrator/pkg/nfsproxy/o11y/metrics.go b/packages/orchestrator/pkg/nfsproxy/o11y/metrics.go new file mode 100644 index 0000000000..4aeb6ef0d2 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/o11y/metrics.go @@ -0,0 +1,47 @@ +package o11y + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" +) + +var meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/o11y") + +var ( + callsCounter = utils.Must(meter.Int64Counter("orchestrator.nfsproxy.calls.total", + metric.WithDescription("Total number of calls to the NFS proxy"), + metric.WithUnit("1"))) + durationHistogram = utils.Must(meter.Int64Histogram("orchestrator.nfsproxy.call.duration", + metric.WithDescription("Duration of calls to the NFS proxy"), + metric.WithUnit("ms"))) +) + +// Metrics records call counts and durations. +func Metrics(skipOps map[string]bool) middleware.Interceptor { + return func(ctx context.Context, req middleware.Request, next func(context.Context) error) error { + op := req.Op() + if skipOps[op] { + return next(ctx) + } + + start := time.Now() + err := next(ctx) + durationMs := time.Since(start).Milliseconds() + + attrs := metric.WithAttributes( + attribute.String("operation", op), + attribute.String("result", classifyResult(err)), + ) + callsCounter.Add(ctx, 1, attrs) + durationHistogram.Record(ctx, durationMs, attrs) + + return err + } +} diff --git a/packages/orchestrator/pkg/nfsproxy/o11y/tracing.go b/packages/orchestrator/pkg/nfsproxy/o11y/tracing.go new file mode 100644 index 0000000000..b8add66bba --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/o11y/tracing.go @@ -0,0 +1,36 @@ +package o11y + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" +) + +// Tracing creates OpenTelemetry spans for each operation. +func Tracing(skipOps map[string]bool) middleware.Interceptor { + tracer := otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/o11y") + + return func(ctx context.Context, req middleware.Request, next func(context.Context) error) (err error) { + op := req.Op() + if skipOps[op] { + return next(ctx) + } + + ctx, span := tracer.Start(ctx, op, trace.WithAttributes(argsToAttrs(req)...)) + defer func() { + if err != nil { + span.RecordError(err) + if !isUserError(err) { + span.SetStatus(codes.Error, err.Error()) + } + } + span.End() + }() + + return next(ctx) + } +} diff --git a/packages/orchestrator/pkg/nfsproxy/o11y/utils.go b/packages/orchestrator/pkg/nfsproxy/o11y/utils.go new file mode 100644 index 0000000000..18af3f44aa --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/o11y/utils.go @@ -0,0 +1,171 @@ +package o11y + +import ( + "errors" + "os" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +func isUserError(err error) bool { + return errors.Is(err, os.ErrNotExist) || errors.Is(err, os.ErrExist) +} + +func classifyResult(err error) string { + if err == nil { + return "success" + } + + if isUserError(err) { + return "client_error" + } + + return "other_error" +} + +// argField represents a single extracted argument field with its typed value. +// This is the intermediate representation used by both argsToAttrs and argsToZapFields. +type argField struct { + key string + value any // string, int, int64, []string, time.Time, or byteLen +} + +// byteLen is a wrapper to indicate we want to log the length of bytes, not the bytes themselves. +type byteLen int + +// extractArgFields extracts the relevant fields from a typed request. +// This is the single source of truth for which args to extract for each operation. +func extractArgFields(req middleware.Request) []argField { + var fields []argField + + switch r := req.(type) { + // Filesystem operations + case middleware.FSCreateRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + case middleware.FSOpenRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + case middleware.FSStatRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + case middleware.FSLstatRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + case middleware.FSRemoveRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + case middleware.FSReadlinkRequest: + fields = append(fields, argField{"nfs.filename", r.Link}) + case middleware.FSRenameRequest: + fields = append(fields, argField{"nfs.oldpath", r.OldPath}) + fields = append(fields, argField{"nfs.newpath", r.NewPath}) + case middleware.FSOpenFileRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + fields = append(fields, argField{"nfs.flag", r.Flag}) + fields = append(fields, argField{"nfs.perm", r.Perm.String()}) + case middleware.FSTempFileRequest: + fields = append(fields, argField{"nfs.dir", r.Dir}) + fields = append(fields, argField{"nfs.prefix", r.Prefix}) + case middleware.FSReadDirRequest: + fields = append(fields, argField{"nfs.path", r.Path}) + case middleware.FSChrootRequest: + fields = append(fields, argField{"nfs.path", r.Path}) + case middleware.FSMkdirAllRequest: + fields = append(fields, argField{"nfs.filename", r.Filename}) + fields = append(fields, argField{"nfs.perm", r.Perm.String()}) + case middleware.FSSymlinkRequest: + fields = append(fields, argField{"nfs.target", r.Target}) + fields = append(fields, argField{"nfs.link", r.Link}) + + // File operations + case middleware.FileWriteRequest: + fields = append(fields, argField{"nfs.len", byteLen(len(r.Data))}) + case middleware.FileReadRequest: + fields = append(fields, argField{"nfs.len", byteLen(len(r.Buffer))}) + case middleware.FileReadAtRequest: + fields = append(fields, argField{"nfs.len", byteLen(len(r.Buffer))}) + fields = append(fields, argField{"nfs.offset", r.Offset}) + case middleware.FileSeekRequest: + fields = append(fields, argField{"nfs.offset", r.Offset}) + fields = append(fields, argField{"nfs.whence", r.Whence}) + case middleware.FileTruncateRequest: + fields = append(fields, argField{"nfs.size", r.Size}) + + // Change operations + case middleware.ChangeChmodRequest: + fields = append(fields, argField{"nfs.name", r.Name}) + fields = append(fields, argField{"nfs.mode", r.Mode.String()}) + case middleware.ChangeLchownRequest: + fields = append(fields, argField{"nfs.name", r.Name}) + fields = append(fields, argField{"nfs.uid", r.UID}) + fields = append(fields, argField{"nfs.gid", r.GID}) + case middleware.ChangeChownRequest: + fields = append(fields, argField{"nfs.name", r.Name}) + fields = append(fields, argField{"nfs.uid", r.UID}) + fields = append(fields, argField{"nfs.gid", r.GID}) + case middleware.ChangeChtimesRequest: + fields = append(fields, argField{"nfs.name", r.Name}) + fields = append(fields, argField{"nfs.atime", r.ATime}) + fields = append(fields, argField{"nfs.mtime", r.MTime}) + + // Handler operations + case middleware.HandlerMountRequest: + fields = append(fields, argField{"net.conn.remote_addr", r.RemoteAddr}) + fields = append(fields, argField{"nfs.mount.dirpath", r.Dirpath}) + case middleware.HandlerToHandleRequest: + fields = append(fields, argField{"nfs.path", r.Path}) + } + + return fields +} + +// argsToAttrs converts a typed request to OpenTelemetry attributes for tracing. +func argsToAttrs(req middleware.Request) []attribute.KeyValue { + fields := extractArgFields(req) + attrs := make([]attribute.KeyValue, 0, len(fields)) + + for _, f := range fields { + switch v := f.value.(type) { + case string: + attrs = append(attrs, attribute.String(f.key, v)) + case int: + attrs = append(attrs, attribute.Int(f.key, v)) + case int64: + attrs = append(attrs, attribute.Int64(f.key, v)) + case byteLen: + attrs = append(attrs, attribute.Int(f.key, int(v))) + case []string: + attrs = append(attrs, attribute.StringSlice(f.key, v)) + case time.Time: + attrs = append(attrs, attribute.String(f.key, v.String())) + } + } + + return attrs +} + +// argsToZapFields converts a typed request to zap fields for logging. +func argsToZapFields(req middleware.Request) []zap.Field { + fields := extractArgFields(req) + zapFields := make([]zap.Field, 0, len(fields)) + + for _, f := range fields { + switch v := f.value.(type) { + case string: + zapFields = append(zapFields, zap.String(f.key, v)) + case int: + zapFields = append(zapFields, zap.Int(f.key, v)) + case int64: + zapFields = append(zapFields, zap.Int64(f.key, v)) + case byteLen: + zapFields = append(zapFields, zap.Int(f.key, int(v))) + case []string: + zapFields = append(zapFields, zap.Strings(f.key, v)) + case time.Time: + zapFields = append(zapFields, logger.Time(f.key, v)) + } + } + + return zapFields +} diff --git a/packages/orchestrator/pkg/nfsproxy/proxy.go b/packages/orchestrator/pkg/nfsproxy/proxy.go index b50c1b948e..37fe553e6f 100644 --- a/packages/orchestrator/pkg/nfsproxy/proxy.go +++ b/packages/orchestrator/pkg/nfsproxy/proxy.go @@ -13,10 +13,8 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/pkg/chrooted" "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/chroot" - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/logged" - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/metrics" - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/recovery" - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/tracing" + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/o11y" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox" ) @@ -29,7 +27,12 @@ type Proxy struct { server *nfs.Server } -func NewProxy(ctx context.Context, builder *chrooted.Builder, sandboxes *sandbox.Map, config cfg.Config) (*Proxy, error) { +func NewProxy( + ctx context.Context, + builder *chrooted.Builder, + sandboxes *sandbox.Map, + config cfg.Config, +) (*Proxy, error) { setLogLevelOnce.Do(func() { nfs.Log.SetLevel(config.NFSLogLevel) }) @@ -44,22 +47,25 @@ func NewProxy(ctx context.Context, builder *chrooted.Builder, sandboxes *sandbox return nil, fmt.Errorf("failed to create chroot NFS handler: %w", err) } - // wrap the handler in middleware + // wrap the handler in caching handler = helpers.NewCachingHandler(handler, cacheLimit) - if config.Tracing { - handler = tracing.WrapWithTracing(handler, config) + // build skip maps for conditional tracing/logging + skipOps := make(map[string]bool) + if !config.RecordStatCalls { + skipOps["FS.Stat"] = true + skipOps["FS.Lstat"] = true } - - if config.Metrics { - handler = metrics.WrapWithMetrics(handler, config) + if !config.RecordHandleCalls { + skipOps["Handler.ToHandle"] = true + skipOps["Handler.FromHandle"] = true + skipOps["Handler.InvalidateHandle"] = true } - if config.Logging { - handler = logged.WrapWithLogging(ctx, handler, config) - } - - handler = recovery.WrapWithRecovery(ctx, handler) + interceptors := buildInterceptors(config, skipOps) + interceptors = append(interceptors, config.Interceptors...) + chain := middleware.NewChain(interceptors...) + handler = middleware.WrapHandler(handler, chain) s := &nfs.Server{ Handler: handler, @@ -74,6 +80,26 @@ func NewProxy(ctx context.Context, builder *chrooted.Builder, sandboxes *sandbox }, nil } +func buildInterceptors(config cfg.Config, skipOps map[string]bool) []middleware.Interceptor { + // build interceptor chain (order matters: recovery should be first to catch panics from all others) + var interceptors []middleware.Interceptor + interceptors = append(interceptors, Recovery()) + + if config.Tracing { + interceptors = append(interceptors, o11y.Tracing(skipOps)) + } + + if config.Metrics { + interceptors = append(interceptors, o11y.Metrics(skipOps)) + } + + if config.Logging { + interceptors = append(interceptors, o11y.Logging(skipOps)) + } + + return interceptors +} + func (p *Proxy) Serve(lis net.Listener) error { if err := p.server.Serve(lis); err != nil { if strings.Contains(err.Error(), "use of closed network connection") { diff --git a/packages/orchestrator/pkg/nfsproxy/recovery.go b/packages/orchestrator/pkg/nfsproxy/recovery.go new file mode 100644 index 0000000000..85c569a238 --- /dev/null +++ b/packages/orchestrator/pkg/nfsproxy/recovery.go @@ -0,0 +1,31 @@ +package nfsproxy + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/middleware" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +// ErrPanic is returned when a panic is recovered. +var ErrPanic = fmt.Errorf("panic") + +// Recovery intercepts panics and converts them to errors. +func Recovery() middleware.Interceptor { + return func(ctx context.Context, req middleware.Request, next func(context.Context) error) (err error) { + defer func() { + if r := recover(); r != nil { //nolint:revive // always called via defer + logger.L().Error(ctx, fmt.Sprintf("panic in %q nfs operation", req.Op()), + zap.Any("panic", r), + zap.Stack("stack"), + ) + err = ErrPanic + } + }() + + return next(ctx) + } +} diff --git a/packages/orchestrator/pkg/nfsproxy/recovery/change.go b/packages/orchestrator/pkg/nfsproxy/recovery/change.go deleted file mode 100644 index 1352ebdf27..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/recovery/change.go +++ /dev/null @@ -1,48 +0,0 @@ -package recovery - -import ( - "context" - "os" - "time" - - "github.com/go-git/go-billy/v5" -) - -type change struct { - inner billy.Change - ctx context.Context //nolint:containedctx // can't change the API, still need it -} - -var _ billy.Change = (*change)(nil) - -func wrapChange(ctx context.Context, c billy.Change) billy.Change { - if c == nil { - return nil - } - - return &change{inner: c, ctx: ctx} -} - -func (c *change) Chmod(name string, mode os.FileMode) (e error) { - defer deferErrRecovery(c.ctx, "Change.Chmod", &e) - - return c.inner.Chmod(name, mode) -} - -func (c *change) Lchown(name string, uid, gid int) (e error) { - defer deferErrRecovery(c.ctx, "Change.Lchown", &e) - - return c.inner.Lchown(name, uid, gid) -} - -func (c *change) Chown(name string, uid, gid int) (e error) { - defer deferErrRecovery(c.ctx, "Change.Chown", &e) - - return c.inner.Chown(name, uid, gid) -} - -func (c *change) Chtimes(name string, atime time.Time, mtime time.Time) (e error) { - defer deferErrRecovery(c.ctx, "Change.Chtimes", &e) - - return c.inner.Chtimes(name, atime, mtime) -} diff --git a/packages/orchestrator/pkg/nfsproxy/recovery/file.go b/packages/orchestrator/pkg/nfsproxy/recovery/file.go deleted file mode 100644 index 526b2c5e3a..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/recovery/file.go +++ /dev/null @@ -1,76 +0,0 @@ -package recovery - -import ( - "context" - - "github.com/go-git/go-billy/v5" -) - -type file struct { - inner billy.File - ctx context.Context //nolint:containedctx // can't change the API, still need it -} - -var _ billy.File = (*file)(nil) - -func wrapFile(ctx context.Context, f billy.File) billy.File { - if f == nil { - return nil - } - - return &file{inner: f, ctx: ctx} -} - -func (f *file) Name() string { - defer tryRecovery(f.ctx, "File.Name") - - return f.inner.Name() -} - -func (f *file) Write(p []byte) (n int, e error) { - defer deferErrRecovery(f.ctx, "File.Write", &e) - - return f.inner.Write(p) -} - -func (f *file) Read(p []byte) (n int, e error) { - defer deferErrRecovery(f.ctx, "File.Read", &e) - - return f.inner.Read(p) -} - -func (f *file) ReadAt(p []byte, off int64) (n int, e error) { - defer deferErrRecovery(f.ctx, "File.ReadAt", &e) - - return f.inner.ReadAt(p, off) -} - -func (f *file) Seek(offset int64, whence int) (n int64, e error) { - defer deferErrRecovery(f.ctx, "File.Seek", &e) - - return f.inner.Seek(offset, whence) -} - -func (f *file) Close() (e error) { - defer deferErrRecovery(f.ctx, "File.Close", &e) - - return f.inner.Close() -} - -func (f *file) Lock() (e error) { - defer deferErrRecovery(f.ctx, "File.Lock", &e) - - return f.inner.Lock() -} - -func (f *file) Unlock() (e error) { - defer deferErrRecovery(f.ctx, "File.Unlock", &e) - - return f.inner.Unlock() -} - -func (f *file) Truncate(size int64) (e error) { - defer deferErrRecovery(f.ctx, "File.Truncate", &e) - - return f.inner.Truncate(size) -} diff --git a/packages/orchestrator/pkg/nfsproxy/recovery/fs.go b/packages/orchestrator/pkg/nfsproxy/recovery/fs.go deleted file mode 100644 index b2007c8c90..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/recovery/fs.go +++ /dev/null @@ -1,125 +0,0 @@ -package recovery - -import ( - "context" - "os" - - "github.com/go-git/go-billy/v5" -) - -type filesystem struct { - inner billy.Filesystem - ctx context.Context //nolint:containedctx // can't change the API, still need it -} - -var _ billy.Filesystem = (*filesystem)(nil) - -func wrapFS(ctx context.Context, fs billy.Filesystem) billy.Filesystem { - if fs == nil { - return nil - } - - return &filesystem{inner: fs, ctx: ctx} -} - -func (fs *filesystem) Create(filename string) (f billy.File, e error) { - defer deferErrRecovery(fs.ctx, "FS.Create", &e) - f, e = fs.inner.Create(filename) - - return wrapFile(fs.ctx, f), e -} - -func (fs *filesystem) Open(filename string) (f billy.File, e error) { - defer deferErrRecovery(fs.ctx, "FS.Open", &e) - f, e = fs.inner.Open(filename) - - return wrapFile(fs.ctx, f), e -} - -func (fs *filesystem) OpenFile(filename string, flag int, perm os.FileMode) (f billy.File, e error) { - defer deferErrRecovery(fs.ctx, "FS.OpenFile", &e) - f, e = fs.inner.OpenFile(filename, flag, perm) - - return wrapFile(fs.ctx, f), e -} - -func (fs *filesystem) Stat(filename string) (fi os.FileInfo, e error) { - defer deferErrRecovery(fs.ctx, "FS.Stat", &e) - - return fs.inner.Stat(filename) -} - -func (fs *filesystem) Rename(oldpath, newpath string) (e error) { - defer deferErrRecovery(fs.ctx, "FS.Rename", &e) - - return fs.inner.Rename(oldpath, newpath) -} - -func (fs *filesystem) Remove(filename string) (e error) { - defer deferErrRecovery(fs.ctx, "FS.Remove", &e) - - return fs.inner.Remove(filename) -} - -func (fs *filesystem) Join(elem ...string) string { - defer tryRecovery(fs.ctx, "Join") - - return fs.inner.Join(elem...) -} - -func (fs *filesystem) TempFile(dir, prefix string) (f billy.File, e error) { - defer deferErrRecovery(fs.ctx, "FS.TempFile", &e) - f, e = fs.inner.TempFile(dir, prefix) - - return wrapFile(fs.ctx, f), e -} - -func (fs *filesystem) ReadDir(path string) (fis []os.FileInfo, e error) { - defer deferErrRecovery(fs.ctx, "FS.ReadDir", &e) - - return fs.inner.ReadDir(path) -} - -func (fs *filesystem) MkdirAll(filename string, perm os.FileMode) (e error) { - defer deferErrRecovery(fs.ctx, "FS.MkdirAll", &e) - - return fs.inner.MkdirAll(filename, perm) -} - -func (fs *filesystem) Lstat(filename string) (fi os.FileInfo, e error) { - defer deferErrRecovery(fs.ctx, "FS.Lstat", &e) - - return fs.inner.Lstat(filename) -} - -func (fs *filesystem) Symlink(target, link string) (e error) { - defer deferErrRecovery(fs.ctx, "FS.Symlink", &e) - - return fs.inner.Symlink(target, link) -} - -func (fs *filesystem) Readlink(link string) (s string, e error) { - defer deferErrRecovery(fs.ctx, "FS.Readlink", &e) - - return fs.inner.Readlink(link) -} - -func (fs *filesystem) Chroot(path string) (f billy.Filesystem, e error) { - defer deferErrRecovery(fs.ctx, "FS.Chroot", &e) - inner, err := fs.inner.Chroot(path) - if err != nil { - return nil, err - } - - return wrapFS(fs.ctx, inner), nil -} - -func (fs *filesystem) Root() string { - defer tryRecovery(fs.ctx, "Root") - - return fs.inner.Root() -} - -func (fs *filesystem) Unwrap() billy.Filesystem { - return fs.inner -} diff --git a/packages/orchestrator/pkg/nfsproxy/recovery/main.go b/packages/orchestrator/pkg/nfsproxy/recovery/main.go deleted file mode 100644 index d58f35823d..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/recovery/main.go +++ /dev/null @@ -1,96 +0,0 @@ -package recovery - -import ( - "context" - "fmt" - "net" - - "github.com/go-git/go-billy/v5" - "github.com/willscott/go-nfs" - "go.uber.org/zap" - - "github.com/e2b-dev/infra/packages/shared/pkg/logger" -) - -type Handler struct { - inner nfs.Handler - ctx context.Context //nolint:containedctx // can't change the API, still need it -} - -var _ nfs.Handler = (*Handler)(nil) - -func WrapWithRecovery(ctx context.Context, h nfs.Handler) *Handler { - return &Handler{inner: h, ctx: ctx} -} - -func (h Handler) Mount(ctx context.Context, conn net.Conn, request nfs.MountRequest) (nfs.MountStatus, billy.Filesystem, []nfs.AuthFlavor) { - defer h.tryRecovery(ctx, "Mount") - s, fs, auth := h.inner.Mount(ctx, conn, request) - fs = wrapFS(ctx, fs) - - return s, fs, auth -} - -func (h Handler) Change(ctx context.Context, filesystem billy.Filesystem) billy.Change { - defer h.tryRecovery(ctx, "Change") - c := h.inner.Change(ctx, filesystem) - - return wrapChange(ctx, c) -} - -func (h Handler) FSStat(ctx context.Context, filesystem billy.Filesystem, stat *nfs.FSStat) (e error) { - defer deferErrRecovery(ctx, "Handler.FSStat", &e) - - return h.inner.FSStat(ctx, filesystem, stat) -} - -func (h Handler) ToHandle(ctx context.Context, fs billy.Filesystem, path []string) []byte { - defer h.tryRecovery(ctx, "ToHandle") - - return h.inner.ToHandle(ctx, fs, path) -} - -func (h Handler) FromHandle(ctx context.Context, fh []byte) (fs billy.Filesystem, path []string, e error) { - defer deferErrRecovery(ctx, "Handler.FromHandle", &e) - - return h.inner.FromHandle(ctx, fh) -} - -func (h Handler) InvalidateHandle(ctx context.Context, filesystem billy.Filesystem, bytes []byte) (e error) { - defer deferErrRecovery(ctx, "Handler.InvalidateHandle", &e) - - return h.inner.InvalidateHandle(ctx, filesystem, bytes) -} - -func (h Handler) HandleLimit() int { - defer h.tryRecovery(h.ctx, "HandleLimit") - - return h.inner.HandleLimit() -} - -func (h Handler) tryRecovery(ctx context.Context, name string) { - tryRecovery(ctx, name) -} - -func tryRecovery(ctx context.Context, name string) { - if r := recover(); r != nil { //nolint:revive // tryRecovery is always called via defer - logger.L().Error(ctx, fmt.Sprintf("panic in %q nfs handler", name), - zap.Any("panic", r), - zap.Stack("stack"), - ) - } -} - -var ErrPanic = fmt.Errorf("panic") - -func deferErrRecovery(ctx context.Context, name string, perr *error) { - if r := recover(); r != nil { //nolint:revive // always called via defer - logger.L().Error(ctx, fmt.Sprintf("panic in %q nfs handler", name), - zap.Any("panic", r), - zap.Stack("stack"), - ) - if perr != nil { - *perr = ErrPanic - } - } -} diff --git a/packages/orchestrator/pkg/nfsproxy/recovery/recovery_test.go b/packages/orchestrator/pkg/nfsproxy/recovery/recovery_test.go deleted file mode 100644 index 0f5ae6c8dd..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/recovery/recovery_test.go +++ /dev/null @@ -1,213 +0,0 @@ -package recovery - -import ( - "context" - "errors" - "net" - "os" - "testing" - "time" - - "github.com/go-git/go-billy/v5" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - nfs "github.com/willscott/go-nfs" - - nfsproxymocks "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/mocks" -) - -// ---- Tests: file.go ---- - -func TestFile_Write_PanicRecovered(t *testing.T) { - t.Parallel() - mf := nfsproxymocks.NewMockFile(t) - mf.EXPECT().Write(mock.Anything). - RunAndReturn(func(_ []byte) (int, error) { panic("File.Write") }) - f := wrapFile(t.Context(), mf) - n, err := f.Write([]byte("abc")) - require.ErrorIs(t, err, ErrPanic) - require.Equal(t, 0, n) -} - -func TestFile_Truncate_Happy(t *testing.T) { - t.Parallel() - mf := nfsproxymocks.NewMockFile(t) - mf.EXPECT().Truncate(int64(0)).Return(nil) - f := wrapFile(t.Context(), mf) - require.NoError(t, f.Truncate(0)) -} - -func TestFile_Name_Panic_NoCrash(t *testing.T) { - t.Parallel() - mf := nfsproxymocks.NewMockFile(t) - mf.EXPECT().Name().RunAndReturn(func() string { panic("File.Name") }) - f := wrapFile(t.Context(), mf) - // Should not panic; should return zero value - got := f.Name() - require.Empty(t, got) -} - -func TestFile_Write_Error_Propagates(t *testing.T) { - t.Parallel() - boom := errors.New("boom") - mf := nfsproxymocks.NewMockFile(t) - mf.EXPECT().Write(mock.Anything).Return(0, boom) - f := wrapFile(t.Context(), mf) - _, err := f.Write([]byte("x")) - require.ErrorIs(t, err, boom) -} - -// ---- Tests: fs.go ---- - -func TestFS_Stat_PanicRecovered(t *testing.T) { - t.Parallel() - mfs := nfsproxymocks.NewMockFilesystem(t) - mfs.EXPECT().Stat("/x").RunAndReturn(func(string) (os.FileInfo, error) { panic("FS.Stat") }) - - fs := wrapFS(t.Context(), mfs) - _, err := fs.Stat("/x") - require.ErrorIs(t, err, ErrPanic) -} - -func TestFS_Create_Happy_WrapsFile(t *testing.T) { - t.Parallel() - mfs := nfsproxymocks.NewMockFilesystem(t) - mf := nfsproxymocks.NewMockFile(t) - mfs.EXPECT().Create("/file.txt").Return(mf, nil) - fs := wrapFS(t.Context(), mfs) - f, err := fs.Create("/file.txt") - require.NoError(t, err) - // ensure the returned file is our wrapped type - require.IsType(t, &file{}, f) -} - -func TestFS_Join_Panic_NoCrash(t *testing.T) { - t.Parallel() - mfs := nfsproxymocks.NewMockFilesystem(t) - // The generated mock treats variadic args as a single []string parameter in expectation. - mfs.EXPECT().Join([]string{"a", "b"}). - RunAndReturn(func(_ ...string) string { panic("Join") }) - fs := wrapFS(t.Context(), mfs) - require.NotPanics(t, func() { _ = fs.Join("a", "b") }) // should not panic -} - -func TestFS_Remove_Error_Propagates(t *testing.T) { - t.Parallel() - boom := errors.New("boom") - mfs := nfsproxymocks.NewMockFilesystem(t) - mfs.EXPECT().Remove("/x").Return(boom) - fs := wrapFS(t.Context(), mfs) - err := fs.Remove("/x") - require.ErrorIs(t, err, boom) -} - -// ---- Tests: change.go ---- - -func TestChange_Chmod_PanicRecovered(t *testing.T) { - t.Parallel() - mch := nfsproxymocks.NewMockChange(t) - mch.EXPECT().Chmod("/x", os.FileMode(0o644)). - RunAndReturn(func(string, os.FileMode) error { panic("Change.Chmod") }) - ch := wrapChange(t.Context(), mch) - require.ErrorIs(t, ch.Chmod("/x", 0o644), ErrPanic) -} - -func TestChange_Chown_Happy(t *testing.T) { - t.Parallel() - mch := nfsproxymocks.NewMockChange(t) - mch.EXPECT().Chown("/x", 1, 1).Return(nil) - ch := wrapChange(t.Context(), mch) - require.NoError(t, ch.Chown("/x", 1, 1)) -} - -func TestChange_Chtimes_Error_Propagates(t *testing.T) { - t.Parallel() - boom := errors.New("boom") - mch := nfsproxymocks.NewMockChange(t) - mch.EXPECT().Chtimes("/x", mock.Anything, mock.Anything).Return(boom) - ch := wrapChange(t.Context(), mch) - err := ch.Chtimes("/x", time.Unix(0, 0), time.Unix(0, 0)) - require.ErrorIs(t, err, boom) -} - -// ---- Tests: main.go (Handler) ---- - -func TestHandler_FSStat_PanicRecovered(t *testing.T) { - t.Parallel() - mh := nfsproxymocks.NewMockHandler(t) - mh.EXPECT().FSStat(mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(context.Context, billy.Filesystem, *nfs.FSStat) error { panic("Handler.FSStat") }) - h := WrapWithRecovery(t.Context(), mh) - var stat nfs.FSStat - require.ErrorIs(t, h.FSStat(t.Context(), nil, &stat), ErrPanic) -} - -func TestHandler_Mount_Panic_NoCrash(t *testing.T) { - t.Parallel() - mh := nfsproxymocks.NewMockHandler(t) - mh.EXPECT().Mount(mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(context.Context, net.Conn, nfs.MountRequest) (nfs.MountStatus, billy.Filesystem, []nfs.AuthFlavor) { - panic("Mount") - }) - h := WrapWithRecovery(t.Context(), mh) - status, fs, auth := h.Mount(t.Context(), nil, nfs.MountRequest{}) - // On panic, zero values returned. Ensure it didn't panic and fs is nil. - require.Nil(t, fs) - require.Zero(t, status) - require.Empty(t, auth) -} - -func TestHandler_Mount_WrapsFS(t *testing.T) { - t.Parallel() - base := nfsproxymocks.NewMockFilesystem(t) - mh := nfsproxymocks.NewMockHandler(t) - mh.EXPECT().Mount(mock.Anything, mock.Anything, mock.Anything).Return(nfs.MountStatus(0), base, nil) - h := WrapWithRecovery(t.Context(), mh) - _, fs, _ := h.Mount(t.Context(), nil, nfs.MountRequest{}) - require.IsType(t, &filesystem{}, fs) -} - -func TestHandler_FromHandle_PanicRecovered(t *testing.T) { - t.Parallel() - mh := nfsproxymocks.NewMockHandler(t) - mh.EXPECT().FromHandle(mock.Anything, mock.Anything). - RunAndReturn(func(context.Context, []byte) (billy.Filesystem, []string, error) { panic("Handler.FromHandle") }) - h := WrapWithRecovery(t.Context(), mh) - _, _, err := h.FromHandle(t.Context(), []byte("x")) - require.ErrorIs(t, err, ErrPanic) -} - -func TestHandler_InvalidateHandle_PanicRecovered(t *testing.T) { - t.Parallel() - mh := nfsproxymocks.NewMockHandler(t) - mh.EXPECT().InvalidateHandle(mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(context.Context, billy.Filesystem, []byte) error { panic("Handler.InvalidateHandle") }) - h := WrapWithRecovery(t.Context(), mh) - require.ErrorIs(t, h.InvalidateHandle(t.Context(), nil, []byte("x")), ErrPanic) -} - -func TestHandler_Error_Propagation(t *testing.T) { - t.Parallel() - boom := errors.New("boom") - mh := nfsproxymocks.NewMockHandler(t) - // FSStat - mh.EXPECT().FSStat(mock.Anything, mock.Anything, mock.Anything).Return(boom) - h := WrapWithRecovery(t.Context(), mh) - var stat nfs.FSStat - err := h.FSStat(t.Context(), nil, &stat) - require.ErrorIs(t, err, boom) - - // FromHandle - mh2 := nfsproxymocks.NewMockHandler(t) - mh2.EXPECT().FromHandle(mock.Anything, mock.Anything).Return(billy.Filesystem(nil), nil, boom) - h2 := WrapWithRecovery(t.Context(), mh2) - _, _, err = h2.FromHandle(t.Context(), []byte("x")) - require.ErrorIs(t, err, boom) - - // InvalidateHandle - mh3 := nfsproxymocks.NewMockHandler(t) - mh3.EXPECT().InvalidateHandle(mock.Anything, mock.Anything, mock.Anything).Return(boom) - h3 := WrapWithRecovery(t.Context(), mh3) - err = h3.InvalidateHandle(t.Context(), nil, nil) - require.ErrorIs(t, err, boom) -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/change.go b/packages/orchestrator/pkg/nfsproxy/tracing/change.go deleted file mode 100644 index 8c913aab3b..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/change.go +++ /dev/null @@ -1,60 +0,0 @@ -package tracing - -import ( - "context" - "os" - "time" - - "github.com/go-git/go-billy/v5" - "go.opentelemetry.io/otel/attribute" -) - -type tracingChange struct { - ctx context.Context //nolint:containedctx - inner billy.Change -} - -var _ billy.Change = (*tracingChange)(nil) - -func newChange(ctx context.Context, change billy.Change) billy.Change { - return &tracingChange{ctx: ctx, inner: change} -} - -func (l *tracingChange) Chmod(name string, mode os.FileMode) (err error) { - _, finish := startSpan(l.ctx, "Change.Chmod", - attribute.String("nfs.name", name), - attribute.String("nfs.mode", mode.String())) - defer func() { finish(err) }() - - return l.inner.Chmod(name, mode) -} - -func (l *tracingChange) Lchown(name string, uid, gid int) (err error) { - _, finish := startSpan(l.ctx, "Change.Lchown", - attribute.String("nfs.name", name), - attribute.Int("nfs.uid", uid), - attribute.Int("nfs.gid", gid)) - defer func() { finish(err) }() - - return l.inner.Lchown(name, uid, gid) -} - -func (l *tracingChange) Chown(name string, uid, gid int) (err error) { - _, finish := startSpan(l.ctx, "Change.Chown", - attribute.String("nfs.name", name), - attribute.Int("nfs.uid", uid), - attribute.Int("nfs.gid", gid)) - defer func() { finish(err) }() - - return l.inner.Chown(name, uid, gid) -} - -func (l *tracingChange) Chtimes(name string, atime time.Time, mtime time.Time) (err error) { - _, finish := startSpan(l.ctx, "Change.Chtimes", - attribute.String("nfs.name", name), - attribute.String("nfs.atime", atime.String()), - attribute.String("nfs.mtime", mtime.String())) - defer func() { finish(err) }() - - return l.inner.Chtimes(name, atime, mtime) -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/file.go b/packages/orchestrator/pkg/nfsproxy/tracing/file.go deleted file mode 100644 index 6534ce0b14..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/file.go +++ /dev/null @@ -1,90 +0,0 @@ -package tracing - -import ( - "context" - - "github.com/go-git/go-billy/v5" - "go.opentelemetry.io/otel/attribute" -) - -type tracingFile struct { - ctx context.Context //nolint:containedctx - inner billy.File - finishOpen finishFunc -} - -var _ billy.File = (*tracingFile)(nil) - -func wrapFile(ctx context.Context, f billy.File, finish finishFunc) billy.File { - return &tracingFile{ctx: ctx, inner: f, finishOpen: finish} -} - -func (l *tracingFile) Name() string { - return l.inner.Name() -} - -func (l *tracingFile) Write(p []byte) (n int, err error) { - _, finish := startSpan(l.ctx, "File.Write", attribute.Int("nfs.len", len(p))) - defer func() { finish(err, attribute.Int("nfs.n", n)) }() - - return l.inner.Write(p) -} - -func (l *tracingFile) Read(p []byte) (n int, err error) { - _, finish := startSpan(l.ctx, "File.Read", attribute.Int("nfs.len", len(p))) - defer func() { finish(err, attribute.Int("nfs.n", n)) }() - - return l.inner.Read(p) -} - -func (l *tracingFile) ReadAt(p []byte, off int64) (n int, err error) { - _, finish := startSpan(l.ctx, "File.ReadAt", - attribute.Int("nfs.len", len(p)), - attribute.Int64("nfs.offset", off)) - defer func() { finish(err, attribute.Int("nfs.n", n)) }() - - return l.inner.ReadAt(p, off) -} - -func (l *tracingFile) Seek(offset int64, whence int) (n int64, err error) { - _, finish := startSpan(l.ctx, "File.Seek", - attribute.Int64("nfs.offset", offset), - attribute.Int("nfs.whence", whence)) - defer func() { finish(err, attribute.Int64("nfs.n", n)) }() - - return l.inner.Seek(offset, whence) -} - -func (l *tracingFile) Close() (err error) { - _, finish := startSpan(l.ctx, "File.Close") - defer func() { - finish(err) - // End the open span when the file is closed - if l.finishOpen != nil { - l.finishOpen(err) - } - }() - - return l.inner.Close() -} - -func (l *tracingFile) Lock() (err error) { - _, finish := startSpan(l.ctx, "File.Lock") - defer func() { finish(err) }() - - return l.inner.Lock() -} - -func (l *tracingFile) Unlock() (err error) { - _, finish := startSpan(l.ctx, "File.Unlock") - defer func() { finish(err) }() - - return l.inner.Unlock() -} - -func (l *tracingFile) Truncate(size int64) (err error) { - _, finish := startSpan(l.ctx, "File.Truncate", attribute.Int64("nfs.size", size)) - defer func() { finish(err) }() - - return l.inner.Truncate(size) -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/fs.go b/packages/orchestrator/pkg/nfsproxy/tracing/fs.go deleted file mode 100644 index dbc8b8a6e3..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/fs.go +++ /dev/null @@ -1,185 +0,0 @@ -package tracing - -import ( - "context" - "os" - - "github.com/go-git/go-billy/v5" - "go.opentelemetry.io/otel/attribute" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type tracingFS struct { - ctx context.Context //nolint:containedctx - inner billy.Filesystem - - config cfg.Config -} - -var _ billy.Filesystem = (*tracingFS)(nil) - -func wrapFS(ctx context.Context, fs billy.Filesystem, config cfg.Config) billy.Filesystem { - return &tracingFS{ctx: ctx, inner: fs, config: config} -} - -func (l *tracingFS) Create(filename string) (f billy.File, err error) { - ctx, finish := startSpan(l.ctx, "FS.Create", attribute.String("nfs.filename", filename)) - - f, err = l.inner.Create(filename) - if err != nil { - finish(err) - - return - } - - f = wrapFile(ctx, f, finish) - - return -} - -func (l *tracingFS) Open(filename string) (f billy.File, err error) { - ctx, finish := startSpan(l.ctx, "FS.Open", attribute.String("nfs.filename", filename)) - - f, err = l.inner.Open(filename) - if err != nil { - finish(err) - - return - } - - f = wrapFile(ctx, f, finish) - - return -} - -func (l *tracingFS) OpenFile(filename string, flag int, perm os.FileMode) (f billy.File, err error) { - ctx, finish := startSpan(l.ctx, "FS.OpenFile", - attribute.String("nfs.filename", filename), - attribute.Int("nfs.flag", flag), - attribute.String("nfs.perm", perm.String())) - - f, err = l.inner.OpenFile(filename, flag, perm) - if err != nil { - finish(err) - - return - } - - f = wrapFile(ctx, f, finish) - - return -} - -func (l *tracingFS) Stat(filename string) (fi os.FileInfo, err error) { - // these are potentially very chatty and uninteresting - if !l.config.RecordStatCalls { - return l.inner.Stat(filename) - } - - _, finish := startSpan(l.ctx, "FS.Stat", attribute.String("nfs.filename", filename)) - defer func() { finish(err) }() - - return l.inner.Stat(filename) -} - -func (l *tracingFS) Rename(oldpath, newpath string) (err error) { - _, finish := startSpan(l.ctx, "FS.Rename", - attribute.String("nfs.oldpath", oldpath), - attribute.String("nfs.newpath", newpath)) - defer func() { finish(err) }() - - return l.inner.Rename(oldpath, newpath) -} - -func (l *tracingFS) Remove(filename string) (err error) { - _, finish := startSpan(l.ctx, "FS.Remove", attribute.String("nfs.filename", filename)) - defer func() { finish(err) }() - - return l.inner.Remove(filename) -} - -func (l *tracingFS) Join(elem ...string) string { - return l.inner.Join(elem...) -} - -func (l *tracingFS) TempFile(dir, prefix string) (f billy.File, err error) { - ctx, finish := startSpan(l.ctx, "FS.TempFile", - attribute.String("nfs.dir", dir), - attribute.String("nfs.prefix", prefix)) - - f, err = l.inner.TempFile(dir, prefix) - if err != nil { - finish(err) - - return - } - - f = wrapFile(ctx, f, finish) - - return -} - -func (l *tracingFS) ReadDir(path string) (fi []os.FileInfo, err error) { - _, finish := startSpan(l.ctx, "FS.ReadDir", attribute.String("nfs.path", path)) - defer func() { finish(err) }() - - return l.inner.ReadDir(path) -} - -func (l *tracingFS) MkdirAll(filename string, perm os.FileMode) (err error) { - _, finish := startSpan(l.ctx, "FS.MkdirAll", - attribute.String("nfs.filename", filename), - attribute.String("nfs.perm", perm.String())) - defer func() { finish(err) }() - - return l.inner.MkdirAll(filename, perm) -} - -func (l *tracingFS) Lstat(filename string) (fi os.FileInfo, err error) { - // these are potentially very chatty and uninteresting - if !l.config.RecordStatCalls { - return l.inner.Lstat(filename) - } - - _, finish := startSpan(l.ctx, "FS.Lstat", attribute.String("nfs.filename", filename)) - defer func() { finish(err) }() - - return l.inner.Lstat(filename) -} - -func (l *tracingFS) Symlink(target, link string) (err error) { - _, finish := startSpan(l.ctx, "FS.Symlink", - attribute.String("nfs.target", target), - attribute.String("nfs.link", link)) - defer func() { finish(err) }() - - return l.inner.Symlink(target, link) -} - -func (l *tracingFS) Readlink(link string) (target string, err error) { - _, finish := startSpan(l.ctx, "FS.Readlink", attribute.String("nfs.link", link)) - defer func() { finish(err) }() - - return l.inner.Readlink(link) -} - -func (l *tracingFS) Chroot(path string) (fs billy.Filesystem, err error) { - ctx, finish := startSpan(l.ctx, "FS.Chroot", attribute.String("nfs.path", path)) - defer func() { finish(err) }() - - inner, err := l.inner.Chroot(path) - if err != nil { - return nil, err - } - - return wrapFS(ctx, inner, l.config), nil -} - -func (l *tracingFS) Root() string { - return l.inner.Root() -} - -func (l *tracingFS) Unwrap() billy.Filesystem { - return l.inner -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/handler.go b/packages/orchestrator/pkg/nfsproxy/tracing/handler.go deleted file mode 100644 index 2034af3782..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/handler.go +++ /dev/null @@ -1,103 +0,0 @@ -package tracing - -import ( - "context" - "fmt" - "net" - - "github.com/go-git/go-billy/v5" - "github.com/willscott/go-nfs" - "go.opentelemetry.io/otel/attribute" - - "github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/cfg" -) - -type tracingHandler struct { - inner nfs.Handler - config cfg.Config -} - -var _ nfs.Handler = (*tracingHandler)(nil) - -func WrapWithTracing(handler nfs.Handler, config cfg.Config) nfs.Handler { - return &tracingHandler{inner: handler, config: config} -} - -func (e *tracingHandler) Mount(ctx context.Context, conn net.Conn, request nfs.MountRequest) (s nfs.MountStatus, fs billy.Filesystem, auth []nfs.AuthFlavor) { - ctx, finish := startSpan(ctx, "NFS.Mount", - attribute.String("net.conn.remote_addr", conn.RemoteAddr().String()), - attribute.String("nfs.mount.dirpath", string(request.Dirpath))) - - defer func() { - var err error - if s != nfs.MountStatusOk { - err = fmt.Errorf("mount status = %d", s) - } - finish(err) - }() - - s, fs, auth = e.inner.Mount(ctx, conn, request) - if fs != nil { - fs = wrapFS(ctx, fs, e.config) - } - - return -} - -func (e *tracingHandler) Change(ctx context.Context, filesystem billy.Filesystem) billy.Change { - ctx, finish := startSpan(ctx, "NFS.Change") - defer finish(nil) - - change := e.inner.Change(ctx, filesystem) - - return newChange(ctx, change) -} - -func (e *tracingHandler) FSStat(ctx context.Context, filesystem billy.Filesystem, stat *nfs.FSStat) (err error) { - ctx, finish := startSpan(ctx, "NFS.FSStat") - defer func() { finish(err) }() - - return e.inner.FSStat(ctx, filesystem, stat) -} - -func (e *tracingHandler) ToHandle(ctx context.Context, fs billy.Filesystem, path []string) (fh []byte) { - if !e.config.RecordHandleCalls { - return e.inner.ToHandle(ctx, fs, path) - } - - _, finish := startSpan(ctx, "NFS.ToHandle", attribute.StringSlice("nfs.path", path)) - defer finish(nil) - - return e.inner.ToHandle(ctx, fs, path) -} - -func (e *tracingHandler) FromHandle(ctx context.Context, fh []byte) (fs billy.Filesystem, paths []string, err error) { - if !e.config.RecordHandleCalls { - return e.inner.FromHandle(ctx, fh) - } - - ctx, finish := startSpan(ctx, "NFS.FromHandle") - defer func() { finish(err) }() - - fs, paths, err = e.inner.FromHandle(ctx, fh) - if fs != nil { - fs = wrapFS(ctx, fs, e.config) - } - - return -} - -func (e *tracingHandler) InvalidateHandle(ctx context.Context, filesystem billy.Filesystem, bytes []byte) (err error) { - if !e.config.RecordHandleCalls { - return e.inner.InvalidateHandle(ctx, filesystem, bytes) - } - - ctx, finish := startSpan(ctx, "NFS.InvalidateHandle") - defer func() { finish(err) }() - - return e.inner.InvalidateHandle(ctx, filesystem, bytes) -} - -func (e *tracingHandler) HandleLimit() int { - return e.inner.HandleLimit() -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/util.go b/packages/orchestrator/pkg/nfsproxy/tracing/util.go deleted file mode 100644 index aa9f37a236..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/util.go +++ /dev/null @@ -1,45 +0,0 @@ -package tracing - -import ( - "context" - "errors" - "os" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/pkg/nfsproxy/tracing") - -type finishFunc func(error, ...attribute.KeyValue) - -func startSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, finishFunc) { - ctx, span := tracer.Start(ctx, name, trace.WithAttributes(attrs...)) //nolint:spancheck // span.End called by returned function - - return ctx, func(err error, endAttrs ...attribute.KeyValue) { //nolint:spancheck // must be called by caller - if len(endAttrs) > 0 { - span.SetAttributes(endAttrs...) - } - if err != nil { - span.RecordError(err) - if !isUserError(err) { - span.SetStatus(codes.Error, err.Error()) - } - } - span.End() - } -} - -func isUserError(err error) bool { - if errors.Is(err, os.ErrNotExist) { - return true - } - - if errors.Is(err, os.ErrExist) { - return true - } - - return false -} diff --git a/packages/orchestrator/pkg/nfsproxy/tracing/util_test.go b/packages/orchestrator/pkg/nfsproxy/tracing/util_test.go deleted file mode 100644 index 1850a40d11..0000000000 --- a/packages/orchestrator/pkg/nfsproxy/tracing/util_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package tracing - -import ( - "io/fs" - "os" - "syscall" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestIsUserError(t *testing.T) { - t.Parallel() - - testCases := map[string]struct { - err error - expected bool - }{ - "os.ErrNotExist": { - err: os.ErrNotExist, - expected: true, - }, - "os.ErrExist": { - err: os.ErrExist, - expected: true, - }, - "fs.ErrNotExist": { - err: fs.ErrNotExist, - expected: true, - }, - "*fs.PathError(no such file)": { - err: &fs.PathError{Op: "open", Path: "no_such_file", Err: fs.ErrNotExist}, - expected: true, - }, - "syscall.EEXIST": { - err: syscall.EEXIST, - expected: true, - }, - "syscall.ENOEXIST": { - err: syscall.ENOENT, - expected: true, - }, - "other error": { - err: syscall.EACCES, - expected: false, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - t.Parallel() - - actual := isUserError(tc.err) - assert.Equal(t, tc.expected, actual) - }) - } -}