Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1427,12 +1427,140 @@ public void testDeleteLog() throws SQLException
assertEquals(foundTables.size(), 0);
// assert number of rows in delete_log to be 0.
assertEquals(getRowsInDeleteLog(dbConnection), 0);
assertEquals(getRowsInDeleteLogFailed(dbConnection), 0,
"delete_log_failed should be empty after all tables are dropped successfully");
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
Assert.fail();
}
}

/**
 * Verifies that drop_deleted_task_tables copes with a DROP TABLE that cannot succeed.
 * <p>
 * A second connection holds an EXCLUSIVE lock on the first child table while the procedure runs with a short
 * lock_timeout. The test then asserts that the blocked table was removed from delete_log, recorded in
 * delete_log_failed with a non-empty last_error, and still physically exists.
 * <p>
 * Afterwards the test removes every table and dead-letter row it created, so that later tests which expect
 * delete_log_failed to be empty are not affected.
 *
 * @throws SQLException declared for consistency with the other database tests; unexpected exceptions are
 *                      reported as test failures
 */
@Test
public void testDeleteLogBacklogWhenDropTableFails() throws SQLException
{
    final String tablePrefix = "task_delete_log_lock_" + System.currentTimeMillis();
    final String parentTableName = tablePrefix;
    final String firstChildTable = parentTableName + ".1";

    // lockAcquired: counted down by the lock thread once the lock is held - gates the procedure call.
    // releaseLock: counted down by the main thread after the procedure returns - lets the lock thread release.
    final java.util.concurrent.CountDownLatch lockAcquired = new java.util.concurrent.CountDownLatch(1);
    final java.util.concurrent.CountDownLatch releaseLock = new java.util.concurrent.CountDownLatch(1);

    Thread lockThread = null;
    try (final java.sql.Connection dbConnection = JobServiceConnectionUtil.getDbConnection()) {
        // Create a hierarchy and queue it for asynchronous deletion.
        createTaskTable(dbConnection, parentTableName);
        insertRecordsInTaskTable(dbConnection, parentTableName, 20);
        createAndPopulateChildTables(dbConnection, parentTableName);
        insertTableNameIntoParentTableLog(dbConnection, parentTableName);

        // Acquire an EXCLUSIVE lock on the first child table before the procedure starts.
        lockThread = new Thread(() -> {
            LOG.info("Lock thread started");
            try (final java.sql.Connection lockConn = JobServiceConnectionUtil.getDbConnection()) {
                // The lock lives as long as the transaction, so it must not auto-commit.
                lockConn.setAutoCommit(false);
                try (final PreparedStatement lockStmt = lockConn.prepareStatement(
                    "LOCK TABLE \"" + firstChildTable + "\" IN EXCLUSIVE MODE")) {
                    lockStmt.execute();
                }
                // Signal the main thread that the lock is held.
                lockAcquired.countDown();
                LOG.info("Lock acquired on " + firstChildTable + ", holding until procedure completes");
                // Hold the lock until the main thread signals that the procedure has finished,
                // but never longer than 15 seconds so a broken test cannot hold it indefinitely.
                if (!releaseLock.await(15, java.util.concurrent.TimeUnit.SECONDS)) {
                    LOG.warn("Lock thread timed out waiting for the release signal");
                }
                lockConn.rollback();
                LOG.info("Lock released on " + firstChildTable);
            } catch (final Exception ex) {
                LOG.warn("Lock thread error", ex);
            }
        });
        lockThread.start();

        // Wait until the lock thread confirms the lock is held before calling the procedure.
        if (!lockAcquired.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            Assert.fail("Lock thread did not acquire the lock within 10 seconds");
        }

        // Make the failing DROP deterministic and fast.
        setLockTimeout(dbConnection, "1s");

        // Procedure now handles DROP failures internally - it no longer throws to the caller.
        LOG.info("Calling drop_deleted_task_tables procedure which should encounter lock and handle it internally");
        callDropDeletedTaskTables(dbConnection);
        LOG.info("drop_deleted_task_tables procedure completed");

        // Procedure has returned - signal the lock thread to release, then assert.
        releaseLock.countDown();

        // The procedure's internal WHILE loop runs until no retryable entries remain, so within
        // a single call firstChildTable is attempted max_retries (3) times, exhausted, and evicted
        // to delete_log_failed. It is no longer in delete_log at this point.
        assertFalse(entryInDeleteLog(dbConnection, firstChildTable),
                    "firstChildTable should have been evicted from delete_log after exhausting max_retries");
        // Verify the entry landed in the dead-letter table with a captured error message.
        final String lastError = getLastErrorFromDeleteLogFailed(dbConnection, firstChildTable);
        assertNotNull(lastError,
                      "last_error in delete_log_failed should be set after failed drop attempts");
        assertFalse(lastError.isEmpty(),
                    "last_error in delete_log_failed should not be empty");
        // delete_log must be empty - all other tables were dropped, firstChildTable was evicted.
        assertEquals(getRowsInDeleteLog(dbConnection), 0,
                     "delete_log should be empty after procedure completes");
        // The physical table must still exist - it could not be dropped.
        final List<String> tablesAfterDropAttempt = getTablesByPrefix(dbConnection, firstChildTable);
        assertFalse(tablesAfterDropAttempt.isEmpty(),
                    "firstChildTable should still physically exist since DROP was blocked");
    } catch (final Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        LOG.error(e.getMessage(), e);
        // Keep the cause attached so the test report shows why the test failed.
        Assert.fail("Unexpected exception in testDeleteLogBacklogWhenDropTableFails: " + e, e);
    } finally {
        // Ensure the lock thread is always unblocked even if the try block threw.
        releaseLock.countDown();
        if (lockThread != null) {
            try {
                lockThread.join(5000);
            } catch (final InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }
        cleanUpAfterLockedDropTest(tablePrefix);
    }
}

// Best-effort removal of every table and dead-letter row left behind by testDeleteLogBacklogWhenDropTableFails.
private void cleanUpAfterLockedDropTest(final String tablePrefix)
{
    try (final java.sql.Connection cleanupConnection = JobServiceConnectionUtil.getDbConnection()) {
        setLockTimeout(cleanupConnection, "30s");
        // Covers the case where the test failed before the procedure processed the queued tables.
        callDropDeletedTaskTables(cleanupConnection);
        // A table evicted to delete_log_failed is no longer in delete_log, so the procedure
        // will never drop it; remove whatever is left directly.
        for (final String leftoverTable : getTablesByPrefix(cleanupConnection, tablePrefix)) {
            if (!leftoverTable.startsWith(tablePrefix)) {
                continue;
            }
            try (final PreparedStatement dropStmt = cleanupConnection.prepareStatement(
                "DROP TABLE IF EXISTS \"" + leftoverTable.replace("\"", "\"\"") + "\"")) {
                dropStmt.execute();
            }
        }
        // Remove this test's dead-letter rows so other tests can still assert delete_log_failed is empty.
        try (final PreparedStatement purgeStmt = cleanupConnection.prepareStatement(
            "DELETE FROM delete_log_failed WHERE table_name LIKE ?")) {
            purgeStmt.setString(1, tablePrefix + "%");
            purgeStmt.executeUpdate();
        }
    } catch (final Exception cleanupError) {
        LOG.warn("Cleanup after testDeleteLogBacklogWhenDropTableFails did not fully complete", cleanupError);
    }
}

@Test
public void testUpdateJobProgress()
{
Expand Down Expand Up @@ -1619,6 +1747,74 @@ private int getRowsInDeleteLog(final java.sql.Connection dbConnection) throws SQ
return 0;
}

/**
 * Counts the rows currently held in the delete_log_failed table.
 *
 * @param dbConnection connection used to run the count query
 * @return the row count, or 0 if the query unexpectedly returns no row
 * @throws SQLException if the query cannot be prepared or executed
 */
private int getRowsInDeleteLogFailed(final java.sql.Connection dbConnection) throws SQLException
{
    final String countQuery = "SELECT COUNT(*) FROM delete_log_failed";
    try (final PreparedStatement countStmt = dbConnection.prepareStatement(countQuery);
         final ResultSet countResult = countStmt.executeQuery()) {
        return countResult.next() ? countResult.getInt(1) : 0;
    }
}

/**
 * Reads the last_error value recorded in delete_log_failed for one table.
 *
 * @param dbConnection connection used to run the lookup
 * @param tableName    value matched against the table_name column
 * @return the last_error of the first matching row, or null when no row matches
 * @throws SQLException if the query cannot be prepared or executed
 */
private String getLastErrorFromDeleteLogFailed(final java.sql.Connection dbConnection, final String tableName) throws SQLException
{
    final String lastErrorQuery = "SELECT last_error FROM delete_log_failed WHERE table_name = ?";
    try (final PreparedStatement lookup = dbConnection.prepareStatement(lastErrorQuery)) {
        lookup.setString(1, tableName);
        try (final ResultSet rows = lookup.executeQuery()) {
            return rows.next() ? rows.getString(1) : null;
        }
    }
}

/**
 * Reports whether delete_log contains a row for the given table name.
 *
 * @param dbConnection connection used to run the lookup
 * @param tableName    value matched against the table_name column
 * @return true when at least one matching row exists, false otherwise
 * @throws SQLException if the query cannot be prepared or executed
 */
private boolean entryInDeleteLog(final java.sql.Connection dbConnection, final String tableName) throws SQLException
{
    final String existsQuery = "SELECT 1 FROM delete_log WHERE table_name = ?";
    try (final PreparedStatement lookup = dbConnection.prepareStatement(existsQuery)) {
        lookup.setString(1, tableName);
        try (final ResultSet rows = lookup.executeQuery()) {
            final boolean rowFound = rows.next();
            return rowFound;
        }
    }
}

/**
 * Lists the tables in the public schema whose names start with the given prefix.
 * <p>
 * The prefix is matched literally: the LIKE wildcards '_' and '%' are escaped before the query runs, so the
 * underscores that appear in task-table names do not match arbitrary characters.
 *
 * @param dbConnection connection used to query pg_catalog.pg_tables
 * @param tablePrefix  literal prefix the table names must start with
 * @return the matching table names, possibly empty, never null
 * @throws SQLException if the query cannot be prepared or executed
 */
private List<String> getTablesByPrefix(final java.sql.Connection dbConnection, final String tablePrefix) throws SQLException
{
    // '!' is declared as the escape character in the query, so it has to be escaped first.
    final String literalPrefix = tablePrefix.replace("!", "!!").replace("%", "!%").replace("_", "!_");
    final List<String> foundTables = new ArrayList<>();
    try (final PreparedStatement stmt = dbConnection.prepareStatement(
        "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE ? ESCAPE '!'")) {
        stmt.setString(1, literalPrefix + "%");
        try (final ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                foundTables.add(rs.getString(1));
            }
        }
    }
    return foundTables;
}

/**
 * Invokes the drop_deleted_task_tables stored procedure on the supplied connection.
 *
 * @param dbConnection connection on which the procedure is called
 * @throws SQLException if the call cannot be prepared or executed
 */
private void callDropDeletedTaskTables(final java.sql.Connection dbConnection) throws SQLException
{
    final String procedureCall = "call drop_deleted_task_tables()";
    try (final CallableStatement procedure = dbConnection.prepareCall(procedureCall)) {
        procedure.execute();
    }
}

/**
 * Sets the PostgreSQL lock_timeout setting through set_config on the supplied connection.
 *
 * @param dbConnection connection whose lock_timeout is changed
 * @param timeout      new value passed to set_config, for example "1s"
 * @throws SQLException if the statement cannot be prepared or executed
 */
private void setLockTimeout(final java.sql.Connection dbConnection, final String timeout) throws SQLException
{
    // The final 'false' argument is set_config's is_local flag.
    final String setConfigQuery = "SELECT set_config('lock_timeout', ?, false)";
    try (final PreparedStatement setConfig = dbConnection.prepareStatement(setConfigQuery)) {
        setConfig.setString(1, timeout);
        setConfig.execute();
    }
}

/**
* Retrieve a single message from a queue. Call this before triggering the message publish.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
--
-- Copyright 2016-2026 Open Text.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

/*
**************************************************************
** Add retry tracking columns to delete_log and create the **
** delete_log_failed dead-letter table. **
** **
** delete_log table now records how many times a **
** deletion has been attempted, the last error encountered, **
** and when it was last tried. Entries that exceed the **
** maximum retry threshold are promoted to delete_log_failed **
** so that operators can inspect and remediate them without **
** blocking ongoing cleanup work. **
**************************************************************
*/

-- Add retry tracking columns to the existing delete_log table.
-- IF NOT EXISTS lets the statement run against a table that already has any of these columns.
ALTER TABLE public.delete_log
-- Count of failed DROP attempts for this entry; rows that existed before this change start at 0.
ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0,
-- Error message of the most recent failed attempt; NULL until a failure has been recorded.
ADD COLUMN IF NOT EXISTS last_error TEXT,
-- Time of the most recent failed attempt; NULL until a failure has been recorded.
ADD COLUMN IF NOT EXISTS last_attempted_at TIMESTAMPTZ;

-- Dead-letter table for delete_log entries that have exhausted all retries.
-- Each table name appears at most once (primary key on table_name).
CREATE TABLE IF NOT EXISTS public.delete_log_failed
(
-- Name of the table that could not be dropped.
-- NOTE(review): 63 presumably mirrors PostgreSQL's identifier length limit - confirm.
table_name VARCHAR(63) NOT NULL,
-- Error message captured for the last failed attempt; may be NULL.
last_error TEXT,
-- Defaults to the time the row is first inserted.
first_failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- Time of the last attempt; defaults to the insertion time when the writer supplies no value.
last_attempted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT delete_log_failed_pkey PRIMARY KEY (table_name)
);
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
--
-- Copyright 2016-2022 Micro Focus or one of its affiliates.
-- Copyright 2016-2026 Micro Focus or one of its affiliates.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,6 @@ RETURNS VOID
LANGUAGE plpgsql VOLATILE
AS $$
BEGIN
INSERT INTO public.delete_log VALUES (task_table_name);
INSERT INTO public.delete_log (table_name) VALUES (task_table_name);
END
$$;
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
* Description:
* This procedure reads parent task table names from deleted_parent_table_log table and populates delete_log table with names of
* parent as well as child task tables to be dropped. After populating the tables, it then reads the table names from delete_log table
* and drops them.
* and drops them.
* All the above is done through batch commits. The batch is defined by commit_limit variable. Default batch size being 10.
*
* DROP loop includes retry-and-dead-letter behaviour: each DROP is attempted up to max_retries times (default 3).
* On failure the retry_count, last_error, and last_attempted_at columns of delete_log are updated. Once a row reaches
* max_retries without a successful DROP it is evicted to the dead-letter table delete_log_failed (and removed from
* delete_log) before the batch COMMIT so that it does not block subsequent processing.
*/
CREATE OR REPLACE PROCEDURE drop_deleted_task_tables()
LANGUAGE plpgsql
Expand All @@ -32,6 +37,8 @@ DECLARE
commit_limit INTEGER:=10;
parent_table_log_rec RECORD;
rec RECORD;
max_retries CONSTANT INTEGER := 3;
drop_error TEXT;

BEGIN
-- insert table names into delete_log
Expand All @@ -48,15 +55,33 @@ BEGIN
COMMIT;
END LOOP;

selected_table_names := $q$SELECT table_name FROM delete_log LIMIT $q$ || commit_limit || $q$ FOR UPDATE SKIP LOCKED$q$;
selected_table_names := $q$SELECT table_name FROM delete_log WHERE retry_count < $q$ || max_retries || $q$ LIMIT $q$ || commit_limit || $q$ FOR UPDATE SKIP LOCKED$q$;

WHILE EXISTS (SELECT 1 FROM delete_log)
WHILE EXISTS (SELECT 1 FROM delete_log WHERE retry_count < max_retries)
LOOP
FOR rec IN EXECUTE selected_table_names
LOOP
EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(rec.table_name);
DELETE FROM delete_log WHERE table_name = rec.table_name;
BEGIN
EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(rec.table_name);
DELETE FROM delete_log WHERE table_name = rec.table_name;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS drop_error = MESSAGE_TEXT;
UPDATE delete_log
SET retry_count = retry_count + 1,
last_error = drop_error,
last_attempted_at = now()
WHERE table_name = rec.table_name;
END;
END LOOP;
-- Move exhausted entries to dead-letter table
INSERT INTO delete_log_failed (table_name, last_error, last_attempted_at)
SELECT table_name, last_error, last_attempted_at
FROM delete_log
WHERE retry_count >= max_retries
ON CONFLICT (table_name) DO UPDATE
SET last_error = EXCLUDED.last_error,
last_attempted_at = EXCLUDED.last_attempted_at;
DELETE FROM delete_log WHERE retry_count >= max_retries;
COMMIT;
END LOOP;
END
Expand Down
3 changes: 3 additions & 0 deletions release-notes-10.3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ ${version-number}

#### New Features

#### Bug fixes
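- **D1060213**: Handle exceptions raised while dropping `task_` tables. A failed drop is now retried, and an entry that still fails after the maximum number of retries is moved to the `delete_log_failed` table so that it no longer blocks the cleanup of other tables.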

#### Known Issues