static BDRRelation *read_rel(StringInfo s, LOCKMODE mode);
extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
-static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
-
-static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
- BDRRelation *rel, HeapTuple local_tuple,
- HeapTuple remote_tuple, HeapTuple *new_tuple,
- bool *perform_update, bool *log_update,
- BdrConflictResolution *resolution);
-static void do_log_conflict(BdrConflictType conflict_type,
- BdrConflictResolution resolution,
- BDRRelation *conflict_relation,
- RepNodeId local_node_id,
- bool apply_remote,
- TimestampTz local_timestamp,
- HeapTuple old_key,
- HeapTuple user_tuple);
+static void check_apply_update(BdrConflictType conflict_type,
+ RepNodeId local_node_id, TimestampTz local_ts,
+ BDRRelation *rel, HeapTuple local_tuple,
+ HeapTuple remote_tuple, HeapTuple *new_tuple,
+ bool *perform_update, bool *log_update,
+ BdrConflictResolution *resolution);
static void do_apply_update(BDRRelation *rel, EState *estate, TupleTableSlot *oldslot,
TupleTableSlot *newslot);
static void process_remote_delete(StringInfo s);
static void process_remote_message(StringInfo s);
+static void get_local_tuple_origin(HeapTuple tuple,
+ TimestampTz *commit_ts,
+ RepNodeId *node_id);
+static void abs_timestamp_difference(TimestampTz start_time,
+ TimestampTz stop_time,
+ long *secs, int *microsecs);
+
static void
process_remote_begin(StringInfo s)
{
*/
if (conflict)
{
- TransactionId xmin;
TimestampTz local_ts;
RepNodeId local_node_id;
bool apply_update;
bool log_update;
- CommitExtraData local_node_id_raw;
+ BdrApplyConflict *apply_conflict = NULL; /* Mute compiler */
BdrConflictResolution resolution;
- /* refetch tuple, check for old commit ts & origin */
- xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
+ get_local_tuple_origin(oldslot->tts_tuple, &local_ts, &local_node_id);
/*
* Use conflict triggers and/or last-update-wins to decide which tuple
* to retain.
*/
- TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
- local_node_id = local_node_id_raw;
-
- check_apply_update(local_node_id, local_ts, rel,
- NULL, NULL, NULL, &apply_update, &log_update,
- &resolution);
+ check_apply_update(BdrConflictType_InsertInsert,
+ local_node_id, local_ts, rel,
+ oldslot->tts_tuple, NULL, NULL,
+ &apply_update, &log_update, &resolution);
+ /*
+ * Log conflict to server log.
+ */
if (log_update)
{
- do_log_conflict(BdrConflictType_InsertInsert, resolution,
- rel, local_node_id, apply_update,
- local_ts,
- oldslot->tts_tuple, NULL /*no user tuple*/);
+ apply_conflict = bdr_make_apply_conflict(
+ BdrConflictType_InsertInsert, resolution,
+ replication_origin_xid, rel, oldslot, local_node_id,
+ newslot, NULL /*no error*/);
+
+ bdr_conflict_log_serverlog(apply_conflict);
bdr_count_insert_conflict();
}
+ /*
+ * Finally, apply the update.
+ */
if (apply_update)
{
simple_heap_update(rel->rel,
bdr_count_insert();
}
+ /* Log conflict to table */
if (log_update)
- bdr_conflict_log(BdrConflictType_InsertInsert, resolution,
- replication_origin_xid, rel, oldslot,
- local_node_id, newslot, NULL /*no error*/);
+ {
+ bdr_conflict_log_table(apply_conflict);
+ bdr_conflict_logging_cleanup();
+ }
}
else
{
if (found_tuple)
{
- TransactionId xmin;
TimestampTz local_ts;
RepNodeId local_node_id;
bool apply_update;
bool log_update;
+ BdrApplyConflict *apply_conflict = NULL; /* Mute compiler */
BdrConflictResolution resolution;
- CommitExtraData local_node_id_raw;
-
remote_tuple = heap_modify_tuple(oldslot->tts_tuple,
RelationGetDescr(rel->rel),
new_tuple.values,
resetStringInfo(&o);
#endif
- /* refetch tuple, check for old commit ts & origin */
- xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
+ get_local_tuple_origin(oldslot->tts_tuple, &local_ts, &local_node_id);
/*
* Use conflict triggers and/or last-update-wins to decide which tuple
* to retain.
*/
- TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
- local_node_id = local_node_id_raw;
-
- check_apply_update(local_node_id, local_ts, rel, oldslot->tts_tuple,
- remote_tuple, &user_tuple, &apply_update,
+ check_apply_update(BdrConflictType_UpdateUpdate,
+ local_node_id, local_ts, rel,
+ oldslot->tts_tuple, newslot->tts_tuple,
+ &user_tuple, &apply_update,
&log_update, &resolution);
+ /*
+ * Log conflict to both server log and table.
+ */
if (log_update)
{
- do_log_conflict(BdrConflictType_UpdateUpdate, resolution,
- rel, local_node_id, apply_update,
- local_ts,
- oldslot->tts_tuple, user_tuple);
+ apply_conflict = bdr_make_apply_conflict(
+ BdrConflictType_UpdateUpdate, resolution,
+ replication_origin_xid, rel, oldslot, local_node_id,
+ newslot, NULL /*no error*/);
+
+ bdr_conflict_log_serverlog(apply_conflict);
bdr_count_update_conflict();
}
do_apply_update(rel, estate, oldslot, newslot);
}
+ /* Log conflict to table */
if (log_update)
- bdr_conflict_log(BdrConflictType_UpdateUpdate, resolution,
- replication_origin_xid, rel, oldslot,
- local_node_id, newslot, NULL /*no error*/);
-
+ {
+ bdr_conflict_log_table(apply_conflict);
+ bdr_conflict_logging_cleanup();
+ }
}
else
{
*/
bool skip = false;
+ BdrApplyConflict *apply_conflict;
BdrConflictResolution resolution;
remote_tuple = heap_form_tuple(RelationGetDescr(rel->rel),
(errmsg("UPDATE vs DELETE handler returned a row which"
" isn't allowed for now")));
- resolution = user_tuple ? BdrConflictResolution_ConflictTriggerReturnedTuple :
- BdrConflictResolution_ConflictTriggerSkipChange;
-
- do_log_conflict(BdrConflictType_UpdateDelete, resolution,
- rel, InvalidRepNodeId, false, 0,
- remote_tuple, user_tuple);
+ if (skip)
+ resolution = BdrConflictResolution_ConflictTriggerSkipChange;
+ else if (user_tuple)
+ resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
+ else
+ resolution = BdrConflictResolution_DefaultSkipChange;
- bdr_conflict_log(BdrConflictType_UpdateDelete, resolution,
- replication_origin_xid, rel, NULL,
- InvalidRepNodeId, newslot, NULL /*no error*/);
+ apply_conflict = bdr_make_apply_conflict(
+ BdrConflictType_UpdateDelete, resolution, replication_origin_xid,
+ rel, NULL, InvalidRepNodeId, newslot, NULL /*no error*/);
+ bdr_conflict_log_serverlog(apply_conflict);
+ bdr_conflict_log_table(apply_conflict);
+ bdr_conflict_logging_cleanup();
}
PopActiveSnapshot();
* (This can also arise with an UPDATE that changes the PRIMARY KEY,
* as that's effectively a DELETE + INSERT).
*/
+
+ BdrApplyConflict *apply_conflict;
+
bdr_count_delete_conflict();
/* Since the local tuple is missing, fill slot from the received data. */
ExecStoreTuple(tup, oldslot, InvalidBuffer, true);
}
- do_log_conflict(BdrConflictType_DeleteDelete, BdrConflictResolution_ConflictTriggerSkipChange,
- rel, InvalidRepNodeId, false, 0,
- oldslot->tts_tuple, NULL);
-
- bdr_conflict_log(BdrConflictType_DeleteDelete, BdrConflictResolution_ConflictTriggerSkipChange,
- replication_origin_xid, rel, NULL,
- InvalidRepNodeId, oldslot, NULL /*no error*/);
+ apply_conflict = bdr_make_apply_conflict(
+ BdrConflictType_DeleteDelete,
+ BdrConflictResolution_DefaultSkipChange, replication_origin_xid,
+ rel, NULL, InvalidRepNodeId, oldslot, NULL /*no error*/);
+ bdr_conflict_log_serverlog(apply_conflict);
+ bdr_conflict_log_table(apply_conflict);
+ bdr_conflict_logging_cleanup();
}
PopActiveSnapshot();
CommandCounterIncrement();
}
+/*
+ * Get commit timestamp and origin of the tuple
+ */
+static void
+get_local_tuple_origin(HeapTuple tuple, TimestampTz *commit_ts, RepNodeId *node_id)
+{
+ TransactionId xmin;
+ CommitExtraData node_id_raw;
+
+ /* refetch tuple, check for old commit ts & origin */
+ xmin = HeapTupleHeaderGetXmin(tuple->t_data);
+
+ TransactionIdGetCommitTsData(xmin, commit_ts, &node_id_raw);
+ *node_id = node_id_raw;
+}
+
+/*
+ * Last update wins conflict handling.
+ */
+static void
+bdr_conflict_last_update_wins(RepNodeId local_node_id,
+ RepNodeId remote_node_id,
+ TimestampTz local_ts,
+ TimestampTz remote_ts,
+ bool *perform_update, bool *log_update,
+ BdrConflictResolution *resolution)
+{
+ int cmp;
+
+ cmp = timestamptz_cmp_internal(remote_ts, local_ts);
+ if (cmp > 0)
+ {
+ /* The most recent update is the remote one; apply it */
+ *perform_update = true;
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+ return;
+ }
+ else if (cmp < 0)
+ {
+ /* The most recent update is the local one; retain it */
+ *log_update = true;
+ *perform_update = false;
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+ return;
+ }
+ else
+ {
+ uint64 local_sysid,
+ remote_origin_sysid;
+ TimeLineID local_tli,
+ remote_tli;
+ Oid local_dboid,
+ remote_origin_dboid;
+ /*
+ * Timestamps are equal. Use sysid + timeline id to decide which
+ * tuple to retain.
+ */
+ bdr_fetch_sysid_via_node_id(local_node_id,
+ &local_sysid, &local_tli,
+ &local_dboid);
+ bdr_fetch_sysid_via_node_id(remote_node_id,
+ &remote_origin_sysid, &remote_tli,
+ &remote_origin_dboid);
+
+ /*
+ * As the timestamps were equal, we have to break the tie in a
+ * consistent manner that'll match across all nodes.
+ *
+ * Use the ordering of the node's unique identifier, the tuple of
+ * (sysid, timelineid, dboid).
+ */
+ if (local_sysid < remote_origin_sysid)
+ *perform_update = true;
+ else if (local_sysid > remote_origin_sysid)
+ *perform_update = false;
+ else if (local_tli < remote_tli)
+ *perform_update = true;
+ else if (local_tli > remote_tli)
+ *perform_update = false;
+ else if (local_dboid < remote_origin_dboid)
+ *perform_update = true;
+ else if (local_dboid > remote_origin_dboid)
+ *perform_update = false;
+ else
+ /* shouldn't happen */
+ elog(ERROR, "unsuccessful node comparison");
+
+ /*
+ * We don't log whether we used timestamp, sysid or timeline id to
+ * decide which tuple to retain. That'll be in the log record
+ * anyway, so we can reconstruct the decision from the log record
+ * later.
+ */
+ if (*perform_update)
+ {
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+ }
+ else
+ {
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+ *log_update = true;
+ }
+ }
+}
+
/*
* Check whether a remote insert or update conflicts with the local row
* version.
* is true. Its value is undefined if log_update is false.
*/
static void
-check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
- BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple,
- HeapTuple *new_tuple, bool *perform_update, bool *log_update,
- BdrConflictResolution *resolution)
+check_apply_update(BdrConflictType conflict_type,
+ RepNodeId local_node_id, TimestampTz local_ts,
+ BDRRelation *rel, HeapTuple local_tuple,
+ HeapTuple remote_tuple, HeapTuple *new_tuple,
+ bool *perform_update, bool *log_update,
+ BdrConflictResolution *resolution)
{
- int cmp, microsecs;
+ int microsecs;
long secs;
bool skip = false;
*perform_update = true;
return;
}
- else
+
+ /*
+ * Decide whether to keep the remote or local tuple based on a conflict
+ * trigger (if defined) or last-update-wins.
+ *
+ * If the caller doesn't provide storage for the conflict handler to
+ * store a new tuple in, don't fire any conflict triggers.
+ */
+
+ if (new_tuple)
{
/*
- * Decide whether to keep the remote or local tuple based on a conflict
- * trigger (if defined) or last-update-wins.
- *
- * If the caller doesn't provide storage for the conflict handler to
- * store a new tuple in, don't fire any conflict triggers.
+ * --------------
+ * Conflict trigger conflict handling - let the user decide whether to:
+ * - Ignore the remote update;
+ * - Supply a new tuple to replace the current tuple; or
+ * - Take no action and fall through to the next handling option
+ * --------------
*/
- if (new_tuple)
- {
- /*
- * --------------
- * Conflict trigger conflict handling - let the user decide whether to:
- * - Ignore the remote update;
- * - Supply a new tuple to replace the current tuple; or
- * - Take no action and fall through to the next handling option
- * --------------
- */
-
- TimestampDifference(replication_origin_timestamp, local_ts,
- &secs, µsecs);
-
- *new_tuple = bdr_conflict_handlers_resolve(rel, local_tuple,
- remote_tuple, "UPDATE",
- BdrConflictType_UpdateUpdate,
- abs(secs) * 1000000 + abs(microsecs),
- &skip);
-
- if (skip)
- {
- *log_update = true;
- *perform_update = false;
- *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
- return;
- }
- else if (*new_tuple)
- {
- /* Custom conflict handler returned tuple, log it. */
- *log_update = true;
- *perform_update = true;
- *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
- return;
- }
+ abs_timestamp_difference(replication_origin_timestamp, local_ts,
+ &secs, µsecs);
- /*
- * if user decided not to skip the conflict but didn't provide a
- * resolving tuple we fall back to default handling
- */
- }
+ *new_tuple = bdr_conflict_handlers_resolve(rel, local_tuple, remote_tuple,
+ conflict_type == BdrConflictType_InsertInsert ?
+ "INSERT" : "UPDATE",
+ conflict_type,
+ abs(secs) * 1000000 + abs(microsecs),
+ &skip);
- /* Last update wins conflict handling */
- cmp = timestamptz_cmp_internal(replication_origin_timestamp, local_ts);
- if (cmp > 0)
+ if (skip)
{
- /* The most recent update is the remote one; apply it */
- *perform_update = true;
- *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
- return;
- }
- else if (cmp < 0)
- {
- /* The most recent update is the local one; retain it */
*log_update = true;
*perform_update = false;
- *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+ *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
return;
}
- else if (cmp == 0)
+ else if (*new_tuple)
{
- uint64 local_sysid,
- remote_origin_sysid;
- TimeLineID local_tli,
- remote_tli;
- Oid local_dboid,
- remote_origin_dboid;
- /*
- * Timestamps are equal. Use sysid + timeline id to decide which
- * tuple to retain.
- */
- bdr_fetch_sysid_via_node_id(local_node_id,
- &local_sysid, &local_tli,
- &local_dboid);
- bdr_fetch_sysid_via_node_id(replication_origin_id,
- &remote_origin_sysid, &remote_tli,
- &remote_origin_dboid);
-
- /*
- * As the timestamps were equal, we have to break the tie in a
- * consistent manner that'll match across all nodes.
- *
- * Use the ordering of the node's unique identifier, the tuple of
- * (sysid, timelineid, dboid).
- */
- if (local_sysid < remote_origin_sysid)
- *perform_update = true;
- else if (local_sysid > remote_origin_sysid)
- *perform_update = false;
- else if (local_tli < remote_tli)
- *perform_update = true;
- else if (local_tli > remote_tli)
- *perform_update = false;
- else if (local_dboid < remote_origin_dboid)
- *perform_update = true;
- else if (local_dboid > remote_origin_dboid)
- *perform_update = false;
- else
- /* shouldn't happen */
- elog(ERROR, "unsuccessful node comparison");
-
- /*
- * We don't log whether we used timestamp, sysid or timeline id to
- * decide which tuple to retain. That'll be in the log record
- * anyway, so we can reconstruct the decision from the log record
- * later.
- */
- if (*perform_update)
- {
- *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
- }
- else
- {
- *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
- *log_update = true;
- }
-
+ /* Custom conflict handler returned tuple, log it. */
+ *log_update = true;
+ *perform_update = true;
+ *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
return;
}
+
+ /*
+ * if user decided not to skip the conflict but didn't provide a
+ * resolving tuple we fall back to default handling
+ */
}
- elog(ERROR, "unreachable code");
+ /* Use last update wins conflict handling. */
+ bdr_conflict_last_update_wins(local_node_id,
+ replication_origin_id,
+ local_ts,
+ replication_origin_timestamp,
+ perform_update, log_update,
+ resolution);
}
static void
}
-static void
-do_log_conflict(BdrConflictType conflict_type,
- BdrConflictResolution resolution,
- BDRRelation *conflict_relation,
- RepNodeId local_node_id,
- bool apply_remote,
- TimestampTz local_timestamp,
- HeapTuple primary_key,
- HeapTuple user_tuple)
-{
- StringInfoData s_key,
- s_user_tuple;
- char remote_ts[MAXDATELEN + 1];
- char local_ts[MAXDATELEN + 1];
- const char *object_namespace;
- const char *object_name;
-
- uint64 local_sysid,
- remote_origin_sysid;
- TimeLineID local_tli,
- remote_tli;
- Oid local_dboid,
- remote_origin_dboid;
-
-#define CONFLICT_MSG_PREFIX "CONFLICT: %s remote %s on relation %s.%s originating at node " UINT64_FORMAT ":%u:%u at ts %s;"
-
- /*
- * By default we only log conflicts where the remote change is discarded or
- * where a conflict handler emits a replacement tuple.
- *
- * Optionally the user may request that we log even conflicts where the
- * local tuple is replaced by a newer remote tuple. Most of the time these
- * are just noise, but there are times it's useful for debugging and tracing.
- */
-
- bdr_fetch_sysid_via_node_id(local_node_id,
- &local_sysid, &local_tli,
- &local_dboid);
- bdr_fetch_sysid_via_node_id(replication_origin_id,
- &remote_origin_sysid, &remote_tli,
- &remote_origin_dboid);
-
- Assert(remote_origin_sysid == origin_sysid);
- Assert(remote_tli == origin_timeline);
- Assert(remote_origin_dboid == origin_dboid);
-
- memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
- MAXDATELEN);
- memcpy(local_ts, timestamptz_to_str(local_timestamp),
- MAXDATELEN);
-
- initStringInfo(&s_key);
- tuple_to_stringinfo(&s_key, RelationGetDescr(conflict_relation->rel), primary_key);
-
- object_namespace = get_namespace_name(RelationGetNamespace(conflict_relation->rel));
- object_name = RelationGetRelationName(conflict_relation->rel);
-
- switch(conflict_type)
- {
- case BdrConflictType_InsertInsert:
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
- apply_remote ? "applying" : "skipping",
- apply_remote ? "INSERT as UPDATE" : "INSERT",
- object_namespace, object_name,
- remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
- local_node_id == InvalidRepNodeId ? "local" : "remote",
- local_sysid, local_tli, local_ts, s_key.data)));
- break;
- case BdrConflictType_InsertUpdate:
- /* XXX? */
- break;
- case BdrConflictType_UpdateUpdate:
- {
- if (user_tuple != NULL)
- {
- /* Conflict handler returned new tuple, need to report it */
- initStringInfo(&s_user_tuple);
- tuple_to_stringinfo(&s_user_tuple, RelationGetDescr(conflict_relation->rel),
- user_tuple);
-
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s, resolved by user tuple:%s",
- apply_remote ? "applying" : "skipping", "UPDATE",
- object_namespace, object_name,
- remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
- local_node_id == InvalidRepNodeId ? "local" : "remote",
- local_sysid, local_tli, local_ts, s_key.data,
- s_user_tuple.data)));
- }
- else
- {
- /* Handled by last update wins, or conflict handler w/o new tuple */
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
- apply_remote ? "applying" : "skipping", "UPDATE",
- object_namespace, object_name,
- remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
- local_node_id == InvalidRepNodeId ? "local" : "remote",
- local_sysid, local_tli, local_ts, s_key.data)));
- }
- }
- break;
- case BdrConflictType_UpdateDelete:
- case BdrConflictType_DeleteDelete:
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg(CONFLICT_MSG_PREFIX " could not find existing tuple. PKEY:%s",
- "skipping", BdrConflictType_UpdateDelete ? "UPDATE" : "DELETE",
- object_namespace, object_name,
- remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
- s_key.data)));
-
- break;
- case BdrConflictType_UnhandledTxAbort:
- /* XXX? */
- break;
- }
-
- resetStringInfo(&s_key);
-}
-
static void
do_apply_update(BDRRelation *rel, EState *estate, TupleTableSlot *oldslot,
TupleTableSlot *newslot)
return bdr_heap_open(relid, NoLock);
}
-/* print the tuple 'tuple' into the StringInfo s */
-static void
-tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
-{
- int natt;
- Oid oid;
-
- /* print oid of tuple, it's not included in the TupleDesc */
- if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
- {
- appendStringInfo(s, " oid[oid]:%u", oid);
- }
-
- /* print all columns individually */
- for (natt = 0; natt < tupdesc->natts; natt++)
- {
- Form_pg_attribute attr; /* the attribute itself */
- Oid typid; /* type of current attribute */
- HeapTuple type_tuple; /* information about a type */
- Form_pg_type type_form;
- Oid typoutput; /* output function */
- bool typisvarlena;
- Datum origval; /* possibly toasted Datum */
- Datum val; /* definitely detoasted Datum */
- char *outputstr = NULL;
- bool isnull; /* column is null? */
-
- attr = tupdesc->attrs[natt];
-
- /*
- * don't print dropped columns, we can't be sure everything is
- * available for them
- */
- if (attr->attisdropped)
- continue;
-
- /*
- * Don't print system columns
- */
- if (attr->attnum < 0)
- continue;
-
- typid = attr->atttypid;
-
- /* gather type name */
- type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
- if (!HeapTupleIsValid(type_tuple))
- elog(ERROR, "cache lookup failed for type %u", typid);
- type_form = (Form_pg_type) GETSTRUCT(type_tuple);
-
- /* print attribute name */
- appendStringInfoChar(s, ' ');
- appendStringInfoString(s, NameStr(attr->attname));
-
- /* print attribute type */
- appendStringInfoChar(s, '[');
- appendStringInfoString(s, NameStr(type_form->typname));
- appendStringInfoChar(s, ']');
-
- /* query output function */
- getTypeOutputInfo(typid,
- &typoutput, &typisvarlena);
-
- ReleaseSysCache(type_tuple);
-
- /* get Datum from tuple */
- origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
-
- if (isnull)
- outputstr = "(null)";
- else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
- outputstr = "(unchanged-toast-datum)";
- else if (typisvarlena)
- val = PointerGetDatum(PG_DETOAST_DATUM(origval));
- else
- val = origval;
-
- /* print data */
- if (outputstr == NULL)
- outputstr = OidOutputFunctionCall(typoutput, val);
-
- appendStringInfoChar(s, ':');
- appendStringInfoString(s, outputstr);
- }
-}
-
/*
* Read a remote action type and process the action record.
*
return true;
}
+/*
+ * abs_timestamp_difference -- convert the difference between two timestamps
+ * into integer seconds and microseconds
+ *
+ * The result is always the absolute (pozitive difference), so the order
+ * of input is not important.
+ *
+ * If either input is not finite, we return zeroes.
+ */
+static void
+abs_timestamp_difference(TimestampTz start_time, TimestampTz stop_time,
+ long *secs, int *microsecs)
+{
+ if (TIMESTAMP_NOT_FINITE(start_time) || TIMESTAMP_NOT_FINITE(stop_time))
+ {
+ *secs = 0;
+ *microsecs = 0;
+ }
+ else
+ {
+ TimestampTz diff = abs(stop_time - start_time);
+#ifdef HAVE_INT64_TIMESTAMP
+ *secs = (long) (diff / USECS_PER_SEC);
+ *microsecs = (int) (diff % USECS_PER_SEC);
+#else
+ *secs = (long) diff;
+ *microsecs = (int) ((diff - *secs) * 1000000.0);
+#endif
+ }
+}
+
/*
* The actual main loop of a BDR apply worker.
*/
#include "commands/sequence.h"
-#include "replication/replication_identifier.h"
-
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
+#include "utils/typcache.h"
+
/* GUCs */
bool bdr_log_conflicts_to_table = false;
#define BDR_CONFLICT_HISTORY_COLS 30
#define SYSID_DIGITS 33
-/*
- * Details of a conflict detected by an apply process, destined for logging
- * output and/or conflict triggers.
- *
- * Closely related to bdr.bdr_conflict_history SQL table.
- */
-typedef struct BdrApplyConflict
-{
- TransactionId local_conflict_txid;
- XLogRecPtr local_conflict_lsn;
- TimestampTz local_conflict_time;
- const char *object_schema; /* unused if apply_error */
- const char *object_name; /* unused if apply_error */
- uint64 remote_sysid;
- TransactionId remote_txid;
- TimestampTz remote_commit_time;
- XLogRecPtr remote_commit_lsn;
- BdrConflictType conflict_type;
- BdrConflictResolution conflict_resolution;
- bool local_tuple_null;
- Datum local_tuple; /* composite */
- TransactionId local_tuple_xmin;
- uint64 local_tuple_origin_sysid; /* init to 0 if unknown */
- bool remote_tuple_null;
- Datum remote_tuple; /* composite */
- ErrorData *apply_error;
-} BdrApplyConflict;
+/* We want our own memory ctx to clean up easily & reliably */
+MemoryContext conflict_log_context;
/*
* Perform syscache lookups etc for BDR conflict logging.
{
Oid schema_oid;
+ conflict_log_context = AllocSetContextCreate(CurrentMemoryContext,
+ "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
+
StartTransactionCommand();
schema_oid = get_namespace_oid("bdr", false);
CommitTransactionCommand();
}
+/*
+ * Cleanup our memory context.
+ */
+void
+bdr_conflict_logging_cleanup(void)
+{
+ if (conflict_log_context)
+ MemoryContextResetAndDeleteChildren(conflict_log_context);
+}
+
+
/* Get the enum oid for a given BdrConflictType */
static Datum
bdr_conflict_type_get_datum(BdrConflictType conflict_type)
return conflict_type_oid;
}
-/* Get the enum oid for a given BdrConflictResolution */
-static Datum
-bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+/* Get the enum name for a given BdrConflictResolution */
+static char *
+bdr_conflict_resolution_get_name(BdrConflictResolution conflict_resolution)
{
- Oid conflict_resolution_oid;
char *enumname = NULL;
switch (conflict_resolution)
case BdrConflictResolution_LastUpdateWins_KeepRemote:
enumname = "last_update_wins_keep_remote";
break;
+ case BdrConflictResolution_DefaultApplyChange:
+ enumname = "apply_change";
+ break;
+ case BdrConflictResolution_DefaultSkipChange:
+ enumname = "skip_change";
+ break;
case BdrConflictResolution_UnhandledTxAbort:
enumname = "unhandled_tx_abort";
break;
}
+
Assert(enumname != NULL);
+ return enumname;
+}
+
+/* Get the enum oid for a given BdrConflictResolution */
+static Datum
+bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+{
+ Oid conflict_resolution_oid;
+
+ char *enumname = bdr_conflict_resolution_get_name(conflict_resolution);
+
conflict_resolution_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
BdrConflictResolutionOid, CStringGetDatum(enumname));
if (conflict_resolution_oid == InvalidOid)
return row_json;
}
+/* print the tuple 'tuple' into the StringInfo s */
+void
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
+{
+ int natt;
+ Oid oid;
+
+ /* print oid of tuple, it's not included in the TupleDesc */
+ if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
+ {
+ appendStringInfo(s, " oid[oid]:%u", oid);
+ }
+
+ /* print all columns individually */
+ for (natt = 0; natt < tupdesc->natts; natt++)
+ {
+ Form_pg_attribute attr; /* the attribute itself */
+ Oid typid; /* type of current attribute */
+ HeapTuple type_tuple; /* information about a type */
+ Form_pg_type type_form;
+ Oid typoutput; /* output function */
+ bool typisvarlena;
+ Datum origval; /* possibly toasted Datum */
+ Datum val; /* definitely detoasted Datum */
+ char *outputstr = NULL;
+ bool isnull; /* column is null? */
+
+ attr = tupdesc->attrs[natt];
+
+ /*
+ * don't print dropped columns, we can't be sure everything is
+ * available for them
+ */
+ if (attr->attisdropped)
+ continue;
+
+ /*
+ * Don't print system columns
+ */
+ if (attr->attnum < 0)
+ continue;
+
+ typid = attr->atttypid;
+
+ /* gather type name */
+ type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+ if (!HeapTupleIsValid(type_tuple))
+ elog(ERROR, "cache lookup failed for type %u", typid);
+ type_form = (Form_pg_type) GETSTRUCT(type_tuple);
+
+ /* print attribute name */
+ appendStringInfoChar(s, ' ');
+ appendStringInfoString(s, NameStr(attr->attname));
+
+ /* print attribute type */
+ appendStringInfoChar(s, '[');
+ appendStringInfoString(s, NameStr(type_form->typname));
+ appendStringInfoChar(s, ']');
+
+ /* query output function */
+ getTypeOutputInfo(typid,
+ &typoutput, &typisvarlena);
+
+ ReleaseSysCache(type_tuple);
+
+ /* get Datum from tuple */
+ origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+ if (isnull)
+ outputstr = "(null)";
+ else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+ outputstr = "(unchanged-toast-datum)";
+ else if (typisvarlena)
+ val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+ else
+ val = origval;
+
+ /* print data */
+ if (outputstr == NULL)
+ outputstr = OidOutputFunctionCall(typoutput, val);
+
+ appendStringInfoChar(s, ':');
+ appendStringInfoString(s, outputstr);
+ }
+}
+
+
+static void
+row_to_stringinfo(StringInfo s, Datum composite)
+{
+ HeapTupleHeader td;
+ Oid tupType;
+ int32 tupTypmod;
+ TupleDesc tupdesc;
+ HeapTupleData tmptup,
+ *tuple;
+
+ td = DatumGetHeapTupleHeader(composite);
+
+ /* Extract rowtype info and find a tupdesc */
+ tupType = HeapTupleHeaderGetTypeId(td);
+ tupTypmod = HeapTupleHeaderGetTypMod(td);
+ tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
+
+ /* Build a temporary HeapTuple control structure */
+ tmptup.t_len = HeapTupleHeaderGetDatumLength(td);
+ tmptup.t_data = td;
+ tuple = &tmptup;
+
+ /* Print the tuple to stringinfo */
+ tuple_to_stringinfo(s, tupdesc, tuple);
+
+ ReleaseTupleDesc(tupdesc);
+}
+
static void
bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
const char *in_str)
*
* The change will then be replicated to other nodes.
*/
-static void
+void
bdr_conflict_log_table(BdrApplyConflict *conflict)
{
Datum values[BDR_CONFLICT_HISTORY_COLS];
char remote_sysid[SYSID_DIGITS];
char origin_sysid[SYSID_DIGITS];
+ if (IsAbortedTransactionBlockState())
+ elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+
+ if (!IsTransactionState())
+ elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+
if (!bdr_log_conflicts_to_table)
/* No logging enabled and we don't own any memory, just bail */
return;
FreeExecutorState(log_estate);
}
+/*
+ * Log a BDR apply conflict to the postgreql log.
+ */
+void
+bdr_conflict_log_serverlog(BdrApplyConflict *conflict)
+{
+ StringInfoData s_key;
+ char *resolution_name;
+
+#define CONFLICT_MSG_PREFIX "CONFLICT: remote %s on relation %s.%s originating at node " UINT64_FORMAT ":%u:%u at ts %s;"
+
+ /* Create text representation of the PKEY tuple */
+ initStringInfo(&s_key);
+ if (!conflict->local_tuple_null)
+ row_to_stringinfo(&s_key, conflict->local_tuple);
+
+ resolution_name = bdr_conflict_resolution_get_name(conflict->conflict_resolution);
+
+ switch(conflict->conflict_type)
+ {
+ case BdrConflictType_InsertInsert:
+ case BdrConflictType_UpdateUpdate:
+ ereport(LOG,
+ (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg(CONFLICT_MSG_PREFIX " row was previously updated at node " UINT64_FORMAT ":%u. Resolution: %s; PKEY:%s",
+ BdrConflictType_InsertInsert ? "INSERT" : "UPDATE",
+ conflict->object_schema, conflict->object_name,
+ conflict->remote_sysid, conflict->remote_tli,
+ conflict->remote_dboid,
+ timestamptz_to_str(conflict->remote_commit_time),
+ conflict->local_tuple_origin_sysid,
+ conflict->local_tuple_origin_tli,
+ resolution_name,
+ s_key.data)));
+ break;
+ case BdrConflictType_UpdateDelete:
+ case BdrConflictType_DeleteDelete:
+ ereport(LOG,
+ (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg(CONFLICT_MSG_PREFIX " could not find existing row. Resolution: %s; PKEY:%s",
+ BdrConflictType_UpdateDelete ? "UPDATE" : "DELETE",
+ conflict->object_schema, conflict->object_name,
+ conflict->remote_sysid, conflict->remote_tli,
+ conflict->remote_dboid,
+ timestamptz_to_str(conflict->remote_commit_time),
+ resolution_name,
+ s_key.data)));
+
+ break;
+ case BdrConflictType_InsertUpdate:
+ /* XXX? */
+ break;
+ case BdrConflictType_UnhandledTxAbort:
+ /* XXX? */
+ break;
+ }
+
+ resetStringInfo(&s_key);
+}
+
+
/*
* Log a BDR apply conflict to the bdr.bdr_conflict_history table and/or
* system log.
* TupleTableSlot for the tuple. It isn't done here because the caller
* frequently already has the node id to hand.
*/
-void
-bdr_conflict_log(BdrConflictType conflict_type,
- BdrConflictResolution resolution, TransactionId remote_txid,
- BDRRelation *conflict_relation, TupleTableSlot *local_tuple,
- RepNodeId local_tuple_origin_id, TupleTableSlot *remote_tuple,
- ErrorData *apply_error)
+BdrApplyConflict *
+bdr_make_apply_conflict(BdrConflictType conflict_type,
+ BdrConflictResolution resolution,
+ TransactionId remote_txid,
+ BDRRelation *conflict_relation,
+ TupleTableSlot *local_tuple,
+ RepNodeId local_tuple_origin_id,
+ TupleTableSlot *remote_tuple,
+ ErrorData *apply_error)
{
- MemoryContext log_context, old_context;
- BdrApplyConflict conflict;
- TimeLineID tli;
- Oid dboid;
-
- if (IsAbortedTransactionBlockState())
- elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+ MemoryContext old_context;
+ BdrApplyConflict *conflict;
- if (!IsTransactionState())
- elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+ old_context = MemoryContextSwitchTo(conflict_log_context);
- /* We want our own memory ctx to clean up easily & reliably */
- log_context = AllocSetContextCreate(CurrentMemoryContext,
- "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
- old_context = MemoryContextSwitchTo(log_context);
+ conflict = palloc0(sizeof(BdrApplyConflict));
/* Populate the conflict record we're going to log */
- conflict.conflict_type = conflict_type;
- conflict.conflict_resolution = resolution;
+ conflict->conflict_type = conflict_type;
+ conflict->conflict_resolution = resolution;
- conflict.local_conflict_txid = GetTopTransactionIdIfAny();
- conflict.local_conflict_lsn = GetXLogInsertRecPtr();
- conflict.local_conflict_time = GetCurrentTimestamp();
- conflict.remote_txid = remote_txid;
+ conflict->local_conflict_txid = GetTopTransactionIdIfAny();
+ conflict->local_conflict_lsn = GetXLogInsertRecPtr();
+ conflict->local_conflict_time = GetCurrentTimestamp();
+ conflict->remote_txid = remote_txid;
/* set using bdr_conflict_setrel */
if (conflict_relation == NULL)
{
- conflict.object_schema = NULL;
- conflict.object_name = NULL;
+ conflict->object_schema = NULL;
+ conflict->object_name = NULL;
}
else
{
- conflict.object_name = RelationGetRelationName(conflict_relation->rel);
- conflict.object_schema =
+ conflict->object_name = RelationGetRelationName(conflict_relation->rel);
+ conflict->object_schema =
get_namespace_name(RelationGetNamespace(conflict_relation->rel));
}
/* TODO: May make sense to cache the remote sysid in a global too... */
bdr_fetch_sysid_via_node_id(replication_origin_id,
- &conflict.remote_sysid, &tli, &dboid);
- conflict.remote_commit_time = replication_origin_timestamp;
- conflict.remote_txid = remote_txid;
- conflict.remote_commit_lsn = replication_origin_lsn;
+ &conflict->remote_sysid,
+ &conflict->remote_tli,
+ &conflict->remote_dboid);
+ conflict->remote_commit_time = replication_origin_timestamp;
+ conflict->remote_txid = remote_txid;
+ conflict->remote_commit_lsn = replication_origin_lsn;
if (local_tuple != NULL)
{
/* Log local tuple xmin even if actual tuple value logging is off */
- conflict.local_tuple_xmin =
+ conflict->local_tuple_xmin =
HeapTupleHeaderGetXmin(local_tuple->tts_tuple->t_data);
- Assert(conflict.local_tuple_xmin >= FirstNormalTransactionId ||
- conflict.local_tuple_xmin == FrozenTransactionId);
+ Assert(conflict->local_tuple_xmin >= FirstNormalTransactionId ||
+ conflict->local_tuple_xmin == FrozenTransactionId);
if (bdr_conflict_logging_include_tuples)
{
- conflict.local_tuple = ExecFetchSlotTupleDatum(local_tuple);
- conflict.local_tuple_null = false;
+ conflict->local_tuple = ExecFetchSlotTupleDatum(local_tuple);
+ conflict->local_tuple_null = false;
}
}
else
{
- conflict.local_tuple_null = true;
- conflict.local_tuple = (Datum) 0;
- conflict.local_tuple_xmin = InvalidTransactionId;
+ conflict->local_tuple_null = true;
+ conflict->local_tuple = (Datum) 0;
+ conflict->local_tuple_xmin = InvalidTransactionId;
}
if (local_tuple_origin_id != InvalidRepNodeId)
{
bdr_fetch_sysid_via_node_id(local_tuple_origin_id,
- &conflict.local_tuple_origin_sysid, &tli,
- &dboid);
+ &conflict->local_tuple_origin_sysid,
+ &conflict->local_tuple_origin_tli,
+ &conflict->local_tuple_origin_dboid);
}
else
{
- conflict.local_tuple_origin_sysid = 0;
+ conflict->local_tuple_origin_sysid = 0;
}
if (remote_tuple != NULL && bdr_conflict_logging_include_tuples)
{
- conflict.remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
- conflict.remote_tuple_null = false;
+ conflict->remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
+ conflict->remote_tuple_null = false;
}
else
{
- conflict.remote_tuple_null = true;
- conflict.remote_tuple = (Datum) 0;
+ conflict->remote_tuple_null = true;
+ conflict->remote_tuple = (Datum) 0;
}
- conflict.apply_error = apply_error;
-
- bdr_conflict_log_table(&conflict);
+ conflict->apply_error = apply_error;
MemoryContextSwitchTo(old_context);
- MemoryContextDelete(log_context);
+
+ return conflict;
}