From b9f9dd580a39c606a163d87f9f4f2bbdcf5f86a7 Mon Sep 17 00:00:00 2001 From: "tamas.albert" Date: Thu, 26 Mar 2026 08:36:42 +0200 Subject: [PATCH] (upstream) chore: Add backfill time window to ensure data consistency chore: Add backfill time window to ensure data consistency (cherry picked from commit 52a9555b782860f9c8ba9db409bd56e0c8f58272) --- .../tasks/enterprise_metrics_collector.go | 39 +---- .../tasks/metrics_collector_test.go | 76 +++++++++- .../gh-copilot/tasks/org_metrics_collector.go | 26 +--- .../tasks/report_download_helper.go | 133 ++++++++++++++++++ .../tasks/user_metrics_collector.go | 64 ++++----- 5 files changed, 244 insertions(+), 94 deletions(-) diff --git a/backend/plugins/gh-copilot/tasks/enterprise_metrics_collector.go b/backend/plugins/gh-copilot/tasks/enterprise_metrics_collector.go index 076b7df7799..60b80bfacc3 100644 --- a/backend/plugins/gh-copilot/tasks/enterprise_metrics_collector.go +++ b/backend/plugins/gh-copilot/tasks/enterprise_metrics_collector.go @@ -20,7 +20,6 @@ package tasks import ( "encoding/json" "fmt" - "io" "net/http" "net/url" "time" @@ -76,6 +75,7 @@ func CollectEnterpriseMetrics(taskCtx plugin.SubTaskContext) errors.Error { now := time.Now().UTC() start, until := computeReportDateRange(now, collector.GetSince()) + start = clampDailyMetricsStartForBackfill(start, until) logger := taskCtx.GetLogger() dayIter := newDayIterator(start, until) @@ -95,42 +95,7 @@ func CollectEnterpriseMetrics(taskCtx plugin.SubTaskContext) errors.Error { Concurrency: 1, AfterResponse: ignore404, ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - // Parse metadata response to get download links - body, readErr := io.ReadAll(res.Body) - res.Body.Close() - if readErr != nil { - return nil, errors.Default.Wrap(readErr, "failed to read report metadata") - } - - var meta reportMetadataResponse - if jsonErr := json.Unmarshal(body, &meta); jsonErr != nil { - snippet := string(body) - if len(snippet) > 200 { - snippet = snippet[:200] - } - logger.Error(jsonErr, "failed to parse report metadata, body=%s", snippet) - return nil, errors.Default.Wrap(jsonErr, "failed to parse report metadata") - } - - if len(meta.DownloadLinks) == 0 { - logger.Info("No download links for report day=%s, skipping", meta.ReportDay) - return nil, nil - } - - // Download each report file and return contents as raw messages - var results []json.RawMessage - for _, link := range meta.DownloadLinks { - reportBody, dlErr := downloadReport(link, logger) - if dlErr != nil { - logger.Error(nil, "failed to download report for day=%s: %s", meta.ReportDay, dlErr.Error()) - return nil, dlErr - } - if reportBody == nil { - continue // blob not found, skip - } - results = append(results, json.RawMessage(reportBody)) - } - return results, nil + return parseRawReportResponse(res, logger) }, }) if err != nil { diff --git a/backend/plugins/gh-copilot/tasks/metrics_collector_test.go b/backend/plugins/gh-copilot/tasks/metrics_collector_test.go index cfbab30406c..10d820eaf4e 100644 --- a/backend/plugins/gh-copilot/tasks/metrics_collector_test.go +++ b/backend/plugins/gh-copilot/tasks/metrics_collector_test.go @@ -18,6 +18,8 @@ limitations under the License. package tasks import ( + "bytes" + "io" "net/http" "testing" "time" @@ -45,6 +47,7 @@ func TestComputeReportDateRangeDefaultLookback(t *testing.T) { } func TestComputeReportDateRangeUsesSince(t *testing.T) { + // since is far enough in the past that the lookback buffer doesn't apply. now := time.Date(2025, 1, 10, 12, 0, 0, 0, time.UTC) since := time.Date(2025, 1, 3, 12, 0, 0, 0, time.UTC) start, until := computeReportDateRange(now, &since) @@ -61,9 +64,80 @@ func TestComputeReportDateRangeClampsToLookback(t *testing.T) { } func TestComputeReportDateRangeClampsFutureSince(t *testing.T) { + // Future since is clamped to until, then the lookback buffer applies. now := time.Date(2025, 1, 10, 12, 0, 0, 0, time.UTC) since := now.Add(24 * time.Hour) start, until := computeReportDateRange(now, &since) require.Equal(t, time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC), until) - require.Equal(t, time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC), start) + require.Equal(t, time.Date(2025, 1, 7, 0, 0, 0, 0, time.UTC), start) +} + +func TestComputeReportDateRangeLookbackBuffer(t *testing.T) { + // since is yesterday: without the buffer we'd only request 1 day (yesterday). + // With the buffer we look back reportLookbackDays days to retry any 404'd days. + now := time.Date(2025, 1, 10, 0, 0, 0, 0, time.UTC) // midnight run + since := time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC) // LatestSuccessStart from previous midnight run + start, until := computeReportDateRange(now, &since) + require.Equal(t, time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC), until) + require.Equal(t, time.Date(2025, 1, 7, 0, 0, 0, 0, time.UTC), start) +} + +func TestClampDailyMetricsStartForBackfillRecentStart(t *testing.T) { + until := time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC) + start := time.Date(2025, 1, 7, 0, 0, 0, 0, time.UTC) + + clamped := clampDailyMetricsStartForBackfill(start, until) + require.Equal(t, time.Date(2025, 1, 6, 0, 0, 0, 0, time.UTC), clamped) +} + +func TestClampDailyMetricsStartForBackfillKeepsOlderStart(t *testing.T) { + until := time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC) + start := time.Date(2025, 1, 3, 0, 0, 0, 0, time.UTC) + + clamped := clampDailyMetricsStartForBackfill(start, until) + require.Equal(t, start, clamped) +} + +func TestUserMetricsDateRangeAppliesFourDayBackfillWindow(t *testing.T) { + now := time.Date(2025, 1, 10, 0, 0, 0, 0, time.UTC) + since := time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC) + + start, until := computeReportDateRange(now, &since) + start = clampDailyMetricsStartForBackfill(start, until) + + require.Equal(t, time.Date(2025, 1, 9, 0, 0, 0, 0, time.UTC), until) + require.Equal(t, time.Date(2025, 1, 6, 0, 0, 0, 0, time.UTC), start) +} + +func TestParseReportMetadataResponseNoContent(t *testing.T) { + res := &http.Response{ + StatusCode: http.StatusNoContent, + Body: io.NopCloser(bytes.NewReader(nil)), + } + + meta, err := parseReportMetadataResponse(res, nil) + require.NoError(t, err) + require.Nil(t, meta) +} + +func TestParseReportMetadataResponseEmptyBody(t *testing.T) { + res := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(nil)), + } + + meta, err := parseReportMetadataResponse(res, nil) + require.NoError(t, err) + require.Nil(t, meta) +} + +func TestParseReportMetadataResponseEmptyString(t *testing.T) { + res := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte(`""`))), + } + + meta, err := parseReportMetadataResponse(res, nil) + require.NoError(t, err) + require.Nil(t, meta) } diff --git a/backend/plugins/gh-copilot/tasks/org_metrics_collector.go b/backend/plugins/gh-copilot/tasks/org_metrics_collector.go index 9eee0fe33fa..aa97c4bb9f6 100644 --- a/backend/plugins/gh-copilot/tasks/org_metrics_collector.go +++ b/backend/plugins/gh-copilot/tasks/org_metrics_collector.go @@ -20,7 +20,6 @@ package tasks import ( "encoding/json" "fmt" - "io" "net/http" "net/url" "time" @@ -70,6 +69,7 @@ func CollectOrgMetrics(taskCtx plugin.SubTaskContext) errors.Error { now := time.Now().UTC() start, until := computeReportDateRange(now, collector.GetSince()) + start = clampDailyMetricsStartForBackfill(start, until) logger := taskCtx.GetLogger() dayIter := newDayIterator(start, until) @@ -89,29 +89,7 @@ func CollectOrgMetrics(taskCtx plugin.SubTaskContext) errors.Error { Concurrency: 1, AfterResponse: ignore404, ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - body, readErr := io.ReadAll(res.Body) - res.Body.Close() - if readErr != nil { - return nil, errors.Default.Wrap(readErr, "failed to read report metadata") - } - - var meta reportMetadataResponse - if jsonErr := json.Unmarshal(body, &meta); jsonErr != nil { - return nil, errors.Default.Wrap(jsonErr, "failed to parse report metadata") - } - - var results []json.RawMessage - for _, link := range meta.DownloadLinks { - reportBody, dlErr := downloadReport(link, logger) - if dlErr != nil { - return nil, dlErr - } - if reportBody == nil { - continue // blob not found, skip - } - results = append(results, json.RawMessage(reportBody)) - } - return results, nil + return parseRawReportResponse(res, logger) }, }) if err != nil { diff --git a/backend/plugins/gh-copilot/tasks/report_download_helper.go b/backend/plugins/gh-copilot/tasks/report_download_helper.go index 857712b8a30..009150ce36e 100644 --- a/backend/plugins/gh-copilot/tasks/report_download_helper.go +++ b/backend/plugins/gh-copilot/tasks/report_download_helper.go @@ -33,6 +33,14 @@ import ( // reportMaxDays is the maximum historical window the new report API supports (1 year). const reportMaxDays = 365 +// reportLookbackDays: extra days rewound from 'until' on incremental runs. +// GitHub reports are generated hours after midnight, so a midnight run gets 404 for the previous +// day. Without this buffer, 'LatestSuccessStart' advances past the missed day permanently. +const reportLookbackDays = 2 + +// dailyMetricsTrailingBackfillDays extends retries for delayed daily report generation. +const dailyMetricsTrailingBackfillDays = 4 + // copilotRawParams identifies a set of raw data records for a given connection/scope. type copilotRawParams struct { ConnectionId uint64 @@ -60,6 +68,14 @@ func ignore404(res *http.Response) errors.Error { return nil } +func clampDailyMetricsStartForBackfill(start, until time.Time) time.Time { + trailingStart := until.AddDate(0, 0, -(dailyMetricsTrailingBackfillDays - 1)) + if start.After(trailingStart) { + return trailingStart + } + return start +} + // reportMetadataResponse represents the JSON returned by the report metadata endpoints. type reportMetadataResponse struct { DownloadLinks []string `json:"download_links"` @@ -69,7 +85,120 @@ type reportMetadataResponse struct { ReportEndDay string `json:"report_end_day"` } +func readReportMetadataBody(res *http.Response) ([]byte, errors.Error) { + body, readErr := io.ReadAll(res.Body) + res.Body.Close() + if readErr != nil { + return nil, errors.Default.Wrap(readErr, "failed to read report metadata") + } + return body, nil +} + +func logReportMetadataParseError(body []byte, err error, logger log.Logger) { + if logger == nil { + return + } + snippet := string(body) + if len(snippet) > 200 { + snippet = snippet[:200] + } + logger.Error(err, "failed to parse report metadata, body=%s", snippet) +} + +func reportMetadataRange(meta reportMetadataResponse) string { + if meta.ReportDay != "" { + return meta.ReportDay + } + if meta.ReportStartDay != "" && meta.ReportEndDay != "" { + return fmt.Sprintf("%s..%s", meta.ReportStartDay, meta.ReportEndDay) + } + return "" +} + +func logMissingDownloadLinks(meta reportMetadataResponse, logger log.Logger) { + if logger == nil || len(meta.DownloadLinks) != 0 { + return + } + reportRange := reportMetadataRange(meta) + if reportRange != "" { + logger.Info("No download links for report day=%s, skipping", reportRange) + return + } + logger.Info("No download links in report metadata, skipping") +} + +func parseReportMetadata(body []byte, logger log.Logger) (*reportMetadataResponse, errors.Error) { + trimmed := bytes.TrimSpace(body) + if len(trimmed) == 0 { + if logger != nil { + logger.Info("Report metadata response was empty, skipping") + } + return nil, nil + } + + // Handle JSON-encoded empty string "" + if bytes.Equal(trimmed, []byte(`""`)) { + if logger != nil { + logger.Info("Report metadata response was empty string, skipping") + } + return nil, nil + } + + var meta reportMetadataResponse + if jsonErr := json.Unmarshal(trimmed, &meta); jsonErr != nil { + logReportMetadataParseError(trimmed, jsonErr, logger) + return nil, errors.Default.Wrap(jsonErr, "failed to parse report metadata") + } + + logMissingDownloadLinks(meta, logger) + + return &meta, nil +} + +func parseReportMetadataResponse(res *http.Response, logger log.Logger) (*reportMetadataResponse, errors.Error) { + if res.StatusCode == http.StatusNoContent { + if logger != nil { + logger.Info("Report metadata not ready yet (204), skipping for now") + } + res.Body.Close() + return nil, nil + } + + body, readErr := readReportMetadataBody(res) + if readErr != nil { + return nil, readErr + } + + return parseReportMetadata(body, logger) +} + +func collectRawReportRecords(downloadLinks []string, logger log.Logger) ([]json.RawMessage, errors.Error) { + var results []json.RawMessage + for _, link := range downloadLinks { + reportBody, dlErr := downloadReport(link, logger) + if dlErr != nil { + return nil, dlErr + } + if reportBody == nil { + continue + } + results = append(results, json.RawMessage(reportBody)) + } + return results, nil +} + +func parseRawReportResponse(res *http.Response, logger log.Logger) ([]json.RawMessage, errors.Error) { + meta, err := parseReportMetadataResponse(res, logger) + if err != nil || meta == nil { + return nil, err + } + + return collectRawReportRecords(meta.DownloadLinks, logger) +} + // computeReportDateRange returns the range of dates to collect, clamped to the API max. +// When 'since' is set, 'start' is rewound to at least 'until - reportLookbackDays' +// so days that returned 404 (report not yet generated) are retried on subsequent runs. func computeReportDateRange(now time.Time, since *time.Time) (start, until time.Time) { until = utcDate(now).AddDate(0, 0, -1) // reports are available for the previous day min := until.AddDate(0, 0, -(reportMaxDays - 1)) @@ -82,6 +211,10 @@ func computeReportDateRange(now time.Time, since *time.Time) (start, until time. if start.After(until) { start = until } + // Rewind 'start' by 'reportLookbackDays' so recently-missed days are retried. + if lookback := until.AddDate(0, 0, -reportLookbackDays); start.After(lookback) { + start = lookback + } } return start, until } diff --git a/backend/plugins/gh-copilot/tasks/user_metrics_collector.go b/backend/plugins/gh-copilot/tasks/user_metrics_collector.go index 526c13bf34b..91df9d79aab 100644 --- a/backend/plugins/gh-copilot/tasks/user_metrics_collector.go +++ b/backend/plugins/gh-copilot/tasks/user_metrics_collector.go @@ -20,18 +20,47 @@ package tasks import ( "encoding/json" "fmt" - "io" "net/http" "net/url" "time" "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/log" "github.com/apache/incubator-devlake/core/plugin" helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" ) const rawUserMetricsTable = "copilot_user_metrics" +func collectUserMetricsRecords(downloadLinks []string, logger log.Logger) ([]json.RawMessage, errors.Error) { + var results []json.RawMessage + for _, link := range downloadLinks { + reportBody, dlErr := downloadReport(link, logger) + if dlErr != nil { + return nil, dlErr + } + if reportBody == nil { + continue // blob not found, skip + } + // Parse JSONL: split by newlines and return each non-empty line. + userRecords, parseErr := parseJSONL(reportBody) + if parseErr != nil { + return nil, parseErr + } + results = append(results, userRecords...) + } + return results, nil +} + +func parseUserMetricsReportResponse(res *http.Response, logger log.Logger) ([]json.RawMessage, errors.Error) { + meta, err := parseReportMetadataResponse(res, logger) + if err != nil || meta == nil { + return nil, err + } + + return collectUserMetricsRecords(meta.DownloadLinks, logger) +} + // CollectUserMetrics collects enterprise user-level daily Copilot usage reports. // These reports are in JSONL format (one JSON object per line per user). // Only available for enterprise-scoped connections. @@ -71,6 +100,7 @@ func CollectUserMetrics(taskCtx plugin.SubTaskContext) errors.Error { now := time.Now().UTC() start, until := computeReportDateRange(now, collector.GetSince()) + start = clampDailyMetricsStartForBackfill(start, until) logger := taskCtx.GetLogger() dayIter := newDayIterator(start, until) @@ -90,37 +120,7 @@ func CollectUserMetrics(taskCtx plugin.SubTaskContext) errors.Error { Concurrency: 1, AfterResponse: ignore404, ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - body, readErr := io.ReadAll(res.Body) - res.Body.Close() - if readErr != nil { - return nil, errors.Default.Wrap(readErr, "failed to read report metadata") - } - - var meta reportMetadataResponse - if jsonErr := json.Unmarshal(body, &meta); jsonErr != nil { - return nil, errors.Default.Wrap(jsonErr, "failed to parse report metadata") - } - - // User reports are JSONL — each download link returns one file where - // each line is a separate JSON object for one user's daily metrics. - // We download the file and split into individual JSON messages. - var results []json.RawMessage - for _, link := range meta.DownloadLinks { - reportBody, dlErr := downloadReport(link, logger) - if dlErr != nil { - return nil, dlErr - } - if reportBody == nil { - continue // blob not found, skip - } - // Parse JSONL: split by newlines and return each non-empty line - userRecords, parseErr := parseJSONL(reportBody) - if parseErr != nil { - return nil, parseErr - } - results = append(results, userRecords...) - } - return results, nil + return parseUserMetricsReportResponse(res, logger) }, }) if err != nil {