From 739bc788d7fa69f48f1469c1449c9aa9db6e7e61 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 9 Apr 2026 13:28:54 +0000 Subject: [PATCH 01/11] switch from js2py to quickjs --- sdks/python/apache_beam/yaml/yaml_mapping.py | 123 ++++++++---------- sdks/python/apache_beam/yaml/yaml_udf_test.py | 14 +- sdks/python/setup.py | 2 +- 3 files changed, 62 insertions(+), 77 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index a6b2b5704751..5d07d82197b4 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -53,13 +53,11 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn from apache_beam.yaml.yaml_provider import dicts_to_rows -# Import js2py package if it exists +# Import quickjs package if it exists try: - import js2py - from js2py.base import JsObjectWrapper + import quickjs except ImportError: - js2py = None - JsObjectWrapper = object + quickjs = None _str_expression_fields = { 'AssignTimestamps': 'timestamp', @@ -178,18 +176,34 @@ def _check_mapping_arguments( raise ValueError(f'{transform_name} cannot specify "name" without "path"') -# js2py's JsObjectWrapper object has a self-referencing __dict__ property -# that cannot be pickled without implementing the __getstate__ and -# __setstate__ methods. -class _CustomJsObjectWrapper(JsObjectWrapper): - def __init__(self, js_obj): - super().__init__(js_obj.__dict__['_obj']) +class _QuickJsCallable: + def __init__(self, source, name=None): + self.source = source + self.name = name + self._func = None + + def _get_func(self): + if self._func is None: + if quickjs is None: + raise ValueError("quickjs is not installed.") + context = quickjs.Context() + if self.name: + context.eval(self.source) + self._func = context.get(self.name) + else: + self._func = context.eval(self.source) + return self._func + + def __call__(self, *args, **kwargs): + return self._get_func()(*args, **kwargs) def __getstate__(self): - return self.__dict__.copy() + return {'source': self.source, 'name': self.name} def __setstate__(self, state): - self.__dict__.update(state) + self.source = state['source'] + self.name = state['name'] + self._func = None # TODO(yaml) Improve type inferencing for JS UDF's @@ -210,78 +224,49 @@ def py_value_to_js_dict(py_value): def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): - # Check for installed js2py package - if js2py is None: + # Check for installed quickjs package + if quickjs is None: raise ValueError( - "Javascript mapping functions are not supported on" - " Python 3.12 or later.") - - # import remaining js2py objects - from js2py import base - from js2py.constructors import jsdate - from js2py.internals import simplex - - js_array_type = ( - base.PyJsArray, - base.PyJsArrayBuffer, - base.PyJsInt8Array, - base.PyJsUint8Array, - base.PyJsUint8ClampedArray, - base.PyJsInt16Array, - base.PyJsUint16Array, - base.PyJsInt32Array, - base.PyJsUint32Array, - base.PyJsFloat32Array, - base.PyJsFloat64Array) - - def _js_object_to_py_object(obj): - if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): - return base.to_python(obj) - elif isinstance(obj, js_array_type): - return [_js_object_to_py_object(value) for value in obj.to_list()] - elif isinstance(obj, jsdate.PyJsDate): - return obj.to_utc_dt() - elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): - return None - elif isinstance(obj, base.PyJsError): - raise RuntimeError(obj['message']) - elif isinstance(obj, base.PyJsObject): - return { - key: _js_object_to_py_object(value['value']) - for (key, value) in obj.own.items() - } - elif isinstance(obj, base.JsObjectWrapper): - return _js_object_to_py_object(obj._obj) + "Javascript mapping functions require the 'quickjs' package.") - return obj + import json if expression: - source = '\n'.join(['function(__row__) {'] + [ - f' {name} = __row__.{name}' - for name in original_fields if name in expression - ] + [' return (' + expression + ')'] + ['}']) - js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) + source = '\n'.join( + ['function fn(json_row) {', ' const __row__ = JSON.parse(json_row);'] + + [ + f' const {name} = __row__.{name};' + for name in original_fields if name in expression + ] + [' return JSON.stringify(' + expression + ');'] + ['}']) + js_func = _QuickJsCallable(source, "fn") elif callable: - js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) + # Wrap the callable in a named function to use quickjs.Function + source = ( + f"function fn(json_row) {{ " + f"const row = JSON.parse(json_row); " + f"return JSON.stringify(({callable})(row)); }}") + js_func = _QuickJsCallable(source, "fn") else: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() - js = js2py.EvalJs() - js.eval(udf_code) - js_func = _CustomJsObjectWrapper(getattr(js, name)) + bridge_source = ( + udf_code + f"\nfunction bridge_fn(json_row) {{ " + f"return JSON.stringify({name}(JSON.parse(json_row))); }}") + js_func = _QuickJsCallable(bridge_source, "bridge_fn") def js_wrapper(row): row_as_dict = py_value_to_js_dict(row) + row_json = json.dumps(row_as_dict) try: - js_result = js_func(row_as_dict) - except simplex.JsException as exn: + js_result_json = js_func(row_json) + js_result = json.loads(js_result_json) + except Exception as exn: raise RuntimeError( - f"Error evaluating javascript expression: " - f"{exn.mes['message']}") from exn - return dicts_to_rows(_js_object_to_py_object(js_result)) + f"Error evaluating javascript expression: {exn}") from exn + return dicts_to_rows(js_result) return js_wrapper diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 3d664ab9de41..63116b41c5fa 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -32,10 +32,10 @@ from apache_beam.yaml.yaml_transform import YamlTransform try: - import js2py + import quickjs except ImportError: - js2py = None - logging.warning('js2py is not installed; some tests will be skipped.') + quickjs = None + logging.warning('quickjs is not installed; some tests will be skipped.') def as_rows(): @@ -63,7 +63,7 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +197,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +252,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +296,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs not installed.') def test_filter_inline_js_file(self): data = ''' function f(x) { diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 5d4f86ae4d97..8041c2f0729e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -616,7 +616,7 @@ def get_portability_package_data(): 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + 'quickjs', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From 13867609c6265411ff4248a811da2aa1712f9d1f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 9 Apr 2026 13:48:43 +0000 Subject: [PATCH 02/11] pin to less than 3.13 or not windows --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 8041c2f0729e..0dafb2f1fcd7 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -616,7 +616,7 @@ def get_portability_package_data(): 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'quickjs', + 'quickjs; python_version < "3.13" or platform_system != "Windows"', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From b9dddd36c17bc2ac0dbe22d3f2bd71231e67d4fe Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 13 Apr 2026 13:18:16 +0000 Subject: [PATCH 03/11] fix lint issue --- sdks/python/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 0dafb2f1fcd7..8446f3bd0b43 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -616,7 +616,8 @@ def get_portability_package_data(): 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'quickjs; python_version < "3.13" or platform_system != "Windows"', + 'quickjs; ' + 'python_version < "3.13" or platform_system != "Windows"', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From 901943c935add6a473b8ae711afeccb48976c16f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 15 Apr 2026 11:30:21 +0000 Subject: [PATCH 04/11] update per gemini review --- sdks/python/apache_beam/yaml/yaml_mapping.py | 26 +++++++------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 5d07d82197b4..f9df4458ca94 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -229,23 +229,17 @@ def _expand_javascript_mapping_func( raise ValueError( "Javascript mapping functions require the 'quickjs' package.") - import json - if expression: - source = '\n'.join( - ['function fn(json_row) {', ' const __row__ = JSON.parse(json_row);'] + - [ - f' const {name} = __row__.{name};' - for name in original_fields if name in expression - ] + [' return JSON.stringify(' + expression + ');'] + ['}']) + source = '\n'.join(['function fn(__row__) {'] + [ + f' const {name} = __row__["{name}"];' for name in original_fields + if name.isidentifier() and name in expression + ] + [' return (' + expression + ');'] + ['}']) js_func = _QuickJsCallable(source, "fn") elif callable: # Wrap the callable in a named function to use quickjs.Function - source = ( - f"function fn(json_row) {{ " - f"const row = JSON.parse(json_row); " - f"return JSON.stringify(({callable})(row)); }}") + source = (f"function fn(row) {{ " + f"return ({callable})(row); }}") js_func = _QuickJsCallable(source, "fn") else: @@ -253,16 +247,14 @@ def _expand_javascript_mapping_func( raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() bridge_source = ( - udf_code + f"\nfunction bridge_fn(json_row) {{ " - f"return JSON.stringify({name}(JSON.parse(json_row))); }}") + udf_code + f"\nfunction bridge_fn(row) {{ " + f"return {name}(row); }}") js_func = _QuickJsCallable(bridge_source, "bridge_fn") def js_wrapper(row): row_as_dict = py_value_to_js_dict(row) - row_json = json.dumps(row_as_dict) try: - js_result_json = js_func(row_json) - js_result = json.loads(js_result_json) + js_result = js_func(row_as_dict) except Exception as exn: raise RuntimeError( f"Error evaluating javascript expression: {exn}") from exn From 75a75a8ba699bea4668e52477cf8d72169840479 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 15 Apr 2026 20:28:23 +0000 Subject: [PATCH 05/11] try to fix dict not supported error --- sdks/python/apache_beam/yaml/yaml_mapping.py | 50 +++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index f9df4458ca94..7cedbcbbf50c 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -230,34 +230,58 @@ def _expand_javascript_mapping_func( "Javascript mapping functions require the 'quickjs' package.") if expression: - source = '\n'.join(['function fn(__row__) {'] + [ - f' const {name} = __row__["{name}"];' for name in original_fields + args = [ + name for name in original_fields if name.isidentifier() and name in expression - ] + [' return (' + expression + ');'] + ['}']) + ] + source = '\n'.join([f'function fn({", ".join(args)}) {{'] + + [' return (' + expression + ');'] + ['}']) js_func = _QuickJsCallable(source, "fn") + used_fields = args elif callable: # Wrap the callable in a named function to use quickjs.Function - source = (f"function fn(row) {{ " - f"return ({callable})(row); }}") + source = ( + f"function fn(keys, values) {{ " + f" const row = {{}}; " + f" for (let i = 0; i < keys.length; i++) {{ " + f" row[keys[i]] = values[i]; " + f" }} " + f" return ({callable})(row); }}") js_func = _QuickJsCallable(source, "fn") + used_fields = None else: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() bridge_source = ( - udf_code + f"\nfunction bridge_fn(row) {{ " - f"return {name}(row); }}") + udf_code + f"\nfunction bridge_fn(keys, values) {{ " + f" const row = {{}}; " + f" for (let i = 0; i < keys.length; i++) {{ " + f" row[keys[i]] = values[i]; " + f" }} " + f" return {name}(row); }}") js_func = _QuickJsCallable(bridge_source, "bridge_fn") + used_fields = None def js_wrapper(row): - row_as_dict = py_value_to_js_dict(row) - try: - js_result = js_func(row_as_dict) - except Exception as exn: - raise RuntimeError( - f"Error evaluating javascript expression: {exn}") from exn + if expression: + vals = [getattr(row, name) for name in used_fields] + js_vals = [py_value_to_js_dict(val) for val in vals] + try: + js_result = js_func(*js_vals) + except Exception as exn: + raise RuntimeError( + f"Error evaluating javascript expression: {exn}") from exn + else: + row_as_dict = py_value_to_js_dict(row) + try: + js_result = js_func( + list(row_as_dict.keys()), list(row_as_dict.values())) + except Exception as exn: + raise RuntimeError( + f"Error evaluating javascript expression: {exn}") from exn return dicts_to_rows(js_result) return js_wrapper From 69f8d1fb031cc492d4ad7ea1c66804a2dcbed17b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 03:33:45 +0000 Subject: [PATCH 06/11] hybrid approach to fix list issue --- sdks/python/apache_beam/yaml/yaml_mapping.py | 86 +++++++++++++++----- 1 file changed, 64 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 7cedbcbbf50c..48671c7d995d 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -17,6 +17,7 @@ """This module defines the basic MapToFields operation.""" import itertools +import json import re from collections import abc from collections.abc import Callable @@ -229,25 +230,46 @@ def _expand_javascript_mapping_func( raise ValueError( "Javascript mapping functions require the 'quickjs' package.") + def make_bridge_source(func_name, call_expr): + keys_json = json.dumps(list(original_fields)) + return ( + f"function {func_name}(serialized_flags, ...values) {{ " + f" const keys = JSON.parse('{keys_json}'); " + f" const flags = serialized_flags.split(','); " + f" const row = {{}}; " + f" for (let i = 0; i < keys.length; i++) {{ " + f" let val = values[i]; " + f" if (flags[i] === '1') val = JSON.parse(val); " + f" row[keys[i]] = val; " + f" }} " + f" const result = {call_expr}; " + f" return (typeof result === 'object' && result !== null) " + f"? '__json__:' + JSON.stringify(result) : result; " + f"}}") + if expression: args = [ name for name in original_fields if name.isidentifier() and name in expression ] - source = '\n'.join([f'function fn({", ".join(args)}) {{'] + - [' return (' + expression + ');'] + ['}']) + parses = [] + for i, arg in enumerate(args): + parses.append(f" if (flags[{i}] === '1') {arg} = JSON.parse({arg});") + + source = f""" +function fn(serialized_flags, {", ".join(args)}) {{ + const flags = serialized_flags.split(','); +{chr(10).join(parses)} + const result = ({expression}); + return (typeof result === 'object' && result !== null) ? + "__json__:" + JSON.stringify(result) : result; +}} +""" js_func = _QuickJsCallable(source, "fn") used_fields = args elif callable: - # Wrap the callable in a named function to use quickjs.Function - source = ( - f"function fn(keys, values) {{ " - f" const row = {{}}; " - f" for (let i = 0; i < keys.length; i++) {{ " - f" row[keys[i]] = values[i]; " - f" }} " - f" return ({callable})(row); }}") + source = make_bridge_source("fn", f"({callable})(row)") js_func = _QuickJsCallable(source, "fn") used_fields = None @@ -255,33 +277,53 @@ def _expand_javascript_mapping_func( if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() - bridge_source = ( - udf_code + f"\nfunction bridge_fn(keys, values) {{ " - f" const row = {{}}; " - f" for (let i = 0; i < keys.length; i++) {{ " - f" row[keys[i]] = values[i]; " - f" }} " - f" return {name}(row); }}") + bridge_source = udf_code + "\n" + make_bridge_source( + "bridge_fn", f"{name}(row)") js_func = _QuickJsCallable(bridge_source, "bridge_fn") used_fields = None def js_wrapper(row): if expression: - vals = [getattr(row, name) for name in used_fields] - js_vals = [py_value_to_js_dict(val) for val in vals] + vals = [py_value_to_js_dict(getattr(row, name)) for name in used_fields] + js_vals = [] + flags = [] + for val in vals: + if isinstance(val, (list, dict)): + js_vals.append(json.dumps(val)) + flags.append('1') + else: + js_vals.append(val) + flags.append('0') + + flags_str = ",".join(flags) try: - js_result = js_func(*js_vals) + js_result = js_func(flags_str, *js_vals) except Exception as exn: raise RuntimeError( f"Error evaluating javascript expression: {exn}") from exn else: row_as_dict = py_value_to_js_dict(row) + js_vals = [] + flags = [] + for name in original_fields: + val = row_as_dict.get(name) + if isinstance(val, (list, dict)): + js_vals.append(json.dumps(val)) + flags.append('1') + else: + js_vals.append(val) + flags.append('0') + + flags_str = ",".join(flags) try: - js_result = js_func( - list(row_as_dict.keys()), list(row_as_dict.values())) + js_result = js_func(flags_str, *js_vals) except Exception as exn: raise RuntimeError( f"Error evaluating javascript expression: {exn}") from exn + + if isinstance(js_result, str) and js_result.startswith("__json__:"): + js_result = json.loads(js_result[9:]) + return dicts_to_rows(js_result) return js_wrapper From 38abfe19e144923a6ea4e15135fac4bf47c1515b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 12:07:27 +0000 Subject: [PATCH 07/11] add comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 48671c7d995d..f718038ae6ea 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -231,6 +231,11 @@ def _expand_javascript_mapping_func( "Javascript mapping functions require the 'quickjs' package.") def make_bridge_source(func_name, call_expr): + # The bridge function facilitates high-performance data transfer from Python + # to QuickJS by reconstructing the row object in JS. + # To minimize JSON overhead, primitives are passed directly, while complex + # types (lists/dicts) are passed as JSON strings and parsed in JS. + # The 'flags' argument indicates which values need parsing. keys_json = json.dumps(list(original_fields)) return ( f"function {func_name}(serialized_flags, ...values) {{ " @@ -283,6 +288,10 @@ def make_bridge_source(func_name, call_expr): used_fields = None def js_wrapper(row): + # Prepare arguments for the JS function. We optimize performance by + # passing primitives directly and only serializing complex types (lists, + # dicts) to JSON strings. A string of flags ('0' or '1') is passed to + # inform the JS bridge which arguments need to be JSON.parsed. if expression: vals = [py_value_to_js_dict(getattr(row, name)) for name in used_fields] js_vals = [] From bff5622ac8a9285c9ef70aacd0100d16368dcc3a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 12:39:41 +0000 Subject: [PATCH 08/11] update logic to no stringify explicitly --- sdks/python/apache_beam/yaml/yaml_mapping.py | 80 +++++++++---------- sdks/python/apache_beam/yaml/yaml_udf_test.py | 50 ++++++++++++ 2 files changed, 87 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index f718038ae6ea..8939cdabdbcb 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -195,8 +195,8 @@ def _get_func(self): self._func = context.eval(self.source) return self._func - def __call__(self, *args, **kwargs): - return self._get_func()(*args, **kwargs) + def __call__(self, *args): + return self._get_func()(*args) def __getstate__(self): return {'source': self.source, 'name': self.name} @@ -239,7 +239,7 @@ def make_bridge_source(func_name, call_expr): keys_json = json.dumps(list(original_fields)) return ( f"function {func_name}(serialized_flags, ...values) {{ " - f" const keys = JSON.parse('{keys_json}'); " + f" const keys = {keys_json}; " f" const flags = serialized_flags.split(','); " f" const row = {{}}; " f" for (let i = 0; i < keys.length; i++) {{ " @@ -248,8 +248,9 @@ def make_bridge_source(func_name, call_expr): f" row[keys[i]] = val; " f" }} " f" const result = {call_expr}; " - f" return (typeof result === 'object' && result !== null) " - f"? '__json__:' + JSON.stringify(result) : result; " + f" if (result instanceof Date) " + f"return {{__type__: 'date', value: result.toISOString()}}; " + f" return result; " f"}}") if expression: @@ -266,8 +267,8 @@ def make_bridge_source(func_name, call_expr): const flags = serialized_flags.split(','); {chr(10).join(parses)} const result = ({expression}); - return (typeof result === 'object' && result !== null) ? - "__json__:" + JSON.stringify(result) : result; + if (result instanceof Date) return {{__type__: 'date', value: result.toISOString()}}; + return result; }} """ js_func = _QuickJsCallable(source, "fn") @@ -287,6 +288,18 @@ def make_bridge_source(func_name, call_expr): js_func = _QuickJsCallable(bridge_source, "bridge_fn") used_fields = None + def _prepare_args(vals): + js_vals = [] + flags = [] + for val in vals: + if isinstance(val, (list, dict)): + js_vals.append(json.dumps(val)) + flags.append('1') + else: + js_vals.append(val) + flags.append('0') + return ",".join(flags), js_vals + def js_wrapper(row): # Prepare arguments for the JS function. We optimize performance by # passing primitives directly and only serializing complex types (lists, @@ -294,44 +307,25 @@ def js_wrapper(row): # inform the JS bridge which arguments need to be JSON.parsed. if expression: vals = [py_value_to_js_dict(getattr(row, name)) for name in used_fields] - js_vals = [] - flags = [] - for val in vals: - if isinstance(val, (list, dict)): - js_vals.append(json.dumps(val)) - flags.append('1') - else: - js_vals.append(val) - flags.append('0') - - flags_str = ",".join(flags) - try: - js_result = js_func(flags_str, *js_vals) - except Exception as exn: - raise RuntimeError( - f"Error evaluating javascript expression: {exn}") from exn else: row_as_dict = py_value_to_js_dict(row) - js_vals = [] - flags = [] - for name in original_fields: - val = row_as_dict.get(name) - if isinstance(val, (list, dict)): - js_vals.append(json.dumps(val)) - flags.append('1') - else: - js_vals.append(val) - flags.append('0') - - flags_str = ",".join(flags) - try: - js_result = js_func(flags_str, *js_vals) - except Exception as exn: - raise RuntimeError( - f"Error evaluating javascript expression: {exn}") from exn - - if isinstance(js_result, str) and js_result.startswith("__json__:"): - js_result = json.loads(js_result[9:]) + vals = [row_as_dict.get(name) for name in original_fields] + + flags_str, js_vals = _prepare_args(vals) + + try: + js_result = js_func(flags_str, *js_vals) + except Exception as exn: + raise RuntimeError( + f"Error evaluating javascript expression: {exn}") from exn + + if isinstance(js_result, quickjs.Object): + obj = json.loads(js_result.json()) + if isinstance(obj, dict) and obj.get('__type__') == 'date': + import datetime + js_result = datetime.datetime.fromisoformat(obj['value']) + else: + js_result = obj return dicts_to_rows(js_result) diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 63116b41c5fa..b1d5846d95a5 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -109,6 +109,56 @@ def test_map_to_fields_filter_inline_js(self): row=beam.Row(rank=2, values=[7, 8, 9, 12])), ])) + @unittest.skipIf(quickjs is None, 'quickjs not installed.') + def test_map_to_fields_date_js(self): + import datetime + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: + elements = p | beam.Create([beam.Row(val=1)]) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + date: + callable: | + function get_date(x) { + return new Date("2026-04-16T12:00:00.000Z") + } + ''') + assert_that( + result, + equal_to([ + beam.Row( + date=datetime.datetime( + 2026, 4, 16, 12, 0, 0, tzinfo=datetime.timezone.utc)), + ])) + + @unittest.skipIf(quickjs is None, 'quickjs not installed.') + def test_map_to_fields_new_complex_types_js(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: + elements = p | beam.Create([beam.Row(val=1)]) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + arr: + callable: "function(x) { return [1, 2, 3]; }" + obj: + callable: "function(x) { return {a: 1, b: 'two'}; }" + ''') + assert_that( + result, + equal_to([ + beam.Row(arr=[1, 2, 3], obj=beam.Row(a=1, b='two')), + ])) + def test_map_to_fields_filter_inline_py(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: From ddc986c811e0410a793082ebeefe41f9d7ea8821 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 13:05:37 +0000 Subject: [PATCH 09/11] address gemini design comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 8939cdabdbcb..7717f75d7378 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -16,9 +16,11 @@ # """This module defines the basic MapToFields operation.""" +import datetime import itertools import json import re +import threading from collections import abc from collections.abc import Callable from collections.abc import Collection @@ -181,19 +183,20 @@ class _QuickJsCallable: def __init__(self, source, name=None): self.source = source self.name = name - self._func = None + self._local = threading.local() def _get_func(self): - if self._func is None: + if not hasattr(self._local, 'func'): if quickjs is None: raise ValueError("quickjs is not installed.") context = quickjs.Context() if self.name: context.eval(self.source) - self._func = context.get(self.name) + self._local.func = context.get(self.name) else: - self._func = context.eval(self.source) - return self._func + self._local.func = context.eval(self.source) + self._local.context = context # Keep context alive + return self._local.func def __call__(self, *args): return self._get_func()(*args) @@ -204,7 +207,7 @@ def __getstate__(self): def __setstate__(self, state): self.source = state['source'] self.name = state['name'] - self._func = None + self._local = threading.local() # TODO(yaml) Improve type inferencing for JS UDF's @@ -263,7 +266,7 @@ def make_bridge_source(func_name, call_expr): parses.append(f" if (flags[{i}] === '1') {arg} = JSON.parse({arg});") source = f""" -function fn(serialized_flags, {", ".join(args)}) {{ +function fn(serialized_flags{', ' + ', '.join(args) if args else ''}) {{ const flags = serialized_flags.split(','); {chr(10).join(parses)} const result = ({expression}); @@ -322,7 +325,6 @@ def js_wrapper(row): if isinstance(js_result, quickjs.Object): obj = json.loads(js_result.json()) if isinstance(obj, dict) and obj.get('__type__') == 'date': - import datetime js_result = datetime.datetime.fromisoformat(obj['value']) else: js_result = obj From a96fc5e444f0bef5766b2067132e4f16d460e7b4 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 16 Apr 2026 20:44:16 +0000 Subject: [PATCH 10/11] going to serialize all to make less brittle --- sdks/python/apache_beam/yaml/yaml_mapping.py | 71 +++++++------------- 1 file changed, 23 insertions(+), 48 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 7717f75d7378..1dd7fbb77f5a 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -180,6 +180,12 @@ def _check_mapping_arguments( class _QuickJsCallable: + """A wrapper for QuickJS callables to ensure thread-safety and context reuse. + + QuickJS contexts are not thread-safe. This class uses thread-local storage + to ensure each thread has its own QuickJS context, while reusing it across + multiple calls on the same thread. + """ def __init__(self, source, name=None): self.source = source self.name = name @@ -234,22 +240,11 @@ def _expand_javascript_mapping_func( "Javascript mapping functions require the 'quickjs' package.") def make_bridge_source(func_name, call_expr): - # The bridge function facilitates high-performance data transfer from Python - # to QuickJS by reconstructing the row object in JS. - # To minimize JSON overhead, primitives are passed directly, while complex - # types (lists/dicts) are passed as JSON strings and parsed in JS. - # The 'flags' argument indicates which values need parsing. - keys_json = json.dumps(list(original_fields)) + # The bridge function facilitates data transfer from Python to QuickJS by + # parsing a JSON string representing the row object. return ( - f"function {func_name}(serialized_flags, ...values) {{ " - f" const keys = {keys_json}; " - f" const flags = serialized_flags.split(','); " - f" const row = {{}}; " - f" for (let i = 0; i < keys.length; i++) {{ " - f" let val = values[i]; " - f" if (flags[i] === '1') val = JSON.parse(val); " - f" row[keys[i]] = val; " - f" }} " + f"function {func_name}(row_json) {{ " + f" const row = JSON.parse(row_json); " f" const result = {call_expr}; " f" if (result instanceof Date) " f"return {{__type__: 'date', value: result.toISOString()}}; " @@ -261,26 +256,25 @@ def make_bridge_source(func_name, call_expr): name for name in original_fields if name.isidentifier() and name in expression ] - parses = [] - for i, arg in enumerate(args): - parses.append(f" if (flags[{i}] === '1') {arg} = JSON.parse({arg});") + + row_var_name = "row" + while row_var_name in args: + row_var_name += "_" source = f""" -function fn(serialized_flags{', ' + ', '.join(args) if args else ''}) {{ - const flags = serialized_flags.split(','); -{chr(10).join(parses)} +function fn(row_json) {{ + const {row_var_name} = JSON.parse(row_json); + {chr(10).join([f" const {name} = {row_var_name}.{name};" for name in args])} const result = ({expression}); if (result instanceof Date) return {{__type__: 'date', value: result.toISOString()}}; return result; }} """ js_func = _QuickJsCallable(source, "fn") - used_fields = args elif callable: source = make_bridge_source("fn", f"({callable})(row)") js_func = _QuickJsCallable(source, "fn") - used_fields = None else: if not path.endswith('.js'): @@ -289,41 +283,22 @@ def make_bridge_source(func_name, call_expr): bridge_source = udf_code + "\n" + make_bridge_source( "bridge_fn", f"{name}(row)") js_func = _QuickJsCallable(bridge_source, "bridge_fn") - used_fields = None - - def _prepare_args(vals): - js_vals = [] - flags = [] - for val in vals: - if isinstance(val, (list, dict)): - js_vals.append(json.dumps(val)) - flags.append('1') - else: - js_vals.append(val) - flags.append('0') - return ",".join(flags), js_vals def js_wrapper(row): - # Prepare arguments for the JS function. We optimize performance by - # passing primitives directly and only serializing complex types (lists, - # dicts) to JSON strings. A string of flags ('0' or '1') is passed to - # inform the JS bridge which arguments need to be JSON.parsed. - if expression: - vals = [py_value_to_js_dict(getattr(row, name)) for name in used_fields] - else: - row_as_dict = py_value_to_js_dict(row) - vals = [row_as_dict.get(name) for name in original_fields] - - flags_str, js_vals = _prepare_args(vals) + # Serialize the entire row to JSON to pass to QuickJS. + row_as_dict = py_value_to_js_dict(row) + row_json = json.dumps(row_as_dict) try: - js_result = js_func(flags_str, *js_vals) + js_result = js_func(row_json) except Exception as exn: raise RuntimeError( f"Error evaluating javascript expression: {exn}") from exn if isinstance(js_result, quickjs.Object): + # Use native json() method to transfer complex types from JS to Python. obj = json.loads(js_result.json()) + # Handle special tagged types like Date if isinstance(obj, dict) and obj.get('__type__') == 'date': js_result = datetime.datetime.fromisoformat(obj['value']) else: From 7ad3a237de06cca270dc9966aa83e9040eafb7dd Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 17 Apr 2026 14:50:52 +0000 Subject: [PATCH 11/11] fix datetime issue --- sdks/python/apache_beam/yaml/yaml_mapping.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 1dd7fbb77f5a..567ba8d16acf 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -300,7 +300,10 @@ def js_wrapper(row): obj = json.loads(js_result.json()) # Handle special tagged types like Date if isinstance(obj, dict) and obj.get('__type__') == 'date': - js_result = datetime.datetime.fromisoformat(obj['value']) + val = obj['value'] + if val.endswith('Z'): + val = val[:-1] + '+00:00' + js_result = datetime.datetime.fromisoformat(val) else: js_result = obj