bdr: Introduce "replication_identifiers" to keep track of remote nodes
authorAndres Freund <andres@anarazel.de>
Fri, 22 Feb 2013 16:43:27 +0000 (17:43 +0100)
committerAndres Freund <andres@anarazel.de>
Fri, 17 May 2013 16:40:36 +0000 (18:40 +0200)
Replication identifiers can be used to track & lookup remote nodes identified
via (sysid, tlid, remote_dbid, local_dbid, name) and map that tuple to a local
uint16.
Keyed by that replication identifier the remote lsn and the local lsn is
tracked in a crash safe manner.

Support for tracking that via output plugins is added as well.

Needs a catversion bump.

26 files changed:
src/backend/access/rmgrdesc/xactdesc.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/catalog/Makefile
src/backend/catalog/catalog.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/replication_identifier.c [new file with mode: 0644]
src/backend/storage/ipc/ipci.c
src/backend/utils/cache/syscache.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/xact.h
src/include/access/xlog.h
src/include/access/xlogdefs.h
src/include/catalog/indexing.h
src/include/catalog/pg_replication_identifier.h [new file with mode: 0644]
src/include/replication/logical.h
src/include/replication/reorderbuffer.h
src/include/replication/replication_identifier.h [new file with mode: 0644]
src/include/utils/syscache.h
src/test/regress/expected/sanity_check.out

index 11c6912753a89bd9eefc78fcd93d302d83f44b8c..c959c13ae17e88ef1dbf2a1cc642b099c1f85d16 100644 (file)
@@ -26,9 +26,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
    int         i;
    TransactionId *subxacts;
+   SharedInvalidationMessage *msgs;
 
    subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
 
+   msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
    appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
    if (xlrec->nrels > 0)
@@ -50,9 +53,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
    }
    if (xlrec->nmsgs > 0)
    {
-       SharedInvalidationMessage *msgs;
-
-       msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
 
        if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
            appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
@@ -78,6 +78,16 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
                appendStringInfo(buf, " unknown id %d", msg->id);
        }
    }
+   if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+   {
+       xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+       appendStringInfo(buf, " origin %u, lsn %X/%X",
+                        origin->origin_node_id,
+                        (uint32)(origin->origin_lsn >> 32),
+                        (uint32)origin->origin_lsn);
+   }
+
 }
 
 static void
index f2204d798bb32276016cbbbe79aef192381f3d93..83a020b176bbd0d3b0ca35be6ee776c9f2ea1ee9 100644 (file)
 #include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/walsender.h"
 #include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -1052,11 +1054,13 @@ RecordTransactionCommit(void)
        /*
         * Do we need the long commit record? If not, use the compact format.
         */
-       if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+       if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit
+           || wal_level >= WAL_LEVEL_LOGICAL)
        {
-           XLogRecData rdata[4];
+           XLogRecData rdata[5];
            int         lastrdata = 0;
            xl_xact_commit xlrec;
+           xl_xact_origin origin;
 
            /*
             * Set flags required for recovery processing of commits.
@@ -1104,6 +1108,22 @@ RecordTransactionCommit(void)
                rdata[3].buffer = InvalidBuffer;
                lastrdata = 3;
            }
+           /* dump transaction origin information */
+           if (guc_replication_origin_id != InvalidRepNodeId)
+           {
+               Assert(replication_origin_lsn != InvalidXLogRecPtr);
+               elog(LOG, "logging origin id");
+               xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+               origin.origin_node_id = guc_replication_origin_id;
+               origin.origin_lsn = replication_origin_lsn;
+               origin.origin_timestamp = replication_origin_timestamp;
+
+               rdata[lastrdata].next = &(rdata[4]);
+               rdata[4].data = (char *) &origin;
+               rdata[4].len = sizeof(xl_xact_origin);
+               rdata[4].buffer = InvalidBuffer;
+               lastrdata = 4;
+           }
            rdata[lastrdata].next = NULL;
 
            (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1134,13 +1154,19 @@ RecordTransactionCommit(void)
        }
    }
 
+   /* record plain commit ts if not replaying remote actions */
+   if (guc_replication_origin_id == InvalidRepNodeId)
+       replication_origin_timestamp = xactStopTimestamp;
+
    /*
     * We don't need to log the commit timestamp separately since the commit
     * record logged above has all the necessary action to set the timestamp
     * again.
     */
    TransactionTreeSetCommitTimestamp(xid, nchildren, children,
-                                     xactStopTimestamp, 0, false);
+                                     replication_origin_timestamp,
+                                     guc_replication_origin_id,
+                                     false);
 
    /*
     * Check if we want to commit asynchronously.  We can allow the XLOG flush
@@ -1222,9 +1248,11 @@ RecordTransactionCommit(void)
    if (wrote_xlog)
        SyncRepWaitForLSN(XactLastRecEnd);
 
+   /* remember end of last commit record */
+   XactLastCommitEnd = XactLastRecEnd;
+
    /* Reset XactLastRecEnd until the next transaction writes something */
    XactLastRecEnd = 0;
-
 cleanup:
    /* Clean up local data */
    if (rels)
@@ -4593,10 +4621,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
                          SharedInvalidationMessage *inval_msgs, int nmsgs,
                          RelFileNode *xnodes, int nrels,
                          Oid dbId, Oid tsId,
-                         uint32 xinfo)
+                         uint32 xinfo,
+                         xl_xact_origin *origin)
 {
    TransactionId max_xid;
    int         i;
+   RepNodeId   origin_node_id = InvalidRepNodeId;
 
    max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
 
@@ -4616,9 +4646,29 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
        LWLockRelease(XidGenLock);
    }
 
+   Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+   if (xinfo & XACT_CONTAINS_ORIGIN)
+   {
+       origin_node_id = origin->origin_node_id;
+       commit_time = origin->origin_timestamp;
+   }
+
    /* Set the transaction commit time */
    TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
-                                     commit_time, 0, false);
+                                     commit_time,
+                                     origin_node_id, false);
+
+   if (xinfo & XACT_CONTAINS_ORIGIN)
+   {
+       elog(LOG, "restoring origin of node %u to %X/%X",
+            origin->origin_node_id,
+            (uint32)(origin->origin_lsn >> 32),
+            (uint32)origin->origin_lsn);
+       AdvanceReplicationIdentifier(origin->origin_node_id,
+                                    origin->origin_lsn,
+                                    lsn);
+   }
 
    if (standbyState == STANDBY_DISABLED)
    {
@@ -4734,19 +4784,25 @@ xact_redo_commit(xl_xact_commit *xlrec,
 {
    TransactionId *subxacts;
    SharedInvalidationMessage *inval_msgs;
-
+   xl_xact_origin *origin = NULL;
    /* subxid array follows relfilenodes */
    subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
    /* invalidation messages array follows subxids */
    inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+   if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+   {
+       origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+   }
+
    xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
                              subxacts, xlrec->nsubxacts,
                              inval_msgs, xlrec->nmsgs,
                              xlrec->xnodes, xlrec->nrels,
                              xlrec->dbId,
                              xlrec->tsId,
-                             xlrec->xinfo);
+                             xlrec->xinfo,
+                             origin);
 }
 
 /*
@@ -4762,7 +4818,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
                              NULL, 0,  /* relfilenodes */
                              InvalidOid,       /* dbId */
                              InvalidOid,       /* tsId */
