{
PgAioInProgress *io = local_reaped_ios[i];
- Assert(io->flags & PGAIOIP_IN_USE);
-
if (!(io->flags & PGAIOIP_DONE))
{
PgAioCompletedCB cb;
if (done)
{
io->flags |= PGAIOIP_DONE;
+ LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+ pgaio_put_io_locked(io, true);
+ LWLockRelease(SharedAIOCtlLock);
+
num_complete++;
}
else
}
}
- START_CRIT_SECTION();
-
- /* and recycle io entries */
- if (num_complete > 0)
- {
- LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
- for (int i = 0; i < num_local_reaped; i++)
- {
- PgAioInProgress *io = local_reaped_ios[i];
-
- if (io->flags & PGAIOIP_DONE)
- pgaio_put_io_locked(io, true);
- }
- LWLockRelease(SharedAIOCtlLock);
- }
-
num_local_reaped = 0;
- END_CRIT_SECTION();
-
/* if any IOs weren't fully done, re-submit them */
if (num_reissued)
pgaio_submit_pending(false);
}
void
-pgaio_wait_for_io(PgAioInProgress *io)
+pgaio_wait_for_io(PgAioInProgress *io, bool holding_reference)
{
uint8 init_flags;
+ uint8 flags;
+ bool increased_refcount = false;
#ifdef PGAIO_VERBOSE
elog(DEBUG2, "waiting for %zu",
io - aio_ctl->in_progress_io);
#endif
- init_flags = *(volatile uint8*) &io->flags;
+ if (!holding_reference)
+ {
+ LWLockAcquire(SharedAIOCompletionLock, LW_SHARED);
+ LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+
+ init_flags = flags = *(volatile uint8*) &io->flags;
+
+ if (init_flags & (PGAIOIP_IN_USE))
+ {
+ io->refcount++;
+ increased_refcount = true;
+ }
+ else
+ {
+ Assert(init_flags & (PGAIOIP_IDLE | PGAIOIP_DONE | PGAIOIP_ONLY_USER));
+ }
+
+ LWLockRelease(SharedAIOCtlLock);
+ LWLockRelease(SharedAIOCompletionLock);
+
+ if (!increased_refcount)
+ return;
+ }
+ else
+ {
+ init_flags = flags = *(volatile uint8*) &io->flags;
+ }
+
+ Assert(io->refcount > 0);
+ Assert(!(init_flags & PGAIOIP_IDLE));
+ Assert(init_flags & (PGAIOIP_IN_USE | PGAIOIP_ONLY_USER));
if (!(init_flags & PGAIOIP_INFLIGHT) &&
- !(init_flags & (PGAIOIP_DONE | PGAIOIP_IDLE)))
+ !(init_flags & PGAIOIP_DONE))
{
pgaio_submit_pending(false);
}
while (true)
{
- if (io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE))
+ flags = *(volatile uint8*) &io->flags;
+
+ if (flags & PGAIOIP_DONE)
break;
pgaio_drain(&aio_ctl->shared_ring, false);
- if (io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE))
+ flags = *(volatile uint8*) &io->flags;
+
+ if (flags & PGAIOIP_DONE)
break;
- if (io->flags & PGAIOIP_INFLIGHT)
+ if (flags & PGAIOIP_INFLIGHT)
{
int ret;
ResetLatch(MyLatch);
}
- if (!(io->flags & PGAIOIP_INFLIGHT))
+ flags = *(volatile uint8*) &io->flags;
+ if (!(flags & PGAIOIP_INFLIGHT))
{
PG_SETMASK(&UnBlockSig);
continue;
if (IsUnderPostmaster)
ConditionVariablePrepareToSleep(&io->cv);
- if (!(io->flags & (PGAIOIP_DONE | PGAIOIP_IDLE)))
+ flags = *(volatile uint8*) &io->flags;
+ if (!(flags & PGAIOIP_DONE))
ConditionVariableSleep(&io->cv, 0);
if (IsUnderPostmaster)
ConditionVariableCancelSleep();
}
}
+
+ flags = *(volatile uint8*) &io->flags;
+ Assert(flags & (PGAIOIP_DONE | PGAIOIP_ONLY_USER));
+
+ if (increased_refcount)
+ {
+ LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
+ pgaio_put_io_locked(io, false);
+ LWLockRelease(SharedAIOCtlLock);
+ }
}
PgAioInProgress *
return io;
}
+extern void
+pgaio_io_recycle(PgAioInProgress *io)
+{
+ uint32 init_flags = *(volatile uint8*) &io->flags;
+
+ Assert(init_flags & PGAIOIP_ONLY_USER);
+ Assert(io->refcount == 1);
+
+ io->flags &= ~PGAIOIP_DONE;
+}
+
static void __attribute__((noinline))
pgaio_prepare_io(PgAioInProgress *io, PgAioAction action)
{
- Assert(!(io->flags & PGAIOIP_IDLE));
+ Assert(!(io->flags & (PGAIOIP_IDLE | PGAIOIP_DONE)));
/* true for now, but not necessarily in the future */
Assert(io->flags & PGAIOIP_ONLY_USER);
+ Assert(io->refcount == 1);
Assert(num_local_pending_requests < PGAIO_SUBMIT_BATCH_SIZE);
- io->flags &= ~(PGAIOIP_ONLY_USER | PGAIOIP_DONE);
+ io->flags &= ~PGAIOIP_ONLY_USER;
io->flags |= PGAIOIP_IN_USE;
/* for this module */
if (release_internal && io->refcount > 0)
{
io->flags |= PGAIOIP_ONLY_USER;
+ io->flags &= ~PGAIOIP_IN_USE;
pgaio_bounce_buffer_release_locked(io);
}
if (io->refcount > 0)
return;
- Assert(io->flags & PGAIOIP_DONE);
+ Assert(io->flags & PGAIOIP_DONE ||
+ io->flags & PGAIOIP_ONLY_USER);
- io->flags &= ~(PGAIOIP_IN_USE|PGAIOIP_DONE | PGAIOIP_ONLY_USER);
+ io->flags &= ~(PGAIOIP_IN_USE | PGAIOIP_DONE | PGAIOIP_ONLY_USER);
io->flags |= PGAIOIP_IDLE;
io->type = 0;
io->initiatorProcIndex = INVALID_PGPROCNO;