Make DML operations wait while DDL is in progress.
author Petr Jelinek <pjmodos@pjmodos.net>
Fri, 1 May 2015 13:51:37 +0000 (15:51 +0200)
committer Petr Jelinek <pjmodos@pjmodos.net>
Fri, 1 May 2015 13:56:21 +0000 (15:56 +0200)
Previously, we immediately raised an ERROR when DML was executed while the
DDL lock was being acquired or held. Now such DML statements simply wait
until either the lock is released or statement_timeout cancels them.

Update isolation tests accordingly.

bdr_locks.c
expected/isolation/alter_table.out
specs/isolation/alter_table.spec

index 3c6628209100d8fd4757fecd50dc5c756f8db5b5..6bdc00eacec978e9eca52ad50600e29b74345fb6 100644 (file)
 #include "utils/fmgroids.h"
 #include "utils/snapmgr.h"
 
+typedef struct BDRLockWaiter {  /* wait-list entry; one slot per PGPROC (indexed by pgprocno) */
+   PGPROC     *proc;           /* process whose procLatch is set by bdr_locks_on_unlock() */
+   slist_node  node;           /* link in the per-database BdrLocksDBState.waiters list */
+} BDRLockWaiter;
+
 typedef struct BdrLocksDBState {
    /* db slot used */
    bool        in_use;
@@ -131,18 +136,23 @@ typedef struct BdrLocksDBState {
    int         replay_confirmed;
    XLogRecPtr  replay_confirmed_lsn;
 
-   Latch      *waiting_latch;
+   Latch      *requestor;
+   slist_head  waiters;        /* list of waiting PGPROCs */
 } BdrLocksDBState;
 
 typedef struct BdrLocksCtl {
    LWLock     *lock;           /* taken LW_EXCLUSIVE by the lock-message handlers before touching dbstate */
-   BdrLocksDBState dbstate[FLEXIBLE_ARRAY_MEMBER];
+   BdrLocksDBState   *dbstate;  /* per-database lock state; pointer is assigned in bdr_locks_shmem_startup(), space for bdr_max_databases entries is reserved in bdr_locks_shmem_size() */
+   BDRLockWaiter     *waiters;  /* wait-list slots indexed by PGPROC pgprocno; pointer is assigned in bdr_locks_shmem_startup() */
 } BdrLocksCtl;
 
 static BdrLocksDBState * bdr_locks_find_database(Oid dbid, bool create);
 static void bdr_locks_find_my_database(bool create);
 static void bdr_prepare_message(StringInfo s, BdrMessageType message_type);
 
+void bdr_locks_addwaiter(PGPROC *proc);
+void bdr_locks_on_unlock(void);
+
 static BdrLocksCtl *bdr_locks_ctl;
 
 /* shmem init hook to chain to on startup, if any */
@@ -162,9 +172,11 @@ static size_t
 bdr_locks_shmem_size(void)
 {
    Size        size = 0;
+   uint32      TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS;
 
    size = add_size(size, sizeof(BdrLocksCtl));
    size = add_size(size, mul_size(sizeof(BdrLocksDBState), bdr_max_databases));
+   size = add_size(size, mul_size(sizeof(BDRLockWaiter), TotalProcs));
 
    return size;
 }
@@ -185,6 +197,9 @@ bdr_locks_shmem_startup(void)
    {
        memset(bdr_locks_ctl, 0, bdr_locks_shmem_size());
        bdr_locks_ctl->lock = LWLockAssign();
+       bdr_locks_ctl->dbstate = (BdrLocksDBState *) bdr_locks_ctl + sizeof(BdrLocksCtl);
+       bdr_locks_ctl->waiters = (BDRLockWaiter *) bdr_locks_ctl + sizeof(BdrLocksCtl) +
+           mul_size(sizeof(BdrLocksDBState), bdr_max_databases);
    }
    LWLockRelease(AddinShmemInitLock);
 }
@@ -205,6 +220,33 @@ bdr_locks_shmem_init()
    shmem_startup_hook = bdr_locks_shmem_startup;
 }
 