-                             0);       /* xinfo */
+                             0,        /* xinfo */
+                             NULL      /* origin */);
 }
 
 /*
index 50feb0ff759bf2faf9d99d84a9cdf0f454949b53..1e3d34761225398026397442e8f199b08f7f0b43 100644 (file)
@@ -43,6 +43,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/logical.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -276,6 +277,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
 
 XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
 
+XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
+
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
  * (which is almost but not quite the same as a pointer to the most recent
@@ -939,6 +942,7 @@ begin:;
    rechdr->xl_len = len;       /* doesn't include backup blocks */
    rechdr->xl_info = info;
    rechdr->xl_rmid = rmid;
+   rechdr->xl_origin_id = guc_replication_origin_id;
 
    hdr_rdt.next = rdata;
    hdr_rdt.data = (char *) rechdr;
@@ -5216,6 +5220,11 @@ StartupXLOG(void)
     */
    StartupLogicalReplication(checkPoint.redo);
 
+   /*
+    * Recover knowledge about replay progress of known replication partners.
+    */
+   StartupReplicationIdentifier(checkPoint.redo);
+
    /*
     * Initialize unlogged LSN. On a clean shutdown, it's restored from the
     * control file. On recovery, all unlogged relations are blown away, so
@@ -6886,7 +6895,7 @@ CreateCheckPoint(int flags)
        XLogRecPtr  curInsert;
 
        INSERT_RECPTR(curInsert, Insert, Insert->curridx);
-       if (curInsert == ControlFile->checkPoint + 
+       if (curInsert == ControlFile->checkPoint +
            MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
            ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
        {
@@ -7267,6 +7276,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
    CheckPointPredicate();
    CheckPointRelationMap();
    CheckPointBuffers(flags);   /* performs all required fsyncs */
+   CheckPointReplicationIdentifier(checkPointRedo);
    /* We deliberately delay 2PC checkpointing as long as possible */
    CheckPointTwoPhase(checkPointRedo);
 }
index c4d3f3c1dcc11a5d3fa87297b60199b5b082d16e..c5801d8040b9b66464faba28c7250c16937f9628 100644 (file)
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
    pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
    pg_ts_parser.h pg_ts_template.h pg_extension.h \
    pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
-   pg_foreign_table.h \
+   pg_foreign_table.h pg_replication_identifier.h \
    pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
    toasting.h indexing.h \
     )
index 967182b541bc98d2a8ec81e83a89c7e6014499cd..98b9861e14f1f9bb6cd91d78c1aae5b1e58987e7 100644 (file)
@@ -32,6 +32,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_pltemplate.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
@@ -245,7 +246,8 @@ IsSharedRelation(Oid relationId)
        relationId == SharedDependRelationId ||
        relationId == SharedSecLabelRelationId ||
        relationId == TableSpaceRelationId ||
-       relationId == DbRoleSettingRelationId)
+       relationId == DbRoleSettingRelationId ||
+       relationId == ReplicationIdentifierRelationId)
        return true;
    /* These are their indexes (see indexing.h) */
    if (relationId == AuthIdRolnameIndexId ||
@@ -261,7 +263,9 @@ IsSharedRelation(Oid relationId)
        relationId == SharedSecLabelObjectIndexId ||
        relationId == TablespaceOidIndexId ||
        relationId == TablespaceNameIndexId ||
-       relationId == DbRoleSettingDatidRolidIndexId)
+       relationId == DbRoleSettingDatidRolidIndexId ||
+       relationId ==  ReplicationLocalIdIndex ||
+       relationId ==  ReplicationRemoteIndex)
        return true;
    /* These are their toast tables and toast indexes (see toasting.h) */
    if (relationId == PgShdescriptionToastTable ||
index 6fae2781ca1a2c2b49809d89d7ccf1f7099ed079..f24dbbe297adc92cb4593cb83dfe15a7563409bb 100644 (file)
@@ -14,7 +14,8 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+   snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
 
index c65c266668f406f008e8b87b7baafb0b403c89c5..fa6871d4c2f541341939dd031c8aca96079ebde2 100644 (file)
@@ -298,7 +298,8 @@ DecodeCommit(LogicalDecodingContext * ctx, XLogRecordBuffer * buf, TransactionId
    }
 
    /* replay actions of all transaction + subtransactions in order */
-   ReorderBufferCommit(ctx->reorder, xid, buf->origptr);
+   ReorderBufferCommit(ctx->reorder, xid, buf->origptr,
+                       buf->record.xl_origin_id);
 }
 
 static void
@@ -330,6 +331,7 @@ DecodeInsert(ReorderBuffer * reorder, XLogRecordBuffer * buf)
 
    change = ReorderBufferGetChange(reorder);
    change->action = REORDER_BUFFER_CHANGE_INSERT;
+   change->origin_id = r->xl_origin_id;
    memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
 
    if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -362,6 +364,7 @@ DecodeUpdate(ReorderBuffer * reorder, XLogRecordBuffer * buf)
 
    change = ReorderBufferGetChange(reorder);
    change->action = REORDER_BUFFER_CHANGE_UPDATE;
+   change->origin_id = r->xl_origin_id;
    memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
 
    data = (char *) &xlhdr->header;
@@ -418,6 +421,7 @@ DecodeDelete(ReorderBuffer * reorder, XLogRecordBuffer * buf)
 
    change = ReorderBufferGetChange(reorder);
    change->action = REORDER_BUFFER_CHANGE_DELETE;
+   change->origin_id = r->xl_origin_id;
 
    memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
 
@@ -469,6 +473,8 @@ DecodeMultiInsert(ReorderBuffer * reorder, XLogRecordBuffer * buf)
 
        change = ReorderBufferGetChange(reorder);
        change->action = REORDER_BUFFER_CHANGE_INSERT;
+       change->origin_id = r->xl_origin_id;
+
        memcpy(&change->relnode, &xlrec->node, sizeof(RelFileNode));
 
        /*
index 53b2dbc9b71eb31d0936c99c158f8b3b7f4e9b80..26061178983efff49f37ea63bbbfc414b4355868 100644 (file)
@@ -51,8 +51,12 @@ LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
 /* My slot for logical rep in the shared memory array */
 LogicalDecodingSlot *MyLogicalDecodingSlot = NULL;
 
