initial (crappy) prototype of bulk extension.
authorAndres Freund <andres@anarazel.de>
Mon, 25 May 2020 10:02:56 +0000 (03:02 -0700)
committerAndres Freund <andres@anarazel.de>
Wed, 24 Jun 2020 23:20:58 +0000 (16:20 -0700)
Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:

src/backend/access/heap/hio.c
src/backend/access/heap/vacuumlazy.c
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/include/storage/md.h
src/include/storage/smgr.h

index aa3f14c019c44baa8fbe1590b0fea17002cb8924..bb288b9cdb2164a7605ec960f78ad90ccd744c89 100644 (file)
@@ -24,6 +24,7 @@
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 
+#define USE_NEW_EXTEND
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -118,6 +119,116 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
    return buffer;
 }
 
+#ifdef USE_NEW_EXTEND
+
+#include "storage/aio.h"
+
+static Buffer
+ExtendRelation(Relation relation, BulkInsertState bistate, bool use_fsm)
+{
+   bool        needLock;
+   Buffer      buffer;
+   //pg_streaming_write *pgsw;
+   BlockNumber newblockno;
+   Page        page;
+
+   /*
+    * Have to extend the relation.
+    *
+    * We have to use a lock to ensure no one else is extending the rel at the
+    * same time, else we will both try to initialize the same new page.  We
+    * can skip locking for new or temp relations, however, since no one else
+    * could be accessing them.
+    */
+   needLock = !RELATION_IS_LOCAL(relation);
+
+   /*
+    * If we need the lock but are not able to acquire it immediately, we'll
+    * consider extending the relation by multiple blocks at a time to manage
+    * contention on the relation extension lock.  However, this only makes
+    * sense if we're using the FSM; otherwise, there's no point.
+    */
+   if (needLock)
+       LockRelationForExtension(relation, ExclusiveLock);
+
+   {
+       BlockNumber nblocks;
+       int extendby;
+
+       RelationOpenSmgr(relation);
+       nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
+
+       newblockno = nblocks;
+       extendby = Max(Min(nblocks / 16 * BLCKSZ, (16 * 1024 * 1024)) / BLCKSZ, 1);
+
+       /*
+        * FIXME: Bulk extending without fsm would lead to those pages not
+        * being used in subsequent inserts.
+        */
+       if (!use_fsm && !bistate)
+           extendby = 1;
+
+#if 0
+       ereport(LOG, errmsg("extending from %u to %u by %d blocks",
+                           newblockno, newblockno + extendby, extendby),
+               errhidestmt(true),
+               errhidecontext(true));
+#endif
+
+       buffer = ReadBufferBI(relation, newblockno, RBM_ZERO_AND_LOCK, bistate);
+
+       smgrzeroextend(relation->rd_smgr, MAIN_FORKNUM, newblockno,
+                      extendby, false);
+
+
+       /*
+        * We need to initialize the empty new page.  Double-check that it really
+        * is empty (this should never happen, but if it does we don't want to
+        * risk wiping out valid data).
+        */
+       page = BufferGetPage(buffer);
+
+       if (!PageIsNew(page))
+           elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+                BufferGetBlockNumber(buffer),
+                RelationGetRelationName(relation));
+
+       PageInit(page, BufferGetPageSize(buffer), 0);
+       MarkBufferDirty(buffer);
+
+       for (int i = newblockno + 1; i < newblockno + extendby; i++)
+       {
+           Buffer      tbuf;
+
+           tbuf = ReadBufferBI(relation, i, RBM_ZERO_AND_LOCK, NULL);
+
+           RecordPageWithFreeSpace(relation, i, BLCKSZ - SizeOfPageHeaderData);
+           UnlockReleaseBuffer(tbuf);
+       }
+
+       /*
+        * Updating the upper levels of the free space map is too expensive to do
+        * for every block, but it's worth doing once at the end to make sure that
+        * subsequent insertion activity sees all of those nifty free pages we
+        * just inserted.
+        */
+       if (use_fsm && extendby > 1)
+           FreeSpaceMapVacuumRange(relation, newblockno, newblockno + extendby);
+   }
+
+
+   /*
+    * Release the file-extension lock; it's now OK for someone else to extend
+    * the relation some more.
+    */
+   if (needLock)
+       UnlockRelationForExtension(relation, ExclusiveLock);
+
+   return buffer;
+}
+
+#endif
+
 /*
  * For each heap page which is all-visible, acquire a pin on the appropriate
  * visibility map page, if we haven't already got one.
@@ -176,6 +287,7 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
    }
 }
 
+#ifndef USE_NEW_EXTEND
 /*
  * Extend a relation by multiple blocks to avoid future contention on the
  * relation extension lock.  Our goal is to pre-extend the relation by an
@@ -259,6 +371,7 @@ RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
     */
    FreeSpaceMapVacuumRange(relation, firstBlock, blockNum + 1);
 }
