aio: Significantly improve prefetcher using the new callbacks.
author Andres Freund <andres@anarazel.de>
Mon, 25 May 2020 23:35:36 +0000 (16:35 -0700)
committer Andres Freund <andres@anarazel.de>
Wed, 24 Jun 2020 23:20:59 +0000 (16:20 -0700)
Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:

src/backend/storage/buffer/bufmgr.c
src/backend/storage/ipc/aio_util.c

index 79bed6249153c0ffc5dcd757575d17bafcc8170c..34f7b13e40fd399e9022dddf1ac7f17858e5ea4d 100644 (file)
@@ -805,7 +805,6 @@ ReadBufferAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
    {
        release_io = false;
        aio = *aiop;
-       pgaio_io_recycle(aio);
    }
 
    /*
index 3d3ab36d60c5618ff07744639bd644d0fcb537fa..7c1bfa08c4c67f10901f77578a39017bb0d3b5be 100644 (file)
@@ -149,31 +149,66 @@ pg_streaming_write_free(pg_streaming_write *pgsw)
    pfree(pgsw);
 }
 
-
-typedef struct OutstandingRead
+typedef struct PgStreamingReadItem
 {
+   /* membership in PgStreamingRead->issued, PgStreamingRead->available */
+   dlist_node node;
+   /* membership in PgStreamingRead->in_order */
+   dlist_node sequence_node;
+
+   PgAioOnCompletionLocalContext on_completion;
+   PgStreamingRead *pgsr;
    PgAioInProgress *aio;
+
    bool in_progress;
-   bool done;
+   bool valid;
    uintptr_t read_private;
-} OutstandingRead;
+   uint32 sequence;
+
+} PgStreamingReadItem;
 
 struct PgStreamingRead
 {
-   uint32 iodepth;
+   uint32 iodepth_max;
+   uint32 distance_max;
+   uint32 all_items_count;
+
    uintptr_t pgsr_private;
    PgStreamingReadDetermineNextCB determine_next_cb;
    PgStreamingReadRelease release_cb;
 
    uint32 current_window;
 
-   uint32 end_at;
-   uint32 scan_at;
-   uint32 prefetch_at;
+   /* number of requests issued */
+   uint64 prefetched_total_count;
+   /* number of requests submitted to kernel */
+   uint64 submitted_total_count;
+   /* number of current requests completed */
+   uint32 completed_count;
+   /* number of current requests in flight */
+   int32 inflight_count;
+   int32 pending_count;
+
+   bool hit_end;
+
+   /* submitted reads */
+   dlist_head issued;
 
-   OutstandingRead outstanding_reads[];
+   /* available reads (unused or completed) */
+   dlist_head available;
+
+   /*
+    * IOs, be they completed or in progress, in the order that the callback
+    * returned them.
+    */
+   dlist_head in_order;
+
+   PgStreamingReadItem all_items[];
 };
 