-/* user settable parameters */
 int            max_logical_slots = 0;      /* the maximum number of logical slots */
+RepNodeId  guc_replication_node_id = InvalidRepNodeId; /* local node id */
+RepNodeId  guc_replication_origin_id = InvalidRepNodeId; /* assumed identity */
+
+XLogRecPtr replication_origin_lsn;
+TimestampTz replication_origin_timestamp;
 
 static void LogicalSlotKill(int code, Datum arg);
 
@@ -600,6 +604,9 @@ StartupLogicalReplication(XLogRecPtr checkPointRedo)
        if (strcmp(logical_de->d_name, "snapshots") == 0)
            continue;
 
+       if (strcmp(logical_de->d_name, "checkpoints") == 0)
+           continue;
+
        /* we crashed while a slot was being setup or deleted, clean up */
        if (strcmp(logical_de->d_name, "new") == 0 ||
            strcmp(logical_de->d_name, "old") == 0)
index 1d7dff5ffa8dbce9b61ca7dae1ebf0aefc968f50..14a06f5e5e1c0ebb784eaa7147732a79e450659d 100644 (file)
@@ -1187,7 +1187,7 @@ ReorderBufferFreeSnap(ReorderBuffer * buffer, Snapshot snap)
  * assigned via ReorderBufferCommitChild.
  */
 void
-ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn)
+ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn, RepNodeId origin)
 {
    ReorderBufferTXN *txn;
    ReorderBufferIterTXNState *iterstate = NULL;
@@ -1204,6 +1204,7 @@ ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn)
        return;
 
    txn->last_lsn = lsn;
+   txn->origin_id = origin;
 
    /* serialize the last bunch of changes if we need start earlier anyway */
    if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644 (file)
