Add new experimental _AsyncStreaming module#400
Add new experimental _AsyncStreaming module#400FranzBusch wants to merge 1 commit intoapple:mainfrom
_AsyncStreaming module#400Conversation
This PR adds an experimental module for asynchronous streaming interfaces as a potential evolution to async sequence. These APIs are considered highly unstable and are mostly for experimentation purposes; hence, they are part of an underscored module and all APIs are hidden behind a trait. I expect significant changes to those APIs to offer more functionality such as gather scatter and more.
414c3e5 to
f06696a
Compare
Catfish-Man
left a comment
There was a problem hiding this comment.
Thanks for putting this up :)
| /// print("Read \(span.count) numbers") | ||
| /// } | ||
| /// ``` | ||
| public func asyncReader() -> some AsyncReader<Element, Never> & SendableMetatype { |
There was a problem hiding this comment.
Idle thought: if we do the reader/iterator split like we discussed, is this basically the same as makeIterator() if Array conformed to AsyncReader?
| /// // Process the chunk | ||
| /// } | ||
| /// ``` | ||
| public consuming func forEach<Failure: Error>( |
There was a problem hiding this comment.
Concrete benefits of not being API-stable yet :)
| return try await self.base | ||
| .read(maximumCount: maximumCount) { (span) throws(Failure) -> Return in | ||
| guard span.count > 0 else { | ||
| let emptySpan = InlineArray<0, MappedElement>.zero() |
There was a problem hiding this comment.
EmptyCollection().span should do the trick
| #endif | ||
| mutating func read<Return, Failure: Error>( | ||
| maximumCount: Int?, | ||
| body: (consuming Span<ReadElement>) async throws(Failure) -> Return |
There was a problem hiding this comment.
I agree with leaving scatter/gather for a later commit given how tricky it's been
| /// automatically handling the iteration and transfer of elements. The operation continues | ||
| /// until the reader signals completion by producing an empty span. | ||
| @_lifetime(self: copy self) | ||
| public mutating func write<ReadFailure: Error>( |
There was a problem hiding this comment.
Oh I'd missed seeing this convenience earlier, I like it!
| /// return outputSpan.count | ||
| /// } | ||
| /// ``` | ||
| // TODO: EOF should be signaled by providing an empty output span? |
There was a problem hiding this comment.
I guess in the tradition of "next() returns nil to indicate end of Sequence" we could also make this optional instead, but using an empty one probably is nicer
There was a problem hiding this comment.
Though might lead to more accidentally unterminated loops. Optional has the upside that it requires the explicit acknowledgement of nil and probably a guard let that takes care of the terminating condition.
| /// | ||
| /// This error is thrown when an async writer signals that it cannot accept any more data | ||
| /// by providing an empty output span, but there are still elements remaining to be written. | ||
| public struct AsyncWriterWroteShortError: Error { |
There was a problem hiding this comment.
I forget, was there a reason we couldn't just use "not all the elements have been drained from the span" for this?
(Also do we want InputSpan for this write()?)
| #endif | ||
| mutating func write( | ||
| _ span: Span<WriteElement> | ||
| ) async throws(EitherError<WriteFailure, AsyncWriterWroteShortError>) |
There was a problem hiding this comment.
Am I reading this right that for the writer, we require both callee and caller owned buffer support, but for the writer, we only require callee?
What's the rationale for having 3/4 of the options, rather than 2/2 or 4/4?
| var opt = Optional(element) | ||
| do { | ||
| try await self.write { outputSpan in | ||
| outputSpan.append(opt.take()!) |
There was a problem hiding this comment.
This will crash if the output span is full. I guess every place where output span is used should check for remaining capacity before appending even a single element.
It's definitely a more difficult spelling to get right compared to a caller-owned write, where you don't have to explicitly check bounds.
| /// | ||
| /// try await writer.write { outputSpan in | ||
| /// for item in items { | ||
| /// outputSpan.append(item) |
This PR adds an experimental module for asynchronous streaming interfaces as a potential evolution to async sequence. These APIs are considered highly unstable and are mostly for experimentation purposes; hence, they are part of an underscored module and all APIs are hidden behind a trait. I expect significant changes to those APIs to offer more functionality such as gather scatter and more.