Allow to use multiple statements extensively.
authorTatsuo Ishii <ishii@sraoss.co.jp>
Sun, 12 Feb 2023 10:59:00 +0000 (19:59 +0900)
committerTatsuo Ishii <ishii@sraoss.co.jp>
Wed, 15 Feb 2023 08:06:54 +0000 (17:06 +0900)
This commit tries to eliminate pgpool's long standing limitations
regarding multiple statements (multi-statements).

Previously

BEGIN;SELECT;
SAVEPOINT foo;

will fail in streaming replication mode because "BEGIN" was sent to
the primar node, but "SAVEPOINT" will be sent to both the primary and
standbys, and standbys will complain "SAVEPOINT can only be used in
transaction blocks".

Basic idea to solve the problem is, tracking explicit transactions
started by multi-statement queries so that all commands including
PREPARE, EXECUTE, DEALLOCATE, SAVEPOINT and COMMIT/ROLLBACK are sent
to the primary node in streaming replication mode or logical
replication mode.  In native replication or snapshot isolation mode,
those queries are sent to all of the backend nodes.

For this purpose new member: is_tx_started_by_multi_statement is added
to session context and also support functions are added.

extern bool is_tx_started_by_multi_statement_query(void);
extern void set_tx_started_by_multi_statement_query(void);
extern void unset_tx_started_by_multi_statement_query(void);

Discussion: https://www.pgpool.net/pipermail/pgpool-hackers/2023-February/004287.html
Back-patch-through: 4.2 as backport to 4.1 and before looks difficult

src/context/pool_query_context.c
src/context/pool_session_context.c
src/include/context/pool_session_context.h
src/protocol/CommandComplete.c
src/protocol/pool_process_query.c
src/protocol/pool_proto_modules.c

index 0277f9a846fabae8d270f47041e3e767ece12d33..83c0a2b27d839783b2ab478ced05057955322c67 100644 (file)
@@ -451,7 +451,7 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
        else if (MAIN_REPLICA && query_context->is_multi_statement)
        {
                /*
-                * If we are in native replication mode and we have multi statement query,
+                * If we are in streaming replication mode and we have multi statement query,
                 * we should send it to primary server only. Otherwise it is possible
                 * to send a write query to standby servers because we only use the
                 * first element of the multi statement query and don't care about the
@@ -484,7 +484,21 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
                /* Should be sent to both primary and standby? */
                else if (dest == POOL_BOTH)
                {
-                       pool_setall_node_to_be_sent(query_context);
+                       if (is_tx_started_by_multi_statement_query())
+                       {
+                               /*
+                                * If we are in an explicit transaction and the transaction
+                                * was started by a multi statement query, we should send
+                                * query to primary node only (which was supposed to be sent
+                                * to all nodes) until the transaction gets committed or
+                                * aborted.
+                                */
+                               pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+                       }
+                       else
+                       {
+                               pool_setall_node_to_be_sent(query_context);
+                       }
                }
                else if (pool_is_writing_transaction() &&
                                 pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
@@ -1351,8 +1365,19 @@ static POOL_DEST send_to_where(Node *node, char *query)
                        }
                        /* SAVEPOINT related commands are sent to both primary and standby */
                        else if (is_savepoint_query(node))
+                       {
+                               if (SL_MODE && is_tx_started_by_multi_statement_query())
+                               {
+                                       /*
+                                        * But in streaming replication mode, if a transaction was
+                                        * started by a multi statement query, SAVEPOINT should be
+                                        * sent to primary because the transaction was started on
+                                        * primary only.
+                                       */
+                                       return POOL_PRIMARY;
+                               }
                                return POOL_BOTH;
-
+                       }
                        /*
                         * 2PC commands
                         */
index 24e79c21d585b4d8b73afd11b89d8859972fd348..c62eb8334867e307a33230b9cda2d5e9657d10a4 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2022     PgPool Global Development Group
+ * Copyright (c) 2003-2023     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -180,6 +180,8 @@ pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * bac
        session_context->transaction_read_only = false;
 
        dml_adaptive_init();
+
+       unset_tx_started_by_multi_statement_query();
 }
 
 /*
@@ -2106,3 +2108,45 @@ pool_temp_tables_dump(void)
 #endif
 
 }
+
+/*
+ * Return true if an explicit transaction has been started by a
+ * multi-statement-query
+ */
+bool
+is_tx_started_by_multi_statement_query(void)
+{
+       if (!session_context)
+               ereport(ERROR,
+                               (errmsg("is_tx_started_by_multi_statement_query: session context is not initialized")));
+
+       return session_context->is_tx_started_by_multi_statement;
+}
+
+/*
+ * Remember that an explicit transaction has been started by a
+ * multi-statement-query
+ */
+void
+set_tx_started_by_multi_statement_query(void)
+{
+       if (!session_context)
+               ereport(ERROR,
+                               (errmsg("set_tx_started_by_multi_statement_query: session context is not initialized")));
+
+       session_context->is_tx_started_by_multi_statement = true;
+}
+
+/*
+ * Forget that an explicit transaction has been started by a
+ * multi-statement-query
+ */
+void
+unset_tx_started_by_multi_statement_query(void)
+{
+       if (!session_context)
+               ereport(ERROR,
+                               (errmsg("unset_tx_started_by_multi_statement_query: session context is not initialized")));
+
+       session_context->is_tx_started_by_multi_statement = false;
+}
index 7877448d92ec8293e1e84b9ed95f4c5594b82639..dd886534fd8fffbd4b7e5c80fe551d29a4a051a1 100644 (file)
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2022     PgPool Global Development Group
+ * Copyright (c) 2003-2023     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -147,6 +147,9 @@ typedef struct
         * message for frontend are sent out.
         */
        bool            flush_pending;
