Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.25'
go-version-file: go.mod

- name: Verify go mod tidy
run: |
Expand All @@ -61,6 +61,11 @@ jobs:
- name: Test
run: go test -v ./...

- name: Build (harness)
run: go build -tags harness -v ./...

- name: Test (harness)
run: go test -tags harness -v ./...


- name: Check for binary files in PR
Expand Down
2 changes: 1 addition & 1 deletion cmd/ax/internal/cliutil/cliutil_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewControllerFromConfig(ctx context.Context, cfg *Config) (*controller2.Con
return nil, fmt.Errorf("custom substrate harnesses require AX_SUBSTRATE=1")
}
for _, sc := range cfg.Harnesses.Substrate {
h, err := sc.NewHarness(endpoint)
h, err := sc.NewHarness("")
if err != nil {
return nil, fmt.Errorf("substrate harness %q: %w", sc.ID, err)
}
Expand Down
184 changes: 22 additions & 162 deletions internal/harness/antigravity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,204 +16,64 @@ package harness

import (
"context"
"net"
"strings"
"sync"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/google/ax/proto"
)

type mockHandler struct {
mu sync.Mutex
messages []*proto.Message
complete bool
err error
}

func (h *mockHandler) OnMessage(ctx context.Context, execID string, msg *proto.Message) error {
h.mu.Lock()
defer h.mu.Unlock()
h.messages = append(h.messages, msg)
return h.err
}

func (h *mockHandler) OnComplete(ctx context.Context, execID string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.complete = true
return nil
}

// mockHarnessServer implements proto.HarnessServiceServer for testing.
type mockHarnessServer struct {
proto.UnimplementedHarnessServiceServer
failConnect bool
}

func (s *mockHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error {
if s.failConnect {
return status.Error(codes.Internal, "internal mock server crash")
}

// Read the initiating HarnessRequest{start}.
req, err := stream.Recv()
if err != nil {
return err
}

// 1. Verify conversation details
if req.GetConversationId() != "conv-test" {
return status.Error(codes.InvalidArgument, "invalid conversation_id")
}

// 2. Stream thought frame
tMsg := &proto.Message{
Role: "model",
Content: &proto.Content{
Type: &proto.Content_Thought{
Thought: &proto.ThoughtContent{
Summary: []*proto.ThoughtSummaryContent{
{
Type: &proto.ThoughtSummaryContent_Text{
Text: &proto.TextContent{Text: "Analyzing"},
},
},
},
},
},
},
}
if err := stream.Send(&proto.HarnessResponse{
ConversationId: req.GetConversationId(),
Type: &proto.HarnessResponse_Outputs{
Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{tMsg}},
},
}); err != nil {
return err
}

// 3. Stream text frame
txtMsg := &proto.Message{
Role: "assistant",
Content: &proto.Content{
Type: &proto.Content_Text{
Text: &proto.TextContent{Text: "Hello world"},
},
},
}
if err := stream.Send(&proto.HarnessResponse{
ConversationId: req.GetConversationId(),
Type: &proto.HarnessResponse_Outputs{
Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{txtMsg}},
},
}); err != nil {
return err
}

// 4. Stream end frame
return stream.Send(&proto.HarnessResponse{
ConversationId: req.GetConversationId(),
Type: &proto.HarnessResponse_End{
End: &proto.HarnessEnd{State: proto.State_STATE_COMPLETED},
},
})
}

func TestAntigravityHarness_Run_Success(t *testing.T) {
// Spin up a local TCP listener
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
srv := &mockHarnessServer{
outputs: []*proto.Message{thoughtText("Analyzing"), assistantText("Hello world")},
}
defer lis.Close()

// Initialize and start local gRPC server
grpcServer := grpc.NewServer()
mockServer := &mockHarnessServer{}
proto.RegisterHarnessServiceServer(grpcServer, mockServer)
harnessClient := NewAntigravityHarness(startHarnessServer(t, srv))

go func() {
if err := grpcServer.Serve(lis); err != nil && err != grpc.ErrServerStopped {
t.Errorf("Serve failed: %v", err)
}
}()
defer grpcServer.Stop()

harnessClient := NewAntigravityHarness(lis.Addr().String())
exec, err := harnessClient.Start(context.Background(), "conv-test")
if err != nil {
t.Fatalf("failed to start execution: %v", err)
}
defer exec.Close(context.Background())

msg := &proto.Message{
Role: "user",
Content: &proto.Content{
Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}},
},
}
if err := exec.Queue(context.Background(), msg); err != nil {
if err := exec.Queue(context.Background(), userText("Hi")); err != nil {
t.Fatalf("failed to queue message: %v", err)
}

handler := &mockHandler{}
err = exec.Run(context.Background(), handler)
if err != nil {
if err := exec.Run(context.Background(), handler); err != nil {
t.Fatalf("Run failed: %v", err)
}

handler.mu.Lock()
defer handler.mu.Unlock()

if !handler.complete {
if !handler.isDone() {
t.Error("expected OnComplete to be called")
}
if len(handler.messages) != 2 {
t.Fatalf("expected 2 messages, got %d", len(handler.messages))
msgs := handler.collected()
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if got := msgs[0].GetContent().GetThought().GetSummary()[0].GetText().GetText(); got != "Analyzing" {
t.Errorf("expected 'Analyzing', got %q", got)
}
if handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText() != "Analyzing" {
t.Errorf("expected 'Analyzing', got %q", handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText())
if got := msgs[1].GetContent().GetText().GetText(); got != "Hello world" {
t.Errorf("expected 'Hello world', got %q", got)
}
if handler.messages[1].GetContent().GetText().GetText() != "Hello world" {
t.Errorf("expected 'Hello world', got %q", handler.messages[1].GetContent().GetText().GetText())
// The harness propagated the conversation id to the server.
if convID, _, _ := srv.received(); convID != "conv-test" {
t.Errorf("server got convID=%q, want conv-test", convID)
}
}

func TestAntigravityHarness_Run_ErrorFrame(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()
srv := &mockHarnessServer{failConnect: true, errMessage: "internal mock server crash"}
harnessClient := NewAntigravityHarness(startHarnessServer(t, srv))

grpcServer := grpc.NewServer()
mockServer := &mockHarnessServer{failConnect: true}
proto.RegisterHarnessServiceServer(grpcServer, mockServer)

go func() {
_ = grpcServer.Serve(lis)
}()
defer grpcServer.Stop()

harnessClient := NewAntigravityHarness(lis.Addr().String())
exec, _ := harnessClient.Start(context.Background(), "conv-test")
defer exec.Close(context.Background())

msg := &proto.Message{
Role: "user",
Content: &proto.Content{
Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}},
},
if err := exec.Queue(context.Background(), userText("Hi")); err != nil {
t.Fatalf("failed to queue message: %v", err)
}
_ = exec.Queue(context.Background(), msg)

handler := &mockHandler{}
err = exec.Run(context.Background(), handler)
err := exec.Run(context.Background(), &mockHandler{})
if err == nil {
t.Fatal("expected error from Run(), got nil")
}
Expand Down
Loading
Loading