+#endif
 
 /*
  * RelationGetBufferForTuple
@@ -329,7 +442,9 @@ RelationGetBufferForTuple(Relation relation, Size len,
                saveFreeSpace = 0;
    BlockNumber targetBlock,
                otherBlock;
+#ifndef USE_NEW_EXTEND
    bool        needLock;
+#endif
 
    len = MAXALIGN(len);        /* be conservative */
 
@@ -522,6 +637,23 @@ loop:
            ReleaseBuffer(buffer);
        }
 
+       /*
+        * FIXME: definitely needs a better solution.
+        */
+       if (!use_fsm && bistate && bistate->current_buf != InvalidBuffer)
+       {
+           BlockNumber blocknum = BufferGetBlockNumber(bistate->current_buf) + 1;
+
+           RelationOpenSmgr(relation);
+
+           if (blocknum < smgrnblocks(relation->rd_smgr, MAIN_FORKNUM))
+           {
+               targetBlock = blocknum;
+
+               goto loop;
+           }
+       }
+
        /* Without FSM, always fall out of the loop and extend */
        if (!use_fsm)
            break;
@@ -536,6 +668,12 @@ loop:
                                                    len + saveFreeSpace);
    }
 
+#ifdef USE_NEW_EXTEND
+   buffer = ExtendRelation(relation, bistate, use_fsm);
+
+   page = BufferGetPage(buffer);
+
+#else /* !USE_NEW_EXTEND */
    /*
     * Have to extend the relation.
     *
@@ -614,6 +752,7 @@ loop:
     */
    if (needLock)
        UnlockRelationForExtension(relation, ExclusiveLock);
+#endif /* !USE_NEW_EXTEND */
 
    /*
     * Lock the other buffer. It's guaranteed to be of a lower page number
index 3bef0e124bac2f524aab44697586f4ec28b62615..8ac1afdc303aaf6a1a2f9fc3dbb833deb6f95bee 100644 (file)
@@ -88,7 +88,7 @@
  * REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
  * is less) potentially-freeable pages.
  */
