bdr: Move bdr_apply_main into bdr_apply.c
authorCraig Ringer <craig@2ndquadrant.com>
Tue, 16 Dec 2014 10:42:57 +0000 (18:42 +0800)
committerCraig Ringer <craig@2ndquadrant.com>
Tue, 16 Dec 2014 10:42:57 +0000 (18:42 +0800)
bdr.c
bdr.h
bdr_apply.c

diff --git a/bdr.c b/bdr.c
index c698977b8efd35ec178d836d4dc6273b759338a5..11a1b665524db643ccdb4620b6353583bad45fa1 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -93,14 +93,6 @@ int bdr_default_apply_delay;
 int bdr_max_workers;
 static bool bdr_skip_ddl_replication;
 bool bdr_skip_ddl_locking;
-/*
- * These globals are valid only for apply bgworkers, not for
- * bdr running in the postmaster or for per-db workers.
- *
- * TODO: move into bdr_apply.c when bdr_apply_main moved.
- */
-extern BdrApplyWorker *bdr_apply_worker;
-extern BdrConnectionConfig *bdr_apply_config;
 
 /* shmem init hook to chain to on startup, if any */
 static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@@ -407,7 +399,7 @@ bdr_create_slot(PGconn *streamConn, Name slot_name,
  * - set search_path
  *
  */
-static void
+void
 bdr_worker_init(char *dbname)
 {
    Assert(IsBackgroundWorker);
@@ -525,158 +517,6 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
    return streamConn;
 }
 
-/*
- * Entry point for a BDR apply worker.
- *
- * Responsible for establishing a replication connection, creating slots
- * and starting the reply loop.
- *
- * TODO: move to bdr_apply.c
- */
-void
-bdr_apply_main(Datum main_arg)
-{
-   PGconn     *streamConn;
-   PGresult   *res;
-   StringInfoData query;
-   char       *sqlstate;
-   RepNodeId   replication_identifier;
-   XLogRecPtr  start_from;
-   NameData    slot_name;
-   BdrWorker  *bdr_worker_slot;
-
-   Assert(IsBackgroundWorker);
-
-   initStringInfo(&query);
-
-   bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
-   Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
-   bdr_apply_worker = &bdr_worker_slot->worker_data.apply_worker;
-   bdr_worker_type = BDR_WORKER_APPLY;
-
-   bdr_apply_config = bdr_connection_configs[bdr_apply_worker->connection_config_idx];
-   Assert(bdr_apply_config != NULL);
-
-   bdr_worker_init(bdr_apply_config->dbname);
-
-   CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr apply top-level resource owner");
-   bdr_saved_resowner = CurrentResourceOwner;
-
-   elog(DEBUG1, "%s initialized on %s",
-        MyBgworkerEntry->bgw_name, bdr_apply_config->dbname);
-
-   /* Set our local application_name for our SPI connections */
-   resetStringInfo(&query);
-   appendStringInfo(&query, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "apply");
-   if (bdr_apply_worker->forward_changesets)
-       appendStringInfoString(&query, " catchup");
-
-   if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
-       appendStringInfo(&query, " up to %X/%X",
-                        (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
-                        (uint32)bdr_apply_worker->replay_stop_lsn);
-
-   SetConfigOption("application_name", query.data, PGC_USERSET, PGC_S_SESSION);
-
-   /* Form an application_name string to send to the remote end */
-   resetStringInfo(&query);
-   appendStringInfoString(&query, "receive");
-
-   if (bdr_apply_worker->forward_changesets)
-       appendStringInfoString(&query, " catchup");
-
-   if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
-       appendStringInfo(&query, " up to %X/%X",
-                        (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
-                        (uint32)bdr_apply_worker->replay_stop_lsn);
-
-   /* Make the replication connection to the remote end */
-   streamConn = bdr_establish_connection_and_slot(bdr_apply_config,
-       query.data, &slot_name, &origin_sysid, &origin_timeline,
-       &origin_dboid, &replication_identifier, NULL);
-
-
-   /* initialize stat subsystem, our id won't change further */
-   bdr_count_set_current_node(replication_identifier);
-
-   /*
-    * tell replication_identifier.c about our identifier so it can cache the
-    * search in shared memory.
-    */
-   SetupCachedReplicationIdentifier(replication_identifier);
-
-   /*
-    * Check whether we already replayed something so we don't replay it
-    * multiple times.
-    */
-
-   start_from = RemoteCommitFromCachedReplicationIdentifier();
-
-   elog(INFO, "starting up replication from %u at %X/%X",
-        replication_identifier,
-        (uint32) (start_from >> 32), (uint32) start_from);
-
-   resetStringInfo(&query);
-   appendStringInfo(&query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
-                    NameStr(slot_name), (uint32) (start_from >> 32),
-                    (uint32) start_from);
-   appendStringInfo(&query, "pg_version '%u'", PG_VERSION_NUM);
-   appendStringInfo(&query, ", pg_catversion '%u'", CATALOG_VERSION_NO);
-   appendStringInfo(&query, ", bdr_version '%u'", BDR_VERSION_NUM);
-   appendStringInfo(&query, ", bdr_variant '%s'", BDR_VARIANT);
-   appendStringInfo(&query, ", min_bdr_version '%u'", BDR_MIN_REMOTE_VERSION_NUM);
-   appendStringInfo(&query, ", sizeof_int '%zu'", sizeof(int));
-   appendStringInfo(&query, ", sizeof_long '%zu'", sizeof(long));
-   appendStringInfo(&query, ", sizeof_datum '%zu'", sizeof(Datum));
-   appendStringInfo(&query, ", maxalign '%d'", MAXIMUM_ALIGNOF);
-   appendStringInfo(&query, ", float4_byval '%d'", bdr_get_float4byval());
-   appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
-   appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
-   appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
-   appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
-   if (bdr_apply_config->replication_sets != NULL &&
-       bdr_apply_config->replication_sets[0] != 0)
-       appendStringInfo(&query, ", replication_sets '%s'",
-                        bdr_apply_config->replication_sets);
-
-   appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
-   if (bdr_apply_worker->forward_changesets)
-       appendStringInfo(&query, ", forward_changesets 't'");
-
-   appendStringInfoChar(&query, ')');
-
-   elog(DEBUG3, "Sending replication command: %s", query.data);
-
-   res = PQexec(streamConn, query.data);
-
-   sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
-
-   if (PQresultStatus(res) != PGRES_COPY_BOTH)
-   {
-       elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
-            query.data, PQresultErrorMessage(res), sqlstate);
-   }
-   PQclear(res);
-
-   replication_origin_id = replication_identifier;
-
-   bdr_conflict_logging_startup();
-
-   PG_TRY();
-   {
-       bdr_apply_work(streamConn);
-   }
-   PG_CATCH();
-   {
-       if (IsTransactionState())
-           bdr_count_rollback();
-       PG_RE_THROW();
-   }
-   PG_END_TRY();
-
-   proc_exit(0);
-}
-
 /*
  * In postmaster, at shared_preload_libaries time, create the GUCs for a
  * connection. They'll be accessed by the apply worker that uses these GUCs
diff --git a/bdr.h b/bdr.h
index 5735037cf5081d135bdabfd41c9f64fde15e3f33..3122775d3276b4a97b3fafedeac6312042c804e1 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -290,7 +290,6 @@ extern Oid  BdrReplicationSetConfigRelid;
 extern Oid bdr_lookup_relid(const char *relname, Oid schema_oid);
 
 /* apply support */
-extern void bdr_process_remote_action(StringInfo s);
 extern void bdr_fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid,
                                        TimeLineID *tli, Oid *remote_dboid);
 extern RepNodeId bdr_fetch_node_id_via_sysid(uint64 sysid, TimeLineID tli, Oid dboid);
@@ -421,8 +420,8 @@ extern void bdr_locks_shmem_init(Size num_used_databases);
 extern void bdr_locks_check_query(void);
 
 /* background workers */
+extern void bdr_worker_init(char* dbname);
 extern void bdr_apply_main(Datum main_arg);
-extern void bdr_apply_work(PGconn* streamConn);
 
 /* manipulation of bdr catalogs */
 extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Oid dboid);
