bdr: conflict handling refactor phase 2
authorPetr Jelinek <pjmodos@pjmodos.net>
Thu, 13 Nov 2014 23:18:04 +0000 (00:18 +0100)
committerPetr Jelinek <pjmodos@pjmodos.net>
Thu, 13 Nov 2014 23:18:04 +0000 (00:18 +0100)
New logging API has one unified function that creates Conflict object
which then can be used to log both into server log and to table.

Logging of all conflict types was unified under this new API.

Added two new conflict resolution types 'apply_change' and 'skip_change'
which are used in places where it's impossible to use conflict handlers
(like delete_delete conflict) and also in UDR instead of last update
wins.

In passing fix the timeframe handling of user defined conflict handlers.

bdr--0.8.0--0.8.0.1.sql
bdr.h
bdr_apply.c
bdr_conflict_handlers.c
bdr_conflict_logging.c
expected/isolation/dmlconflict_dd.out
expected/isolation/dmlconflict_ud.out

index d14dcbc21281de2975fc11202657a783d9d4b0b0..985ce653cdcdc5e9011078c2a2cfca3a722f8c9c 100644 (file)
@@ -7,3 +7,24 @@ CREATE TABLE bdr.bdr_replication_set_config
 );
 ALTER TABLE bdr.bdr_replication_set_config SET (user_catalog_table = true);
 REVOKE ALL ON TABLE bdr.bdr_replication_set_config FROM PUBLIC;
+
+
+-- We can't use ALTER TYPE ... ADD inside transaction, so do it the hard way...
+ALTER TYPE bdr.bdr_conflict_resolution RENAME TO bdr_conflict_resolution_old;
+
+CREATE TYPE bdr.bdr_conflict_resolution AS ENUM
+(
+    'conflict_trigger_skip_change',
+    'conflict_trigger_returned_tuple',
+    'last_update_wins_keep_local',
+    'last_update_wins_keep_remote',
+    'apply_change',
+    'skip_change',
+    'unhandled_tx_abort'
+);
+
+COMMENT ON TYPE bdr.bdr_conflict_resolution IS 'Resolution of a bdr conflict - if a conflict was resolved by a conflict trigger, by last-update-wins tests on commit timestamps, etc.';
+
+ALTER TABLE bdr.bdr_conflict_history ALTER COLUMN conflict_resolution TYPE bdr.bdr_conflict_resolution USING conflict_resolution::text::bdr.bdr_conflict_resolution;
+
+DROP TYPE bdr.bdr_conflict_resolution_old;
diff --git a/bdr.h b/bdr.h
index c4fce6d1540516227ad24c50c5d3c3992550dbd5..7c7a88abffc35d8ed0ef61e39c585dc9cd68ac6a 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -89,6 +89,8 @@ typedef enum BdrConflictResolution
    BdrConflictResolution_ConflictTriggerReturnedTuple,
    BdrConflictResolution_LastUpdateWins_KeepLocal,
    BdrConflictResolution_LastUpdateWins_KeepRemote,
+   BdrConflictResolution_DefaultApplyChange,
+   BdrConflictResolution_DefaultSkipChange,
    BdrConflictResolution_UnhandledTxAbort
 } BdrConflictResolution;
 
@@ -297,16 +299,55 @@ extern bool find_pkey_tuple(struct ScanKeyData *skey, BDRRelation *rel,
                            bool lock, enum LockTupleMode mode);
 
 /* conflict logging (usable in apply only) */
+
+/*
+ * Details of a conflict detected by an apply process, destined for logging
+ * output and/or conflict triggers.
+ *
+ * Closely related to bdr.bdr_conflict_history SQL table.
+ */
+typedef struct BdrApplyConflict
+{
+   TransactionId           local_conflict_txid;
+   XLogRecPtr              local_conflict_lsn;
+   TimestampTz             local_conflict_time;
+   const char             *object_schema; /* unused if apply_error */
+   const char             *object_name;   /* unused if apply_error */
+   uint64                  remote_sysid;
+   TimeLineID              remote_tli;
+   Oid                     remote_dboid;
+   TransactionId           remote_txid;
+   TimestampTz             remote_commit_time;
+   XLogRecPtr              remote_commit_lsn;
+   BdrConflictType         conflict_type;
+   BdrConflictResolution   conflict_resolution;
+   bool                    local_tuple_null;
+   Datum                   local_tuple;    /* composite */
+   TransactionId           local_tuple_xmin;
+   uint64                  local_tuple_origin_sysid; /* init to 0 if unknown */
+   TimeLineID              local_tuple_origin_tli;
+   Oid                     local_tuple_origin_dboid;
+   bool                    remote_tuple_null;
+   Datum                   remote_tuple;   /* composite */
+   ErrorData              *apply_error;
+} BdrApplyConflict;
+
 extern void bdr_conflict_logging_startup(void);
+extern void bdr_conflict_logging_cleanup(void);
+
+extern BdrApplyConflict * bdr_make_apply_conflict(BdrConflictType conflict_type,
+                                   BdrConflictResolution resolution,
+                                   TransactionId remote_txid,
+                                   BDRRelation *conflict_relation,
+                                   struct TupleTableSlot *local_tuple,
+                                   RepNodeId local_tuple_origin_id,
+                                   struct TupleTableSlot *remote_tuple,
+                                   struct ErrorData *apply_error);
+
+extern void bdr_conflict_log_serverlog(BdrApplyConflict *conflict);
+extern void bdr_conflict_log_table(BdrApplyConflict *conflict);
 
-extern void bdr_conflict_log(BdrConflictType conflict_type,
-                            BdrConflictResolution resolution,
-                            TransactionId remote_txid,
-                            BDRRelation *conflict_relation,
-                            struct TupleTableSlot *local_tuple,
-                            RepNodeId local_tuple_origin_id,
-                            struct TupleTableSlot *remote_tuple,
-                            struct ErrorData *apply_error);
+extern void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
 
 /* sequence support */
 extern void bdr_sequencer_shmem_init(int nnodes, int sequencers);
index de7bf5170261359d22445c21fdd355050f57faa0..82503ac0f7596b527379ae78c76b7680b6439c84 100644 (file)
@@ -104,21 +104,12 @@ dlist_head bdr_lsn_association = DLIST_STATIC_INIT(bdr_lsn_association);
 static BDRRelation *read_rel(StringInfo s, LOCKMODE mode);
 extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
 
