* Get database Oid of the remotedb.
*/
static Oid
-bdr_get_remote_dboid(char *conninfo_db)
+bdr_get_remote_dboid(const char *conninfo_db)
{
PGconn *dbConn;
PGresult *res;
return remote_dboid_i;
}
+/*
+ * Format a slot name and replication identifier for a connection to a remote
+ * node, suitable for use as the slot name on the remote server and as the
+ * local replication identifier for that slot. Uses the local node's current
+ * dboid.
+ *
+ * Does NOT enforce that the remote and local node identities must differ.
+ *
+ * The replication identifier is allocated in the current memory context.
+ */
+void
+bdr_build_ident_and_slotname(uint64 remote_sysid, TimeLineID remote_tlid,
+ Oid remote_dboid, char **out_replication_identifier,
+ Name out_slot_name)
+{
+ StringInfoData replication_identifier;
+ char local_sysid[33];
+
+ initStringInfo(&replication_identifier);
+
+ Assert(MyDatabaseId != InvalidOid);
+ Assert(remote_dboid != InvalidOid);
+
+ snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+
+ /*
+ * Build slot name identifying the local node to the remote end.
+ */
+ snprintf(NameStr(*out_slot_name), NAMEDATALEN, BDR_SLOT_NAME_FORMAT,
+ remote_dboid, local_sysid, ThisTimeLineID,
+ MyDatabaseId, EMPTY_REPLICATION_NAME);
+ NameStr(*out_slot_name)[NAMEDATALEN - 1] = '\0';
+
+ /*
+ * Build replication identifier.
+ */
+ appendStringInfo(&replication_identifier, BDR_NODE_ID_FORMAT,
+ remote_sysid, remote_tlid, remote_dboid, MyDatabaseId,
+ EMPTY_REPLICATION_NAME);
+
+ *out_replication_identifier = replication_identifier.data;
+}
+
/*
* Establish a BDR connection
*
* Connects to the remote node, identifies it, and generates local and remote
- * replication identifiers and slot name.
+ * replication identifiers and slot name. The conninfo string passed should
+ * specify a dbname. It must not contain a replication= parameter.
+ *
+ * Does NOT enforce that the remote and local node identities must differ.
+ *
+ * appname may be NULL.
*
* The local replication identifier is not saved, the caller must do that.
*
* remote_tlid_i
*/
PGconn*
-bdr_connect(char *conninfo_repl,
- char *conninfo_db,
- char* remote_ident, size_t remote_ident_length,
- NameData* slot_name,
+bdr_connect(const char *conninfo,
+ Name appname,
uint64* remote_sysid_i, TimeLineID *remote_tlid_i,
Oid *remote_dboid_i)
{
PGconn *streamConn;
PGresult *res;
- StringInfoData query;
+ StringInfoData conninfo_repl;
char *remote_sysid;
char *remote_tlid;
- char *remote_dbname;
char local_sysid[32];
- NameData replication_name;
- initStringInfo(&query);
- NameStr(replication_name)[0] = '\0';
+ initStringInfo(&conninfo_repl);
+
+ appendStringInfo(&conninfo_repl, "%s %s fallback_application_name='%s'",
+ conninfo, "replication=database",
+ (appname == NULL ? "bdr" : NameStr(*appname)));
- streamConn = PQconnectdb(conninfo_repl);
+ streamConn = PQconnectdb(conninfo_repl.data);
if (PQstatus(streamConn) != CONNECTION_OK)
{
- ereport(FATAL,
+ ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn)),
- errdetail("Connection string is '%s'", conninfo_repl)));
+ errdetail("Connection string is '%s'", conninfo_repl.data)));
}
elog(DEBUG3, "Sending replication command: IDENTIFY_SYSTEM");
res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- elog(FATAL, "could not send replication command \"%s\": %s",
+ elog(ERROR, "could not send replication command \"%s\": %s",
"IDENTIFY_SYSTEM", PQerrorMessage(streamConn));
}
if (PQntuples(res) != 1 || PQnfields(res) < 4 || PQnfields(res) > 5)
{
- elog(FATAL, "could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n",
+ elog(ERROR, "could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n",
PQntuples(res), PQnfields(res), 1, 4, 5);
}
remote_sysid = PQgetvalue(res, 0, 0);
remote_tlid = PQgetvalue(res, 0, 1);
- remote_dbname = PQgetvalue(res, 0, 3);
if (PQnfields(res) == 5)
{
char *remote_dboid = PQgetvalue(res, 0, 4);
}
else
{
- *remote_dboid_i = bdr_get_remote_dboid(conninfo_db);
+ *remote_dboid_i = bdr_get_remote_dboid(conninfo);
}
if (sscanf(remote_sysid, UINT64_FORMAT, remote_sysid_i) != 1)
snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
GetSystemIdentifier());
- if (strcmp(remote_sysid, local_sysid) == 0
- && ThisTimeLineID == *remote_tlid_i
- && MyDatabaseId == *remote_dboid_i)
- {
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("The system identifier, timeline ID and/or database oid must differ between the nodes"),
- errdetail("Both keys are (sysid, timelineid, dboid) = (%s,%s,%s)",
- remote_sysid, remote_tlid, remote_dbname)));
- }
- else
- elog(DEBUG2, "local node (%s,%u,%u), remote node (%s,%s,%u)",
- local_sysid, ThisTimeLineID, MyDatabaseId, remote_sysid,
- remote_tlid, *remote_dboid_i);
-
- /*
- * build slot name.
- *
- * FIXME: This might truncate the identifier if replication_name is
- * somewhat longer...
- */
- snprintf(NameStr(*slot_name), NAMEDATALEN, BDR_SLOT_NAME_FORMAT,
- *remote_dboid_i, local_sysid, ThisTimeLineID,
- MyDatabaseId, NameStr(replication_name));
- NameStr(*slot_name)[NAMEDATALEN - 1] = '\0';
-
- /*
- * Build replication identifier.
- */
- snprintf(remote_ident, remote_ident_length,
- BDR_NODE_ID_FORMAT,
- *remote_sysid_i, *remote_tlid_i, *remote_dboid_i, MyDatabaseId,
- NameStr(replication_name));
+ elog(DEBUG2, "local node (%s,%u,%u), remote node (%s,%s,%u)",
+ local_sysid, ThisTimeLineID, MyDatabaseId, remote_sysid,
+ remote_tlid, *remote_dboid_i);
/* no parts of IDENTIFY_SYSTEM's response needed anymore */
PQclear(res);
*----------------------
*/
PGconn*
-bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
+bdr_establish_connection_and_slot(const char *dsn,
const char *application_name_suffix, Name out_slot_name, uint64 *out_sysid,
TimeLineID* out_timeline, Oid *out_dboid,
RepNodeId *out_replication_identifier, char **out_snapshot)
{
- char remote_ident[256];
- PGconn *streamConn;
- StringInfoData conninfo_repl;
+ PGconn *streamConn;
+ bool tx_started = false;
+ NameData appname;
+ char *remote_ident;
/*
* Make sure the local and remote nodes aren't the same node.
bdr_error_nodeids_must_differ(*out_sysid, *out_timeline, *out_dboid);
}
- initStringInfo (&conninfo_repl);
-
- appendStringInfo(&conninfo_repl,
- "%s replication=database fallback_application_name='"BDR_LOCALID_FORMAT": %s'",
- cfg->dsn, BDR_LOCALID_FORMAT_ARGS,
- application_name_suffix);
+ snprintf(NameStr(appname), NAMEDATALEN, BDR_LOCALID_FORMAT":%s",
+ BDR_LOCALID_FORMAT_ARGS, application_name_suffix);
- /* Establish BDR conn and IDENTIFY_SYSTEM */
+ /*
+ * Establish BDR conn and IDENTIFY_SYSTEM, ERROR on things like
+ * connection failure.
+ */
streamConn = bdr_connect(
- conninfo_repl.data,
- cfg->dsn,
- remote_ident, sizeof(remote_ident),
- out_slot_name, out_sysid, out_timeline, out_dboid
- );
+ dsn, &appname, out_sysid, out_timeline, out_dboid);
- StartTransactionCommand();
+ bdr_build_ident_and_slotname(*out_sysid, *out_timeline, *out_dboid,
+ &remote_ident, out_slot_name);
+
+ Assert(remote_ident != NULL);
+
+ if (!IsTransactionState())
+ {
+ tx_started = true;
+ StartTransactionCommand();
+ }
*out_replication_identifier = GetReplicationIdentifier(remote_ident, true);
- CommitTransactionCommand();
+ if (tx_started)
+ CommitTransactionCommand();
if (OidIsValid(*out_replication_identifier))
{
out_replication_identifier, out_snapshot);
}
+ pfree(remote_ident);
+ remote_ident = NULL;
+
return streamConn;
}
bdr_drop_slot_and_replication_identifier(BdrConnectionConfig *cfg)
{
- char remote_ident[256];
PGconn *streamConn;
RepNodeId replication_identifier;
NameData slot_name;
PGresult *res;
StringInfoData query;
char *sqlstate;
+ NameData appname;
+ char *remote_ident;
- initStringInfo(&query);
elog(DEBUG1, "bdr %s: Dropping slot and local ident from connection %s",
cfg->dbname, cfg->name);
- appendStringInfo(&query,
- "%s replication=database fallback_application_name='"BDR_LOCALID_FORMAT": %s: drop slot'",
- cfg->dsn, BDR_LOCALID_FORMAT_ARGS, cfg->name);
+ snprintf(NameStr(appname), NAMEDATALEN, "slot drop");
+ (NameStr(appname))[NAMEDATALEN-1] = '\0';
/* Establish BDR conn and IDENTIFY_SYSTEM */
streamConn = bdr_connect(
- query.data,
- cfg->dsn,
- remote_ident, sizeof(remote_ident),
- &slot_name, &sysid, &timeline, &dboid
+ cfg->dsn, &appname,
+ &sysid, &timeline, &dboid
);
+ bdr_build_ident_and_slotname(sysid, timeline, dboid,
+ &remote_ident, &slot_name);
+
+
StartTransactionCommand();
replication_identifier = GetReplicationIdentifier(remote_ident, true);
+ pfree(remote_ident);
+
if (OidIsValid(replication_identifier))
{
/* Local replication identifier exists and must be dropped. */
* whether it exists or not silently over the replication protocol,
* so we just try it and cope if it's missing.
*/
- resetStringInfo(&query);
+ initStringInfo(&query);
appendStringInfo(&query, "DROP_REPLICATION_SLOT %s", NameStr(slot_name));
res = PQexec(streamConn, query.data);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
* are all discarded; they're not needed here, and will be obtained
* again by the apply workers when they're launched after init.
*/
- conn = bdr_establish_connection_and_slot(cfg, "create slot",
+ conn = bdr_establish_connection_and_slot(cfg->dsn, "slot",
&slot_name, &sysid, &timeline, &dboid, &replication_identifier,
&snapshot);