-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Python] Add UnboundedSource SDF wrapper (#19137) #38724
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
278ceb0
0753268
8a4328a
8871dc5
c23d077
85f10ef
7497cb8
11dbf62
0750e11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,12 @@ | |
|
|
||
| import unittest | ||
|
|
||
| import mock | ||
|
|
||
| import apache_beam as beam | ||
| from apache_beam.io.concat_source import ConcatSource | ||
| from apache_beam.io.concat_source_test import RangeSource | ||
| import mock | ||
| from apache_beam.io import iobase | ||
| from apache_beam.io import range_trackers | ||
| from apache_beam.io.concat_source import ConcatSource | ||
| from apache_beam.io.concat_source_test import RangeSource | ||
| from apache_beam.io.iobase import SourceBundle | ||
| from apache_beam.options.pipeline_options import DebugOptions | ||
| from apache_beam.testing.util import assert_that | ||
|
|
@@ -220,5 +219,68 @@ def test_sdf_wrap_range_source(self): | |
| self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3]) | ||
|
|
||
|
|
||
| class UseSdfUnboundedSourcesTests(unittest.TestCase): | ||
| """Covers the UnboundedSource branch in | ||
| ``iobase.Read.expand()``. Uses ``UnboundedCountingSource`` from | ||
| ``unbounded_source_test`` as a finite fake source (no network). | ||
| """ | ||
| def test_read_end_to_end_unbounded(self): | ||
| from apache_beam.io.unbounded_source_test import UnboundedCountingSource | ||
| with beam.Pipeline() as p: | ||
| out = p | beam.io.Read(UnboundedCountingSource(5)) | ||
| assert_that(out, equal_to([0, 1, 2, 3, 4])) | ||
|
|
||
| def test_read_unbounded_pcollection_is_unbounded(self): | ||
| from apache_beam.io.unbounded_source_test import UnboundedCountingSource | ||
| with beam.Pipeline() as p: | ||
| out = p | beam.io.Read(UnboundedCountingSource(3)) | ||
| self.assertFalse(out.is_bounded) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't need to run this pipeline to assertFalse. Could just use |
||
|
|
||
| def test_to_runner_api_emits_unbounded_read_payload(self): | ||
| """``Read.to_runner_api_parameter`` must serialize an UnboundedSource as | ||
| ``READ.urn`` with ``IsBounded.UNBOUNDED`` so the wire format round-trips | ||
| consistently for pipeline persistence and cross-runner submission. | ||
| """ | ||
| from apache_beam.io.unbounded_source_test import UnboundedCountingSource | ||
| from apache_beam.portability import common_urns | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The apache_beam.portability import shouldn't cause a circular import; it should be possible to move it to the top level. |
||
| from apache_beam.portability.api import beam_runner_api_pb2 | ||
| from apache_beam.runners.pipeline_context import PipelineContext | ||
|
|
||
| read = beam.io.Read(UnboundedCountingSource(5)) | ||
| urn, payload = read.to_runner_api_parameter(PipelineContext()) | ||
|
|
||
| self.assertEqual(urn, common_urns.deprecated_primitives.READ.urn) | ||
| self.assertIsInstance(payload, beam_runner_api_pb2.ReadPayload) | ||
| self.assertEqual( | ||
| payload.is_bounded, beam_runner_api_pb2.IsBounded.UNBOUNDED) | ||
| # The source field must be populated -- a non-empty FunctionSpec proto. | ||
| self.assertTrue(payload.source.urn) | ||
|
|
||
| def test_read_unbounded_round_trips_through_runner_api(self): | ||
| """Encode then decode via the runner-API protobuf. The restored | ||
| transform must be a ``Read`` wrapping an equivalent UnboundedSource. | ||
| """ | ||
| from apache_beam.io.unbounded_source import UnboundedSource | ||
| from apache_beam.io.unbounded_source_test import UnboundedCountingSource | ||
| from apache_beam.portability.api import beam_runner_api_pb2 | ||
| from apache_beam.runners.pipeline_context import PipelineContext | ||
|
|
||
| original = beam.io.Read(UnboundedCountingSource(7)) | ||
| context = PipelineContext() | ||
| urn, payload = original.to_runner_api_parameter(context) | ||
|
|
||
| transform_proto = beam_runner_api_pb2.PTransform() | ||
| transform_proto.spec.urn = urn | ||
| restored = iobase.Read.from_runner_api_parameter( | ||
| transform_proto, payload, context) | ||
|
|
||
| self.assertIsInstance(restored, iobase.Read) | ||
| self.assertIsInstance(restored.source, UnboundedSource) | ||
| self.assertIsInstance(restored.source, UnboundedCountingSource) | ||
| self.assertFalse(restored.source.is_bounded()) | ||
| # Verify the source's internal state survived pickle round-trip. | ||
| self.assertEqual(restored.source._count, 7) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| unittest.main() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure whether we should consider Python UnboundedSource a "primitive" in any case. UnboundedSource was a primitive for Dataflow V1, but Dataflow V1 has not supported newer Python SDKs for a long time. I believe all other Python runners are portable and treat SDF as the primitive.
cc: @tvalentyn @kennknowles would it be fine to leave "to_runner_api_parameter" unmodified?