diff --git a/Cargo.lock b/Cargo.lock index b4d9ef8..f24b7ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,9 +1207,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +checksum = "8b4baf93f58d4425749ca49a51c50ebab072c5df6994d08fed93541c331481dc" dependencies = [ "either", ] @@ -1396,7 +1396,7 @@ dependencies = [ [[package]] name = "mq-db" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "axum", @@ -1418,9 +1418,9 @@ dependencies = [ [[package]] name = "mq-lang" -version = "0.6.0" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945450433662b786c8ce51f19ab70891dc442da43d39967c51537927e7f7305a" +checksum = "1212fc83c06eef62bf76ccea314ae51cfa72304bd73445bae64960281835c229" dependencies = [ "base64", "chrono", @@ -1428,7 +1428,7 @@ dependencies = [ "csv", "dirs", "hcl-rs", - "itertools 0.14.0", + "itertools 0.15.0", "md5", "miette", "mq-macros", @@ -1456,9 +1456,9 @@ dependencies = [ [[package]] name = "mq-macros" -version = "0.6.0" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f40d74822fe55553384f385363530300ea41ceadd598baf68fe0373bbb1fae" +checksum = "31e8b069c44f6802457aac42a52b2a44953b55b128ad1cfd7960d263bb3a318d" dependencies = [ "proc-macro2", "quote", @@ -1467,12 +1467,12 @@ dependencies = [ [[package]] name = "mq-markdown" -version = "0.6.0" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "791c374af833381678f62652884282f70043110e35b8a4a9f76c72cefca2def2" +checksum = "97cb203323d6225628fc2dfbd1c208e914b85aabf96250e217fc4e883cef16e9" dependencies = [ "ego-tree", - "itertools 0.14.0", + "itertools 0.15.0", "markdown", "miette", "rustc-hash", diff --git a/Cargo.toml b/Cargo.toml index 9c6dc4f..37622ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mq-db" -version = "0.1.2" +version = "0.1.3" edition = "2024" description = "Markdown-specialized embedded database with interval-indexed block storage and hierarchical query support" keywords = ["markdown", "jq", "query", "database"] @@ -10,8 +10,8 @@ repository = "https://github.com/harehare/mq-db" license = "MIT" [dependencies] -mq-markdown = "0.6.0" -mq-lang = "0.6.0" +mq-markdown = "0.6.3" +mq-lang = "0.6.3" serde_yaml = "0.9" thiserror = "2.0" sqlparser = "0.62" diff --git a/src/index.rs b/src/index.rs index 77f70f8..bd3d048 100644 --- a/src/index.rs +++ b/src/index.rs @@ -182,6 +182,11 @@ fn node_to_parts(node: &Node) -> Option<(BlockType, String, Properties)> { | Node::MdxJsEsm(_) => Some((BlockType::Paragraph, node.value(), props)), Node::Fragment(_) | Node::Empty => None, + + // New node types added by a newer mq-markdown — not yet mapped to a + // dedicated BlockType. Skip for now, consistent with Fragment/Empty. + #[allow(unreachable_patterns)] + _ => None, } } diff --git a/src/sql.rs b/src/sql.rs index fb559b9..97b45e5 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -62,6 +62,7 @@ use crate::{ block::{Block, BlockType, Properties, PropertyValue}, document::Document, indexes::{DocumentIndex, IndexHint}, + store::CustomTableState, }; #[derive(Debug, Clone, PartialEq)] @@ -955,8 +956,9 @@ impl<'a> SqlEngine<'a> { }); } let guard = self.store.custom_tables.read().unwrap(); - if let Some((columns, _)) = guard.get(table_name) { - let rows = columns + if let Some(state) = guard.get(table_name) { + let rows = state + .columns .iter() .map(|c| vec![c.clone(), "text".to_string()]) .collect(); @@ -1002,11 +1004,15 @@ impl<'a> SqlEngine<'a> { // CREATE TABLE name AS SELECT ... let result = self.exec_query(query)?; let n = result.rows.len(); - self.store - .custom_tables - .write() - .unwrap() - .insert(table_name, (result.columns, result.rows)); + self.store.custom_tables.write().unwrap().insert( + table_name, + CustomTableState { + columns: result.columns, + rows: result.rows, + first_row_page: 0, + last_row_page: 0, + }, + ); self.store.try_flush_catalog_to_storage(); return Ok(QueryOutput { columns: vec!["rows".to_string()], @@ -1038,11 +1044,15 @@ impl<'a> SqlEngine<'a> { "table '{table_name}' already exists" ))); } - self.store - .custom_tables - .write() - .unwrap() - .insert(table_name, (columns, vec![])); + self.store.custom_tables.write().unwrap().insert( + table_name, + CustomTableState { + columns, + rows: vec![], + first_row_page: 0, + last_row_page: 0, + }, + ); self.store.try_flush_catalog_to_storage(); Ok(QueryOutput { columns: vec!["result".to_string()], @@ -1071,7 +1081,7 @@ impl<'a> SqlEngine<'a> { let guard = self.store.custom_tables.read().unwrap(); let table_cols = guard .get(&table_name) - .map(|(c, _)| c.clone()) + .map(|state| state.columns.clone()) .ok_or_else(|| MqdbError::SqlExec(format!("unknown table: {table_name}")))?; drop(guard); let indices: Result, _> = ins @@ -1088,14 +1098,14 @@ impl<'a> SqlEngine<'a> { Some(indices?) }; - let inserted = { + let new_rows = { let mut guard = self.store.custom_tables.write().unwrap(); - let (table_cols, table_rows) = guard + let state = guard .get_mut(&table_name) .ok_or_else(|| MqdbError::SqlExec(format!("unknown table: {table_name}")))?; - let ncols = table_cols.len(); + let ncols = state.columns.len(); - let mut inserted = 0usize; + let mut new_rows = Vec::with_capacity(values_out.rows.len()); for src_row in &values_out.rows { let mut row = vec![String::new(); ncols]; match &col_indices { @@ -1116,12 +1126,17 @@ impl<'a> SqlEngine<'a> { } } } - table_rows.push(row); - inserted += 1; + state.rows.push(row.clone()); + new_rows.push(row); } - inserted + new_rows }; // write lock released before flush - self.store.try_flush_catalog_to_storage(); + let inserted = new_rows.len(); + // Append only the new rows to the on-disk chain instead of rewriting + // the whole table, so INSERT cost stays proportional to the rows + // being added rather than the table's total size. + self.store + .try_append_table_rows_to_storage(&table_name, &new_rows); Ok(QueryOutput { columns: vec!["rows_affected".to_string()], rows: vec![vec![inserted.to_string()]], @@ -1353,14 +1368,15 @@ impl<'a> SqlEngine<'a> { } other => { let guard = self.store.custom_tables.read().unwrap(); - if let Some((columns, custom_rows)) = guard.get(other) { + if let Some(state) = guard.get(other) { let prefix = alias.as_deref().unwrap_or(other); - let rows = custom_rows + let rows = state + .rows .iter() .map(|row_vals| { qualify_row( Row { - columns: columns.clone(), + columns: state.columns.clone(), values: row_vals .iter() .map(|v| Value::Str(v.clone())) diff --git a/src/storage.rs b/src/storage.rs index 2ce9cd7..edbc14a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -10,14 +10,19 @@ use crate::{ error::MqdbError, storage::{ catalog::{CatalogEntry, CustomTableEntry, read_catalog, write_catalog}, - codec::{decode_block, encode_block}, + codec::{decode_block, decode_table_rows, encode_block, encode_table_rows}, page::{ PAGE_BODY_SIZE, PAGE_HEADER_SIZE, PAGE_TYPE_BLOCK_DATA, PAGE_TYPE_CATALOG, - PAGE_TYPE_INDEX, PAGE_TYPE_OVERFLOW, PageFile, make_page, parse_page_header, + PAGE_TYPE_INDEX, PAGE_TYPE_OVERFLOW, PAGE_TYPE_TABLE_DATA, PageFile, make_page, + parse_page_header, }, }, }; +/// Usable bytes per table-row page, reserving 2 bytes for the page's real +/// (unpadded) chunk length. See [`Storage::write_table_row_chunks`]. +const TABLE_ROW_PAGE_CAPACITY: usize = PAGE_BODY_SIZE - 2; + pub struct Storage { page_file: PageFile, } @@ -235,6 +240,148 @@ impl Storage { Ok(bytes) } + + /// Write a fresh chain of table-row pages, starting a brand-new table. + /// Returns `(first_page, last_page)`, or `(0, 0)` if `rows` is empty + /// (nothing written — 0 is never a valid page id). + pub fn write_table_rows(&mut self, rows: &[Vec]) -> Result<(u32, u32), MqdbError> { + self.write_table_row_chunks(rows, PAGE_TYPE_TABLE_DATA) + } + + /// Append `rows` after an existing table-row chain by writing new pages + /// and relinking the current tail (`tail_page`) to point at them. + /// Returns the new tail page id (unchanged if `rows` is empty). + pub fn append_table_rows( + &mut self, + tail_page: u32, + rows: &[Vec], + ) -> Result { + if rows.is_empty() { + return Ok(tail_page); + } + + // The first page of an appended batch continues the existing chain, + // so it must be tagged OVERFLOW like every other non-head page — + // only the table's very first page is ever PAGE_TYPE_TABLE_DATA. + let (first_new, last_new) = self.write_table_row_chunks(rows, PAGE_TYPE_OVERFLOW)?; + self.relink_next(tail_page, first_new)?; + Ok(last_new) + } + + /// Table-row chains are built incrementally across many separate write + /// calls (one per `INSERT`), so — unlike block/index chains, which are + /// always written whole in one pass — a short trailing chunk can end up + /// in the *middle* of the logical chain, not just at its very end. + /// Padding it out to `PAGE_BODY_SIZE` would silently splice zero bytes + /// between two batches' real data. So each table-data/overflow page + /// reserves its first 2 bytes for the real length of the chunk it holds. + fn write_table_row_chunks( + &mut self, + rows: &[Vec], + head_page_type: u32, + ) -> Result<(u32, u32), MqdbError> { + if rows.is_empty() { + return Ok((0, 0)); + } + + let bytes = encode_table_rows(rows); + let chunks: Vec<&[u8]> = bytes.chunks(TABLE_ROW_PAGE_CAPACITY).collect(); + + let placeholder = make_page(PAGE_TYPE_OVERFLOW, 0, 0, &[]); + let mut page_ids = Vec::with_capacity(chunks.len()); + for _ in 0..chunks.len() { + page_ids.push(self.page_file.append_page(&placeholder)?); + } + + for (index, chunk) in chunks.iter().enumerate() { + let page_id = page_ids[index]; + let next_page = page_ids.get(index + 1).copied().unwrap_or(0); + let page_type = if index == 0 { + head_page_type + } else { + PAGE_TYPE_OVERFLOW + }; + let mut body = Vec::with_capacity(2 + chunk.len()); + body.extend_from_slice(&(chunk.len() as u16).to_le_bytes()); + body.extend_from_slice(chunk); + let page = make_page(page_type, page_id, next_page, &body); + self.page_file.write_page(page_id, &page)?; + } + + let first = *page_ids.first().expect("checked non-empty above"); + let last = *page_ids.last().expect("checked non-empty above"); + Ok((first, last)) + } + + /// Rewrite a single page's `next_page` pointer in place, preserving its + /// type, id, and body. Used to extend a page chain without touching any + /// other page. + fn relink_next(&mut self, page_id: u32, next_page: u32) -> Result<(), MqdbError> { + let page = self.page_file.read_page(page_id)?; + let (page_type, _, stored_page_id, _) = parse_page_header(&page); + let body = &page[PAGE_HEADER_SIZE..]; + let relinked = make_page(page_type, stored_page_id, next_page, body); + self.page_file.write_page(page_id, &relinked) + } + + /// Read all rows for a table given its chain head, row count, and column count. + pub fn read_table_rows( + &mut self, + first_page: u32, + num_rows: u32, + num_cols: usize, + ) -> Result>, MqdbError> { + if first_page == 0 || num_rows == 0 { + return Ok(Vec::new()); + } + + let mut bytes = Vec::new(); + let mut page_id = first_page; + let mut visited = HashSet::new(); + let mut first = true; + + loop { + if !visited.insert(page_id) { + return Err(invalid_data("table row page chain contains a cycle")); + } + + let page = self.page_file.read_page(page_id)?; + let (page_type, _, stored_page_id, next_page) = parse_page_header(&page); + let expected_type = if first { + PAGE_TYPE_TABLE_DATA + } else { + PAGE_TYPE_OVERFLOW + }; + if page_type != expected_type { + return Err(invalid_data(format!( + "unexpected page type {page_type} in table row chain; expected {expected_type}" + ))); + } + if stored_page_id != page_id { + return Err(invalid_data(format!( + "table row page header mismatch: expected {page_id}, found {stored_page_id}" + ))); + } + + let body = &page[PAGE_HEADER_SIZE..]; + let chunk_len = usize::from(u16::from_le_bytes([body[0], body[1]])); + let chunk_end = chunk_len + .checked_add(2) + .ok_or_else(|| invalid_data("table row page chunk length overflow"))?; + if chunk_end > body.len() { + return Err(invalid_data("table row page chunk length out of bounds")); + } + bytes.extend_from_slice(&body[2..chunk_end]); + + if next_page == 0 { + break; + } + page_id = next_page; + first = false; + } + + decode_table_rows(&bytes, num_rows as usize, num_cols) + } } #[cfg(test)] @@ -517,6 +664,54 @@ mod tests { assert_eq!(decoded, block); } + #[test] + fn table_row_chain_round_trip_across_multiple_appends() { + // Regression test for the incremental INSERT path: each batch is + // written with `append_table_rows` (mirroring multiple separate SQL + // INSERTs), and only the very first page of the whole chain should + // be tagged PAGE_TYPE_TABLE_DATA — every later page, including the + // head of each appended batch, must be PAGE_TYPE_OVERFLOW or the + // chain reader rejects it. + let path = test_file_path("table-row-chain-append"); + cleanup(&path); + + let mut storage = Storage::create(&path).unwrap(); + storage.flush_catalog(&[], &[]).unwrap(); + + let batch1 = vec![ + vec!["1".to_string(), "a".to_string()], + vec!["2".to_string(), "b".to_string()], + ]; + let batch2 = vec![vec!["3".to_string(), "c".to_string()]]; + let batch3 = vec![ + vec!["4".to_string(), "d".to_string()], + vec!["5".to_string(), "e".to_string()], + ]; + + let (first_page, last_page) = storage.write_table_rows(&batch1).unwrap(); + let last_page = storage.append_table_rows(last_page, &batch2).unwrap(); + let last_page = storage.append_table_rows(last_page, &batch3).unwrap(); + assert_ne!(last_page, 0); + + let all_rows = storage.read_table_rows(first_page, 5, 2).unwrap(); + let expected: Vec> = batch1.into_iter().chain(batch2).chain(batch3).collect(); + assert_eq!(all_rows, expected); + + // A batch large enough to span multiple pages, appended after the + // small single-page batches above, must not corrupt either side. + let big_batch: Vec> = (0..10) + .map(|i| vec![i.to_string(), "x".repeat(PAGE_BODY_SIZE)]) + .collect(); + let last_page = storage.append_table_rows(last_page, &big_batch).unwrap(); + assert_ne!(last_page, 0); + + let all_rows = storage.read_table_rows(first_page, 15, 2).unwrap(); + let expected: Vec> = expected.into_iter().chain(big_batch).collect(); + assert_eq!(all_rows, expected); + + cleanup(&path); + } + #[test] fn custom_table_round_trip() { let path = test_file_path("custom-table-round-trip"); diff --git a/src/storage/catalog.rs b/src/storage/catalog.rs index 579d25d..1650008 100644 --- a/src/storage/catalog.rs +++ b/src/storage/catalog.rs @@ -19,11 +19,20 @@ pub struct CatalogEntry { } /// A user-defined table entry stored in the catalog. +/// +/// Row data is *not* stored inline — it lives in its own page chain +/// (see [`crate::storage::Storage::write_table_rows`]) so that appending +/// rows only requires writing the new pages plus this small fixed-size +/// entry, instead of rewriting every previously-inserted row. #[derive(Debug, Clone, PartialEq)] pub struct CustomTableEntry { pub name: String, pub columns: Vec, - pub rows: Vec>, + /// First page of the row chain. 0 = no rows persisted yet. + pub first_row_page: u32, + /// Last page of the row chain — new rows are appended after this page. + pub last_row_page: u32, + pub num_rows: u32, } fn invalid_data(message: impl Into) -> MqdbError { @@ -88,14 +97,6 @@ impl<'a> Decoder<'a> { .map_err(|e| invalid_data(format!("invalid catalog string UTF-8: {e}"))) } - fn read_string_u32(&mut self) -> Result { - let len = usize::try_from(self.read_u32()?) - .map_err(|_| invalid_data("string length exceeds usize range"))?; - let bytes = self.read_exact(len)?; - String::from_utf8(bytes.to_vec()) - .map_err(|e| invalid_data(format!("invalid catalog string UTF-8: {e}"))) - } - fn remaining(&self) -> usize { self.data.len() - self.pos } @@ -122,8 +123,6 @@ fn serialize_catalog(entries: &[CatalogEntry], custom_tables: &[CustomTableEntry out.extend_from_slice(&entry.index_start_page.to_le_bytes()); } - // Custom tables section (appended for backward compatibility — old readers see trailing zeros - // and stop; new readers detect the count field). out.extend_from_slice(&as_u32(custom_tables.len(), "custom table count").to_le_bytes()); for ct in custom_tables { out.extend_from_slice(&as_u16(ct.name.len(), "table name length").to_le_bytes()); @@ -133,13 +132,9 @@ fn serialize_catalog(entries: &[CatalogEntry], custom_tables: &[CustomTableEntry out.extend_from_slice(&as_u16(col.len(), "column name length").to_le_bytes()); out.extend_from_slice(col.as_bytes()); } - out.extend_from_slice(&as_u32(ct.rows.len(), "row count").to_le_bytes()); - for row in &ct.rows { - for cell in row { - out.extend_from_slice(&as_u32(cell.len(), "cell length").to_le_bytes()); - out.extend_from_slice(cell.as_bytes()); - } - } + out.extend_from_slice(&ct.first_row_page.to_le_bytes()); + out.extend_from_slice(&ct.last_row_page.to_le_bytes()); + out.extend_from_slice(&ct.num_rows.to_le_bytes()); } out @@ -246,8 +241,6 @@ pub fn read_catalog( }); } - // Custom tables section — present in new-format files; old files have trailing zeros here - // which decode as count=0 (backward compatible). let custom_tables = if decoder.remaining() >= 4 { let count = usize::try_from(decoder.read_u32()?) .map_err(|_| invalid_data("custom table count exceeds usize range"))?; @@ -259,20 +252,15 @@ pub fn read_catalog( for _ in 0..num_cols { columns.push(decoder.read_string_u16()?); } - let num_rows = usize::try_from(decoder.read_u32()?) - .map_err(|_| invalid_data("row count exceeds usize range"))?; - let mut rows = Vec::with_capacity(num_rows); - for _ in 0..num_rows { - let mut row = Vec::with_capacity(num_cols); - for _ in 0..num_cols { - row.push(decoder.read_string_u32()?); - } - rows.push(row); - } + let first_row_page = decoder.read_u32()?; + let last_row_page = decoder.read_u32()?; + let num_rows = decoder.read_u32()?; tables.push(CustomTableEntry { name, columns, - rows, + first_row_page, + last_row_page, + num_rows, }); } tables diff --git a/src/storage/codec.rs b/src/storage/codec.rs index 14463c6..8abe22e 100644 --- a/src/storage/codec.rs +++ b/src/storage/codec.rs @@ -286,6 +286,36 @@ pub fn decode_block(data: &[u8]) -> Result<(Block, usize), MqdbError> { )) } +/// Encode custom-table rows as a flat stream of u32-length-prefixed cells. +/// Decoding requires the column count since rows are not self-delimiting. +pub fn encode_table_rows(rows: &[Vec]) -> Vec { + let mut out = Vec::new(); + for row in rows { + for cell in row { + out.extend_from_slice(&as_u32(cell.len(), "cell length").to_le_bytes()); + out.extend_from_slice(cell.as_bytes()); + } + } + out +} + +pub fn decode_table_rows( + data: &[u8], + num_rows: usize, + num_cols: usize, +) -> Result>, MqdbError> { + let mut decoder = Decoder::new(data); + let mut rows = Vec::with_capacity(num_rows); + for _ in 0..num_rows { + let mut row = Vec::with_capacity(num_cols); + for _ in 0..num_cols { + row.push(decoder.read_string_u32()?); + } + rows.push(row); + } + Ok(rows) +} + pub fn encode_zone_map(zm: &ZoneMaps) -> Vec { let mut out = Vec::new(); out.push(zm.max_heading_depth); diff --git a/src/storage/page.rs b/src/storage/page.rs index dee9929..c07e4d1 100644 --- a/src/storage/page.rs +++ b/src/storage/page.rs @@ -16,9 +16,10 @@ pub(crate) const PAGE_TYPE_CATALOG: u32 = 2; pub(crate) const PAGE_TYPE_BLOCK_DATA: u32 = 3; pub(crate) const PAGE_TYPE_OVERFLOW: u32 = 4; pub(crate) const PAGE_TYPE_INDEX: u32 = 5; +pub(crate) const PAGE_TYPE_TABLE_DATA: u32 = 6; const FILE_MAGIC: u32 = 0x4D51_4442; -const FILE_VERSION: u32 = 2; +const FILE_VERSION: u32 = 3; const CATALOG_START_PAGE: u32 = 1; fn invalid_data(message: impl Into) -> MqdbError { diff --git a/src/store.rs b/src/store.rs index 5b24b49..5ce6dd2 100644 --- a/src/store.rs +++ b/src/store.rs @@ -4,7 +4,18 @@ use std::{ sync::{Mutex, RwLock}, }; -type CustomTable = (Vec, Vec>); +/// In-memory state for a user-defined table. +/// +/// `first_row_page`/`last_row_page` track where this table's rows live in +/// the backing storage file (0 = not persisted yet), so a SQL `INSERT` +/// can append just the new rows to the chain instead of rewriting `rows` +/// in full on every call. See [`Storage::write_table_rows`]. +pub(crate) struct CustomTableState { + pub columns: Vec, + pub rows: Vec>, + pub first_row_page: u32, + pub last_row_page: u32, +} use mq_markdown::Markdown; @@ -22,6 +33,36 @@ use crate::{ }, }; +/// Persists any table whose rows have never been written to `storage` (i.e. +/// `first_row_page == 0`), then builds catalog entries for every table. +/// +/// Tables that already have a row-page chain are left untouched here — their +/// pages were already written by an earlier flush or incremental `INSERT` +/// append (see [`DocumentStore::try_append_table_rows_to_storage`]). +fn persist_unsaved_table_rows( + storage: &mut Storage, + custom_tables: &RwLock>, +) -> Result, MqdbError> { + let mut guard = custom_tables.write().unwrap(); + for state in guard.values_mut() { + if state.first_row_page == 0 && !state.rows.is_empty() { + let (first, last) = storage.write_table_rows(&state.rows)?; + state.first_row_page = first; + state.last_row_page = last; + } + } + Ok(guard + .iter() + .map(|(name, state)| CustomTableEntry { + name: name.clone(), + columns: state.columns.clone(), + first_row_page: state.first_row_page, + last_row_page: state.last_row_page, + num_rows: state.rows.len() as u32, + }) + .collect()) +} + /// The top-level embedded document store. /// /// Holds a collection of parsed Markdown documents and provides access to @@ -67,7 +108,7 @@ pub struct DocumentStore { /// User-registered virtual tables: name → (columns, rows). /// Uses `RwLock` for interior mutability so `SqlEngine` can execute DDL /// (`CREATE TABLE`, `INSERT INTO`, `DROP TABLE`) with only `&DocumentStore`. - pub(crate) custom_tables: RwLock>, + pub(crate) custom_tables: RwLock>, } impl Default for DocumentStore { @@ -107,10 +148,15 @@ impl DocumentStore { columns: Vec, rows: Vec>, ) { - self.custom_tables - .write() - .unwrap() - .insert(name.into(), (columns, rows)); + self.custom_tables.write().unwrap().insert( + name.into(), + CustomTableState { + columns, + rows, + first_row_page: 0, + last_row_page: 0, + }, + ); } /// Remove a previously registered custom table. Returns `true` if it existed. @@ -202,18 +248,7 @@ impl DocumentStore { let mut storage_guard = self.storage.lock().unwrap(); if let Some(storage) = storage_guard.as_mut() { // Reconstruct catalog entries from already-loaded document metadata. - let mut entries: Vec = self - .documents - .iter() - .map(|d| CatalogEntry { - document_id: d.id, - path: d.path.as_ref().map(|p| p.to_string_lossy().into_owned()), - first_block_page: d.first_block_page, - num_blocks: d.block_count, - zone_map_bytes: encode_zone_map(&d.zone_maps), - index_start_page: d.index_start_page, - }) - .collect(); + let mut entries = self.catalog_entries(); let first_block_page = storage.write_document(&doc)?; doc.first_block_page = first_block_page; @@ -231,17 +266,7 @@ impl DocumentStore { index_start_page, }); - let ct_guard = self.custom_tables.read().unwrap(); - let custom: Vec = ct_guard - .iter() - .map(|(name, (cols, rows))| CustomTableEntry { - name: name.clone(), - columns: cols.clone(), - rows: rows.clone(), - }) - .collect(); - drop(ct_guard); - + let custom = persist_unsaved_table_rows(storage, &self.custom_tables)?; storage.flush_catalog(&entries, &custom)?; Some(idx) } else { @@ -336,38 +361,93 @@ impl DocumentStore { self.doc_indexes.get(i).and_then(|o| o.as_ref()) } + /// Builds catalog entries for every in-memory document. + fn catalog_entries(&self) -> Vec { + self.documents + .iter() + .map(|d| CatalogEntry { + document_id: d.id, + path: d.path.as_ref().map(|p| p.to_string_lossy().into_owned()), + first_block_page: d.first_block_page, + num_blocks: d.block_count, + zone_map_bytes: encode_zone_map(&d.zone_maps), + index_start_page: d.index_start_page, + }) + .collect() + } + /// Flush the catalog (including custom tables) to the backing storage file, - /// if one is open. Called automatically after DDL operations. No-op for - /// in-memory stores. + /// if one is open. Called automatically after DDL operations such as + /// `CREATE TABLE` and `DROP TABLE`. No-op for in-memory stores. + /// + /// Any table whose rows have never been persisted is written out in full + /// here (a one-time cost). Tables already backed by a row-page chain keep + /// their existing pages untouched — see + /// [`try_append_table_rows_to_storage`](DocumentStore::try_append_table_rows_to_storage) + /// for the incremental `INSERT` path. pub(crate) fn try_flush_catalog_to_storage(&self) { let mut guard = self.storage.lock().unwrap(); if let Some(storage) = guard.as_mut() { - let entries: Vec = self - .documents - .iter() - .map(|d| CatalogEntry { - document_id: d.id, - path: d.path.as_ref().map(|p| p.to_string_lossy().into_owned()), - first_block_page: d.first_block_page, - num_blocks: d.block_count, - zone_map_bytes: encode_zone_map(&d.zone_maps), - index_start_page: d.index_start_page, - }) - .collect(); - let ct_guard = self.custom_tables.read().unwrap(); - let custom: Vec = ct_guard - .iter() - .map(|(name, (cols, rows))| CustomTableEntry { - name: name.clone(), - columns: cols.clone(), - rows: rows.clone(), - }) - .collect(); - drop(ct_guard); - let _ = storage.flush_catalog(&entries, &custom); + let entries = self.catalog_entries(); + if let Ok(custom) = persist_unsaved_table_rows(storage, &self.custom_tables) { + let _ = storage.flush_catalog(&entries, &custom); + } } } + /// Append `new_rows` to `table_name`'s on-disk row chain and flush a + /// lightweight catalog update — no full row rewrite. No-op for in-memory + /// stores or unknown tables. + /// + /// This is what makes `INSERT INTO ` incremental: the cost is + /// proportional to the rows being inserted, not to the table's total size. + pub(crate) fn try_append_table_rows_to_storage( + &self, + table_name: &str, + new_rows: &[Vec], + ) { + let mut guard = self.storage.lock().unwrap(); + let storage = match guard.as_mut() { + Some(s) => s, + None => return, + }; + + { + let mut ct_guard = self.custom_tables.write().unwrap(); + if let Some(state) = ct_guard.get_mut(table_name) { + let persisted = if state.first_row_page == 0 { + // Nothing persisted yet for this table — write everything + // currently in memory (covers rows seeded via + // `register_table` plus the ones just inserted). + storage.write_table_rows(&state.rows) + } else { + storage + .append_table_rows(state.last_row_page, new_rows) + .map(|last| (state.first_row_page, last)) + }; + if let Ok((first, last)) = persisted { + state.first_row_page = first; + state.last_row_page = last; + } + } + } + + let entries = self.catalog_entries(); + let ct_guard = self.custom_tables.read().unwrap(); + let custom: Vec = ct_guard + .iter() + .map(|(name, state)| CustomTableEntry { + name: name.clone(), + columns: state.columns.clone(), + first_row_page: state.first_row_page, + last_row_page: state.last_row_page, + num_rows: state.rows.len() as u32, + }) + .collect(); + drop(ct_guard); + let _ = storage.flush_catalog(&entries, &custom); + } + // ───────────────────────────────────────────────────────────────────────── // Persistence // ───────────────────────────────────────────────────────────────────────── @@ -409,15 +489,22 @@ impl DocumentStore { entries[i].index_start_page = storage.write_index(&bytes)?; } + // This writes into a brand-new file, so each table's rows are + // written fresh here rather than reusing `first_row_page` / + // `last_row_page` from `self`, which (if set) point into a + // *different*, already-open backing file. let ct_guard = self.custom_tables.read().unwrap(); - let custom: Vec = ct_guard - .iter() - .map(|(name, (cols, rows))| CustomTableEntry { + let mut custom = Vec::with_capacity(ct_guard.len()); + for (name, state) in ct_guard.iter() { + let (first_row_page, last_row_page) = storage.write_table_rows(&state.rows)?; + custom.push(CustomTableEntry { name: name.clone(), - columns: cols.clone(), - rows: rows.clone(), - }) - .collect(); + columns: state.columns.clone(), + first_row_page, + last_row_page, + num_rows: state.rows.len() as u32, + }); + } drop(ct_guard); storage.flush_catalog(&entries, &custom)?; @@ -464,7 +551,16 @@ impl DocumentStore { let mut custom_tables = HashMap::new(); for ct in custom_table_entries { - custom_tables.insert(ct.name, (ct.columns, ct.rows)); + let rows = storage.read_table_rows(ct.first_row_page, ct.num_rows, ct.columns.len())?; + custom_tables.insert( + ct.name, + CustomTableState { + columns: ct.columns, + rows, + first_row_page: ct.first_row_page, + last_row_page: ct.last_row_page, + }, + ); } Ok(Self { @@ -502,7 +598,16 @@ impl DocumentStore { let mut custom_tables = HashMap::new(); for ct in custom_table_entries { - custom_tables.insert(ct.name, (ct.columns, ct.rows)); + let rows = storage.read_table_rows(ct.first_row_page, ct.num_rows, ct.columns.len())?; + custom_tables.insert( + ct.name, + CustomTableState { + columns: ct.columns, + rows, + first_row_page: ct.first_row_page, + last_row_page: ct.last_row_page, + }, + ); } Ok(Self {