* bdr_replication_identifier.c
* Replication identifiers emulation
*
+ * The replication identifier is used to track the position up to which the
+ * local node has replayed the replication stream received from the remote
+ * node. The information is used after a reconnect to specify the position
+ * from which we want to receive changes.
+ *
+ * The replication identifier information has to be crash safe in order
+ * to guarantee that we always start replaying the stream from the position
+ * that was successfully saved to disk on the local node.
+ *
+ * In BDR-patched PostgreSQL we attach the replication identifier info to
+ * xlog records. But because extensions don't have access to xlog, UDR has
+ * to store the data in a table, which is less efficient.
*
* Copyright (C) 2012-2013, PostgreSQL Global Development Group
*
XLogRecPtr replication_origin_lsn;
TimestampTz replication_origin_timestamp;
-#define Natts_pg_replication_identifier 4
+#define Natts_pg_replication_identifier 2
#define Anum_pg_replication_riident 1
#define Anum_pg_replication_riname 2
-#define Anum_pg_replication_riremote_lsn 3
-#define Anum_pg_replication_rilocal_lsn 4
+#define Natts_pg_replication_identifier_pos 3
+#define Anum_pg_replication_pos_riident 1
+#define Anum_pg_replication_pos_riremote_lsn 2
+#define Anum_pg_replication_pos_rilocal_lsn 3
static Oid ReplicationIdentifierRelationId = InvalidOid;
+static Oid ReplicationIdentifierPosRelationId = InvalidOid;
static Oid ReplicationLocalIdentIndex = InvalidOid;
+static Oid ReplicationPosLocalIdentIndex = InvalidOid;
/*
* Replay progress of a single remote node.
*/
static ReplicationState *local_replication_state = NULL;
+PG_FUNCTION_INFO_V1(bdr_replication_identifier_is_replaying);
+
static void
EnsureReplicationIdentifierRelationId(void)
{
if (ReplicationIdentifierRelationId == InvalidOid ||
- ReplicationLocalIdentIndex == InvalidOid)
+ ReplicationLocalIdentIndex == InvalidOid ||
+ ReplicationIdentifierPosRelationId == InvalidOid ||
+ ReplicationPosLocalIdentIndex == InvalidOid)
{
Oid schema_oid = get_namespace_oid("bdr", false);
bdr_lookup_relid("bdr_replication_identifier", schema_oid);
ReplicationLocalIdentIndex =
bdr_lookup_relid("bdr_replication_identifier_riiident_index", schema_oid);
+
+ ReplicationIdentifierPosRelationId =
+ bdr_lookup_relid("bdr_replication_identifier_pos", schema_oid);
+ ReplicationPosLocalIdentIndex =
+ bdr_lookup_relid("bdr_replication_identifier_pos_riiident_index", schema_oid);
}
}
RepNodeId
GetReplicationIdentifier(char *riname, bool missing_ok)
{
- Relation rel;
+ Relation rel;
Snapshot snap;
SysScanDesc scan;
- ScanKeyData key;
+ ScanKeyData key;
HeapTuple tuple;
- Oid riident = InvalidOid;
+ Oid riident = InvalidOid;
EnsureReplicationIdentifierRelationId();
snap = RegisterSnapshot(GetLatestSnapshot());
rel = heap_open(ReplicationIdentifierRelationId, RowExclusiveLock);
- ScanKeyInit(&key,
+ ScanKeyInit(&key,
Anum_pg_replication_riname,
BTEqualStrategyNumber, F_TEXTEQ,
CStringGetTextDatum(riname));
- scan = systable_beginscan(rel, 0, true, snap, 1, &key);
- tuple = systable_getnext(scan);
+ scan = systable_beginscan(rel, 0, true, snap, 1, &key);
+ tuple = systable_getnext(scan);
- if (HeapTupleIsValid(tuple))
+ if (HeapTupleIsValid(tuple))
{
Datum values[Natts_pg_replication_identifier];
bool nulls[Natts_pg_replication_identifier];
- heap_deform_tuple(tuple, RelationGetDescr(rel),
+ heap_deform_tuple(tuple, RelationGetDescr(rel),
values, nulls);
riident = DatumGetObjectId(values[0]);
- }
+ }
else if (!missing_ok)
elog(ERROR, "cache lookup failed for replication identifier named %s",
riname);
RepNodeId
CreateReplicationIdentifier(char *riname)
{
- Oid riident;
- HeapTuple tuple = NULL;
- Relation rel;
- Datum riname_d;
+ Oid riident;
+ HeapTuple tuple = NULL;
+ Relation rel,
+ relpos;
+ Datum riname_d;
SnapshotData SnapshotDirty;
SysScanDesc scan;
ScanKeyData key;
InitDirtySnapshot(SnapshotDirty);
rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+ relpos = heap_open(ReplicationIdentifierPosRelationId, ExclusiveLock);
for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++)
{
- bool nulls[Natts_pg_replication_identifier];
- Datum values[Natts_pg_replication_identifier];
+ bool nulls[Natts_pg_replication_identifier_pos];
+ Datum values[Natts_pg_replication_identifier_pos];
bool collides;
CHECK_FOR_INTERRUPTS();
* Ok, found an unused riident, insert the new row and do a CCI,
* so our callers can look it up if they want to.
*/
- memset(&nulls, 0, sizeof(nulls));
+ memset(&nulls, false, sizeof(nulls));
+ memset(values, 0, sizeof(values));
values[0] = ObjectIdGetDatum(riident);
values[1] = riname_d;
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
simple_heap_insert(rel, tuple);
CatalogUpdateIndexes(rel, tuple);
+
+ /* Insert new tuple to the position tracking table too */
+ memset(&nulls, false, sizeof(nulls));
+ memset(values, 0, sizeof(values));
+
+ values[0] = ObjectIdGetDatum(riident);
+
+ tuple = heap_form_tuple(RelationGetDescr(relpos), values, nulls);
+ simple_heap_insert(relpos, tuple);
+ CatalogUpdateIndexes(relpos, tuple);
+
CommandCounterIncrement();
break;
}
/* now release lock again, */
heap_close(rel, ExclusiveLock);
+ heap_close(relpos, ExclusiveLock);
if (tuple == NULL)
ereport(ERROR,
void
DropReplicationIdentifier(RepNodeId riident)
{
- HeapTuple tuple = NULL;
- Relation rel;
+ HeapTuple tuple = NULL;
+ Relation rel,
+ relpos;
SnapshotData SnapshotDirty;
- SysScanDesc scan;
- ScanKeyData key;
+ SysScanDesc scan;
+ ScanKeyData key;
Assert(IsTransactionState());
InitDirtySnapshot(SnapshotDirty);
rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+ relpos = heap_open(ReplicationIdentifierPosRelationId, ExclusiveLock);
+ /* Find and delete tuple from name table */
ScanKeyInit(&key,
Anum_pg_replication_riident,
BTEqualStrategyNumber, F_OIDEQ,
systable_endscan(scan);
+ /* Find and delete tuple from position tracking table */
+ ScanKeyInit(&key,
+ Anum_pg_replication_pos_riident,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(riident));
+
+ scan = systable_beginscan(relpos, ReplicationPosLocalIdentIndex,
+ true /* indexOK */,
+ &SnapshotDirty,
+ 1, &key);
+
+ tuple = systable_getnext(scan);
+
+ if (HeapTupleIsValid(tuple))
+ simple_heap_delete(relpos, &tuple->t_self);
+
+ systable_endscan(scan);
+
CommandCounterIncrement();
/* now release lock again, */
InitDirtySnapshot(SnapshotDirty);
- rel = heap_open(ReplicationIdentifierRelationId, RowExclusiveLock);
+ rel = heap_open(ReplicationIdentifierPosRelationId, RowExclusiveLock);
ScanKeyInit(&key,
- Anum_pg_replication_riident,
+ Anum_pg_replication_pos_riident,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(node));
- scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+ scan = systable_beginscan(rel, ReplicationPosLocalIdentIndex,
true /* indexOK */,
&SnapshotDirty,
1, &key);
tuple = systable_getnext(scan);
if (HeapTupleIsValid(tuple))
- {
+ {
HeapTuple newtuple;
- Datum values[Natts_pg_replication_identifier];
- bool nulls[Natts_pg_replication_identifier];
+ Datum values[Natts_pg_replication_identifier_pos];
+ bool nulls[Natts_pg_replication_identifier_pos];
- heap_deform_tuple(tuple, RelationGetDescr(rel),
+ heap_deform_tuple(tuple, RelationGetDescr(rel),
values, nulls);
- values[2] = LSNGetDatum(remote_commit);
- values[3] = LSNGetDatum(local_commit);
+ values[1] = LSNGetDatum(remote_commit);
+ values[2] = LSNGetDatum(local_commit);
newtuple = heap_form_tuple(RelationGetDescr(rel),
values, nulls);
simple_heap_update(rel, &tuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
- }
+ }
systable_endscan(scan);
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot setup replication origin when one is already setup")));
- local_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
+ local_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
local_replication_state->local_identifier = node;
local_replication_state->remote_lsn = InvalidXLogRecPtr;
local_replication_state->local_lsn = InvalidXLogRecPtr;
AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
XLogRecPtr local_commit)
{
- bool start_transaction = !IsTransactionState();
+ bool start_transaction = !IsTransactionState();
Assert(local_replication_state != NULL);
local_replication_state->local_lsn = local_commit;
local_replication_state->remote_lsn = remote_commit;
- if (start_transaction)
- StartTransactionCommand();
+ if (start_transaction)
+ StartTransactionCommand();
- AdvanceReplicationIdentifier(local_replication_state->local_identifier, remote_commit, local_commit);
+ AdvanceReplicationIdentifier(local_replication_state->local_identifier, remote_commit, local_commit);
- if (start_transaction)
- CommitTransactionCommand();
+ if (start_transaction)
+ CommitTransactionCommand();
}
XLogRecPtr
* The name of the identifier is returned in *riname as a pstrdup'ed copy;
* *riname is left unmodified if the lookup failed.
*/
-HeapTuple
-GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok)
+void
+GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname)
{
- Relation rel;
+ Relation rel;
Snapshot snap;
SysScanDesc scan;
- ScanKeyData key;
+ ScanKeyData key;
HeapTuple tuple = NULL;
+ Form_pg_replication_identifier ric;
EnsureReplicationIdentifierRelationId();
true /* indexOK */,
snap,
1, &key);
- tuple = systable_getnext(scan);
+ tuple = systable_getnext(scan);
- if (!HeapTupleIsValid(tuple) && !missing_ok)
+ if (HeapTupleIsValid(tuple))
+ {
+ ric = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+ *riname = pstrdup(text_to_cstring(&ric->riname));
+ }
+
+ if (!HeapTupleIsValid(tuple) && !missing_ok)
elog(ERROR, "cache lookup failed for replication identifier id: %u",
riident);
systable_endscan(scan);
UnregisterSnapshot(snap);
heap_close(rel, RowExclusiveLock);
+}
+
+static void
+CheckReplicationIdentifierPrerequisites(bool check_slots)
+{
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("only superusers can query or manipulate replication identifiers")));
+
+ if (check_slots && max_replication_slots == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot query or manipulate replication identifiers when max_replication_slots = 0")));
- return tuple;
}
+Datum
+bdr_replication_identifier_is_replaying(PG_FUNCTION_ARGS)
+{
+ CheckReplicationIdentifierPrerequisites(true);
+
+ PG_RETURN_BOOL(replication_origin_id != InvalidRepNodeId);
+}