bdr: refactor bdr_connect to be more useful to other callers
authorCraig Ringer <craig@2ndquadrant.com>
Fri, 6 Feb 2015 13:37:58 +0000 (02:37 +1300)
committerAndres Freund <andres@anarazel.de>
Thu, 12 Feb 2015 09:16:58 +0000 (10:16 +0100)
bdr.c
bdr.h
bdr_apply.c
bdr_init_replica.c

diff --git a/bdr.c b/bdr.c
index d385af0fe175624829ff8e681adbe49cf7178a49..276fa1c705a98d969985dc9b7bcc63071596fc9a 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -161,7 +161,7 @@ bdr_sighup(SIGNAL_ARGS)
  * Get database Oid of the remotedb.
  */
 static Oid
-bdr_get_remote_dboid(char *conninfo_db)
+bdr_get_remote_dboid(const char *conninfo_db)
 {
    PGconn     *dbConn;
    PGresult   *res;
@@ -202,11 +202,60 @@ bdr_get_remote_dboid(char *conninfo_db)
    return remote_dboid_i;
 }
 
+/*
+ * Format a slot name and replication identifier for a connection to a remote
+ * node, suitable for use as the slot name on the remote server and as the
+ * local replication identifier for that slot. Uses the local node's current
+ * dboid.
+ *
+ * Does NOT enforce that the remote and local node identities must differ.
+ *
+ * The replication identifier is allocated in the current memory context.
+ */
+void
+bdr_build_ident_and_slotname(uint64 remote_sysid, TimeLineID remote_tlid,
+       Oid remote_dboid, char **out_replication_identifier,
+       Name out_slot_name)
+{
+   StringInfoData  replication_identifier;
+   char            local_sysid[33];
+
+   initStringInfo(&replication_identifier);
+
+   Assert(MyDatabaseId != InvalidOid);
+   Assert(remote_dboid != InvalidOid);
+
+   snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
+            GetSystemIdentifier());
+
+   /*
+    * Build slot name identifying the local node to the remote end.
+    */
+   snprintf(NameStr(*out_slot_name), NAMEDATALEN, BDR_SLOT_NAME_FORMAT,
+            remote_dboid, local_sysid, ThisTimeLineID,
+            MyDatabaseId, EMPTY_REPLICATION_NAME);
+   NameStr(*out_slot_name)[NAMEDATALEN - 1] = '\0';
+
+   /*
+    * Build replication identifier.
+    */
+   appendStringInfo(&replication_identifier, BDR_NODE_ID_FORMAT,
+            remote_sysid, remote_tlid, remote_dboid, MyDatabaseId,
+            EMPTY_REPLICATION_NAME);
+
+   *out_replication_identifier = replication_identifier.data;
+}
+
 /*
  * Establish a BDR connection
  *
  * Connects to the remote node, identifies it, and generates local and remote
- * replication identifiers and slot name.
+ * replication identifiers and slot name. The conninfo string passed should
+ * specify a dbname. It must not contain a replication= parameter.
+ *
+ * Does NOT enforce that the remote and local node identities must differ.
+ *
+ * appname may be NULL.
  *
  * The local replication identifier is not saved, the caller must do that.
  *
@@ -219,33 +268,32 @@ bdr_get_remote_dboid(char *conninfo_db)
  *   remote_tlid_i
  */
 PGconn*
