From 83c3263c931234f161ed40f82fb4b4a02b116fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Sat, 2 May 2026 22:25:09 +0300 Subject: [PATCH] fix: close failed gRPC streams, fix libvirt goroutine leak, add io.LimitReader - coordinator/service.go (#330): call stream.CloseAndRecv() before removing failed streams from the slice in Write and repairNodes paths - libvirt/real_client.go (#286): add ctx.Done and done checks to goroutine send to errChan to prevent blocking when context is cancelled - function_handler.go (#302): wrap io.ReadAll with io.LimitReader for code upload (10MB) and invoke payload (1MB) --- internal/handlers/function_handler.go | 12 +++++++++--- internal/repositories/libvirt/real_client.go | 14 ++++++++++++-- internal/storage/coordinator/service.go | 4 +++- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/internal/handlers/function_handler.go b/internal/handlers/function_handler.go index d19f32356..f78fcb880 100644 --- a/internal/handlers/function_handler.go +++ b/internal/handlers/function_handler.go @@ -13,7 +13,13 @@ import ( "github.com/poyrazk/thecloud/pkg/httputil" ) -const invalidFunctionIDMsg = "invalid function id" +const ( + invalidFunctionIDMsg = "invalid function id" + // maxFunctionCodeSize prevents memory exhaustion when reading function code uploads. + maxFunctionCodeSize = 10 * 1024 * 1024 // 10 MB + // maxInvokePayloadSize prevents memory exhaustion on function invocation payloads. + maxInvokePayloadSize = 1 * 1024 * 1024 // 1 MB +) // FunctionHandler handles serverless function HTTP endpoints. type FunctionHandler struct { @@ -61,7 +67,7 @@ func (h *FunctionHandler) Create(c *gin.Context) { } defer func() { _ = f.Close() }() - code, err := io.ReadAll(f) + code, err := io.ReadAll(io.LimitReader(f, maxFunctionCodeSize)) if err != nil { httputil.Error(c, errors.Wrap(errors.Internal, "failed to read code file", err)) return @@ -148,7 +154,7 @@ func (h *FunctionHandler) Invoke(c *gin.Context) { return } - payload, err := io.ReadAll(c.Request.Body) + payload, err := io.ReadAll(io.LimitReader(c.Request.Body, maxInvokePayloadSize)) if err != nil { httputil.Error(c, errors.Wrap(errors.InvalidInput, "failed to read payload", err)) return diff --git a/internal/repositories/libvirt/real_client.go b/internal/repositories/libvirt/real_client.go index 41a517a5a..0c2e913fe 100644 --- a/internal/repositories/libvirt/real_client.go +++ b/internal/repositories/libvirt/real_client.go @@ -25,7 +25,12 @@ func (r *RealLibvirtClient) Connect(ctx context.Context) error { return default: } - errChan <- r.conn.Connect() + err := r.conn.Connect() + select { + case errChan <- err: + case <-done: + case <-ctx.Done(): + } }() select { @@ -48,7 +53,12 @@ func (r *RealLibvirtClient) ConnectToURI(ctx context.Context, uri string) error return default: } - errChan <- r.conn.ConnectToURI(libvirt.ConnectURI(uri)) + err := r.conn.ConnectToURI(libvirt.ConnectURI(uri)) + select { + case errChan <- err: + case <-done: + case <-ctx.Done(): + } }() select { diff --git a/internal/storage/coordinator/service.go b/internal/storage/coordinator/service.go index 934d460ff..efdf0226b 100644 --- a/internal/storage/coordinator/service.go +++ b/internal/storage/coordinator/service.go @@ -243,7 +243,8 @@ func (c *Coordinator) Write(ctx context.Context, bucket, key string, r io.Reader }, }) if errSend != nil { - // Remove failed stream + // Close stream before removing to release resources + _, _ = streams[i].stream.CloseAndRecv() streams = append(streams[:i], streams[i+1:]...) } } @@ -465,6 +466,7 @@ func (c *Coordinator) repairNodes(ctx context.Context, bucket, key string, r io. }, }) if errSend != nil { + _, _ = streams[i].stream.CloseAndRecv() streams = append(streams[:i], streams[i+1:]...) i-- }