Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 58 additions & 252 deletions cmd/broker/acl_test.go

Large diffs are not rendered by default.

1,683 changes: 769 additions & 914 deletions cmd/broker/main.go

Large diffs are not rendered by default.

848 changes: 189 additions & 659 deletions cmd/broker/main_test.go

Large diffs are not rendered by default.

864 changes: 390 additions & 474 deletions cmd/proxy/main.go

Large diffs are not rendered by default.

197 changes: 99 additions & 98 deletions cmd/proxy/main_test.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions internal/console/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,14 @@ func statusFromMetadata(meta *metadata.ClusterMetadata, metrics *MetricsSnapshot
partitions := make([]partitionDetails, 0, len(topic.Partitions))
for _, part := range topic.Partitions {
partitions = append(partitions, partitionDetails{
ID: part.PartitionIndex,
Leader: part.LeaderID,
Replicas: len(part.ReplicaNodes),
ISR: len(part.ISRNodes),
ID: part.Partition,
Leader: part.Leader,
Replicas: len(part.Replicas),
ISR: len(part.ISR),
})
}
resp.Topics = append(resp.Topics, topicInfo{
Name: topic.Name,
Name: *topic.Topic,
Partitions: len(topic.Partitions),
State: state,
PartitionsDetails: partitions,
Expand Down
3 changes: 2 additions & 1 deletion internal/console/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/KafScale/platform/pkg/metadata"
"github.com/KafScale/platform/pkg/protocol"
"github.com/twmb/franz-go/pkg/kmsg"
)

func TestConsoleStatusEndpoint(t *testing.T) {
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestStatusFromMetadataInjectsBrokerRuntime(t *testing.T) {
{NodeID: 1, Host: "broker-1"},
},
Topics: []protocol.MetadataTopic{
{Name: "orders"},
{Topic: kmsg.StringPtr("orders")},
},
}
snap := &MetricsSnapshot{
Expand Down
20 changes: 10 additions & 10 deletions internal/mcpserver/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,13 @@ func fetchOffsetsHandler(opts Options) mcp.ToolHandlerFor[FetchOffsetsInput, Fet
out := FetchOffsetsOutput{GroupID: input.GroupID}
for _, topic := range meta.Topics {
for _, partition := range topic.Partitions {
offset, metaText, err := store.FetchConsumerOffset(ctx, input.GroupID, topic.Name, partition.PartitionIndex)
offset, metaText, err := store.FetchConsumerOffset(ctx, input.GroupID, *topic.Topic, partition.Partition)
if err != nil {
return nil, FetchOffsetsOutput{}, err
}
out.Offsets = append(out.Offsets, OffsetDetails{
Topic: topic.Name,
Partition: partition.PartitionIndex,
Topic: *topic.Topic,
Partition: partition.Partition,
Offset: offset,
Metadata: metaText,
})
Expand Down Expand Up @@ -430,7 +430,7 @@ func describeConfigsHandler(opts Options) mcp.ToolHandlerFor[TopicConfigInput, T
}
topics = make([]string, 0, len(meta.Topics))
for _, topic := range meta.Topics {
topics = append(topics, topic.Name)
topics = append(topics, *topic.Topic)
}
}
out := make([]TopicConfigOutput, 0, len(topics))
Expand All @@ -457,7 +457,7 @@ func summarizeTopics(topics []protocol.MetadataTopic) []TopicSummary {
out := make([]TopicSummary, 0, len(topics))
for _, topic := range topics {
out = append(out, TopicSummary{
Name: topic.Name,
Name: *topic.Topic,
PartitionCount: len(topic.Partitions),
ErrorCode: topic.ErrorCode,
})
Expand All @@ -470,17 +470,17 @@ func toTopicDetail(topic protocol.MetadataTopic) TopicDetail {
partitions := make([]PartitionDetails, 0, len(topic.Partitions))
for _, partition := range topic.Partitions {
partitions = append(partitions, PartitionDetails{
Partition: partition.PartitionIndex,
LeaderID: partition.LeaderID,
Partition: partition.Partition,
LeaderID: partition.Leader,
LeaderEpoch: partition.LeaderEpoch,
ReplicaNodes: copyInt32Slice(partition.ReplicaNodes),
ISRNodes: copyInt32Slice(partition.ISRNodes),
ReplicaNodes: copyInt32Slice(partition.Replicas),
ISRNodes: copyInt32Slice(partition.ISR),
OfflineReplicas: copyInt32Slice(partition.OfflineReplicas),
ErrorCode: partition.ErrorCode,
})
}
return TopicDetail{
Name: topic.Name,
Name: *topic.Topic,
ErrorCode: topic.ErrorCode,
Partitions: partitions,
}
Expand Down
5 changes: 3 additions & 2 deletions internal/mcpserver/tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
metadatapb "github.com/KafScale/platform/pkg/gen/metadata"
"github.com/KafScale/platform/pkg/metadata"
"github.com/KafScale/platform/pkg/protocol"
"github.com/twmb/franz-go/pkg/kmsg"
)

func TestRequireStore(t *testing.T) {
Expand All @@ -35,8 +36,8 @@ func TestRequireStore(t *testing.T) {

func TestSummarizeTopicsSorts(t *testing.T) {
topics := []protocol.MetadataTopic{
{Name: "b", Partitions: []protocol.MetadataPartition{{PartitionIndex: 0}}},
{Name: "a", Partitions: []protocol.MetadataPartition{{PartitionIndex: 0}, {PartitionIndex: 1}}},
{Topic: kmsg.StringPtr("b"), Partitions: []protocol.MetadataPartition{{Partition: 0}}},
{Topic: kmsg.StringPtr("a"), Partitions: []protocol.MetadataPartition{{Partition: 0}, {Partition: 1}}},
}
out := summarizeTopics(topics)
if len(out) != 2 || out[0].Name != "a" || out[1].Name != "b" {
Expand Down
Loading