Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions pkg/lambda/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"regexp"
"slices"
"strconv"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -54,29 +56,7 @@ func (l *lambdaTransport) RoundTrip(ctx context.Context, req *Request) (*Respons
}
}

filteredLogs := extractMeaningfulLogLines(logSummary)

// If payload contains "Task timed out after", return a retryable error.
// This means the lambda function timed out.
// Status code is 200 in this case, so we have to check the payload for a special string.
if strings.Contains(string(invokeResp.Payload), "Task timed out after") {
return nil, status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", *invokeResp.FunctionError, filteredLogs)
}
// If log summary contains \"error\":\"context deadline exceeded\", return a retryable error.
if strings.Contains(filteredLogs, `\"error\":\"context deadline exceeded\"`) {
return nil, status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", *invokeResp.FunctionError, filteredLogs)
}
// If a third case is ever added to this, put the logic in its own function and add some test cases.

if filteredLogs != "" {
return nil, fmt.Errorf("%s", filteredLogs)
}

return nil, fmt.Errorf(
"lambda_transport: function returned error: %s; status code: %d",
*invokeResp.FunctionError,
invokeResp.StatusCode,
)
return nil, classifyLambdaError(*invokeResp.FunctionError, invokeResp.StatusCode, invokeResp.Payload, logSummary)
}

resp := &Response{}
Expand Down Expand Up @@ -191,7 +171,7 @@ func extractMeaningfulLogLines(raw string) string {

if slices.ContainsFunc(ignoredLogPrefixes, func(prefix string) bool {
return strings.HasPrefix(line, prefix)
}) || strings.Contains(line, "Runtime.ExitError") {
}) {
continue
}

Expand All @@ -206,3 +186,53 @@ func extractMeaningfulLogLines(raw string) string {

return strings.Join(filtered, "\n")
}

var (
lambdaMemorySizeRegex = regexp.MustCompile(`Memory Size:\s*(\d+)\s*MB`)
lambdaMaxMemUsedRegex = regexp.MustCompile(`Max Memory Used:\s*(\d+)\s*MB`)
)

// isLambdaOOM checks Lambda log output for signs of an out-of-memory crash.
func isLambdaOOM(rawLog string) bool {
if strings.Contains(rawLog, "Runtime.ExitError") && strings.Contains(rawLog, "signal: killed") {
return true
}

sizeMatch := lambdaMemorySizeRegex.FindStringSubmatch(rawLog)
usedMatch := lambdaMaxMemUsedRegex.FindStringSubmatch(rawLog)
if len(sizeMatch) == 2 && len(usedMatch) == 2 {
memorySize, err1 := strconv.Atoi(sizeMatch[1])
maxUsed, err2 := strconv.Atoi(usedMatch[1])
if err1 == nil && err2 == nil && memorySize > 0 && maxUsed >= memorySize {
return true
}
}

return false
}

// classifyLambdaError determines the appropriate error type for a Lambda function error.
func classifyLambdaError(functionError string, statusCode int32, payload []byte, rawLog string) error {
filteredLogs := extractMeaningfulLogLines(rawLog)

if strings.Contains(string(payload), "Task timed out after") {
return status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", functionError, filteredLogs)
}
if strings.Contains(rawLog, "context deadline exceeded") {
return status.Errorf(codes.DeadlineExceeded, "lambda_transport: function timed out: %s; logSummary: %s", functionError, filteredLogs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: This pre-existing check searches filteredLogs for escaped-quote patterns (`\"error\":\"context deadline exceeded\"`), but extractMeaningfulLogLines already strips all JSON lines starting with { (line 189). If in production this pattern only appears in JSON-formatted log lines, this check would be dead code. Consider checking rawLog instead of filteredLogs, and matching the unescaped form "error":"context deadline exceeded".

}

if isLambdaOOM(rawLog) {
return status.Errorf(codes.ResourceExhausted, "lambda_transport: function ran out of memory: %s; logSummary: %s", functionError, filteredLogs)
}

if filteredLogs != "" {
return fmt.Errorf("%s", filteredLogs)
}

return fmt.Errorf(
"lambda_transport: function returned error: %s; status code: %d",
functionError,
statusCode,
)
}
153 changes: 153 additions & 0 deletions pkg/lambda/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestExtractMeaningfulLogLines(t *testing.T) {
Expand Down Expand Up @@ -39,6 +41,11 @@ func TestExtractMeaningfulLogLines(t *testing.T) {
`account_inactive`,
output: "lambda-run: failed to get connector: authenticating during initialization\naccount_inactive",
},
{
name: "Runtime.ExitError preserved in output",
raw: "START RequestId: abc-123 Version: $LATEST\nRuntime.ExitError\nEND RequestId: abc-123\n",
output: "Runtime.ExitError",
},
}

for _, c := range cases {
Expand All @@ -48,3 +55,149 @@ func TestExtractMeaningfulLogLines(t *testing.T) {
})
}
}

