bdr: move perdb_worker_main to new bdr_perdb.c
authorCraig Ringer <craig@2ndquadrant.com>
Tue, 16 Dec 2014 15:26:47 +0000 (23:26 +0800)
committerCraig Ringer <craig@2ndquadrant.com>
Wed, 24 Dec 2014 02:29:27 +0000 (10:29 +0800)
Makefile.in
bdr.c
bdr.h
bdr_perdb.c [new file with mode: 0644]

index e1a548ce98f9d37897dd3f3c2fc8ff8e080c78fb..8adbb34eb5d5a92f615ac022406f3a48b57932a0 100644 (file)
@@ -25,6 +25,7 @@ SHLIB_LINK = $(libpq)
 OBJS = \
    bdr.o \
    bdr_apply.o \
+   bdr_perdb.o \
    bdr_catalogs.o \
    bdr_conflict_handlers.o \
    bdr_conflict_logging.o \
diff --git a/bdr.c b/bdr.c
index 11a1b665524db643ccdb4620b6353583bad45fa1..1e62196700e0efd1afb2228466ade72e274b0dd9 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -30,7 +30,6 @@
 #include "access/heapam.h"
 #include "access/xact.h"
 
-#include "catalog/catversion.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_extension.h"
 
@@ -42,8 +41,6 @@
 #include "libpq/libpq-be.h"
 #include "libpq/pqformat.h"
 
-#include "mb/pg_wchar.h"
-
 #include "nodes/execnodes.h"
 
 #include "postmaster/bgworker.h"
@@ -726,228 +723,6 @@ bdr_create_con_gucs(char  *name,
    return true;
 }
 
