Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.google.monitoring.v3.Aggregation.Aligner;
import com.google.monitoring.v3.TimeInterval;
import com.google.protobuf.Timestamp;
import com.google.spanner.admin.instance.v1.Instance.Edition;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
Expand Down Expand Up @@ -193,6 +194,7 @@ private synchronized void maybeCreateInstance() {
InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId))
.setInstanceConfigId(InstanceConfigId.of(projectId, "regional-" + region))
.setDisplayName(instanceId)
.setEdition(Edition.ENTERPRISE_PLUS) // Needed by Full Text Search.
Comment thread
shreyakhajanchi marked this conversation as resolved.
.setNodeCount(nodeCount)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_DEFINITION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_LANGUAGE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_NAME;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_PARAMETER;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_SECURITY;
Expand Down Expand Up @@ -167,7 +168,9 @@ public Udf toUdf(String functionSpecificName, Schema schema) {
.specificName(functionSpecificName)
.name(schema.getProp(SPANNER_UDF_NAME))
.type(schema.getProp(SPANNER_UDF_TYPE))
.definition(schema.getProp(SPANNER_UDF_DEFINITION));
.language(schema.getProp(SPANNER_UDF_LANGUAGE))
.definition(schema.getProp(SPANNER_UDF_DEFINITION))
.options(toOptionsList(schema));
if (schema.getProp(SPANNER_UDF_SECURITY) != null) {
builder.security(Udf.SqlSecurity.valueOf(schema.getProp(SPANNER_UDF_SECURITY)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private AvroUtil() {}
public static final String SPANNER_UDF = "spannerUdf";
public static final String SPANNER_UDF_NAME = "spannerUdfName";
public static final String SPANNER_UDF_TYPE = "spannerUdfType";
public static final String SPANNER_UDF_LANGUAGE = "spannerUdfLanguage";
public static final String SPANNER_UDF_DEFINITION = "spannerUdfDefinition";
public static final String SPANNER_UDF_SECURITY = "spannerUdfSecurity";
public static final String SPANNER_UDF_PARAMETER = "spannerUdfParameter_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_SKIP_RANGE_MIN;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_DEFINITION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_LANGUAGE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_NAME;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_PARAMETER;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_UDF_SECURITY;
Expand Down Expand Up @@ -134,17 +135,26 @@ public Collection<Schema> convert(Ddl ddl) {
// Indicate that this is a "CREATE FUNCTION", not a table or a view.
recordBuilder.prop(SPANNER_ENTITY, SPANNER_UDF);
recordBuilder.prop(SPANNER_UDF_NAME, udf.name());
recordBuilder.prop(SPANNER_UDF_DEFINITION, udf.definition());
if (udf.definition() != null) {
recordBuilder.prop(SPANNER_UDF_DEFINITION, udf.definition());
}
if (udf.type() != null) {
recordBuilder.prop(SPANNER_UDF_TYPE, udf.type());
}
if (udf.language() != null) {
recordBuilder.prop(SPANNER_UDF_LANGUAGE, udf.language());
}
if (udf.security() != null) {
recordBuilder.prop(SPANNER_UDF_SECURITY, udf.security().toString());
}
int i = 0;
for (UdfParameter udfParameter : udf.parameters()) {
recordBuilder.prop(SPANNER_UDF_PARAMETER + i++, udfParameter.prettyPrint());
}
for (int j = 0; j < udf.options().size(); j++) {
recordBuilder.prop(SPANNER_OPTION + j, udf.options().get(j));
}

schemas.add(recordBuilder.fields().endRecord());
}
for (Table table : ddl.allTables()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public Ddl scan() {
if (isUdfSupported()) {
listUdfs(builder);
listUdfParameters(builder);
listUdfOptions(builder);
}
listColumns(builder);
listColumnOptions(builder);
Expand Down Expand Up @@ -1032,13 +1033,20 @@ private void listUdfs(Ddl.Builder builder) {
case GOOGLE_STANDARD_SQL:
queryStatement =
Statement.of(
"SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name, "
+ "r.data_type, r.routine_definition, r.security_type"
"SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name,"
+ " r.data_type, r.routine_body, r.routine_definition, r.security_type"
+ " FROM information_schema.routines AS r"
+ " WHERE r.routine_schema NOT IN"
+ " ('INFORMATION_SCHEMA', 'SPANNER_SYS')"
+ " AND r.routine_type = 'FUNCTION'"
+ " AND r.routine_body = 'SQL'");
+ " WHERE r.routine_schema NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS')"
+ " AND r.routine_type = 'FUNCTION'");
break;
case POSTGRESQL:
queryStatement =
Statement.of(
"SELECT r.routine_schema, r.routine_name, r.specific_schema, r.specific_name,"
+ " r.data_type, r.routine_body, r.routine_definition, r.security_type"
+ " FROM information_schema.routines AS r WHERE"
+ " r.routine_schema NOT IN ('information_schema', 'spanner_sys', 'pg_catalog')"
+ " AND r.routine_type = 'FUNCTION'");
break;
default:
throw new IllegalArgumentException(
Expand All @@ -1048,20 +1056,28 @@ private void listUdfs(Ddl.Builder builder) {
ResultSet resultSet = context.executeQuery(queryStatement);

while (resultSet.next()) {
String schema = resultSet.isNull(0) ? null : resultSet.getString(0);
String functionName =
resultSet.isNull(0) || resultSet.isNull(1)
? null
: getQualifiedName(resultSet.getString(0), resultSet.getString(1));
resultSet.isNull(1) ? null : getQualifiedName(schema, resultSet.getString(1));
String functionSpecificName =
getQualifiedName(resultSet.getString(2), resultSet.getString(3));
String functionType = resultSet.isNull(4) ? null : resultSet.getString(4);
String functionDefinition = resultSet.isNull(5) ? null : resultSet.getString(5);
String functionSecurityType = resultSet.isNull(6) ? null : resultSet.getString(6);
String language = resultSet.isNull(5) ? null : resultSet.getString(5);
String functionDefinition = resultSet.isNull(6) ? null : resultSet.getString(6);
String functionSecurityType = resultSet.isNull(7) ? null : resultSet.getString(7);

// Built-in functions such as Change Stream READ_X are marked as External.
// Skip and do not re-create they will be autmatically added by change streams.
if ("EXTERNAL".equalsIgnoreCase(language)) {
continue;
}

LOG.debug("Schema user-defined function {}", functionName);
builder
.createUdf(functionSpecificName)
.name(functionName)
.type(functionType)
.language(language)
.definition(functionDefinition)
.security(Udf.SqlSecurity.valueOf(functionSecurityType))
.endUdf();
Expand Down Expand Up @@ -1095,16 +1111,64 @@ private void listUdfParameters(Ddl.Builder builder) {
}
}

private void listUdfOptions(Ddl.Builder builder) {
// PostgreSQL doesn't have ROUTINE_OPTIONS table. It uses AS DEFINITION for options.
if (dialect == Dialect.POSTGRESQL) {
return;
}
// Filter out EXTERNAL functions, which are built-in.
ResultSet resultSet =
context.executeQuery(
Statement.of(
"SELECT o.SPECIFIC_SCHEMA, o.SPECIFIC_NAME, o.OPTION_NAME, o.OPTION_TYPE,"
+ " o.OPTION_VALUE"
+ " FROM information_schema.routine_options AS o"
+ " INNER JOIN information_schema.routines AS r"
+ " USING (SPECIFIC_SCHEMA, SPECIFIC_NAME)"
+ " WHERE o.SPECIFIC_SCHEMA NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS') "
+ " AND r.routine_body != 'EXTERNAL'"
+ " ORDER BY o.SPECIFIC_NAME, o.OPTION_NAME"));

Map<String, ImmutableList.Builder<String>> allOptions = Maps.newHashMap();
while (resultSet.next()) {
String specificName = getQualifiedName(resultSet.getString(0), resultSet.getString(1));
String optionName = resultSet.isNull(2) ? "" : resultSet.getString(2);
String optionType = resultSet.isNull(3) ? "" : resultSet.getString(3);
String optionValue = resultSet.isNull(4) ? "" : resultSet.getString(4);

ImmutableList.Builder<String> options =
allOptions.computeIfAbsent(specificName, k -> ImmutableList.builder());

if (optionType.equalsIgnoreCase("STRING")) {
options.add(
optionName
+ "="
+ GSQL_LITERAL_QUOTE
+ OPTION_STRING_ESCAPER.escape(optionValue)
+ GSQL_LITERAL_QUOTE);
} else {
options.add(optionName + "=" + optionValue);
}
}

for (Map.Entry<String, ImmutableList.Builder<String>> entry : allOptions.entrySet()) {
String specificName = entry.getKey();
ImmutableList<String> options = entry.getValue().build();
builder.createUdf(specificName).options(options).endUdf();
}
}

@VisibleForTesting
Statement listFunctionParametersSQL() {
switch (dialect) {
case GOOGLE_STANDARD_SQL:
return Statement.of(
"SELECT p.specific_schema, p.specific_name, p.parameter_name, p.data_type,"
+ " p.parameter_default FROM information_schema.parameters AS p, information_schema.routines AS r"
+ " WHERE p.specific_schema NOT IN ('INFORMATION_SCHEMA', 'SPANNER_SYS') and p.specific_name ="
+ " r.specific_name and r.routine_type = 'FUNCTION' and r.routine_body = 'SQL' ORDER BY p.specific_schema,"
+ " p.specific_name, p.ordinal_position");
+ " p.parameter_default FROM information_schema.parameters AS p,"
+ " information_schema.routines AS r WHERE p.specific_schema NOT IN"
+ " ('INFORMATION_SCHEMA', 'SPANNER_SYS') and p.specific_name = r.specific_name and"
+ " r.routine_type = 'FUNCTION' ORDER BY"
+ " p.specific_schema, p.specific_name, p.ordinal_position");

default:
throw new IllegalArgumentException("Unrecognized dialect: " + dialect);
Expand Down
84 changes: 76 additions & 8 deletions v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Udf.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.spanner.Dialect;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.escape.Escapers;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedHashMap;
Expand All @@ -32,6 +34,11 @@ public abstract class Udf implements Serializable {

private static final long serialVersionUID = 1L;

// Remote UDF definition is printed using: AS '{definition}'.
// Quotes inside definiton string can be escaped using ''.
public static final Escaper PG_DEFINITION_ESCAPER =
Escapers.builder().addEscape('\'', "''").build();

/** The access rights used by the UDF for underlying data: invoker-rights or definer-rights. */
public enum SqlSecurity {
INVOKER,
Expand All @@ -57,11 +64,16 @@ public enum SqlSecurity {
@Nullable
public abstract String definition();

@Nullable
public abstract String language();

@Nullable
public abstract SqlSecurity security();

public abstract ImmutableList<UdfParameter> parameters();

public abstract ImmutableList<String> options();

public void prettyPrint(Appendable appendable) throws IOException {
appendable.append("CREATE FUNCTION ").append(quoteIdentifier(name(), dialect()));
appendable.append("(");
Expand All @@ -77,14 +89,53 @@ public void prettyPrint(Appendable appendable) throws IOException {
if (type() != null) {
appendable.append(" RETURNS ").append(type());
}
SqlSecurity rights = security();
if (rights != null) {
appendable.append(" SQL SECURITY ").append(rights.toString());

// Determinism should be added to INFORMATION_SCHEMA.ROUTINES.
// For now, we infer it from the language.
if (language() != null && language().equalsIgnoreCase("REMOTE")) {
String determinism;
if (dialect() == Dialect.GOOGLE_STANDARD_SQL) {
determinism = "NOT DETERMINISTIC";
} else {
determinism = "VOLATILE";
}
appendable.append(" ").append(determinism);
}
if (definition() != null) {
appendable.append(" AS (");
appendable.append(definition());
appendable.append(")");

if (language() != null && !language().isEmpty()) {
// GSQL does not accept LANGUAGE SQL even though it reports it.
if (dialect() != Dialect.GOOGLE_STANDARD_SQL || !language().equalsIgnoreCase("SQL")) {
appendable.append(" LANGUAGE ").append(language());
}
}

if (security() != null) {
// Remote UDF don't use SQL SECURITY, but it is marked NOT NULL in IS.
if (!"REMOTE".equalsIgnoreCase(language())) {
appendable.append(" SQL SECURITY ").append(security().toString());
}
}

if (!options().isEmpty()) {
if (dialect() == Dialect.GOOGLE_STANDARD_SQL) {
appendable.append(" OPTIONS (").append(String.join(", ", options())).append(")");
} else {
throw new IllegalArgumentException(
"Options are not supported in PostgreSQL dialect for non-remote UDFs.");
}
}

if (definition() != null && !definition().isEmpty()) {
if (dialect() == Dialect.GOOGLE_STANDARD_SQL) {
appendable.append(" AS (").append(definition()).append(")");
} else {
if (language() == null || language().isEmpty() || "SQL".equalsIgnoreCase(language())) {
appendable.append(" RETURN ").append(definition());
} else {
// Other languages use AS definition instead of sql body.
appendable.append(" AS '").append(PG_DEFINITION_ESCAPER.escape(definition())).append("'");
}
}
}
}

Expand Down Expand Up @@ -113,6 +164,10 @@ public Builder toBuilder() {
if (type() != null) {
builder.type(type());
}
if (language() != null) {
builder.language(language());
}
builder.options(options());
if (definition() != null) {
builder.definition(definition());
}
Expand All @@ -126,7 +181,10 @@ public Builder toBuilder() {
}

public static Builder builder(Dialect dialect) {
return new AutoValue_Udf.Builder().dialect(dialect).parameters(ImmutableList.of());
return new AutoValue_Udf.Builder()
.dialect(dialect)
.parameters(ImmutableList.of())
.options(ImmutableList.of());
}

public static Builder builder() {
Expand Down Expand Up @@ -165,10 +223,18 @@ public Builder ddlBuilder(Ddl.Builder ddlBuilder) {

public abstract String definition();

public abstract Builder language(String language);

public abstract String language();

public abstract Builder security(SqlSecurity rights);

public abstract SqlSecurity security();

public abstract Builder options(ImmutableList<String> options);

public abstract ImmutableList<String> options();

public abstract Builder parameters(ImmutableList<UdfParameter> parameters);

public ImmutableList<UdfParameter> parameters() {
Expand Down Expand Up @@ -208,7 +274,9 @@ public Udf build() {
.dialect(dialect())
.type(type())
.definition(definition())
.language(language())
.security(security())
.options(options())
.parameters(ImmutableList.copyOf(parameters()))
.autoBuild();
}
Expand Down
Loading
Loading