Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions integration/grpc_store_gateway_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//go:build requires_docker

package integration

import (
"context"
"fmt"
"io"
"net"
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

// Import cortexpb to register the cortexCodec (buffer pooling).
_ "github.com/cortexproject/cortex/pkg/cortexpb"
)

// mockStoreGatewayServer implements storepb.StoreServer and streams
// pre-built SeriesResponse messages for benchmarking.
type mockStoreGatewayServer struct {
	storepb.UnimplementedStoreServer
	// responses is the fixed set of messages replayed, in order, on every Series call.
	responses []*storepb.SeriesResponse
}

// Series streams the pre-built responses to the client in order,
// stopping at the first send error.
func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
	for idx := range m.responses {
		if sendErr := srv.Send(m.responses[idx]); sendErr != nil {
			return sendErr
		}
	}
	return nil
}

// BenchmarkGrpcStoreGatewayCalls benchmarks the full gRPC path for store gateway
// Series streaming with the cortexCodec and compression enabled.
// This is the store-gateway equivalent of BenchmarkGrpcCalls (which tests the ingester path).
//
// With SeriesResponse implementing ReleasableMessage, calling Free() after each Recv()
// returns the unmarshal buffer to the pool, reducing per-message allocations by ~32KB.
func BenchmarkGrpcStoreGatewayCalls(b *testing.B) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not the right place for benchmarking store gateways. Can you move to its own folder?

// Build realistic SeriesResponse messages (large enough to trigger buffer pooling).
responses := make([]*storepb.SeriesResponse, 10)
for i := range responses {
responses[i] = createStoreGatewayBenchResponse(i)
}

mock := &mockStoreGatewayServer{responses: responses}

// Start gRPC server.
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(b, err)

gRPCServer := grpc.NewServer()
storepb.RegisterStoreServer(gRPCServer, mock)

go func() {
if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
b.Error(err)
}
}()
defer gRPCServer.Stop()

// Connect client with compression (zstd via cortexCodec default call options).
conn, err := grpc.NewClient(
listener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(b, err)
defer conn.Close()

client := storepb.NewStoreClient(conn)

// freeable checks if the response supports Free() (i.e., has MessageWithBufRef embedded).
// This allows the benchmark to compile and run on both old builds (without Free)
// and new builds (with Free), so you can compare results via benchstat.
type freeable interface {
Free()
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
stream, err := client.Series(context.Background(), &storepb.SeriesRequest{})
require.NoError(b, err)

for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
require.NoError(b, err)
if f, ok := interface{}(resp).(freeable); ok {
f.Free()
}
}
}
}

// createStoreGatewayBenchResponse creates a realistic SeriesResponse with chunk data
// large enough to exceed the buffer pooling threshold (~1KB). The series index n
// varies the label values and the number of chunks so responses are not identical.
func createStoreGatewayBenchResponse(n int) *storepb.SeriesResponse {
	seriesLabels := labels.FromStrings(
		"__name__", fmt.Sprintf("http_requests_total_%d", n),
		"cluster", "us-east-1",
		"namespace", "production",
		"pod", fmt.Sprintf("web-server-deployment-7f8b9c6d4f-abc%02d", n),
		"container", "nginx",
		"instance", fmt.Sprintf("10.0.%d.%d:8080", n, n+1),
		"job", "kubernetes-pods",
	)

	// ~4KB of deterministic payload, shared by every chunk in this response
	// (simulating real store gateway responses).
	payload := make([]byte, 4096)
	for i := range payload {
		payload[i] = byte((i + n) % 256)
	}

	// Each response carries 5+n chunks covering consecutive 2h (7200000ms) windows.
	chunkCount := 5 + n
	chunks := make([]storepb.AggrChunk, 0, chunkCount)
	for i := 0; i < chunkCount; i++ {
		chunks = append(chunks, storepb.AggrChunk{
			MinTime: int64(i) * 7200000,
			MaxTime: int64(i+1) * 7200000,
			Raw: &storepb.Chunk{
				Type: storepb.Chunk_XOR,
				Data: payload,
			},
		})
	}

	return &storepb.SeriesResponse{
		Result: &storepb.SeriesResponse_Series{
			Series: &storepb.Series{
				Labels: labelpb.ZLabelsFromPromLabels(seriesLabels),
				Chunks: chunks,
			},
		},
	}
}
26 changes: 26 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thanos-io/thanos/pkg/pool"
thanosquery "github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"go.uber.org/atomic"
Expand Down Expand Up @@ -67,6 +68,10 @@ var (
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
defaultAggrs = []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}

// Compile-time check: SeriesResponse must satisfy cortexpb.ReleasableMessage
// so that cortexCodec registers unmarshal buffers for explicit lifecycle management.
_ cortexpb.ReleasableMessage = &storepb.SeriesResponse{}
)

// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
Expand Down Expand Up @@ -675,6 +680,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
myQueriedBlocks := []ulid.ULID(nil)

processSeries := func(s *storepb.Series) error {
// Detach series data from the gRPC unmarshal buffer so that
// resp.Free() can safely return the buffer to the pool.
detachSeriesFromBuffer(s)
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
Expand Down Expand Up @@ -746,13 +754,15 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
// Response may either contain series, batch, warning or hints.
if s := resp.GetSeries(); s != nil {
if err := processSeries(s); err != nil {
resp.Free()
return err
}
}

if b := resp.GetBatch(); b != nil {
for _, s := range b.Series {
if err := processSeries(s); err != nil {
resp.Free()
return err
}
}
Expand All @@ -765,11 +775,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if h := resp.GetHints(); h != nil {
hints := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(h, &hints); err != nil {
resp.Free()
return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress())
}

ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks)
if err != nil {
resp.Free()
return errors.Wrapf(err, "failed to parse queried block IDs from received hints")
}

Expand All @@ -778,6 +790,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
seriesQueryStats.Merge(hints.QueryStats)
}
}

resp.Free()
}

numSeries := len(mySeries)
Expand Down Expand Up @@ -1189,6 +1203,18 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) {
return res, nil
}

// detachSeriesFromBuffer re-allocates label strings and chunk data byte slices
// so that the series no longer references the gRPC unmarshal buffer. This allows
// resp.Free() to safely return the buffer to the pool without causing use-after-free.
func detachSeriesFromBuffer(s *storepb.Series) {
	// Copy label name/value strings out of the pooled buffer.
	labelpb.ReAllocZLabelsStrings(&s.Labels, true)

	// Copy each raw chunk payload into a freshly allocated slice.
	for idx := range s.Chunks {
		raw := s.Chunks[idx].Raw
		if raw == nil || len(raw.Data) == 0 {
			continue
		}
		detached := make([]byte, len(raw.Data))
		copy(detached, raw.Data)
		raw.Data = detached
	}
}

// countChunkBytes returns the size of the chunks making up the provided series in bytes
func countChunkBytes(series ...*storepb.Series) (count int) {
for _, s := range series {
Expand Down
Loading
Loading