bdr: Further improve and seperate out BDR worker infrastructure.
authorAndres Freund <andres@anarazel.de>
Thu, 5 Feb 2015 13:08:01 +0000 (14:08 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 12 Feb 2015 09:16:58 +0000 (10:16 +0100)
Most shared memory handling now resides in bdr_shmem.c and worker
slots are acquired using a distinct function.

This has the advantage of making bdr.c a bit less of a hodgepodge and
is preparation for acquiring worker slots in walsenders.

Makefile.in
bdr.c
bdr.h
bdr_apply.c
bdr_perdb.c
bdr_shmem.c [new file with mode: 0644]

index 318c063f1be03956fba13b3087a37030234346fa..d100292e2267da869704bac46563469b031186cd 100644 (file)
@@ -59,6 +59,7 @@ OBJS = \
    bdr_output.o \
    bdr_relcache.o \
    bdr_remotecalls.o \
+   bdr_shmem.o \
    bdr_supervisor.o \
    bdr_upgrade.o
 
diff --git a/bdr.c b/bdr.c
index e97b02354432bcf2ba8e9db413032f0ad38c582f..9b46f1d64352f100abf0c4b4dd7daa3b329a80fc 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -87,26 +87,9 @@ static bool bdr_skip_ddl_replication;
 bool bdr_skip_ddl_locking;
 bool bdr_do_not_replicate;
 
-/* shmem init hook to chain to on startup, if any */
-static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
-
-/* Store kind of BDR worker for the current proc, mainly for debugging */
-BdrWorkerType bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
-
-/* shortcut for finding the the worker shmem block */
-BdrWorkerControl *BdrWorkerCtl = NULL;
-
-/* This worker's block within BdrWorkerCtl - only valid in bdr workers */
-BdrWorker  *bdr_worker_slot = NULL;
-
-/* Worker generation number; see bdr_worker_shmem_startup comments */
-static uint16 bdr_worker_generation;
-
-
 PG_MODULE_MAGIC;
 
 void       _PG_init(void);
-static void bdr_worker_shmem_startup(void);
 
 PGDLLEXPORT Datum bdr_apply_pause(PG_FUNCTION_ARGS);
 PGDLLEXPORT Datum bdr_apply_resume(PG_FUNCTION_ARGS);
@@ -413,20 +396,6 @@ bdr_create_slot(PGconn *streamConn, Name slot_name,
    PQclear(res);
 }
 
-static void
-bdr_worker_exit(int code, Datum arg)
-{
-   if (bdr_worker_slot == NULL)
-       return;
-
-   LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
-   bdr_worker_slot->worker_pid = 0;
-   bdr_worker_slot->worker_proc = NULL;
-   LWLockRelease(BdrWorkerCtl->lock);
-
-   bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
-}
-
 /*
  * Perform setup work common to all bdr worker types, such as:
  *
@@ -436,15 +405,19 @@ bdr_worker_exit(int code, Datum arg)
  *
  */
 void
-bdr_worker_init(uint32 worker_arg)
+bdr_bgworker_init(uint32 worker_arg, BdrWorkerType worker_type)
 {
    uint16  worker_generation;
    uint16  worker_idx;
    char   *dbname;
 
+   Assert(IsBackgroundWorker);
+
    worker_generation = (uint16)(worker_arg >> 16);
    worker_idx = (uint16)(worker_arg & 0x0000FFFF);
 
+   bdr_worker_shmem_acquire(worker_type, worker_idx);
+
    if (worker_generation != BdrWorkerCtl->worker_generation)
    {
        elog(DEBUG1, "apply worker from generation %d exiting after finding shmem generation is %d",
@@ -452,19 +425,10 @@ bdr_worker_init(uint32 worker_arg)
        proc_exit(0);
    }
 
-   Assert(IsBackgroundWorker);
-
-   /* first acquire worker slot */
-   bdr_worker_slot = &BdrWorkerCtl->slots[worker_idx];
-   bdr_worker_type = bdr_worker_slot->worker_type;
-
-   /* register release function */
-   before_shmem_exit(bdr_worker_exit, 0);
-
    /* figure out database to connect to */
-   if (bdr_worker_type == BDR_WORKER_PERDB)
+   if (worker_type == BDR_WORKER_PERDB)
        dbname = NameStr(bdr_worker_slot->data.perdb.dbname);
-   else if (bdr_worker_type == BDR_WORKER_APPLY)
+   else if (worker_type == BDR_WORKER_APPLY)
    {
        BdrApplyWorker  *apply;
        BdrPerdbWorker  *perdb;
@@ -629,188 +593,6 @@ bdr_establish_connection_and_slot(const char *dsn,
    return streamConn;
 }
 
-static size_t
-bdr_worker_shmem_size()
-{
-   Size        size = 0;
-
-   size = add_size(size, sizeof(BdrWorkerControl));
-   size = add_size(size, mul_size(bdr_max_workers, sizeof(BdrWorker)));
-
-   return size;
-}
-
-/*
- * Allocate a shared memory segment big enough to hold bdr_max_workers entries
- * in the array of BDR worker info structs (BdrApplyWorker).
- *
- * Called during _PG_init, but not during postmaster restart.
- */
-static void
-bdr_worker_alloc_shmem_segment()
-{
-   Assert(process_shared_preload_libraries_in_progress);
-
-   /* Allocate enough shmem for the worker limit ... */
-   RequestAddinShmemSpace(bdr_worker_shmem_size());
-
-   /*
-    * We'll need to be able to take exclusive locks so only one per-db backend
-    * tries to allocate or free blocks from this array at once.  There won't
-    * be enough contention to make anything fancier worth doing.
-    */
-   RequestAddinLWLocks(1);
-
-   /*
-    * Whether this is a first startup or crash recovery, we'll be re-initing
-    * the bgworkers.
-    */
-   BdrWorkerCtl = NULL;
-
-   prev_shmem_startup_hook = shmem_startup_hook;
-   shmem_startup_hook = bdr_worker_shmem_startup;
-}
-
-/*
- * Init the header for our shm segment, if not already done.
- *
- * Called during postmaster start or restart, in the context of the postmaster.
- */
-static void
-bdr_worker_shmem_startup(void)
-{
-   bool        found;
-
-   if (prev_shmem_startup_hook != NULL)
-       prev_shmem_startup_hook();
-
-   LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
-   BdrWorkerCtl = ShmemInitStruct("bdr_worker",
-                                 bdr_worker_shmem_size(),
-                                 &found);
-   if (!found)
-   {
-       /* Must be in postmaster its self */
-       Assert(IsPostmasterEnvironment && !IsUnderPostmaster);
-
-       /* Init shm segment header after postmaster start or restart */
-       memset(BdrWorkerCtl, 0, bdr_worker_shmem_size());
-       BdrWorkerCtl->lock = LWLockAssign();
-       /* Assigned on supervisor launch */
-       BdrWorkerCtl->supervisor_latch = NULL;
-
-       /*
-        * The postmaster keeps track of a generation number for BDR workers
-        * and increments it at each restart.
-        *
-        * Background workers aren't unregistered when the postmaster restarts
-        * and clears shared memory, so after a restart the supervisor and
-        * per-db workers have no idea what workers are/aren't running, nor any
-        * way to control them. To make a clean BDR restart possible the
-        * workers registered before the restart need to find out about the
-        * restart and terminate.
-        *
-        * To make that possible we pass the generation number to the worker
-        * in its main argument, and also set it in shared memory. The two
-        * must match. If they don't, the worker will proc_exit(0), causing its
-        * self to be unregistered.
-        *
-        * This should really be part of the bgworker API its self, handled via
-        * a BGW_NO_RESTART_ON_CRASH flag or by providing a generation number
-        * as a bgworker argument. However, for now we're stuck with this
-        * workaround.
-        */
-       if (bdr_worker_generation == UINT16_MAX)
-           /* We could handle wrap-around, but really ... */
-           elog(FATAL, "Too many postmaster crash/restart cycles. Restart the PostgreSQL server.");
-
-       BdrWorkerCtl->worker_generation = ++bdr_worker_generation;
-   }
-   LWLockRelease(AddinShmemInitLock);
-
-   /*
-    * We don't have anything to preserve on shutdown and don't support being
-    * unloaded from a running Pg, so don't register any shutdown hook.
-    */
-}
-
-
-/*
- * Allocate a block from the bdr_worker shm segment in BdrWorkerCtl, or ERROR
- * if there are no free slots.
- *
- * The block is zeroed. The worker type is set in the header.
- *
- * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
- *
- * To release a block, use bdr_worker_shmem_release(...)
- *
- * You must hold BdrWorkerCtl->lock in LW_EXCLUSIVE mode for
- * this call.
- */
-BdrWorker*
-bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
-{
-   int i;
-
-   Assert(LWLockHeldByMe(BdrWorkerCtl->lock));
-   for (i = 0; i < bdr_max_workers; i++)
-   {
-       BdrWorker *new_entry = &BdrWorkerCtl->slots[i];
-       if (new_entry->worker_type == BDR_WORKER_EMPTY_SLOT)
-       {
-           memset(new_entry, 0, sizeof(BdrWorker));
-           new_entry->worker_type = worker_type;
-           if (ctl_idx)
-               *ctl_idx = i;
-           return new_entry;
-       }
-   }
-   ereport(ERROR,
-           (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-           errmsg("No free bdr worker slots - bdr.max_workers is too low")));
-   /* unreachable */
-}
-
-/*
- * Release a block allocated by bdr_worker_shmem_alloc so it can be
- * re-used.
- *
- * The bgworker *must* no longer be running.
- *
- * If passed, the bgworker handle is checked to ensure the worker
- * is not still running before the slot is released.
- */
-void
-bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle)
-{
-   LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
-
-   /* Already free? Do nothing */
-   if (worker->worker_type != BDR_WORKER_EMPTY_SLOT)
-   {
-       /* Sanity check - ensure any associated dynamic bgworker is stopped */
-       if (handle)
-       {
-           pid_t pid;
-           BgwHandleStatus status;
-           status = GetBackgroundWorkerPid(handle, &pid);
-           if (status == BGWH_STARTED)
-           {
-               LWLockRelease(BdrWorkerCtl->lock);
-               elog(ERROR, "BUG: Attempt to release shm segment for bdr worker type=%d pid=%d that's still alive",
-                    worker->worker_type, pid);
-           }
-       }
-
-       /* Mark it as free */
-       worker->worker_type = BDR_WORKER_EMPTY_SLOT;
-       /* and for good measure, zero it so problems are seen immediately */
-       memset(worker, 0, sizeof(BdrWorker));
-   }
-   LWLockRelease(BdrWorkerCtl->lock);
-}
-
 static bool
 bdr_do_not_replicate_check_hook(bool *newvalue, void **extra, GucSource source)
 {
@@ -873,15 +655,6 @@ _PG_init(void)
                 errmsg("bdr requires \"track_commit_timestamp\" to be enabled")));
 #endif
 
-   /*
-    * _PG_init only runs on first load, not on postmaster restart, so
-    * set the worker generation here. See bdr_worker_shmem_startup.
-    *
-    * It starts at 1 because the postmaster zeroes shmem on restart, so 0 can
-    * mean "just restarted, hasn't run shmem setup callback yet".
-    */
-   bdr_worker_generation = 1;
-
    /*
     * Force btree_gist to be loaded - its absolutely not required at this
     * point, but since it's required for BDR to be used it's much easier to
@@ -1052,28 +825,15 @@ _PG_init(void)
    }
 
    /*
-    * Allocate a shared memory segment to store the bgworker connection
-    * information we must pass to each worker we launch.
-    *
-    * This registers a hook on shm initialization, bdr_worker_shmem_startup(),
-    * which populates the shm segment with configured apply workers using data
-    * in bdr_connection_configs.
+    * Reserve shared memory segment to store bgworker connection information
+    * and hook into shmem initialization.
     */
-   bdr_worker_alloc_shmem_segment();
+   bdr_shmem_init();
 
    EmitWarningsOnPlaceholders("bdr");
 
-   /*
-    * initialize other modules that need shared memory
-    */
-
-   /* register a slot for every remote node */
-   bdr_count_shmem_init(bdr_max_workers);
    bdr_executor_init();
-#ifdef BUILDING_BDR
-   bdr_sequencer_shmem_init(bdr_max_workers, bdr_max_databases);
-#endif
-   bdr_locks_shmem_init();
+
    /* Set up a ProcessUtility_hook to stop unsupported commands being run */
    init_bdr_commandfilter();
 
diff --git a/bdr.h b/bdr.h
index faa16206cdf4479b031d16714db8eea76f5d7331..f48afd5f7efd031b278c9ab21706669a9315f68f 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -441,13 +441,17 @@ extern bool bdr_get_bigendian(void);
 /* initialize a new bdr member */
 extern void bdr_init_replica(BDRNodeInfo *local_node);
 
-/* shared memory management */
 extern void bdr_maintain_schema(bool update_extensions);
+
+/* shared memory management */
+extern void bdr_shmem_init(void);
+
 extern BdrWorker* bdr_worker_shmem_alloc(BdrWorkerType worker_type,
                                         uint32 *ctl_idx);
 extern void bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle);
-
-extern BdrWorker* bdr_worker_shmem_acquire(uint32 *ctl_idx);
+extern void bdr_worker_shmem_acquire(BdrWorkerType worker_type,
+                                          uint32 worker_idx);
+extern void bdr_worker_shmem_release(void);
 
 extern bool bdr_is_bdr_activated_db(Oid dboid);
 
@@ -468,7 +472,7 @@ PGDLLEXPORT extern void bdr_apply_main(Datum main_arg);
 PGDLLEXPORT extern void bdr_perdb_worker_main(Datum main_arg);
 PGDLLEXPORT extern void bdr_supervisor_worker_main(Datum main_arg);
 
-extern void bdr_worker_init(uint32 worker_arg);
+extern void bdr_bgworker_init(uint32 worker_arg, BdrWorkerType worker_type);
 extern void bdr_supervisor_register(void);
 
 extern void bdr_sighup(SIGNAL_ARGS);
index 0f7f85bd67a086a96f28342731c8fd4201b939d8..1cabee4b0ba9b8856fb03a2107b4fcb91bbaa30e 100644 (file)
@@ -2401,11 +2401,8 @@ bdr_apply_main(Datum main_arg)
    XLogRecPtr  start_from;
    NameData    slot_name;
 
-   Assert(IsBackgroundWorker);
+   bdr_bgworker_init(DatumGetInt32(main_arg), BDR_WORKER_APPLY);
 
-   bdr_worker_init(DatumGetInt32(main_arg));
-
-   Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
    bdr_apply_worker = &bdr_worker_slot->data.apply;
 
    initStringInfo(&query);
index b98ff2f0b6c0ecca76a671196bdfe02acc5a442c..d67e0fb5e7b4dce997dcfeb2e306f3bfae1e0cd1 100644 (file)
@@ -483,13 +483,11 @@ bdr_perdb_worker_main(Datum main_arg)
    BdrPerdbWorker      *perdb;
    StringInfoData      si;
    bool                wait;
-   BDRNodeInfo        *local_node;
 
    initStringInfo(&si);
 
-   bdr_worker_init(DatumGetInt32(main_arg));
+   bdr_bgworker_init(DatumGetInt32(main_arg), BDR_WORKER_PERDB);
 
-   Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
    perdb = &bdr_worker_slot->data.perdb;
 
    perdb->nnodes = 0;
@@ -522,6 +520,7 @@ bdr_perdb_worker_main(Datum main_arg)
    {
        int             spi_ret;
        MemoryContext   saved_ctx;
+       BDRNodeInfo    *local_node;
 
        /*
         * Check the local bdr.bdr_nodes table to see if there's an entry for
@@ -547,13 +546,15 @@ bdr_perdb_worker_main(Datum main_arg)
 
        SPI_finish();
        CommitTransactionCommand();
-   }
 
-   /*
-    * Do we need to init the local DB from a remote node?
-    */
-   if (local_node->status != 'r')
-       bdr_init_replica(local_node);
+       /*
+        * Do we need to init the local DB from a remote node?
+        */
+       if (local_node->status != 'r')
+           bdr_init_replica(local_node);
+
+       bdr_bdr_node_free(local_node);
+   }
 
    elog(DEBUG1, "Starting bdr apply workers for "BDR_LOCALID_FORMAT" (%s)",
         BDR_LOCALID_FORMAT_ARGS, NameStr(perdb->dbname));
diff --git a/bdr_shmem.c b/bdr_shmem.c
new file mode 100644 (file)
index 0000000..da64946
--- /dev/null
@@ -0,0 +1,307 @@
+/* -------------------------------------------------------------------------
+ *
+ * bdr_shmem.c
+ *     BDR shared memory management
+ *
+ * Copyright (C) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *     bdr_shmem.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_label.h"
+
+#include "miscadmin.h"
+
+#include "postmaster/bgworker.h"
+
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+
+/* shortcut for finding the the worker shmem block */
+BdrWorkerControl *BdrWorkerCtl = NULL;
+
+/* Store kind of BDR worker slot acquired for the current proc */
+BdrWorkerType bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
+
+/* This worker's block within BdrWorkerCtl - only valid in bdr workers */
+BdrWorker  *bdr_worker_slot = NULL;
+
+/* Worker generation number; see bdr_worker_shmem_startup comments */
+static uint16 bdr_worker_generation;
+
+/* shmem init hook to chain to on startup, if any */
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
+static void bdr_worker_shmem_init(void);
+static void bdr_worker_shmem_startup(void);
+
+void
+bdr_shmem_init(void)
+{
+   /* Initialize segment to keep track of processes involved in bdr. */
+   bdr_worker_shmem_init();
+
+   /* initialize other modules that need shared memory. */
+   bdr_count_shmem_init(bdr_max_workers);
+
+#ifdef BUILDING_BDR
+   bdr_sequencer_shmem_init(bdr_max_workers, bdr_max_databases);
+#endif
+   bdr_locks_shmem_init();
+}
+
+/*
+ * Release resources upon exit of a process that has been involved in BDR
+ * work.
+ *
+ * NB: Has to be safe to execute even if no resources have been acquired - we
+ * don't unregister the before_shmem_exit handler.
+ */
+static void
+bdr_worker_exit(int code, Datum arg)
+{
+   if (bdr_worker_slot == NULL)
+       return;
+
+   bdr_worker_shmem_release();
+}
+
+static size_t
+bdr_worker_shmem_size()
+{
+   Size        size = 0;
+
+   size = add_size(size, sizeof(BdrWorkerControl));
+   size = add_size(size, mul_size(bdr_max_workers, sizeof(BdrWorker)));
+
+   return size;
+}
+
+/*
+ * Allocate a shared memory segment big enough to hold bdr_max_workers entries
+ * in the array of BDR worker info structs (BdrApplyWorker).
+ *
+ * Called during _PG_init, but not during postmaster restart.
+ */
+static void
+bdr_worker_shmem_init(void)
+{
+   Assert(process_shared_preload_libraries_in_progress);
+
+   /*
+    * bdr_worker_shmem_init() only runs on first load, not on postmaster
+    * restart, so set the worker generation here. See
+    * bdr_worker_shmem_startup.
+    *
+    * It starts at 1 because the postmaster zeroes shmem on restart, so 0 can
+    * mean "just restarted, hasn't run shmem setup callback yet".
+    */
+   bdr_worker_generation = 1;
+
+   /* Allocate enough shmem for the worker limit ... */
+   RequestAddinShmemSpace(bdr_worker_shmem_size());
+
+   /*
+    * We'll need to be able to take exclusive locks so only one per-db backend
+    * tries to allocate or free blocks from this array at once.  There won't
+    * be enough contention to make anything fancier worth doing.
+    */
+   RequestAddinLWLocks(1);
+
+   /*
+    * Whether this is a first startup or crash recovery, we'll be re-initing
+    * the bgworkers.
+    */
+   BdrWorkerCtl = NULL;
+
+   prev_shmem_startup_hook = shmem_startup_hook;
+   shmem_startup_hook = bdr_worker_shmem_startup;
+}
+
+/*
+ * Init the header for our shm segment, if not already done.
+ *
+ * Called during postmaster start or restart, in the context of the postmaster.
+ */
+static void
+bdr_worker_shmem_startup(void)
+{
+   bool        found;
+
+   if (prev_shmem_startup_hook != NULL)
+       prev_shmem_startup_hook();
+
+   LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+   BdrWorkerCtl = ShmemInitStruct("bdr_worker",
+                                 bdr_worker_shmem_size(),
+                                 &found);
+   if (!found)
+   {
+       /* Must be in postmaster its self */
+       Assert(IsPostmasterEnvironment && !IsUnderPostmaster);
+
+       /* Init shm segment header after postmaster start or restart */
+       memset(BdrWorkerCtl, 0, bdr_worker_shmem_size());
+       BdrWorkerCtl->lock = LWLockAssign();
+       /* Assigned on supervisor launch */
+       BdrWorkerCtl->supervisor_latch = NULL;
+
+       /*
+        * The postmaster keeps track of a generation number for BDR workers
+        * and increments it at each restart.
+        *
+        * Background workers aren't unregistered when the postmaster restarts
+        * and clears shared memory, so after a restart the supervisor and
+        * per-db workers have no idea what workers are/aren't running, nor any
+        * way to control them. To make a clean BDR restart possible the
+        * workers registered before the restart need to find out about the
+        * restart and terminate.
+        *
+        * To make that possible we pass the generation number to the worker
+        * in its main argument, and also set it in shared memory. The two
+        * must match. If they don't, the worker will proc_exit(0), causing its
+        * self to be unregistered.
+        *
+        * This should really be part of the bgworker API its self, handled via
+        * a BGW_NO_RESTART_ON_CRASH flag or by providing a generation number
+        * as a bgworker argument. However, for now we're stuck with this
+        * workaround.
+        */
+       if (bdr_worker_generation == UINT16_MAX)
+           /* We could handle wrap-around, but really ... */
+           elog(FATAL, "Too many postmaster crash/restart cycles. Restart the PostgreSQL server.");
+
+       BdrWorkerCtl->worker_generation = ++bdr_worker_generation;
+   }
+   LWLockRelease(AddinShmemInitLock);
+
+   /*
+    * We don't have anything to preserve on shutdown and don't support being
+    * unloaded from a running Pg, so don't register any shutdown hook.
+    */
+}
+
+
+/*
+ * Allocate a block from the bdr_worker shm segment in BdrWorkerCtl, or ERROR
+ * if there are no free slots.
+ *
+ * The block is zeroed. The worker type is set in the header.
+ *
+ * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
+ *
+ * To release a block, use bdr_worker_shmem_release(...)
+ *
+ * You must hold BdrWorkerCtl->lock in LW_EXCLUSIVE mode for
+ * this call.
+ */
+BdrWorker*
+bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
+{
+   int i;
+
+   Assert(LWLockHeldByMe(BdrWorkerCtl->lock));
+   for (i = 0; i < bdr_max_workers; i++)
+   {
+       BdrWorker *new_entry = &BdrWorkerCtl->slots[i];
+       if (new_entry->worker_type == BDR_WORKER_EMPTY_SLOT)
+       {
+           memset(new_entry, 0, sizeof(BdrWorker));
+           new_entry->worker_type = worker_type;
+           if (ctl_idx)
+               *ctl_idx = i;
+           return new_entry;
+       }
+   }
+   ereport(ERROR,
+           (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+           errmsg("No free bdr worker slots - bdr.max_workers is too low")));
+   /* unreachable */
+}
+
+/*
+ * Release a block allocated by bdr_worker_shmem_alloc so it can be
+ * re-used.
+ *
+ * The bgworker *must* no longer be running.
+ *
+ * If passed, the bgworker handle is checked to ensure the worker
+ * is not still running before the slot is released.
+ */
+void
+bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle)
+{
+   LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+
+   /* Already free? Do nothing */
+   if (worker->worker_type != BDR_WORKER_EMPTY_SLOT)
+   {
+       /* Sanity check - ensure any associated dynamic bgworker is stopped */
+       if (handle)
+       {
+           pid_t pid;
+           BgwHandleStatus status;
+           status = GetBackgroundWorkerPid(handle, &pid);
+           if (status == BGWH_STARTED)
+           {
+               LWLockRelease(BdrWorkerCtl->lock);
+               elog(ERROR, "BUG: Attempt to release shm segment for bdr worker type=%d pid=%d that's still alive",
+                    worker->worker_type, pid);
+           }
+       }
+
+       /* Mark it as free */
+       worker->worker_type = BDR_WORKER_EMPTY_SLOT;
+       /* and for good measure, zero it so problems are seen immediately */
+       memset(worker, 0, sizeof(BdrWorker));
+   }
+   LWLockRelease(BdrWorkerCtl->lock);
+}
+
+/*
+ * Mark this process as using one of the slots created by
+ * bdr_worker_shmem_alloc().
+ */
+void
+bdr_worker_shmem_acquire(BdrWorkerType worker_type, uint32 worker_idx)
+{
+   BdrWorker *worker;
+
+   /* can't acquire if we already have one */
+   Assert(bdr_worker_type == BDR_WORKER_EMPTY_SLOT);
+   Assert(bdr_worker_slot == NULL);
+
+   worker = &BdrWorkerCtl->slots[worker_idx];
+
+   /* ensure type is correct, before acquiring the slot */
+   if (worker->worker_type != worker_type)
+       elog(FATAL, "mismatch in worker state, got %u, expected %u",
+            worker->worker_type, worker_type);
+
+   /* then acquire worker slot */
+   bdr_worker_slot = worker;
+   bdr_worker_type = worker->worker_type;
+
+   /* register release function */
+   before_shmem_exit(bdr_worker_exit, 0);
+}
+
+/*
+ * Relase shmem slot acquired by bdr_worker_shmem_acquire().
+ */
+void
+bdr_worker_shmem_release(void)
+{
+   LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+   bdr_worker_slot->worker_pid = 0;
+   bdr_worker_slot->worker_proc = NULL;
+   LWLockRelease(BdrWorkerCtl->lock);
+
+   bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
+}