+/* Waiter manipulation. */
+/*
+ * Register 'proc' on the current database's DDL-lock wait list so that
+ * bdr_locks_on_unlock() sets its process latch when the lock is released.
+ *
+ * Every PGPROC owns exactly one BDRLockWaiter slot (indexed by pgprocno),
+ * so registering twice reuses the same list node.  A stale entry can be
+ * left behind when a previous wait ended without the entry being popped,
+ * e.g. the waiting statement was cancelled by statement_timeout.  Pushing
+ * a node that is already linked would make the singly linked list cyclic,
+ * and bdr_locks_on_unlock() would then never see the list become empty.
+ * Hence the push is skipped when the slot is already on the list.
+ *
+ * NOTE(review): the list lives in shared memory; callers presumably need
+ * to hold bdr_locks_ctl->lock around this call - confirm at call sites.
+ */
+void
+bdr_locks_addwaiter(PGPROC *proc)
+{
+   BDRLockWaiter  *waiter = &bdr_locks_ctl->waiters[proc->pgprocno];
+   slist_iter      iter;
+
+   waiter->proc = proc;
+
+   /* At most one entry per PGPROC can be linked, so a scan is cheap. */
+   slist_foreach(iter, &bdr_my_locks_database->waiters)
+   {
+       if (iter.cur == &waiter->node)
+           return;             /* already registered, nothing to do */
+   }
+
+   slist_push_head(&bdr_my_locks_database->waiters, &waiter->node);
+}
+
+/*
+ * Wake every process queued on the current database's DDL-lock wait list.
+ *
+ * Entries are popped from the head one at a time until the list is empty;
+ * for each popped entry the process latch of its PGPROC is set.
+ */
+void
+bdr_locks_on_unlock(void)
+{
+   slist_head *queue = &bdr_my_locks_database->waiters;
+
+   for (;;)
+   {
+       slist_node     *popped;
+       BDRLockWaiter  *entry;
+
+       if (slist_is_empty(queue))
+           break;
+
+       /* Unlink the head entry, then map the node back to its waiter. */
+       popped = slist_pop_head_node(queue);
+       entry = slist_container(BDRLockWaiter, node, popped);
+
+       SetLatch(&entry->proc->procLatch);
+   }
+}
+
 /*
  * Find, and create if necessary, the lock state entry for dboid.
  */
@@ -279,9 +321,8 @@ bdr_locks_startup()
    SysScanDesc     scan;
    Snapshot        snap;
    HeapTuple       tuple;
-
-   XLogRecPtr  lsn;
-   StringInfoData s;
+   XLogRecPtr      lsn;
+   StringInfoData  s;
 
    Assert(IsUnderPostmaster);
    Assert(!IsTransactionState());
@@ -296,6 +337,8 @@ bdr_locks_startup()
    if (bdr_my_locks_database->locked_and_loaded)
        return;
 
+   slist_init(&bdr_my_locks_database->waiters);
+
    /* We haven't yet established how many nodes we're connected to. */
    bdr_my_locks_database->nnodes = 0;
 
@@ -478,7 +521,10 @@ bdr_lock_xact_callback(XactEvent event, void *arg)
        this_xact_acquired_lock = false;
        bdr_my_locks_database->replay_confirmed = 0;
        bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
-       bdr_my_locks_database->waiting_latch = NULL;
+       bdr_my_locks_database->requestor = NULL;
+
+       if (bdr_my_locks_database->lockcount == 0)
+            bdr_locks_on_unlock();
 
        LWLockRelease(bdr_locks_ctl->lock);
    }
@@ -606,7 +652,7 @@ bdr_acquire_ddl_lock(void)
    this_xact_acquired_lock = true;
    bdr_my_locks_database->acquire_confirmed = 0;
    bdr_my_locks_database->acquire_declined = 0;
-   bdr_my_locks_database->waiting_latch = &MyProc->procLatch;
+   bdr_my_locks_database->requestor = &MyProc->procLatch;
 
    /* lock looks to be free, try to acquire it */
 
@@ -664,7 +710,7 @@ bdr_acquire_ddl_lock(void)
    /* TODO: recheck it's ours */
    bdr_my_locks_database->acquire_confirmed = 0;
    bdr_my_locks_database->acquire_declined = 0;
-   bdr_my_locks_database->waiting_latch = NULL;
+   bdr_my_locks_database->requestor = NULL;
 
    elog(DEBUG1, "global DDL lock acquired successfully by (" BDR_LOCALID_FORMAT ")", BDR_LOCALID_FORMAT_ARGS);
 
@@ -1003,11 +1049,14 @@ bdr_process_release_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
        /* XXX: recheck owner of lock */
    }
 
