OBJS = \
bdr.o \
bdr_apply.o \
+ bdr_perdb.o \
bdr_catalogs.o \
bdr_conflict_handlers.o \
bdr_conflict_logging.o \
#include "access/heapam.h"
#include "access/xact.h"
-#include "catalog/catversion.h"
#include "catalog/namespace.h"
#include "catalog/pg_extension.h"
#include "libpq/libpq-be.h"
#include "libpq/pqformat.h"
-#include "mb/pg_wchar.h"
-
#include "nodes/execnodes.h"
#include "postmaster/bgworker.h"
return true;
}
-
-/*
- * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on
- * the database identified by dbname.
- *
- * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY
- * with a matching database name and launches them.
- */
-static List*
-bdr_launch_apply_workers(char *dbname)
-{
- List *apply_workers = NIL;
- BackgroundWorker apply_worker;
- int i;
-
- Assert(IsBackgroundWorker);
-
- /* Common apply worker values */
- apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
- BGWORKER_BACKEND_DATABASE_CONNECTION;
- apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
- /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
- apply_worker.bgw_main = bdr_apply_main;
- apply_worker.bgw_restart_time = 5;
- apply_worker.bgw_notify_pid = 0;
-
- /* Launch apply workers */
- LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
- for (i = 0; i < bdr_max_workers; i++)
- {
- BdrWorker *worker = &BdrWorkerCtl->slots[i];
-
- switch(worker->worker_type)
- {
- case BDR_WORKER_APPLY:
- {
- BdrApplyWorker *con = &worker->worker_data.apply_worker;
- BdrConnectionConfig *cfg =
- bdr_connection_configs[con->connection_config_idx];
- Assert(cfg != NULL);
- if ( strcmp(cfg->dbname, dbname) == 0 )
- {
- /* It's an apply worker for our DB; register it */
- BackgroundWorkerHandle *bgw_handle;
-
- if (con->bgw_is_registered)
- /*
- * This worker was registered on a previous pass;
- * this is probably a restart of the per-db worker.
- * Don't register a duplicate.
- */
- continue;
-
- snprintf(apply_worker.bgw_name, BGW_MAXLEN,
- BDR_LOCALID_FORMAT": %s: apply",
- BDR_LOCALID_FORMAT_ARGS, cfg->name);
- apply_worker.bgw_main_arg = Int32GetDatum(i);
-
- if (!RegisterDynamicBackgroundWorker(&apply_worker,
- &bgw_handle))
- {
- ereport(ERROR,
- (errmsg("bdr: Failed to register background worker"
- " %s, see previous log messages",
- cfg->name)));
- }
- /* We've launched this one, don't do it again */
- con->bgw_is_registered = true;
- apply_workers = lcons(bgw_handle, apply_workers);
- }
- }
- break;
- case BDR_WORKER_EMPTY_SLOT:
- case BDR_WORKER_PERDB:
- /* Nothing to do; switch only so we get warnings for insane cases */
- break;
- default:
- /* Bogus value */
- elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?",
- worker->worker_type);
- break;
- }
- }
- LWLockRelease(BdrWorkerCtl->lock);
-
- return apply_workers;
-}
-
-/*
- * Each database with BDR enabled on it has a static background worker,
- * registered at shared_preload_libraries time during postmaster start. This is
- * the entry point for these bgworkers.
- *
- * This worker handles BDR startup on the database and launches apply workers
- * for each BDR connection.
- *
- * Since the worker is fork()ed from the postmaster, all globals initialised in
- * _PG_init remain valid.
- *
- * This worker can use the SPI and shared memory.
- */
-static void
-bdr_perdb_worker_main(Datum main_arg)
-{
- int rc = 0;
- List *apply_workers;
- ListCell *c;
- BdrPerdbWorker *bdr_perdb_worker;
- BdrWorker *bdr_worker_slot;
- StringInfoData si;
- bool wait;
-
- initStringInfo(&si);
-
- Assert(IsBackgroundWorker);
-
- bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
- Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
- bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
- bdr_worker_type = BDR_WORKER_PERDB;
-
- bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
-
- elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS);
-
- appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker");
- SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION);
-
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
- bdr_saved_resowner = CurrentResourceOwner;
-
- /* need to be able to perform writes ourselves */
- bdr_executor_always_allow_writes(true);
- bdr_locks_startup(bdr_perdb_worker->nnodes);
-
- /*
- * Do we need to init the local DB from a remote node?
- *
- * Checks bdr.bdr_nodes.status, does any remote initialization required if
- * there's an init_replica connection, and ensures that
- * bdr.bdr_nodes.status=r for our entry before continuing.
- */
- bdr_init_replica(&bdr_perdb_worker->dbname);
-
- elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
-
- /* Launch the apply workers */
- apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname));
-
- /*
- * For now, just free the bgworker handles. Later we'll probably want them
- * for adding/removing/reconfiguring bgworkers.
- */
- foreach(c, apply_workers)
- {
- BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
- pfree(h);
- }
-
-#ifdef BUILDING_BDR
- elog(DEBUG1, "BDR starting sequencer on db \"%s\"",
- NameStr(bdr_perdb_worker->dbname));
-
- /* initialize sequencer */
- bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes);
-#endif
-
- while (!got_SIGTERM)
- {
- wait = true;
-
- if (got_SIGHUP)
- {
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
-
-#ifdef BUILDING_BDR
- /* check whether we need to start new elections */
- if (bdr_sequencer_start_elections())
- wait = false;
-
- /* check whether we need to vote */
- if (bdr_sequencer_vote())
- wait = false;
-
- /* check whether any of our elections needs to be tallied */
- bdr_sequencer_tally();
-
- /* check all bdr sequences for used up chunks */
- bdr_sequencer_fill_sequences();
-#endif
-
- pgstat_report_activity(STATE_IDLE, NULL);
-
- /*
- * Background workers mustn't call usleep() or any direct equivalent:
- * instead, they may wait on their process latch, which sleeps as
- * necessary, but is awakened if postmaster dies. That way the
- * background process goes away immediately in an emergency.
- *
- * We wake up everytime our latch gets set or if 180 seconds have
- * passed without events. That's a stopgap for the case a backend
- * committed sequencer changes but died before setting the latch.
- */
- if (wait)
- {
- rc = WaitLatch(&MyProc->procLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 180000L);
-
- ResetLatch(&MyProc->procLatch);
-
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
- }
- }
-
- proc_exit(0);
-}
-
static size_t
bdr_worker_shmem_size()
{
/* background workers */
extern void bdr_worker_init(char* dbname);
extern void bdr_apply_main(Datum main_arg);
+extern void bdr_perdb_worker_main(Datum main_arg);
/* manipulation of bdr catalogs */
extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Oid dboid);
--- /dev/null
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_locks.h"
+
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "postmaster/bgworker.h"
+
+#include "lib/stringinfo.h"
+
+/* For struct Port only! */
+#include "libpq/libpq-be.h"
+
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/ipc.h"
+
+#include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on
+ * the database identified by dbname.
+ *
+ * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY
+ * with a matching database name and launches them.
+ */
+static List*
+bdr_launch_apply_workers(char *dbname)
+{
+ List *apply_workers = NIL;
+ BackgroundWorker apply_worker;
+ int i;
+
+ Assert(IsBackgroundWorker);
+
+ /* Common apply worker values */
+ apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
+ apply_worker.bgw_main = bdr_apply_main;
+ apply_worker.bgw_restart_time = 5;
+ apply_worker.bgw_notify_pid = 0;
+
+ /* Launch apply workers */
+ LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
+ for (i = 0; i < bdr_max_workers; i++)
+ {
+ BdrWorker *worker = &BdrWorkerCtl->slots[i];
+
+ switch(worker->worker_type)
+ {
+ case BDR_WORKER_APPLY:
+ {
+ BdrApplyWorker *con = &worker->worker_data.apply_worker;
+ BdrConnectionConfig *cfg =
+ bdr_connection_configs[con->connection_config_idx];
+ Assert(cfg != NULL);
+ if ( strcmp(cfg->dbname, dbname) == 0 )
+ {
+ /* It's an apply worker for our DB; register it */
+ BackgroundWorkerHandle *bgw_handle;
+
+ if (con->bgw_is_registered)
+ /*
+ * This worker was registered on a previous pass;
+ * this is probably a restart of the per-db worker.
+ * Don't register a duplicate.
+ */
+ continue;
+
+ snprintf(apply_worker.bgw_name, BGW_MAXLEN,
+ BDR_LOCALID_FORMAT": %s: apply",
+ BDR_LOCALID_FORMAT_ARGS, cfg->name);
+ apply_worker.bgw_main_arg = Int32GetDatum(i);
+
+ if (!RegisterDynamicBackgroundWorker(&apply_worker,
+ &bgw_handle))
+ {
+ ereport(ERROR,
+ (errmsg("bdr: Failed to register background worker"
+ " %s, see previous log messages",
+ cfg->name)));
+ }
+ /* We've launched this one, don't do it again */
+ con->bgw_is_registered = true;
+ apply_workers = lcons(bgw_handle, apply_workers);
+ }
+ }
+ break;
+ case BDR_WORKER_EMPTY_SLOT:
+ case BDR_WORKER_PERDB:
+ /* Nothing to do; switch only so we get warnings for insane cases */
+ break;
+ default:
+ /* Bogus value */
+ elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?",
+ worker->worker_type);
+ break;
+ }
+ }
+ LWLockRelease(BdrWorkerCtl->lock);
+
+ return apply_workers;
+}
+
+/*
+ * Each database with BDR enabled on it has a static background worker,
+ * registered at shared_preload_libraries time during postmaster start. This is
+ * the entry point for these bgworkers.
+ *
+ * This worker handles BDR startup on the database and launches apply workers
+ * for each BDR connection.
+ *
+ * Since the worker is fork()ed from the postmaster, all globals initialised in
+ * _PG_init remain valid.
+ *
+ * This worker can use the SPI and shared memory.
+ */
+void
+bdr_perdb_worker_main(Datum main_arg)
+{
+ int rc = 0;
+ List *apply_workers;
+ ListCell *c;
+ BdrPerdbWorker *bdr_perdb_worker;
+ BdrWorker *bdr_worker_slot;
+ StringInfoData si;
+ bool wait;
+
+ initStringInfo(&si);
+
+ Assert(IsBackgroundWorker);
+
+ bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+ Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
+ bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
+ bdr_worker_type = BDR_WORKER_PERDB;
+
+ bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
+
+ elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS);
+
+ appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker");
+ SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION);
+
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
+ bdr_saved_resowner = CurrentResourceOwner;
+
+ /* need to be able to perform writes ourselves */
+ bdr_executor_always_allow_writes(true);
+ bdr_locks_startup(bdr_perdb_worker->nnodes);
+
+ /*
+ * Do we need to init the local DB from a remote node?
+ *
+ * Checks bdr.bdr_nodes.status, does any remote initialization required if
+ * there's an init_replica connection, and ensures that
+ * bdr.bdr_nodes.status=r for our entry before continuing.
+ */
+ bdr_init_replica(&bdr_perdb_worker->dbname);
+
+ elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
+
+ /* Launch the apply workers */
+ apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname));
+
+ /*
+ * For now, just free the bgworker handles. Later we'll probably want them
+ * for adding/removing/reconfiguring bgworkers.
+ */
+ foreach(c, apply_workers)
+ {
+ BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
+ pfree(h);
+ }
+
+#ifdef BUILDING_BDR
+ elog(DEBUG1, "BDR starting sequencer on db \"%s\"",
+ NameStr(bdr_perdb_worker->dbname));
+
+ /* initialize sequencer */
+ bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes);
+#endif
+
+ while (!got_SIGTERM)
+ {
+ wait = true;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+#ifdef BUILDING_BDR
+ /* check whether we need to start new elections */
+ if (bdr_sequencer_start_elections())
+ wait = false;
+
+ /* check whether we need to vote */
+ if (bdr_sequencer_vote())
+ wait = false;
+
+ /* check whether any of our elections needs to be tallied */
+ bdr_sequencer_tally();
+
+ /* check all bdr sequences for used up chunks */
+ bdr_sequencer_fill_sequences();
+#endif
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+
+ /*
+ * Background workers mustn't call usleep() or any direct equivalent:
+ * instead, they may wait on their process latch, which sleeps as
+ * necessary, but is awakened if postmaster dies. That way the
+ * background process goes away immediately in an emergency.
+ *
+ * We wake up everytime our latch gets set or if 180 seconds have
+ * passed without events. That's a stopgap for the case a backend
+ * committed sequencer changes but died before setting the latch.
+ */
+ if (wait)
+ {
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 180000L);
+
+ ResetLatch(&MyProc->procLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
+ }
+
+ proc_exit(0);
+}