Fix races around recycling of IOs.
authorAndres Freund <andres@anarazel.de>
Mon, 25 May 2020 10:02:56 +0000 (03:02 -0700)
committerAndres Freund <andres@anarazel.de>
Wed, 24 Jun 2020 23:20:58 +0000 (16:20 -0700)
Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:

src/backend/access/transam/xlog.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/ipc/aio.c
src/include/storage/aio.h

index ae05a3b4f10d241af26586d5ee55ebc62c417fae..1b190555a9a4a9fa0464a85830e8fc3b80e10a35 100644 (file)
@@ -10449,7 +10449,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
                PgAioInProgress *aio = pgaio_io_get();
 
                pgaio_start_fdatasync(aio, fd, true);
-               pgaio_wait_for_io(aio);
+               pgaio_wait_for_io(aio, true);
                pgaio_release(aio);
            }
 #else
@@ -10464,7 +10464,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
                PgAioInProgress *aio = pgaio_io_get();
 
                pgaio_start_fdatasync(aio, fd, true);
-               pgaio_wait_for_io(aio);
+               pgaio_wait_for_io(aio, true);
                pgaio_release(aio);
            }
            /* write synced it already */
index ec75eeff3568df02b80e182051d19eae1561493b..07a8778cf373ca009afebb5a0885c746cdf36597 100644 (file)
@@ -1138,7 +1138,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
        UnlockBufHdr(bufHdr, buf_state);
 
        aio = ReadBufferInitRead(smgr, forkNum, blockNum, buf, bufHdr, mode);
-       pgaio_wait_for_io(aio);
+       pgaio_wait_for_io(aio, true);
        pgaio_release(aio);
    }
 
@@ -3158,7 +3158,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
                       bufToWrite,
                       BufferDescriptorGetBuffer(buf),
                       false);
-       pgaio_wait_for_io(aio);
+       pgaio_wait_for_io(aio, true);
        pgaio_release(aio);
    }
 
@@ -4463,7 +4463,7 @@ WaitIO(BufferDesc *buf)
 
        if (aio)
        {
-           pgaio_wait_for_io(aio);
+           pgaio_wait_for_io(aio, false);
            ConditionVariablePrepareToSleep(cv);
        }
 
index 3d6465dba92d6937072f9eece8aca647f4e0ae07..eb2dc70f36df1b32928203320852abd5d75460a4 100644 (file)
@@ -399,8 +399,6 @@ pgaio_complete_ios(bool in_error)
    {
        PgAioInProgress *io = local_reaped_ios[i];
 
-       Assert(io->flags & PGAIOIP_IN_USE);
-
        if (!(io->flags & PGAIOIP_DONE))
        {
            PgAioCompletedCB cb;
@@ -412,6 +410,10 @@ pgaio_complete_ios(bool in_error)
            if (done)
            {
                io->flags |= PGAIOIP_DONE;
+               LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+               pgaio_put_io_locked(io, true);
+               LWLockRelease(SharedAIOCtlLock);
+
                num_complete++;
            }
            else
@@ -428,26 +430,8 @@ pgaio_complete_ios(bool in_error)
        }
    }
 
-   START_CRIT_SECTION();
-
-   /* and recycle io entries */
-   if (num_complete > 0)
-   {
-       LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
-       for (int i = 0; i < num_local_reaped; i++)
-       {
-           PgAioInProgress *io = local_reaped_ios[i];
-
-           if (io->flags & PGAIOIP_DONE)
-               pgaio_put_io_locked(io, true);
-       }
-       LWLockRelease(SharedAIOCtlLock);
-   }
-
    num_local_reaped = 0;
 
-   END_CRIT_SECTION();
-
    /* if any IOs weren't fully done, re-submit them */
    if (num_reissued)
        pgaio_submit_pending(false);
@@ -731,34 +715,70 @@ pgaio_backpressure(struct io_uring *ring, const char *loc)
 }
 
 void
-pgaio_wait_for_io(PgAioInProgress *io)
+pgaio_wait_for_io(PgAioInProgress *io, bool holding_reference)
 {
    uint8 init_flags;
+   uint8 flags;
+   bool increased_refcount = false;
 
 #ifdef PGAIO_VERBOSE
    elog(DEBUG2, "waiting for %zu",
         io - aio_ctl->in_progress_io);
 #endif
 
-   init_flags = *(volatile uint8*) &io->flags;
+   if (!holding_reference)
+   {
+       LWLockAcquire(SharedAIOCompletionLock, LW_SHARED);
+       LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+
+       init_flags = flags = *(volatile uint8*) &io->flags;
+
+       if (init_flags & (PGAIOIP_IN_USE))
+       {
+           io->refcount++;
+           increased_refcount = true;
+       }
+       else
+       {
+           Assert(init_flags & (PGAIOIP_IDLE | PGAIOIP_DONE | PGAIOIP_ONLY_USER));
+       }
+
+       LWLockRelease(SharedAIOCtlLock);
+       LWLockRelease(SharedAIOCompletionLock);
+
+       if (!increased_refcount)
+           return;
+   }
+   else
+   {
+       init_flags = flags = *(volatile uint8*) &io->flags;
+   }
+
+   Assert(io->refcount > 0);
+   Assert(!(init_flags & PGAIOIP_IDLE));
+   Assert(init_flags & (PGAIOIP_IN_USE | PGAIOIP_ONLY_USER));
 
    if (!(init_flags & PGAIOIP_INFLIGHT) &&
-       !(init_flags & (PGAIOIP_DONE | PGAIOIP_IDLE)))
+       !(init_flags & PGAIOIP_DONE))
    {
        pgaio_submit_pending(false);
    }
 
    while (true)
    {
-       if (io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE))
+       flags = *(volatile uint8*) &io->flags;
+
+       if (flags & PGAIOIP_DONE)
            break;
 
        pgaio_drain(&aio_ctl->shared_ring, false);
 
