diff --git a/sql_scripts/remote_inference/README.md b/sql_scripts/remote_inference/README.md new file mode 100644 index 0000000..32f8fe7 --- /dev/null +++ b/sql_scripts/remote_inference/README.md @@ -0,0 +1,87 @@ +# BigQuery Remote Inference Scripts + +BigQuery supports remote models, such as Vertex AI LLMs, to perform remote inference operations on both structured and unstructured data. When using remote inference, the user needs to be aware of quotas and limits. If these limits are exceeded, it can result in retryable errors for a subset of rows. This often requires reprocessing. + +In cases when a retryable error has occurred for some rows, we provide SQL scripts to iterate through the inference call until all rows have been successfully labeled. + +There are two scripts based on whether the input data is in an object table or a native BigQuery table. + +## Object table script +The object table script creates a target table to store successful ML inference results. To do this, it calls the inference in a loop. In the first iteration, a small LIMIT is set on the inference call to quickly create a table with the desired schema. The number of rows to process for each inference call can be modified through the `batch_size` parameter. 
+ +This script applies to the following models: +- ML.ANNOTATE_IMAGE +- ML.PROCESS_DOCUMENT +- ML.TRANSCRIBE +- ML.GENERATE_TEXT + +For the object SQL script, you need to update the following parameters at the top of the [object table script](object_table_inference_loop_generic.sql): + +``` +-- The name of the object table +DECLARE obj_table DEFAULT /* obj_table name */; + +-- The name of the target table +DECLARE target_table DEFAULT /* target_table name */; + +-- The name of the unique key column +DECLARE key_column DEFAULT "uri"; + +-- The name of the ML model to use for the ML operation +DECLARE ml_model DEFAULT /* model name */; + +-- The SQL query on the object table to perform the desired ML operation +DECLARE ml_function DEFAULT FORMAT(""" + SELECT * FROM /* ml function name */( + MODEL `%s`, + TABLE `%s`, + /* ml function options */""", + ml_model, obj_table); + +-- Name of the status column as output by the above ML operation +DECLARE ml_status_col_name DEFAULT /* status column name */; +``` + +We provide an example using ML.ANNOTATE_IMAGE in the [object table example script](object_table_inference_loop_example.sql). + +## Structured table script + +This script creates a target table to track all successful ML inferences and loops through the inference call until all rows are labeled. + +To find the rows that still need to be labeled at each iteration, the script uses the candidate key of the table, which you supply through the `key_columns` parameter in the script. 
+ +This script applies to the following models: +- ML.GENERATE_EMBEDDING +- ML.GENERATE_TEXT +- ML.UNDERSTAND_TEXT +- ML.TRANSLATE + +For the SQL script, you need to update the following parameters at the top of the [structured table script](structured_table_inference_loop_generic.sql): + +``` +-- The name of the source table +DECLARE source_table DEFAULT /* source table name */; + +-- The name of the target table +DECLARE target_table DEFAULT /* target table name */; + +-- The unique key columns +DECLARE key_columns DEFAULT ARRAY[/* key columns */]; + +-- The name of the ML model to use for the ML operation +DECLARE ml_model DEFAULT /* ml model name */; + +-- The name of the ML function to use for the ML operation +DECLARE ml_function DEFAULT /* ml function name */; + +-- The ML query to use for the ML operation, requires the unique key +DECLARE ml_query DEFAULT "SELECT *, /* ML operation dependent field */ FROM `" || source_table || "`"; + +-- The ML options to use for the ML operation +DECLARE ml_options DEFAULT /* ml function options */; + +-- Name of the status column as output by the above ML operation +DECLARE ml_status_col_name DEFAULT /* status column name */; +``` + +We provide an example using ML.GENERATE_EMBEDDING in the [structured table example script](structured_table_inference_loop_example.sql). 
\ No newline at end of file
diff --git a/sql_scripts/remote_inference/object_table_inference_loop_example.sql b/sql_scripts/remote_inference/object_table_inference_loop_example.sql
new file mode 100644
index 0000000..485f909
--- /dev/null
+++ b/sql_scripts/remote_inference/object_table_inference_loop_example.sql
@@ -0,0 +1,82 @@
+-- The name of the object table
+DECLARE obj_table DEFAULT "sample.imagesets";
+-- The name of the target table
+DECLARE target_table DEFAULT "sample.annotated_imagesets";
+-- The name of the unique key column
+DECLARE key_column DEFAULT "uri";
+-- The name of the ML model to use for the ML operation
+DECLARE ml_model DEFAULT "sample.vision";
+-- The SQL query on the object table to perform the desired ML operation
+DECLARE ml_function DEFAULT FORMAT("""
+  SELECT * FROM ML.ANNOTATE_IMAGE(
+    MODEL `%s`,
+    TABLE `%s`,
+    STRUCT(['LABEL_DETECTION'] AS vision_features))""",
+  ml_model, obj_table);
+-- Name of the status column as output by the above ML operation
+DECLARE ml_status_col_name DEFAULT "ml_annotate_image_status";
+-- The filter condition for accepting the ML result into the target table
+DECLARE accept_filter DEFAULT ml_status_col_name || " NOT LIKE 'A retryable error occurred:%'";
+-- The maximum number of keys selected (and therefore rows processed) per iteration
+DECLARE batch_size DEFAULT 500;
+-- The loop stops once the script has been running for at least this many seconds
+DECLARE termination_time_secs DEFAULT (22 * 60 * 60);
+
+
+-- Incrementally perform a given ML operation over a source table
+-- until the target table is fully populated or the execution time
+-- exceeds termination_time_secs
+BEGIN
+  DECLARE cols_assignment STRING;  -- "col = S.col, ..." list used by MERGE ... UPDATE SET
+  DECLARE selected_keys ARRAY<STRING>;  -- key values of the current batch; assumes key_column is a STRING column (default "uri")
+
+  -- Creates the target table if it does not exist.
+  --
+  -- The table is created by running the ML operation and copying its rows into the
+  -- target table. A small limit is used to create the table with
+  -- the desired schema and to avoid spending too much time in computing the ML operation.
+  EXECUTE IMMEDIATE FORMAT("""
+    CREATE TABLE IF NOT EXISTS `%s`
+    AS %s
+    LIMIT 10""",
+    target_table, ml_function);
+
+  -- Forms the field assignment statement based on the target table columns.
+  -- It will be used for the subsequent MERGE operations
+  EXECUTE IMMEDIATE FORMAT("""
+    SELECT STRING_AGG(column_name || ' = S.' || column_name, ', ')
+    FROM `%s.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '%s'""",
+    LEFT(target_table, INSTR(target_table, ".", -1) - 1),
+    SUBSTR(target_table, INSTR(target_table, ".", -1) + 1)
+  ) INTO cols_assignment;
+
+  -- Repeatedly performs the ML operation for objects that are not yet in
+  -- the target table, or whose `updated` value is newer than the newest one in the
+  -- target table. Stops when @@row_count is 0 after the MERGE or the time limit is reached.
+  REPEAT
+    EXECUTE IMMEDIATE FORMAT("""
+      SELECT ARRAY(
+        SELECT %s
+        FROM `%s` AS S
+        WHERE NOT EXISTS
+          (SELECT * FROM `%s` AS T WHERE S.%s = T.%s)
+          OR updated > (SELECT max(updated) FROM `%s`)
+        LIMIT %d
+      )""",
+      key_column, obj_table, target_table, key_column, key_column, target_table, batch_size)
+    INTO selected_keys;
+
+    EXECUTE IMMEDIATE FORMAT("""
+      MERGE `%s` T
+      USING (%s WHERE %s IN UNNEST(?) AND %s) S
+      ON S.%s = T.%s
+      WHEN NOT MATCHED THEN INSERT ROW
+      WHEN MATCHED THEN UPDATE SET %s""",
+      target_table, ml_function, key_column, accept_filter, key_column, key_column,
+      cols_assignment
+    ) USING selected_keys;
+  UNTIL (SELECT @@row_count) = 0
+    OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(),
+      @@script.creation_time, SECOND) >= termination_time_secs
+  END REPEAT;
+END;
diff --git a/sql_scripts/remote_inference/object_table_inference_loop_generic.sql b/sql_scripts/remote_inference/object_table_inference_loop_generic.sql
new file mode 100644
index 0000000..cfe617a
--- /dev/null
+++ b/sql_scripts/remote_inference/object_table_inference_loop_generic.sql
@@ -0,0 +1,86 @@
+-- *** Please fill in this section ***
+-- Please note that the following script needs to be filled in and will not run in the given state.
+
+-- The name of the object table
+DECLARE obj_table DEFAULT /* obj_table name */;
+-- The name of the target table
+DECLARE target_table DEFAULT /* target_table name */;
+-- The name of the unique key column
+DECLARE key_column DEFAULT "uri";
+-- The name of the ML model to use for the ML operation
+DECLARE ml_model DEFAULT /* model name */;
+-- The SQL query on the object table to perform the desired ML operation (when filling in the options, also add the function call's closing parenthesis)
+DECLARE ml_function DEFAULT FORMAT("""
+  SELECT * FROM /* ml function name */(
+    MODEL `%s`,
+    TABLE `%s`,
+    /* ml function options */""",
+  ml_model, obj_table);
+-- Name of the status column as output by the above ML operation
+DECLARE ml_status_col_name DEFAULT /* status column name */;
+-- The filter condition for accepting the ML result into the target table
+DECLARE accept_filter DEFAULT ml_status_col_name || " NOT LIKE 'A retryable error occurred:%'";
+-- The maximum number of keys selected (and therefore rows processed) per iteration
+DECLARE batch_size DEFAULT 500;
+-- The loop stops once the script has been running for at least this many seconds
+DECLARE termination_time_secs DEFAULT (22 * 60 * 60);
+-- *** End of section ***
+
+
+-- Incrementally perform a given ML operation over a source
table
+-- until the target table is fully populated or the execution time
+-- exceeds termination_time_secs
+BEGIN
+  DECLARE cols_assignment STRING;  -- "col = S.col, ..." list used by MERGE ... UPDATE SET
+  DECLARE selected_keys ARRAY<STRING>;  -- key values of the current batch; change the element type if key_column is not a STRING column
+
+  -- Creates the target table if it does not exist.
+  --
+  -- The table is created by running the ML operation and copying its rows into the
+  -- target table. A small limit is used to create the table with
+  -- the desired schema and to avoid spending too much time in computing the ML operation.
+  EXECUTE IMMEDIATE FORMAT("""
+    CREATE TABLE IF NOT EXISTS `%s`
+    AS %s
+    LIMIT 10""",
+    target_table, ml_function);
+
+  -- Forms the field assignment statement based on the target table columns.
+  -- It will be used for the subsequent MERGE operations
+  EXECUTE IMMEDIATE FORMAT("""
+    SELECT STRING_AGG(column_name || ' = S.' || column_name, ', ')
+    FROM `%s.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '%s'""",
+    LEFT(target_table, INSTR(target_table, ".", -1) - 1),
+    SUBSTR(target_table, INSTR(target_table, ".", -1) + 1)
+  ) INTO cols_assignment;
+
+  -- Repeatedly performs the ML operation for objects that are not yet in
+  -- the target table, or whose `updated` value is newer than the newest one in the
+  -- target table. Stops when @@row_count is 0 after the MERGE or the time limit is reached.
+  REPEAT
+    EXECUTE IMMEDIATE FORMAT("""
+      SELECT ARRAY(
+        SELECT %s
+        FROM `%s` AS S
+        WHERE NOT EXISTS
+          (SELECT * FROM `%s` AS T WHERE S.%s = T.%s)
+          OR updated > (SELECT max(updated) FROM `%s`)
+        LIMIT %d
+      )""",
+      key_column, obj_table, target_table, key_column, key_column, target_table, batch_size)
+    INTO selected_keys;
+
+    EXECUTE IMMEDIATE FORMAT("""
+      MERGE `%s` T
+      USING (%s WHERE %s IN UNNEST(?) AND %s) S
+      ON S.%s = T.%s
+      WHEN NOT MATCHED THEN INSERT ROW
+      WHEN MATCHED THEN UPDATE SET %s""",
+      target_table, ml_function, key_column, accept_filter, key_column, key_column,
+      cols_assignment
+    ) USING selected_keys;
+  UNTIL (SELECT @@row_count) = 0
+    OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(),
+      @@script.creation_time, SECOND) >= termination_time_secs
+  END REPEAT;
+END;
diff --git a/sql_scripts/remote_inference/structured_table_inference_loop_example.sql b/sql_scripts/remote_inference/structured_table_inference_loop_example.sql
new file mode 100644
index 0000000..aac2bc5
--- /dev/null
+++ b/sql_scripts/remote_inference/structured_table_inference_loop_example.sql
@@ -0,0 +1,104 @@
+-- The name of the source table
+DECLARE source_table DEFAULT "sample.hacker";
+-- The name of the target table
+DECLARE target_table DEFAULT "sample.hacker_embedding";
+-- The unique key columns
+DECLARE key_columns DEFAULT ARRAY["id"];
+-- The name of the ML model to use for the ML operation
+DECLARE ml_model DEFAULT "sample.embedding";
+-- The name of the ML function to use for the ML operation
+DECLARE ml_function DEFAULT "ML.GENERATE_EMBEDDING";
+
+-- The ML query to use for the ML operation, requires the unique key
+DECLARE
+  ml_query
+  DEFAULT
+    FORMAT(
+      "SELECT %s, text AS content FROM `%s`", ARRAY_TO_STRING(key_columns, ','), source_table);
+
+-- The ML options to use for the ML operation
+DECLARE ml_options DEFAULT "STRUCT(TRUE AS flatten_json_output)";
+-- Name of the status column as output by the above ML operation
+DECLARE ml_status_col_name DEFAULT "ml_generate_embedding_status";
+-- The filter condition for accepting the ML result into the target table
+DECLARE accept_filter DEFAULT ml_status_col_name || " NOT LIKE 'A retryable error occurred:%'";
+-- The maximum number of source rows passed to the ML function per query
+DECLARE batch_size DEFAULT 10000;
+DECLARE termination_time_secs DEFAULT(23 * 60 * 60);  -- the loop stops once the script has run at least this many seconds
+
+-- Create the target table first if it does not exist
+EXECUTE
+  IMMEDIATE
+    FORMAT(
+      """
+CREATE TABLE IF NOT EXISTS `%s` AS
+  (SELECT *
+   FROM %s (MODEL `%s`,
+           (SELECT *
+            FROM (%s)
+            LIMIT %d), %s)
+   WHERE %s)""",
+      target_table,
+      ml_function,
+      ml_model,
+      ml_query,
+      batch_size,  -- was the undeclared name bucket_size
+      ml_options,
+      accept_filter);
+
+-- Iteratively populate the target table: each pass runs the ML function on up to batch_size source rows whose key is not yet in the target
+BEGIN
+  DECLARE cols_assignment STRING;  -- "col = S.col, ..." list used by MERGE ... UPDATE SET
+
+DECLARE
+  key_cols_filter
+  DEFAULT(
+    SELECT STRING_AGG("S." || key || " = T." || key, " AND ") FROM UNNEST(key_columns) AS key
+  );
+
+EXECUTE
+  IMMEDIATE
+    FORMAT(
+      """
+        SELECT
+          STRING_AGG(column_name || ' = S.' || column_name, ', ')
+        FROM `%s.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '%s'""",
+      LEFT(target_table, INSTR(target_table, ".", -1) - 1),
+      SUBSTR(target_table, INSTR(target_table, ".", -1) + 1))
+  INTO cols_assignment;
+
+REPEAT
+  EXECUTE
+    IMMEDIATE
+      FORMAT(
+        """
+          MERGE `%s` T
+          USING (SELECT *
+                 FROM %s (MODEL `%s`,
+                 (SELECT *
+                  FROM (%s) AS S
+                  WHERE NOT EXISTS (SELECT * FROM `%s` AS T WHERE %s) LIMIT %d), %s)
+                  WHERE %s) S
+          ON %s
+          WHEN NOT MATCHED THEN INSERT ROW
+          WHEN MATCHED THEN UPDATE SET %s
+          """,
+        target_table,
+        ml_function,
+        ml_model,
+        ml_query,
+        target_table,
+        key_cols_filter,
+        batch_size,  -- was the undeclared name bucket_size
+        ml_options,
+        accept_filter,
+        key_cols_filter,
+        cols_assignment);
+
+UNTIL(SELECT @@row_count)
+= 0
+OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), @@script.creation_time, SECOND)
+  >= termination_time_secs
+  END REPEAT;
+
+END;
diff --git a/sql_scripts/remote_inference/structured_table_inference_loop_generic.sql b/sql_scripts/remote_inference/structured_table_inference_loop_generic.sql
new file mode 100644
index 0000000..f4070ed
--- /dev/null
+++ b/sql_scripts/remote_inference/structured_table_inference_loop_generic.sql
@@ -0,0 +1,103 @@
+-- *** Please fill in this section ***
+-- Please note that the following script needs to be filled in and will not run in the given state.
+
+-- The name of the source table
+DECLARE source_table DEFAULT /* source table name */;
+-- The name of the target table
+DECLARE target_table DEFAULT /* target table name */;
+-- The unique key columns
+DECLARE key_columns DEFAULT ARRAY[/* key columns */];
+-- The name of the ML model to use for the ML operation
+DECLARE ml_model DEFAULT /* ml model name */;
+-- The name of the ML function to use for the ML operation
+DECLARE ml_function DEFAULT /* ml function name */;
+-- The ML query to use for the ML operation, requires the unique key
+DECLARE ml_query DEFAULT "SELECT *, /* ML operation dependent field */ FROM `" || source_table || "`";
+-- The ML options to use for the ML operation
+DECLARE ml_options DEFAULT /* ml function options */;
+-- Name of the status column as output by the above ML operation
+DECLARE ml_status_col_name DEFAULT /* status column name */;
+-- The filter condition for accepting the ML result into the target table
+DECLARE accept_filter DEFAULT ml_status_col_name || " NOT LIKE 'A retryable error occurred:%'";
+-- The maximum number of source rows passed to the ML function per query
+DECLARE batch_size DEFAULT 10000;
+-- The loop stops once the script has been running for at least this many seconds
+DECLARE termination_time_secs DEFAULT(23 * 60 * 60);
+-- *** End of section ***
+
+-- Create the target table first if it does not exist
+EXECUTE
+  IMMEDIATE
+    FORMAT(
+      """
+CREATE TABLE IF NOT EXISTS `%s` AS
+  (SELECT *
+   FROM %s (MODEL `%s`,
+           (SELECT *
+            FROM (%s)
+            LIMIT %d), %s)
+   WHERE %s)""",
+      target_table,
+      ml_function,
+      ml_model,
+      ml_query,
+      batch_size,  -- was the undeclared name bucket_size
+      ml_options,
+      accept_filter);
+
+-- Iteratively populate the target table: each pass runs the ML function on up to batch_size source rows whose key is not yet in the target
+BEGIN
+  DECLARE cols_assignment STRING;  -- "col = S.col, ..." list used by MERGE ... UPDATE SET
+
+DECLARE
+  key_cols_filter
+  DEFAULT(
+    SELECT STRING_AGG("S." || key || " = T." || key, " AND ") FROM UNNEST(key_columns) AS key
+  );
+
+EXECUTE
+  IMMEDIATE
+    FORMAT(
+      """
+        SELECT
+          STRING_AGG(column_name || ' = S.' || column_name, ', ')
+        FROM `%s.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '%s'""",
+      LEFT(target_table, INSTR(target_table, ".", -1) - 1),
+      SUBSTR(target_table, INSTR(target_table, ".", -1) + 1))
+  INTO cols_assignment;
+
+REPEAT
+  EXECUTE
+    IMMEDIATE
+      FORMAT(
+        """
+          MERGE `%s` T
+          USING (SELECT *
+                 FROM %s (MODEL `%s`,
+                 (SELECT *
+                  FROM (%s) AS S
+                  WHERE NOT EXISTS (SELECT * FROM `%s` AS T WHERE %s) LIMIT %d), %s)
+                  WHERE %s) S
+          ON %s
+          WHEN NOT MATCHED THEN INSERT ROW
+          WHEN MATCHED THEN UPDATE SET %s
+          """,
+        target_table,
+        ml_function,
+        ml_model,
+        ml_query,
+        target_table,
+        key_cols_filter,
+        batch_size,  -- was the undeclared name bucket_size
+        ml_options,
+        accept_filter,
+        key_cols_filter,
+        cols_assignment);
+
+UNTIL(SELECT @@row_count)
+= 0
+OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), @@script.creation_time, SECOND)
+  >= termination_time_secs
+  END REPEAT;
+
+END;