-static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
-
-static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
-                  BDRRelation *rel, HeapTuple local_tuple,
-                  HeapTuple remote_tuple, HeapTuple *new_tuple,
-                  bool *perform_update, bool *log_update,
-                  BdrConflictResolution *resolution);
-static void do_log_conflict(BdrConflictType conflict_type,
-               BdrConflictResolution resolution,
-               BDRRelation *conflict_relation,
-               RepNodeId local_node_id,
-               bool apply_remote,
-               TimestampTz local_timestamp,
-               HeapTuple old_key,
-               HeapTuple user_tuple);
+static void check_apply_update(BdrConflictType conflict_type,
+                              RepNodeId local_node_id, TimestampTz local_ts,
+                              BDRRelation *rel, HeapTuple local_tuple,
+                              HeapTuple remote_tuple, HeapTuple *new_tuple,
+                              bool *perform_update, bool *log_update,
+                              BdrConflictResolution *resolution);
 static void do_apply_update(BDRRelation *rel, EState *estate, TupleTableSlot *oldslot,
                TupleTableSlot *newslot);
 
@@ -134,6 +125,13 @@ static void process_remote_update(StringInfo s);
 static void process_remote_delete(StringInfo s);
 static void process_remote_message(StringInfo s);
 
+static void get_local_tuple_origin(HeapTuple tuple,
+                                  TimestampTz *commit_ts,
+                                  RepNodeId *node_id);
+static void abs_timestamp_difference(TimestampTz start_time,
+                                    TimestampTz stop_time,
+                                    long *secs, int *microsecs);
+
 static void
 process_remote_begin(StringInfo s)
 {
@@ -497,38 +495,42 @@ process_remote_insert(StringInfo s)
     */
    if (conflict)
    {
-       TransactionId xmin;
        TimestampTz local_ts;
        RepNodeId   local_node_id;
        bool        apply_update;
        bool        log_update;
-       CommitExtraData local_node_id_raw;
+       BdrApplyConflict *apply_conflict = NULL; /* Mute compiler */
        BdrConflictResolution resolution;
 
-       /* refetch tuple, check for old commit ts & origin */
-       xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
+       get_local_tuple_origin(oldslot->tts_tuple, &local_ts, &local_node_id);
 
        /*
         * Use conflict triggers and/or last-update-wins to decide which tuple
         * to retain.
         */
-       TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
-       local_node_id = local_node_id_raw;
-
-       check_apply_update(local_node_id, local_ts, rel,
-                          NULL, NULL, NULL, &apply_update, &log_update,
-                          &resolution);
+       check_apply_update(BdrConflictType_InsertInsert,
+                          local_node_id, local_ts, rel,
+                          oldslot->tts_tuple, NULL, NULL,
+                          &apply_update, &log_update, &resolution);
 
+       /*
+        * Log conflict to server log.
+        */
        if (log_update)
        {
-           do_log_conflict(BdrConflictType_InsertInsert, resolution,
-                           rel, local_node_id, apply_update,
-                           local_ts,
-                           oldslot->tts_tuple, NULL /*no user tuple*/);
+           apply_conflict = bdr_make_apply_conflict(
+               BdrConflictType_InsertInsert, resolution,
+               replication_origin_xid, rel, oldslot, local_node_id,
+               newslot, NULL /*no error*/);
+
+           bdr_conflict_log_serverlog(apply_conflict);
 
            bdr_count_insert_conflict();
        }
 
+       /*
+        * Finally, apply the update.
+        */
        if (apply_update)
        {
            simple_heap_update(rel->rel,
@@ -540,10 +542,12 @@ process_remote_insert(StringInfo s)
            bdr_count_insert();
        }
 
+       /* Log conflict to table */
        if (log_update)
-           bdr_conflict_log(BdrConflictType_InsertInsert, resolution,
-                            replication_origin_xid, rel, oldslot,
-                            local_node_id, newslot, NULL /*no error*/);
+       {
+           bdr_conflict_log_table(apply_conflict);
+           bdr_conflict_logging_cleanup();
+       }
    }
    else
    {
@@ -697,15 +701,13 @@ process_remote_update(StringInfo s)
 
    if (found_tuple)
    {
-       TransactionId xmin;
        TimestampTz local_ts;
        RepNodeId   local_node_id;
        bool        apply_update;
        bool        log_update;
+       BdrApplyConflict *apply_conflict = NULL; /* Mute compiler */
        BdrConflictResolution resolution;
 
-       CommitExtraData local_node_id_raw;
-
        remote_tuple = heap_modify_tuple(oldslot->tts_tuple,
                                         RelationGetDescr(rel->rel),
                                         new_tuple.values,
@@ -723,26 +725,29 @@ process_remote_update(StringInfo s)
        resetStringInfo(&o);
 #endif
 
-       /* refetch tuple, check for old commit ts & origin */
-       xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
+       get_local_tuple_origin(oldslot->tts_tuple, &local_ts, &local_node_id);
 
        /*
         * Use conflict triggers and/or last-update-wins to decide which tuple
         * to retain.
         */
-       TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
-       local_node_id = local_node_id_raw;
-
-       check_apply_update(local_node_id, local_ts, rel, oldslot->tts_tuple,
-                          remote_tuple, &user_tuple, &apply_update,
+       check_apply_update(BdrConflictType_UpdateUpdate,
+                          local_node_id, local_ts, rel,
+                          oldslot->tts_tuple, newslot->tts_tuple,
+                          &user_tuple, &apply_update,
                           &log_update, &resolution);
 
+       /*
+        * Log conflict to both server log and table.
+        */
        if (log_update)
        {
-           do_log_conflict(BdrConflictType_UpdateUpdate, resolution,
-                           rel, local_node_id, apply_update,
-                           local_ts,
-                           oldslot->tts_tuple, user_tuple);
+           apply_conflict = bdr_make_apply_conflict(
+               BdrConflictType_UpdateUpdate, resolution,
+               replication_origin_xid, rel, oldslot, local_node_id,
+               newslot, NULL /*no error*/);
+
+           bdr_conflict_log_serverlog(apply_conflict);
 
            bdr_count_update_conflict();
        }
@@ -765,11 +770,12 @@ process_remote_update(StringInfo s)
            do_apply_update(rel, estate, oldslot, newslot);
        }
 
+       /* Log conflict to table */
        if (log_update)
-           bdr_conflict_log(BdrConflictType_UpdateUpdate, resolution,
-                            replication_origin_xid, rel, oldslot,
-                            local_node_id, newslot, NULL /*no error*/);
-
+       {
+           bdr_conflict_log_table(apply_conflict);
+           bdr_conflict_logging_cleanup();
+       }
    }
    else
    {
@@ -780,6 +786,7 @@ process_remote_update(StringInfo s)
         */
 
        bool skip = false;
+       BdrApplyConflict *apply_conflict;
        BdrConflictResolution resolution;
 
        remote_tuple = heap_form_tuple(RelationGetDescr(rel->rel),
@@ -802,17 +809,20 @@ process_remote_update(StringInfo s)
                    (errmsg("UPDATE vs DELETE handler returned a row which"
                            " isn't allowed for now")));
 
-       resolution = user_tuple ? BdrConflictResolution_ConflictTriggerReturnedTuple :
-                                 BdrConflictResolution_ConflictTriggerSkipChange;
-
-       do_log_conflict(BdrConflictType_UpdateDelete, resolution,
-                       rel, InvalidRepNodeId, false, 0,
-                       remote_tuple, user_tuple);
+       if (skip)
+           resolution = BdrConflictResolution_ConflictTriggerSkipChange;
+       else if (user_tuple)
+           resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
+       else
+           resolution = BdrConflictResolution_DefaultSkipChange;
 
-       bdr_conflict_log(BdrConflictType_UpdateDelete, resolution,
-                        replication_origin_xid, rel, NULL,
-                        InvalidRepNodeId, newslot, NULL /*no error*/);
+       apply_conflict = bdr_make_apply_conflict(
+           BdrConflictType_UpdateDelete, resolution, replication_origin_xid,
+           rel, NULL, InvalidRepNodeId, newslot, NULL /*no error*/);
 
+       bdr_conflict_log_serverlog(apply_conflict);
+       bdr_conflict_log_table(apply_conflict);
+       bdr_conflict_logging_cleanup();
    }
 
    PopActiveSnapshot();
@@ -927,6 +937,9 @@ process_remote_delete(StringInfo s)
         * (This can also arise with an UPDATE that changes the PRIMARY KEY,
         * as that's effectively a DELETE + INSERT).
         */
+
+       BdrApplyConflict *apply_conflict;
+
        bdr_count_delete_conflict();
 
        /* Since the local tuple is missing, fill slot from the received data. */
@@ -937,14 +950,14 @@ process_remote_delete(StringInfo s)
            ExecStoreTuple(tup, oldslot, InvalidBuffer, true);
        }
 
-       do_log_conflict(BdrConflictType_DeleteDelete, BdrConflictResolution_ConflictTriggerSkipChange,
-                       rel, InvalidRepNodeId, false, 0,
-                       oldslot->tts_tuple, NULL);
-
-       bdr_conflict_log(BdrConflictType_DeleteDelete, BdrConflictResolution_ConflictTriggerSkipChange,
-                        replication_origin_xid, rel, NULL,
-                        InvalidRepNodeId, oldslot, NULL /*no error*/);
+       apply_conflict = bdr_make_apply_conflict(
+           BdrConflictType_DeleteDelete,
+           BdrConflictResolution_DefaultSkipChange, replication_origin_xid,
+           rel, NULL, InvalidRepNodeId, oldslot, NULL /*no error*/);
 
+       bdr_conflict_log_serverlog(apply_conflict);
+       bdr_conflict_log_table(apply_conflict);
+       bdr_conflict_logging_cleanup();
    }
 
    PopActiveSnapshot();
@@ -960,6 +973,111 @@ process_remote_delete(StringInfo s)
    CommandCounterIncrement();
 }
 
+/*
+ * Get commit timestamp and origin of the tuple
+ */
+static void
+get_local_tuple_origin(HeapTuple tuple, TimestampTz *commit_ts, RepNodeId *node_id)
+{
+   TransactionId   xmin;
+   CommitExtraData node_id_raw;
+
+   /* refetch tuple, check for old commit ts & origin */
+   xmin = HeapTupleHeaderGetXmin(tuple->t_data);
+
+   TransactionIdGetCommitTsData(xmin, commit_ts, &node_id_raw);
+   *node_id = node_id_raw;
+}
+
+/*
+ * Last update wins conflict handling.
+ */
+static void
+bdr_conflict_last_update_wins(RepNodeId local_node_id,
+                             RepNodeId remote_node_id,
+                             TimestampTz local_ts,
+                             TimestampTz remote_ts,
+                             bool *perform_update, bool *log_update,
+                             BdrConflictResolution *resolution)
+{
+   int         cmp;
+
+   cmp = timestamptz_cmp_internal(remote_ts, local_ts);
+   if (cmp > 0)
+   {
+       /* The most recent update is the remote one; apply it */
+       *perform_update = true;
+       *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+       return;
+   }
+   else if (cmp < 0)
+   {
+       /* The most recent update is the local one; retain it */
+       *log_update = true;
+       *perform_update = false;
+       *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+       return;
+   }
+   else
+   {
+       uint64      local_sysid,
+                   remote_origin_sysid;
+       TimeLineID  local_tli,
+                   remote_tli;
+       Oid         local_dboid,
+                   remote_origin_dboid;
+       /*
+        * Timestamps are equal. Use sysid + timeline id to decide which
+        * tuple to retain.
+        */
+       bdr_fetch_sysid_via_node_id(local_node_id,
+                                   &local_sysid, &local_tli,
+                                   &local_dboid);
+       bdr_fetch_sysid_via_node_id(remote_node_id,
+                                   &remote_origin_sysid, &remote_tli,
+                                   &remote_origin_dboid);
+
+       /*
+        * As the timestamps were equal, we have to break the tie in a
+        * consistent manner that'll match across all nodes.
+        *
+        * Use the ordering of the node's unique identifier, the tuple of
+        * (sysid, timelineid, dboid).
+        */
+       if (local_sysid < remote_origin_sysid)
+           *perform_update = true;
+       else if (local_sysid > remote_origin_sysid)
+           *perform_update = false;
+       else if (local_tli < remote_tli)
+           *perform_update = true;
+       else if (local_tli > remote_tli)
+           *perform_update = false;
+       else if (local_dboid < remote_origin_dboid)
+           *perform_update = true;
+       else if (local_dboid > remote_origin_dboid)
+           *perform_update = false;
+       else
+           /* shouldn't happen */
+           elog(ERROR, "unsuccessful node comparison");
+
+       /*
+        * We don't log whether we used timestamp, sysid or timeline id to
+        * decide which tuple to retain. That'll be in the log record
+        * anyway, so we can reconstruct the decision from the log record
+        * later.
+        */
+       if (*perform_update)
+       {
+           *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+       }
+       else
+       {
+           *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+           *log_update = true;
+       }
+   }
+}
+
 /*
  * Check whether a remote insert or update conflicts with the local row
  * version.
@@ -973,12 +1091,14 @@ process_remote_delete(StringInfo s)
  * is true. Its value is undefined if log_update is false.
  */
 static void
-check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
-                BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple,
-               HeapTuple *new_tuple, bool *perform_update, bool *log_update,
-               BdrConflictResolution *resolution)
+check_apply_update(BdrConflictType conflict_type,
+                  RepNodeId local_node_id, TimestampTz local_ts,
+                  BDRRelation *rel, HeapTuple local_tuple,
+                  HeapTuple remote_tuple, HeapTuple *new_tuple,
+                  bool *perform_update, bool *log_update,
+                  BdrConflictResolution *resolution)
 {
-   int         cmp, microsecs;
+   int         microsecs;
    long        secs;
 
    bool        skip = false;
@@ -1003,138 +1123,65 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
        *perform_update = true;
        return;
    }
-   else
+
+   /*
+    * Decide whether to keep the remote or local tuple based on a conflict
+    * trigger (if defined) or last-update-wins.
+    *
+    * If the caller doesn't provide storage for the conflict handler to
+    * store a new tuple in, don't fire any conflict triggers.
+    */
+
+   if (new_tuple)
    {
        /*
-        * Decide whether to keep the remote or local tuple based on a conflict
-        * trigger (if defined) or last-update-wins.
-        *
-        * If the caller doesn't provide storage for the conflict handler to
-        * store a new tuple in, don't fire any conflict triggers.
+        * --------------
+        * Conflict trigger conflict handling - let the user decide whether to:
+        * - Ignore the remote update;
+        * - Supply a new tuple to replace the current tuple; or
+        * - Take no action and fall through to the next handling option
+        * --------------
         */
 
-       if (new_tuple)
-       {
-           /*
-            * --------------
-            * Conflict trigger conflict handling - let the user decide whether to:
-            * - Ignore the remote update;
-            * - Supply a new tuple to replace the current tuple; or
-            * - Take no action and fall through to the next handling option
-            * --------------
-            */
-
-           TimestampDifference(replication_origin_timestamp, local_ts,
-                               &secs, &microsecs);
-
-           *new_tuple = bdr_conflict_handlers_resolve(rel, local_tuple,
-                                                      remote_tuple, "UPDATE",
-                                                      BdrConflictType_UpdateUpdate,
-                                                      abs(secs) * 1000000 + abs(microsecs),
-                                                      &skip);
-
-           if (skip)
-           {
-               *log_update = true;
-               *perform_update = false;
-               *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
-               return;
-           }
-           else if (*new_tuple)
-           {
-               /* Custom conflict handler returned tuple, log it. */
-               *log_update = true;
-               *perform_update = true;
-               *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
-               return;
-           }
+       abs_timestamp_difference(replication_origin_timestamp, local_ts,
+                                &secs, &microsecs);
 
-           /*
-            * if user decided not to skip the conflict but didn't provide a
-            * resolving tuple we fall back to default handling
-            */
-       }
+       *new_tuple = bdr_conflict_handlers_resolve(rel, local_tuple, remote_tuple,
+                                                  conflict_type == BdrConflictType_InsertInsert ?
+                                                  "INSERT" : "UPDATE",
+                                                  conflict_type,
+                                                  abs(secs) * 1000000 + abs(microsecs),
+                                                  &skip);
 
-       /* Last update wins conflict handling */
-       cmp = timestamptz_cmp_internal(replication_origin_timestamp, local_ts);
-       if (cmp > 0)
+       if (skip)
        {
-           /* The most recent update is the remote one; apply it */
-           *perform_update = true;
-           *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
-           return;
-       }
-       else if (cmp < 0)
-       {
-           /* The most recent update is the local one; retain it */
            *log_update = true;
            *perform_update = false;
-           *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
+           *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
            return;
        }
-       else if (cmp == 0)
+       else if (*new_tuple)
        {
-           uint64      local_sysid,
-                       remote_origin_sysid;
-           TimeLineID  local_tli,
-                       remote_tli;
-           Oid         local_dboid,
-                       remote_origin_dboid;
-           /*
-            * Timestamps are equal. Use sysid + timeline id to decide which
-            * tuple to retain.
-            */
-           bdr_fetch_sysid_via_node_id(local_node_id,
-                                       &local_sysid, &local_tli,
-                                       &local_dboid);
-           bdr_fetch_sysid_via_node_id(replication_origin_id,
-                                       &remote_origin_sysid, &remote_tli,
-                                       &remote_origin_dboid);
-
-           /*
-            * As the timestamps were equal, we have to break the tie in a
-            * consistent manner that'll match across all nodes.
-            *
-            * Use the ordering of the node's unique identifier, the tuple of
-            * (sysid, timelineid, dboid).
-            */
-           if (local_sysid < remote_origin_sysid)
-               *perform_update = true;
-           else if (local_sysid > remote_origin_sysid)
-               *perform_update = false;
-           else if (local_tli < remote_tli)
-               *perform_update = true;
-           else if (local_tli > remote_tli)
-               *perform_update = false;
-           else if (local_dboid < remote_origin_dboid)
-               *perform_update = true;
-           else if (local_dboid > remote_origin_dboid)
-               *perform_update = false;
-           else
-               /* shouldn't happen */
-               elog(ERROR, "unsuccessful node comparison");
-
-           /*
-            * We don't log whether we used timestamp, sysid or timeline id to
-            * decide which tuple to retain. That'll be in the log record
-            * anyway, so we can reconstruct the decision from the log record
-            * later.
-            */
-           if (*perform_update)
-           {
-               *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
-           }
-           else
-           {
-               *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
-               *log_update = true;
-           }
-
+           /* Custom conflict handler returned tuple, log it. */
+           *log_update = true;
+           *perform_update = true;
+           *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
            return;
        }
+
+       /*
+        * if user decided not to skip the conflict but didn't provide a
+        * resolving tuple we fall back to default handling
+        */
    }
 
-   elog(ERROR, "unreachable code");
+   /* Use last update wins conflict handling. */
+   bdr_conflict_last_update_wins(local_node_id,
+                                 replication_origin_id,
+                                 local_ts,
+                                 replication_origin_timestamp,
+                                 perform_update, log_update,
+                                 resolution);
 }
 
 static void