-       if (io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE))
+       flags = *(volatile uint8*) &io->flags;
+
+       if (flags & PGAIOIP_DONE)
            break;
 
-       if (io->flags & PGAIOIP_INFLIGHT)
+       if (flags & PGAIOIP_INFLIGHT)
        {
            int ret;
 
@@ -770,7 +790,8 @@ pgaio_wait_for_io(PgAioInProgress *io)
                ResetLatch(MyLatch);
            }
 
-           if (!(io->flags & PGAIOIP_INFLIGHT))
+           flags = *(volatile uint8*) &io->flags;
+           if (!(flags & PGAIOIP_INFLIGHT))
            {
                PG_SETMASK(&UnBlockSig);
                continue;
@@ -809,13 +830,24 @@ pgaio_wait_for_io(PgAioInProgress *io)
            if (IsUnderPostmaster)
                ConditionVariablePrepareToSleep(&io->cv);
 
-           if (!(io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE)))
+           flags = *(volatile uint8*) &io->flags;
+           if (!(flags & PGAIOIP_DONE))
                ConditionVariableSleep(&io->cv, 0);
 
            if (IsUnderPostmaster)
                ConditionVariableCancelSleep();
        }
    }
+
+   flags = *(volatile uint8*) &io->flags;
+   Assert(flags & (PGAIOIP_DONE | PGAIOIP_ONLY_USER));
+
+   if (increased_refcount)
+   {
+       LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+       pgaio_put_io_locked(io, false);
+       LWLockRelease(SharedAIOCtlLock);
+   }
 }
 
 PgAioInProgress *
@@ -860,16 +892,28 @@ pgaio_io_get(void)
    return io;
 }
 
+extern void
+pgaio_io_recycle(PgAioInProgress *io)
+{
+   uint32 init_flags = *(volatile uint8*) &io->flags;
+
+   Assert(init_flags & PGAIOIP_ONLY_USER);
+   Assert(io->refcount == 1);
+
+   io->flags &= ~PGAIOIP_DONE;
+}
+
 static void  __attribute__((noinline))
 pgaio_prepare_io(PgAioInProgress *io, PgAioAction action)
 {
-   Assert(!(io->flags & PGAIOIP_IDLE));
+   Assert(!(io->flags & (PGAIOIP_IDLE | PGAIOIP_DONE)));
    /* true for now, but not necessarily in the future */
    Assert(io->flags & PGAIOIP_ONLY_USER);
+   Assert(io->refcount == 1);
 
    Assert(num_local_pending_requests < PGAIO_SUBMIT_BATCH_SIZE);
 
-   io->flags &= ~(PGAIOIP_ONLY_USER | PGAIOIP_DONE);
+   io->flags &= ~PGAIOIP_ONLY_USER;
 
    io->flags |= PGAIOIP_IN_USE;
    /* for this module */
@@ -903,15 +947,17 @@ pgaio_put_io_locked(PgAioInProgress *io, bool release_internal)
    if (release_internal && io->refcount > 0)
    {
        io->flags |= PGAIOIP_ONLY_USER;
+       io->flags &= ~PGAIOIP_IN_USE;
        pgaio_bounce_buffer_release_locked(io);
    }
 
    if (io->refcount > 0)
        return;
 
-   Assert(io->flags & PGAIOIP_DONE);
+   Assert(io->flags & PGAIOIP_DONE ||
+          io->flags & PGAIOIP_ONLY_USER);
 
-   io->flags &= ~(PGAIOIP_IN_USE|PGAIOIP_DONE | PGAIOIP_ONLY_USER);
+   io->flags &= ~(PGAIOIP_IN_USE | PGAIOIP_DONE | PGAIOIP_ONLY_USER);
    io->flags |= PGAIOIP_IDLE;
    io->type = 0;
    io->initiatorProcIndex = INVALID_PGPROCNO;
index d67655d807cd0c360af60632908412c55b1b67fa..c26f6f5536e36ec39b6bba728c40a3efa7de61cd 100644 (file)
@@ -48,6 +48,7 @@ extern void pgaio_at_abort(void);
  * management of checkpointer, for readahead)
  */
 extern PgAioInProgress *pgaio_io_get(void);
+extern void pgaio_io_recycle(PgAioInProgress *io);
 
 extern void pgaio_start_flush_range(PgAioInProgress *io, int fd, off_t offset, off_t nbytes);
 extern void pgaio_start_nop(PgAioInProgress *io);
@@ -66,7 +67,7 @@ extern void pgaio_submit_pending(bool drain);
 extern void pgaio_drain_shared(void);
 extern void pgaio_drain_outstanding(void);
 
-extern void pgaio_wait_for_io(PgAioInProgress *io);
+extern void pgaio_wait_for_io(PgAioInProgress *io, bool holding_reference);
 
 extern void pgaio_print_queues(void);