Preserve required !catalog tuples while computing initial decoding snapshot.
author Andres Freund <andres@anarazel.de>
Mon, 24 Apr 2017 03:41:29 +0000 (20:41 -0700)
committer Andres Freund <andres@anarazel.de>
Thu, 27 Apr 2017 20:13:37 +0000 (13:13 -0700)
The logical decoding machinery already preserved all the required
catalog tuples, which is sufficient in the course of normal logical
decoding, but did not guarantee that non-catalog tuples were preserved
during computation of the initial snapshot when creating a slot over
the replication protocol.

This could cause a corrupted initial snapshot to be exported.  The
time window for issues is usually not terribly large, but on a busy
server it's perfectly possible to hit it.  Ongoing decoding is not
affected by this bug.

To avoid increased overhead for the SQL API, only retain additional
tuples when a logical slot is being created over the replication
protocol.  To do so this commit changes the signature of
CreateInitDecodingContext(), but it seems unlikely that it's being
used in an extension, so that's probably ok.

As a drive-by, also fix the handling of
ReplicationSlotsComputeRequiredXmin's already_locked argument, which
should only apply to ProcArrayLock, not ReplicationSlotControlLock.

Reported-By: Erik Rijkers
Analyzed-By: Petr Jelinek
Author: Petr Jelinek, heavily editorialized by Andres Freund
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com
Backport: 9.4, where logical decoding was introduced.

src/backend/replication/logical/logical.c
src/backend/replication/logical/snapbuild.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/storage/ipc/procarray.c
src/include/replication/logical.h
src/include/storage/procarray.h

index 24e5960e29f0896501585f15e5ae81a7bd179524..e32f773e187e718225c08604c3e7e82751cbce60 100644 (file)
@@ -208,6 +208,7 @@ StartupDecodingContext(List *output_plugin_options,
 LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
+                                                 bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  LogicalOutputPluginWriterWrite do_write)
@@ -310,23 +311,31 @@ CreateInitDecodingContext(char *plugin,
         * the slot machinery about the new limit. Once that's done the
         * ProcArrayLock can be released as the slot machinery now is
         * protecting against vacuum.
+        *
+        * Note that, temporarily, the data, not just the catalog, xmin has to be
+        * reserved if a data snapshot is to be exported.  Otherwise the initial
+        * data snapshot created here is not guaranteed to be valid. After that
+        * the data xmin doesn't need to be managed anymore and the global xmin
+        * should be recomputed. As we are fine with losing the pegged data xmin
+        * after crash - no chance a snapshot would get exported anymore - we can
+        * get away with just setting the slot's
+        * effective_xmin. ReplicationSlotRelease will reset it again.
+        *
         * ----
         */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
-       slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
-       slot->data.catalog_xmin = slot->effective_catalog_xmin;
+       xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
+
+       slot->effective_catalog_xmin = xmin_horizon;
+       slot->data.catalog_xmin = xmin_horizon;
+       if (need_full_snapshot)
+               slot->effective_xmin = xmin_horizon;
 
        ReplicationSlotsComputeRequiredXmin(true);
 
        LWLockRelease(ProcArrayLock);
 
-       /*
-        * tell the snapshot builder to only assemble snapshot once reaching the a
-        * running_xact's record with the respective xmin.
-        */
-       xmin_horizon = slot->data.catalog_xmin;
-
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
 
index 513c5c37c66c7314acf99153affcd2de2d6bd045..e70e525404c50bc3e5117d294de6e7e416a579e3 100644 (file)
@@ -553,6 +553,18 @@ SnapBuildExportSnapshot(SnapBuild *builder)
         * mechanism. Due to that we can do this without locks, we're only
         * changing our own value.
         */
+#ifdef USE_ASSERT_CHECKING
+       {
+               TransactionId safeXid;
+
+               LWLockAcquire(ProcArrayLock, LW_SHARED);
+               safeXid = GetOldestSafeDecodingTransactionId(true);
+               LWLockRelease(ProcArrayLock);
+
+               Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
+       }
+#endif
+
        MyPgXact->xmin = snap->xmin;
 
        /* allocate in transaction context */
index 8ae0acd61f58d46c65a72c13970fe407409a99ca..12779029102070abe31609f5be3c34a37eda2e36 100644 (file)
@@ -396,6 +396,22 @@ ReplicationSlotRelease(void)
                SpinLockRelease(&slot->mutex);
        }
 