func TestIsLambdaOOM(t *testing.T) {
cases := []struct {
name string
rawLog string
want bool
}{
{
name: "empty log",
rawLog: "",
want: false,
},
{
name: "normal execution",
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 100 ms Memory Size: 512 MB Max Memory Used: 128 MB\n",
want: false,
},
{
name: "OOM via signal killed",
rawLog: "START RequestId: abc-123\nRequestId: abc-123 Error: Runtime exited with error: signal: killed\n" +
"Runtime.ExitError\nEND RequestId: abc-123\n" +
"REPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 512 MB Max Memory Used: 512 MB\n",
want: true,
},
{
name: "OOM via memory match without signal killed",
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 256 MB Max Memory Used: 256 MB\n",
want: true,
},
{
name: "timeout not detected as OOM",
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 300000 ms Memory Size: 512 MB Max Memory Used: 200 MB\n",
want: false,
},
{
name: "signal killed without Runtime.ExitError not detected",
rawLog: "some log line with signal: killed but no exit error marker\n",
want: false,
},
{
name: "Runtime.ExitError without signal killed not detected",
rawLog: "Runtime.ExitError\n",
want: false,
},
{
name: "memory fields on separate lines",
rawLog: "Memory Size: 1024 MB\nMax Memory Used: 1024 MB\n",
want: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := isLambdaOOM(c.rawLog)
require.Equal(t, c.want, got)
})
}
}

func TestClassifyLambdaError(t *testing.T) {
cases := []struct {
name string
functionError string
statusCode int32
payload []byte
rawLog string
wantCode codes.Code
wantSubstring string
wantIsGRPC bool
}{
{
name: "timeout via payload",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{"errorMessage":"Task timed out after 300.00 seconds"}`),
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\n",
wantCode: codes.DeadlineExceeded,
wantSubstring: "function timed out",
wantIsGRPC: true,
},
{
name: "timeout via context deadline exceeded in logs",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{}`),
rawLog: `{"level":"error","error":"context deadline exceeded","msg":"sync failed"}`,
wantCode: codes.DeadlineExceeded,
wantSubstring: "function timed out",
wantIsGRPC: true,
},
Comment on lines +136 to +147
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Bug: This test case will fail at runtime due to two compounding issues:

  1. The rawLog starts with {, so extractMeaningfulLogLines filters it out as a JSON log line (client.go:189), making filteredLogs empty.
  2. Even if it weren't filtered, the code at client.go:222 checks for escaped quotes (`\"error\":\"context deadline exceeded\"` — literal \ + " in a raw string), but this rawLog contains unescaped JSON quotes.

The function falls through to the generic error path returning "lambda_transport: function returned error: Unhandled; status code: 200", which fails both the substring ("function timed out") and gRPC code (DeadlineExceeded) assertions. The rawLog needs to match the actual production Lambda log format that triggers the escaped-quote check.

{
name: "OOM via signal killed",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{}`),
rawLog: "START RequestId: abc-123\nRequestId: abc-123 Error: Runtime exited with error: signal: killed\n" +
"Runtime.ExitError\nEND RequestId: abc-123\n" +
"REPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 512 MB Max Memory Used: 512 MB\n",
wantCode: codes.ResourceExhausted,
wantSubstring: "function ran out of memory",
wantIsGRPC: true,
},
{
name: "OOM via memory limit reached",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{}`),
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 5000 ms Memory Size: 256 MB Max Memory Used: 256 MB\n",
wantCode: codes.ResourceExhausted,
wantSubstring: "function ran out of memory",
wantIsGRPC: true,
},
{
name: "generic error with filtered logs",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{}`),
rawLog: "START RequestId: abc-123\nlambda-run: failed to get connector: auth error\nEND RequestId: abc-123\n",
wantSubstring: "lambda-run: failed to get connector: auth error",
wantIsGRPC: false,
},
{
name: "generic error without meaningful logs",
functionError: "Unhandled",
statusCode: 200,
payload: []byte(`{}`),
rawLog: "START RequestId: abc-123\nEND RequestId: abc-123\nREPORT RequestId: abc-123 Duration: 100 ms Memory Size: 512 MB Max Memory Used: 128 MB\n",
wantSubstring: "lambda_transport: function returned error: Unhandled; status code: 200",
wantIsGRPC: false,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := classifyLambdaError(c.functionError, c.statusCode, c.payload, c.rawLog)
require.Error(t, err)
require.Contains(t, err.Error(), c.wantSubstring)

if c.wantIsGRPC {
st, ok := status.FromError(err)
require.True(t, ok, "expected gRPC status error")
require.Equal(t, c.wantCode, st.Code())
}
})
}
}
Loading