Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/pgactive.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "miscadmin.h"
#include "access/xlogdefs.h"
#include "executor/execdesc.h"
#include "postmaster/bgworker.h"

/* Postgres commit 7dbfea3c455e introduced SIGHUP handler in version 13. */
Expand Down Expand Up @@ -677,7 +678,7 @@ extern void pgactive_capture_ddl(Node *parsetree, const char *queryString,
DestReceiver *dest, CommandTag completionTag);

extern void pgactive_locks_shmem_init(void);
extern void pgactive_locks_check_dml(void);
extern void pgactive_locks_check_dml(QueryDesc *queryDesc);

/* background workers and supporting functions for them */
PGDLLEXPORT extern void pgactive_apply_main(Datum main_arg);
Expand Down
2 changes: 1 addition & 1 deletion src/pgactive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ pgactiveExecutorStart(QueryDesc *queryDesc, int eflags)
read_only_node = pgactive_local_node_read_only() && !pgactive_skip_ddl_replication;

/* check for concurrent global DDL locks */
pgactive_locks_check_dml();
pgactive_locks_check_dml(queryDesc);

/*
* Are we in pgactive.replicate_ddl_command? If so, it's not safe to do
Expand Down
93 changes: 92 additions & 1 deletion src/pgactive_locks.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,15 @@

#include "libpq/pqformat.h"

#include "parser/parsetree.h"

#include "replication/message.h"
#include "replication/origin.h"
#include "replication/slot.h"

#include "storage/barrier.h"
#include "storage/lwlock.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/shmem.h"
Expand Down Expand Up @@ -2338,15 +2341,96 @@ pgactive_locks_peer_has_lock(pgactiveLockType min_mode)
return lock_held_by_peer;
}


/*
* Unlock the relation if locked by toplevel transaction,
* there could be a queued DDL trying to be replayed
* using same relation.
*
* This also locks the rel if it was unlocked for a DDL exectution.
* Use `unlockRel` to lock & unlock the rel.
*
* Returns true if unlock was successful.
*/
static bool
lock_unlock_rel_conditionaly(QueryDesc *queryDesc, bool unlockRel)
{
List *rangeTable;
ListCell *l;
ResourceOwner oldowner;
PlannedStmt *plannedstmt = queryDesc->plannedstmt;

if (!(queryDesc->operation == CMD_INSERT
|| queryDesc->operation == CMD_UPDATE
|| queryDesc->operation == CMD_DELETE))
return false;

rangeTable = plannedstmt->rtable;
oldowner = CurrentResourceOwner;

/*
* The relation would be locked by the toplevel transaction,
* lmgr won't let us unlock as a different resource owner,
* so change the CurrentResourceOwner to proceed with
* the unlocking
*/
if (unlockRel)
{
CurrentResourceOwner = TopTransactionResourceOwner;

foreach(l, plannedstmt->resultRelations)
{
Index rtei = lfirst_int(l);
RangeTblEntry *rte = rt_fetch(rtei, rangeTable);
Relation rel;

rel = RelationIdGetRelation(rte->relid);

if(CheckRelationLockedByMe(rel, rte->rellockmode, true))
{
UnlockRelationOid(RelationGetRelid(rel), rte->rellockmode);
RelationClose(rel);
}
}

CurrentResourceOwner = oldowner;
return true;
}

/*
* Lock relation if it wsa unlocked
*/
CurrentResourceOwner = TopTransactionResourceOwner;

foreach(l, plannedstmt->resultRelations)
{
Index rtei = lfirst_int(l);
RangeTblEntry *rte = rt_fetch(rtei, rangeTable);
Relation rel;

rel = RelationIdGetRelation(rte->relid);

if(!CheckRelationLockedByMe(rel, rte->rellockmode, true))
{
LockRelationOid(RelationGetRelid(rel), rte->rellockmode);
RelationClose(rel);
}
}

CurrentResourceOwner = oldowner;
return false;
}

/*
* Function for checking if there is no conflicting pgactive lock.
*
* Should be caled from ExecutorStart_hook.
*/
void
pgactive_locks_check_dml(void)
pgactive_locks_check_dml(QueryDesc *queryDesc)
{
bool lock_held_by_peer;
bool rel_was_unlocked;

/*
* replace pgactive_skip_ddl_locking by pgactive_skip_ddl_replication for
Expand Down Expand Up @@ -2391,6 +2475,9 @@ pgactive_locks_check_dml(void)
{
TimestampTz canceltime;

/* unlock rel if locked by toplevel transaction, so that a queued DDL (using same rel) can be executed */
rel_was_unlocked = lock_unlock_rel_conditionaly(queryDesc, true);

/*
* If we add a waiter after the lock is released we may get woken
* unnecessarily, but it won't do any harm.
Expand Down Expand Up @@ -2427,6 +2514,10 @@ pgactive_locks_check_dml(void)
ResetLatch(&MyProc->procLatch);
CHECK_FOR_INTERRUPTS();
}

/* lock rel again if it was unlocked */
if (rel_was_unlocked)
lock_unlock_rel_conditionaly(queryDesc, false);
}
}

Expand Down