bdr: Make bdr_init_copy work for UDR
authorPetr Jelinek <pjmodos@pjmodos.net>
Mon, 13 Oct 2014 12:07:08 +0000 (14:07 +0200)
committerPetr Jelinek <pjmodos@pjmodos.net>
Fri, 21 Nov 2014 13:55:51 +0000 (14:55 +0100)
bdr--0.8.0.sql
bdr_init_copy.c

index cbe5ea5d8e75dba0df0e14f6b8a0d909b291479e..6c1b5808131c085ac0324cd67fa2f08924179d8b 100644 (file)
@@ -125,7 +125,8 @@ CREATE INDEX bdr_votes__by_voter ON bdr.bdr_votes(voter_sysid, voter_tlid, voter
 
 -- register bdr am if seqam is supported
 DO $DO$BEGIN
-IF right(bdr_version(), 4) = '-udr' THEN
+PERFORM 1 FROM pg_catalog.pg_class WHERE relname = 'pg_seqam' AND relnamespace = 11;
+IF NOT FOUND THEN
     RETURN;
 END IF;
 
@@ -257,10 +258,11 @@ COMMENT ON TYPE bdr_conflict_resolution IS 'Resolution of a bdr conflict - if a
 
 -- when seqam is present, make sure the sequence is using local AM
 DO $DO$BEGIN
-IF right(bdr_version(), 4) = '-udr' THEN
-    CREATE SEQUENCE bdr_conflict_history_id_seq;
-ELSE
+PERFORM 1 FROM pg_catalog.pg_class WHERE relname = 'pg_seqam' AND relnamespace = 11;
+IF FOUND THEN
     EXECUTE 'CREATE SEQUENCE bdr_conflict_history_id_seq USING local';
+ELSE
+    CREATE SEQUENCE bdr_conflict_history_id_seq;
 END IF;
 END;$DO$;
 
@@ -514,7 +516,37 @@ CREATE TABLE bdr_replication_identifier (
 );
 
 CREATE UNIQUE INDEX bdr_replication_identifier_riiident_index ON bdr_replication_identifier(riident);
-CREATE UNIQUE INDEX pg_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
+CREATE UNIQUE INDEX bdr_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_create(i_riname text) RETURNS Oid
+AS $func$
+DECLARE
+   i smallint := 1;
+BEGIN
+   LOCK TABLE bdr.bdr_replication_identifier;
+   WHILE (SELECT 1 FROM bdr.bdr_replication_identifier WHERE riident = i) LOOP
+       i := i += 1;
+   END LOOP;
+   INSERT INTO bdr.bdr_replication_identifier(riident, riname) VALUES(i, i_riname);
+
+   RETURN i;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_advance(i_riname text, i_remote_lsn pg_lsn, i_local_lsn pg_lsn) RETURNS void
+AS $func$
+BEGIN
+   UPDATE bdr.bdr_replication_identifier SET riremote_lsn = i_remote_lsn, rilocal_lsn = i_local_lsn WHERE riname = i_riname;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION bdr_replication_identifier_drop(i_riname text) RETURNS void
+AS $func$
+BEGIN
+   DELETE FROM bdr.bdr_replication_identifier WHERE riname = i_riname;
+END;
+$func$ STRICT LANGUAGE plpgsql;
+
 
 END IF;
 END;$DO$;
index a0e925a2f537908e07e7d618e3995720e4135327..d5fb26705e84635044fb3dad3223fb40e2cbaf2d 100644 (file)
 
 #define LLOGCDIR "pg_logical/checkpoints"
 
+#ifdef BDR_MULTIMASTER
+#define RIINTERFACE_PREFIX "pg_catalog.pg_"
+#else
+#define RIINTERFACE_PREFIX "bdr.bdr_"
+#endif
+
 typedef struct RemoteInfo {
    uint64      sysid;
    TimeLineID  tlid;
@@ -63,7 +69,7 @@ static void create_replication_identifier(PGconn *conn,
 static char *create_restore_point(char *remote_connstr);
 static void initialize_replication_slots(bool init_replica);
 static void create_replication_slot(PGconn *conn, Name slot_name);
-static RemoteInfo *get_remote_info(PGconn *conn);
+static RemoteInfo *get_remote_info(PGconn *conn, char* aux_connstr);
 static Oid get_dboid_from_dbname(PGconn *conn, const char* dbname);
 
 static uint64 GenerateSystemIdentifier(void);
@@ -376,6 +382,7 @@ get_postgres_guc_value(char *guc, char *defval)
 static int
 set_sysid(void)
 {
+#ifdef BDR_MULTIMASTER
    int          ret;
    PQExpBuffer  cmd = createPQExpBuffer();
    char        *exec_path = find_other_exec_or_die(argv0, "bdr_resetxlog", "bdr_resetxlog (PostgreSQL) " PG_VERSION "\n");
@@ -387,6 +394,7 @@ set_sysid(void)
    destroyPQExpBuffer(cmd);
 
    return ret;
+#endif
 }
 
 
@@ -484,6 +492,7 @@ read_bdr_config(void)
 static void
 remove_unwanted_state(void)
 {
+#ifdef BDR_MULTIMASTER
    DIR             *lldir;
    struct dirent   *llde;
    PQExpBuffer      llpath = createPQExpBuffer();
@@ -530,6 +539,7 @@ remove_unwanted_state(void)
        die(_("Could not close directory \"%s\": %s\n"),
            LLOGCDIR, strerror(errno));
    }
+#endif
 }
 
 
@@ -568,7 +578,7 @@ initialize_replication_slots(bool init_replica)
                        PQerrorMessage(remote_conn));
        }
 
