Skip to content

Commit f1430c1

Browse files
committed
batches: upload large exec artifacts
Batch executor jobs can produce stdout, stderr, and diffs large enough to bloat the final JSONL result stream. Upload oversized step artifacts from the existing `src batch exec` execution path instead of adding a separate command or user-facing auth flags. `src batch exec` now discovers the executor job context from the environment, uses the existing executor job bearer token plus executor name auth model to stream large artifacts to the Batch Changes artifacts endpoint, and records returned references on `AfterStepResult`. Small outputs stay inline for compatibility, and per-step progress output is suppressed when artifact uploads are enabled so large logs are not duplicated into task-step JSONL events. Companion branch in sg: wb/proto-exec-upload. Test Plan: - go test ./cmd/src ./internal/batches/executor ./internal/batches/ui github.com/sourcegraph/sourcegraph/lib/batches/execution
1 parent ed8850d commit f1430c1

7 files changed

Lines changed: 429 additions & 42 deletions

File tree

cmd/src/batch_exec.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,11 @@ Examples:
123123
}
124124

125125
func executeBatchSpecInWorkspaces(ctx context.Context, flags *executorModeFlags) (err error) {
126-
ui := &ui.JSONLines{BinaryDiffs: flags.binaryDiffs}
126+
artifactUploader, err := newBatchArtifactUploaderFromEnv(cfg.endpointURL)
127+
if err != nil {
128+
return err
129+
}
130+
ui := &ui.JSONLines{BinaryDiffs: flags.binaryDiffs, SuppressStepOutput: artifactUploader != nil}
127131

128132
// Ensure the temp dir exists.
129133
tempDir := flags.tempDir
@@ -214,6 +218,9 @@ func executeBatchSpecInWorkspaces(ctx context.Context, flags *executorModeFlags)
214218
UI: taskExecUI.StepsExecutionUI(task),
215219
ForceRoot: !flags.runAsImageUser,
216220
BinaryDiffs: flags.binaryDiffs,
221+
222+
ArtifactUploader: artifactUploader,
223+
ArtifactUploadThresholdBytes: defaultArtifactUploadThresholdBytes,
217224
}
218225
results, err := executor.RunSteps(ctx, opts)
219226

cmd/src/batch_exec_artifacts.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/url"
9+
"os"
10+
"strconv"
11+
"strings"
12+
13+
"github.com/sourcegraph/src-cli/internal/batches/executor"
14+
15+
"github.com/sourcegraph/sourcegraph/lib/batches/execution"
16+
"github.com/sourcegraph/sourcegraph/lib/errors"
17+
)
18+
19+
// defaultArtifactUploadThresholdBytes is the artifact upload threshold passed
// to the step executor: 1 MiB (1024 * 1024 bytes).
const defaultArtifactUploadThresholdBytes = 1024 * 1024
21+
22+
// batchArtifactUploader carries everything Upload needs to POST a step
// artifact on behalf of an executor job.
type batchArtifactUploader struct {
	// endpointURL is the base URL the artifact path is joined onto.
	endpointURL *url.URL
	// jobID is sent in the X-Sourcegraph-Job-ID header.
	jobID int
	// jobToken is sent as the bearer token; executorName is sent in the
	// X-Sourcegraph-Executor-Name header.
	jobToken, executorName string
	// client performs the upload requests.
	client *http.Client
}
29+
30+
var _ executor.ArtifactUploader = (*batchArtifactUploader)(nil)
31+
32+
func newBatchArtifactUploaderFromEnv(endpointURL *url.URL) (*batchArtifactUploader, error) {
33+
jobID, _ := strconv.Atoi(os.Getenv("SRC_EXECUTOR_JOB_ID"))
34+
jobToken := os.Getenv("SRC_EXECUTOR_JOB_TOKEN")
35+
executorName := os.Getenv("SRC_EXECUTOR_NAME")
36+
37+
configured := jobID != 0 || jobToken != "" || executorName != ""
38+
if !configured {
39+
return nil, nil
40+
}
41+
if jobID == 0 || jobToken == "" || executorName == "" {
42+
return nil, errors.New("artifact upload requires job ID, job token, and executor name")
43+
}
44+
45+
return &batchArtifactUploader{
46+
endpointURL: endpointURL,
47+
jobID: jobID,
48+
jobToken: jobToken,
49+
executorName: executorName,
50+
client: http.DefaultClient,
51+
}, nil
52+
}
53+
54+
func (u *batchArtifactUploader) Upload(ctx context.Context, artifactKey string, r io.Reader) (execution.Artifact, error) {
55+
if strings.Contains(artifactKey, "/") || strings.Contains(artifactKey, "\\") || strings.Contains(artifactKey, "..") {
56+
return execution.Artifact{}, errors.Newf("invalid artifact key %q", artifactKey)
57+
}
58+
sizeReader := &artifactSizeReader{r: r}
59+
60+
url := u.endpointURL.JoinPath(
61+
".executors",
62+
"queue",
63+
"batches",
64+
"artifacts",
65+
artifactKey,
66+
)
67+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), sizeReader)
68+
if err != nil {
69+
return execution.Artifact{}, err
70+
}
71+
req.Header.Set("Authorization", "Bearer "+u.jobToken)
72+
req.Header.Set("X-Sourcegraph-Executor-Name", u.executorName)
73+
req.Header.Set("X-Sourcegraph-Job-ID", strconv.Itoa(u.jobID))
74+
75+
resp, err := u.client.Do(req)
76+
if err != nil {
77+
return execution.Artifact{}, err
78+
}
79+
defer resp.Body.Close()
80+
81+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
82+
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
83+
return execution.Artifact{}, errors.Newf("artifact upload failed with status %s: %s", resp.Status, strings.TrimSpace(string(body)))
84+
}
85+
86+
var ref execution.Artifact
87+
if err := json.NewDecoder(resp.Body).Decode(&ref); err != nil {
88+
return execution.Artifact{}, errors.Wrap(err, "decoding artifact upload response")
89+
}
90+
if ref.ObjectStorageKey == "" {
91+
return execution.Artifact{}, errors.New("artifact upload response did not include an object storage key")
92+
}
93+
ref.Size = sizeReader.size
94+
return ref, nil
95+
}
96+
97+
// artifactSizeReader wraps an io.Reader and keeps a running total of the bytes
// that have been read through it.
type artifactSizeReader struct {
	r    io.Reader // underlying source
	size int64     // bytes read so far
}

