pfree(pgsw);
}
-
-typedef struct OutstandingRead
+typedef struct PgStreamingReadItem
{
+ /*
+ * membership in PgStreamingRead->issued, PgStreamingRead->available
+ *
+ * An item starts out on the available list, is moved to the issued list by
+ * pg_streaming_read_prefetch_one(), is unlinked again when the read
+ * completes, and is put back on the available list by
+ * pg_streaming_read_get_next().
+ */
+ dlist_node node;
+ /*
+ * membership in PgStreamingRead->in_order, from the moment the item is
+ * handed to determine_next_cb until pg_streaming_read_get_next() pops it
+ */
+ dlist_node sequence_node;
+ /* completion hook; .callback is set to pg_streaming_read_complete */
+ PgAioOnCompletionLocalContext on_completion;
+ PgStreamingRead *pgsr; /* back pointer to the owning stream */
PgAioInProgress *aio;
+ /*
+ * in_progress: set when the item is handed out for a read, cleared by the
+ * completion callback (or right away if the callback reported that no IO
+ * was needed or that the stream ended).
+ * valid: set from pg_streaming_read_prefetch_one() until the item is
+ * returned by pg_streaming_read_get_next(); pg_streaming_read_free() calls
+ * release_cb for items that still have it set.
+ */
bool in_progress;
- bool done;
+ bool valid;
uintptr_t read_private;
-} OutstandingRead;
+ uint32 sequence; /* not referenced by the code visible in this change */
+
+} PgStreamingReadItem;
struct PgStreamingRead
{
- uint32 iodepth;
+ uint32 iodepth_max; /* upper bound for current_window */
+ uint32 distance_max; /* initialized to the same value as iodepth_max */
+ uint32 all_items_count; /* iodepth_max + distance_max, length of all_items[] */
+ /* opaque caller state, passed as first argument to both callbacks */
uintptr_t pgsr_private;
PgStreamingReadDetermineNextCB determine_next_cb;
PgStreamingReadRelease release_cb;
uint32 current_window;
- uint32 end_at;
- uint32 scan_at;
- uint32 prefetch_at;
+ /*
+ * number of requests issued, i.e. how often a slot was handed to
+ * determine_next_cb (including the call that reported the end of the
+ * stream)
+ */
+ uint64 prefetched_total_count;
+ /*
+ * number of requests submitted to kernel; accumulated from pending_count
+ * each time pgaio_submit_pending() is called
+ */
+ uint64 submitted_total_count;
+ /*
+ * number of current requests completed: incremented when a read completes
+ * or needed no IO, decremented when pg_streaming_read_get_next() returns
+ * an item
+ */
+ uint32 completed_count;
+ /* number of current requests in flight, i.e. handed out but not completed */
+ int32 inflight_count;
+ int32 pending_count; /* handed out since the last pgaio_submit_pending() call */
+ /* set once determine_next_cb returned PGSR_NEXT_END; stops all prefetching */
+ bool hit_end;
+
+ /* submitted reads: handed to determine_next_cb and not completed yet */
+ dlist_head issued;
- OutstandingRead outstanding_reads[];
+ /*
+ * available reads (unused or completed), i.e. the slots that
+ * pg_streaming_read_prefetch_one() may take next
+ */
+ dlist_head available;
+
+ /*
+ * IOs, be they completed or in progress, in the order that the callback
+ * returned them. pg_streaming_read_get_next() consumes from the head.
+ */
+ dlist_head in_order;
+
+ PgStreamingReadItem all_items[];
};
+static void pg_streaming_read_complete(PgAioOnCompletionLocalContext *ocb, PgAioInProgress *io);
+static void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+/*
+ * Allocate and initialize a streaming read.
+ *
+ * The requested iodepth is clamped to the range [1, NBuffers / 128]. Twice
+ * that many items are allocated (zeroed) together with the stream itself, and
+ * all of them start out on the available list. No read is started here;
+ * current_window starts at 0 and is grown by pg_streaming_read_prefetch().
+ *
+ * Returns the new stream, to be released with pg_streaming_read_free().
+ */
PgStreamingRead *
pg_streaming_read_alloc(uint32 iodepth, uintptr_t pgsr_private,
PgStreamingReadDetermineNextCB determine_next_cb,
{
PgStreamingRead *pgsr;
- pgsr = palloc0(offsetof(PgStreamingRead, outstanding_reads) +
- sizeof(OutstandingRead) * iodepth);
+ iodepth = Max(Min(iodepth, NBuffers / 128), 1);
+ /* room for iodepth_max + distance_max items; both equal iodepth, hence * 2 */
+ pgsr = palloc0(offsetof(PgStreamingRead, all_items) +
+ sizeof(PgStreamingReadItem) * iodepth * 2);
- pgsr->iodepth = iodepth;
+ pgsr->iodepth_max = iodepth;
+ pgsr->distance_max = iodepth;
+ pgsr->all_items_count = pgsr->iodepth_max + pgsr->distance_max;
pgsr->pgsr_private = pgsr_private;
- pgsr->end_at = -1;
pgsr->determine_next_cb = determine_next_cb;
pgsr->release_cb = release_cb;
+ pgsr->current_window = 0;
+ /* all three lists start out empty */
+ dlist_init(&pgsr->available);
+ dlist_init(&pgsr->in_order);
+ dlist_init(&pgsr->issued);
+ /* wire up every item and make it available for prefetching */
+ for (int i = 0; i < pgsr->all_items_count; i++)
+ {
+ PgStreamingReadItem *this_read = &pgsr->all_items[i];
+
+ this_read->on_completion.callback = pg_streaming_read_complete;
+ this_read->pgsr = pgsr;
+ dlist_push_tail(&pgsr->available, &this_read->node);
+ }
+
return pgsr;
}
void
pg_streaming_read_free(PgStreamingRead *pgsr)
{
- for (int i = 0; i < pgsr->iodepth; i++)
+ for (int i = 0; i < pgsr->all_items_count; i++) /* visit every slot, used or not */
{
- OutstandingRead *this_read = &pgsr->outstanding_reads[i];
+ PgStreamingReadItem *this_read = &pgsr->all_items[i];
if (this_read->in_progress)
{
+ Assert(this_read->valid); /*
+ * NOTE(review): pg_streaming_read_get_next() expects the completion
+ * callback to have run by the time pgaio_io_wait() returns. If the
+ * same holds for the wait below, pg_streaming_read_complete() will
+ * call pg_streaming_read_prefetch(), which can hand out new reads
+ * while the stream is being torn down unless hit_end is already
+ * set - confirm.
+ */
pgaio_io_wait(this_read->aio, true);
this_read->in_progress = false;
- this_read->done = true;
}
- if (this_read->done)
+ if (this_read->valid) /* result was never returned by get_next: give it back */
pgsr->release_cb(pgsr->pgsr_private, this_read->read_private);
if (this_read->aio)
this_read->aio = NULL;
}
}
+ /* all_items[] is a flexible array member, so this frees the items too */
+ pfree(pgsr);
+}
+/*
+ * Completion callback installed in every item's on_completion context.
+ *
+ * Recovers the item from the callback context, unlinks it from the issued
+ * list, moves one unit of accounting from inflight_count to completed_count
+ * and then tries to start further reads. The item stays on the in_order list
+ * until pg_streaming_read_get_next() returns it. The io argument is not used.
+ */
+static void
+pg_streaming_read_complete(PgAioOnCompletionLocalContext *ocb, PgAioInProgress *io)
+{
+ PgStreamingReadItem *this_read = pgaio_ocb_container(PgStreamingReadItem, on_completion, ocb);
+ PgStreamingRead *pgsr = this_read->pgsr;
+ /* debug logging, compiled out */
+#if 0
+ if ((pgsr->prefetched_total_count % 10000) == 0)
+ ereport(LOG, errmsg("pgsr read completed: qd %d completed: %d",
+ pgsr->inflight_count, pgsr->completed_count),
+ errhidestmt(true),
+ errhidecontext(true));
+#endif
+ /* the read is no longer in flight; it now counts as completed */
+ dlist_delete_from(&pgsr->issued, &this_read->node);
+ pgsr->inflight_count--;
+ pgsr->completed_count++;
+ this_read->in_progress = false;
+ /* room in the window may have opened up, try to issue more reads */
+ pg_streaming_read_prefetch(pgsr);
}
static void
pg_streaming_read_prefetch_one(PgStreamingRead *pgsr)
{
- uint32 off = (pgsr->prefetch_at) % pgsr->iodepth;
- OutstandingRead *this_read = &pgsr->outstanding_reads[off];
+ PgStreamingReadItem *this_read;
PgStreamingReadNextStatus status;
+ Assert(!dlist_is_empty(&pgsr->available));
+ /* take the next free slot; it must not carry a result or a running read */
+ this_read = dlist_container(PgStreamingReadItem, node, dlist_pop_head_node(&pgsr->available));
+ Assert(!this_read->valid);
Assert(!this_read->in_progress);
- Assert(!this_read->done);
if (this_read->aio == NULL)
{
pgaio_io_recycle(this_read->aio);
}
-#if 0
- ereport(DEBUG2,
- (errmsg("fetching into slot %d", off),
- errhidestmt(true),
- errhidecontext(true)));
-#endif
+ pgaio_io_on_completion_local(this_read->aio, &this_read->on_completion);
+ this_read->in_progress = true; /* account for an IO up front; undone below for END / NO_IO */
+ this_read->valid = true;
+ dlist_push_tail(&pgsr->issued, &this_read->node);
+ dlist_push_tail(&pgsr->in_order, &this_read->sequence_node);
+ pgsr->pending_count++;
+ pgsr->inflight_count++;
+ pgsr->prefetched_total_count++;
status = pgsr->determine_next_cb(pgsr->pgsr_private, this_read->aio, &this_read->read_private);
if (status == PGSR_NEXT_END)
{
- pgsr->end_at = pgsr->prefetch_at;
+ pgsr->pending_count--; /* nothing was started: take back the accounting */
+ pgsr->inflight_count--;
+ pgsr->hit_end = true;
+ this_read->in_progress = false;
+ this_read->valid = false;
+ dlist_delete_from(&pgsr->in_order, &this_read->sequence_node);
+ dlist_push_tail(&pgsr->available, &this_read->node); /*
+ * NOTE(review): node was pushed onto the issued list above and is not
+ * removed from it in this branch before being pushed onto the
+ * available list (the NO_IO branch below does remove it). This looks
+ * like it leaves the issued list with stale links - confirm.
+ */
}
else if (status == PGSR_NEXT_NO_IO)
{
Assert(this_read->read_private != 0);
- this_read->done = true;
this_read->in_progress = false;
- pgsr->prefetch_at++;
+ pgsr->pending_count--; /* result is usable without IO: counts as completed right away */
+ pgsr->inflight_count--;
+ pgsr->completed_count++;
+ dlist_delete_from(&pgsr->issued, &this_read->node);
}
else
{
Assert(this_read->read_private != 0);
- this_read->done = false;
- this_read->in_progress = true;
- pgsr->prefetch_at++;
}
}
pg_streaming_read_prefetch(PgStreamingRead *pgsr)
{
uint32 min_issue;
- uint32 pending = 0;
- if (pgsr->end_at != -1)
+ if (pgsr->hit_end) /* the callback reported the end of the stream: nothing left to prefetch */
return;
- if (pgsr->current_window < pgsr->iodepth)
+ Assert(pgsr->inflight_count <= pgsr->current_window);
+ Assert(pgsr->completed_count <= (pgsr->iodepth_max + pgsr->distance_max));
+ /*
+ * Grow the prefetch window: it is set to 4 on the first call and doubled on
+ * every later call, capped at iodepth_max. While the window is still
+ * growing, reads are submitted as soon as one is pending (min_issue = 1);
+ * once it reached iodepth_max they are submitted in batches of a quarter
+ * of the window.
+ */
+ /*
+ * XXX: Some issues:
+ *
+ * - We should probably do the window calculation based on the number of
+ * buffers the user actually requested, i.e. only recompute this
+ * whenever pg_streaming_read_get_next() is called. Otherwise we will
+ * always read the whole prefetch window.
+ *
+ * - The algorithm here is pretty stupid. Should take distance / iodepth
+ * properly into account in a distinct way. Grow slower.
+ *
+ * - It'd be good to have a usage dependent iodepth management. After an
+ * initial increase, we should only increase further if the window
+ * proves too small for the user (i.e. we don't manage to keep
+ * 'completed' close to full).
+ *
+ * - If most requests don't trigger IO, we should probably reduce the
+ * prefetch window.
+ */
+ if (pgsr->current_window < pgsr->iodepth_max)
{
if (pgsr->current_window == 0)
pgsr->current_window = 4;
else
pgsr->current_window *= 2;
- if (pgsr->current_window > pgsr->iodepth)
- pgsr->current_window = pgsr->iodepth;
+ if (pgsr->current_window > pgsr->iodepth_max)
+ pgsr->current_window = pgsr->iodepth_max;
min_issue = 1;
}
else
{
- min_issue = Min(pgsr->iodepth, 8);
+ min_issue = Min(pgsr->iodepth_max, pgsr->current_window / 4);
}
- if (pgsr->scan_at + pgsr->current_window < pgsr->prefetch_at + min_issue)
+ Assert(pgsr->inflight_count <= pgsr->current_window);
+ Assert(pgsr->completed_count <= (pgsr->iodepth_max + pgsr->distance_max));
+ /*
+ * Nothing to do if a full window of results is already waiting to be
+ * consumed, or if a full window of reads is already in flight.
+ */
+ if (pgsr->completed_count >= pgsr->current_window)
return;
-#if 0
- ereport(DEBUG2,
- errmsg("checking prefetch at scan_at: %d, prefetch_at: %d, window: %d, fetching %d",
- pgsr->scan_at, pgsr->prefetch_at, pgsr->current_window,
- (pgsr->scan_at + pgsr->current_window) - pgsr->prefetch_at),
- errhidestmt(true),
- errhidecontext(true));
-#endif
+ if (pgsr->inflight_count >= pgsr->current_window)
+ return;
- while(pgsr->prefetch_at < pgsr->scan_at + pgsr->current_window)
+ while (!pgsr->hit_end &&
+ (pgsr->inflight_count < pgsr->current_window) &&
+ (pgsr->completed_count < pgsr->current_window))
{
pg_streaming_read_prefetch_one(pgsr);
- pending++;
- if (pending >= 16)
+ if (pgsr->pending_count >= min_issue) /* enough reads queued up: submit them as one batch */
{
+ pgsr->submitted_total_count += pgsr->pending_count;
+ pgsr->pending_count = 0;
pgaio_submit_pending(true);
- pending = 0;
}
-
- if (pgsr->end_at != -1)
- return;
}
- if (pending > 0)
+ if (pgsr->pending_count >= min_issue) /*
+ * NOTE(review): unlike the old "pending > 0" test, fewer than min_issue
+ * pending reads are left unsubmitted here; presumably they are submitted
+ * by a later call or when somebody waits for them - confirm.
+ */
+ {
+ pgsr->submitted_total_count += pgsr->pending_count;
+ pgsr->pending_count = 0;
pgaio_submit_pending(true);
+ }
}
uintptr_t
pg_streaming_read_get_next(PgStreamingRead *pgsr)
{
- uint32 off = (pgsr->scan_at) % pgsr->iodepth;
- OutstandingRead *this_read = &pgsr->outstanding_reads[off];
+ if (pgsr->prefetched_total_count == 0) /* first call: nothing was requested yet, start prefetching */
+ {
+ pg_streaming_read_prefetch(pgsr);
+ Assert(pgsr->hit_end || pgsr->prefetched_total_count > 0);
+ }
- if (pgsr->scan_at == pgsr->end_at)
+ if (dlist_is_empty(&pgsr->in_order))
+ {
+ Assert(pgsr->hit_end); /* no queued results is only expected at end of stream; 0 is returned then */
return 0;
+ }
+ else
+ {
+ PgStreamingReadItem *this_read;
+ uint64_t ret; /* holds read_private; the function itself returns uintptr_t */
- //if (pgsr->prefetch_at == 0)
- pg_streaming_read_prefetch(pgsr);
+ Assert(pgsr->prefetched_total_count > 0);
- if (pgsr->scan_at == pgsr->end_at)
- return 0;
+ this_read = dlist_container(PgStreamingReadItem, sequence_node, dlist_pop_head_node(&pgsr->in_order));
+ Assert(this_read->valid);
- Assert(pgsr->scan_at < pgsr->prefetch_at);
- Assert(this_read->aio);
- Assert(this_read->done || this_read->in_progress);
+ if (this_read->in_progress)
+ {
+ pgaio_io_wait(this_read->aio, true);
+ /*
+ * callback should have updated: pg_streaming_read_complete() is
+ * expected to have run during the wait and cleared in_progress.
+ *
+ * NOTE(review): the previous code asserted pgaio_io_success() after
+ * waiting; nothing visible here checks the result of the IO any
+ * more - confirm failures are handled elsewhere.
+ */
+ Assert(!this_read->in_progress);
- if (!this_read->done)
- {
- pgaio_io_wait(this_read->aio, true);
- Assert(pgaio_io_success(this_read->aio));
- this_read->in_progress = false;
- this_read->done = true;
- }
+ ret = this_read->read_private;
- pgsr->scan_at++;
- this_read->done = false;
+ pgaio_io_recycle(this_read->aio);
+ }
+ else
+ {
+ ret = this_read->read_private; /* already completed, or no IO was needed */
+ }
- //pg_streaming_read_prefetch(pgsr);
+ this_read->valid = false; /* the result now belongs to the caller; the slot can be reused */
+ pgsr->completed_count--;
+ dlist_push_tail(&pgsr->available, &this_read->node);
+ pg_streaming_read_prefetch(pgsr);
- return this_read->read_private;
+ return ret;
+ }
}