-
-/*
- * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on
- * the database identified by dbname.
- *
- * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY
- * with a matching database name and launches them.
- */
-static List*
-bdr_launch_apply_workers(char *dbname)
-{
-   List             *apply_workers = NIL;
-   BackgroundWorker  apply_worker;
-   int               i;
-
-   Assert(IsBackgroundWorker);
-
-   /* Common apply worker values */
-   apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
-       BGWORKER_BACKEND_DATABASE_CONNECTION;
-   apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
-   /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
-   apply_worker.bgw_main = bdr_apply_main;
-   apply_worker.bgw_restart_time = 5;
-   apply_worker.bgw_notify_pid = 0;
-
-   /* Launch apply workers */
-   LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
-   for (i = 0; i < bdr_max_workers; i++)
-   {
-       BdrWorker *worker = &BdrWorkerCtl->slots[i];
-
-       switch(worker->worker_type)
-       {
-           case BDR_WORKER_APPLY:
-               {
-                   BdrApplyWorker *con = &worker->worker_data.apply_worker;
-                   BdrConnectionConfig *cfg =
-                       bdr_connection_configs[con->connection_config_idx];
-                   Assert(cfg != NULL);
-                   if ( strcmp(cfg->dbname, dbname) == 0 )
-                   {
-                       /* It's an apply worker for our DB; register it */
-                       BackgroundWorkerHandle *bgw_handle;
-
-                       if (con->bgw_is_registered)
-                           /*
-                            * This worker was registered on a previous pass;
-                            * this is probably a restart of the per-db worker.
-                            * Don't register a duplicate.
-                            */
-                           continue;
-
-                       snprintf(apply_worker.bgw_name, BGW_MAXLEN,
-                                BDR_LOCALID_FORMAT": %s: apply",
-                                BDR_LOCALID_FORMAT_ARGS, cfg->name);
-                       apply_worker.bgw_main_arg = Int32GetDatum(i);
-
-                       if (!RegisterDynamicBackgroundWorker(&apply_worker,
-                                                            &bgw_handle))
-                       {
-                           ereport(ERROR,
-                                   (errmsg("bdr: Failed to register background worker"
-                                           " %s, see previous log messages",
-                                           cfg->name)));
-                       }
-                       /* We've launched this one, don't do it again */
-                       con->bgw_is_registered = true;
-                       apply_workers = lcons(bgw_handle, apply_workers);
-                   }
-               }
-               break;
-           case BDR_WORKER_EMPTY_SLOT:
-           case BDR_WORKER_PERDB:
-               /* Nothing to do; switch only so we get warnings for insane cases */
-               break;
-           default:
-               /* Bogus value */
-               elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?",
-                    worker->worker_type);
-               break;
-       }
-   }
-   LWLockRelease(BdrWorkerCtl->lock);
-
-   return apply_workers;
-}
-
-/*
- * Each database with BDR enabled on it has a static background worker,
- * registered at shared_preload_libraries time during postmaster start. This is
- * the entry point for these bgworkers.
- *
- * This worker handles BDR startup on the database and launches apply workers
- * for each BDR connection.
- *
- * Since the worker is fork()ed from the postmaster, all globals initialised in
- * _PG_init remain valid.
- *
- * This worker can use the SPI and shared memory.
- */
-static void
-bdr_perdb_worker_main(Datum main_arg)
-{
-   int               rc = 0;
-   List             *apply_workers;
-   ListCell         *c;
-   BdrPerdbWorker   *bdr_perdb_worker;
-   BdrWorker        *bdr_worker_slot;
-   StringInfoData    si;
-   bool              wait;
-
-   initStringInfo(&si);
-
-   Assert(IsBackgroundWorker);
-
-   bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
-   Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
-   bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
-   bdr_worker_type = BDR_WORKER_PERDB;
-
-   bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
-
-   elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS);
-
-   appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker");
-   SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION);
-
-   CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
-   bdr_saved_resowner = CurrentResourceOwner;
-
-   /* need to be able to perform writes ourselves */
-   bdr_executor_always_allow_writes(true);
-   bdr_locks_startup(bdr_perdb_worker->nnodes);
-
-   /*
-    * Do we need to init the local DB from a remote node?
-    *
-    * Checks bdr.bdr_nodes.status, does any remote initialization required if
-    * there's an init_replica connection, and ensures that
-    * bdr.bdr_nodes.status=r for our entry before continuing.
-    */
-   bdr_init_replica(&bdr_perdb_worker->dbname);
-
-   elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
-
-   /* Launch the apply workers */
-   apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname));
-
-   /*
-    * For now, just free the bgworker handles. Later we'll probably want them
-    * for adding/removing/reconfiguring bgworkers.
-    */
-   foreach(c, apply_workers)
-   {
-       BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
-       pfree(h);
-   }
-
-#ifdef BUILDING_BDR
-   elog(DEBUG1, "BDR starting sequencer on db \"%s\"",
-        NameStr(bdr_perdb_worker->dbname));
-
-   /* initialize sequencer */
-   bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes);
-#endif
-
-   while (!got_SIGTERM)
-   {
-       wait = true;
-
-       if (got_SIGHUP)
-       {
-           got_SIGHUP = false;
-           ProcessConfigFile(PGC_SIGHUP);
-       }
-
-#ifdef BUILDING_BDR
-       /* check whether we need to start new elections */
-       if (bdr_sequencer_start_elections())
-           wait = false;
-
-       /* check whether we need to vote */
-       if (bdr_sequencer_vote())
-           wait = false;
-
-       /* check whether any of our elections needs to be tallied */
-       bdr_sequencer_tally();
-
-       /* check all bdr sequences for used up chunks */
-       bdr_sequencer_fill_sequences();
-#endif
-
-       pgstat_report_activity(STATE_IDLE, NULL);
-
-       /*
-        * Background workers mustn't call usleep() or any direct equivalent:
-        * instead, they may wait on their process latch, which sleeps as
-        * necessary, but is awakened if postmaster dies.  That way the
-        * background process goes away immediately in an emergency.
-        *
-        * We wake up everytime our latch gets set or if 180 seconds have
-        * passed without events. That's a stopgap for the case a backend
-        * committed sequencer changes but died before setting the latch.
-        */
-       if (wait)
-       {
-           rc = WaitLatch(&MyProc->procLatch,
-                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                          180000L);
-
-           ResetLatch(&MyProc->procLatch);
-
-           /* emergency bailout if postmaster has died */
-           if (rc & WL_POSTMASTER_DEATH)
-               proc_exit(1);
-       }
-   }
-
-   proc_exit(0);
-}
-
 static size_t
 bdr_worker_shmem_size()
 {
diff --git a/bdr.h b/bdr.h
index 3122775d3276b4a97b3fafedeac6312042c804e1..19ba087befd1c52305baa597bda7f6c7a3be4c9a 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -422,6 +422,7 @@ extern void bdr_locks_check_query(void);
 /* background workers */
 extern void bdr_worker_init(char* dbname);
 extern void bdr_apply_main(Datum main_arg);
+extern void bdr_perdb_worker_main(Datum main_arg);
 
 /* manipulation of bdr catalogs */
 extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Oid dboid);
