diff --git a/job-service-container-tests/src/test/java/com/github/jobservice/containertests/JobServiceIT.java b/job-service-container-tests/src/test/java/com/github/jobservice/containertests/JobServiceIT.java index 10df8a562..e62f1a484 100644 --- a/job-service-container-tests/src/test/java/com/github/jobservice/containertests/JobServiceIT.java +++ b/job-service-container-tests/src/test/java/com/github/jobservice/containertests/JobServiceIT.java @@ -1427,12 +1427,140 @@ public void testDeleteLog() throws SQLException assertEquals(foundTables.size(), 0); // assert number of rows in delete_log to be 0. assertEquals(getRowsInDeleteLog(dbConnection), 0); + assertEquals(getRowsInDeleteLogFailed(dbConnection), 0, + "delete_log_failed should be empty after all tables are dropped successfully"); } catch (final Exception e) { LOG.error(e.getMessage(), e); Assert.fail(); } } + @Test + public void testDeleteLogBacklogWhenDropTableFails() throws SQLException + { + final String tablePrefix = "task_delete_log_lock_" + System.currentTimeMillis(); + final String parentTableName = tablePrefix; + + // lockAcquired[0]: set by lock thread once the lock is held — gates the procedure call. + // releaseLock[0]: set by main thread after procedure returns — signals lock thread to release. + final boolean[] lockAcquired = {false}; + final boolean[] releaseLock = {false}; + + Thread lockThread = null; + final String firstChildTable = parentTableName + ".1"; + try (final java.sql.Connection dbConnection = JobServiceConnectionUtil.getDbConnection()) + { + // Create a hierarchy and queue it for asynchronous deletion. + createTaskTable(dbConnection, parentTableName); + insertRecordsInTaskTable(dbConnection, parentTableName, 20); + createAndPopulateChildTables(dbConnection, parentTableName); + insertTableNameIntoParentTableLog(dbConnection, parentTableName); + + // Acquire an EXCLUSIVE lock on the first child table before the procedure starts. + lockThread = new Thread(() -> { + System.out.println("Lock thread started"); + try (final java.sql.Connection lockConn = JobServiceConnectionUtil.getDbConnection()) { + lockConn.setAutoCommit(false); + try (final PreparedStatement lockStmt = lockConn.prepareStatement( + "LOCK TABLE \"" + firstChildTable + "\" IN EXCLUSIVE MODE")) { + lockStmt.execute(); + } + // Signal the main thread that the lock is held. + synchronized (lockAcquired) { + lockAcquired[0] = true; + lockAcquired.notifyAll(); + } + System.out.println("Lock acquired on " + firstChildTable + ", holding until procedure completes"); + // Hold the lock until the main thread signals that the procedure has finished. + final long deadline = System.currentTimeMillis() + 15_000L; + synchronized (releaseLock) { + while (!releaseLock[0] && System.currentTimeMillis() < deadline) { + releaseLock.wait(500); + } + } + lockConn.rollback(); + System.out.println("Lock released on " + firstChildTable); + } catch (final Exception ex) { + LOG.warn("Lock thread error", ex); + } + }); + lockThread.start(); + + // Wait until the lock thread confirms the lock is held before calling the procedure. + synchronized (lockAcquired) { + final long deadline = System.currentTimeMillis() + 10_000L; + while (!lockAcquired[0] && System.currentTimeMillis() < deadline) { + lockAcquired.wait(500); + } + } + if (!lockAcquired[0]) { + Assert.fail("Lock thread did not acquire the lock within 10 seconds"); + } + + // Make the failing DROP deterministic and fast. + setLockTimeout(dbConnection, "1s"); + + // Procedure now handles DROP failures internally — it no longer throws to the caller. + System.out.println("Calling drop_deleted_task_tables procedure which should encounter lock and handle it internally"); + callDropDeletedTaskTables(dbConnection); + System.out.println("drop_deleted_task_tables procedure completed"); + + // Procedure has returned — signal the lock thread to release, then assert. + synchronized (releaseLock) { + releaseLock[0] = true; + releaseLock.notifyAll(); + } + // The procedure's internal WHILE loop runs until no retryable entries remain, so within + // a single call firstChildTable is attempted max_retries (3) times, exhausted, and evicted + // to delete_log_failed. It is no longer in delete_log at this point. + assertFalse(entryInDeleteLog(dbConnection, firstChildTable), + "firstChildTable should have been evicted from delete_log after exhausting max_retries"); + // Verify the entry landed in the dead-letter table with a captured error message. + final String lastError = getLastErrorFromDeleteLogFailed(dbConnection, firstChildTable); + assertNotNull(lastError, + "last_error in delete_log_failed should be set after failed drop attempts"); + assertFalse(lastError.isEmpty(), + "last_error in delete_log_failed should not be empty"); + // delete_log must be empty — all other tables were dropped, firstChildTable was evicted. + assertEquals(getRowsInDeleteLog(dbConnection), 0, + "delete_log should be empty after procedure completes"); + // The physical table must still exist — it could not be dropped. + final List tablesAfterDropAttempt = getTablesByPrefix(dbConnection, firstChildTable); + assertFalse(tablesAfterDropAttempt.isEmpty(), + "firstChildTable should still physically exist since DROP was blocked"); + + } catch (final Exception e) { + LOG.error(e.getMessage(), e); + Assert.fail(); + } finally { + // Ensure the lock thread is always unblocked even if the try block threw. + synchronized (releaseLock) { + releaseLock[0] = true; + releaseLock.notifyAll(); + } + if (lockThread != null) { + try { + lockThread.join(5000); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + // Best-effort cleanup so later tests are not affected by leftover task tables. + try (final java.sql.Connection cleanupConnection = JobServiceConnectionUtil.getDbConnection()) { + setLockTimeout(cleanupConnection, "30s"); + for (int i = 0; i < 5; i++) { + callDropDeletedTaskTables(cleanupConnection); + if (getTablesByPrefix(cleanupConnection, tablePrefix).isEmpty()) { + break; + } + Thread.sleep(1000); + } + } catch (final Exception cleanupError) { + LOG.warn("Cleanup after testDeleteLogBacklogWhenDropTableFails did not fully complete", cleanupError); + } + } + } + @Test public void testUpdateJobProgress() { @@ -1619,6 +1747,74 @@ private int getRowsInDeleteLog(final java.sql.Connection dbConnection) throws SQ return 0; } + private int getRowsInDeleteLogFailed(final java.sql.Connection dbConnection) throws SQLException + { + try (final PreparedStatement stmt = dbConnection.prepareStatement( + "SELECT COUNT(*) FROM delete_log_failed"); + final ResultSet rs = stmt.executeQuery()) { + if (rs.next()) { + return rs.getInt(1); + } + } + return 0; + } + + private String getLastErrorFromDeleteLogFailed(final java.sql.Connection dbConnection, final String tableName) throws SQLException + { + try (final PreparedStatement stmt = dbConnection.prepareStatement( + "SELECT last_error FROM delete_log_failed WHERE table_name = ?")) { + stmt.setString(1, tableName); + try (final ResultSet rs = stmt.executeQuery()) { + if (rs.next()) { + return rs.getString(1); + } + } + } + return null; + } + + private boolean entryInDeleteLog(final java.sql.Connection dbConnection, final String tableName) throws SQLException + { + try (final PreparedStatement stmt = dbConnection.prepareStatement( + "SELECT 1 FROM delete_log WHERE table_name = ?")) { + stmt.setString(1, tableName); + try (final ResultSet rs = stmt.executeQuery()) { + return rs.next(); + } + } + } + + private List getTablesByPrefix(final java.sql.Connection dbConnection, final String tablePrefix) throws SQLException + { + final List foundTables = new ArrayList(); + try (final PreparedStatement stmt = dbConnection.prepareStatement( + "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE ?")) { + stmt.setString(1, tablePrefix + "%"); + try (final ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + foundTables.add(rs.getString(1)); + } + } + } + return foundTables; + } + + private void callDropDeletedTaskTables(final java.sql.Connection dbConnection) throws SQLException + { + try (final CallableStatement dropDeletedTables = dbConnection.prepareCall("call drop_deleted_task_tables()")) { + dropDeletedTables.execute(); + } + } + + private void setLockTimeout(final java.sql.Connection dbConnection, final String timeout) throws SQLException + { + try (final PreparedStatement lockTimeoutStmt = dbConnection.prepareStatement( + "SELECT set_config('lock_timeout', ?, false)")) { + lockTimeoutStmt.setString(1, timeout); + lockTimeoutStmt.execute(); + } + } + /** * Retrieve a single message from a queue. Call this before triggering the message publish. * diff --git a/job-service-db/src/main/resources/db/migration/V9__add_delete_log_retry_tracking.sql b/job-service-db/src/main/resources/db/migration/V9__add_delete_log_retry_tracking.sql new file mode 100644 index 000000000..b4a81a8da --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/V9__add_delete_log_retry_tracking.sql @@ -0,0 +1,45 @@ +-- +-- Copyright 2016-2026 Open Text. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + ************************************************************** + ** Add retry tracking columns to delete_log and create the ** + ** delete_log_failed dead-letter table. ** + ** ** + ** delete_log table now records how many times a ** + ** deletion has been attempted, the last error encountered, ** + ** and when it was last tried. Entries that exceed the ** + ** maximum retry threshold are promoted to delete_log_failed ** + ** so that operators can inspect and remediate them without ** + ** blocking ongoing cleanup work. ** + ************************************************************** + */ + +-- Add retry tracking columns to the existing delete_log table. +ALTER TABLE public.delete_log + ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS last_error TEXT, + ADD COLUMN IF NOT EXISTS last_attempted_at TIMESTAMPTZ; + +-- Dead-letter table for delete_log entries that have exhausted all retries. +CREATE TABLE IF NOT EXISTS public.delete_log_failed +( + table_name VARCHAR(63) NOT NULL, + last_error TEXT, + first_failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_attempted_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT delete_log_failed_pkey PRIMARY KEY (table_name) +); diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__insertDeleteLog.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__insertDeleteLog.sql index 66196d396..cfc4ebe15 100644 --- a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__insertDeleteLog.sql +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__insertDeleteLog.sql @@ -1,5 +1,5 @@ -- --- Copyright 2016-2022 Micro Focus or one of its affiliates. +-- Copyright 2016-2026 Micro Focus or one of its affiliates. -- -- Licensed under the Apache License, Version 2.0 (the "License"); -- you may not use this file except in compliance with the License. @@ -27,6 +27,6 @@ RETURNS VOID LANGUAGE plpgsql VOLATILE AS $$ BEGIN - INSERT INTO public.delete_log VALUES (task_table_name); + INSERT INTO public.delete_log (table_name) VALUES (task_table_name); END $$; diff --git a/job-service-db/src/main/resources/db/migration/procedures/public/R__dropDeletedTaskTables.sql b/job-service-db/src/main/resources/db/migration/procedures/public/R__dropDeletedTaskTables.sql index c9bd2aae2..d24481f40 100644 --- a/job-service-db/src/main/resources/db/migration/procedures/public/R__dropDeletedTaskTables.sql +++ b/job-service-db/src/main/resources/db/migration/procedures/public/R__dropDeletedTaskTables.sql @@ -20,8 +20,13 @@ * Description: * This procedure reads parent task table names from deleted_parent_table_log table and populates delete_log table with names of * parent as well as child task tables to be dropped. After populating the tables, it then reads the table names from delete_log table - * and drops them. + * and drops them. * All the above is done through batch commits. The batch is defined by commit_limit variable. Default batch size being 10. + * + * DROP loop includes retry-and-dead-letter behaviour: each DROP is attempted up to max_retries times (default 3). + * On failure the retry_count, last_error, and last_attempted_at columns of delete_log are updated. Once a row reaches + * max_retries without a successful DROP it is evicted to the dead-letter table delete_log_failed (and removed from + * delete_log) before the batch COMMIT so that it does not block subsequent processing. */ CREATE OR REPLACE PROCEDURE drop_deleted_task_tables() LANGUAGE plpgsql @@ -32,6 +37,8 @@ DECLARE commit_limit INTEGER:=10; parent_table_log_rec RECORD; rec RECORD; + max_retries CONSTANT INTEGER := 3; + drop_error TEXT; BEGIN -- insert table names into delete_log @@ -48,15 +55,33 @@ BEGIN COMMIT; END LOOP; - selected_table_names := $q$SELECT table_name FROM delete_log LIMIT $q$ || commit_limit || $q$ FOR UPDATE SKIP LOCKED$q$; + selected_table_names := $q$SELECT table_name FROM delete_log WHERE retry_count < $q$ || max_retries || $q$ LIMIT $q$ || commit_limit || $q$ FOR UPDATE SKIP LOCKED$q$; - WHILE EXISTS (SELECT 1 FROM delete_log) + WHILE EXISTS (SELECT 1 FROM delete_log WHERE retry_count < max_retries) LOOP FOR rec IN EXECUTE selected_table_names LOOP - EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(rec.table_name); - DELETE FROM delete_log WHERE table_name = rec.table_name; + BEGIN + EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(rec.table_name); + DELETE FROM delete_log WHERE table_name = rec.table_name; + EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS drop_error = MESSAGE_TEXT; + UPDATE delete_log + SET retry_count = retry_count + 1, + last_error = drop_error, + last_attempted_at = now() + WHERE table_name = rec.table_name; + END; END LOOP; + -- Move exhausted entries to dead-letter table + INSERT INTO delete_log_failed (table_name, last_error, last_attempted_at) + SELECT table_name, last_error, last_attempted_at + FROM delete_log + WHERE retry_count >= max_retries + ON CONFLICT (table_name) DO UPDATE + SET last_error = EXCLUDED.last_error, + last_attempted_at = EXCLUDED.last_attempted_at; + DELETE FROM delete_log WHERE retry_count >= max_retries; COMMIT; END LOOP; END diff --git a/release-notes-10.3.0.md b/release-notes-10.3.0.md index b36546391..ea0bd5117 100644 --- a/release-notes-10.3.0.md +++ b/release-notes-10.3.0.md @@ -5,4 +5,7 @@ ${version-number} #### New Features +#### Bug fixes +- **D1060213**: Proactively handle if any exception happens while dropping `task_` tables. + #### Known Issues