*/
/*-------------------------------------------------------------------------
- * Async Notification Model as of 9.0:
+ * Async Notification Model as of v19:
*
- * 1. Multiple backends on same machine. Multiple backends listening on
- * several channels. (Channels are also called "conditions" in other
- * parts of the code.)
+ * 1. Multiple backends on same machine. Multiple backends may be listening
+ * on each of several channels.
*
* 2. There is one central queue in disk-based storage (directory pg_notify/),
* with actively-used pages mapped into shared memory by the slru.c module.
* All notification messages are placed in the queue and later read out
- * by listening backends.
- *
- * There is no central knowledge of which backend listens on which channel;
- * every backend has its own list of interesting channels.
+ * by listening backends. The single queue allows us to guarantee that
+ * notifications are received in commit order.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
* page number and the offset in that page. This is done before marking the
* transaction as committed in clog. If we run into problems writing the
* notifications, we can still call elog(ERROR, ...) and the transaction
- * will roll back.
+ * will roll back safely.
*
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the effective listen state (listenChannels).
+ * make any required updates to the effective listen state (see below).
* Then we signal any backends that may be interested in our messages
* (including our own backend, if listening). This is done by
- * SignalBackends(), which scans the list of listening backends and sends a
- * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- * know which backend is listening on which channel so we must signal them
- * all). We can exclude backends that are already up to date, though, and
- * we can also exclude backends that are in other databases (unless they
- * are way behind and should be kicked to make them advance their
- * pointers).
+ * SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
+ * each relevant backend, as described below.
*
* Finally, after we are out of the transaction altogether and about to go
* idle, we scan the queue for messages that need to be sent to our
* often. We make sending backends do this work if they advanced the queue
* head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
*
+ * 7. So far we have not discussed how backends change their listening state,
+ * nor how notification senders know which backends to awaken. To handle
+ * the latter, we maintain a global channel table (implemented as a dynamic
+ * shared hash table, or dshash) that maps channel names to the set of
+ * backends listening on each channel. This table is created lazily on the
+ * first LISTEN command and grows dynamically as needed. There is also a
+ * local channel table (a plain dynahash table) in each listening backend,
+ * tracking which channels that backend is listening to. The local table
+ * serves to reduce the number of accesses needed to the shared table.
+ *
+ * If the current transaction has executed any LISTEN/UNLISTEN actions,
+ * PreCommit_Notify() prepares to commit those. For LISTEN, it
+ * pre-allocates entries in both the per-backend localChannelTable and the
+ * shared globalChannelTable (with listening=false so that these entries
+ * are no-ops for the moment). It also records the final per-channel
+ * intent in pendingListenActions, so post-commit/abort processing can
+ * apply that in a single step. Since all these allocations happen before
+ * committing to clog, we can safely abort the transaction on failure.
+ *
+ * After commit, AtCommit_Notify() runs through pendingListenActions and
+ * updates the backend's per-channel listening flags to activate or
+ * deactivate listening. This happens before sending signals.
+ *
+ * SignalBackends() consults the shared global channel table to identify
+ * listeners for the channels that the current transaction sent
+ * notification(s) to. Each selected backend is marked as having a wakeup
+ * pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ * signal is sent to it.
+ *
+ * 8. While writing notifications, PreCommit_Notify() records the queue head
+ * position both before and after the write. Because all writers serialize
+ * on a cluster-wide heavyweight lock, no other backend can insert entries
+ * between these two points. SignalBackends() uses this fact to directly
+ * advance the queue pointer for any backend that is still positioned at
+ * the old head, or within the range written, but is not interested in any
+ * of our notifications. This avoids unnecessary wakeups for idle
+ * listeners that have nothing to read. Backends that are not interested
+ * in our notifications, but cannot be directly advanced, are signaled only
+ * if they are far behind the current queue head; that is to ensure that
+ * we can advance the queue tail without undue delay.
+ *
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
* The amount of shared memory used for notify management (notify_buffers)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
- * by max_notify_queue_pages GUC.
+ * by the max_notify_queue_pages GUC.
*-------------------------------------------------------------------------
*/
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
+#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/*
+ * Returns true if x comes before y in queue order: either x's page precedes
+ * y's page according to asyncQueuePagePrecedes(), or both are on the same
+ * page and x has the smaller offset.
+ *
+ * Both arguments appear more than once in the expansion, so they must be
+ * free of side effects.
+ */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
/*
* Parameter determining how often we try to advance the tail pointer:
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend that's not interested in our
+ * notifications needs to be behind before we'll decide we need to wake it
+ * up so it can advance its pointer.
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ bool wakeupPending; /* signal sent to backend, not yet processed */
+ bool isAdvancing; /* backend is advancing its position */
} QueueBackendStatus;
/*
* (since no other backend will inspect it).
*
* When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends and also change the head pointer. They can
+ * also advance other backends' queue positions, unless the other backend
+ * has isAdvancing set (i.e., is in process of doing that itself).
+ *
+ * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
+ * mode, backends can change the tail pointers.
*
* SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
* the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need multiple locks, we first get
- * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
+ * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
+ * globalChannelTable partition locks.
*
* Each backend uses the backend[] array entry with index equal to its
* ProcNumber. We rely on this to make SendProcSignal fast.
ProcNumber firstListener; /* id of first listener, or
* INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ dsa_handle globalChannelTableDSA; /* global channel table's DSA handle */
+ dshash_table_handle globalChannelTableDSH; /* and its dshash handle */
+ /* Array with room for MaxBackends entries: */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
/*
* The SLRU buffer area through which we access the notification queue
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
- * allocated in TopMemoryContext.
+ * Global channel table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening or about to listen
+ * on each channel. The "listening" flags allow us to create hash table
+ * entries pre-commit and not have to assume that creating them post-commit
+ * will succeed.
+ */
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct GlobalChannelKey
+{
+ Oid dboid; /* database OID the channel belongs to */
+ char channel[NAMEDATALEN]; /* channel name, NUL-terminated and
+ * zero-padded by GlobalChannelKeyInit */
+} GlobalChannelKey;
+
+typedef struct ListenerEntry
+{
+ ProcNumber procNo; /* listener's ProcNumber */
+ bool listening; /* true if committed listener; false while
+ * the entry is only staged pre-commit */
+} ListenerEntry;
+
+typedef struct GlobalChannelEntry
+{
+ GlobalChannelKey key; /* hash key: (database OID, channel name) */
+ dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array, or
+ * InvalidDsaPointer if not allocated yet */
+ int numListeners; /* Number of listeners currently stored */
+ int allocatedListeners; /* Allocated size of array, counted in
+ * ListenerEntry elements */
+} GlobalChannelEntry;
+
+static dshash_table *globalChannelTable = NULL;
+static dsa_area *globalChannelDSA = NULL;
+
+/*
+ * localChannelTable caches the channel names this backend is listening on
+ * (including those we have staged to be listened on, but not yet committed).
+ * Used by IsListeningOn() for fast lookups when reading notifications.
*/
-static List *listenChannels = NIL; /* list of C strings */
+static HTAB *localChannelTable = NULL;
+
+/* We test this condition to detect that we're not listening at all */
+#define LocalChannelTableIsEmpty() \
+ (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listen state until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
static ActionList *pendingActions = NULL;
+/*
+ * Hash table recording the final listen/unlisten intent per channel for
+ * the current transaction. Key is channel name, value is PENDING_LISTEN or
+ * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
+ * per channel instead of replaying every action. This is built from the
+ * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
+ * AtAbort_Notify.
+ */
+typedef enum
+{
+ PENDING_LISTEN, /* final intent: be listening on the channel */
+ PENDING_UNLISTEN, /* final intent: not be listening on it */
+} PendingListenAction;
+
+typedef struct PendingListenEntry
+{
+ char channel[NAMEDATALEN]; /* hash key: channel name */
+ PendingListenAction action; /* which action should we perform? (the
+ * last one staged for this channel wins) */
+} PendingListenEntry;
+
+static HTAB *pendingListenActions = NULL;
+
/*
* State for outbound notifies consists of a list of all channels+payloads
* NOTIFYed in the current transaction. We do not actually perform a NOTIFY
int nestingLevel; /* current transaction nesting depth */
List *events; /* list of Notification structs */
HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+ List *uniqueChannelNames; /* unique channel names being notified */
+ HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;
static NotificationList *pendingNotifies = NULL;
+/*
+ * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
+ * (both just carry the channel name, with no payload).
+ */
+typedef struct ChannelName
+{
+ char channel[NAMEDATALEN]; /* hash key; the entry holds nothing else */
+} ChannelName;
+
/*
* Inbound notifications are initially processed by HandleNotifyInterrupt(),
* called from inside a signal handler. That just sets the
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them. SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * Workspace arrays for SignalBackends. These are preallocated in
+ * PreCommit_Notify to avoid needing memory allocation after committing to
+ * clog.
+ */
+static int32 *signalPids = NULL;
+static ProcNumber *signalProcnos = NULL;
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
/* local function prototypes */
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
+static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
+ const char *channel);
+static dshash_hash globalChannelTableHash(const void *key, size_t size,
+ void *arg);
+static void initGlobalChannelTable(void);
+static void initLocalChannelTable(void);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void BecomeRegisteredListener(void);
+static void PrepareTableEntriesForListen(const char *channel);
+static void PrepareTableEntriesForUnlisten(const char *channel);
+static void PrepareTableEntriesForUnlistenAll(void);
+static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
+ ListenerEntry *listeners,
+ int idx);
+static void ApplyPendingListenActions(bool isCommit);
+static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
return p < q;
}
+/*
+ * GlobalChannelKeyInit
+ *		Fill in *key so that it can be used as a global channel table key.
+ *
+ * The whole struct is zeroed before the fields are assigned, so every byte
+ * following the channel name's terminator is zero.  The name is copied with
+ * strlcpy, hence it is truncated if it does not fit in NAMEDATALEN bytes.
+ */
+static inline void
+GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
+{
+	/* Start from all-zeroes so that unused key bytes are well-defined */
+	memset(key, 0, sizeof(*key));
+
+	strlcpy(key->channel, channel, sizeof(key->channel));
+	key->dboid = dboid;
+}
+
+/*
+ * globalChannelTableHash
+ *		Hash callback for GlobalChannelKey, as used by the global channel
+ *		table.
+ *
+ * The result is the XOR of the hash of the database OID and the hash of the
+ * channel name.  Only the bytes before the name's terminating NUL (and never
+ * more than NAMEDATALEN bytes) are hashed.  "size" and "arg" are not used.
+ */
+static dshash_hash
+globalChannelTableHash(const void *key, size_t size, void *arg)
+{
+	const GlobalChannelKey *chankey = (const GlobalChannelKey *) key;
+	dshash_hash dbhash;
+	dshash_hash namehash;
+
+	dbhash = DatumGetUInt32(hash_uint32(chankey->dboid));
+	namehash = DatumGetUInt32(hash_any((const unsigned char *) chankey->channel,
+									   strnlen(chankey->channel, NAMEDATALEN)));
+
+	return dbhash ^ namehash;
+}
+
+/*
+ * parameters for the global channel table
+ *
+ * The same parameter block is passed to both dshash_create() and
+ * dshash_attach() in initGlobalChannelTable().
+ */
+static const dshash_parameters globalChannelTableDSHParams = {
+ sizeof(GlobalChannelKey), /* key size */
+ sizeof(GlobalChannelEntry), /* size of a whole entry, key included */
+ dshash_memcmp, /* key comparison function */
+ globalChannelTableHash, /* key hash function */
+ dshash_memcpy, /* key copy function */
+ LWTRANCHE_NOTIFY_CHANNEL_HASH /* LWLock tranche */
+};
+
+/*
+ * initGlobalChannelTable
+ * Lazy initialization of the global channel table.
+ *
+ * If no backend has created the table yet (the dshash handle stored in
+ * asyncQueueControl is still invalid), create the DSA area and the dshash
+ * table and publish both handles in shared memory. Otherwise, attach this
+ * backend to the existing area and table. On return, globalChannelDSA and
+ * globalChannelTable are set in this backend.
+ *
+ * The quick-exit test is made without holding any lock; the create-or-attach
+ * decision is then made while holding NotifyQueueLock in exclusive mode,
+ * with TopMemoryContext as the current memory context.
+ */
+static void
+initGlobalChannelTable(void)
+{
+ MemoryContext oldcontext;
+
+ /* Quick exit if we already did this (table exists and we are attached) */
+ if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
+ globalChannelTable != NULL)
+ return;
+
+ /* Otherwise, use a lock to ensure only one process creates the table */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /* Be sure any local memory allocated by DSA routines is persistent */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
+ {
+ /* Initialize dynamic shared hash table for global channels */
+ globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+ dsa_pin(globalChannelDSA);
+ dsa_pin_mapping(globalChannelDSA);
+ globalChannelTable = dshash_create(globalChannelDSA,
+ &globalChannelTableDSHParams,
+ NULL);
+
+ /* Store handles in shared memory for other backends to use */
+ asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
+ asyncQueueControl->globalChannelTableDSH =
+ dshash_get_hash_table_handle(globalChannelTable);
+ }
+ else if (!globalChannelTable)
+ {
+ /* Attach to existing dynamic shared hash table */
+ globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
+ dsa_pin_mapping(globalChannelDSA);
+ globalChannelTable = dshash_attach(globalChannelDSA,
+ &globalChannelTableDSHParams,
+ asyncQueueControl->globalChannelTableDSH,
+ NULL);
+ }
+
+ /* Restore the caller's memory context before dropping the lock */
+ MemoryContextSwitchTo(oldcontext);
+ LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * initLocalChannelTable
+ *		Create this backend's local channel table if it doesn't exist yet.
+ *
+ * The table is keyed by channel name (a string of up to NAMEDATALEN bytes)
+ * and its entries are bare ChannelName structs.  Once created, this table
+ * lasts for the life of the session.
+ */
+static void
+initLocalChannelTable(void)
+{
+	HASHCTL		ctl;
+
+	/* Nothing to do if an earlier call already built the table */
+	if (localChannelTable != NULL)
+		return;
+
+	/* Entries carry only the channel name, which is also the key */
+	ctl.entrysize = sizeof(ChannelName);
+	ctl.keysize = NAMEDATALEN;
+
+	localChannelTable = hash_create("Local Listen Channels", 64, &ctl,
+									HASH_ELEM | HASH_STRINGS);
+}
+
+/*
+ * initPendingListenActions
+ *		Create the pending listen actions hash table if it doesn't exist yet.
+ *
+ * The table is allocated in CurTransactionContext during PreCommit_Notify,
+ * and destroyed at transaction end.  Its initial size is taken from the
+ * length of the pendingActions list, so pendingActions must not be NULL
+ * when this is called.
+ */
+static void
+initPendingListenActions(void)
+{
+	HASHCTL		ctl;
+	int			nactions;
+
+	/* Nothing to do if the table was already built */
+	if (pendingListenActions != NULL)
+		return;
+
+	/* Keyed by channel name; lives in the current transaction's context */
+	ctl.hcxt = CurTransactionContext;
+	ctl.entrysize = sizeof(PendingListenEntry);
+	ctl.keysize = NAMEDATALEN;
+
+	/* There can't be more distinct channels than queued actions */
+	nactions = list_length(pendingActions->actions);
+
+	pendingListenActions = hash_create("Pending Listen Actions", nactions,
+									   &ctl,
+									   HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+}
+
/*
* Report space needed for our shared memory area
*/
QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
asyncQueueControl->lastQueueFillWarn = 0;
+ asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
+ asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
for (int i = 0; i < MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+ QUEUE_BACKEND_IS_ADVANCING(i) = false;
}
}
notifies->events = list_make1(n);
/* We certainly don't need a hashtable yet */
notifies->hashtab = NULL;
+ /* We won't build uniqueChannelNames/Hash till later, either */
+ notifies->uniqueChannelNames = NIL;
+ notifies->uniqueChannelHash = NULL;
notifies->upper = pendingNotifies;
pendingNotifies = notifies;
}
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
- * commit.
+ * Actual update of localChannelTable and globalChannelTable happens during
+ * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
*/
static void
queue_listen(ListenActionKind action, const char *channel)
int my_level = GetCurrentTransactionNestLevel();
/*
- * Unlike Async_Notify, we don't try to collapse out duplicates. It would
- * be too complicated to ensure we get the right interactions of
- * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
- * would be any performance benefit anyway in sane applications.
+ * Unlike Async_Notify, we don't try to collapse out duplicates here. We
+ * keep the ordered list to preserve interactions like UNLISTEN ALL; the
+ * final per-channel intent is computed during PreCommit_Notify.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
* SQL function: return a set of the channel names this backend is actively
* listening to.
*
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the localChannelTable cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *status;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+
+ /* Initialize hash table iteration if we have any channels */
+ if (localChannelTable != NULL)
+ {
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
+ hash_seq_init(status, localChannelTable);
+ funcctx->user_fctx = status;
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ funcctx->user_fctx = NULL;
+ }
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
+ status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(listenChannels))
+ if (status != NULL)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+ ChannelName *entry;
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ entry = (ChannelName *) hash_seq_search(status);
+ if (entry != NULL)
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
}
SRF_RETURN_DONE(funcctx);
static void
Async_UnlistenOnExit(int code, Datum arg)
{
- Exec_UnlistenAllCommit();
+ CleanupListenersOnExit(); /* NOTE(review): body not in view; confirm it
+ * covers what Exec_UnlistenAllCommit() did */
asyncQueueUnregister();
}
elog(DEBUG1, "PreCommit_Notify");
/* Preflight for any pending listen/unlisten actions */
+ initGlobalChannelTable();
+
if (pendingActions != NULL)
{
+ /* Ensure we have a local channel table */
+ initLocalChannelTable();
+ /* Create pendingListenActions hash table for this transaction */
+ initPendingListenActions();
+
+ /* Stage all the actions this transaction wants to perform */
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ BecomeRegisteredListener();
+ PrepareTableEntriesForListen(actrec->channel);
break;
case LISTEN_UNLISTEN:
- /* there is no Exec_UnlistenPreCommit() */
+ PrepareTableEntriesForUnlisten(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
- /* there is no Exec_UnlistenAllPreCommit() */
+ PrepareTableEntriesForUnlistenAll();
break;
}
}
if (pendingNotifies)
{
ListCell *nextNotify;
+ bool firstIteration = true;
+
+ /*
+ * Build list of unique channel names being notified for use by
+ * SignalBackends().
+ *
+ * If uniqueChannelHash is available, use it to efficiently get the
+ * unique channels. Otherwise, fall back to the O(N^2) approach.
+ */
+ pendingNotifies->uniqueChannelNames = NIL;
+ if (pendingNotifies->uniqueChannelHash != NULL)
+ {
+ HASH_SEQ_STATUS status;
+ ChannelName *channelEntry;
+
+ hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
+ while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
+ pendingNotifies->uniqueChannelNames =
+ lappend(pendingNotifies->uniqueChannelNames,
+ channelEntry->channel);
+ }
+ else
+ {
+ /* O(N^2) approach is better for small number of notifications */
+ foreach_ptr(Notification, n, pendingNotifies->events)
+ {
+ char *channel = n->data;
+ bool found = false;
+
+ /* Name present in list? */
+ foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
+ {
+ if (strcmp(oldchan, channel) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+ /* Add if not already in list */
+ if (!found)
+ pendingNotifies->uniqueChannelNames =
+ lappend(pendingNotifies->uniqueChannelNames,
+ channel);
+ }
+ }
+
+ /* Preallocate workspace that will be needed by SignalBackends() */
+ if (signalPids == NULL)
+ signalPids = MemoryContextAlloc(TopMemoryContext,
+ MaxBackends * sizeof(int32));
+
+ if (signalProcnos == NULL)
+ signalProcnos = MemoryContextAlloc(TopMemoryContext,
+ MaxBackends * sizeof(ProcNumber));
/*
* Make sure that we have an XID assigned to the current transaction.
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
+ /*
+ * For the direct advancement optimization in SignalBackends(), we
+ * need to ensure that no other backend can insert queue entries
+ * between queueHeadBeforeWrite and queueHeadAfterWrite. The
+ * heavyweight lock above provides this guarantee, since it serializes
+ * all writers.
+ *
+ * Note: if the heavyweight lock were ever removed for scalability
+ * reasons, we could achieve the same guarantee by holding
+ * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+ * than releasing and reacquiring it for each page as we do below.
+ */
+
+ /* Initialize values to a safe default in case list is empty */
+ SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+ SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
+
/* Now push the notifications into the queue */
nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
* point in time we can still roll the transaction back.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ if (firstIteration)
+ {
+ queueHeadBeforeWrite = QUEUE_HEAD;
+ firstIteration = false;
+ }
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+ queueHeadAfterWrite = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
}
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
+ * Apply pending listen/unlisten changes and clear transaction-local state.
*
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
void
AtCommit_Notify(void)
{
- ListCell *p;
-
/*
* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
* return as soon as possible
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
- /* Perform any pending listen/unlisten actions */
- if (pendingActions != NULL)
- {
- foreach(p, pendingActions->actions)
- {
- ListenAction *actrec = (ListenAction *) lfirst(p);
-
- switch (actrec->action)
- {
- case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN_ALL:
- Exec_UnlistenAllCommit();
- break;
- }
- }
- }
+ /* Apply staged listen/unlisten changes */
+ ApplyPendingListenActions(true);
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && LocalChannelTableIsEmpty())
asyncQueueUnregister();
/*
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * BecomeRegisteredListener --- subroutine for PreCommit_Notify
*
* This function must make sure we are ready to catch any incoming messages.
*/
static void
-Exec_ListenPreCommit(void)
+BecomeRegisteredListener(void)
{
QueuePosition head;
QueuePosition max;
return;
if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+ elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
/*
* Before registering, make sure we will unlisten before dying. (Note:
QUEUE_BACKEND_POS(MyProcNumber) = max;
QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
/* Insert backend into list of listeners at correct position */
if (prevListener != INVALID_PROC_NUMBER)
{
}
/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
*
- * Add the channel to the list of channels we are listening on.
+ * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
+ * an entry in localChannelTable, and pre-allocating an entry in the shared
+ * globalChannelTable with listening=false. The listening flag will be set
+ * to true in AtCommit_Notify. If we abort later, unwanted table entries
+ * will be removed.
+ *
+ * channel: name of the channel to LISTEN on.
+ *
+ * If this backend already has a ListenerEntry for the channel, that entry
+ * (including its listening flag) is left untouched. In either case the
+ * lock on the globalChannelTable entry is released before returning.
*/
static void
-Exec_ListenCommit(const char *channel)
+PrepareTableEntriesForListen(const char *channel)
{
- MemoryContext oldcontext;
+ GlobalChannelKey key;
+ GlobalChannelEntry *entry;
+ bool found;
+ ListenerEntry *listeners;
+ PendingListenEntry *pending;
+
+ /*
+ * Record in local pending hash that we want to LISTEN, overwriting any
+ * earlier attempt to UNLISTEN.
+ */
+ pending = (PendingListenEntry *)
+ hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
+ pending->action = PENDING_LISTEN;
+
+ /*
+ * Ensure that there is an entry for the channel in localChannelTable.
+ * (Should this fail, we can just roll back.) If the transaction fails
+ * after this point, we will remove the entry if appropriate during
+ * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
+ * to return TRUE; we assume nothing is going to consult that before
+ * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
+ * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
+ * present to ensure they do the right things; see
+ * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
+ */
+ (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
+
+ /* Pre-allocate entry in shared globalChannelTable with listening=false */
+ GlobalChannelKeyInit(&key, MyDatabaseId, channel);
+ entry = dshash_find_or_insert(globalChannelTable, &key, &found);
+
+ if (!found)
+ {
+ /* New channel entry, so initialize it to a safe state */
+ entry->listenersArray = InvalidDsaPointer;
+ entry->numListeners = 0;
+ entry->allocatedListeners = 0;
+ }
+
+ /*
+ * Create listenersArray if entry doesn't have one. It's tempting to fold
+ * this into the !found case, but this coding allows us to cope in case
+ * dsa_allocate() failed in an earlier attempt.
+ */
+ if (!DsaPointerIsValid(entry->listenersArray))
+ {
+ entry->listenersArray = dsa_allocate(globalChannelDSA,
+ sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
+ entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+ }
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
+ listeners = (ListenerEntry *)
+ dsa_get_address(globalChannelDSA, entry->listenersArray);
+
+ /*
+ * Check if we already have a ListenerEntry (possibly from earlier in this
+ * transaction)
+ */
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ /* Already have an entry; listening flag stays as-is until commit */
+ dshash_release_lock(globalChannelTable, entry);
+ return;
+ }
+ }
+
+ /*
+ * Need to add a new entry; grow array if necessary. When growing, the
+ * capacity is doubled, the existing listeners are copied into the new
+ * array, and only then is the entry switched over and the old array
+ * freed.
+ */
+ if (entry->numListeners >= entry->allocatedListeners)
+ {
+ int new_size = entry->allocatedListeners * 2;
+ dsa_pointer old_array = entry->listenersArray;
+ dsa_pointer new_array = dsa_allocate(globalChannelDSA,
+ sizeof(ListenerEntry) * new_size);
+ ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
+
+ memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
+ entry->listenersArray = new_array;
+ entry->allocatedListeners = new_size;
+ dsa_free(globalChannelDSA, old_array);
+ listeners = new_listeners;
+ }
+
+ listeners[entry->numListeners].procNo = MyProcNumber;
+ listeners[entry->numListeners].listening = false; /* staged, not yet
+ * committed */
+ entry->numListeners++;
+
+ dshash_release_lock(globalChannelTable, entry);
+}
+
+/*
+ * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
+ *
+ * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
+ * we're currently listening (committed or staged). We don't touch
+ * globalChannelTable yet - the listener keeps receiving signals until
+ * commit, when the entry is removed.
+ *
+ * channel: name of the channel to UNLISTEN.
+ */
+static void
+PrepareTableEntriesForUnlisten(const char *channel)
+{
+ PendingListenEntry *pending;
+
+ /*
+ * If the channel name is not in localChannelTable, then we are neither
+ * listening on it nor preparing to listen on it, so we don't need to
+ * record an UNLISTEN action.
+ *
+ * The Assert records the expectation that localChannelTable has already
+ * been created by the time we get here.
+ */
+ Assert(localChannelTable != NULL);
+ if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
return;
/*
- * Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
+ * Record in local pending hash that we want to UNLISTEN, overwriting any
+ * earlier attempt to LISTEN. Don't touch localChannelTable or
+ * globalChannelTable yet - we keep receiving signals until commit.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+ pending = (PendingListenEntry *)
+ hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
+ pending->action = PENDING_UNLISTEN;
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
*
- * Remove the specified channel name from listenChannels.
+ * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
+ * about-to-be-listened channels in pendingListenActions.
+ *
+ * Only pendingListenActions is written here; localChannelTable is scanned
+ * but not modified, and globalChannelTable is not consulted at all.
*/
static void
-Exec_UnlistenCommit(const char *channel)
+PrepareTableEntriesForUnlistenAll(void)
{
- ListCell *q;
+ HASH_SEQ_STATUS seq;
+ ChannelName *channelEntry;
+ PendingListenEntry *pending;
- if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+ /*
+ * Scan localChannelTable, which will have the names of all channels that
+ * we are listening on or have prepared to listen on. Record an UNLISTEN
+ * action for each one, overwriting any earlier attempt to LISTEN.
+ */
+ hash_seq_init(&seq, localChannelTable);
+ while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
+ {
+ pending = (PendingListenEntry *)
+ hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
+ pending->action = PENDING_UNLISTEN;
+ }
+}
+
+/*
+ * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
+ *
+ * Decrements numListeners, compacts the array, and frees the entry if empty.
+ * Sets *entry_ptr to NULL if the entry was deleted.
+ *
+ * We could get the listeners pointer from the entry, but all callers
+ * already have it at hand.
+ *
+ * entry_ptr: in/out pointer to the channel's globalChannelTable entry; on
+ * return it is NULL if the entry was deleted, else unchanged.
+ * listeners: address of the entry's listeners array.
+ * idx: index within listeners[] of the element to remove.
+ *
+ * When the last listener is removed, the listeners array is freed and the
+ * table entry is deleted; the caller must then not release the entry's
+ * lock, which is what the NULL result signals.
+ */
+static void
+RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
+ ListenerEntry *listeners,
+ int idx)
+{
+ GlobalChannelEntry *entry = *entry_ptr;
+
+ entry->numListeners--;
+ if (idx < entry->numListeners)
+ memmove(&listeners[idx], &listeners[idx + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - idx));
- foreach(q, listenChannels)
+ if (entry->numListeners == 0)
{
- char *lchan = (char *) lfirst(q);
+ dsa_free(globalChannelDSA, entry->listenersArray);
+ dshash_delete_entry(globalChannelTable, entry);
+ /* tells caller not to release the entry's lock: */
+ *entry_ptr = NULL;
+ }
+}
+
+/*
+ * ApplyPendingListenActions
+ *
+ * Apply, or revert, staged listen/unlisten changes to the local and global
+ * hash tables.
+ *
+ * isCommit: true to make the staged actions take effect, false to revert
+ * them (as AtAbort_Notify does).
+ *
+ * Does nothing if pendingListenActions was never created. For each staged
+ * action, this backend's ListenerEntry in globalChannelTable is either
+ * marked listening, removed, or left alone, and the channel's
+ * localChannelTable entry is removed unless we end up listening on it.
+ * pendingListenActions itself is only read here, not modified.
+ */
+static void
+ApplyPendingListenActions(bool isCommit)
+{
+ HASH_SEQ_STATUS seq;
+ PendingListenEntry *pending;
+
+ /* Quick exit if nothing to do */
+ if (pendingListenActions == NULL)
+ return;
- if (strcmp(lchan, channel) == 0)
+ /* We made a globalChannelTable before building pendingListenActions */
+ if (globalChannelTable == NULL)
+ elog(PANIC, "global channel table missing post-commit/abort");
+
+ /* For each staged action ... */
+ hash_seq_init(&seq, pendingListenActions);
+ while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
+ {
+ GlobalChannelKey key;
+ GlobalChannelEntry *entry;
+ bool removeLocal = true;
+ bool foundListener = false;
+
+ /*
+ * Find the global entry for this channel. If isCommit, it had better
+ * exist (it was created in PreCommit). In an abort, it might not
+ * exist, in which case we are not listening and should discard any
+ * local entry that PreCommit may have managed to create.
+ */
+ GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
+ entry = dshash_find(globalChannelTable, &key, true);
+ if (entry != NULL)
{
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
- break;
+ /* Scan entry to find the ListenerEntry for this backend */
+ ListenerEntry *listeners;
+
+ listeners = (ListenerEntry *)
+ dsa_get_address(globalChannelDSA, entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo != MyProcNumber)
+ continue;
+ foundListener = true;
+ if (isCommit)
+ {
+ if (pending->action == PENDING_LISTEN)
+ {
+ /*
+ * LISTEN being committed: set listening=true.
+ * localChannelTable entry was created during
+ * PreCommit and should be kept.
+ */
+ listeners[i].listening = true;
+ removeLocal = false;
+ }
+ else
+ {
+ /*
+ * UNLISTEN being committed: remove pre-allocated
+ * entries from both tables.
+ */
+ RemoveListenerFromChannel(&entry, listeners, i);
+ }
+ }
+ else
+ {
+ /*
+ * Note: this part is reachable only if the transaction
+ * aborts after PreCommit_Notify() has made some
+ * pendingListenActions entries, so it's pretty hard to
+ * test.
+ */
+ if (!listeners[i].listening)
+ {
+ /*
+ * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
+ * and we weren't listening before, so remove
+ * pre-allocated entries from both tables.
+ */
+ RemoveListenerFromChannel(&entry, listeners, i);
+ }
+ else
+ {
+ /*
+ * We're aborting, but the previous state was that
+ * we're listening, so keep localChannelTable entry.
+ */
+ removeLocal = false;
+ }
+ }
+ break; /* there shouldn't be another match */
+ }
+
+ /* We might have already released the entry by removing it */
+ if (entry != NULL)
+ dshash_release_lock(globalChannelTable, entry);
}
- }
- /*
- * We do not complain about unlistening something not being listened;
- * should we?
- */
+ /*
+ * If we're committing a LISTEN action, we should have found a
+ * matching ListenerEntry, but otherwise it's okay if we didn't.
+ */
+ if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
+ elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
+ pending->channel, MyProcNumber);
+
+ /*
+ * If we did not find a globalChannelTable entry for our backend, or
+ * if we are unlistening, remove any localChannelTable entry that may
+ * exist. (Note in particular that this cleans up if we created a
+ * localChannelTable entry and then failed while trying to create a
+ * globalChannelTable entry.)
+ */
+ if (removeLocal && localChannelTable != NULL)
+ (void) hash_search(localChannelTable, pending->channel,
+ HASH_REMOVE, NULL);
+ }
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * CleanupListenersOnExit --- called from Async_UnlistenOnExit
*
- * Unlisten on all channels for this backend.
+ * Remove this backend from all channels in the shared global table.
+ *
+ * The local channel table, if any, is destroyed first. Then every
+ * globalChannelTable entry belonging to our database is scanned; our
+ * ListenerEntry, if present, is removed by compacting the array, and a
+ * channel entry left with no listeners is deleted along with its
+ * listeners array. Entries for other databases are skipped. Nothing
+ * more is done if globalChannelTable was never set up in this backend.
*/
static void
-Exec_UnlistenAllCommit(void)
+CleanupListenersOnExit(void)
{
+ dshash_seq_status status;
+ GlobalChannelEntry *entry;
+
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
+
+ /* Clear our local cache (not really necessary, but be consistent) */
+ if (localChannelTable != NULL)
+ {
+ hash_destroy(localChannelTable);
+ localChannelTable = NULL;
+ }
+
+ /* Now remove our entries from the shared globalChannelTable */
+ if (globalChannelTable == NULL)
+ return;
+
+ dshash_seq_init(&status, globalChannelTable, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
+ {
+ ListenerEntry *listeners;
+
+ if (entry->key.dboid != MyDatabaseId)
+ continue; /* not relevant */
+
+ listeners = (ListenerEntry *)
+ dsa_get_address(globalChannelDSA, entry->listenersArray);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(globalChannelDSA, entry->listenersArray);
+ dshash_delete_current(&status);
+ }
+ break;
+ }
+ }
+ }
+ dshash_seq_term(&status);
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
+ *
+ * The test is a lookup in localChannelTable: returns true if the channel
+ * has an entry there, false if it does not or if the table has not been
+ * created at all.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
-
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
+ if (localChannelTable == NULL)
+ return false;
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+ return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
}
/*
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(LocalChannelTableIsEmpty()); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
/* and remove it from the list */
if (QUEUE_FIRST_LISTENER == MyProcNumber)
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
/*
* Send signals to listening backends.
*
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send. However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind. Waken them anyway if they're far enough behind, so that they'll
+ * Normally we signal only backends that are interested in the notifies that
+ * we just sent. However, that will leave idle listeners falling further and
+ * further behind. Waken them anyway if they're far enough behind, so they'll
* advance their queue position pointers, allowing the global tail to advance.
*
* Since we know the ProcNumber and the Pid the signaling is quite cheap.
static void
SignalBackends(void)
{
- int32 *pids;
- ProcNumber *procnos;
int count;
+ /* Can't get here without PreCommit_Notify having made the global table */
+ Assert(globalChannelTable != NULL);
+
+ /* It should have set up these arrays, too */
+ Assert(signalPids != NULL && signalProcnos != NULL);
+
/*
* Identify backends that we need to signal. We don't want to send
- * signals while holding the NotifyQueueLock, so this loop just builds a
- * list of target PIDs.
- *
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
+ * signals while holding the NotifyQueueLock, so this part just builds a
+ * list of target PIDs in signalPids[] and signalProcnos[].
*/
- pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /* Scan each channel name that we notified in this transaction */
+ foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
+ {
+ GlobalChannelKey key;
+ GlobalChannelEntry *entry;
+ ListenerEntry *listeners;
+
+ GlobalChannelKeyInit(&key, MyDatabaseId, channel);
+ entry = dshash_find(globalChannelTable, &key, false);
+ if (entry == NULL)
+ continue; /* nobody is listening */
+
+ listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
+ entry->listenersArray);
+
+ /* Identify listeners that now need waking, add them to arrays */
+ for (int j = 0; j < entry->numListeners; j++)
+ {
+ ProcNumber i;
+ int32 pid;
+ QueuePosition pos;
+
+ if (!listeners[j].listening)
+ continue; /* ignore not-yet-committed listeners */
+
+ i = listeners[j].procNo;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue; /* already signaled, no need to repeat */
+
+ pid = QUEUE_BACKEND_PID(i);
+ pos = QUEUE_BACKEND_POS(i);
+
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue; /* it's fully caught up already */
+
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ signalPids[count] = pid;
+ signalProcnos[count] = i;
+ count++;
+ }
+
+ dshash_release_lock(globalChannelTable, entry);
+ }
+
+ /*
+ * Scan all listeners. Any that are not already pending wakeup must not
+ * be interested in our notifications (else we'd have set their wakeup
+ * flags above). Check to see if we can directly advance their queue
+ * pointers to save a wakeup. Otherwise, if they are far behind, wake
+ * them anyway so they will catch up.
+ */
for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
- int32 pid = QUEUE_BACKEND_PID(i);
+ int32 pid;
QueuePosition pos;
- Assert(pid != InvalidPid);
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue;
+
+ /* If it's currently advancing, we should not touch it */
+ if (QUEUE_BACKEND_IS_ADVANCING(i))
+ continue;
+
+ pid = QUEUE_BACKEND_PID(i);
pos = QUEUE_BACKEND_POS(i);
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+
+ /*
+ * We can directly advance the other backend's queue pointer if it's
+ * not currently advancing (else there are race conditions), and its
+ * current pointer is not behind queueHeadBeforeWrite (else we'd make
+ * it miss some older messages), and we'd not be moving the pointer
+ * backward.
+ */
+ if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
+ QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
{
- /*
- * Always signal listeners in our own database, unless they're
- * already caught up (unlikely, but possible).
- */
- if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
- continue;
+ /* We can directly advance its pointer past what we wrote */
+ QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
}
- else
+ else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+ QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
{
- /*
- * Listeners in other databases should be signaled only if they
- * are far behind.
- */
- if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
- QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
- continue;
+ /* It's idle and far behind, so wake it up */
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ signalPids[count] = pid;
+ signalProcnos[count] = i;
+ count++;
}
- /* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
- count++;
}
+
LWLockRelease(NotifyQueueLock);
/* Now send signals */
for (int i = 0; i < count; i++)
{
- int32 pid = pids[i];
+ int32 pid = signalPids[i];
/*
* If we are signaling our own process, no need to involve the kernel;
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
-
- pfree(pids);
- pfree(procnos);
}
/*
*
* This is called at transaction abort.
*
- * Gets rid of pending actions and outbound notifies that we would have
- * executed if the transaction got committed.
+ * Revert any staged listen/unlisten changes and clean up transaction state.
+ * This only does anything if we abort after PreCommit_Notify has staged
+ * some entries.
*/
void
AtAbort_Notify(void)
{
- /*
- * If we LISTEN but then roll back the transaction after PreCommit_Notify,
- * we have registered as a listener but have not made any entry in
- * listenChannels. In that case, deregister again.
- */
- if (amRegisteredListener && listenChannels == NIL)
+ /* Revert staged listen/unlisten changes */
+ ApplyPendingListenActions(false);
+
+ /* If we're no longer listening on anything, unregister */
+ if (amRegisteredListener && LocalChannelTableIsEmpty())
asyncQueueUnregister();
/* And clean up */
QueuePosition head;
Snapshot snapshot;
- /* Fetch current state */
+ /*
+ * Fetch current state, indicate to others that we have woken up, and that
+ * we are in process of advancing our position.
+ */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
- LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
/* Nothing to do, we have read all notifications already. */
+ LWLockRelease(NotifyQueueLock);
return;
}
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
+ LWLockRelease(NotifyQueueLock);
+
/*----------
* Get snapshot we'll use to decide which xacts are still in progress.
* This is trickier than it might seem, because of race conditions.
*
* What we do guarantee is that we'll see all notifications from
* transactions committing after the snapshot we take here.
- * Exec_ListenPreCommit has already added us to the listener array,
+ * BecomeRegisteredListener has already added us to the listener array,
* so no not-yet-committed messages can be removed from the queue
* before we see them.
*----------
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
LWLockRelease(NotifyQueueLock);
ExitOnAnyError = save_ExitOnAnyError;
* that if there's a bad entry in the queue for which
* TransactionIdDidCommit() fails for some reason, we can skip
* over it on the first LISTEN in a session, and not get stuck on
- * it indefinitely.
+ * it indefinitely. (This is a little trickier than it looks: it
+ * works because BecomeRegisteredListener runs this code before we
+ * have made the first entry in localChannelTable.)
*/
- if (listenChannels == NIL)
+ if (LocalChannelTableIsEmpty())
continue;
if (TransactionIdDidCommit(qe->xid))
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (LocalChannelTableIsEmpty())
return;
if (Trace_notify)
{
Assert(pendingNotifies->events != NIL);
- /* Create the hash table if it's time to */
+ /* Create the hash tables if it's time to */
if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
pendingNotifies->hashtab == NULL)
{
&hash_ctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+ /* Create the unique channel name table */
+ Assert(pendingNotifies->uniqueChannelHash == NULL);
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(ChannelName);
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->uniqueChannelHash =
+ hash_create("Pending Notify Channel Names",
+ 64L,
+ &hash_ctl,
+ HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
/* Insert all the already-existing events */
foreach(l, pendingNotifies->events)
{
Notification *oldn = (Notification *) lfirst(l);
+ char *channel = oldn->data;
bool found;
(void) hash_search(pendingNotifies->hashtab,
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Add channel name to uniqueChannelHash; might be there already */
+ (void) hash_search(pendingNotifies->uniqueChannelHash,
+ channel,
+ HASH_ENTER,
+ NULL);
}
}
/* Add new event to the list, in order */
pendingNotifies->events = lappend(pendingNotifies->events, n);
- /* Add event to the hash table if needed */
+ /* Add event to the hash tables if needed */
if (pendingNotifies->hashtab != NULL)
{
+ char *channel = n->data;
bool found;
(void) hash_search(pendingNotifies->hashtab,
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Add channel name to uniqueChannelHash; might be there already */
+ (void) hash_search(pendingNotifies->uniqueChannelHash,
+ channel,
+ HASH_ENTER,
+ NULL);
}
}
*/
pendingActions = NULL;
pendingNotifies = NULL;
+ /* Also clear pendingListenActions, which is derived from pendingActions */
+ pendingListenActions = NULL;
}
/*