index 0000000..0fbbbb2
--- /dev/null
@@ -0,0 +1,619 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ *   Logical Replication Identifier and progress persistency support
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *   src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+#include "storage/fd.h"
+#include "storage/copydir.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+typedef struct ReplicationState
+{
+   RepNodeId   local_identifier;
+
+   /*
+    * Latest commit from the remote side.
+    */
+   XLogRecPtr  remote_lsn;
+
+   /*
+    * Remember the local lsn of the commit record so we can XLogFlush() to it
+    * during a checkpoint so we know the commit record actually is safe on
+    * disk.
+    */
+   XLogRecPtr  local_lsn;
+} ReplicationState;
+
+/*
+ * Base address into array of replication states of size max_logical_slots.
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Local ReplicationState so we don't have to search ReplicationStates for the
+ * backends current RepNodeId.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/*RSTATE */
+#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE
+
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Check for a persistent repication identifier identified by remotesysid,
+ * remotetli, remotedb, riname, rilocaldb.
+ *
+ * Returns InvalidOid.
+ */
+RepNodeId
+GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+                         Name riname, Oid rilocaldb)
+{
+   Oid riident = InvalidOid;
+   HeapTuple tuple;
+   Form_pg_replication_identifier ident;
+   NameData sysid;
+
+   sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+   tuple = SearchSysCache4(REPLIDREMOTE,
+                           NameGetDatum(&sysid),
+                           ObjectIdGetDatum(remotedb),
+                           ObjectIdGetDatum(rilocaldb),
+                           NameGetDatum(riname));
+   if (HeapTupleIsValid(tuple))
+   {
+       ident = (Form_pg_replication_identifier)GETSTRUCT(tuple);
+       riident = ident->riident;
+       ReleaseSysCache(tuple);
+   }
+   return riident;
+}
+
+/*
+ * Create a persistent replication identifier.
+ *
+ * Needs to be called in a transaction and doesn't call
+ * CommandCounterIncrement().
+ */
+RepNodeId
+CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+                            Name riname, Oid rilocaldb)
+{
+   Oid riident;
+   HeapTuple tuple = NULL;
+   NameData sysid;
+   Relation rel;
+
+   Assert(IsTransactionState());
+
+   sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+   /* lock table against modifications */
+   rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+   /* XXX: should we start at FirstNormalObjectId ? */
+   for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++)
+   {
+       bool        nulls[Natts_pg_replication_identifier];
+       Datum       values[Natts_pg_replication_identifier];
+
+       tuple = GetReplicationInfoByIdentifier(riident);
+
+       if (tuple != NULL)
+       {
+           ReleaseSysCache(tuple);
+           continue;
+       }
+       /* ok, found an unused riident */
+
+       memset(&nulls, 0, sizeof(nulls));
+
+       values[Anum_pg_replication_riident -1] = ObjectIdGetDatum(riident);
+       values[Anum_pg_replication_riremotesysid - 1] = NameGetDatum(&sysid);
+       values[Anum_pg_replication_rilocaldb - 1] = ObjectIdGetDatum(rilocaldb);
+       values[Anum_pg_replication_riremotedb - 1] = ObjectIdGetDatum(remotedb);
+       values[Anum_pg_replication_riname - 1] = NameGetDatum(riname);
+
+       tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+       simple_heap_insert(rel, tuple);
+       CatalogUpdateIndexes(rel, tuple);
+       CommandCounterIncrement();
+       break;
+   }
+
+   /*
+    * only release at end of transaction, so we don't have to worry about race
+    * conditions with other transactions trying to insert a new
+    * identifier. Acquiring a new identifier should be a fairly infrequent
+    * thing, so this seems fine.
+    */
+   heap_close(rel, NoLock);
+
+   if (tuple == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                errmsg("no free replication id could be found")));
+
+   return riident;
+}
+
+/*
+ * Lookup pg_replication_identifier tuple via its riident.
+ *
+ * The result needs to be ReleaseSysCache'ed
+ */
+HeapTuple
+GetReplicationInfoByIdentifier(RepNodeId riident)
+{
+   HeapTuple tuple;
+
+   Assert(OidIsValid((Oid) riident));
+   Assert(riident < UINT16_MAX);
+   tuple = SearchSysCache1(REPLIDIDENT,
+                           ObjectIdGetDatum((Oid) riident));
+   return tuple;
+}
+
+Size
+ReplicationIdentifierShmemSize(void)
+{
+   Size        size = 0;
+
+   /*
+    * FIXME: max_logical_slots is the wrong thing to use here, here we keep
+    * the replay state of *remote* transactions.
+    */
+   if (max_logical_slots == 0)
+       return size;
+
+   size = add_size(size,
+                   mul_size(max_logical_slots, sizeof(ReplicationState)));
+   return size;
+}
+
+void
+ReplicationIdentifierShmemInit(void)
+{
+   bool        found;
+
+   if (max_logical_slots == 0)
+       return;
+
+   ReplicationStates = (ReplicationState *)
+       ShmemInitStruct("ReplicationIdentifierState",
+                       ReplicationIdentifierShmemSize(),
+                       &found);
+
+   if (!found)
+   {
+       MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize());
+   }
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+-------------------------+-------------------------+-----+
+ * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF
+ * +-------+-------------------------+-------------------------+-----+
+ *
+ * So its just the magic, followed by the statically sized
+ * ReplicationStates. Note that the maximum number of ReplicationStates is
+ * determined by max_logical_slots.
+ *
+ * XXX: This doesn't yet work on a standby in contrast to a master doing crash
+ * recovery, since the checkpoint data file won't be available there. Thats
+ * fine for now since a standby can't perform replay on its own.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+   char tmppath[MAXPGPATH];
+   char path[MAXPGPATH];
+   int fd;
+   int tmpfd;
+   int i;
+   uint32 magic = REPLICATION_STATE_MAGIC;
+
+   if (max_logical_slots == 0)
+       return;
+
+   elog(LOG, "doing replication identifier checkpoint");
+
+   sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+           (uint32)(ckpt >> 32), (uint32)ckpt);
+   sprintf(tmppath, "pg_llog/checkpoints/%X-%X.ckpt.tmp",
+           (uint32)(ckpt >> 32), (uint32)ckpt);
+
+   /* check whether file already exists */
+   fd = OpenTransientFile(path,
+                          O_RDONLY | PG_BINARY,
+                          0);
+
+   /* usual case, no checkpoint performed yet */
+   if (fd < 0 && errno == ENOENT)
+       ;
+   else if (fd < 0)
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not check replication state checkpoint \"%s\": %m",
+                       path)));
+   /* already checkpointed before crash during a checkpoint or so */
+   else
+   {
+       CloseTransientFile(fd);
+       return;
+   }
+
+   /* make sure no old temp file is remaining */
+   if (unlink(tmppath) < 0 && errno != ENOENT)
+       ereport(PANIC, (errmsg("failed while unlinking %s",  path)));
+
+   /*
+    * no other backend can perform this at the same time, we're protected by
+    * CheckpointLock.
+    */
+   tmpfd = OpenTransientFile(tmppath,
+                             O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                             S_IRUSR | S_IWUSR);
+   if (tmpfd < 0)
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not create replication identifier checkpoint \"%s\": %m",
+                       tmppath)));
+
+   /* write magic */
+   if ((write(tmpfd, &magic, sizeof(magic))) !=
+       sizeof(magic))
+   {
+       CloseTransientFile(tmpfd);
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not write replication identifier checkpoint \"%s\": %m",
+                       tmppath)));
+   }
+
+   /* write actual data */
+   for (i = 0; i < max_logical_slots; i++)
+   {
+       ReplicationState local_state;
+
+       if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+           continue;
+
+       local_state.local_identifier = ReplicationStates[i].local_identifier;
+       local_state.remote_lsn = ReplicationStates[i].remote_lsn;
+       local_state.local_lsn = InvalidXLogRecPtr;
+
+       /* make sure we only write out a commit thats persistent */
+       XLogFlush(ReplicationStates[i].local_lsn);
+
+
+       if ((write(tmpfd, &local_state, sizeof(ReplicationState))) !=
+           sizeof(ReplicationState))
+       {
+           CloseTransientFile(tmpfd);
+           ereport(PANIC,
+                   (errcode_for_file_access(),
+                    errmsg("could not write replication identifier checkpoint \"%s\": %m",
+                           tmppath)));
+       }
+   }
+
+   /* fsync the file */
+   if (pg_fsync(tmpfd) != 0)
+   {
+       CloseTransientFile(tmpfd);
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+                       tmppath)));
+   }
+
+   CloseTransientFile(tmpfd);
+
+   /* rename to permanent file, fsync file and directory */
+   if (rename(tmppath, path) != 0)
+   {
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+                       tmppath, path)));
+   }
+
+   fsync_fname("pg_llog/checkpoints", true);
+   fsync_fname(path, false);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+   char path[MAXPGPATH];
+   int fd;
+   int readBytes;
+   uint32 magic = REPLICATION_STATE_MAGIC;
+   int last_state = 0;
+
+   /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+   static bool already_started = false;
+   Assert(!already_started);
+   already_started = true;
+#endif
+
+   if (max_logical_slots == 0)
+       return;
+
+   elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+        (uint32)(ckpt >> 32), (uint32)ckpt);
+
+   sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+           (uint32)(ckpt >> 32), (uint32)ckpt);
+
+   fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+   /*
+    * might have had max_logical_slots == 0 last run, or we just brought up a
+    * standby.
+    */
+   if (fd < 0 && errno == ENOENT)
+       return;
+   else if (fd < 0)
+       ereport(PANIC,
+               (errcode_for_file_access(),
+                errmsg("could not open replication state checkpoint \"%s\": %m",
+                       path)));
+
+   /* verify magic, thats written even if nothing was active */
+   readBytes = read(fd, &magic, sizeof(magic));
+   if (readBytes != sizeof(magic))
+       ereport(PANIC, (errmsg("could not read replication state checkpoint magic \"%s\": %m",
+                              path)));
+
+   if (magic != REPLICATION_STATE_MAGIC)
+       ereport(PANIC, (errmsg("replication checkpoint has wrong magic %u instead of %u",
+                              magic, REPLICATION_STATE_MAGIC)));
+
+   /* recover individual states, until there are no more to be found */
+   while (true)
+   {
+       ReplicationState local_state;
+       readBytes = read(fd, &local_state, sizeof(local_state));
+
+       /* no further data */
+       if (readBytes == 0)
+           break;
+
+       if (readBytes != sizeof(local_state))
+       {
+           int saved_errno = errno;
+           CloseTransientFile(fd);
+           errno = saved_errno;
+           ereport(PANIC,
+                   (errcode_for_file_access(),
+                    errmsg("could not read replication checkpoint file \"%s\": %m, read %d of %zu",
+                           path, readBytes, sizeof(local_state))));
+       }
+
+       if (last_state == max_logical_slots)
+           ereport(PANIC,
+                   (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                    errmsg("no free replication state could be found, increase max_logical_slots")));
+
+       /* copy data shared memory */
+       ReplicationStates[last_state++] = local_state;
+
+       elog(LOG, "recovered replication state of %u to %X/%X",
+            local_state.local_identifier,
+            (uint32)(local_state.remote_lsn >> 32),
+            (uint32)local_state.remote_lsn);
+   }
+
+   CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and guc_replication_origin_id that ensures
+ * we won't loose knowledge about that after a crash if the the transaction had
+ * a persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * uppon a checkpoint that enough WAL has been persisted to disk.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+                            XLogRecPtr remote_commit,
+                            XLogRecPtr local_commit)
+{
+   int i;
+   int free_slot = -1;
+   ReplicationState *replication_state = NULL;
+
+   /*
+    * XXX: should we restore into a hashtable and dump into shmem only after
+    * recovery finished?
+    */
+
+   /* check whether slot already exists */
+   for (i = 0; i < max_logical_slots; i++)
+   {
+       /* remember where to insert if necessary */
+       if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+           free_slot == -1)
+       {
+           free_slot = i;
+           continue;
+       }
+
+       /* not our slot */
+       if (ReplicationStates[i].local_identifier != node)
+           continue;
+
+       /* ok, found slot */
+       replication_state = &ReplicationStates[i];
+       break;
+   }
+
+   if (replication_state == NULL && free_slot == -1)
+       ereport(PANIC,
+               (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                errmsg("no free replication state could be found for %u, increase max_logical_slots",
+                       node)));
+   /* initialize new slot */
+   else if (replication_state == NULL)
+   {
+       replication_state = &ReplicationStates[free_slot];
+       Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+       Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+       replication_state->local_identifier = node;
+   }
+
+   /*
+    * due to - harmless - race conditions during a checkpoint we could see
+    * values here that are older than the ones we already have in
+    * memory. Don't overwrite those.
+    */
+   if (replication_state->remote_lsn < remote_commit)
+       replication_state->remote_lsn = remote_commit;
+   if (replication_state->local_lsn < local_commit)
+       replication_state->local_lsn = local_commit;
+}
+
+
+/*
+ * Setup a replication identifier in the shared memory struct if it doesn't
+ * already exists and cache access to the specific ReplicationSlot so the array
+ * doesn't have to be searched when calling
+ * AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only such cached identifier can exist per process and the current
+ * cached value can only be set again after the prvious value is torn down with
+ * TeardownCachedReplicationIdentifier.
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+   int i;
+   int free_slot = -1;
+
+   Assert(max_logical_slots != 0);
+   Assert(local_replication_state == NULL);
+
+   /*
+    * Aearch for either an existing slot for that identifier or a free one we
+    * can use.
+    */
+   for (i = 0; i < max_logical_slots; i++)
+   {
+       /* remember where to insert if necessary */
+       if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+           free_slot == -1)
+       {
+           free_slot = i;
+           continue;
+       }
+
+       /* not our slot */
+       if (ReplicationStates[i].local_identifier != node)
+           continue;
+
+       local_replication_state = &ReplicationStates[i];
+   }
+
+
+   if (local_replication_state == NULL && free_slot == -1)
+       ereport(PANIC,
+               (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                errmsg("no free replication state could be found for %u, increase max_logical_slots",
+                       node)));
+   else if (local_replication_state == NULL)
+   {
+       local_replication_state = &ReplicationStates[free_slot];
+       local_replication_state->local_identifier = node;
+       Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr);
+       Assert(local_replication_state->local_lsn == InvalidXLogRecPtr);
+   }
+}
+
+/*
+ * Make currently cached replication identifier unavailable so a new one can be
+ * setup with SetupCachedReplicationIdentifier().
+ *
+ * This function may only be called if a previous identifier was cached.
+ */
+void
+TeardownCachedReplicationIdentifier(RepNodeId node)
+{
+   Assert(max_logical_slots != 0);
+   Assert(local_replication_state != NULL);
+
+   local_replication_state = NULL;
+}
+
+/*
+ * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached
+ * identifier. This is noticeably cheaper if you only ever work on a single
+ * replication identifier.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+                                  XLogRecPtr local_commit)
+{
+   Assert(local_replication_state != NULL);
+   if (local_replication_state->local_lsn < local_commit)
+       local_replication_state->local_lsn = local_commit;
+   if (local_replication_state->remote_lsn < remote_commit)
+       local_replication_state->remote_lsn = remote_commit;
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from a already setup & chaced replication identifier.
+ */
+XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void)
+{
+   Assert(local_replication_state != NULL);
+   return local_replication_state->remote_lsn;
+}
index 0c2e55bcfe141b2367c08feec0f701b6e8d8a6f9..78fd1dd8c4db87deff4be0fb92295a65bcd2b2f5 100644 (file)
@@ -30,6 +30,7 @@
 #include "replication/logical.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
        size = add_size(size, CheckpointerShmemSize());
        size = add_size(size, AutoVacuumShmemSize());
        size = add_size(size, LogicalDecodingShmemSize());
