wal_decoding: mergme: implement "caught-up" logic for LLSR
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 17 May 2013 17:40:33 +0000 (13:40 -0400)
committerAndres Freund <andres@anarazel.de>
Mon, 27 May 2013 22:19:46 +0000 (00:19 +0200)
This lets walsender terminate nicely when a signal from postmaster
arrives.

src/backend/replication/walsender.c

index ae094e9dbf740ce4aa3802206a5dc497d4f84839..b66e1376deb3d6c84137f804309212c3ad7aa26a 100644 (file)
@@ -155,6 +155,9 @@ static bool ping_sent = false;
 static bool    streamingDoneSending;
 static bool    streamingDoneReceiving;
 
+/* Are we there yet? */
+static bool        WalSndCaughtUp = false;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t walsender_ready_to_stop = false;
@@ -180,12 +183,13 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-typedef void (*WalSndSendData)(bool *);
+typedef void (*WalSndSendData)(void);
 static void WalSndLoop(WalSndSendData send_data);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogSendPhysical(bool *caughtup);
-static void XLogSendLogical(bool *caughtup);
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendData send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -685,7 +689,7 @@ replay_read_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
                 XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
 {
    XLogRecPtr flushptr;
-   int count;
+   int     count;
 
    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -723,6 +727,7 @@ InitLogicalReplication(InitLogicalReplicationCmd *cmd)
 
    Assert(!MyLogicalDecodingSlot);
 
+   /* XXX apply sanity checking to slot name? */
    LogicalDecodingAcquireFreeSlot(cmd->name, cmd->plugin);
 
    Assert(MyLogicalDecodingSlot);
@@ -733,6 +738,7 @@ InitLogicalReplication(InitLogicalReplicationCmd *cmd)
                                         ALLOCSET_DEFAULT_INITSIZE,
                                         ALLOCSET_DEFAULT_MAXSIZE);
    old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+   /* XXX pointless? */
    TopTransactionContext = decoding_ctx;
 
    /* setup state for XLogReadPage */
@@ -742,7 +748,6 @@ InitLogicalReplication(InitLogicalReplicationCmd *cmd)
    ctx = CreateLogicalDecodingContext(MyLogicalDecodingSlot, false, NIL,
                        replay_read_page, WalSndPrepareWrite, WalSndWriteData);
 
-
    MemoryContextSwitchTo(old_decoding_ctx);
    TopTransactionContext = NULL;
 
@@ -886,8 +891,7 @@ StartLogicalReplication(StartLogicalReplicationCmd *cmd)
    if (am_cascading_walsender && !RecoveryInProgress())
    {
        ereport(LOG,
-          (errmsg("terminating walsender process to force cascaded standby "
-                  "to update timeline and reconnect")));
+               (errmsg("terminating walsender process to force cascaded standby to update timeline and reconnect")));
        walsender_ready_to_stop = true;
    }
 
@@ -965,9 +969,11 @@ FreeLogicalReplication(FreeLogicalReplicationCmd *cmd)
 }
 
 /*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
  * Prepare a write into a StringInfo.
  *
- * Don't do anything lasting in here, its quite possible that nothing will done
+ * Don't do anything lasting in here, it's quite possible that nothing will done
  * with the data.
  */
 static void
@@ -981,11 +987,13 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
    pq_sendint64(ctx->out, lsn);    /* dataStart */
    /* XXX: overwrite when data is assembled */
    pq_sendint64(ctx->out, lsn);    /* walEnd */
-   /* XXX: gather that value later just as its done in XLogSendPhysical */
+   /* XXX: gather that value later just as it's done in XLogSendPhysical */
    pq_sendint64(ctx->out, 0 /*GetCurrentIntegerTimestamp() */);/* sendtime */
 }
 
 /*
+ * LogicalDecodingContext 'write' callback.
+ *
  * Actually write out data previously prepared by WalSndPrepareWrite out to the
  * network, take as long as needed but process replies from the other side
  * during that.
@@ -998,7 +1006,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
    AssertVariableIsOfType(&WalSndWriteData, LogicalOutputPluginWriterWrite);
 
-
+   /* output previously gathered data in a CopyData packet */
    pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
    /* fast path */