-   latch = bdr_my_locks_database->waiting_latch;
+   latch = bdr_my_locks_database->requestor;
 
    bdr_my_locks_database->replay_confirmed = 0;
    bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
-   bdr_my_locks_database->waiting_latch = NULL;
+   bdr_my_locks_database->requestor = NULL;
+
+   if (bdr_my_locks_database->lockcount == 0)
+        bdr_locks_on_unlock();
 
    LWLockRelease(bdr_locks_ctl->lock);
 
@@ -1044,7 +1093,7 @@ bdr_process_confirm_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
    bdr_my_locks_database->acquire_confirmed++;
-   latch = bdr_my_locks_database->waiting_latch;
+   latch = bdr_my_locks_database->requestor;
 
    elog(DEBUG2, "received DDL lock confirmation number %d/%zu from ("BDR_LOCALID_FORMAT")",
         bdr_my_locks_database->acquire_confirmed, bdr_my_locks_database->nnodes,
@@ -1077,7 +1126,7 @@ bdr_process_decline_ddl_lock(uint64 origin_sysid, TimeLineID origin_tli, Oid ori
 
    LWLockAcquire(bdr_locks_ctl->lock, LW_EXCLUSIVE);
    bdr_my_locks_database->acquire_declined++;
-   latch = bdr_my_locks_database->waiting_latch;
+   latch = bdr_my_locks_database->requestor;
    LWLockRelease(bdr_locks_ctl->lock);
    if(latch)
        SetLatch(latch);
@@ -1180,7 +1229,7 @@ bdr_process_replay_confirm(uint64 sysid, TimeLineID tli,
 
        bdr_my_locks_database->replay_confirmed = 0;
        bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
-       bdr_my_locks_database->waiting_latch = NULL;
+       bdr_my_locks_database->requestor = NULL;
 
        bdr_prepare_message(&s, BDR_MESSAGE_CONFIRM_LOCK);
 
@@ -1291,6 +1340,10 @@ bdr_locks_process_remote_startup(uint64 sysid, TimeLineID tli, Oid datid)
            bdr_my_locks_database->replay_confirmed = 0;
            bdr_my_locks_database->replay_confirmed_lsn = InvalidXLogRecPtr;
        }
+
+       if (bdr_my_locks_database->lockcount == 0)
+            bdr_locks_on_unlock();
+
        LWLockRelease(bdr_locks_ctl->lock);
    }
 
@@ -1314,36 +1367,45 @@ bdr_locks_check_dml(void)
 
    bdr_locks_find_my_database(false);
 
-   /* is the database still starting up and hasn't loaded locks */
-   if (!bdr_my_locks_database->locked_and_loaded)
-       /*
-        * TODO: sleep here instead of ERRORing, and let statement_timeout
-        * ERROR if appropriate
-        */
-       ereport(ERROR,
-               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("Database is not yet ready for DDL operations"),
-                errdetail("BDR DDL locking is still starting up"),
-                errhint("Wait for a short time and retry.")));
+   /*
+    * The bdr is still starting up and hasn't loaded locks, wait for it.
+    * The statement_timeout will kill us if necessary.
+    */
+   while (!bdr_my_locks_database->locked_and_loaded)
+   {
+       CHECK_FOR_INTERRUPTS();
+
+       /* Probably can't use latch here easily, since init didn't happen yet. */
+       pg_usleep(5000L);
+   }
 
    /* Is this database locked against user initiated ddl? */
    pg_memory_barrier();
    if (bdr_my_locks_database->lockcount > 0 && !this_xact_acquired_lock)
    {
-       uint64      holder_sysid;
-       TimeLineID  holder_tli;
-       Oid         holder_datid;
+       bdr_locks_addwaiter(MyProc);
 
-       bdr_fetch_sysid_via_node_id(bdr_my_locks_database->lock_holder,
-                                   &holder_sysid, &holder_tli,
-                                   &holder_datid);
+       /* Wait for lock to be released. */
+       for (;;)
+       {
+           int rc;
 
-       ereport(ERROR,
-               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                errmsg("Database is locked against DDL operations"),
-                errhint("Node ("UINT64_FORMAT",%u,%u) in the cluster is already performing DDL",
-                        holder_sysid, holder_tli, holder_datid)));
+           CHECK_FOR_INTERRUPTS();
+
+           pg_memory_barrier();
+           if (bdr_my_locks_database->lockcount == 0)
+               break;
 
+           rc = WaitLatch(&MyProc->procLatch,
+                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                          10000L);
+
+           ResetLatch(&MyProc->procLatch);
+
+           /* emergency bailout if postmaster has died */
+           if (rc & WL_POSTMASTER_DEATH)
+               proc_exit(1);
+       }
    }
 }
 