+       size = add_size(size, ReplicationIdentifierShmemSize());
        size = add_size(size, WalSndShmemSize());
        size = add_size(size, WalRcvShmemSize());
        size = add_size(size, BTreeShmemSize());
@@ -233,6 +235,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
    CheckpointerShmemInit();
    AutoVacuumShmemInit();
    LogicalDecodingShmemInit();
+   ReplicationIdentifierShmemInit();
    WalSndShmemInit();
    WalRcvShmemInit();
 
index b5fe64f998b594c9a6aa878fcd3f8bf900f7de72..734a81001914121dd28003390b51d28ce2e602b8 100644 (file)
@@ -47,6 +47,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_range.h"
 #include "catalog/pg_rewrite.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_ts_config.h"
@@ -624,6 +625,28 @@ static const struct cachedesc cacheinfo[] = {
        },
        1024
    },
+   {ReplicationIdentifierRelationId,       /* REPLIDIDENT */
+       ReplicationLocalIdIndex,
+       1,
+       {
+           Anum_pg_replication_riident,
+           0,
+           0,
+           0
+       },
+       16
+   },
+   {ReplicationIdentifierRelationId,       /* REPLIDREMOTE */
+       ReplicationRemoteIndex,
+       4,
+       {
+           Anum_pg_replication_riremotesysid,
+           Anum_pg_replication_riremotedb,
+           Anum_pg_replication_rilocaldb,
+           Anum_pg_replication_riname
+       },
+       16
+   },
    {RewriteRelationId,         /* RULERELNAME */
        RewriteRelRulenameIndexId,
        2,
index f34358334b06644db33d1852ffa2c6109b3643e0..9a19a3543706e17ca12c912c31facf0f2277dad7 100644 (file)
@@ -199,6 +199,8 @@ static bool check_application_name(char **newval, void **extra, GucSource source
 static void assign_application_name(const char *newval, void *extra);
 static const char *show_unix_socket_permissions(void);
 static const char *show_log_file_mode(void);
+static void assign_replication_node_id(int newval, void *extra);
+static void assign_replication_origin_id(int newval, void *extra);
 
 static char *config_enum_get_options(struct config_enum * record,
                        const char *prefix, const char *suffix,
@@ -471,7 +473,8 @@ static int  wal_block_size;
 static int wal_segment_size;
 static bool integer_datetimes;
 static int effective_io_concurrency;
-
+static int phony_replication_node_id;
+static int phony_replication_origin_id;
 /* should be static, but commands/variable.c needs to get at this */
 char      *role_string;
 
@@ -2091,6 +2094,26 @@ static struct config_int ConfigureNamesInt[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"replication_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+        gettext_noop("node id for replication."),
+        NULL
+       },
+       &phony_replication_node_id,
+       InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+       NULL, assign_replication_node_id, NULL
+   },
+
+   {
+       {"replication_origin_id", PGC_USERSET, REPLICATION_MASTER,
+        gettext_noop("current node id for replication."),
+        NULL
+       },
+       &phony_replication_origin_id,
+       InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+       NULL, assign_replication_origin_id, NULL
+   },
+
    {
        {"commit_siblings", PGC_USERSET, WAL_SETTINGS,
            gettext_noop("Sets the minimum concurrent open transactions before performing "
@@ -8819,4 +8842,26 @@ show_log_file_mode(void)
    return buf;
 }
 
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+   guc_replication_node_id = newval;
+   /* set default to local node */
+   guc_replication_origin_id = newval;
+   phony_replication_origin_id = newval;
+}
+
+
+static void
+assign_replication_origin_id(int newval, void *extra)
+{
+   /*
+    * FIXME: add error checking hook that check wal_level and
+    * replication_node_id.
+    */
+   guc_replication_origin_id = newval;
+}
+
+
+
 #include "guc-file.c"
index 66dfe853c0ecd8df76726191afa51ba6bddf8ea9..a71226066929c628365b42659b41194c073b4d4c 100644 (file)
 #wal_receiver_timeout = 60s        # time that receiver waits for
                    # communication from master
                    # in milliseconds; 0 disables
-
+#replication_node_id = 0       #invalid node id
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
index c6f3e9ace614a30d198559ae54ba74f220e5ae59..78c1784a4e96e5bd2a9ab3ee5558274bdb6d1ed0 100644 (file)
@@ -196,7 +196,8 @@ const char *subdirs[] = {
    "pg_stat",
    "pg_stat_tmp",
    "pg_llog",
-   "pg_llog/snapshots"
+   "pg_llog/snapshots",
+   "pg_llog/checkpoints"
 };
 
 
index 9c340b2db590bd1ff2fc5e11f3c50cb253ce3d5c..395c39286616d149dc687c20a92a779df4b67c7e 100644 (file)
@@ -55,6 +55,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "common/fe_memutils.h"
+#include "replication/logical.h"
 
 extern int optind;
 extern char *optarg;
@@ -967,6 +968,7 @@ WriteEmptyXLOG(void)
    record->xl_len = sizeof(CheckPoint);
    record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
    record->xl_rmid = RM_XLOG_ID;
+   record->xl_origin_id = InvalidRepNodeId;
    memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
           sizeof(CheckPoint));
 
index 835f6acbee0e10ee51e5a2295429efd5141e3b77..513ed38e2991566cfce9c6abba6b468c98aa7d17 100644 (file)
@@ -146,6 +146,13 @@ typedef struct xl_xact_commit
    /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
 } xl_xact_commit;
 
+typedef struct xl_xact_origin
+{
+   XLogRecPtr  origin_lsn;
+   RepNodeId   origin_node_id;
+   TimestampTz origin_timestamp;
+} xl_xact_origin;
+
 #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
 
 /*
@@ -158,7 +165,7 @@ typedef struct xl_xact_commit
  */
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE   0x01
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT      0x02
-
+#define XACT_CONTAINS_ORIGIN                   0x04
 /* Access macros for above flags */
 #define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
 #define XactCompletionForceSyncCommit(xinfo)       (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
index efda7620f3431be235b5ca98e7049d9905c60338..97eb9c675c50bce537118f84f7b5dbe9cab26996 100644 (file)
@@ -48,6 +48,7 @@ typedef struct XLogRecord
    /* 2 bytes of padding here, initialize to zero */
    XLogRecPtr  xl_prev;        /* ptr to previous record in log */
    pg_crc32    xl_crc;         /* CRC for this record */
+   RepNodeId   xl_origin_id;   /* what node did originally cause this record to be written */
 
    /* If MAXALIGN==8, there are 4 wasted bytes here */
 
@@ -177,6 +178,7 @@ typedef enum
 } RecoveryTargetType;
 
 extern XLogRecPtr XactLastRecEnd;