index 63900cf37730a7bdb5f3a3f39f4d01fc63eb42d6..7e3d3e489478f1335718846c916ce71295af454a 100644 (file)
@@ -29,6 +29,7 @@
 #include "access/relscan.h"
 #include "access/xact.h"
 
+#include "catalog/catversion.h"
 #include "catalog/dependency.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
@@ -36,6 +37,8 @@
 
 #include "libpq/pqformat.h"
 
+#include "mb/pg_wchar.h"
+
 #include "parser/parse_type.h"
 
 #include "replication/logical.h"
@@ -108,7 +111,7 @@ BdrConnectionConfig *bdr_apply_config = NULL;
 dlist_head bdr_lsn_association = DLIST_STATIC_INIT(bdr_lsn_association);
 
 static BDRRelation *read_rel(StringInfo s, LOCKMODE mode);
-extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
+static void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
 
 static void check_apply_update(BdrConflictType conflict_type,
                               RepNodeId local_node_id, TimestampTz local_ts,
@@ -1833,7 +1836,7 @@ check_sequencer_wakeup(BDRRelation *rel)
 #endif
 }
 
-void
+static void
 read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup)
 {
    TupleDesc   desc = RelationGetDescr(rel->rel);
@@ -1964,7 +1967,7 @@ read_rel(StringInfo s, LOCKMODE mode)
  *
  * May set got_SIGTERM to stop processing before next record.
  */
-void
+static void
 bdr_process_remote_action(StringInfo s)
 {
    char action = pq_getmsgbyte(s);
@@ -2190,7 +2193,7 @@ abs_timestamp_difference(TimestampTz start_time, TimestampTz stop_time,
 /*
  * The actual main loop of a BDR apply worker.
  */
-void
+static void
 bdr_apply_work(PGconn* streamConn)
 {
    int         fd;
@@ -2347,3 +2350,153 @@ bdr_apply_work(PGconn* streamConn)
        MemoryContextResetAndDeleteChildren(MessageContext);
    }
 }
+
+/*
+ * Entry point for a BDR apply worker.
+ *
+ * Responsible for establishing a replication connection, creating slots
+ * and starting the reply loop.
+ */
+void
+bdr_apply_main(Datum main_arg)
+{
+   PGconn     *streamConn;
+   PGresult   *res;
+   StringInfoData query;
+   char       *sqlstate;
+   RepNodeId   replication_identifier;
+   XLogRecPtr  start_from;
+   NameData    slot_name;
+   BdrWorker  *bdr_worker_slot;
+
+   Assert(IsBackgroundWorker);
+
+   initStringInfo(&query);
+
+   bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+   Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
+   bdr_apply_worker = &bdr_worker_slot->worker_data.apply_worker;
+   bdr_worker_type = BDR_WORKER_APPLY;
+
+   bdr_apply_config = bdr_connection_configs[bdr_apply_worker->connection_config_idx];
+   Assert(bdr_apply_config != NULL);
+
+   bdr_worker_init(bdr_apply_config->dbname);
+
+   CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr apply top-level resource owner");
+   bdr_saved_resowner = CurrentResourceOwner;
+
+   elog(DEBUG1, "%s initialized on %s",
+        MyBgworkerEntry->bgw_name, bdr_apply_config->dbname);
+
+   /* Set our local application_name for our SPI connections */
+   resetStringInfo(&query);
+   appendStringInfo(&query, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "apply");
+   if (bdr_apply_worker->forward_changesets)
+       appendStringInfoString(&query, " catchup");
+
+   if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
+       appendStringInfo(&query, " up to %X/%X",
+                        (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
+                        (uint32)bdr_apply_worker->replay_stop_lsn);
+
+   SetConfigOption("application_name", query.data, PGC_USERSET, PGC_S_SESSION);
+
+   /* Form an application_name string to send to the remote end */
+   resetStringInfo(&query);
+   appendStringInfoString(&query, "receive");
+
+   if (bdr_apply_worker->forward_changesets)
+       appendStringInfoString(&query, " catchup");
+
+   if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
+       appendStringInfo(&query, " up to %X/%X",
+                        (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
+                        (uint32)bdr_apply_worker->replay_stop_lsn);
+
+   /* Make the replication connection to the remote end */
+   streamConn = bdr_establish_connection_and_slot(bdr_apply_config,
+       query.data, &slot_name, &origin_sysid, &origin_timeline,
+       &origin_dboid, &replication_identifier, NULL);
+
+
+   /* initialize stat subsystem, our id won't change further */
+   bdr_count_set_current_node(replication_identifier);
+
+   /*
+    * tell replication_identifier.c about our identifier so it can cache the
+    * search in shared memory.
+    */
+   SetupCachedReplicationIdentifier(replication_identifier);
+
+   /*
+    * Check whether we already replayed something so we don't replay it
+    * multiple times.
+    */
+
+   start_from = RemoteCommitFromCachedReplicationIdentifier();
+
+   elog(INFO, "starting up replication from %u at %X/%X",
+        replication_identifier,
+        (uint32) (start_from >> 32), (uint32) start_from);
+
+   resetStringInfo(&query);
+   appendStringInfo(&query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
+                    NameStr(slot_name), (uint32) (start_from >> 32),
+                    (uint32) start_from);
+   appendStringInfo(&query, "pg_version '%u'", PG_VERSION_NUM);
+   appendStringInfo(&query, ", pg_catversion '%u'", CATALOG_VERSION_NO);
+   appendStringInfo(&query, ", bdr_version '%u'", BDR_VERSION_NUM);
+   appendStringInfo(&query, ", bdr_variant '%s'", BDR_VARIANT);
+   appendStringInfo(&query, ", min_bdr_version '%u'", BDR_MIN_REMOTE_VERSION_NUM);
+   appendStringInfo(&query, ", sizeof_int '%zu'", sizeof(int));
+   appendStringInfo(&query, ", sizeof_long '%zu'", sizeof(long));
+   appendStringInfo(&query, ", sizeof_datum '%zu'", sizeof(Datum));
+   appendStringInfo(&query, ", maxalign '%d'", MAXIMUM_ALIGNOF);
+   appendStringInfo(&query, ", float4_byval '%d'", bdr_get_float4byval());
+   appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
+   appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
+   appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
+   appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+   if (bdr_apply_config->replication_sets != NULL &&
+       bdr_apply_config->replication_sets[0] != 0)
+       appendStringInfo(&query, ", replication_sets '%s'",
+                        bdr_apply_config->replication_sets);
+
+   appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+   if (bdr_apply_worker->forward_changesets)
+       appendStringInfo(&query, ", forward_changesets 't'");
+
+   appendStringInfoChar(&query, ')');
+
+   elog(DEBUG3, "Sending replication command: %s", query.data);
+
+   res = PQexec(streamConn, query.data);
+
+   sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+   if (PQresultStatus(res) != PGRES_COPY_BOTH)
+   {
+       elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
+            query.data, PQresultErrorMessage(res), sqlstate);
+   }
+   PQclear(res);
+
+   replication_origin_id = replication_identifier;
+
+   bdr_conflict_logging_startup();
+
+   PG_TRY();
+   {
+       bdr_apply_work(streamConn);
+   }
+   PG_CATCH();
+   {
+       if (IsTransactionState())
+           bdr_count_rollback();
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
+
+   proc_exit(0);
+}