-- register bdr am if seqam is supported
DO $DO$BEGIN
-IF right(bdr_version(), 4) = '-udr' THEN
+PERFORM 1 FROM pg_catalog.pg_class WHERE relname = 'pg_seqam' AND relnamespace = 11;
+IF NOT FOUND THEN
RETURN;
END IF;
-- when seqam is present, make sure the sequence is using local AM
DO $DO$BEGIN
-IF right(bdr_version(), 4) = '-udr' THEN
- CREATE SEQUENCE bdr_conflict_history_id_seq;
-ELSE
+PERFORM 1 FROM pg_catalog.pg_class WHERE relname = 'pg_seqam' AND relnamespace = 11;
+IF FOUND THEN
EXECUTE 'CREATE SEQUENCE bdr_conflict_history_id_seq USING local';
+ELSE
+ CREATE SEQUENCE bdr_conflict_history_id_seq;
END IF;
END;$DO$;
);
CREATE UNIQUE INDEX bdr_replication_identifier_riiident_index ON bdr_replication_identifier(riident);
-CREATE UNIQUE INDEX pg_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
+CREATE UNIQUE INDEX bdr_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_create(i_riname text) RETURNS Oid
+AS $func$
+DECLARE
+ i smallint := 1;
+BEGIN
+ LOCK TABLE bdr.bdr_replication_identifier;
+ WHILE (SELECT 1 FROM bdr.bdr_replication_identifier WHERE riident = i) LOOP
+ i := i += 1;
+ END LOOP;
+ INSERT INTO bdr.bdr_replication_identifier(riident, riname) VALUES(i, i_riname);
+
+ RETURN i;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_advance(i_riname text, i_remote_lsn pg_lsn, i_local_lsn pg_lsn) RETURNS void
+AS $func$
+BEGIN
+ UPDATE bdr.bdr_replication_identifier SET riremote_lsn = i_remote_lsn, rilocal_lsn = i_local_lsn WHERE riname = i_riname;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_drop(i_riname text) RETURNS void
+AS $func$
+BEGIN
+ DELETE FROM bdr.bdr_replication_identifier WHERE riname = i_riname;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
END IF;
END;$DO$;
#define LLOGCDIR "pg_logical/checkpoints"
+#ifdef BDR_MULTIMASTER
+#define RIINTERFACE_PREFIX "pg_catalog.pg_"
+#else
+#define RIINTERFACE_PREFIX "bdr.bdr_"
+#endif
+
typedef struct RemoteInfo {
uint64 sysid;
TimeLineID tlid;
static char *create_restore_point(char *remote_connstr);
static void initialize_replication_slots(bool init_replica);
static void create_replication_slot(PGconn *conn, Name slot_name);
-static RemoteInfo *get_remote_info(PGconn *conn);
+static RemoteInfo *get_remote_info(PGconn *conn, char* aux_connstr);
static Oid get_dboid_from_dbname(PGconn *conn, const char* dbname);
static uint64 GenerateSystemIdentifier(void);
static int
set_sysid(void)
{
+#ifdef BDR_MULTIMASTER
int ret;
PQExpBuffer cmd = createPQExpBuffer();
char *exec_path = find_other_exec_or_die(argv0, "bdr_resetxlog", "bdr_resetxlog (PostgreSQL) " PG_VERSION "\n");
destroyPQExpBuffer(cmd);
return ret;
+#endif
}
static void
remove_unwanted_state(void)
{
+#ifdef BDR_MULTIMASTER
DIR *lldir;
struct dirent *llde;
PQExpBuffer llpath = createPQExpBuffer();
die(_("Could not close directory \"%s\": %s\n"),
LLOGCDIR, strerror(errno));
}
+#endif
}
PQerrorMessage(remote_conn));
}
- ri = get_remote_info(remote_conn);
+ ri = get_remote_info(remote_conn, cfg->dsn);
dboid = cfg->init_replica ? ri->dboid : get_dboid_from_dbname(local_conn, cfg->dbname);
/* XXX: this might break if timeline switch happens in meantime */
* Read replication info about remote connection
*/
static RemoteInfo *
-get_remote_info(PGconn *conn)
+get_remote_info(PGconn *conn, char* aux_connstr)
{
RemoteInfo *ri = (RemoteInfo *)pg_malloc(sizeof(RemoteInfo));
+ char *remote_sysid;
char *remote_tlid;
char *remote_dboid;
PGresult *res;
"IDENTIFY_SYSTEM", PQerrorMessage(conn));
}
- if (PQntuples(res) != 1 || PQnfields(res) != 5)
+ if (PQntuples(res) != 1 || PQnfields(res) < 4 || PQnfields(res) > 5)
{
PQclear(res);
- die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
- PQntuples(res), PQnfields(res), 1, 5);
+ die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n"),
+ PQntuples(res), PQnfields(res), 1, 4, 5);
+ }
+
+ remote_sysid = PQgetvalue(res, 0, 0);
+ remote_tlid = PQgetvalue(res, 0, 1);
+
+ if (PQnfields(res) == 5)
+ {
+ remote_dboid = PQgetvalue(res, 0, 4);
+ if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
+ die(_("could not parse remote database OID %s"), remote_dboid);
+ }
+ else
+ {
+ PGconn *db_conn;
+ PGresult *res2;
+
+ db_conn = PQconnectdb(aux_connstr);
+ if (PQstatus(db_conn) != CONNECTION_OK)
+ {
+ PQfinish(db_conn);
+ die(_("Could not connect to the primary server: %s"), PQerrorMessage(db_conn));
+ }
+
+ res2 = PQexec(db_conn, "SELECT oid FROM pg_database WHERE datname = current_database()");
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+ die(_("Could fetch database oid: %s"), PQerrorMessage(db_conn));
+
+ if (PQntuples(res2) != 1 || PQnfields(res2) != 1)
+ die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+ PQntuples(res2), PQnfields(res2), 1, 1);
+
+ remote_dboid = PQgetvalue(res2, 0, 0);
+ if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
+ die(_("could not parse remote database OID %s"), remote_dboid);
+
+ PQclear(res2);
+ PQfinish(db_conn);
}
#ifdef HAVE_STRTOULL
- ri->sysid = strtoull(PQgetvalue(res, 0, 0), NULL, 10);
+ ri->sysid = strtoull(remote_sysid, NULL, 10);
#else
- ri->sysid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+ ri->sysid = strtoul(remote_sysid, NULL, 10);
#endif
- remote_tlid = PQgetvalue(res, 0, 1);
if (sscanf(remote_tlid, "%u", &ri->tlid) != 1)
die(_("Could not parse remote tlid %s\n"), remote_tlid);
- remote_dboid = PQgetvalue(res, 0, 4);
- if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
- die(_("Could not parse remote database OID %s\n"), remote_dboid);
-
PQclear(res);
return ri;
PGresult *res;
/* Remove replication identifiers */
- res = PQexec(local_conn, "SELECT pg_catalog.pg_replication_identifier_drop(riname) FROM pg_catalog.pg_replication_identifier;");
+ res = PQexec(local_conn, "SELECT "RIINTERFACE_PREFIX"replication_identifier_drop(riname) FROM "RIINTERFACE_PREFIX"replication_identifier;");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
PQerrorMessage(remote_conn));
}
- ri = get_remote_info(remote_conn);
+ ri = get_remote_info(remote_conn, cfg->dsn);
dboid = cfg->init_replica ? ri->dboid : get_dboid_from_dbname(local_conn, cfg->dbname);
PQfinish(remote_conn);
PQExpBuffer query = createPQExpBuffer();
PGresult *res;
- printfPQExpBuffer(query, "SELECT pg_replication_identifier_create('%s')",
+ printfPQExpBuffer(query, "SELECT "RIINTERFACE_PREFIX"replication_identifier_create('%s')",
remote_ident);
res = PQexec(conn, query->data);
if (remote_lsn)
{
- printfPQExpBuffer(query, "SELECT pg_replication_identifier_advance('%s', '%s', '0/0')",
+ printfPQExpBuffer(query, "SELECT "RIINTERFACE_PREFIX"replication_identifier_advance('%s', '%s', '0/0')",
remote_ident, remote_lsn);
res = PQexec(conn, query->data);