@@ -1258,131 +1305,6 @@ process_remote_message(StringInfo s)
 }
 
 
-static void
-do_log_conflict(BdrConflictType conflict_type,
-               BdrConflictResolution resolution,
-               BDRRelation *conflict_relation,
-               RepNodeId local_node_id,
-               bool apply_remote,
-               TimestampTz local_timestamp,
-               HeapTuple primary_key,
-               HeapTuple user_tuple)
-{
-   StringInfoData s_key,
-               s_user_tuple;
-   char        remote_ts[MAXDATELEN + 1];
-   char        local_ts[MAXDATELEN + 1];
-   const char *object_namespace;
-   const char *object_name;
-
-   uint64      local_sysid,
-               remote_origin_sysid;
-   TimeLineID  local_tli,
-               remote_tli;
-   Oid         local_dboid,
-               remote_origin_dboid;
-
-#define CONFLICT_MSG_PREFIX "CONFLICT: %s remote %s on relation %s.%s originating at node " UINT64_FORMAT ":%u:%u at ts %s;"
-
-   /*
-    * By default we only log conflicts where the remote change is discarded or
-    * where a conflict handler emits a replacement tuple.
-    *
-    * Optionally the user may request that we log even conflicts where the
-    * local tuple is replaced by a newer remote tuple. Most of the time these
-    * are just noise, but there are times it's useful for debugging and tracing.
-    */
-
-   bdr_fetch_sysid_via_node_id(local_node_id,
-                               &local_sysid, &local_tli,
-                               &local_dboid);
-   bdr_fetch_sysid_via_node_id(replication_origin_id,
-                               &remote_origin_sysid, &remote_tli,
-                               &remote_origin_dboid);
-
-   Assert(remote_origin_sysid == origin_sysid);
-   Assert(remote_tli == origin_timeline);
-   Assert(remote_origin_dboid == origin_dboid);
-
-   memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
-          MAXDATELEN);
-   memcpy(local_ts, timestamptz_to_str(local_timestamp),
-          MAXDATELEN);
-
-   initStringInfo(&s_key);
-   tuple_to_stringinfo(&s_key, RelationGetDescr(conflict_relation->rel), primary_key);
-
-   object_namespace = get_namespace_name(RelationGetNamespace(conflict_relation->rel));
-   object_name = RelationGetRelationName(conflict_relation->rel);
-
-   switch(conflict_type)
-   {
-       case BdrConflictType_InsertInsert:
-               ereport(LOG,
-                       (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                        errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
-                               apply_remote ? "applying" : "skipping",
-                               apply_remote ? "INSERT as UPDATE" : "INSERT",
-                               object_namespace, object_name,
-                               remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
-                               local_node_id == InvalidRepNodeId ? "local" : "remote",
-                               local_sysid, local_tli, local_ts, s_key.data)));
-           break;
-       case BdrConflictType_InsertUpdate:
-           /* XXX? */
-           break;
-       case BdrConflictType_UpdateUpdate:
-           {
-               if (user_tuple != NULL)
-               {
-                   /* Conflict handler returned new tuple, need to report it */
-                   initStringInfo(&s_user_tuple);
-                   tuple_to_stringinfo(&s_user_tuple, RelationGetDescr(conflict_relation->rel),
-                                       user_tuple);
-
-                   ereport(LOG,
-                           (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                            errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s, resolved by user tuple:%s",
-                                   apply_remote ? "applying" : "skipping", "UPDATE",
-                                   object_namespace, object_name,
-                                   remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
-                                   local_node_id == InvalidRepNodeId ? "local" : "remote",
-                                   local_sysid, local_tli, local_ts, s_key.data,
-                                   s_user_tuple.data)));
-               }
-               else
-               {
-                   /* Handled by last update wins, or conflict handler w/o new tuple */
-                   ereport(LOG,
-                           (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                            errmsg(CONFLICT_MSG_PREFIX " row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
-                                   apply_remote ? "applying" : "skipping", "UPDATE",
-                                   object_namespace, object_name,
-                                   remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
-                                   local_node_id == InvalidRepNodeId ? "local" : "remote",
-                                   local_sysid, local_tli, local_ts, s_key.data)));
-               }
-           }
-           break;
-       case BdrConflictType_UpdateDelete:
-       case BdrConflictType_DeleteDelete:
-           ereport(LOG,
-                   (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                    errmsg(CONFLICT_MSG_PREFIX " could not find existing tuple. PKEY:%s",
-                           "skipping", BdrConflictType_UpdateDelete ? "UPDATE" : "DELETE",
-                           object_namespace, object_name,
-                           remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
-                           s_key.data)));
-
-           break;
-       case BdrConflictType_UnhandledTxAbort:
-           /* XXX? */
-           break;
-   }
-
-   resetStringInfo(&s_key);
-}
-
 static void
 do_apply_update(BDRRelation *rel, EState *estate, TupleTableSlot *oldslot,
                TupleTableSlot *newslot)
