bdr: Log all steps of DDL lock acquisition, always prefix with bdr:
authorCraig Ringer <craig@2ndquadrant.com>
Mon, 11 Aug 2014 06:20:03 +0000 (14:20 +0800)
committerAndres Freund <andres@anarazel.de>
Mon, 8 Sep 2014 15:49:55 +0000 (17:49 +0200)
The DDL lock acquision logging wasn't logging all the steps, logging
at consistent log levels, or prefixing things with bdr: for easy grepping.

Add more log detail, mention node IDs when messages are received or locks are
acquired.

Comment the individual steps in lock acquisition.

bdr_locks.c

index 9858de91aff606ef5ae88aca34f7d6239932c305..b6e774bbb5e330cdd1c050db506522f9ce3ceed4 100644 (file)
  *    allows to inject transactional and nontransactional messages in the
  *    changestream.
  *
+ *    There are really two levels of DDL lock - the global lock that only
+ *    one node can hold, and individual local DDL locks on each node. If
+ *    a node holds the global DDL lock then it owns the local DDL locks on each
+ *    node.
+ *
  *    DDL lock acquiration basically works like this:
  *
- *    1) A utility command notices that it needs the ddl lock. If there
- *       already is a local ddl lock it'll ERROR out.
+ *    1) A utility command notices that it needs the global ddl lock and the local
+ *       node doesn't already hold it. If there already is a local ddl lock
+ *       it'll ERROR out, as this indicates another node already holds or is
+ *       trying to acquire the global DDL lock.
+ *
  *   2) It sends out a 'acquire_lock' message to all other nodes.
+ *
  *    3) When another node receives a 'acquire_lock' message it checks whether
- *       the ddl lock is already held locally. If so it'll send a
- *       'decline_lock' message back causing the lock acquiration to fail.
- *    4) If a 'acquire_lock' message is received and the DDL lock is not held
- *      locally it'll be acquired and an entry into the 'bdr_global_locks'
- *      table will be made marking the lock to be in the 'catchup' phase.
+ *       the local ddl lock is already held. If so it'll send a 'decline_lock'
+ *       message back causing the lock acquiration to fail.
+ *
+ *    4) If a 'acquire_lock' message is received and the local DDL lock is not
+ *       held it'll be acquired and an entry into the 'bdr_global_locks' table
+ *       will be made marking the lock to be in the 'catchup' phase.
+ *
  *    5) All concurrent user transactions will be cancelled.
+ *
  *   6) A 'request_replay_confirm' message will be sent to all other nodes
  *      containing a lsn that has to be replayed.
+ *
  *    7) When a 'request_replay_confirm' message is received, a
  *       'replay_confirm' message will be sent back.
+ *
  *    8) Once all other nodes have replied with 'replay_confirm' the DDL lock
  *       has been successfully acquired on the node reading the 'acquire_lock'
  *       message (from 3)). The corresponding bdr_global_locks entry will be
  *       updated to the 'acquired' state and a 'confirm_lock' message will be sent out.
+ *
  *    9) Once all nodes have replied with 'confirm_lock' messages the ddl lock
  *      has been acquired.
  *
@@ -249,7 +264,10 @@ bdr_locks_find_my_database(bool create)
 }
 
 /*
+ * This node has just started up. Init its local state and send a startup
+ * announcement message.
  *
+ * Called from the per-db worker.
  */
 void
 bdr_locks_startup(Size nnodes)
@@ -285,7 +303,7 @@ bdr_locks_startup(Size nnodes)
     */
    bdr_prepare_message(&s, BDR_MESSAGE_START);
 
-   elog(DEBUG1, "sending DDL lock startup message");
+   elog(DEBUG1, "bdr: sending DDL lock startup message");
    lsn = LogStandbyMessage(s.data, s.len, false);
    resetStringInfo(&s);
    XLogFlush(lsn);
@@ -322,7 +340,8 @@ bdr_locks_startup(Size nnodes)
                bdr_locks_find_database(DatumGetObjectId(values[7]), false);
            db->lock_holder = node_id;
            db->lockcount++;
-           elog(DEBUG1, "reacquiring DDL lock held before shutdown");
+           /* A remote node might have held the local DDL lock before restart */
+           elog(DEBUG1, "bdr: reacquiring local DDL lock held before shutdown");
        }
        else if (strcmp(state, "catchup") == 0)
        {
@@ -346,10 +365,10 @@ bdr_locks_startup(Size nnodes)
            db->replay_confirmed = 0;
            db->replay_confirmed_lsn = wait_for_lsn;
 
-           elog(DEBUG1, "restarting DDL lock replay catchup phase");
+           elog(DEBUG1, "bdr: restarting DDL lock replay catchup phase");
        }
        else
-           elog(PANIC, "BDR: unknown lockstate");
+           elog(PANIC, "bdr: unknown lockstate");
    }
 
    systable_endscan(scan);
@@ -358,6 +377,8 @@ bdr_locks_startup(Size nnodes)
 
    CommitTransactionCommand();
 
+   elog(DEBUG2, "bdr: ddl locking startup completed, local DML enabled");
+
    /* allow local DML */
    bdr_my_locks_database->locked_and_loaded = true;
 }
@@ -406,7 +427,7 @@ bdr_lock_xact_callback(XactEvent event, void *arg)
        if (bdr_my_locks_database->lockcount > 0)
            bdr_my_locks_database->lockcount--;
        else
-           elog(WARNING, "BDR: releasing unacquired DDL lock");
+           elog(WARNING, "bdr: releasing unacquired DDL lock");
        LWLockRelease(bdr_locks_ctl->lock);
        this_xact_acquired_lock = false;
    }
@@ -455,6 +476,9 @@ locks_begin_scan(Relation rel, Snapshot snap, uint64 sysid, TimeLineID tli, Oid
 
 /*
  * Acquire DDL lock on the side that wants to perform DDL.
+ *
+ * Called from a user backend when the command filter spots a DDL attempt; runs
+ * in the user backend.
  */
 void
 bdr_acquire_ddl_lock(void)
@@ -471,6 +495,8 @@ bdr_acquire_ddl_lock(void)
 
    bdr_locks_find_my_database(false);
 
+   elog(DEBUG2, "bdr: attempting to acquire global DDL lock for (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
+
    /* send message about ddl lock */
    bdr_prepare_message(&s, BDR_MESSAGE_ACQUIRE_LOCK);
 
@@ -493,12 +519,13 @@ bdr_acquire_ddl_lock(void)
                 errmsg("database is locked against ddl by another node"),
                 errhint("Some node in the cluster is performing DDL")));
 
+
    bdr_my_locks_database->acquire_confirmed = 0;
    bdr_my_locks_database->acquire_declined = 0;
    bdr_my_locks_database->waiting_latch = &MyProc->procLatch;
    LWLockRelease(bdr_locks_ctl->lock);
 
-   elog(DEBUG1, "waiting for the other nodes acks");
+   elog(DEBUG2, "bdr: sent DDL lock request, waiting for confirmation");
 
    while (true)
    {
@@ -538,12 +565,16 @@ bdr_acquire_ddl_lock(void)
    }
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
+
    /* TODO: recheck it's ours */
    bdr_my_locks_database->acquire_confirmed = 0;
    bdr_my_locks_database->acquire_declined = 0;
    bdr_my_locks_database->waiting_latch = NULL;
    bdr_my_locks_database->lockcount++;
    this_xact_acquired_lock = true;
+
+   elog(DEBUG1, "bdr: global DDL lock acquired successfully by (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
+
    LWLockRelease(bdr_locks_ctl->lock);
 
 }
@@ -581,6 +612,8 @@ check_is_my_node(uint64 sysid, TimeLineID tli, Oid datid)
 
 /*
  * Another node has asked for a DDL lock. Try to acquire the local ddl lock.
+ *
+ * Runs in the apply worker.
  */
 void
 bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
@@ -596,6 +629,8 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
 
    bdr_locks_find_my_database(false);
 
+   elog(DEBUG2, "bdr: node (" BDR_LOCALID_FORMAT ") requested the global DDL lock", sysid, tli, datid, "");
+
    initStringInfo(&s);
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
@@ -613,6 +648,8 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
         * No previous DDL lock found. Start acquiring it.
         */
 
+       elog(DEBUG2, "bdr: no prior DDL lock found, acquiring local DDL lock");
+
        /* Add a row to bdr_locks */
        StartTransactionCommand();
 
@@ -648,12 +685,13 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
            ForceSyncCommit(); /* async commit would be too complicated */
            heap_close(rel, NoLock);
            CommitTransactionCommand();
