Add Redis Stream streaming implementation#8401
Conversation
|
Many thanks for this PR, @buzzers. We will take a closer look at it tomorrow. Thank you for your patience. |
|
Hi @buzzers , It seems that your implementation doesn't guarantee stream event order. While it is true that some providers don't guarantee it (like It appears that the Redis streaming API allow querying from ranges, which would allow us to have a proper rewindable Redis implementation, that would have the same behavior than Do you think you could switch to the range API instead? It would be tremendously valuable to have that in Orleans. |
|
Of course, rewindable streams are very valuable. I will refer to |
|
For But message deletion, if needed, should be done in |
|
@benjaminpetit |
|
Is work on this abandoned? I am really looking forward for this to be merged.... |
9064f9d to
c2476c7
Compare
3e87e9e to
fe10476
Compare
3af5bdc to
1ea7c06
Compare
1ea7c06 to
512f830
Compare
There was a problem hiding this comment.
Pull request overview
Adds a new Redis Streams–backed persistent streaming provider to Orleans (ported from a 3.x implementation) and wires it into the solution with a companion test suite alongside a few test infrastructure cleanups.
Changes:
- Introduces
Orleans.Streaming.Redisprovider (adapter, storage, checkpointer, options, hosting extensions, provider builder). - Adds Redis streaming test coverage in
Orleans.Redis.Testsand shares test runners viaInternalsVisibleTo. - Improves test reliability/consistency (precondition enforcement, await fan-out, “Intermittent” spelling fix across subscription tests).
Reviewed changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| test/TestInfrastructure/TestExtensions/TestClusterPerTest.cs | Ensures preconditions are enforced before cluster initialization. |
| test/Orleans.Streaming.Tests/StreamingTests/SubscriptionMultiplicityTestRunner.cs | Renames intermittent-stream test method for correct spelling. |
| test/Orleans.Streaming.Tests/StreamingTests/StreamingCacheMissTests.cs | Awaits stream fan-out to avoid fire-and-forget tasks. |
| test/Orleans.Streaming.Tests/StreamingTests/ProgrammaticSubscribeTests/SubscriptionObserverWithImplicitSubscribingTestRunner.cs | Enables reuse/overriding of shared test runner logic for Redis tests. |
| test/Orleans.Streaming.Tests/Orleans.Streaming.Tests.csproj | Adds InternalsVisibleTo for Orleans.Redis.Tests. |
| test/Extensions/Orleans.Streaming.NATS.Tests/NatsSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream test method + log message. |
| test/Extensions/Orleans.Streaming.EventHubs.Tests/Streaming/EHSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream test method + log message. |
| test/Extensions/Orleans.Streaming.EventHubs.Tests/Streaming/EHBatchedSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream test method + log message. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisSubscriptionObserverWithImplicitSubscribingTests.cs | Adds Redis tests for implicit subscribing observer scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisSubscriptionMultiplicityTests.cs | Adds Redis subscription multiplicity tests using shared runner. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamTestUtils.cs | Adds Redis test helpers (connection options + key cleanup). |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamTests.cs | Adds Redis single/multi stream end-to-end tests via shared runners. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamStorageTests.cs | Adds unit tests for Redis stream entry-id successor logic. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamingResumeTests.cs | Adds Redis resume tests using shared StreamingResumeTests base. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamFilteringTests.cs | Adds Redis stream filter tests with shared base. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamCacheMissTests.cs | Adds Redis cache-miss behavior tests using shared base. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamBatchingTests.cs | Adds Redis batching tests using shared base runner. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamBatchContainerTests.cs | Adds unit tests for batch container/token behavior. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamAdapterReceiverTests.cs | Adds receiver/checkpoint persistence behavior test against Redis. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamAdapterFactoryTests.cs | Adds factory/options wiring tests for Redis adapter factory. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisProgrammaticSubscribeTests.cs | Adds programmatic subscribe tests for Redis provider. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisClientStreamTests.cs | Adds client stream tests for Redis provider. |
| test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj | References new streaming provider + shared streaming test project. |
| test/Extensions/Orleans.Azure.Tests/Streaming/AQSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream test method + log message. |
| test/Extensions/Orleans.AWS.Tests/Streaming/SQSSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream test method + log message. |
| test/Extensions/Orleans.AdoNet.Tests/Streaming/AdoNetSubscriptionMultiplicityTests.cs | Updates renamed intermittent-stream runner method call. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamSequenceToken.cs | Adds Redis-specific sequence token implementation. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamReceiverOptions.cs | Adds receiver configuration options for Redis streams. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamingOptions.cs | Adds provider configuration options (multiplexer factory, retention, trimming). |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamingException.cs | Adds provider-specific exception type. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamBuilder.cs | Adds silo/client configurator types for Redis streams. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamBatchContainer.cs | Adds batch container encoding/decoding from Redis stream entries. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapterReceiver.cs | Adds receiver implementation for Redis stream queues. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapterFactory.cs | Adds adapter factory and named-options wiring. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapter.cs | Adds queue adapter for producing/receiving via Redis streams. |
| src/Redis/Orleans.Streaming.Redis/Storage/RedisStreamStorage.cs | Adds Redis stream storage abstraction with checkpoint integration. |
| src/Redis/Orleans.Streaming.Redis/Storage/RedisStreamCheckpointer.cs | Adds Redis-backed checkpoint persistence logic. |
| src/Redis/Orleans.Streaming.Redis/Orleans.Streaming.Redis.csproj | Adds new streaming provider project/package definition. |
| src/Redis/Orleans.Streaming.Redis/Hosting/SiloBuilderExtensions.cs | Adds ISiloBuilder extension methods for Redis streams. |
| src/Redis/Orleans.Streaming.Redis/Hosting/RedisStreamingProviderBuilder.cs | Adds provider builder for configuration-based registration. |
| src/Redis/Orleans.Streaming.Redis/Hosting/ClientBuilderExtensions.cs | Adds IClientBuilder extension methods for Redis streams. |
| Orleans.slnx | Adds the new Redis streaming project to the solution. |
Comments suppressed due to low confidence (1)
src/Redis/Orleans.Streaming.Redis/Storage/RedisStreamStorage.cs:119
RedisStreamStorage.ShutdownAsyncflushes the checkpointer but does not close/dispose any Redis connection resources. IfConnectAsynccreated a newIConnectionMultiplexer(the default), it will remain open for the lifetime of the process. Ensure shutdown closes/disposes the multiplexer when this component owns it, or refactor to share a single multiplexer per provider instance.
public async Task ShutdownAsync()
{
if (_checkpointer is { } checkpointer)
{
await checkpointer.FlushAsync();
}
}
ea3f9a0 to
e76a6a1
Compare
Add tests
Use Redis stream IDs as durable sequence tokens, persist checkpoints for range-based reads, move Redis streaming tests into Orleans.Redis.Tests, and fix the stale NATS subscription test rename surfaced by the full solution build. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use the next concrete Redis stream entry id instead of the exclusive range syntax so repeated reads continue after the first batch. Add focused storage tests for ID advancement edge cases. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add service-aware Redis stream builder overloads and mark the new Redis streaming package plus related alpha packages with alpha.1 suffixes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove the options-based AddRedisStreams overloads and track Redis multiplexer ownership so shared keyed connections are not disposed by streaming storage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove the client IServiceCollection overload and expose RedisStreamingOptions as a named options builder property instead of a ConfigureRedis delegate method. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adopt DiagnosticListener-based waits in shared streaming test runners, preserve Redis entry-id sequencing, and address PR feedback in the Redis streaming provider and tests. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
6e8f42c to
130c3cb
Compare
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds a new Redis Streams-backed Orleans streaming provider (ported from Orleans 3.x) and extends the test suite to exercise it, while also improving streaming test determinism by waiting on streaming diagnostic events instead of fixed delays/polling.
Changes:
- Introduce
Orleans.Streaming.Redisprovider implementation (adapter, storage, checkpointing, hosting/builder APIs, options, tokens). - Add a comprehensive Redis streaming test suite under
Orleans.Redis.Tests, plus necessary test project wiring. - Refactor/strengthen existing streaming tests to use
StreamingDiagnosticObserverand fix “Intermittent” naming typos.
Show a summary per file
| File | Description |
|---|---|
| test/TestInfrastructure/TestExtensions/TestClusterPerTest.cs | Ensures preconditions are enforced during cluster initialization. |
| test/TestInfrastructure/TestExtensions/Diagnostics/StreamingDiagnosticObserver.cs | Adds subscription-specific item delivery waiting helper used by tests. |
| test/Orleans.Streaming.Tests/StreamingTests/SubscriptionMultiplicityTestRunner.cs | Fixes “Intermitent” typo in helper method name. |
| test/Orleans.Streaming.Tests/StreamingTests/StreamingCacheMissTests.cs | Makes cache-miss tests more deterministic using diagnostics + wait utilities. |
| test/Orleans.Streaming.Tests/StreamingTests/StreamBatchingTestRunner.cs | Switches batching tests to diagnostic-driven waits and simpler assertions. |
| test/Orleans.Streaming.Tests/StreamingTests/SingleStreamTestRunner.cs | Reworks produce/wait phases to use diagnostics and reduce polling flakiness. |
| test/Orleans.Streaming.Tests/StreamingTests/ProgrammaticSubscribeTests/SubscriptionObserverWithImplicitSubscribingTestRunner.cs | Makes runner extensible for Redis and uses diagnostics for deterministic waits. |
| test/Orleans.Streaming.Tests/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsRunner.cs | Refactors programmatic subscribe tests to fixed-count producing + diagnostics. |
| test/Orleans.Streaming.Tests/StreamingTests/ClientStreamTestRunner.cs | Refactors client drop tests to produce exact counts and use diagnostics. |
| test/Orleans.Streaming.Tests/Orleans.Streaming.Tests.csproj | Adds InternalsVisibleTo for Orleans.Redis.Tests. |
| test/Extensions/Orleans.Streaming.NATS.Tests/NatsSubscriptionMultiplicityTests.cs | Renames test/method/log strings to “Intermittent”. |
| test/Extensions/Orleans.Streaming.EventHubs.Tests/Streaming/EHSubscriptionMultiplicityTests.cs | Renames test/method/log strings to “Intermittent”. |
| test/Extensions/Orleans.Streaming.EventHubs.Tests/Streaming/EHBatchedSubscriptionMultiplicityTests.cs | Renames test/method/log strings to “Intermittent”. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisSubscriptionObserverWithImplicitSubscribingTests.cs | Adds Redis coverage for implicit subscription observer scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisSubscriptionMultiplicityTests.cs | Adds Redis coverage for subscription multiplicity scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamTestUtils.cs | Adds Redis test utilities for configuration + cleanup. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamTests.cs | Adds Redis coverage for the core single-stream runner scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamStorageTests.cs | Adds storage lifecycle tests for shared/exclusive multiplexers. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamingResumeTests.cs | Adds Redis coverage for resume/checkpoint scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamingProviderBuilderTests.cs | Tests provider builder behavior with keyed multiplexer registration. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamFilteringTests.cs | Adds Redis coverage for stream filtering scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamCacheMissTests.cs | Adds Redis coverage for cache miss/eviction scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamBuilderExtensionsTests.cs | Tests service registration for client/silo Redis stream builder extensions. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamBatchingTests.cs | Adds Redis coverage for batching scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamBatchContainerTests.cs | Adds unit tests for batch container token creation/ordering semantics. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamAdapterReceiverTests.cs | Adds receiver/checkpoint integration test against Redis. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisStreamAdapterFactoryTests.cs | Adds factory/unit tests for adapter/cache/mapper/failure handler wiring. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisProgrammaticSubscribeTests.cs | Adds Redis coverage for programmatic subscribe runner scenarios. |
| test/Extensions/Orleans.Redis.Tests/Streaming/RedisClientStreamTests.cs | Adds Redis coverage for dropped client scenarios. |
| test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj | Adds Redis streaming project + streaming tests project references. |
| test/Extensions/Orleans.Azure.Tests/Streaming/AQSubscriptionMultiplicityTests.cs | Renames test/method/log strings to “Intermittent”. |
| test/Extensions/Orleans.AWS.Tests/Streaming/SQSSubscriptionMultiplicityTests.cs | Renames test/method/log strings to “Intermittent”. |
| test/Extensions/Orleans.AdoNet.Tests/Streaming/AdoNetSubscriptionMultiplicityTests.cs | Updates call site to renamed “Intermittent” runner method. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamSequenceToken.cs | Implements Redis stream sequence token ordering and serialization. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamReceiverOptions.cs | Adds receiver options (field name, read count). |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamingOptions.cs | Adds provider options (multiplexer creation, expiry, trimming, checkpoint interval). |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamingException.cs | Adds provider-specific exception type. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamBuilder.cs | Adds named configurators for silo/client registration and options wiring. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamBatchContainer.cs | Implements batch container serialization and token parsing from Redis entries. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapterReceiver.cs | Implements IQueueAdapterReceiver for reading/acking Redis stream entries. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapterFactory.cs | Implements IQueueAdapterFactory for Redis stream adapter creation and wiring. |
| src/Redis/Orleans.Streaming.Redis/Streams/RedisStreamAdapter.cs | Implements IQueueAdapter for enqueueing batches into Redis streams. |
| src/Redis/Orleans.Streaming.Redis/Storage/RedisStreamStorage.cs | Implements Redis stream read/write + checkpoint key management. |
| src/Redis/Orleans.Streaming.Redis/Storage/RedisStreamCheckpointer.cs | Implements checkpoint load/save throttling for Redis stream receivers. |
| src/Redis/Orleans.Streaming.Redis/Orleans.Streaming.Redis.csproj | Adds new Redis streaming provider project. |
| src/Redis/Orleans.Streaming.Redis/Hosting/SiloBuilderExtensions.cs | Adds ISiloBuilder.AddRedisStreams(...) extension. |
| src/Redis/Orleans.Streaming.Redis/Hosting/RedisStreamingProviderBuilder.cs | Adds provider builder for config-driven registration (silo + client). |
| src/Redis/Orleans.Streaming.Redis/Hosting/ClientBuilderExtensions.cs | Adds IClientBuilder.AddRedisStreams(...) extension. |
| src/AdoNet/Orleans.Streaming.AdoNet/Orleans.Streaming.AdoNet.csproj | Ensures alpha version suffix is preserved when VersionSuffix is set. |
| src/AdoNet/Orleans.GrainDirectory.AdoNet/Orleans.GrainDirectory.AdoNet.csproj | Ensures alpha version suffix is preserved when VersionSuffix is set. |
| Orleans.slnx | Adds the new Redis streaming provider project to the solution. |
Copilot's findings
- Files reviewed: 51/51 changed files
- Comments generated: 3
| public static ConfigurationOptions GetConfigurationOptions() => ConfigurationOptions.Parse(TestDefaultConfiguration.RedisConnectionString); | ||
|
|
||
| public static async Task DeleteServiceKeysAsync(string serviceId) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(serviceId) || string.IsNullOrWhiteSpace(TestDefaultConfiguration.RedisConnectionString)) | ||
| { | ||
| return; |
There was a problem hiding this comment.
DeleteServiceKeysAsync takes a non-nullable string, but all current call sites pass HostedCluster?.Options.ServiceId (a string?). With Nullable=enable and TreatWarningsAsErrors=true, this will fail the build (CS8604). Change the parameter type to string? (it already guards with IsNullOrWhiteSpace) or adjust call sites to pass a non-null value.
| [GenerateSerializer] | ||
| public class RedisStreamingException : Exception | ||
| { | ||
| /// <summary> | ||
| /// Initializes a new instance of <see cref="RedisStreamingException"/>. | ||
| /// </summary> | ||
| public RedisStreamingException() | ||
| { | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of <see cref="RedisStreamingException"/>. | ||
| /// </summary> | ||
| /// <param name="message">The error message that explains the reason for the exception.</param> | ||
| public RedisStreamingException(string message) | ||
| : base(message) | ||
| { | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of <see cref="RedisStreamingException"/>. | ||
| /// </summary> | ||
| /// <param name="message">The error message that explains the reason for the exception.</param> | ||
| /// <param name="inner">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified.</param> | ||
| public RedisStreamingException(string message, Exception inner) | ||
| : base(message, inner) | ||
| { | ||
| } |
There was a problem hiding this comment.
As a public exception type, this should follow the standard exception constructor pattern used elsewhere in the repo (including the Redis providers): add the [Obsolete] serialization constructor (SerializationInfo, StreamingContext) to avoid analyzer warnings (eg CA1032) and keep behavior consistent with RedisStorageException/RedisClusteringException.
| public void Update(string offset, DateTime utcNow) | ||
| { | ||
| if (string.IsNullOrEmpty(offset) || string.Equals(offset, Offset, StringComparison.Ordinal)) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| Offset = offset; | ||
| if (_inProgressSave.IsCompleted && (!_throttleSavesUntilUtc.HasValue || _throttleSavesUntilUtc.Value <= utcNow)) | ||
| { | ||
| StartSave(offset, utcNow); | ||
| } | ||
| } |
There was a problem hiding this comment.
Update starts SaveAsync in a fire-and-forget way by assigning it to _inProgressSave, but if that task faults it can be overwritten on the next Update (since IsCompleted is true for faulted tasks). That can drop the exception entirely (unobserved task) and hide checkpoint persistence failures. Consider observing/propagating failures (eg keep the faulted task until FlushAsync observes it, or check IsFaulted/Exception before overwriting, or attach a continuation which logs/records the exception).
This was migrated from the 3.x version I'm using and it should work.
Microsoft Reviewers: Open in CodeFlow