#include "access/relation.h"
#include "fmgr.h"
#include "miscadmin.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "utils/acl.h"
{
PREWARM_PREFETCH,
PREWARM_READ,
- PREWARM_BUFFER
+ PREWARM_BUFFER,
+ PREWARM_BUFFER_AIO
} PrewarmType;
static PGAlignedBlock blockbuffer;
ptype = PREWARM_READ;
else if (strcmp(ttype, "buffer") == 0)
ptype = PREWARM_BUFFER;
+ else if (strcmp(ttype, "buffer_aio") == 0)
+ ptype = PREWARM_BUFFER_AIO;
else
{
ereport(ERROR,
++blocks_done;
}
}
+ else if (ptype == PREWARM_BUFFER_AIO)
+ {
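+		/*
+		 * Start an asynchronous read for each block. ReadBufferAsync()
+		 * returns NULL on a cache hit, in which case we drop the pin
+		 * ourselves; otherwise the pin is released by the IO completion.
+		 */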
+ for (block = first_block; block <= last_block; ++block)
+ {
+ Buffer buf;
+ PgAioInProgress *io;
+
+ CHECK_FOR_INTERRUPTS();
+ io = ReadBufferAsync(rel, forkNumber, block, RBM_NORMAL, NULL, &buf);
+ if (!io)
+ ReleaseBuffer(buf);
+ ++blocks_done;
+ }
+
+ pgaio_submit_pending();
+ pgaio_drain_outstanding();
+ }
/* Close relation, release lock. */
relation_close(rel, AccessShareLock);
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "optimizer/plancat.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
else
page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
+#if 1
+#define PREFETCH_AHEAD ((4 * 1024 * 1024) / 8192)
+#define PREFETCH_BATCH 32
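+	/*
+	 * Every PREFETCH_BATCH blocks, issue a batch of asynchronous reads
+	 * starting PREFETCH_AHEAD blocks ahead of the current scan position;
+	 * otherwise just drain any completions that have already arrived.
+	 */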
+ if (nallocated % PREFETCH_BATCH == 0 &&
+ (nallocated + PREFETCH_AHEAD < pbscan->phs_nblocks))
+ {
+ PgAioInProgress *io;
+ Buffer buf;
+ uint64 target = nallocated + pbscan->phs_startblock + PREFETCH_AHEAD;
+ int i;
+
+ for (i = 0; i < PREFETCH_BATCH && target < pbscan->phs_nblocks;
+ i++, target++)
+ {
+ io = ReadBufferAsync(
+ rel, MAIN_FORKNUM,
+ target % pbscan->phs_nblocks,
+ RBM_NORMAL, NULL, &buf);
+
+ if (!io)
+ ReleaseBuffer(buf);
+ }
+ pgaio_submit_pending();
+ //elog(LOG, "issued %d prefetches at %lu", i, nallocated);
+ }
+ else
+ {
+ pgaio_drain_shared();
+ }
+#endif
+
/*
* Report scan location. Normally, we report the current page number.
* When we reach the end of the scan, though, we report the starting page,
#include "common/username.h"
#include "port/atomics.h"
#include "postmaster/postmaster.h"
+#include "storage/aio.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/help_config.h"
allocsize += hugepagesize - (allocsize % hugepagesize);
ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
- PG_MMAP_FLAGS | mmap_flags, -1, 0);
+ PG_MMAP_FLAGS | MAP_POPULATE | mmap_flags, -1, 0);
mmap_errno = errno;
if (huge_pages == HUGE_PAGES_TRY && ptr == MAP_FAILED)
elog(DEBUG1, "mmap(%zu) with MAP_HUGETLB failed, huge pages disabled: %m",
*/
allocsize = *size;
ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
- PG_MMAP_FLAGS, -1, 0);
+ PG_MMAP_FLAGS | MAP_POPULATE, -1, 0);
mmap_errno = errno;
}
#include "postmaster/syslogger.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
+#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
*/
InitializeMaxBackends();
+ /*
+	 * As AIO might create internal FDs and will trigger shared memory
+	 * allocations, this needs to happen before reset_shared() and
+ * set_max_safe_fds().
+ */
+ pgaio_postmaster_init();
+
/*
* Set up shared memory and semaphores.
*/
&foundDescs);
BufferBlocks = (char *)
+ TYPEALIGN(BLCKSZ,
ShmemInitStruct("Buffer Blocks",
- NBuffers * (Size) BLCKSZ, &foundBufs);
+ (NBuffers + 1) * (Size) BLCKSZ, &foundBufs));
/* Align lwlocks to cacheline boundary */
BufferIOCVArray = (ConditionVariableMinimallyPadded *)
size = add_size(size, PG_CACHE_LINE_SIZE);
/* size of data pages */
+ /* to allow aligning buffer blocks */
+ size = add_size(size, BLCKSZ);
size = add_size(size, mul_size(NBuffers, BLCKSZ));
/* size of stuff controlled by freelist.c */
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy,
bool *hit);
+static BufferDesc *ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ BlockNumber blockNum, ReadBufferMode mode,
+ BufferAccessStrategy strategy, bool *hit, bool isLocalBuf);
static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
mode, strategy, &hit);
}
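+/*
+ * ReadBufferAsync -- start an asynchronous read of the given block.
+ *
+ * On a cache hit, NULL is returned and *buf is pinned as for a plain
+ * ReadBuffer() call. Otherwise the started IO is returned, and the buffer's
+ * remaining (shared) pin is dropped by the IO completion.
+ */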
+PgAioInProgress*
+ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
+ ReadBufferMode mode, BufferAccessStrategy strategy,
+ Buffer *buf)
+{
+ BufferDesc *bufHdr;
+ bool hit;
+ PgAioInProgress* aio;
+
+ if (mode != RBM_NORMAL || strategy != NULL || blockNum == P_NEW)
+ elog(ERROR, "unsupported");
+
+ /* Open it at the smgr level if not already done */
+ RelationOpenSmgr(reln);
+
+ bufHdr = ReadBuffer_start(reln->rd_smgr, reln->rd_rel->relpersistence, forkNum,
+ blockNum, mode, strategy, &hit, false);
+ *buf = BufferDescriptorGetBuffer(bufHdr);
+
+ if (hit)
+ return NULL;
+
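+	/*
+	 * The IO completion callback, not this backend, will terminate the
+	 * buffer's IO, so forget the InProgressBuf recorded by StartBufferIO().
+	 */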
+ InProgressBuf = NULL;
+
+ aio = smgrstartread(reln->rd_smgr, forkNum, blockNum,
+ BufHdrGetBlock(bufHdr), *buf);
+ Assert(aio != NULL);
+
+ /*
+ * Decrement local pin, but keep shared. Shared will be released upon
+ * completion.
+ */
+ {
+ PrivateRefCountEntry *ref;
+
+ ref = GetPrivateRefCountEntry(*buf, false);
+ Assert(ref != NULL);
+ Assert(ref->refcount > 0);
+
+ ResourceOwnerForgetBuffer(CurrentResourceOwner, *buf);
+ ref->refcount--;
+ ForgetPrivateRefCountEntry(ref);
+ }
+
+ return aio;
+}
-/*
- * ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
- */
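+/*
+ * ReadBuffer_extend -- relation-extension path, split out of what used to be
+ * ReadBuffer_common().
+ */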
static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_extend(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
- BufferAccessStrategy strategy, bool *hit)
+ BufferAccessStrategy strategy, bool *hit, bool isLocalBuf)
{
+ bool found;
BufferDesc *bufHdr;
Block bufBlock;
- bool found;
- bool isExtend;
- bool isLocalBuf = SmgrIsTemp(smgr);
*hit = false;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
- isExtend = (blockNum == P_NEW);
-
- TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
- smgr->smgr_rnode.node.spcNode,
- smgr->smgr_rnode.node.dbNode,
- smgr->smgr_rnode.node.relNode,
- smgr->smgr_rnode.backend,
- isExtend);
-
- /* Substitute proper block number if caller asked for P_NEW */
- if (isExtend)
- blockNum = smgrnblocks(smgr, forkNum);
+ /* Substitute proper block number */
+ blockNum = smgrnblocks(smgr, forkNum);
if (isLocalBuf)
{
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
- if (found)
- pgBufferUsage.local_blks_hit++;
- else if (isExtend)
- pgBufferUsage.local_blks_written++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.local_blks_read++;
+ pgBufferUsage.local_blks_written++;
}
else
{
- /*
- * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
- * not currently in memory.
- */
bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
strategy, &found);
- if (found)
- pgBufferUsage.shared_blks_hit++;
- else if (isExtend)
- pgBufferUsage.shared_blks_written++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.shared_blks_read++;
+ pgBufferUsage.shared_blks_written++;
}
- /* At this point we do NOT hold any locks. */
-
- /* if it was already in the buffer pool, we're done */
if (found)
{
- if (!isExtend)
- {
- /* Just need to update stats before we exit */
- *hit = true;
- VacuumPageHit++;
-
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageHit;
-
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
- smgr->smgr_rnode.node.spcNode,
- smgr->smgr_rnode.node.dbNode,
- smgr->smgr_rnode.node.relNode,
- smgr->smgr_rnode.backend,
- isExtend,
- found);
-
- /*
- * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
- * locked on return.
- */
- if (!isLocalBuf)
- {
- if (mode == RBM_ZERO_AND_LOCK)
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE);
- else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
- LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
- }
-
- return BufferDescriptorGetBuffer(bufHdr);
- }
+ Block bufBlock;
/*
* We get here only in the corner case where we are trying to extend
UnlockBufHdr(bufHdr, buf_state);
} while (!StartBufferIO(bufHdr, true));
}
+
}
/*
- * if we have gotten to this point, we have allocated a buffer for the
- * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
- * if it's a shared buffer.
- *
* Note: if smgrextend fails, we will end up with a buffer that is
* allocated but not marked BM_VALID. P_NEW will still select the same
* block number (because the relation didn't get any longer on disk) and
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
- if (isExtend)
+ /* new buffers are zero-filled */
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ /* don't set checksum for all-zero page */
+ smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+
+ /*
+ * NB: we're *not* doing a ScheduleBufferTagForWriteback here;
+ * although we're essentially performing a write. At least on linux
+ * doing so defeats the 'delayed allocation' mechanism, leading to
+ * increased file fragmentation.
+ */
+
+ /*
+ * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
+ * the page as valid, to make sure that no other backend sees the zeroed
+ * page before the caller has had a chance to initialize it.
+ *
+ * Since no-one else can be looking at the page contents yet, there is no
+ * difference between an exclusive lock and a cleanup-strength lock. (Note
+ * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
+ * they assert that the buffer is already valid.)
+ */
+ if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
+ !isLocalBuf)
+ {
+ LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+ }
+
+
+ if (isLocalBuf)
{
- /* new buffers are zero-filled */
- MemSet((char *) bufBlock, 0, BLCKSZ);
- /* don't set checksum for all-zero page */
- smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+ /* Only need to adjust flags */
+ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
- /*
- * NB: we're *not* doing a ScheduleBufferTagForWriteback here;
- * although we're essentially performing a write. At least on linux
- * doing so defeats the 'delayed allocation' mechanism, leading to
- * increased file fragmentation.
- */
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ }
+ else
+ {
+ /* Set BM_VALID, terminate IO, and wake up any waiters */
+ TerminateBufferIO(bufHdr, false, BM_VALID);
+ }
+
+ return BufferDescriptorGetBuffer(bufHdr);
+}
+
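+/*
+ * ReadBuffer_start -- shared first half of a buffer read: pin/allocate the
+ * buffer and update statistics. On a miss the buffer is returned with
+ * IO_IN_PROGRESS set and *hit false; the caller then performs the actual
+ * read, either synchronously or via AIO.
+ */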
+static BufferDesc *
+ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ BlockNumber blockNum, ReadBufferMode mode,
+ BufferAccessStrategy strategy, bool *hit, bool isLocalBuf)
+{
+ BufferDesc *bufHdr;
+ bool found;
+
+ Assert(blockNum != P_NEW);
+
+ *hit = false;
+
+ /* Make sure we will have room to remember the buffer pin */
+ ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+
+ TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+ smgr->smgr_rnode.node.spcNode,
+ smgr->smgr_rnode.node.dbNode,
+ smgr->smgr_rnode.node.relNode,
+ smgr->smgr_rnode.backend,
+									   false /* isExtend */);
+
+ if (isLocalBuf)
+ {
+ bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
+ if (found)
+ pgBufferUsage.local_blks_hit++;
+ else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
+ mode == RBM_ZERO_ON_ERROR)
+ pgBufferUsage.local_blks_read++;
}
else
{
/*
- * Read in the page, unless the caller intends to overwrite it and
- * just wants us to allocate a buffer.
+ * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
+ * not currently in memory.
*/
- if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
- MemSet((char *) bufBlock, 0, BLCKSZ);
- else
- {
- instr_time io_start,
- io_time;
+ bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
+ strategy, &found);
+ if (found)
+ pgBufferUsage.shared_blks_hit++;
+ else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
+ mode == RBM_ZERO_ON_ERROR)
+ pgBufferUsage.shared_blks_read++;
+ }
- if (track_io_timing)
- INSTR_TIME_SET_CURRENT(io_start);
- smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
+ /* At this point we do NOT hold any locks. */
- if (track_io_timing)
- {
- INSTR_TIME_SET_CURRENT(io_time);
- INSTR_TIME_SUBTRACT(io_time, io_start);
- pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
- INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
- }
+ /* if it was already in the buffer pool, we're done */
+ if (found)
+ {
+ /* Just need to update stats before we exit */
+ *hit = true;
+ VacuumPageHit++;
- /* check for garbage data */
- if (!PageIsVerified((Page) bufBlock, blockNum))
- {
- if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
- {
- ereport(WARNING,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s; zeroing out page",
- blockNum,
- relpath(smgr->smgr_rnode, forkNum))));
- MemSet((char *) bufBlock, 0, BLCKSZ);
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s",
- blockNum,
- relpath(smgr->smgr_rnode, forkNum))));
- }
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageHit;
+
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
+ smgr->smgr_rnode.node.spcNode,
+ smgr->smgr_rnode.node.dbNode,
+ smgr->smgr_rnode.node.relNode,
+ smgr->smgr_rnode.backend,
+										  false /* isExtend */,
+ found);
+
+ /*
+ * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
+ * locked on return.
+ */
+ if (!isLocalBuf)
+ {
+ if (mode == RBM_ZERO_AND_LOCK)
+ LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+ LW_EXCLUSIVE);
+ else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
+ LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
}
+
+ return bufHdr;
}
+ return bufHdr;
+}
+
+/*
+ * ReadBuffer_common -- common logic for all ReadBuffer variants
+ *
+ * *hit is set to true if the request was satisfied from shared buffer cache.
+ */
+static Buffer
+ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ BlockNumber blockNum, ReadBufferMode mode,
+ BufferAccessStrategy strategy, bool *hit)
+{
+ Block bufBlock;
+ bool isLocalBuf = SmgrIsTemp(smgr);
+ BufferDesc *bufHdr;
+
+ if (blockNum == P_NEW)
+ return ReadBuffer_extend(smgr, relpersistence, forkNum,
+ blockNum, mode, strategy,
+ hit, isLocalBuf);
+
+
+ bufHdr = ReadBuffer_start(smgr, relpersistence, forkNum,
+ blockNum, mode, strategy,
+ hit, isLocalBuf);
+
+ if (*hit)
+ return BufferDescriptorGetBuffer(bufHdr);
+
/*
- * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
- * the page as valid, to make sure that no other backend sees the zeroed
- * page before the caller has had a chance to initialize it.
- *
- * Since no-one else can be looking at the page contents yet, there is no
- * difference between an exclusive lock and a cleanup-strength lock. (Note
- * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
- * they assert that the buffer is already valid.)
+ * if we have gotten to this point, we have allocated a buffer for the
+ * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
+ * if it's a shared buffer.
*/
- if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
- !isLocalBuf)
+ Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
+
+ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+
+ /*
+ * Read in the page, unless the caller intends to overwrite it and
+ * just wants us to allocate a buffer.
+ */
+ if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
{
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+
+ /*
+ * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
+ * the page as valid, to make sure that no other backend sees the zeroed
+ * page before the caller has had a chance to initialize it.
+ *
+ * Since no-one else can be looking at the page contents yet, there is no
+ * difference between an exclusive lock and a cleanup-strength lock. (Note
+ * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
+ * they assert that the buffer is already valid.)
+ */
+ if (!isLocalBuf)
+ {
+ LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+ }
+ }
+ else
+ {
+ instr_time io_start,
+ io_time;
+
+ if (track_io_timing)
+ INSTR_TIME_SET_CURRENT(io_start);
+
+ smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
+
+ if (track_io_timing)
+ {
+ INSTR_TIME_SET_CURRENT(io_time);
+ INSTR_TIME_SUBTRACT(io_time, io_start);
+ pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
+ INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
+ }
+
+ /* check for garbage data */
+ if (!PageIsVerified((Page) bufBlock, blockNum))
+ {
+ if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s; zeroing out page",
+ blockNum,
+ relpath(smgr->smgr_rnode, forkNum))));
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s",
+ blockNum,
+ relpath(smgr->smgr_rnode, forkNum))));
+ }
}
if (isLocalBuf)
bool valid;
uint32 buf_state;
+ Assert(blockNum != P_NEW);
+
/* create a tag so we can lookup the buffer */
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
CHECKPOINT_FLUSH_ALL))))
mask |= BM_PERMANENT;
+ elog(DEBUG1, "checkpoint looking at buffers");
+
/*
* Loop over all buffers, and mark the ones that need to be written with
* BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we
TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
+ elog(DEBUG1, "checkpoint predicts to write %u buffers", num_to_scan);
+
/*
* Sort buffers that need to be written to reduce the likelihood of random
* IO. The sorting is also important for the implementation of balancing
num_spaces = 0;
+ elog(DEBUG1, "checkpoint done sorting");
+
/*
* Allocate progress status for each tablespace with buffers that need to
* be flushed. This requires the to-be-flushed array to be sorted.
binaryheap_build(ts_heap);
+ elog(DEBUG1, "checkpoint done heaping");
+
+
/*
* Iterate through to-be-checkpointed buffers and write the ones (still)
* marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
{
uint32 buf_state;
+ pgaio_drain_outstanding();
+
/*
* It may not be necessary to acquire the spinlock to check the flag
* here, but since this test is essential for correctness, we'd better
{
BufferDesc *buf = InProgressBuf;
+ pgaio_at_abort();
+
if (buf)
{
uint32 buf_state;
}
context->nr_pending = 0;
+
+ if (i > 0)
+ pgaio_submit_pending();
}
bool found;
uint32 buf_state;
+ Assert(blockNum != P_NEW);
+
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
/* Initialize local buffers if first request in this session */
#include "miscadmin.h"
#include "pgstat.h"
#include "portability/mem.h"
+#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/guc.h"
* We compile all alternatives that are supported on the current platform,
* to find portability problems more easily.
*/
+#if USE_LIBURING
+ {
+ pgaio_start_flush_range(fd, offset, nbytes);
+
+ return;
+ }
+#endif
#if defined(HAVE_SYNC_FILE_RANGE)
{
int rc;
fd = open(fileName, fileFlags, fileMode);
if (fd >= 0)
+ {
+ //elog(DEBUG1, "opening file %s fd %d", fileName, fd);
+
return fd; /* success! */
+ }
if (errno == EMFILE || errno == ENFILE)
{
vfdP = &VfdCache[file];
+ pgaio_submit_pending();
+
/*
* Close the file. We aren't expecting this to fail; if it does, better
* to leak the FD than to mess up our internal state.
if (!FileIsNotOpen(file))
{
+ pgaio_submit_pending();
+
/* close the file */
if (close(vfdP->fd) != 0)
{
return returnCode;
}
+#include "storage/aio.h"
+
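+/*
+ * FileStartRead - start an asynchronous read of 'amount' bytes at 'offset',
+ * on behalf of buffer 'bufid'. Returns NULL if the underlying file could not
+ * be (re)opened.
+ */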
+struct PgAioInProgress *
+FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid)
+{
+ int returnCode;
+ Vfd *vfdP;
+
+ Assert(FileIsValid(file));
+
+ DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %d %p",
+ file, VfdCache[file].fileName,
+ (int64) offset,
+ amount, buffer));
+
+ returnCode = FileAccess(file);
+ if (returnCode < 0)
+ return NULL;
+
+ vfdP = &VfdCache[file];
+
+ return pgaio_start_buffer_read(vfdP->fd, offset, amount, buffer, bufid);
+}
+
int
FileWrite(File file, char *buffer, int amount, off_t offset,
uint32 wait_event_info)
result = closedir(desc->desc.dir);
break;
case AllocateDescRawFD:
+ pgaio_submit_pending();
result = close(desc->desc.fd);
break;
default:
/* Only get here if someone passes us a file not in allocatedDescs */
elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
+ pgaio_submit_pending();
+
return close(fd);
}
include $(top_builddir)/src/Makefile.global
OBJS = \
+ aio.o \
barrier.o \
dsm.o \
dsm_impl.o \
sinvaladt.o \
standby.o
+aio.o: override CPPFLAGS += $(LIBURING_CFLAGS)
+
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <liburing.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include "lib/ilist.h"
+#include "miscadmin.h"
+#include "storage/aio.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+
+typedef enum PgAioAction
+{
+ /* intentionally the zero value, to help catch zeroed memory etc */
+ PGAIO_INVALID = 0,
+
+ PGAIO_NOP,
+ PGAIO_FLUSH_RANGE,
+ PGAIO_READ_BUFFER,
+ PGAIO_WRITE_BUFFER
+} PgAioAction;
+
+typedef enum PgAioInProgressFlags
+{
+ /* request in the ->unused list */
+ PGAIOIP_IDLE = 1 << 0,
+ /* request used, may also be in other states */
+ PGAIOIP_IN_USE = 1 << 1,
+ /* request in kernel */
+ PGAIOIP_INFLIGHT = 1 << 2,
+ /* completion callback was called */
+ PGAIOIP_DONE = 1 << 3,
+} PgAioInProgressFlags;
+
+/* IO completion callback */
+typedef void (*PgAioCompletedCB)(PgAioInProgress *io);
+
+struct PgAioInProgress
+{
+ dlist_node node;
+
+ ConditionVariable cv;
+
+ uint8 flags;
+
+ /* PgAioAction, indexes PgAioCompletionCallbacks */
+ PgAioAction type;
+
+ /* which AIO ring is this entry active for */
+ uint8 ring;
+
+ /* index into allProcs, or PG_UINT32_MAX for process local IO */
+ uint32 initiatorProcIndex;
+
+ /* the IOs result, depends on operation. E.g. the length of a read */
+ int32 result;
+
+ /*
+	 * NB: fds in here may *not* be relied upon for re-issuing
+ * requests (e.g. for partial reads/writes) - the fd might be from another
+ * process, or closed since. That's not a problem for IOs waiting to be
+ * issued only because the queue is flushed when closing an fd.
+ */
+ union {
+ struct
+ {
+ int fd;
+ off_t nbytes;
+ off_t offset;
+ } flush_range;
+
+ struct
+ {
+ Buffer buf;
+ int fd;
+ int already_done;
+ off_t offset;
+ struct iovec iovec;
+ } read_buffer;
+
+ struct
+ {
+ Buffer buf;
+ struct iovec iovec;
+ } write_buffer;
+ } d;
+};
+
+static void pgaio_put_io_locked(PgAioInProgress *io);
+static void pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe);
+static void pgaio_complete_ios(bool in_error);
+static void pgaio_backpressure(struct io_uring *ring, const char *loc);
+
+
+static void pgaio_complete_nop(PgAioInProgress *io);
+static void pgaio_complete_flush_range(PgAioInProgress *io);
+static void pgaio_complete_read_buffer(PgAioInProgress *io);
+static void pgaio_complete_write_buffer(PgAioInProgress *io);
+
+
+/*
+ * To support EXEC_BACKEND environments, where we cannot rely on callback
+ * addresses being equivalent across processes, completion actions are just
+ * indices into a process-local array of callbacks, indexed by the type of
+ * action. This also makes the shared memory entries a bit smaller, but that
+ * is not a huge win.
+ */
+static PgAioCompletedCB completion_callbacks[] =
+{
+ [PGAIO_NOP] = pgaio_complete_nop,
+ [PGAIO_FLUSH_RANGE] = pgaio_complete_flush_range,
+ [PGAIO_READ_BUFFER] = pgaio_complete_read_buffer,
+ [PGAIO_WRITE_BUFFER] = pgaio_complete_write_buffer,
+};
+
+typedef struct PgAioCtl
+{
+ /* FIXME: this needs to be partitioned */
+ /* PgAioInProgress that are not used */
+ dlist_head unused_ios;
+
+ /* FIXME: this should be per ring */
+ /*
+ * PgAioInProgress that are issued to the ringbuffer, and have not yet
+ * been processed (but they may have completed without us processing the
+ * completions).
+ */
+ dlist_head inflight;
+
+ /*
+ * FIXME: there should be multiple rings, at least one for data integrity
+ * writes, allowing efficient interleaving with WAL writes, and one for
+	 * the rest. But likely a small number of non-integrity rings too.
+ *
+ * It could also make sense to have a / a few rings for specific purposes
+ * like prefetching, and vacuum too. Configuring different depths could be
+ * a nice tool to manage maximum overall system impact (in particular by
+ * limiting queue/inflight operations size).
+ */
+ struct io_uring shared_ring;
+
+ /*
+	 * Number of PgAioInProgress entries that are in use. This includes
+	 * pending requests, as well as requests actually issued to the queue.
+ */
+ int32 outstanding;
+
+ PgAioInProgress in_progress_io[FLEXIBLE_ARRAY_MEMBER];
+} PgAioCtl;
+
+
+/* (future) GUC controlling global MAX number of in-progress IO entries */
+extern int max_aio_in_progress;
+int max_aio_in_progress = 2048;
+
+/* global list of in-progress IO */
+static PgAioCtl *aio_ctl;
+
+/*
+ * Requests waiting to be issued to the kernel. They are submitted to the
+ * kernel in batches, for efficiency (including allowing for better queue
+ * processing).
+ */
+int num_local_pending_requests = 0;
+static dlist_head local_pending_requests;
+
+/*
+ * Request completions received from the kernel. These are kept in global
+ * variables so we can continue processing even if a completion callback fails.
+ */
+#define MAX_LOCAL_REAPED 32
+int num_local_reaped = 0;
+struct io_uring_cqe *local_reaped_cqes[MAX_LOCAL_REAPED];
+PgAioInProgress *local_reaped_ios[MAX_LOCAL_REAPED];
+
+
+# ifndef __NR_io_uring_enter
+# define __NR_io_uring_enter 426
+# endif
+
+static int
+__sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
+ unsigned flags, sigset_t *sig)
+{
+ return syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
+ flags, sig, _NSIG / 8);
+}
+
+#define PGAIO_VERBOSE
+
+#define PGAIO_SUBMIT_BATCH_SIZE 16
+#define PGAIO_BACKPRESSURE_LIMIT 500
+
+
+Size
+AioShmemSize(void)
+{
+ return add_size(mul_size(max_aio_in_progress, sizeof(PgAioInProgress)),
+ offsetof(PgAioCtl, in_progress_io));
+}
+
+void
+AioShmemInit(void)
+{
+ bool found;
+
+ aio_ctl = (PgAioCtl *)
+ ShmemInitStruct("PgAio", AioShmemSize(), &found);
+
+ if (!found)
+ {
+ memset(aio_ctl, 0, AioShmemSize());
+
+ dlist_init(&aio_ctl->unused_ios);
+ dlist_init(&aio_ctl->inflight);
+
+ for (int i = 0; i < max_aio_in_progress; i++)
+ {
+ PgAioInProgress *io = &aio_ctl->in_progress_io[i];
+
+ ConditionVariableInit(&io->cv);
+ dlist_push_head(&aio_ctl->unused_ios,
+ &io->node);
+ io->flags = PGAIOIP_IDLE;
+ }
+
+ {
+ int ret;
+
+ ret = io_uring_queue_init(512, &aio_ctl->shared_ring, 0);
+ if (ret < 0)
+ elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+ }
+
+ }
+}
+
+void
+pgaio_postmaster_init(void)
+{
+#if 0
+ int ret;
+
+ Assert(shared_ring.ring_fd == -1);
+
+ ret = io_uring_queue_init(512, &shared_ring, 0);
+ if (ret < 0)
+ elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+
+ //pgaio_test_fsync(1);
+#endif
+ dlist_init(&local_pending_requests);
+ num_local_pending_requests = 0;
+ num_local_reaped = 0;
+}
+
+void
+pgaio_postmaster_child_init(void)
+{
+ /* no locking needed here, only affects this process */
+ io_uring_ring_dontfork(&aio_ctl->shared_ring);
+ dlist_init(&local_pending_requests);
+ num_local_pending_requests = 0;
+ num_local_reaped = 0;
+}
+
+void
+pgaio_at_abort(void)
+{
+ if (num_local_reaped > 0)
+ {
+ elog(LOG, "at abort with %d pending", num_local_reaped);
+
+ pgaio_complete_ios(/* in_error = */ true);
+ }
+
+ pgaio_submit_pending();
+}
+
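+/*
+ * Move up to 'ready' completed CQEs from the ring into 'ios', mark each IO
+ * as no longer in flight and unlink it from the shared inflight list. Caller
+ * must hold SharedAIOLock.
+ */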
+static void
+pgaio_complete_cqes(struct io_uring *ring, PgAioInProgress **ios, struct io_uring_cqe **cqes, int ready)
+{
+ for (int i = 0; i < ready; i++)
+ {
+ struct io_uring_cqe *cqe = cqes[i];
+ PgAioInProgress *io;
+
+ ios[i] = io = io_uring_cqe_get_data(cqe);
+ Assert(ios[i] != NULL);
+ ios[i]->result = cqe->res;
+
+ Assert(io->flags & PGAIOIP_INFLIGHT);
+ Assert(io->flags & PGAIOIP_IN_USE);
+		Assert(!(io->flags & PGAIOIP_IDLE));
+ io->flags &= ~PGAIOIP_INFLIGHT;
+
+ if (cqe->res < 0)
+ {
+ elog(WARNING, "cqe: u: %p s: %d/%s f: %u",
+ io_uring_cqe_get_data(cqe),
+ cqe->res,
+ cqe->res < 0 ? strerror(-cqe->res) : "",
+ cqe->flags);
+ }
+
+ io_uring_cqe_seen(ring, cqe);
+
+ /* delete from PgAioCtl->inflight */
+ dlist_delete(&io->node);
+ }
+ //io_uring_cq_advance(ring, ready);
+}
+
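+/*
+ * Run the completion callback for every locally reaped IO (without holding
+ * the lock), then recycle the entries. With in_error, IOs whose callback has
+ * already run are skipped.
+ */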
+static void
+pgaio_complete_ios(bool in_error)
+{
+ Assert(!LWLockHeldByMe(SharedAIOLock));
+
+ /* call all callbacks, without holding lock */
+ for (int i = 0; i < num_local_reaped; i++)
+ {
+ PgAioInProgress *io = local_reaped_ios[i];
+
+ Assert(io->flags & PGAIOIP_IN_USE);
+
+ if (!(io->flags & PGAIOIP_DONE))
+ {
+ PgAioCompletedCB cb;
+
+ cb = completion_callbacks[io->type];
+ cb(io);
+
+ io->flags |= PGAIOIP_DONE;
+
+ /* signal state change */
+ ConditionVariableBroadcast(&io->cv);
+ }
+ else
+ {
+ Assert(in_error);
+ }
+ }
+
+ /* and recycle io entries */
+ START_CRIT_SECTION();
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+ for (int i = 0; i < num_local_reaped; i++)
+ pgaio_put_io_locked(local_reaped_ios[i]);
+ num_local_reaped = 0;
+ LWLockRelease(SharedAIOLock);
+ END_CRIT_SECTION();
+}
+
+/*
+ * Reap any completions that are already available, release SharedAIOLock,
+ * and then run the completion callbacks.
+ */
+static int
+pgaio_drain_and_unlock(struct io_uring *ring)
+{
+ Assert(LWLockHeldByMe(SharedAIOLock));
+ Assert(num_local_reaped == 0);
+
+ START_CRIT_SECTION();
+
+ if (io_uring_cq_ready(ring))
+ {
+ num_local_reaped =
+ io_uring_peek_batch_cqe(ring,
+ local_reaped_cqes,
+ MAX_LOCAL_REAPED);
+
+ pgaio_complete_cqes(ring,
+ local_reaped_ios,
+ local_reaped_cqes,
+ num_local_reaped);
+ }
+
+ LWLockRelease(SharedAIOLock);
+
+ END_CRIT_SECTION();
+
+ if (num_local_reaped > 0)
+ pgaio_complete_ios(false);
+
+ return num_local_reaped;
+}
+
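+/*
+ * Process ring completions until none are ready. If already_locked, the
+ * caller holds SharedAIOLock on entry and gets it back on return.
+ */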
+static void
+pgaio_drain(struct io_uring *ring, bool already_locked)
+{
+ bool lock_held = already_locked;
+
+ while (true)
+ {
+ uint32 ready = io_uring_cq_ready(ring);
+ uint32 processed;
+
+ if (ready == 0)
+ break;
+
+ if (!lock_held)
+ {
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+ lock_held = true;
+ }
+
+ processed = pgaio_drain_and_unlock(ring);
+ lock_held = false;
+
+ if (processed >= ready)
+ break;
+ }
+
+ if (already_locked && !lock_held)
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+}
+
+void
+pgaio_drain_shared(void)
+{
+ pgaio_drain(&aio_ctl->shared_ring, false);
+}
+
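+/*
+ * Submit anything still pending locally, then wait until no IOs are
+ * outstanding anywhere in the system.
+ */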
+void
+pgaio_drain_outstanding(void)
+{
+ int outstanding;
+
+ Assert(!LWLockHeldByMe(SharedAIOLock));
+
+ pgaio_submit_pending();
+ pgaio_drain(&aio_ctl->shared_ring, false);
+
+ outstanding = aio_ctl->outstanding;
+
+ while (outstanding > 0)
+ {
+#if 0
+ int ret = __sys_io_uring_enter(aio_ctl->shared_ring.ring_fd,
+ 0, 1,
+ IORING_ENTER_GETEVENTS, NULL);
+ if (ret != 0 && errno != EINTR)
+ elog(WARNING, "unexpected: %d/%s", ret, strerror(-ret));
+
+ pgaio_drain(&aio_ctl->shared_ring, false);
+
+#else
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+ outstanding = aio_ctl->outstanding;
+
+ if (!dlist_is_empty(&aio_ctl->inflight) &&
+ io_uring_cq_ready(&aio_ctl->shared_ring) == 0)
+ {
+ int ret = __sys_io_uring_enter(aio_ctl->shared_ring.ring_fd,
+ 0, 1,
+ IORING_ENTER_GETEVENTS, NULL);
+ if (ret != 0 && errno != EINTR)
+ elog(WARNING, "unexpected: %d/%s: %m", ret, strerror(-ret));
+
+ pgaio_drain(&aio_ctl->shared_ring, true);
+ }
+
+ if (io_uring_cq_ready(&aio_ctl->shared_ring))
+ pgaio_drain(&aio_ctl->shared_ring, true);
+
+ LWLockRelease(SharedAIOLock);
+#endif
+ }
+}
+
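+/*
+ * Submit all locally pending requests to the shared ring, in batches of up
+ * to PGAIO_SUBMIT_BATCH_SIZE, reaping any completions that become available
+ * along the way.
+ */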
+void
+pgaio_submit_pending(void)
+{
+ PgAioInProgress *ios[PGAIO_SUBMIT_BATCH_SIZE];
+ struct io_uring_sqe *sqe[PGAIO_SUBMIT_BATCH_SIZE];
+ int total_submitted = 0;
+
+ /* FIXME: */
+ if (!aio_ctl || dlist_is_empty(&local_pending_requests))
+ {
+ Assert(num_local_pending_requests == 0);
+ return;
+ }
+
+ while (!dlist_is_empty(&local_pending_requests))
+ {
+ int nios = 0;
+ int nsubmit = Min(num_local_pending_requests, PGAIO_SUBMIT_BATCH_SIZE);
+
+ Assert(nsubmit != 0 && nsubmit <= num_local_pending_requests);
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+ for (int i = 0; i < nsubmit; i++)
+ {
+ dlist_node *node;
+
+ sqe[nios] = io_uring_get_sqe(&aio_ctl->shared_ring);
+
+ if (!sqe[nios])
+ break;
+
+ node = dlist_pop_head_node(&local_pending_requests);
+			ios[nios] = dlist_container(PgAioInProgress, node, node);
+
+ pgaio_sq_from_io(ios[nios], sqe[nios]);
+ Assert(ios[nios]->flags & PGAIOIP_IN_USE);
+ ios[nios]->flags |= PGAIOIP_INFLIGHT;
+
+ dlist_push_head(&aio_ctl->inflight,
+ &ios[nios]->node);
+
+ nios++;
+ num_local_pending_requests--;
+ total_submitted++;
+ }
+
+ if (nios > 0)
+ {
+			int ret;
+
+	again:
+			ret = io_uring_submit(&aio_ctl->shared_ring);
+			if (ret == -EINTR)
+				goto again;
+
+ if (ret < 0)
+ elog(PANIC, "failed: %d/%s",
+ ret, strerror(-ret));
+ }
+
+ /* while still holding the lock, extract all CQs we can */
+ pgaio_drain_and_unlock(&aio_ctl->shared_ring);
+ }
+
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "submitted %d", total_submitted);
+#endif
+
+ pgaio_backpressure(&aio_ctl->shared_ring, "submit_pending");
+}
+
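+/*
+ * If too many IOs are outstanding, wait for at least one completion before
+ * queueing more work, to bound the amount of in-flight IO.
+ */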
+static void
+pgaio_backpressure(struct io_uring *ring, const char *loc)
+{
+ if (aio_ctl->outstanding > PGAIO_BACKPRESSURE_LIMIT)
+ {
+ for (int i = 0; i < 1; i++)
+ {
+ int ret;
+ int waitfor;
+ int cqr_before;
+ int cqr_after;
+
+ /*
+ * FIXME: This code likely has a race condition, where the queue
+ * might be emptied after our check, but before we wait for events
+ * to be completed. Using a lock around this could fix that, but
+			 * we don't want that (both because others should be able to add
+ * requests, and because we'd rather have the kernel wake everyone
+ * up, if there's some readiness - it's quite likely multiple
+ * backends may wait for the same IO).
+ *
+ * Possible fix: While holding lock, register for CV for one of
+ * the inflight requests. Then, using appropriate sigmask'ery,
+ * wait until either that request is processed by somebody else,
+ * or a new completion is ready. The latter is much more likely.
+ */
+ cqr_before = io_uring_cq_ready(ring);
+
+ waitfor = 1;
+ ret = __sys_io_uring_enter(ring->ring_fd,
+ 0, waitfor,
+ IORING_ENTER_GETEVENTS, NULL);
+ if (ret < 0 && errno != EINTR)
+ elog(WARNING, "enter failed: %d/%s", ret, strerror(-ret));
+
+ cqr_after = io_uring_cq_ready(ring);
+#if 1
+ elog(DEBUG1, "nonlock at %s for depth %d waited for %d got %d "
+ "cqr before %d after %d "
+ "space left: %d, sq ready: %d",
+ loc,
+ aio_ctl->outstanding, waitfor, ret, cqr_before,
+ io_uring_cq_ready(ring),
+ io_uring_sq_space_left(ring),
+ io_uring_sq_ready(ring));
+#endif
+ if (cqr_after)
+ pgaio_drain(ring, false);
+ }
+ }
+
+ if (aio_ctl->outstanding > 1024)
+ {
+ elog(WARNING, "something's up: %d outstanding! cq ready: %u sq space left: %d, sq ready: %d",
+ aio_ctl->outstanding,
+ io_uring_cq_ready(ring),
+ io_uring_sq_space_left(ring),
+ io_uring_sq_ready(ring));
+ }
+}
+
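+/*
+ * Grab an unused IO entry from shared memory (draining completions until one
+ * becomes free), mark it in use, and put it on the local pending list.
+ * Returns with SharedAIOLock held; pgaio_end_get_io() releases it.
+ */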
+static PgAioInProgress*
+pgaio_start_get_io(PgAioAction action)
+{
+ dlist_node *elem;
+ PgAioInProgress *io;
+
+ Assert(!LWLockHeldByMe(SharedAIOLock));
+ Assert(num_local_pending_requests < PGAIO_SUBMIT_BATCH_SIZE);
+
+ /* FIXME: wait for an IO to complete if full */
+
+ LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE);
+
+ while (dlist_is_empty(&aio_ctl->unused_ios))
+ {
+ pgaio_drain(&aio_ctl->shared_ring, true);
+ Assert(LWLockHeldByMe(SharedAIOLock));
+ }
+
+ START_CRIT_SECTION();
+
+ elem = dlist_pop_head_node(&aio_ctl->unused_ios);
+
+ io = dlist_container(PgAioInProgress, node, elem);
+
+ Assert(!(io->flags & PGAIOIP_IN_USE));
+ Assert(io->flags == PGAIOIP_IDLE);
+
+ io->flags &= ~PGAIOIP_IDLE;
+ io->flags |= PGAIOIP_IN_USE;
+ io->type = action;
+ io->initiatorProcIndex = MyProc->pgprocno;
+
+ aio_ctl->outstanding++;
+
+ dlist_push_tail(&local_pending_requests,
+ &io->node);
+ num_local_pending_requests++;
+
+ END_CRIT_SECTION();
+
+ return io;
+}
+
+static void
+pgaio_end_get_io(void)
+{
+ Assert(LWLockHeldByMe(SharedAIOLock));
+
+ pgaio_drain_and_unlock(&aio_ctl->shared_ring);
+
+ if (num_local_pending_requests >= PGAIO_SUBMIT_BATCH_SIZE)
+ pgaio_submit_pending();
+ else
+ pgaio_backpressure(&aio_ctl->shared_ring, "get_io");
+}
+
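+/*
+ * Return a completed IO entry to the unused list. Caller must hold
+ * SharedAIOLock.
+ */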
+static void
+pgaio_put_io_locked(PgAioInProgress *io)
+{
+ Assert(LWLockHeldByMe(SharedAIOLock));
+ Assert(io->flags & PGAIOIP_DONE);
+
+ io->flags &= ~(PGAIOIP_IN_USE|PGAIOIP_DONE);
+ io->flags |= PGAIOIP_IDLE;
+ io->type = 0;
+ io->initiatorProcIndex = INVALID_PGPROCNO;
+
+ aio_ctl->outstanding--;
+ dlist_push_head(&aio_ctl->unused_ios,
+ &io->node);
+}
+
+static void
+pgaio_complete_nop(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "completed nop");
+#endif
+}
+
+static void
+pgaio_complete_flush_range(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "completed flush_range: %zu, %s",
+ io - aio_ctl->in_progress_io,
+ io->result < 0 ? strerror(-io->result) : "ok");
+#endif
+}
+
+static void
+pgaio_complete_read_buffer(PgAioInProgress *io)
+{
+ BufferDesc *bufHdr = GetBufferDescriptor(io->d.read_buffer.buf - 1);
+ uint32 buf_state;
+ bool failed = false;
+ RelFileNode rnode = bufHdr->tag.rnode;
+	ForkNumber forkNum = bufHdr->tag.forkNum;
+ BlockNumber blockNum = bufHdr->tag.blockNum;
+
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "completed read_buffer: %zu, %d/%s, buf %d",
+ io - aio_ctl->in_progress_io,
+ io->result,
+ io->result < 0 ? strerror(-io->result) : "ok",
+ io->d.read_buffer.buf);
+#endif
+
+ if (io->result < 0)
+ {
+		if (io->result == -EAGAIN || io->result == -EINTR)
+ {
+ elog(WARNING, "need to implement retries");
+ }
+
+ failed = true;
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not read block %u in file \"%s\": %s",
+ blockNum,
+ relpathperm(rnode, forkNum),
+ strerror(-io->result))));
+ }
+ else if (io->result != BLCKSZ)
+ {
+ failed = true;
+ ereport(WARNING,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("could not read block %u in file \"%s\": read only %d of %d bytes",
+ blockNum,
+ relpathperm(rnode, forkNum),
+ io->result, BLCKSZ)));
+ }
+
+ /* FIXME: needs to be in bufmgr.c */
+ {
+ Block bufBlock;
+ RelFileNode rnode = bufHdr->tag.rnode;
+		ForkNumber forkNum = bufHdr->tag.forkNum;
+ BlockNumber blockNum = bufHdr->tag.blockNum;
+
+ bufBlock = BufferGetBlock(io->d.read_buffer.buf);
+
+ /* check for garbage data */
+ if (!PageIsVerified((Page) bufBlock, blockNum))
+ {
+ failed = true;
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s",
+ blockNum,
+ relpathperm(rnode, forkNum))));
+ }
+ }
+
+ buf_state = LockBufHdr(bufHdr);
+
+ Assert(buf_state & BM_IO_IN_PROGRESS);
+
+ buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+ if (failed)
+ buf_state |= BM_IO_ERROR;
+ else
+ buf_state |= BM_VALID;
+
+ buf_state -= BUF_REFCOUNT_ONE;
+
+ UnlockBufHdr(bufHdr, buf_state);
+
+ ConditionVariableBroadcast(BufferDescriptorGetIOCV(bufHdr));
+}
+
+static void
+pgaio_complete_write_buffer(PgAioInProgress *io)
+{
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "completed write_buffer");
+#endif
+}
+
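+/*
+ * Fill an io_uring submission queue entry from a PgAioInProgress request.
+ */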
+static void
+pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe)
+{
+ switch (io->type)
+ {
+ case PGAIO_READ_BUFFER:
+ io_uring_prep_readv(sqe,
+ io->d.read_buffer.fd,
+ &io->d.read_buffer.iovec,
+ 1,
+ io->d.read_buffer.offset);
+ break;
+ case PGAIO_FLUSH_RANGE:
+ io_uring_prep_rw(IORING_OP_SYNC_FILE_RANGE,
+ sqe,
+ io->d.flush_range.fd,
+ NULL,
+ io->d.flush_range.nbytes,
+ io->d.flush_range.offset);
+ sqe->sync_range_flags = SYNC_FILE_RANGE_WRITE;
+ break;
+ case PGAIO_WRITE_BUFFER:
+ case PGAIO_NOP:
+ elog(ERROR, "not yet");
+ break;
+ case PGAIO_INVALID:
+ elog(ERROR, "invalid");
+ }
+
+ io_uring_sqe_set_data(sqe, io);
+}
+
+PgAioInProgress *
+pgaio_start_flush_range(int fd, off_t offset, off_t nbytes)
+{
+ PgAioInProgress *io;
+ //struct io_uring_sqe *sqe;
+
+ io = pgaio_start_get_io(PGAIO_FLUSH_RANGE);
+
+ io->d.flush_range.fd = fd;
+ io->d.flush_range.offset = offset;
+ io->d.flush_range.nbytes = nbytes;
+
+ pgaio_end_get_io();
+
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "start_flush_range %zu: %d, %llu, %llu",
+ io - aio_ctl->in_progress_io,
+ fd, (unsigned long long) offset, (unsigned long long) nbytes);
+#endif
+
+ return io;
+}
+
+
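+/*
+ * Queue an asynchronous read of 'nbytes' at 'offset' from 'fd' into 'data',
+ * on behalf of buffer 'buffno'. The request is only handed to the kernel
+ * once pgaio_submit_pending() runs (or a submission batch fills up).
+ */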
+PgAioInProgress *
+pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, char* data, int buffno)
+{
+ PgAioInProgress *io;
+ //struct io_uring_sqe *sqe;
+
+ io = pgaio_start_get_io(PGAIO_READ_BUFFER);
+
+ io->d.read_buffer.buf = buffno;
+ io->d.read_buffer.fd = fd;
+ io->d.read_buffer.offset = offset;
+ io->d.read_buffer.already_done = 0;
+ io->d.read_buffer.iovec.iov_base = data;
+ io->d.read_buffer.iovec.iov_len = nbytes;
+
+ pgaio_end_get_io();
+
+#ifdef PGAIO_VERBOSE
+ elog(DEBUG1, "start_buffer_read %zu:"
+ "fd %d, off: %llu, bytes: %llu, buff: %d, data %p",
+ io - aio_ctl->in_progress_io,
+ fd,
+ (unsigned long long) offset,
+ (unsigned long long) nbytes,
+ buffno,
+ data);
+#endif
+
+ return io;
+}
+
+PgAioInProgress *
+pgaio_start_nop(void)
+{
+ PgAioInProgress *io;
+
+ io = pgaio_start_get_io(PGAIO_NOP);
+ pgaio_end_get_io();
+
+ return io;
+}
#ifdef PROFILE_PID_DIR
#include "postmaster/autovacuum.h"
#endif
+#include "storage/aio.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
#include "tcop/tcopprot.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, AioShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
SyncScanShmemInit();
AsyncShmemInit();
+ AioShmemInit();
+
#ifdef EXEC_BACKEND
/*
OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
XactTruncationLock 44
+SharedAIOLock 45
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/md.h"
*/
#define EXTENSION_DONT_CHECK_SIZE (1 << 4)
+#define MD_OPEN_FLAGS (O_RDWR | PG_BINARY | O_DIRECT)
/* local routines */
static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum,
path = relpath(reln->smgr_rnode, forkNum);
- fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
+ fd = PathNameOpenFile(path, MD_OPEN_FLAGS | O_CREAT | O_EXCL);
if (fd < 0)
{
int save_errno = errno;
if (isRedo)
- fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+ fd = PathNameOpenFile(path, MD_OPEN_FLAGS);
if (fd < 0)
{
/* be sure to report the error reported by create, not open */
/* truncate(2) would be easier here, but Windows hasn't got it */
int fd;
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
+ fd = OpenTransientFile(path, MD_OPEN_FLAGS);
if (fd >= 0)
{
int save_errno;
path = relpath(reln->smgr_rnode, forknum);
- fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+ fd = PathNameOpenFile(path, MD_OPEN_FLAGS);
if (fd < 0)
{
}
}
+/*
+ * mdstartread() -- Start an asynchronous read of the specified block.
+ */
+PgAioInProgress *
+mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ char *buffer, int bufno)
+{
+ off_t seekpos;
+ MdfdVec *v;
+
+ v = _mdfd_getseg(reln, forknum, blocknum, false,
+ EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+ seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+ Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+ return FileStartRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, bufno);
+}
+
/*
* mdwrite() -- Write the supplied block at the appropriate location.
*
fullpath = _mdfd_segpath(reln, forknum, segno);
/* open the file */
- fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);
+ fd = PathNameOpenFile(fullpath, MD_OPEN_FLAGS | oflags);
pfree(fullpath);
strlcpy(path, p, MAXPGPATH);
pfree(p);
- file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+ file = PathNameOpenFile(path, MD_OPEN_FLAGS);
if (file < 0)
return -1;
need_to_close = true;
BlockNumber blocknum);
void (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
+ struct PgAioInProgress* (*smgr_startread) (SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum, char *buffer,
+ int bufno);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
.smgr_extend = mdextend,
.smgr_prefetch = mdprefetch,
.smgr_read = mdread,
+ .smgr_startread = mdstartread,
.smgr_write = mdwrite,
.smgr_writeback = mdwriteback,
.smgr_nblocks = mdnblocks,
smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer);
}
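+/*
+ * smgrstartread() -- Start an asynchronous read of the specified block.
+ */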
+struct PgAioInProgress*
+smgrstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ char *buffer, int bufno)
+{
+ return smgrsw[reln->smgr_which].smgr_startread(reln, forknum, blocknum, buffer, bufno);
+}
+
/*
* smgrwrite() -- Write the supplied buffer out.
*
#include "replication/slot.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/proc.h"
/* Initialize MaxBackends (if under postmaster, was done already) */
InitializeMaxBackends();
+
+ /* AIO is needed during InitPostgres() */
+ pgaio_postmaster_init();
}
/* Early initialization */
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
MyLatch = &LocalLatchData;
InitLatch(MyLatch);
+ pgaio_postmaster_child_init();
+
/*
* If possible, make this process a group leader, so that the postmaster
* can signal any child processes too. Not all processes will have
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * aio.h
+ *	  Asynchronous I/O support.
+ *
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/aio.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef AIO_H
+#define AIO_H
+
+#include "common/relpath.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/relfilenode.h"
+
+typedef struct PgAioInProgress PgAioInProgress;
+
+/* initialization */
+extern void pgaio_postmaster_init(void);
+extern Size AioShmemSize(void);
+extern void AioShmemInit(void);
+extern void pgaio_postmaster_child_init(void);
+
+extern void pgaio_at_abort(void);
+
+
+extern PgAioInProgress *pgaio_start_flush_range(int fd, off_t offset, off_t nbytes);
+extern PgAioInProgress *pgaio_start_nop(void);
+extern PgAioInProgress *pgaio_start_fsync(int fd);
+
+struct BufferDesc;
+extern PgAioInProgress *pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes,
+ char *data, int buffno);
+
+extern void pgaio_submit_pending(void);
+
+extern void pgaio_drain_shared(void);
+extern void pgaio_drain_outstanding(void);
+
+#endif /* AIO_H */
int wait_backend_pid; /* backend PID of pin-count waiter */
int freeNext; /* link in freelist chain */
-
+ uint32 io_in_progress;
LWLock content_lock; /* to lock access to buffer contents */
} BufferDesc;
extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy);
+struct PgAioInProgress;
+extern struct PgAioInProgress* ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
+ ReadBufferMode mode, BufferAccessStrategy strategy,
+ Buffer *buf);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);
extern void FileClose(File file);
extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info);
extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
+extern struct PgAioInProgress *FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid);
extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
extern int FileSync(File file, uint32 wait_event_info);
extern off_t FileSize(File file);
BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
+extern struct PgAioInProgress *mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ char *buffer, int bufno);
extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
+struct PgAioInProgress;
+extern struct PgAioInProgress* smgrstartread(SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum, char *buffer,
+ int bufno);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,