+           elog(DEBUG3, "bdr: no conflicting global DDL lock found");
        }
        PG_CATCH();
        {
            if (geterrcode() == ERRCODE_UNIQUE_VIOLATION)
            {
-               elog(DEBUG1, "declining DDL lock because a conflicting DDL lock exists in bdr_global_locks");
+               elog(DEBUG1, "bdr: declining DDL lock because a conflicting DDL lock exists in bdr_global_locks");
                AbortOutOfAnyTransaction();
                goto decline;
            }
@@ -668,7 +706,7 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
        LWLockRelease(bdr_locks_ctl->lock);
 
        /*
-        * Now kill all local processes that are still writing. We won't
+        * Now kill all local processes that are still writing. We can't just
         * prevent them from writing via the acquired lock as they are still
         * running. We're using drastic measures here because it'd be a bad
         * idea to wait: The primary is waiting for us and during that time
@@ -678,6 +716,7 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
         * held conflicting locks, but that's not easiliy possible at the
         * moment.
         */
+       elog(DEBUG3, "bdr: terminating any local processes that conflict with the DDL lock");
        conflicts = GetConflictingVirtualXIDs(InvalidTransactionId, MyDatabaseId);
        while (conflicts->backendId != InvalidBackendId)
        {
@@ -701,6 +740,8 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
                conflicts++;
            else
                pg_usleep(5000);
+
+           elog(DEBUG3, "bdr: signaled %d to terminate, conflicts with ddl lock requested by another node", p);
        }
 
        /*
@@ -709,8 +750,12 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
         * by all other nodes. When the required number of messages is back we
         * can confirm the lock to the original requestor
         * (c.f. bdr_process_replay_confirm()).
+        *
+        * If we didn't wait for everyone to replay local changes then a DDL
+        * change that caused those local changes not to apply on remote
+        * nodes might occur, causing a divergent conflict.
         */
-       elog(DEBUG1, "requesting replay confirmation from all other nodes");
+       elog(DEBUG2, "bdr: ddl locking requesting replay confirmation from all other nodes before confirming lock granted");
 
        wait_for_lsn = GetXLogInsertRecPtr();
        bdr_prepare_message(&s, BDR_MESSAGE_REQUEST_REPLAY_CONFIRM);
@@ -723,6 +768,8 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
        bdr_my_locks_database->replay_confirmed = 0;
        bdr_my_locks_database->replay_confirmed_lsn = wait_for_lsn;
        LWLockRelease(bdr_locks_ctl->lock);
+
+       elog(DEBUG1, "bdr: ddl lock granted to remote node");
    }
    else
    {
@@ -732,7 +779,7 @@ bdr_process_acquire_ddl_lock(uint64 sysid, TimeLineID tli, Oid datid)
        LWLockRelease(bdr_locks_ctl->lock);
 decline:
        ereport(LOG,
-               (errmsg("declining remote DDL lock request, already locked")));
+               (errmsg("bdr: declining remote DDL lock request, already locked")));
        bdr_prepare_message(&s, BDR_MESSAGE_DECLINE_LOCK);
 
        Assert(!IsTransactionState());
@@ -753,6 +800,11 @@ decline:
    }
 }
 
+/*
+ * Another node has released the global DDL lock, update our local state.
+ *
+ * Runs in the apply worker.
+ */
 void
 bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
                             uint64 lock_sysid, TimeLineID lock_tli, Oid lock_datid)
@@ -774,6 +826,8 @@ bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
 
    initStringInfo(&s);
 
+   elog(DEBUG2, "bdr: node (" BDR_LOCALID_FORMAT ") released DDL lock", lock_sysid, lock_tli, lock_datid, "");
+
    /*
     * Remove row from bdr_locks *before* releasing the in memory lock. If we
     * crash we'll replay the event again.
@@ -786,7 +840,7 @@ bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
 
    while ((tuple = systable_getnext(scan)) != NULL)
    {
-       elog(DEBUG1, "found ddl lock entry to delete in response to ddl lock release message");
+       elog(DEBUG3, "bdr: found DDL lock entry to delete in response to DDL lock release message");
        simple_heap_delete(rel, &tuple->t_self);
        ForceSyncCommit(); /* async commit would be too complicated */
        found = true;
@@ -798,7 +852,11 @@ bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
    CommitTransactionCommand();
 
    if (!found)
-       elog(WARNING, "did not find local DDL lock entry about a remotely released lock");
+       ereport(WARNING,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("bdr: Did not find local DDL lock entry for a remotely released global DDL lock"),
+                errdetail("node ("BDR_LOCALID_FORMAT") sent a release message but the lock isn't held locally",
+                          lock_sysid, lock_tli, lock_datid, "")));
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
    if (bdr_my_locks_database->lockcount > 0)
