diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 7446f9df38c9..3cce2c5bb773 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -829,10 +829,9 @@ def _apply_internal( assert isinstance(result.producer.inputs, tuple) if isinstance(result, pvalue.DoOutputsTuple): - all_tags = [result._main_tag] + list(result._tags) - for tag in all_tags: + for tag, pc in list(result._pcolls.items()): if tag not in current.outputs: - current.add_output(result[tag], tag) + current.add_output(pc, tag) continue # If there is already a tag with the same name, increase a counter for diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ac3e6ac4afce..b28fe3c3d14e 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -1648,10 +1648,9 @@ def expand(self, pcoll): all_applied_transforms[xform.full_label] = xform current_transforms.extend(xform.parts) xform = all_applied_transforms['Split Sales'] - # Confirm that Split Sales correctly has three outputs: the main - # (untagged) output plus the two tagged outputs specified by - # ParDo.with_outputs in ParentSalesSplitter. - assert len(xform.outputs) == 3 + # Confirm that Split Sales correctly has two outputs as specified by + # ParDo.with_outputs in ParentSalesSplitter. + assert len(xform.outputs) == 2 if __name__ == '__main__':