diff --git a/pkg/lambda/grpc/client.go b/pkg/lambda/grpc/client.go index 81206188f..0c51b9999 100644 --- a/pkg/lambda/grpc/client.go +++ b/pkg/lambda/grpc/client.go @@ -5,7 +5,9 @@ import ( "encoding/base64" "encoding/json" "fmt" + "regexp" "slices" + "strconv" "strings" "github.com/aws/aws-sdk-go-v2/aws" @@ -54,29 +56,7 @@ func (l *lambdaTransport) RoundTrip(ctx context.Context, req *Request) (*Respons } } - filteredLogs := extractMeaningfulLogLines(logSummary) - - // If payload contains "Task timed out after", return a retryable error. - // This means the lambda function timed out. - // Status code is 200 in this case, so we have to check the payload for a special string. - if strings.Contains(string(invokeResp.Payload), "Task timed out after") { - return nil, status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", *invokeResp.FunctionError, filteredLogs) - } - // If log summary contains \"error\":\"context deadline exceeded\", return a retryable error. - if strings.Contains(filteredLogs, `\"error\":\"context deadline exceeded\"`) { - return nil, status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", *invokeResp.FunctionError, filteredLogs) - } - // If a third case is ever added to this, put the logic in its own function and add some test cases. - - if filteredLogs != "" { - return nil, fmt.Errorf("%s", filteredLogs) - } - - return nil, fmt.Errorf( - "lambda_transport: function returned error: %s; status code: %d", - *invokeResp.FunctionError, - invokeResp.StatusCode, - ) + return nil, classifyLambdaError(*invokeResp.FunctionError, invokeResp.StatusCode, invokeResp.Payload, logSummary) } resp := &Response{} @@ -191,7 +171,7 @@ func extractMeaningfulLogLines(raw string) string { if slices.ContainsFunc(ignoredLogPrefixes, func(prefix string) bool { return strings.HasPrefix(line, prefix) - }) || strings.Contains(line, "Runtime.ExitError") { + }) { continue } @@ -206,3 +186,53 @@ func extractMeaningfulLogLines(raw string) string { return strings.Join(filtered, "\n") } + +var ( + lambdaMemorySizeRegex = regexp.MustCompile(`Memory Size:\s*(\d+)\s*MB`) + lambdaMaxMemUsedRegex = regexp.MustCompile(`Max Memory Used:\s*(\d+)\s*MB`) +) + +// isLambdaOOM checks Lambda log output for signs of an out-of-memory crash. +func isLambdaOOM(rawLog string) bool { + if strings.Contains(rawLog, "Runtime.ExitError") && strings.Contains(rawLog, "signal: killed") { + return true + } + + sizeMatch := lambdaMemorySizeRegex.FindStringSubmatch(rawLog) + usedMatch := lambdaMaxMemUsedRegex.FindStringSubmatch(rawLog) + if len(sizeMatch) == 2 && len(usedMatch) == 2 { + memorySize, err1 := strconv.Atoi(sizeMatch[1]) + maxUsed, err2 := strconv.Atoi(usedMatch[1]) + if err1 == nil && err2 == nil && memorySize > 0 && maxUsed >= memorySize { + return true + } + } + + return false +} + +// classifyLambdaError determines the appropriate error type for a Lambda function error. +func classifyLambdaError(functionError string, statusCode int32, payload []byte, rawLog string) error { + filteredLogs := extractMeaningfulLogLines(rawLog) + + if strings.Contains(string(payload), "Task timed out after") { + return status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", functionError, filteredLogs) + } + if strings.Contains(rawLog, "context deadline exceeded") { + return status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", functionError, filteredLogs) + } + + if isLambdaOOM(rawLog) { + return status.Errorf(codes.ResourceExhausted, "lambda_transport: function ran out of memory: %s; logSummary: %s", functionError, filteredLogs) + } + + if filteredLogs != "" { + return fmt.Errorf("%s", filteredLogs) + } + + return fmt.Errorf( + "lambda_transport: function returned error: %s; status code: %d", + functionError, + statusCode, + ) +} diff --git a/pkg/lambda/grpc/client_test.go b/pkg/lambda/grpc/client_test.go index c50ca5946..727194b58 100644 --- a/pkg/lambda/grpc/client_test.go +++ b/pkg/lambda/grpc/client_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestExtractMeaningfulLogLines(t *testing.T) { @@ -39,6 +41,11 @@ func TestExtractMeaningfulLogLines(t *testing.T) { `account_inactive`, output: "lambda-run: failed to get connector: authenticating during initialization\naccount_inactive", }, + { + name: "Runtime.ExitError preserved in output", + raw: "START RequestId: abc-123 Version: $LATEST\nRuntime.ExitError\nEND RequestId: abc-123\n", + output: "Runtime.ExitError", + }, } for _, c := range cases { @@ -48,3 +55,149 @@ func TestExtractMeaningfulLogLines(t *testing.T) { }) } } + +func TestIsLambdaOOM(t *testing.T) { + cases := []struct { + name string + rawLog string + want bool + }{ + { + name: "empty log", + rawLog: "", + want: false, + }, + { + name: "normal execution", + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 100 ms Memory Size: 512 MB Max Memory Used: 128 MB\n", + want: false, + }, + { + name: "OOM via signal killed", + rawLog: "START RequestId: abc-123\nRequestId: abc-123 Error: Runtime exited with error: signal: killed\n" + + "Runtime.ExitError\nEND RequestId: abc-123\n" + + "REPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 512 MB Max Memory Used: 512 MB\n", + want: true, + }, + { + name: "OOM via memory match without signal killed", + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 256 MB Max Memory Used: 256 MB\n", + want: true, + }, + { + name: "timeout not detected as OOM", + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 300000 ms Memory Size: 512 MB Max Memory Used: 200 MB\n", + want: false, + }, + { + name: "signal killed without Runtime.ExitError not detected", + rawLog: "some log line with signal: killed but no exit error marker\n", + want: false, + }, + { + name: "Runtime.ExitError without signal killed not detected", + rawLog: "Runtime.ExitError\n", + want: false, + }, + { + name: "memory fields on separate lines", + rawLog: "Memory Size: 1024 MB\nMax Memory Used: 1024 MB\n", + want: true, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := isLambdaOOM(c.rawLog) + require.Equal(t, c.want, got) + }) + } +} + +func TestClassifyLambdaError(t *testing.T) { + cases := []struct { + name string + functionError string + statusCode int32 + payload []byte + rawLog string + wantCode codes.Code + wantSubstring string + wantIsGRPC bool + }{ + { + name: "timeout via payload", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{"errorMessage":"Task timed out after 300.00 seconds"}`), + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\n", + wantCode: codes.DeadlineExceeded, + wantSubstring: "function timed out", + wantIsGRPC: true, + }, + { + name: "timeout via context deadline exceeded in logs", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{}`), + rawLog: `{"level":"error","error":"context deadline exceeded","msg":"sync failed"}`, + wantCode: codes.DeadlineExceeded, + wantSubstring: "function timed out", + wantIsGRPC: true, + }, + { + name: "OOM via signal killed", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{}`), + rawLog: "START RequestId: abc-123\nRequestId: abc-123 Error: Runtime exited with error: signal: killed\n" + + "Runtime.ExitError\nEND RequestId: abc-123\n" + + "REPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 512 MB Max Memory Used: 512 MB\n", + wantCode: codes.ResourceExhausted, + wantSubstring: "function ran out of memory", + wantIsGRPC: true, + }, + { + name: "OOM via memory limit reached", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{}`), + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 256 MB Max Memory Used: 256 MB\n", + wantCode: codes.ResourceExhausted, + wantSubstring: "function ran out of memory", + wantIsGRPC: true, + }, + { + name: "generic error with filtered logs", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{}`), + rawLog: "START RequestId: abc-123\nlambda-run: failed to get connector: auth error\nEND RequestId: abc-123\n", + wantSubstring: "lambda-run: failed to get connector: auth error", + wantIsGRPC: false, + }, + { + name: "generic error without meaningful logs", + functionError: "Unhandled", + statusCode: 200, + payload: []byte(`{}`), + rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 100 ms Memory Size: 512 MB Max Memory Used: 128 MB\n", + wantSubstring: "lambda_transport: function returned error: Unhandled; status code: 200", + wantIsGRPC: false, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + err := classifyLambdaError(c.functionError, c.statusCode, c.payload, c.rawLog) + require.Error(t, err) + require.Contains(t, err.Error(), c.wantSubstring) + + if c.wantIsGRPC { + st, ok := status.FromError(err) + require.True(t, ok, "expected gRPC status error") + require.Equal(t, c.wantCode, st.Code()) + } + }) + } +}