@@ -807,19 +865,25 @@ bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
        bdr_my_locks_database->lock_holder = InvalidRepNodeId;
    }
    else
-       elog(WARNING, "releasing DDL lock without corresponding in-memory state");
+       /* This shouldn't happen; probable bug if reached */
+       elog(WARNING, "bdr: releasing DDL lock without corresponding in-memory state");
 
    latch = bdr_my_locks_database->waiting_latch;
    LWLockRelease(bdr_locks_ctl->lock);
 
+   elog(DEBUG1, "bdr: local DDL lock released");
+
    /* notify an eventual waiter */
    if(latch)
        SetLatch(latch);
 }
 
 /*
- * Another node has confirmed a lock. Changed shared memory state and wakeup
- * the locker.
+ * Another node has confirmed that a node has acquired the DDL lock
+ * successfully. If the acquiring node was us, change shared memory state and
+ * wake up the user backend that was trying to acquire the lock.
+ *
+ * Runs in the apply worker.
  */
 void
 bdr_process_confirm_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
@@ -840,8 +904,9 @@ bdr_process_confirm_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
    bdr_my_locks_database->acquire_confirmed++;
    latch = bdr_my_locks_database->waiting_latch;
 
-   elog(DEBUG1, "received ddl lock confirmation number %d/%zu",
-        bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes);
+   elog(DEBUG2, "bdr: received DDL lock confirmation number %d/%zu from ("BDR_LOCALID_FORMAT")",
+        bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes,
+        origin_sysid, origin_tli, origin_datid, "");
    LWLockRelease(bdr_locks_ctl->lock);
 
    if(latch)
@@ -849,8 +914,10 @@ bdr_process_confirm_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
 }
 
 /*
- * Another node has declined a lock. Changed shared memory state and wakeup
- * the locker.
+ * Another node has declined a lock. If it was us, change shared memory state
+ * and wakeup the user backend that tried to acquire the lock.
+ *
+ * Runs in the apply worker.
  */
 void
 bdr_process_decline_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid origin_datid,
@@ -870,8 +937,17 @@ bdr_process_decline_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
    LWLockRelease(bdr_locks_ctl->lock);
    if(latch)
        SetLatch(latch);
+
+   elog(DEBUG2, "bdr: node ("BDR_LOCALID_FORMAT") declined our DDL global lock request",
+        origin_sysid, origin_tli, origin_datid, "");
 }
 
+/*
+ * Another node has asked us to confirm that we've replayed up to a given LSN.
+ * We've seen the request message, so send the requested confirmation.
+ *
+ * Runs in the walsender.
+ */
 void
 bdr_process_request_replay_confirm(uint64 sysid, TimeLineID tli,
                                   Oid datid, XLogRecPtr request_lsn)
@@ -884,13 +960,27 @@ bdr_process_request_replay_confirm(uint64 sysid, TimeLineID tli,
 
    bdr_locks_find_my_database(false);
 
+   elog(DEBUG2, "bdr: node ("BDR_LOCALID_FORMAT") requested replay confirmation; sending",
+        sysid, tli, datid, "");
+
    initStringInfo(&s);
    bdr_prepare_message(&s, BDR_MESSAGE_REPLAY_CONFIRM);
    pq_sendint64(&s, request_lsn);
    lsn = LogStandbyMessage(s.data, s.len, false);
    XLogFlush(lsn);
+
 }
 
+/*
+ * A remote node has seen a replay confirmation request and replied to it.
+ *
+ * If we sent the original request, update local state appropriately.
+ *
+ * If a DDL lock request has reached quorum as a result of this confirmation,
+ * write a log acquisition confirmation and bdr_global_locks update to xlog.
+ *
+ * Runs in the apply worker.
+ */
 void
 bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
                           Oid datid, XLogRecPtr request_lsn)
