{
int i;
TransactionId *subxacts;
+ SharedInvalidationMessage *msgs;
subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
}
if (xlrec->nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+ appendStringInfo(buf, " origin %u, lsn %X/%X",
+ origin->origin_node_id,
+ (uint32)(origin->origin_lsn >> 32),
+ (uint32)origin->origin_lsn);
+ }
+
}
static void
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit
+ || wal_level >= WAL_LEVEL_LOGICAL)
{
- XLogRecData rdata[4];
+ XLogRecData rdata[5];
int lastrdata = 0;
xl_xact_commit xlrec;
+ xl_xact_origin origin;
/*
* Set flags required for recovery processing of commits.
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
+ /* dump transaction origin information */
+ if (guc_replication_origin_id != InvalidRepNodeId)
+ {
+ Assert(replication_origin_lsn != InvalidXLogRecPtr);
+ elog(LOG, "logging origin id");
+ xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+ origin.origin_node_id = guc_replication_origin_id;
+ origin.origin_lsn = replication_origin_lsn;
+ origin.origin_timestamp = replication_origin_timestamp;
+
+ rdata[lastrdata].next = &(rdata[4]);
+ rdata[4].data = (char *) &origin;
+ rdata[4].len = sizeof(xl_xact_origin);
+ rdata[4].buffer = InvalidBuffer;
+ lastrdata = 4;
+ }
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
}
}
+ /* record plain commit ts if not replaying remote actions */
+ if (guc_replication_origin_id == InvalidRepNodeId)
+ replication_origin_timestamp = xactStopTimestamp;
+
/*
* We don't need to log the commit timestamp separately since the commit
* record logged above has all the necessary action to set the timestamp
* again.
*/
TransactionTreeSetCommitTimestamp(xid, nchildren, children,
- xactStopTimestamp, 0, false);
+ replication_origin_timestamp,
+ guc_replication_origin_id,
+ false);
/*
* Check if we want to commit asynchronously. We can allow the XLOG flush
if (wrote_xlog)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
SharedInvalidationMessage *inval_msgs, int nmsgs,
RelFileNode *xnodes, int nrels,
Oid dbId, Oid tsId,
- uint32 xinfo)
+ uint32 xinfo,
+ xl_xact_origin *origin)
{
TransactionId max_xid;
int i;
+ RepNodeId origin_node_id = InvalidRepNodeId;
max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
LWLockRelease(XidGenLock);
}
+ Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ origin_node_id = origin->origin_node_id;
+ commit_time = origin->origin_timestamp;
+ }
+
/* Set the transaction commit time */
TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
- commit_time, 0, false);
+ commit_time,
+ origin_node_id, false);
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ elog(LOG, "restoring origin of node %u to %X/%X",
+ origin->origin_node_id,
+ (uint32)(origin->origin_lsn >> 32),
+ (uint32)origin->origin_lsn);
+ AdvanceReplicationIdentifier(origin->origin_node_id,
+ origin->origin_lsn,
+ lsn);
+ }
if (standbyState == STANDBY_DISABLED)
{
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
-
+ xl_xact_origin *origin = NULL;
/* subxid array follows relfilenodes */
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+ }
+
xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
subxacts, xlrec->nsubxacts,
inval_msgs, xlrec->nmsgs,
xlrec->xnodes, xlrec->nrels,
xlrec->dbId,
xlrec->tsId,
- xlrec->xinfo);
+ xlrec->xinfo,
+ origin);
}
/*
NULL, 0, /* relfilenodes */
InvalidOid, /* dbId */
InvalidOid, /* tsId */
- 0); /* xinfo */
+ 0, /* xinfo */
+ NULL /* origin */);
}
/*
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/logical.h"
+#include "replication/replication_identifier.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
+XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
+
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
* (which is almost but not quite the same as a pointer to the most recent
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
+ rechdr->xl_origin_id = guc_replication_origin_id;
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
*/
StartupLogicalReplication(checkPoint.redo);
+ /*
+ * Recover knowledge about replay progress of known replication partners.
+ */
+ StartupReplicationIdentifier(checkPoint.redo);
+
/*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
XLogRecPtr curInsert;
INSERT_RECPTR(curInsert, Insert, Insert->curridx);
- if (curInsert == ControlFile->checkPoint +
+ if (curInsert == ControlFile->checkPoint +
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
CheckPointPredicate();
CheckPointRelationMap();
CheckPointBuffers(flags); /* performs all required fsyncs */
+ CheckPointReplicationIdentifier(checkPointRedo);
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
- pg_foreign_table.h \
+ pg_foreign_table.h pg_replication_identifier.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
toasting.h indexing.h \
)
#include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId ||
- relationId == DbRoleSettingRelationId)
+ relationId == DbRoleSettingRelationId ||
+ relationId == ReplicationIdentifierRelationId)
return true;
/* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId ||
relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId ||
- relationId == DbRoleSettingDatidRolidIndexId)
+ relationId == DbRoleSettingDatidRolidIndexId ||
+ relationId == ReplicationLocalIdIndex ||
+ relationId == ReplicationRemoteIndex)
return true;
/* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable ||
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+ snapbuild.o
include $(top_srcdir)/src/backend/common.mk
}
/* replay actions of all transaction + subtransactions in order */
- ReorderBufferCommit(ctx->reorder, xid, buf->origptr);
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr,
+ buf->record.xl_origin_id);
}
static void
change = ReorderBufferGetChange(reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
change = ReorderBufferGetChange(reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
data = (char *) &xlhdr->header;
change = ReorderBufferGetChange(reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
change = ReorderBufferGetChange(reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
+
memcpy(&change->relnode, &xlrec->node, sizeof(RelFileNode));
/*
/* My slot for logical rep in the shared memory array */
LogicalDecodingSlot *MyLogicalDecodingSlot = NULL;
-/* user settable parameters */
int max_logical_slots = 0; /* the maximum number of logical slots */
+RepNodeId guc_replication_node_id = InvalidRepNodeId; /* local node id */
+RepNodeId guc_replication_origin_id = InvalidRepNodeId; /* assumed identity */
+
+XLogRecPtr replication_origin_lsn;
+TimestampTz replication_origin_timestamp;
static void LogicalSlotKill(int code, Datum arg);
if (strcmp(logical_de->d_name, "snapshots") == 0)
continue;
+ if (strcmp(logical_de->d_name, "checkpoints") == 0)
+ continue;
+
/* we crashed while a slot was being setup or deleted, clean up */
if (strcmp(logical_de->d_name, "new") == 0 ||
strcmp(logical_de->d_name, "old") == 0)
* assigned via ReorderBufferCommitChild.
*/
void
-ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn)
+ReorderBufferCommit(ReorderBuffer * buffer, TransactionId xid, XLogRecPtr lsn, RepNodeId origin)
{
ReorderBufferTXN *txn;
ReorderBufferIterTXNState *iterstate = NULL;
return;
txn->last_lsn = lsn;
+ txn->origin_id = origin;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ * Logical Replication Identifier and progress persistency support
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+#include "storage/fd.h"
+#include "storage/copydir.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+typedef struct ReplicationState
+{
+ RepNodeId local_identifier;
+
+ /*
+ * Latest commit from the remote side.
+ */
+ XLogRecPtr remote_lsn;
+
+ /*
+ * Remember the local lsn of the commit record so we can XLogFlush() to it
+ * during a checkpoint so we know the commit record actually is safe on
+ * disk.
+ */
+ XLogRecPtr local_lsn;
+} ReplicationState;
+
+/*
+ * Base address into array of replication states of size max_logical_slots.
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Local ReplicationState so we don't have to search ReplicationStates for the
+ * backends current RepNodeId.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/*RSTATE */
+#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE
+
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Check for a persistent repication identifier identified by remotesysid,
+ * remotetli, remotedb, riname, rilocaldb.
+ *
+ * Returns InvalidOid.
+ */
+RepNodeId
+GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+ Name riname, Oid rilocaldb)
+{
+ Oid riident = InvalidOid;
+ HeapTuple tuple;
+ Form_pg_replication_identifier ident;
+ NameData sysid;
+
+ sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+ tuple = SearchSysCache4(REPLIDREMOTE,
+ NameGetDatum(&sysid),
+ ObjectIdGetDatum(remotedb),
+ ObjectIdGetDatum(rilocaldb),
+ NameGetDatum(riname));
+ if (HeapTupleIsValid(tuple))
+ {
+ ident = (Form_pg_replication_identifier)GETSTRUCT(tuple);
+ riident = ident->riident;
+ ReleaseSysCache(tuple);
+ }
+ return riident;
+}
+
+/*
+ * Create a persistent replication identifier.
+ *
+ * Needs to be called in a transaction and doesn't call
+ * CommandCounterIncrement().
+ */
+RepNodeId
+CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+ Name riname, Oid rilocaldb)
+{
+ Oid riident;
+ HeapTuple tuple = NULL;
+ NameData sysid;
+ Relation rel;
+
+ Assert(IsTransactionState());
+
+ sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+ /* lock table against modifications */
+ rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+ /* XXX: should we start at FirstNormalObjectId ? */
+ for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++)
+ {
+ bool nulls[Natts_pg_replication_identifier];
+ Datum values[Natts_pg_replication_identifier];
+
+ tuple = GetReplicationInfoByIdentifier(riident);
+
+ if (tuple != NULL)
+ {
+ ReleaseSysCache(tuple);
+ continue;
+ }
+ /* ok, found an unused riident */
+
+ memset(&nulls, 0, sizeof(nulls));
+
+ values[Anum_pg_replication_riident -1] = ObjectIdGetDatum(riident);
+ values[Anum_pg_replication_riremotesysid - 1] = NameGetDatum(&sysid);
+ values[Anum_pg_replication_rilocaldb - 1] = ObjectIdGetDatum(rilocaldb);
+ values[Anum_pg_replication_riremotedb - 1] = ObjectIdGetDatum(remotedb);
+ values[Anum_pg_replication_riname - 1] = NameGetDatum(riname);
+
+ tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+ simple_heap_insert(rel, tuple);
+ CatalogUpdateIndexes(rel, tuple);
+ CommandCounterIncrement();
+ break;
+ }
+
+ /*
+ * only release at end of transaction, so we don't have to worry about race
+ * conditions with other transactions trying to insert a new
+ * identifier. Acquiring a new identifier should be a fairly infrequent
+ * thing, so this seems fine.
+ */
+ heap_close(rel, NoLock);
+
+ if (tuple == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("no free replication id could be found")));
+
+ return riident;
+}
+
+/*
+ * Lookup pg_replication_identifier tuple via its riident.
+ *
+ * The result needs to be ReleaseSysCache'ed
+ */
+HeapTuple
+GetReplicationInfoByIdentifier(RepNodeId riident)
+{
+ HeapTuple tuple;
+
+ Assert(OidIsValid((Oid) riident));
+ Assert(riident < UINT16_MAX);
+ tuple = SearchSysCache1(REPLIDIDENT,
+ ObjectIdGetDatum((Oid) riident));
+ return tuple;
+}
+
+Size
+ReplicationIdentifierShmemSize(void)
+{
+ Size size = 0;
+
+ /*
+ * FIXME: max_logical_slots is the wrong thing to use here, here we keep
+ * the replay state of *remote* transactions.
+ */
+ if (max_logical_slots == 0)
+ return size;
+
+ size = add_size(size,
+ mul_size(max_logical_slots, sizeof(ReplicationState)));
+ return size;
+}
+
+void
+ReplicationIdentifierShmemInit(void)
+{
+ bool found;
+
+ if (max_logical_slots == 0)
+ return;
+
+ ReplicationStates = (ReplicationState *)
+ ShmemInitStruct("ReplicationIdentifierState",
+ ReplicationIdentifierShmemSize(),
+ &found);
+
+ if (!found)
+ {
+ MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize());
+ }
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+-------------------------+-------------------------+-----+
+ * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF
+ * +-------+-------------------------+-------------------------+-----+
+ *
+ * So its just the magic, followed by the statically sized
+ * ReplicationStates. Note that the maximum number of ReplicationStates is
+ * determined by max_logical_slots.
+ *
+ * XXX: This doesn't yet work on a standby in contrast to a master doing crash
+ * recovery, since the checkpoint data file won't be available there. Thats
+ * fine for now since a standby can't perform replay on its own.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ int fd;
+ int tmpfd;
+ int i;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+
+ if (max_logical_slots == 0)
+ return;
+
+ elog(LOG, "doing replication identifier checkpoint");
+
+ sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+ sprintf(tmppath, "pg_llog/checkpoints/%X-%X.ckpt.tmp",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ /* check whether file already exists */
+ fd = OpenTransientFile(path,
+ O_RDONLY | PG_BINARY,
+ 0);
+
+ /* usual case, no checkpoint performed yet */
+ if (fd < 0 && errno == ENOENT)
+ ;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not check replication state checkpoint \"%s\": %m",
+ path)));
+ /* already checkpointed before crash during a checkpoint or so */
+ else
+ {
+ CloseTransientFile(fd);
+ return;
+ }
+
+ /* make sure no old temp file is remaining */
+ if (unlink(tmppath) < 0 && errno != ENOENT)
+ ereport(PANIC, (errmsg("failed while unlinking %s", path)));
+
+ /*
+ * no other backend can perform this at the same time, we're protected by
+ * CheckpointLock.
+ */
+ tmpfd = OpenTransientFile(tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (tmpfd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+
+ /* write magic */
+ if ((write(tmpfd, &magic, sizeof(magic))) !=
+ sizeof(magic))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+
+ /* write actual data */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ ReplicationState local_state;
+
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+ continue;
+
+ local_state.local_identifier = ReplicationStates[i].local_identifier;
+ local_state.remote_lsn = ReplicationStates[i].remote_lsn;
+ local_state.local_lsn = InvalidXLogRecPtr;
+
+ /* make sure we only write out a commit thats persistent */
+ XLogFlush(ReplicationStates[i].local_lsn);
+
+
+ if ((write(tmpfd, &local_state, sizeof(ReplicationState))) !=
+ sizeof(ReplicationState))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+ }
+
+ /* fsync the file */
+ if (pg_fsync(tmpfd) != 0)
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+
+ CloseTransientFile(tmpfd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ }
+
+ fsync_fname("pg_llog/checkpoints", true);
+ fsync_fname(path, false);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+ char path[MAXPGPATH];
+ int fd;
+ int readBytes;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ int last_state = 0;
+
+ /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+ static bool already_started = false;
+ Assert(!already_started);
+ already_started = true;
+#endif
+
+ if (max_logical_slots == 0)
+ return;
+
+ elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * might have had max_logical_slots == 0 last run, or we just brought up a
+ * standby.
+ */
+ if (fd < 0 && errno == ENOENT)
+ return;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open replication state checkpoint \"%s\": %m",
+ path)));
+
+ /* verify magic, thats written even if nothing was active */
+ readBytes = read(fd, &magic, sizeof(magic));
+ if (readBytes != sizeof(magic))
+ ereport(PANIC, (errmsg("could not read replication state checkpoint magic \"%s\": %m",
+ path)));
+
+ if (magic != REPLICATION_STATE_MAGIC)
+ ereport(PANIC, (errmsg("replication checkpoint has wrong magic %u instead of %u",
+ magic, REPLICATION_STATE_MAGIC)));
+
+ /* recover individual states, until there are no more to be found */
+ while (true)
+ {
+ ReplicationState local_state;
+ readBytes = read(fd, &local_state, sizeof(local_state));
+
+ /* no further data */
+ if (readBytes == 0)
+ break;
+
+ if (readBytes != sizeof(local_state))
+ {
+ int saved_errno = errno;
+ CloseTransientFile(fd);
+ errno = saved_errno;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read replication checkpoint file \"%s\": %m, read %d of %zu",
+ path, readBytes, sizeof(local_state))));
+ }
+
+ if (last_state == max_logical_slots)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found, increase max_logical_slots")));
+
+ /* copy data shared memory */
+ ReplicationStates[last_state++] = local_state;
+
+ elog(LOG, "recovered replication state of %u to %X/%X",
+ local_state.local_identifier,
+ (uint32)(local_state.remote_lsn >> 32),
+ (uint32)local_state.remote_lsn);
+ }
+
+ CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and guc_replication_origin_id that ensures
+ * we won't loose knowledge about that after a crash if the the transaction had
+ * a persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * uppon a checkpoint that enough WAL has been persisted to disk.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit)
+{
+ int i;
+ int free_slot = -1;
+ ReplicationState *replication_state = NULL;
+
+ /*
+ * XXX: should we restore into a hashtable and dump into shmem only after
+ * recovery finished?
+ */
+
+ /* check whether slot already exists */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ /* remember where to insert if necessary */
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (ReplicationStates[i].local_identifier != node)
+ continue;
+
+ /* ok, found slot */
+ replication_state = &ReplicationStates[i];
+ break;
+ }
+
+ if (replication_state == NULL && free_slot == -1)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found for %u, increase max_logical_slots",
+ node)));
+ /* initialize new slot */
+ else if (replication_state == NULL)
+ {
+ replication_state = &ReplicationStates[free_slot];
+ Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+ replication_state->local_identifier = node;
+ }
+
+ /*
+ * due to - harmless - race conditions during a checkpoint we could see
+ * values here that are older than the ones we already have in
+ * memory. Don't overwrite those.
+ */
+ if (replication_state->remote_lsn < remote_commit)
+ replication_state->remote_lsn = remote_commit;
+ if (replication_state->local_lsn < local_commit)
+ replication_state->local_lsn = local_commit;
+}
+
+
+/*
+ * Setup a replication identifier in the shared memory struct if it doesn't
+ * already exists and cache access to the specific ReplicationSlot so the array
+ * doesn't have to be searched when calling
+ * AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only such cached identifier can exist per process and the current
+ * cached value can only be set again after the prvious value is torn down with
+ * TeardownCachedReplicationIdentifier.
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+ int i;
+ int free_slot = -1;
+
+ Assert(max_logical_slots != 0);
+ Assert(local_replication_state == NULL);
+
+ /*
+ * Aearch for either an existing slot for that identifier or a free one we
+ * can use.
+ */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ /* remember where to insert if necessary */
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (ReplicationStates[i].local_identifier != node)
+ continue;
+
+ local_replication_state = &ReplicationStates[i];
+ }
+
+
+ if (local_replication_state == NULL && free_slot == -1)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found for %u, increase max_logical_slots",
+ node)));
+ else if (local_replication_state == NULL)
+ {
+ local_replication_state = &ReplicationStates[free_slot];
+ local_replication_state->local_identifier = node;
+ Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(local_replication_state->local_lsn == InvalidXLogRecPtr);
+ }
+}
+
+/*
+ * Make currently cached replication identifier unavailable so a new one can be
+ * setup with SetupCachedReplicationIdentifier().
+ *
+ * This function may only be called if a previous identifier was cached.
+ */
+void
+TeardownCachedReplicationIdentifier(RepNodeId node)
+{
+ Assert(max_logical_slots != 0);
+ Assert(local_replication_state != NULL);
+
+ local_replication_state = NULL;
+}
+
+/*
+ * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached
+ * identifier. This is noticeably cheaper if you only ever work on a single
+ * replication identifier.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit)
+{
+ Assert(local_replication_state != NULL);
+ if (local_replication_state->local_lsn < local_commit)
+ local_replication_state->local_lsn = local_commit;
+ if (local_replication_state->remote_lsn < remote_commit)
+ local_replication_state->remote_lsn = remote_commit;
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from a already setup & chaced replication identifier.
+ */
+XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void)
+{
+ Assert(local_replication_state != NULL);
+ return local_replication_state->remote_lsn;
+}
#include "replication/logical.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/replication_identifier.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, LogicalDecodingShmemSize());
+ size = add_size(size, ReplicationIdentifierShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
CheckpointerShmemInit();
AutoVacuumShmemInit();
LogicalDecodingShmemInit();
+ ReplicationIdentifierShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
#include "catalog/pg_proc.h"
#include "catalog/pg_range.h"
#include "catalog/pg_rewrite.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_ts_config.h"
},
1024
},
+ {ReplicationIdentifierRelationId, /* REPLIDIDENT */
+ ReplicationLocalIdIndex,
+ 1,
+ {
+ Anum_pg_replication_riident,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
+ {ReplicationIdentifierRelationId, /* REPLIDREMOTE */
+ ReplicationRemoteIndex,
+ 4,
+ {
+ Anum_pg_replication_riremotesysid,
+ Anum_pg_replication_riremotedb,
+ Anum_pg_replication_rilocaldb,
+ Anum_pg_replication_riname
+ },
+ 16
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
static void assign_application_name(const char *newval, void *extra);
static const char *show_unix_socket_permissions(void);
static const char *show_log_file_mode(void);
+static void assign_replication_node_id(int newval, void *extra);
+static void assign_replication_origin_id(int newval, void *extra);
static char *config_enum_get_options(struct config_enum * record,
const char *prefix, const char *suffix,
static int wal_segment_size;
static bool integer_datetimes;
static int effective_io_concurrency;
-
+static int phony_replication_node_id;
+static int phony_replication_origin_id;
/* should be static, but commands/variable.c needs to get at this */
char *role_string;
NULL, NULL, NULL
},
+ {
+ {"replication_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("node id for replication."),
+ NULL
+ },
+ &phony_replication_node_id,
+ InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+ NULL, assign_replication_node_id, NULL
+ },
+
+ {
+ {"replication_origin_id", PGC_USERSET, REPLICATION_MASTER,
+ gettext_noop("current node id for replication."),
+ NULL
+ },
+ &phony_replication_origin_id,
+ InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+ NULL, assign_replication_origin_id, NULL
+ },
+
{
{"commit_siblings", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the minimum concurrent open transactions before performing "
return buf;
}
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+ guc_replication_node_id = newval;
+ /* set default to local node */
+ guc_replication_origin_id = newval;
+ phony_replication_origin_id = newval;
+}
+
+
+static void
+assign_replication_origin_id(int newval, void *extra)
+{
+ /*
+ * FIXME: add error checking hook that check wal_level and
+ * replication_node_id.
+ */
+ guc_replication_origin_id = newval;
+}
+
+
+
#include "guc-file.c"
#wal_receiver_timeout = 60s # time that receiver waits for
# communication from master
# in milliseconds; 0 disables
-
+#replication_node_id = 0 #invalid node id
#------------------------------------------------------------------------------
# QUERY TUNING
"pg_stat",
"pg_stat_tmp",
"pg_llog",
- "pg_llog/snapshots"
+ "pg_llog/snapshots",
+ "pg_llog/checkpoints"
};
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "common/fe_memutils.h"
+#include "replication/logical.h"
extern int optind;
extern char *optarg;
record->xl_len = sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+ record->xl_origin_id = InvalidRepNodeId;
memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
sizeof(CheckPoint));
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
} xl_xact_commit;
+typedef struct xl_xact_origin
+{
+ XLogRecPtr origin_lsn;
+ RepNodeId origin_node_id;
+ TimestampTz origin_timestamp;
+} xl_xact_origin;
+
#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
/*
*/
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
-
+#define XACT_CONTAINS_ORIGIN 0x04
/* Access macros for above flags */
#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
/* 2 bytes of padding here, initialize to zero */
XLogRecPtr xl_prev; /* ptr to previous record in log */
pg_crc32 xl_crc; /* CRC for this record */
+ RepNodeId xl_origin_id; /* what node did originally cause this record to be written */
/* If MAXALIGN==8, there are 4 wasted bytes here */
} RecoveryTargetType;
extern XLogRecPtr XactLastRecEnd;
+extern XLogRecPtr XactLastCommitEnd;
extern bool reachedConsistency;
*/
typedef uint32 TimeLineID;
+/*
+ * Denotes the node on which the action causing a wal record to be logged
+ * originated on.
+ */
+typedef uint16 RepNodeId;
+
/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops));
#define RangeTypidIndexId 3542
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3459, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdIndex 3459
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_remote_index, 3460, on pg_replication_identifier using btree(riremotesysid name_ops, riremotedb oid_ops, rilocaldb oid_ops, riname name_ops));
+#define ReplicationRemoteIndex 3460
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ *
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ * the genbki.pl script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ * pg_replication_identifier. cpp turns this into
+ * typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 3458
+
+CATALOG(pg_replication_identifier,3458) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+ /*
+ * locally known identifier that gets included into wal.
+ *
+ * This should never leave the system.
+ *
+ * Needs to fit into a uint16, so we don't waste too much space. For this
+ * reason we don't use a normal Oid column here, since we need to handle
+ * allocation of new values manually.
+ */
+ Oid riident;
+
+ /* ----
+ * remote system identifier, including tli, separated by a -.
+ *
+ * They are packed together for two reasons:
+ * a) we can't represent sysids as uint64 because there's no such type on
+ * sql level, so we need a fixed width string anyway. And a name already
+ * has enough space for that.
+ * b) syscaches can only have 4 keys, and were already at that with
+ * combined keys
+ * ----
+ */
+ NameData riremotesysid;
+
+ /* local database */
+ Oid rilocaldb;
+
+ /* remote database */
+ Oid riremotedb;
+
+ /* optional name, zero length string */
+ NameData riname;
+#ifdef CATALOG_VARLEN /* variable-length fields start here */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ * Form_pg_extension corresponds to a pointer to a tuple with
+ * the format of pg_extension relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ * compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier 5
+#define Anum_pg_replication_riident 1
+#define Anum_pg_replication_riremotesysid 2
+#define Anum_pg_replication_rilocaldb 3
+#define Anum_pg_replication_riremotedb 4
+#define Anum_pg_replication_riname 5
+
+/* ----------------
+ * pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif /* PG_REPLICTION_IDENTIFIER_H */
/* GUCs */
extern PGDLLIMPORT int max_logical_slots;
+#define InvalidRepNodeId 0
+extern PGDLLIMPORT RepNodeId guc_replication_node_id;
+extern PGDLLIMPORT RepNodeId guc_replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+
extern Size LogicalDecodingShmemSize(void);
extern void LogicalDecodingShmemInit(void);
int action_internal;
};
+ RepNodeId origin_id;
+
/*
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
* LSN of the first wal record with knowledge about this xid.
*/
XLogRecPtr lsn;
+
+ /*
+ * LSN of the commit record
+ */
XLogRecPtr last_lsn;
/*
*/
XLogRecPtr restart_decoding_lsn;
+ /* origin of the change that caused this transaction */
+ RepNodeId origin_id;
+
/* did the TX have catalog changes */
bool does_timetravel;
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferAddChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
-void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RepNodeId origin_id);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
--- /dev/null
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ * XXX
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+ Oid remotedb, Name riname,
+ Oid rilocaldb);
+extern RepNodeId CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+ Oid remotedb, Name riname,
+ Oid rilocaldb);
+
+extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident);
+
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(RepNodeId node);
+
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+#endif
RELFILENODE,
RELNAMENSP,
RELOID,
+ REPLIDIDENT,
+ REPLIDREMOTE,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,
FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = relnamespace
WHERE relkind = 'r' AND (nspname ~ '^pg_temp_') IS NOT TRUE
ORDER BY relname;
- relname | relhasindex
--------------------------+-------------
- a | f
- a_star | f
- abstime_tbl | f
- aggtest | f
- array_index_op_test | t
- array_op_test | f
- b | f
- b_star | f
- box_tbl | f
- bprime | f
- bt_f8_heap | t
- bt_i4_heap | t
- bt_name_heap | t
- bt_txt_heap | t
- c | f
- c_star | f
- char_tbl | f
- check2_tbl | f
- check_tbl | f
- circle_tbl | t
- city | f
- copy_tbl | f
- d | f
- d_star | f
- date_tbl | f
- default_tbl | f
- defaultexpr_tbl | f
- dept | f
- dupindexcols | t
- e_star | f
- emp | f
- equipment_r | f
- f_star | f
- fast_emp4000 | t
- float4_tbl | f
- float8_tbl | f
- func_index_heap | t
- hash_f8_heap | t
- hash_i4_heap | t
- hash_name_heap | t
- hash_txt_heap | t
- hobbies_r | f
- ihighway | t
- inet_tbl | f
- inhf | f
- inhx | t
- insert_tbl | f
- int2_tbl | f
- int4_tbl | f
- int8_tbl | f
- interval_tbl | f
- iportaltest | f
- kd_point_tbl | t
- log_table | f
- lseg_tbl | f
- main_table | f
- money_data | f
- num_data | f
- num_exp_add | t
- num_exp_div | t
- num_exp_ln | t
- num_exp_log10 | t
- num_exp_mul | t
- num_exp_power_10_ln | t
- num_exp_sqrt | t
- num_exp_sub | t
- num_input_test | f
- num_result | f
- onek | t
- onek2 | t
- path_tbl | f
- person | f
- pg_aggregate | t
- pg_am | t
- pg_amop | t
- pg_amproc | t
- pg_attrdef | t
- pg_attribute | t
- pg_auth_members | t
- pg_authid | t
- pg_cast | t
- pg_class | t
- pg_collation | t
- pg_constraint | t
- pg_conversion | t
- pg_database | t
- pg_db_role_setting | t
- pg_default_acl | t
- pg_depend | t
- pg_description | t
- pg_enum | t
- pg_event_trigger | t
- pg_extension | t
- pg_foreign_data_wrapper | t
- pg_foreign_server | t
- pg_foreign_table | t
- pg_index | t
- pg_inherits | t
- pg_language | t
- pg_largeobject | t
- pg_largeobject_metadata | t
- pg_namespace | t
- pg_opclass | t
- pg_operator | t
- pg_opfamily | t
- pg_pltemplate | t
- pg_proc | t
- pg_range | t
- pg_rewrite | t
- pg_seclabel | t
- pg_shdepend | t
- pg_shdescription | t
- pg_shseclabel | t
- pg_statistic | t
- pg_tablespace | t
- pg_trigger | t
- pg_ts_config | t
- pg_ts_config_map | t
- pg_ts_dict | t
- pg_ts_parser | t
- pg_ts_template | t
- pg_type | t
- pg_user_mapping | t
- point_tbl | t
- polygon_tbl | t
- quad_point_tbl | t
- radix_text_tbl | t
- ramp | f
- real_city | f
- reltime_tbl | f
- road | t
- shighway | t
- slow_emp4000 | f
- sql_features | f
- sql_implementation_info | f
- sql_languages | f
- sql_packages | f
- sql_parts | f
- sql_sizing | f
- sql_sizing_profiles | f
- stud_emp | f
- student | f
- tenk1 | t
- tenk2 | t
- test_range_excl | t
- test_range_gist | t
- test_range_spgist | t
- test_tsvector | f
- text_tbl | f
- time_tbl | f
- timestamp_tbl | f
- timestamptz_tbl | f
- timetz_tbl | f
- tinterval_tbl | f
- varchar_tbl | f
-(155 rows)
+ relname | relhasindex
+---------------------------+-------------
+ a | f
+ a_star | f
+ abstime_tbl | f
+ aggtest | f
+ array_index_op_test | t
+ array_op_test | f
+ b | f
+ b_star | f
+ box_tbl | f
+ bprime | f
+ bt_f8_heap | t
+ bt_i4_heap | t
+ bt_name_heap | t
+ bt_txt_heap | t
+ c | f
+ c_star | f
+ char_tbl | f
+ check2_tbl | f
+ check_tbl | f
+ circle_tbl | t
+ city | f
+ copy_tbl | f
+ d | f
+ d_star | f
+ date_tbl | f
+ default_tbl | f
+ defaultexpr_tbl | f
+ dept | f
+ dupindexcols | t
+ e_star | f
+ emp | f
+ equipment_r | f
+ f_star | f
+ fast_emp4000 | t
+ float4_tbl | f
+ float8_tbl | f
+ func_index_heap | t
+ hash_f8_heap | t
+ hash_i4_heap | t
+ hash_name_heap | t
+ hash_txt_heap | t
+ hobbies_r | f
+ ihighway | t
+ inet_tbl | f
+ inhf | f
+ inhx | t
+ insert_tbl | f
+ int2_tbl | f
+ int4_tbl | f
+ int8_tbl | f
+ interval_tbl | f
+ iportaltest | f
+ kd_point_tbl | t
+ log_table | f
+ lseg_tbl | f
+ main_table | f
+ money_data | f
+ num_data | f
+ num_exp_add | t
+ num_exp_div | t
+ num_exp_ln | t
+ num_exp_log10 | t
+ num_exp_mul | t
+ num_exp_power_10_ln | t
+ num_exp_sqrt | t
+ num_exp_sub | t
+ num_input_test | f
+ num_result | f
+ onek | t
+ onek2 | t
+ path_tbl | f
+ person | f
+ pg_aggregate | t
+ pg_am | t
+ pg_amop | t
+ pg_amproc | t
+ pg_attrdef | t
+ pg_attribute | t
+ pg_auth_members | t
+ pg_authid | t
+ pg_cast | t
+ pg_class | t
+ pg_collation | t
+ pg_constraint | t
+ pg_conversion | t
+ pg_database | t
+ pg_db_role_setting | t
+ pg_default_acl | t
+ pg_depend | t
+ pg_description | t
+ pg_enum | t
+ pg_event_trigger | t
+ pg_extension | t
+ pg_foreign_data_wrapper | t
+ pg_foreign_server | t
+ pg_foreign_table | t
+ pg_index | t
+ pg_inherits | t
+ pg_language | t
+ pg_largeobject | t
+ pg_largeobject_metadata | t
+ pg_namespace | t
+ pg_opclass | t
+ pg_operator | t
+ pg_opfamily | t
+ pg_pltemplate | t
+ pg_proc | t
+ pg_range | t
+ pg_replication_identifier | t
+ pg_rewrite | t
+ pg_seclabel | t
+ pg_shdepend | t
+ pg_shdescription | t
+ pg_shseclabel | t
+ pg_statistic | t
+ pg_tablespace | t
+ pg_trigger | t
+ pg_ts_config | t
+ pg_ts_config_map | t
+ pg_ts_dict | t
+ pg_ts_parser | t
+ pg_ts_template | t
+ pg_type | t
+ pg_user_mapping | t
+ point_tbl | t
+ polygon_tbl | t
+ quad_point_tbl | t
+ radix_text_tbl | t
+ ramp | f
+ real_city | f
+ reltime_tbl | f
+ road | t
+ shighway | t
+ slow_emp4000 | f
+ sql_features | f
+ sql_implementation_info | f
+ sql_languages | f
+ sql_packages | f
+ sql_parts | f
+ sql_sizing | f
+ sql_sizing_profiles | f
+ stud_emp | f
+ student | f
+ tenk1 | t
+ tenk2 | t
+ test_range_excl | t
+ test_range_gist | t
+ test_range_spgist | t
+ test_tsvector | f
+ text_tbl | f
+ time_tbl | f
+ timestamp_tbl | f
+ timestamptz_tbl | f
+ timetz_tbl | f
+ tinterval_tbl | f
+ varchar_tbl | f
+(156 rows)
--
-- another sanity check: every system catalog that has OIDs should have