-#define REL_TRUNCATE_MINIMUM   1000
+#define REL_TRUNCATE_MINIMUM   2000
 #define REL_TRUNCATE_FRACTION  16
 
 /*
index 5701ff6b79e2d823ace965824bb9951db89036c7..6ad609bcf93b307ced08647c720fb19624026524 100644 (file)
@@ -441,6 +441,120 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
 }
 
+static void
+zeroextend_complete(void *pgsw_private, void *write_private)
+{
+   BlockNumber *latest = (BlockNumber *) write_private;
+}
+
+BlockNumber
+mdzeroextend(SMgrRelation reln, ForkNumber forknum,
+            BlockNumber blocknum, int nblocks, bool skipFsync)
+{
+   int         nbytes;
+   MdfdVec    *v;
+   char       *zerobuf = palloc_io_aligned(BLCKSZ, MCXT_ALLOC_ZERO);
+   pg_streaming_write *pgsw ;
+   BlockNumber latest;
+   BlockNumber curblocknum = blocknum;
+
+   Assert(nblocks > 0);
+
+   pgsw = pg_streaming_write_alloc(Min(32, nblocks), &latest, zeroextend_complete);
+
+   /* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+   Assert(blocknum >= mdnblocks(reln, forknum));
+#endif
+
+   /*
+    * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+    * more --- we mustn't create a block whose number actually is
+    * InvalidBlockNumber.
+    */
+   // FIXME
+#if 0
+   if (blocknum == InvalidBlockNumber)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                errmsg("cannot extend file \"%s\" beyond %u blocks",
+                       relpath(reln->smgr_rnode, forknum),
+                       InvalidBlockNumber)));
+#endif
+
+   while (nblocks > 0)
+   {
+       int fd;
+       int ret;
+       int segstartblock = curblocknum % ((BlockNumber) RELSEG_SIZE);
+       int segendblock = (curblocknum % ((BlockNumber) RELSEG_SIZE)) + nblocks;
+       off_t       seekpos;
+
+       if (segendblock > RELSEG_SIZE)
+           segendblock = RELSEG_SIZE;
+
+       v = _mdfd_getseg(reln, forknum, curblocknum, skipFsync, EXTENSION_CREATE);
+
+       Assert(segstartblock < RELSEG_SIZE);
+       Assert(segendblock <= RELSEG_SIZE);
+
+       seekpos = (off_t) BLCKSZ * segstartblock;
+
+       fd = FileGetRawDesc(v->mdfd_vfd);
+       ret = posix_fallocate(fd,
+                             seekpos,
+                             (off_t) BLCKSZ * (segendblock - segstartblock));
+
+       if (ret != 0)
+           elog(ERROR, "fallocate failed: %m");
+
+       for (BlockNumber i = segstartblock; i < segendblock; i++)
+       {
+           PgAioInProgress *aio = pg_streaming_write_get_io(pgsw);
+           AioBufferTag tag = {.rnode = reln->smgr_rnode, .forkNum = forknum, .blockNum = i};
+
+           FileStartWrite(aio, v->mdfd_vfd, zerobuf, BLCKSZ, i * BLCKSZ, &tag, InvalidBuffer);
+
+           pg_streaming_write_write(pgsw, aio, (void*) &i);
+#if 0
+           if ((nbytes = FileWrite(v->mdfd_vfd, zerobuf, BLCKSZ,
+                                   i * BLCKSZ, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
+           {
+               if (nbytes < 0)
+                   ereport(ERROR,
+                           (errcode_for_file_access(),
+                            errmsg("could not extend file \"%s\": %m",
+                                   FilePathName(v->mdfd_vfd)),
+                            errhint("Check free disk space.")));
+               /* short write: complain appropriately */
+               ereport(ERROR,
+                       (errcode(ERRCODE_DISK_FULL),
+                        errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
+                               FilePathName(v->mdfd_vfd),
+                               nbytes, BLCKSZ, blocknum),
+                        errhint("Check free disk space.")));
+           }
+#endif
+       }
+
+       if (!skipFsync && !SmgrIsTemp(reln))
+           register_dirty_segment(reln, forknum, v);
+
+       Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+
+       nblocks -= segendblock - segstartblock;
+       curblocknum += segendblock - segstartblock;
+   }
+
+   pg_streaming_write_wait_all(pgsw);
+   pg_streaming_write_free(pgsw);
+
+   pfree(zerobuf);
+
+   return blocknum + (nblocks - 1);
+}
+
+
 /*
  * mdopenfork() -- Open one fork of the specified relation.
  *
index 08d150966073c5c40d4d848ad5aaab6f19311825..b3b3a5fe267463e67a8b0f3cbc355b8b4cba67bf 100644 (file)
@@ -49,6 +49,8 @@ typedef struct f_smgr
                                bool isRedo);
    void        (*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
                                BlockNumber blocknum, char *buffer, bool skipFsync);
+   BlockNumber (*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
+                                   BlockNumber blocknum, int nblocks, bool skipFsync);
    bool        (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
                                  BlockNumber blocknum);
    void        (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
@@ -83,6 +85,7 @@ static const f_smgr smgrsw[] = {
        .smgr_exists = mdexists,
        .smgr_unlink = mdunlink,
        .smgr_extend = mdextend,
+       .smgr_zeroextend = mdzeroextend,
        .smgr_prefetch = mdprefetch,
        .smgr_read = mdread,
        .smgr_startread = mdstartread,
@@ -478,6 +481,14 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
                                         buffer, skipFsync);
 }
 
+BlockNumber
+smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+              int nblocks, bool skipFsync)
+{
+   return smgrsw[reln->smgr_which].smgr_zeroextend(reln, forknum, blocknum,
+                                                   nblocks, skipFsync);
+}
+
 /*
  * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation.
  *
index b9463b1063cb68b950a51a5b49be21660f241ce2..a5088a02800098bb725b6b0c9655c7679416e3ba 100644 (file)
@@ -28,6 +28,8 @@ extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
 extern void mdextend(SMgrRelation reln, ForkNumber forknum,
                     BlockNumber blocknum, char *buffer, bool skipFsync);
+extern BlockNumber mdzeroextend(SMgrRelation reln, ForkNumber forknum,
+                               BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum);
 extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
index 7121b9ff0f6ce8964c094a7d450cd092d10e149e..55f1224cb4623d963cd63c4ffdf040d5a5b5d301 100644 (file)
@@ -92,6 +92,8 @@ extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum, char *buffer, bool skipFsync);
+extern BlockNumber smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
+                                 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
                         BlockNumber blocknum);
 extern void smgrread(SMgrRelation reln, ForkNumber forknum,