From 4ec7c7a8fce444b187169e2498466c2f589674ed Mon Sep 17 00:00:00 2001 From: carloea2 Date: Tue, 12 May 2026 18:57:37 -0600 Subject: [PATCH 1/3] feat(workflow-operator): add Python UDF UI parameter injection model --- .../texera/amber/core/tuple/Attribute.java | 2 + .../python/PythonUdfUiParameterInjector.scala | 184 +++++++++++++++ .../operator/udf/python/UiUDFParameter.scala | 41 ++++ .../PythonUdfUiParameterInjectorSpec.scala | 217 ++++++++++++++++++ 4 files changed, 444 insertions(+) create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala create mode 100644 common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index 84d52fddced..fb434e08752 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -49,6 +50,7 @@ public Attribute( @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") + @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala new file mode 100644 index 00000000000..f5ce2909d58 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext + +import scala.util.matching.Regex + +object PythonUdfUiParameterInjector { + + private val ReservedHookMethod = "_texera_injected_ui_parameters" + private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY, AttributeType.LARGE_BINARY) + + // Match user-facing UDF classes (the ones users write) + private val SupportedUserClassRegex: Regex = + """(?m)^([ \t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r + + private def validate(uiParameters: List[UiUDFParameter]): Unit = { + uiParameters.foreach { parameter => + if (parameter.attribute == null) { + throw new RuntimeException("UiParameter attribute is required.") + } + + if (UnsupportedUiParameterTypes.contains(parameter.attribute.getType)) { + throw new RuntimeException( + s"UiParameter type '${parameter.attribute.getType.name()}' is not supported. " + + "Use string, integer, long, double, boolean, or timestamp instead." + ) + } + } + + val grouped = uiParameters.groupBy(_.attribute.getName) + grouped.foreach { + case (key, values) => + val typeSet = values.map(_.attribute.getType).toSet + if (typeSet.size > 1) { + throw new RuntimeException( + s"UiParameter key '$key' has multiple types: ${typeSet.map(_.name()).mkString(",")}." + ) + } + } + } + + private def buildInjectedParametersMap( + uiParameters: List[UiUDFParameter] + ): PythonTemplateBuilder = { + val entries = uiParameters.map { parameter => + pyb"${parameter.attribute.getName}: ${parameter.value}" + } + + entries.reduceOption((acc, entry) => acc + pyb", " + entry).getOrElse(pyb"") + } + + private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { + val injectedParametersMap = buildInjectedParametersMap(uiParameters) + + // unindented method; we indent it when inserting into the class body + (pyb"""|@overrides + |def """ + pyb"$ReservedHookMethod" + pyb"""(self) -> Dict[str, Any]: + | return {""" + + injectedParametersMap + + pyb"""} + |""").encode + } + + private def indentBlock(block: String, indent: String): String = { + block + .split("\n", -1) + .map { line => + if (line.nonEmpty) indent + line else line + } + .mkString("\n") + } + + private def lineEndIndex(text: String, from: Int): Int = { + val idx = text.indexOf('\n', from) + if (idx < 0) text.length else idx + } + + private def detectClassBlockEnd(code: String, classHeaderStart: Int, classIndent: String): Int = { + val classLineEnd = lineEndIndex(code, classHeaderStart) + var pos = if (classLineEnd < code.length) classLineEnd + 1 else code.length + + while (pos < code.length) { + val end = lineEndIndex(code, pos) + val line = code.substring(pos, end) + + val trimmed = line.trim + val isBlank = trimmed.isEmpty + + // a top-level (or same/lower-indented) non-blank line ends the class block + val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t') + val classIndentLen = classIndent.length + + if (!isBlank && currentIndentLen <= classIndentLen) { + return pos + } + + pos = if (end < code.length) end + 1 else code.length + } + + code.length + } + + private def containsReservedHook(classBlock: String): Boolean = { + val hookRegex = ("""(?m)^[ \t]+def\s+""" + Regex.quote(ReservedHookMethod) + """\s*\(""").r + hookRegex.findFirstIn(classBlock).isDefined + } + + private def injectHookIntoUserClass(encodedUserCode: String, hookMethod: String): String = { + val m = SupportedUserClassRegex.findFirstMatchIn(encodedUserCode).getOrElse { + return encodedUserCode + } + + val classHeaderStart = m.start + val classIndent = m.group(1) + val classBlockEnd = detectClassBlockEnd(encodedUserCode, classHeaderStart, classIndent) + + val classBlock = encodedUserCode.substring(classHeaderStart, classBlockEnd) + + if (containsReservedHook(classBlock)) { + throw new RuntimeException( + s"Reserved method '$ReservedHookMethod' is already defined in the UDF class. Please rename your method." + ) + } + + val bodyIndent = inferClassBodyIndent(classBlock, classIndent).getOrElse(classIndent + " ") + val indentedHook = indentBlock( + (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n", + bodyIndent + ) + + encodedUserCode.substring(0, classBlockEnd) + + indentedHook + + encodedUserCode.substring(classBlockEnd) + } + + private def inferClassBodyIndent(classBlock: String, classIndent: String): Option[String] = { + val lines = classBlock.split("\n", -1).toList.drop(1) // skip class header line + + lines.collectFirst { + case line if line.trim.nonEmpty => + val leading = line.takeWhile(ch => ch == ' ' || ch == '\t') + if (leading.length > classIndent.length) leading else classIndent + " " + } + } + + def inject(code: String, uiParameters: List[UiUDFParameter]): String = { + val params = Option(uiParameters).getOrElse(List.empty) + validate(params) + + // Let pyb encode the user's source normally + val encodedUserCode = pyb"$code".encode + + // If there are no UI params, return unchanged code (no hook injection needed) + if (params.isEmpty) { + return encodedUserCode + } + + // Build encoded hook method (contains self.decode_python_template(...)) + val hookMethod = buildInjectedHookMethod(params) + + // Inject hook into the UDF class body; Python base class will auto-call it before open() + injectHookIntoUserClass(encodedUserCode, hookMethod) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala new file mode 100644 index 00000000000..71ce2596788 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.tuple.Attribute +import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString + +import javax.validation.Valid +import javax.validation.constraints.NotNull + +class UiUDFParameter { + + @JsonProperty(required = true) + @JsonSchemaTitle("Attribute") + @Valid + @NotNull(message = "Attribute is required") + var attribute: Attribute = _ + + @JsonProperty() + @JsonSchemaTitle("Value") + var value: EncodableString = "" +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala new file mode 100644 index 00000000000..bd37cbc6af7 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { + + private def createParameter( + key: String, + attributeType: AttributeType, + value: String + ): UiUDFParameter = { + val parameter = new UiUDFParameter + parameter.attribute = new Attribute(key, attributeType) + parameter.value = value + parameter + } + + private val baseUdfCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + |""".stripMargin + + it should "return encoded user code unchanged when there are no ui parameters" in { + val injectedCode = PythonUdfUiParameterInjector.inject(baseUdfCode, Nil) + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include("""print("open")""") + injectedCode should not include ("_texera_injected_ui_parameters") + injectedCode should not include ("self.decode_python_template") + injectedCode should not include ("import typing") + } + + it should "inject ui parameter hook into supported UDF class using Dict and Any from pytexera" in { + val injectedCode = PythonUdfUiParameterInjector.inject( + baseUdfCode, + List( + createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") + ) + ) + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("return {") + injectedCode should include("self.decode_python_template") + injectedCode should include("""print("open")""") + injectedCode should not include ("import typing") + injectedCode should not include ("typing.Dict") + injectedCode should not include ("typing.Any") + } + + it should "append the reserved hook inside the class before the next top-level statement" in { + val udfCodeWithSiblingDefinition = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + | + |def helper(): + | return "outside" + |""".stripMargin + + val injectedCode = PythonUdfUiParameterInjector.inject( + udfCodeWithSiblingDefinition, + List(createParameter("k", AttributeType.STRING, "v")) + ) + + val hookIndex = injectedCode.indexOf("def _texera_injected_ui_parameters(self)") + val processTupleIndex = + injectedCode.indexOf("def process_tuple(self, tuple_: Tuple, port: int):") + val helperIndex = injectedCode.indexOf("def helper():") + + hookIndex should be >= 0 + processTupleIndex should be < hookIndex + helperIndex should be > hookIndex + } + + it should "preserve multiple ui parameters in the injected map" in { + val injectedCode = PythonUdfUiParameterInjector.inject( + baseUdfCode, + List( + createParameter("param1", AttributeType.DOUBLE, "12.5"), + createParameter("param2", AttributeType.INTEGER, "1"), + createParameter("param3", AttributeType.STRING, "Hola"), + createParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z") + ) + ) + + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("self.decode_python_template") + injectedCode.count(_ == ':') should be > 0 + injectedCode should not include ("import typing") + } + + it should "throw when a parameter attribute is missing" in { + val invalidParameter = new UiUDFParameter + invalidParameter.attribute = null + invalidParameter.value = "anything" + + val exception = the[RuntimeException] thrownBy { + PythonUdfUiParameterInjector.inject(baseUdfCode, List(invalidParameter)) + } + + exception.getMessage should include("UiParameter attribute is required") + } + + it should "throw when a key is declared with conflicting attribute types" in { + val conflictingParameters = List( + createParameter("date", AttributeType.STRING, "2024-01-01"), + createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") + ) + + val exception = the[RuntimeException] thrownBy { + PythonUdfUiParameterInjector.inject(baseUdfCode, conflictingParameters) + } + + exception.getMessage should include("UiParameter key 'date' has multiple types") + } + + it should "throw when a ui parameter uses a binary type" in { + val exception = the[RuntimeException] thrownBy { + PythonUdfUiParameterInjector.inject( + baseUdfCode, + List(createParameter("payload", AttributeType.BINARY, "68656c6c6f")) + ) + } + + exception.getMessage should include("UiParameter type 'BINARY' is not supported") + } + + it should "allow duplicate keys when the attribute type is the same" in { + val sameTypeParameters = List( + createParameter("date", AttributeType.TIMESTAMP, "2024-01-01"), + createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") + ) + + noException should be thrownBy { + PythonUdfUiParameterInjector.inject(baseUdfCode, sameTypeParameters) + } + } + + it should "throw when the reserved hook is already defined by the user" in { + val udfWithReservedHook = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | def _texera_injected_ui_parameters(self): + | return {} + | + | def open(self): + | pass + |""".stripMargin + + val exception = the[RuntimeException] thrownBy { + PythonUdfUiParameterInjector.inject( + udfWithReservedHook, + List(createParameter("k", AttributeType.STRING, "v")) + ) + } + + exception.getMessage should include( + "Reserved method '_texera_injected_ui_parameters' is already defined" + ) + } + + it should "leave code unchanged when no supported user class is present" in { + val nonSupportedCode = + """from pytexera import * + | + |class SomethingElse: + | def open(self): + | pass + |""".stripMargin + + val injectedCode = PythonUdfUiParameterInjector.inject( + nonSupportedCode, + List(createParameter("k", AttributeType.STRING, "v")) + ) + + injectedCode should not include ("_texera_injected_ui_parameters") + injectedCode should include("class SomethingElse:") + injectedCode should not include ("import typing") + } +} From f3865223dac920f84c35ace1de52017c8481d70f Mon Sep 17 00:00:00 2001 From: carloea2 Date: Thu, 21 May 2026 02:58:03 -0600 Subject: [PATCH 2/3] refactor(workflow-operator): tighten UI parameter injector --- .../texera/amber/core/tuple/Attribute.java | 2 - .../python/PythonUdfUiParameterInjector.scala | 125 ++++++++++-------- .../operator/udf/python/UiUDFParameter.scala | 6 + .../PythonUdfUiParameterInjectorSpec.scala | 102 ++++++-------- 4 files changed, 118 insertions(+), 117 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index fb434e08752..84d52fddced 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -50,7 +49,6 @@ public Attribute( @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") - @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala index f5ce2909d58..69fc4e1289c 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -18,64 +18,77 @@ */ package org.apache.texera.amber.operator.udf.python -import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.apache.texera.amber.pybuilder.PyStringTypes.{EncodableString, EncodableStringFactory} import org.apache.texera.amber.pybuilder.PythonTemplateBuilder import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext import scala.util.matching.Regex +/** + * Injects the reserved UI-parameter hook into user-written Python UDF code. + * + * Operator descriptors should call this after loading saved [[UiUDFParameter]] values and before sending Python source + * to runtime execution. The injected hook returns decoded parameter names and values that Python runtime support reads + * before the user's `open()` method runs. + */ object PythonUdfUiParameterInjector { - private val ReservedHookMethod = "_texera_injected_ui_parameters" + private val InjectedUiParametersHookMethodName = "_texera_injected_ui_parameters" + private val InjectedUiParametersHookMethodHeader = + s"def $InjectedUiParametersHookMethodName(self) -> Dict[str, Any]:" private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY, AttributeType.LARGE_BINARY) - // Match user-facing UDF classes (the ones users write) - private val SupportedUserClassRegex: Regex = + // Keep supported user-facing UDF class names in sync with the frontend parser. + private val SupportedPythonUdfClassHeaderRegex: Regex = """(?m)^([ \t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r private def validate(uiParameters: List[UiUDFParameter]): Unit = { - uiParameters.foreach { parameter => - if (parameter.attribute == null) { - throw new RuntimeException("UiParameter attribute is required.") - } + val attributes = uiParameters.map(parameterAttribute) + attributes.foreach(validateSupportedType) - if (UnsupportedUiParameterTypes.contains(parameter.attribute.getType)) { - throw new RuntimeException( - s"UiParameter type '${parameter.attribute.getType.name()}' is not supported. " + - "Use string, integer, long, double, boolean, or timestamp instead." - ) + attributes + .groupBy(_.getName) + .collectFirst { + case (parameterName, matchingAttributes) if matchingAttributes.size > 1 => parameterName + } + .foreach { duplicateName => + throw new RuntimeException(s"UiParameter name '$duplicateName' is declared more than once.") } + } + + private def parameterAttribute(parameter: UiUDFParameter): Attribute = + Option(parameter).flatMap(parameter => Option(parameter.attribute)).getOrElse { + throw new RuntimeException("UiParameter attribute is required.") } - val grouped = uiParameters.groupBy(_.attribute.getName) - grouped.foreach { - case (key, values) => - val typeSet = values.map(_.attribute.getType).toSet - if (typeSet.size > 1) { - throw new RuntimeException( - s"UiParameter key '$key' has multiple types: ${typeSet.map(_.name()).mkString(",")}." - ) - } + private def validateSupportedType(attribute: Attribute): Unit = { + if (UnsupportedUiParameterTypes.contains(attribute.getType)) { + throw new RuntimeException( + s"UiParameter type '${attribute.getType.name()}' is not supported. " + + "Use string, integer, long, double, boolean, or timestamp instead." + ) } } + private def buildInjectedParameterEntry(parameter: UiUDFParameter): PythonTemplateBuilder = { + val parameterName: EncodableString = EncodableStringFactory(parameter.attribute.getName) + pyb"$parameterName: ${parameter.value}" + } + private def buildInjectedParametersMap( uiParameters: List[UiUDFParameter] ): PythonTemplateBuilder = { - val entries = uiParameters.map { parameter => - pyb"${parameter.attribute.getName}: ${parameter.value}" - } - + val entries = uiParameters.map(buildInjectedParameterEntry) entries.reduceOption((acc, entry) => acc + pyb", " + entry).getOrElse(pyb"") } private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { val injectedParametersMap = buildInjectedParametersMap(uiParameters) - // unindented method; we indent it when inserting into the class body (pyb"""|@overrides - |def """ + pyb"$ReservedHookMethod" + pyb"""(self) -> Dict[str, Any]: - | return {""" + + |$InjectedUiParametersHookMethodHeader + | return {""" + injectedParametersMap + pyb"""} |""").encode @@ -91,54 +104,55 @@ object PythonUdfUiParameterInjector { } private def lineEndIndex(text: String, from: Int): Int = { - val idx = text.indexOf('\n', from) - if (idx < 0) text.length else idx + val lineEnd = text.indexOf('\n', from) + if (lineEnd < 0) text.length else lineEnd } private def detectClassBlockEnd(code: String, classHeaderStart: Int, classIndent: String): Int = { val classLineEnd = lineEndIndex(code, classHeaderStart) - var pos = if (classLineEnd < code.length) classLineEnd + 1 else code.length + var lineStart = if (classLineEnd < code.length) classLineEnd + 1 else code.length - while (pos < code.length) { - val end = lineEndIndex(code, pos) - val line = code.substring(pos, end) + while (lineStart < code.length) { + val lineEnd = lineEndIndex(code, lineStart) + val line = code.substring(lineStart, lineEnd) val trimmed = line.trim val isBlank = trimmed.isEmpty - // a top-level (or same/lower-indented) non-blank line ends the class block val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t') val classIndentLen = classIndent.length if (!isBlank && currentIndentLen <= classIndentLen) { - return pos + return lineStart } - pos = if (end < code.length) end + 1 else code.length + lineStart = if (lineEnd < code.length) lineEnd + 1 else code.length } code.length } private def containsReservedHook(classBlock: String): Boolean = { - val hookRegex = ("""(?m)^[ \t]+def\s+""" + Regex.quote(ReservedHookMethod) + """\s*\(""").r + val hookRegex = + ("""(?m)^[ \t]+def\s+""" + Regex.quote(InjectedUiParametersHookMethodName) + """\s*\(""").r hookRegex.findFirstIn(classBlock).isDefined } private def injectHookIntoUserClass(encodedUserCode: String, hookMethod: String): String = { - val m = SupportedUserClassRegex.findFirstMatchIn(encodedUserCode).getOrElse { - return encodedUserCode - } + val classHeaderMatch = + SupportedPythonUdfClassHeaderRegex.findFirstMatchIn(encodedUserCode).getOrElse { + return encodedUserCode + } - val classHeaderStart = m.start - val classIndent = m.group(1) + val classHeaderStart = classHeaderMatch.start + val classIndent = classHeaderMatch.group(1) val classBlockEnd = detectClassBlockEnd(encodedUserCode, classHeaderStart, classIndent) val classBlock = encodedUserCode.substring(classHeaderStart, classBlockEnd) if (containsReservedHook(classBlock)) { throw new RuntimeException( - s"Reserved method '$ReservedHookMethod' is already defined in the UDF class. Please rename your method." + s"Reserved method '$InjectedUiParametersHookMethodName' is already defined in the UDF class. Please rename your method." ) } @@ -154,7 +168,7 @@ object PythonUdfUiParameterInjector { } private def inferClassBodyIndent(classBlock: String, classIndent: String): Option[String] = { - val lines = classBlock.split("\n", -1).toList.drop(1) // skip class header line + val lines = classBlock.split("\n", -1).toList.drop(1) lines.collectFirst { case line if line.trim.nonEmpty => @@ -163,22 +177,23 @@ object PythonUdfUiParameterInjector { } } + /** + * Returns Python code with the UI-parameter hook injected into the supported UDF class. + * + * If `uiParameters` is empty, the code is only passed through normal Python-template encoding. Throws + * [[RuntimeException]] when parameter metadata is invalid or the user already defines the reserved hook method. + */ def inject(code: String, uiParameters: List[UiUDFParameter]): String = { - val params = Option(uiParameters).getOrElse(List.empty) - validate(params) + val parameters = Option(uiParameters).getOrElse(List.empty) + validate(parameters) - // Let pyb encode the user's source normally val encodedUserCode = pyb"$code".encode - // If there are no UI params, return unchanged code (no hook injection needed) - if (params.isEmpty) { + if (parameters.isEmpty) { return encodedUserCode } - // Build encoded hook method (contains self.decode_python_template(...)) - val hookMethod = buildInjectedHookMethod(params) - - // Inject hook into the UDF class body; Python base class will auto-call it before open() + val hookMethod = buildInjectedHookMethod(parameters) injectHookIntoUserClass(encodedUserCode, hookMethod) } } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala index 71ce2596788..b18b9a181d7 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala @@ -27,6 +27,12 @@ import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString import javax.validation.Valid import javax.validation.constraints.NotNull +/** + * Serialized operator property for one Python UDF UI parameter. + * + * `attribute` carries the inferred parameter name and type. `value` is user-entered text and is marked as + * [[EncodableString]] so Python code generation decodes it at runtime instead of embedding raw text into generated code. + */ class UiUDFParameter { @JsonProperty(required = true) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala index bd37cbc6af7..3d0fb824075 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -24,17 +24,26 @@ import org.scalatest.matchers.should.Matchers class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { - private def createParameter( - key: String, + private def uiParameter( + attributeName: String, attributeType: AttributeType, value: String ): UiUDFParameter = { val parameter = new UiUDFParameter - parameter.attribute = new Attribute(key, attributeType) + parameter.attribute = new Attribute(attributeName, attributeType) parameter.value = value parameter } + private def inject(parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(baseUdfCode, parameters.toList) + + private def inject(code: String, parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(code, parameters.toList) + + private def decoderCallCount(code: String): Int = + code.sliding("self.decode_python_template".length).count(_ == "self.decode_python_template") + private val baseUdfCode: String = """from pytexera import * | @@ -48,8 +57,8 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { | yield tuple_ |""".stripMargin - it should "return encoded user code unchanged when there are no ui parameters" in { - val injectedCode = PythonUdfUiParameterInjector.inject(baseUdfCode, Nil) + it should "return encoded user code unchanged when there are no UI parameters" in { + val injectedCode = inject() injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") injectedCode should include("""print("open")""") @@ -58,18 +67,14 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { injectedCode should not include ("import typing") } - it should "inject ui parameter hook into supported UDF class using Dict and Any from pytexera" in { - val injectedCode = PythonUdfUiParameterInjector.inject( - baseUdfCode, - List( - createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") - ) - ) + it should "inject UI parameter hook into supported UDF class using Dict and Any from pytexera" in { + val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")) injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") injectedCode should include("return {") injectedCode should include("self.decode_python_template") + decoderCallCount(injectedCode) shouldBe 2 injectedCode should include("""print("open")""") injectedCode should not include ("import typing") injectedCode should not include ("typing.Dict") @@ -93,10 +98,8 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { | return "outside" |""".stripMargin - val injectedCode = PythonUdfUiParameterInjector.inject( - udfCodeWithSiblingDefinition, - List(createParameter("k", AttributeType.STRING, "v")) - ) + val injectedCode = + inject(udfCodeWithSiblingDefinition, uiParameter("k", AttributeType.STRING, "v")) val hookIndex = injectedCode.indexOf("def _texera_injected_ui_parameters(self)") val processTupleIndex = @@ -108,20 +111,17 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { helperIndex should be > hookIndex } - it should "preserve multiple ui parameters in the injected map" in { - val injectedCode = PythonUdfUiParameterInjector.inject( - baseUdfCode, - List( - createParameter("param1", AttributeType.DOUBLE, "12.5"), - createParameter("param2", AttributeType.INTEGER, "1"), - createParameter("param3", AttributeType.STRING, "Hola"), - createParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z") - ) + it should "preserve multiple UI parameters in the injected map" in { + val injectedCode = inject( + uiParameter("param1", AttributeType.DOUBLE, "12.5"), + uiParameter("param2", AttributeType.INTEGER, "1"), + uiParameter("param3", AttributeType.STRING, "Hola"), + uiParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z") ) injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") injectedCode should include("self.decode_python_template") - injectedCode.count(_ == ':') should be > 0 + decoderCallCount(injectedCode) shouldBe 8 injectedCode should not include ("import typing") } @@ -131,44 +131,32 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { invalidParameter.value = "anything" val exception = the[RuntimeException] thrownBy { - PythonUdfUiParameterInjector.inject(baseUdfCode, List(invalidParameter)) + inject(invalidParameter) } exception.getMessage should include("UiParameter attribute is required") } - it should "throw when a key is declared with conflicting attribute types" in { - val conflictingParameters = List( - createParameter("date", AttributeType.STRING, "2024-01-01"), - createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") - ) - - val exception = the[RuntimeException] thrownBy { - PythonUdfUiParameterInjector.inject(baseUdfCode, conflictingParameters) - } - - exception.getMessage should include("UiParameter key 'date' has multiple types") - } - - it should "throw when a ui parameter uses a binary type" in { + it should "throw when a UI parameter name is duplicated" in { val exception = the[RuntimeException] thrownBy { - PythonUdfUiParameterInjector.inject( - baseUdfCode, - List(createParameter("payload", AttributeType.BINARY, "68656c6c6f")) + inject( + uiParameter("date", AttributeType.STRING, "2024-01-01"), + uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") ) } - exception.getMessage should include("UiParameter type 'BINARY' is not supported") + exception.getMessage should include("UiParameter name 'date' is declared more than once") } - it should "allow duplicate keys when the attribute type is the same" in { - val sameTypeParameters = List( - createParameter("date", AttributeType.TIMESTAMP, "2024-01-01"), - createParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") - ) + Seq(AttributeType.BINARY, AttributeType.LARGE_BINARY).foreach { unsupportedType => + it should s"throw when a UI parameter uses ${unsupportedType.name()} type" in { + val exception = the[RuntimeException] thrownBy { + inject(uiParameter("payload", unsupportedType, "68656c6c6f")) + } - noException should be thrownBy { - PythonUdfUiParameterInjector.inject(baseUdfCode, sameTypeParameters) + exception.getMessage should include( + s"UiParameter type '${unsupportedType.name()}' is not supported" + ) } } @@ -185,10 +173,7 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { |""".stripMargin val exception = the[RuntimeException] thrownBy { - PythonUdfUiParameterInjector.inject( - udfWithReservedHook, - List(createParameter("k", AttributeType.STRING, "v")) - ) + inject(udfWithReservedHook, uiParameter("k", AttributeType.STRING, "v")) } exception.getMessage should include( @@ -205,10 +190,7 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { | pass |""".stripMargin - val injectedCode = PythonUdfUiParameterInjector.inject( - nonSupportedCode, - List(createParameter("k", AttributeType.STRING, "v")) - ) + val injectedCode = inject(nonSupportedCode, uiParameter("k", AttributeType.STRING, "v")) injectedCode should not include ("_texera_injected_ui_parameters") injectedCode should include("class SomethingElse:") From 4652aac053941299724c598cae9353311f1d3ce7 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Thu, 21 May 2026 03:01:52 -0600 Subject: [PATCH 3/3] fix(workflow-operator): restore attribute name encoding --- .../scala/org/apache/texera/amber/core/tuple/Attribute.java | 2 ++ .../operator/udf/python/PythonUdfUiParameterInjector.scala | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index 84d52fddced..fb434e08752 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -49,6 +50,7 @@ public Attribute( @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") + @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala index 69fc4e1289c..70836cbeda4 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -19,7 +19,6 @@ package org.apache.texera.amber.operator.udf.python import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} -import org.apache.texera.amber.pybuilder.PyStringTypes.{EncodableString, EncodableStringFactory} import org.apache.texera.amber.pybuilder.PythonTemplateBuilder import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext @@ -72,8 +71,7 @@ object PythonUdfUiParameterInjector { } private def buildInjectedParameterEntry(parameter: UiUDFParameter): PythonTemplateBuilder = { - val parameterName: EncodableString = EncodableStringFactory(parameter.attribute.getName) - pyb"$parameterName: ${parameter.value}" + pyb"${parameter.attribute.getName}: ${parameter.value}" } private def buildInjectedParametersMap(