diff --git a/bdr_perdb.c b/bdr_perdb.c
new file mode 100644 (file)
index 0000000..2fcfa73
--- /dev/null
@@ -0,0 +1,254 @@
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_locks.h"
+
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "postmaster/bgworker.h"
+
+#include "lib/stringinfo.h"
+
+/* For struct Port only! */
+#include "libpq/libpq-be.h"
+
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/ipc.h"
+
+#include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Launch a dynamic bgworker to run bdr_apply_main for each bdr connection on
+ * the database identified by dbname.
+ *
+ * Scans the BdrWorkerCtl shmem segment for workers of type BDR_WORKER_APPLY
+ * with a matching database name and launches them.
+ */
+static List*
+bdr_launch_apply_workers(char *dbname)
+{
+   List             *apply_workers = NIL;
+   BackgroundWorker  apply_worker;
+   int               i;
+
+   Assert(IsBackgroundWorker);
+
+   /* Common apply worker values */
+   apply_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+       BGWORKER_BACKEND_DATABASE_CONNECTION;
+   apply_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+   /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
+   apply_worker.bgw_main = bdr_apply_main;
+   apply_worker.bgw_restart_time = 5;
+   apply_worker.bgw_notify_pid = 0;
+
+   /* Launch apply workers */
+   LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
+   for (i = 0; i < bdr_max_workers; i++)
+   {
+       BdrWorker *worker = &BdrWorkerCtl->slots[i];
+
+       switch(worker->worker_type)
+       {
+           case BDR_WORKER_APPLY:
+               {
+                   BdrApplyWorker *con = &worker->worker_data.apply_worker;
+                   BdrConnectionConfig *cfg =
+                       bdr_connection_configs[con->connection_config_idx];
+                   Assert(cfg != NULL);
+                   if ( strcmp(cfg->dbname, dbname) == 0 )
+                   {
+                       /* It's an apply worker for our DB; register it */
+                       BackgroundWorkerHandle *bgw_handle;
+
+                       if (con->bgw_is_registered)
+                           /*
+                            * This worker was registered on a previous pass;
+                            * this is probably a restart of the per-db worker.
+                            * Don't register a duplicate.
+                            */
+                           continue;
+
+                       snprintf(apply_worker.bgw_name, BGW_MAXLEN,
+                                BDR_LOCALID_FORMAT": %s: apply",
+                                BDR_LOCALID_FORMAT_ARGS, cfg->name);
+                       apply_worker.bgw_main_arg = Int32GetDatum(i);
+
+                       if (!RegisterDynamicBackgroundWorker(&apply_worker,
+                                                            &bgw_handle))
+                       {
+                           ereport(ERROR,
+                                   (errmsg("bdr: Failed to register background worker"
+                                           " %s, see previous log messages",
+                                           cfg->name)));
+                       }
+                       /* We've launched this one, don't do it again */
+                       con->bgw_is_registered = true;
+                       apply_workers = lcons(bgw_handle, apply_workers);
+                   }
+               }
+               break;
+           case BDR_WORKER_EMPTY_SLOT:
+           case BDR_WORKER_PERDB:
+               /* Nothing to do; switch only so we get warnings for insane cases */
+               break;
+           default:
+               /* Bogus value */
+               elog(FATAL, "Unhandled BdrWorkerType case %i, memory corruption?",
+                    worker->worker_type);
+               break;
+       }
+   }
+   LWLockRelease(BdrWorkerCtl->lock);
+
+   return apply_workers;
+}
+
+/*
+ * Each database with BDR enabled on it has a static background worker,
+ * registered at shared_preload_libraries time during postmaster start. This is
+ * the entry point for these bgworkers.
+ *
+ * This worker handles BDR startup on the database and launches apply workers
+ * for each BDR connection.
+ *
+ * Since the worker is fork()ed from the postmaster, all globals initialised in
+ * _PG_init remain valid.
+ *
+ * This worker can use the SPI and shared memory.
+ */
+void
+bdr_perdb_worker_main(Datum main_arg)
+{
+   int               rc = 0;
+   List             *apply_workers;
+   ListCell         *c;
+   BdrPerdbWorker   *bdr_perdb_worker;
+   BdrWorker        *bdr_worker_slot;
+   StringInfoData    si;
+   bool              wait;
+
+   initStringInfo(&si);
+
+   Assert(IsBackgroundWorker);
+
+   bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+   Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
+   bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
+   bdr_worker_type = BDR_WORKER_PERDB;
+
+   bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
+
+   elog(DEBUG1, "per-db worker for node " BDR_LOCALID_FORMAT " starting", BDR_LOCALID_FORMAT_ARGS);
+
+   appendStringInfo(&si, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "perdb worker");
+   SetConfigOption("application_name", si.data, PGC_USERSET, PGC_S_SESSION);
+
+   CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
+   bdr_saved_resowner = CurrentResourceOwner;
+
+   /* need to be able to perform writes ourselves */
+   bdr_executor_always_allow_writes(true);
+   bdr_locks_startup(bdr_perdb_worker->nnodes);
+
+   /*
+    * Do we need to init the local DB from a remote node?
+    *
+    * Checks bdr.bdr_nodes.status, does any remote initialization required if
+    * there's an init_replica connection, and ensures that
+    * bdr.bdr_nodes.status=r for our entry before continuing.
+    */
+   bdr_init_replica(&bdr_perdb_worker->dbname);
+
+   elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
+
+   /* Launch the apply workers */
+   apply_workers = bdr_launch_apply_workers(NameStr(bdr_perdb_worker->dbname));
+
+   /*
+    * For now, just free the bgworker handles. Later we'll probably want them
+    * for adding/removing/reconfiguring bgworkers.
+    */
+   foreach(c, apply_workers)
+   {
+       BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
+       pfree(h);
+   }
+
+#ifdef BUILDING_BDR
+   elog(DEBUG1, "BDR starting sequencer on db \"%s\"",
+        NameStr(bdr_perdb_worker->dbname));
+
+   /* initialize sequencer */
+   bdr_sequencer_init(bdr_perdb_worker->seq_slot, bdr_perdb_worker->nnodes);
+#endif
+
+   while (!got_SIGTERM)
+   {
+       wait = true;
+
+       if (got_SIGHUP)
+       {
+           got_SIGHUP = false;
+           ProcessConfigFile(PGC_SIGHUP);
+       }
+
+#ifdef BUILDING_BDR
+       /* check whether we need to start new elections */
+       if (bdr_sequencer_start_elections())
+           wait = false;
+
+       /* check whether we need to vote */
+       if (bdr_sequencer_vote())
+           wait = false;
+
+       /* check whether any of our elections needs to be tallied */
+       bdr_sequencer_tally();
+
+       /* check all bdr sequences for used up chunks */
+       bdr_sequencer_fill_sequences();
+#endif
+
+       pgstat_report_activity(STATE_IDLE, NULL);
+
+       /*
+        * Background workers mustn't call usleep() or any direct equivalent:
+        * instead, they may wait on their process latch, which sleeps as
+        * necessary, but is awakened if postmaster dies.  That way the
+        * background process goes away immediately in an emergency.
+        *
+        * We wake up everytime our latch gets set or if 180 seconds have
+        * passed without events. That's a stopgap for the case a backend
+        * committed sequencer changes but died before setting the latch.
+        */
+       if (wait)
+       {
+           rc = WaitLatch(&MyProc->procLatch,
+                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                          180000L);
+
+           ResetLatch(&MyProc->procLatch);
+
+           /* emergency bailout if postmaster has died */
+           if (rc & WL_POSTMASTER_DEATH)
+               proc_exit(1);
+       }
+   }
+
+   proc_exit(0);
+}