Prevent unintended dropping of active replication origins.
authorAmit Kapila <akapila@postgresql.org>
Wed, 14 Jan 2026 07:13:35 +0000 (07:13 +0000)
committerAmit Kapila <akapila@postgresql.org>
Wed, 14 Jan 2026 07:15:46 +0000 (07:15 +0000)
Commit 5b148706c5 exposed functionality that allows multiple processes to
use the same replication origin, enabling non-builtin logical replication
solutions to implement parallel apply for large transactions.

With this functionality, if two backends acquire the same replication
origin and one of them resets it first, the acquired_by flag is cleared
without acknowledging that another backend is still actively using the
origin. This can lead to the origin being unintentionally dropped. If the
shared memory for that dropped origin is later reused for a newly created
origin, the remaining backend that still holds a pointer to the old memory
may inadvertently advance the LSN of a completely different origin,
causing unpredictable behavior.

Although the underlying issue predates commit 5b148706c5, it did not
surface earlier because the internal parallel apply worker mechanism
correctly coordinated origin resets and drops.

This commit resolves the problem by introducing a reference counter for
replication origins. The reference count increases when a backend sets the
origin and decreases when it resets it. Additionally, the backend that
first acquires the origin will not release it until all other backends
using the origin have released it as well.

The patch also prevents dropping a replication origin when acquired_by is
zero but the reference counter is nonzero, covering the scenario where the
first session exits without properly releasing the origin.

Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/TY4PR01MB169077EE72ABE9E55BAF162D494B5A@TY4PR01MB16907.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com

contrib/test_decoding/expected/parallel_session_origin.out
contrib/test_decoding/specs/parallel_session_origin.spec
src/backend/replication/logical/origin.c

index e515b39f7ce86edda900726812e2e8917e83e53f..8e41831fcbc34bd456a11efa61c51a02045d80eb 100644 (file)
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
index c0e5fda07236a0f81adb72a98c94f301920f327c..2253a7a14eba4d93d229b89e92a8ca97a1405554 100644 (file)
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
index 04bc704a332b78376e716aa4857160145cd1be7f..13808a4674b3ce5f3261b33857538eccd800e99b 100644 (file)
@@ -130,6 +130,9 @@ typedef struct ReplicationState
     */
    int         acquired_by;
 
+   /* Count of processes that are currently using this origin. */
+   int         refcount;
+
    /*
     * Condition variable that's signaled when acquired_by changes.
     */
@@ -383,16 +386,19 @@ restart:
        if (state->roident == roident)
        {
            /* found our slot, is it busy? */
-           if (state->acquired_by != 0)
+           if (state->refcount > 0)
            {
                ConditionVariable *cv;
 
                if (nowait)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_IN_USE),
-                            errmsg("could not drop replication origin with ID %d, in use by PID %d",
-                                   state->roident,
-                                   state->acquired_by)));
+                            (state->acquired_by != 0)
+                            ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+                                     state->roident,
+                                     state->acquired_by)
+                            : errmsg("could not drop replication origin with ID %d, in use by another process",
+                                     state->roident)));
 
                /*
                 * We must wait and then retry.  Since we don't know which CV
@@ -959,13 +965,16 @@ replorigin_advance(RepOriginId node,
        LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
 
        /* Make sure it's not used by somebody else */
-       if (replication_state->acquired_by != 0)
+       if (replication_state->refcount > 0)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
-                    errmsg("replication origin with ID %d is already active for PID %d",
-                           replication_state->roident,
-                           replication_state->acquired_by)));
+                    (replication_state->acquired_by != 0)
+                    ? errmsg("replication origin with ID %d is already active for PID %d",
+                             replication_state->roident,
+                             replication_state->acquired_by)
+                    : errmsg("replication origin with ID %d is already active in another process",
+                             replication_state->roident)));
        }
 
        break;
@@ -1069,32 +1078,48 @@ replorigin_get_progress(RepOriginId node, bool flush)
    return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-   ConditionVariable *cv = NULL;
+   ConditionVariable *cv;
 
-   if (session_replication_state == NULL)
-       return;
+   Assert(session_replication_state != NULL);
 
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-   if (session_replication_state->acquired_by == MyProcPid)
-   {
-       cv = &session_replication_state->origin_cv;
+   /* The origin must be held by at least one process at this point. */
+   Assert(session_replication_state->refcount > 0);
 
+   /*
+    * Reset the PID only if the current session is the first to set up this
+    * origin. This avoids clearing the first process's PID when any other
+    * session releases the origin.
+    */
+   if (session_replication_state->acquired_by == MyProcPid)
        session_replication_state->acquired_by = 0;
-       session_replication_state = NULL;
-   }
+
+   session_replication_state->refcount--;
+
+   cv = &session_replication_state->origin_cv;
+   session_replication_state = NULL;
 
    LWLockRelease(ReplicationOriginLock);
 
-   if (cv)
-       ConditionVariableBroadcast(cv);
+   ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+   if (session_replication_state == NULL)
+       return;
+
+   replorigin_session_reset_internal();
 }
 
 /*
@@ -1174,6 +1199,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
                            node, acquired_by)));
        }
 
+       /*
+        * The origin is in use, but PID is not recorded. This can happen if
+        * the process that originally acquired the origin exited without
+        * releasing it. To ensure correctness, other processes cannot acquire
+        * the origin until all processes currently using it have released it.
+        */
+       else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+           ereport(ERROR,
+                   (errcode(ERRCODE_OBJECT_IN_USE),
+                    errmsg("replication origin with ID %d is already active in another process",
+                           curstate->roident)));
+
        /* ok, found slot */
        session_replication_state = curstate;
        break;
@@ -1205,9 +1242,21 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
    Assert(session_replication_state->roident != InvalidRepOriginId);
 
    if (acquired_by == 0)
+   {
        session_replication_state->acquired_by = MyProcPid;
+       Assert(session_replication_state->refcount == 0);
+   }
    else
+   {
+       /*
+        * Sanity check: the origin must already be acquired by the process
+        * passed as input, and at least one process must be using it.
+        */
        Assert(session_replication_state->acquired_by == acquired_by);
+       Assert(session_replication_state->refcount > 0);
+   }
+
+   session_replication_state->refcount++;
 
    LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1273,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-   ConditionVariable *cv;
-
    Assert(max_active_replication_origins != 0);
 
    if (session_replication_state == NULL)
@@ -1233,15 +1280,22 @@ replorigin_session_reset(void)
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("no replication origin is configured")));
 
-   LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-   session_replication_state->acquired_by = 0;
-   cv = &session_replication_state->origin_cv;
-   session_replication_state = NULL;
-
-   LWLockRelease(ReplicationOriginLock);
+   /*
+    * Restrict explicit resetting of the replication origin if it was first
+    * acquired by this process and others are still using it. While the
+    * system handles this safely (as happens if the first session exits
+    * without calling reset), it is best to avoid doing so.
+    */
+   if (session_replication_state->acquired_by == MyProcPid &&
+       session_replication_state->refcount > 1)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+                       session_replication_state->roident),
+                errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+                errhint("Reset the replication origin in all other processes before retrying.")));
 
-   ConditionVariableBroadcast(cv);
+   replorigin_session_reset_internal();
 }
 
 /*