@@ -1011,7 +1019,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
    for (;;)
    {
-
        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
@@ -1039,6 +1046,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
        if (pq_flush_if_writable() != 0)
            break;
 
+       /* If we finished clearing the buffered data, we're done here. */
        if (!pq_is_send_pending())
            break;
 
@@ -1052,6 +1060,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
                          MyProcPort->sock, sleeptime);
        ImmediateInterruptOK = false;
    }
+
    /* reactivate latch so WalSndLoop knows to continue */
    SetLatch(&MyWalSnd->latch);
 }
@@ -1100,8 +1109,14 @@ WalSndWaitForWal(XLogRecPtr loc)
        /* Clear any already-pending wakeups */
        ResetLatch(&MyWalSnd->latch);
 
-       /* check whether we're done */
+       /* Update our idea of flushed position. */
        flushptr = GetFlushRecPtr();
+
+       /* If postmaster asked us to stop, don't wait here anymore */
+       if (walsender_ready_to_stop)
+           break;
+
+       /* check whether we're done */
        if (loc <= flushptr)
            break;
 
@@ -1482,8 +1497,6 @@ ProcessStandbyHSFeedbackMessage(void)
 static void
 WalSndLoop(WalSndSendData send_data)
 {
-   bool        caughtup = false;
-
    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once to reduce palloc overhead.
@@ -1540,16 +1553,16 @@ WalSndLoop(WalSndSendData send_data)
         * caught up.
         */
        if (!pq_is_send_pending())
-           send_data(&caughtup);
+           send_data();
        else
-           caughtup = false;
+           WalSndCaughtUp = false;
 
        /* Try to flush pending output to the client */
        if (pq_flush_if_writable() != 0)
            goto send_failure;
 
        /* If nothing remains to be sent right now ... */
-       if (caughtup && !pq_is_send_pending())
+       if (WalSndCaughtUp && !pq_is_send_pending())
        {
            /*
             * If we're in catchup state, move to streaming.  This is an
@@ -1574,18 +1587,7 @@ WalSndLoop(WalSndSendData send_data)
             * the walsender is not sure which.
             */
            if (walsender_ready_to_stop)
-           {
-               /* ... let's just be real sure we're caught up ... */
-               send_data(&caughtup);
-               if (caughtup && !pq_is_send_pending())
-               {
-                   /* Inform the standby that XLOG streaming is done */
-                   EndCommand("COPY 0", DestRemote);
-                   pq_flush();
-
-                   proc_exit(0);
-               }
-           }
+               WalSndDone(send_data);
        }
 
        /*
@@ -1595,7 +1597,7 @@ WalSndLoop(WalSndSendData send_data)
         * loaded a subset of the available data but then pq_flush_if_writable
         * flushed it all --- we should immediately try to send more.
         */
-       if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+       if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
        {
            TimestampTz timeout = 0;
            long        sleeptime = 10000;      /* 10 s */
@@ -1929,11 +1931,11 @@ retry:
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
  *
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
  */
 static void
-XLogSendPhysical(bool *caughtup)
+XLogSendPhysical(void)
 {
    XLogRecPtr  SendRqstPtr;
    XLogRecPtr  startptr;
@@ -1942,7 +1944,7 @@ XLogSendPhysical(bool *caughtup)
 
    if (streamingDoneSending)
    {
-       *caughtup = true;
+       WalSndCaughtUp = true;
        return;
    }
 
@@ -2058,7 +2060,7 @@ XLogSendPhysical(bool *caughtup)
        pq_putmessage_noblock('c', NULL, 0);
        streamingDoneSending = true;
 
-       *caughtup = true;
+       WalSndCaughtUp = true;
 
        elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
             (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
@@ -2070,7 +2072,7 @@ XLogSendPhysical(bool *caughtup)
    Assert(sentPtr <= SendRqstPtr);
    if (SendRqstPtr <= sentPtr)
    {
-       *caughtup = true;
+       WalSndCaughtUp = true;
        return;
    }
 
@@ -2094,15 +2096,15 @@ XLogSendPhysical(bool *caughtup)
    {
        endptr = SendRqstPtr;
        if (sendTimeLineIsHistoric)
-           *caughtup = false;
+           WalSndCaughtUp = false;
        else
-           *caughtup = true;
+           WalSndCaughtUp = true;
    }
    else
    {
        /* round down to page boundary. */
        endptr -= (endptr % XLOG_BLCKSZ);
-       *caughtup = false;
+       WalSndCaughtUp = false;
    }
 
    nbytes = endptr - startptr;
@@ -2167,14 +2169,10 @@ XLogSendPhysical(bool *caughtup)
  * plugin specified in INIT_LOGICAL_DECODING
  */
 static void
-XLogSendLogical(bool *caughtup)
+XLogSendLogical(void)
 {
-#ifdef NOT_YET
-   XLogRecPtr endptr;
-   XLogRecPtr curptr;
-#endif
    XLogRecord *record;
-   char *errm = NULL;
+   char       *errm;
 
    if (decoding_ctx == NULL)
    {
@@ -2189,7 +2187,7 @@ XLogSendLogical(bool *caughtup)
    logical_startptr = InvalidXLogRecPtr;
 
    /* xlog record was invalid */
-   if (errm)
+   if (errm != NULL)
        elog(ERROR, "%s", errm);
 
    if (record != NULL)
@@ -2207,32 +2205,19 @@ XLogSendLogical(bool *caughtup)
 
        MemoryContextSwitchTo(old_decoding_ctx);
        TopTransactionContext = NULL;
-   }
-
-#ifdef NOT_YET
-   logical_reader->endptr = logical_reader->curptr;
-   curptr = logical_reader->curptr;
 
-   /*
-    * read at most MAX_SEND_SIZE of wal. We chunk the reading only to allow
-    * reading keepalives and such inbetween.
-    */
-   logical_reader->endptr += MAX_SEND_SIZE;
-
-   /* only read up to already flushed wal */
-   endptr = GetFlushRecPtr();
-   if (endptr < logical_reader->endptr)
-       logical_reader->endptr = endptr;
-
-   if (curptr == logical_reader->curptr ||
-       logical_reader->curptr == endptr)
-       *caughtup = true;
+       /*
+        * If the record we just read is at or beyond the flushed point, then
+        * we're caught up.
+        */
+       WalSndCaughtUp =
+           logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr();
+   }
    else
-       *caughtup = false;
-
-#endif
-
-   *caughtup = false;
+       /*
+        * xlogreader failed, and no error was reported? we must be caught up.
+        */
+       WalSndCaughtUp = true;
 
    /* Update shared memory status */
    {
@@ -2245,6 +2230,30 @@ XLogSendLogical(bool *caughtup)
    }
 }
 
+/*
+ * The sender is caught up, so we can go away for shutdown processing
+ * to finish normally.  (This should only be called when the shutdown
+ * signal has been received from postmaster.)
+ *
+ * Note that if while doing this we determine that there's still more
+ * data to send, this function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendData send_data)
+{
+   /* ... let's just be real sure we're caught up ... */
+   send_data();
+
+   if (WalSndCaughtUp && !pq_is_send_pending())
+   {
+       /* Inform the standby that XLOG streaming is done */
+       EndCommand("COPY 0", DestRemote);
+       pq_flush();
+
+       proc_exit(0);
+   }
+}
+
 /*
  * Returns the latest point in WAL that has been safely flushed to disk, and
  * can be sent to the standby. This should only be called when in recovery,