From 1ee3304026e31a9264e7387702c9b7210e534f77 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Mon, 25 May 2020 03:02:56 -0700
Subject: [PATCH] Bounce buffer management for checksum'ed pages.

Add a pool of bounce buffers in shared memory: max_aio_in_progress
buffers of BLCKSZ bytes each, aligned to BLCKSZ, kept on the free list
aio_ctl->bounce_buffers.  After initialization the list is only
manipulated while holding SharedAIOCtlLock.

PageSetChecksumCopy() gains a third argument.  If it is non-NULL and the
page needs a checksum, the page is copied into a bounce buffer obtained
with pgaio_bounce_buffer_get() and that buffer is returned through the
argument; if no copy is needed, NULL is returned through it.  If the
argument is NULL, a process-local copy is used as before.

FlushBuffer() and AsyncFlushBuffer() attach the bounce buffer to the IO
with pgaio_assoc_bounce_buffer().  The buffer goes back on the free list
when the IO is returned to the unused list in pgaio_put_io_locked(), or
immediately if the IO is already idle or done when it is associated.

pgaio_bounce_buffer_get() calls pgaio_drain_outstanding() and retries
when no bounce buffer is free.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c |  10 ++-
 src/backend/storage/ipc/aio.c       | 125 +++++++++++++++++++++++++++-
 src/backend/storage/page/bufpage.c  |  29 ++++---
 src/include/storage/aio.h           |   5 ++
 src/include/storage/bufpage.h       |   3 +-
 5 files changed, 155 insertions(+), 17 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 17506b7df2..898ad1cafd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3088,6 +3088,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	Block		bufBlock;
 	char	   *bufToWrite;
 	uint32		buf_state;
+	PgAioBounceBuffer *bb;
 
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
@@ -3159,7 +3160,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	 * buffer, other processes might be updating hint bits in it, so we must
 	 * copy the page to private storage if we do checksumming.
 	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
+	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum, &bb);
 
 	if (0)
 	{
@@ -3207,6 +3208,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 							  bufToWrite,
 							  BufferDescriptorGetBuffer(buf),
 							  false);
+		if (bb)
+			pgaio_assoc_bounce_buffer(aio, bb);
 		pgaio_wait_for_io(aio);
 		pgaio_release(aio);
 	}
@@ -3229,6 +3232,7 @@ AsyncFlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	char	   *bufToWrite;
 	uint32		buf_state;
 	PgAioInProgress *aio;
+	PgAioBounceBuffer *bb;
 
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
@@ -3263,7 +3267,7 @@ AsyncFlushBuffer(BufferDesc *buf, SMgrRelation reln)
 
 	bufBlock = BufHdrGetBlock(buf);
 
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
+	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum, &bb);
 
 	/* FIXME: improve */
 	InProgressBuf = NULL;
@@ -3276,6 +3280,8 @@ AsyncFlushBuffer(BufferDesc *buf, SMgrRelation reln)
 						  bufToWrite,
 						  BufferDescriptorGetBuffer(buf),
 						  false);
+	if (bb)
+		pgaio_assoc_bounce_buffer(aio, bb);
 
 	return aio;
 }
diff --git a/src/backend/storage/ipc/aio.c b/src/backend/storage/ipc/aio.c
index cf0804be58..6e52499e59 100644
--- a/src/backend/storage/ipc/aio.c
+++ b/src/backend/storage/ipc/aio.c
@@ -90,6 +90,8 @@ struct PgAioInProgress
 
 	uint32 refcount;
 
+	PgAioBounceBuffer *bb;
+
 	/*
 	 * NB: Note that fds in here may *not* be relied upon for re-issuing
 	 * requests (e.g. for partial reads/writes) - the fd might be from another
@@ -141,6 +143,15 @@ struct PgAioInProgress
 	} d;
 };
 
+/* typedef in header */
+struct PgAioBounceBuffer
+{
+	union
+	{
+		char buffer[BLCKSZ];
+		dlist_node node;
+	} d;
+};
 
 typedef struct PgAioCtl
 {
@@ -174,9 +185,12 @@ typedef struct PgAioCtl
 	 */
 	pg_atomic_uint32 outstanding;
 
+	dlist_head bounce_buffers;
+
 	PgAioInProgress in_progress_io[FLEXIBLE_ARRAY_MEMBER];
 } PgAioCtl;
 
+struct io_uring local_ring;
 
 /* general pgaio helper functions */
 static void pgaio_complete_ios(bool in_error);
@@ -184,6 +198,8 @@ static void pgaio_backpressure(struct io_uring *ring, const char *loc);
 static PgAioInProgress* pgaio_start_get_io(PgAioAction action);
 static void pgaio_end_get_io(void);
 
+static void pgaio_bounce_buffer_release_locked(PgAioInProgress *io);
+
 /* io_uring related functions */
 static void pgaio_put_io_locked(PgAioInProgress *io);
 static void pgaio_sq_from_io(PgAioInProgress *io, struct io_uring_sqe *sqe);
@@ -243,12 +259,22 @@ static struct io_uring_cqe *local_reaped_cqes[PGAIO_MAX_LOCAL_REAPED];
 static PgAioInProgress *local_reaped_ios[PGAIO_MAX_LOCAL_REAPED];
 
 
+static Size AioCtlShmemSize(void)
+{
+	return add_size(mul_size(max_aio_in_progress, sizeof(PgAioInProgress)),
+					offsetof(PgAioCtl, in_progress_io));
+}
+
+static Size AioBounceShmemSize(void)
+{
+	return add_size(BLCKSZ /* alignment padding */,
+					mul_size(BLCKSZ, max_aio_in_progress));
+}
 
 Size
 AioShmemSize(void)
 {
-	return add_size(mul_size(max_aio_in_progress, sizeof(PgAioInProgress)),
-					offsetof(PgAioCtl, in_progress_io));
+	return add_size(AioCtlShmemSize(), AioBounceShmemSize());
 }
 
 void
@@ -257,11 +283,11 @@ AioShmemInit(void)
 	bool		found;
 
 	aio_ctl = (PgAioCtl *)
-		ShmemInitStruct("PgAio", AioShmemSize(), &found);
+		ShmemInitStruct("PgAio", AioCtlShmemSize(), &found);
 
 	if (!found)
 	{
-		memset(aio_ctl, 0, AioShmemSize());
+		memset(aio_ctl, 0, AioCtlShmemSize());
 
 		dlist_init(&aio_ctl->unused_ios);
 		pg_atomic_init_u32(&aio_ctl->inflight, 0);
@@ -277,6 +303,24 @@ AioShmemInit(void)
 			io->flags = PGAIOIP_IDLE;
 		}
 
+		{
+			char *p;
+			PgAioBounceBuffer *buffers;
+
+			dlist_init(&aio_ctl->bounce_buffers);
+			p = ShmemInitStruct("PgAioBounceBuffers", AioBounceShmemSize(), &found);
+			Assert(!found);
+			buffers = (PgAioBounceBuffer *) TYPEALIGN(BLCKSZ, (uintptr_t) p);
+
+			for (int i = 0; i < max_aio_in_progress; i++)
+			{
+				PgAioBounceBuffer *bb = &buffers[i];
+
+				memset(bb, 0, BLCKSZ);
+				dlist_push_tail(&aio_ctl->bounce_buffers, &bb->d.node);
+			}
+		}
+
 		{
 			int ret;
 
@@ -833,6 +877,9 @@ pgaio_put_io_locked(PgAioInProgress *io)
 	io->type = 0;
 	io->initiatorProcIndex = INVALID_PGPROCNO;
 
+	/* could do this earlier or conditionally */
+	pgaio_bounce_buffer_release_locked(io);
+
 	//pg_atomic_fetch_sub_u32(&aio_ctl->outstanding, 1);
 	pg_atomic_write_u32(&aio_ctl->outstanding, pg_atomic_read_u32(&aio_ctl->outstanding) - 1);
 	dlist_push_tail(&aio_ctl->unused_ios,
@@ -860,6 +907,76 @@ pgaio_print_queues(void)
 		);
 }
 