+extern XLogRecPtr XactLastCommitEnd;
 
 extern bool reachedConsistency;
 
index fa6497adaddc830cfcb4dbaab4deb43a464cea35..0affcdc415483a38549e4ce5042bb580866413f6 100644 (file)
@@ -44,6 +44,12 @@ typedef uint64 XLogSegNo;
  */
 typedef uint32 TimeLineID;
 
+/*
+ * Denotes the node on which the action causing a wal record to be logged
+ * originated on.
+ */
+typedef uint16 RepNodeId;
+
 /*
  * Because O_DIRECT bypasses the kernel buffers, and because we never
  * read those buffers except during crash recovery or if wal_level != minimal,
index 2a3cd8215e517657c9ca4c0dea0ac2681646bd48..313fcda820fd95b050b27d37404544792e610acb 100644 (file)
@@ -313,6 +313,12 @@ DECLARE_UNIQUE_INDEX(pg_extension_name_index, 3081, on pg_extension using btree(
 DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops));
 #define RangeTypidIndexId                  3542
 
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3459, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdIndex 3459
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_remote_index, 3460, on pg_replication_identifier using btree(riremotesysid name_ops, riremotedb oid_ops, rilocaldb oid_ops, riname name_ops));
+#define ReplicationRemoteIndex 3460
+
 /* last step of initialization script: build the indexes declared above */
 BUILD_INDICES
 
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644 (file)
index 0000000..918f25f
--- /dev/null
@@ -0,0 +1,92 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ *
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ *   the genbki.pl script reads this file and generates .bki
+ *   information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ *     pg_replication_identifier.  cpp turns this into
+ *     typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 3458
+
+CATALOG(pg_replication_identifier,3458) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+   /*
+    * locally known identifier that gets included into wal.
+    *
+    * This should never leave the system.
+    *
+    * Needs to fit into a uint16, so we don't waste too much space. For this
+    * reason we don't use a normal Oid column here, since we need to handle
+    * allocation of new values manually.
+    */
+   Oid     riident;
+
+   /* ----
+    * remote system identifier, including tli, separated by a -.
+    *
+    * They are packed together for two reasons:
+    * a) we can't represent sysids as uint64 because there's no such type on
+    *    sql level, so we need a fixed width string anyway. And a name already
+    *    has enough space for that.
+    * b) syscaches can only have 4 keys, and were already at that with
+    *    combined keys
+    * ----
+    */
+   NameData riremotesysid;
+
+   /* local database */
+   Oid     rilocaldb;
+
+   /* remote database */
+   Oid     riremotedb;
+
+   /* optional name, zero length string */
+   NameData riname;
+#ifdef CATALOG_VARLEN      /* variable-length fields start here */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ *     Form_pg_extension corresponds to a pointer to a tuple with
+ *     the format of pg_extension relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ *     compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier        5
+#define Anum_pg_replication_riident            1
+#define Anum_pg_replication_riremotesysid  2
+#define Anum_pg_replication_rilocaldb      3
+#define Anum_pg_replication_riremotedb     4
+#define Anum_pg_replication_riname         5
+
+/* ----------------
+ *     pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif   /* PG_REPLICTION_IDENTIFIER_H */
index 246c7e7392e26c37e5d62a1df3ddf4a47d85eab4..288efc886ec3a9a4dfbc7a642e3d21295bfde82f 100644 (file)
@@ -161,6 +161,13 @@ typedef struct LogicalDecodingContext
 /* GUCs */
 extern PGDLLIMPORT int max_logical_slots;
 
