Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/storage/sqlite_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "storage/sqlite_transaction.hpp"
#include "sqlite_db.hpp"
#include "sqlite_stmt.hpp"
#include <mutex>

namespace duckdb {

Expand All @@ -25,7 +26,10 @@ class SQLiteDeleteGlobalState : public GlobalSinkState {

SQLiteTableEntry &table;
SQLiteStatement statement;
std::mutex rowid_lock;
vector<row_t> rowids;
idx_t delete_count;
bool delete_done = false;
};

string GetDeleteSQL(const string &table_name) {
Expand All @@ -38,9 +42,7 @@ string GetDeleteSQL(const string &table_name) {
unique_ptr<GlobalSinkState> SQLiteDelete::GetGlobalSinkState(ClientContext &context) const {
auto &sqlite_table = table.Cast<SQLiteTableEntry>();

auto &transaction = SQLiteTransaction::Get(context, sqlite_table.catalog);
auto result = make_uniq<SQLiteDeleteGlobalState>(sqlite_table);
result->statement = transaction.GetDB().Prepare(GetDeleteSQL(sqlite_table.name));
return std::move(result);
}

Expand All @@ -53,12 +55,8 @@ SinkResultType SQLiteDelete::Sink(ExecutionContext &context, DataChunk &chunk, O
chunk.Flatten();
auto &row_identifiers = chunk.data[row_id_index];
auto row_data = FlatVector::GetData<row_t>(row_identifiers);
for (idx_t i = 0; i < chunk.size(); i++) {
gstate.statement.Bind<int64_t>(0, row_data[i]);
gstate.statement.Step();
gstate.statement.Reset();
}
gstate.delete_count += chunk.size();
std::lock_guard<std::mutex> lock(gstate.rowid_lock);
gstate.rowids.insert(gstate.rowids.end(), row_data, row_data + chunk.size());
return SinkResultType::NEED_MORE_INPUT;
}

Expand All @@ -67,6 +65,19 @@ SinkResultType SQLiteDelete::Sink(ExecutionContext &context, DataChunk &chunk, O
//===--------------------------------------------------------------------===//
SourceResultType SQLiteDelete::GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const {
auto &insert_gstate = sink_state->Cast<SQLiteDeleteGlobalState>();
if (!insert_gstate.delete_done) {
if (!insert_gstate.statement.IsOpen()) {
auto &transaction = SQLiteTransaction::Get(context.client, insert_gstate.table.catalog);
insert_gstate.statement = transaction.GetDB().Prepare(GetDeleteSQL(insert_gstate.table.name));
}
for (auto row_id : insert_gstate.rowids) {
insert_gstate.statement.Bind<int64_t>(0, row_id);
insert_gstate.statement.Step();
insert_gstate.statement.Reset();
}
insert_gstate.delete_count = insert_gstate.rowids.size();
insert_gstate.delete_done = true;
}
chunk.SetCardinality(1);
chunk.SetValue(0, 0, Value::BIGINT(insert_gstate.delete_count));

Expand Down
36 changes: 36 additions & 0 deletions test/sql/storage/attach_delete_hang_repro.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# name: test/sql/storage/attach_delete_hang_repro.test
# description: reproduce large full-table delete behavior on attached SQLite table
# group: [sqlite_storage]

require sqlite_scanner

statement ok
SET threads=8

statement ok
ATTACH '__TEST_DIR__/attach_delete_hang_repro.db' AS s (TYPE SQLITE)

statement ok
CREATE TABLE s.t(i BIGINT, p VARCHAR, b BOOLEAN, ts TIMESTAMPTZ);

query I
INSERT INTO s.t
SELECT i, 'f/' || i::VARCHAR || '.parquet', TRUE, now()
FROM range(50000) t(i);
----
50000

query I
SELECT COUNT(*) FROM s.t;
----
50000

query I
DELETE FROM s.t;
----
50000

query I
SELECT COUNT(*) FROM s.t;
----
0
Loading