things are kinda working
author    Andres Freund <andres@anarazel.de>
          Wed, 24 Jun 2020 23:20:49 +0000 (16:20 -0700)
committer Andres Freund <andres@anarazel.de>
          Wed, 24 Jun 2020 23:20:49 +0000 (16:20 -0700)
25 files changed:
contrib/pg_prewarm/pg_prewarm.c
src/backend/access/heap/heapam.c
src/backend/access/table/tableam.c
src/backend/main/main.c
src/backend/port/sysv_shmem.c
src/backend/postmaster/postmaster.c
src/backend/storage/buffer/buf_init.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/backend/storage/file/fd.c
src/backend/storage/ipc/Makefile
src/backend/storage/ipc/aio.c [new file with mode: 0644]
src/backend/storage/ipc/ipc.c
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/lwlocknames.txt
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/backend/tcop/postgres.c
src/backend/utils/init/miscinit.c
src/include/storage/aio.h [new file with mode: 0644]
src/include/storage/buf_internals.h
src/include/storage/bufmgr.h
src/include/storage/fd.h
src/include/storage/md.h
src/include/storage/smgr.h

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 33e2d28b2767ed2c5ea200d641eba1eebb38851b..7c6ff66ea728ec3c3a13c5b9c7562ddc1da1f015 100644
@@ -18,6 +18,7 @@
 #include "access/relation.h"
 #include "fmgr.h"
 #include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
 #include "utils/acl.h"
@@ -33,7 +34,8 @@ typedef enum
 {
    PREWARM_PREFETCH,
    PREWARM_READ,
-   PREWARM_BUFFER
+   PREWARM_BUFFER,
+   PREWARM_BUFFER_AIO
 } PrewarmType;
 
 static PGAlignedBlock blockbuffer;
@@ -86,6 +88,8 @@ pg_prewarm(PG_FUNCTION_ARGS)
        ptype = PREWARM_READ;
    else if (strcmp(ttype, "buffer") == 0)
        ptype = PREWARM_BUFFER;
+   else if (strcmp(ttype, "buffer_aio") == 0)
+       ptype = PREWARM_BUFFER_AIO;
    else
    {
        ereport(ERROR,
@@ -197,6 +201,23 @@ pg_prewarm(PG_FUNCTION_ARGS)
            ++blocks_done;
        }
    }
+   else if (ptype == PREWARM_BUFFER_AIO)
+   {
+       for (block = first_block; block <= last_block; ++block)
+       {
+           Buffer      buf;
+           PgAioInProgress *io;
+
+           CHECK_FOR_INTERRUPTS();
+           io = ReadBufferAsync(rel, forkNumber, block, RBM_NORMAL, NULL, &buf);
+           if (!io)
+               ReleaseBuffer(buf);
+           ++blocks_done;
+       }
+
+       pgaio_submit_pending();
+       pgaio_drain_outstanding();
+   }
 
    /* Close relation, release lock. */
    relation_close(rel, AccessShareLock);
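
Two conventions in this hunk are established by the bufmgr.c and aio.c changes further down: ReadBufferAsync() returns NULL on a shared-buffer hit, meaning no IO was queued and the caller must release its own pin, while on a miss the shared pin is handed over to the IO and dropped by the completion callback; pgaio_submit_pending() then pushes the locally queued requests into the kernel ring, and pgaio_drain_outstanding() waits until every inflight IO has completed. From SQL, the new mode is reached with pg_prewarm('some_table', 'buffer_aio').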
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 537913d1bb3e357dfed65b1c31eb71309ce73364..21dcc45923de57c64985cfab3f9a414fd80a6351 100644
@@ -54,6 +54,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c814733b2222e3e08992d57295c90bd77b780dc3..f6700479a2e555fe98d718aa19ee421c958ace99 100644
@@ -25,6 +25,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "optimizer/plancat.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
@@ -473,6 +474,37 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
    else
        page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
 
+#if 1
+#define PREFETCH_AHEAD ((4 * 1024 * 1024) / 8192)
+#define PREFETCH_BATCH 32
+   if (nallocated % PREFETCH_BATCH == 0 &&
+       (nallocated + PREFETCH_AHEAD < pbscan->phs_nblocks))
+   {
+       PgAioInProgress *io;
+       Buffer buf;
+       uint64 target = nallocated + pbscan->phs_startblock + PREFETCH_AHEAD;
+       int i;
+
+       for (i = 0; i < PREFETCH_BATCH && target < pbscan->phs_nblocks;
+            i++, target++)
+       {
+           io = ReadBufferAsync(
+               rel, MAIN_FORKNUM,
+               target % pbscan->phs_nblocks,
+               RBM_NORMAL, NULL, &buf);
+
+           if (!io)
+               ReleaseBuffer(buf);
+       }
+       pgaio_submit_pending();
+       //elog(LOG, "issued %d prefetches at %lu", i, nallocated);
+   }
+   else
+   {
+       pgaio_drain_shared();
+   }
+#endif
+
    /*
     * Report scan location.  Normally, we report the current page number.
     * When we reach the end of the scan, though, we report the starting page,
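
With the default BLCKSZ of 8192, PREFETCH_AHEAD evaluates to 512 blocks (4 MiB), and a batch of 32 asynchronous reads is queued every 32 pages the scan allocates, so the issue rate matches the consumption rate once the 4 MiB lead is built up. The target % pbscan->phs_nblocks wraparound mirrors the circular block allocation used by parallel scans, and on the non-batch iterations pgaio_drain_shared() opportunistically reaps completions so the ring does not back up.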
diff --git a/src/backend/main/main.c b/src/backend/main/main.c
index a4dd233c7f9226b2af88436ae32a18d2570afc6e..6a6081bcce8771aa5a1e3a532e1eb0ff2983ddfa 100644
@@ -35,6 +35,7 @@
 #include "common/username.h"
 #include "port/atomics.h"
 #include "postmaster/postmaster.h"
+#include "storage/aio.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
 #include "utils/help_config.h"
diff --git a/src/backend/port/sysv_shmem.c b/src/backend/port/sysv_shmem.c
index 198a6985bf3f3de6ce0ad78e122cc203ec374ffb..7c52c429a720dc66a542b5a41d9821b9ec7c4293 100644
@@ -551,7 +551,7 @@ CreateAnonymousSegment(Size *size)
            allocsize += hugepagesize - (allocsize % hugepagesize);
 
        ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
-                  PG_MMAP_FLAGS | mmap_flags, -1, 0);
+                  PG_MMAP_FLAGS | MAP_POPULATE | mmap_flags, -1, 0);
        mmap_errno = errno;
        if (huge_pages == HUGE_PAGES_TRY && ptr == MAP_FAILED)
            elog(DEBUG1, "mmap(%zu) with MAP_HUGETLB failed, huge pages disabled: %m",
@@ -567,7 +567,7 @@ CreateAnonymousSegment(Size *size)
         */
        allocsize = *size;
        ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
-                  PG_MMAP_FLAGS, -1, 0);
+                  PG_MMAP_FLAGS | MAP_POPULATE, -1, 0);
        mmap_errno = errno;
    }
 
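MAP_POPULATE asks the kernel to populate the page tables for the whole mapping at mmap() time instead of faulting pages in on first touch. Presumably that is done here so the IO paths, with io_uring now reading directly into shared buffers, do not take soft page faults; the tradeoff is a slower postmaster start when shared_buffers is large.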
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b4d475bb0ba2e051ed6c797612ad85eb3bc15e1d..3398ba5cf4c467ba0dc97f9c74296f0a74dfe18d 100644
 #include "postmaster/syslogger.h"
 #include "replication/logicallauncher.h"
 #include "replication/walsender.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
@@ -1002,6 +1003,13 @@ PostmasterMain(int argc, char *argv[])
     */
    InitializeMaxBackends();
 
+   /*
+    * As AIO might create internal FDs and will trigger shared memory
+    * allocations, this needs to happen before reset_shared() and
+    * set_max_safe_fds().
+    */
+   pgaio_postmaster_init();
+
    /*
     * Set up shared memory and semaphores.
     */
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 4ea2541f529120d03c15754182120bdf1afa765d..921991e5e3f89cbb105024d1bce34e05a9a7d9bc 100644
@@ -78,8 +78,9 @@ InitBufferPool(void)
                        &foundDescs);
 
    BufferBlocks = (char *)
+       TYPEALIGN(BLCKSZ,
        ShmemInitStruct("Buffer Blocks",
-                       NBuffers * (Size) BLCKSZ, &foundBufs);
+                       (NBuffers + 1) * (Size) BLCKSZ, &foundBufs));
 
    /* Align lwlocks to cacheline boundary */
    BufferIOCVArray = (ConditionVariableMinimallyPadded *)
@@ -163,6 +164,8 @@ BufferShmemSize(void)
    size = add_size(size, PG_CACHE_LINE_SIZE);
 
    /* size of data pages */
+   /* to allow aligning buffer blocks */
+   size = add_size(size, BLCKSZ);
    size = add_size(size, mul_size(NBuffers, BLCKSZ));
 
    /* size of stuff controlled by freelist.c */
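
The over-allocation of one extra block plus the TYPEALIGN rounding make BufferBlocks start on a BLCKSZ boundary, which the O_DIRECT open flag introduced in md.c below requires (direct IO needs suitably aligned user buffers). For reference, TYPEALIGN as defined in src/include/c.h rounds a value up to the next multiple of a power-of-two alignment:

    #define TYPEALIGN(ALIGNVAL,LEN)  \
        (((uintptr_t) (LEN) + ((ALIGNVAL) - 1)) & ~((uintptr_t) ((ALIGNVAL) - 1)))

Since the base pointer moves up by at most BLCKSZ - 1 bytes, the extra BLCKSZ reserved in BufferShmemSize() guarantees the aligned array still fits.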
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 8bfd63ece58a65edde4fa7cfd78a44694d73cce8..1be0baf98c499d7aba1ed3fad49b0b6a92f3ad7b 100644
@@ -43,6 +43,7 @@
 #include "pg_trace.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
@@ -452,6 +453,9 @@ static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
                                ForkNumber forkNum, BlockNumber blockNum,
                                ReadBufferMode mode, BufferAccessStrategy strategy,
                                bool *hit);
+static BufferDesc *ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+                BlockNumber blockNum, ReadBufferMode mode,
+                BufferAccessStrategy strategy, bool *hit, bool isLocalBuf);
 static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
@@ -705,106 +709,85 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
                             mode, strategy, &hit);
 }
 
+PgAioInProgress*
+ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
+               ReadBufferMode mode, BufferAccessStrategy strategy,
+               Buffer *buf)
+{
+   BufferDesc *bufHdr;
+   bool hit;
+   PgAioInProgress* aio;
+
+   if (mode != RBM_NORMAL || strategy != NULL || blockNum == P_NEW)
+       elog(ERROR, "unsupported");
+
+   /* Open it at the smgr level if not already done */
+   RelationOpenSmgr(reln);
+
+   bufHdr = ReadBuffer_start(reln->rd_smgr, reln->rd_rel->relpersistence, forkNum,
+                             blockNum, mode, strategy, &hit, false);
+   *buf = BufferDescriptorGetBuffer(bufHdr);
+
+   if (hit)
+       return NULL;
+
+   InProgressBuf = NULL;
+
+   aio = smgrstartread(reln->rd_smgr, forkNum, blockNum,
+                       BufHdrGetBlock(bufHdr), *buf);
+   Assert(aio != NULL);
+
+   /*
+    * Decrement local pin, but keep shared. Shared will be released upon
+    * completion.
+    */
+   {
+       PrivateRefCountEntry *ref;
+
+       ref = GetPrivateRefCountEntry(*buf, false);
+       Assert(ref != NULL);
+       Assert(ref->refcount > 0);
+
+       ResourceOwnerForgetBuffer(CurrentResourceOwner, *buf);
+       ref->refcount--;
+       ForgetPrivateRefCountEntry(ref);
+   }
+
+   return aio;
+}
 
-/*
- * ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
- */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_extend(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                  BlockNumber blockNum, ReadBufferMode mode,
-                 BufferAccessStrategy strategy, bool *hit)
+                 BufferAccessStrategy strategy, bool *hit, bool isLocalBuf)
 {
+   bool        found;
    BufferDesc *bufHdr;
    Block       bufBlock;
-   bool        found;
-   bool        isExtend;
-   bool        isLocalBuf = SmgrIsTemp(smgr);
 
    *hit = false;
 
    /* Make sure we will have room to remember the buffer pin */
    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
-   isExtend = (blockNum == P_NEW);
-
-   TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-                                      smgr->smgr_rnode.node.spcNode,
-                                      smgr->smgr_rnode.node.dbNode,
-                                      smgr->smgr_rnode.node.relNode,
-                                      smgr->smgr_rnode.backend,
-                                      isExtend);
-
-   /* Substitute proper block number if caller asked for P_NEW */
-   if (isExtend)
-       blockNum = smgrnblocks(smgr, forkNum);
+   /* Substitute proper block number */
+   blockNum = smgrnblocks(smgr, forkNum);
 
    if (isLocalBuf)
    {
        bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-       if (found)
-           pgBufferUsage.local_blks_hit++;
-       else if (isExtend)
-           pgBufferUsage.local_blks_written++;
-       else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-                mode == RBM_ZERO_ON_ERROR)
-           pgBufferUsage.local_blks_read++;
+       pgBufferUsage.local_blks_written++;
    }
    else
    {
-       /*
-        * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-        * not currently in memory.
-        */
        bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
                             strategy, &found);
-       if (found)
-           pgBufferUsage.shared_blks_hit++;
-       else if (isExtend)
-           pgBufferUsage.shared_blks_written++;
-       else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-                mode == RBM_ZERO_ON_ERROR)
-           pgBufferUsage.shared_blks_read++;
+       pgBufferUsage.shared_blks_written++;
    }
 
-   /* At this point we do NOT hold any locks. */
-
-   /* if it was already in the buffer pool, we're done */
    if (found)
    {
-       if (!isExtend)
-       {
-           /* Just need to update stats before we exit */
-           *hit = true;
-           VacuumPageHit++;
-
-           if (VacuumCostActive)
-               VacuumCostBalance += VacuumCostPageHit;
-
-           TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-                                             smgr->smgr_rnode.node.spcNode,
-                                             smgr->smgr_rnode.node.dbNode,
-                                             smgr->smgr_rnode.node.relNode,
-                                             smgr->smgr_rnode.backend,
-                                             isExtend,
-                                             found);
-
-           /*
-            * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
-            * locked on return.
-            */
-           if (!isLocalBuf)
-           {
-               if (mode == RBM_ZERO_AND_LOCK)
-                   LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-                                 LW_EXCLUSIVE);
-               else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-                   LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-           }
-
-           return BufferDescriptorGetBuffer(bufHdr);
-       }
+       Block       bufBlock;
 
        /*
         * We get here only in the corner case where we are trying to extend
@@ -857,13 +840,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                UnlockBufHdr(bufHdr, buf_state);
            } while (!StartBufferIO(bufHdr, true));
        }
+
    }
 
    /*
-    * if we have gotten to this point, we have allocated a buffer for the
-    * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-    * if it's a shared buffer.
-    *
     * Note: if smgrextend fails, we will end up with a buffer that is
     * allocated but not marked BM_VALID.  P_NEW will still select the same
     * block number (because the relation didn't get any longer on disk) and
@@ -875,82 +855,235 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
    bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
 
-   if (isExtend)
+   /* new buffers are zero-filled */
+   MemSet((char *) bufBlock, 0, BLCKSZ);
+   /* don't set checksum for all-zero page */
+   smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+
+   /*
+    * NB: we're *not* doing a ScheduleBufferTagForWriteback here;
+    * although we're essentially performing a write. At least on linux
+    * doing so defeats the 'delayed allocation' mechanism, leading to
+    * increased file fragmentation.
+    */
+
+   /*
+    * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
+    * the page as valid, to make sure that no other backend sees the zeroed
+    * page before the caller has had a chance to initialize it.
+    *
+    * Since no-one else can be looking at the page contents yet, there is no
+    * difference between an exclusive lock and a cleanup-strength lock. (Note
+    * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
+    * they assert that the buffer is already valid.)
+    */
+   if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
+       !isLocalBuf)
+   {
+       LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+   }
+
+
+   if (isLocalBuf)
    {
-       /* new buffers are zero-filled */
-       MemSet((char *) bufBlock, 0, BLCKSZ);
-       /* don't set checksum for all-zero page */
-       smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+       /* Only need to adjust flags */
+       uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-       /*
-        * NB: we're *not* doing a ScheduleBufferTagForWriteback here;
-        * although we're essentially performing a write. At least on linux
-        * doing so defeats the 'delayed allocation' mechanism, leading to
-        * increased file fragmentation.
-        */
+       buf_state |= BM_VALID;
+       pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+   }
+   else
+   {
+       /* Set BM_VALID, terminate IO, and wake up any waiters */
+       TerminateBufferIO(bufHdr, false, BM_VALID);
+   }
+
+   return BufferDescriptorGetBuffer(bufHdr);
+}
+
+static BufferDesc *
+ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+                BlockNumber blockNum, ReadBufferMode mode,
+                BufferAccessStrategy strategy, bool *hit, bool isLocalBuf)
+{
+   BufferDesc *bufHdr;
+   bool        found;
+
+   Assert(blockNum != P_NEW);
+
+   *hit = false;
+
+   /* Make sure we will have room to remember the buffer pin */
+   ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+
+   TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+                                      smgr->smgr_rnode.node.spcNode,
+                                      smgr->smgr_rnode.node.dbNode,
+                                      smgr->smgr_rnode.node.relNode,
+                                      smgr->smgr_rnode.backend,
+                                      false /* isExtend */);
+
+   if (isLocalBuf)
+   {
+       bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
+       if (found)
+           pgBufferUsage.local_blks_hit++;
+       else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
+                mode == RBM_ZERO_ON_ERROR)
+           pgBufferUsage.local_blks_read++;
    }
    else
    {
        /*
-        * Read in the page, unless the caller intends to overwrite it and
-        * just wants us to allocate a buffer.
+        * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
+        * not currently in memory.
         */
-       if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-           MemSet((char *) bufBlock, 0, BLCKSZ);
-       else
-       {
-           instr_time  io_start,
-                       io_time;
+       bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
+                            strategy, &found);
+       if (found)
+           pgBufferUsage.shared_blks_hit++;
+       else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
+                mode == RBM_ZERO_ON_ERROR)
+           pgBufferUsage.shared_blks_read++;
+   }
 
-           if (track_io_timing)
-               INSTR_TIME_SET_CURRENT(io_start);
 
-           smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
+   /* At this point we do NOT hold any locks. */
 
-           if (track_io_timing)
-           {
-               INSTR_TIME_SET_CURRENT(io_time);
-               INSTR_TIME_SUBTRACT(io_time, io_start);
-               pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
-               INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
-           }
+   /* if it was already in the buffer pool, we're done */
+   if (found)
+   {
+       /* Just need to update stats before we exit */
+       *hit = true;
+       VacuumPageHit++;
 
-           /* check for garbage data */
-           if (!PageIsVerified((Page) bufBlock, blockNum))
-           {
-               if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
-               {
-                   ereport(WARNING,
-                           (errcode(ERRCODE_DATA_CORRUPTED),
-                            errmsg("invalid page in block %u of relation %s; zeroing out page",
-                                   blockNum,
-                                   relpath(smgr->smgr_rnode, forkNum))));
-                   MemSet((char *) bufBlock, 0, BLCKSZ);
-               }
-               else
-                   ereport(ERROR,
-                           (errcode(ERRCODE_DATA_CORRUPTED),
-                            errmsg("invalid page in block %u of relation %s",
-                                   blockNum,
-                                   relpath(smgr->smgr_rnode, forkNum))));
-           }
+       if (VacuumCostActive)
+           VacuumCostBalance += VacuumCostPageHit;
+
+       TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
+                                         smgr->smgr_rnode.node.spcNode,
+                                         smgr->smgr_rnode.node.dbNode,
+                                         smgr->smgr_rnode.node.relNode,
+                                         smgr->smgr_rnode.backend,
+                                         false /* isExtend */,
+                                         found);
+
+       /*
+        * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
+        * locked on return.
+        */
+       if (!isLocalBuf)
+       {
+           if (mode == RBM_ZERO_AND_LOCK)
+               LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                             LW_EXCLUSIVE);
+           else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
+               LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
        }
    }
 
+   return bufHdr;
+}
+
+/*
+ * ReadBuffer_common -- common logic for all ReadBuffer variants
+ *
+ * *hit is set to true if the request was satisfied from shared buffer cache.
+ */
+static Buffer
+ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+                 BlockNumber blockNum, ReadBufferMode mode,
+                 BufferAccessStrategy strategy, bool *hit)
+{
+   Block       bufBlock;
+   bool        isLocalBuf = SmgrIsTemp(smgr);
+   BufferDesc *bufHdr;
+
+   if (blockNum == P_NEW)
+       return ReadBuffer_extend(smgr, relpersistence, forkNum,
+                                blockNum, mode, strategy,
+                                hit, isLocalBuf);
+
+
+   bufHdr = ReadBuffer_start(smgr, relpersistence, forkNum,
+                             blockNum, mode, strategy,
+                             hit, isLocalBuf);
+
+   if (*hit)
+       return BufferDescriptorGetBuffer(bufHdr);
+
    /*
-    * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
-    * the page as valid, to make sure that no other backend sees the zeroed
-    * page before the caller has had a chance to initialize it.
-    *
-    * Since no-one else can be looking at the page contents yet, there is no
-    * difference between an exclusive lock and a cleanup-strength lock. (Note
-    * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-    * they assert that the buffer is already valid.)
+    * if we have gotten to this point, we have allocated a buffer for the
+    * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
+    * if it's a shared buffer.
     */
-   if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-       !isLocalBuf)
+   Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));   /* spinlock not needed */
+
+   bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+
+   /*
+    * Read in the page, unless the caller intends to overwrite it and
+    * just wants us to allocate a buffer.
+    */
+   if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
    {
-       LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+       MemSet((char *) bufBlock, 0, BLCKSZ);
+
+       /*
+        * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
+        * the page as valid, to make sure that no other backend sees the zeroed
+        * page before the caller has had a chance to initialize it.
+        *
+        * Since no-one else can be looking at the page contents yet, there is no
+        * difference between an exclusive lock and a cleanup-strength lock. (Note
+        * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
+        * they assert that the buffer is already valid.)
+        */
+       if (!isLocalBuf)
+       {
+           LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+       }
+   }
+   else
+   {
+       instr_time  io_start,
+           io_time;
+
+       if (track_io_timing)
+           INSTR_TIME_SET_CURRENT(io_start);
+
+       smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
+
+       if (track_io_timing)
+       {
+           INSTR_TIME_SET_CURRENT(io_time);
+           INSTR_TIME_SUBTRACT(io_time, io_start);
+           pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
+           INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
+       }
+
+       /* check for garbage data */
+       if (!PageIsVerified((Page) bufBlock, blockNum))
+       {
+           if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+           {
+               ereport(WARNING,
+                       (errcode(ERRCODE_DATA_CORRUPTED),
+                        errmsg("invalid page in block %u of relation %s; zeroing out page",
+                               blockNum,
+                               relpath(smgr->smgr_rnode, forkNum))));
+               MemSet((char *) bufBlock, 0, BLCKSZ);
+           }
+           else
+               ereport(ERROR,
+                       (errcode(ERRCODE_DATA_CORRUPTED),
+                        errmsg("invalid page in block %u of relation %s",
+                               blockNum,
+                               relpath(smgr->smgr_rnode, forkNum))));
+       }
    }
 
    if (isLocalBuf)
@@ -1019,6 +1152,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
    bool        valid;
    uint32      buf_state;
 
+   Assert(blockNum != P_NEW);
+
    /* create a tag so we can lookup the buffer */
    INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
 
@@ -1819,6 +1954,8 @@ BufferSync(int flags)
                    CHECKPOINT_FLUSH_ALL))))
        mask |= BM_PERMANENT;
 
+   elog(DEBUG1, "checkpoint looking at buffers");
+
    /*
     * Loop over all buffers, and mark the ones that need to be written with
     * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
@@ -1874,6 +2011,8 @@ BufferSync(int flags)
 
    TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
+   elog(DEBUG1, "checkpoint predicts to write %u buffers", num_to_scan);
+
    /*
     * Sort buffers that need to be written to reduce the likelihood of random
     * IO. The sorting is also important for the implementation of balancing
@@ -1886,6 +2025,8 @@ BufferSync(int flags)
 
    num_spaces = 0;
 
+   elog(DEBUG1, "checkpoint done sorting");
+
    /*
     * Allocate progress status for each tablespace with buffers that need to
     * be flushed. This requires the to-be-flushed array to be sorted.
@@ -1971,6 +2112,9 @@ BufferSync(int flags)
 
    binaryheap_build(ts_heap);
 
+   elog(DEBUG1, "checkpoint done heaping");
+
+
    /*
     * Iterate through to-be-checkpointed buffers and write the ones (still)
     * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
@@ -4032,6 +4176,8 @@ WaitIO(BufferDesc *buf)
    {
        uint32      buf_state;
 
+       pgaio_drain_outstanding();
+
        /*
         * It may not be necessary to acquire the spinlock to check the flag
         * here, but since this test is essential for correctness, we'd better
@@ -4153,6 +4299,8 @@ AbortBufferIO(void)
 {
    BufferDesc *buf = InProgressBuf;
 
+   pgaio_at_abort();
+
    if (buf)
    {
        uint32      buf_state;
@@ -4509,6 +4657,9 @@ IssuePendingWritebacks(WritebackContext *context)
    }
 
    context->nr_pending = 0;
+
+   if (i > 0)
+       pgaio_submit_pending();
 }
 
 
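The net effect of the bufmgr.c refactoring: the old monolithic ReadBuffer_common() is split into ReadBuffer_extend() (the P_NEW path, which always zero-fills and smgrextend()s) and ReadBuffer_start() (buffer lookup, pin, stats, and IO start), with ReadBuffer_common() reduced to dispatching between them and performing the synchronous read itself. ReadBufferAsync() reuses ReadBuffer_start() and replaces the synchronous smgrread() with smgrstartread(), handing its pin to the IO as described above.
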
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 6ffd7b330621cce9ecba2e33f6731022bb49f82d..3d617ed262e77a29848d5c3f14ef7908284dca8d 100644
@@ -117,6 +117,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
    bool        found;
    uint32      buf_state;
 
+   Assert(blockNum != P_NEW);
+
    INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
 
    /* Initialize local buffers if first request in this session */
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 7dc6dd2f1593167a05e368a7ca487707e620e5e8..cea6de9c1a4070a6b77bbd9c1ec5e5c135068cee 100644
@@ -92,6 +92,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "portability/mem.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
@@ -465,6 +466,13 @@ pg_flush_data(int fd, off_t offset, off_t nbytes)
     * We compile all alternatives that are supported on the current platform,
     * to find portability problems more easily.
     */
+#if USE_LIBURING
+   {
+       pgaio_start_flush_range(fd, offset, nbytes);
+
+       return;
+   }
+#endif
 #if defined(HAVE_SYNC_FILE_RANGE)
    {
        int         rc;
@@ -1010,7 +1018,11 @@ tryAgain:
    fd = open(fileName, fileFlags, fileMode);
 
    if (fd >= 0)
+   {
+       //elog(DEBUG1, "opening file %s fd %d", fileName, fd);
+
        return fd;              /* success! */
+   }
 
    if (errno == EMFILE || errno == ENFILE)
    {
@@ -1154,6 +1166,8 @@ LruDelete(File file)
 
    vfdP = &VfdCache[file];
 
+   pgaio_submit_pending();
+
    /*
     * Close the file.  We aren't expecting this to fail; if it does, better
     * to leak the FD than to mess up our internal state.
@@ -1834,6 +1848,8 @@ FileClose(File file)
 
    if (!FileIsNotOpen(file))
    {
+       pgaio_submit_pending();
+
        /* close the file */
        if (close(vfdP->fd) != 0)
        {
@@ -2023,6 +2039,30 @@ retry:
    return returnCode;
 }
 
+#include "storage/aio.h"
+
+struct PgAioInProgress *
+FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid)
+{
+   int         returnCode;
+   Vfd        *vfdP;
+
+   Assert(FileIsValid(file));
+
+   DO_DB(elog(LOG, "FileStartRead: %d (%s) " INT64_FORMAT " %d %p",
+              file, VfdCache[file].fileName,
+              (int64) offset,
+              amount, buffer));
+
+   returnCode = FileAccess(file);
+   if (returnCode < 0)
+       return NULL;
+
+   vfdP = &VfdCache[file];
+
+   return pgaio_start_buffer_read(vfdP->fd, offset, amount, buffer, bufid);
+}
+
 int
 FileWrite(File file, char *buffer, int amount, off_t offset,
          uint32 wait_event_info)
@@ -2494,6 +2534,7 @@ FreeDesc(AllocateDesc *desc)
            result = closedir(desc->desc.dir);
            break;
        case AllocateDescRawFD:
+           pgaio_submit_pending();
            result = close(desc->desc.fd);
            break;
        default:
@@ -2562,6 +2603,8 @@ CloseTransientFile(int fd)
    /* Only get here if someone passes us a file not in allocatedDescs */
    elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
 
+   pgaio_submit_pending();
+
    return close(fd);
 }
 
diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index df90c6b093fd7b826d34108946bf75e1640db2ae..b4f846e71feb0c19c9d550f81f80387835c52e5c 100644
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+   aio.o \
    barrier.o \
    dsm.o \
    dsm_impl.o \
@@ -27,4 +28,6 @@ OBJS = \
    sinvaladt.o \
    standby.o
 
+aio.o: override CPPFLAGS += $(LIBURING_CFLAGS)
+
 include $(top_srcdir)/src/backend/common.mk
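
The per-object CPPFLAGS override confines liburing's headers to aio.o; LIBURING_CFLAGS is presumably populated by configure (e.g. via pkg-config) in build-system changes not shown in this commit.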
diff --git a/src/backend/storage/ipc/aio.c b/src/backend/storage/ipc/aio.c
new file mode 100644
index 0000000..70f1bd5
--- /dev/null
+++ b/src/backend/storage/ipc/aio.c
@@ -0,0 +1,891 @@
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <liburing.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include "lib/ilist.h"
+#include "miscadmin.h"
+#include "storage/aio.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+
+typedef enum PgAioAction
+{
+   /* intentionally the zero value, to help catch zeroed memory etc */
+   PGAIO_INVALID = 0,
+
+   PGAIO_NOP,
+   PGAIO_FLUSH_RANGE,
+   PGAIO_READ_BUFFER,
+   PGAIO_WRITE_BUFFER
+} PgAioAction;
+
+typedef enum PgAioInProgressFlags
+{
+   /* request in the ->unused list */
+   PGAIOIP_IDLE = 1 << 0,
+   /* request used, may also be in other states */
+   PGAIOIP_IN_USE = 1 << 1,
+   /* request in kernel */
+   PGAIOIP_INFLIGHT = 1 << 2,
+   /* completion callback was called */
+   PGAIOIP_DONE = 1 << 3,
+} PgAioInProgressFlags;
+
+/* IO completion callback */
+typedef void (*PgAioCompletedCB)(PgAioInProgress *io);
+
+struct PgAioInProgress
+{
+   dlist_node node;
+
+   ConditionVariable cv;
+
+   uint8 flags;
+
+   /* PgAioAction, indexes PgAioCompletionCallbacks */
+   PgAioAction type;
+
+   /* which AIO ring is this entry active for */
+   uint8 ring;
+
+   /* index into allProcs, or PG_UINT32_MAX for process local IO */
+   uint32 initiatorProcIndex;
+
+   /* the IO's result; depends on the operation, e.g. the length of a read */
+   int32 result;
+
+   /*
+    * NB: Note that fds in here may *not* be relied upon for re-issuing
+    * requests (e.g. for partial reads/writes) - the fd might be from another
+    * process, or closed since. That's not a problem for IOs waiting to be
+    * issued only because the queue is flushed when closing an fd.
+    */
+   union {
+       struct
+       {
+           int fd;
+           off_t nbytes;
+           off_t offset;
+       } flush_range;
+
+       struct
+       {
+           Buffer buf;
+           int fd;
+           int already_done;
+           off_t offset;
+           struct iovec iovec;
+       } read_buffer;
+
+       struct
+       {
+           Buffer buf;
+           struct iovec iovec;
+       } write_buffer;
+   } d;
+};
+
+static void pgaio_put_io_locked(PgAioInProgress *io);
+static void pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe);
+static void pgaio_complete_ios(bool in_error);
+static void pgaio_backpressure(struct io_uring *ring, const char *loc);
+
+
+static void pgaio_complete_nop(PgAioInProgress *io);
+static void pgaio_complete_flush_range(PgAioInProgress *io);
+static void pgaio_complete_read_buffer(PgAioInProgress *io);
+static void pgaio_complete_write_buffer(PgAioInProgress *io);
+
+
+/*
+ * To support EXEC_BACKEND environments, where we cannot rely on callback
+ * addresses being equivalent across processes, completion actions are just
+ * indices into a process-local array of callbacks, indexed by the type of
+ * action.  This also makes the shared memory entries a bit smaller, but that's not
+ * a huge win.
+ */
+static PgAioCompletedCB completion_callbacks[] =
+{
+   [PGAIO_NOP] = pgaio_complete_nop,
+   [PGAIO_FLUSH_RANGE] = pgaio_complete_flush_range,
+   [PGAIO_READ_BUFFER] = pgaio_complete_read_buffer,
+   [PGAIO_WRITE_BUFFER] = pgaio_complete_write_buffer,
+};
+
+typedef struct PgAioCtl
+{
+   /* FIXME: this needs to be partitioned */
+   /* PgAioInProgress that are not used */
+   dlist_head unused_ios;
+
+   /* FIXME: this should be per ring */
+   /*
+    * PgAioInProgress that are issued to the ringbuffer, and have not yet
+    * been processed (but they may have completed without us processing the
+    * completions).
+    */
+   dlist_head inflight;
+
+   /*
+    * FIXME: there should be multiple rings, at least one for data integrity
+    * writes, allowing efficient interleaving with WAL writes, and one for
+    * the rest. But likely a small number of non-integrity rings too.
+    *
+    * It could also make sense to have one or a few rings for specific
+    * purposes like prefetching, or vacuum. Configuring different depths
+    * could be a nice tool to manage maximum overall system impact (in
+    * particular by limiting queued/inflight operation counts).
+    */
+   struct io_uring shared_ring;
+
+   /*
+    * Number of PgAioInProgress entries that are in use. This includes
+    * pending requests, as well as requests actually issued to the queue.
+    */
+   int32 outstanding;
+
+   PgAioInProgress in_progress_io[FLEXIBLE_ARRAY_MEMBER];
+} PgAioCtl;
+
+
+/* (future) GUC controlling global MAX number of in-progress IO entries */
+extern int max_aio_in_progress;
+int max_aio_in_progress = 2048;
+
+/* global list of in-progress IO */
+static PgAioCtl *aio_ctl;
+
+/*
+ * Requests waiting to be issued to the kernel. They are submitted to the
+ * kernel in batches, for efficiency (including allowing for better queue
+ * processing).
+ */
+int num_local_pending_requests = 0;
+static dlist_head local_pending_requests;
+
+/*
+ * Request completions received from the kernel. These are kept in global
+ * variables so that processing can continue if a completion callback fails.
+ */
+#define MAX_LOCAL_REAPED 32
+int num_local_reaped = 0;
+struct io_uring_cqe *local_reaped_cqes[MAX_LOCAL_REAPED];
+PgAioInProgress *local_reaped_ios[MAX_LOCAL_REAPED];
+
+
+# ifndef __NR_io_uring_enter
+#  define __NR_io_uring_enter      426
+# endif
+
+static int
+__sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
+            unsigned flags, sigset_t *sig)
+{
+   return syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
+           flags, sig, _NSIG / 8);
+}
+
+#define PGAIO_VERBOSE
+
+#define PGAIO_SUBMIT_BATCH_SIZE 16
+#define PGAIO_BACKPRESSURE_LIMIT 500
+
+
+Size
+AioShmemSize(void)
+{
+   return add_size(mul_size(max_aio_in_progress, sizeof(PgAioInProgress)),
+                   offsetof(PgAioCtl, in_progress_io));
+}
+
+void
+AioShmemInit(void)
+{
+   bool        found;
+
+   aio_ctl = (PgAioCtl *)
+       ShmemInitStruct("PgAio", AioShmemSize(), &found);
+
+   if (!found)
+   {
+       memset(aio_ctl, 0, AioShmemSize());
+
+       dlist_init(&aio_ctl->unused_ios);
+       dlist_init(&aio_ctl->inflight);
+
+       for (int i = 0; i < max_aio_in_progress; i++)
+       {
+           PgAioInProgress *io = &aio_ctl->in_progress_io[i];
+
+           ConditionVariableInit(&io->cv);
+           dlist_push_head(&aio_ctl->unused_ios,
+                           &io->node);
+           io->flags = PGAIOIP_IDLE;
+       }
+
+       {
+           int ret;
+
+           ret = io_uring_queue_init(512, &aio_ctl->shared_ring, 0);
+           if (ret < 0)
+               elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+       }
+
+   }
+}
+
+void
+pgaio_postmaster_init(void)
+{
+#if 0
+   int ret;
+
+   Assert(shared_ring.ring_fd == -1);
+
+   ret = io_uring_queue_init(512, &shared_ring, 0);
+   if (ret < 0)
+       elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+
+   //pgaio_test_fsync(1);
+#endif
+   dlist_init(&local_pending_requests);
+   num_local_pending_requests = 0;
+   num_local_reaped = 0;
+}
+
+void
+pgaio_postmaster_child_init(void)
+{
+   /* no locking needed here, only affects this process */
+   io_uring_ring_dontfork(&aio_ctl->shared_ring);
+   dlist_init(&local_pending_requests);
+   num_local_pending_requests = 0;
+   num_local_reaped = 0;
+}
+
+void
+pgaio_at_abort(void)
+{
+   if (num_local_reaped > 0)
+   {
+       elog(LOG, "at abort with %d pending", num_local_reaped);
+
+       pgaio_complete_ios(/* in_error = */ true);
+   }
+
+   pgaio_submit_pending();
+}
+
+static void
+pgaio_complete_cqes(struct io_uring *ring, PgAioInProgress **ios, struct io_uring_cqe **cqes, int ready)
+{
+   for (int i = 0; i < ready; i++)
+   {
+       struct io_uring_cqe *cqe = cqes[i];
+       PgAioInProgress *io;
+
+       ios[i] = io = io_uring_cqe_get_data(cqe);
+       Assert(ios[i] != NULL);
+       ios[i]->result = cqe->res;
+
+       Assert(io->flags & PGAIOIP_INFLIGHT);
+       Assert(io->flags & PGAIOIP_IN_USE);
+       Assert(io->flags != PGAIOIP_IDLE);
+       io->flags &= ~PGAIOIP_INFLIGHT;
+
+       if (cqe->res < 0)
+       {
+           elog(WARNING, "cqe: u: %p s: %d/%s f: %u",
+                io_uring_cqe_get_data(cqe),
+                cqe->res,
+                cqe->res < 0 ? strerror(-cqe->res) : "",
+                cqe->flags);
+       }
+
+       io_uring_cqe_seen(ring, cqe);
+
+       /* delete from PgAioCtl->inflight */
+       dlist_delete(&io->node);
+   }
+   //io_uring_cq_advance(ring, ready);
+}
+
+static void
+pgaio_complete_ios(bool in_error)
+{
+   Assert(!LWLockHeldByMe(SharedAIOLock));
+
+   /* call all callbacks, without holding lock */
+   for (int i = 0; i < num_local_reaped; i++)
+   {
+       PgAioInProgress *io = local_reaped_ios[i];
+
+       Assert(io->flags & PGAIOIP_IN_USE);
+
+       if (!(io->flags & PGAIOIP_DONE))
+       {
+           PgAioCompletedCB cb;
+
+           cb = completion_callbacks[io->type];
+           cb(io);
+
+           io->flags |= PGAIOIP_DONE;
+
+           /* signal state change */
+           ConditionVariableBroadcast(&io->cv);
+       }
+       else
+       {
+           Assert(in_error);
+       }
+   }
+
+   /* and recycle io entries */
+   START_CRIT_SECTION();
+   LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+   for (int i = 0; i < num_local_reaped; i++)
+       pgaio_put_io_locked(local_reaped_ios[i]);
+   num_local_reaped = 0;
+   LWLockRelease(SharedAIOLock);
+   END_CRIT_SECTION();
+}
+
+/*
+ * This checks if there are completions to be processed, before unlocking the
+ * ring.
+ */
+static int
+pgaio_drain_and_unlock(struct io_uring *ring)
+{
+   Assert(LWLockHeldByMe(SharedAIOLock));
+   Assert(num_local_reaped == 0);
+
+   START_CRIT_SECTION();
+
+   if (io_uring_cq_ready(ring))
+   {
+       num_local_reaped =
+           io_uring_peek_batch_cqe(ring,
+                                   local_reaped_cqes,
+                                   MAX_LOCAL_REAPED);
+
+       pgaio_complete_cqes(ring,
+                           local_reaped_ios,
+                           local_reaped_cqes,
+                           num_local_reaped);
+   }
+
+   LWLockRelease(SharedAIOLock);
+
+   END_CRIT_SECTION();
+
+   if (num_local_reaped > 0)
+   {
+       /* pgaio_complete_ios() resets num_local_reaped, so save the count */
+       int         processed = num_local_reaped;
+
+       pgaio_complete_ios(false);
+
+       return processed;
+   }
+
+   return 0;
+}
+
+static void
+pgaio_drain(struct io_uring *ring, bool already_locked)
+{
+   bool lock_held = already_locked;
+
+   while (true)
+   {
+       uint32 ready = io_uring_cq_ready(ring);
+       uint32 processed;
+
+       if (ready == 0)
+           break;
+
+       if (!lock_held)
+       {
+           LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+           lock_held = true;
+       }
+
+       processed = pgaio_drain_and_unlock(ring);
+       lock_held = false;
+
+       if (processed >= ready)
+           break;
+   }
+
+   if (already_locked && !lock_held)
+       LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+}
+
+void
+pgaio_drain_shared(void)
+{
+   pgaio_drain(&aio_ctl->shared_ring, false);
+}
+
+void
+pgaio_drain_outstanding(void)
+{
+   int outstanding;
+
+   Assert(!LWLockHeldByMe(SharedAIOLock));
+
+   pgaio_submit_pending();
+   pgaio_drain(&aio_ctl->shared_ring, false);
+
+   outstanding = aio_ctl->outstanding;
+
+   while (outstanding > 0)
+   {
+#if 0
+       int ret = __sys_io_uring_enter(aio_ctl->shared_ring.ring_fd,
+                                      0, 1,
+                                      IORING_ENTER_GETEVENTS, NULL);
+       if (ret != 0 && errno != EINTR)
+           elog(WARNING, "unexpected: %d/%s", ret, strerror(-ret));
+
+       pgaio_drain(&aio_ctl->shared_ring, false);
+
+#else
+       LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+       outstanding = aio_ctl->outstanding;
+
+       if (!dlist_is_empty(&aio_ctl->inflight) &&
+           io_uring_cq_ready(&aio_ctl->shared_ring) == 0)
+       {
+           int ret = __sys_io_uring_enter(aio_ctl->shared_ring.ring_fd,
+                                          0, 1,
+                                          IORING_ENTER_GETEVENTS, NULL);
+           if (ret != 0 && errno != EINTR)
+               elog(WARNING, "unexpected: %d/%s: %m", ret, strerror(-ret));
+
+           pgaio_drain(&aio_ctl->shared_ring, true);
+       }
+
+       if (io_uring_cq_ready(&aio_ctl->shared_ring))
+           pgaio_drain(&aio_ctl->shared_ring, true);
+
+       LWLockRelease(SharedAIOLock);
+#endif
+   }
+}
+
+void
+pgaio_submit_pending(void)
+{
+   PgAioInProgress *ios[PGAIO_SUBMIT_BATCH_SIZE];
+   struct io_uring_sqe *sqe[PGAIO_SUBMIT_BATCH_SIZE];
+   int total_submitted = 0;
+
+   /* FIXME: */
+   if (!aio_ctl || dlist_is_empty(&local_pending_requests))
+   {
+       Assert(num_local_pending_requests == 0);
+       return;
+   }
+
+   while (!dlist_is_empty(&local_pending_requests))
+   {
+       int nios = 0;
+       int nsubmit = Min(num_local_pending_requests, PGAIO_SUBMIT_BATCH_SIZE);
+
+       Assert(nsubmit != 0 && nsubmit <= num_local_pending_requests);
+       LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+       for (int i = 0; i < nsubmit; i++)
+       {
+           dlist_node *node;
+
+           sqe[nios] = io_uring_get_sqe(&aio_ctl->shared_ring);
+
+           if (!sqe[nios])
+               break;
+
+           node = dlist_pop_head_node(&local_pending_requests);
+           ios[nios] = dlist_container(PgAioInProgress, node,node);
+
+           pgaio_sq_from_io(ios[nios], sqe[nios]);
+           Assert(ios[nios]->flags & PGAIOIP_IN_USE);
+           ios[nios]->flags |= PGAIOIP_INFLIGHT;
+
+           dlist_push_head(&aio_ctl->inflight,
+                           &ios[nios]->node);
+
+           nios++;
+           num_local_pending_requests--;
+           total_submitted++;
+       }
+
+       if (nios > 0)
+       {
+           int ret;
+
+   again:
+           ret = io_uring_submit(&aio_ctl->shared_ring);
+           if (ret == -EINTR)
+               goto again;
+
+           if (ret < 0)
+               elog(PANIC, "failed: %d/%s",
+                    ret, strerror(-ret));
+       }
+
+       /* while still holding the lock, extract all CQs we can */
+       pgaio_drain_and_unlock(&aio_ctl->shared_ring);
+   }
+
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "submitted %d", total_submitted);
+#endif
+
+   pgaio_backpressure(&aio_ctl->shared_ring, "submit_pending");
+}
+
+static void
+pgaio_backpressure(struct io_uring *ring, const char *loc)
+{
+   if (aio_ctl->outstanding > PGAIO_BACKPRESSURE_LIMIT)
+   {
+       for (int i = 0; i < 1; i++)
+       {
+           int ret;
+           int waitfor;
+           int cqr_before;
+           int cqr_after;
+
+           /*
+            * FIXME: This code likely has a race condition, where the queue
+            * might be emptied after our check, but before we wait for events
+            * to be completed.  Using a lock around this could fix that, but
+            * we don't want that (both because others should be able to add
+            * requests, and because we'd rather have the kernel wake everyone
+            * up, if there's some readiness - it's quite likely multiple
+            * backends may wait for the same IO).
+            *
+            * Possible fix: While holding lock, register for CV for one of
+            * the inflight requests. Then, using appropriate sigmask'ery,
+            * wait until either that request is processed by somebody else,
+            * or a new completion is ready. The latter is much more likely.
+            */
+           cqr_before = io_uring_cq_ready(ring);
+
+           waitfor = 1;
+           ret = __sys_io_uring_enter(ring->ring_fd,
+                                      0, waitfor,
+                                      IORING_ENTER_GETEVENTS, NULL);
+           if (ret < 0 && errno != EINTR)
+               elog(WARNING, "enter failed: %d/%s", ret, strerror(-ret));
+
+           cqr_after = io_uring_cq_ready(ring);
+#if 1
+           elog(DEBUG1, "nonlock at %s for depth %d waited for %d got %d "
+                "cqr before %d after %d "
+                "space left: %d, sq ready: %d",
+                loc,
+                aio_ctl->outstanding, waitfor, ret, cqr_before,
+                io_uring_cq_ready(ring),
+                io_uring_sq_space_left(ring),
+                io_uring_sq_ready(ring));
+#endif
+           if (cqr_after)
+               pgaio_drain(ring, false);
+       }
+   }
+
+   if (aio_ctl->outstanding > 1024)
+   {
+       elog(WARNING, "something's up: %d outstanding! cq ready: %u sq space left: %d, sq ready: %d",
+            aio_ctl->outstanding,
+            io_uring_cq_ready(ring),
+            io_uring_sq_space_left(ring),
+            io_uring_sq_ready(ring));
+   }
+}
+
+static PgAioInProgress*
+pgaio_start_get_io(PgAioAction action)
+{
+   dlist_node *elem;
+   PgAioInProgress *io;
+
+   Assert(!LWLockHeldByMe(SharedAIOLock));
+   Assert(num_local_pending_requests < PGAIO_SUBMIT_BATCH_SIZE);
+
+   /* FIXME: wait for an IO to complete if full */
+
+   LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+   while (dlist_is_empty(&aio_ctl->unused_ios))
+   {
+       pgaio_drain(&aio_ctl->shared_ring, true);
+       Assert(LWLockHeldByMe(SharedAIOLock));
+   }
+
+   START_CRIT_SECTION();
+
+   elem = dlist_pop_head_node(&aio_ctl->unused_ios);
+
+   io = dlist_container(PgAioInProgress, node, elem);
+
+   Assert(!(io->flags & PGAIOIP_IN_USE));
+   Assert(io->flags == PGAIOIP_IDLE);
+
+   io->flags &= ~PGAIOIP_IDLE;
+   io->flags |= PGAIOIP_IN_USE;
+   io->type = action;
+   io->initiatorProcIndex = MyProc->pgprocno;
+
+   aio_ctl->outstanding++;
+
+   dlist_push_tail(&local_pending_requests,
+                   &io->node);
+   num_local_pending_requests++;
+
+   END_CRIT_SECTION();
+
+   return io;
+}
+
+static void
+pgaio_end_get_io(void)
+{
+   Assert(LWLockHeldByMe(SharedAIOLock));
+
+   pgaio_drain_and_unlock(&aio_ctl->shared_ring);
+
+   if (num_local_pending_requests >= PGAIO_SUBMIT_BATCH_SIZE)
+       pgaio_submit_pending();
+   else
+       pgaio_backpressure(&aio_ctl->shared_ring, "get_io");
+}
+
+static void
+pgaio_put_io_locked(PgAioInProgress *io)
+{
+   Assert(LWLockHeldByMe(SharedAIOLock));
+   Assert(io->flags & PGAIOIP_DONE);
+
+   io->flags &= ~(PGAIOIP_IN_USE|PGAIOIP_DONE);
+   io->flags |= PGAIOIP_IDLE;
+   io->type = 0;
+   io->initiatorProcIndex = INVALID_PGPROCNO;
+
+   aio_ctl->outstanding--;
+   dlist_push_head(&aio_ctl->unused_ios,
+                   &io->node);
+}
+
+static void
+pgaio_complete_nop(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "completed nop");
+#endif
+}
+
+static void
+pgaio_complete_flush_range(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "completed flush_range: %zu, %s",
+        io - aio_ctl->in_progress_io,
+        io->result < 0 ? strerror(-io->result) : "ok");
+#endif
+}
+
+static void
+pgaio_complete_read_buffer(PgAioInProgress *io)
+{
+   BufferDesc *bufHdr = GetBufferDescriptor(io->d.read_buffer.buf - 1);
+   uint32      buf_state;
+   bool        failed = false;
+   RelFileNode rnode = bufHdr->tag.rnode;
+   BlockNumber forkNum = bufHdr->tag.forkNum;
+   BlockNumber blockNum = bufHdr->tag.blockNum;
+
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "completed read_buffer: %zu, %d/%s, buf %d",
+        io - aio_ctl->in_progress_io,
+        io->result,
+        io->result < 0 ? strerror(-io->result) : "ok",
+        io->d.read_buffer.buf);
+#endif
+
+   if (io->result < 0)
+   {
+       if (io->result == -EAGAIN || io->result == -EINTR)
+       {
+           elog(WARNING, "need to implement retries");
+       }
+
+       failed = true;
+       ereport(WARNING,
+               (errcode_for_file_access(),
+                errmsg("could not read block %u in file \"%s\": %s",
+                       blockNum,
+                       relpathperm(rnode, forkNum),
+                       strerror(-io->result))));
+   }
+   else if (io->result != BLCKSZ)
+   {
+       failed = true;
+       ereport(WARNING,
+               (errcode(ERRCODE_DATA_CORRUPTED),
+                errmsg("could not read block %u in file \"%s\": read only %d of %d bytes",
+                       blockNum,
+                       relpathperm(rnode, forkNum),
+                       io->result, BLCKSZ)));
+   }
+
+   /* FIXME: needs to be in bufmgr.c */
+   {
+       Block       bufBlock;
+       RelFileNode rnode = bufHdr->tag.rnode;
+       BlockNumber forkNum = bufHdr->tag.forkNum;
+       BlockNumber blockNum = bufHdr->tag.blockNum;
+
+       bufBlock = BufferGetBlock(io->d.read_buffer.buf);
+
+       /* check for garbage data */
+       if (!PageIsVerified((Page) bufBlock, blockNum))
+       {
+           failed = true;
+           ereport(ERROR,
+                   (errcode(ERRCODE_DATA_CORRUPTED),
+                    errmsg("invalid page in block %u of relation %s",
+                           blockNum,
+                           relpathperm(rnode, forkNum))));
+       }
+   }
+
+   buf_state = LockBufHdr(bufHdr);
+
+   Assert(buf_state & BM_IO_IN_PROGRESS);
+
+   buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+   if (failed)
+       buf_state |= BM_IO_ERROR;
+   else
+       buf_state |= BM_VALID;
+
+   buf_state -= BUF_REFCOUNT_ONE;
+
+   UnlockBufHdr(bufHdr, buf_state);
+
+   ConditionVariableBroadcast(BufferDescriptorGetIOCV(bufHdr));
+}
+
+static void
+pgaio_complete_write_buffer(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "completed write_buffer");
+#endif
+}
+
+static void
+pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe)
+{
+   switch (io->type)
+   {
+       case PGAIO_READ_BUFFER:
+           io_uring_prep_readv(sqe,
+                               io->d.read_buffer.fd,
+                               &io->d.read_buffer.iovec,
+                               1,
+                               io->d.read_buffer.offset);
+           break;
+       case PGAIO_FLUSH_RANGE:
+           io_uring_prep_rw(IORING_OP_SYNC_FILE_RANGE,
+                            sqe,
+                            io->d.flush_range.fd,
+                            NULL,
+                            io->d.flush_range.nbytes,
+                            io->d.flush_range.offset);
+           sqe->sync_range_flags = SYNC_FILE_RANGE_WRITE;
+           break;
+       case PGAIO_WRITE_BUFFER:
+       case PGAIO_NOP:
+           elog(ERROR, "not yet");
+           break;
+       case PGAIO_INVALID:
+           elog(ERROR, "invalid");
+   }
+
+   io_uring_sqe_set_data(sqe, io);
+}
+
+PgAioInProgress *
+pgaio_start_flush_range(int fd, off_t offset, off_t nbytes)
+{
+   PgAioInProgress *io;
+   //struct io_uring_sqe *sqe;
+
+   io = pgaio_start_get_io(PGAIO_FLUSH_RANGE);
+
+   io->d.flush_range.fd = fd;
+   io->d.flush_range.offset = offset;
+   io->d.flush_range.nbytes = nbytes;
+
+   pgaio_end_get_io();
+
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "start_flush_range %zu: %d, %llu, %llu",
+        io - aio_ctl->in_progress_io,
+        fd, (unsigned long long) offset, (unsigned long long) nbytes);
+#endif
+
+   return io;
+}
+
+
+PgAioInProgress *
+pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, char* data, int buffno)
+{
+   PgAioInProgress *io;
+   //struct io_uring_sqe *sqe;
+
+   io = pgaio_start_get_io(PGAIO_READ_BUFFER);
+
+   io->d.read_buffer.buf = buffno;
+   io->d.read_buffer.fd = fd;
+   io->d.read_buffer.offset = offset;
+   io->d.read_buffer.already_done = 0;
+   io->d.read_buffer.iovec.iov_base = data;
+   io->d.read_buffer.iovec.iov_len = nbytes;
+
+   pgaio_end_get_io();
+
+#ifdef PGAIO_VERBOSE
+   elog(DEBUG1, "start_buffer_read %zu:"
+        "fd %d, off: %llu, bytes: %llu, buff: %d, data %p",
+        io - aio_ctl->in_progress_io,
+        fd,
+        (unsigned long long) offset,
+        (unsigned long long) nbytes,
+        buffno,
+        data);
+#endif
+
+   return io;
+}
+
+PgAioInProgress *
+pgaio_start_nop(void)
+{
+   PgAioInProgress *io;
+
+   io = pgaio_start_get_io(PGAIO_NOP);
+   pgaio_end_get_io();
+
+   return io;
+}
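
Putting the module's API together, the per-backend flow for one asynchronous buffer read looks roughly as follows (all names are from this patch; error handling and the cache-hit bookkeeping are omitted):

    PgAioInProgress *io;
    Buffer      buf;

    /* pins the buffer, marks IO in progress, queues the request locally */
    io = ReadBufferAsync(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, NULL, &buf);
    if (io == NULL)
        ReleaseBuffer(buf);     /* already valid, nothing was queued */

    pgaio_submit_pending();     /* io_uring_submit() the queued SQEs */
    pgaio_drain_outstanding();  /* reap CQEs; pgaio_complete_read_buffer()
                                 * verifies the page, sets BM_VALID and
                                 * drops the IO-held refcount */

Because completions are reaped by whichever backend drains the shared ring, a backend may process IOs it did not initiate; the ConditionVariable in each PgAioInProgress lets an initiator sleep until someone else's drain completes its request.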
diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c
index bdbc2c3ac4bc06c7f55bba340200d568cad56e83..e76f3506b4817ee7d77bee4b1661b00a0be87f02 100644
@@ -27,6 +27,7 @@
 #ifdef PROFILE_PID_DIR
 #include "postmaster/autovacuum.h"
 #endif
+#include "storage/aio.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
 #include "tcop/tcopprot.h"
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde2cc02e2aa48ff45186f858cfe038b..3c3cf5e568ce78bd51f74a79e7bf1499bce46464 100644
@@ -33,6 +33,7 @@
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
        size = add_size(size, BTreeShmemSize());
        size = add_size(size, SyncScanShmemSize());
        size = add_size(size, AsyncShmemSize());
+       size = add_size(size, AioShmemSize());
 #ifdef EXEC_BACKEND
        size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,8 @@ CreateSharedMemoryAndSemaphores(void)
    SyncScanShmemInit();
    AsyncShmemInit();
 
+   AioShmemInit();
+
 #ifdef EXEC_BACKEND
 
    /*
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6985e8eedfb1dee824b24ad83fd26b401970640..06efb268648026ab6b140b955d9092572368a578 100644
@@ -50,3 +50,4 @@ MultiXactTruncationLock               41
 OldSnapshotTimeMapLock             42
 LogicalRepWorkerLock               43
 XactTruncationLock                 44
+SharedAIOLock                      45
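
This registers SharedAIOLock, used throughout aio.c above, as a named LWLock; lwlocknames.h and lwlocknames.c are generated from this file at build time.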
index 0eacd461cd382d57ae437f73bd77fb352e129ff1..d4e975843606b18555d6e2abb424cb0b8ad82bf7 100644 (file)
@@ -32,6 +32,7 @@
 #include "pg_trace.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/md.h"
@@ -117,6 +118,7 @@ static MemoryContext MdCxt;     /* context for all MdfdVec objects */
  */
 #define EXTENSION_DONT_CHECK_SIZE  (1 << 4)
 
+#define MD_OPEN_FLAGS (O_RDWR | PG_BINARY | O_DIRECT)
 
 /* local routines */
 static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum,
@@ -201,14 +203,14 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
 
    path = relpath(reln->smgr_rnode, forkNum);
 
-   fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
+   fd = PathNameOpenFile(path, MD_OPEN_FLAGS | O_CREAT | O_EXCL);
 
    if (fd < 0)
    {
        int         save_errno = errno;
 
        if (isRedo)
-           fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+           fd = PathNameOpenFile(path, MD_OPEN_FLAGS);
        if (fd < 0)
        {
            /* be sure to report the error reported by create, not open */
@@ -315,7 +317,7 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
        /* truncate(2) would be easier here, but Windows hasn't got it */
        int         fd;
 
-       fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
+       fd = OpenTransientFile(path, MD_OPEN_FLAGS);
        if (fd >= 0)
        {
            int         save_errno;
@@ -460,7 +462,7 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior)
 
    path = relpath(reln->smgr_rnode, forknum);
 
-   fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+   fd = PathNameOpenFile(path, MD_OPEN_FLAGS);
 
    if (fd < 0)
    {
@@ -659,6 +661,26 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    }
 }
 
+/*
+ * mdstartread() -- Start an asynchronous read of the specified block.
+ */
+PgAioInProgress *
+mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+           char *buffer, int bufno)
+{
+   off_t       seekpos;
+   MdfdVec    *v;
+
+   v = _mdfd_getseg(reln, forknum, blocknum, false,
+                    EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+   seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+   Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+   return FileStartRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, bufno);
+}
+
 /*
  * mdwrite() -- Write the supplied block at the appropriate location.
  *
@@ -1120,7 +1142,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
    fullpath = _mdfd_segpath(reln, forknum, segno);
 
    /* open the file */
-   fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);
+   fd = PathNameOpenFile(fullpath, MD_OPEN_FLAGS | oflags);
 
    pfree(fullpath);
 
@@ -1324,7 +1346,7 @@ mdsyncfiletag(const FileTag *ftag, char *path)
        strlcpy(path, p, MAXPGPATH);
        pfree(p);
 
-       file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+       file = PathNameOpenFile(path, MD_OPEN_FLAGS);
        if (file < 0)
            return -1;
        need_to_close = true;
index 7d667c6586f1fff05e6afa465432951053c53072..e2d0ab311f7300ca53e671a00e21d482e9ffe7e5 100644 (file)
@@ -53,6 +53,9 @@ typedef struct f_smgr
                                  BlockNumber blocknum);
    void        (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
                              BlockNumber blocknum, char *buffer);
+   struct PgAioInProgress *(*smgr_startread) (SMgrRelation reln, ForkNumber forknum,
+                                              BlockNumber blocknum, char *buffer,
+                                              int bufno);
    void        (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
                               BlockNumber blocknum, char *buffer, bool skipFsync);
    void        (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
@@ -76,6 +79,7 @@ static const f_smgr smgrsw[] = {
        .smgr_extend = mdextend,
        .smgr_prefetch = mdprefetch,
        .smgr_read = mdread,
+       .smgr_startread = mdstartread,
        .smgr_write = mdwrite,
        .smgr_writeback = mdwriteback,
        .smgr_nblocks = mdnblocks,
@@ -494,6 +498,13 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer);
 }
 
+struct PgAioInProgress *
+smgrstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+             char *buffer, int bufno)
+{
+   return smgrsw[reln->smgr_which].smgr_startread(reln, forknum, blocknum, buffer, bufno);
+}
+
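A hedged usage sketch for the new entry point, where buf_block and buf_id stand in for a pinned shared buffer's page pointer and descriptor index:

    PgAioInProgress *io;

    /* queue the read; md.c computes the segment and offset */
    io = smgrstartread(reln, forknum, blocknum, buf_block, buf_id);

    pgaio_submit_pending();         /* push queued IOs to the kernel */
    pgaio_drain_outstanding();      /* wait for them to complete */
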
 /*
  * smgrwrite() -- Write the supplied buffer out.
  *
index c9424f167c8d9c40b244d695020a0716e4a08c2f..5d1ea2d8c1e51565e97eb9b6fcce7798dc6876b3 100644 (file)
@@ -65,6 +65,7 @@
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
@@ -3902,6 +3903,9 @@ PostgresMain(int argc, char *argv[],
 
        /* Initialize MaxBackends (if under postmaster, was done already) */
        InitializeMaxBackends();
+
+       /* AIO is needed during InitPostgres() */
+       pgaio_postmaster_init();
    }
 
    /* Early initialization */
index cca9704d2d7afb2c1012c2a107a899d09033ed0f..8cf70e970a66892f0d783f2a288804df7e04ac47 100644 (file)
@@ -37,6 +37,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
@@ -121,6 +122,8 @@ InitPostmasterChild(void)
    MyLatch = &LocalLatchData;
    InitLatch(MyLatch);
 
+   pgaio_postmaster_child_init();
+
    /*
     * If possible, make this process a group leader, so that the postmaster
     * can signal any child processes too. Not all processes will have
diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
new file mode 100644 (file)
index 0000000..26e17e4
--- /dev/null
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio.h
+ *   Asynchronous I/O subsystem (io_uring based).
+ *
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/aio.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef AIO_H
+#define AIO_H
+
+#include "common/relpath.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/relfilenode.h"
+
+typedef struct PgAioInProgress PgAioInProgress;
+
+/* initialization */
+extern void pgaio_postmaster_init(void);
+extern Size AioShmemSize(void);
+extern void AioShmemInit(void);
+extern void pgaio_postmaster_child_init(void);
+
+extern void pgaio_at_abort(void);
+
+
+extern PgAioInProgress *pgaio_start_flush_range(int fd, off_t offset, off_t nbytes);
+extern PgAioInProgress *pgaio_start_nop(void);
+extern PgAioInProgress *pgaio_start_fsync(int fd);
+
+struct BufferDesc;
+extern PgAioInProgress *pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes,
+                                               char *data, int buffno);
+
+extern void pgaio_submit_pending(void);
+
+extern void pgaio_drain_shared(void);
+extern void pgaio_drain_outstanding(void);
+
+#endif                         /* AIO_H */
index 7f03f39c69250b803e5684b123034bf17a5ab3ed..0bf11cb9e5a75b35e227fabeae498c990b098e09 100644 (file)
@@ -185,7 +185,7 @@ typedef struct BufferDesc
 
    int         wait_backend_pid;   /* backend PID of pin-count waiter */
    int         freeNext;       /* link in freelist chain */
-
+   uint32      io_in_progress; /* AIO in flight for this buffer, if any */
    LWLock      content_lock;   /* to lock access to buffer contents */
 } BufferDesc;
 
index ee91b8fa26c178a35a7d220de8f32393f6753ce5..4a3047390233d261fb1f593a983ee2e656152605 100644 (file)
@@ -183,6 +183,10 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
                                        ForkNumber forkNum, BlockNumber blockNum,
                                        ReadBufferMode mode, BufferAccessStrategy strategy);
+struct PgAioInProgress;
+extern struct PgAioInProgress *ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
+                                               ReadBufferMode mode, BufferAccessStrategy strategy,
+                                               Buffer *buf);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
index 8cd125d7dfaa65b4e74138fed9ac1ccefc9b9ecd..65ac1e5a4d761f30d2a0efef4775183694a027b0 100644 (file)
@@ -82,6 +82,7 @@ extern File OpenTemporaryFile(bool interXact);
 extern void FileClose(File file);
 extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info);
 extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
+extern struct PgAioInProgress *FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid);
 extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
 extern int FileSync(File file, uint32 wait_event_info);
 extern off_t FileSize(File file);
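FileStartRead() itself lives in fd.c (earlier in this patch, not shown here); it presumably mirrors FileRead() minus the synchronous wait, i.e. ensure the virtual fd is open, then queue the read. A sketch under that assumption:

    struct PgAioInProgress *
    FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid)
    {
        /* reopen the vfd if it was closed to respect max_files_per_process */
        if (FileAccess(file) < 0)
            return NULL;

        /* queue the read against the raw kernel descriptor */
        return pgaio_start_buffer_read(VfdCache[file].fd, offset, amount,
                                       buffer, bufid);
    }
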
index 07fd1bb7d06c8045962907aeeef6ba44567cd6de..2ed124a62bfa91de0157e19f67769c38d78dd1e8 100644 (file)
@@ -32,6 +32,8 @@ extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum);
 extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
                   char *buffer);
+extern struct PgAioInProgress *mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+                                          char *buffer, int bufno);
 extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
                    BlockNumber blocknum, char *buffer, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
index 656665959319ee98dc3a6d9e35195f48ef6817c1..f2d94a91e446d7a59e89e80c10dcdbf60a49b72d 100644 (file)
@@ -96,6 +96,10 @@ extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
                         BlockNumber blocknum);
 extern void smgrread(SMgrRelation reln, ForkNumber forknum,
                     BlockNumber blocknum, char *buffer);
+struct PgAioInProgress;
+extern struct PgAioInProgress *smgrstartread(SMgrRelation reln, ForkNumber forknum,
+                                             BlockNumber blocknum, char *buffer,
+                                             int bufno);
 extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
                      BlockNumber blocknum, char *buffer, bool skipFsync);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,