bdr: Update of replication identifier emulation code
authorPetr Jelinek <pjmodos@pjmodos.net>
Fri, 17 Oct 2014 08:49:33 +0000 (10:49 +0200)
committerPetr Jelinek <pjmodos@pjmodos.net>
Fri, 21 Nov 2014 17:21:56 +0000 (18:21 +0100)
* new GetReplicationInfoByIdentifier compatible with upstream changes
* split replication_identifier emulation table into two to save on WAL
* add short explanation of replication identifier emulation
* don't replicate replication_identifier emulation table changes

bdr--0.8.0.sql
bdr_executor.c
bdr_output.c
bdr_replication_identifier.c
bdr_replication_identifier.h

index b0cc30f0fed44a663c262236575e5f071620737d..d114d9ef3191bb86e73d4c2307a894505bf7d662 100644 (file)
@@ -398,7 +398,7 @@ DECLARE
     ident TEXT;
 BEGIN
     -- don't recursively log truncation commands
-    IF pg_replication_identifier_is_replaying() THEN
+    IF bdr.bdr_replication_identifier_is_replaying() THEN
        RETURN NULL;
     END IF;
 
@@ -444,7 +444,7 @@ REVOKE ALL ON TABLE bdr_queued_drops FROM PUBLIC;
 SELECT pg_catalog.pg_extension_config_dump('bdr_queued_drops', '');
 
 DO $DO$BEGIN
-IF bdr.c() = 'BDR' THEN
+IF bdr.bdr_variant() = 'BDR' THEN
 
    CREATE OR REPLACE FUNCTION bdr.queue_dropped_objects()
    RETURNS event_trigger
@@ -456,7 +456,7 @@ IF bdr.c() = 'BDR' THEN
        otherobjs bdr.dropped_object[] = '{}';
    BEGIN
        -- don't recursively log drop commands
-       IF pg_replication_identifier_is_replaying() THEN
+       IF bdr.bdr_replication_identifier_is_replaying() THEN
           RETURN;
        END IF;
 
@@ -507,7 +507,7 @@ AS 'MODULE_PATHNAME'
 ;
 
 DO $DO$BEGIN
