#include "storage/lmgr.h"
#include "storage/smgr.h"
+#define USE_NEW_EXTEND
/*
* RelationPutHeapTuple - place tuple at specified page
return buffer;
}
+#ifdef USE_NEW_EXTEND
+
+#include "storage/aio.h"
+
+/*
+ * ExtendRelation - extend the main fork of a relation and return a buffer
+ * for the first newly added block.
+ *
+ * The relation is grown by one or more blocks in a single smgrzeroextend()
+ * call.  The first new block is read with RBM_ZERO_AND_LOCK, initialized
+ * with PageInit(), marked dirty and returned to the caller.  Any additional
+ * blocks are recorded in the free space map and released again.
+ *
+ * relation: relation to extend
+ * bistate:  bulk-insert state handed to ReadBufferBI() for the first new
+ *           block, or NULL
+ * use_fsm:  whether the caller is using the free space map
+ */
+static Buffer
+ExtendRelation(Relation relation, BulkInsertState bistate, bool use_fsm)
+{
+	bool		needLock;
+	Buffer		buffer;
+	BlockNumber nblocks;
+	BlockNumber newblockno;
+	BlockNumber endblockno;
+	int			extendby;
+	Page		page;
+
+	/*
+	 * Have to extend the relation.
+	 *
+	 * We have to use a lock to ensure no one else is extending the rel at the
+	 * same time, else we will both try to initialize the same new page.  We
+	 * can skip locking for new or temp relations, however, since no one else
+	 * could be accessing them.
+	 */
+	needLock = !RELATION_IS_LOCAL(relation);
+
+	if (needLock)
+		LockRelationForExtension(relation, ExclusiveLock);
+
+	RelationOpenSmgr(relation);
+	nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
+	newblockno = nblocks;
+
+	/*
+	 * Extend by 1/16th of the current size, but by at least one block and by
+	 * at most 16MB worth of blocks.  The computation is done in units of
+	 * blocks: multiplying the block count by BLCKSZ first would wrap around
+	 * in 32-bit arithmetic for large relations and silently reduce the
+	 * extension to a single block.
+	 */
+	extendby = (int) Max(Min(nblocks / 16,
+							 (BlockNumber) ((16 * 1024 * 1024) / BLCKSZ)),
+						 1);
+
+	/*
+	 * FIXME: Bulk extending without fsm would lead to those pages not
+	 * being used in subsequent inserts.
+	 */
+	if (!use_fsm && !bistate)
+		extendby = 1;
+
+	endblockno = newblockno + extendby;
+
+	/*
+	 * Grab the buffer for the first new block before physically extending
+	 * the file; this ordering is kept exactly as in the original code.
+	 */
+	buffer = ReadBufferBI(relation, newblockno, RBM_ZERO_AND_LOCK, bistate);
+
+	smgrzeroextend(relation->rd_smgr, MAIN_FORKNUM, newblockno,
+				   extendby, false);
+
+	/*
+	 * We need to initialize the empty new page.  Double-check that it really
+	 * is empty (this should never happen, but if it does we don't want to
+	 * risk wiping out valid data).
+	 */
+	page = BufferGetPage(buffer);
+
+	if (!PageIsNew(page))
+		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+			 BufferGetBlockNumber(buffer),
+			 RelationGetRelationName(relation));
+
+	PageInit(page, BufferGetPageSize(buffer), 0);
+	MarkBufferDirty(buffer);
+
+	/*
+	 * Advertise the remaining new blocks in the free space map.  Block
+	 * numbers are unsigned, so iterate with a BlockNumber rather than an int.
+	 */
+	for (BlockNumber blkno = newblockno + 1; blkno < endblockno; blkno++)
+	{
+		Buffer		tbuf;
+
+		tbuf = ReadBufferBI(relation, blkno, RBM_ZERO_AND_LOCK, NULL);
+
+		RecordPageWithFreeSpace(relation, blkno, BLCKSZ - SizeOfPageHeaderData);
+		UnlockReleaseBuffer(tbuf);
+	}
+
+	/*
+	 * Updating the upper levels of the free space map is too expensive to do
+	 * for every block, but it's worth doing once at the end to make sure that
+	 * subsequent insertion activity sees all of those nifty free pages we
+	 * just inserted.
+	 */
+	if (use_fsm && extendby > 1)
+		FreeSpaceMapVacuumRange(relation, newblockno, endblockno);
+
+	/*
+	 * Release the file-extension lock; it's now OK for someone else to extend
+	 * the relation some more.
+	 */
+	if (needLock)
+		UnlockRelationForExtension(relation, ExclusiveLock);
+
+	return buffer;
+}
+
+#endif
+
/*
* For each heap page which is all-visible, acquire a pin on the appropriate
* visibility map page, if we haven't already got one.
}
}
+#ifndef USE_NEW_EXTEND
/*
* Extend a relation by multiple blocks to avoid future contention on the
* relation extension lock. Our goal is to pre-extend the relation by an
*/
FreeSpaceMapVacuumRange(relation, firstBlock, blockNum + 1);
}
+#endif
/*
* RelationGetBufferForTuple
saveFreeSpace = 0;
BlockNumber targetBlock,
otherBlock;
+#ifndef USE_NEW_EXTEND
bool needLock;
+#endif
len = MAXALIGN(len); /* be conservative */
ReleaseBuffer(buffer);
}
+ /*
+ * FIXME: definitely needs a better solution.
+ */
+ if (!use_fsm && bistate && bistate->current_buf != InvalidBuffer)
+ {
+ BlockNumber blocknum = BufferGetBlockNumber(bistate->current_buf) + 1;
+
+ RelationOpenSmgr(relation);
+
+ if (blocknum < smgrnblocks(relation->rd_smgr, MAIN_FORKNUM))
+ {
+ targetBlock = blocknum;
+
+ goto loop;
+ }
+ }
+
/* Without FSM, always fall out of the loop and extend */
if (!use_fsm)
break;
len + saveFreeSpace);
}
+#ifdef USE_NEW_EXTEND
+ buffer = ExtendRelation(relation, bistate, use_fsm);
+
+ page = BufferGetPage(buffer);
+
+#else /* !USE_NEW_EXTEND */
/*
* Have to extend the relation.
*
*/
if (needLock)
UnlockRelationForExtension(relation, ExclusiveLock);
+#endif /* !USE_NEW_EXTEND */
/*
* Lock the other buffer. It's guaranteed to be of a lower page number
* REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
* is less) potentially-freeable pages.
*/
-#define REL_TRUNCATE_MINIMUM 1000
+#define REL_TRUNCATE_MINIMUM 2000
#define REL_TRUNCATE_FRACTION 16
/*
Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
}
+/*
+ * zeroextend_complete() -- completion callback registered by mdzeroextend()
+ * when it allocates its streaming write.
+ *
+ * Currently a deliberate no-op: neither argument is examined.
+ */
+static void
+zeroextend_complete(void *pgsw_private, void *write_private)
+{
+	/* Nothing to do yet; the casts silence unused-parameter warnings. */
+	(void) pgsw_private;
+	(void) write_private;
+}
+
+/*
+ *	mdzeroextend() -- Add nblocks zero-filled blocks to the specified fork.
+ *
+ * The new blocks start at blocknum.  For every segment touched, the space
+ * is first reserved with posix_fallocate() and then one zeroed page per
+ * block is queued through a streaming write.  All queued writes are waited
+ * for before returning.
+ *
+ * Returns the block number of the last block added.
+ */
+BlockNumber
+mdzeroextend(SMgrRelation reln, ForkNumber forknum,
+			 BlockNumber blocknum, int nblocks, bool skipFsync)
+{
+	MdfdVec    *v;
+	char	   *zerobuf;
+	pg_streaming_write *pgsw;
+	BlockNumber latest;
+	BlockNumber curblocknum = blocknum;
+	int			remaining = nblocks;	/* counted down; nblocks stays intact */
+
+	Assert(nblocks > 0);
+
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum >= mdnblocks(reln, forknum));
+#endif
+
+	/*
+	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+	 * more --- we mustn't create a block whose number actually is
+	 * InvalidBlockNumber.  The sum is computed in 64 bits so that it cannot
+	 * wrap around.
+	 */
+	if ((uint64) blocknum + (uint64) nblocks > (uint64) InvalidBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rnode, forknum),
+						InvalidBlockNumber)));
+
+	zerobuf = palloc_io_aligned(BLCKSZ, MCXT_ALLOC_ZERO);
+	pgsw = pg_streaming_write_alloc(Min(32, nblocks), &latest, zeroextend_complete);
+
+	while (remaining > 0)
+	{
+		int			fd;
+		int			ret;
+		BlockNumber segstartblock = curblocknum % ((BlockNumber) RELSEG_SIZE);
+		BlockNumber segendblock = segstartblock + (BlockNumber) remaining;
+		off_t		seekpos;
+
+		/* Never cross a segment boundary within one iteration. */
+		if (segendblock > (BlockNumber) RELSEG_SIZE)
+			segendblock = (BlockNumber) RELSEG_SIZE;
+
+		v = _mdfd_getseg(reln, forknum, curblocknum, skipFsync, EXTENSION_CREATE);
+
+		Assert(segstartblock < RELSEG_SIZE);
+		Assert(segendblock <= RELSEG_SIZE);
+
+		seekpos = (off_t) BLCKSZ * segstartblock;
+
+		fd = FileGetRawDesc(v->mdfd_vfd);
+		ret = posix_fallocate(fd,
+							  seekpos,
+							  (off_t) BLCKSZ * (segendblock - segstartblock));
+
+		if (ret != 0)
+		{
+			/*
+			 * posix_fallocate() returns the error number instead of setting
+			 * errno, so store it where %m will find it.
+			 */
+			errno = ret;
+			elog(ERROR, "fallocate failed: %m");
+		}
+
+		for (BlockNumber i = segstartblock; i < segendblock; i++)
+		{
+			PgAioInProgress *aio = pg_streaming_write_get_io(pgsw);
+
+			/*
+			 * i is the block's position within the segment; the tag carries
+			 * the block number within the relation fork.
+			 */
+			AioBufferTag tag = {
+				.rnode = reln->smgr_rnode,
+				.forkNum = forknum,
+				.blockNum = curblocknum + (i - segstartblock)
+			};
+
+			/* Compute the file offset in off_t, matching seekpos above. */
+			FileStartWrite(aio, v->mdfd_vfd, zerobuf, BLCKSZ,
+						   (off_t) BLCKSZ * i, &tag, InvalidBuffer);
+
+			/*
+			 * NOTE(review): write_private points at the loop counter, which
+			 * keeps changing and goes out of scope after the loop.  That is
+			 * harmless only while zeroextend_complete() ignores it.
+			 *
+			 * NOTE(review): the synchronous FileWrite() path this replaces
+			 * reported failed and short writes explicitly; confirm that the
+			 * streaming write path reports them too.
+			 */
+			pg_streaming_write_write(pgsw, aio, (void *) &i);
+		}
+
+		if (!skipFsync && !SmgrIsTemp(reln))
+			register_dirty_segment(reln, forknum, v);
+
+		Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+
+		remaining -= (int) (segendblock - segstartblock);
+		curblocknum += segendblock - segstartblock;
+	}
+
+	pg_streaming_write_wait_all(pgsw);
+	pg_streaming_write_free(pgsw);
+
+	pfree(zerobuf);
+
+	/* nblocks still holds the requested count, so this is the last new block. */
+	return blocknum + (nblocks - 1);
+}
+
+
/*
* mdopenfork() -- Open one fork of the specified relation.
*
bool isRedo);
void (*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
+ BlockNumber (*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum, int nblocks, bool skipFsync);
bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
void (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
.smgr_exists = mdexists,
.smgr_unlink = mdunlink,
.smgr_extend = mdextend,
+ .smgr_zeroextend = mdzeroextend,
.smgr_prefetch = mdprefetch,
.smgr_read = mdread,
.smgr_startread = mdstartread,
buffer, skipFsync);
}
+/*
+ *	smgrzeroextend() -- Zero-extend a relation fork by nblocks blocks.
+ *
+ * Dispatches to the smgr_zeroextend routine of the storage manager that
+ * owns reln, passing all arguments through unchanged, and returns whatever
+ * that routine returns.
+ */
+BlockNumber
+smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			   int nblocks, bool skipFsync)
+{
+	BlockNumber result;
+
+	result = smgrsw[reln->smgr_which].smgr_zeroextend(reln, forknum, blocknum,
+													  nblocks, skipFsync);
+
+	return result;
+}
+
/*
* smgrprefetch() -- Initiate asynchronous read of the specified block of a relation.
*
extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
extern void mdextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
+extern BlockNumber mdzeroextend(SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum, int nblocks, bool skipFsync);
extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
+extern BlockNumber smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum, int nblocks, bool skipFsync);
extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void smgrread(SMgrRelation reln, ForkNumber forknum,