diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java index 7be2e5195a..052f6e60ee 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java @@ -49,6 +49,7 @@ import com.google.monitoring.v3.Aggregation.Aligner; import com.google.monitoring.v3.TimeInterval; import com.google.protobuf.Timestamp; +import com.google.spanner.admin.instance.v1.Instance.Edition; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import java.time.Duration; @@ -193,6 +194,7 @@ private synchronized void maybeCreateInstance() { InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId)) .setInstanceConfigId(InstanceConfigId.of(projectId, "regional-" + region)) .setDisplayName(instanceId) + .setEdition(Edition.ENTERPRISE_PLUS) // Needed by Full Text Search. 
.setNodeCount(nodeCount) .build(); diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java index cf6ae4fda6..561b15155a 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java @@ -52,6 +52,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_DEFINITION; +import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_LANGUAGE; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_NAME; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_PARAMETER; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_SECURITY; @@ -167,7 +168,9 @@ public Udf toUdf(String functionSpecificName, Schema schema) { .specificName(functionSpecificName) .name(schema.getProp(SPANNER_UDF_NAME)) .type(schema.getProp(SPANNER_UDF_TYPE)) - .definition(schema.getProp(SPANNER_UDF_DEFINITION)); + .language(schema.getProp(SPANNER_UDF_LANGUAGE)) + .definition(schema.getProp(SPANNER_UDF_DEFINITION)) + .options(toOptionsList(schema)); if (schema.getProp(SPANNER_UDF_SECURITY) != null) { builder.security(Udf.SqlSecurity.valueOf(schema.getProp(SPANNER_UDF_SECURITY))); } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java index 7ba212247b..4c3e6447a7 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java @@ -55,6 +55,7 @@ private AvroUtil() {} public static final String SPANNER_UDF = "spannerUdf"; public static final String SPANNER_UDF_NAME = 
"spannerUdfName"; public static final String SPANNER_UDF_TYPE = "spannerUdfType"; + public static final String SPANNER_UDF_LANGUAGE = "spannerUdfLanguage"; public static final String SPANNER_UDF_DEFINITION = "spannerUdfDefinition"; public static final String SPANNER_UDF_SECURITY = "spannerUdfSecurity"; public static final String SPANNER_UDF_PARAMETER = "spannerUdfParameter_"; diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java index c2c4b92433..a001fce20e 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java @@ -55,6 +55,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_DEFINITION; +import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_LANGUAGE; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_NAME; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_PARAMETER; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_SECURITY; @@ -134,10 +135,15 @@ public Collection convert(Ddl ddl) { // Indicate that this is a "CREATE FUNCTION", not a table or a view. 
recordBuilder.prop(SPANNER_ENTITY, SPANNER_UDF); recordBuilder.prop(SPANNER_UDF_NAME, udf.name()); - recordBuilder.prop(SPANNER_UDF_DEFINITION, udf.definition()); + if (udf.definition() != null) { + recordBuilder.prop(SPANNER_UDF_DEFINITION, udf.definition()); + } if (udf.type() != null) { recordBuilder.prop(SPANNER_UDF_TYPE, udf.type()); } + if (udf.language() != null) { + recordBuilder.prop(SPANNER_UDF_LANGUAGE, udf.language()); + } if (udf.security() != null) { recordBuilder.prop(SPANNER_UDF_SECURITY, udf.security().toString()); } @@ -145,6 +151,10 @@ public Collection convert(Ddl ddl) { for (UdfParameter udfParameter : udf.parameters()) { recordBuilder.prop(SPANNER_UDF_PARAMETER + i++, udfParameter.prettyPrint()); } + for (int j = 0; j < udf.options().size(); j++) { + recordBuilder.prop(SPANNER_OPTION + j, udf.options().get(j)); + } + schemas.add(recordBuilder.fields().endRecord()); } for (Table table : ddl.allTables()) { diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java index b35c3cde61..ebfa67bb93 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java @@ -90,6 +90,7 @@ public Ddl scan() { if (isUdfSupported()) { listUdfs(builder); listUdfParameters(builder); + listUdfOptions(builder); } listColumns(builder); listColumnOptions(builder); @@ -1032,13 +1033,20 @@ private void listUdfs(Ddl.Builder builder) { case GOOGLE_STANDARD_SQL: queryStatement = Statement.of( - "SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name, " - + "r.data_type, r.routine_definition, r.security_type" + "SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name," + + " r.data_type, r.routine_body, r.routine_definition, r.security_type" + " FROM information_schema.routines AS r" - + " 
WHERE r.routine_schema NOT IN" - + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" - + " AND r.routine_type = 'FUNCTION'" - + " AND r.routine_body = 'SQL'"); + + " WHERE r.routine_schema NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS')" + + " AND r.routine_type = 'FUNCTION'"); + break; + case POSTGRESQL: + queryStatement = + Statement.of( + "SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name," + + " r.data_type, r.routine_body, r.routine_definition, r.security_type" + + " FROM information_schema.routines AS r WHERE" + + " r.routine_schema NOT IN ('information_schema', 'spanner_sys', 'pg_catalog')" + + " AND r.routine_type = 'FUNCTION'"); break; default: throw new IllegalArgumentException( @@ -1048,20 +1056,28 @@ private void listUdfs(Ddl.Builder builder) { ResultSet resultSet = context.executeQuery(queryStatement); while (resultSet.next()) { + String schema = resultSet.isNull(0) ? null : resultSet.getString(0); String functionName = - resultSet.isNull(0) || resultSet.isNull(1) - ? null - : getQualifiedName(resultSet.getString(0), resultSet.getString(1)); + resultSet.isNull(1) ? null : getQualifiedName(schema, resultSet.getString(1)); String functionSpecificName = getQualifiedName(resultSet.getString(2), resultSet.getString(3)); String functionType = resultSet.isNull(4) ? null : resultSet.getString(4); - String functionDefinition = resultSet.isNull(5) ? null : resultSet.getString(5); - String functionSecurityType = resultSet.isNull(6) ? null : resultSet.getString(6); + String language = resultSet.isNull(5) ? null : resultSet.getString(5); + String functionDefinition = resultSet.isNull(6) ? null : resultSet.getString(6); + String functionSecurityType = resultSet.isNull(7) ? null : resultSet.getString(7); + + // Built-in functions such as Change Stream READ_X are marked as External. + // Skip them and do not re-create them; they are automatically added by change streams. 
+ if ("EXTERNAL".equalsIgnoreCase(language)) { + continue; + } + LOG.debug("Schema user-defined function {}", functionName); builder .createUdf(functionSpecificName) .name(functionName) .type(functionType) + .language(language) .definition(functionDefinition) .security(Udf.SqlSecurity.valueOf(functionSecurityType)) .endUdf(); @@ -1095,16 +1111,64 @@ private void listUdfParameters(Ddl.Builder builder) { } } + private void listUdfOptions(Ddl.Builder builder) { + // PostgreSQL doesn't have ROUTINE_OPTIONS table. It uses AS DEFINITION for options. + if (dialect == Dialect.POSTGRESQL) { + return; + } + // Filter out EXTERNAL functions, which are built-in. + ResultSet resultSet = + context.executeQuery( + Statement.of( + "SELECT o.SPECIFIC_SCHEMA, o.SPECIFIC_NAME, o.OPTION_NAME, o.OPTION_TYPE," + + " o.OPTION_VALUE" + + " FROM information_schema.routine_options AS o" + + " INNER JOIN information_schema.routines AS r" + + " USING (SPECIFIC_SCHEMA, SPECIFIC_NAME)" + + " WHERE o.SPECIFIC_SCHEMA NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS') " + + " AND r.routine_body != 'EXTERNAL'" + + " ORDER BY o.SPECIFIC_NAME, o.OPTION_NAME")); + + Map> allOptions = Maps.newHashMap(); + while (resultSet.next()) { + String specificName = getQualifiedName(resultSet.getString(0), resultSet.getString(1)); + String optionName = resultSet.isNull(2) ? "" : resultSet.getString(2); + String optionType = resultSet.isNull(3) ? "" : resultSet.getString(3); + String optionValue = resultSet.isNull(4) ? 
"" : resultSet.getString(4); + + ImmutableList.Builder options = + allOptions.computeIfAbsent(specificName, k -> ImmutableList.builder()); + + if (optionType.equalsIgnoreCase("STRING")) { + options.add( + optionName + + "=" + + GSQL_LITERAL_QUOTE + + OPTION_STRING_ESCAPER.escape(optionValue) + + GSQL_LITERAL_QUOTE); + } else { + options.add(optionName + "=" + optionValue); + } + } + + for (Map.Entry> entry : allOptions.entrySet()) { + String specificName = entry.getKey(); + ImmutableList options = entry.getValue().build(); + builder.createUdf(specificName).options(options).endUdf(); + } + } + @VisibleForTesting Statement listFunctionParametersSQL() { switch (dialect) { case GOOGLE_STANDARD_SQL: return Statement.of( "SELECT p.specific_schema, p.specific_name, p.parameter_name, p.data_type," - + " p.parameter_default FROM information_schema.parameters AS p, information_schema.routines AS r" - + " WHERE p.specific_schema NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS') and p.specific_name =" - + " r.specific_name and r.routine_type = 'FUNCTION' and r.routine_body = 'SQL' ORDER BY p.specific_schema," - + " p.specific_name, p.ordinal_position"); + + " p.parameter_default FROM information_schema.parameters AS p," + + " information_schema.routines AS r WHERE p.specific_schema NOT IN" + + " ('INFORMATION_SCHEMA', 'SPANNER_SYS') and p.specific_name = r.specific_name and" + + " r.routine_type = 'FUNCTION' ORDER BY" + + " p.specific_schema, p.specific_name, p.ordinal_position"); default: throw new IllegalArgumentException("Unrecognized dialect: " + dialect); diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Udf.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Udf.java index e4a91653c3..9ab5caac2b 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Udf.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Udf.java @@ -21,6 +21,8 @@ import com.google.cloud.spanner.Dialect; import com.google.common.collect.ImmutableList; 
import com.google.common.collect.Maps; +import com.google.common.escape.Escaper; +import com.google.common.escape.Escapers; import java.io.IOException; import java.io.Serializable; import java.util.LinkedHashMap; @@ -32,6 +34,11 @@ public abstract class Udf implements Serializable { private static final long serialVersionUID = 1L; + // Remote UDF definition is printed using: AS '{definition}'. + // Quotes inside the definition string can be escaped using ''. + public static final Escaper PG_DEFINITION_ESCAPER = + Escapers.builder().addEscape('\'', "''").build(); + /** The access rights used by the UDF for underlying data: invoker-rights or definer-rights. */ public enum SqlSecurity { INVOKER, @@ -57,11 +64,16 @@ public enum SqlSecurity { @Nullable public abstract String definition(); + @Nullable + public abstract String language(); + @Nullable public abstract SqlSecurity security(); public abstract ImmutableList parameters(); + public abstract ImmutableList options(); + public void prettyPrint(Appendable appendable) throws IOException { appendable.append("CREATE FUNCTION ").append(quoteIdentifier(name(), dialect())); appendable.append("("); @@ -77,14 +89,53 @@ public void prettyPrint(Appendable appendable) throws IOException { if (type() != null) { appendable.append(" RETURNS ").append(type()); } - SqlSecurity rights = security(); - if (rights != null) { - appendable.append(" SQL SECURITY ").append(rights.toString()); + + // Determinism should be added to INFORMATION_SCHEMA.ROUTINES. + // For now, we infer it from the language. 
+ if (language() != null && language().equalsIgnoreCase("REMOTE")) { + String determinism; + if (dialect() == Dialect.GOOGLE_STANDARD_SQL) { + determinism = "NOT DETERMINISTIC"; + } else { + determinism = "VOLATILE"; + } + appendable.append(" ").append(determinism); } - if (definition() != null) { - appendable.append(" AS ("); - appendable.append(definition()); - appendable.append(")"); + + if (language() != null && !language().isEmpty()) { + // GSQL does not accept LANGUAGE SQL even though it reports it. + if (dialect() != Dialect.GOOGLE_STANDARD_SQL || !language().equalsIgnoreCase("SQL")) { + appendable.append(" LANGUAGE ").append(language()); + } + } + + if (security() != null) { + // Remote UDF don't use SQL SECURITY, but it is marked NOT NULL in IS. + if (!"REMOTE".equalsIgnoreCase(language())) { + appendable.append(" SQL SECURITY ").append(security().toString()); + } + } + + if (!options().isEmpty()) { + if (dialect() == Dialect.GOOGLE_STANDARD_SQL) { + appendable.append(" OPTIONS (").append(String.join(", ", options())).append(")"); + } else { + throw new IllegalArgumentException( + "Options are not supported in PostgreSQL dialect for non-remote UDFs."); + } + } + + if (definition() != null && !definition().isEmpty()) { + if (dialect() == Dialect.GOOGLE_STANDARD_SQL) { + appendable.append(" AS (").append(definition()).append(")"); + } else { + if (language() == null || language().isEmpty() || "SQL".equalsIgnoreCase(language())) { + appendable.append(" RETURN ").append(definition()); + } else { + // Other languages use AS definition instead of sql body. 
+ appendable.append(" AS '").append(PG_DEFINITION_ESCAPER.escape(definition())).append("'"); + } + } } } @@ -113,6 +164,10 @@ public Builder toBuilder() { if (type() != null) { builder.type(type()); } + if (language() != null) { + builder.language(language()); + } + builder.options(options()); if (definition() != null) { builder.definition(definition()); } @@ -126,7 +181,10 @@ public Builder toBuilder() { } public static Builder builder(Dialect dialect) { - return new AutoValue_Udf.Builder().dialect(dialect).parameters(ImmutableList.of()); + return new AutoValue_Udf.Builder() + .dialect(dialect) + .parameters(ImmutableList.of()) + .options(ImmutableList.of()); } public static Builder builder() { @@ -165,10 +223,18 @@ public Builder ddlBuilder(Ddl.Builder ddlBuilder) { public abstract String definition(); + public abstract Builder language(String language); + + public abstract String language(); + public abstract Builder security(SqlSecurity rights); public abstract SqlSecurity security(); + public abstract Builder options(ImmutableList options); + + public abstract ImmutableList options(); + public abstract Builder parameters(ImmutableList parameters); public ImmutableList parameters() { @@ -208,7 +274,9 @@ public Udf build() { .dialect(dialect()) .type(type()) .definition(definition()) + .language(language()) .security(security()) + .options(options()) .parameters(ImmutableList.copyOf(parameters())) .autoBuild(); } diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java index 912e54ebe4..2885836762 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java @@ -857,7 +857,7 @@ public void udfSimple() { } @Test - public void udfAllOptions() { + public void udfSqlAllOptions() { String avroString = "{" + " 
\"type\" : \"record\"," @@ -887,6 +887,104 @@ public void udfAllOptions() { + " RETURNS STRING SQL SECURITY INVOKER AS (SELECT 1)")); } + @Test + public void pgUdfSqlAllOptions() { + String avroString = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"spanner.Foo\"," + + " \"spannerEntity\" : \"spannerUdf\", " + + " \"fields\" : []," + + " \"namespace\" : \"spannertest\"," + + " \"googleStorage\" : \"CloudSpanner\"," + + " \"googleFormatVersion\" : \"booleans\"," + + " \"spannerUdfName\" : \"Foo\"," + + " \"spannerUdfType\" : \"TEXT\"," + + " \"spannerUdfSecurity\" : \"INVOKER\"," + + " \"spannerUdfParameter_0\" : \"arg0 TEXT\"," + + " \"spannerUdfParameter_1\" : \"arg1 TEXT DEFAULT \\\"bar\\\"\"," + + " \"spannerUdfDefinition\" : \"SELECT 1\"" + + "}"; + + Schema schema = new Schema.Parser().parse(avroString); + + AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(Dialect.POSTGRESQL); + Ddl ddl = converter.toDdl(Collections.singleton(schema)); + assertThat(ddl.udfs(), hasSize(1)); + assertThat( + ddl.prettyPrint(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"Foo\"(\"arg0\" TEXT, \"arg1\" TEXT DEFAULT \"bar\")" + + " RETURNS TEXT SQL SECURITY INVOKER RETURN SELECT 1")); + } + + @Test + public void udfRemote() { + String avroString = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"UdfSchema_Foo\"," + + " \"fields\" : []," + + " \"namespace\" : \"spannertest\"," + + " \"googleStorage\" : \"CloudSpanner\"," + + " \"googleFormatVersion\" : \"booleans\"," + + " \"spannerEntity\" : \"spannerUdf\", " + + " \"spannerName\" : \"UdfSchema.Foo\"," + + " \"spannerUdfName\" : \"UdfSchema.Foo\"," + + " \"spannerUdfType\" : \"STRING\"," + + " \"spannerUdfLanguage\" : \"REMOTE\"," + + " \"spannerUdfParameter_0\" : \"arg0 STRING\"," + + " \"spannerUdfParameter_1\" : \"arg1 STRING DEFAULT \\\"bar\\\"\"," + + " \"spannerOption_0\" : \"endpoint=\\\"https://us-central1-myproject.cloudfunctions.net/myfunc\\\"\"," + + " \"spannerOption_1\" : 
\"max_batching_rows=50\"" + + "}"; + + Schema schema = new Schema.Parser().parse(avroString); + + AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(); + Ddl ddl = converter.toDdl(Collections.singleton(schema)); + assertThat(ddl.udfs(), hasSize(1)); + assertThat( + ddl.prettyPrint(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION `UdfSchema`.`Foo`(`arg0` STRING, `arg1` STRING DEFAULT \"bar\")" + + " RETURNS STRING NOT DETERMINISTIC LANGUAGE REMOTE" + + " OPTIONS (endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\", max_batching_rows=50)")); + } + + @Test + public void pgUdfRemote() { + String avroString = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"UdfSchema.Foo\"," + + " \"spannerEntity\" : \"spannerUdf\", " + + " \"fields\" : []," + + " \"namespace\" : \"spannertest\"," + + " \"googleStorage\" : \"CloudSpanner\"," + + " \"googleFormatVersion\" : \"booleans\"," + + " \"spannerName\" : \"UdfSchema.Foo\"," + + " \"spannerUdfName\" : \"UdfSchema.Foo\"," + + " \"spannerUdfType\" : \"STRING\"," + + " \"spannerUdfLanguage\" : \"REMOTE\"," + + " \"spannerUdfParameter_0\" : \"arg0 STRING\"," + + " \"spannerUdfParameter_1\" : \"arg1 STRING DEFAULT \\\"bar\\\"\"," + + " \"spannerUdfDefinition\" : \"{\\\"endpoint\\\": \\\"https://us-central1-myproject.cloudfunctions.net/myfunc\\\", \\\"max_batching_rows\\\": 50}\"" + + "}"; + + Schema schema = new Schema.Parser().parse(avroString); + + AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(Dialect.POSTGRESQL); + Ddl ddl = converter.toDdl(Collections.singleton(schema)); + assertThat(ddl.udfs(), hasSize(1)); + assertThat( + ddl.prettyPrint(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"UdfSchema\".\"Foo\"(\"arg0\" STRING, \"arg1\" STRING DEFAULT \"bar\")" + + " RETURNS STRING VOLATILE LANGUAGE REMOTE" + + " AS '{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\": 50}'")); + } + @Test public void invokerRightsView() 
{ String avroString = diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java index b67f22a13d..c1a64702e0 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java @@ -1101,6 +1101,59 @@ public void udfs() throws Exception { UdfParameter.parse( "arg1 STRING DEFAULT 'bar'", "s1.Foo2", Dialect.GOOGLE_STANDARD_SQL)) .endUdf() + .createUdf("s1.Foo3") + .dialect(Dialect.GOOGLE_STANDARD_SQL) + .name("s1.Foo3") + .language("REMOTE") + .type("INT64") + .addParameter(UdfParameter.parse("arg0 INT64", "s1.Foo3", Dialect.GOOGLE_STANDARD_SQL)) + .options( + ImmutableList.of( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"")) + .endUdf() + .build(); + createAndPopulate(ddl, 0); + runTest(); + } + + @Test + public void pgUdfs() throws Exception { + Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); + List dbOptionList = new ArrayList<>(); + dbOptionList.add( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("\"bit_reversed_positive\"") + .build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); + Ddl ddl = + ddlBuilder + .createSchema("s1") + .endNamedSchema() + .createUdf("s1.Foo1") + .dialect(Dialect.POSTGRESQL) + .name("s1.Foo1") + .definition("(SELECT 'bar')") + .endUdf() + .createUdf("s1.Foo2") + .dialect(Dialect.POSTGRESQL) + .name("s1.Foo2") + .definition("(SELECT 'bar')") + .security(SqlSecurity.INVOKER) + .type("TEXT") + .addParameter(UdfParameter.parse("arg0 TEXT", "s1.Foo2", Dialect.POSTGRESQL)) + .addParameter( + UdfParameter.parse("arg1 TEXT DEFAULT 'bar'", "s1.Foo2", Dialect.POSTGRESQL)) + .endUdf() + .createUdf("s1.Foo3") + .dialect(Dialect.POSTGRESQL) + .name("s1.Foo3") + .language("REMOTE") + .type("BIGINT") + .addParameter(UdfParameter.parse("arg0 BIGINT", "s1.Foo3", Dialect.POSTGRESQL)) + 
.definition( + "{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\"}") + .endUdf() .build(); createAndPopulate(ddl, 0); runTest(); diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java index 2c833e102d..81e39e5f3a 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java @@ -38,6 +38,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_INDEX; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_INTERLEAVE_TYPE; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_LABEL; +import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_NAME; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_NODE_TABLE; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_ON_DELETE_ACTION; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_OPTION; @@ -51,6 +52,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MAX; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_DEFINITION; +import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_LANGUAGE; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_NAME; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_PARAMETER; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_SECURITY; @@ -733,6 +735,95 @@ public void udfAllOptions() { assertThat(avroUdf.getName(), equalTo("spanner_Foo")); } + @Test + public void udfRemote() { + DdlToAvroSchemaConverter converter = + new DdlToAvroSchemaConverter("spannertest", "booleans", false); + Ddl ddl = + Ddl.builder() + 
.createUdf("UdfSchema.Foo") + .name("UdfSchema.Foo") + .type("STRING") + .language("REMOTE") + .addParameter( + UdfParameter.parse("arg0 STRING", "UdfSchema.Foo", Dialect.GOOGLE_STANDARD_SQL)) + .addParameter( + UdfParameter.parse( + "arg1 STRING DEFAULT \"bar\"", "UdfSchema.Foo", Dialect.GOOGLE_STANDARD_SQL)) + .options( + ImmutableList.of( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"", + "max_batching_rows=50")) + .endUdf() + .build(); + + Collection result = converter.convert(ddl); + assertThat(result, hasSize(1)); + Schema avroUdf = result.iterator().next(); + + assertThat(avroUdf, notNullValue()); + + assertThat(avroUdf.getName(), equalTo("UdfSchema_Foo")); + assertThat(avroUdf.getNamespace(), equalTo("spannertest")); + assertThat(avroUdf.getProp(GOOGLE_FORMAT_VERSION), equalTo("booleans")); + assertThat(avroUdf.getProp(GOOGLE_STORAGE), equalTo("CloudSpanner")); + assertThat(avroUdf.getProp(SPANNER_NAME), equalTo("UdfSchema.Foo")); + assertThat(avroUdf.getProp(SPANNER_UDF_NAME), equalTo("UdfSchema.Foo")); + assertThat(avroUdf.getProp(SPANNER_UDF_DEFINITION), nullValue()); + assertThat(avroUdf.getProp(SPANNER_UDF_SECURITY), nullValue()); + assertThat(avroUdf.getProp(SPANNER_UDF_TYPE), equalTo("STRING")); + assertThat(avroUdf.getProp(SPANNER_UDF_LANGUAGE), equalTo("REMOTE")); + assertThat(avroUdf.getProp(SPANNER_UDF_PARAMETER + 0), equalTo("`arg0` STRING")); + assertThat( + avroUdf.getProp(SPANNER_UDF_PARAMETER + 1), equalTo("`arg1` STRING DEFAULT \"bar\"")); + assertThat( + avroUdf.getProp(SPANNER_OPTION + 0), + equalTo("endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"")); + assertThat(avroUdf.getProp(SPANNER_OPTION + 1), equalTo("max_batching_rows=50")); + } + + @Test + public void pgUdfRemote() { + DdlToAvroSchemaConverter converter = + new DdlToAvroSchemaConverter("spannertest", "booleans", false); + Ddl ddl = + Ddl.builder(Dialect.POSTGRESQL) + .createUdf("UdfSchema.Foo") + .name("UdfSchema.Foo") + 
.type("TEXT") + .language("REMOTE") + .definition( + "{\"endpoint\":\"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\":50}") + .addParameter(UdfParameter.parse("arg0 TEXT", "UdfSchema.Foo", Dialect.POSTGRESQL)) + .addParameter( + UdfParameter.parse("arg1 TEXT DEFAULT \"bar\"", "spanner.Foo", Dialect.POSTGRESQL)) + .endUdf() + .build(); + + Collection result = converter.convert(ddl); + assertThat(result, hasSize(1)); + Schema avroUdf = result.iterator().next(); + + assertThat(avroUdf, notNullValue()); + + assertThat(avroUdf.getName(), equalTo("UdfSchema_Foo")); + assertThat(avroUdf.getNamespace(), equalTo("spannertest")); + assertThat(avroUdf.getProp(GOOGLE_FORMAT_VERSION), equalTo("booleans")); + assertThat(avroUdf.getProp(GOOGLE_STORAGE), equalTo("CloudSpanner")); + assertThat(avroUdf.getProp(SPANNER_NAME), equalTo("UdfSchema.Foo")); + assertThat(avroUdf.getProp(SPANNER_UDF_NAME), equalTo("UdfSchema.Foo")); + assertThat( + avroUdf.getProp(SPANNER_UDF_DEFINITION), + equalTo( + "{\"endpoint\":\"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\":50}")); + assertThat(avroUdf.getProp(SPANNER_UDF_SECURITY), nullValue()); + assertThat(avroUdf.getProp(SPANNER_UDF_TYPE), equalTo("TEXT")); + assertThat(avroUdf.getProp(SPANNER_UDF_LANGUAGE), equalTo("REMOTE")); + assertThat(avroUdf.getProp(SPANNER_UDF_PARAMETER + 0), equalTo("\"arg0\" TEXT")); + assertThat( + avroUdf.getProp(SPANNER_UDF_PARAMETER + 1), equalTo("\"arg1\" TEXT DEFAULT \"bar\"")); + } + @Test public void invokerRightsView() { DdlToAvroSchemaConverter converter = diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java index 733e00c42c..9c375ae404 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java @@ -207,18 +207,19 @@ private void 
testSpannerToGCSAvroBase( // an empty database without any tables. spannerResourceManager.executeDdlStatement(setDefaultTimeZoneStatement); + String prefix = testName + "_"; String resourceFileName = "ExportPipelineIT/spanner-gsql-ddl.sql"; String ddl = String.join( " ", Resources.readLines( Resources.getResource(resourceFileName), StandardCharsets.UTF_8)) - .replaceAll("%PREFIX%", testName); + .replaceAll("%PREFIX%", prefix); ddl = ddl.trim(); List ddls = Arrays.stream(ddl.split(";")).filter(d -> !d.isBlank()).toList(); spannerResourceManager.executeDdlStatements(ddls); - List expectedData = generateTableRows(String.format("%s_Singers", testName)); + List expectedData = generateTableRows(String.format("%sSingers", prefix)); spannerResourceManager.write(expectedData); PipelineLauncher.LaunchConfig.Builder options = paramsAdder.apply( @@ -238,31 +239,33 @@ private void testSpannerToGCSAvroBase( List singersArtifacts = gcsClient.listArtifacts( - "output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Singers"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Singers"))); List emptyArtifacts = gcsClient.listArtifacts( - "output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Empty"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Empty"))); List modelStructArtifacts = + gcsClient.listArtifacts( + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "ModelStruct"))); + List udfRemoteArtifacts = gcsClient.listArtifacts( "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "ModelStruct"))); + Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "UdfSchema.Remote"))); List searchIndexArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "SearchIndex"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "SearchIndex"))); List identityArtifacts = 
gcsClient.listArtifacts( - "output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Identity"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Identity"))); List sequenceArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence1"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Sequence1"))); List sequenceNoKindArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Sequence2"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Sequence2"))); + assertThat(singersArtifacts).isNotEmpty(); assertThat(emptyArtifacts).isNotEmpty(); + assertThat(udfRemoteArtifacts).isNotEmpty(); assertThat(modelStructArtifacts).isNotEmpty(); assertThat(identityArtifacts).isNotEmpty(); assertThat(sequenceArtifacts).isNotEmpty(); @@ -297,8 +300,7 @@ private void testPGSpannerToAvroBase( Function paramsAdder) throws IOException { - String tableNamePrefix = testName.substring(0, 15); - + String prefix = testName.substring(0, 15) + "_"; String setDefaultTimeZoneStatement = "ALTER DATABASE db SET spanner.default_time_zone = 'UTC'"; // Setting default time zone needs to be the first statement because it requires // an empty database without any tables. 
@@ -310,12 +312,12 @@ private void testPGSpannerToAvroBase( " ", Resources.readLines( Resources.getResource(resourceFileName), StandardCharsets.UTF_8)) - .replaceAll("%PREFIX%", tableNamePrefix); + .replaceAll("%PREFIX%", prefix); ddl = ddl.trim(); List ddls = Arrays.stream(ddl.split(";")).filter(d -> !d.isBlank()).toList(); spannerResourceManager.executeDdlStatements(ddls); - List expectedData = generateTableRows(String.format("%s_Singers", tableNamePrefix)); + List expectedData = generateTableRows(String.format("%sSingers", prefix)); spannerResourceManager.write(expectedData); PipelineLauncher.LaunchConfig.Builder options = paramsAdder.apply( @@ -335,28 +337,22 @@ private void testPGSpannerToAvroBase( List singersArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "Singers"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Singers"))); List emptyArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "Empty"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Empty"))); List searchIndexArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "SearchIndex"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "SearchIndex"))); List identityArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "Identity"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Identity"))); List sequenceArtifacts = gcsClient.listArtifacts( - "output/", - Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "Sequence1"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Sequence1"))); List sequenceNoKindArtifacts = gcsClient.listArtifacts( - "output/", - 
Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", tableNamePrefix, "Sequence2"))); + "output/", Pattern.compile(String.format(".*/%s%s.*\\.avro.*", prefix, "Sequence2"))); assertThat(singersArtifacts).isNotEmpty(); assertThat(emptyArtifacts).isNotEmpty(); assertThat(identityArtifacts).isNotEmpty(); diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java index 54df2b1a73..82aef50deb 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineOperator; import org.apache.beam.it.common.utils.ResourceManagerUtils; @@ -109,8 +110,24 @@ private void uploadImportPipelineArtifacts(String subdirectory) throws IOExcepti "input/Sequence2-manifest.json", Resources.getResource("ImportPipelineIT/" + subdirectory + "/Sequence2-manifest.json") .getPath()); + gcsClient.uploadArtifact( + "input/UdfSchema.avro-00000-of-00001", + Resources.getResource("ImportPipelineIT/" + subdirectory + "/UdfSchema.avro").getPath()); + gcsClient.uploadArtifact( + "input/UdfSchema-manifest.json", + Resources.getResource("ImportPipelineIT/" + subdirectory + "/UdfSchema-manifest.json") + .getPath()); if (Objects.equals(subdirectory, "googlesql")) { + gcsClient.uploadArtifact( + "input/UdfSchema.Remote.avro-00000-of-00001", + Resources.getResource("ImportPipelineIT/" + subdirectory + "/UdfSchema.Remote.avro") + .getPath()); + gcsClient.uploadArtifact( + "input/UdfSchema.Remote-manifest.json", + Resources.getResource( + "ImportPipelineIT/" + subdirectory + "/UdfSchema.Remote-manifest.json") + .getPath()); gcsClient.uploadArtifact( 
"input/ModelStruct.avro-00000-of-00001", Resources.getResource("ImportPipelineIT/" + subdirectory + "/ModelStruct.avro") @@ -235,6 +252,15 @@ private void testGoogleSqlImportPipelineBase( assertThat(float32Records).hasSize(9); assertThatStructs(float32Records).hasRecordsUnordered(getFloat32TableExpectedRows()); + + assertThat( + spannerResourceManager + .runQuery( + "SELECT CONCAT(ROUTINE_SCHEMA, '.', ROUTINE_NAME) FROM INFORMATION_SCHEMA.ROUTINES") + .stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList())) + .containsExactly("UdfSchema.Remote"); } @Test @@ -299,6 +325,8 @@ private void testPostgresImportPipelineBase( assertThat(float32Records).hasSize(9); assertThatStructs(float32Records).hasRecordsUnordered(getFloat32TableExpectedRows()); + + // TODO(b/485601737): Add PG UDFs. } // TODO(b/395532087): Consolidate this with other tests after UUID launch. diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java index 1a75574860..480bf82c34 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java @@ -1370,6 +1370,18 @@ public void udfs() { .addParameter( UdfParameter.parse( "arg1 STRING DEFAULT 'bar'", "spanner.Foo", Dialect.GOOGLE_STANDARD_SQL)) + .endUdf() + .createUdf("spanner.Foo3") + .dialect(Dialect.GOOGLE_STANDARD_SQL) + .name("Foo3") + .type("STRING") + .language("REMOTE") + .addParameter( + UdfParameter.parse("arg0 INT64", "spanner.Foo3", Dialect.GOOGLE_STANDARD_SQL)) + .options( + ImmutableList.of( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"", + "max_batching_rows=50")) .endUdf(); assertThat(ddlBuilder.hasUdf("spanner.Foo1")); assertThat(ddlBuilder.createUdf("spanner.Foo1").name().equals("Foo1")); @@ -1378,11 +1390,13 @@ public void udfs() { String expectedDdlString = "\nCREATE FUNCTION `Foo1`() AS ((SELECT 'bar'))\n" + 
"CREATE FUNCTION `Foo2`(`arg0` STRING, `arg1` STRING DEFAULT 'bar')" - + " RETURNS STRING SQL SECURITY INVOKER AS ((SELECT 'bar'))"; + + " RETURNS STRING SQL SECURITY INVOKER AS ((SELECT 'bar'))\n" + + "CREATE FUNCTION `Foo3`(`arg0` INT64) RETURNS STRING NOT DETERMINISTIC LANGUAGE REMOTE" + + " OPTIONS (endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\", max_batching_rows=50)"; assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(expectedDdlString)); List statements = ddl.statements(); - assertEquals(2, statements.size()); + assertEquals(3, statements.size()); assertThat( statements.get(0), equalToCompressingWhiteSpace("CREATE FUNCTION `Foo1`() AS ((SELECT 'bar'))")); @@ -1391,6 +1405,72 @@ public void udfs() { equalToCompressingWhiteSpace( "CREATE FUNCTION `Foo2`(`arg0` STRING, `arg1` STRING DEFAULT 'bar')" + " RETURNS STRING SQL SECURITY INVOKER AS ((SELECT 'bar'))")); + assertThat( + statements.get(2), + equalToCompressingWhiteSpace( + "CREATE FUNCTION `Foo3`(`arg0` INT64) RETURNS STRING NOT DETERMINISTIC LANGUAGE REMOTE" + + " OPTIONS (endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\", max_batching_rows=50)")); + assertNotNull(ddl.hashCode()); + + assertThat( + ddl.toBuilder().build().prettyPrint(), equalToCompressingWhiteSpace(expectedDdlString)); + } + + @Test + public void pgUdfs() { + Ddl.Builder ddlBuilder = + Ddl.builder(Dialect.POSTGRESQL) + .createUdf("spanner.Foo1") + .dialect(Dialect.POSTGRESQL) + .name("Foo1") + .definition("(SELECT 'bar')") + .endUdf() + .createUdf("spanner.Foo2") + .dialect(Dialect.POSTGRESQL) + .name("Foo2") + .definition("(SELECT 'bar')") + .security(SqlSecurity.INVOKER) + .type("STRING") + .addParameter(UdfParameter.parse("arg0 TEXT", "spanner.Foo", Dialect.POSTGRESQL)) + .addParameter( + UdfParameter.parse("arg1 TEXT DEFAULT 'bar'", "spanner.Foo", Dialect.POSTGRESQL)) + .endUdf() + .createUdf("spanner.Foo3") + .dialect(Dialect.POSTGRESQL) + .name("Foo3") + .type("STRING") + 
.language("REMOTE") + .addParameter(UdfParameter.parse("arg0 BIGINT", "spanner.Foo3", Dialect.POSTGRESQL)) + .definition( + "{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\": 50}") + .endUdf(); + assertThat(ddlBuilder.hasUdf("spanner.Foo1")); + assertThat(ddlBuilder.createUdf("spanner.Foo1").name().equals("Foo1")); + Ddl ddl = ddlBuilder.build(); + + String expectedDdlString = + "\nCREATE FUNCTION \"Foo1\"() RETURN (SELECT 'bar')\n" + + "CREATE FUNCTION \"Foo2\"(\"arg0\" TEXT, \"arg1\" TEXT DEFAULT 'bar')" + + " RETURNS STRING SQL SECURITY INVOKER RETURN (SELECT 'bar')\n" + + "CREATE FUNCTION \"Foo3\"(\"arg0\" BIGINT) RETURNS STRING VOLATILE LANGUAGE REMOTE" + + " AS '{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\": 50}'"; + assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(expectedDdlString)); + + List statements = ddl.statements(); + assertEquals(3, statements.size()); + assertThat( + statements.get(0), + equalToCompressingWhiteSpace("CREATE FUNCTION \"Foo1\"() RETURN (SELECT 'bar')")); + assertThat( + statements.get(1), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"Foo2\"(\"arg0\" TEXT, \"arg1\" TEXT DEFAULT 'bar')" + + " RETURNS STRING SQL SECURITY INVOKER RETURN (SELECT 'bar')")); + assertThat( + statements.get(2), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"Foo3\"(\"arg0\" BIGINT) RETURNS STRING VOLATILE LANGUAGE REMOTE" + + " AS '{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\", \"max_batching_rows\": 50}'")); assertNotNull(ddl.hashCode()); assertThat( diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java index 0036985f35..a2fb664747 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java +++ 
b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java @@ -19,6 +19,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_KIND; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MAX; import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; @@ -635,14 +636,18 @@ public void simpleUdf() throws Exception { + "C STRING DEFAULT 'NULL', " + "D STRING DEFAULT '') " + "RETURNS STRING AS (CONCAT(A, '::', B, '::', C, '::', D))"; + String udfDef3 = + "CREATE FUNCTION s1.remote_udf(x INT64, y INT64) " + + "RETURNS INT64 NOT DETERMINISTIC LANGUAGE REMOTE " + + "OPTIONS ( endpoint = 'https://us-central1-myproject.cloudfunctions.net/myfunc', max_batching_rows = 50 )"; - SPANNER_SERVER.createDatabase(dbId, Arrays.asList(namedSchemaDef, udfDef1, udfDef2)); + SPANNER_SERVER.createDatabase(dbId, Arrays.asList(namedSchemaDef, udfDef1, udfDef2, udfDef3)); Ddl ddl = getDatabaseDdl(); assertThat(ddl.schemas(), hasSize(1)); assertThat(ddl.schema("s1"), notNullValue()); - assertThat(ddl.udfs(), hasSize(2)); + assertThat(ddl.udfs(), hasSize(3)); Udf udf1 = ddl.udf("s1.foo"); assertThat(udf1, notNullValue()); assertThat(ddl.udf("S1.FOO"), sameInstance(udf1)); @@ -651,13 +656,21 @@ public void simpleUdf() throws Exception { assertThat(udf2, notNullValue()); assertThat(ddl.udf("S1.DEFault_values"), sameInstance(udf2)); + Udf udf3 = ddl.udf("s1.remote_udf"); + assertThat(udf3, notNullValue()); + assertThat(ddl.udf("S1.REMOTE_UDF"), sameInstance(udf3)); + assertThat(udf1.name(), equalTo("s1.foo")); assertThat(udf1.type(), equalTo("INT64")); + assertEquals(udf1.language(), "SQL"); + assertThat(udf1.options(), empty()); assertThat(udf1.definition(), equalTo("1")); assertEquals(udf1.security(), 
Udf.SqlSecurity.INVOKER); assertThat(udf2.name(), equalTo("s1.default_values")); assertThat(udf2.type(), equalTo("STRING")); + assertEquals(udf2.language(), "SQL"); + assertThat(udf2.options(), empty()); assertThat(udf2.definition(), equalTo("CONCAT(A, '::', B, '::', C, '::', D)")); assertEquals(udf2.security(), Udf.SqlSecurity.INVOKER); assertThat( @@ -687,6 +700,47 @@ public void simpleUdf() throws Exception { .type("STRING") .defaultExpression("''") .autoBuild())); + + assertThat(udf3.name(), equalTo("s1.remote_udf")); + assertThat(udf3.type(), equalTo("INT64")); + assertEquals(udf3.language(), "REMOTE"); + assertThat( + udf3.options(), + hasItems( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"", + "max_batching_rows=50")); + assertEquals(udf3.definition(), ""); + assertEquals(udf3.security(), Udf.SqlSecurity.INVOKER); + assertThat( + udf3.parameters(), + hasItems( + UdfParameter.builder() + .functionSpecificName("s1.remote_udf") + .name("x") + .type("INT64") + .defaultExpression(null) + .autoBuild(), + UdfParameter.builder() + .functionSpecificName("s1.remote_udf") + .name("y") + .type("INT64") + .defaultExpression(null) + .autoBuild())); + } + + @Test + public void pgSimpleUdf() throws Exception { + String namedSchemaDef = "CREATE SCHEMA s1"; + + SPANNER_SERVER.createPgDatabase(dbId, Arrays.asList(namedSchemaDef)); + Ddl ddl = getPgDatabaseDdl(); + + assertThat(ddl.schemas(), hasSize(1)); + assertThat(ddl.schema("s1"), notNullValue()); + + // TODO(b/485601737): Add PG UDFs. 
+ + assertThat(ddl.udfs(), hasSize(0)); } @Test diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java index fe12f1faa9..5284370302 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java @@ -234,7 +234,7 @@ public void testListFunctionParametersSQL() { "SELECT p.specific_schema, p.specific_name, p.parameter_name, p.data_type," + " p.parameter_default FROM information_schema.parameters AS p, information_schema.routines AS r" + " WHERE p.specific_schema NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS') and p.specific_name =" - + " r.specific_name and r.routine_type = 'FUNCTION' and r.routine_body = 'SQL' ORDER BY p.specific_schema," + + " r.specific_name and r.routine_type = 'FUNCTION' ORDER BY p.specific_schema," + " p.specific_name, p.ordinal_position")); assertThrows( diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java index c85cf1c8bd..0e9ab7bd52 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java @@ -102,6 +102,8 @@ public abstract class RandomDdlGenerator { Type.Code.PG_NUMERIC, Type.Code.PG_DATE)); + private static final String[] UDF_LANGUAGES = new String[] {"SQL", "REMOTE"}; + private static final int MAX_PKS = 16; public abstract Dialect getDialect(); @@ -110,6 +112,8 @@ public abstract class RandomDdlGenerator { public abstract int getArrayChance(); + public abstract int getRemoteUdfChance(); + public abstract int[] getMaxBranchPerLevel(); public abstract int getMaxPkComponents(); @@ -146,6 +150,7 @@ public static Builder builder(Dialect 
dialect) { .setDialect(dialect) .setRandom(new Random()) .setArrayChance(20) + .setRemoteUdfChance(20) .setMaxPkComponents(3) .setMaxBranchPerLevel(new int[] {2, 2, 1, 1, 1, 1, 1}) .setMaxUdfs(0) @@ -172,6 +177,8 @@ public abstract static class Builder { public abstract Builder setArrayChance(int chance); + public abstract Builder setRemoteUdfChance(int chance); + public abstract Builder setMaxBranchPerLevel(int[] arr); public abstract Builder setMaxPkComponents(int val); @@ -236,16 +243,35 @@ private void generateUdf(Ddl.Builder builder) { .dialect(Dialect.GOOGLE_STANDARD_SQL) .name(name); if (getRandom().nextBoolean()) { - Type type = generateType(PK_TYPES, -1); + Type type = + generateType((getDialect() == Dialect.GOOGLE_STANDARD_SQL) ? PK_TYPES : PG_PK_TYPES, -1); udfBuilder.type(type.getCode().getName()); } - if (getRandom().nextBoolean()) { - udfBuilder.security(SqlSecurity.INVOKER); + + if (getRandom().nextInt(100) <= getRemoteUdfChance()) { + udfBuilder.language("REMOTE"); } + + if (!"REMOTE".equals(udfBuilder.language())) { + if (getRandom().nextBoolean()) { + udfBuilder.security(SqlSecurity.INVOKER); + } + } else { + if (getDialect() == Dialect.GOOGLE_STANDARD_SQL) { + udfBuilder.options( + ImmutableList.of( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"")); + } else { + udfBuilder.definition( + "{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\"}"); + } + } + int numUdfParameters = getRandom().nextInt(getMaxUdfParameters() + 1); for (int i = 0; i < numUdfParameters; i++) { String paramName = generateIdentifier(getMaxIdLength()); - Type type = generateType(PK_TYPES, -1); + Type type = + generateType((getDialect() == Dialect.GOOGLE_STANDARD_SQL) ? 
PK_TYPES : PG_PK_TYPES, -1); UdfParameter.Builder udfParameterBuilder = udfBuilder.parameter(paramName).type(type.getCode().getName()); if (getRandom().nextBoolean()) { diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/UdfTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/UdfTest.java index e9ed6bd64c..db8ade30b3 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/UdfTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/UdfTest.java @@ -21,6 +21,7 @@ import com.google.cloud.spanner.Dialect; import com.google.cloud.teleport.spanner.ddl.Udf.SqlSecurity; +import com.google.common.collect.ImmutableList; import org.junit.Test; /** Unit tests for Udf class. */ @@ -73,4 +74,73 @@ public void testUdfWithInvalidParameter() { assertThrows(IllegalArgumentException.class, () -> udf.parameter("p1")); } + + @Test + public void testRemoteUdf() { + Udf udf = + Udf.builder() + .name("foo") + .specificName("s1.foo") + .dialect(Dialect.GOOGLE_STANDARD_SQL) + .type("string") + .language("REMOTE") + .addParameter(UdfParameter.parse("p1 int32", "s1.foo", Dialect.GOOGLE_STANDARD_SQL)) + .options( + ImmutableList.of( + "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"", + "max_batching_rows=50")) + .build(); + + assertThat( + udf.toString(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION `foo`(`p1` int32) RETURNS string NOT DETERMINISTIC LANGUAGE REMOTE" + + " OPTIONS (endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\", max_batching_rows=50)")); + + assertThat( + udf.toBuilder().build().toString(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION `foo`(`p1` int32) RETURNS string NOT DETERMINISTIC LANGUAGE REMOTE" + + " OPTIONS (endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\", max_batching_rows=50)")); + } + + @Test + public void testPgRemoteUdf() { + Udf udf = + Udf.builder() + .name("foo") + .specificName("s1.foo") + .dialect(Dialect.POSTGRESQL) + 
.type("TEXT") + .language("REMOTE") + .addParameter(UdfParameter.parse("p1 BIGINT", "s1.foo", Dialect.POSTGRESQL)) + .definition( + "{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/'myfunc\"}") + .build(); + + assertThat( + udf.toString(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"foo\"(\"p1\" BIGINT) RETURNS TEXT VOLATILE LANGUAGE REMOTE" + + " AS '{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/''myfunc\"}'")); + + assertThat( + udf.toBuilder().build().toString(), + equalToCompressingWhiteSpace( + "CREATE FUNCTION \"foo\"(\"p1\" BIGINT) RETURNS TEXT VOLATILE LANGUAGE REMOTE" + + " AS '{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/''myfunc\"}'")); + + assertThrows( + IllegalArgumentException.class, + () -> + Udf.builder() + .name("bar") + .specificName("s1.bar") + .dialect(Dialect.POSTGRESQL) + .type("TEXT") + .language("REMOTE") + .options(ImmutableList.of("option = value")) + .build() + .toString()); + } } diff --git a/v1/src/test/resources/ExportPipelineIT/spanner-gsql-ddl.sql b/v1/src/test/resources/ExportPipelineIT/spanner-gsql-ddl.sql index c849546c17..f30df2bed4 100644 --- a/v1/src/test/resources/ExportPipelineIT/spanner-gsql-ddl.sql +++ b/v1/src/test/resources/ExportPipelineIT/spanner-gsql-ddl.sql @@ -1,45 +1,54 @@ -DROP TABLE IF EXISTS `%PREFIX%_EmptyTable`; -CREATE TABLE `%PREFIX%_EmptyTable` ( +DROP TABLE IF EXISTS `%PREFIX%EmptyTable`; +CREATE TABLE `%PREFIX%EmptyTable` ( id INT64 NOT NULL ) PRIMARY KEY(id); ALTER DATABASE db SET OPTIONS (default_sequence_kind = 'bit_reversed_positive'); -DROP TABLE IF EXISTS `%PREFIX%_Identity`; -CREATE TABLE `%PREFIX%_Identity` ( +DROP TABLE IF EXISTS `%PREFIX%Identity`; +CREATE TABLE `%PREFIX%Identity` ( Id INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), NonKeyIdCol1 INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY, NonKeyIdCol2 INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (SKIP RANGE 1000, 2000) ) PRIMARY KEY(Id); -DROP 
SEQUENCE IF EXISTS `%PREFIX%_Sequence1`; -CREATE SEQUENCE `%PREFIX%_Sequence1` BIT_REVERSED_POSITIVE SKIP RANGE 99, 999; +DROP SEQUENCE IF EXISTS `%PREFIX%Sequence1`; +CREATE SEQUENCE `%PREFIX%Sequence1` BIT_REVERSED_POSITIVE SKIP RANGE 99, 999; -DROP SEQUENCE IF EXISTS `%PREFIX%_Sequence2`; -CREATE SEQUENCE `%PREFIX%_Sequence2`; +DROP SEQUENCE IF EXISTS `%PREFIX%Sequence2`; +CREATE SEQUENCE `%PREFIX%Sequence2`; -DROP TABLE IF EXISTS `%PREFIX%_Root`; -CREATE TABLE `%PREFIX%_Root` ( +DROP TABLE IF EXISTS `%PREFIX%Root`; +CREATE TABLE `%PREFIX%Root` ( Id INT64 NOT NULL ) PRIMARY KEY(Id); -DROP TABLE IF EXISTS `%PREFIX%_Singers`; -CREATE TABLE `%PREFIX%_Singers` ( +DROP TABLE IF EXISTS `%PREFIX%Singers`; +CREATE TABLE `%PREFIX%Singers` ( Id INT64 NOT NULL, FirstName String(1024), LastName String(1024), Rating FLOAT32, Review String(MAX), `MyTokens` TOKENLIST AS (TOKENIZE_FULLTEXT(Review)) HIDDEN -) PRIMARY KEY(Id), INTERLEAVE IN `%PREFIX%_Root`; +) PRIMARY KEY(Id), INTERLEAVE IN `%PREFIX%Root`; -DROP MODEL IF EXISTS `%PREFIX%_ModelStruct`; -CREATE MODEL `%PREFIX%_ModelStruct` +CREATE CHANGE STREAM `%PREFIX%SingersChanges` FOR `%PREFIX%Singers`; + +DROP MODEL IF EXISTS `%PREFIX%ModelStruct`; +CREATE MODEL `%PREFIX%ModelStruct` INPUT(content STRING(MAX)) OUTPUT (embeddings STRUCT, values ARRAY>) REMOTE OPTIONS (endpoint="//aiplatform.googleapis.com/projects/span-cloud-testing/locations/us-central1/publishers/google/models/textembedding-gecko"); -DROP SEARCH INDEX IF EXISTS `%PREFIX%_SequenceIndex`; -CREATE SEARCH INDEX `%PREFIX%_SearchIndex` - ON `%PREFIX%_Singers`(`MyTokens`) +CREATE SCHEMA `%PREFIX%UdfSchema`; + +CREATE FUNCTION `%PREFIX%UdfSchema`.`Remote`(x INT64, y INT64) RETURNS INT64 NOT DETERMINISTIC LANGUAGE REMOTE OPTIONS ( + endpoint = "https://us-central1-myproject.cloudfunctions.net/myfunc", + max_batching_rows = 10 +); + +DROP SEARCH INDEX IF EXISTS `%PREFIX%SearchIndex`; +CREATE SEARCH INDEX `%PREFIX%SearchIndex` + ON `%PREFIX%Singers`(`MyTokens`)
OPTIONS (sort_order_sharding=TRUE); diff --git a/v1/src/test/resources/ExportPipelineIT/spanner-pg-ddl.sql b/v1/src/test/resources/ExportPipelineIT/spanner-pg-ddl.sql index a02970568e..14c1882e9a 100644 --- a/v1/src/test/resources/ExportPipelineIT/spanner-pg-ddl.sql +++ b/v1/src/test/resources/ExportPipelineIT/spanner-pg-ddl.sql @@ -1,40 +1,44 @@ -DROP TABLE IF EXISTS "%PREFIX%_EmptyTable"; -CREATE TABLE "%PREFIX%_EmptyTable" ( +DROP TABLE IF EXISTS "%PREFIX%EmptyTable"; +CREATE TABLE "%PREFIX%EmptyTable" ( id bigint NOT NULL, PRIMARY KEY(id) ); ALTER DATABASE db SET spanner.default_sequence_kind = 'bit_reversed_positive'; -DROP TABLE IF EXISTS "%PREFIX%_Identity"; -CREATE TABLE "%PREFIX%_Identity" ( +DROP TABLE IF EXISTS "%PREFIX%Identity"; +CREATE TABLE "%PREFIX%Identity" ( Id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE) PRIMARY KEY, NonKeyIdCol1 bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, NonKeyIdCol2 bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (SKIP RANGE 1000 2000) ); -DROP SEQUENCE IF EXISTS "%PREFIX%_Sequence1"; -CREATE SEQUENCE "%PREFIX%_Sequence1" BIT_REVERSED_POSITIVE SKIP RANGE 99 999; +DROP SEQUENCE IF EXISTS "%PREFIX%Sequence1"; +CREATE SEQUENCE "%PREFIX%Sequence1" BIT_REVERSED_POSITIVE SKIP RANGE 99 999; -DROP SEQUENCE IF EXISTS "%PREFIX%_Sequence2"; -CREATE SEQUENCE "%PREFIX%_Sequence2"; +DROP SEQUENCE IF EXISTS "%PREFIX%Sequence2"; +CREATE SEQUENCE "%PREFIX%Sequence2"; -DROP TABLE IF EXISTS "%PREFIX%_Root"; -CREATE TABLE "%PREFIX%_Root" ( +DROP TABLE IF EXISTS "%PREFIX%Root"; +CREATE TABLE "%PREFIX%Root" ( "Id" bigint, PRIMARY KEY("Id") ); -DROP TABLE IF EXISTS "%PREFIX%_Singers"; -CREATE TABLE "%PREFIX%_Singers" ( +DROP TABLE IF EXISTS "%PREFIX%Singers"; +CREATE TABLE "%PREFIX%Singers" ( "Id" bigint, "FirstName" character varying(256), "LastName" character varying(256), "Rating" real, "NameTokens" spanner.tokenlist generated always as (spanner.tokenize_fulltext("FirstName")) stored hidden, - PRIMARY KEY("Id")) 
INTERLEAVE IN "%PREFIX%_Root"; + PRIMARY KEY("Id")) INTERLEAVE IN "%PREFIX%Root"; -DROP SEARCH INDEX IF EXISTS "%PREFIX%_SearchIndex"; -CREATE SEARCH INDEX "%PREFIX%_SearchIndex" - ON "%PREFIX%_Singers"("NameTokens") ORDER BY "Id" WHERE "Id" IS NOT NULL +CREATE CHANGE STREAM "%PREFIX%SingersChanges" FOR "%PREFIX%Singers"; + +CREATE SCHEMA "%PREFIX%UdfSchema"; + +DROP SEARCH INDEX IF EXISTS "%PREFIX%SearchIndex"; +CREATE SEARCH INDEX "%PREFIX%SearchIndex" + ON "%PREFIX%Singers"("NameTokens") ORDER BY "Id" WHERE "Id" IS NOT NULL WITH (sort_order_sharding=TRUE); diff --git a/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema-manifest.json b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema-manifest.json new file mode 100644 index 0000000000..75dbc2c1b4 --- /dev/null +++ b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema-manifest.json @@ -0,0 +1,6 @@ +{ + "files": [{ + "name": "UdfSchema.avro-00000-of-00001", + "md5": "ru78pNHqG1/4/+Aj3c5bCA\u003d\u003d" + }] +} diff --git a/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote-manifest.json b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote-manifest.json new file mode 100644 index 0000000000..bf8ab801f9 --- /dev/null +++ b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote-manifest.json @@ -0,0 +1,6 @@ +{ + "files": [{ + "name": "UdfSchema.Remote.avro-00000-of-00001", + "md5": "dmM+e7CpXKLF0w2c5uiQ2w\u003d\u003d" + }] +} diff --git a/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote.avro b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote.avro new file mode 100644 index 0000000000..c5f25bf300 Binary files /dev/null and b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.Remote.avro differ diff --git a/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.avro b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.avro new file mode 100644 index 0000000000..01cdb837c6 Binary files /dev/null and 
b/v1/src/test/resources/ImportPipelineIT/googlesql/UdfSchema.avro differ diff --git a/v1/src/test/resources/ImportPipelineIT/googlesql/spanner-export.json b/v1/src/test/resources/ImportPipelineIT/googlesql/spanner-export.json index 0a8c3c1bd3..c6ad02dc79 100644 --- a/v1/src/test/resources/ImportPipelineIT/googlesql/spanner-export.json +++ b/v1/src/test/resources/ImportPipelineIT/googlesql/spanner-export.json @@ -14,6 +14,9 @@ }, { "name": "ModelStruct", "manifestFile": "ModelStruct-manifest.json" + }, { + "name": "UdfSchema", + "manifestFile": "UdfSchema-manifest.json" }], "databaseOptions": [{ "optionName": "default_time_zone", @@ -30,5 +33,9 @@ }, { "name": "Sequence2", "manifestFile": "Sequence2-manifest.json" + }], + "udfs": [{ + "name": "UdfSchema.Remote", + "manifestFile": "UdfSchema.Remote-manifest.json" }] } diff --git a/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema-manifest.json b/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema-manifest.json new file mode 100644 index 0000000000..75dbc2c1b4 --- /dev/null +++ b/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema-manifest.json @@ -0,0 +1,6 @@ +{ + "files": [{ + "name": "UdfSchema.avro-00000-of-00001", + "md5": "ru78pNHqG1/4/+Aj3c5bCA\u003d\u003d" + }] +} diff --git a/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema.avro b/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema.avro new file mode 100644 index 0000000000..01cdb837c6 Binary files /dev/null and b/v1/src/test/resources/ImportPipelineIT/postgres/UdfSchema.avro differ diff --git a/v1/src/test/resources/ImportPipelineIT/postgres/spanner-export.json b/v1/src/test/resources/ImportPipelineIT/postgres/spanner-export.json index cdbb916cd3..9f417d05cf 100644 --- a/v1/src/test/resources/ImportPipelineIT/postgres/spanner-export.json +++ b/v1/src/test/resources/ImportPipelineIT/postgres/spanner-export.json @@ -11,6 +11,9 @@ }, { "name": "Identity", "manifestFile": "Identity-manifest.json" + }, { + "name": "UdfSchema", + 
"manifestFile": "UdfSchema-manifest.json" }], "databaseOptions": [{ "optionName": "default_time_zone",