-bdr_connect(char *conninfo_repl,
-           char *conninfo_db,
-           char* remote_ident, size_t remote_ident_length,
-           NameData* slot_name,
+bdr_connect(const char *conninfo,
+           Name appname,
            uint64* remote_sysid_i, TimeLineID *remote_tlid_i,
            Oid *remote_dboid_i)
 {
    PGconn     *streamConn;
    PGresult   *res;
-   StringInfoData query;
+   StringInfoData conninfo_repl;
    char       *remote_sysid;
    char       *remote_tlid;
-   char       *remote_dbname;
    char        local_sysid[32];
-   NameData    replication_name;
 
-   initStringInfo(&query);
-   NameStr(replication_name)[0] = '\0';
+   initStringInfo(&conninfo_repl);
+
+   appendStringInfo(&conninfo_repl, "%s %s fallback_application_name='%s'",
+           conninfo, "replication=database",
+           (appname == NULL ? "bdr" : NameStr(*appname)));
 
-   streamConn = PQconnectdb(conninfo_repl);
+   streamConn = PQconnectdb(conninfo_repl.data);
    if (PQstatus(streamConn) != CONNECTION_OK)
    {
-       ereport(FATAL,
+       ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not connect to the primary server: %s",
                        PQerrorMessage(streamConn)),
-                errdetail("Connection string is '%s'", conninfo_repl)));
+                errdetail("Connection string is '%s'", conninfo_repl.data)));
    }
 
    elog(DEBUG3, "Sending replication command: IDENTIFY_SYSTEM");
