#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
+#include "storage/proclist.h"
#include "storage/read_stream.h"
#include "storage/smgr.h"
#include "storage/standby.h"
* How many times has the buffer been pinned by this backend.
*/
int32 refcount;
+
+ /*
+ * Mode in which this backend holds the buffer's content lock.
+ * BUFFER_LOCK_UNLOCK indicates that the buffer is not locked by this backend.
+ */
+ BufferLockMode lockmode;
} PrivateRefCountData;
typedef struct PrivateRefCountEntry
* Each buffer also has a private refcount that keeps track of the number of
* times the buffer is pinned in the current process. This is so that the
* shared refcount needs to be modified only once if a buffer is pinned more
- * than once by an individual backend. It's also used to check that no buffers
- * are still pinned at the end of transactions and when exiting.
+ * than once by an individual backend. It's also used to check that no
+ * buffers are still pinned at the end of transactions and when exiting. We
+ * also use this mechanism to track whether this backend has a buffer locked,
+ * and, if so, in what mode.
*
*
* To avoid - as we used to - requiring an array with NBuffers entries to keep
/* ResourceOwner callbacks to hold in-progress I/Os and buffer pins */
static void ResOwnerReleaseBufferIO(Datum res);
static char *ResOwnerPrintBufferIO(Datum res);
-static void ResOwnerReleaseBufferPin(Datum res);
-static char *ResOwnerPrintBufferPin(Datum res);
+static void ResOwnerReleaseBuffer(Datum res);
+static char *ResOwnerPrintBuffer(Datum res);
const ResourceOwnerDesc buffer_io_resowner_desc =
{
.DebugPrint = ResOwnerPrintBufferIO
};
-const ResourceOwnerDesc buffer_pin_resowner_desc =
+const ResourceOwnerDesc buffer_resowner_desc =
{
- .name = "buffer pin",
+ .name = "buffer",
.release_phase = RESOURCE_RELEASE_BEFORE_LOCKS,
.release_priority = RELEASE_PRIO_BUFFER_PINS,
- .ReleaseResource = ResOwnerReleaseBufferPin,
- .DebugPrint = ResOwnerPrintBufferPin
+ .ReleaseResource = ResOwnerReleaseBuffer,
+ .DebugPrint = ResOwnerPrintBuffer
};
/*
/* clear the whole data member, just for future proofing */
memset(&victim_entry->data, 0, sizeof(victim_entry->data));
victim_entry->data.refcount = 0;
+ victim_entry->data.lockmode = BUFFER_LOCK_UNLOCK;
PrivateRefCountOverflowed++;
}
PrivateRefCountArrayKeys[ReservedRefCountSlot] = buffer;
res->buffer = buffer;
res->data.refcount = 0;
+ res->data.lockmode = BUFFER_LOCK_UNLOCK;
/* update cache for the next lookup */
PrivateRefCountEntryLast = ReservedRefCountSlot;
ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
{
Assert(ref->data.refcount == 0);
+ Assert(ref->data.lockmode == BUFFER_LOCK_UNLOCK);
if (ref >= &PrivateRefCountArray[0] &&
ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void);
#ifdef USE_ASSERT_CHECKING
-static void AssertNotCatalogBufferLock(LWLock *lock, LWLockMode mode,
- void *unused_context);
+static void AssertNotCatalogBufferLock(Buffer buffer, BufferLockMode mode);
#endif
static int rlocator_comparator(const void *p1, const void *p2);
static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb);
static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b);
static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
+static void BufferLockAcquire(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockUnlock(Buffer buffer, BufferDesc *buf_hdr);
+static bool BufferLockConditional(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode);
+static bool BufferLockHeldByMeInMode(BufferDesc *buf_hdr, BufferLockMode mode);
+static bool BufferLockHeldByMe(BufferDesc *buf_hdr);
+static inline void BufferLockDisown(Buffer buffer, BufferDesc *buf_hdr);
+static inline int BufferLockDisownInternal(Buffer buffer, BufferDesc *buf_hdr);
+static inline bool BufferLockAttempt(BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockQueueSelf(BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockDequeueSelf(BufferDesc *buf_hdr);
+static void BufferLockWakeup(BufferDesc *buf_hdr, bool wake_exclusive);
+static void BufferLockProcessRelease(BufferDesc *buf_hdr, BufferLockMode mode, uint64 lockstate);
+static inline uint64 BufferLockReleaseSub(BufferLockMode mode);
+
/*
* Implementation of PrefetchBuffer() for shared buffers.
goto retry;
}
+ /*
+ * An invalidated buffer should not have any backends waiting to lock the
+ * buffer; therefore BM_LOCK_WAKE_IN_PROGRESS should not be set.
+ */
+ Assert(!(buf_state & BM_LOCK_WAKE_IN_PROGRESS));
+
/*
* Clear out the buffer's tag and flags. We must do this to ensure that
* linear scans of the buffer array don't think the buffer is valid.
return false;
}
+ /*
+ * An invalidated buffer should not have any backends waiting to lock the
+ * buffer; therefore BM_LOCK_WAKE_IN_PROGRESS should not be set.
+ */
+ Assert(!(buf_state & BM_LOCK_WAKE_IN_PROGRESS));
+
/*
* Clear out the buffer's tag and flags and usagecount. This is not
* strictly required, as BM_TAG_VALID/BM_VALID needs to be checked before
*/
if (buf_state & BM_DIRTY)
{
- LWLock *content_lock;
-
Assert(buf_state & BM_TAG_VALID);
Assert(buf_state & BM_VALID);
* one just happens to be trying to split the page the first one got
* from StrategyGetBuffer.)
*/
- content_lock = BufferDescriptorGetContentLock(buf_hdr);
- if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+ if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE))
{
/*
* Someone else has locked the buffer, so give it up and loop back
if (XLogNeedsFlush(lsn)
&& StrategyRejectBuffer(strategy, buf_hdr, from_ring))
{
- LWLockRelease(content_lock);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
UnpinBuffer(buf_hdr);
goto again;
}
/* OK, do the I/O */
FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
- LWLockRelease(content_lock);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
&buf_hdr->tag);
else
{
bufHdr = GetBufferDescriptor(buffer - 1);
- return LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr));
+ return BufferLockHeldByMe(bufHdr);
}
}
}
else
{
- LWLockMode lw_mode;
-
- switch (mode)
- {
- case BUFFER_LOCK_EXCLUSIVE:
- lw_mode = LW_EXCLUSIVE;
- break;
- case BUFFER_LOCK_SHARE:
- lw_mode = LW_SHARED;
- break;
- default:
- pg_unreachable();
- }
-
bufHdr = GetBufferDescriptor(buffer - 1);
- return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
- lw_mode);
+ return BufferLockHeldByMeInMode(bufHdr, mode);
}
}
* I'd better not still hold the buffer content lock. Can't use
* BufferIsLockedByMe(), as that asserts the buffer is pinned.
*/
- Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
+ Assert(!BufferLockHeldByMe(buf));
/* decrement the shared reference count */
old_buf_state = pg_atomic_fetch_sub_u64(&buf->state, BUF_REFCOUNT_ONE);
* Check for exclusive-locked catalog buffers. This is the core of
* AssertCouldGetRelation().
*
- * A backend would self-deadlock on LWLocks if the catalog scan read the
- * exclusive-locked buffer. The main threat is exclusive-locked buffers of
- * catalogs used in relcache, because a catcache search on any catalog may
+ * A backend would self-deadlock on the content lock if the catalog scan read
+ * the exclusive-locked buffer. The main threat is exclusive-locked buffers
+ * of catalogs used in relcache, because a catcache search on any catalog may
* build that catalog's relcache entry. We don't have an inventory of
* catalogs relcache uses, so just check buffers of most catalogs.
*
void
AssertBufferLocksPermitCatalogRead(void)
{
- ForEachLWLockHeldByMe(AssertNotCatalogBufferLock, NULL);
+ PrivateRefCountEntry *res;
+
+ /* check the array */
+ for (int i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
+ {
+ if (PrivateRefCountArrayKeys[i] != InvalidBuffer)
+ {
+ res = &PrivateRefCountArray[i];
+
+ if (res->buffer == InvalidBuffer)
+ continue;
+
+ AssertNotCatalogBufferLock(res->buffer, res->data.lockmode);
+ }
+ }
+
+ /* if necessary search the hash */
+ if (PrivateRefCountOverflowed)
+ {
+ HASH_SEQ_STATUS hstat;
+
+ hash_seq_init(&hstat, PrivateRefCountHash);
+ while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ AssertNotCatalogBufferLock(res->buffer, res->data.lockmode);
+ }
+ }
}
static void
-AssertNotCatalogBufferLock(LWLock *lock, LWLockMode mode,
- void *unused_context)
+AssertNotCatalogBufferLock(Buffer buffer, BufferLockMode mode)
{
- BufferDesc *bufHdr;
+ BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
BufferTag tag;
Oid relid;
- if (mode != LW_EXCLUSIVE)
+ if (mode != BUFFER_LOCK_EXCLUSIVE)
return;
- if (!((BufferDescPadded *) lock > BufferDescriptors &&
- (BufferDescPadded *) lock < BufferDescriptors + NBuffers))
- return; /* not a buffer lock */
-
- bufHdr = (BufferDesc *)
- ((char *) lock - offsetof(BufferDesc, content_lock));
tag = bufHdr->tag;
/*
FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context)
{
- LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+ Buffer buffer = BufferDescriptorGetBuffer(buf);
+
+ BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE);
FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
- LWLockRelease(BufferDescriptorGetContentLock(buf));
+ BufferLockUnlock(buffer, buf);
}
/*
*
* Used to clean up after errors.
*
- * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
- * of releasing buffer content locks per se; the only thing we need to deal
- * with here is clearing any PIN_COUNT request that was in progress.
+ * Currently, we can expect that resource owner cleanup, via
+ * ResOwnerReleaseBuffer(), took care of releasing buffer content locks per
+ * se; the only thing we need to deal with here is clearing any PIN_COUNT
+ * request that was in progress.
*/
void
UnlockBuffers(void)
}
/*
- * Acquire or release the content_lock for the buffer.
+ * Acquire the buffer content lock in the specified mode
+ *
+ * If the lock is not available, sleep until it is.
+ *
+ * Side effect: cancel/die interrupts are held off until lock release.
+ *
+ * This uses almost the same locking approach as lwlock.c's
+ * LWLockAcquire(). See documentation at the top of lwlock.c for a more
+ * detailed discussion.
+ *
+ * The reason that this, and most of the other BufferLock* functions, take
+ * both the Buffer and the BufferDesc* as parameters is that repeatedly
+ * looking up one from the other shows up noticeably in profiles.
+ *
+ * Callers should provide a constant for mode, for more efficient code
+ * generation.
+ */
+static inline void
+BufferLockAcquire(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode)
+{
+ PrivateRefCountEntry *entry;
+ int extraWaits = 0;
+
+ /*
+ * Get the reference to the refcount entry before acquiring the lock; it
+ * seems better to do the lookup while not holding the lock.
+ */
+ entry = GetPrivateRefCountEntry(buffer, true);
+
+ /*
+ * We better not already hold a lock on the buffer.
+ */
+ Assert(entry->data.lockmode == BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the content lock. This ensures that interrupts will not interfere
+ * with manipulations of data structures in shared memory.
+ */
+ HOLD_INTERRUPTS();
+
+ for (;;)
+ {
+ bool mustwait;
+ uint32 wait_event;
+
+ /*
+ * Try to grab the lock the first time, we're not in the waitqueue
+ * yet/anymore.
+ */
+ mustwait = BufferLockAttempt(buf_hdr, mode);
+
+ if (likely(!mustwait))
+ {
+ break;
+ }
+
+ /*
+ * Ok, at this point we couldn't grab the lock on the first try. We
+ * cannot simply queue ourselves to the end of the list and wait to be
+ * woken up because by now the lock could long have been released.
+ * Instead add us to the queue and try to grab the lock again. If we
+ * succeed we need to revert the queuing and be happy, otherwise we
+ * recheck the lock. If we still couldn't grab it, we know that the
+ * other locker will see our queue entries when releasing since they
+ * existed before we checked for the lock.
+ */
+
+ /* add to the queue */
+ BufferLockQueueSelf(buf_hdr, mode);
+
+ /* we're now guaranteed to be woken up if necessary */
+ mustwait = BufferLockAttempt(buf_hdr, mode);
+
+ /* ok, grabbed the lock the second time round, need to undo queueing */
+ if (!mustwait)
+ {
+ BufferLockDequeueSelf(buf_hdr);
+ break;
+ }
+
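+ /* Choose the wait event to report while sleeping on this lock. */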
+ switch (mode)
+ {
+ case BUFFER_LOCK_EXCLUSIVE:
+ wait_event = WAIT_EVENT_BUFFER_EXCLUSIVE;
+ break;
+ case BUFFER_LOCK_SHARE_EXCLUSIVE:
+ wait_event = WAIT_EVENT_BUFFER_SHARE_EXCLUSIVE;
+ break;
+ case BUFFER_LOCK_SHARE:
+ wait_event = WAIT_EVENT_BUFFER_SHARED;
+ break;
+ case BUFFER_LOCK_UNLOCK:
+ pg_unreachable();
+ }
+ pgstat_report_wait_start(wait_event);
+
+ /*
+ * Wait until awakened.
+ *
+ * It is possible that we get awakened for a reason other than being
+ * signaled by BufferLockWakeup(). If so, loop back and wait again.
+ * Once we've gotten the lock, re-increment the sema by the number of
+ * additional signals received.
+ */
+ for (;;)
+ {
+ PGSemaphoreLock(MyProc->sem);
+ if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
+ break;
+ extraWaits++;
+ }
+
+ pgstat_report_wait_end();
+
+ /* Retrying, allow BufferLockProcessRelease() to wake waiters again. */
+ pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_WAKE_IN_PROGRESS);
+ }
+
+ /* Remember that we now hold this lock */
+ entry->data.lockmode = mode;
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (unlikely(extraWaits-- > 0))
+ PGSemaphoreUnlock(MyProc->sem);
+}
+
+/*
+ * Release a previously acquired buffer content lock.
+ */
+static void
+BufferLockUnlock(Buffer buffer, BufferDesc *buf_hdr)
+{
+ BufferLockMode mode;
+ uint64 lockstate;
+ uint64 sub;
+
+ mode = BufferLockDisownInternal(buffer, buf_hdr);
+
+ /*
+ * Release my hold on lock, after that it can immediately be acquired by
+ * others, even if we still have to wakeup other waiters.
+ */
+ sub = BufferLockReleaseSub(mode);
+
+ lockstate = pg_atomic_sub_fetch_u64(&buf_hdr->state, sub);
+
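+ /* wake up any waiters that may now be able to acquire the lock */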
+ BufferLockProcessRelease(buf_hdr, mode, lockstate);
+
+ /*
+ * Now okay to allow cancel/die interrupts.
+ */
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * Acquire the content lock for the buffer, but only if we don't have to wait.
+ */
+static bool
+BufferLockConditional(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode)
+{
+ PrivateRefCountEntry *entry = GetPrivateRefCountEntry(buffer, true);
+ bool mustwait;
+
+ /*
+ * We better not already hold a lock on the buffer.
+ */
+ Assert(entry->data.lockmode == BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the content lock. This ensures that interrupts will not interfere
+ * with manipulations of data structures in shared memory.
+ */
+ HOLD_INTERRUPTS();
+
+ /* Check for the lock */
+ mustwait = BufferLockAttempt(buf_hdr, mode);
+
+ if (mustwait)
+ {
+ /* Failed to get lock, so release interrupt holdoff */
+ RESUME_INTERRUPTS();
+ }
+ else
+ {
+ entry->data.lockmode = mode;
+ }
+
+ return !mustwait;
+}
+
+/*
+ * Internal function that tries to atomically acquire the content lock in the
+ * passed in mode.
+ *
+ * This function will not block waiting for a lock to become free - that's the
+ * caller's job.
+ *
+ * Similar to LWLockAttemptLock().
+ */
+static inline bool
+BufferLockAttempt(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+ uint64 old_state;
+
+ /*
+ * Read once outside the loop, later iterations will get the newer value
+ * via compare & exchange.
+ */
+ old_state = pg_atomic_read_u64(&buf_hdr->state);
+
+ /* loop until we've determined whether we could acquire the lock or not */
+ while (true)
+ {
+ uint64 desired_state;
+ bool lock_free;
+
+ desired_state = old_state;
+
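+ /*
+ * Determine whether the lock is free for the requested mode: exclusive
+ * requires that no lock is held at all, share-exclusive conflicts with
+ * exclusive and other share-exclusive holders, and shared conflicts only
+ * with exclusive.
+ */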
+ if (mode == BUFFER_LOCK_EXCLUSIVE)
+ {
+ lock_free = (old_state & BM_LOCK_MASK) == 0;
+ if (lock_free)
+ desired_state += BM_LOCK_VAL_EXCLUSIVE;
+ }
+ else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ {
+ lock_free = (old_state & (BM_LOCK_VAL_EXCLUSIVE | BM_LOCK_VAL_SHARE_EXCLUSIVE)) == 0;
+ if (lock_free)
+ desired_state += BM_LOCK_VAL_SHARE_EXCLUSIVE;
+ }
+ else
+ {
+ lock_free = (old_state & BM_LOCK_VAL_EXCLUSIVE) == 0;
+ if (lock_free)
+ desired_state += BM_LOCK_VAL_SHARED;
+ }
+
+ /*
+ * Attempt to swap in the state we are expecting. If we didn't see the
+ * lock as free, that's just the old value. If we saw it as free, we'll
+ * attempt to mark it acquired. The reason that we always swap in the
+ * value is that this doubles as a memory barrier. We could try to be
+ * smarter and only swap in values if we saw the lock as free, but
+ * benchmarks haven't shown that to be beneficial so far.
+ *
+ * Retry if the value changed since we last looked at it.
+ */
+ if (likely(pg_atomic_compare_exchange_u64(&buf_hdr->state,
+ &old_state, desired_state)))
+ {
+ if (lock_free)
+ {
+ /* Great! Got the lock. */
+ return false;
+ }
+ else
+ return true; /* somebody else has the lock */
+ }
+ }
+
+ pg_unreachable();
+}
+
+/*
+ * Add ourselves to the end of the content lock's wait queue.
+ */
+static void
+BufferLockQueueSelf(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+ /*
+ * If we don't have a PGPROC structure, there's no way to wait. This
+ * should never occur, since MyProc should only be null during shared
+ * memory initialization.
+ */
+ if (MyProc == NULL)
+ elog(PANIC, "cannot wait without a PGPROC structure");
+
+ if (MyProc->lwWaiting != LW_WS_NOT_WAITING)
+ elog(PANIC, "queueing for lock while waiting on another one");
+
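+ /* the buffer header spinlock protects the content lock's wait list */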
+ LockBufHdr(buf_hdr);
+
+ /* setting the flag is protected by the spinlock */
+ pg_atomic_fetch_or_u64(&buf_hdr->state, BM_LOCK_HAS_WAITERS);
+
+ /*
+ * These are currently used both for lwlocks and buffer content locks,
+ * which is acceptable, although not pretty, because a backend can't wait
+ * for both types of locks at the same time.
+ */
+ MyProc->lwWaiting = LW_WS_WAITING;
+ MyProc->lwWaitMode = mode;
+
+ proclist_push_tail(&buf_hdr->lock_waiters, MyProcNumber, lwWaitLink);
+
+ /* Can release the mutex now */
+ UnlockBufHdr(buf_hdr);
+}
+
+/*
+ * Remove ourselves from the waitlist.
+ *
+ * This is used if we queued ourselves because we thought we needed to sleep
+ * but, after further checking, we discovered that we don't actually need to
+ * do so.
+ */
+static void
+BufferLockDequeueSelf(BufferDesc *buf_hdr)
+{
+ bool on_waitlist;
+
+ LockBufHdr(buf_hdr);
+
+ on_waitlist = MyProc->lwWaiting == LW_WS_WAITING;
+ if (on_waitlist)
+ proclist_delete(&buf_hdr->lock_waiters, MyProcNumber, lwWaitLink);
+
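+ /* if the wait list is now empty, clear the has-waiters flag */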
+ if (proclist_is_empty(&buf_hdr->lock_waiters) &&
+ (pg_atomic_read_u64(&buf_hdr->state) & BM_LOCK_HAS_WAITERS) != 0)
+ {
+ pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_HAS_WAITERS);
+ }
+
+ /* XXX: combine with fetch_and above? */
+ UnlockBufHdr(buf_hdr);
+
+ /* clear waiting state again, nice for debugging */
+ if (on_waitlist)
+ MyProc->lwWaiting = LW_WS_NOT_WAITING;
+ else
+ {
+ int extraWaits = 0;
+
+ /*
+ * Somebody else dequeued us and has or will wake us up. Deal with the
+ * superfluous absorption of a wakeup.
+ */
+
+ /*
+ * Clear BM_LOCK_WAKE_IN_PROGRESS if somebody woke us before we
+ * removed ourselves - they'll have set it.
+ */
+ pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_WAKE_IN_PROGRESS);
+
+ /*
+ * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
+ * get reset at some inconvenient point later. Most of the time this
+ * will immediately return.
+ */
+ for (;;)
+ {
+ PGSemaphoreLock(MyProc->sem);
+ if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
+ break;
+ extraWaits++;
+ }
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(MyProc->sem);
+ }
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * After calling this function it's the caller's responsibility to ensure
+ * that the lock gets released, even in case of an error. This is only
+ * desirable if the lock is going to be released in a different process than
+ * the one that acquired it.
+ */
+static inline void
+BufferLockDisown(Buffer buffer, BufferDesc *buf_hdr)
+{
+ BufferLockDisownInternal(buffer, buf_hdr);
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * This is the code that can be shared between actually releasing a lock
+ * (BufferLockUnlock()) and just not tracking ownership of the lock anymore
+ * without releasing the lock (BufferLockDisown()).
+ */
+static inline int
+BufferLockDisownInternal(Buffer buffer, BufferDesc *buf_hdr)
+{
+ BufferLockMode mode;
+ PrivateRefCountEntry *ref;
+
+ ref = GetPrivateRefCountEntry(buffer, false);
+ if (ref == NULL)
+ elog(ERROR, "lock %d is not held", buffer);
+ mode = ref->data.lockmode;
+ ref->data.lockmode = BUFFER_LOCK_UNLOCK;
+
+ return mode;
+}
+
+/*
+ * Wake up all the waiters that currently have a chance to acquire the lock.
+ *
+ * wake_exclusive indicates whether exclusive lock waiters should be woken up.
+ */
+static void
+BufferLockWakeup(BufferDesc *buf_hdr, bool wake_exclusive)
+{
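+ /*
+ * wake_exclusive and wake_share_exclusive track which lock modes may still
+ * be woken as we walk the wait list. new_wake_in_progress records whether
+ * BM_LOCK_WAKE_IN_PROGRESS needs to be set, suppressing further wakeups
+ * until a woken backend has retried the lock.
+ */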
+ bool new_wake_in_progress = false;
+ bool wake_share_exclusive = true;
+ proclist_head wakeup;
+ proclist_mutable_iter iter;
+
+ proclist_init(&wakeup);
+
+ /* lock wait list while collecting backends to wake up */
+ LockBufHdr(buf_hdr);
+
+ proclist_foreach_modify(iter, &buf_hdr->lock_waiters, lwWaitLink)
+ {
+ PGPROC *waiter = GetPGProcByNumber(iter.cur);
+
+ /*
+ * We already woke up a waiter with a conflicting lock mode, so skip
+ * over this wait list entry.
+ */
+ if (!wake_exclusive && waiter->lwWaitMode == BUFFER_LOCK_EXCLUSIVE)
+ continue;
+ if (!wake_share_exclusive && waiter->lwWaitMode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ continue;
+
+ proclist_delete(&buf_hdr->lock_waiters, iter.cur, lwWaitLink);
+ proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
+
+ /*
+ * Prevent additional wakeups until retryer gets to run. Backends that
+ * are just waiting for the lock to become free don't retry
+ * automatically.
+ */
+ new_wake_in_progress = true;
+
+ /*
+ * Signal that the process isn't on the wait list anymore. This allows
+ * BufferLockDequeueSelf() to remove itself from the waitlist with a
+ * proclist_delete(), rather than having to check if it has been
+ * removed from the list.
+ */
+ Assert(waiter->lwWaiting == LW_WS_WAITING);
+ waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
+
+ /*
+ * Don't wake up further waiters after waking a conflicting waiter.
+ */
+ if (waiter->lwWaitMode == BUFFER_LOCK_SHARE)
+ {
+ /*
+ * Share locks conflict with exclusive locks.
+ */
+ wake_exclusive = false;
+ }
+ else if (waiter->lwWaitMode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ {
+ /*
+ * Share-exclusive locks conflict with share-exclusive and
+ * exclusive locks.
+ */
+ wake_exclusive = false;
+ wake_share_exclusive = false;
+ }
+ else if (waiter->lwWaitMode == BUFFER_LOCK_EXCLUSIVE)
+ {
+ /*
+ * Exclusive locks conflict with all other locks, there's no point
+ * in waking up anybody else.
+ */
+ break;
+ }
+ }
+
+ Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u64(&buf_hdr->state) & BM_LOCK_HAS_WAITERS);
+
+ /* unset required flags and release the header spinlock in one fell swoop */
+ {
+ uint64 old_state;
+ uint64 desired_state;
+
+ old_state = pg_atomic_read_u64(&buf_hdr->state);
+ while (true)
+ {
+ desired_state = old_state;
+
+ /* compute desired flags */
+
+ if (new_wake_in_progress)
+ desired_state |= BM_LOCK_WAKE_IN_PROGRESS;
+ else
+ desired_state &= ~BM_LOCK_WAKE_IN_PROGRESS;
+
+ if (proclist_is_empty(&buf_hdr->lock_waiters))
+ desired_state &= ~BM_LOCK_HAS_WAITERS;
+
+ desired_state &= ~BM_LOCKED; /* release the buffer header spinlock */
+
+ if (pg_atomic_compare_exchange_u64(&buf_hdr->state, &old_state,
+ desired_state))
+ break;
+ }
+ }
+
+ /* Awaken any waiters I removed from the queue. */
+ proclist_foreach_modify(iter, &wakeup, lwWaitLink)
+ {
+ PGPROC *waiter = GetPGProcByNumber(iter.cur);
+
+ proclist_delete(&wakeup, iter.cur, lwWaitLink);
+
+ /*
+ * Guarantee that lwWaiting being unset only becomes visible once the
+ * unlink from the wait list has completed. Otherwise the target backend
+ * could be woken up for some other reason and enqueue for a new lock -
+ * if that happens before the list unlink happens, the list would end up
+ * being corrupted.
+ *
+ * The barrier pairs with the LockBufHdr() performed when enqueuing for
+ * another lock.
+ */
+ pg_write_barrier();
+ waiter->lwWaiting = LW_WS_NOT_WAITING;
+ PGSemaphoreUnlock(waiter->sem);
+ }
+}
+
+/*
+ * Compute the value to subtract from the buffer state when releasing a lock
+ * held in `mode`.
+ *
+ * This is separated from BufferLockUnlock() as we want to combine the lock
+ * release with other atomic operations when possible, leading to the lock
+ * release being done in multiple places, each needing to compute what to
+ * subtract from the lock state.
+ */
+static inline uint64
+BufferLockReleaseSub(BufferLockMode mode)
+{
+ /*
+ * Turns out that a switch() leads gcc to generate sufficiently worse code
+ * for this to show up in profiles...
+ */
+ if (mode == BUFFER_LOCK_EXCLUSIVE)
+ return BM_LOCK_VAL_EXCLUSIVE;
+ else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ return BM_LOCK_VAL_SHARE_EXCLUSIVE;
+ else
+ {
+ Assert(mode == BUFFER_LOCK_SHARE);
+ return BM_LOCK_VAL_SHARED;
+ }
+
+ return 0; /* keep compiler quiet */
+}
+
+/*
+ * Handle work that needs to be done after releasing a lock that was held in
+ * `mode`, where `lockstate` is the result of the atomic operation modifying
+ * the state variable.
+ *
+ * This is separated from BufferLockUnlock() as we want to combine the lock
+ * release with other atomic operations when possible, leading to the lock
+ * release being done in multiple places.
+ */
+static void
+BufferLockProcessRelease(BufferDesc *buf_hdr, BufferLockMode mode, uint64 lockstate)
+{
+ bool check_waiters = false;
+ bool wake_exclusive = false;
+
+ /* no other backend can hold an exclusive lock at this point */
+ Assert(!(lockstate & BM_LOCK_VAL_EXCLUSIVE));
+
+ /*
+ * If we're still waiting for backends to get scheduled, don't wake them
+ * up again. Otherwise check if we need to look through the waitqueue to
+ * wake other backends.
+ */
+ if ((lockstate & BM_LOCK_HAS_WAITERS) &&
+ !(lockstate & BM_LOCK_WAKE_IN_PROGRESS))
+ {
+ if ((lockstate & BM_LOCK_MASK) == 0)
+ {
+ /*
+ * We released a lock and the lock was, in that moment, free. We
+ * therefore can wake waiters for any kind of lock.
+ */
+ check_waiters = true;
+ wake_exclusive = true;
+ }
+ else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ {
+ /*
+ * We released the lock, but another backend still holds a lock.
+ * We can't have released an exclusive lock, as there couldn't
+ * have been other lock holders. If we released a share lock, no
+ * waiters need to be woken up, as there must be other share
+ * lockers. However, if we held a share-exclusive lock, another
+ * backend now could acquire a share-exclusive lock.
+ */
+ check_waiters = true;
+ wake_exclusive = false;
+ }
+ }
+
+ /*
+ * As waking up waiters requires the spinlock to be acquired, only do so
+ * if necessary.
+ */
+ if (check_waiters)
+ BufferLockWakeup(buf_hdr, wake_exclusive);
+}
+
+/*
+ * BufferLockHeldByMeInMode - test whether my process holds the content lock
+ * in the specified mode
+ *
+ * This is meant as debug support only.
+ */
+static bool
+BufferLockHeldByMeInMode(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+ PrivateRefCountEntry *entry =
+ GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf_hdr), false);
+
+ if (!entry)
+ return false;
+ else
+ return entry->data.lockmode == mode;
+}
+
+/*
+ * BufferLockHeldByMe - test whether my process holds the content lock in any
+ * mode
+ *
+ * This is meant as debug support only.
+ */
+static bool
+BufferLockHeldByMe(BufferDesc *buf_hdr)
+{
+ PrivateRefCountEntry *entry =
+ GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf_hdr), false);
+
+ if (!entry)
+ return false;
+ else
+ return entry->data.lockmode != BUFFER_LOCK_UNLOCK;
+}
+
+/*
+ * Release the content lock for the buffer.
*/
void
-LockBuffer(Buffer buffer, BufferLockMode mode)
+UnlockBuffer(Buffer buffer)
{
- BufferDesc *buf;
+ BufferDesc *buf_hdr;
Assert(BufferIsPinned(buffer));
if (BufferIsLocal(buffer))
return; /* local buffers need no lock */
- buf = GetBufferDescriptor(buffer - 1);
+ buf_hdr = GetBufferDescriptor(buffer - 1);
+ BufferLockUnlock(buffer, buf_hdr);
+}
+
+/*
+ * Acquire the content_lock for the buffer.
+ */
+void
+LockBufferInternal(Buffer buffer, BufferLockMode mode)
+{
+ BufferDesc *buf_hdr;
+
+ /*
+ * We can't wait if we haven't got a PGPROC. This should only occur
+ * during bootstrap or shared memory initialization. Put an Assert here
+ * to catch unsafe coding practices.
+ */
+ Assert(!(MyProc == NULL && IsUnderPostmaster));
+
+ /* handled in LockBuffer() wrapper */
+ Assert(mode != BUFFER_LOCK_UNLOCK);
- if (mode == BUFFER_LOCK_UNLOCK)
- LWLockRelease(BufferDescriptorGetContentLock(buf));
- else if (mode == BUFFER_LOCK_SHARE)
- LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+ Assert(BufferIsPinned(buffer));
+ if (BufferIsLocal(buffer))
+ return; /* local buffers need no lock */
+
+ buf_hdr = GetBufferDescriptor(buffer - 1);
+
+ /*
+ * Test the most frequent lock modes first. While a switch (mode) would be
+ * nice, at least gcc generates considerably worse code for it.
+ *
+ * Call BufferLockAcquire() with a constant argument for mode, to generate
+ * more efficient code for the different lock modes.
+ */
+ if (mode == BUFFER_LOCK_SHARE)
+ BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_SHARE);
else if (mode == BUFFER_LOCK_EXCLUSIVE)
- LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+ BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_EXCLUSIVE);
+ else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+ BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_SHARE_EXCLUSIVE);
else
elog(ERROR, "unrecognized buffer lock mode: %d", mode);
}
buf = GetBufferDescriptor(buffer - 1);
- return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
- LW_EXCLUSIVE);
+ return BufferLockConditional(buffer, buf, BUFFER_LOCK_EXCLUSIVE);
}
/*
/*
* AbortBufferIO: Clean up active buffer I/O after an error.
*
- * All LWLocks we might have held have been released,
- * but we haven't yet released buffer pins, so the buffer is still pinned.
+ * All LWLocks & content locks we might have held have been released, but we
+ * haven't yet released buffer pins, so the buffer is still pinned.
*
* If I/O was in progress, we always set BM_IO_ERROR, even though it's
* possible the error condition wasn't related to the I/O.
return psprintf("lost track of buffer IO on buffer %d", buffer);
}
+/*
+ * Release buffer as part of resource owner cleanup. This will only be called
+ * if the buffer is pinned. If this backend held the content lock at the time
+ * of the error we also need to release that (note that it is not possible to
+ * hold a content lock without a pin).
+ */
static void
-ResOwnerReleaseBufferPin(Datum res)
+ResOwnerReleaseBuffer(Datum res)
{
Buffer buffer = DatumGetInt32(res);
if (BufferIsLocal(buffer))
UnpinLocalBufferNoOwner(buffer);
else
+ {
+ PrivateRefCountEntry *ref;
+
+ ref = GetPrivateRefCountEntry(buffer, false);
+
+ /* not having a private refcount would imply resowner corruption */
+ Assert(ref != NULL);
+
+ /*
+ * If the buffer was locked at the time of the resowner release,
+ * release the lock now. This should only happen after errors.
+ */
+ if (ref->data.lockmode != BUFFER_LOCK_UNLOCK)
+ {
+ BufferDesc *buf = GetBufferDescriptor(buffer - 1);
+
+ HOLD_INTERRUPTS(); /* matched by RESUME_INTERRUPTS() in BufferLockUnlock() */
+ BufferLockUnlock(buffer, buf);
+ }
+
UnpinBufferNoOwner(GetBufferDescriptor(buffer - 1));
+ }
}
static char *
-ResOwnerPrintBufferPin(Datum res)
+ResOwnerPrintBuffer(Datum res)
{
return DebugPrintBufferRefcount(DatumGetInt32(res));
}
/* If it was not already dirty, mark it as dirty. */
if (!(buf_state & BM_DIRTY))
{
- LWLockAcquire(BufferDescriptorGetContentLock(desc), LW_EXCLUSIVE);
+ BufferLockAcquire(buf, desc, BUFFER_LOCK_EXCLUSIVE);
MarkBufferDirty(buf);
result = true;
- LWLockRelease(BufferDescriptorGetContentLock(desc));
+ BufferLockUnlock(buf, desc);
}
else
*buffer_already_dirty = true;
*/
if (is_write && !is_temp)
{
- LWLock *content_lock;
-
- content_lock = BufferDescriptorGetContentLock(buf_hdr);
-
- Assert(LWLockHeldByMe(content_lock));
+ Assert(BufferLockHeldByMe(buf_hdr));
/*
* Lock is now owned by AIO subsystem.
*/
- LWLockDisown(content_lock);
+ BufferLockDisown(buffer, buf_hdr);
}
/*