From 8640abfc41ff06b1e6d31315239292f4d3d4191d Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Thu, 27 Jul 2017 08:00:32 +0900 Subject: [PATCH] Fix Pgpool-II hung up bug or other errors in error case in extended query in replication mode. In extended query in streaming replication mode, the responses expected from the backend are managed by the pending messages. However, if an error response is returned from the backend, the sequence of messages returned from the backend is not what we expect. The mismatch was partially handled in the code, but it turned out that the handling was not sufficient. The hang basically occurs when an error response is received from the backend before the frontend sends the 'S' (sync) message. If the backend detects errors while doing extended query, it does not return any response until it receives the sync message. Of course at this point the frontend is expected to send the sync message to Pgpool-II, but it may not have reached the socket of Pgpool-II yet. So it is possible that Pgpool-II does not notice that the sync message is coming, and does not forward the sync message to the backend. As a result, nothing progresses and Pgpool-II is stuck. To fix the problem the following modifications are made in this commit: - When an error response is received from the backend, after forwarding the error response to the frontend, remove all pending messages and backend message buffer data except the POOL_SYNC pending message and ready for query (before, we removed all messages including ready for query, which is apparently wrong). If the sync message has not been received yet, call ProcessFrontendResponse() to read data from the frontend. This eliminates the expectation of receiving the messages that the backend would return in normal cases, and ensures that the sync message is received from the frontend. - When the 'S' (sync) message is received from the frontend, forward it to the backends and wait till the "ready for query" message is received from the backends. This ensures that Pgpool-II receives the ready for query message and goes into the proper sync point. 
- It is still possible that, after the ready for query message is received, different messages arrive from each backend. If the numbers of messages are the same, a "kind mismatch" error will occur. If the numbers of messages are different, it is possible that Pgpool-II gets stuck, because read_kind_from_backend() will wait until a message comes from the backend. To fix this, if either the load balance node or the primary node returns 'Z' (ready for query), try to skip messages on the other node. This is done in read_kind_from_backend(). See the comments around line 3391 of pool_process_query.c for more details. Other fixes in this commit: - Do not send the intentional error query to the backend in streaming replication mode in ErrorResponse3(). This is not necessary in streaming replication mode. - Fix pool_virtual_master_db_node_id() to return the virtual_master_node_id only when a query is in progress and a query context exists. Previously the in-progress state was not checked, so a bogus node id could be returned. --- src/context/pool_query_context.c | 2 +- src/protocol/pool_process_query.c | 19 +++- src/protocol/pool_proto_modules.c | 178 +++++++++++++++++++++++++----- 3 files changed, 166 insertions(+), 33 deletions(-) diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index c1ad541cb..6d8b1df87 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -304,7 +304,7 @@ int pool_virtual_master_db_node_id(void) return REAL_MASTER_NODE_ID; } - if (sc->query_context) + if (sc->in_progress && sc->query_context) { int node_id = sc->query_context->virtual_master_node_id; diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c index 547ec3efc..92c19ef7d 100644 --- a/src/protocol/pool_process_query.c +++ b/src/protocol/pool_process_query.c @@ -3179,6 +3179,7 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac ereport(DEBUG1, (errmsg("read_kind_from_backend: sync pending message exists"))); 
session_context->query_context = NULL; + pool_unset_ignore_till_sync(); pool_unset_query_in_progress(); } else @@ -3386,16 +3387,28 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac * various scenario, probably we should employ this strategy for 'Z' * (ready for response) case only, since it's a good candidate to * re-sync primary and standby. + * + * 2017/7/16 Tatsuo Ishii wrote: The issue above has been solved since + * the pending message mechanism was introduced. However, in error + * cases it is possible that similar issue could happen since returned + * messages do not follow the sequence recorded in the pending + * messages because the backend ignores requests till sync message is + * received. In this case we need to re-sync either master or + * standby. So we check not only the standby but master node. */ - if (session_context->load_balance_node_id != MASTER_NODE_ID && - kind_list[MASTER_NODE_ID] == 'Z' && STREAM) + (kind_list[MASTER_NODE_ID] == 'Z' || + kind_list[session_context->load_balance_node_id] == 'Z') + && STREAM) { POOL_CONNECTION *s; char *buf; int len; - s = CONNECTION(backend, session_context->load_balance_node_id); + if (kind_list[MASTER_NODE_ID] == 'Z') + s = CONNECTION(backend, session_context->load_balance_node_id); + else + s = CONNECTION(backend, MASTER_NODE_ID); /* skip len and contents corresponding standby data */ pool_read(s, &len, sizeof(len)); diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index e2dc3baa4..8c504584b 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -96,6 +96,9 @@ static char * flatten_set_variable_args(const char *name, List *args); static bool process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context); +static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend); +static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend); /* 
* This is the workhorse of processing the pg_terminate_backend function to @@ -1570,8 +1573,8 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, query_context->where_to_send[session_context->load_balance_node_id] = true; } - pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, false); - pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, false); + pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, true); + pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, true); /* Add pending message */ pmsg = pool_pending_message_create('C', len, contents); @@ -2212,8 +2215,9 @@ POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend, if (ret != POOL_CONTINUE) return ret; - raise_intentional_error_if_need(backend); - + if (!STREAM) + raise_intentional_error_if_need(backend); + return POOL_CONTINUE; } @@ -2462,6 +2466,12 @@ POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend, else if (!pool_is_query_in_progress()) pool_set_query_in_progress(); status = SimpleForwardToBackend(fkind, frontend, backend, len, contents); + + if (STREAM) + { + /* Wait till Ready for query received */ + pool_wait_till_ready_for_query(backend); + } break; case 'F': /* FunctionCall */ @@ -2635,7 +2645,7 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend, { /* parse_before_bind() was called. Do not foward the * close complete message to frontend. 
*/ - ereport(LOG, + ereport(DEBUG1, (errmsg("processing backend response"), errdetail("do not forward close complete message to frontend"))); pool_discard_packet_contents(backend); @@ -2662,31 +2672,8 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend, { pool_set_ignore_till_sync(); pool_unset_query_in_progress(); - if (STREAM) - { - POOL_PENDING_MESSAGE *pmsg; - int i; - - /* Remove all pending messages */ - do - { - pmsg = pool_pending_message_pull_out(); - pool_pending_message_free_pending_message(pmsg); - } - while (pmsg); - - pool_pending_message_reset_previous_message(); - - /* Discard read buffer */ - for (i=0;i 0) + { + buf = pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len)); + pool_push(CONNECTION(backend, i), buf, ntohl(len)-sizeof(len)); + } + + if (kind == 'Z') /* Ready for query? */ + { + pool_pop(CONNECTION(backend, i), &poplen); + ereport(DEBUG1, + (errmsg("pool_wait_till_ready_for_query: backend:%d ready for query found. buffer length:%d", + i, CONNECTION(backend, i)->len))); + break; + } + } + } + } +} + +/* + * Called when error response received in streaming replication mode and doing + * extended query. Remove all pending messages and backend message buffer data + * except POOL_SYNC pending message and ready for query. If sync message is + * not received yet, call ProcessFrontendResponse() to read data from + * frontend. + */ +static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) +{ + POOL_PENDING_MESSAGE *pmsg; + int i; + + if (!pool_is_doing_extended_query_message() || !STREAM) + return; + + /* + * Check to see if we aready received a sync + * message. If not, call ProcessFrontendResponse() to + * get the sync message from client. 
+ */ + pmsg = pool_pending_message_get(POOL_SYNC); + if (pmsg == NULL) + { + ProcessFrontendResponse(frontend, backend); + } + pool_pending_message_free_pending_message(pmsg); + + /* Remove all pending messages except sync message */ + do + { + pmsg = pool_pending_message_head_message(); + if (pmsg && pmsg->type == POOL_SYNC) + { + ereport(LOG, + (errmsg("Process backend response: sync pending message found after receiving error response"))); + pool_unset_ignore_till_sync(); + pool_pending_message_free_pending_message(pmsg); + break; + } + pool_pending_message_free_pending_message(pmsg); + pmsg = pool_pending_message_pull_out(); + pool_pending_message_free_pending_message(pmsg); + } + while (pmsg); + + pool_pending_message_reset_previous_message(); + + /* Discard read buffer execpt "Ready for query" */ + for (i=0;ilen, sts))); + pool_unread(CONNECTION(backend, i), &kind, sizeof(kind)); + break; + } + + if (kind == 'Z') /* Ready for query? */ + { + pool_unread(CONNECTION(backend, i), &kind, sizeof(kind)); + ereport(DEBUG1, + (errmsg("pool_discard_except_sync_and_ready_for_query: Ready for query found. backend:%d", + i))); + break; + } + else + { + /* Read and discard packet */ + pool_read(CONNECTION(backend, i), &len, sizeof(len)); + if ((ntohl(len)-sizeof(len)) > 0) + { + pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len)); + } + ereport(DEBUG1, + (errmsg("pool_discard_except_sync_and_ready_for_query: discarding packet %c (len:%lu) of backend:%d", kind, ntohl(len)-sizeof(len), i))); + } + } + } + } +} -- 2.39.5