+#define InvalidRepNodeId 0
+extern PGDLLIMPORT RepNodeId guc_replication_node_id;
+extern PGDLLIMPORT RepNodeId guc_replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+
 extern Size LogicalDecodingShmemSize(void);
 extern void LogicalDecodingShmemInit(void);
 
index 5b796b74c90e25a0fa0f99339176dce1166b6acb..f91511b1da43f9c6e26aad933a876efe53ce786c 100644 (file)
@@ -62,6 +62,8 @@ typedef struct ReorderBufferChange
        int         action_internal;
    };
 
+   RepNodeId origin_id;
+
    /*
     * Context data for the change, which part of the union is valid depends
     * on action/action_internal.
@@ -114,6 +116,10 @@ typedef struct ReorderBufferTXN
     * LSN of the first wal record with knowledge about this xid.
     */
    XLogRecPtr  lsn;
+
+   /*
+    * LSN of the commit record
+    */
    XLogRecPtr  last_lsn;
 
    /*
@@ -123,6 +129,9 @@ typedef struct ReorderBufferTXN
     */
    XLogRecPtr  restart_decoding_lsn;
 
+   /* origin of the change that caused this transaction */
+   RepNodeId origin_id;
+
    /* did the TX have catalog changes */
    bool        does_timetravel;
 
@@ -292,7 +301,7 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void       ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void       ReorderBufferAddChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
-void       ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+void       ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RepNodeId origin_id);
 void       ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn);
 void       ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn);
 void       ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644 (file)
index 0000000..97750d9
--- /dev/null
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ *     XXX
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+                                         Oid remotedb, Name riname,
+                                         Oid rilocaldb);
+extern RepNodeId CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+                                            Oid remotedb, Name riname,
+                                            Oid rilocaldb);
+
+extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident);
+
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(RepNodeId node);
+
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+                                              XLogRecPtr local_commit);
+
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+                                        XLogRecPtr remote_commit,
+                                        XLogRecPtr local_commit);
+
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+#endif
index 2a149052bfac417a2185be65fb2c2ee11102eab7..8bbc2ed4fdb1b385ffbb42b6e537aab16a2ab65a 100644 (file)
@@ -78,6 +78,8 @@ enum SysCacheIdentifier
    RELFILENODE,
    RELNAMENSP,
    RELOID,
+   REPLIDIDENT,
+   REPLIDREMOTE,
    RULERELNAME,
    STATRELATTINH,
    TABLESPACEOID,
index 432d39a4911ffaa7107a9011d584b6be1c61c8d7..907c804078ff66d1796fa144ed03b6e3548ba741 100644 (file)
@@ -9,164 +9,165 @@ SELECT relname, relhasindex
    FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = relnamespace
    WHERE relkind = 'r' AND (nspname ~ '^pg_temp_') IS NOT TRUE
    ORDER BY relname;
