From f3423774776bb1af44e2b2f7f22a5ea4d3e4a192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Sat, 14 Mar 2026 15:31:19 +0000 Subject: [PATCH 1/7] Reduce sqlite scanner parallel contention --- src/include/sqlite_stmt.hpp | 1 + src/sqlite_scanner.cpp | 125 ++++++++++++++++++++++-------------- src/sqlite_stmt.cpp | 4 ++ 3 files changed, 81 insertions(+), 49 deletions(-) diff --git a/src/include/sqlite_stmt.hpp b/src/include/sqlite_stmt.hpp index b737e49..c69a245 100644 --- a/src/include/sqlite_stmt.hpp +++ b/src/include/sqlite_stmt.hpp @@ -55,6 +55,7 @@ class SQLiteStatement { int expected_type, idx_t col_idx); void CheckTypeIsFloatOrInteger(sqlite3_value *val, int sqlite_column_type, idx_t col_idx); void Reset(); + void ClearBindings(); }; template <> diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index 568595a..6cb51b7 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -22,7 +22,9 @@ struct SqliteLocalState : public LocalTableFunctionState { SQLiteDB owned_db; SQLiteStatement stmt; bool done = false; + bool reuses_rowid_statement = false; vector column_ids; + string prepared_sql; //! The amount of rows we scanned as part of this row group idx_t scan_count = 1; @@ -85,44 +87,62 @@ static unique_ptr SqliteBind(ClientContext &context, TableFunction return std::move(result); } +static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vector &column_ids, bool parameterized_rowid, + idx_t rowid_min, idx_t rowid_max) { + if (!bind_data.sql.empty()) { + return bind_data.sql; + } + auto col_names = StringUtil::Join(column_ids.data(), column_ids.size(), ", ", [&](const idx_t column_id) { + return column_id == (column_t)-1 ? 
"ROWID" + : '"' + SQLiteUtils::SanitizeIdentifier(bind_data.names[column_id]) + '"'; + }); + + auto sql = StringUtil::Format("SELECT %s FROM \"%s\"", col_names, SQLiteUtils::SanitizeIdentifier(bind_data.table_name)); + if (!bind_data.rows_per_group.IsValid()) { + D_ASSERT(rowid_min == 0); + return sql; + } + if (parameterized_rowid) { + sql += " WHERE ROWID BETWEEN ? AND ?"; + } else { + sql += StringUtil::Format(" WHERE ROWID BETWEEN %d AND %d", rowid_min, rowid_max); + } + return sql; +} + static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bind_data, SqliteLocalState &local_state, idx_t rowid_min, idx_t rowid_max) { D_ASSERT(rowid_min <= rowid_max); local_state.done = false; - // we may have leftover statements or connections from a previous call to this - // function - local_state.stmt.Close(); if (!local_state.db) { SQLiteOpenOptions options; options.access_mode = AccessMode::READ_ONLY; local_state.owned_db = SQLiteDB::Open(bind_data.file_name.c_str(), options); local_state.db = &local_state.owned_db; } - string sql; - if (bind_data.sql.empty()) { - auto col_names = StringUtil::Join( - local_state.column_ids.data(), local_state.column_ids.size(), ", ", [&](const idx_t column_id) { - return column_id == (column_t)-1 - ? 
"ROWID" - : '"' + SQLiteUtils::SanitizeIdentifier(bind_data.names[column_id]) + '"'; - }); - - sql = StringUtil::Format("SELECT %s FROM \"%s\"", col_names, - SQLiteUtils::SanitizeIdentifier(bind_data.table_name)); - if (bind_data.rows_per_group.IsValid()) { - // we are scanning a subset of the rows - generate a WHERE clause based on - // the rowid - auto where_clause = StringUtil::Format(" WHERE ROWID BETWEEN %d AND %d", rowid_min, rowid_max); - sql += where_clause; + auto use_reusable_rowid_statement = bind_data.sql.empty() && bind_data.rows_per_group.IsValid(); + if (use_reusable_rowid_statement) { + auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids, true, rowid_min, rowid_max); + if (!local_state.stmt.IsOpen() || !local_state.reuses_rowid_statement || local_state.prepared_sql != sql) { + local_state.stmt.Close(); + local_state.stmt = local_state.db->Prepare(sql.c_str()); + local_state.prepared_sql = sql; + local_state.reuses_rowid_statement = true; } else { - // we are scanning the entire table - no need for a WHERE clause - D_ASSERT(rowid_min == 0); + local_state.stmt.Reset(); + local_state.stmt.ClearBindings(); } + auto param_idx = bind_data.params.size(); + local_state.stmt.Bind(param_idx++, UnsafeNumericCast(rowid_min)); + local_state.stmt.Bind(param_idx++, UnsafeNumericCast(rowid_max)); } else { - sql = bind_data.sql; + local_state.reuses_rowid_statement = false; + local_state.prepared_sql.clear(); + local_state.stmt.Close(); + auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids, false, rowid_min, rowid_max); + local_state.stmt = local_state.db->Prepare(sql.c_str()); } - local_state.stmt = local_state.db->Prepare(sql.c_str()); for (idx_t i = 0; i < bind_data.params.size(); i++) { const Value &param = bind_data.params[i]; @@ -155,35 +175,42 @@ static idx_t SqliteMaxThreads(ClientContext &context, const FunctionData *bind_d static bool SqliteParallelStateNext(ClientContext &context, const SqliteBindData &bind_data, SqliteLocalState &lstate,
SqliteGlobalState &gstate) { - lock_guard parallel_lock(gstate.lock); - if (!bind_data.rows_per_group.IsValid()) { - // not doing a parallel scan - scan everything at once - if (gstate.position > 0) { - // already scanned - return false; + idx_t rowid_min = 0; + idx_t rowid_max = 0; + bool has_next = false; + { + lock_guard parallel_lock(gstate.lock); + if (!bind_data.rows_per_group.IsValid()) { + // not doing a parallel scan - scan everything at once + if (gstate.position > 0) { + return false; + } + gstate.position = static_cast(-1); + lstate.scan_count = 0; + has_next = true; + } else { + auto max_row_id = bind_data.row_id_info.max_rowid.GetIndex(); + if (gstate.position < max_row_id) { + if (lstate.scan_count == 0 && gstate.rows_per_group < max_row_id) { + // we scanned no rows in our previous slice - double the rows per group + gstate.rows_per_group *= 2; + } + if (gstate.rows_per_group == 0) { + throw InternalException("SqliteParallelStateNext - gstate.rows_per_group not set"); + } + rowid_min = gstate.position; + rowid_max = MinValue(max_row_id, rowid_min + gstate.rows_per_group - 1); + gstate.position = rowid_max + 1; + lstate.scan_count = 0; + has_next = true; + } } - SqliteInitInternal(context, bind_data, lstate, 0, 0); - gstate.position = static_cast(-1); - lstate.scan_count = 0; - return true; } - auto max_row_id = bind_data.row_id_info.max_rowid.GetIndex(); - if (gstate.position < max_row_id) { - if (lstate.scan_count == 0 && gstate.rows_per_group < max_row_id) { - // we scanned no rows in our previous slice - double the rows per group - gstate.rows_per_group *= 2; - } - if (gstate.rows_per_group == 0) { - throw InternalException("SqliteParallelStateNext - gstate.rows_per_group not set"); - } - auto start = gstate.position; - auto end = MinValue(max_row_id, start + gstate.rows_per_group - 1); - SqliteInitInternal(context, bind_data, lstate, start, end); - gstate.position = end + 1; - lstate.scan_count = 0; - return true; + if (!has_next) { + return 
false; } - return false; + SqliteInitInternal(context, bind_data, lstate, rowid_min, rowid_max); + return true; } static unique_ptr diff --git a/src/sqlite_stmt.cpp b/src/sqlite_stmt.cpp index 8237295..bbd3c5d 100644 --- a/src/sqlite_stmt.cpp +++ b/src/sqlite_stmt.cpp @@ -101,6 +101,10 @@ void SQLiteStatement::Reset() { SQLiteUtils::Check(sqlite3_reset(stmt), db); } +void SQLiteStatement::ClearBindings() { + SQLiteUtils::Check(sqlite3_clear_bindings(stmt), db); +} + template <> string SQLiteStatement::GetValue(idx_t col) { D_ASSERT(stmt); From 73235283a5632fba30a157eceeb96b139a9119b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Sat, 14 Mar 2026 15:56:02 +0000 Subject: [PATCH 2/7] Simplify sqlite scanner statement reuse --- src/sqlite_scanner.cpp | 44 ++++++++++++++++-------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index 6cb51b7..666b2ae 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -22,7 +22,6 @@ struct SqliteLocalState : public LocalTableFunctionState { SQLiteDB owned_db; SQLiteStatement stmt; bool done = false; - bool reuses_rowid_statement = false; vector column_ids; string prepared_sql; //! 
The amount of rows we scanned as part of this row group @@ -87,8 +86,7 @@ static unique_ptr SqliteBind(ClientContext &context, TableFunction return std::move(result); } -static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vector &column_ids, bool parameterized_rowid, - idx_t rowid_min, idx_t rowid_max) { +static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vector &column_ids) { if (!bind_data.sql.empty()) { return bind_data.sql; } @@ -98,18 +96,25 @@ static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vectorPrepare(sql.c_str()); + local_state.prepared_sql = sql; + return; + } + local_state.stmt.Reset(); + local_state.stmt.ClearBindings(); +} + static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bind_data, SqliteLocalState &local_state, idx_t rowid_min, idx_t rowid_max) { D_ASSERT(rowid_min <= rowid_max); @@ -121,27 +126,12 @@ static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bin local_state.owned_db = SQLiteDB::Open(bind_data.file_name.c_str(), options); local_state.db = &local_state.owned_db; } - auto use_reusable_rowid_statement = bind_data.sql.empty() && bind_data.rows_per_group.IsValid(); - if (use_reusable_rowid_statement) { - auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids, true, rowid_min, rowid_max); - if (!local_state.stmt.IsOpen() || !local_state.reuses_rowid_statement || local_state.prepared_sql != sql) { - local_state.stmt.Close(); - local_state.stmt = local_state.db->Prepare(sql.c_str()); - local_state.prepared_sql = sql; - local_state.reuses_rowid_statement = true; - } else { - local_state.stmt.Reset(); - local_state.stmt.ClearBindings(); - } + auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids); + SqlitePrepareStatement(local_state, sql); + if (bind_data.rows_per_group.IsValid()) { auto param_idx = bind_data.params.size(); local_state.stmt.Bind(param_idx++, UnsafeNumericCast(rowid_min)); local_state.stmt.Bind(param_idx++, 
UnsafeNumericCast(rowid_max)); - } else { - local_state.reuses_rowid_statement = false; - local_state.prepared_sql.clear(); - local_state.stmt.Close(); - auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids, false, rowid_min, rowid_max); - local_state.stmt = local_state.db->Prepare(sql.c_str()); } for (idx_t i = 0; i < bind_data.params.size(); i++) { From 4b1c58ffaf619432968d1e11eb05a3245f11c31a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Sat, 14 Mar 2026 15:58:29 +0000 Subject: [PATCH 3/7] Remove dead sqlite scanner branch --- src/sqlite_scanner.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index 666b2ae..3c17ad7 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -98,8 +98,6 @@ static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vector Date: Sat, 14 Mar 2026 15:59:17 +0000 Subject: [PATCH 4/7] Use explicit sql string type --- src/sqlite_scanner.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index 3c17ad7..3ef421e 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -124,7 +124,7 @@ static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bin local_state.owned_db = SQLiteDB::Open(bind_data.file_name.c_str(), options); local_state.db = &local_state.owned_db; } - auto sql = SqliteGetScanSQL(bind_data, local_state.column_ids); + string sql = SqliteGetScanSQL(bind_data, local_state.column_ids); SqlitePrepareStatement(local_state, sql); if (bind_data.rows_per_group.IsValid()) { auto param_idx = bind_data.params.size(); From 1e7e88773dfb7e6c9fa140f933128e3ea1cd05c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Sat, 14 Mar 2026 16:18:12 +0000 Subject: [PATCH 5/7] Rename cached sqlite statement sql --- src/sqlite_scanner.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sqlite_scanner.cpp 
b/src/sqlite_scanner.cpp index 3ef421e..cfd8150 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -21,9 +21,9 @@ struct SqliteLocalState : public LocalTableFunctionState { SQLiteDB *db; SQLiteDB owned_db; SQLiteStatement stmt; + string stmt_sql; bool done = false; vector column_ids; - string prepared_sql; //! The amount of rows we scanned as part of this row group idx_t scan_count = 1; @@ -103,10 +103,10 @@ static string SqliteGetScanSQL(const SqliteBindData &bind_data, const vectorPrepare(sql.c_str()); - local_state.prepared_sql = sql; + local_state.stmt_sql = sql; return; } local_state.stmt.Reset(); From aa867b36c6d31211af6f258a786da00a4cb72c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Sat, 14 Mar 2026 16:25:35 +0000 Subject: [PATCH 6/7] Refactor sqlite scanner slice claiming --- src/sqlite_scanner.cpp | 61 +++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index cfd8150..678ed6f 100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -161,40 +161,41 @@ static idx_t SqliteMaxThreads(ClientContext &context, const FunctionData *bind_d return row_count / bind_data.rows_per_group.GetIndex(); } +static bool SqliteClaimNextSlice(const SqliteBindData &bind_data, SqliteLocalState &lstate, SqliteGlobalState &gstate, + idx_t &rowid_min, idx_t &rowid_max) { + lock_guard parallel_lock(gstate.lock); + if (!bind_data.rows_per_group.IsValid()) { + // not doing a parallel scan - scan everything at once + if (gstate.position > 0) { + return false; + } + gstate.position = static_cast(-1); + lstate.scan_count = 0; + return true; + } + auto max_row_id = bind_data.row_id_info.max_rowid.GetIndex(); + if (gstate.position >= max_row_id) { + return false; + } + if (lstate.scan_count == 0 && gstate.rows_per_group < max_row_id) { + // we scanned no rows in our previous slice - double the rows per group + gstate.rows_per_group *= 2; + 
} + if (gstate.rows_per_group == 0) { + throw InternalException("SqliteParallelStateNext - gstate.rows_per_group not set"); + } + rowid_min = gstate.position; + rowid_max = MinValue(max_row_id, rowid_min + gstate.rows_per_group - 1); + gstate.position = rowid_max + 1; + lstate.scan_count = 0; + return true; +} + static bool SqliteParallelStateNext(ClientContext &context, const SqliteBindData &bind_data, SqliteLocalState &lstate, SqliteGlobalState &gstate) { idx_t rowid_min = 0; idx_t rowid_max = 0; - bool has_next = false; - { - lock_guard parallel_lock(gstate.lock); - if (!bind_data.rows_per_group.IsValid()) { - // not doing a parallel scan - scan everything at once - if (gstate.position > 0) { - return false; - } - gstate.position = static_cast(-1); - lstate.scan_count = 0; - has_next = true; - } else { - auto max_row_id = bind_data.row_id_info.max_rowid.GetIndex(); - if (gstate.position < max_row_id) { - if (lstate.scan_count == 0 && gstate.rows_per_group < max_row_id) { - // we scanned no rows in our previous slice - double the rows per group - gstate.rows_per_group *= 2; - } - if (gstate.rows_per_group == 0) { - throw InternalException("SqliteParallelStateNext - gstate.rows_per_group not set"); - } - rowid_min = gstate.position; - rowid_max = MinValue(max_row_id, rowid_min + gstate.rows_per_group - 1); - gstate.position = rowid_max + 1; - lstate.scan_count = 0; - has_next = true; - } - } - } - if (!has_next) { + if (!SqliteClaimNextSlice(bind_data, lstate, gstate, rowid_min, rowid_max)) { return false; } SqliteInitInternal(context, bind_data, lstate, rowid_min, rowid_max); From 12944399dcedf9aae35c046c1a1cb277c6a38a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Rafael?= Date: Mon, 16 Mar 2026 10:57:40 +0000 Subject: [PATCH 7/7] Bind sqlite params before rowid slice bounds --- src/sqlite_scanner.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/sqlite_scanner.cpp b/src/sqlite_scanner.cpp index 678ed6f..91b7180 
100644 --- a/src/sqlite_scanner.cpp +++ b/src/sqlite_scanner.cpp @@ -126,16 +126,17 @@ static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bin } string sql = SqliteGetScanSQL(bind_data, local_state.column_ids); SqlitePrepareStatement(local_state, sql); + + idx_t param_idx = 0; + for (; param_idx < bind_data.params.size(); param_idx++) { + const Value &param = bind_data.params[param_idx]; + local_state.stmt.BindParameter(param, param_idx); + } + if (bind_data.rows_per_group.IsValid()) { - auto param_idx = bind_data.params.size(); local_state.stmt.Bind(param_idx++, UnsafeNumericCast(rowid_min)); local_state.stmt.Bind(param_idx++, UnsafeNumericCast(rowid_max)); } - - for (idx_t i = 0; i < bind_data.params.size(); i++) { - const Value &param = bind_data.params[i]; - local_state.stmt.BindParameter(param, i); - } } static unique_ptr SqliteCardinality(ClientContext &context, const FunctionData *bind_data_p) {