Skip to content

Commit 2860fe6

Browse files
refactor(tables): wrapper owns row-write transactions; trx no longer crosses the boundary
1 parent 83ada0e commit 2860fe6

4 files changed

Lines changed: 237 additions & 136 deletions

File tree

apps/sim/lib/table/__tests__/table-wrapper.test.ts

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ vi.mock('@/lib/table/sql', () => ({
3939

4040
import {
4141
buildOrderedRowValues,
42-
closeGapAfterDelete,
43-
compactAfterDelete,
42+
deleteOrderedRow,
43+
deleteOrderedRowsByIds,
44+
insertOrderedRow,
4445
queryRows,
4546
reserveBatchPositions,
4647
reserveInsertPosition,
@@ -123,23 +124,66 @@ describe('table-wrapper', () => {
123124
})
124125
})
125126

126-
describe('closeGapAfterDelete', () => {
127-
it('shifts trailing rows down with a single UPDATE', async () => {
128-
await closeGapAfterDelete(trx, 'tbl-1', 3)
127+
describe('insertOrderedRow', () => {
128+
it('reserves an append position and inserts in one transaction', async () => {
129+
dbChainMockFns.where.mockResolvedValueOnce([{ maxPos: 2 }]) // nextRowPosition
130+
dbChainMockFns.returning.mockResolvedValueOnce([
131+
{ id: 'row-1', data: { a: 1 }, position: 3, createdAt: new Date(), updatedAt: new Date() },
132+
])
133+
const row = await insertOrderedRow({
134+
tableId: 'tbl-1',
135+
workspaceId: 'ws-1',
136+
data: { a: 1 },
137+
rowId: 'row-1',
138+
now: new Date('2024-01-01'),
139+
})
140+
expect(row.id).toBe('row-1')
141+
expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1)
142+
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
143+
})
144+
})
145+
146+
describe('deleteOrderedRow', () => {
147+
it('returns false when no row matches', async () => {
148+
dbChainMockFns.returning.mockResolvedValueOnce([])
149+
expect(await deleteOrderedRow({ tableId: 'tbl-1', rowId: 'x', workspaceId: 'ws-1' })).toBe(
150+
false
151+
)
152+
expect(dbChainMockFns.update).not.toHaveBeenCalled()
153+
})
154+
155+
it('deletes and closes the gap when a row matches', async () => {
156+
dbChainMockFns.returning.mockResolvedValueOnce([{ position: 4 }])
157+
expect(await deleteOrderedRow({ tableId: 'tbl-1', rowId: 'r', workspaceId: 'ws-1' })).toBe(
158+
true
159+
)
160+
// closeGapAfterDelete → one shift-down UPDATE.
129161
expect(dbChainMockFns.update).toHaveBeenCalledTimes(1)
130-
expect(dbChainMockFns.set).toHaveBeenCalledTimes(1)
131162
})
132163
})
133164

134-
describe('compactAfterDelete', () => {
135-
it('runs a recompaction UPDATE (scoped form)', async () => {
136-
await compactAfterDelete(trx, 'tbl-1', 4)
137-
expect(trx.execute).toHaveBeenCalledTimes(1)
165+
describe('deleteOrderedRowsByIds', () => {
166+
it('no-ops on an empty id list', async () => {
167+
expect(
168+
await deleteOrderedRowsByIds({ tableId: 'tbl-1', workspaceId: 'ws-1', rowIds: [] })
169+
).toEqual([])
170+
expect(dbChainMockFns.transaction).not.toHaveBeenCalled()
138171
})
139172

140-
it('runs a recompaction UPDATE (full form)', async () => {
141-
await compactAfterDelete(trx, 'tbl-1')
142-
expect(trx.execute).toHaveBeenCalledTimes(1)
173+
it('deletes in a transaction and recompacts when rows were removed', async () => {
174+
dbChainMockFns.returning.mockResolvedValueOnce([
175+
{ id: 'a', position: 1 },
176+
{ id: 'b', position: 3 },
177+
])
178+
const deleted = await deleteOrderedRowsByIds({
179+
tableId: 'tbl-1',
180+
workspaceId: 'ws-1',
181+
rowIds: ['a', 'b'],
182+
})
183+
expect(deleted).toHaveLength(2)
184+
expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1)
185+
// Recompaction runs as a raw UPDATE via trx.execute.
186+
expect(trx.execute).toHaveBeenCalled()
143187
})
144188
})
145189

apps/sim/lib/table/service.ts

Lines changed: 22 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,17 @@ import { CSV_MAX_BATCH_SIZE } from './import'
2828
import { buildFilterClause } from './sql'
2929
import {
3030
acquireRowOrderLock,
31-
closeGapAfterDelete,
32-
compactAfterDelete,
31+
deleteOrderedRow,
32+
deleteOrderedRowsByIds,
33+
insertOrderedRow,
3334
loadExecutionsByRow,
3435
loadExecutionsForRow,
3536
queryRows as queryRowsWrapper,
3637
reserveBatchPositions,
3738
reserveInsertPosition,
3839
} from './table-wrapper'
3940
import { fireTableTrigger } from './trigger'
41+
import { type DbTransaction, setTableTxTimeouts } from './tx'
4042
import type {
4143
AddWorkflowGroupData,
4244
BatchInsertData,
@@ -100,29 +102,6 @@ export class TableConflictError extends Error {
100102

101103
export type TableScope = 'active' | 'archived' | 'all'
102104

103-
type DbTransaction = Parameters<Parameters<typeof db.transaction>[0]>[0]
104-
105-
/**
106-
* Sets per-transaction Postgres timeouts via `SET LOCAL`.
107-
*
108-
* `lock_timeout` is the critical one: without it, a waiter inherits the full
109-
* `statement_timeout` clock, so one stuck writer can drain the pool.
110-
*
111-
* Safe under pgBouncer transaction pooling — `SET LOCAL` is transaction-scoped
112-
* and cleared at COMMIT/ROLLBACK before the session returns to the pool.
113-
*/
114-
async function setTableTxTimeouts(
115-
trx: DbTransaction,
116-
opts?: { statementMs?: number; lockMs?: number; idleMs?: number }
117-
) {
118-
const s = opts?.statementMs ?? 10_000
119-
const l = opts?.lockMs ?? 3_000
120-
const i = opts?.idleMs ?? 5_000
121-
await trx.execute(sql.raw(`SET LOCAL statement_timeout = '${s}ms'`))
122-
await trx.execute(sql.raw(`SET LOCAL lock_timeout = '${l}ms'`))
123-
await trx.execute(sql.raw(`SET LOCAL idle_in_transaction_session_timeout = '${i}ms'`))
124-
}
125-
126105
/**
127106
* Serializes schema/metadata read-modify-writes for a single table so
128107
* concurrent mutators can't clobber each other's `schema` JSONB
@@ -996,24 +975,14 @@ export async function insertRow(
996975
// (migration 0198): a single conditional UPDATE on user_table_definitions
997976
// increments row_count iff row_count < max_rows, taking the row lock
998977
// atomically. No app-level FOR UPDATE / COUNT needed.
999-
const [row] = await db.transaction(async (trx) => {
1000-
await setTableTxTimeouts(trx)
1001-
1002-
const targetPosition = await reserveInsertPosition(trx, data.tableId, data.position)
1003-
1004-
return trx
1005-
.insert(userTableRows)
1006-
.values({
1007-
id: rowId,
1008-
tableId: data.tableId,
1009-
workspaceId: data.workspaceId,
1010-
data: data.data,
1011-
position: targetPosition,
1012-
createdAt: now,
1013-
updatedAt: now,
1014-
...(data.userId ? { createdBy: data.userId } : {}),
1015-
})
1016-
.returning()
978+
const row = await insertOrderedRow({
979+
tableId: data.tableId,
980+
workspaceId: data.workspaceId,
981+
data: data.data,
982+
rowId,
983+
position: data.position,
984+
createdBy: data.userId,
985+
now,
1017986
})
1018987

1019988
logger.info(`[${requestId}] Inserted row ${rowId} into table ${data.tableId}`)
@@ -2307,23 +2276,8 @@ export async function deleteRow(
23072276
workspaceId: string,
23082277
requestId: string
23092278
): Promise<void> {
2310-
await db.transaction(async (trx) => {
2311-
await setTableTxTimeouts(trx)
2312-
const [deleted] = await trx
2313-
.delete(userTableRows)
2314-
.where(
2315-
and(
2316-
eq(userTableRows.id, rowId),
2317-
eq(userTableRows.tableId, tableId),
2318-
eq(userTableRows.workspaceId, workspaceId)
2319-
)
2320-
)
2321-
.returning({ position: userTableRows.position })
2322-
2323-
if (!deleted) throw new Error('Row not found')
2324-
2325-
await closeGapAfterDelete(trx, tableId, deleted.position)
2326-
})
2279+
const deleted = await deleteOrderedRow({ tableId, rowId, workspaceId })
2280+
if (!deleted) throw new Error('Row not found')
23272281

23282282
logger.info(`[${requestId}] Deleted row ${rowId} from table ${tableId}`)
23292283
}
@@ -2707,28 +2661,11 @@ export async function deleteRowsByFilter(
27072661
}
27082662

27092663
const rowIds = matchingRows.map((r) => r.id)
2710-
const minDeletedPos = matchingRows.reduce(
2711-
(min, r) => (r.position < min ? r.position : min),
2712-
matchingRows[0].position
2713-
)
2714-
2715-
await db.transaction(async (trx) => {
2716-
await setTableTxTimeouts(trx, { statementMs: 60_000 })
2717-
for (let i = 0; i < rowIds.length; i += TABLE_LIMITS.DELETE_BATCH_SIZE) {
2718-
const batch = rowIds.slice(i, i + TABLE_LIMITS.DELETE_BATCH_SIZE)
2719-
await trx.delete(userTableRows).where(
2720-
and(
2721-
eq(userTableRows.tableId, table.id),
2722-
eq(userTableRows.workspaceId, table.workspaceId),
2723-
sql`${userTableRows.id} = ANY(ARRAY[${sql.join(
2724-
batch.map((id) => sql`${id}`),
2725-
sql`, `
2726-
)}])`
2727-
)
2728-
)
2729-
}
27302664

2731-
await compactAfterDelete(trx, table.id, minDeletedPos)
2665+
await deleteOrderedRowsByIds({
2666+
tableId: table.id,
2667+
workspaceId: table.workspaceId,
2668+
rowIds,
27322669
})
27332670

27342671
logger.info(`[${requestId}] Deleted ${matchingRows.length} rows from table ${table.id}`)
@@ -2752,36 +2689,10 @@ export async function deleteRowsByIds(
27522689
): Promise<BulkDeleteByIdsResult> {
27532690
const uniqueRequestedRowIds = Array.from(new Set(data.rowIds))
27542691

2755-
const deletedRows = await db.transaction(async (trx) => {
2756-
await setTableTxTimeouts(trx, { statementMs: 60_000 })
2757-
const deleted: { id: string; position: number }[] = []
2758-
for (let i = 0; i < uniqueRequestedRowIds.length; i += TABLE_LIMITS.DELETE_BATCH_SIZE) {
2759-
const batch = uniqueRequestedRowIds.slice(i, i + TABLE_LIMITS.DELETE_BATCH_SIZE)
2760-
const rows = await trx
2761-
.delete(userTableRows)
2762-
.where(
2763-
and(
2764-
eq(userTableRows.tableId, data.tableId),
2765-
eq(userTableRows.workspaceId, data.workspaceId),
2766-
sql`${userTableRows.id} = ANY(ARRAY[${sql.join(
2767-
batch.map((id) => sql`${id}`),
2768-
sql`, `
2769-
)}])`
2770-
)
2771-
)
2772-
.returning({ id: userTableRows.id, position: userTableRows.position })
2773-
deleted.push(...rows)
2774-
}
2775-
2776-
if (deleted.length > 0) {
2777-
const minDeletedPos = deleted.reduce(
2778-
(min, r) => (r.position < min ? r.position : min),
2779-
deleted[0].position
2780-
)
2781-
await compactAfterDelete(trx, data.tableId, minDeletedPos)
2782-
}
2783-
2784-
return deleted
2692+
const deletedRows = await deleteOrderedRowsByIds({
2693+
tableId: data.tableId,
2694+
workspaceId: data.workspaceId,
2695+
rowIds: uniqueRequestedRowIds,
27852696
})
27862697

27872698
const deletedIds = deletedRows.map((r) => r.id)

0 commit comments

Comments
 (0)