Parsed test spec with 2 sessions
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
step s0_setup: SELECT pg_replication_origin_session_setup('origin');
pg_replication_origin_session_setup
-----------------------------------
t
(1 row)
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
step s0_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR: cannot reset replication origin with ID 1 because it is still in use by other processes
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
*/
int acquired_by;
+ /* Count of processes that are currently using this origin. */
+ int refcount;
+
/*
* Condition variable that's signaled when acquired_by changes.
*/
if (state->roident == roident)
{
/* found our slot, is it busy? */
- if (state->acquired_by != 0)
+ if (state->refcount > 0)
{
ConditionVariable *cv;
if (nowait)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with ID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ (state->acquired_by != 0)
+ ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)
+ : errmsg("could not drop replication origin with ID %d, in use by another process",
+ state->roident)));
/*
* We must wait and then retry. Since we don't know which CV
LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
/* Make sure it's not used by somebody else */
- if (replication_state->acquired_by != 0)
+ if (replication_state->refcount > 0)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("replication origin with ID %d is already active for PID %d",
- replication_state->roident,
- replication_state->acquired_by)));
+ (replication_state->acquired_by != 0)
+ ? errmsg("replication origin with ID %d is already active for PID %d",
+ replication_state->roident,
+ replication_state->acquired_by)
+ : errmsg("replication origin with ID %d is already active in another process",
+ replication_state->roident)));
}
break;
return remote_lsn;
}
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
{
- ConditionVariable *cv = NULL;
+ ConditionVariable *cv;
- if (session_replication_state == NULL)
- return;
+ Assert(session_replication_state != NULL);
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- if (session_replication_state->acquired_by == MyProcPid)
- {
- cv = &session_replication_state->origin_cv;
+ /* The origin must be held by at least one process at this point. */
+ Assert(session_replication_state->refcount > 0);
+ /*
+ * Reset the PID only if the current session is the first to set up this
+ * origin. This avoids clearing the first process's PID when any other
+ * session releases the origin.
+ */
+ if (session_replication_state->acquired_by == MyProcPid)
session_replication_state->acquired_by = 0;
- session_replication_state = NULL;
- }
+
+ session_replication_state->refcount--;
+
+ cv = &session_replication_state->origin_cv;
+ session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
- if (cv)
- ConditionVariableBroadcast(cv);
+ ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+ if (session_replication_state == NULL)
+ return;
+
+ replorigin_session_reset_internal();
}
/*
node, acquired_by)));
}
+ /*
+ * The origin is in use, but PID is not recorded. This can happen if
+ * the process that originally acquired the origin exited without
+ * releasing it. To ensure correctness, other processes cannot acquire
+ * the origin until all processes currently using it have released it.
+ */
+ else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with ID %d is already active in another process",
+ curstate->roident)));
+
/* ok, found slot */
session_replication_state = curstate;
break;
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
+ {
session_replication_state->acquired_by = MyProcPid;
+ Assert(session_replication_state->refcount == 0);
+ }
else
+ {
+ /*
+ * Sanity check: the origin must already be acquired by the process
+ * passed as input, and at least one process must be using it.
+ */
Assert(session_replication_state->acquired_by == acquired_by);
+ Assert(session_replication_state->refcount > 0);
+ }
+
+ session_replication_state->refcount++;
LWLockRelease(ReplicationOriginLock);
void
replorigin_session_reset(void)
{
- ConditionVariable *cv;
-
Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
- session_replication_state->acquired_by = 0;
- cv = &session_replication_state->origin_cv;
- session_replication_state = NULL;
-
- LWLockRelease(ReplicationOriginLock);
+ /*
+ * Restrict explicit resetting of the replication origin if it was first
+ * acquired by this process and others are still using it. While the
+ * system handles this safely (as happens if the first session exits
+ * without calling reset), it is best to avoid doing so.
+ */
+ if (session_replication_state->acquired_by == MyProcPid &&
+ session_replication_state->refcount > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+ session_replication_state->roident),
+ errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+ errhint("Reset the replication origin in all other processes before retrying.")));
- ConditionVariableBroadcast(cv);
+ replorigin_session_reset_internal();
}
/*