-       ri = get_remote_info(remote_conn);
+       ri = get_remote_info(remote_conn, cfg->dsn);
        dboid = cfg->init_replica ? ri->dboid : get_dboid_from_dbname(local_conn, cfg->dbname);
 
        /* XXX: this might break if timeline switch happens in meantime */
@@ -596,9 +606,10 @@ initialize_replication_slots(bool init_replica)
  * Read replication info about remote connection
  */
 static RemoteInfo *
-get_remote_info(PGconn *conn)
+get_remote_info(PGconn *conn, char* aux_connstr)
 {
    RemoteInfo  *ri = (RemoteInfo *)pg_malloc(sizeof(RemoteInfo));
+   char       *remote_sysid;
    char       *remote_tlid;
    char       *remote_dboid;
    PGresult   *res;
@@ -611,27 +622,59 @@ get_remote_info(PGconn *conn)
             "IDENTIFY_SYSTEM", PQerrorMessage(conn));
    }
 
-   if (PQntuples(res) != 1 || PQnfields(res) != 5)
+   if (PQntuples(res) != 1 || PQnfields(res) < 4 || PQnfields(res) > 5)
    {
        PQclear(res);
-       die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-            PQntuples(res), PQnfields(res), 1, 5);
+       die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d or %d fields\n"),
+            PQntuples(res), PQnfields(res), 1, 4, 5);
+   }
+
+   remote_sysid = PQgetvalue(res, 0, 0);
+   remote_tlid = PQgetvalue(res, 0, 1);
+
+   if (PQnfields(res) == 5)
+   {
+       remote_dboid = PQgetvalue(res, 0, 4);
+       if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
+           die(_("could not parse remote database OID %s"), remote_dboid);
+   }
+   else
+   {
+       PGconn     *db_conn;
+       PGresult   *res2;
+
+       db_conn = PQconnectdb(aux_connstr);
+       if (PQstatus(db_conn) != CONNECTION_OK)
+       {
+           PQfinish(db_conn);
+           die(_("Could not connect to the primary server: %s"), PQerrorMessage(db_conn));
+       }
+
+       res2 = PQexec(db_conn, "SELECT oid FROM pg_database WHERE datname = current_database()");
+       if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+           die(_("Could fetch database oid: %s"), PQerrorMessage(db_conn));
+
+       if (PQntuples(res2) != 1 || PQnfields(res2) != 1)
+           die(_("Could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                PQntuples(res2), PQnfields(res2), 1, 1);
+
+       remote_dboid = PQgetvalue(res2, 0, 0);
+       if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
+           die(_("could not parse remote database OID %s"), remote_dboid);
+
+       PQclear(res2);
+       PQfinish(db_conn);
    }
 
 #ifdef HAVE_STRTOULL
-   ri->sysid = strtoull(PQgetvalue(res, 0, 0), NULL, 10);
+   ri->sysid = strtoull(remote_sysid, NULL, 10);
 #else
-   ri->sysid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+   ri->sysid = strtoul(remote_sysid, NULL, 10);
 #endif
 
-   remote_tlid = PQgetvalue(res, 0, 1);
    if (sscanf(remote_tlid, "%u", &ri->tlid) != 1)
        die(_("Could not parse remote tlid %s\n"), remote_tlid);
 
-   remote_dboid = PQgetvalue(res, 0, 4);
-   if (sscanf(remote_dboid, "%u", &ri->dboid) != 1)
-       die(_("Could not parse remote database OID %s\n"), remote_dboid);
-
    PQclear(res);
 
    return ri;
@@ -705,7 +748,7 @@ initialize_replication_identifiers(char *remote_lsn)
    PGresult        *res;
 
    /* Remove replication identifiers */
-   res = PQexec(local_conn, "SELECT pg_catalog.pg_replication_identifier_drop(riname) FROM pg_catalog.pg_replication_identifier;");
+   res = PQexec(local_conn, "SELECT "RIINTERFACE_PREFIX"replication_identifier_drop(riname) FROM "RIINTERFACE_PREFIX"replication_identifier;");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
@@ -734,7 +777,7 @@ initialize_replication_identifiers(char *remote_lsn)
                        PQerrorMessage(remote_conn));
        }
 
-       ri = get_remote_info(remote_conn);
+       ri = get_remote_info(remote_conn, cfg->dsn);
        dboid = cfg->init_replica ? ri->dboid : get_dboid_from_dbname(local_conn, cfg->dbname);
 
        PQfinish(remote_conn);
@@ -759,7 +802,7 @@ create_replication_identifier(PGconn *conn, const char *remote_ident, char *remo
    PQExpBuffer query = createPQExpBuffer();
    PGresult   *res;
 
-   printfPQExpBuffer(query, "SELECT pg_replication_identifier_create('%s')",
+   printfPQExpBuffer(query, "SELECT "RIINTERFACE_PREFIX"replication_identifier_create('%s')",
                     remote_ident);
 
    res = PQexec(conn, query->data);
@@ -774,7 +817,7 @@ create_replication_identifier(PGconn *conn, const char *remote_ident, char *remo
 
    if (remote_lsn)
    {
-       printfPQExpBuffer(query, "SELECT pg_replication_identifier_advance('%s', '%s', '0/0')",
+       printfPQExpBuffer(query, "SELECT "RIINTERFACE_PREFIX"replication_identifier_advance('%s', '%s', '0/0')",
                         remote_ident, remote_lsn);
 
        res = PQexec(conn, query->data);