Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
.PHONY: install-skills docs-gateway docs-gateway-check

GATEWAY_DOCS_GENERATOR := go run -tags gatewaydocgen ./scripts/generate_gateway_rpc_examples.go

install-skills:
@./scripts/install_skills.sh

docs-gateway:
@go run -tags gatewaydocgen ./scripts/generate_gateway_rpc_examples.go
@$(GATEWAY_DOCS_GENERATOR)

docs-gateway-check:
@go run -tags gatewaydocgen ./scripts/generate_gateway_rpc_examples.go
@$(GATEWAY_DOCS_GENERATOR)
@go run ./scripts/check_gateway_docs
@git diff --exit-code -- docs/generated/gateway-rpc-examples.json
221 changes: 127 additions & 94 deletions internal/gateway/adapters/urlscheme/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,17 @@ func (d *Dispatcher) Dispatch(ctx context.Context, request DispatchRequest) (Dis
if err != nil {
return DispatchResult{}, err
}
if strings.TrimSpace(rpcResponse.JSONRPC) != protocol.JSONRPCVersion {
return DispatchResult{}, newDispatchError(
ErrorCodeUnexpectedResponse,
"unexpected response jsonrpc version",
)
}
if !rawJSONMessageEqual(rpcResponse.ID, rpcRequest.ID) {
return DispatchResult{}, newDispatchError(ErrorCodeUnexpectedResponse, "rpc correlation failed: id mismatch")
}
if rpcResponse.Error != nil && rpcResponse.Result != nil {
return DispatchResult{}, newDispatchError(
ErrorCodeUnexpectedResponse,
"unexpected response payload: both result and error are present",
)
}
if rpcResponse.Error != nil {
return DispatchResult{}, toDispatchErrorFromJSONRPC(rpcResponse.Error)
}
if rpcResponse.Result == nil {
return DispatchResult{}, newDispatchError(ErrorCodeUnexpectedResponse, "gateway response missing result payload")
}

responseFrame, err := decodeResponseFrameResult(rpcResponse.Result)
responseFrame, err := validateRPCFrameResponse(
rpcResponse,
rpcRequest.ID,
"unexpected response jsonrpc version",
"rpc correlation failed: id mismatch",
"unexpected response payload: both result and error are present",
"gateway response missing result payload",
"decode response frame: %v",
)
if err != nil {
return DispatchResult{}, newDispatchError(ErrorCodeUnexpectedResponse, fmt.Sprintf("decode response frame: %v", err))
return DispatchResult{}, err
}
if responseFrame.Action != requestFrame.Action || responseFrame.RequestID != requestFrame.RequestID {
return DispatchResult{}, newDispatchError(
Expand Down Expand Up @@ -238,21 +224,17 @@ func (d *Dispatcher) authenticate(ctx context.Context, conn net.Conn, token stri
if err != nil {
return err
}
if strings.TrimSpace(authResponse.JSONRPC) != protocol.JSONRPCVersion {
return newDispatchError(ErrorCodeUnexpectedResponse, "unexpected auth response jsonrpc version")
}
if !rawJSONMessageEqual(authResponse.ID, authRequest.ID) {
return newDispatchError(ErrorCodeUnexpectedResponse, "rpc correlation failed: auth id mismatch")
}
if authResponse.Error != nil {
return toDispatchErrorFromJSONRPC(authResponse.Error)
}
if authResponse.Result == nil {
return newDispatchError(ErrorCodeUnexpectedResponse, "gateway auth response missing result payload")
}
frame, err := decodeResponseFrameResult(authResponse.Result)
frame, err := validateRPCFrameResponse(
authResponse,
authRequest.ID,
"unexpected auth response jsonrpc version",
"rpc correlation failed: auth id mismatch",
"unexpected response payload: both result and error are present",
"gateway auth response missing result payload",
"decode auth response frame: %v",
)
if err != nil {
return newDispatchError(ErrorCodeUnexpectedResponse, fmt.Sprintf("decode auth response frame: %v", err))
return err
}
if frame.Type != gateway.FrameTypeAck || frame.Action != gateway.FrameActionAuthenticate || frame.RequestID != authRequestID {
return newDispatchError(ErrorCodeUnexpectedResponse, "unexpected auth response frame")
Expand Down Expand Up @@ -347,76 +329,79 @@ func (d *Dispatcher) launchGateway(ctx context.Context, listenAddress string, re

spec, err := resolveLaunchSpecFn()
if err != nil {
d.emitLaunchDecisionLog(launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: "launch_failed",
GatewayCode: ErrorCodeGatewayUnavailable,
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
Message: err.Error(),
})
d.emitLaunchFailureLog(requestID, listenAddress, authToken, launcher.LaunchSpec{}, err)
return err
}

d.emitLaunchDecisionLog(launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: "launch_attempt",
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
LaunchMode: spec.LaunchMode,
ResolvedExec: spec.Executable,
})
d.emitLaunchDecisionLog(newLaunchDecisionLogEntry(
requestID,
listenAddress,
authToken,
"launch_attempt",
"",
spec,
"",
))
launchSpec := spec
launchSpec.Args = buildGatewayLaunchArgs(spec.Args, listenAddress)
if err := startGatewayFn(launchSpec); err != nil {
d.emitLaunchDecisionLog(launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: "launch_failed",
GatewayCode: ErrorCodeGatewayUnavailable,
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
LaunchMode: spec.LaunchMode,
ResolvedExec: spec.Executable,
Message: err.Error(),
})
d.emitLaunchFailureLog(requestID, listenAddress, authToken, spec, err)
return err
}

if err := d.waitGatewayReady(ctx, listenAddress); err != nil {
d.emitLaunchDecisionLog(launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: "launch_failed",
GatewayCode: ErrorCodeGatewayUnavailable,
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
LaunchMode: spec.LaunchMode,
ResolvedExec: spec.Executable,
Message: err.Error(),
})
d.emitLaunchFailureLog(requestID, listenAddress, authToken, spec, err)
return err
}

d.emitLaunchDecisionLog(launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: "launch_ready",
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
LaunchMode: spec.LaunchMode,
ResolvedExec: spec.Executable,
})
d.emitLaunchDecisionLog(newLaunchDecisionLogEntry(
requestID,
listenAddress,
authToken,
"launch_ready",
"",
spec,
"",
))
return nil
}