// Read forwards to the wrapped reader and adds the number of bytes it returned
// to the running total. The wrapped reader's result is passed through as is.
func (r *artifactSizeReader) Read(p []byte) (int, error) {
	read, err := r.r.Read(p)
	if read <= 0 {
		return read, err
	}
	r.size += int64(read)
	return read, err
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"net/url"
10+
"strings"
11+
"testing"
12+
13+
"github.com/sourcegraph/sourcegraph/lib/batches/execution"
14+
)
15+
16+
func TestBatchArtifactUploaderUploadAddsMetadata(t *testing.T) {
17+
const artifactContents = "artifact contents"
18+
19+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
20+
if r.Method != http.MethodPost {
21+
t.Fatalf("unexpected method %q", r.Method)
22+
}
23+
if r.URL.Path != "/.executors/queue/batches/artifacts/stdout" {
24+
t.Fatalf("unexpected path %q", r.URL.Path)
25+
}
26+
if got := r.Header.Get("Authorization"); got != "Bearer token" {
27+
t.Fatalf("unexpected authorization header %q", got)
28+
}
29+
if got := r.Header.Get("X-Sourcegraph-Executor-Name"); got != "executor" {
30+
t.Fatalf("unexpected executor header %q", got)
31+
}
32+
if got := r.Header.Get("X-Sourcegraph-Job-ID"); got != "42" {
33+
t.Fatalf("unexpected job ID header %q", got)
34+
}
35+
36+
body, err := io.ReadAll(r.Body)
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
if string(body) != artifactContents {
41+
t.Fatalf("unexpected body %q", string(body))
42+
}
43+
44+
w.Header().Set("Content-Type", "application/json")
45+
_ = json.NewEncoder(w).Encode(execution.Artifact{ObjectStorageKey: "key"})
46+
}))
47+
t.Cleanup(server.Close)
48+
49+
endpointURL, err := url.Parse(server.URL)
50+
if err != nil {
51+
t.Fatal(err)
52+
}
53+
uploader := &batchArtifactUploader{
54+
endpointURL: endpointURL,
55+
jobID: 42,
56+
jobToken: "token",
57+
executorName: "executor",
58+
client: server.Client(),
59+
}
60+
61+
ref, err := uploader.Upload(context.Background(), "stdout", strings.NewReader(artifactContents))
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
if ref.ObjectStorageKey != "key" {
67+
t.Fatalf("unexpected object storage key %q", ref.ObjectStorageKey)
68+
}
69+
if ref.Size != int64(len(artifactContents)) {
70+
t.Fatalf("unexpected size %d", ref.Size)
71+
}
72+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package executor
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"math"
9+
"os"
10+
11+
"github.com/sourcegraph/sourcegraph/lib/batches/execution"
12+
"github.com/sourcegraph/sourcegraph/lib/errors"
13+
)
14+
15+
// maxInlineArtifactSize is the largest int64 value.
// NOTE(review): presumably used as the threshold when artifact uploads are
// disabled, so that all output stays inline — confirm against the caller.
const maxInlineArtifactSize = math.MaxInt64
16+
17+
type ArtifactUploader interface {
18+
Upload(ctx context.Context, artifactKey string, r io.Reader) (execution.Artifact, error)
19+
}
20+
21+
type stepOutput struct {
22+
stdout *artifactOutput
23+
stderr *artifactOutput
24+
}
25+
26+
func newStepOutput(dir string, threshold int64) (*stepOutput, error) {
27+
stdout, err := newArtifactOutput(dir, "stdout-*", threshold)
28+
if err != nil {
29+
return nil, err
30+
}
31+
stderr, err := newArtifactOutput(dir, "stderr-*", threshold)
32+
if err != nil {
33+
stdout.cleanup()
34+
return nil, err
35+
}
36+
return &stepOutput{stdout: stdout, stderr: stderr}, nil
37+
}
38+
39+
func (o *stepOutput) cleanup() {
40+
o.stdout.cleanup()
41+
o.stderr.cleanup()
42+
}
43+
44+
// artifactOutput captures one output stream in a temporary file and, while the
// stream is small enough, in an in-memory buffer as well.
type artifactOutput struct {
	// file receives every byte written; nil once cleanup has run.
	file *os.File
	// buf mirrors the written bytes while size stays within threshold.
	buf bytes.Buffer
	// size is the number of bytes written so far; threshold is the largest
	// size for which the content is still offered inline.
	size, threshold int64
}
50+
51+
func newArtifactOutput(dir, pattern string, threshold int64) (*artifactOutput, error) {
52+
file, err := os.CreateTemp(dir, pattern)
53+
if err != nil {
54+
return nil, errors.Wrap(err, "creating artifact output file")
55+
}
56+
return &artifactOutput{file: file, threshold: threshold}, nil
57+
}
58+
59+
// writer exposes the output as an io.Writer; writes go through Write.
func (o *artifactOutput) writer() io.Writer {
	return o
}
60+
61+
func (o *artifactOutput) Write(p []byte) (int, error) {
62+
n, err := o.file.Write(p)
63+
o.size += int64(n)
64+
if o.size <= o.threshold {
65+
_, _ = o.buf.Write(p[:n])
66+
}
67+
return n, err
68+
}
69+
70+
func (o *artifactOutput) inline() string {
71+
if o.size > o.threshold {
72+
return ""
73+
}
74+
return o.buf.String()
75+
}
76+
77+
func (o *artifactOutput) shouldUpload(threshold int64) bool {
78+
return o.size > threshold
79+
}
80+
81+
func (o *artifactOutput) reader() (io.Reader, error) {
82+
if _, err := o.file.Seek(0, io.SeekStart); err != nil {
83+
return nil, errors.Wrap(err, "rewinding artifact output file")
84+
}
85+
return o.file, nil
86+
}
87+
88+
func (o *artifactOutput) cleanup() {
89+
if o.file == nil {
90+
return
91+
}
92+
name := o.file.Name()
93+
_ = o.file.Close()
94+
_ = os.Remove(name)
95+
o.file = nil
96+
}
97+
98+
func uploadStepArtifacts(ctx context.Context, uploader ArtifactUploader, threshold int64, stepIndex int, result *execution.AfterStepResult, output *stepOutput) error {
99+
defer output.cleanup()
100+
result.Artifacts = make(map[string]execution.Artifact)
101+
102+
if output.stdout.shouldUpload(threshold) {
103+
ref, err := uploadArtifactOutput(ctx, uploader, artifactKey(stepIndex, execution.ArtifactStdout), output.stdout)
104+
if err != nil {
105+
return err
106+
}
107+
result.Artifacts[execution.ArtifactStdout] = ref
108+
}
109+
110+
if output.stderr.shouldUpload(threshold) {
111+
ref, err := uploadArtifactOutput(ctx, uploader, artifactKey(stepIndex, execution.ArtifactStderr), output.stderr)
112+
if err != nil {
113+
return err
114+
}
115+
result.Artifacts[execution.ArtifactStderr] = ref
116+
}
117+
118+
if int64(len(result.Diff)) > threshold {
119+
ref, err := uploader.Upload(ctx, artifactKey(stepIndex, execution.ArtifactDiff), bytes.NewReader(result.Diff))
120+
if err != nil {
121+
return errors.Wrap(err, "uploading diff artifact")
122+
}
123+
result.Artifacts[execution.ArtifactDiff] = ref
124+
result.Diff = nil
125+
}
126+
127+
return nil
128+
}
129+
130+
func uploadArtifactOutput(ctx context.Context, uploader ArtifactUploader, key string, output *artifactOutput) (execution.Artifact, error) {
131+
reader, err := output.reader()
132+
if err != nil {
133+
return execution.Artifact{}, err
134+
}
135+
ref, err := uploader.Upload(ctx, key, reader)
136+
if err != nil {
137+
return execution.Artifact{}, errors.Wrapf(err, "uploading %s artifact", key)
138+
}
139+
return ref, nil
140+
}
141+
142+
// artifactKey builds the key for a step artifact in the form
// "step-<stepIndex>-<name>".
func artifactKey(stepIndex int, name string) string {
	return "step-" + fmt.Sprint(stepIndex) + "-" + name
}

0 commit comments

Comments
 (0)