Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ func (h *handler) handleListOffsets(ctx context.Context, header *protocol.Reques
}

func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeader, req *kmsg.FetchRequest) ([]byte, error) {
if header.APIVersion < 11 || header.APIVersion > 13 {
if header.APIVersion > 13 {
return nil, fmt.Errorf("fetch version %d not supported", header.APIVersion)
}
topicResponses := make([]kmsg.FetchResponseTopic, 0, len(req.Topics))
Expand Down
21 changes: 19 additions & 2 deletions pkg/broker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@
}
respPayload, err := s.Handler.Handle(ctx, header, req)
if err != nil {
log.Printf("handle request: %v", err)
return
log.Printf("handle request api=%d v=%d: %v", header.APIKey, header.APIVersion, err)
// Send an UNKNOWN_SERVER_ERROR response instead of dropping the
// connection so the client can recover gracefully.
if errResp := buildErrorResponse(header); errResp != nil {
_ = protocol.WriteFrame(conn, errResp)
}
continue
}
if respPayload == nil {
continue
Expand All @@ -142,3 +147,15 @@
}
}
}

// buildErrorResponse creates a minimal Kafka error response for the given
// request header so the client receives a proper error instead of a closed
// connection. Returns nil if no suitable response can be constructed.
func buildErrorResponse(header *protocol.RequestHeader) []byte {
resp := kmsg.ResponseForKey(header.APIKey)
if resp == nil {
return nil
}
resp.SetVersion(header.APIVersion)
return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp)
}
190 changes: 190 additions & 0 deletions pkg/broker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,193 @@ func TestServerListenAndServe_Shutdown(t *testing.T) {
t.Fatalf("server did not exit after cancel")
}
}

func TestServerListenAndServeNoHandler(t *testing.T) {
s := &Server{Addr: "127.0.0.1:0"}
err := s.ListenAndServe(context.Background())
if err == nil || err.Error() != "broker.Server requires a Handler" {
t.Fatalf("expected handler required error, got: %v", err)
}
}

func TestServerWait(t *testing.T) {
s := &Server{Handler: &testHandler{}}
// No goroutines → Wait returns immediately
done := make(chan struct{})
go func() {
s.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Wait should return immediately with no connections")
}
}

func TestServerListenAddress(t *testing.T) {
s := &Server{Addr: "127.0.0.1:9999", Handler: &testHandler{}}
// Before listening, returns configured addr
if got := s.ListenAddress(); got != "127.0.0.1:9999" {
t.Fatalf("expected configured addr, got %q", got)
}
}

func TestServerHandleConnection_ConnContext(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer func() { _ = clientConn.Close() }()

s := &Server{
Handler: &testHandler{},
ConnContextFunc: func(conn net.Conn) (net.Conn, *ConnContext, error) {
return conn, &ConnContext{Principal: "test-user", RemoteAddr: "1.2.3.4:5678"}, nil
},
}

done := make(chan struct{})
go func() {
defer close(done)
s.handleConnection(serverConn)
}()

if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil {
t.Fatalf("WriteFrame: %v", err)
}

resp, err := protocol.ReadFrame(clientConn)
if err != nil {
t.Fatalf("ReadFrame: %v", err)
}

reader := bytes.NewReader(resp.Payload)
var corr int32
if err := binary.Read(reader, binary.BigEndian, &corr); err != nil {
t.Fatalf("read correlation id: %v", err)
}
if corr != 42 {
t.Fatalf("expected correlation 42, got %d", corr)
}

_ = clientConn.Close()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("handleConnection did not exit")
}
}

func TestServerHandleConnection_ConnContextError(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer func() { _ = clientConn.Close() }()

s := &Server{
Handler: &testHandler{},
ConnContextFunc: func(conn net.Conn) (net.Conn, *ConnContext, error) {
return nil, nil, errors.New("auth failed")
},
}

done := make(chan struct{})
go func() {
defer close(done)
s.handleConnection(serverConn)
}()

select {
case <-done:
// Connection should close immediately due to error
case <-time.After(time.Second):
t.Fatal("handleConnection should exit immediately on ConnContext error")
}
}

func TestServerHandleConnection_BadFrame(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer func() { _ = clientConn.Close() }()

s := &Server{Handler: &testHandler{}}

done := make(chan struct{})
go func() {
defer close(done)
s.handleConnection(serverConn)
}()

// Write invalid data (too short for a frame header)
_, _ = clientConn.Write([]byte{0, 0, 0, 2, 0xff, 0xff})
_ = clientConn.Close()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("handleConnection should exit on bad parse")
}
}

type errorHandler struct{}

func (h *errorHandler) Handle(ctx context.Context, header *protocol.RequestHeader, req kmsg.Request) ([]byte, error) {
return nil, errors.New("handler error")
}

func TestServerHandleConnection_HandlerError(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer func() { _ = clientConn.Close() }()

s := &Server{Handler: &errorHandler{}}

done := make(chan struct{})
go func() {
defer close(done)
s.handleConnection(serverConn)
}()

if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil {
t.Fatalf("WriteFrame: %v", err)
}

// Handler error should send an error response instead of closing the
// connection so the client can recover gracefully.
frame, err := protocol.ReadFrame(clientConn)
if err != nil {
t.Fatalf("expected error response frame, got read error: %v", err)
}
if len(frame.Payload) == 0 {
t.Fatal("expected non-empty error response payload")
}
}

type nilHandler struct{}

func (h *nilHandler) Handle(ctx context.Context, header *protocol.RequestHeader, req kmsg.Request) ([]byte, error) {
return nil, nil
}

func TestServerHandleConnection_NilResponse(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer func() { _ = clientConn.Close() }()

s := &Server{Handler: &nilHandler{}}

done := make(chan struct{})
go func() {
defer close(done)
s.handleConnection(serverConn)
}()

if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil {
t.Fatalf("WriteFrame: %v", err)
}

// Handler returns nil → server continues loop, send another then close
if err := protocol.WriteFrame(clientConn, buildApiVersionsRequest()); err != nil {
t.Fatalf("WriteFrame 2: %v", err)
}
_ = clientConn.Close()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("handleConnection should exit after client close")
}
}