Replace buffer I/O locks with condition variables.
authorAndres Freund <andres@anarazel.de>
Mon, 25 May 2020 10:02:55 +0000 (03:02 -0700)
committerAndres Freund <andres@anarazel.de>
Wed, 24 Jun 2020 23:20:22 +0000 (16:20 -0700)
Author: Robert Haas
Discussion: https://postgr.es/m/CA+Tgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr=C56Xng@mail.gmail.com

src/backend/postmaster/pgstat.c
src/backend/storage/buffer/buf_init.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/lmgr/lwlock.c
src/include/pgstat.h
src/include/storage/buf_internals.h
src/include/storage/condition_variable.h
src/include/storage/lwlock.h

index c022597bc09ab9ac5a34f6d875203a5fa4427eff..bd1eb52cf19d8c0f9f24d4cc8f22ea9ce9a9eb24 100644 (file)
@@ -3940,6 +3940,9 @@ pgstat_get_wait_io(WaitEventIO w)
        case WAIT_EVENT_BUFFILE_WRITE:
            event_name = "BufFileWrite";
            break;
+       case WAIT_EVENT_BUFFILE_WAITIO:
+           event_name = "BufFileWaitIO";
+           break;
        case WAIT_EVENT_CONTROL_FILE_READ:
            event_name = "ControlFileRead";
            break;
index a8ce6603ed00b6e95608ccbda48fa724c435eabf..4ea2541f529120d03c15754182120bdf1afa765d 100644 (file)
@@ -19,7 +19,7 @@
 
 BufferDescPadded *BufferDescriptors;
 char      *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
 WritebackContext BackendWritebackContext;
 CkptSortItem *CkptBufferIds;
 
@@ -68,7 +68,7 @@ InitBufferPool(void)
 {
    bool        foundBufs,
                foundDescs,
-               foundIOLocks,
+               foundIOCV,
                foundBufCkpt;
 
    /* Align descriptors to a cacheline boundary. */
@@ -82,10 +82,10 @@ InitBufferPool(void)
                        NBuffers * (Size) BLCKSZ, &foundBufs);
 
    /* Align lwlocks to cacheline boundary */
-   BufferIOLWLockArray = (LWLockMinimallyPadded *)
-       ShmemInitStruct("Buffer IO Locks",
-                       NBuffers * (Size) sizeof(LWLockMinimallyPadded),
-                       &foundIOLocks);
+   BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+       ShmemInitStruct("Buffer IO Condition Variables",
+             NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+                                      &foundIOCV);
 
    /*
     * The array used to sort to-be-checkpointed buffer ids is located in
@@ -98,10 +98,10 @@ InitBufferPool(void)
        ShmemInitStruct("Checkpoint BufferIds",
                        NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
 
-   if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+   if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
    {
        /* should find all of these, or none of them */
-       Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+       Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
        /* note: this path is only taken in EXEC_BACKEND case */
    }
    else
@@ -131,8 +131,7 @@ InitBufferPool(void)
            LWLockInitialize(BufferDescriptorGetContentLock(buf),
                             LWTRANCHE_BUFFER_CONTENT);
 
-           LWLockInitialize(BufferDescriptorGetIOLock(buf),
-                            LWTRANCHE_BUFFER_IO);
+           ConditionVariableInit(BufferDescriptorGetIOCV(buf));
        }
 
        /* Correct last entry of linked list */
index 29c920800a63840336e6949e19086fea62171f14..8bfd63ece58a65edde4fa7cfd78a44694d73cce8 100644 (file)
@@ -1337,8 +1337,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
    LWLockRelease(newPartitionLock);
 
    /*
-    * Buffer contents are currently invalid.  Try to get the io_in_progress
-    * lock.  If StartBufferIO returns false, then someone else managed to
+    * Buffer contents are currently invalid.  Try to obtain the right to start
+    * I/O.  If StartBufferIO returns false, then someone else managed to
     * read it before we did, so there's nothing left for BufferAlloc() to do.
     */
    if (StartBufferIO(buf, true))