-IF bdr.bdr_variant() = 'BDR' THEN
+IF bdr.bdr_variant() = 'UDR' THEN
 
    CREATE TABLE bdr_replication_identifier (
        riident oid NOT NULL,
@@ -519,6 +519,14 @@ IF bdr.bdr_variant() = 'BDR' THEN
    CREATE UNIQUE INDEX bdr_replication_identifier_riiident_index ON bdr_replication_identifier(riident);
    CREATE UNIQUE INDEX bdr_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
 
+   CREATE TABLE bdr_replication_identifier_pos (
+       riident oid NOT NULL,
+       riremote_lsn pg_lsn,
+       rilocal_lsn pg_lsn
+   );
+
+   CREATE UNIQUE INDEX bdr_replication_identifier_pos_riiident_index ON bdr_replication_identifier_pos(riident);
+
    CREATE OR REPLACE FUNCTION bdr_replication_identifier_create(i_riname text) RETURNS Oid
    AS $func$
    DECLARE
@@ -529,6 +537,7 @@ IF bdr.bdr_variant() = 'BDR' THEN
            i := i += 1;
        END LOOP;
        INSERT INTO bdr.bdr_replication_identifier(riident, riname) VALUES(i, i_riname);
+       INSERT INTO bdr.bdr_replication_identifier_pos(riident) VALUES(i);
 
        RETURN i;
    END;
@@ -543,13 +552,29 @@ IF bdr.bdr_variant() = 'BDR' THEN
 
    CREATE OR REPLACE FUNCTION bdr_replication_identifier_drop(i_riname text) RETURNS void
    AS $func$
+   DECLARE
+       v_riident int;
    BEGIN
-       DELETE FROM bdr.bdr_replication_identifier WHERE riname = i_riname;
+       DELETE FROM bdr.bdr_replication_identifier WHERE riname = i_riname RETURNING riident INTO v_riident;
+       IF FOUND THEN
+           DELETE FROM bdr.bdr_replication_identifier_pos WHERE riident = v_riident;
+       END IF;
    END;
    $func$ STRICT LANGUAGE plpgsql;
 
+   CREATE OR REPLACE FUNCTION bdr.bdr_replication_identifier_is_replaying()
+   RETURNS boolean
+   LANGUAGE C
+   AS 'MODULE_PATHNAME';
+
+ELSE
+
+   CREATE OR REPLACE FUNCTION bdr.bdr_replication_identifier_is_replaying()
+   RETURNS boolean
+   LANGUAGE SQL
+   AS 'SELECT pg_replication_identifier_is_replaying()';
 
-   END IF;
+END IF;
 END;$DO$;
 
 ---
index 8564b43573fa719f9a37366fdbec3d0752e3de0a..7b7743a14bdaea4caf5da6bce5d6b6030deb39b0 100644 (file)
@@ -366,11 +366,11 @@ bdr_queue_ddl_command(char *command_tag, char *command)
 Datum
 bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
 {
-    EventTriggerData   *trigdata;
+   EventTriggerData   *trigdata;
    char               *skip_ddl;
 
-    if (!CALLED_AS_EVENT_TRIGGER(fcinfo))  /* internal error */
-        elog(ERROR, "not fired by event trigger manager");
+   if (!CALLED_AS_EVENT_TRIGGER(fcinfo))  /* internal error */
+       elog(ERROR, "not fired by event trigger manager");
 
    /*
     * If we're currently replaying something from a remote node, don't queue
@@ -388,7 +388,7 @@ bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
    if (strcmp(skip_ddl, "on") == 0)
        PG_RETURN_VOID();
 
-    trigdata = (EventTriggerData *) fcinfo->context;
+   trigdata = (EventTriggerData *) fcinfo->context;
 
    if (strcmp(trigdata->tag, "CREATE TABLE") == 0 &&
        IsA(trigdata->parsetree, CreateStmt))
@@ -544,8 +544,10 @@ bdr_replicate_ddl_command(PG_FUNCTION_ARGS)
 void
 bdr_executor_always_allow_writes(bool always_allow)
 {
+#ifdef BUILDING_BDR
    Assert(IsUnderPostmaster);
    bdr_always_allow_writes = always_allow;
+#endif
 }
 
 /*
index 04ade7ebccb936d7eb5c4a287896a106b943f669..e8c5c503e8f6bcb4ec7eba0265246c13cf837b23 100644 (file)
@@ -76,6 +76,10 @@ typedef struct
    Oid bdr_schema_oid;
    Oid bdr_conflict_handlers_reloid;
    Oid bdr_locks_reloid;
+#ifdef BUILDING_UDR
+   Oid bdr_replication_identifier_reloid;
+   Oid bdr_replication_identifier_pos_reloid;
+#endif
 
    int num_replication_sets;
    char **replication_sets;
@@ -362,6 +366,10 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
 
    data->bdr_conflict_handlers_reloid = InvalidOid;
    data->bdr_locks_reloid = InvalidOid;
+#ifdef BUILDING_UDR
+   data->bdr_replication_identifier_reloid = InvalidOid;
+   data->bdr_replication_identifier_pos_reloid = InvalidOid;
+#endif
    data->bdr_schema_oid = InvalidOid;
    data->num_replication_sets = -1;
 
@@ -544,6 +552,12 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
                get_relname_relid("bdr_global_locks", schema_oid);
            Assert(data->bdr_locks_reloid != InvalidOid); /* FIXME */
 
+#ifdef BUILDING_UDR
+           data->bdr_replication_identifier_reloid =
+               get_relname_relid("bdr_replication_identifier", schema_oid);
+           data->bdr_replication_identifier_pos_reloid =
+               get_relname_relid("bdr_replication_identifier_pos", schema_oid);
+#endif
        }
        else
            elog(WARNING, "cache lookup for schema bdr failed");
@@ -567,7 +581,7 @@ static inline bool
 should_forward_changeset(LogicalDecodingContext *ctx, BdrOutputData *data,
                         ReorderBufferTXN *txn)
 {
-#ifdef BDR_MULTIMASTER
+#ifdef BUILDING_BDR
    return txn->origin_id == InvalidRepNodeId || data->forward_changesets;
 #else
    return true;
@@ -579,9 +593,14 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
                      BDRRelation *r, enum ReorderBufferChangeType change)
 {
    /* internal bdr relations that may not be replicated */
-   if(RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
-      RelationGetRelid(r->rel) == data->bdr_locks_reloid)
+   if (RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
+       RelationGetRelid(r->rel) == data->bdr_locks_reloid)
        return false;
+#ifdef BUILDING_UDR
+   if (RelationGetRelid(r->rel) == data->bdr_replication_identifier_reloid ||
+       RelationGetRelid(r->rel) == data->bdr_replication_identifier_pos_reloid)
+       return false;
+#endif
 
    /*
     * Quite ugly, but there's no neat way right now: Flush replication set
index bfc56b9cad9bd58cecdd880b20f3cd0eae8d1dde..a898aa0e60e9050e3d227f520a6785e07146e8ae 100644 (file)
@@ -3,6 +3,18 @@
  * bdr_replication_identifier.c
  *     Replication identifiers emulation
  *
+ * The replication identifier is used to track the position to which local
+ * node has replayed the replication stream received from the remote node.
+ * The information is used after reconnect to specify from which position
+ * we want to receive changes.
+ *
+ * The replication identifier information has to be crash safe in order
+ * to guarantee that we always start replaying the stream from the position
+ * that was sucessfully saved to disk on local node.
+ *
+ * In BDR patched PostgreSQL we attach the replication identifier info into
+ * xlog records. But because extensions don't have access to xlog, UDR has
+ * to store data in a table which is less efficient.
  *
  * Copyright (C) 2012-2013, PostgreSQL Global Development Group
  *
@@ -42,15 +54,19 @@ RepNodeId   replication_origin_id = InvalidRepNodeId; /* assumed identity */
 XLogRecPtr replication_origin_lsn;
 TimestampTz    replication_origin_timestamp;
 
-#define Natts_pg_replication_identifier        4
+#define Natts_pg_replication_identifier        2
 #define Anum_pg_replication_riident            1
 #define Anum_pg_replication_riname         2
-#define Anum_pg_replication_riremote_lsn   3
-#define Anum_pg_replication_rilocal_lsn    4
 
+#define Natts_pg_replication_identifier_pos        3
+#define Anum_pg_replication_pos_riident            1
+#define Anum_pg_replication_pos_riremote_lsn   2
+#define Anum_pg_replication_pos_rilocal_lsn    3
 
 static Oid ReplicationIdentifierRelationId = InvalidOid;
+static Oid ReplicationIdentifierPosRelationId = InvalidOid;
 static Oid ReplicationLocalIdentIndex = InvalidOid;
+static Oid ReplicationPosLocalIdentIndex = InvalidOid;
 
 /*
  * Replay progress of a single remote node.
@@ -82,11 +98,15 @@ typedef struct ReplicationState
  */
 static ReplicationState *local_replication_state = NULL;
 
+PG_FUNCTION_INFO_V1(bdr_replication_identifier_is_replaying);
+
 static void
 EnsureReplicationIdentifierRelationId(void)
 {
    if (ReplicationIdentifierRelationId == InvalidOid ||
-       ReplicationLocalIdentIndex == InvalidOid)
+       ReplicationLocalIdentIndex == InvalidOid ||
+       ReplicationIdentifierPosRelationId == InvalidOid ||
+       ReplicationPosLocalIdentIndex == InvalidOid)
    {
        Oid schema_oid = get_namespace_oid("bdr", false);
 
@@ -94,41 +114,46 @@ EnsureReplicationIdentifierRelationId(void)
            bdr_lookup_relid("bdr_replication_identifier", schema_oid);
        ReplicationLocalIdentIndex =
            bdr_lookup_relid("bdr_replication_identifier_riiident_index", schema_oid);
+
+       ReplicationIdentifierPosRelationId =
+           bdr_lookup_relid("bdr_replication_identifier_pos", schema_oid);
+       ReplicationPosLocalIdentIndex =
+           bdr_lookup_relid("bdr_replication_identifier_pos_riiident_index", schema_oid);
    }
 }
 
 RepNodeId
 GetReplicationIdentifier(char *riname, bool missing_ok)
 {
-    Relation        rel;
+   Relation        rel;
    Snapshot        snap;
    SysScanDesc     scan;
-   ScanKeyData     key;
+   ScanKeyData     key;
    HeapTuple       tuple;
-   Oid             riident = InvalidOid;
+   Oid             riident = InvalidOid;
 
    EnsureReplicationIdentifierRelationId();
 
    snap = RegisterSnapshot(GetLatestSnapshot());
    rel = heap_open(ReplicationIdentifierRelationId, RowExclusiveLock);
 
-    ScanKeyInit(&key,
+   ScanKeyInit(&key,
                Anum_pg_replication_riname,
                BTEqualStrategyNumber, F_TEXTEQ,
                CStringGetTextDatum(riname));
-    scan = systable_beginscan(rel, 0, true, snap, 1, &key);
-    tuple = systable_getnext(scan);
+   scan = systable_beginscan(rel, 0, true, snap, 1, &key);
+   tuple = systable_getnext(scan);
 
-    if (HeapTupleIsValid(tuple))
+   if (HeapTupleIsValid(tuple))
    {
        Datum       values[Natts_pg_replication_identifier];
        bool        nulls[Natts_pg_replication_identifier];
 
-        heap_deform_tuple(tuple, RelationGetDescr(rel),
+       heap_deform_tuple(tuple, RelationGetDescr(rel),
                          values, nulls);
        riident = DatumGetObjectId(values[0]);
 
-    }
+   }
    else if (!missing_ok)
        elog(ERROR, "cache lookup failed for replication identifier named %s",
            riname);
@@ -143,10 +168,11 @@ GetReplicationIdentifier(char *riname, bool missing_ok)
 RepNodeId
 CreateReplicationIdentifier(char *riname)
 {
-   Oid     riident;
-   HeapTuple tuple = NULL;
-   Relation rel;
-   Datum   riname_d;
+   Oid         riident;
+   HeapTuple   tuple = NULL;
+   Relation    rel,
+               relpos;
+   Datum       riname_d;
    SnapshotData SnapshotDirty;
    SysScanDesc scan;
    ScanKeyData key;
@@ -175,11 +201,12 @@ CreateReplicationIdentifier(char *riname)
    InitDirtySnapshot(SnapshotDirty);
 
    rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+   relpos = heap_open(ReplicationIdentifierPosRelationId, ExclusiveLock);
 
    for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++)
    {
-       bool        nulls[Natts_pg_replication_identifier];
-       Datum       values[Natts_pg_replication_identifier];
+       bool        nulls[Natts_pg_replication_identifier_pos];
+       Datum       values[Natts_pg_replication_identifier_pos];
        bool        collides;
        CHECK_FOR_INTERRUPTS();
 
@@ -203,7 +230,8 @@ CreateReplicationIdentifier(char *riname)
             * Ok, found an unused riident, insert the new row and do a CCI,
             * so our callers can look it up if they want to.
             */
-           memset(&nulls, 0, sizeof(nulls));
+           memset(&nulls, false, sizeof(nulls));
+           memset(values, 0, sizeof(values));
 
            values[0] = ObjectIdGetDatum(riident);
            values[1] = riname_d;
@@ -211,6 +239,17 @@ CreateReplicationIdentifier(char *riname)
            tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
            simple_heap_insert(rel, tuple);
            CatalogUpdateIndexes(rel, tuple);
+
+           /* Insert new tuple to the position tracking table too */
+           memset(&nulls, false, sizeof(nulls));
+           memset(values, 0, sizeof(values));
+
+           values[0] = ObjectIdGetDatum(riident);
+
+           tuple = heap_form_tuple(RelationGetDescr(relpos), values, nulls);
+           simple_heap_insert(relpos, tuple);
+           CatalogUpdateIndexes(relpos, tuple);
+
            CommandCounterIncrement();
            break;
        }
@@ -218,6 +257,7 @@ CreateReplicationIdentifier(char *riname)
 
    /* now release lock again,  */
    heap_close(rel, ExclusiveLock);
+   heap_close(relpos, ExclusiveLock);
 
    if (tuple == NULL)
        ereport(ERROR,
@@ -231,11 +271,12 @@ CreateReplicationIdentifier(char *riname)
 void
 DropReplicationIdentifier(RepNodeId riident)
 {
-   HeapTuple tuple = NULL;
-   Relation rel;
+   HeapTuple   tuple = NULL;
+   Relation    rel,
+               relpos;
    SnapshotData SnapshotDirty;
-   SysScanDesc scan;
-   ScanKeyData key;
+   SysScanDesc scan;
+   ScanKeyData key;
 
    Assert(IsTransactionState());
 
@@ -244,7 +285,9 @@ DropReplicationIdentifier(RepNodeId riident)
    InitDirtySnapshot(SnapshotDirty);
 
    rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+   relpos = heap_open(ReplicationIdentifierPosRelationId, ExclusiveLock);
 
+   /* Find and delete tuple from name table */
    ScanKeyInit(&key,
                Anum_pg_replication_riident,
                BTEqualStrategyNumber, F_OIDEQ,
@@ -262,6 +305,24 @@ DropReplicationIdentifier(RepNodeId riident)
 
    systable_endscan(scan);
 
+   /* Find and delete tuple from position tracking table */
+   ScanKeyInit(&key,
+               Anum_pg_replication_pos_riident,
+               BTEqualStrategyNumber, F_OIDEQ,
+               ObjectIdGetDatum(riident));
+
+   scan = systable_beginscan(relpos, ReplicationPosLocalIdentIndex,
+                             true /* indexOK */,
+                             &SnapshotDirty,
+                             1, &key);
+
+   tuple = systable_getnext(scan);
+
+   if (HeapTupleIsValid(tuple))
+       simple_heap_delete(relpos, &tuple->t_self);
+
+   systable_endscan(scan);
+
    CommandCounterIncrement();
 
    /* now release lock again,  */
@@ -285,14 +346,14 @@ AdvanceReplicationIdentifier(RepNodeId node,
 
    InitDirtySnapshot(SnapshotDirty);
 
-   rel = heap_open(ReplicationIdentifierRelationId, RowExclusiveLock);
+   rel = heap_open(ReplicationIdentifierPosRelationId, RowExclusiveLock);
 
    ScanKeyInit(&key,
-               Anum_pg_replication_riident,
+               Anum_pg_replication_pos_riident,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(node));
 
-   scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+   scan = systable_beginscan(rel, ReplicationPosLocalIdentIndex,
                              true /* indexOK */,
                              &SnapshotDirty,
                              1, &key);
@@ -300,22 +361,22 @@ AdvanceReplicationIdentifier(RepNodeId node,
    tuple = systable_getnext(scan);
 
    if (HeapTupleIsValid(tuple))
-    {
+   {
        HeapTuple   newtuple;
-       Datum       values[Natts_pg_replication_identifier];
-       bool        nulls[Natts_pg_replication_identifier];
+       Datum       values[Natts_pg_replication_identifier_pos];
+       bool        nulls[Natts_pg_replication_identifier_pos];
 
-        heap_deform_tuple(tuple, RelationGetDescr(rel),
+       heap_deform_tuple(tuple, RelationGetDescr(rel),
                              values, nulls);
 
-        values[2] = LSNGetDatum(remote_commit);
-        values[3] = LSNGetDatum(local_commit);
+       values[1] = LSNGetDatum(remote_commit);
+       values[2] = LSNGetDatum(local_commit);
 
        newtuple = heap_form_tuple(RelationGetDescr(rel),
                                    values, nulls);
        simple_heap_update(rel, &tuple->t_self, newtuple);
        CatalogUpdateIndexes(rel, newtuple);
-    }
+   }
 
    systable_endscan(scan);
 
@@ -333,7 +394,7 @@ SetupCachedReplicationIdentifier(RepNodeId node)
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot setup replication origin when one is already setup")));
 
-    local_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
+   local_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
    local_replication_state->local_identifier = node;
    local_replication_state->remote_lsn = InvalidXLogRecPtr;
    local_replication_state->local_lsn = InvalidXLogRecPtr;
@@ -343,19 +404,19 @@ void
 AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
                                   XLogRecPtr local_commit)
 {
-    bool start_transaction = !IsTransactionState();
+   bool start_transaction = !IsTransactionState();
 
    Assert(local_replication_state != NULL);
    local_replication_state->local_lsn = local_commit;
    local_replication_state->remote_lsn = remote_commit;
 
-    if (start_transaction)
-       StartTransactionCommand();
+   if (start_transaction)
+       StartTransactionCommand();
 
-    AdvanceReplicationIdentifier(local_replication_state->local_identifier, remote_commit, local_commit);
+   AdvanceReplicationIdentifier(local_replication_state->local_identifier, remote_commit, local_commit);
 
-    if (start_transaction)
-        CommitTransactionCommand();
+   if (start_transaction)
+       CommitTransactionCommand();
 }
 
 XLogRecPtr
@@ -372,14 +433,15 @@ RemoteCommitFromCachedReplicationIdentifier(void)
  * The result needs to be ReleaseSysCache'ed and is an invalid HeapTuple if
  * the lookup failed.
  */
-HeapTuple
-GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok)
+void
+GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname)
 {
-    Relation        rel;
+   Relation        rel;
    Snapshot        snap;
    SysScanDesc     scan;
-   ScanKeyData     key;
+   ScanKeyData     key;
    HeapTuple       tuple = NULL;
+   Form_pg_replication_identifier ric;
 
    EnsureReplicationIdentifierRelationId();
 
@@ -394,16 +456,42 @@ GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok)
                              true /* indexOK */,
                              snap,
                              1, &key);
-    tuple = systable_getnext(scan);
+   tuple = systable_getnext(scan);
 
-    if (!HeapTupleIsValid(tuple) && !missing_ok)
+   if (HeapTupleIsValid(tuple))
+   {
+       ric = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+       *riname = pstrdup(text_to_cstring(&ric->riname));
+   }
+
+   if (!HeapTupleIsValid(tuple) && !missing_ok)
        elog(ERROR, "cache lookup failed for replication identifier id: %u",
            riident);
 
    systable_endscan(scan);
    UnregisterSnapshot(snap);
    heap_close(rel, RowExclusiveLock);
+}
+
+static void
+CheckReplicationIdentifierPrerequisites(bool check_slots)
+{
+   if (!superuser())
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("only superusers can query or manipulate replication identifiers")));
+
+   if (check_slots && max_replication_slots == 0)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("cannot query or manipulate replication identifiers when max_replication_slots = 0")));
 
-   return tuple;
 }
 
+Datum
+bdr_replication_identifier_is_replaying(PG_FUNCTION_ARGS)
+{
+   CheckReplicationIdentifierPrerequisites(true);
+
+   PG_RETURN_BOOL(replication_origin_id != InvalidRepNodeId);
+}
index c409e94740701f7f8dbb3563c7403e5fde7e3b38..00ca5805443a8d62940ba498174c4155962c01ec 100644 (file)
@@ -16,6 +16,7 @@
 
 #else
 
+#include "fmgr.h"
 #include "access/htup.h"
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
@@ -39,20 +40,22 @@ extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
 
 extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
 
-extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok);
+extern void GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname);
+
+extern Datum bdr_replication_identifier_is_replaying(PG_FUNCTION_ARGS);
 
 /*
- * XXX: ugly
  * bdr_replication_identifier struct
+ *
+ * Used by GetReplicationInfoByIdentifier()
  */
 typedef struct {
-   Oid     riident;
-   text    riname;
-   XLogRecPtr  riremote_lsn;
-   XLogRecPtr  rilocal_lsn;
+        Oid        riident;
+        text   riname;
 } FormData_pg_replication_identifier;
 typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
 
+
 #endif
 
 #endif   /* BDR_REPLICATION_IDENTIFIER_H */