diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 971295e8..c1ba1088 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -1731,7 +1731,7 @@ func (h *handler) handleListOffsets(ctx context.Context, header *protocol.Reques } func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeader, req *kmsg.FetchRequest) ([]byte, error) { - if header.APIVersion < 11 || header.APIVersion > 13 { + if header.APIVersion > 13 { return nil, fmt.Errorf("fetch version %d not supported", header.APIVersion) } topicResponses := make([]kmsg.FetchResponseTopic, 0, len(req.Topics)) diff --git a/pkg/broker/server.go b/pkg/broker/server.go index 2efdfa4e..d1c4ebfc 100644 --- a/pkg/broker/server.go +++ b/pkg/broker/server.go @@ -130,8 +130,13 @@ func (s *Server) handleConnection(conn net.Conn) { } respPayload, err := s.Handler.Handle(ctx, header, req) if err != nil { - log.Printf("handle request: %v", err) - return + log.Printf("handle request api=%d v=%d: %v", header.APIKey, header.APIVersion, err) + // Send an UNKNOWN_SERVER_ERROR response instead of dropping the + // connection so the client can recover gracefully. + if errResp := buildErrorResponse(header); errResp != nil { + _ = protocol.WriteFrame(conn, errResp) + } + continue } if respPayload == nil { continue @@ -142,3 +147,15 @@ func (s *Server) handleConnection(conn net.Conn) { } } } + +// buildErrorResponse creates a minimal Kafka error response for the given +// request header so the client receives a proper error instead of a closed +// connection. Returns nil if no suitable response can be constructed. +func buildErrorResponse(header *protocol.RequestHeader) []byte { + resp := kmsg.ResponseForKey(header.APIKey) + if resp == nil { + return nil + } + resp.SetVersion(header.APIVersion) + return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp) +} diff --git a/pkg/broker/server_test.go b/pkg/broker/server_test.go index 3e985f9f..e2a392a9 100644 --- a/pkg/broker/server_test.go +++ b/pkg/broker/server_test.go @@ -204,3 +204,193 @@ func TestServerListenAndServe_Shutdown(t *testing.T) { t.Fatalf("server did not exit after cancel") } } + +func TestServerListenAndServeNoHandler(t *testing.T) { + s := &Server{Addr: "127.0.0.1:0"} + err := s.ListenAndServe(context.Background()) + if err == nil || err.Error() != "broker.Server requires a Handler" { + t.Fatalf("expected handler required error, got: %v", err) + } +} + +func TestServerWait(t *testing.T) { + s := &Server{Handler: &testHandler{}} + // No goroutines → Wait returns immediately + done := make(chan struct{}) + go func() { + s.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Wait should return immediately with no connections") + } +} + +func TestServerListenAddress(t *testing.T) { + s := &Server{Addr: "127.0.0.1:9999", Handler: &testHandler{}} + // Before listening, returns configured addr + if got := s.ListenAddress(); got != "127.0.0.1:9999" { + t.Fatalf("expected configured addr, got %q", got) + } +} + +func TestServerHandleConnection_ConnContext(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + s := &Server{ + Handler: &testHandler{}, + ConnContextFunc: func(conn net.Conn) (net.Conn, *ConnContext, error) { + return conn, &ConnContext{Principal: "test-user", RemoteAddr: "1.2.3.4:5678"}, nil + }, + } + + done := make(chan struct{}) + go func() { + defer close(done) + s.handleConnection(serverConn) + }() + + if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + + resp, err := protocol.ReadFrame(clientConn) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + + reader := bytes.NewReader(resp.Payload) + var corr int32 + if err := binary.Read(reader, binary.BigEndian, &corr); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if corr != 42 { + t.Fatalf("expected correlation 42, got %d", corr) + } + + _ = clientConn.Close() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("handleConnection did not exit") + } +} + +func TestServerHandleConnection_ConnContextError(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + s := &Server{ + Handler: &testHandler{}, + ConnContextFunc: func(conn net.Conn) (net.Conn, *ConnContext, error) { + return nil, nil, errors.New("auth failed") + }, + } + + done := make(chan struct{}) + go func() { + defer close(done) + s.handleConnection(serverConn) + }() + + select { + case <-done: + // Connection should close immediately due to error + case <-time.After(time.Second): + t.Fatal("handleConnection should exit immediately on ConnContext error") + } +} + +func TestServerHandleConnection_BadFrame(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + s := &Server{Handler: &testHandler{}} + + done := make(chan struct{}) + go func() { + defer close(done) + s.handleConnection(serverConn) + }() + + // Write invalid data (too short for a frame header) + _, _ = clientConn.Write([]byte{0, 0, 0, 2, 0xff, 0xff}) + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("handleConnection should exit on bad parse") + } +} + +type errorHandler struct{} + +func (h *errorHandler) Handle(ctx context.Context, header *protocol.RequestHeader, req kmsg.Request) ([]byte, error) { + return nil, errors.New("handler error") +} + +func TestServerHandleConnection_HandlerError(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + s := &Server{Handler: &errorHandler{}} + + done := make(chan struct{}) + go func() { + defer close(done) + s.handleConnection(serverConn) + }() + + if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + + // Handler error should send an error response instead of closing the + // connection so the client can recover gracefully. + frame, err := protocol.ReadFrame(clientConn) + if err != nil { + t.Fatalf("expected error response frame, got read error: %v", err) + } + if len(frame.Payload) == 0 { + t.Fatal("expected non-empty error response payload") + } +} + +type nilHandler struct{} + +func (h *nilHandler) Handle(ctx context.Context, header *protocol.RequestHeader, req kmsg.Request) ([]byte, error) { + return nil, nil +} + +func TestServerHandleConnection_NilResponse(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + s := &Server{Handler: &nilHandler{}} + + done := make(chan struct{}) + go func() { + defer close(done) + s.handleConnection(serverConn) + }() + + if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + + // Handler returns nil → server continues loop, send another then close + if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil { + t.Fatalf("WriteFrame 2: %v", err) + } + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("handleConnection should exit after client close") + } +}