@@ -1937,92 +1859,6 @@ read_rel(StringInfo s, LOCKMODE mode)
    return bdr_heap_open(relid, NoLock);
 }
 
-/* print the tuple 'tuple' into the StringInfo s */
-static void
-tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
-{
-   int         natt;
-   Oid         oid;
-
-   /* print oid of tuple, it's not included in the TupleDesc */
-   if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
-   {
-       appendStringInfo(s, " oid[oid]:%u", oid);
-   }
-
-   /* print all columns individually */
-   for (natt = 0; natt < tupdesc->natts; natt++)
-   {
-       Form_pg_attribute attr; /* the attribute itself */
-       Oid         typid;      /* type of current attribute */
-       HeapTuple   type_tuple; /* information about a type */
-       Form_pg_type type_form;
-       Oid         typoutput;  /* output function */
-       bool        typisvarlena;
-       Datum       origval;    /* possibly toasted Datum */
-       Datum       val;        /* definitely detoasted Datum */
-       char       *outputstr = NULL;
-       bool        isnull;     /* column is null? */
-
-       attr = tupdesc->attrs[natt];
-
-       /*
-        * don't print dropped columns, we can't be sure everything is
-        * available for them
-        */
-       if (attr->attisdropped)
-           continue;
-
-       /*
-        * Don't print system columns
-        */
-       if (attr->attnum < 0)
-           continue;
-
-       typid = attr->atttypid;
-
-       /* gather type name */
-       type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
-       if (!HeapTupleIsValid(type_tuple))
-           elog(ERROR, "cache lookup failed for type %u", typid);
-       type_form = (Form_pg_type) GETSTRUCT(type_tuple);
-
-       /* print attribute name */
-       appendStringInfoChar(s, ' ');
-       appendStringInfoString(s, NameStr(attr->attname));
-
-       /* print attribute type */
-       appendStringInfoChar(s, '[');
-       appendStringInfoString(s, NameStr(type_form->typname));
-       appendStringInfoChar(s, ']');
-
-       /* query output function */
-       getTypeOutputInfo(typid,
-                         &typoutput, &typisvarlena);
-
-       ReleaseSysCache(type_tuple);
-
-       /* get Datum from tuple */
-       origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
-
-       if (isnull)
-           outputstr = "(null)";
-       else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
-           outputstr = "(unchanged-toast-datum)";
-       else if (typisvarlena)
-           val = PointerGetDatum(PG_DETOAST_DATUM(origval));
-       else
-           val = origval;
-
-       /* print data */
-       if (outputstr == NULL)
-           outputstr = OidOutputFunctionCall(typoutput, val);
-
-       appendStringInfoChar(s, ':');
-       appendStringInfoString(s, outputstr);
-   }
-}
-
 /*
  * Read a remote action type and process the action record.
  *
@@ -2218,6 +2054,37 @@ bdr_send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force)
    return true;
 }
 
+/*
+ * abs_timestamp_difference -- convert the difference between two timestamps
+ *     into integer seconds and microseconds
+ *
+ * The result is always the absolute (pozitive difference), so the order
+ * of input is not important.
+ *
+ * If either input is not finite, we return zeroes.
+ */
+static void
+abs_timestamp_difference(TimestampTz start_time, TimestampTz stop_time,
+                   long *secs, int *microsecs)
+{
+   if (TIMESTAMP_NOT_FINITE(start_time) || TIMESTAMP_NOT_FINITE(stop_time))
+   {
+       *secs = 0;
+       *microsecs = 0;
+   }
+   else
+   {
+       TimestampTz diff = abs(stop_time - start_time);
+#ifdef HAVE_INT64_TIMESTAMP
+       *secs = (long) (diff / USECS_PER_SEC);
+       *microsecs = (int) (diff % USECS_PER_SEC);
+#else
+       *secs = (long) diff;
+       *microsecs = (int) ((diff - *secs) * 1000000.0);
+#endif
+   }
+}
+
 /*
  * The actual main loop of a BDR apply worker.
  */
