* allows to inject transactional and nontransactional messages in the
* changestream.
*
+ * There are really two levels of DDL lock - the global lock that only
+ * one node can hold, and individual local DDL locks on each node. If
+ * a node holds the global DDL lock then it owns the local DDL locks on each
+ * node.
+ *
* DDL lock acquiration basically works like this:
*
- * 1) A utility command notices that it needs the ddl lock. If there
- * already is a local ddl lock it'll ERROR out.
+ * 1) A utility command notices that it needs the global ddl lock and the local
+ * node doesn't already hold it. If there already is a local ddl lock
+ * it'll ERROR out, as this indicates another node already holds or is
+ * trying to acquire the global DDL lock.
+ *
* 2) It sends out a 'acquire_lock' message to all other nodes.
+ *
* 3) When another node receives a 'acquire_lock' message it checks whether
- * the ddl lock is already held locally. If so it'll send a
- * 'decline_lock' message back causing the lock acquiration to fail.
- * 4) If a 'acquire_lock' message is received and the DDL lock is not held
- * locally it'll be acquired and an entry into the 'bdr_global_locks'
- * table will be made marking the lock to be in the 'catchup' phase.
+ * the local ddl lock is already held. If so it'll send a 'decline_lock'
+ * message back causing the lock acquisition to fail.
+ *
+ * 4) If a 'acquire_lock' message is received and the local DDL lock is not
+ * held it'll be acquired and an entry into the 'bdr_global_locks' table
+ * will be made marking the lock to be in the 'catchup' phase.
+ *
* 5) All concurrent user transactions will be cancelled.
+ *
* 6) A 'request_replay_confirm' message will be sent to all other nodes
* containing a lsn that has to be replayed.
+ *
* 7) When a 'request_replay_confirm' message is received, a
* 'replay_confirm' message will be sent back.
+ *
* 8) Once all other nodes have replied with 'replay_confirm' the DDL lock
* has been successfully acquired on the node reading the 'acquire_lock'
* message (from 3)). The corresponding bdr_global_locks entry will be
* updated to the 'acquired' state and a 'confirm_lock' message will be sent out.
+ *
* 9) Once all nodes have replied with 'confirm_lock' messages the ddl lock
* has been acquired.
*
}
/*
+ * This node has just started up. Init its local state and send a startup
+ * announcement message.
*
+ * Called from the per-db worker.
*/
void
bdr_locks_startup(Size nnodes)
*/
bdr_prepare_message(&s, BDR_MESSAGE_START);
- elog(DEBUG1, "sending DDL lock startup message");
+ elog(DEBUG1, "bdr: sending DDL lock startup message");
lsn = LogStandbyMessage(s.data, s.len, false);
resetStringInfo(&s);
XLogFlush(lsn);
bdr_locks_find_database(DatumGetObjectId(values[7]), false);
db->lock_holder = node_id;
db->lockcount++;
- elog(DEBUG1, "reacquiring DDL lock held before shutdown");
+ /* A remote node might have held the local DDL lock before restart */
+ elog(DEBUG1, "bdr: reacquiring local DDL lock held before shutdown");
}
else if (strcmp(state, "catchup") == 0)
{
db->replay_confirmed = 0;
db->replay_confirmed_lsn = wait_for_lsn;
- elog(DEBUG1, "restarting DDL lock replay catchup phase");
+ elog(DEBUG1, "bdr: restarting DDL lock replay catchup phase");
}
else
- elog(PANIC, "BDR: unknown lockstate");
+ elog(PANIC, "bdr: unknown lockstate");
}
systable_endscan(scan);
CommitTransactionCommand();
+ elog(DEBUG2, "bdr: ddl locking startup completed, local DML enabled");
+
/* allow local DML */
bdr_my_locks_database->locked_and_loaded = true;
}
if (bdr_my_locks_database->lockcount > 0)
bdr_my_locks_database->lockcount--;
else
- elog(WARNING, "BDR: releasing unacquired DDL lock");
+ elog(WARNING, "bdr: releasing unacquired DDL lock");
LWLockRelease(bdr_locks_ctl->lock);
this_xact_acquired_lock = false;
}
/*
* Acquire DDL lock on the side that wants to perform DDL.
+ *
+ * Called from a user backend when the command filter spots a DDL attempt; runs
+ * in the user backend.
*/
void
bdr_acquire_ddl_lock(void)
bdr_locks_find_my_database(false);
+ elog(DEBUG2, "bdr: attempting to acquire global DDL lock for (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
+
/* send message about ddl lock */
bdr_prepare_message(&s, BDR_MESSAGE_ACQUIRE_LOCK);
errmsg("database is locked against ddl by another node"),
errhint("Some node in the cluster is performing DDL")));
+
bdr_my_locks_database->acquire_confirmed = 0;
bdr_my_locks_database->acquire_declined = 0;
bdr_my_locks_database->waiting_latch = &MyProc->procLatch;
LWLockRelease(bdr_locks_ctl->lock);
- elog(DEBUG1, "waiting for the other nodes acks");
+ elog(DEBUG2, "bdr: sent DDL lock request, waiting for confirmation");
while (true)
{
}
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
+
/* TODO: recheck it's ours */
bdr_my_locks_database->acquire_confirmed = 0;
bdr_my_locks_database->acquire_declined = 0;
bdr_my_locks_database->waiting_latch = NULL;
bdr_my_locks_database->lockcount++;
this_xact_acquired_lock = true;
+
+ elog(DEBUG1, "bdr: global DDL lock acquired successfully by (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
+
LWLockRelease(bdr_locks_ctl->lock);
}
/*
* Another node has asked for a DDL lock. Try to acquire the local ddl lock.
+ *
+ * Runs in the apply worker.
*/
void
bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
bdr_locks_find_my_database(false);
+ elog(DEBUG2, "bdr: node (" BDR_LOCALID_FORMAT ") requested the global DDL lock", sysid, tli, datid, "");
+
initStringInfo(&s);
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
* No previous DDL lock found. Start acquiring it.
*/
+ elog(DEBUG2, "bdr: no prior DDL lock found, acquiring local DDL lock");
+
/* Add a row to bdr_locks */
StartTransactionCommand();
ForceSyncCommit(); /* async commit would be too complicated */
heap_close(rel, NoLock);
CommitTransactionCommand();
+ elog(DEBUG3, "bdr: no conflicting global DDL lock found");
}
PG_CATCH();
{
if (geterrcode() == ERRCODE_UNIQUE_VIOLATION)
{
- elog(DEBUG1, "declining DDL lock because a conflicting DDL lock exists in bdr_global_locks");
+ elog(DEBUG1, "bdr: declining DDL lock because a conflicting DDL lock exists in bdr_global_locks");
AbortOutOfAnyTransaction();
goto decline;
}
LWLockRelease(bdr_locks_ctl->lock);
/*
- * Now kill all local processes that are still writing. We won't
+ * Now kill all local processes that are still writing. We can't just
* prevent them from writing via the acquired lock as they are still
* running. We're using drastic measures here because it'd be a bad
* idea to wait: The primary is waiting for us and during that time
* held conflicting locks, but that's not easiliy possible at the
* moment.
*/
+ elog(DEBUG3, "bdr: terminating any local processes that conflict with the DDL lock");
conflicts = GetConflictingVirtualXIDs(InvalidTransactionId, MyDatabaseId);
while (conflicts->backendId != InvalidBackendId)
{
conflicts++;
else
pg_usleep(5000);
+
+ elog(DEBUG3, "bdr: signaled %d to terminate, conflicts with ddl lock requested by another node", p);
}
/*
* by all other nodes. When the required number of messages is back we
* can confirm the lock to the original requestor
* (c.f. bdr_process_replay_confirm()).
+ *
+ * If we didn't wait for everyone to replay local changes then a DDL
+ * change that caused those local changes not to apply on remote
+ * nodes might occur, causing a divergent conflict.
*/
- elog(DEBUG1, "requesting replay confirmation from all other nodes");
+ elog(DEBUG2, "bdr: ddl locking requesting replay confirmation from all other nodes before confirming lock granted");
wait_for_lsn = GetXLogInsertRecPtr();
bdr_prepare_message(&s, BDR_MESSAGE_REQUEST_REPLAY_CONFIRM);
bdr_my_locks_database->replay_confirmed = 0;
bdr_my_locks_database->replay_confirmed_lsn = wait_for_lsn;
LWLockRelease(bdr_locks_ctl->lock);
+
+ elog(DEBUG1, "bdr: ddl lock granted to remote node");
}
else
{
LWLockRelease(bdr_locks_ctl->lock);
decline:
ereport(LOG,
- (errmsg("declining remote DDL lock request, already locked")));
+ (errmsg("bdr: declining remote DDL lock request, already locked")));
bdr_prepare_message(&s, BDR_MESSAGE_DECLINE_LOCK);
Assert(!IsTransactionState());
}
}
+/*
+ * Another node has released the global DDL lock, update our local state.
+ *
+ * Runs in the apply worker.
+ */
void
bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
uint64 lock_sysid, TimeLineID lock_tli, Oid lock_datid)
initStringInfo(&s);
+ elog(DEBUG2, "bdr: node (" BDR_LOCALID_FORMAT ") released DDL lock", lock_sysid, lock_tli, lock_datid, "");
+
/*
* Remove row from bdr_locks *before* releasing the in memory lock. If we
* crash we'll replay the event again.
while ((tuple = systable_getnext(scan)) != NULL)
{
- elog(DEBUG1, "found ddl lock entry to delete in response to ddl lock release message");
+ elog(DEBUG3, "bdr: found DDL lock entry to delete in response to DDL lock release message");
simple_heap_delete(rel, &tuple->t_self);
ForceSyncCommit(); /* async commit would be too complicated */
found = true;
CommitTransactionCommand();
if (!found)
- elog(WARNING, "did not find local DDL lock entry about a remotely released lock");
+ ereport(WARNING,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bdr: Did not find local DDL lock entry for a remotely released global DDL lock"),
+ errdetail("node ("BDR_LOCALID_FORMAT") sent a release message but the lock isn't held locally",
+ lock_sysid, lock_tli, lock_datid, "")));
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
if (bdr_my_locks_database->lockcount > 0)
bdr_my_locks_database->lock_holder = InvalidRepNodeId;
}
else
- elog(WARNING, "releasing DDL lock without corresponding in-memory state");
+ /* This shouldn't happen; probable bug if reached */
+ elog(WARNING, "bdr: releasing DDL lock without corresponding in-memory state");
latch = bdr_my_locks_database->waiting_latch;
LWLockRelease(bdr_locks_ctl->lock);
+ elog(DEBUG1, "bdr: local DDL lock released");
+
/* notify an eventual waiter */
if(latch)
SetLatch(latch);
}
/*
- * Another node has confirmed a lock. Changed shared memory state and wakeup
- * the locker.
+ * Another node has confirmed that a node has acquired the DDL lock
+ * successfully. If the acquiring node was us, change shared memory state and
+ * wake up the user backend that was trying to acquire the lock.
+ *
+ * Runs in the apply worker.
*/
void
bdr_process_confirm_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
bdr_my_locks_database->acquire_confirmed++;
latch = bdr_my_locks_database->waiting_latch;
- elog(DEBUG1, "received ddl lock confirmation number %d/%zu",
- bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes);
+ elog(DEBUG2, "bdr: received DDL lock confirmation number %d/%zu from ("BDR_LOCALID_FORMAT")",
+ bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes,
+ origin_sysid, origin_tli, origin_datid, "");
LWLockRelease(bdr_locks_ctl->lock);
if(latch)
}
/*
- * Another node has declined a lock. Changed shared memory state and wakeup
- * the locker.
+ * Another node has declined a lock. If it was us, change shared memory state
+ * and wake up the user backend that tried to acquire the lock.
+ *
+ * Runs in the apply worker.
*/
void
bdr_process_decline_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
LWLockRelease(bdr_locks_ctl->lock);
if(latch)
SetLatch(latch);
+
+ elog(DEBUG2, "bdr: node ("BDR_LOCALID_FORMAT") declined our DDL global lock request",
+ origin_sysid, origin_tli, origin_datid, "");
}
+/*
+ * Another node has asked us to confirm that we've replayed up to a given LSN.
+ * We've seen the request message, so send the requested confirmation.
+ *
+ * Runs in the walsender.
+ */
void
bdr_process_request_replay_confirm(uint64 sysid, TimeLineID tli,
Oid datid, XLogRecPtr request_lsn)
bdr_locks_find_my_database(false);
+ elog(DEBUG2, "bdr: node ("BDR_LOCALID_FORMAT") requested replay confirmation; sending",
+ sysid, tli, datid, "");
+
initStringInfo(&s);
bdr_prepare_message(&s, BDR_MESSAGE_REPLAY_CONFIRM);
pq_sendint64(&s, request_lsn);
lsn = LogStandbyMessage(s.data, s.len, false);
XLogFlush(lsn);
+
}
+/*
+ * A remote node has seen a replay confirmation request and replied to it.
+ *
+ * If we sent the original request, update local state appropriately.
+ *
+ * If a DDL lock request has reached quorum as a result of this confirmation,
+ * write a lock acquisition confirmation and bdr_global_locks update to xlog.
+ *
+ * Runs in the apply worker.
+ */
void
bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
Oid datid, XLogRecPtr request_lsn)
bdr_locks_find_my_database(false);
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
- elog(DEBUG1, "processing replay confirmation for request %X/%X at %X/%X",
+ elog(DEBUG2, "bdr: processing replay confirmation from node ("BDR_LOCALID_FORMAT") for request %X/%X at %X/%X",
+ sysid, tli, datid, "",
(uint32)(bdr_my_locks_database->replay_confirmed_lsn >> 32),
(uint32)bdr_my_locks_database->replay_confirmed_lsn,
(uint32)(request_lsn >> 32),
{
bdr_my_locks_database->replay_confirmed++;
- elog(DEBUG1, "confirming replay %u/%zu",
+ elog(DEBUG2, "bdr: confirming replay %u/%zu",
bdr_my_locks_database->replay_confirmed,
bdr_my_locks_database->nnodes);
initStringInfo(&s);
+ elog(DEBUG2, "bdr: DDL lock quorum reached, logging confirmation of this node's acquisition of global DDL lock");
+
/* clear out information about requested confirmations */
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
bdr_my_locks_database->replay_confirmed = 0;
bool isnull[10];
if (found)
- elog(PANIC, "duplicate lock?");
+ elog(PANIC, "bdr: duplicate lock?");
- elog(DEBUG1, "updating DDL lock state from 'catchup' to 'acquired'");
+ elog(DEBUG1, "bdr: updating DDL lock state from 'catchup' to 'acquired'");
heap_deform_tuple(tuple, RelationGetDescr(rel),
values, isnull);
}
if (!found)
- elog(PANIC, "unknown lock");
+ elog(PANIC, "bdr: got confirmation for unknown lock");
systable_endscan(scan);
UnregisterSnapshot(snap);
CommitTransactionCommand();
- elog(DEBUG1, "sending confirmation for DDL lock replay confirmation request");
+ elog(DEBUG2, "bdr: sent confirmation of successful DDL lock acquisition");
}
}
bdr_always_allow_writes = always_allow;
}
+/*
+ * A remote node has sent a startup message. Update any appropriate local state
+ * like any locally held DDL locks for it.
+ *
+ * Runs in the apply worker.
+ */
void
bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
{
initStringInfo(&s);
+ /* BDR_LOCALID_FORMAT takes four arguments, as at every other call site here */
+ elog(DEBUG2, "bdr: Got startup message from node ("BDR_LOCALID_FORMAT"), clearing any locks it held",
+ sysid, tli, datid, "");
+
StartTransactionCommand();
snap = RegisterSnapshot(GetLatestSnapshot());
rel = heap_open(BdrLocksRelid, RowExclusiveLock);
while ((tuple = systable_getnext(scan)) != NULL)
{
- elog(DEBUG1, "found remote lock to delete (after remote restart)");
+ elog(DEBUG2, "bdr: found remote lock to delete (after remote restart)");
simple_heap_delete(rel, &tuple->t_self);
/* FIXME: locks */
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
if (bdr_my_locks_database->lockcount == 0)
- elog(WARNING, "bdr_global_locks row exists without corresponding in memory state");
+ elog(WARNING, "bdr: bdr_global_locks row exists without corresponding in memory state");
else
{
bdr_my_locks_database->lockcount--;
CommitTransactionCommand();
}
+/*
+ * The BDR ExecutorStart_hook that detects DDL and handles DDL lock acquisition
+ * requests.
+ *
+ * Runs in user backends.
+ */
static void
BdrExecutorStart(QueryDesc *queryDesc, int eflags)
{
/* is the database still starting up and hasn't loaded locks */
if (!bdr_my_locks_database->locked_and_loaded)
+ /*
+ * TODO: sleep here instead of ERRORing, and let statement_timeout
+ * ERROR if appropriate
+ */
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("database is not yet ready for writes"),
+ errmsg("bdr: database is not yet ready for DDL operations"),
+ errdetail("BDR DDL locking is still starting up"),
errhint("Wait for a short time and retry.")));
/* Is this database locked against user initiated ddl? */
if (bdr_my_locks_database->lockcount > 0 && !this_xact_acquired_lock)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("database is locked against writes"),
+ errmsg("bdr: database is locked against DDL operations"),
errhint("Some node in the cluster is performing DDL")));
done: