From 0c300a5dccfcfb294703560ac5ba11e2a42b6993 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 25 May 2020 03:02:55 -0700 Subject: [PATCH] tmp Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/access/table/tableam.c | 2 +- src/backend/access/transam/xlog.c | 11 +- src/backend/storage/buffer/bufmgr.c | 238 ++++++++++++++++++++-------- src/backend/storage/file/fd.c | 28 +++- src/backend/storage/ipc/aio.c | 188 +++++++++++++--------- src/backend/storage/ipc/ipc.c | 1 - src/backend/storage/smgr/md.c | 68 +++++++- src/backend/storage/smgr/smgr.c | 9 +- src/include/storage/aio.h | 4 +- src/include/storage/buf_internals.h | 3 + src/include/storage/fd.h | 3 +- src/include/storage/md.h | 4 +- src/include/storage/smgr.h | 5 +- 13 files changed, 403 insertions(+), 161 deletions(-) diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index f6700479a2..60383218ee 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -501,7 +501,7 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca } else { - pgaio_drain_shared(); + //pgaio_drain_shared(); } #endif diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 50f062e64f..6e83350955 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2539,7 +2539,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) do { errno = 0; -#if 0 +#if 1 pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE); written = pg_pwrite(openLogFile, from, nleft, startoffset); pgstat_report_wait_end(); @@ -2662,7 +2662,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) * fsync more than one file. 
*/ if ((sync_method != SYNC_METHOD_OPEN && - sync_method != SYNC_METHOD_OPEN_DSYNC) || true) + sync_method != SYNC_METHOD_OPEN_DSYNC)) { if (openLogFile >= 0 && !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, @@ -10340,11 +10340,12 @@ get_sync_bit(int method) * after its written. Also, walreceiver performs unaligned writes, which * don't work with O_DIRECT, so it is required for correctness too. */ -#if 0 +#if 1 if (!XLogIsNeeded() && !AmWalReceiverProcess()) - ; -#endif + o_direct_flag = PG_O_DIRECT; +#else o_direct_flag = PG_O_DIRECT; +#endif switch (method) { diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 1be0baf98c..263dc88869 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -44,6 +44,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "storage/aio.h" +#include "storage/buf.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -465,8 +466,12 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); -static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - uint32 set_flag_bits); +static void TerminateBufferIO(BufferDesc *buf, bool local, bool syncio, + bool clear_dirty, uint32 set_flag_bits); +static void TerminateSharedBufferIO(BufferDesc *buf, + bool sync_io, + bool clear_dirty, + uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, @@ -709,14 +714,44 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum, mode, strategy, &hit); } +static PgAioInProgress * +ReadBufferInitRead(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, + Buffer buf, BufferDesc *bufHdr, int mode) +{ + PgAioInProgress* aio; + Block bufBlock; + + 
//bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + if (BufferIsLocal(buf)) + bufBlock = LocalBufHdrGetBlock(bufHdr); + else + bufBlock = BufHdrGetBlock(bufHdr); + + /* + * if we have gotten to this point, we have allocated a buffer for the + * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, + * if it's a shared buffer. + */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + + /* FIXME: improve */ + InProgressBuf = NULL; + + aio = smgrstartread(smgr, forkNum, blockNum, + bufBlock, buf, mode); + Assert(aio != NULL); + + return aio; +} + PgAioInProgress* ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, Buffer *buf) { BufferDesc *bufHdr; - bool hit; PgAioInProgress* aio; + bool hit; if (mode != RBM_NORMAL || strategy != NULL || blockNum == P_NEW) elog(ERROR, "unsupported"); @@ -731,15 +766,13 @@ ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum, if (hit) return NULL; - InProgressBuf = NULL; - - aio = smgrstartread(reln->rd_smgr, forkNum, blockNum, - BufHdrGetBlock(bufHdr), *buf); - Assert(aio != NULL); - /* - * Decrement local pin, but keep shared. Shared will be released upon - * completion. + * Decrement local pin, but keep shared pin. The latter will be released + * upon completion of the IO. Otherwise the buffer could be recycled while + * the IO is ongoing. + * + * FIXME: Make this optional? It's only useful for fire-and-forget style + * IO. 
*/ { PrivateRefCountEntry *ref; @@ -753,6 +786,8 @@ ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ForgetPrivateRefCountEntry(ref); } + aio = ReadBufferInitRead(reln->rd_smgr, forkNum, blockNum, *buf, bufHdr, mode); + return aio; } @@ -884,19 +919,9 @@ ReadBuffer_extend(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, } - if (isLocalBuf) - { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID); - } + TerminateBufferIO(bufHdr, isLocalBuf, + /* syncio = */ true, /* clear_dirty = */ false, + BM_VALID); return BufferDescriptorGetBuffer(bufHdr); } @@ -916,7 +941,6 @@ ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode, @@ -984,6 +1008,41 @@ ReadBuffer_start(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, return bufHdr; } + else if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) + { + /* + * The caller intends to overwrite the page and just wants us to + * allocate a buffer. Finish IO here, so sync/async don't have to + * duplicate the logic. + */ + + Block bufBlock; + + bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + + MemSet((char *) bufBlock, 0, BLCKSZ); + + /* + * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking + * the page as valid, to make sure that no other backend sees the zeroed + * page before the caller has had a chance to initialize it. 
+ * + * Since no-one else can be looking at the page contents yet, there is no + * difference between an exclusive lock and a cleanup-strength lock. (Note + * that we cannot use LockBuffer() or LockBufferForCleanup() here, because + * they assert that the buffer is already valid.) + */ + if (!isLocalBuf) + { + LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); + } + + TerminateBufferIO(bufHdr, isLocalBuf, + /* syncio = */ true, /* clear_dirty = */ false, + BM_VALID); + + *hit = true; + } return bufHdr; } @@ -1024,30 +1083,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); - /* - * Read in the page, unless the caller intends to overwrite it and - * just wants us to allocate a buffer. - */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - { - MemSet((char *) bufBlock, 0, BLCKSZ); - - /* - * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking - * the page as valid, to make sure that no other backend sees the zeroed - * page before the caller has had a chance to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) 
- */ - if (!isLocalBuf) - { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); - } - } - else { instr_time io_start, io_time; @@ -1086,19 +1121,9 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, } } - if (isLocalBuf) - { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID); - } + TerminateBufferIO(bufHdr, isLocalBuf, + /* syncio = */ true, /* clear_dirty = */ false, + BM_VALID); VacuumPageMiss++; if (VacuumCostActive) @@ -1115,6 +1140,59 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, return BufferDescriptorGetBuffer(bufHdr); } +void +ReadBufferCompleteRead(Buffer buffer, int mode, bool failed) +{ + BufferDesc *bufHdr; + bool islocal = BufferIsLocal(buffer); + + if (islocal) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + bufHdr = GetBufferDescriptor(buffer - 1); + + /* FIXME: implement track_io_timing */ + + if (!failed) + { + Block bufBlock; + BlockNumber blockNum = bufHdr->tag.blockNum; + + bufBlock = BufferGetBlock(buffer); + + /* check for garbage data */ + if (!PageIsVerified((Page) bufBlock, blockNum)) + { + RelFileNode rnode = bufHdr->tag.rnode; + BlockNumber forkNum = bufHdr->tag.forkNum; + + failed = true; + + if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + blockNum, + relpathperm(rnode, forkNum)))); + MemSet((char *) bufBlock, 0, BLCKSZ); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + blockNum, + relpathperm(rnode, forkNum)))); + } + } + } + + TerminateBufferIO(bufHdr, islocal, + /* syncio = */ false, /* 
clear_dirty = */ false, + failed ? BM_IO_ERROR : BM_VALID); +} + /* * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared * buffer. If no buffer exists already, selects a replacement @@ -2939,7 +3017,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and * end the BM_IO_IN_PROGRESS state. */ - TerminateBufferIO(buf, true, 0); + TerminateSharedBufferIO(buf, /* syncio = */ true, /* clear_dirty = */ true, 0); TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum, buf->tag.blockNum, @@ -4246,8 +4324,26 @@ StartBufferIO(BufferDesc *buf, bool forInput) return true; } +static void +TerminateBufferIO(BufferDesc *bufHdr, bool local, bool syncio, + bool clear_dirty, uint32 set_flag_bits) +{ + if (local) + { + /* Only need to adjust flags */ + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= set_flag_bits; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + TerminateSharedBufferIO(bufHdr, syncio, clear_dirty, set_flag_bits); + } +} + /* - * TerminateBufferIO: release a buffer we were doing I/O on + * TerminateSharedBufferIO: release a buffer we were doing I/O on * (Assumptions) * My process is executing IO for the buffer * BM_IO_IN_PROGRESS bit is set for the buffer @@ -4263,11 +4359,12 @@ StartBufferIO(BufferDesc *buf, bool forInput) * be 0, or BM_VALID if we just finished reading in the page. 
*/ static void -TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) +TerminateSharedBufferIO(BufferDesc *buf, bool syncio, bool clear_dirty, uint32 set_flag_bits) { uint32 buf_state; - Assert(buf == InProgressBuf); + if (syncio) + Assert(buf == InProgressBuf); buf_state = LockBufHdr(buf); @@ -4278,9 +4375,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); buf_state |= set_flag_bits; + + if (!syncio) + buf_state -= BUF_REFCOUNT_ONE; + UnlockBufHdr(buf, buf_state); - InProgressBuf = NULL; + if (syncio) + InProgressBuf = NULL; ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf)); } @@ -4334,7 +4436,7 @@ AbortBufferIO(void) pfree(path); } } - TerminateBufferIO(buf, false, BM_IO_ERROR); + TerminateSharedBufferIO(buf, /* syncio = */ true, /* clear_dirty = */ false, BM_IO_ERROR); } } diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index cea6de9c1a..6048c0ccd0 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2042,14 +2042,14 @@ retry: #include "storage/aio.h" struct PgAioInProgress * -FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid) +FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid, int mode) { int returnCode; Vfd *vfdP; Assert(FileIsValid(file)); - DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %d %p", + DO_DB(elog(LOG, "FileStartRead: %d (%s) " INT64_FORMAT " %d %p", file, VfdCache[file].fileName, (int64) offset, amount, buffer)); @@ -2060,7 +2060,7 @@ FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid) vfdP = &VfdCache[file]; - return pgaio_start_buffer_read(vfdP->fd, offset, amount, buffer, bufid); + return pgaio_start_read_buffer(vfdP->fd, offset, amount, buffer, bufid, mode); } int @@ -2161,6 +2161,28 @@ retry: return returnCode; } +struct PgAioInProgress * +FileStartWrite(File file, char *buffer, int amount, off_t offset, 
int bufid) +{ + int returnCode; + Vfd *vfdP; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileStartWrite: %d (%s) " INT64_FORMAT " %d %p", + file, VfdCache[file].fileName, + (int64) offset, + amount, buffer)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return NULL; + + vfdP = &VfdCache[file]; + + return pgaio_start_write_buffer(vfdP->fd, offset, amount, buffer, bufid); +} + int FileSync(File file, uint32 wait_event_info) { diff --git a/src/backend/storage/ipc/aio.c b/src/backend/storage/ipc/aio.c index 3c34ec7304..b363df3d81 100644 --- a/src/backend/storage/ipc/aio.c +++ b/src/backend/storage/ipc/aio.c @@ -21,7 +21,7 @@ #define PGAIO_VERBOSE -#define PGAIO_SUBMIT_BATCH_SIZE 16 +#define PGAIO_SUBMIT_BATCH_SIZE 32 #define PGAIO_BACKPRESSURE_LIMIT 500 #define PGAIO_MAX_LOCAL_REAPED 32 @@ -50,7 +50,7 @@ typedef enum PgAioInProgressFlags } PgAioInProgressFlags; /* IO completion callback */ -typedef void (*PgAioCompletedCB)(PgAioInProgress *io); +typedef bool (*PgAioCompletedCB)(PgAioInProgress *io); struct PgAioInProgress { @@ -89,6 +89,7 @@ struct PgAioInProgress struct { Buffer buf; + int mode; int fd; int already_done; off_t offset; @@ -98,6 +99,9 @@ struct PgAioInProgress struct { Buffer buf; + int fd; + int already_done; + off_t offset; struct iovec iovec; } write_buffer; @@ -120,11 +124,11 @@ static void pgaio_end_get_io(void); /* io completions */ /* FIXME: parts of these probably don't belong here */ -static void pgaio_complete_nop(PgAioInProgress *io); -static void pgaio_complete_flush_range(PgAioInProgress *io); -static void pgaio_complete_read_buffer(PgAioInProgress *io); -static void pgaio_complete_write_buffer(PgAioInProgress *io); -static void pgaio_complete_write_wal(PgAioInProgress *io); +static bool pgaio_complete_nop(PgAioInProgress *io); +static bool pgaio_complete_flush_range(PgAioInProgress *io); +static bool pgaio_complete_read_buffer(PgAioInProgress *io); +static bool pgaio_complete_write_buffer(PgAioInProgress *io); +static 
bool pgaio_complete_write_wal(PgAioInProgress *io); /* io_uring related functions */ static void pgaio_put_io_locked(PgAioInProgress *io); @@ -320,7 +324,7 @@ pgaio_start_flush_range(int fd, off_t offset, off_t nbytes) PgAioInProgress * -pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, char* data, int buffno) +pgaio_start_read_buffer(int fd, off_t offset, off_t nbytes, char* data, int buffno, int mode) { PgAioInProgress *io; //struct io_uring_sqe *sqe; @@ -328,6 +332,7 @@ pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, char* data, int buff io = pgaio_start_get_io(PGAIO_READ_BUFFER); io->d.read_buffer.buf = buffno; + io->d.read_buffer.mode = mode; io->d.read_buffer.fd = fd; io->d.read_buffer.offset = offset; io->d.read_buffer.already_done = 0; @@ -350,7 +355,38 @@ pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, char* data, int buff return io; } -extern PgAioInProgress * +PgAioInProgress * +pgaio_start_write_buffer(int fd, off_t offset, off_t nbytes, char* data, int buffno) +{ + PgAioInProgress *io; + //struct io_uring_sqe *sqe; + + io = pgaio_start_get_io(PGAIO_WRITE_BUFFER); + + io->d.write_buffer.buf = buffno; + io->d.write_buffer.fd = fd; + io->d.write_buffer.offset = offset; + io->d.write_buffer.already_done = 0; + io->d.write_buffer.iovec.iov_base = data; + io->d.write_buffer.iovec.iov_len = nbytes; + + pgaio_end_get_io(); + +#ifdef PGAIO_VERBOSE + elog(DEBUG1, "start_buffer_write %zu:" + "fd %d, off: %llu, bytes: %llu, buff: %d, data %p", + io - aio_ctl->in_progress_io, + fd, + (unsigned long long) offset, + (unsigned long long) nbytes, + buffno, + data); +#endif + + return io; +} + +PgAioInProgress * pgaio_start_write_wal(int fd, off_t offset, off_t nbytes, char *data, bool no_reorder) { PgAioInProgress *io; @@ -407,11 +443,13 @@ pgaio_complete_ios(bool in_error) if (!(io->flags & PGAIOIP_DONE)) { PgAioCompletedCB cb; + bool done; cb = completion_callbacks[io->type]; - cb(io); + done = cb(io); - io->flags |= PGAIOIP_DONE; + 
if (done) + io->flags |= PGAIOIP_DONE; /* signal state change */ ConditionVariableBroadcast(&io->cv); @@ -426,7 +464,12 @@ pgaio_complete_ios(bool in_error) START_CRIT_SECTION(); LWLockAcquire(SharedAIOLock, LW_EXCLUSIVE); for (int i = 0; i < num_local_reaped; i++) - pgaio_put_io_locked(local_reaped_ios[i]); + { + PgAioInProgress *io = local_reaped_ios[i]; + + if (io->flags & PGAIOIP_DONE) + pgaio_put_io_locked(io); + } num_local_reaped = 0; LWLockRelease(SharedAIOLock); END_CRIT_SECTION(); @@ -744,15 +787,17 @@ pgaio_put_io_locked(PgAioInProgress *io) &io->node); } -static void +static bool pgaio_complete_nop(PgAioInProgress *io) { #ifdef PGAIO_VERBOSE elog(DEBUG1, "completed nop"); #endif + + return true; } -static void +static bool pgaio_complete_flush_range(PgAioInProgress *io) { #ifdef PGAIO_VERBOSE @@ -760,17 +805,15 @@ pgaio_complete_flush_range(PgAioInProgress *io) io - aio_ctl->in_progress_io, io->result < 0 ? strerror(-io->result) : "ok"); #endif + + return true; } -static void +static bool pgaio_complete_read_buffer(PgAioInProgress *io) { - BufferDesc *bufHdr = GetBufferDescriptor(io->d.read_buffer.buf - 1); - uint32 buf_state; bool failed = false; - RelFileNode rnode = bufHdr->tag.rnode; - BlockNumber forkNum = bufHdr->tag.forkNum; - BlockNumber blockNum = bufHdr->tag.blockNum; + Buffer buffer = io->d.read_buffer.buf; #ifdef PGAIO_VERBOSE elog(DEBUG1, "completed read_buffer: %zu, %d/%s, buf %d", @@ -780,79 +823,66 @@ pgaio_complete_read_buffer(PgAioInProgress *io) io->d.read_buffer.buf); #endif - if (io->result < 0) + if (io->result != BLCKSZ) { - if (io->result == EAGAIN || io->result == EINTR) - { - elog(WARNING, "need to implement retries"); - } - - failed = true; - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not read block %u in file \"%s\": %s", - blockNum, - relpathperm(rnode, forkNum), - strerror(-io->result)))); - } - else if (io->result != BLCKSZ) - { - failed = true; - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - 
errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", - blockNum, - relpathperm(rnode, forkNum), - io->result, BLCKSZ))); - } + RelFileNode rnode; + ForkNumber forkNum; + BlockNumber blockNum; - /* FIXME: needs to be in bufmgr.c */ - { - Block bufBlock; - RelFileNode rnode = bufHdr->tag.rnode; - BlockNumber forkNum = bufHdr->tag.forkNum; - BlockNumber blockNum = bufHdr->tag.blockNum; + BufferGetTag(buffer, &rnode, &forkNum, &blockNum); - bufBlock = BufferGetBlock(io->d.read_buffer.buf); + if (io->result < 0) + { + failed = true; - /* check for garbage data */ - if (!PageIsVerified((Page) bufBlock, blockNum)) + if (io->result == -EAGAIN || io->result == -EINTR) + { + elog(DEBUG1, "need to implement retries"); + } + else + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read block %u in file \"%s\": %s", + blockNum, + relpathperm(rnode, forkNum), + strerror(-io->result)))); + } + } + else { + failed = true; - ereport(ERROR, + ereport(DEBUG1, (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", + errmsg("could not read block %u in file \"%s\": read only %d of %d bytes (init: %d, cur: %d)", blockNum, - relpathperm(rnode, forkNum)))); + relpathperm(rnode, forkNum), + io->result, BLCKSZ, + io->initiatorProcIndex, MyProc->pgprocno))); } } - buf_state = LockBufHdr(bufHdr); - - Assert(buf_state & BM_IO_IN_PROGRESS); - - buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (failed) - buf_state |= BM_IO_ERROR; - else - buf_state |= BM_VALID; - - buf_state -= BUF_REFCOUNT_ONE; + ReadBufferCompleteRead(io->d.read_buffer.buf, io->d.read_buffer.mode, failed); - UnlockBufHdr(bufHdr, buf_state); - - ConditionVariableBroadcast(BufferDescriptorGetIOCV(bufHdr)); + return true; } -static void +static bool pgaio_complete_write_buffer(PgAioInProgress *io) { #ifdef PGAIO_VERBOSE - elog(DEBUG1, "completed write_buffer"); + elog(DEBUG1, "completed write_buffer: %zu, %d/%s, buf %d", + io - aio_ctl->in_progress_io, + 
io->result, + io->result < 0 ? strerror(-io->result) : "ok", + io->d.write_buffer.buf); #endif + + return true; } -static void +static bool pgaio_complete_write_wal(PgAioInProgress *io) { #ifdef PGAIO_VERBOSE @@ -886,6 +916,8 @@ pgaio_complete_write_wal(PgAioInProgress *io) } /* FIXME: update xlog.c state */ + + return true; } @@ -941,6 +973,13 @@ pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe) 1, io->d.read_buffer.offset); break; + case PGAIO_WRITE_BUFFER: + io_uring_prep_writev(sqe, + io->d.write_buffer.fd, + &io->d.write_buffer.iovec, + 1, + io->d.write_buffer.offset); + break; case PGAIO_FLUSH_RANGE: io_uring_prep_rw(IORING_OP_SYNC_FILE_RANGE, sqe, @@ -959,7 +998,6 @@ pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe) if (io->d.write_wal.no_reorder) sqe->flags = IOSQE_IO_DRAIN; break; - case PGAIO_WRITE_BUFFER: case PGAIO_NOP: elog(ERROR, "not yet"); break; diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c index e76f3506b4..bdbc2c3ac4 100644 --- a/src/backend/storage/ipc/ipc.c +++ b/src/backend/storage/ipc/ipc.c @@ -27,7 +27,6 @@ #ifdef PROFILE_PID_DIR #include "postmaster/autovacuum.h" #endif -#include "storage/aio.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "tcop/tcopprot.h" diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 8de7e8bd87..5e6dced0c0 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -670,7 +670,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, */ PgAioInProgress * mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - char *buffer, int bufno) + char *buffer, int bufno, int mode) { off_t seekpos; MdfdVec *v; @@ -684,7 +684,71 @@ mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - 
return FileStartRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, bufno, mode); +} + +/* + * mdstartwrite() -- Start an asynchronous write of the specified block to a relation. + */ +PgAioInProgress * +mdstartwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, int bufno, bool skipFsync) +{ + off_t seekpos; + int nbytes; + MdfdVec *v; + + /* This assert is too expensive to have on normally ... */ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum < mdnblocks(reln, forknum)); +#endif + + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + reln->smgr_rnode.backend); + + v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + return FileStartWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, bufno); + +#if 0 + nbytes = ...; + + TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + reln->smgr_rnode.backend, + nbytes, + BLCKSZ); + + if (nbytes != BLCKSZ) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write block %u in file \"%s\": %m", + blocknum, FilePathName(v->mdfd_vfd)))); + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not write block %u in file \"%s\": wrote only %d of %d bytes", + blocknum, + FilePathName(v->mdfd_vfd), + nbytes, BLCKSZ), + errhint("Check free disk space."))); + } + + if (!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); +#endif } /* diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index e2d0ab311f..2f815dc1db 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -55,7 +55,10 @@ 
typedef struct f_smgr BlockNumber blocknum, char *buffer); struct PgAioInProgress* (*smgr_startread) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, - int bufno); + int bufno, int mode); + struct PgAioInProgress* (*smgr_startwrite) (SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, char *buffer, + int bufno, bool skipFsync); void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, @@ -500,9 +503,9 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, struct PgAioInProgress* smgrstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - char *buffer, int bufno) + char *buffer, int bufno, int mode) { - return smgrsw[reln->smgr_which].smgr_startread(reln, forknum, blocknum, buffer, bufno); + return smgrsw[reln->smgr_which].smgr_startread(reln, forknum, blocknum, buffer, bufno, mode); } /* diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 2838d7fd23..ba57e5f28c 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -51,7 +51,9 @@ extern PgAioInProgress *pgaio_start_nop(void); extern PgAioInProgress *pgaio_start_fsync(int fd); struct BufferDesc; -extern PgAioInProgress *pgaio_start_buffer_read(int fd, off_t offset, off_t nbytes, +extern PgAioInProgress *pgaio_start_read_buffer(int fd, off_t offset, off_t nbytes, + char *data, int buffno, int mode); +extern PgAioInProgress *pgaio_start_write_buffer(int fd, off_t offset, off_t nbytes, char *data, int buffno); extern PgAioInProgress *pgaio_start_write_wal(int fd, off_t offset, off_t nbytes, char *data, bool no_reorder); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 0bf11cb9e5..2a10b67466 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -305,6 +305,9 @@ extern void WritebackContextInit(WritebackContext *context, int 
*max_pending); extern void IssuePendingWritebacks(WritebackContext *context); extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag); +extern void ReadBufferCompleteRead(Buffer buffer, int mode, bool failed); + + /* freelist.c */ extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state); diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 65ac1e5a4d..b11a147dbb 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -82,8 +82,9 @@ extern File OpenTemporaryFile(bool interXact); extern void FileClose(File file); extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info); extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); -extern struct PgAioInProgress *FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid); +extern struct PgAioInProgress *FileStartRead(File file, char *buffer, int amount, off_t offset, int bufid, int mode); extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); +extern struct PgAioInProgress *FileStartWrite(File file, char *buffer, int amount, off_t offset, int bufid); extern int FileSync(File file, uint32 wait_event_info); extern off_t FileSize(File file); extern int FileTruncate(File file, off_t offset, uint32 wait_event_info); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 2ed124a62b..3508078de9 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -33,7 +33,9 @@ extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum, extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); extern struct PgAioInProgress *mdstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - char *buffer, int bufno); + char *buffer, int bufno, int mode); +extern struct PgAioInProgress *mdstartwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber 
blocknum, + char *buffer, int bufno, bool skipFsync); extern void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index f2d94a91e4..5f86d6b685 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -99,7 +99,10 @@ extern void smgrread(SMgrRelation reln, ForkNumber forknum, struct PgAioInProgress; extern struct PgAioInProgress* smgrstartread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, - int bufno); + int bufno, int mode); +extern struct PgAioInProgress* smgrstartwrite(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, char *buffer, + int bufno, bool skipFsync); extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, -- 2.39.5