From 35e28aa17bb9460b01859e9c413815f7b9ce9f1b Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 15 Oct 2013 10:37:38 +0900 Subject: [PATCH] Fix data inconsistency problem with native replication mode + extended protocol case. It is reported that concurrent INSERT using JDBC driver causes data difference among database node. This only happens following conditions are all met: 1) Native replication mode 2) Extended protocol used 3) The portal created by parse message is reused by bind message 4) autocommit is on 5) SERIAL (sequence) is used Pgpool-II's parse message function knows it has to lock the target table when INSERT (plus #5) is issued by clients. Unfortunately bind message function did not know it. Once parse/bind/execute finishes, pgpool releases the lock obtained by parse because of #4. JDBC wants to reuse the portal and starts the cycle from bind message, which does not obtain lock. As as result, lock-free INSERT are floating around which causes data inconsistency of course. The solution is, lock the table in bind phase. For this bind needs to issue LOCK in extended protocol. This was a little bit hard because the module (do_command()) to issue internal SQL command (other than SELECT) does not support extended protocol. To solve the problem do_query() is modified so that it accepts other than SELECT because it already accepts extended protocol. The modification is minimum and is only tested for the case called from insert_lock(). I do not recommend to replace every occurrence of do_command() with do_query() at this point. BTW the reason why the bug is not reported is, most users uses JDBC with auto commit = off. In this case, the lock obtained by parse persists until user explicitly issues commit or rollback. Per bug report by Steve Kuekes in [pgpool-general: 2142]. --- src/protocol/pool_process_query.c | 79 +++++++++++++++---- src/protocol/pool_proto_modules.c | 25 ++++++ .../tests/002.native_replication/test.sh | 15 ++++ .../002.native_replication/PgTester.java | 47 +++++++++++ .../tests/002.native_replication/create.sql | 6 ++ 5 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 test/regression/tests/002.native_replication/PgTester.java create mode 100644 test/regression/tests/002.native_replication/create.sql diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c index 9f9301d68..aad73ca1d 100644 --- a/src/protocol/pool_process_query.c +++ b/src/protocol/pool_process_query.c @@ -2348,7 +2348,10 @@ void free_select_result(POOL_SELECT_RESULT *result) } /* - * Send a SELECT to one DB node. + * Send a query to one DB node and wait for it's completion. The quey + * can be SELECT or any other type of query. However at this moment, + * the only client calls this function other than SELECT is + * insert_lock(), and the qury is either LOCK or SELECT for UPDATE. */ POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result, int major) { @@ -2493,14 +2496,20 @@ POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT * pool_write(backend, prepared_name, pname_len); /* - * Send descrive message + * Send descrive message if the query is SELECT. */ - pool_write(backend, "D", 1); - len = 4 + 1 + pname_len; - len = htonl(len); - pool_write(backend, &len, sizeof(len)); - pool_write(backend, "P", 1); - pool_write(backend, prepared_name, pname_len); + if (!strcasecmp(query, "SELECT")) + { + /* + * Send descrive message + */ + pool_write(backend, "D", 1); + len = 4 + 1 + pname_len; + len = htonl(len); + pool_write(backend, &len, sizeof(len)); + pool_write(backend, "P", 1); + pool_write(backend, prepared_name, pname_len); + } /* * Send execute message @@ -2629,10 +2638,18 @@ POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT * pool_debug("do_query: Command complete received"); state |= COMMAND_COMPLETE_RECEIVED; /* - * "Comand Complete" implies data row received status. - * If there's no row returned, "command complete" comes without row data. + * "Comand Complete" implies data row received status + * if the query was SELECT. If there's no row + * returned, "command complete" comes without row + * data. */ state |= DATA_ROW_RECEIVED; + /* + * For other than SELECT, ROW_DESCRIPTION_RECEIVED + * should be set because we didn't issue DESCRIBE + * message. + */ + state |= ROW_DESCRIPTION_RECEIVED; break; case '1': /* Parse complete */ @@ -3019,6 +3036,7 @@ POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend size_t nmatch = 2; regmatch_t pmatch[nmatch]; static POOL_RELCACHE *relcache; + POOL_SELECT_RESULT *result; /* insert_lock can be used in V3 only */ if (MAJOR(backend) != PROTO_MAJOR_V3) @@ -3137,12 +3155,20 @@ POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend if (lock_kind == 1) { - status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid, - MASTER_CONNECTION(backend)->key, 0); + if (pool_get_session_context() && pool_is_doing_extended_query_message()) + { + status = do_query(MASTER(backend), qbuf, &result, MAJOR(backend)); + if (result) + free_select_result(result); + } + else + { + status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid, + MASTER_CONNECTION(backend)->key, 0); + } } else if (lock_kind == 2) { - POOL_SELECT_RESULT *result; status = do_query(MASTER(backend), qbuf, &result, MAJOR(backend)); if (result) free_select_result(result); @@ -3186,8 +3212,18 @@ POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend lock_kind = 1; snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table); per_node_statement_log(backend, MASTER_NODE_ID, qbuf); - status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid, - MASTER_CONNECTION(backend)->key, 0); + + if (pool_get_session_context() && pool_is_doing_extended_query_message()) + { + status = do_query(MASTER(backend), qbuf, &result, MAJOR(backend)); + if (result) + free_select_result(result); + } + else + { + status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid, + MASTER_CONNECTION(backend)->key, 0); + } } } @@ -3210,8 +3246,17 @@ POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend if (lock_kind == 1) { per_node_statement_log(backend, i, qbuf); - status = do_command(frontend, CONNECTION(backend, i), qbuf, PROTO_MAJOR_V3, - MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0); + if (pool_get_session_context() && pool_is_doing_extended_query_message()) + { + status = do_query(MASTER(backend), qbuf, &result, MAJOR(backend)); + if (result) + free_select_result(result); + } + else + { + status = do_command(frontend, CONNECTION(backend, i), qbuf, PROTO_MAJOR_V3, + MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0); + } } else if (lock_kind == 2) { diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index 999ceede7..664ff8671 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -1222,6 +1222,7 @@ POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, POOL_SENT_MESSAGE *bind_msg; POOL_SESSION_CONTEXT *session_context; POOL_QUERY_CONTEXT *query_context; + int insert_stmt_with_lock = 0; /* Get session context */ session_context = pool_get_session_context(); @@ -1291,6 +1292,30 @@ POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, return POOL_END; } + /* + * Start a transaction if necessary + */ + pool_debug("Bind: checking strict query"); + if (is_strict_query(query_context->parse_tree)) + { + pool_debug("Bind: strict query"); + start_internal_transaction(frontend, backend, query_context->parse_tree); + allow_close_transaction = 1; + } + + pool_debug("Bind: checking insert lock"); + insert_stmt_with_lock = need_insert_lock(backend, query_context->original_query, query_context->parse_tree); + if (insert_stmt_with_lock) + { + pool_debug("Bind: issuing insert lock"); + /* issue a LOCK command to keep consistency */ + if (insert_lock(frontend, backend, query_context->original_query, (InsertStmt *)query_context->parse_tree, insert_stmt_with_lock) != POOL_CONTINUE) + { + pool_query_context_destroy(query_context); + return POOL_END; + } + } + pool_debug("Bind: waiting for master completing the query"); if (pool_extended_send_and_wait(query_context, "B", len, contents, 1, MASTER_NODE_ID) != POOL_CONTINUE) diff --git a/src/test/regression/tests/002.native_replication/test.sh b/src/test/regression/tests/002.native_replication/test.sh index 1d7af14df..6d9a463ef 100755 --- a/src/test/regression/tests/002.native_replication/test.sh +++ b/src/test/regression/tests/002.native_replication/test.sh @@ -35,6 +35,21 @@ EOF $PGBENCH -i test $PGBENCH -f pgbench.sql -c 10 -t 10 test +# test with extended protocol (autocommit on) +# per [pgpool-general: 2144]. +cp ../PgTester.java . +javac PgTester.java +export CLASSPATH=.:$JDBC_DRIVER +psql -f ../create.sql test +env +psql -f $PGPOOL_INSTALL_DIR/share/pgpool-II/insert_lock.sql test + +java PgTester 0 & +java PgTester 10 & +java PgTester 100 & +java PgTester 1000 & +wait + $PSQL -p 11000 test <