index 99090d19686aa72b9e17510a50433a2cd8fcbb68..00b514a14d88fa6d6e345121de53f2d64188af57 100644 (file)
@@ -36,7 +36,6 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
-#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
index 1a37a656b5eeacab113fe7e92ab375103811159a..e6c58e3342b0fb6d0c5023693ed1ad3f1059ec9a 100644 (file)
@@ -13,8 +13,6 @@
 
 #include "commands/sequence.h"
 
-#include "replication/replication_identifier.h"
-
 #include "tcop/tcopprot.h"
 
 #include "utils/builtins.h"
@@ -24,6 +22,8 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/syscache.h"
+#include "utils/typcache.h"
+
 
 /* GUCs */
 bool bdr_log_conflicts_to_table = false;
@@ -42,33 +42,8 @@ extern BdrApplyWorker *bdr_apply_worker;
 #define BDR_CONFLICT_HISTORY_COLS 30
 #define SYSID_DIGITS 33
 
-/*
- * Details of a conflict detected by an apply process, destined for logging
- * output and/or conflict triggers.
- *
- * Closely related to bdr.bdr_conflict_history SQL table.
- */
-typedef struct BdrApplyConflict
-{
-   TransactionId           local_conflict_txid;
-   XLogRecPtr              local_conflict_lsn;
-   TimestampTz             local_conflict_time;
-   const char             *object_schema; /* unused if apply_error */
-   const char             *object_name;   /* unused if apply_error */
-   uint64                  remote_sysid;
-   TransactionId           remote_txid;
-   TimestampTz             remote_commit_time;
-   XLogRecPtr              remote_commit_lsn;
-   BdrConflictType         conflict_type;
-   BdrConflictResolution   conflict_resolution;
-   bool                    local_tuple_null;
-   Datum                   local_tuple;    /* composite */
-   TransactionId           local_tuple_xmin;
-   uint64                  local_tuple_origin_sysid; /* init to 0 if unknown */
-   bool                    remote_tuple_null;
-   Datum                   remote_tuple;   /* composite */
-   ErrorData              *apply_error;
-} BdrApplyConflict;
+/* We want our own memory ctx to clean up easily & reliably */
+MemoryContext conflict_log_context;
 
 /*
  * Perform syscache lookups etc for BDR conflict logging.
@@ -84,6 +59,10 @@ bdr_conflict_logging_startup()
 {
    Oid schema_oid;
 
+   conflict_log_context = AllocSetContextCreate(CurrentMemoryContext,
+       "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
+       ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
+
    StartTransactionCommand();
 
    schema_oid = get_namespace_oid("bdr", false);
@@ -102,6 +81,17 @@ bdr_conflict_logging_startup()
    CommitTransactionCommand();
 }
 
+/*
+ * Cleanup our memory context.
+ */
+void
+bdr_conflict_logging_cleanup(void)
+{
+   if (conflict_log_context)
+       MemoryContextResetAndDeleteChildren(conflict_log_context);
+}
+
+
 /* Get the enum oid for a given BdrConflictType */
 static Datum
 bdr_conflict_type_get_datum(BdrConflictType conflict_type)