+
+       /*
+        * If slot needed to temporarily restrain both data and catalog xmin to
+        * create the catalog snapshot, remove that temporary constraint.
+        * Snapshots can only be exported while the initial snapshot is still
+        * acquired.
+        */
+       if (!TransactionIdIsValid(slot->data.xmin) &&
+               TransactionIdIsValid(slot->effective_xmin))
+       {
+               SpinLockAcquire(&slot->mutex);
+               slot->effective_xmin = InvalidTransactionId;
+               SpinLockRelease(&slot->mutex);
+               ReplicationSlotsComputeRequiredXmin(false);
+       }
+
        MyReplicationSlot = NULL;
 
        /* might not have been set when we've been a plain slot */
@@ -580,6 +596,9 @@ ReplicationSlotPersist(void)
 
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
+ *
+ * If already_locked is true, ProcArrayLock has already been acquired
+ * exclusively.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -590,8 +609,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
        Assert(ReplicationSlotCtl != NULL);
 
-       if (!already_locked)
-               LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
        for (i = 0; i < max_replication_slots; i++)
        {
@@ -624,8 +642,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                        agg_catalog_xmin = effective_catalog_xmin;
        }
 
-       if (!already_locked)
-               LWLockRelease(ReplicationSlotControlLock);
+       LWLockRelease(ReplicationSlotControlLock);
 
        ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
 }
index 348c7fe9fce08023f97f41fe81f783d31f82efb7..da874d5ada1c496af011ebd0f1e8b4a1a3ff62ab 100644 (file)
@@ -109,8 +109,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
        /*
         * Create logical decoding context, to build the initial snapshot.
         */
-       ctx = CreateInitDecodingContext(
-                                                                       NameStr(*plugin), NIL,
+       ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+                                                                       false, /* do not build snapshot */
                                                                        logical_read_local_xlog_page, NULL, NULL);
 
        /* build initial snapshot, might take a while */
index b4c5a7d3b3fa57bbd247bbbbb6b69d41c0986a53..31e9c8ad6def8e4b17f6cfe6e73b827b51dcad6d 100644 (file)
@@ -812,6 +812,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                LogicalDecodingContext *ctx;
 
                ctx = CreateInitDecodingContext(cmd->plugin, NIL,
+                                                                               true, /* build snapshot */
                                                                                logical_read_xlog_page,
                                                                                WalSndPrepareWrite, WalSndWriteData);
 
index 4f3c5c9dec9c128ea9ee8f04b5a52d9f3bb6c3e3..12b74da41dea4bc31547d72aef8ee6025488210d 100644 (file)
@@ -1978,7 +1978,7 @@ GetOldestActiveTransactionId(void)
  * that the caller will immediately use the xid to peg the xmin horizon.
  */
 TransactionId
-GetOldestSafeDecodingTransactionId(void)
+GetOldestSafeDecodingTransactionId(bool catalogOnly)
 {
        ProcArrayStruct *arrayP = procArray;
        TransactionId oldestSafeXid;
@@ -2001,9 +2001,17 @@ GetOldestSafeDecodingTransactionId(void)
        /*
         * If there's already a slot pegging the xmin horizon, we can start with
         * that value, it's guaranteed to be safe since it's computed by this
-        * routine initially and has been enforced since.
+        * routine initially and has been enforced since.  We can always use the
+        * slot's general xmin horizon, but the catalog horizon is only usable
+        * when only catalog data is going to be looked at.
         */
-       if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
+       if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
+               TransactionIdPrecedes(procArray->replication_slot_xmin,
+                                                         oldestSafeXid))
+               oldestSafeXid = procArray->replication_slot_xmin;
+
+       if (catalogOnly &&
+               TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
                TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
                                                          oldestSafeXid))
                oldestSafeXid = procArray->replication_slot_catalog_xmin;
index dfdbe6535f1295b8e3b81170bd009dde274c142f..73b2836064408bce53ef3c33fc6fc39c984d32c7 100644 (file)
@@ -79,6 +79,7 @@ extern void CheckLogicalDecodingRequirements(void);
 
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
+                                                 bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  LogicalOutputPluginWriterWrite do_write);
index a9b40ed944f7e8dbb526efb05eaa2c4d75746715..d40ef3eb570eec5cc6efedc3f10215bfac094558 100644 (file)
@@ -54,7 +54,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
 extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
 extern TransactionId GetOldestActiveTransactionId(void);
-extern TransactionId GetOldestSafeDecodingTransactionId(void);
+extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
 
 extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
 extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);