diff --git a/syncserver-settings/src/lib.rs b/syncserver-settings/src/lib.rs index c73bd6b97d..f461ad90a4 100644 --- a/syncserver-settings/src/lib.rs +++ b/syncserver-settings/src/lib.rs @@ -170,7 +170,6 @@ impl Settings { settings.port = 8000; settings.syncstorage.database_pool_max_size = 1; settings.syncstorage.database_use_test_transactions = true; - settings.syncstorage.database_spanner_use_mutations = false; settings.syncstorage.database_pool_connection_max_idle = Some(300); settings.syncstorage.database_pool_connection_lifespan = Some(300); settings diff --git a/syncstorage-db/src/tests/batch.rs b/syncstorage-db/src/tests/batch.rs index 3e31c917c1..102ae2eef6 100644 --- a/syncstorage-db/src/tests/batch.rs +++ b/syncstorage-db/src/tests/batch.rs @@ -375,6 +375,128 @@ async fn test_append_async_w_empty_string() -> Result<(), DbError> { .await } +#[tokio::test] +async fn test_append_upsert_overwrites_same_batch_bso_id() -> Result<(), DbError> { + let settings = Settings::test_settings().syncstorage; + with_test_transaction(settings, async |db: &mut dyn Db| { + let uid = 1; + let coll = "clients"; + let bid = "b0"; + let first_payload = "wibble"; + let second_payload = "over 9000"; + + let new_batch = db.create_batch(cb(uid, coll, vec![])).await?; + db.append_to_batch(ab( + uid, + coll, + new_batch.clone(), + vec![postbso(bid, Some(first_payload), Some(10), None)], + )) + .await?; + db.append_to_batch(ab( + uid, + coll, + new_batch.clone(), + vec![postbso(bid, Some(second_payload), None, None)], + )) + .await?; + + let batch = db + .get_batch(gb(uid, coll, new_batch.id.clone())) + .await? + .unwrap(); + db.commit_batch(params::CommitBatch { + user_id: hid(uid), + collection: coll.to_owned(), + batch, + }) + .await?; + + let bso = db.get_bso(gbso(uid, coll, bid)).await?.unwrap(); + assert_eq!(bso.payload, second_payload); + assert_eq!(bso.sortindex, Some(10)); + Ok(()) + }) + .await +} + +#[tokio::test] +async fn test_commit_batch_partial_overlap() -> Result<(), DbError> { + let settings = Settings::test_settings().syncstorage; + with_test_transaction(settings, async |db: &mut dyn Db| { + let uid = 1; + let coll = "clients"; + let bid_overlap = "b_overlap"; + let bid_existing_only = "b_existing"; + let bid_new = "b_new"; + + let original_overlap_payload = "fizz"; + let existing_only_payload = "buzz"; + let updated_overlap_payload = "quux"; + let new_payload = "new hotness"; + + db.put_bso(pbso( + uid, + coll, + bid_overlap, + Some(original_overlap_payload), + Some(1), + None, + )) + .await?; + db.put_bso(pbso( + uid, + coll, + bid_existing_only, + Some(existing_only_payload), + Some(2), + None, + )) + .await?; + + let new_batch = db.create_batch(cb(uid, coll, vec![])).await?; + db.append_to_batch(ab( + uid, + coll, + new_batch.clone(), + vec![ + postbso(bid_overlap, Some(updated_overlap_payload), None, None), + postbso(bid_new, Some(new_payload), Some(3), None), + ], + )) + .await?; + + let batch = db + .get_batch(gb(uid, coll, new_batch.id.clone())) + .await? + .unwrap(); + db.commit_batch(params::CommitBatch { + user_id: hid(uid), + collection: coll.to_owned(), + batch, + }) + .await?; + + let overlap = db.get_bso(gbso(uid, coll, bid_overlap)).await?.unwrap(); + assert_eq!(overlap.payload, updated_overlap_payload); + assert_eq!(overlap.sortindex, Some(1)); + + let existing = db + .get_bso(gbso(uid, coll, bid_existing_only)) + .await? + .unwrap(); + assert_eq!(existing.payload, existing_only_payload); + assert_eq!(existing.sortindex, Some(2)); + + let new = db.get_bso(gbso(uid, coll, bid_new)).await?.unwrap(); + assert_eq!(new.payload, new_payload); + assert_eq!(new.sortindex, Some(3)); + + Ok(()) + }) + .await +} + #[tokio::test] async fn pretouch() -> Result<(), DbError> { let settings = Settings::test_settings().syncstorage; diff --git a/syncstorage-db/src/tests/db.rs b/syncstorage-db/src/tests/db.rs index e550b88ecc..7206cbdfc5 100644 --- a/syncstorage-db/src/tests/db.rs +++ b/syncstorage-db/src/tests/db.rs @@ -1104,3 +1104,46 @@ async fn heartbeat() -> Result<(), DbError> { }) .await } + +#[tokio::test] +async fn put_bso_preserves_payload_on_sortindex_only_update() -> Result<(), DbError> { + with_test_transaction(None, async |db: &mut dyn Db| { + let uid = *UID; + let coll = "clients"; + let bid = "preserve_payload"; + + db.put_bso(pbso(uid, coll, bid, Some("quux"), Some(10), None)) + .await?; + + db.put_bso(pbso(uid, coll, bid, None, Some(20), None)) + .await?; + + let bso = db.get_bso(gbso(uid, coll, bid)).await?.unwrap(); + assert_eq!(bso.payload, "quux"); + assert_eq!(bso.sortindex, Some(20)); + Ok(()) + }) + .await +} + +#[tokio::test] +async fn put_bso_preserves_expiry_on_partial_update() -> Result<(), DbError> { + with_test_transaction(None, async |db: &mut dyn Db| { + let uid = *UID; + let coll = "clients"; + let bid = "preserve_expiry"; + + db.put_bso(pbso(uid, coll, bid, Some("quux"), None, Some(3600))) + .await?; + let original_expiry = db.get_bso(gbso(uid, coll, bid)).await?.unwrap().expiry; + + db.put_bso(pbso(uid, coll, bid, Some("baz"), None, None)) + .await?; + + let bso = db.get_bso(gbso(uid, coll, bid)).await?.unwrap(); + assert_eq!(bso.payload, "baz"); + assert_eq!(bso.expiry, original_expiry); + Ok(()) + }) + .await +} diff --git a/syncstorage-settings/src/lib.rs b/syncstorage-settings/src/lib.rs index 980debc550..105aabfa52 100644 --- a/syncstorage-settings/src/lib.rs +++ b/syncstorage-settings/src/lib.rs @@ -81,8 +81,6 @@ pub struct Settings { pub database_pool_sweeper_task_interval: u32, #[cfg(debug_assertions)] pub database_use_test_transactions: bool, - #[cfg(debug_assertions)] - pub database_spanner_use_mutations: bool, /// Whether leader aware router headers are sent to Spanner pub database_spanner_route_to_leader: bool, @@ -119,8 +117,6 @@ impl Default for Settings { database_pool_connection_timeout: Some(30), #[cfg(debug_assertions)] database_use_test_transactions: false, - #[cfg(debug_assertions)] - database_spanner_use_mutations: true, database_spanner_route_to_leader: false, limits: ServerLimits::default(), statsd_label: "syncstorage".to_string(), diff --git a/syncstorage-spanner/src/db/BATCH_COMMIT.txt b/syncstorage-spanner/src/db/BATCH_COMMIT.txt deleted file mode 100644 index 91d1dffc2c..0000000000 --- a/syncstorage-spanner/src/db/BATCH_COMMIT.txt +++ /dev/null @@ -1,58 +0,0 @@ -Batches serve as a temporary storage for POST'd bsos. Committing a batch -entails moving the bsos from the batch table to their final destination in the -`bsos` table. - -The move operation is like an UPSERT: the bsos in the batch will either be -INSERT'd into the `bsos` table or UPDATE'd if they already exist. Only columns -with NON NULL values in the batch table will be UPDATE'd in the `bsos` table: -those with NULL values won't be touched. When INSERTing bsos, columns with NULL -values recieve defaults. - -The entire move (batch commit) happens in one single transaction. Spanner -limits the number of writes (or "mutations") within a transaction to 20000 -total mutations. A mutation is a write to an individual column. Any writes -requiring modifications to secondary indices also incur additional mutations. -DELETEs are generally cheaper (incurring one mutation per DELETE, not including -secondary indices). - -The batch commit mutations are as follows: - -- Ensure a parent record exists in `user_collections` (due to bsos INTERLEAVE - IN PARENT `user_collections`): 4 mutations (quota: False) or 6 mutations - (quota: True) - - INSERT or UPDATE: - - 3 key columns - - quota: False - - 1 non key column - - quota: True - - 3 non key columns - -- Possibly direct inserts via post_bsos (but these only reduce total mutations - by avoiding writing to the batch table, so not included here) - -- Write 1664 (MAX_TOTAL_RECORDS) to `bsos`: max 19968 (1664 * 12) mutations - - INSERT takes 10 mutations: - - 4 key columns - - 4 non key columns - - INSERT into 2 secondary indices: 2 mutations - - UPDATE takes 12 mutations: - - 4 key columns - - 4 non key columns - - UPDATE of 2 secondary indices: Each requires deleting + inserting a row: - 2 mutations each. 4 total - -- Delete the batch - - DELETE incurs 1 mutation - -- Update `user_collections` quota counts (only when quota: True): 6 mutations - - UPDATE: - - 3 key columns - - quota: True - - 3 non key columns - -Totals: -- quota: False - 4 + 19968 + 1 = 19973 - -- quota: True - 6 + 19968 + 1 + 6 = 19981 diff --git a/syncstorage-spanner/src/db/batch_commit_insert.sql b/syncstorage-spanner/src/db/batch_commit_insert.sql deleted file mode 100644 index 81dad3f8dd..0000000000 --- a/syncstorage-spanner/src/db/batch_commit_insert.sql +++ /dev/null @@ -1,26 +0,0 @@ -INSERT INTO bsos (fxa_uid, fxa_kid, collection_id, bso_id, sortindex, payload, modified, expiry) -SELECT - batch_bsos.fxa_uid, - batch_bsos.fxa_kid, - batch_bsos.collection_id, - batch_bsos.batch_bso_id, - - batch_bsos.sortindex, - COALESCE(batch_bsos.payload, ''), - @timestamp, - COALESCE( - TIMESTAMP_ADD(@timestamp, INTERVAL batch_bsos.ttl SECOND), - TIMESTAMP_ADD(@timestamp, INTERVAL @default_bso_ttl SECOND) - ) - FROM batch_bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND batch_id = @batch_id - AND batch_bso_id NOT in ( - SELECT bso_id - FROM bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - ) diff --git a/syncstorage-spanner/src/db/batch_commit_update.sql b/syncstorage-spanner/src/db/batch_commit_update.sql deleted file mode 100644 index 238ac025a6..0000000000 --- a/syncstorage-spanner/src/db/batch_commit_update.sql +++ /dev/null @@ -1,50 +0,0 @@ -UPDATE bsos - SET sortindex = COALESCE( - (SELECT sortindex - FROM batch_bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND batch_id = @batch_id - AND batch_bso_id = bsos.bso_id - ), - bsos.sortindex - ), - - payload = COALESCE( - (SELECT payload - FROM batch_bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND batch_id = @batch_id - AND batch_bso_id = bsos.bso_id - ), - bsos.payload - ), - - modified = @timestamp, - - expiry = COALESCE( - -- TIMESTAMP_ADD returns NULL when ttl is null - (SELECT TIMESTAMP_ADD(@timestamp, INTERVAL ttl SECOND) - FROM batch_bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND batch_id = @batch_id - AND batch_bso_id = bsos.bso_id - ), - bsos.expiry - ) - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND bso_id in ( - SELECT batch_bso_id - FROM batch_bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND batch_id = @batch_id - ) diff --git a/syncstorage-spanner/src/db/batch_commit_upsert.sql b/syncstorage-spanner/src/db/batch_commit_upsert.sql new file mode 100644 index 0000000000..7de916ef2a --- /dev/null +++ b/syncstorage-spanner/src/db/batch_commit_upsert.sql @@ -0,0 +1,37 @@ +-- bsos.payload/modified/expiry are NOT NULL with no schema default, so the +-- insert of INSERT OR UPDATE must supply a value for every column. Each +-- driving `batch_bsos` row LEFT JOINs to a subquery `existing` row. On update +-- it provides values to COALESCE over; on insert the join misses, `existing.*` +-- is NULL, and the COALESCE fallback applies. +INSERT OR UPDATE INTO bsos + (fxa_uid, fxa_kid, collection_id, bso_id, sortindex, payload, modified, expiry) +SELECT + bb.fxa_uid, + bb.fxa_kid, + bb.collection_id, + bb.batch_bso_id, + COALESCE(bb.sortindex, existing.sortindex), + COALESCE(bb.payload, existing.payload, ''), + @timestamp, + COALESCE( + TIMESTAMP_ADD(@timestamp, INTERVAL bb.ttl SECOND), + existing.expiry, + TIMESTAMP_ADD(@timestamp, INTERVAL @default_bso_ttl SECOND) + ) + FROM batch_bsos AS bb + LEFT JOIN ( + SELECT fxa_uid, fxa_kid, collection_id, bso_id, + sortindex, payload, expiry + FROM bsos + WHERE fxa_uid = @fxa_uid + AND fxa_kid = @fxa_kid + AND collection_id = @collection_id + ) AS existing + ON existing.fxa_uid = bb.fxa_uid + AND existing.fxa_kid = bb.fxa_kid + AND existing.collection_id = bb.collection_id + AND existing.bso_id = bb.batch_bso_id + WHERE bb.fxa_uid = @fxa_uid + AND bb.fxa_kid = @fxa_kid + AND bb.collection_id = @collection_id + AND bb.batch_id = @batch_id diff --git a/syncstorage-spanner/src/db/batch_impl.rs b/syncstorage-spanner/src/db/batch_impl.rs index 2b5846ce56..1b4b02bdba 100644 --- a/syncstorage-spanner/src/db/batch_impl.rs +++ b/syncstorage-spanner/src/db/batch_impl.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::{collections::HashMap, str::FromStr}; use async_trait::async_trait; use google_cloud_rust_raw::spanner::v1::type_pb::{StructType, Type, TypeCode}; @@ -190,47 +187,25 @@ impl BatchDb for SpannerDb { }) .await?; - let as_rfc3339 = timestamp.as_rfc3339()?; { - // First, UPDATE existing rows in the bsos table with any new values - // supplied in this batch let mut timer2 = self.metrics.clone(); - timer2.start_timer("storage.spanner.apply_batch_update", None); - let (sqlparams, mut sqlparam_types) = params! { - "fxa_uid" => params.user_id.fxa_uid.clone(), - "fxa_kid" => params.user_id.fxa_kid.clone(), - "collection_id" => collection_id, - "batch_id" => params.batch.id.clone(), - "timestamp" => as_rfc3339.clone(), - }; - sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP)); - // NOTE: This write treats both expired and non-expired as existing - // bsos. See the note in [SpannerDb::post_bsos_with_mutations] - self.sql(include_str!("batch_commit_update.sql")) - .await? - .params(sqlparams) - .param_types(sqlparam_types) - .execute_dml(&self.conn) - .await?; - } - - { - // Then INSERT INTO SELECT remaining rows from this batch into the bsos - // table (that didn't already exist there) + timer2.start_timer("storage.spanner.apply_batch_upsert", None); let (sqlparams, mut sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid.clone(), "fxa_kid" => params.user_id.fxa_kid.clone(), "collection_id" => collection_id, "batch_id" => params.batch.id.clone(), - "timestamp" => as_rfc3339, + "timestamp" => timestamp.as_rfc3339()?, "default_bso_ttl" => DEFAULT_BSO_TTL, }; sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP)); - let mut timer3 = self.metrics.clone(); - timer3.start_timer("storage.spanner.apply_batch_insert", None); - // NOTE: This write treats both expired and non-expired as existing - // bsos. See the note in [SpannerDb::post_bsos_with_mutations] - self.sql(include_str!("batch_commit_insert.sql")) + // NOTE: This write treats expired and non-expired as existing bsos. We don't filter on + // `expiry > CURRENT_TIMESTAMP()` to avoid having to delete expired rows before + // inserting new ones with the same id. Unfortunately, this means updates may resurrect + // expired bsos (or at least a subset of their fields), or possibly even write new data + // without an associated ttl to an expired record that will be deleted. This in + // practice should be a very rare occurrence. + self.sql(include_str!("batch_commit_upsert.sql")) .await? .params(sqlparams) .param_types(sqlparam_types) @@ -263,39 +238,7 @@ pub async fn do_append( bsos: Vec, collection: &str, ) -> DbResult<()> { - // Pass an array of struct objects as @values (for UNNEST), e.g.: - // [("", "", 101, "ba1", "bso1", NULL, "payload1", NULL), - // ("", "", 101, "ba1", "bso2", NULL, "payload2", NULL)] - // https://cloud.google.com/spanner/docs/structs#creating_struct_objects let mut running_size: usize = 0; - - // problem: Append may try to insert a duplicate record into the batch_bsos table. - // this is because spanner doesn't do upserts easily. An upsert like operation can - // be performed by carefully crafting a complex protobuf struct. (See - // https://github.com/mozilla-services/syncstorage-rs/issues/618#issuecomment-680227710 - // for details.) - // Batch_bso is a temp table and items are eventually rolled into bsos. - - // create a simple key for a HashSet to see if a given record has already been - // created - fn exist_idx(collection_id: &str, batch_id: &str, bso_id: &str) -> String { - format!( - "{collection_id}::{batch_id}::{bso_id}", - collection_id = collection_id, - batch_id = batch_id, - bso_id = bso_id, - ) - } - - struct UpdateRecord { - bso_id: String, - sortindex: Option, - payload: Option, - ttl: Option, - } - - //prefetch the existing batch_bsos for this user's batch. - let mut existing = HashSet::new(); let mut tags = HashMap::new(); tags.insert( "collection".to_owned(), @@ -304,96 +247,40 @@ pub async fn do_append( .unwrap_or_else(|| "UNKNOWN".to_string()), ); - let bso_ids = bsos - .iter() - .map(|pbso| pbso.id.clone()) - .collect::>(); - let (sqlparams, sqlparam_types) = params! { - "fxa_uid" => user_id.fxa_uid.clone(), - "fxa_kid" => user_id.fxa_kid.clone(), - "collection_id" => collection_id, - "batch_id" => batch.id.clone(), - "ids" => bso_ids, - }; - let mut existing_stream = db - .sql( - "SELECT batch_bso_id - FROM batch_bsos - WHERE fxa_uid=@fxa_uid - AND fxa_kid=@fxa_kid - AND collection_id=@collection_id - AND batch_id=@batch_id - AND batch_bso_id in UNNEST(@ids);", - ) - .await? - .params(sqlparams) - .param_types(sqlparam_types) - .execute(&db.conn)?; - while let Some(row) = existing_stream.try_next().await? { - existing.insert(exist_idx( - &collection_id.to_string(), - &batch.id, - row[0].get_string_value(), - )); - } - - db.metrics.count_with_tags( - "storage.spanner.batch.pre-existing", - existing.len() as i64, - tags.clone(), - ); - - // Approach 1: - // iterate and check to see if the record is in batch_bso table already - let mut insert: Vec = Vec::new(); - let mut update: Vec = Vec::new(); + // Build an ARRAY of incoming rows for a `INSERT OR UPDATE`. COALESCE(new, existing) + // is used so an update only overwrites fields the request supplied. + let mut rows: Vec = Vec::with_capacity(bsos.len()); for bso in bsos { if let Some(ref payload) = bso.payload { running_size += payload.len(); } - let exist_idx = exist_idx(&collection_id.to_string(), &batch.id, &bso.id); - - if existing.contains(&exist_idx) { - // need to update this record - // reject this record since you can only have one update per batch - update.push(UpdateRecord { - bso_id: bso.id, - sortindex: bso.sortindex, - payload: bso.payload, - ttl: bso.ttl, - }); - } else { - let sortindex = bso - .sortindex - .map(IntoSpannerValue::into_spanner_value) - .unwrap_or_else(null_value); - let payload = bso - .payload - .map(IntoSpannerValue::into_spanner_value) - .unwrap_or_else(null_value); - let ttl = bso - .ttl - .map(IntoSpannerValue::into_spanner_value) - .unwrap_or_else(null_value); - - // convert to a protobuf structure for direct insertion to - // avoid some mutation limits. - let mut row = ListValue::new(); - row.set_values(RepeatedField::from_vec(vec![ - user_id.fxa_uid.clone().into_spanner_value(), - user_id.fxa_kid.clone().into_spanner_value(), - collection_id.into_spanner_value(), - batch.id.clone().into_spanner_value(), - bso.id.into_spanner_value(), - sortindex, - payload, - ttl, - ])); - let mut value = Value::new(); - value.set_list_value(row); - insert.push(value); - existing.insert(exist_idx.clone()); - }; + let sortindex = bso + .sortindex + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + let payload = bso + .payload + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + let ttl = bso + .ttl + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + + let mut row = ListValue::new(); + row.set_values(RepeatedField::from_vec(vec![ + user_id.fxa_uid.clone().into_spanner_value(), + user_id.fxa_kid.clone().into_spanner_value(), + collection_id.into_spanner_value(), + batch.id.clone().into_spanner_value(), + bso.id.into_spanner_value(), + sortindex, + payload, + ttl, + ])); + let mut value = Value::new(); + value.set_list_value(row); + rows.push(value); } if db.quota.enabled @@ -407,6 +294,10 @@ pub async fn do_append( } } + if rows.is_empty() { + return Ok(()); + } + let fields = vec![ ("fxa_uid", TypeCode::STRING), ("fxa_kid", TypeCode::STRING), @@ -421,93 +312,53 @@ pub async fn do_append( .map(|(name, field_type)| struct_type_field(name, field_type)) .collect(); - if !insert.is_empty() { - let mut list_values = ListValue::new(); - let count_inserts = insert.len(); - list_values.set_values(RepeatedField::from_vec(insert)); - let mut values = Value::new(); - values.set_list_value(list_values); - - // values' type is an ARRAY of STRUCTs - let mut param_type = Type::new(); - param_type.set_code(TypeCode::ARRAY); - let mut array_type = Type::new(); - array_type.set_code(TypeCode::STRUCT); - - // STRUCT requires definition of all its field types - let mut struct_type = StructType::new(); - struct_type.set_fields(RepeatedField::from_vec(fields)); - array_type.set_struct_type(struct_type); - param_type.set_array_element_type(array_type); - - let mut sqlparams = HashMap::new(); - sqlparams.insert("values".to_owned(), values); - let mut sqlparam_types = HashMap::new(); - sqlparam_types.insert("values".to_owned(), param_type); - db.sql( - "INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id, - sortindex, payload, ttl) - SELECT * FROM UNNEST(@values)", - ) - .await? - .params(sqlparams) - .param_types(sqlparam_types) - .execute_dml(&db.conn) - .await?; - db.metrics.count_with_tags( - "storage.spanner.batch.insert", - count_inserts as i64, - tags.clone(), - ); - } - - // assuming that "update" is rarer than an insert, we can try using the standard API for that. - if !update.is_empty() { - for val in update { - let mut fields = Vec::new(); - let (mut params, mut param_types) = params! { - "fxa_uid" => user_id.fxa_uid.clone(), - "fxa_kid" => user_id.fxa_kid.clone(), - "collection_id" => collection_id, - "batch_id" => batch.id.clone(), - "batch_bso_id" => val.bso_id, - }; - if let Some(sortindex) = val.sortindex { - fields.push("sortindex"); - param_types.insert("sortindex".to_owned(), sortindex.spanner_type()); - params.insert("sortindex".to_owned(), sortindex.into_spanner_value()); - } - if let Some(payload) = val.payload { - fields.push("payload"); - param_types.insert("payload".to_owned(), payload.spanner_type()); - params.insert("payload".to_owned(), payload.into_spanner_value()); - }; - if let Some(ttl) = val.ttl { - fields.push("ttl"); - param_types.insert("ttl".to_owned(), ttl.spanner_type()); - params.insert("ttl".to_owned(), ttl.into_spanner_value()); - } - if fields.is_empty() { - continue; - }; - let updatable = fields - .iter() - .map(|field| format!("{field}=@{field}", field = field)) - .collect::>() - .join(", "); - db.sql(&format!( - "UPDATE batch_bsos SET {updatable} - WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id - AND batch_id=@batch_id AND batch_bso_id=@batch_bso_id", - updatable = updatable - )) - .await? - .params(params) - .param_types(param_types.clone()) - .execute_dml(&db.conn) - .await?; - } - } + let row_count = rows.len(); + let mut list_values = ListValue::new(); + list_values.set_values(RepeatedField::from_vec(rows)); + let mut values = Value::new(); + values.set_list_value(list_values); + + let mut param_type = Type::new(); + param_type.set_code(TypeCode::ARRAY); + let mut array_type = Type::new(); + array_type.set_code(TypeCode::STRUCT); + let mut struct_type = StructType::new(); + struct_type.set_fields(RepeatedField::from_vec(fields)); + array_type.set_struct_type(struct_type); + param_type.set_array_element_type(array_type); + + let mut sqlparams = HashMap::new(); + sqlparams.insert("values".to_owned(), values); + let mut sqlparam_types = HashMap::new(); + sqlparam_types.insert("values".to_owned(), param_type); + db.sql( + "INSERT OR UPDATE INTO batch_bsos + (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id, + sortindex, payload, ttl) + SELECT + incoming.fxa_uid, + incoming.fxa_kid, + incoming.collection_id, + incoming.batch_id, + incoming.batch_bso_id, + COALESCE(incoming.sortindex, existing.sortindex), + COALESCE(incoming.payload, existing.payload), + COALESCE(incoming.ttl, existing.ttl) + FROM UNNEST(@values) AS incoming + LEFT JOIN batch_bsos AS existing + ON existing.fxa_uid = incoming.fxa_uid + AND existing.fxa_kid = incoming.fxa_kid + AND existing.collection_id = incoming.collection_id + AND existing.batch_id = incoming.batch_id + AND existing.batch_bso_id = incoming.batch_bso_id", + ) + .await? + .params(sqlparams) + .param_types(sqlparam_types) + .execute_dml(&db.conn) + .await?; + db.metrics + .count_with_tags("storage.spanner.batch.upsert", row_count as i64, tags); Ok(()) } diff --git a/syncstorage-spanner/src/db/db_impl.rs b/syncstorage-spanner/src/db/db_impl.rs index 7ad14a4422..c120f09cce 100644 --- a/syncstorage-spanner/src/db/db_impl.rs +++ b/syncstorage-spanner/src/db/db_impl.rs @@ -9,11 +9,7 @@ use google_cloud_rust_raw::spanner::v1::{ }, type_pb::TypeCode, }; -#[allow(unused_imports)] -use protobuf::{ - Message, RepeatedField, - well_known_types::{ListValue, Value}, -}; +use syncserver_common::MAX_SPANNER_LOAD_SIZE; use syncstorage_db_common::{Db, error::DbErrorIntrospect, params, results, util::SyncTimestamp}; use super::{ @@ -163,9 +159,6 @@ impl Db for SpannerDb { let mut req = CommitRequest::new(); req.set_session(spanner.session.get_name().to_owned()); req.set_transaction_id(transaction.get_id().to_vec()); - if let Some(mutations) = self.session.mutations.take() { - req.set_mutations(RepeatedField::from_vec(mutations)); - } spanner .client .commit_async_opt(&req, spanner.session_opt()?)? @@ -518,15 +511,6 @@ impl Db for SpannerDb { &mut self, params: params::UpdateCollection, ) -> DbResult { - // NOTE: Spanner supports upserts via its InsertOrUpdate mutation but - // lacks a SQL equivalent. This call could be 1 InsertOrUpdate instead - // of 2 queries but would require put/post_bsos to also use mutations. - // Due to case of when no parent row exists (in user_collections) - // before writing to bsos. Spanner requires a parent table row exist - // before child table rows are written. - // Mutations don't run in the same order as ExecuteSql calls, they are - // buffered on the client side and only issued to Spanner in the final - // transaction Commit. let timestamp = self.checked_timestamp()?; if !cfg!(debug_assertions) && self.session.updated_collection { // No need to touch it again (except during tests where we @@ -794,19 +778,79 @@ impl Db for SpannerDb { } async fn put_bso(&mut self, params: params::PutBso) -> DbResult { - if self.conn.settings.use_mutations { - self.put_bso_with_mutations(params).await - } else { - self.put_bso_without_mutations(params).await - } + let user_id = params.user_id; + let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; + + self.check_quota(&user_id, ¶ms.collection).await?; + + // Ensure a parent record exists in user_collections before writing to bsos + let timestamp = self + .update_collection(params::UpdateCollection { + user_id: user_id.clone(), + collection_id, + collection: params.collection, + }) + .await?; + + self.put_bso_dml( + &user_id, + collection_id, + params::PostCollectionBso { + id: params.id, + sortindex: params.sortindex, + payload: params.payload, + ttl: params.ttl, + }, + timestamp, + ) + .await?; + + self.update_user_collection_quotas(&user_id, collection_id) + .await } async fn post_bsos(&mut self, params: params::PostBsos) -> DbResult { - if self.conn.settings.use_mutations { - self.post_bsos_with_mutations(params).await - } else { - self.post_bsos_without_mutations(params).await + let user_id = params.user_id; + let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; + + if !params.for_batch { + self.check_quota(&user_id, ¶ms.collection).await?; } + + let load_size: usize = params + .bsos + .iter() + .map(|bso| bso.payload.as_ref().map_or(0, String::len)) + .sum(); + if load_size > MAX_SPANNER_LOAD_SIZE { + self.metrics.clone().incr("error.tooMuchData"); + trace!( + "⚠️Attempted to load too much data into Spanner: {:?} bytes", + load_size + ); + return Err(DbError::too_large(format!( + "Committed data too large: {}", + load_size + ))); + } + + // Ensure a parent record exists in user_collections before writing to bsos + let timestamp = self + .update_collection(params::UpdateCollection { + user_id: user_id.clone(), + collection_id, + collection: params.collection, + }) + .await?; + + self.post_bsos_dml(&user_id, collection_id, params.bsos, timestamp) + .await?; + + if !params.for_batch { + self.update_user_collection_quotas(&user_id, collection_id) + .await?; + } + Ok(timestamp) } async fn check(&mut self) -> DbResult { diff --git a/syncstorage-spanner/src/db/mod.rs b/syncstorage-spanner/src/db/mod.rs index d2195e6ff8..b144f392c6 100644 --- a/syncstorage-spanner/src/db/mod.rs +++ b/syncstorage-spanner/src/db/mod.rs @@ -1,25 +1,20 @@ -use std::{ - collections::{HashMap, HashSet}, - convert::TryInto, - fmt, - sync::Arc, -}; +use std::{collections::HashMap, convert::TryInto, fmt, sync::Arc}; use google_cloud_rust_raw::spanner::v1::{ - mutation::{Mutation, Mutation_Write}, spanner::ExecuteSqlRequest, transaction::TransactionSelector, - type_pb::TypeCode, + type_pb::{StructType, Type, TypeCode}, }; -#[allow(unused_imports)] use protobuf::{ - Message, RepeatedField, + RepeatedField, well_known_types::{ListValue, Value}, }; -use syncserver_common::{MAX_SPANNER_LOAD_SIZE, Metrics}; +use syncserver_common::Metrics; use syncstorage_db_common::{ DEFAULT_BSO_TTL, Db, FIRST_CUSTOM_COLLECTION_ID, Sorting, UserIdentifier, - error::DbErrorIntrospect, params, results, util::SyncTimestamp, + error::DbErrorIntrospect, + params, + util::{SyncTimestamp, to_rfc3339}, }; use syncstorage_settings::Quota; @@ -29,8 +24,8 @@ use crate::{ pool::{CollectionCache, Conn}, }; use support::{ - ExecuteSqlRequestBuilder, IntoSpannerValue, StreamedResultSetAsync, as_type, bso_to_insert_row, - bso_to_update_row, + ExecuteSqlRequestBuilder, IntoSpannerValue, StreamedResultSetAsync, as_type, null_value, + struct_type_field, }; mod batch_impl; @@ -82,9 +77,6 @@ struct SpannerDbSession { /// Currently locked collections coll_locks: HashMap<(UserIdentifier, i32), CollectionLock>, transaction: Option, - /// Behind Vec so commit can take() it (maybe commit() should consume self - /// instead?) - mutations: Option>, in_write_transaction: bool, execute_sql_count: u64, /// Whether update_collection has already been called @@ -210,56 +202,6 @@ impl SpannerDb { Ok(ExecuteSqlRequestBuilder::new(self.sql_request(sql).await?)) } - #[allow(unused)] - pub(super) fn insert(&mut self, table: &str, columns: &[&str], values: Vec) { - let mut mutation = Mutation::new(); - mutation.set_insert(self.mutation_write(table, columns, values)); - self.session - .mutations - .get_or_insert_with(Vec::new) - .push(mutation); - } - - #[allow(unused)] - pub(super) fn update(&mut self, table: &str, columns: &[&str], values: Vec) { - let mut mutation = Mutation::new(); - mutation.set_update(self.mutation_write(table, columns, values)); - self.session - .mutations - .get_or_insert_with(Vec::new) - .push(mutation); - } - - #[allow(unused)] - pub(super) fn insert_or_update( - &mut self, - table: &str, - columns: &[&str], - values: Vec, - ) { - let mut mutation = Mutation::new(); - mutation.set_insert_or_update(self.mutation_write(table, columns, values)); - self.session - .mutations - .get_or_insert_with(Vec::new) - .push(mutation); - } - - fn mutation_write( - &self, - table: &str, - columns: &[&str], - values: Vec, - ) -> Mutation_Write { - let mut write = Mutation_Write::new(); - write.set_table(table.to_owned()); - write.set_columns(RepeatedField::from_vec( - columns.iter().map(|&column| column.to_owned()).collect(), - )); - write.set_values(RepeatedField::from_vec(values)); - write - } - fn in_write_transaction(&self) -> bool { self.session.in_write_transaction } @@ -640,141 +582,6 @@ impl SpannerDb { */ } - async fn put_bso_with_mutations( - &mut self, - params: params::PutBso, - ) -> DbResult { - let bsos = vec![params::PostCollectionBso { - id: params.id, - sortindex: params.sortindex, - payload: params.payload, - ttl: params.ttl, - }]; - let result = self - .post_bsos_with_mutations(params::PostBsos { - user_id: params.user_id, - collection: params.collection, - bsos, - for_batch: false, - }) - .await?; - - Ok(result) - } - - async fn post_bsos_with_mutations( - &mut self, - params: params::PostBsos, - ) -> DbResult { - let user_id = params.user_id; - let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; - - if !params.for_batch { - self.check_quota(&user_id, ¶ms.collection).await?; - } - - // Ensure a parent record exists in user_collections before writing to - // bsos (INTERLEAVE IN PARENT user_collections) - let timestamp = self - .update_collection(params::UpdateCollection { - user_id: user_id.clone(), - collection_id, - collection: params.collection, - }) - .await?; - - let (sqlparams, sqlparam_types) = params! { - "fxa_uid" => user_id.fxa_uid.clone(), - "fxa_kid" => user_id.fxa_kid.clone(), - "collection_id" => collection_id, - "ids" => params - .bsos - .iter() - .map(|pbso| pbso.id.clone()) - .collect::>(), - }; - // Determine what bsos already exist (need to be inserted vs updated) - // NOTE: Here and in batch commit we match the original Python - // syncstorage's behavior: - // - not specifying "AND expiry > CURRENT_TIMESTAMP()" - // - thus treating existing but expired bsos as existing (and not - // expired) - // This simplifies the writes, avoiding the need to delete those - // expired bsos before inserting new ones with the same id. - // Unfortunately, this means updates may resurrect expired bsos (or at - // least a subset of their fields), or possibly even write new data - // without an associated ttl to an expired record that will be - // deleted. This in practice should be a very rare occurrence - let mut streaming = self - .sql( - "SELECT bso_id - FROM bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND bso_id IN UNNEST(@ids)", - ) - .await? - .params(sqlparams) - .param_types(sqlparam_types) - .execute(&self.conn)?; - let mut existing = HashSet::new(); - while let Some(mut row) = streaming.try_next().await? { - existing.insert(row[0].take_string_value()); - } - let mut inserts = vec![]; - let mut updates = HashMap::new(); - let mut load_size: usize = 0; - for bso in params.bsos { - if existing.contains(&bso.id) { - let (columns, values) = bso_to_update_row(&user_id, collection_id, bso, timestamp)?; - load_size += values.compute_size() as usize; - updates.entry(columns).or_insert_with(Vec::new).push(values); - } else { - let values = bso_to_insert_row(&user_id, collection_id, bso, timestamp)?; - load_size += values.compute_size() as usize; - inserts.push(values); - } - } - if load_size > MAX_SPANNER_LOAD_SIZE { - self.metrics.clone().incr("error.tooMuchData"); - trace!( - "⚠️Attempted to load too much data into Spanner: {:?} bytes", - load_size - ); - return Err(DbError::too_large(format!( - "Committed data too large: {}", - load_size - ))); - } - - if !inserts.is_empty() { - self.insert( - "bsos", - &[ - "fxa_uid", - "fxa_kid", - "collection_id", - "bso_id", - "sortindex", - "payload", - "modified", - "expiry", - ], - inserts, - ); - } - for (columns, values) in updates { - self.update("bsos", &columns, values); - } - if !params.for_batch { - // update the quotas - self.update_user_collection_quotas(&user_id, collection_id) - .await?; - }; - Ok(timestamp) - } - pub fn quota_error(&self, collection: &str) -> DbError { // return the over quota error. let mut tags = HashMap::default(); @@ -808,206 +615,189 @@ impl SpannerDb { Ok(Some(usage.total_bytes)) } - // NOTE: Currently this put_bso_async_without_mutations impl is only used - // during db tests, see the with_mutations impl for the non-tests version - async fn put_bso_without_mutations( + /// Write a bso using an `INSERT OR UPDATE`. + async fn put_bso_dml( &mut self, - bso: params::PutBso, - ) -> DbResult { - use syncstorage_db_common::util::to_rfc3339; - let collection_id = self.get_or_create_collection_id(&bso.collection).await?; - - self.check_quota(&bso.user_id, &bso.collection).await?; + user_id: &UserIdentifier, + collection_id: i32, + bso: params::PostCollectionBso, + timestamp: SyncTimestamp, + ) -> DbResult<()> { + let has_payload_or_sortindex = bso.payload.is_some() || bso.sortindex.is_some(); let (mut sqlparams, mut sqlparam_types) = params! { - "fxa_uid" => bso.user_id.fxa_uid.clone(), - "fxa_kid" => bso.user_id.fxa_kid.clone(), + "fxa_uid" => user_id.fxa_uid.clone(), + "fxa_kid" => user_id.fxa_kid.clone(), "collection_id" => collection_id, "bso_id" => bso.id, + "timestamp" => timestamp.as_rfc3339()?, + "default_bso_ttl" => DEFAULT_BSO_TTL, }; - // prewarm the collections table by ensuring that the row is added if not present. - self.update_collection(params::UpdateCollection { - user_id: bso.user_id.clone(), - collection_id, - collection: bso.collection, - }) - .await?; - let timestamp = self.checked_timestamp()?; + sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP)); - let result = self - .sql( - "SELECT 1 AS count - FROM bsos - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND bso_id = @bso_id", - ) - .await? - .params(sqlparams.clone()) - .param_types(sqlparam_types.clone()) - .execute(&self.conn)? - .one_or_none() - .await?; - let exists = result.is_some(); - - let sql = if exists { - let mut q = "".to_string(); - let comma = |q: &String| if q.is_empty() { "" } else { ", " }; - - q = format!( - "{}{}", - q, - if let Some(sortindex) = bso.sortindex { - sqlparam_types.insert("sortindex".to_string(), sortindex.spanner_type()); - sqlparams.insert("sortindex".to_string(), sortindex.into_spanner_value()); - - format!("{}{}", comma(&q), "sortindex = @sortindex") - } else { - "".to_string() - } - ); - - q = format!( - "{}{}", - q, - if let Some(ttl) = bso.ttl { - let expiry = timestamp.as_i64() + (i64::from(ttl) * 1000); - sqlparams.insert( - "expiry".to_string(), - to_rfc3339(expiry)?.into_spanner_value(), - ); - sqlparam_types.insert("expiry".to_string(), as_type(TypeCode::TIMESTAMP)); - format!("{}{}", comma(&q), "expiry = @expiry") - } else { - "".to_string() - } - ); - - q = format!( - "{}{}", - q, - if bso.payload.is_some() || bso.sortindex.is_some() { - sqlparams.insert( - "modified".to_string(), - timestamp.as_rfc3339()?.into_spanner_value(), - ); - sqlparam_types.insert("modified".to_string(), as_type(TypeCode::TIMESTAMP)); - format!("{}{}", comma(&q), "modified = @modified") - } else { - "".to_string() - } - ); - - q = format!( - "{}{}", - q, - if let Some(payload) = bso.payload { - sqlparam_types.insert("payload".to_string(), payload.spanner_type()); - sqlparams.insert("payload".to_string(), payload.into_spanner_value()); - format!("{}{}", comma(&q), "payload = @payload") - } else { - "".to_string() - } - ); - - if q.is_empty() { - // Nothing to update - return Ok(timestamp); - } - - format!( - "UPDATE bsos SET {}{}", - q, - " WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id - AND bso_id = @bso_id" - ) + let modified_expr = if has_payload_or_sortindex { + "@timestamp" } else { - let use_sortindex = bso - .sortindex - .map(|sortindex| sortindex.to_string()) - .unwrap_or_else(|| "NULL".to_owned()) - != "NULL"; - let sql = if use_sortindex { - "INSERT INTO bsos - (fxa_uid, fxa_kid, collection_id, bso_id, sortindex, payload, modified, - expiry) - VALUES - (@fxa_uid, @fxa_kid, @collection_id, @bso_id, @sortindex, @payload, - @modified, @expiry)" - } else { - "INSERT INTO bsos (fxa_uid, fxa_kid, collection_id, bso_id, payload, modified, - expiry) - VALUES (@fxa_uid, @fxa_kid, @collection_id, @bso_id, @payload, @modified, - @expiry)" - }; + "COALESCE(existing.modified, @timestamp)" + }; - if use_sortindex { - use support::null_value; - let sortindex = bso - .sortindex - .map(|sortindex| sortindex.into_spanner_value()) - .unwrap_or_else(null_value); - sqlparams.insert("sortindex".to_string(), sortindex); - sqlparam_types.insert("sortindex".to_string(), as_type(TypeCode::INT64)); - } - let payload = bso.payload.unwrap_or_else(|| "".to_owned()); + let payload_expr = if let Some(payload) = bso.payload { sqlparam_types.insert("payload".to_owned(), payload.spanner_type()); - sqlparams.insert("payload".to_string(), payload.into_spanner_value()); - let now_millis = timestamp.as_i64(); - let ttl = bso.ttl.map_or(i64::from(DEFAULT_BSO_TTL), |ttl| ttl.into()) * 1000; - let expirystring = to_rfc3339(now_millis + ttl)?; - debug!( - "!!!!! [test] INSERT expirystring:{:?}, timestamp:{:?}, ttl:{:?}", - &expirystring, timestamp, ttl - ); - sqlparams.insert("expiry".to_string(), expirystring.into_spanner_value()); - sqlparam_types.insert("expiry".to_string(), as_type(TypeCode::TIMESTAMP)); + sqlparams.insert("payload".to_owned(), payload.into_spanner_value()); + "@payload" + } else { + "COALESCE(existing.payload, '')" + }; + let expiry_expr = if let Some(ttl) = bso.ttl { + let expiry = timestamp.as_i64() + (i64::from(ttl) * 1000); sqlparams.insert( - "modified".to_string(), - timestamp.as_rfc3339()?.into_spanner_value(), + "ttl_expiry".to_owned(), + to_rfc3339(expiry)?.into_spanner_value(), ); - sqlparam_types.insert("modified".to_string(), as_type(TypeCode::TIMESTAMP)); - sql.to_owned() + sqlparam_types.insert("ttl_expiry".to_owned(), as_type(TypeCode::TIMESTAMP)); + "@ttl_expiry" + } else { + "COALESCE(existing.expiry, TIMESTAMP_ADD(@timestamp, INTERVAL @default_bso_ttl SECOND))" }; + let (sortindex_col, sortindex_expr) = if let Some(sortindex) = bso.sortindex { + sqlparam_types.insert("sortindex".to_owned(), sortindex.spanner_type()); + sqlparams.insert("sortindex".to_owned(), sortindex.into_spanner_value()); + (", sortindex", ", @sortindex") + } else { + ("", "") + }; + + let sql = format!( + "INSERT OR UPDATE INTO bsos + (fxa_uid, fxa_kid, collection_id, bso_id, modified, payload, expiry{sortindex_col}) + SELECT + @fxa_uid, @fxa_kid, @collection_id, @bso_id, + {modified_expr}, + {payload_expr}, + {expiry_expr}{sortindex_expr} + FROM UNNEST([1]) -- provides a row source for the LEFT JOIN + LEFT JOIN ( + SELECT modified, payload, expiry, sortindex + FROM bsos + WHERE fxa_uid = @fxa_uid + AND fxa_kid = @fxa_kid + AND collection_id = @collection_id + AND bso_id = @bso_id + ) AS existing ON TRUE" + ); + self.sql(&sql) .await? .params(sqlparams) .param_types(sqlparam_types) .execute_dml(&self.conn) .await?; - // update the counts for the user_collections table. - self.update_user_collection_quotas(&bso.user_id, collection_id) - .await + Ok(()) } - // NOTE: Currently this post_bsos_without_mutations impl is only used - // during db tests, see the with_mutations impl for the non-tests version - async fn post_bsos_without_mutations( + /// Write N bsos to the same `(fxa_uid, fxa_kid, collection_id)` in an `INSERT OR UPDATE` + async fn post_bsos_dml( &mut self, - input: params::PostBsos, - ) -> DbResult { - let collection_id = self.get_or_create_collection_id(&input.collection).await?; - let modified = self.checked_timestamp()?; - - for pbso in input.bsos { - let id = pbso.id; - self.put_bso_without_mutations(params::PutBso { - user_id: input.user_id.clone(), - collection: input.collection.clone(), - id: id.clone(), - payload: pbso.payload, - sortindex: pbso.sortindex, - ttl: pbso.ttl, - }) - .await?; + user_id: &UserIdentifier, + collection_id: i32, + bsos: Vec, + timestamp: SyncTimestamp, + ) -> DbResult<()> { + if bsos.is_empty() { + return Ok(()); } - self.update_user_collection_quotas(&input.user_id, collection_id) - .await?; - Ok(modified) + + let mut rows: Vec = Vec::with_capacity(bsos.len()); + for bso in bsos { + // Optional columns are encoded as NULL when the request omitted them + let sortindex = bso + .sortindex + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + let payload = bso + .payload + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + let ttl = bso + .ttl + .map(IntoSpannerValue::into_spanner_value) + .unwrap_or_else(null_value); + + let mut row = ListValue::new(); + row.set_values(vec![bso.id.into_spanner_value(), sortindex, payload, ttl].into()); + let mut value = Value::new(); + value.set_list_value(row); + rows.push(value); + } + + let fields = vec![ + ("bso_id", TypeCode::STRING), + ("sortindex", TypeCode::INT64), + ("payload", TypeCode::STRING), + ("ttl", TypeCode::INT64), + ] + .into_iter() + .map(|(name, field_type)| struct_type_field(name, field_type)) + .collect(); + + let mut list_values = ListValue::new(); + list_values.set_values(RepeatedField::from_vec(rows)); + let mut values = Value::new(); + values.set_list_value(list_values); + + let mut param_type = Type::new(); + param_type.set_code(TypeCode::ARRAY); + let mut array_type = Type::new(); + array_type.set_code(TypeCode::STRUCT); + let mut struct_type = StructType::new(); + struct_type.set_fields(RepeatedField::from_vec(fields)); + array_type.set_struct_type(struct_type); + param_type.set_array_element_type(array_type); + + let (mut sqlparams, mut sqlparam_types) = params! { + "fxa_uid" => user_id.fxa_uid.clone(), + "fxa_kid" => user_id.fxa_kid.clone(), + "collection_id" => collection_id, + "timestamp" => timestamp.as_rfc3339()?, + "default_bso_ttl" => DEFAULT_BSO_TTL, + }; + sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP)); + sqlparams.insert("bsos".to_owned(), values); + sqlparam_types.insert("bsos".to_owned(), param_type); + + self.sql( + "INSERT OR UPDATE INTO bsos + (fxa_uid, fxa_kid, collection_id, bso_id, + sortindex, payload, modified, expiry) + SELECT + @fxa_uid, + @fxa_kid, + @collection_id, + incoming.bso_id, + COALESCE(incoming.sortindex, existing.sortindex), + COALESCE(incoming.payload, existing.payload, ''), + IF(incoming.payload IS NOT NULL OR incoming.sortindex IS NOT NULL, + @timestamp, + COALESCE(existing.modified, @timestamp)), + COALESCE( + TIMESTAMP_ADD(@timestamp, INTERVAL incoming.ttl SECOND), + existing.expiry, + TIMESTAMP_ADD(@timestamp, INTERVAL @default_bso_ttl SECOND) + ) + FROM UNNEST(@bsos) AS incoming + LEFT JOIN bsos AS existing + ON existing.fxa_uid = @fxa_uid + AND existing.fxa_kid = @fxa_kid + AND existing.collection_id = @collection_id + AND existing.bso_id = incoming.bso_id", + ) + .await? + .params(sqlparams) + .param_types(sqlparam_types) + .execute_dml(&self.conn) + .await?; + Ok(()) } } diff --git a/syncstorage-spanner/src/db/support.rs b/syncstorage-spanner/src/db/support.rs index 8fcc4bd1d1..d7a9f18436 100644 --- a/syncstorage-spanner/src/db/support.rs +++ b/syncstorage-spanner/src/db/support.rs @@ -9,9 +9,7 @@ use protobuf::{ RepeatedField, well_known_types::{ListValue, NullValue, Struct, Value}, }; -use syncstorage_db_common::{ - DEFAULT_BSO_TTL, UserIdentifier, params, results, util::SyncTimestamp, util::to_rfc3339, -}; +use syncstorage_db_common::{results, util::SyncTimestamp}; pub use super::stream::StreamedResultSetAsync; use crate::{DbResult, error::DbError, pool::Conn}; @@ -200,68 +198,3 @@ pub fn bso_from_row(mut row: Vec) -> DbResult { .as_i64(), }) } - -pub fn bso_to_insert_row( - user_id: &UserIdentifier, - collection_id: i32, - bso: params::PostCollectionBso, - now: SyncTimestamp, -) -> DbResult { - let sortindex = bso - .sortindex - .map(|sortindex| sortindex.into_spanner_value()) - .unwrap_or_else(null_value); - let ttl = bso.ttl.unwrap_or(DEFAULT_BSO_TTL); - let expiry = to_rfc3339(now.as_i64() + (i64::from(ttl) * 1000))?; - - let mut row = ListValue::new(); - row.set_values(RepeatedField::from_vec(vec![ - user_id.fxa_uid.clone().into_spanner_value(), - user_id.fxa_kid.clone().into_spanner_value(), - collection_id.into_spanner_value(), - bso.id.into_spanner_value(), - sortindex, - bso.payload.unwrap_or_default().into_spanner_value(), - now.as_rfc3339()?.into_spanner_value(), - expiry.into_spanner_value(), - ])); - Ok(row) -} - -pub fn bso_to_update_row( - user_id: &UserIdentifier, - collection_id: i32, - bso: params::PostCollectionBso, - now: SyncTimestamp, -) -> DbResult<(Vec<&'static str>, ListValue)> { - let mut columns = vec!["fxa_uid", "fxa_kid", "collection_id", "bso_id"]; - let mut values = vec![ - user_id.fxa_uid.clone().into_spanner_value(), - user_id.fxa_kid.clone().into_spanner_value(), - collection_id.into_spanner_value(), - bso.id.into_spanner_value(), - ]; - - let modified = bso.payload.is_some() || bso.sortindex.is_some(); - if let Some(sortindex) = bso.sortindex { - columns.push("sortindex"); - values.push(sortindex.into_spanner_value()); - } - if let Some(payload) = bso.payload { - columns.push("payload"); - values.push(payload.into_spanner_value()); - } - if modified { - columns.push("modified"); - values.push(now.as_rfc3339()?.into_spanner_value()); - } - if let Some(ttl) = bso.ttl { - columns.push("expiry"); - let expiry = now.as_i64() + (i64::from(ttl) * 1000); - values.push(to_rfc3339(expiry)?.into_spanner_value()); - } - - let mut row = ListValue::new(); - row.set_values(RepeatedField::from_vec(values)); - Ok((columns, row)) -} diff --git a/syncstorage-spanner/src/manager/session.rs b/syncstorage-spanner/src/manager/session.rs index b25dc8847c..e9e6e14a00 100644 --- a/syncstorage-spanner/src/manager/session.rs +++ b/syncstorage-spanner/src/manager/session.rs @@ -53,10 +53,6 @@ pub struct SpannerSessionSettings { /// The database name pub database: String, - /// Whether [SpannerDb] uses mutations, which should be more efficient for - /// some of its bulk operations - pub use_mutations: bool, - /// Whether the Leader Aware Routing header should be included in gRPC /// metdata pub route_to_leader: bool, @@ -79,14 +75,8 @@ impl SpannerSessionSettings { })? .to_owned(); - #[cfg(not(debug_assertions))] - let use_mutations = true; - #[cfg(debug_assertions)] - let use_mutations = settings.database_spanner_use_mutations; - Ok(Self { database, - use_mutations, route_to_leader: settings.database_spanner_route_to_leader, max_lifespan: settings.database_pool_connection_lifespan, max_idle: settings.database_pool_connection_max_idle,