Redesign sati as plain Java client for v3 protocol #42
Remove all Micronaut dependencies. The library is now plain Java 21 with only gRPC, protobuf, and SLF4J.

Architecture:
- ExileClient: builder-based entry point, composes WorkStream + services
- ExileConfig: immutable connection config (certs, endpoint)
- WorkStreamClient: single bidirectional gRPC stream implementing the v3 WorkStream protocol (replaces GateClientJobQueue, GateClientEventStream, and SubmitJobResults)
- JobHandler interface: integrations implement to process jobs, return results directly (no more calling gateClient.submitJobResults internally)
- EventHandler interface: integrations implement to handle events
- Domain service clients: thin wrappers around AgentService, CallService, RecordingService, ScrubListService, ConfigService gRPC stubs

Removed:
- Micronaut framework (context, core, serde, http, validation, aot)
- GateClientAbstract, GateClientJobQueue, GateClientEventStream, GateClientPollEvents, GateClientJobStream (all replaced by WorkStreamClient)
- PluginInterface god interface (replaced by JobHandler + EventHandler)
- 20+ model wrapper classes (use proto types directly)
- Config/DiagnosticsService (replaced by ExileConfig/ConfigService)
- StructuredLogger/LogCategory (use SLF4J directly)
- Jackson, Reactor, HikariCP, Jakarta, SnakeYAML dependencies
- demo module (was a Micronaut app with copy-pasted controllers)

What integrations need to change:
- Implement JobHandler (return results) instead of PluginInterface (void)
- Implement EventHandler instead of PluginInterface event methods
- Use ExileClient.builder() instead of manually creating GateClient + Plugin + GateClientJobQueue + GateClientEventStream
- Use proto types directly instead of model wrappers
- Use domain service clients (client.agents(), client.calls()) instead of GateClient for unary RPCs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
No protobuf generated interfaces are exposed in the public API. Integrations only see plain Java records, enums, and java.time types.

Added:
- model/ package: Pool, Record, Field, Filter, Agent, Skill, CallType, AgentState, TaskData, Page<T> — all Java records
- model/event/ package: AgentCallEvent, TelephonyResultEvent, AgentResponseEvent, TransferInstanceEvent, CallRecordingEvent, TaskEvent — Java records with java.time.Duration/Instant fields
- internal/ProtoConverter: bidirectional conversion between proto types and the Java model. The only class that touches generated proto classes.
- service/ServiceFactory: creates service instances with package-private constructors so the ManagedChannel doesn't leak

Changed:
- JobHandler: methods take/return plain Java types (List<Pool>, Page<Record>, Map<String,Object>) instead of proto messages
- EventHandler: methods take event records instead of proto messages
- All 5 service clients: public methods use Java types only, constructors are package-private
- WorkStreamClient: converts between proto and model at the boundary

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
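The Page<T> type mentioned above can be sketched as a plain Java record. The field names (items, nextPageToken) and the token-based hasMore() convention are assumptions — the commit only names the type and its hasMore() method:

```java
import java.util.List;

// Minimal sketch of a Page<T> record; field names are assumed.
record Page<T>(List<T> items, String nextPageToken) {
    // Token-based pagination convention assumed: a non-empty token
    // means another page can be fetched.
    boolean hasMore() {
        return nextPageToken != null && !nextPageToken.isEmpty();
    }
}
```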
New public type StreamStatus with a Phase enum tracking the stream lifecycle: IDLE → CONNECTING → REGISTERING → ACTIVE → RECONNECTING → CLOSED.

Includes: clientId, connectedSince, lastDisconnect, lastError, inflight count, completedTotal, failedTotal, reconnectAttempts.

Accessible via ExileClient.streamStatus(), which returns a snapshot. An isHealthy() convenience method supports simple health checks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
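The lifecycle above can be sketched as an enum plus an immutable snapshot record. The meaning of isHealthy() is an assumption here (the commit doesn't define it); "the stream is ACTIVE" is the simplest plausible reading:

```java
// Sketch of the stream lifecycle phases described above.
enum Phase { IDLE, CONNECTING, REGISTERING, ACTIVE, RECONNECTING, CLOSED }

// Snapshot record; a subset of the listed fields, for illustration only.
record StreamStatus(Phase phase, int inflight, long completedTotal,
                    long failedTotal, int reconnectAttempts) {
    // Assumed semantics: healthy means the stream is currently ACTIVE.
    boolean isHealthy() {
        return phase == Phase.ACTIVE;
    }
}
```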
New module that eliminates the 400-500 line ConfigChangeWatcher copy-pasted across finvi, capone, latitude, and debtnet.

Components:
- ConfigParser: reads the Base64-encoded JSON config file (com.tcn.exiles.sati.config.cfg) and produces an ExileConfig. No Jackson dependency — minimal built-in JSON parser.
- ConfigFileWatcher: watches /workdir/config for file changes using directory-watcher. Fires Listener callbacks on create/modify/delete. Handles directory discovery and creation.
- CertificateRotator: checks certificate expiration and rotates via the gate config service. Writes the rotated cert back to the config file (triggering watcher reload).
- ExileClientManager: single-tenant lifecycle manager. Watches the config file, creates/destroys ExileClient on changes, detects org changes, schedules cert rotation. One builder replaces the entire ConfigChangeWatcher + manual bean wiring.
- MultiTenantManager: multi-tenant lifecycle manager for velosidy-like deployments. Polls a tenant provider, reconciles desired vs actual tenants, creates/destroys ExileClients.

Dependencies: sati-core + directory-watcher (single external dep).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
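The Base64 handling a parser like ConfigParser needs can be sketched with the JDK decoder alone. Falling back to the raw text when decoding fails mirrors the "raw JSON" case; the method name and fallback strategy are illustrative, not the library's actual API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: decode a config file that may be Base64-encoded JSON or raw JSON.
static String decodeConfig(String fileContents) {
    String trimmed = fileContents.strip(); // tolerate trailing newlines
    try {
        byte[] decoded = Base64.getDecoder().decode(trimmed);
        return new String(decoded, StandardCharsets.UTF_8);
    } catch (IllegalArgumentException e) {
        // Characters like '{' are invalid Base64, so the decoder throws —
        // assume the file already contains raw JSON.
        return trimmed;
    }
}
```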
The api() dependency configuration requires the java-library plugin. Both modules were missing it, causing build failure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace pre-built buf.build Maven artifacts with local code generation using the build.buf gradle plugin. Proto stubs are now generated from source at build time instead of depending on published artifacts.

Changes:
- Add build.buf plugin (v0.11.0) to core module
- Add buf.yaml, buf.gen.yaml, buf.lock for code generation config
- Copy v3 proto files into core/proto/ (source of truth)
- Pin protoc java plugin to v28.3 and grpc java to v1.68.1
- Add java_multiple_files=true to all proto files (generates top-level classes instead of nested inner classes)
- Bump protobuf-java to 4.28.3 (matches remote plugin output)
- Move grpc-netty-shaded from runtimeOnly to implementation (ChannelFactory needs compile-time access)
- Rename model.Record to model.DataRecord (avoids collision with java.lang.Record)
- Fix WorkItem.task oneof field name collision (task→exile_task)
- Remove exileapi Maven version pins from gradle.properties
- Apply spotless and buf format fixes

The build now generates stubs from proto source, so sati can build independently of whether exileapi v3 is published to buf.build BSR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
62 tests across 7 test classes, all passing.

core module (53 tests):
- BackoffTest: exponential growth, jitter bounds, cap at 30s, reset
- ProtoConverterTest: round-trip conversion for Duration, Timestamp, CallType, AgentState, Pool (all statuses), DataRecord, Field, Filter (all operators), Struct/Map (nested, lists, nulls), TaskData, Agent (with/without connected party), Skill, and all event types (AgentCall, TelephonyResult, CallRecording, Task)
- ModelTest: Page.hasMore(), record equality, enum value counts
- StreamStatusTest: isHealthy() for all phases
- ExileConfigTest: builder, default port, null validation

config module (9 tests):
- ConfigParserTest: Base64 encoded, raw JSON, missing port, missing endpoint, missing certs, garbage input, empty input, trailing newline, escaped newlines in certificate values

Added grpc-testing and grpc-inprocess test dependencies to core.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace local buf proto generation with pre-built artifacts from buf.build BSR, now that exileapi v3 is merged to master.

Changes:
- Remove build.buf gradle plugin, local proto files, buf.yaml/gen/lock
- Add exileapi version pins in gradle.properties pointing to the published v3 artifacts (commit 461190882f3b)
- Update all Java imports from old sub-packages (tcnapi.exile.types.v3, tcnapi.exile.worker.v3, etc.) to flat package with BSR prefix (build.buf.gen.tcnapi.exile.v3)
- Service files use FQN for proto types to avoid ambiguity with identically-named model classes (Pool, Agent, Filter, etc.)
- Move buf Maven repo declaration to root allprojects block

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Minimal demo showing how to use the sati library. No framework dependencies — plain Java 21 with the built-in HttpServer.

Components:
- Main: bootstraps ExileClientManager with config file watching, starts the status HTTP server, registers a shutdown hook
- DemoJobHandler: implements all JobHandler methods with stub data (fake pools, records, payments) for testing without a real CRM
- DemoEventHandler: logs all received events (agent calls, telephony results, transfers, recordings, tasks)
- StatusServer: lightweight HTTP server using com.sun.net.httpserver with /health (200/503) and /status (JSON) endpoints

Usage:
./gradlew :demo:run          # run locally
./gradlew :demo:shadowJar    # build fat jar
java -jar demo/build/libs/demo-all.jar
PORT=9090 CONFIG_DIR=/path/to/config java -jar demo-all.jar

Env vars:
- PORT — HTTP port (default: 8080)
- CONFIG_DIR — override config directory path

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Match the exileapi rename (tcnapi/exile/v3 → tcnapi/exile/gate/v3). All Java imports updated to build.buf.gen.tcnapi.exile.gate.v3. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches exileapi change: AddRecordToJourneyBuffer moved to its own JourneyService. Bumps exileapi to 85794b77f79d.
Polls GetClientConfiguration from the gate every 10 seconds (configurable). Fires the onConfigPolled callback when the config changes. This replaces the v2 GateClientConfiguration polling loop.

New Builder methods:
- onConfigPolled(Consumer<ClientConfiguration>) — callback for config changes
- configPollInterval(Duration) — polling interval (default 10s)

New accessor:
- lastPolledConfig() — returns the last config received from the gate
- WorkStreamClient: log endpoint, attempt number, backoff delay, exception class name on disconnect/error
- ExileClientManager: log endpoint and org when creating a client
- ExileClient: log exception class on config poll failure
- Channel creation: debug log before/after channel setup
- ConfigParser: handle URL-format api_endpoint (https://host) by stripping the scheme and extracting host:port
- ChannelFactory: convert PKCS#1 (BEGIN RSA PRIVATE KEY) to PKCS#8 using the BouncyCastle ASN.1 parser; use InetSocketAddress to avoid the unix domain socket name resolver on macOS
- Bump grpc to 1.80.0 (matches buf-generated stub transitive dep)
- Bump bouncycastle to 1.79
- Demo shadowJar: add mergeServiceFiles() for the gRPC service loader
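The URL-to-host:port normalization in the first bullet can be sketched with plain string handling. The default port of 443 when none is present is an assumption not stated in the commit, and the method name is illustrative:

```java
// Sketch: normalize an api_endpoint that may be a bare host:port or a URL.
static String toHostPort(String apiEndpoint) {
    String s = apiEndpoint.strip();
    int scheme = s.indexOf("://");
    if (scheme >= 0) s = s.substring(scheme + 3); // strip "https://" etc.
    int slash = s.indexOf('/');
    if (slash >= 0) s = s.substring(0, slash);    // drop any trailing path
    // Assumed default: TLS port 443 when the endpoint omits a port.
    return s.contains(":") ? s : s + ":443";
}
```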
start() now only begins the config poller. The WorkStream opens after the first successful GetClientConfiguration response from the gate and the onConfigPolled callback completes without error. This ensures the integration's resources (database, HTTP client) are initialized before any work items arrive. If the gate is unreachable or the cert is invalid, the client stays in IDLE phase without wasting reconnect attempts on the WorkStream.
New Plugin interface extends both JobHandler and EventHandler, and adds
onConfig(ClientConfiguration) for config validation. The WorkStream
only opens after the plugin accepts the first config.
- Plugin.onConfig() returns boolean: true = ready, false = reject
- Plugin.pluginName() for diagnostics
- ExileClient.builder().plugin(myPlugin) replaces separate
jobHandler/eventHandler/onConfigPolled
- ExileClientManager.builder().plugin(myPlugin) replaces separate
handlers + onConfigChange callback
- DemoPlugin replaces DemoJobHandler + DemoEventHandler
Integration usage:
var manager = ExileClientManager.builder()
        .clientName("sati-finvi")
        .plugin(new FinviPlugin(dataSource))
        .build();
The second config poll (unchanged config) was bypassing the plugin check and starting the WorkStream. Now tracks pluginReady separately from workStreamStarted — the stream only opens if the plugin has explicitly returned true from onConfig().
Plugin now provides default implementations for:
- listTenantLogs: reads from MemoryAppender's in-memory log buffer
- setLogLevel: uses the logback API to change runtime log levels
- info: returns basic JVM/OS metadata
- diagnostics: returns JVM heap/OS stats

Integrations no longer need to implement these — they get real log retrieval and level changes for free.

- core depends on logback-ext for MemoryAppender access
- logback-ext exports logback-classic as api (was implementation)
- demo logback.xml adds MemoryAppender alongside console
- DemoPlugin removes redundant overrides that are now defaults
Plugin interface is now lean — just onConfig() and pluginName().
PluginBase provides default implementations for:
- listTenantLogs (reads from MemoryAppender)
- setLogLevel (changes logback level at runtime)
- info (JVM/OS metadata)
- diagnostics (heap stats, processors)
- shutdown (logs warning)
- processLog (logs payload)
Integrations extend PluginBase and only override CRM-specific methods:
public class FinviPlugin extends PluginBase {
    public boolean onConfig(...) { ... }
    public List<Pool> listPools() { ... }
    public void onTelephonyResult(...) { ... }
}
- StatusServer: add /logs endpoint that dumps MemoryAppender contents as JSON (appender_exists, event_count, first 50 entries with timestamps)
- PluginBase: add debug logs showing appender event count and returned entry count in listTenantLogs
- Demo: add logback-ext dependency for the /logs endpoint
Collects:
- System: OS, arch, processors, load average, hostname, container detection (docker/cgroup), pod name, storage per mount
- Runtime: Java version/vendor/VM, heap (init/used/committed/max), non-heap, thread counts (live/daemon/peak/total started), uptime, GC stats per collector (count + time)
- Database: empty by default, plugins override for connection pool stats
- Custom: plugin name, MemoryAppender event count
Waits 2 seconds (to allow the result to be sent back) then calls System.exit(0). The delay runs on a virtual thread so it doesn't block the work stream response.
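The delayed-exit pattern above can be sketched with a parameterized action so it is testable; the real code would pass () -> System.exit(0). Method and parameter names are illustrative:

```java
import java.time.Duration;

// Sketch: run an exit action after a delay on a virtual thread,
// so the work stream response thread is never blocked.
static Thread exitLater(Duration delay, Runnable exitAction) {
    return Thread.startVirtualThread(() -> {
        try {
            Thread.sleep(delay.toMillis()); // let the result flush to the gate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // shutdown was cancelled
        }
        exitAction.run();
    });
}
```

Calling exitLater(Duration.ofSeconds(2), () -> System.exit(0)) reproduces the behavior described in the commit.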
The channel is now created once in start() and reused for all subsequent stream reconnects. Previously, every stream disconnect destroyed the channel and created a new one, requiring a full TLS handshake + DNS resolution + TCP connection each time. Now: stream drops → just open a new WorkStream on the existing channel. The channel handles TCP/TLS reconnection internally. Channel is only destroyed on close().
Compares new-channel-per-stream vs reused-channel performance using the in-process gRPC transport. Results show a 9-41x speedup with reuse:
- avg: 1.37ms → 0.09ms (16x)
- p50: 0.36ms → 0.04ms (9x)
- p99: 92ms → 2.2ms (41x)
- Concurrent: 17,983 streams/sec on a single channel
New benchmarks on a single reused channel:
- Throughput: 208K msgs/sec (10K messages in 48ms)
- Round-trip: 2.6µs avg, 1.4µs p50, 16.8µs p99
- Channel reuse: 137x avg, 344x p99 faster than new-channel-per-stream
- Set 4MB flow control window in ChannelFactory to match the envoy upstream
- Add in-process StreamBenchmark: throughput, payload, flow-controlled, ping
- Add LiveBenchmark against exile.dev.tcn.com with mTLS
Metrics:
- Add OpenTelemetry SDK with a custom GrpcMetricExporter (60s interval)
- MetricsManager registers built-in instruments: work completed/failed/reconnects/inflight/phase, JVM heap/threads, work duration histogram
- Expose Meter via ExileClient.meter() for plugin custom metrics
- Record work item processing duration in WorkStreamClient

Logs:
- Enrich MemoryAppender.LogEvent with level, logger, thread, MDC, stack trace
- Switch from per-event shipping to a periodic 10s drain thread
- Increase the log buffer from 100 to 1000 events
- Add shipStructuredLogs() default method to the LogShipper interface
- GrpcLogShipper converts structured logs to proto with OTel trace context
- Wire the log shipper lifecycle into ExileClient start/close

BSR stubs bumped to pick up the new TelemetryService proto.
- Defer MetricsManager creation to after the first config poll (org_id needed)
- Set OTel Resource attributes: exile.org_id, exile.config_name, exile.client_id
- Merge resource attributes into every exported MetricDataPoint
- Split LogEvent.message (raw) from formattedMessage (encoder pattern)
- LogRecord.message now carries the raw message, structured fields carry metadata
- Add TraceContextExtractor interface to MemoryAppender (pluggable, OTel-free)
- ExileClient wires it to Span.current() so each LogEvent captures the active trace/span IDs at the moment the log event is produced
- GrpcLogShipper reads traceId/spanId from the LogEvent instead of Span.current() (the shipper thread has no active span — must capture at source)
GrpcLogShipper now serializes each log event as a JSON object in the message field: timestamp, level, logger, message, thread, mdc, stackTrace, traceId, spanId. The gate parses this and emits it as messageJson in zerolog output.
- Create a span around each processWorkItem() call with work_id and category attributes. All log lines during job/event handling now carry trace_id and span_id via the TraceContextExtractor.
- Register SdkTracerProvider + OpenTelemetrySdk globally so GlobalOpenTelemetry.getTracer() returns a real tracer.
- Add opentelemetry-sdk-trace dependency.
- Exclude LiveBenchmark from default test runs (requires network).
… init

Catch IllegalStateException from buildAndRegisterGlobal() and fall back to build() without global registration. Handles multi-tenant and restart scenarios where GlobalOpenTelemetry was already set.
…essing

When the gate sets trace_parent on a WorkItem, sati now parses the W3C traceparent and creates a child span. All logs during job/event handling carry the upstream trace_id and span_id, enabling end-to-end trace correlation: station → gate → sati.
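Parsing the W3C traceparent header ("version-traceid-spanid-flags") can be sketched with plain string splitting; the record and method names are illustrative, and returning null on malformed input (falling back to a fresh root span) is an assumed policy:

```java
// Sketch: extract trace and span IDs from a W3C traceparent value,
// e.g. "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".
record TraceParent(String traceId, String spanId) {}

static TraceParent parseTraceParent(String header) {
    String[] parts = header.strip().split("-");
    // Spec: 2-hex version, 32-hex trace-id, 16-hex span-id, 2-hex flags.
    if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
        return null; // malformed — assumed fallback: start a fresh root span
    }
    return new TraceParent(parts[1], parts[2]);
}
```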
…Expiring, Error)

Store SpanContext per work_id in a ConcurrentHashMap. When async responses arrive on the gRPC callback thread, temporarily activate the span context so log lines carry the same trace_id/span_id. Cleaned up on RESULT_ACCEPTED (jobs) or after processing (events).
Populate SLF4J MDC with traceId/spanId when processing work items so logback pattern can display them. Update demo logback.xml to include trace context in the console output pattern.
Two new OTel instruments:
- exile.plugin.calls (counter) — invocations per method, with status
- exile.plugin.duration (histogram) — execution time per method

Both carry method and status (ok/error) attributes. Covers all 15 job handler methods and 6 event handler methods automatically.
…trics

- Keepalive: 10s ping / 5s timeout (was 32s/30s) — detect dead connections in 15s instead of 62s
- Backoff: 500ms base / 10s max (was 2s/30s) — faster recovery on real failures
- Skip backoff entirely on RST_STREAM NO_ERROR (envoy stream recycling) and server-initiated close — reconnect immediately
- Add exile.work.reconnect_duration histogram and log reconnect time on every re-registration
runStream() returns normally after onError (via latch), so backoff was always reset. Now only reset backoff when lastDisconnectGraceful is true (RST_STREAM NO_ERROR). UNAVAILABLE and other real errors trigger exponential backoff (500ms, 1s, 2s, ... up to 10s).
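The corrected policy can be sketched as a small backoff class: reset only on a graceful disconnect, otherwise double from 500ms up to the 10s cap. Jitter (which the test suite mentions) is omitted for clarity, and the class shape is illustrative:

```java
// Sketch of the backoff policy: 500ms base doubling to a 10s cap,
// reset only after a graceful (RST_STREAM NO_ERROR) disconnect.
class Backoff {
    static final long BASE_MS = 500, MAX_MS = 10_000;
    private int attempt = 0;

    // Returns the delay in ms before the next reconnect attempt.
    long onDisconnect(boolean graceful) {
        if (graceful) {
            attempt = 0;
            return 0; // reconnect immediately, no backoff
        }
        long delay = Math.min(MAX_MS, BASE_MS << Math.min(attempt, 10));
        attempt++;
        return delay;
    }
}
```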
… cumulative growth
Parse certificate_name from the disk config file, add to ExileConfig, and pass as OTel resource attribute exile.certificate_name. Replaces exile.config_name (from gate poll) for metric label grouping.
Replace pull-on-completion with a periodic pull thread that sends Pull(maxConcurrency - inflight) every 2s. The gate dispatches events only when Pull arrives — no server-side capacity tracking.
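The periodic pull loop described above can be sketched with a scheduled executor: every interval, request only the remaining capacity. The Pull(maxConcurrency - inflight) formula and the 2s interval come from the text; the wiring (sendPull callback, method name) is illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Sketch: every 2s, send Pull(maxConcurrency - inflight) if capacity remains.
static ScheduledExecutorService startPullLoop(int maxConcurrency,
        AtomicInteger inflight, IntConsumer sendPull) {
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    ses.scheduleAtFixedRate(() -> {
        int want = Math.max(0, maxConcurrency - inflight.get());
        if (want > 0) sendPull.accept(want); // gate dispatches only on Pull
    }, 0, 2, TimeUnit.SECONDS);
    return ses;
}
```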
20 concurrent work items with 1s pull interval allows ~20 items/sec from periodic pulls alone. Virtual thread parallelism multiplies this further as completed items free capacity before the next pull.
Sati sends one Pull after registration telling the gate to push continuously. gRPC HTTP/2 flow control handles backpressure. Removes the periodic pull thread entirely.