@@ -139,11 +129,10 @@ bdr_conflict_type_get_datum(BdrConflictType conflict_type)
    return conflict_type_oid;
 }
 
-/* Get the enum oid for a given BdrConflictResolution */
-static Datum
-bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+/* Get the enum name for a given BdrConflictResolution */
+static char *
+bdr_conflict_resolution_get_name(BdrConflictResolution conflict_resolution)
 {
-   Oid conflict_resolution_oid;
    char *enumname = NULL;
 
    switch (conflict_resolution)
@@ -160,11 +149,29 @@ bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
        case BdrConflictResolution_LastUpdateWins_KeepRemote:
            enumname = "last_update_wins_keep_remote";
            break;
+       case BdrConflictResolution_DefaultApplyChange:
+           enumname = "apply_change";
+           break;
+       case BdrConflictResolution_DefaultSkipChange:
+           enumname = "skip_change";
+           break;
        case BdrConflictResolution_UnhandledTxAbort:
            enumname = "unhandled_tx_abort";
            break;
    }
+
    Assert(enumname != NULL);
+   return enumname;
+}
+
+/* Get the enum oid for a given BdrConflictResolution */
+static Datum
+bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+{
+   Oid conflict_resolution_oid;
+
+   char *enumname = bdr_conflict_resolution_get_name(conflict_resolution);
+
    conflict_resolution_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
        BdrConflictResolutionOid, CStringGetDatum(enumname));
    if (conflict_resolution_oid == InvalidOid)
@@ -202,6 +209,121 @@ bdr_conflict_row_to_json(Datum row, bool row_isnull, bool *ret_isnull)
    return row_json;
 }
 
+/* print the tuple 'tuple' into the StringInfo s */
+void
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
+{
+   int         natt;
+   Oid         oid;
+
+   /* print oid of tuple, it's not included in the TupleDesc */
+   if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
+   {
+       appendStringInfo(s, " oid[oid]:%u", oid);
+   }
+
+   /* print all columns individually */
+   for (natt = 0; natt < tupdesc->natts; natt++)
+   {
+       Form_pg_attribute attr; /* the attribute itself */
+       Oid         typid;      /* type of current attribute */
+       HeapTuple   type_tuple; /* information about a type */
+       Form_pg_type type_form;
+       Oid         typoutput;  /* output function */
+       bool        typisvarlena;
+       Datum       origval;    /* possibly toasted Datum */
+       Datum       val;        /* definitely detoasted Datum */
+       char       *outputstr = NULL;
+       bool        isnull;     /* column is null? */
+
+       attr = tupdesc->attrs[natt];
+
+       /*
+        * don't print dropped columns, we can't be sure everything is
+        * available for them
+        */
+       if (attr->attisdropped)
+           continue;
+
+       /*
+        * Don't print system columns
+        */
+       if (attr->attnum < 0)
+           continue;
+
+       typid = attr->atttypid;
+
+       /* gather type name */
+       type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+       if (!HeapTupleIsValid(type_tuple))
+           elog(ERROR, "cache lookup failed for type %u", typid);
+       type_form = (Form_pg_type) GETSTRUCT(type_tuple);
+
+       /* print attribute name */
+       appendStringInfoChar(s, ' ');
+       appendStringInfoString(s, NameStr(attr->attname));
+
+       /* print attribute type */
+       appendStringInfoChar(s, '[');
+       appendStringInfoString(s, NameStr(type_form->typname));
+       appendStringInfoChar(s, ']');
+
+       /* query output function */
+       getTypeOutputInfo(typid,
+                         &typoutput, &typisvarlena);
+
+       ReleaseSysCache(type_tuple);
+
+       /* get Datum from tuple */
+       origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+       if (isnull)
+           outputstr = "(null)";
+       else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+           outputstr = "(unchanged-toast-datum)";
+       else if (typisvarlena)
+           val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+       else
+           val = origval;
+
+       /* print data */
+       if (outputstr == NULL)
+           outputstr = OidOutputFunctionCall(typoutput, val);
+
+       appendStringInfoChar(s, ':');
+       appendStringInfoString(s, outputstr);
+   }
+}
+
+
+static void
+row_to_stringinfo(StringInfo s, Datum composite)
+{
+   HeapTupleHeader td;
+   Oid         tupType;
+   int32       tupTypmod;
+   TupleDesc   tupdesc;
+   HeapTupleData tmptup,
+              *tuple;
+
+   td = DatumGetHeapTupleHeader(composite);
+
+   /* Extract rowtype info and find a tupdesc */
+   tupType = HeapTupleHeaderGetTypeId(td);
+   tupTypmod = HeapTupleHeaderGetTypMod(td);
+   tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
+
+   /* Build a temporary HeapTuple control structure */
+   tmptup.t_len = HeapTupleHeaderGetDatumLength(td);
+   tmptup.t_data = td;
+   tuple = &tmptup;
+
+   /* Print the tuple to stringinfo */
+   tuple_to_stringinfo(s, tupdesc, tuple);
+
+   ReleaseTupleDesc(tupdesc);
+}
+
 static void
 bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
                        const char *in_str)
@@ -223,7 +345,7 @@ bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
  *
  * The change will then be replicated to other nodes.
  */
-static void
+void
 bdr_conflict_log_table(BdrApplyConflict *conflict)
 {
    Datum           values[BDR_CONFLICT_HISTORY_COLS];
@@ -239,6 +361,12 @@ bdr_conflict_log_table(BdrApplyConflict *conflict)
    char            remote_sysid[SYSID_DIGITS];
    char            origin_sysid[SYSID_DIGITS];
 
+   if (IsAbortedTransactionBlockState())
+       elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+
+   if (!IsTransactionState())
+       elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+
    if (!bdr_log_conflicts_to_table)
        /* No logging enabled and we don't own any memory, just bail */
        return;
@@ -391,6 +519,67 @@ bdr_conflict_log_table(BdrApplyConflict *conflict)
    FreeExecutorState(log_estate);
 }
 
+/*
+ * Log a BDR apply conflict to the postgreql log.
+ */
+void
+bdr_conflict_log_serverlog(BdrApplyConflict *conflict)
+{
+   StringInfoData  s_key;
+   char           *resolution_name;
+
+#define CONFLICT_MSG_PREFIX "CONFLICT: remote %s on relation %s.%s originating at node " UINT64_FORMAT ":%u:%u at ts %s;"
+
+   /* Create text representation of the PKEY tuple */
+   initStringInfo(&s_key);
+   if (!conflict->local_tuple_null)
+       row_to_stringinfo(&s_key, conflict->local_tuple);
+
+   resolution_name = bdr_conflict_resolution_get_name(conflict->conflict_resolution);
+
+   switch(conflict->conflict_type)
+   {
+       case BdrConflictType_InsertInsert:
+       case BdrConflictType_UpdateUpdate:
+           ereport(LOG,
+                   (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+                    errmsg(CONFLICT_MSG_PREFIX " row was previously updated at node " UINT64_FORMAT ":%u. Resolution: %s; PKEY:%s",
+                           BdrConflictType_InsertInsert ? "INSERT" : "UPDATE",
+                           conflict->object_schema, conflict->object_name,
+                           conflict->remote_sysid, conflict->remote_tli,
+                           conflict->remote_dboid,
+                           timestamptz_to_str(conflict->remote_commit_time),
+                           conflict->local_tuple_origin_sysid,
+                           conflict->local_tuple_origin_tli,
+                           resolution_name,
+                           s_key.data)));
+           break;
+       case BdrConflictType_UpdateDelete:
+       case BdrConflictType_DeleteDelete:
+           ereport(LOG,
+                   (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+                    errmsg(CONFLICT_MSG_PREFIX " could not find existing row. Resolution: %s; PKEY:%s",
+                           BdrConflictType_UpdateDelete ? "UPDATE" : "DELETE",
+                           conflict->object_schema, conflict->object_name,
+                           conflict->remote_sysid, conflict->remote_tli,
+                           conflict->remote_dboid,
+                           timestamptz_to_str(conflict->remote_commit_time),
+                           resolution_name,
+                           s_key.data)));
+
+           break;
+       case BdrConflictType_InsertUpdate:
+           /* XXX? */
+           break;
+       case BdrConflictType_UnhandledTxAbort:
+           /* XXX? */
+           break;
+   }
+
+   resetStringInfo(&s_key);
+}
+
+
 /*
  * Log a BDR apply conflict to the bdr.bdr_conflict_history table and/or
  * system log.
@@ -417,105 +606,100 @@ bdr_conflict_log_table(BdrApplyConflict *conflict)
  * TupleTableSlot for the tuple. It isn't done here because the caller
  * frequently already has the node id to hand.
  */