@@ -1728,9 +1728,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
        uint32      buf_state;
        uint32      old_buf_state;
 
-       /* I'd better not still hold any locks on the buffer */
+       /* I'd better not still hold the buffer content lock */
        Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
-       Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
        /*
         * Decrement the shared reference count.
@@ -2701,9 +2700,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
    uint32      buf_state;
 
    /*
-    * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
-    * false, then someone else flushed the buffer before we could, so we need
-    * not do anything.
+    * Try to start an I/O operation.  If StartBufferIO returns false, then
+    * someone else flushed the buffer before we could, so we need not do
+    * anything.
     */
    if (!StartBufferIO(buf, false))
        return;
@@ -2759,7 +2758,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
    /*
     * Now it's safe to write buffer to disk. Note that no one else should
     * have been able to write it while we were busy with log flushing because
-    * we have the io_in_progress lock.
+    * only one process at a time can set the BM_IO_IN_PROGRESS bit.
     */
    bufBlock = BufHdrGetBlock(buf);
 
@@ -2794,7 +2793,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 
    /*
     * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
-    * end the io_in_progress state.
+    * end the BM_IO_IN_PROGRESS state.
     */
    TerminateBufferIO(buf, true, 0);
 
@@ -4014,7 +4013,7 @@ IsBufferCleanupOK(Buffer buffer)
  * Functions for buffer I/O handling
  *
  * Note: We assume that nested buffer I/O never occurs.
- * i.e at most one io_in_progress lock is held per proc.
+ * i.e at most one BM_IO_IN_PROGRESS bit is set per proc.
  *
  * Also note that these are used only for shared buffers, not local ones.
  */
@@ -4025,13 +4024,10 @@ IsBufferCleanupOK(Buffer buffer)
 static void
 WaitIO(BufferDesc *buf)
 {
-   /*
-    * Changed to wait until there's no IO - Inoue 01/13/2000
-    *
-    * Note this is *necessary* because an error abort in the process doing
-    * I/O could release the io_in_progress_lock prematurely. See
-    * AbortBufferIO.
-    */
+   ConditionVariable   *cv = BufferDescriptorGetIOCV(buf);
+
+   ConditionVariablePrepareToSleep(cv);
+
    for (;;)
    {
        uint32      buf_state;
@@ -4046,9 +4042,9 @@ WaitIO(BufferDesc *buf)
 
        if (!(buf_state & BM_IO_IN_PROGRESS))
            break;
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
+       ConditionVariableSleep(cv, WAIT_EVENT_BUFFILE_WAITIO);
    }
+   ConditionVariableCancelSleep();
 }
 
 /*
@@ -4060,7 +4056,7 @@ WaitIO(BufferDesc *buf)
  * In some scenarios there are race conditions in which multiple backends
  * could attempt the same I/O operation concurrently.  If someone else
  * has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
  *
  * Input operations are only attempted on buffers that are not BM_VALID,
  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -4078,25 +4074,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 
    for (;;)
    {
-       /*
-        * Grab the io_in_progress lock so that other processes can wait for
-        * me to finish the I/O.
-        */
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
        buf_state = LockBufHdr(buf);
 
        if (!(buf_state & BM_IO_IN_PROGRESS))
            break;
-
-       /*
-        * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
-        * lock isn't held is if the process doing the I/O is recovering from
-        * an error (see AbortBufferIO).  If that's the case, we must wait for
-        * him to get unwedged.
-        */
        UnlockBufHdr(buf, buf_state);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
        WaitIO(buf);
    }
 
@@ -4106,7 +4088,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
    {
        /* someone else already did the I/O */
        UnlockBufHdr(buf, buf_state);
-       LWLockRelease(BufferDescriptorGetIOLock(buf));
        return false;
    }
 
@@ -4124,7 +4105,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
  * (Assumptions)
  * My process is executing IO for the buffer
  * BM_IO_IN_PROGRESS bit is set for the buffer
