Skip to content

Comments

Add new experimental _AsyncStreaming module#400

Open
FranzBusch wants to merge 1 commit intoapple:mainfrom
FranzBusch:fb-async-streaming-base
Open

Add new experimental _AsyncStreaming module#400
FranzBusch wants to merge 1 commit intoapple:mainfrom
FranzBusch:fb-async-streaming-base

Conversation

@FranzBusch
Copy link
Member

This PR adds an experimental module for asynchronous streaming interfaces as a potential evolution to async sequence. These APIs are considered highly unstable and are mostly for experimentation purposes; hence, they are part of an underscored module and all APIs are hidden behind a trait. I expect significant changes to those APIs to offer more functionality such as gather scatter and more.

This PR adds an experimental module for asynchronous streaming interfaces as a potential evolution to async sequence. These APIs are considered highly unstable and are mostly for experimentation purposes; hence, they are part of an underscored module and all APIs are hidden behind a trait. I expect significant changes to those APIs to offer more functionality such as gather scatter and more.
@FranzBusch FranzBusch force-pushed the fb-async-streaming-base branch from 414c3e5 to f06696a Compare February 13, 2026 16:44
Copy link
Member

@Catfish-Man Catfish-Man left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting this up :)

/// print("Read \(span.count) numbers")
/// }
/// ```
public func asyncReader() -> some AsyncReader<Element, Never> & SendableMetatype {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle thought: if we do the reader/iterator split like we discussed, is this basically the same as makeIterator() if Array conformed to AsyncReader?

/// // Process the chunk
/// }
/// ```
public consuming func forEach<Failure: Error>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concrete benefits of not being API-stable yet :)

return try await self.base
.read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in
guard span.count > 0 else {
let emptySpan = InlineArray<0, MappedElement>.zero()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmptyCollection().span should do the trick

#endif
mutating func read<Return, Failure: Error>(
maximumCount: Int?,
body: (consuming Span<ReadElement>) async throws(Failure) -> Return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with leaving scatter/gather for a later commit given how tricky it's been

/// automatically handling the iteration and transfer of elements. The operation continues
/// until the reader signals completion by producing an empty span.
@_lifetime(self: copy self)
public mutating func write<ReadFailure: Error>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I'd missed seeing this convenience earlier, I like it!

/// return outputSpan.count
/// }
/// ```
// TODO: EOF should be signaled by providing an empty output span?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in the tradition of "next() returns nil to indicate end of Sequence" we could also make this optional instead, but using an empty one probably is nicer

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though might lead to more accidentally unterminated loops. Optional has the upside that it requires the explicit acknowledgement of nil and probably a guard let that takes care of the terminating condition.

///
/// This error is thrown when an async writer signals that it cannot accept any more data
/// by providing an empty output span, but there are still elements remaining to be written.
public struct AsyncWriterWroteShortError: Error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, was there a reason we couldn't just use "not all the elements have been drained from the span" for this?

(Also do we want InputSpan for this write()?)

Copy link

@czechboy0 czechboy0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool 🙏

#endif
mutating func write(
_ span: Span<WriteElement>
) async throws(EitherError<WriteFailure, AsyncWriterWroteShortError>)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading this right that for the writer, we require both callee and caller owned buffer support, but for the writer, we only require callee?

What's the rationale for having 3/4 of the options, rather than 2/2 or 4/4?

var opt = Optional(element)
do {
try await self.write { outputSpan in
outputSpan.append(opt.take()!)
Copy link

@czechboy0 czechboy0 Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will crash if the output span is full. I guess every place where output span is used should check for remaining capacity before appending even a single element.

It's definitely a more difficult spelling to get right compared to a caller-owned write, where you don't have to explicitly check bounds.

///
/// try await writer.write { outputSpan in
/// for item in items {
/// outputSpan.append(item)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a bounds check.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants