bdr_output.o \
bdr_relcache.o \
bdr_remotecalls.o \
+ bdr_shmem.o \
bdr_supervisor.o \
bdr_upgrade.o
bool bdr_skip_ddl_locking;
bool bdr_do_not_replicate;
-/* shmem init hook to chain to on startup, if any */
-static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
-
-/* Store kind of BDR worker for the current proc, mainly for debugging */
-BdrWorkerType bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
-
-/* shortcut for finding the the worker shmem block */
-BdrWorkerControl *BdrWorkerCtl = NULL;
-
-/* This worker's block within BdrWorkerCtl - only valid in bdr workers */
-BdrWorker *bdr_worker_slot = NULL;
-
-/* Worker generation number; see bdr_worker_shmem_startup comments */
-static uint16 bdr_worker_generation;
-
-
PG_MODULE_MAGIC;
void _PG_init(void);
-static void bdr_worker_shmem_startup(void);
PGDLLEXPORT Datum bdr_apply_pause(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum bdr_apply_resume(PG_FUNCTION_ARGS);
PQclear(res);
}
-static void
-bdr_worker_exit(int code, Datum arg)
-{
- if (bdr_worker_slot == NULL)
- return;
-
- LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
- bdr_worker_slot->worker_pid = 0;
- bdr_worker_slot->worker_proc = NULL;
- LWLockRelease(BdrWorkerCtl->lock);
-
- bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
-}
-
/*
* Perform setup work common to all bdr worker types, such as:
*
*
*/
void
-bdr_worker_init(uint32 worker_arg)
+bdr_bgworker_init(uint32 worker_arg, BdrWorkerType worker_type)
{
uint16 worker_generation;
uint16 worker_idx;
char *dbname;
+ Assert(IsBackgroundWorker);
+
worker_generation = (uint16)(worker_arg >> 16);
worker_idx = (uint16)(worker_arg & 0x0000FFFF);
+ bdr_worker_shmem_acquire(worker_type, worker_idx);
+
if (worker_generation != BdrWorkerCtl->worker_generation)
{
elog(DEBUG1, "apply worker from generation %d exiting after finding shmem generation is %d",
proc_exit(0);
}
- Assert(IsBackgroundWorker);
-
- /* first acquire worker slot */
- bdr_worker_slot = &BdrWorkerCtl->slots[worker_idx];
- bdr_worker_type = bdr_worker_slot->worker_type;
-
- /* register release function */
- before_shmem_exit(bdr_worker_exit, 0);
-
/* figure out database to connect to */
- if (bdr_worker_type == BDR_WORKER_PERDB)
+ if (worker_type == BDR_WORKER_PERDB)
dbname = NameStr(bdr_worker_slot->data.perdb.dbname);
- else if (bdr_worker_type == BDR_WORKER_APPLY)
+ else if (worker_type == BDR_WORKER_APPLY)
{
BdrApplyWorker *apply;
BdrPerdbWorker *perdb;
return streamConn;
}
-static size_t
-bdr_worker_shmem_size()
-{
- Size size = 0;
-
- size = add_size(size, sizeof(BdrWorkerControl));
- size = add_size(size, mul_size(bdr_max_workers, sizeof(BdrWorker)));
-
- return size;
-}
-
-/*
- * Allocate a shared memory segment big enough to hold bdr_max_workers entries
- * in the array of BDR worker info structs (BdrApplyWorker).
- *
- * Called during _PG_init, but not during postmaster restart.
- */
-static void
-bdr_worker_alloc_shmem_segment()
-{
- Assert(process_shared_preload_libraries_in_progress);
-
- /* Allocate enough shmem for the worker limit ... */
- RequestAddinShmemSpace(bdr_worker_shmem_size());
-
- /*
- * We'll need to be able to take exclusive locks so only one per-db backend
- * tries to allocate or free blocks from this array at once. There won't
- * be enough contention to make anything fancier worth doing.
- */
- RequestAddinLWLocks(1);
-
- /*
- * Whether this is a first startup or crash recovery, we'll be re-initing
- * the bgworkers.
- */
- BdrWorkerCtl = NULL;
-
- prev_shmem_startup_hook = shmem_startup_hook;
- shmem_startup_hook = bdr_worker_shmem_startup;
-}
-
-/*
- * Init the header for our shm segment, if not already done.
- *
- * Called during postmaster start or restart, in the context of the postmaster.
- */
-static void
-bdr_worker_shmem_startup(void)
-{
- bool found;
-
- if (prev_shmem_startup_hook != NULL)
- prev_shmem_startup_hook();
-
- LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
- BdrWorkerCtl = ShmemInitStruct("bdr_worker",
- bdr_worker_shmem_size(),
- &found);
- if (!found)
- {
- /* Must be in postmaster its self */
- Assert(IsPostmasterEnvironment && !IsUnderPostmaster);
-
- /* Init shm segment header after postmaster start or restart */
- memset(BdrWorkerCtl, 0, bdr_worker_shmem_size());
- BdrWorkerCtl->lock = LWLockAssign();
- /* Assigned on supervisor launch */
- BdrWorkerCtl->supervisor_latch = NULL;
-
- /*
- * The postmaster keeps track of a generation number for BDR workers
- * and increments it at each restart.
- *
- * Background workers aren't unregistered when the postmaster restarts
- * and clears shared memory, so after a restart the supervisor and
- * per-db workers have no idea what workers are/aren't running, nor any
- * way to control them. To make a clean BDR restart possible the
- * workers registered before the restart need to find out about the
- * restart and terminate.
- *
- * To make that possible we pass the generation number to the worker
- * in its main argument, and also set it in shared memory. The two
- * must match. If they don't, the worker will proc_exit(0), causing its
- * self to be unregistered.
- *
- * This should really be part of the bgworker API its self, handled via
- * a BGW_NO_RESTART_ON_CRASH flag or by providing a generation number
- * as a bgworker argument. However, for now we're stuck with this
- * workaround.
- */
- if (bdr_worker_generation == UINT16_MAX)
- /* We could handle wrap-around, but really ... */
- elog(FATAL, "Too many postmaster crash/restart cycles. Restart the PostgreSQL server.");
-
- BdrWorkerCtl->worker_generation = ++bdr_worker_generation;
- }
- LWLockRelease(AddinShmemInitLock);
-
- /*
- * We don't have anything to preserve on shutdown and don't support being
- * unloaded from a running Pg, so don't register any shutdown hook.
- */
-}
-
-
-/*
- * Allocate a block from the bdr_worker shm segment in BdrWorkerCtl, or ERROR
- * if there are no free slots.
- *
- * The block is zeroed. The worker type is set in the header.
- *
- * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
- *
- * To release a block, use bdr_worker_shmem_release(...)
- *
- * You must hold BdrWorkerCtl->lock in LW_EXCLUSIVE mode for
- * this call.
- */
-BdrWorker*
-bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
-{
- int i;
-
- Assert(LWLockHeldByMe(BdrWorkerCtl->lock));
- for (i = 0; i < bdr_max_workers; i++)
- {
- BdrWorker *new_entry = &BdrWorkerCtl->slots[i];
- if (new_entry->worker_type == BDR_WORKER_EMPTY_SLOT)
- {
- memset(new_entry, 0, sizeof(BdrWorker));
- new_entry->worker_type = worker_type;
- if (ctl_idx)
- *ctl_idx = i;
- return new_entry;
- }
- }
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("No free bdr worker slots - bdr.max_workers is too low")));
- /* unreachable */
-}
-
-/*
- * Release a block allocated by bdr_worker_shmem_alloc so it can be
- * re-used.
- *
- * The bgworker *must* no longer be running.
- *
- * If passed, the bgworker handle is checked to ensure the worker
- * is not still running before the slot is released.
- */
-void
-bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle)
-{
- LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
-
- /* Already free? Do nothing */
- if (worker->worker_type != BDR_WORKER_EMPTY_SLOT)
- {
- /* Sanity check - ensure any associated dynamic bgworker is stopped */
- if (handle)
- {
- pid_t pid;
- BgwHandleStatus status;
- status = GetBackgroundWorkerPid(handle, &pid);
- if (status == BGWH_STARTED)
- {
- LWLockRelease(BdrWorkerCtl->lock);
- elog(ERROR, "BUG: Attempt to release shm segment for bdr worker type=%d pid=%d that's still alive",
- worker->worker_type, pid);
- }
- }
-
- /* Mark it as free */
- worker->worker_type = BDR_WORKER_EMPTY_SLOT;
- /* and for good measure, zero it so problems are seen immediately */
- memset(worker, 0, sizeof(BdrWorker));
- }
- LWLockRelease(BdrWorkerCtl->lock);
-}
-
static bool
bdr_do_not_replicate_check_hook(bool *newvalue, void **extra, GucSource source)
{
errmsg("bdr requires \"track_commit_timestamp\" to be enabled")));
#endif
- /*
- * _PG_init only runs on first load, not on postmaster restart, so
- * set the worker generation here. See bdr_worker_shmem_startup.
- *
- * It starts at 1 because the postmaster zeroes shmem on restart, so 0 can
- * mean "just restarted, hasn't run shmem setup callback yet".
- */
- bdr_worker_generation = 1;
-
/*
* Force btree_gist to be loaded - its absolutely not required at this
* point, but since it's required for BDR to be used it's much easier to
}
/*
- * Allocate a shared memory segment to store the bgworker connection
- * information we must pass to each worker we launch.
- *
- * This registers a hook on shm initialization, bdr_worker_shmem_startup(),
- * which populates the shm segment with configured apply workers using data
- * in bdr_connection_configs.
+ * Reserve shared memory segment to store bgworker connection information
+ * and hook into shmem initialization.
*/
- bdr_worker_alloc_shmem_segment();
+ bdr_shmem_init();
EmitWarningsOnPlaceholders("bdr");
- /*
- * initialize other modules that need shared memory
- */
-
- /* register a slot for every remote node */
- bdr_count_shmem_init(bdr_max_workers);
bdr_executor_init();
-#ifdef BUILDING_BDR
- bdr_sequencer_shmem_init(bdr_max_workers, bdr_max_databases);
-#endif
- bdr_locks_shmem_init();
+
/* Set up a ProcessUtility_hook to stop unsupported commands being run */
init_bdr_commandfilter();
/* initialize a new bdr member */
extern void bdr_init_replica(BDRNodeInfo *local_node);
-/* shared memory management */
extern void bdr_maintain_schema(bool update_extensions);
+
+/* shared memory management */
+extern void bdr_shmem_init(void);
+
extern BdrWorker* bdr_worker_shmem_alloc(BdrWorkerType worker_type,
uint32 *ctl_idx);
extern void bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle);
-
-extern BdrWorker* bdr_worker_shmem_acquire(uint32 *ctl_idx);
+extern void bdr_worker_shmem_acquire(BdrWorkerType worker_type,
+ uint32 worker_idx);
+extern void bdr_worker_shmem_release(void);
extern bool bdr_is_bdr_activated_db(Oid dboid);
PGDLLEXPORT extern void bdr_perdb_worker_main(Datum main_arg);
PGDLLEXPORT extern void bdr_supervisor_worker_main(Datum main_arg);
-extern void bdr_worker_init(uint32 worker_arg);
+extern void bdr_bgworker_init(uint32 worker_arg, BdrWorkerType worker_type);
extern void bdr_supervisor_register(void);
extern void bdr_sighup(SIGNAL_ARGS);
XLogRecPtr start_from;
NameData slot_name;
- Assert(IsBackgroundWorker);
+ bdr_bgworker_init(DatumGetInt32(main_arg), BDR_WORKER_APPLY);
- bdr_worker_init(DatumGetInt32(main_arg));
-
- Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
bdr_apply_worker = &bdr_worker_slot->data.apply;
initStringInfo(&query);
BdrPerdbWorker *perdb;
StringInfoData si;
bool wait;
- BDRNodeInfo *local_node;
initStringInfo(&si);
- bdr_worker_init(DatumGetInt32(main_arg));
+ bdr_bgworker_init(DatumGetInt32(main_arg), BDR_WORKER_PERDB);
- Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
perdb = &bdr_worker_slot->data.perdb;
perdb->nnodes = 0;
{
int spi_ret;
MemoryContext saved_ctx;
+ BDRNodeInfo *local_node;
/*
* Check the local bdr.bdr_nodes table to see if there's an entry for
SPI_finish();
CommitTransactionCommand();
- }
- /*
- * Do we need to init the local DB from a remote node?
- */
- if (local_node->status != 'r')
- bdr_init_replica(local_node);
+ /*
+ * Do we need to init the local DB from a remote node?
+ */
+ if (local_node->status != 'r')
+ bdr_init_replica(local_node);
+
+ bdr_bdr_node_free(local_node);
+ }
elog(DEBUG1, "Starting bdr apply workers for "BDR_LOCALID_FORMAT" (%s)",
BDR_LOCALID_FORMAT_ARGS, NameStr(perdb->dbname));
--- /dev/null
+/* -------------------------------------------------------------------------
+ *
+ * bdr_shmem.c
+ * BDR shared memory management
+ *
+ * Copyright (C) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * bdr_shmem.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_label.h"
+
+#include "miscadmin.h"
+
+#include "postmaster/bgworker.h"
+
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+
+/* shortcut for finding the the worker shmem block */
+BdrWorkerControl *BdrWorkerCtl = NULL;
+
+/* Store kind of BDR worker slot acquired for the current proc */
+BdrWorkerType bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
+
+/* This worker's block within BdrWorkerCtl - only valid in bdr workers */
+BdrWorker *bdr_worker_slot = NULL;
+
+/* Worker generation number; see bdr_worker_shmem_startup comments */
+static uint16 bdr_worker_generation;
+
+/* shmem init hook to chain to on startup, if any */
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
+static void bdr_worker_shmem_init(void);
+static void bdr_worker_shmem_startup(void);
+
+void
+bdr_shmem_init(void)
+{
+ /* Initialize segment to keep track of processes involved in bdr. */
+ bdr_worker_shmem_init();
+
+ /* initialize other modules that need shared memory. */
+ bdr_count_shmem_init(bdr_max_workers);
+
+#ifdef BUILDING_BDR
+ bdr_sequencer_shmem_init(bdr_max_workers, bdr_max_databases);
+#endif
+ bdr_locks_shmem_init();
+}
+
+/*
+ * Release resources upon exit of a process that has been involved in BDR
+ * work.
+ *
+ * NB: Has to be safe to execute even if no resources have been acquired - we
+ * don't unregister the before_shmem_exit handler.
+ */
+static void
+bdr_worker_exit(int code, Datum arg)
+{
+ if (bdr_worker_slot == NULL)
+ return;
+
+ bdr_worker_shmem_release();
+}
+
+static size_t
+bdr_worker_shmem_size()
+{
+ Size size = 0;
+
+ size = add_size(size, sizeof(BdrWorkerControl));
+ size = add_size(size, mul_size(bdr_max_workers, sizeof(BdrWorker)));
+
+ return size;
+}
+
+/*
+ * Allocate a shared memory segment big enough to hold bdr_max_workers entries
+ * in the array of BDR worker info structs (BdrApplyWorker).
+ *
+ * Called during _PG_init, but not during postmaster restart.
+ */
+static void
+bdr_worker_shmem_init(void)
+{
+ Assert(process_shared_preload_libraries_in_progress);
+
+ /*
+ * bdr_worker_shmem_init() only runs on first load, not on postmaster
+ * restart, so set the worker generation here. See
+ * bdr_worker_shmem_startup.
+ *
+ * It starts at 1 because the postmaster zeroes shmem on restart, so 0 can
+ * mean "just restarted, hasn't run shmem setup callback yet".
+ */
+ bdr_worker_generation = 1;
+
+ /* Allocate enough shmem for the worker limit ... */
+ RequestAddinShmemSpace(bdr_worker_shmem_size());
+
+ /*
+ * We'll need to be able to take exclusive locks so only one per-db backend
+ * tries to allocate or free blocks from this array at once. There won't
+ * be enough contention to make anything fancier worth doing.
+ */
+ RequestAddinLWLocks(1);
+
+ /*
+ * Whether this is a first startup or crash recovery, we'll be re-initing
+ * the bgworkers.
+ */
+ BdrWorkerCtl = NULL;
+
+ prev_shmem_startup_hook = shmem_startup_hook;
+ shmem_startup_hook = bdr_worker_shmem_startup;
+}
+
+/*
+ * Init the header for our shm segment, if not already done.
+ *
+ * Called during postmaster start or restart, in the context of the postmaster.
+ */
+static void
+bdr_worker_shmem_startup(void)
+{
+ bool found;
+
+ if (prev_shmem_startup_hook != NULL)
+ prev_shmem_startup_hook();
+
+ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+ BdrWorkerCtl = ShmemInitStruct("bdr_worker",
+ bdr_worker_shmem_size(),
+ &found);
+ if (!found)
+ {
+ /* Must be in postmaster its self */
+ Assert(IsPostmasterEnvironment && !IsUnderPostmaster);
+
+ /* Init shm segment header after postmaster start or restart */
+ memset(BdrWorkerCtl, 0, bdr_worker_shmem_size());
+ BdrWorkerCtl->lock = LWLockAssign();
+ /* Assigned on supervisor launch */
+ BdrWorkerCtl->supervisor_latch = NULL;
+
+ /*
+ * The postmaster keeps track of a generation number for BDR workers
+ * and increments it at each restart.
+ *
+ * Background workers aren't unregistered when the postmaster restarts
+ * and clears shared memory, so after a restart the supervisor and
+ * per-db workers have no idea what workers are/aren't running, nor any
+ * way to control them. To make a clean BDR restart possible the
+ * workers registered before the restart need to find out about the
+ * restart and terminate.
+ *
+ * To make that possible we pass the generation number to the worker
+ * in its main argument, and also set it in shared memory. The two
+ * must match. If they don't, the worker will proc_exit(0), causing its
+ * self to be unregistered.
+ *
+ * This should really be part of the bgworker API its self, handled via
+ * a BGW_NO_RESTART_ON_CRASH flag or by providing a generation number
+ * as a bgworker argument. However, for now we're stuck with this
+ * workaround.
+ */
+ if (bdr_worker_generation == UINT16_MAX)
+ /* We could handle wrap-around, but really ... */
+ elog(FATAL, "Too many postmaster crash/restart cycles. Restart the PostgreSQL server.");
+
+ BdrWorkerCtl->worker_generation = ++bdr_worker_generation;
+ }
+ LWLockRelease(AddinShmemInitLock);
+
+ /*
+ * We don't have anything to preserve on shutdown and don't support being
+ * unloaded from a running Pg, so don't register any shutdown hook.
+ */
+}
+
+
+/*
+ * Allocate a block from the bdr_worker shm segment in BdrWorkerCtl, or ERROR
+ * if there are no free slots.
+ *
+ * The block is zeroed. The worker type is set in the header.
+ *
+ * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
+ *
+ * To release a block, use bdr_worker_shmem_release(...)
+ *
+ * You must hold BdrWorkerCtl->lock in LW_EXCLUSIVE mode for
+ * this call.
+ */
+BdrWorker*
+bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
+{
+ int i;
+
+ Assert(LWLockHeldByMe(BdrWorkerCtl->lock));
+ for (i = 0; i < bdr_max_workers; i++)
+ {
+ BdrWorker *new_entry = &BdrWorkerCtl->slots[i];
+ if (new_entry->worker_type == BDR_WORKER_EMPTY_SLOT)
+ {
+ memset(new_entry, 0, sizeof(BdrWorker));
+ new_entry->worker_type = worker_type;
+ if (ctl_idx)
+ *ctl_idx = i;
+ return new_entry;
+ }
+ }
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("No free bdr worker slots - bdr.max_workers is too low")));
+ /* unreachable */
+}
+
+/*
+ * Release a block allocated by bdr_worker_shmem_alloc so it can be
+ * re-used.
+ *
+ * The bgworker *must* no longer be running.
+ *
+ * If passed, the bgworker handle is checked to ensure the worker
+ * is not still running before the slot is released.
+ */
+void
+bdr_worker_shmem_free(BdrWorker* worker, BackgroundWorkerHandle *handle)
+{
+ LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+
+ /* Already free? Do nothing */
+ if (worker->worker_type != BDR_WORKER_EMPTY_SLOT)
+ {
+ /* Sanity check - ensure any associated dynamic bgworker is stopped */
+ if (handle)
+ {
+ pid_t pid;
+ BgwHandleStatus status;
+ status = GetBackgroundWorkerPid(handle, &pid);
+ if (status == BGWH_STARTED)
+ {
+ LWLockRelease(BdrWorkerCtl->lock);
+ elog(ERROR, "BUG: Attempt to release shm segment for bdr worker type=%d pid=%d that's still alive",
+ worker->worker_type, pid);
+ }
+ }
+
+ /* Mark it as free */
+ worker->worker_type = BDR_WORKER_EMPTY_SLOT;
+ /* and for good measure, zero it so problems are seen immediately */
+ memset(worker, 0, sizeof(BdrWorker));
+ }
+ LWLockRelease(BdrWorkerCtl->lock);
+}
+
+/*
+ * Mark this process as using one of the slots created by
+ * bdr_worker_shmem_alloc().
+ */
+void
+bdr_worker_shmem_acquire(BdrWorkerType worker_type, uint32 worker_idx)
+{
+ BdrWorker *worker;
+
+ /* can't acquire if we already have one */
+ Assert(bdr_worker_type == BDR_WORKER_EMPTY_SLOT);
+ Assert(bdr_worker_slot == NULL);
+
+ worker = &BdrWorkerCtl->slots[worker_idx];
+
+ /* ensure type is correct, before acquiring the slot */
+ if (worker->worker_type != worker_type)
+ elog(FATAL, "mismatch in worker state, got %u, expected %u",
+ worker->worker_type, worker_type);
+
+ /* then acquire worker slot */
+ bdr_worker_slot = worker;
+ bdr_worker_type = worker->worker_type;
+
+ /* register release function */
+ before_shmem_exit(bdr_worker_exit, 0);
+}
+
+/*
+ * Relase shmem slot acquired by bdr_worker_shmem_acquire().
+ */
+void
+bdr_worker_shmem_release(void)
+{
+ LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+ bdr_worker_slot->worker_pid = 0;
+ bdr_worker_slot->worker_proc = NULL;
+ LWLockRelease(BdrWorkerCtl->lock);
+
+ bdr_worker_type = BDR_WORKER_EMPTY_SLOT;
+}