From: Craig Ringer Date: Tue, 16 Dec 2014 15:26:47 +0000 (+0800) Subject: bdr: move perdb_worker_main to new bdr_perdb.c X-Git-Tag: bdr-plugin/0.8.0beta1~10 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=55c46f469b6fd0798e9cda6d1bc190a98aea06c3;p=2ndquadrant_bdr.git bdr: move perdb_worker_main to new bdr_perdb.c --- diff --git a/Makefile.in b/Makefile.in index e1a548ce98..8adbb34eb5 100644 --- a/Makefile.in +++ b/Makefile.in @@ -25,6 +25,7 @@ SHLIB_LINK = $(libpq) OBJS = \ bdr.o \ bdr_apply.o \ + bdr_perdb.o \ bdr_catalogs.o \ bdr_conflict_handlers.o \ bdr_conflict_logging.o \ diff --git a/bdr.c b/bdr.c index 11a1b66552..1e62196700 100644 --- a/bdr.c +++ b/bdr.c @@ -30,7 +30,6 @@ #include "access/heapam.h" #include "access/xact.h" -#include "catalog/catversion.h" #include "catalog/namespace.h" #include "catalog/pg_extension.h" @@ -42,8 +41,6 @@ #include "libpq/libpq-be.h" #include "libpq/pqformat.h" -#include "mb/pg_wchar.h" - #include "nodes/execnodes.h" #include "postmaster/bgworker.h" @@ -726,228 +723,6 @@ bdr_create_con_gucs(char *name, return true; } - -/* - * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on - * the database identified by dbname. - * - * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY - * with a matching database name and launches them. - */ -static List* -bdr_launch_apply_workers(char *dbname) -{ - List *apply_workers = NIL; - BackgroundWorker apply_worker; - int i; - - Assert(IsBackgroundWorker); - - /* Common apply worker values */ - apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS | - BGWORKER_BACKEND_DATABASE_CONNECTION; - apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */ - apply_worker.bgw_main = bdr_apply_main; - apply_worker.bgw_restart_time = 5; - apply_worker.bgw_notify_pid = 0; - - /* Launch apply workers */ - LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED); - for (i = 0; i < bdr_max_workers; i++) - { - BdrWorker *worker = &BdrWorkerCtl->slots[i]; - - switch(worker->worker_type) - { - case BDR_WORKER_APPLY: - { - BdrApplyWorker *con = &worker->worker_data.apply_worker; - BdrConnectionConfig *cfg = - bdr_connection_configs[con->connection_config_idx]; - Assert(cfg != NULL); - if ( strcmp(cfg->dbname, dbname) == 0 ) - { - /* It's an apply worker for our DB; register it */ - BackgroundWorkerHandle *bgw_handle; - - if (con->bgw_is_registered) - /* - * This worker was registered on a previous pass; - * this is probably a restart of the per-db worker. - * Don't register a duplicate. - */ - continue; - - snprintf(apply_worker.bgw_name, BGW_MAXLEN, - BDR_LOCALID_FORMAT": %s: apply", - BDR_LOCALID_FORMAT_ARGS, cfg->name); - apply_worker.bgw_main_arg = Int32GetDatum(i); - - if (!RegisterDynamicBackgroundWorker(&apply_worker, - &bgw_handle)) - { - ereport(ERROR, - (errmsg("bdr: Failed to register background worker" - " %s, see previous log messages", - cfg->name))); - } - /* We've launched this one, don't do it again */ - con->bgw_is_registered = true; - apply_workers = lcons(bgw_handle, apply_workers); - } - } - break; - case BDR_WORKER_EMPTY_SLOT: - case BDR_WORKER_PERDB: - /* Nothing to do; switch only so we get warnings for insane cases */ - break; - default: - /* Bogus value */ - elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?", - worker->worker_type); - break; - } - } - LWLockRelease(BdrWorkerCtl->lock); - - return apply_workers; -} - -/* - * Each database with BDR enabled on it has a static background worker, - * registered at shared_preload_libraries time during postmaster start. This is - * the entry point for these bgworkers. - * - * This worker handles BDR startup on the database and launches apply workers - * for each BDR connection. - * - * Since the worker is fork()ed from the postmaster, all globals initialised in - * _PG_init remain valid. - * - * This worker can use the SPI and shared memory. - */ -static void -bdr_perdb_worker_main(Datum main_arg) -{ - int rc = 0; - List *apply_workers; - ListCell *c; - BdrPerdbWorker *bdr_perdb_worker; - BdrWorker *bdr_worker_slot; - StringInfoData si; - bool wait; - - initStringInfo(&si); - - Assert(IsBackgroundWorker); - - bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ]; - Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB); - bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker; - bdr_worker_type = BDR_WORKER_PERDB; - - bdr_worker_init(NameStr(bdr_perdb_worker->dbname)); - - elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS); - - appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker"); - SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION); - - CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner"); - bdr_saved_resowner = CurrentResourceOwner; - - /* need to be able to perform writes ourselves */ - bdr_executor_always_allow_writes(true); - bdr_locks_startup(bdr_perdb_worker->nnodes); - - /* - * Do we need to init the local DB from a remote node? - * - * Checks bdr.bdr_nodes.status, does any remote initialization required if - * there's an init_replica connection, and ensures that - * bdr.bdr_nodes.status=r for our entry before continuing. - */ - bdr_init_replica(&bdr_perdb_worker->dbname); - - elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname)); - - /* Launch the apply workers */ - apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname)); - - /* - * For now, just free the bgworker handles. Later we'll probably want them - * for adding/removing/reconfiguring bgworkers. - */ - foreach(c, apply_workers) - { - BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c); - pfree(h); - } - -#ifdef BUILDING_BDR - elog(DEBUG1, "BDR starting sequencer on db \"%s\"", - NameStr(bdr_perdb_worker->dbname)); - - /* initialize sequencer */ - bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes); -#endif - - while (!got_SIGTERM) - { - wait = true; - - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } - -#ifdef BUILDING_BDR - /* check whether we need to start new elections */ - if (bdr_sequencer_start_elections()) - wait = false; - - /* check whether we need to vote */ - if (bdr_sequencer_vote()) - wait = false; - - /* check whether any of our elections needs to be tallied */ - bdr_sequencer_tally(); - - /* check all bdr sequences for used up chunks */ - bdr_sequencer_fill_sequences(); -#endif - - pgstat_report_activity(STATE_IDLE, NULL); - - /* - * Background workers mustn't call usleep() or any direct equivalent: - * instead, they may wait on their process latch, which sleeps as - * necessary, but is awakened if postmaster dies. That way the - * background process goes away immediately in an emergency. - * - * We wake up everytime our latch gets set or if 180 seconds have - * passed without events. That's a stopgap for the case a backend - * committed sequencer changes but died before setting the latch. - */ - if (wait) - { - rc = WaitLatch(&MyProc->procLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 180000L); - - ResetLatch(&MyProc->procLatch); - - /* emergency bailout if postmaster has died */ - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - } - } - - proc_exit(0); -} - static size_t bdr_worker_shmem_size() { diff --git a/bdr.h b/bdr.h index 3122775d32..19ba087bef 100644 --- a/bdr.h +++ b/bdr.h @@ -422,6 +422,7 @@ extern void bdr_locks_check_query(void); /* background workers */ extern void bdr_worker_init(char* dbname); extern void bdr_apply_main(Datum main_arg); +extern void bdr_perdb_worker_main(Datum main_arg); /* manipulation of bdr catalogs */ extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Oid dboid); diff --git a/bdr_perdb.c b/bdr_perdb.c new file mode 100644 index 0000000000..2fcfa733e5 --- /dev/null +++ b/bdr_perdb.c @@ -0,0 +1,254 @@ +#include "postgres.h" + +#include "bdr.h" +#include "bdr_locks.h" + +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/xact.h" + +#include "catalog/pg_type.h" + +#include "commands/dbcommands.h" + +#include "executor/spi.h" + +#include "postmaster/bgworker.h" + +#include "lib/stringinfo.h" + +/* For struct Port only! */ +#include "libpq/libpq-be.h" + +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/ipc.h" + +#include "utils/builtins.h" +#include "utils/elog.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/snapmgr.h" + +/* + * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on + * the database identified by dbname. + * + * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY + * with a matching database name and launches them. + */ +static List* +bdr_launch_apply_workers(char *dbname) +{ + List *apply_workers = NIL; + BackgroundWorker apply_worker; + int i; + + Assert(IsBackgroundWorker); + + /* Common apply worker values */ + apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */ + apply_worker.bgw_main = bdr_apply_main; + apply_worker.bgw_restart_time = 5; + apply_worker.bgw_notify_pid = 0; + + /* Launch apply workers */ + LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED); + for (i = 0; i < bdr_max_workers; i++) + { + BdrWorker *worker = &BdrWorkerCtl->slots[i]; + + switch(worker->worker_type) + { + case BDR_WORKER_APPLY: + { + BdrApplyWorker *con = &worker->worker_data.apply_worker; + BdrConnectionConfig *cfg = + bdr_connection_configs[con->connection_config_idx]; + Assert(cfg != NULL); + if ( strcmp(cfg->dbname, dbname) == 0 ) + { + /* It's an apply worker for our DB; register it */ + BackgroundWorkerHandle *bgw_handle; + + if (con->bgw_is_registered) + /* + * This worker was registered on a previous pass; + * this is probably a restart of the per-db worker. + * Don't register a duplicate. + */ + continue; + + snprintf(apply_worker.bgw_name, BGW_MAXLEN, + BDR_LOCALID_FORMAT": %s: apply", + BDR_LOCALID_FORMAT_ARGS, cfg->name); + apply_worker.bgw_main_arg = Int32GetDatum(i); + + if (!RegisterDynamicBackgroundWorker(&apply_worker, + &bgw_handle)) + { + ereport(ERROR, + (errmsg("bdr: Failed to register background worker" + " %s, see previous log messages", + cfg->name))); + } + /* We've launched this one, don't do it again */ + con->bgw_is_registered = true; + apply_workers = lcons(bgw_handle, apply_workers); + } + } + break; + case BDR_WORKER_EMPTY_SLOT: + case BDR_WORKER_PERDB: + /* Nothing to do; switch only so we get warnings for insane cases */ + break; + default: + /* Bogus value */ + elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?", + worker->worker_type); + break; + } + } + LWLockRelease(BdrWorkerCtl->lock); + + return apply_workers; +} + +/* + * Each database with BDR enabled on it has a static background worker, + * registered at shared_preload_libraries time during postmaster start. This is + * the entry point for these bgworkers. + * + * This worker handles BDR startup on the database and launches apply workers + * for each BDR connection. + * + * Since the worker is fork()ed from the postmaster, all globals initialised in + * _PG_init remain valid. + * + * This worker can use the SPI and shared memory. + */ +void +bdr_perdb_worker_main(Datum main_arg) +{ + int rc = 0; + List *apply_workers; + ListCell *c; + BdrPerdbWorker *bdr_perdb_worker; + BdrWorker *bdr_worker_slot; + StringInfoData si; + bool wait; + + initStringInfo(&si); + + Assert(IsBackgroundWorker); + + bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ]; + Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB); + bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker; + bdr_worker_type = BDR_WORKER_PERDB; + + bdr_worker_init(NameStr(bdr_perdb_worker->dbname)); + + elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS); + + appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker"); + SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION); + + CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner"); + bdr_saved_resowner = CurrentResourceOwner; + + /* need to be able to perform writes ourselves */ + bdr_executor_always_allow_writes(true); + bdr_locks_startup(bdr_perdb_worker->nnodes); + + /* + * Do we need to init the local DB from a remote node? + * + * Checks bdr.bdr_nodes.status, does any remote initialization required if + * there's an init_replica connection, and ensures that + * bdr.bdr_nodes.status=r for our entry before continuing. + */ + bdr_init_replica(&bdr_perdb_worker->dbname); + + elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname)); + + /* Launch the apply workers */ + apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname)); + + /* + * For now, just free the bgworker handles. Later we'll probably want them + * for adding/removing/reconfiguring bgworkers. + */ + foreach(c, apply_workers) + { + BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c); + pfree(h); + } + +#ifdef BUILDING_BDR + elog(DEBUG1, "BDR starting sequencer on db \"%s\"", + NameStr(bdr_perdb_worker->dbname)); + + /* initialize sequencer */ + bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes); +#endif + + while (!got_SIGTERM) + { + wait = true; + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + +#ifdef BUILDING_BDR + /* check whether we need to start new elections */ + if (bdr_sequencer_start_elections()) + wait = false; + + /* check whether we need to vote */ + if (bdr_sequencer_vote()) + wait = false; + + /* check whether any of our elections needs to be tallied */ + bdr_sequencer_tally(); + + /* check all bdr sequences for used up chunks */ + bdr_sequencer_fill_sequences(); +#endif + + pgstat_report_activity(STATE_IDLE, NULL); + + /* + * Background workers mustn't call usleep() or any direct equivalent: + * instead, they may wait on their process latch, which sleeps as + * necessary, but is awakened if postmaster dies. That way the + * background process goes away immediately in an emergency. + * + * We wake up everytime our latch gets set or if 180 seconds have + * passed without events. That's a stopgap for the case a backend + * committed sequencer changes but died before setting the latch. + */ + if (wait) + { + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 180000L); + + ResetLatch(&MyProc->procLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } + } + + proc_exit(0); +}