From 8459040789b40335c34403e4b9c84e0d1d1f43ed Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 22 Feb 2013 17:43:27 +0100 Subject: [PATCH] bdr: Introduce "replication_identifiers" to keep track of remote nodes Replication identifiers can be used to track & lookup remote nodes identified via (sysid, tlid, remote_dbid, local_dbid, name) and map that tuple to a local uint16. Keyed by that replication identifier the remote lsn and the local lsn is tracked in a crash safe manner. Support for tracking that via output plugins is added as well. Needs a catversion bump. --- src/backend/access/rmgrdesc/xactdesc.c | 16 +- src/backend/access/transam/xact.c | 75 ++- src/backend/access/transam/xlog.c | 12 +- src/backend/catalog/Makefile | 2 +- src/backend/catalog/catalog.c | 8 +- src/backend/replication/logical/Makefile | 3 +- src/backend/replication/logical/decode.c | 8 +- src/backend/replication/logical/logical.c | 9 +- .../replication/logical/reorderbuffer.c | 3 +- .../logical/replication_identifier.c | 619 ++++++++++++++++++ src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/cache/syscache.c | 23 + src/backend/utils/misc/guc.c | 47 +- src/backend/utils/misc/postgresql.conf.sample | 2 +- src/bin/initdb/initdb.c | 3 +- src/bin/pg_resetxlog/pg_resetxlog.c | 2 + src/include/access/xact.h | 9 +- src/include/access/xlog.h | 2 + src/include/access/xlogdefs.h | 6 + src/include/catalog/indexing.h | 6 + .../catalog/pg_replication_identifier.h | 92 +++ src/include/replication/logical.h | 7 + src/include/replication/reorderbuffer.h | 11 +- .../replication/replication_identifier.h | 41 ++ src/include/utils/syscache.h | 2 + src/test/regress/expected/sanity_check.out | 317 ++++----- 26 files changed, 1145 insertions(+), 183 deletions(-) create mode 100644 src/backend/replication/logical/replication_identifier.c create mode 100644 src/include/catalog/pg_replication_identifier.h create mode 100644 src/include/replication/replication_identifier.h diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 11c6912753..c959c13ae1 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -26,9 +26,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) { int i; TransactionId *subxacts; + SharedInvalidationMessage *msgs; subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; + msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; + appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); if (xlrec->nrels > 0) @@ -50,9 +53,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) } if (xlrec->nmsgs > 0) { - SharedInvalidationMessage *msgs; - - msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; if (XactCompletionRelcacheInitFileInval(xlrec->xinfo)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", @@ -78,6 +78,16 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) appendStringInfo(buf, " unknown id %d", msg->id); } } + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + { + xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]); + + appendStringInfo(buf, " origin %u, lsn %X/%X", + origin->origin_node_id, + (uint32)(origin->origin_lsn >> 32), + (uint32)origin->origin_lsn); + } + } static void diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index f2204d798b..83a020b176 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -37,8 +37,10 @@ #include "libpq/be-fsstubs.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/walsender.h" #include "replication/syncrep.h" +#include "replication/replication_identifier.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -1052,11 +1054,13 @@ RecordTransactionCommit(void) /* * Do we need the long commit record? If not, use the compact format. */ - if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit) + if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit + || wal_level >= WAL_LEVEL_LOGICAL) { - XLogRecData rdata[4]; + XLogRecData rdata[5]; int lastrdata = 0; xl_xact_commit xlrec; + xl_xact_origin origin; /* * Set flags required for recovery processing of commits. @@ -1104,6 +1108,22 @@ RecordTransactionCommit(void) rdata[3].buffer = InvalidBuffer; lastrdata = 3; } + /* dump transaction origin information */ + if (guc_replication_origin_id != InvalidRepNodeId) + { + Assert(replication_origin_lsn != InvalidXLogRecPtr); + elog(LOG, "logging origin id"); + xlrec.xinfo |= XACT_CONTAINS_ORIGIN; + origin.origin_node_id = guc_replication_origin_id; + origin.origin_lsn = replication_origin_lsn; + origin.origin_timestamp = replication_origin_timestamp; + + rdata[lastrdata].next = &(rdata[4]); + rdata[4].data = (char *) &origin; + rdata[4].len = sizeof(xl_xact_origin); + rdata[4].buffer = InvalidBuffer; + lastrdata = 4; + } rdata[lastrdata].next = NULL; (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); @@ -1134,13 +1154,19 @@ RecordTransactionCommit(void) } } + /* record plain commit ts if not replaying remote actions */ + if (guc_replication_origin_id == InvalidRepNodeId) + replication_origin_timestamp = xactStopTimestamp; + /* * We don't need to log the commit timestamp separately since the commit * record logged above has all the necessary action to set the timestamp * again. */ TransactionTreeSetCommitTimestamp(xid, nchildren, children, - xactStopTimestamp, 0, false); + replication_origin_timestamp, + guc_replication_origin_id, + false); /* * Check if we want to commit asynchronously. We can allow the XLOG flush @@ -1222,9 +1248,11 @@ RecordTransactionCommit(void) if (wrote_xlog) SyncRepWaitForLSN(XactLastRecEnd); + /* remember end of last commit record */ + XactLastCommitEnd = XactLastRecEnd; + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; - cleanup: /* Clean up local data */ if (rels) @@ -4593,10 +4621,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, SharedInvalidationMessage *inval_msgs, int nmsgs, RelFileNode *xnodes, int nrels, Oid dbId, Oid tsId, - uint32 xinfo) + uint32 xinfo, + xl_xact_origin *origin) { TransactionId max_xid; int i; + RepNodeId origin_node_id = InvalidRepNodeId; max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); @@ -4616,9 +4646,29 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, LWLockRelease(XidGenLock); } + Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL)); + + if (xinfo & XACT_CONTAINS_ORIGIN) + { + origin_node_id = origin->origin_node_id; + commit_time = origin->origin_timestamp; + } + /* Set the transaction commit time */ TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids, - commit_time, 0, false); + commit_time, + origin_node_id, false); + + if (xinfo & XACT_CONTAINS_ORIGIN) + { + elog(LOG, "restoring origin of node %u to %X/%X", + origin->origin_node_id, + (uint32)(origin->origin_lsn >> 32), + (uint32)origin->origin_lsn); + AdvanceReplicationIdentifier(origin->origin_node_id, + origin->origin_lsn, + lsn); + } if (standbyState == STANDBY_DISABLED) { @@ -4734,19 +4784,25 @@ xact_redo_commit(xl_xact_commit *xlrec, { TransactionId *subxacts; SharedInvalidationMessage *inval_msgs; - + xl_xact_origin *origin = NULL; /* subxid array follows relfilenodes */ subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); /* invalidation messages array follows subxids */ inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + { + origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]); + } + xact_redo_commit_internal(xid, lsn, xlrec->xact_time, subxacts, xlrec->nsubxacts, inval_msgs, xlrec->nmsgs, xlrec->xnodes, xlrec->nrels, xlrec->dbId, xlrec->tsId, - xlrec->xinfo); + xlrec->xinfo, + origin); } /* @@ -4762,7 +4818,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec, NULL, 0, /* relfilenodes */ InvalidOid, /* dbId */ InvalidOid, /* tsId */ - 0); /* xinfo */ + 0, /* xinfo */ + NULL /* origin */); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 50feb0ff75..1e3d347612 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -43,6 +43,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/logical.h" +#include "replication/replication_identifier.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -276,6 +277,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; +XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; + /* * RedoRecPtr is this backend's local copy of the REDO record pointer * (which is almost but not quite the same as a pointer to the most recent @@ -939,6 +942,7 @@ begin:; rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; + rechdr->xl_origin_id = guc_replication_origin_id; hdr_rdt.next = rdata; hdr_rdt.data = (char *) rechdr; @@ -5216,6 +5220,11 @@ StartupXLOG(void) */ StartupLogicalReplication(checkPoint.redo); + /* + * Recover knowledge about replay progress of known replication partners. + */ + StartupReplicationIdentifier(checkPoint.redo); + /* * Initialize unlogged LSN. On a clean shutdown, it's restored from the * control file. On recovery, all unlogged relations are blown away, so @@ -6886,7 +6895,7 @@ CreateCheckPoint(int flags) XLogRecPtr curInsert; INSERT_RECPTR(curInsert, Insert, Insert->curridx); - if (curInsert == ControlFile->checkPoint + + if (curInsert == ControlFile->checkPoint + MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && ControlFile->checkPoint == ControlFile->checkPointCopy.redo) { @@ -7267,6 +7276,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointPredicate(); CheckPointRelationMap(); CheckPointBuffers(flags); /* performs all required fsyncs */ + CheckPointReplicationIdentifier(checkPointRedo); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); } diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index c4d3f3c1dc..c5801d8040 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \ pg_ts_parser.h pg_ts_template.h pg_extension.h \ pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \ - pg_foreign_table.h \ + pg_foreign_table.h pg_replication_identifier.h \ pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \ toasting.h indexing.h \ ) diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index 967182b541..98b9861e14 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -32,6 +32,7 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_pltemplate.h" #include "catalog/pg_db_role_setting.h" +#include "catalog/pg_replication_identifier.h" #include "catalog/pg_shdepend.h" #include "catalog/pg_shdescription.h" #include "catalog/pg_shseclabel.h" @@ -245,7 +246,8 @@ IsSharedRelation(Oid relationId) relationId == SharedDependRelationId || relationId == SharedSecLabelRelationId || relationId == TableSpaceRelationId || - relationId == DbRoleSettingRelationId) + relationId == DbRoleSettingRelationId || + relationId == ReplicationIdentifierRelationId) return true; /* These are their indexes (see indexing.h) */ if (relationId == AuthIdRolnameIndexId || @@ -261,7 +263,9 @@ IsSharedRelation(Oid relationId) relationId == SharedSecLabelObjectIndexId || relationId == TablespaceOidIndexId || relationId == TablespaceNameIndexId || - relationId == DbRoleSettingDatidRolidIndexId) + relationId == DbRoleSettingDatidRolidIndexId || + relationId == ReplicationLocalIdIndex || + relationId == ReplicationRemoteIndex) return true; /* These are their toast tables and toast indexes (see toasting.h) */ if (relationId == PgShdescriptionToastTable || diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 6fae2781ca..f24dbbe297 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -14,7 +14,8 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) -OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o +OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \ + snapbuild.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c65c266668..fa6871d4c2 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -298,7 +298,8 @@ DecodeCommit(LogicalDecodingContext * ctx, XLogRecordBuffer * buf, TransactionId } /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr); + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, + buf->record.xl_origin_id); } static void @@ -330,6 +331,7 @@ DecodeInsert(ReorderBuffer * reorder, XLogRecordBuffer * buf) change = ReorderBufferGetChange(reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = r->xl_origin_id; memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -362,6 +364,7 @@ DecodeUpdate(ReorderBuffer * reorder, XLogRecordBuffer * buf) change = ReorderBufferGetChange(reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; + change->origin_id = r->xl_origin_id; memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode)); data = (char *) &xlhdr->header; @@ -418,6 +421,7 @@ DecodeDelete(ReorderBuffer * reorder, XLogRecordBuffer * buf) change = ReorderBufferGetChange(reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = r->xl_origin_id; memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode)); @@ -469,6 +473,8 @@ DecodeMultiInsert(ReorderBuffer * reorder, XLogRecordBuffer * buf) change = ReorderBufferGetChange(reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = r->xl_origin_id; + memcpy(&change->relnode, &xlrec->node, sizeof(RelFileNode)); /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 53b2dbc9b7..2606117898 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -51,8 +51,12 @@ LogicalDecodingCtlData *LogicalDecodingCtl = NULL; /* My slot for logical rep in the shared memory array */ LogicalDecodingSlot *MyLogicalDecodingSlot = NULL; -/* user settable parameters */ int max_logical_slots = 0; /* the maximum number of logical slots */ +RepNodeId guc_replication_node_id = InvalidRepNodeId; /* local node id */ +RepNodeId guc_replication_origin_id = InvalidRepNodeId; /* assumed identity */ + +XLogRecPtr replication_origin_lsn; +TimestampTz replication_origin_timestamp; static void LogicalSlotKill(int code, Datum arg); @@ -600,6 +604,9 @@ StartupLogicalReplication(XLogRecPtr checkPointRedo) if (strcmp(logical_de->d_name, "snapshots") == 0) continue; + if (strcmp(logical_de->d_name, "checkpoints") == 0) + continue; + /* we crashed while a slot was being setup or deleted, clean up */ if (strcmp(logical_de->d_name, "new") == 0 || strcmp(logical_de->d_name, "old") == 0) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1d7dff5ffa..14a06f5e5e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1187,7 +1187,7 @@ ReorderBufferFreeSnap(ReorderBuffer * buffer, Snapshot snap) * assigned via ReorderBufferCommitChild. */ void -ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn) +ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn, RepNodeId origin) { ReorderBufferTXN *txn; ReorderBufferIterTXNState *iterstate = NULL; @@ -1204,6 +1204,7 @@ ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn) return; txn->last_lsn = lsn; + txn->origin_id = origin; /* serialize the last bunch of changes if we need start earlier anyway */ if (txn->nentries_mem != txn->nentries) diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c new file mode 100644 index 0000000000..0fbbbb204b --- /dev/null +++ b/src/backend/replication/logical/replication_identifier.c @@ -0,0 +1,619 @@ +/*------------------------------------------------------------------------- + * + * replication_identifier.c + * Logical Replication Identifier and progress persistency support + * + * Copyright (c) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/replication_identifier.c + * + */ + +#include "postgres.h" + +#include + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "replication/replication_identifier.h" +#include "replication/logical.h" +#include "storage/fd.h" +#include "storage/copydir.h" +#include "utils/syscache.h" +#include "utils/rel.h" + +typedef struct ReplicationState +{ + RepNodeId local_identifier; + + /* + * Latest commit from the remote side. + */ + XLogRecPtr remote_lsn; + + /* + * Remember the local lsn of the commit record so we can XLogFlush() to it + * during a checkpoint so we know the commit record actually is safe on + * disk. + */ + XLogRecPtr local_lsn; +} ReplicationState; + +/* + * Base address into array of replication states of size max_logical_slots. + */ +static ReplicationState *ReplicationStates; + +/* + * Local ReplicationState so we don't have to search ReplicationStates for the + * backends current RepNodeId. + */ +static ReplicationState *local_replication_state = NULL; + +/*RSTATE */ +#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE + +#ifndef UINT16_MAX +#define UINT16_MAX ((1<<16) - 1) +#else +#if UINT16_MAX != ((1<<16) - 1) +#error "uh, wrong UINT16_MAX?" +#endif +#endif + +/* + * Check for a persistent repication identifier identified by remotesysid, + * remotetli, remotedb, riname, rilocaldb. + * + * Returns InvalidOid. + */ +RepNodeId +GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb, + Name riname, Oid rilocaldb) +{ + Oid riident = InvalidOid; + HeapTuple tuple; + Form_pg_replication_identifier ident; + NameData sysid; + + sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli); + + tuple = SearchSysCache4(REPLIDREMOTE, + NameGetDatum(&sysid), + ObjectIdGetDatum(remotedb), + ObjectIdGetDatum(rilocaldb), + NameGetDatum(riname)); + if (HeapTupleIsValid(tuple)) + { + ident = (Form_pg_replication_identifier)GETSTRUCT(tuple); + riident = ident->riident; + ReleaseSysCache(tuple); + } + return riident; +} + +/* + * Create a persistent replication identifier. + * + * Needs to be called in a transaction and doesn't call + * CommandCounterIncrement(). + */ +RepNodeId +CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb, + Name riname, Oid rilocaldb) +{ + Oid riident; + HeapTuple tuple = NULL; + NameData sysid; + Relation rel; + + Assert(IsTransactionState()); + + sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli); + + /* lock table against modifications */ + rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock); + + /* XXX: should we start at FirstNormalObjectId ? */ + for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++) + { + bool nulls[Natts_pg_replication_identifier]; + Datum values[Natts_pg_replication_identifier]; + + tuple = GetReplicationInfoByIdentifier(riident); + + if (tuple != NULL) + { + ReleaseSysCache(tuple); + continue; + } + /* ok, found an unused riident */ + + memset(&nulls, 0, sizeof(nulls)); + + values[Anum_pg_replication_riident -1] = ObjectIdGetDatum(riident); + values[Anum_pg_replication_riremotesysid - 1] = NameGetDatum(&sysid); + values[Anum_pg_replication_rilocaldb - 1] = ObjectIdGetDatum(rilocaldb); + values[Anum_pg_replication_riremotedb - 1] = ObjectIdGetDatum(remotedb); + values[Anum_pg_replication_riname - 1] = NameGetDatum(riname); + + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + simple_heap_insert(rel, tuple); + CatalogUpdateIndexes(rel, tuple); + CommandCounterIncrement(); + break; + } + + /* + * only release at end of transaction, so we don't have to worry about race + * conditions with other transactions trying to insert a new + * identifier. Acquiring a new identifier should be a fairly infrequent + * thing, so this seems fine. + */ + heap_close(rel, NoLock); + + if (tuple == NULL) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("no free replication id could be found"))); + + return riident; +} + +/* + * Lookup pg_replication_identifier tuple via its riident. + * + * The result needs to be ReleaseSysCache'ed + */ +HeapTuple +GetReplicationInfoByIdentifier(RepNodeId riident) +{ + HeapTuple tuple; + + Assert(OidIsValid((Oid) riident)); + Assert(riident < UINT16_MAX); + tuple = SearchSysCache1(REPLIDIDENT, + ObjectIdGetDatum((Oid) riident)); + return tuple; +} + +Size +ReplicationIdentifierShmemSize(void) +{ + Size size = 0; + + /* + * FIXME: max_logical_slots is the wrong thing to use here, here we keep + * the replay state of *remote* transactions. + */ + if (max_logical_slots == 0) + return size; + + size = add_size(size, + mul_size(max_logical_slots, sizeof(ReplicationState))); + return size; +} + +void +ReplicationIdentifierShmemInit(void) +{ + bool found; + + if (max_logical_slots == 0) + return; + + ReplicationStates = (ReplicationState *) + ShmemInitStruct("ReplicationIdentifierState", + ReplicationIdentifierShmemSize(), + &found); + + if (!found) + { + MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize()); + } +} + +/* --------------------------------------------------------------------------- + * Perform a checkpoint of replication identifier's progress with respect to + * the replayed remote_lsn. Make sure that all transactions we refer to in the + * checkpoint (local_lsn) are actually on-disk. This might not yet be the case + * if the transactions were originally committed asynchronously. + * + * We store checkpoints in the following format: + * +-------+-------------------------+-------------------------+-----+ + * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF + * +-------+-------------------------+-------------------------+-----+ + * + * So its just the magic, followed by the statically sized + * ReplicationStates. Note that the maximum number of ReplicationStates is + * determined by max_logical_slots. + * + * XXX: This doesn't yet work on a standby in contrast to a master doing crash + * recovery, since the checkpoint data file won't be available there. Thats + * fine for now since a standby can't perform replay on its own. + * --------------------------------------------------------------------------- + */ +void +CheckPointReplicationIdentifier(XLogRecPtr ckpt) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + int fd; + int tmpfd; + int i; + uint32 magic = REPLICATION_STATE_MAGIC; + + if (max_logical_slots == 0) + return; + + elog(LOG, "doing replication identifier checkpoint"); + + sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt", + (uint32)(ckpt >> 32), (uint32)ckpt); + sprintf(tmppath, "pg_llog/checkpoints/%X-%X.ckpt.tmp", + (uint32)(ckpt >> 32), (uint32)ckpt); + + /* check whether file already exists */ + fd = OpenTransientFile(path, + O_RDONLY | PG_BINARY, + 0); + + /* usual case, no checkpoint performed yet */ + if (fd < 0 && errno == ENOENT) + ; + else if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not check replication state checkpoint \"%s\": %m", + path))); + /* already checkpointed before crash during a checkpoint or so */ + else + { + CloseTransientFile(fd); + return; + } + + /* make sure no old temp file is remaining */ + if (unlink(tmppath) < 0 && errno != ENOENT) + ereport(PANIC, (errmsg("failed while unlinking %s", path))); + + /* + * no other backend can perform this at the same time, we're protected by + * CheckpointLock. + */ + tmpfd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (tmpfd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not create replication identifier checkpoint \"%s\": %m", + tmppath))); + + /* write magic */ + if ((write(tmpfd, &magic, sizeof(magic))) != + sizeof(magic)) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write replication identifier checkpoint \"%s\": %m", + tmppath))); + } + + /* write actual data */ + for (i = 0; i < max_logical_slots; i++) + { + ReplicationState local_state; + + if (ReplicationStates[i].local_identifier == InvalidRepNodeId) + continue; + + local_state.local_identifier = ReplicationStates[i].local_identifier; + local_state.remote_lsn = ReplicationStates[i].remote_lsn; + local_state.local_lsn = InvalidXLogRecPtr; + + /* make sure we only write out a commit thats persistent */ + XLogFlush(ReplicationStates[i].local_lsn); + + + if ((write(tmpfd, &local_state, sizeof(ReplicationState))) != + sizeof(ReplicationState)) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write replication identifier checkpoint \"%s\": %m", + tmppath))); + } + } + + /* fsync the file */ + if (pg_fsync(tmpfd) != 0) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync replication identifier checkpoint \"%s\": %m", + tmppath))); + } + + CloseTransientFile(tmpfd); + + /* rename to permanent file, fsync file and directory */ + if (rename(tmppath, path) != 0) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m", + tmppath, path))); + } + + fsync_fname("pg_llog/checkpoints", true); + fsync_fname(path, false); +} + +/* + * Recover replication replay status from checkpoint data saved earlier by + * CheckPointReplicationIdentifier. + * + * This only needs to be called at startup and *not* during every checkpoint + * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All + * state thereafter can be recovered by looking at commit records. + */ +void +StartupReplicationIdentifier(XLogRecPtr ckpt) +{ + char path[MAXPGPATH]; + int fd; + int readBytes; + uint32 magic = REPLICATION_STATE_MAGIC; + int last_state = 0; + + /* don't want to overwrite already existing state */ +#ifdef USE_ASSERT_CHECKING + static bool already_started = false; + Assert(!already_started); + already_started = true; +#endif + + if (max_logical_slots == 0) + return; + + elog(LOG, "starting up replication identifier with ckpt at %X/%X", + (uint32)(ckpt >> 32), (uint32)ckpt); + + sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt", + (uint32)(ckpt >> 32), (uint32)ckpt); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + + /* + * might have had max_logical_slots == 0 last run, or we just brought up a + * standby. + */ + if (fd < 0 && errno == ENOENT) + return; + else if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open replication state checkpoint \"%s\": %m", + path))); + + /* verify magic, thats written even if nothing was active */ + readBytes = read(fd, &magic, sizeof(magic)); + if (readBytes != sizeof(magic)) + ereport(PANIC, (errmsg("could not read replication state checkpoint magic \"%s\": %m", + path))); + + if (magic != REPLICATION_STATE_MAGIC) + ereport(PANIC, (errmsg("replication checkpoint has wrong magic %u instead of %u", + magic, REPLICATION_STATE_MAGIC))); + + /* recover individual states, until there are no more to be found */ + while (true) + { + ReplicationState local_state; + readBytes = read(fd, &local_state, sizeof(local_state)); + + /* no further data */ + if (readBytes == 0) + break; + + if (readBytes != sizeof(local_state)) + { + int saved_errno = errno; + CloseTransientFile(fd); + errno = saved_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read replication checkpoint file \"%s\": %m, read %d of %zu", + path, readBytes, sizeof(local_state)))); + } + + if (last_state == max_logical_slots) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found, increase max_logical_slots"))); + + /* copy data shared memory */ + ReplicationStates[last_state++] = local_state; + + elog(LOG, "recovered replication state of %u to %X/%X", + local_state.local_identifier, + (uint32)(local_state.remote_lsn >> 32), + (uint32)local_state.remote_lsn); + } + + CloseTransientFile(fd); +} + +/* + * Tell the replication identifier machinery that a commit from 'node' that + * originated at the LSN remote_commit on the remote node was replayed + * successfully and that we don't need to do so again. In combination with + * setting up replication_origin_lsn and guc_replication_origin_id that ensures + * we won't loose knowledge about that after a crash if the the transaction had + * a persistent effect (think of asynchronous commits). + * + * local_commit needs to be a local LSN of the commit so that we can make sure + * uppon a checkpoint that enough WAL has been persisted to disk. + */ +void +AdvanceReplicationIdentifier(RepNodeId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit) +{ + int i; + int free_slot = -1; + ReplicationState *replication_state = NULL; + + /* + * XXX: should we restore into a hashtable and dump into shmem only after + * recovery finished? + */ + + /* check whether slot already exists */ + for (i = 0; i < max_logical_slots; i++) + { + /* remember where to insert if necessary */ + if (ReplicationStates[i].local_identifier == InvalidRepNodeId && + free_slot == -1) + { + free_slot = i; + continue; + } + + /* not our slot */ + if (ReplicationStates[i].local_identifier != node) + continue; + + /* ok, found slot */ + replication_state = &ReplicationStates[i]; + break; + } + + if (replication_state == NULL && free_slot == -1) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found for %u, increase max_logical_slots", + node))); + /* initialize new slot */ + else if (replication_state == NULL) + { + replication_state = &ReplicationStates[free_slot]; + Assert(replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(replication_state->local_lsn == InvalidXLogRecPtr); + replication_state->local_identifier = node; + } + + /* + * due to - harmless - race conditions during a checkpoint we could see + * values here that are older than the ones we already have in + * memory. Don't overwrite those. + */ + if (replication_state->remote_lsn < remote_commit) + replication_state->remote_lsn = remote_commit; + if (replication_state->local_lsn < local_commit) + replication_state->local_lsn = local_commit; +} + + +/* + * Setup a replication identifier in the shared memory struct if it doesn't + * already exists and cache access to the specific ReplicationSlot so the array + * doesn't have to be searched when calling + * AdvanceCachedReplicationIdentifier(). + * + * Obviously only such cached identifier can exist per process and the current + * cached value can only be set again after the prvious value is torn down with + * TeardownCachedReplicationIdentifier. + */ +void +SetupCachedReplicationIdentifier(RepNodeId node) +{ + int i; + int free_slot = -1; + + Assert(max_logical_slots != 0); + Assert(local_replication_state == NULL); + + /* + * Aearch for either an existing slot for that identifier or a free one we + * can use. + */ + for (i = 0; i < max_logical_slots; i++) + { + /* remember where to insert if necessary */ + if (ReplicationStates[i].local_identifier == InvalidRepNodeId && + free_slot == -1) + { + free_slot = i; + continue; + } + + /* not our slot */ + if (ReplicationStates[i].local_identifier != node) + continue; + + local_replication_state = &ReplicationStates[i]; + } + + + if (local_replication_state == NULL && free_slot == -1) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found for %u, increase max_logical_slots", + node))); + else if (local_replication_state == NULL) + { + local_replication_state = &ReplicationStates[free_slot]; + local_replication_state->local_identifier = node; + Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(local_replication_state->local_lsn == InvalidXLogRecPtr); + } +} + +/* + * Make currently cached replication identifier unavailable so a new one can be + * setup with SetupCachedReplicationIdentifier(). + * + * This function may only be called if a previous identifier was cached. + */ +void +TeardownCachedReplicationIdentifier(RepNodeId node) +{ + Assert(max_logical_slots != 0); + Assert(local_replication_state != NULL); + + local_replication_state = NULL; +} + +/* + * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached + * identifier. This is noticeably cheaper if you only ever work on a single + * replication identifier. + */ +void +AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit, + XLogRecPtr local_commit) +{ + Assert(local_replication_state != NULL); + if (local_replication_state->local_lsn < local_commit) + local_replication_state->local_lsn = local_commit; + if (local_replication_state->remote_lsn < remote_commit) + local_replication_state->remote_lsn = remote_commit; +} + +/* + * Ask the machinery about the point up to which we successfully replayed + * changes from a already setup & chaced replication identifier. + */ +XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void) +{ + Assert(local_replication_state != NULL); + return local_replication_state->remote_lsn; +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 0c2e55bcfe..78fd1dd8c4 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -30,6 +30,7 @@ #include "replication/logical.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/replication_identifier.h" #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, CheckpointerShmemSize()); size = add_size(size, AutoVacuumShmemSize()); size = add_size(size, LogicalDecodingShmemSize()); + size = add_size(size, ReplicationIdentifierShmemSize()); size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); @@ -233,6 +235,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) CheckpointerShmemInit(); AutoVacuumShmemInit(); LogicalDecodingShmemInit(); + ReplicationIdentifierShmemInit(); WalSndShmemInit(); WalRcvShmemInit(); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index b5fe64f998..734a810019 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -47,6 +47,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_range.h" #include "catalog/pg_rewrite.h" +#include "catalog/pg_replication_identifier.h" #include "catalog/pg_statistic.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_ts_config.h" @@ -624,6 +625,28 @@ static const struct cachedesc cacheinfo[] = { }, 1024 }, + {ReplicationIdentifierRelationId, /* REPLIDIDENT */ + ReplicationLocalIdIndex, + 1, + { + Anum_pg_replication_riident, + 0, + 0, + 0 + }, + 16 + }, + {ReplicationIdentifierRelationId, /* REPLIDREMOTE */ + ReplicationRemoteIndex, + 4, + { + Anum_pg_replication_riremotesysid, + Anum_pg_replication_riremotedb, + Anum_pg_replication_rilocaldb, + Anum_pg_replication_riname + }, + 16 + }, {RewriteRelationId, /* RULERELNAME */ RewriteRelRulenameIndexId, 2, diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f34358334b..9a19a35437 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -199,6 +199,8 @@ static bool check_application_name(char **newval, void **extra, GucSource source static void assign_application_name(const char *newval, void *extra); static const char *show_unix_socket_permissions(void); static const char *show_log_file_mode(void); +static void assign_replication_node_id(int newval, void *extra); +static void assign_replication_origin_id(int newval, void *extra); static char *config_enum_get_options(struct config_enum * record, const char *prefix, const char *suffix, @@ -471,7 +473,8 @@ static int wal_block_size; static int wal_segment_size; static bool integer_datetimes; static int effective_io_concurrency; - +static int phony_replication_node_id; +static int phony_replication_origin_id; /* should be static, but commands/variable.c needs to get at this */ char *role_string; @@ -2091,6 +2094,26 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replication_node_id", PGC_POSTMASTER, REPLICATION_MASTER, + gettext_noop("node id for replication."), + NULL + }, + &phony_replication_node_id, + InvalidRepNodeId, InvalidRepNodeId, INT_MAX, + NULL, assign_replication_node_id, NULL + }, + + { + {"replication_origin_id", PGC_USERSET, REPLICATION_MASTER, + gettext_noop("current node id for replication."), + NULL + }, + &phony_replication_origin_id, + InvalidRepNodeId, InvalidRepNodeId, INT_MAX, + NULL, assign_replication_origin_id, NULL + }, + { {"commit_siblings", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the minimum concurrent open transactions before performing " @@ -8819,4 +8842,26 @@ show_log_file_mode(void) return buf; } +static void +assign_replication_node_id(int newval, void *extra) +{ + guc_replication_node_id = newval; + /* set default to local node */ + guc_replication_origin_id = newval; + phony_replication_origin_id = newval; +} + + +static void +assign_replication_origin_id(int newval, void *extra) +{ + /* + * FIXME: add error checking hook that check wal_level and + * replication_node_id. + */ + guc_replication_origin_id = newval; +} + + + #include "guc-file.c" diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 66dfe853c0..a712260669 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -248,7 +248,7 @@ #wal_receiver_timeout = 60s # time that receiver waits for # communication from master # in milliseconds; 0 disables - +#replication_node_id = 0 #invalid node id #------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index c6f3e9ace6..78c1784a4e 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -196,7 +196,8 @@ const char *subdirs[] = { "pg_stat", "pg_stat_tmp", "pg_llog", - "pg_llog/snapshots" + "pg_llog/snapshots", + "pg_llog/checkpoints" }; diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index 9c340b2db5..395c392866 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -55,6 +55,7 @@ #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "common/fe_memutils.h" +#include "replication/logical.h" extern int optind; extern char *optarg; @@ -967,6 +968,7 @@ WriteEmptyXLOG(void) record->xl_len = sizeof(CheckPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_rmid = RM_XLOG_ID; + record->xl_origin_id = InvalidRepNodeId; memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy, sizeof(CheckPoint)); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 835f6acbee..513ed38e29 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -146,6 +146,13 @@ typedef struct xl_xact_commit /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */ } xl_xact_commit; +typedef struct xl_xact_origin +{ + XLogRecPtr origin_lsn; + RepNodeId origin_node_id; + TimestampTz origin_timestamp; +} xl_xact_origin; + #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes) /* @@ -158,7 +165,7 @@ typedef struct xl_xact_commit */ #define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01 #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 - +#define XACT_CONTAINS_ORIGIN 0x04 /* Access macros for above flags */ #define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) #define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index efda7620f3..97eb9c675c 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -48,6 +48,7 @@ typedef struct XLogRecord /* 2 bytes of padding here, initialize to zero */ XLogRecPtr xl_prev; /* ptr to previous record in log */ pg_crc32 xl_crc; /* CRC for this record */ + RepNodeId xl_origin_id; /* what node did originally cause this record to be written */ /* If MAXALIGN==8, there are 4 wasted bytes here */ @@ -177,6 +178,7 @@ typedef enum } RecoveryTargetType; extern XLogRecPtr XactLastRecEnd; +extern XLogRecPtr XactLastCommitEnd; extern bool reachedConsistency; diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index fa6497adad..0affcdc415 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -44,6 +44,12 @@ typedef uint64 XLogSegNo; */ typedef uint32 TimeLineID; +/* + * Denotes the node on which the action causing a wal record to be logged + * originated on. + */ +typedef uint16 RepNodeId; + /* * Because O_DIRECT bypasses the kernel buffers, and because we never * read those buffers except during crash recovery or if wal_level != minimal, diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index 2a3cd8215e..313fcda820 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -313,6 +313,12 @@ DECLARE_UNIQUE_INDEX(pg_extension_name_index, 3081, on pg_extension using btree( DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops)); #define RangeTypidIndexId 3542 +DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3459, on pg_replication_identifier using btree(riident oid_ops)); +#define ReplicationLocalIdIndex 3459 + +DECLARE_UNIQUE_INDEX(pg_replication_identifier_remote_index, 3460, on pg_replication_identifier using btree(riremotesysid name_ops, riremotedb oid_ops, rilocaldb oid_ops, riname name_ops)); +#define ReplicationRemoteIndex 3460 + /* last step of initialization script: build the indexes declared above */ BUILD_INDICES diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h new file mode 100644 index 0000000000..918f25fc26 --- /dev/null +++ b/src/include/catalog/pg_replication_identifier.h @@ -0,0 +1,92 @@ +/*------------------------------------------------------------------------- + * + * pg_replication_identifier.h + * + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_replication_identifier.h + * + * NOTES + * the genbki.pl script reads this file and generates .bki + * information from the DATA() statements. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_REPLICATION_IDENTIFIER_H +#define PG_REPLICATION_IDENTIFIER_H + +#include "catalog/genbki.h" +#include "access/xlogdefs.h" + +/* ---------------- + * pg_replication_identifier. cpp turns this into + * typedef struct FormData_pg_replication_identifier + * ---------------- + */ +#define ReplicationIdentifierRelationId 3458 + +CATALOG(pg_replication_identifier,3458) BKI_SHARED_RELATION BKI_WITHOUT_OIDS +{ + /* + * locally known identifier that gets included into wal. + * + * This should never leave the system. + * + * Needs to fit into a uint16, so we don't waste too much space. For this + * reason we don't use a normal Oid column here, since we need to handle + * allocation of new values manually. + */ + Oid riident; + + /* ---- + * remote system identifier, including tli, separated by a -. + * + * They are packed together for two reasons: + * a) we can't represent sysids as uint64 because there's no such type on + * sql level, so we need a fixed width string anyway. And a name already + * has enough space for that. + * b) syscaches can only have 4 keys, and were already at that with + * combined keys + * ---- + */ + NameData riremotesysid; + + /* local database */ + Oid rilocaldb; + + /* remote database */ + Oid riremotedb; + + /* optional name, zero length string */ + NameData riname; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ +#endif +} FormData_pg_replication_identifier; + +/* ---------------- + * Form_pg_extension corresponds to a pointer to a tuple with + * the format of pg_extension relation. + * ---------------- + */ +typedef FormData_pg_replication_identifier *Form_pg_replication_identifier; + +/* ---------------- + * compiler constants for pg_replication_identifier + * ---------------- + */ + +#define Natts_pg_replication_identifier 5 +#define Anum_pg_replication_riident 1 +#define Anum_pg_replication_riremotesysid 2 +#define Anum_pg_replication_rilocaldb 3 +#define Anum_pg_replication_riremotedb 4 +#define Anum_pg_replication_riname 5 + +/* ---------------- + * pg_replication_identifier has no initial contents + * ---------------- + */ + +#endif /* PG_REPLICTION_IDENTIFIER_H */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 246c7e7392..288efc886e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -161,6 +161,13 @@ typedef struct LogicalDecodingContext /* GUCs */ extern PGDLLIMPORT int max_logical_slots; +#define InvalidRepNodeId 0 +extern PGDLLIMPORT RepNodeId guc_replication_node_id; +extern PGDLLIMPORT RepNodeId guc_replication_origin_id; +extern PGDLLIMPORT XLogRecPtr replication_origin_lsn; +extern PGDLLIMPORT TimestampTz replication_origin_timestamp; + + extern Size LogicalDecodingShmemSize(void); extern void LogicalDecodingShmemInit(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b796b74c9..f91511b1da 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -62,6 +62,8 @@ typedef struct ReorderBufferChange int action_internal; }; + RepNodeId origin_id; + /* * Context data for the change, which part of the union is valid depends * on action/action_internal. @@ -114,6 +116,10 @@ typedef struct ReorderBufferTXN * LSN of the first wal record with knowledge about this xid. */ XLogRecPtr lsn; + + /* + * LSN of the commit record + */ XLogRecPtr last_lsn; /* @@ -123,6 +129,9 @@ typedef struct ReorderBufferTXN */ XLogRecPtr restart_decoding_lsn; + /* origin of the change that caused this transaction */ + RepNodeId origin_id; + /* did the TX have catalog changes */ bool does_timetravel; @@ -292,7 +301,7 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferAddChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); -void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RepNodeId origin_id); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn); void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h new file mode 100644 index 0000000000..97750d926b --- /dev/null +++ b/src/include/replication/replication_identifier.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * replication_identifier.h + * XXX + * + * Copyright (c) 2013, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef REPLICATION_IDENTIFIER_H +#define REPLICATION_IDENTIFIER_H + +#include "catalog/pg_replication_identifier.h" +#include "replication/logical.h" + +extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, + Oid remotedb, Name riname, + Oid rilocaldb); +extern RepNodeId CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, + Oid remotedb, Name riname, + Oid rilocaldb); + +extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident); + +extern Size ReplicationIdentifierShmemSize(void); +extern void ReplicationIdentifierShmemInit(void); + +extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt); +extern void StartupReplicationIdentifier(XLogRecPtr ckpt); + +extern void SetupCachedReplicationIdentifier(RepNodeId node); +extern void TeardownCachedReplicationIdentifier(RepNodeId node); + +extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit, + XLogRecPtr local_commit); + +extern void AdvanceReplicationIdentifier(RepNodeId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit); + +extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void); +#endif diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index 2a149052bf..8bbc2ed4fd 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -78,6 +78,8 @@ enum SysCacheIdentifier RELFILENODE, RELNAMENSP, RELOID, + REPLIDIDENT, + REPLIDREMOTE, RULERELNAME, STATRELATTINH, TABLESPACEOID, diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 432d39a491..907c804078 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -9,164 +9,165 @@ SELECT relname, relhasindex FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = relnamespace WHERE relkind = 'r' AND (nspname ~ '^pg_temp_') IS NOT TRUE ORDER BY relname; - relname | relhasindex --------------------------+------------- - a | f - a_star | f - abstime_tbl | f - aggtest | f - array_index_op_test | t - array_op_test | f - b | f - b_star | f - box_tbl | f - bprime | f - bt_f8_heap | t - bt_i4_heap | t - bt_name_heap | t - bt_txt_heap | t - c | f - c_star | f - char_tbl | f - check2_tbl | f - check_tbl | f - circle_tbl | t - city | f - copy_tbl | f - d | f - d_star | f - date_tbl | f - default_tbl | f - defaultexpr_tbl | f - dept | f - dupindexcols | t - e_star | f - emp | f - equipment_r | f - f_star | f - fast_emp4000 | t - float4_tbl | f - float8_tbl | f - func_index_heap | t - hash_f8_heap | t - hash_i4_heap | t - hash_name_heap | t - hash_txt_heap | t - hobbies_r | f - ihighway | t - inet_tbl | f - inhf | f - inhx | t - insert_tbl | f - int2_tbl | f - int4_tbl | f - int8_tbl | f - interval_tbl | f - iportaltest | f - kd_point_tbl | t - log_table | f - lseg_tbl | f - main_table | f - money_data | f - num_data | f - num_exp_add | t - num_exp_div | t - num_exp_ln | t - num_exp_log10 | t - num_exp_mul | t - num_exp_power_10_ln | t - num_exp_sqrt | t - num_exp_sub | t - num_input_test | f - num_result | f - onek | t - onek2 | t - path_tbl | f - person | f - pg_aggregate | t - pg_am | t - pg_amop | t - pg_amproc | t - pg_attrdef | t - pg_attribute | t - pg_auth_members | t - pg_authid | t - pg_cast | t - pg_class | t - pg_collation | t - pg_constraint | t - pg_conversion | t - pg_database | t - pg_db_role_setting | t - pg_default_acl | t - pg_depend | t - pg_description | t - pg_enum | t - pg_event_trigger | t - pg_extension | t - pg_foreign_data_wrapper | t - pg_foreign_server | t - pg_foreign_table | t - pg_index | t - pg_inherits | t - pg_language | t - pg_largeobject | t - pg_largeobject_metadata | t - pg_namespace | t - pg_opclass | t - pg_operator | t - pg_opfamily | t - pg_pltemplate | t - pg_proc | t - pg_range | t - pg_rewrite | t - pg_seclabel | t - pg_shdepend | t - pg_shdescription | t - pg_shseclabel | t - pg_statistic | t - pg_tablespace | t - pg_trigger | t - pg_ts_config | t - pg_ts_config_map | t - pg_ts_dict | t - pg_ts_parser | t - pg_ts_template | t - pg_type | t - pg_user_mapping | t - point_tbl | t - polygon_tbl | t - quad_point_tbl | t - radix_text_tbl | t - ramp | f - real_city | f - reltime_tbl | f - road | t - shighway | t - slow_emp4000 | f - sql_features | f - sql_implementation_info | f - sql_languages | f - sql_packages | f - sql_parts | f - sql_sizing | f - sql_sizing_profiles | f - stud_emp | f - student | f - tenk1 | t - tenk2 | t - test_range_excl | t - test_range_gist | t - test_range_spgist | t - test_tsvector | f - text_tbl | f - time_tbl | f - timestamp_tbl | f - timestamptz_tbl | f - timetz_tbl | f - tinterval_tbl | f - varchar_tbl | f -(155 rows) + relname | relhasindex +---------------------------+------------- + a | f + a_star | f + abstime_tbl | f + aggtest | f + array_index_op_test | t + array_op_test | f + b | f + b_star | f + box_tbl | f + bprime | f + bt_f8_heap | t + bt_i4_heap | t + bt_name_heap | t + bt_txt_heap | t + c | f + c_star | f + char_tbl | f + check2_tbl | f + check_tbl | f + circle_tbl | t + city | f + copy_tbl | f + d | f + d_star | f + date_tbl | f + default_tbl | f + defaultexpr_tbl | f + dept | f + dupindexcols | t + e_star | f + emp | f + equipment_r | f + f_star | f + fast_emp4000 | t + float4_tbl | f + float8_tbl | f + func_index_heap | t + hash_f8_heap | t + hash_i4_heap | t + hash_name_heap | t + hash_txt_heap | t + hobbies_r | f + ihighway | t + inet_tbl | f + inhf | f + inhx | t + insert_tbl | f + int2_tbl | f + int4_tbl | f + int8_tbl | f + interval_tbl | f + iportaltest | f + kd_point_tbl | t + log_table | f + lseg_tbl | f + main_table | f + money_data | f + num_data | f + num_exp_add | t + num_exp_div | t + num_exp_ln | t + num_exp_log10 | t + num_exp_mul | t + num_exp_power_10_ln | t + num_exp_sqrt | t + num_exp_sub | t + num_input_test | f + num_result | f + onek | t + onek2 | t + path_tbl | f + person | f + pg_aggregate | t + pg_am | t + pg_amop | t + pg_amproc | t + pg_attrdef | t + pg_attribute | t + pg_auth_members | t + pg_authid | t + pg_cast | t + pg_class | t + pg_collation | t + pg_constraint | t + pg_conversion | t + pg_database | t + pg_db_role_setting | t + pg_default_acl | t + pg_depend | t + pg_description | t + pg_enum | t + pg_event_trigger | t + pg_extension | t + pg_foreign_data_wrapper | t + pg_foreign_server | t + pg_foreign_table | t + pg_index | t + pg_inherits | t + pg_language | t + pg_largeobject | t + pg_largeobject_metadata | t + pg_namespace | t + pg_opclass | t + pg_operator | t + pg_opfamily | t + pg_pltemplate | t + pg_proc | t + pg_range | t + pg_replication_identifier | t + pg_rewrite | t + pg_seclabel | t + pg_shdepend | t + pg_shdescription | t + pg_shseclabel | t + pg_statistic | t + pg_tablespace | t + pg_trigger | t + pg_ts_config | t + pg_ts_config_map | t + pg_ts_dict | t + pg_ts_parser | t + pg_ts_template | t + pg_type | t + pg_user_mapping | t + point_tbl | t + polygon_tbl | t + quad_point_tbl | t + radix_text_tbl | t + ramp | f + real_city | f + reltime_tbl | f + road | t + shighway | t + slow_emp4000 | f + sql_features | f + sql_implementation_info | f + sql_languages | f + sql_packages | f + sql_parts | f + sql_sizing | f + sql_sizing_profiles | f + stud_emp | f + student | f + tenk1 | t + tenk2 | t + test_range_excl | t + test_range_gist | t + test_range_spgist | t + test_tsvector | f + text_tbl | f + time_tbl | f + timestamp_tbl | f + timestamptz_tbl | f + timetz_tbl | f + tinterval_tbl | f + varchar_tbl | f +(156 rows) -- -- another sanity check: every system catalog that has OIDs should have -- 2.39.5