- * We hold the buffer's io_in_progress lock
  * The buffer is Pinned
  *
  * If clear_dirty is true and BM_JUST_DIRTIED is not set, we clear the
@@ -4156,7 +4136,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 
    InProgressBuf = NULL;
 
-   LWLockRelease(BufferDescriptorGetIOLock(buf));
+   ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
@@ -4177,14 +4157,6 @@ AbortBufferIO(void)
    {
        uint32      buf_state;
 
-       /*
-        * Since LWLockReleaseAll has already been called, we're not holding
-        * the buffer's io_in_progress_lock. We have to re-acquire it so that
-        * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
-        * buffer will be in a busy spin until we succeed in doing this.
-        */
-       LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
        buf_state = LockBufHdr(buf);
        Assert(buf_state & BM_IO_IN_PROGRESS);
        if (IsForInput)
index 2fa90cc0954d9e58f311c18d0e7e402ec900f16e..f36e42cb21b47ac378ba1e28fb1a4b2a2513f54d 100644 (file)
@@ -146,8 +146,6 @@ static const char *const BuiltinTrancheNames[] = {
    "WALInsert",
    /* LWTRANCHE_BUFFER_CONTENT: */
    "BufferContent",
-   /* LWTRANCHE_BUFFER_IO: */
-   "BufferIO",
    /* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
    "ReplicationOriginState",
    /* LWTRANCHE_REPLICATION_SLOT_IO: */
index 13872013823ec6bdbdaa87845ed911d583f68bab..4ad7173d9292c28242c2964132a58a145b60a4d9 100644 (file)
@@ -916,6 +916,7 @@ typedef enum
    WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO,
    WAIT_EVENT_BUFFILE_READ,
    WAIT_EVENT_BUFFILE_WRITE,
+   WAIT_EVENT_BUFFILE_WAITIO,
    WAIT_EVENT_CONTROL_FILE_READ,
    WAIT_EVENT_CONTROL_FILE_SYNC,
    WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE,
index e57f84ee9c8ce0ae731558c323dd12eea7c7ac11..7f03f39c69250b803e5684b123034bf17a5ab3ed 100644 (file)
@@ -18,6 +18,7 @@
 #include "port/atomics.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
 
 #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
 
-#define BufferDescriptorGetIOLock(bdesc) \
-   (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+   (&(BufferIOCVArray[(bdesc)->buf_id]).cv)
 #define BufferDescriptorGetContentLock(bdesc) \
    ((LWLock*) (&(bdesc)->content_lock))
 
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
 
 /*
  * The freeNext field is either the index of the next freelist entry,
index ad209acfac06873560b4a40cde1f7f5989a09c92..1bdf08429cd8dab457589a38bee7911bf42a1ef2 100644 (file)
@@ -31,6 +31,17 @@ typedef struct
    proclist_head wakeup;       /* list of wake-able processes */
 } ConditionVariable;
 
+/*
+ * Pad a condition variable to a power-of-two size so that an array of
+ * condition variables does not cross a cache line boundary.
+ */
+#define CV_MINIMAL_SIZE        (sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+   ConditionVariable   cv;
+   char        pad[CV_MINIMAL_SIZE];
+} ConditionVariableMinimallyPadded;
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *cv);
 
index af9b41795d26d9b789326dea60726efa24d5e848..e00bd3f39e95226b51daf3d8c41a41a35c79bdb1 100644 (file)
@@ -202,7 +202,6 @@ typedef enum BuiltinTrancheIds
    LWTRANCHE_SERIAL_BUFFER,
    LWTRANCHE_WAL_INSERT,
    LWTRANCHE_BUFFER_CONTENT,
-   LWTRANCHE_BUFFER_IO,
    LWTRANCHE_REPLICATION_ORIGIN_STATE,
    LWTRANCHE_REPLICATION_SLOT_IO,
    LWTRANCHE_LOCK_FASTPATH,