// validateRPCFrameResponse 统一校验 JSON-RPC 基础字段并解码结果帧,保持调度与鉴权分支一致。
func validateRPCFrameResponse(
response protocol.JSONRPCResponse,
expectedID json.RawMessage,
versionMismatchMessage string,
idMismatchMessage string,
dualPayloadMessage string,
missingResultMessage string,
decodeFrameMessageFormat string,
) (gateway.MessageFrame, error) {
if strings.TrimSpace(response.JSONRPC) != protocol.JSONRPCVersion {
return gateway.MessageFrame{}, newDispatchError(ErrorCodeUnexpectedResponse, versionMismatchMessage)
}
if !rawJSONMessageEqual(response.ID, expectedID) {
return gateway.MessageFrame{}, newDispatchError(ErrorCodeUnexpectedResponse, idMismatchMessage)
}
if response.Error != nil && response.Result != nil {
return gateway.MessageFrame{}, newDispatchError(ErrorCodeUnexpectedResponse, dualPayloadMessage)
}
if response.Error != nil {
return gateway.MessageFrame{}, toDispatchErrorFromJSONRPC(response.Error)
}
if response.Result == nil {
return gateway.MessageFrame{}, newDispatchError(ErrorCodeUnexpectedResponse, missingResultMessage)
}

frame, err := decodeResponseFrameResult(response.Result)
if err != nil {
return gateway.MessageFrame{}, newDispatchError(
ErrorCodeUnexpectedResponse,
fmt.Sprintf(decodeFrameMessageFormat, err),
)
}
return frame, nil
}

// buildGatewayLaunchArgs 构造自动拉起参数,确保子进程监听地址与调度重拨地址一致。
func buildGatewayLaunchArgs(baseArgs []string, listenAddress string) []string {
args := append([]string(nil), baseArgs...)
Expand All @@ -438,12 +423,17 @@ func (d *Dispatcher) waitGatewayReady(ctx context.Context, listenAddress string)
sleepFn = time.Sleep
}

deadline := nowFn().Add(defaultGatewayLaunchTimeout)
startTime := nowFn()
deadline := startTime.Add(defaultGatewayLaunchTimeout)
if ctx != nil {
if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) {
deadline = ctxDeadline
}
}
effectiveTimeout := deadline.Sub(startTime)
if effectiveTimeout < 0 {
effectiveTimeout = 0
}

for {
if err := ensureDispatchContextActive(ctx); err != nil {
Expand All @@ -455,7 +445,7 @@ func (d *Dispatcher) waitGatewayReady(ctx context.Context, listenAddress string)
return nil
}
if !nowFn().Before(deadline) {
return fmt.Errorf("gateway did not become reachable within %s", defaultGatewayLaunchTimeout)
return fmt.Errorf("gateway did not become reachable within %s", effectiveTimeout)
}
sleepFn(defaultGatewayLaunchRetryInterval)
}
Expand All @@ -474,6 +464,49 @@ func (d *Dispatcher) emitLaunchDecisionLog(entry launchDecisionLogEntry) {
d.logger.Print(string(raw))
}

// newLaunchDecisionLogEntry 构造统一的网关拉起日志字段,避免各分支重复拼装。
func newLaunchDecisionLogEntry(
requestID string,
listenAddress string,
authToken string,
status string,
gatewayCode string,
spec launcher.LaunchSpec,
message string,
) launchDecisionLogEntry {
return launchDecisionLogEntry{
RequestID: requestID,
Method: string(protocol.MethodWakeOpenURL),
Source: "url-dispatch",
Status: status,
GatewayCode: gatewayCode,
ListenAddress: listenAddress,
AuthMode: resolveAuthMode(authToken),
LaunchMode: spec.LaunchMode,
ResolvedExec: spec.Executable,
Message: message,
}
}

// emitLaunchFailureLog 输出统一的启动失败日志,保持失败分支字段稳定。
func (d *Dispatcher) emitLaunchFailureLog(
requestID string,
listenAddress string,
authToken string,
spec launcher.LaunchSpec,
err error,
) {
d.emitLaunchDecisionLog(newLaunchDecisionLogEntry(
requestID,
listenAddress,
authToken,
"launch_failed",
ErrorCodeGatewayUnavailable,
spec,
err.Error(),
))
}

// resolveAuthMode 归一化调度鉴权模式,便于日志与兼容性测试稳定断言。
func resolveAuthMode(authToken string) string {
if strings.TrimSpace(authToken) == "" {
Expand Down
Loading
Loading