-void
-bdr_conflict_log(BdrConflictType conflict_type,
-                BdrConflictResolution resolution, TransactionId remote_txid,
-                BDRRelation *conflict_relation, TupleTableSlot *local_tuple,
-                RepNodeId local_tuple_origin_id, TupleTableSlot *remote_tuple,
-                ErrorData *apply_error)
+BdrApplyConflict *
+bdr_make_apply_conflict(BdrConflictType conflict_type,
+                       BdrConflictResolution resolution,
+                       TransactionId remote_txid,
+                       BDRRelation *conflict_relation,
+                       TupleTableSlot *local_tuple,
+                       RepNodeId local_tuple_origin_id,
+                       TupleTableSlot *remote_tuple,
+                       ErrorData *apply_error)
 {
-   MemoryContext   log_context, old_context;
-   BdrApplyConflict conflict;
-   TimeLineID tli;
-   Oid dboid;
-
-   if (IsAbortedTransactionBlockState())
-       elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+   MemoryContext   old_context;
+   BdrApplyConflict *conflict;
 
-   if (!IsTransactionState())
-       elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+   old_context = MemoryContextSwitchTo(conflict_log_context);
 
-   /* We want our own memory ctx to clean up easily & reliably */
-   log_context = AllocSetContextCreate(CurrentMemoryContext,
-       "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
-       ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
-   old_context = MemoryContextSwitchTo(log_context);
+   conflict = palloc0(sizeof(BdrApplyConflict));
 
    /* Populate the conflict record we're going to log */
-   conflict.conflict_type = conflict_type;
-   conflict.conflict_resolution = resolution;
+   conflict->conflict_type = conflict_type;
+   conflict->conflict_resolution = resolution;
 
-   conflict.local_conflict_txid = GetTopTransactionIdIfAny();
-   conflict.local_conflict_lsn = GetXLogInsertRecPtr();
-   conflict.local_conflict_time = GetCurrentTimestamp();
-   conflict.remote_txid = remote_txid;
+   conflict->local_conflict_txid = GetTopTransactionIdIfAny();
+   conflict->local_conflict_lsn = GetXLogInsertRecPtr();
+   conflict->local_conflict_time = GetCurrentTimestamp();
+   conflict->remote_txid = remote_txid;
 
    /* set using bdr_conflict_setrel */
    if (conflict_relation == NULL)
    {
-       conflict.object_schema = NULL;
-       conflict.object_name = NULL;
+       conflict->object_schema = NULL;
+       conflict->object_name = NULL;
    }
    else
    {
-       conflict.object_name = RelationGetRelationName(conflict_relation->rel);
-       conflict.object_schema =
+       conflict->object_name = RelationGetRelationName(conflict_relation->rel);
+       conflict->object_schema =
            get_namespace_name(RelationGetNamespace(conflict_relation->rel));
    }
 
    /* TODO: May make sense to cache the remote sysid in a global too... */
    bdr_fetch_sysid_via_node_id(replication_origin_id,
-           &conflict.remote_sysid, &tli, &dboid);
-   conflict.remote_commit_time = replication_origin_timestamp;
-   conflict.remote_txid = remote_txid;
-   conflict.remote_commit_lsn = replication_origin_lsn;
+                               &conflict->remote_sysid,
+                               &conflict->remote_tli,
+                               &conflict->remote_dboid);
+   conflict->remote_commit_time = replication_origin_timestamp;
+   conflict->remote_txid = remote_txid;
+   conflict->remote_commit_lsn = replication_origin_lsn;
 
    if (local_tuple != NULL)
    {
        /* Log local tuple xmin even if actual tuple value logging is off */
-       conflict.local_tuple_xmin =
+       conflict->local_tuple_xmin =
            HeapTupleHeaderGetXmin(local_tuple->tts_tuple->t_data);
-       Assert(conflict.local_tuple_xmin >= FirstNormalTransactionId ||
-              conflict.local_tuple_xmin == FrozenTransactionId);
+       Assert(conflict->local_tuple_xmin >= FirstNormalTransactionId ||
+              conflict->local_tuple_xmin == FrozenTransactionId);
        if (bdr_conflict_logging_include_tuples)
        {
-           conflict.local_tuple = ExecFetchSlotTupleDatum(local_tuple);
-           conflict.local_tuple_null = false;
+           conflict->local_tuple = ExecFetchSlotTupleDatum(local_tuple);
+           conflict->local_tuple_null = false;
        }
    }
    else
    {
-       conflict.local_tuple_null = true;
-       conflict.local_tuple = (Datum) 0;
-       conflict.local_tuple_xmin = InvalidTransactionId;
+       conflict->local_tuple_null = true;
+       conflict->local_tuple = (Datum) 0;
+       conflict->local_tuple_xmin = InvalidTransactionId;
    }
 
    if (local_tuple_origin_id != InvalidRepNodeId)
    {
        bdr_fetch_sysid_via_node_id(local_tuple_origin_id,
-                                   &conflict.local_tuple_origin_sysid, &tli,
-                                   &dboid);
+                                   &conflict->local_tuple_origin_sysid,
+                                   &conflict->local_tuple_origin_tli,
+                                   &conflict->local_tuple_origin_dboid);
    }
    else
    {
-       conflict.local_tuple_origin_sysid = 0;
+       conflict->local_tuple_origin_sysid = 0;
    }
 
    if (remote_tuple != NULL && bdr_conflict_logging_include_tuples)
    {
-       conflict.remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
-       conflict.remote_tuple_null = false;
+       conflict->remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
+       conflict->remote_tuple_null = false;
    }
    else
    {
-       conflict.remote_tuple_null = true;
-       conflict.remote_tuple = (Datum) 0;
+       conflict->remote_tuple_null = true;
+       conflict->remote_tuple = (Datum) 0;
    }
 
-   conflict.apply_error = apply_error;
-
-   bdr_conflict_log_table(&conflict);
+   conflict->apply_error = apply_error;
 
    MemoryContextSwitchTo(old_context);
-   MemoryContextDelete(log_context);
+
+   return conflict;
 }
index 82db80dda7fbeb8cac20e00a534ba68897dd171d..6ba7b0f9293246f80f6e99cb8b3955858b9a9e15 100644 (file)
@@ -51,15 +51,15 @@ a              b              c
 step s1h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
 step s2h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
 step s3h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
-public         test_dmlconflictdelete_delete  conflict_trigger_skip_change               {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
+public         test_dmlconflictdelete_delete  skip_change                   {"a":null,"b":1,"c":null}               
index af8490e94d13b3884fa8598fa4b4094208c8edd5..7da73e50714612aa7e37c5c212b0d6663cb50bb8 100644 (file)
@@ -51,13 +51,13 @@ a              b              c
 step s1h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictupdate_delete  conflict_trigger_skip_change               {"a":"y","b":1,"c":"bar"}               
-public         test_dmlconflictupdate_delete  conflict_trigger_skip_change               {"a":"y","b":1,"c":"baz"}               
+public         test_dmlconflictupdate_delete  skip_change                   {"a":"y","b":1,"c":"bar"}               
+public         test_dmlconflictupdate_delete  skip_change                   {"a":"y","b":1,"c":"baz"}               
 step s2h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictupdate_delete  conflict_trigger_skip_change               {"a":"y","b":1,"c":"baz"}               
+public         test_dmlconflictupdate_delete  skip_change                   {"a":"y","b":1,"c":"baz"}               
 step s3h: SELECT object_schema, object_name, conflict_type, conflict_resolution, local_tuple, remote_tuple, error_sqlstate FROM bdr.bdr_conflict_history ORDER BY conflict_id;
 object_schema  object_name    conflict_type  conflict_resolutionlocal_tuple    remote_tuple   error_sqlstate 
 
-public         test_dmlconflictupdate_delete  conflict_trigger_skip_change               {"a":"y","b":1,"c":"bar"}               
+public         test_dmlconflictupdate_delete  skip_change                   {"a":"y","b":1,"c":"bar"}