static bool streamingDoneSending;
static bool streamingDoneReceiving;
+/* Are we there yet? */
+static bool WalSndCaughtUp = false;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t walsender_ready_to_stop = false;
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-typedef void (*WalSndSendData)(bool *);
+typedef void (*WalSndSendData)(void);
static void WalSndLoop(WalSndSendData send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
-static void XLogSendPhysical(bool *caughtup);
-static void XLogSendLogical(bool *caughtup);
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendData send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
{
XLogRecPtr flushptr;
- int count;
+ int count;
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
Assert(!MyLogicalDecodingSlot);
+ /* XXX apply sanity checking to slot name? */
LogicalDecodingAcquireFreeSlot(cmd->name, cmd->plugin);
Assert(MyLogicalDecodingSlot);
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+ /* XXX pointless? */
TopTransactionContext = decoding_ctx;
/* setup state for XLogReadPage */
ctx = CreateLogicalDecodingContext(MyLogicalDecodingSlot, false, NIL,
replay_read_page, WalSndPrepareWrite, WalSndWriteData);
-
MemoryContextSwitchTo(old_decoding_ctx);
TopTransactionContext = NULL;
if (am_cascading_walsender && !RecoveryInProgress())
{
ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
+ (errmsg("terminating walsender process to force cascaded standby to update timeline and reconnect")));
walsender_ready_to_stop = true;
}
}
/*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
* Prepare a write into a StringInfo.
*
- * Don't do anything lasting in here, its quite possible that nothing will done
+ * Don't do anything lasting in here, it's quite possible that nothing will done
* with the data.
*/
static void
pq_sendint64(ctx->out, lsn); /* dataStart */
/* XXX: overwrite when data is assembled */
pq_sendint64(ctx->out, lsn); /* walEnd */
- /* XXX: gather that value later just as its done in XLogSendPhysical */
+ /* XXX: gather that value later just as it's done in XLogSendPhysical */
pq_sendint64(ctx->out, 0 /*GetCurrentIntegerTimestamp() */);/* sendtime */
}
/*
+ * LogicalDecodingContext 'write' callback.
+ *
* Actually write out data previously prepared by WalSndPrepareWrite out to the
* network, take as long as needed but process replies from the other side
* during that.
AssertVariableIsOfType(&WalSndWriteData, LogicalOutputPluginWriterWrite);
-
+ /* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
/* fast path */
for (;;)
{
-
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
if (pq_flush_if_writable() != 0)
break;
+ /* If we finished clearing the buffered data, we're done here. */
if (!pq_is_send_pending())
break;
MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
}
+
/* reactivate latch so WalSndLoop knows to continue */
SetLatch(&MyWalSnd->latch);
}
/* Clear any already-pending wakeups */
ResetLatch(&MyWalSnd->latch);
- /* check whether we're done */
+ /* Update our idea of flushed position. */
flushptr = GetFlushRecPtr();
+
+ /* If postmaster asked us to stop, don't wait here anymore */
+ if (walsender_ready_to_stop)
+ break;
+
+ /* check whether we're done */
if (loc <= flushptr)
break;
static void
WalSndLoop(WalSndSendData send_data)
{
- bool caughtup = false;
-
/*
* Allocate buffers that will be used for each outgoing and incoming
* message. We do this just once to reduce palloc overhead.
* caught up.
*/
if (!pq_is_send_pending())
- send_data(&caughtup);
+ send_data();
else
- caughtup = false;
+ WalSndCaughtUp = false;
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
goto send_failure;
/* If nothing remains to be sent right now ... */
- if (caughtup && !pq_is_send_pending())
+ if (WalSndCaughtUp && !pq_is_send_pending())
{
/*
* If we're in catchup state, move to streaming. This is an
* the walsender is not sure which.
*/
if (walsender_ready_to_stop)
- {
- /* ... let's just be real sure we're caught up ... */
- send_data(&caughtup);
- if (caughtup && !pq_is_send_pending())
- {
- /* Inform the standby that XLOG streaming is done */
- EndCommand("COPY 0", DestRemote);
- pq_flush();
-
- proc_exit(0);
- }
- }
+ WalSndDone(send_data);
}
/*
* loaded a subset of the available data but then pq_flush_if_writable
* flushed it all --- we should immediately try to send more.
*/
- if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+ if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
{
TimestampTz timeout = 0;
long sleeptime = 10000; /* 10 s */
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
*/
static void
-XLogSendPhysical(bool *caughtup)
+XLogSendPhysical(void)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
if (streamingDoneSending)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
- *caughtup = true;
+ WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
Assert(sentPtr <= SendRqstPtr);
if (SendRqstPtr <= sentPtr)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
{
endptr = SendRqstPtr;
if (sendTimeLineIsHistoric)
- *caughtup = false;
+ WalSndCaughtUp = false;
else
- *caughtup = true;
+ WalSndCaughtUp = true;
}
else
{
/* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ);
- *caughtup = false;
+ WalSndCaughtUp = false;
}
nbytes = endptr - startptr;
* plugin specified in INIT_LOGICAL_DECODING
*/
static void
-XLogSendLogical(bool *caughtup)
+XLogSendLogical(void)
{
-#ifdef NOT_YET
- XLogRecPtr endptr;
- XLogRecPtr curptr;
-#endif
XLogRecord *record;
- char *errm = NULL;
+ char *errm;
if (decoding_ctx == NULL)
{
logical_startptr = InvalidXLogRecPtr;
/* xlog record was invalid */
- if (errm)
+ if (errm != NULL)
elog(ERROR, "%s", errm);
if (record != NULL)
MemoryContextSwitchTo(old_decoding_ctx);
TopTransactionContext = NULL;
- }
-
-#ifdef NOT_YET
- logical_reader->endptr = logical_reader->curptr;
- curptr = logical_reader->curptr;
- /*
- * read at most MAX_SEND_SIZE of wal. We chunk the reading only to allow
- * reading keepalives and such inbetween.
- */
- logical_reader->endptr += MAX_SEND_SIZE;
-
- /* only read up to already flushed wal */
- endptr = GetFlushRecPtr();
- if (endptr < logical_reader->endptr)
- logical_reader->endptr = endptr;
-
- if (curptr == logical_reader->curptr ||
- logical_reader->curptr == endptr)
- *caughtup = true;
+ /*
+ * If the record we just read is at or beyond the flushed point, then
+ * we're caught up.
+ */
+ WalSndCaughtUp =
+ logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr();
+ }
else
- *caughtup = false;
-
-#endif
-
- *caughtup = false;
+ /*
+ * xlogreader failed, and no error was reported? we must be caught up.
+ */
+ WalSndCaughtUp = true;
/* Update shared memory status */
{
}
}
+/*
+ * The sender is caught up, so we can go away for shutdown processing
+ * to finish normally. (This should only be called when the shutdown
+ * signal has been received from postmaster.)
+ *
+ * Note that if while doing this we determine that there's still more
+ * data to send, this function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendData send_data)
+{
+ /* ... let's just be real sure we're caught up ... */
+ send_data();
+
+ if (WalSndCaughtUp && !pq_is_send_pending())
+ {
+ /* Inform the standby that XLOG streaming is done */
+ EndCommand("COPY 0", DestRemote);
+ pq_flush();
+
+ proc_exit(0);
+ }
+}
+
/*
* Returns the latest point in WAL that has been safely flushed to disk, and
* can be sent to the standby. This should only be called when in recovery,