+
+       bool            is_tx_started_by_multi_statement; /* true if an explicit transaction has been strated by
+                                                                                                        multi statement query */
 }                      POOL_PENDING_MESSAGE;
 
 typedef enum {
@@ -312,6 +315,11 @@ typedef struct
         * Set by read_kind_from_backend and reset by SimpleForwardToFrontend.
         */
        bool            flush_pending;
+
+       bool            is_tx_started_by_multi_statement;       /* True if an explicit
+                                                                                                        * transaction has been
+                                                                                                        * started by a
+                                                                                                        * multi-statement-query */
 }                      POOL_SESSION_CONTEXT;
 
 extern void pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
@@ -396,6 +404,10 @@ extern void        pool_temp_tables_commit_pending(void);
 extern void    pool_temp_tables_remove_pending(void);
 extern void    pool_temp_tables_dump(void);
 
+extern bool is_tx_started_by_multi_statement_query(void);
+extern void set_tx_started_by_multi_statement_query(void);
+extern void unset_tx_started_by_multi_statement_query(void);
+
 #ifdef NOT_USED
 extern void pool_set_preferred_main_node_id(int node_id);
 extern int     pool_get_preferred_main_node_id(void);
index d285f826da46b85bd3d24bae27366e92fa2bd4cf..963a034c625a576f486b738ed1718a2d69ed56bc 100644 (file)
@@ -5,7 +5,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2022     PgPool Global Development Group
+ * Copyright (c) 2003-2023     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -140,6 +140,36 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool
                memcpy(p1, p, len);
        }
 
+       /*
+        * Check if a transaction start command was issued by a multi statement
+        * query by looking at the command tag. We cannot look into the parse tree
+        * because our parse tree is only the first query in the multi statement.
+        */
+       if (SL_MODE && session_context->query_context &&
+               session_context->query_context->is_multi_statement)
+       {
+               if (!strcmp(p1, "BEGIN"))
+               {
+                       /*
+                        * If the query was a transaction starting command, remember it
+                        * until it gets committed or roll backed.
+                        */
+                       elog(DEBUG1, "Call set_tx_started_by_multi_statement_query() in CommandComplete");
+                       set_tx_started_by_multi_statement_query();
+               }
+               else if (!strcmp(p1, "COMMIT") || !strcmp(p1, "ROLLBACK"))
+               {
+                       /*
+                        * It is possible that the multi statement query included both
+                        * BEGIN and COMMIT/ROLLBACK, or just COMMIT/ROLLBACK command.  In
+                        * this case we forget that a transaction was started by multi
+                        * statement query.
+                        */
+                       elog(DEBUG1, "Call unset_tx_started_by_multi_statement_query() in CommandComplete");
+                       unset_tx_started_by_multi_statement_query();
+               }
+       }
+
        /*
         * If operated in streaming replication mode and extended query mode, just
         * forward the packet to frontend and we are done. Otherwise, we need to
@@ -334,11 +364,18 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
                {
                        /* Commit ongoing CREATE/DROP temp table status */
                        pool_temp_tables_commit_pending();                      
+
+                       /* Forget a transaction was started by multi statement query */
+                       unset_tx_started_by_multi_statement_query();
                }
                else if (stmt->kind == TRANS_STMT_ROLLBACK)
                {
                        /* Remove ongoing CREATE/DROP temp table status */
                        pool_temp_tables_remove_pending();
+
+                       /* Forget a transaction was started by multi statement query */
+                       elog(LOG, "unset_tx_started_by_multi_statement_query is called in CommandComplete");
+                       unset_tx_started_by_multi_statement_query();
                }
        }
        else if (IsA(node, CreateStmt))
index f2db65eabe80eb9af33a08812955ab1010435c3e..d4389d7b331dec63c0a72e38f62d93b6a3a172c8 100644 (file)
@@ -4184,6 +4184,12 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back
 
                                INTERNAL_TRANSACTION_STARTED(backend, i) = false;
 
+                               /*
+                                * Explicitly set the tx state to 'Idle'. This is necessary
+                                * because ReadyForQuery only takes care VALID_BACKEND.
+                                */
+                               TSTATE(backend, i) = 'I';
+
                                if (MAJOR(backend) == PROTO_MAJOR_V3 && !VALID_BACKEND(i))
                                {
                                        /*
@@ -4230,6 +4236,12 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back
                        PG_END_TRY();
                        INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID) = false;
 
+                       /*
+                        * Explicitly set the tx state to 'Idle'. This is necessary
+                        * because ReadyForQuery only takes care VALID_BACKEND.
+                        */
+                       TSTATE(backend, MAIN_NODE_ID) = 'I';
+
                        if (MAJOR(backend) == PROTO_MAJOR_V3 && !VALID_BACKEND(MAIN_NODE_ID))
                        {
                                /*
index 94aed730abe511948f6aecd08dfe47ec27002fc5..ed45b51d05e6b4cbeeeeee669b7fd11bf5b606c0 100644 (file)
@@ -2115,7 +2115,7 @@ ReadyForQuery(POOL_CONNECTION * frontend,
                        TSTATE(backend, i) = kind;
                        ereport(DEBUG5,
                                        (errmsg("processing ReadyForQuery"),
-                                        errdetail("transaction state '%c'(%02x)", state, state)));
+                                        errdetail("transaction state of node %d '%c'(%02x)", i, kind , kind)));
 
                        /*
                         * The transaction state to be returned to frontend is main node's.
@@ -4554,6 +4554,9 @@ check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * fro
 {
        int             len;
 
+       if (TSTATE(backend, MAIN_NODE_ID) != 'E')
+               return true;
+
        /*
         * Are we in failed transaction and the command is not a transaction close
         * command?