@@ -903,7 +993,8 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
    bdr_locks_find_my_database(false);
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
-   elog(DEBUG1, "processing replay confirmation for request %X/%X at %X/%X",
+   elog(DEBUG2, "bdr: processing replay confirmation from node ("BDR_LOCALID_FORMAT") for request %X/%X at %X/%X",
+        sysid, tli, datid, "",
         (uint32)(bdr_my_locks_database->replay_confirmed_lsn >> 32),
         (uint32)bdr_my_locks_database->replay_confirmed_lsn,
         (uint32)(request_lsn >> 32),
@@ -914,7 +1005,7 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
    {
        bdr_my_locks_database->replay_confirmed++;
 
-       elog(DEBUG1, "confirming replay %u/%zu",
+       elog(DEBUG2, "bdr: confirming replay %u/%zu",
             bdr_my_locks_database->replay_confirmed,
             bdr_my_locks_database->nnodes);
 
@@ -938,6 +1029,8 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
 
        initStringInfo(&s);
 
+       elog(DEBUG2, "bdr: DDL lock quorum reached, logging confirmation of this node's acquisition of global DDL lock");
+
        /* clear out information about requested confirmations */
        LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
        bdr_my_locks_database->replay_confirmed = 0;
@@ -977,9 +1070,9 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
            bool        isnull[10];
 
            if (found)
-               elog(PANIC, "duplicate lock?");
+               elog(PANIC, "bdr: duplicate lock?");
 
-           elog(DEBUG1, "updating DDL lock state from 'catchup' to 'acquired'");
+           elog(DEBUG1, "bdr: updating DDL lock state from 'catchup' to 'acquired'");
 
            heap_deform_tuple(tuple, RelationGetDescr(rel),
                              values, isnull);
@@ -994,7 +1087,7 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
        }
 
        if (!found)
-           elog(PANIC, "unknown lock");
+           elog(PANIC, "bdr: got confirmation for unknown lock");
 
        systable_endscan(scan);
        UnregisterSnapshot(snap);
@@ -1002,7 +1095,7 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
 
        CommitTransactionCommand();
 
-       elog(DEBUG1, "sending confirmation for DDL lock replay confirmation request");
+       elog(DEBUG2, "bdr: sent confirmation of successful DDL lock acquisition");
    }
 }
 
@@ -1013,6 +1106,12 @@ bdr_locks_always_allow_writes(bool always_allow)
    bdr_always_allow_writes = always_allow;
 }
 
+/*
+ * A remote node has sent a startup message. Update any appropriate local state
+ * like any locally held DDL locks for it.
+ *
+ * Runs in the apply worker.
+ */
 void
 bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
 {
@@ -1026,6 +1125,9 @@ bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
 
    initStringInfo(&s);
 
+   elog(DEBUG2, "bdr: Got startup message from node ("BDR_LOCALID_FORMAT"), clearing any locks it held",
+        sysid, tli, datid);
+
    StartTransactionCommand();
    snap = RegisterSnapshot(GetLatestSnapshot());
    rel = heap_open(BdrLocksRelid, RowExclusiveLock);
@@ -1034,13 +1136,13 @@ bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
 
    while ((tuple = systable_getnext(scan)) != NULL)
    {
-       elog(DEBUG1, "found remote lock to delete (after remote restart)");
+       elog(DEBUG2, "bdr: found remote lock to delete (after remote restart)");
 
        simple_heap_delete(rel, &tuple->t_self);
        /* FIXME: locks */
        LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
        if (bdr_my_locks_database->lockcount == 0)
-           elog(WARNING, "bdr_global_locks row exists without corresponding in memory state");
+           elog(WARNING, "bdr: bdr_global_locks row exists without corresponding in memory state");
        else
        {
            bdr_my_locks_database->lockcount--;
@@ -1057,6 +1159,12 @@ bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
    CommitTransactionCommand();
 }
 
+/*
+ * The BDR ExecutorStart_hook that detects DDL and handles DDL lock acquision
+ * requests.
+ *
+ * Runs in user backends.
+ */
 static void
 BdrExecutorStart(QueryDesc *queryDesc, int eflags)
 {
@@ -1079,9 +1187,14 @@ BdrExecutorStart(QueryDesc *queryDesc, int eflags)
 
    /* is the database still starting up and hasn't loaded locks */
    if (!bdr_my_locks_database->locked_and_loaded)
+       /*
+        * TODO: sleep here instead of ERRORing, and let statement_timeout
+        * ERROR if appropriate
+        */
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("database is not yet ready for writes"),
+                errmsg("bdr: database is not yet ready for DDL operations"),
+                errdetail("BDR DDL locking is still starting up"),
                 errhint("Wait for a short time and retry.")));
 
    /* Is this database locked against user initiated ddl? */
@@ -1089,7 +1202,7 @@ BdrExecutorStart(QueryDesc *queryDesc, int eflags)
    if (bdr_my_locks_database->lockcount > 0 && !this_xact_acquired_lock)
        ereport(ERROR,
                (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                errmsg("database is locked against writes"),
+                errmsg("bdr: database is locked against DDL operations"),
                 errhint("Some node in the cluster is performing DDL")));
 
 done: