Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/scheduled_tasks.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"sessionId":"ba66ac51-bf76-4fbf-9092-e8ebc75117eb","pid":1532766,"procStart":"15266838","acquiredAt":1781242320901}
96 changes: 96 additions & 0 deletions internal/indexer/temporal_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,99 @@ func setup(w Worker) {
assert.Equal(t, child.ID, stubCall.To)
assert.Equal(t, "workflow", stubCall.Meta["temporal_kind"])
}

// TestTemporalE2E_GoEnvDefaultActivity exercises the env-var-with-literal
// -default dispatch name: the workflow names its activity through a
// variable read from os.Getenv with a literal fallback. The pipeline must
// land the call on the default activity but at the speculative tier.
func TestTemporalE2E_GoEnvDefaultActivity(t *testing.T) {
dir := t.TempDir()

writeFile(t, filepath.Join(dir, "workflow.go"), `package wf

import (
"cmp"
"os"

"go.temporal.io/sdk/workflow"
)

func OrderWorkflow(ctx workflow.Context, id string) error {
actName := cmp.Or(os.Getenv("CHARGE_ACTIVITY"), "ChargeCard")
return workflow.ExecuteActivity(ctx, actName, id).Get(ctx, nil)
}
`)
writeFile(t, filepath.Join(dir, "activity.go"), `package wf

import "context"

func ChargeCard(ctx context.Context, id string) error {
return nil
}
`)
writeFile(t, filepath.Join(dir, "main.go"), `package wf

func setupWorker(w Worker) {
w.RegisterWorkflow(OrderWorkflow)
w.RegisterActivity(ChargeCard)
}
`)

g := graph.New()
idx := newTestIndexer(g)
_, err := idx.Index(dir)
require.NoError(t, err)

wf := g.FindNodesByName("OrderWorkflow")[0]
activity := g.FindNodesByName("ChargeCard")[0]

var stubCall *graph.Edge
for _, e := range g.GetOutEdges(wf.ID) {
if e != nil && e.Meta != nil && e.Meta["via"] == "temporal.stub" {
stubCall = e
break
}
}
require.NotNil(t, stubCall, "workflow must have an outbound temporal.stub edge")
assert.Equal(t, activity.ID, stubCall.To,
"env-default dispatch must land on the default activity")
assert.Equal(t, "env_default", stubCall.Meta["temporal_name_origin"])
assert.Equal(t, graph.OriginSpeculative, stubCall.Origin,
"env-default resolution must be speculative")
assert.Equal(t, true, stubCall.Meta[graph.MetaSpeculative],
"env-default edge must be hidden-by-default")
}

// TestTemporalE2E_GoQueryHandler exercises in-workflow handler detection:
// a workflow.SetQueryHandler call must surface as a via=temporal.handler
// edge from the enclosing workflow carrying its kind + name.
func TestTemporalE2E_GoQueryHandler(t *testing.T) {
dir := t.TempDir()

writeFile(t, filepath.Join(dir, "workflow.go"), `package wf

import "go.temporal.io/sdk/workflow"

func StatusWorkflow(ctx workflow.Context) error {
workflow.SetQueryHandler(ctx, "status", func() (string, error) { return "ok", nil })
return nil
}
`)

g := graph.New()
idx := newTestIndexer(g)
_, err := idx.Index(dir)
require.NoError(t, err)

wf := g.FindNodesByName("StatusWorkflow")[0]
var handler *graph.Edge
for _, e := range g.GetOutEdges(wf.ID) {
if e != nil && e.Meta != nil && e.Meta["via"] == "temporal.handler" {
handler = e
break
}
}
require.NotNil(t, handler, "workflow must have an outbound temporal.handler edge")
assert.Equal(t, "query", handler.Meta["temporal_kind"])
assert.Equal(t, "status", handler.Meta["temporal_name"])
}
207 changes: 207 additions & 0 deletions internal/parser/languages/go_temporal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,210 @@ func setup(w Worker) {
require.Len(t, stubs, 1)
require.Len(t, registers, 2)
}

// --- In-workflow handler declarations (query / signal / update) -----
//
// These mirror the Java SDK's @QueryMethod / @SignalMethod /
// @UpdateMethod annotations: from inside a workflow body the Go SDK
// declares the named query / signal / update channels the workflow
// serves. We surface each as a `via=temporal.handler` EdgeCalls edge
// carrying temporal_kind + temporal_name so the graph records, per
// workflow, the named handlers it exposes — symmetric with the Java
// side's per-method annotation edges.

func TestGoTemporal_SetQueryHandler(t *testing.T) {
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx workflow.Context) error {
workflow.SetQueryHandler(ctx, "status", func() (string, error) { return "ok", nil })
return nil
}
`)
edges := temporalEdgesByVia(fix, "temporal.handler")
require.Len(t, edges, 1)
e := edges[0]
assert.Equal(t, "query", e.Meta["temporal_kind"])
assert.Equal(t, "status", e.Meta["temporal_name"])
assert.Equal(t, "pkg/foo.go::OrderWorkflow", e.From,
"handler edge must originate from the enclosing workflow function")
}

func TestGoTemporal_GetSignalChannel(t *testing.T) {
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx workflow.Context) error {
ch := workflow.GetSignalChannel(ctx, "cancel")
_ = ch
return nil
}
`)
edges := temporalEdgesByVia(fix, "temporal.handler")
require.Len(t, edges, 1)
assert.Equal(t, "signal", edges[0].Meta["temporal_kind"])
assert.Equal(t, "cancel", edges[0].Meta["temporal_name"])
}

func TestGoTemporal_SetUpdateHandler(t *testing.T) {
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx workflow.Context) error {
workflow.SetUpdateHandler(ctx, "retry", func() error { return nil })
return nil
}
`)
edges := temporalEdgesByVia(fix, "temporal.handler")
require.Len(t, edges, 1)
assert.Equal(t, "update", edges[0].Meta["temporal_kind"])
assert.Equal(t, "retry", edges[0].Meta["temporal_name"])
}

func TestGoTemporal_SetUpdateHandlerWithOptions(t *testing.T) {
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx workflow.Context) error {
workflow.SetUpdateHandlerWithOptions(ctx, "retry", func() error { return nil }, workflow.UpdateHandlerOptions{})
return nil
}
`)
edges := temporalEdgesByVia(fix, "temporal.handler")
require.Len(t, edges, 1)
assert.Equal(t, "update", edges[0].Meta["temporal_kind"])
assert.Equal(t, "retry", edges[0].Meta["temporal_name"])
}

func TestGoTemporal_HandlerNonLiteralNameUndetected(t *testing.T) {
// Query / signal / update names are matched by string at runtime;
// a non-literal name (variable / selector) can't be pinned here, so
// no handler edge is emitted — high-precision, no guessing.
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx workflow.Context, q string) error {
workflow.SetQueryHandler(ctx, q, func() (string, error) { return "ok", nil })
return nil
}
`)
assert.Empty(t, temporalEdgesByVia(fix, "temporal.handler"),
"non-literal handler name must not be detected")
}

func TestGoTemporal_HandlerAliasedImportNotDetected(t *testing.T) {
// Consistent with the dispatch detector: only the canonical
// "workflow" receiver alias is recognised.
fix := runGoExtract(t, `package wf

import wf "go.temporal.io/sdk/workflow"

func OrderWorkflow(ctx wf.Context) error {
wf.SetQueryHandler(ctx, "status", func() (string, error) { return "ok", nil })
return nil
}
`)
assert.Empty(t, temporalEdgesByVia(fix, "temporal.handler"))
}

// --- Dispatch name from an env-var-with-literal-default variable -----
//
// When the activity / workflow name is a local variable read from an
// env var with a literal fallback, resolve to the literal default and
// flag the stub edge `temporal_name_origin=env_default` so the resolver
// lands it at the speculative tier (the runtime env override may differ
// from the default). Anchored on a literal os.Getenv / os.LookupEnv read
// so the value is provably env-sourced — no general data-flow guessing.

func TestGoTemporal_ExecuteActivity_EnvDefault_CmpOr(t *testing.T) {
fix := runGoExtract(t, `package wf

import (
"cmp"
"os"
"go.temporal.io/sdk/workflow"
)

func WF(ctx workflow.Context) {
actName := cmp.Or(os.Getenv("CHARGE_ACTIVITY"), "ChargeCard")
workflow.ExecuteActivity(ctx, actName, 1)
}
`)
edges := temporalEdgesByVia(fix, "temporal.stub")
require.Len(t, edges, 1)
e := edges[0]
assert.Equal(t, "unresolved::temporal::activity::ChargeCard", e.To,
"name must resolve to the literal default, not the variable identifier")
assert.Equal(t, "ChargeCard", e.Meta["temporal_name"])
assert.Equal(t, "env_default", e.Meta["temporal_name_origin"])
}

func TestGoTemporal_ExecuteActivity_EnvDefault_IfEmpty(t *testing.T) {
fix := runGoExtract(t, `package wf

import (
"os"
"go.temporal.io/sdk/workflow"
)

func WF(ctx workflow.Context) {
name := os.Getenv("CHARGE_ACTIVITY")
if name == "" {
name = "ChargeCard"
}
workflow.ExecuteActivity(ctx, name, 1)
}
`)
edges := temporalEdgesByVia(fix, "temporal.stub")
require.Len(t, edges, 1)
assert.Equal(t, "unresolved::temporal::activity::ChargeCard", edges[0].To)
assert.Equal(t, "ChargeCard", edges[0].Meta["temporal_name"])
assert.Equal(t, "env_default", edges[0].Meta["temporal_name_origin"])
}

func TestGoTemporal_ExecuteActivity_PlainVarNotEnvDefault(t *testing.T) {
// A variable NOT sourced from an env read keeps the existing
// behaviour (trailing identifier as the name) and carries no
// env_default flag — we don't guess at arbitrary variables.
fix := runGoExtract(t, `package wf

import "go.temporal.io/sdk/workflow"

func WF(ctx workflow.Context, picked string) {
actName := picked
workflow.ExecuteActivity(ctx, actName, 1)
}
`)
edges := temporalEdgesByVia(fix, "temporal.stub")
require.Len(t, edges, 1)
assert.Equal(t, "actName", edges[0].Meta["temporal_name"])
_, flagged := edges[0].Meta["temporal_name_origin"]
assert.False(t, flagged, "plain variable must not be flagged env_default")
}

func TestGoTemporal_ExecuteActivity_EnvReadNoLiteralDefault(t *testing.T) {
// os.Getenv with no literal fallback can't be pinned to a name —
// keep the variable identifier, no env_default flag.
fix := runGoExtract(t, `package wf

import (
"os"
"go.temporal.io/sdk/workflow"
)

func WF(ctx workflow.Context) {
name := os.Getenv("CHARGE_ACTIVITY")
workflow.ExecuteActivity(ctx, name, 1)
}
`)
edges := temporalEdgesByVia(fix, "temporal.stub")
require.Len(t, edges, 1)
assert.Equal(t, "name", edges[0].Meta["temporal_name"])
_, flagged := edges[0].Meta["temporal_name_origin"]
assert.False(t, flagged)
}
36 changes: 35 additions & 1 deletion internal/parser/languages/golang.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ type goDeferredCall struct {
tempKind string
tempName string
tempLocal bool
// tempHandlerKind is "query" / "signal" / "update" when this call
// is a `workflow.SetQueryHandler` / `GetSignalChannel` /
// `SetUpdateHandler` in-workflow handler declaration; tempName then
// carries the handler's string name. `via=temporal.handler` meta is
// stamped on the emitted edge in the call post-pass below.
tempHandlerKind string
// tempEnvDefault is set when tempName was resolved from a bare
// variable read from an env var with a literal default (e.g.
// `cmp.Or(os.Getenv("K"), "Default")`). The stub edge is then tagged
// `temporal_name_origin=env_default` so the resolver lands it at the
// speculative tier — the runtime env value may differ from the default.
tempEnvDefault bool
}

type goDeferredTypeRef struct {
Expand Down Expand Up @@ -334,10 +346,20 @@ func (e *GoExtractor) Extract(filePath string, src []byte) (*parser.ExtractionRe
// Temporal workflow → activity dispatch:
// `workflow.ExecuteActivity(ctx, X, ...)` etc.
if kind, local, ok := goTemporalDispatchKind(receiver, method); ok {
if name := goTemporalDispatchName(expr.Node, src); name != "" {
argNode := goTemporalDispatchArg(expr.Node)
if name := goTemporalNameFromExpr(argNode, src); name != "" {
dc.tempKind = kind
dc.tempName = name
dc.tempLocal = local
// Env-default refinement: when the name is a bare local
// variable, try to resolve it to an env-var-with-literal
// -default so the dispatch lands on the default activity.
if argNode != nil && argNode.Type() == "identifier" {
if def, ok := goTemporalEnvDefaultName(expr.Node, name, src); ok {
dc.tempName = def
dc.tempEnvDefault = true
}
}
}
} else if kind, _, ok := goTemporalRegisterKind(method); ok {
// Temporal worker registration:
Expand All @@ -346,6 +368,13 @@ func (e *GoExtractor) Extract(filePath string, src []byte) (*parser.ExtractionRe
dc.tempKind = "register_" + kind
dc.tempName = name
}
} else if hkind, ok := goTemporalHandlerKind(receiver, method); ok {
// Temporal in-workflow handler declaration:
// `workflow.SetQueryHandler(ctx, "name", fn)` etc.
if name := goTemporalHandlerName(expr.Node, src); name != "" {
dc.tempHandlerKind = hkind
dc.tempName = name
}
}
calls = append(calls, dc)
if name, ok := detectGoLogEvent(expr.Node, method, src); ok {
Expand Down Expand Up @@ -650,6 +679,9 @@ func (e *GoExtractor) Extract(filePath string, src []byte) (*parser.ExtractionRe
if c.tempLocal {
meta["temporal_local"] = true
}
if c.tempEnvDefault {
meta["temporal_name_origin"] = "env_default"
}
result.Edges = append(result.Edges, &graph.Edge{
From: callerID, To: target,
Kind: graph.EdgeCalls, FilePath: filePath, Line: c.line,
Expand All @@ -668,6 +700,7 @@ func (e *GoExtractor) Extract(filePath string, src []byte) (*parser.ExtractionRe
}
applyGoGRPCRegisterMeta(edge, c, src, tenv)
applyGoTemporalRegisterMeta(edge, c)
applyGoTemporalHandlerMeta(edge, c)
result.Edges = append(result.Edges, edge)
emitGoSpawnEdge(c, callerID, target, filePath, result)
continue
Expand All @@ -680,6 +713,7 @@ func (e *GoExtractor) Extract(filePath string, src []byte) (*parser.ExtractionRe
}
applyGoGRPCRegisterMeta(edge, c, src, tenv)
applyGoTemporalRegisterMeta(edge, c)
applyGoTemporalHandlerMeta(edge, c)
result.Edges = append(result.Edges, edge)
emitGoSpawnEdge(c, callerID, target, filePath, result)
continue
Expand Down
Loading