Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 93 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SageMaker transport for the [Deepgram Java SDK](https://github.com/deepgram/deep

```groovy
dependencies {
implementation 'com.deepgram:deepgram-java-sdk:0.2.1'
implementation 'com.deepgram:deepgram-java-sdk:0.4.0'
implementation 'com.deepgram:deepgram-sagemaker:0.1.2' // x-release-please-version
}
```
Expand All @@ -29,7 +29,7 @@ dependencies {
## Requirements

- Java 11+
- [Deepgram Java SDK](https://github.com/deepgram/deepgram-java-sdk) v0.2.1+
- [Deepgram Java SDK](https://github.com/deepgram/deepgram-java-sdk) v0.4.0+ (the `default ReconnectOptions reconnectOptions()` hook on `DeepgramTransportFactory` is required for storm absorption)
- AWS credentials configured (environment variables, shared credentials file, or IAM role)
- A Deepgram model deployed to an AWS SageMaker endpoint

Expand Down Expand Up @@ -95,6 +95,15 @@ The transport is transparent — the SDK API is identical whether using Deepgram
|-----------|----------|---------|-------------|
| `endpointName` | Yes | — | SageMaker endpoint name |
| `region` | No | `us-west-2` | AWS region |
| `connectionTimeout` | No | `30s` | Max time for the underlying TCP/TLS connect (AWS Netty default is 2 s — bumped here so cold-start endpoints under burst load have time to accept TLS handshakes). |
| `connectionAcquireTimeout` | No | `60s` | Max time to acquire a connection from the Netty pool (AWS Netty default is 10 s — bumped so a 200–500-stream burst doesn't drain the acquire pool). |
| `subscriptionTimeout` | No | `60s` | Max time the transport waits for the AWS SDK to subscribe to the bidi-stream input publisher before failing. A timeout here is treated as a transient connect failure and counts against `maxRetries` / `retryBudget`. |
| `maxConcurrency` | No | `500` | Max simultaneous in-flight HTTP/2 streams across the shared Netty pool. With `maxStreams=1` this is the cap on simultaneous bidirectional streams. |
| `maxRetries` | No | `5` | Max retries on transient AWS errors (throttling, pool-exhausted, transient connect/timeout). Set to `0` to disable internal retry. Terminal errors (auth, validation) bypass this. |
| `initialBackoff` | No | `100ms` | First backoff delay applied after the initial failure. |
| `maxBackoff` | No | `5s` | Cap on the per-attempt backoff delay regardless of multiplier. |
| `backoffMultiplier` | No | `2.0` | Exponential growth factor between retry attempts. Must be `>= 1.0`. |
| `retryBudget` | No | `30s` | Total wall-clock cap across all retry attempts before giving up and surfacing the error to listeners. |

```java
SageMakerConfig config = SageMakerConfig.builder()
Expand All @@ -103,6 +112,88 @@ SageMakerConfig config = SageMakerConfig.builder()
.build();
```

#### High-concurrency notes

The transport's defaults are tuned for high-burst workloads (large numbers of
streams opened in a tight loop against an endpoint that may need to scale up).
If you're opening 200–500 streams simultaneously against a cold endpoint,
the AWS Netty defaults (2 s connect / 10 s acquire) will fire before
the load balancer has accepted all of the inbound TLS handshakes — you'll
see a wave of `connection acquire` and `connect timed out` errors that look
like server-side problems but are really client-side fail-fast tripping early.

This transport ships with more lenient defaults (30 s / 60 s) so the
common high-concurrency path works out of the box. Tighten them if you need
fail-fast behavior in low-latency pipelines:

```java
SageMakerConfig config = SageMakerConfig.builder()
.endpointName("my-deepgram-endpoint")
.region("us-east-2")
.connectionTimeout(Duration.ofSeconds(5))
.connectionAcquireTimeout(Duration.ofSeconds(15))
.build();
```

#### Retry & storm absorption

Transient AWS-side failures (`ThrottlingException`, connection-pool exhaustion, transient
connect/timeout failures) are absorbed by the transport itself: classified as retryable, retried
with exponential backoff up to `maxRetries` and `retryBudget`, with messages enqueued during the
reset window persisted across the reconnect so audio isn't dropped. Only **terminal** errors (auth,
validation) and budget-exhausted retryable errors propagate to `transport.onError(...)` and reach
the application's error handler.

This means the SDK's wrapper-level reconnect (`ReconnectingWebSocketListener`) would compound the
plugin's internal retries into a Throttling-on-Throttling storm under burst load, so the plugin
declares `ReconnectOptions.builder().maxRetries(0).build()` via the
`DeepgramTransportFactory.reconnectOptions()` hook. The SDK applies it automatically when it sees
a `transportFactory` in use; no user wiring required.

To tune retry behavior:

```java
SageMakerConfig config = SageMakerConfig.builder()
.endpointName("my-deepgram-endpoint")
.maxRetries(10)
.initialBackoff(Duration.ofMillis(200))
.maxBackoff(Duration.ofSeconds(10))
.retryBudget(Duration.ofMinutes(1))
.build();
```

Set `maxRetries(0)` to disable internal retry entirely (every transient AWS error then surfaces
immediately to the application).

#### Connection-pool sharing

The default `new SageMakerTransportFactory(config)` constructor backs every factory instance with
a **process-wide shared** `SageMakerRuntimeHttp2AsyncClient`, keyed by the parts of
`SageMakerConfig` that affect the underlying Netty HTTP/2 client (region, max concurrency,
connect/acquire timeouts). Multiple factories built with the same config fingerprint reuse one
Netty event loop group and one connection pool — so naive code that constructs a fresh factory
per stream still gets a single, well-behaved client underneath.

Without sharing, every factory instantiates its own Netty pool, and a burst of N factories
triggers N simultaneous TLS handshakes from N distinct Netty clients against the same SageMaker
endpoint. Under high concurrency (100+ streams) the SageMaker HTTP/2 frontline silently drops a
large fraction of those streams before they ever reach the model container — verified
end-to-end with CloudWatch logs from a 400-stream burst test against a 1× ml.g6.2xlarge endpoint:
without sharing, ~65% of streams never appeared in the Deepgram container's listen log; with
sharing, the burst behaves the same as the canonical Python load-test harness.

Lifecycle:

| Constructor | Client backing | `factory.shutdown()` |
|---|---|---|
| `SageMakerTransportFactory(config)` | shared (lazy-init, keyed by config fingerprint) | no-op — call `SageMakerTransportFactory.shutdownAllSharedClients()` once at app shutdown to release Netty resources |
| `SageMakerTransportFactory(config, smClient)` | caller-provided (BYO, used for testing or custom credential providers) | no-op — caller owns the client lifecycle |

```java
// At app shutdown — releases all shared Netty pools the plugin lazily created.
Runtime.getRuntime().addShutdownHook(new Thread(SageMakerTransportFactory::shutdownAllSharedClients));
```

### Custom AWS Client

For custom credential providers, proxy configuration, or testing:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.deepgram.resources.listen.v2.websocket.V2WebSocketClient;
import com.deepgram.sagemaker.SageMakerConfig;
import com.deepgram.sagemaker.SageMakerTransportFactory;
import com.deepgram.types.ListenV2Model;

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -85,10 +86,10 @@ public static void main(String[] args) throws Exception {
done.countDown();
});

// Connect — V2 uses model name as string via additionalProperty
// Connect using the typed Flux model constant from the SDK.
CompletableFuture<Void> connectFuture = wsClient.connect(
V2ConnectOptions.builder()
.model("flux-general-en")
.model(ListenV2Model.FLUX_GENERAL_EN)
.build());
connectFuture.get(30, TimeUnit.SECONDS);
System.out.println("Connected. Streaming audio...\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.deepgram.sagemaker.SageMakerConfig;
import com.deepgram.sagemaker.SageMakerTransportFactory;
import com.deepgram.types.ListenV2Encoding;
import com.deepgram.types.ListenV2Model;
import com.deepgram.types.ListenV2SampleRate;

import javax.sound.sampled.AudioFormat;
Expand Down Expand Up @@ -100,7 +101,7 @@ public static void main(String[] args) throws Exception {

CompletableFuture<Void> connectFuture = wsClient.connect(
V2ConnectOptions.builder()
.model("flux-general-en")
.model(ListenV2Model.FLUX_GENERAL_EN)
.encoding(ListenV2Encoding.LINEAR16)
.sampleRate(ListenV2SampleRate.of(16000))
.build());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>com.deepgram</groupId>
<artifactId>deepgram-java-sdk</artifactId>
<version>0.2.1</version>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sagemaker-transport/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies {
// Deepgram Java SDK — provides DeepgramTransport / DeepgramTransportFactory interfaces
api 'com.deepgram:deepgram-java-sdk:0.2.1'
api 'com.deepgram:deepgram-java-sdk:0.4.0'

// AWS SDK v2 — SageMaker Runtime HTTP/2 bidirectional streaming
api platform('software.amazon.awssdk:bom:2.42.0')
Expand Down
Loading
Loading