+static void pg_streaming_read_complete(PgAioOnCompletionLocalContext *ocb, PgAioInProgress *io);
+static void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+
 PgStreamingRead *
 pg_streaming_read_alloc(uint32 iodepth, uintptr_t pgsr_private,
                        PgStreamingReadDetermineNextCB determine_next_cb,
@@ -181,33 +216,51 @@ pg_streaming_read_alloc(uint32 iodepth, uintptr_t pgsr_private,
 {
    PgStreamingRead *pgsr;
 
-   pgsr = palloc0(offsetof(PgStreamingRead, outstanding_reads) +
-                  sizeof(OutstandingRead) * iodepth);
+   iodepth = Max(Min(iodepth, NBuffers / 128), 1);
+
+   pgsr = palloc0(offsetof(PgStreamingRead, all_items) +
+                  sizeof(PgStreamingReadItem) * iodepth * 2);
 
-   pgsr->iodepth = iodepth;
+   pgsr->iodepth_max = iodepth;
+   pgsr->distance_max = iodepth;
+   pgsr->all_items_count = pgsr->iodepth_max + pgsr->distance_max;
    pgsr->pgsr_private = pgsr_private;
-   pgsr->end_at = -1;
    pgsr->determine_next_cb = determine_next_cb;
    pgsr->release_cb = release_cb;
 
+   pgsr->current_window = 0;
+
+   dlist_init(&pgsr->available);
+   dlist_init(&pgsr->in_order);
+   dlist_init(&pgsr->issued);
+
+   for (int i = 0; i < pgsr->all_items_count; i++)
+   {
+       PgStreamingReadItem *this_read = &pgsr->all_items[i];
+
+       this_read->on_completion.callback = pg_streaming_read_complete;
+       this_read->pgsr = pgsr;
+       dlist_push_tail(&pgsr->available, &this_read->node);
+   }
+
    return pgsr;
 }
 
 void
 pg_streaming_read_free(PgStreamingRead *pgsr)
 {
-   for (int i = 0; i < pgsr->iodepth; i++)
+   for (int i = 0; i < pgsr->all_items_count; i++)
    {
-       OutstandingRead *this_read = &pgsr->outstanding_reads[i];
+       PgStreamingReadItem *this_read = &pgsr->all_items[i];
 
        if (this_read->in_progress)
        {
+           Assert(this_read->valid);
            pgaio_io_wait(this_read->aio, true);
            this_read->in_progress = false;
-           this_read->done = true;
        }
 
-       if (this_read->done)
+       if (this_read->valid)
            pgsr->release_cb(pgsr->pgsr_private, this_read->read_private);
 
        if (this_read->aio)
@@ -216,17 +269,43 @@ pg_streaming_read_free(PgStreamingRead *pgsr)
            this_read->aio = NULL;
        }
    }
+
+   pfree(pgsr);
+}
+
+static void
+pg_streaming_read_complete(PgAioOnCompletionLocalContext *ocb, PgAioInProgress *io)
+{
+   PgStreamingReadItem *this_read = pgaio_ocb_container(PgStreamingReadItem, on_completion, ocb);
+   PgStreamingRead *pgsr = this_read->pgsr;
+
+#if 0
+   if ((pgsr->prefetched_total_count % 10000) == 0)
+       ereport(LOG, errmsg("pgsr read completed: qd %d completed: %d",
+                           pgsr->inflight_count, pgsr->completed_count),
+               errhidestmt(true),
+               errhidecontext(true));
+#endif
+
+   dlist_delete_from(&pgsr->issued, &this_read->node);
+   pgsr->inflight_count--;
+   pgsr->completed_count++;
+   this_read->in_progress = false;
+
+   pg_streaming_read_prefetch(pgsr);
 }
 
 static void
 pg_streaming_read_prefetch_one(PgStreamingRead *pgsr)
 {
-   uint32 off = (pgsr->prefetch_at) % pgsr->iodepth;
-   OutstandingRead *this_read = &pgsr->outstanding_reads[off];
+   PgStreamingReadItem *this_read;
    PgStreamingReadNextStatus status;
 
+   Assert(!dlist_is_empty(&pgsr->available));
+
+   this_read = dlist_container(PgStreamingReadItem, node, dlist_pop_head_node(&pgsr->available));
+   Assert(!this_read->valid);
    Assert(!this_read->in_progress);
-   Assert(!this_read->done);
 
    if (this_read->aio == NULL)
    {
@@ -237,32 +316,39 @@ pg_streaming_read_prefetch_one(PgStreamingRead *pgsr)
        pgaio_io_recycle(this_read->aio);
    }
 
-#if 0
-   ereport(DEBUG2,
-           (errmsg("fetching into slot %d", off),
-            errhidestmt(true),
-            errhidecontext(true)));
-#endif
+   pgaio_io_on_completion_local(this_read->aio, &this_read->on_completion);
+   this_read->in_progress = true;
+   this_read->valid = true;
+   dlist_push_tail(&pgsr->issued, &this_read->node);
+   dlist_push_tail(&pgsr->in_order, &this_read->sequence_node);
+   pgsr->pending_count++;
+   pgsr->inflight_count++;
+   pgsr->prefetched_total_count++;
 
    status = pgsr->determine_next_cb(pgsr->pgsr_private, this_read->aio, &this_read->read_private);
 
    if (status == PGSR_NEXT_END)
    {
-       pgsr->end_at = pgsr->prefetch_at;
+       pgsr->pending_count--;
+       pgsr->inflight_count--;
+       pgsr->hit_end = true;
+       this_read->in_progress = false;
+       this_read->valid = false;
+       dlist_delete_from(&pgsr->in_order, &this_read->sequence_node);
+       dlist_push_tail(&pgsr->available, &this_read->node);
    }
    else if (status == PGSR_NEXT_NO_IO)
    {
        Assert(this_read->read_private != 0);
-       this_read->done = true;
        this_read->in_progress = false;
-       pgsr->prefetch_at++;
+       pgsr->pending_count--;
+       pgsr->inflight_count--;
+       pgsr->completed_count++;
+       dlist_delete_from(&pgsr->issued, &this_read->node);
    }
    else
    {
        Assert(this_read->read_private != 0);
-       this_read->done = false;
-       this_read->in_progress = true;
-       pgsr->prefetch_at++;
    }
 }
 
@@ -270,90 +356,124 @@ static void
 pg_streaming_read_prefetch(PgStreamingRead *pgsr)
 {
    uint32 min_issue;
-   uint32 pending = 0;
 
-   if (pgsr->end_at != -1)
+   if (pgsr->hit_end)
        return;
 
-   if (pgsr->current_window < pgsr->iodepth)
+   Assert(pgsr->inflight_count <= pgsr->current_window);
+   Assert(pgsr->completed_count <= (pgsr->iodepth_max + pgsr->distance_max));
+
+   /*
+    * XXX: Some issues:
+    *
+    * - We should probably do the window calculation based on the number of
+    *   buffers the user actually requested, i.e. only recompute this
+    *   whenever pg_streaming_read_get_next() is called. Otherwise we will
+    *   always read the whole prefetch window.
+    *
+    * - The algorithm here is pretty stupid. Should take distance / iodepth
+    *   properly into account in a distinct way. Grow slower.
+    *
+    * - It'd be good to have a usage dependent iodepth management. After an
+    *   initial increase, we should only increase further if the
+    *   window proves too small (i.e. we don't manage to keep 'completed'
+    *   close to full).
+    *
+    * - If most requests don't trigger IO, we should probably reduce the
+    *   prefetch window.
+    */
+   if (pgsr->current_window < pgsr->iodepth_max)
    {
        if (pgsr->current_window == 0)
            pgsr->current_window = 4;
        else
            pgsr->current_window *= 2;
 
-       if (pgsr->current_window > pgsr->iodepth)
-           pgsr->current_window = pgsr->iodepth;
+       if (pgsr->current_window > pgsr->iodepth_max)
+           pgsr->current_window = pgsr->iodepth_max;
 
        min_issue = 1;
    }
    else
    {
-       min_issue = Min(pgsr->iodepth, 8);
+       min_issue = Min(pgsr->iodepth_max, pgsr->current_window / 4);
    }
 
-   if (pgsr->scan_at + pgsr->current_window < pgsr->prefetch_at + min_issue)
+   Assert(pgsr->inflight_count <= pgsr->current_window);
+   Assert(pgsr->completed_count <= (pgsr->iodepth_max + pgsr->distance_max));
+
+   if (pgsr->completed_count >= pgsr->current_window)
        return;
 
-#if 0
-   ereport(DEBUG2,
-           errmsg("checking prefetch at scan_at: %d, prefetch_at: %d, window: %d, fetching %d",
-                  pgsr->scan_at, pgsr->prefetch_at, pgsr->current_window,
-                  (pgsr->scan_at + pgsr->current_window) - pgsr->prefetch_at),
-           errhidestmt(true),
-           errhidecontext(true));
-#endif
+   if (pgsr->inflight_count >= pgsr->current_window)
+       return;
 
-   while(pgsr->prefetch_at < pgsr->scan_at + pgsr->current_window)
+   while (!pgsr->hit_end &&
+          (pgsr->inflight_count < pgsr->current_window) &&
+          (pgsr->completed_count < pgsr->current_window))
    {
        pg_streaming_read_prefetch_one(pgsr);
-       pending++;
 
-       if (pending >= 16)
+       if (pgsr->pending_count >= min_issue)
        {
+           pgsr->submitted_total_count += pgsr->pending_count;
+           pgsr->pending_count = 0;
            pgaio_submit_pending(true);
-           pending = 0;
        }
-
-       if (pgsr->end_at != -1)
-           return;
    }
 
-   if (pending > 0)
+   if (pgsr->pending_count >= min_issue)
+   {
+       pgsr->submitted_total_count += pgsr->pending_count;
+       pgsr->pending_count = 0;
        pgaio_submit_pending(true);
+   }
 }
 
 uintptr_t
 pg_streaming_read_get_next(PgStreamingRead *pgsr)
 {
-   uint32 off = (pgsr->scan_at) % pgsr->iodepth;
-   OutstandingRead *this_read = &pgsr->outstanding_reads[off];
+   if (pgsr->prefetched_total_count == 0)
+   {
+       pg_streaming_read_prefetch(pgsr);
+       Assert(pgsr->hit_end || pgsr->prefetched_total_count > 0);
+   }
 
-   if (pgsr->scan_at == pgsr->end_at)
+   if (dlist_is_empty(&pgsr->in_order))
+   {
+       Assert(pgsr->hit_end);
        return 0;
+   }
+   else
+   {
+       PgStreamingReadItem *this_read;
+       uint64_t ret;
 
-   //if (pgsr->prefetch_at == 0)
-   pg_streaming_read_prefetch(pgsr);
+       Assert(pgsr->prefetched_total_count > 0);
 
-   if (pgsr->scan_at == pgsr->end_at)
-       return 0;
+       this_read = dlist_container(PgStreamingReadItem, sequence_node, dlist_pop_head_node(&pgsr->in_order));
+       Assert(this_read->valid);
 
-   Assert(pgsr->scan_at < pgsr->prefetch_at);
-   Assert(this_read->aio);
-   Assert(this_read->done || this_read->in_progress);
+       if (this_read->in_progress)
+       {
+           pgaio_io_wait(this_read->aio, true);
+           /* callback should have updated */
+           Assert(!this_read->in_progress);
 
-   if (!this_read->done)
-   {
-       pgaio_io_wait(this_read->aio, true);
-       Assert(pgaio_io_success(this_read->aio));
-       this_read->in_progress = false;
-       this_read->done = true;
-   }
+           ret = this_read->read_private;
 
-   pgsr->scan_at++;
-   this_read->done = false;
+           pgaio_io_recycle(this_read->aio);
+       }
+       else
+       {
+           ret = this_read->read_private;
+       }
 
-   //pg_streaming_read_prefetch(pgsr);
+       this_read->valid = false;
+       pgsr->completed_count--;
+       dlist_push_tail(&pgsr->available, &this_read->node);
+       pg_streaming_read_prefetch(pgsr);
 
-   return this_read->read_private;
+       return ret;
+   }
 }