Skip to content

Commit 688cb3f

Browse files
committed
Fix metrics
1 parent abe19c7 commit 688cb3f

2 files changed

Lines changed: 58 additions & 2 deletions

File tree

internal/metrics/aggregator/aggregator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,13 @@ func ListProcessorPods(ctx context.Context, c client.Client, namespace string) (
189189
func NewMetricsFilter(scraper *Scraper) func(log logr.Logger, handler http.Handler) (http.Handler, error) {
190190
return func(log logr.Logger, handler http.Handler) (http.Handler, error) {
191191
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
192-
// 1. Get operator metrics by invoking the default handler
192+
// 1. Get operator metrics by invoking the default handler.
193+
// Strip Accept-Encoding so we get uncompressed response — Prometheus expects plain text
194+
// and does not decompress gzip (causes "expected a valid start token, got \"\x1f\"" error).
193195
rec := httptest.NewRecorder()
194-
handler.ServeHTTP(rec, r)
196+
r2 := r.Clone(r.Context())
197+
r2.Header.Del("Accept-Encoding")
198+
handler.ServeHTTP(rec, r2)
195199
operatorMetrics := rec.Body.Bytes()
196200

197201
// 2. Scrape processor pods

internal/metrics/aggregator/aggregator_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,18 @@ limitations under the License.
1717
package aggregator
1818

1919
import (
20+
"compress/gzip"
21+
"net/http"
22+
"net/http/httptest"
2023
"strings"
2124
"testing"
25+
26+
"github.com/go-logr/logr"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
29+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
30+
31+
dataflowv1 "github.com/dataflow-operator/dataflow/api/v1"
2232
)
2333

2434
func TestFilterDataflowMetrics(t *testing.T) {
@@ -92,3 +102,45 @@ func TestMergeWithOperatorMetrics_EmptyProcessor(t *testing.T) {
92102
t.Errorf("MergeWithOperatorMetrics(nil) = %q, want %q", merged, operator)
93103
}
94104
}
105+
106+
// TestMetricsFilter_NoGzip verifies that the filter strips Accept-Encoding so Prometheus
107+
// receives plain text (gzip causes "expected a valid start token, got \"\x1f\"" error).
108+
func TestMetricsFilter_NoGzip(t *testing.T) {
109+
scheme := runtime.NewScheme()
110+
_ = dataflowv1.AddToScheme(scheme)
111+
_ = clientgoscheme.AddToScheme(scheme)
112+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
113+
scraper := NewScraper(fakeClient)
114+
115+
// Handler that returns gzip when Accept-Encoding: gzip is present
116+
gzipHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
117+
plain := "# HELP dataflow_status Status\n# TYPE dataflow_status gauge\ndataflow_status 1\n"
118+
if r.Header.Get("Accept-Encoding") == "gzip" {
119+
w.Header().Set("Content-Encoding", "gzip")
120+
gz := gzip.NewWriter(w)
121+
_, _ = gz.Write([]byte(plain))
122+
_ = gz.Close()
123+
} else {
124+
w.Write([]byte(plain))
125+
}
126+
})
127+
128+
filterFn := NewMetricsFilter(scraper)
129+
wrapped, err := filterFn(logr.Discard(), gzipHandler)
130+
if err != nil {
131+
t.Fatalf("NewMetricsFilter: %v", err)
132+
}
133+
134+
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
135+
req.Header.Set("Accept-Encoding", "gzip")
136+
rec := httptest.NewRecorder()
137+
wrapped.ServeHTTP(rec, req)
138+
139+
body := rec.Body.Bytes()
140+
if len(body) >= 2 && body[0] == 0x1f && body[1] == 0x8b {
141+
t.Error("response is gzip-compressed; Prometheus expects plain text")
142+
}
143+
if !strings.Contains(string(body), "dataflow_status") {
144+
t.Errorf("response should contain dataflow_status, got %q", string(body))
145+
}
146+
}

0 commit comments

Comments
 (0)