@@ -253,18 +301,17 @@ bdr_connect(char *conninfo_repl,
    res = PQexec(streamConn, "IDENTIFY_SYSTEM");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
-       elog(FATAL, "could not send replication command \"%s\": %s",
+       elog(ERROR, "could not send replication command \"%s\": %s",
             "IDENTIFY_SYSTEM", PQerrorMessage(streamConn));
    }
    if (PQntuples(res) != 1 || PQnfields(res) < 4 || PQnfields(res) > 5)
    {
-       elog(FATAL, "could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n",
+       elog(ERROR, "could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n",
             PQntuples(res), PQnfields(res), 1, 4, 5);
    }
 
    remote_sysid = PQgetvalue(res, 0, 0);
    remote_tlid = PQgetvalue(res, 0, 1);
-   remote_dbname = PQgetvalue(res, 0, 3);
    if (PQnfields(res) == 5)
    {
        char       *remote_dboid = PQgetvalue(res, 0, 4);
@@ -274,7 +321,7 @@ bdr_connect(char *conninfo_repl,
    }
    else
    {
-       *remote_dboid_i = bdr_get_remote_dboid(conninfo_db);
+       *remote_dboid_i = bdr_get_remote_dboid(conninfo);
    }
 
    if (sscanf(remote_sysid, UINT64_FORMAT, remote_sysid_i) != 1)
@@ -286,39 +333,9 @@ bdr_connect(char *conninfo_repl,
    snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
             GetSystemIdentifier());
 
-   if (strcmp(remote_sysid, local_sysid) == 0
-       && ThisTimeLineID == *remote_tlid_i
-       && MyDatabaseId == *remote_dboid_i)
-   {
-       ereport(FATAL,
-               (errcode(ERRCODE_INVALID_NAME),
-                errmsg("The system identifier, timeline ID and/or database oid must differ between the nodes"),
-                errdetail("Both keys are (sysid, timelineid, dboid) = (%s,%s,%s)",
-                remote_sysid, remote_tlid, remote_dbname)));
-   }
-   else
-       elog(DEBUG2, "local node (%s,%u,%u), remote node (%s,%s,%u)",
-            local_sysid, ThisTimeLineID, MyDatabaseId, remote_sysid,
-            remote_tlid, *remote_dboid_i);
-
-   /*
-    * build slot name.
-    *
-    * FIXME: This might truncate the identifier if replication_name is
-    * somewhat longer...
-    */
-   snprintf(NameStr(*slot_name), NAMEDATALEN, BDR_SLOT_NAME_FORMAT,
-            *remote_dboid_i, local_sysid, ThisTimeLineID,
-            MyDatabaseId, NameStr(replication_name));
-   NameStr(*slot_name)[NAMEDATALEN - 1] = '\0';
-
-   /*
-    * Build replication identifier.
-    */
-   snprintf(remote_ident, remote_ident_length,
-            BDR_NODE_ID_FORMAT,
-            *remote_sysid_i, *remote_tlid_i, *remote_dboid_i, MyDatabaseId,
-            NameStr(replication_name));
+   elog(DEBUG2, "local node (%s,%u,%u), remote node (%s,%s,%u)",
+        local_sysid, ThisTimeLineID, MyDatabaseId, remote_sysid,
+        remote_tlid, *remote_dboid_i);
 
    /* no parts of IDENTIFY_SYSTEM's response needed anymore */
    PQclear(res);
@@ -480,14 +497,15 @@ bdr_error_nodeids_must_differ(uint64 sysid, TimeLineID timeline, Oid dboid)
  *----------------------
  */
 PGconn*
-bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
+bdr_establish_connection_and_slot(const char *dsn,
    const char *application_name_suffix, Name out_slot_name, uint64 *out_sysid,
    TimeLineID* out_timeline, Oid *out_dboid,
    RepNodeId *out_replication_identifier, char **out_snapshot)
 {
-   char        remote_ident[256];
-   PGconn     *streamConn;
-   StringInfoData conninfo_repl;
+   PGconn      *streamConn;
+   bool        tx_started = false;
+   NameData    appname;
+   char        *remote_ident;
 
    /*
     * Make sure the local and remote nodes aren't the same node.
@@ -499,24 +517,29 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
        bdr_error_nodeids_must_differ(*out_sysid, *out_timeline, *out_dboid);
    }
 
-   initStringInfo (&conninfo_repl);
-
-   appendStringInfo(&conninfo_repl,
-                    "%s replication=database fallback_application_name='"BDR_LOCALID_FORMAT": %s'",
-                    cfg->dsn, BDR_LOCALID_FORMAT_ARGS,
-                    application_name_suffix);
+   snprintf(NameStr(appname), NAMEDATALEN, BDR_LOCALID_FORMAT":%s",
+           BDR_LOCALID_FORMAT_ARGS, application_name_suffix);
 
-   /* Establish BDR conn and IDENTIFY_SYSTEM */
+   /*
+    * Establish BDR conn and IDENTIFY_SYSTEM, ERROR on things like
+    * connection failure.
+    */
    streamConn = bdr_connect(
-       conninfo_repl.data,
-       cfg->dsn,
-       remote_ident, sizeof(remote_ident),
-       out_slot_name, out_sysid, out_timeline, out_dboid
-       );
+       dsn, &appname, out_sysid, out_timeline, out_dboid);
 
-   StartTransactionCommand();
+   bdr_build_ident_and_slotname(*out_sysid, *out_timeline, *out_dboid,
+           &remote_ident, out_slot_name);
+
+   Assert(remote_ident != NULL);
+
+   if (!IsTransactionState())
+   {
+       tx_started = true;
+       StartTransactionCommand();
+   }
    *out_replication_identifier = GetReplicationIdentifier(remote_ident, true);
-   CommitTransactionCommand();
+   if (tx_started)
+       CommitTransactionCommand();
 
    if (OidIsValid(*out_replication_identifier))
    {
@@ -541,6 +564,9 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
                        out_replication_identifier, out_snapshot);
    }
 
+   pfree(remote_ident);
+   remote_ident = NULL;
+
    return streamConn;
 }
 
diff --git a/bdr.h b/bdr.h
index 00b1056692c4e628c20cd3b060f41842785af077..f2fda9911c0e59f37c833ef926bd909536210d15 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -448,17 +448,12 @@ extern Oid GetSysCacheOidError(int cacheId, Datum key1, Datum key2, Datum key3,
 
 
 /* helpers shared by multiple worker types */
-extern struct pg_conn* bdr_connect(char *conninfo_repl,
-                                  char *conninfo_db,
-                                  char* remote_ident,
-                                  size_t remote_ident_length,
-                                  NameData* slot_name,
-                                  uint64* remote_sysid_i,
-                                  TimeLineID *remote_tlid_i,
+extern struct pg_conn* bdr_connect(const char *conninfo, Name appname,
+                                  uint64* remote_sysid_i, TimeLineID *remote_tlid_i,
                                   Oid *out_dboid_i);
 
 extern struct pg_conn *
-bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
+bdr_establish_connection_and_slot(const char *dsn,
                                  const char *application_name_suffix,
                                  Name out_slot_name,
                                  uint64 *out_sysid,
@@ -466,6 +461,10 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
                                  Oid *out_dboid,
                                  RepNodeId *out_replication_identifier,
                                  char **out_snapshot);
+extern void
+bdr_build_ident_and_slotname(uint64 remote_sysid, TimeLineID remote_tlid,
+       Oid remote_dboid, char **out_replication_identifier,
+       Name out_slot_name);
 
 extern PGconn* bdr_connect_nonrepl(const char *connstring,
        const char *appnamesuffix);
index a0b75e0bb10ba6eae95f3963d1f9155489665ebf..f901f488089253961d3466e4b7ba98ff71ffd7c4 100644 (file)
@@ -2453,7 +2453,7 @@ bdr_apply_main(Datum main_arg)
                         (uint32)bdr_apply_worker->replay_stop_lsn);
 
    /* Make the replication connection to the remote end */
-   streamConn = bdr_establish_connection_and_slot(bdr_apply_config,
+   streamConn = bdr_establish_connection_and_slot(bdr_apply_config->dsn,
        query.data, &slot_name, &origin_sysid, &origin_timeline,
        &origin_dboid, &replication_identifier, NULL);
 
index 9cc134b795934cf2a0e06b21448b3cd5945847b9..b5a5931bd612395040c7604e0f113f44677267eb 100644 (file)
@@ -411,7 +411,6 @@ static void
 bdr_drop_slot_and_replication_identifier(BdrConnectionConfig *cfg)
 {
 
-   char        remote_ident[256];
    PGconn     *streamConn;
    RepNodeId   replication_identifier;
    NameData    slot_name;
@@ -421,27 +420,31 @@ bdr_drop_slot_and_replication_identifier(BdrConnectionConfig *cfg)
    PGresult   *res;
    StringInfoData query;
    char       *sqlstate;
+   NameData    appname;
+   char       *remote_ident;
 
-   initStringInfo(&query);
 
    elog(DEBUG1, "bdr %s: Dropping slot and local ident from connection %s",
         cfg->dbname, cfg->name);
 
-   appendStringInfo(&query,
-                    "%s replication=database fallback_application_name='"BDR_LOCALID_FORMAT": %s: drop slot'",
-                     cfg->dsn, BDR_LOCALID_FORMAT_ARGS, cfg->name);
+   snprintf(NameStr(appname), NAMEDATALEN, "slot drop");
+   (NameStr(appname))[NAMEDATALEN-1] = '\0';
 
    /* Establish BDR conn and IDENTIFY_SYSTEM */
    streamConn = bdr_connect(
-       query.data,
-       cfg->dsn,
-       remote_ident, sizeof(remote_ident),
-       &slot_name, &sysid, &timeline, &dboid
+       cfg->dsn, &appname,
+       &sysid, &timeline, &dboid
        );
 
+   bdr_build_ident_and_slotname(sysid, timeline, dboid,
+           &remote_ident, &slot_name);
+
+
    StartTransactionCommand();
    replication_identifier = GetReplicationIdentifier(remote_ident, true);
 
+   pfree(remote_ident);
+
    if (OidIsValid(replication_identifier))
    {
        /* Local replication identifier exists and must be dropped. */
@@ -465,7 +468,7 @@ bdr_drop_slot_and_replication_identifier(BdrConnectionConfig *cfg)
     * whether it exists or not silently over the replication protocol,
     * so we just try it and cope if it's missing.
     */
-   resetStringInfo(&query);
+   initStringInfo(&query);
    appendStringInfo(&query, "DROP_REPLICATION_SLOT %s", NameStr(slot_name));
    res = PQexec(streamConn, query.data);
    if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -992,7 +995,7 @@ bdr_init_replica(Name dbname)
                 * are all discarded; they're not needed here, and will be obtained
                 * again by the apply workers when they're launched after init.
                 */
-               conn = bdr_establish_connection_and_slot(cfg, "create slot",
+               conn = bdr_establish_connection_and_slot(cfg->dsn, "slot",
                    &slot_name, &sysid, &timeline, &dboid, &replication_identifier,
                    &snapshot);