-         relname         | relhasindex 
--------------------------+-------------
- a                       | f
- a_star                  | f
- abstime_tbl             | f
- aggtest                 | f
- array_index_op_test     | t
- array_op_test           | f
- b                       | f
- b_star                  | f
- box_tbl                 | f
- bprime                  | f
- bt_f8_heap              | t
- bt_i4_heap              | t
- bt_name_heap            | t
- bt_txt_heap             | t
- c                       | f
- c_star                  | f
- char_tbl                | f
- check2_tbl              | f
- check_tbl               | f
- circle_tbl              | t
- city                    | f
- copy_tbl                | f
- d                       | f
- d_star                  | f
- date_tbl                | f
- default_tbl             | f
- defaultexpr_tbl         | f
- dept                    | f
- dupindexcols            | t
- e_star                  | f
- emp                     | f
- equipment_r             | f
- f_star                  | f
- fast_emp4000            | t
- float4_tbl              | f
- float8_tbl              | f
- func_index_heap         | t
- hash_f8_heap            | t
- hash_i4_heap            | t
- hash_name_heap          | t
- hash_txt_heap           | t
- hobbies_r               | f
- ihighway                | t
- inet_tbl                | f
- inhf                    | f
- inhx                    | t
- insert_tbl              | f
- int2_tbl                | f
- int4_tbl                | f
- int8_tbl                | f
- interval_tbl            | f
- iportaltest             | f
- kd_point_tbl            | t
- log_table               | f
- lseg_tbl                | f
- main_table              | f
- money_data              | f
- num_data                | f
- num_exp_add             | t
- num_exp_div             | t
- num_exp_ln              | t
- num_exp_log10           | t
- num_exp_mul             | t
- num_exp_power_10_ln     | t
- num_exp_sqrt            | t
- num_exp_sub             | t
- num_input_test          | f
- num_result              | f
- onek                    | t
- onek2                   | t
- path_tbl                | f
- person                  | f
- pg_aggregate            | t
- pg_am                   | t
- pg_amop                 | t
- pg_amproc               | t
- pg_attrdef              | t
- pg_attribute            | t
- pg_auth_members         | t
- pg_authid               | t
- pg_cast                 | t
- pg_class                | t
- pg_collation            | t
- pg_constraint           | t
- pg_conversion           | t
- pg_database             | t
- pg_db_role_setting      | t
- pg_default_acl          | t
- pg_depend               | t
- pg_description          | t
- pg_enum                 | t
- pg_event_trigger        | t
- pg_extension            | t
- pg_foreign_data_wrapper | t
- pg_foreign_server       | t
- pg_foreign_table        | t
- pg_index                | t
- pg_inherits             | t
- pg_language             | t
- pg_largeobject          | t
- pg_largeobject_metadata | t
- pg_namespace            | t
- pg_opclass              | t
- pg_operator             | t
- pg_opfamily             | t
- pg_pltemplate           | t
- pg_proc                 | t
- pg_range                | t
- pg_rewrite              | t
- pg_seclabel             | t
- pg_shdepend             | t
- pg_shdescription        | t
- pg_shseclabel           | t
- pg_statistic            | t
- pg_tablespace           | t
- pg_trigger              | t
- pg_ts_config            | t
- pg_ts_config_map        | t
- pg_ts_dict              | t
- pg_ts_parser            | t
- pg_ts_template          | t
- pg_type                 | t
- pg_user_mapping         | t
- point_tbl               | t
- polygon_tbl             | t
- quad_point_tbl          | t
- radix_text_tbl          | t
- ramp                    | f
- real_city               | f
- reltime_tbl             | f
- road                    | t
- shighway                | t
- slow_emp4000            | f
- sql_features            | f
- sql_implementation_info | f
- sql_languages           | f
- sql_packages            | f
- sql_parts               | f
- sql_sizing              | f
- sql_sizing_profiles     | f
- stud_emp                | f
- student                 | f
- tenk1                   | t
- tenk2                   | t
- test_range_excl         | t
- test_range_gist         | t
- test_range_spgist       | t
- test_tsvector           | f
- text_tbl                | f
- time_tbl                | f
- timestamp_tbl           | f
- timestamptz_tbl         | f
- timetz_tbl              | f
- tinterval_tbl           | f
- varchar_tbl             | f
-(155 rows)
+          relname          | relhasindex 
+---------------------------+-------------
+ a                         | f
+ a_star                    | f
+ abstime_tbl               | f
+ aggtest                   | f
+ array_index_op_test       | t
+ array_op_test             | f
+ b                         | f
+ b_star                    | f
+ box_tbl                   | f
+ bprime                    | f
+ bt_f8_heap                | t
+ bt_i4_heap                | t
+ bt_name_heap              | t
+ bt_txt_heap               | t
+ c                         | f
+ c_star                    | f
+ char_tbl                  | f
+ check2_tbl                | f
+ check_tbl                 | f
+ circle_tbl                | t
+ city                      | f
+ copy_tbl                  | f
+ d                         | f
+ d_star                    | f
+ date_tbl                  | f
+ default_tbl               | f
+ defaultexpr_tbl           | f
+ dept                      | f
+ dupindexcols              | t
+ e_star                    | f
+ emp                       | f
+ equipment_r               | f
+ f_star                    | f
+ fast_emp4000              | t
+ float4_tbl                | f
+ float8_tbl                | f
+ func_index_heap           | t
+ hash_f8_heap              | t
+ hash_i4_heap              | t
+ hash_name_heap            | t
+ hash_txt_heap             | t
+ hobbies_r                 | f
+ ihighway                  | t
+ inet_tbl                  | f
+ inhf                      | f
+ inhx                      | t
+ insert_tbl                | f
+ int2_tbl                  | f
+ int4_tbl                  | f
+ int8_tbl                  | f
+ interval_tbl              | f
+ iportaltest               | f
+ kd_point_tbl              | t
+ log_table                 | f
+ lseg_tbl                  | f
+ main_table                | f
+ money_data                | f
+ num_data                  | f
+ num_exp_add               | t
+ num_exp_div               | t
+ num_exp_ln                | t
+ num_exp_log10             | t
+ num_exp_mul               | t
+ num_exp_power_10_ln       | t
+ num_exp_sqrt              | t
+ num_exp_sub               | t
+ num_input_test            | f
+ num_result                | f
+ onek                      | t
+ onek2                     | t
+ path_tbl                  | f
+ person                    | f
+ pg_aggregate              | t
+ pg_am                     | t
+ pg_amop                   | t
+ pg_amproc                 | t
+ pg_attrdef                | t
+ pg_attribute              | t
+ pg_auth_members           | t
+ pg_authid                 | t
+ pg_cast                   | t
+ pg_class                  | t
+ pg_collation              | t
+ pg_constraint             | t
+ pg_conversion             | t
+ pg_database               | t
+ pg_db_role_setting        | t
+ pg_default_acl            | t
+ pg_depend                 | t
+ pg_description            | t
+ pg_enum                   | t
+ pg_event_trigger          | t
+ pg_extension              | t
+ pg_foreign_data_wrapper   | t
+ pg_foreign_server         | t
+ pg_foreign_table          | t
+ pg_index                  | t
+ pg_inherits               | t
+ pg_language               | t
+ pg_largeobject            | t
+ pg_largeobject_metadata   | t
+ pg_namespace              | t
+ pg_opclass                | t
+ pg_operator               | t
+ pg_opfamily               | t
+ pg_pltemplate             | t
+ pg_proc                   | t
+ pg_range                  | t
+ pg_replication_identifier | t
+ pg_rewrite                | t
+ pg_seclabel               | t
+ pg_shdepend               | t
+ pg_shdescription          | t
+ pg_shseclabel             | t
+ pg_statistic              | t
+ pg_tablespace             | t
+ pg_trigger                | t
+ pg_ts_config              | t
+ pg_ts_config_map          | t
+ pg_ts_dict                | t
+ pg_ts_parser              | t
+ pg_ts_template            | t
+ pg_type                   | t
+ pg_user_mapping           | t
+ point_tbl                 | t
+ polygon_tbl               | t
+ quad_point_tbl            | t
+ radix_text_tbl            | t
+ ramp                      | f
+ real_city                 | f
+ reltime_tbl               | f
+ road                      | t
+ shighway                  | t
+ slow_emp4000              | f
+ sql_features              | f
+ sql_implementation_info   | f
+ sql_languages             | f
+ sql_packages              | f
+ sql_parts                 | f
+ sql_sizing                | f
+ sql_sizing_profiles       | f
+ stud_emp                  | f
+ student                   | f
+ tenk1                     | t
+ tenk2                     | t
+ test_range_excl           | t
+ test_range_gist           | t
+ test_range_spgist         | t
+ test_tsvector             | f
+ text_tbl                  | f
+ time_tbl                  | f
+ timestamp_tbl             | f
+ timestamptz_tbl           | f
+ timetz_tbl                | f
+ tinterval_tbl             | f
+ varchar_tbl               | f
+(156 rows)
 
 --
 -- another sanity check: every system catalog that has OIDs should have