index 9af4e9d13c5560134b40eb657efb615569c5c028..492c88c2900d013dce53aeb7821e26e73de5542e 100644 (file)
@@ -1,8 +1,8 @@
 Parsed test spec with 2 sessions
 
-starting permutation: n1setup n2setup n1s1 n1sync n2read n1s2 n1sync n2read n1s3 n1s4 n1sync n2read n1s5 n1s6 n1sync n2read n1s7 n1sync n2read n1s8 n1s9 n1s10 n2s1 n2sync n1s11 n1sync n2sync n2read n1read n2s2 n2sync n1read
+starting permutation: n1setup n2setup n1s1 n1sync n2read n1s2 n1sync n2read n1s3 n1s4 n1sync n2read n1s5 n1s6 n1sync n2read n1s7 n1sync n2read n1s8 n1s9 n1s10 n2sync n2s1 n1s11 n1sync n2sync n2read n1read n2s2 n2sync n1read
 step n1setup: SET bdr.permit_ddl_locking = true;
-step n2setup: SET bdr.permit_ddl_locking = true;
+step n2setup: SET bdr.permit_ddl_locking = true; SET statement_timeout = '5s';
 step n1s1: INSERT INTO tst (a, b) VALUES (4, 'four');
 step n1sync: SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
 pg_xlog_wait_remote_apply
@@ -98,8 +98,6 @@ step n1s8: ALTER TABLE tst ADD COLUMN d INTEGER DEFAULT 100;
 ERROR:  ALTER TABLE ... ADD COLUMN ... DEFAULT may only affect UNLOGGED or TEMPORARY tables when BDR is active; tst is a regular table
 step n1s9: BEGIN;
 step n1s10: ALTER TABLE tst DROP COLUMN c;
-step n2s1: UPDATE tst SET c = 'changed' WHERE a = 1;
-ERROR:  Database is locked against DDL operations
 step n2sync: SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
 pg_xlog_wait_remote_apply
 
@@ -109,6 +107,8 @@ pg_xlog_wait_remote_apply
                
                
                
+step n2s1: UPDATE tst SET c = 'changed' WHERE a = 1;
+ERROR:  canceling statement due to statement timeout
 step n1s11: COMMIT;
 step n1sync: SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
 pg_xlog_wait_remote_apply
index 76deb152354944dace74cb08c901c2348c892839..11506b10e5f9e590cb9149e4d76d27a9a10cec8a 100644 (file)
@@ -34,10 +34,10 @@ step "n1s11" { COMMIT; }
 
 session "s2"
 connection "node2"
-step "n2setup" { SET bdr.permit_ddl_locking = true; }
+step "n2setup" { SET bdr.permit_ddl_locking = true; SET statement_timeout = '5s'; }
 step "n2sync" { SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication; }
 step "n2read" { SELECT * FROM tst ORDER BY a; }
 step "n2s1" { UPDATE tst SET c = 'changed' WHERE a = 1; }
 step "n2s2" { UPDATE tst SET b = 'changed' WHERE a = 1; }
 
-permutation "n1setup" "n2setup" "n1s1" "n1sync" "n2read" "n1s2" "n1sync" "n2read" "n1s3" "n1s4" "n1sync" "n2read" "n1s5" "n1s6" "n1sync" "n2read" "n1s7" "n1sync" "n2read" "n1s8" "n1s9" "n1s10" "n2s1" "n2sync" "n1s11" "n1sync" "n2sync" "n2read" "n1read" "n2s2" "n2sync" "n1read"
+permutation "n1setup" "n2setup" "n1s1" "n1sync" "n2read" "n1s2" "n1sync" "n2read" "n1s3" "n1s4" "n1sync" "n2read" "n1s5" "n1s6" "n1sync" "n2read" "n1s7" "n1sync" "n2read" "n1s8" "n1s9" "n1s10" "n2sync" "n2s1" "n1s11" "n1sync" "n2sync" "n2read" "n1read" "n2s2" "n2sync" "n1read"