#include "utils/fmgroids.h"
#include "utils/snapmgr.h"
+typedef struct BDRLockWaiter {
+ PGPROC *proc;
+ slist_node node;
+} BDRLockWaiter;
+
typedef struct BdrLocksDBState {
/* db slot used */
bool in_use;
int replay_confirmed;
XLogRecPtr replay_confirmed_lsn;
- Latch *waiting_latch;
+ Latch *requestor;
+ slist_head waiters; /* list of waiting PGPROCs */
} BdrLocksDBState;
typedef struct BdrLocksCtl {
LWLock *lock;
- BdrLocksDBState dbstate[FLEXIBLE_ARRAY_MEMBER];
+ BdrLocksDBState *dbstate;
+ BDRLockWaiter *waiters;
} BdrLocksCtl;
static BdrLocksDBState * bdr_locks_find_database(Oid dbid, bool create);
static void bdr_locks_find_my_database(bool create);
static void bdr_prepare_message(StringInfo s, BdrMessageType message_type);
+void bdr_locks_addwaiter(PGPROC *proc);
+void bdr_locks_on_unlock(void);
+
static BdrLocksCtl *bdr_locks_ctl;
/* shmem init hook to chain to on startup, if any */
bdr_locks_shmem_size(void)
{
Size size = 0;
+ uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS;
size = add_size(size, sizeof(BdrLocksCtl));
size = add_size(size, mul_size(sizeof(BdrLocksDBState), bdr_max_databases));
+ size = add_size(size, mul_size(sizeof(BDRLockWaiter), TotalProcs));
return size;
}
{
memset(bdr_locks_ctl, 0, bdr_locks_shmem_size());
bdr_locks_ctl->lock = LWLockAssign();
+ bdr_locks_ctl->dbstate = (BdrLocksDBState *) bdr_locks_ctl + sizeof(BdrLocksCtl);
+ bdr_locks_ctl->waiters = (BDRLockWaiter *) bdr_locks_ctl + sizeof(BdrLocksCtl) +
+ mul_size(sizeof(BdrLocksDBState), bdr_max_databases);
}
LWLockRelease(AddinShmemInitLock);
}
shmem_startup_hook = bdr_locks_shmem_startup;
}
+/* Waiter manipulation. */
+void
+bdr_locks_addwaiter(PGPROC *proc)
+{
+ BDRLockWaiter *waiter = &bdr_locks_ctl->waiters[proc->pgprocno];
+
+ waiter->proc = proc;
+ slist_push_head(&bdr_my_locks_database->waiters, &waiter->node);
+}
+
+void
+bdr_locks_on_unlock(void)
+{
+ while (!slist_is_empty(&bdr_my_locks_database->waiters))
+ {
+ slist_node *node;
+ BDRLockWaiter *waiter;
+ PGPROC *proc;
+
+ node = slist_pop_head_node(&bdr_my_locks_database->waiters);
+ waiter = slist_container(BDRLockWaiter, node, node);
+ proc = waiter->proc;
+
+ SetLatch(&proc->procLatch);
+ }
+}
+
/*
* Find, and create if necessary, the lock state entry for dboid.
*/
SysScanDesc scan;
Snapshot snap;
HeapTuple tuple;
-
- XLogRecPtr lsn;
- StringInfoData s;
+ XLogRecPtr lsn;
+ StringInfoData s;
Assert(IsUnderPostmaster);
Assert(!IsTransactionState());
if (bdr_my_locks_database->locked_and_loaded)
return;
+ slist_init(&bdr_my_locks_database->waiters);
+
/* We haven't yet established how many nodes we're connected to. */
bdr_my_locks_database->nnodes = 0;
this_xact_acquired_lock = false;
bdr_my_locks_database->replay_confirmed = 0;
bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
- bdr_my_locks_database->waiting_latch = NULL;
+ bdr_my_locks_database->requestor = NULL;
+
+ if (bdr_my_locks_database->lockcount == 0)
+ bdr_locks_on_unlock();
LWLockRelease(bdr_locks_ctl->lock);
}
this_xact_acquired_lock = true;
bdr_my_locks_database->acquire_confirmed = 0;
bdr_my_locks_database->acquire_declined = 0;
- bdr_my_locks_database->waiting_latch = &MyProc->procLatch;
+ bdr_my_locks_database->requestor = &MyProc->procLatch;
/* lock looks to be free, try to acquire it */
/* TODO: recheck it's ours */
bdr_my_locks_database->acquire_confirmed = 0;
bdr_my_locks_database->acquire_declined = 0;
- bdr_my_locks_database->waiting_latch = NULL;
+ bdr_my_locks_database->requestor = NULL;
elog(DEBUG1, "global DDL lock acquired successfully by (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
/* XXX: recheck owner of lock */
}
- latch = bdr_my_locks_database->waiting_latch;
+ latch = bdr_my_locks_database->requestor;
bdr_my_locks_database->replay_confirmed = 0;
bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
- bdr_my_locks_database->waiting_latch = NULL;
+ bdr_my_locks_database->requestor = NULL;
+
+ if (bdr_my_locks_database->lockcount == 0)
+ bdr_locks_on_unlock();
LWLockRelease(bdr_locks_ctl->lock);
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
bdr_my_locks_database->acquire_confirmed++;
- latch = bdr_my_locks_database->waiting_latch;
+ latch = bdr_my_locks_database->requestor;
elog(DEBUG2, "received DDL lock confirmation number %d/%zu from ("BDR_LOCALID_FORMAT")",
bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes,
LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
bdr_my_locks_database->acquire_declined++;
- latch = bdr_my_locks_database->waiting_latch;
+ latch = bdr_my_locks_database->requestor;
LWLockRelease(bdr_locks_ctl->lock);
if(latch)
SetLatch(latch);
bdr_my_locks_database->replay_confirmed = 0;
bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
- bdr_my_locks_database->waiting_latch = NULL;
+ bdr_my_locks_database->requestor = NULL;
bdr_prepare_message(&s, BDR_MESSAGE_CONFIRM_LOCK);
bdr_my_locks_database->replay_confirmed = 0;
bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
}
+
+ if (bdr_my_locks_database->lockcount == 0)
+ bdr_locks_on_unlock();
+
LWLockRelease(bdr_locks_ctl->lock);
}
bdr_locks_find_my_database(false);
- /* is the database still starting up and hasn't loaded locks */
- if (!bdr_my_locks_database->locked_and_loaded)
- /*
- * TODO: sleep here instead of ERRORing, and let statement_timeout
- * ERROR if appropriate
- */
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("Database is not yet ready for DDL operations"),
- errdetail("BDR DDL locking is still starting up"),
- errhint("Wait for a short time and retry.")));
+ /*
+ * The bdr is still starting up and hasn't loaded locks, wait for it.
+ * The statement_timeout will kill us if necessary.
+ */
+ while (!bdr_my_locks_database->locked_and_loaded)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Probably can't use latch here easily, since init didn't happen yet. */
+ pg_usleep(5000L);
+ }
/* Is this database locked against user initiated ddl? */
pg_memory_barrier();
if (bdr_my_locks_database->lockcount > 0 && !this_xact_acquired_lock)
{
- uint64 holder_sysid;
- TimeLineID holder_tli;
- Oid holder_datid;
+ bdr_locks_addwaiter(MyProc);
- bdr_fetch_sysid_via_node_id(bdr_my_locks_database->lock_holder,
- &holder_sysid, &holder_tli,
- &holder_datid);
+ /* Wait for lock to be released. */
+ for (;;)
+ {
+ int rc;
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("Database is locked against DDL operations"),
- errhint("Node ("UINT64_FORMAT",%u,%u) in the cluster is already performing DDL",
- holder_sysid, holder_tli, holder_datid)));
+ CHECK_FOR_INTERRUPTS();
+
+ pg_memory_barrier();
+ if (bdr_my_locks_database->lockcount == 0)
+ break;
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 10000L);
+
+ ResetLatch(&MyProc->procLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
}
}