+PgAioBounceBuffer *
+pgaio_bounce_buffer_get(void)
+{
+	PgAioBounceBuffer *bb = NULL;
+
+	while (true)
+	{
+		LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+		if (!dlist_is_empty(&aio_ctl->bounce_buffers))
+		{
+			dlist_node *node = dlist_pop_head_node(&aio_ctl->bounce_buffers);
+
+			bb = dlist_container(PgAioBounceBuffer, d.node, node);
+		}
+		LWLockRelease(SharedAIOCtlLock);
+
+		if (!bb)
+			pgaio_drain_outstanding();
+		else
+			break;
+	}
+
+	return bb;
+}
+
+static void
+pgaio_bounce_buffer_release_locked(PgAioInProgress *io)
+{
+	Assert(LWLockHeldByMe(SharedAIOCtlLock));
+
+	if (!io->bb)
+		return;
+
+	dlist_push_tail(&aio_ctl->bounce_buffers, &io->bb->d.node);
+	io->bb = NULL;
+}
+
+char *
+pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb)
+{
+	return bb->d.buffer;
+}
+
+void
+pgaio_assoc_bounce_buffer(PgAioInProgress *io, PgAioBounceBuffer *bb)
+{
+	Assert(io->bb == NULL);
+
+	/*
+	 * FIXME: temporary hack until io acquisition is moved to caller. Instead
+	 * this should insist that bounce buffers are associated with IOs before
+	 * there's a chance they get submitted.
+	 */
+	LWLockAcquire(SharedAIOCompletionLock, LW_SHARED);
+
+	if (io->flags & (PGAIOIP_IDLE | PGAIOIP_DONE))
+	{
+		LWLockRelease(SharedAIOCompletionLock);
+
+		LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+		dlist_push_tail(&aio_ctl->bounce_buffers, &bb->d.node);
+		LWLockRelease(SharedAIOCtlLock);
+	}
+	else
+	{
+		io->bb = bb;
+		LWLockRelease(SharedAIOCompletionLock);
+	}
+}
+
 /* --------------------------------------------------------------------------------
  * io_uring related code
  * --------------------------------------------------------------------------------
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index d31ef3fe5c..c0d3112c04 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -18,6 +18,7 @@
 #include "access/itup.h"
 #include "access/xlog.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/checksum.h"
 #include "utils/memdebug.h"
 #include "utils/memutils.h"
@@ -1162,22 +1163,30 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
  * returned page and not refer to it again.
  */
 char *
-PageSetChecksumCopy(Page page, BlockNumber blkno)
+PageSetChecksumCopy(Page page, BlockNumber blkno, PgAioBounceBuffer **bb)
 {
-	static char *pageCopy = NULL;
+	static char *pageCopySync = NULL;
+	char *pageCopy;
+
+	if (bb)
+		*bb = NULL;
 
 	/* If we don't need a checksum, just return the passed-in data */
 	if (PageIsNew(page) || !DataChecksumsEnabled())
 		return (char *) page;
 
-	/*
-	 * We allocate the copy space once and use it over on each subsequent
-	 * call.  The point of palloc'ing here, rather than having a static char
-	 * array, is first to ensure adequate alignment for the checksumming code
-	 * and second to avoid wasting space in processes that never call this.
-	 */
-	if (pageCopy == NULL)
-		pageCopy = MemoryContextAllocIOAligned(TopMemoryContext, BLCKSZ, 0);
+	if (bb)
+	{
+		*bb = pgaio_bounce_buffer_get();
+		pageCopy = pgaio_bounce_buffer_buffer(*bb);
+	}
+	else if (!pageCopySync)
+	{
+		pageCopySync = MemoryContextAllocIOAligned(TopMemoryContext, BLCKSZ, 0);
+		pageCopy = pageCopySync;
+	}
+	else
+		pageCopy = pageCopySync;
 
 	memcpy(pageCopy, (char *) page, BLCKSZ);
 	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 7d5cb83440..5dc27c6783 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -20,6 +20,7 @@
 #include "storage/relfilenode.h"
 
 typedef struct PgAioInProgress PgAioInProgress;
+typedef struct PgAioBounceBuffer PgAioBounceBuffer;
 
 /* initialization */
 extern void pgaio_postmaster_init(void);
@@ -68,4 +69,8 @@ extern void pgaio_wait_for_io(PgAioInProgress *io);
 
 extern void pgaio_print_queues(void);
 
+extern void pgaio_assoc_bounce_buffer(PgAioInProgress *io, PgAioBounceBuffer *bb);
+
+extern PgAioBounceBuffer *pgaio_bounce_buffer_get(void);
+extern char *pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb);
 #endif /* AIO_H */
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 3f88683a05..d65a7a751e 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -446,7 +446,8 @@ extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
 extern void PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offset);
 extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									Item newtup, Size newsize);
-extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
+struct PgAioBounceBuffer;
+extern char *PageSetChecksumCopy(Page page, BlockNumber blkno, struct PgAioBounceBuffer **bb);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
 
 #endif							/* BUFPAGE_H */
-- 
2.39.5