static POOL_DEST send_to_where(Node *node);
static void where_to_send_deallocate(POOL_QUERY_CONTEXT * query_context, Node *node);
+static void where_to_send_main_replica(POOL_QUERY_CONTEXT * query_context, char *query, Node *node);
+static void where_to_send_native_replication(POOL_QUERY_CONTEXT * query_context, char *query, Node *node);
+
static char *remove_read_write(int len, const char *contents, int *rewritten_len);
static void set_virtual_main_node(POOL_QUERY_CONTEXT *query_context);
static void set_load_balance_info(POOL_QUERY_CONTEXT *query_context);
void
pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
{
- POOL_SESSION_CONTEXT *session_context;
- POOL_CONNECTION_POOL *backend;
-
CHECK_QUERY_CONTEXT_IS_VALID;
- session_context = pool_get_session_context(false);
- backend = session_context->backend;
-
/*
* Zap out DB node map
*/
{
pool_set_node_to_be_sent(query_context, REAL_MAIN_NODE_ID);
}
- else if (MAIN_REPLICA && query_context->is_multi_statement)
- {
- /*
- * If we are in streaming replication mode and we have multi statement query,
- * we should send it to primary server only. Otherwise it is possible
- * to send a write query to standby servers because we only use the
- * first element of the multi statement query and don't care about the
- * rest. Typical situation where we are bugged by this is,
- * "BEGIN;DELETE FROM table;END". Note that from pgpool-II 3.1.0
- * transactional statements such as "BEGIN" is unconditionally sent to
- * all nodes(see send_to_where() for more details). Someday we might
- * be able to understand all part of multi statement queries, but
- * until that day we need this band aid.
- */
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
else if (MAIN_REPLICA)
{
- POOL_DEST dest;
-
- dest = send_to_where(node);
-
- dml_adaptive(node, query);
-
- ereport(DEBUG1,
- (errmsg("decide where to send the query"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- /* Should be sent to primary only? */
- if (dest == POOL_PRIMARY)
- {
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- /* Should be sent to both primary and standby? */
- else if (dest == POOL_BOTH)
- {
- if (is_tx_started_by_multi_statement_query())
- {
- /*
- * If we are in an explicit transaction and the transaction
- * was started by a multi statement query, we should send
- * query to primary node only (which was supposed to be sent
- * to all nodes) until the transaction gets committed or
- * aborted.
- */
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- else
- {
- pool_setall_node_to_be_sent(query_context);
- }
- }
- else if (pool_is_writing_transaction() &&
- pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
+ if (query_context->is_multi_statement)
{
+ /*
+ * If we are in streaming replication mode and we have multi statement query,
+ * we should send it to primary server only. Otherwise it is possible
+ * to send a write query to standby servers because we only use the
+ * first element of the multi statement query and don't care about the
+ * rest. Typical situation where we are bugged by this is,
+ * "BEGIN;DELETE FROM table;END". Note that from pgpool-II 3.1.0
+ * transactional statements such as "BEGIN" is unconditionally sent to
+ * all nodes(see send_to_where() for more details). Someday we might
+ * be able to understand all part of multi statement queries, but
+ * until that day we need this band aid.
+ */
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
-
- /*
- * Ok, we might be able to load balance the SELECT query.
- */
else
- {
- if (pool_config->load_balance_mode &&
- is_select_query(node, query) &&
- MAJOR(backend) == PROTO_MAJOR_V3)
- {
- /*
- * If (we are outside of an explicit transaction) OR (the
- * transaction has not issued a write query yet, AND
- * transaction isolation level is not SERIALIZABLE) we might
- * be able to load balance.
- */
-
- ereport(DEBUG1,
- (errmsg("checking load balance preconditions. TSTATE:%c writing_transaction:%d failed_transaction:%d isolation:%d",
- TSTATE(backend, PRIMARY_NODE_ID),
- pool_is_writing_transaction(),
- pool_is_failed_transaction(),
- pool_get_transaction_isolation()),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- if (TSTATE(backend, PRIMARY_NODE_ID) == 'I' ||
- (!pool_is_writing_transaction() &&
- !pool_is_failed_transaction() &&
- pool_get_transaction_isolation() != POOL_SERIALIZABLE))
- {
- BackendInfo *bkinfo = pool_get_node_info(session_context->load_balance_node_id);
-
- /*
- * Load balance if possible
- */
-
- /*
- * If system catalog is used in the SELECT, we prefer to
- * send to the primary. Example: SELECT * FROM pg_class
- * WHERE relname = 't1'; Because 't1' is a constant, it's
- * hard to recognize as table name. Most use case such
- * query is against system catalog, and the table name can
- * be a temporary table, it's best to query against
- * primary system catalog. Please note that this test must
- * be done *before* test using pool_has_temp_table.
- */
- if (pool_has_system_catalog(node))
- {
- ereport(DEBUG1,
- (errmsg("could not load balance because systems catalogs are used"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
-
- /*
- * If temporary table is used in the SELECT, we prefer to
- * send to the primary.
- */
- else if (pool_config->check_temp_table && pool_has_temp_table(node))
- {
- ereport(DEBUG1,
- (errmsg("could not load balance because temporary tables are used"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
-
- /*
- * If unlogged table is used in the SELECT, we prefer to
- * send to the primary.
- */
- else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
- {
- ereport(DEBUG1,
- (errmsg("could not load balance because unlogged tables are used"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- /*
- * When query match the query patterns in primary_routing_query_pattern_list, we
- * send only to main node.
- */
- else if (pattern_compare(query, WRITELIST, "primary_routing_query_pattern_list") == 1)
- {
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- /*
- * If a writing function call is used, we prefer to send
- * to the primary.
- */
- else if (pool_has_function_call(node))
- {
- ereport(DEBUG1,
- (errmsg("could not load balance because writing functions are used"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- else if (is_select_object_in_temp_write_list(node, query))
- {
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- else
- {
- if (pool_config->statement_level_load_balance)
- {
- session_context->load_balance_node_id = select_load_balancing_node();
- bkinfo = pool_get_node_info(session_context->load_balance_node_id);
- }
-
- /*
- * As streaming replication delay is too much, if
- * prefer_lower_delay_standby is true then elect new
- * load balance node which is lowest delayed,
- * false then send to the primary.
- */
- if (STREAM &&
- pool_config->delay_threshold &&
- bkinfo->standby_delay > pool_config->delay_threshold)
- {
- ereport(DEBUG1,
- (errmsg("could not load balance because of too much replication delay"),
- errdetail("destination = %d for query= \"%s\"", dest, query)));
-
- if (pool_config->prefer_lower_delay_standby)
- {
- int new_load_balancing_node = select_load_balancing_node();
-
- session_context->load_balance_node_id = new_load_balancing_node;
- session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
- pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
- }
- else
- {
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- }
- else
- {
- session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
- pool_set_node_to_be_sent(query_context,
- session_context->query_context->load_balance_node_id);
- }
- }
- }
- else
- {
- /* Send to the primary only */
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- }
- else
- {
- /* Send to the primary only */
- pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
- }
- }
- }
- else if (REPLICATION && query_context->is_multi_statement)
- {
- pool_setall_node_to_be_sent(query_context);
+ where_to_send_main_replica(query_context, query, node);
}
else if (REPLICATION)
{
- if (pool_config->load_balance_mode &&
- is_select_query(node, query) &&
- MAJOR(backend) == PROTO_MAJOR_V3)
+ if (query_context->is_multi_statement)
{
- /*
- * In snapshot isolation mode, we always load balance if current
- * transaction is read only unless load balance mode is off.
- */
- if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
- pool_config->load_balance_mode)
- {
- if (TSTATE(backend, MAIN_NODE_ID) == 'T')
- {
- /*
- * We are in an explicit transaction. If the transaction is
- * read only, we can load balance.
- */
- if (session_context->transaction_read_only)
- {
- /* Ok, we can load balance. We are done! */
- set_load_balance_info(query_context);
- set_virtual_main_node(query_context);
- return;
- }
- }
- else if (TSTATE(backend, MAIN_NODE_ID) == 'I')
- {
- /*
- * We are out side transaction. If default transaction is read only,
- * we can load balance.
- */
- static char *si_query = "SELECT current_setting('transaction_read_only')";
- POOL_SELECT_RESULT *res;
- bool load_balance = false;
-
- do_query(CONNECTION(backend, MAIN_NODE_ID), si_query, &res, MAJOR(backend));
- if (res)
- {
- if (res->data[0] && !strcmp(res->data[0], "on"))
- {
- load_balance = true;
- }
- free_select_result(res);
- }
-
- per_node_statement_log(backend, MAIN_NODE_ID, si_query);
-
- if (load_balance)
- {
- /* Ok, we can load balance. We are done! */
- set_load_balance_info(query_context);
- set_virtual_main_node(query_context);
- return;
- }
- }
- }
-
- /*
- * If a writing function call is used or replicate_select is true,
- * we prefer to send to all nodes.
- */
- if (pool_has_function_call(node) || pool_config->replicate_select)
- {
- pool_setall_node_to_be_sent(query_context);
- }
-
- /*
- * If (we are outside of an explicit transaction) OR (the
- * transaction has not issued a write query yet, AND transaction
- * isolation level is not SERIALIZABLE) we might be able to load
- * balance.
- */
- else if (TSTATE(backend, MAIN_NODE_ID) == 'I' ||
- (!pool_is_writing_transaction() &&
- !pool_is_failed_transaction() &&
- pool_get_transaction_isolation() != POOL_SERIALIZABLE))
- {
- set_load_balance_info(query_context);
- }
- else
- {
- /* only send to main node */
- pool_set_node_to_be_sent(query_context, REAL_MAIN_NODE_ID);
- }
+ pool_setall_node_to_be_sent(query_context);
}
else
- {
- if (is_select_query(node, query) && !pool_config->replicate_select &&
- !pool_has_function_call(node))
- {
- /* only send to main node */
- pool_set_node_to_be_sent(query_context, REAL_MAIN_NODE_ID);
- }
- else
- {
- /* send to all nodes */
- pool_setall_node_to_be_sent(query_context);
- }
- }
+ where_to_send_native_replication(query_context, query, node);
}
else
{
}
}
+
+/*
+ * Decide the backend node to be sent in streaming replication mode, logical
+ * replication mode and slony mode. Called by pool_where_to_send.
+ */
+static void
+where_to_send_main_replica(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
+{
+ POOL_DEST dest;
+ POOL_SESSION_CONTEXT *session_context;
+ POOL_CONNECTION_POOL *backend;
+
+ dest = send_to_where(node);
+ session_context = pool_get_session_context(false);
+ backend = session_context->backend;
+
+ dml_adaptive(node, query);
+
+ ereport(DEBUG1,
+ (errmsg("decide where to send the query"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ /* Should be sent to primary only? */
+ if (dest == POOL_PRIMARY)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ /* Should be sent to both primary and standby? */
+ else if (dest == POOL_BOTH)
+ {
+ if (is_tx_started_by_multi_statement_query())
+ {
+ /*
+ * If we are in an explicit transaction and the transaction
+ * was started by a multi statement query, we should send
+ * query to primary node only (which was supposed to be sent
+ * to all nodes) until the transaction gets committed or
+ * aborted.
+ */
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ pool_setall_node_to_be_sent(query_context);
+ }
+ }
+ else if (pool_is_writing_transaction() &&
+ pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+
+ /*
+ * Ok, we might be able to load balance the SELECT query.
+ */
+ else
+ {
+ if (pool_config->load_balance_mode &&
+ is_select_query(node, query) &&
+ MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ /*
+ * If (we are outside of an explicit transaction) OR (the
+ * transaction has not issued a write query yet, AND
+ * transaction isolation level is not SERIALIZABLE) we might
+ * be able to load balance.
+ */
+
+ ereport(DEBUG1,
+ (errmsg("checking load balance preconditions. TSTATE:%c writing_transaction:%d failed_transaction:%d isolation:%d",
+ TSTATE(backend, PRIMARY_NODE_ID),
+ pool_is_writing_transaction(),
+ pool_is_failed_transaction(),
+ pool_get_transaction_isolation()),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (TSTATE(backend, PRIMARY_NODE_ID) == 'I' ||
+ (!pool_is_writing_transaction() &&
+ !pool_is_failed_transaction() &&
+ pool_get_transaction_isolation() != POOL_SERIALIZABLE))
+ {
+ BackendInfo *bkinfo = pool_get_node_info(session_context->load_balance_node_id);
+
+ /*
+ * Load balance if possible
+ */
+
+ /*
+ * If system catalog is used in the SELECT, we prefer to
+ * send to the primary. Example: SELECT * FROM pg_class
+ * WHERE relname = 't1'; Because 't1' is a constant, it's
+ * hard to recognize as table name. Most use case such
+ * query is against system catalog, and the table name can
+ * be a temporary table, it's best to query against
+ * primary system catalog. Please note that this test must
+ * be done *before* test using pool_has_temp_table.
+ */
+ if (pool_has_system_catalog(node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because systems catalogs are used"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+
+ /*
+ * If temporary table is used in the SELECT, we prefer to
+ * send to the primary.
+ */
+ else if (pool_config->check_temp_table && pool_has_temp_table(node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because temporary tables are used"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+
+ /*
+ * If unlogged table is used in the SELECT, we prefer to
+ * send to the primary.
+ */
+ else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because unlogged tables are used"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ /*
+ * When query match the query patterns in primary_routing_query_pattern_list, we
+ * send only to main node.
+ */
+ else if (pattern_compare(query, WRITELIST, "primary_routing_query_pattern_list") == 1)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ /*
+ * If a writing function call is used, we prefer to send
+ * to the primary.
+ */
+ else if (pool_has_function_call(node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because writing functions are used"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else if (is_select_object_in_temp_write_list(node, query))
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id = select_load_balancing_node();
+ bkinfo = pool_get_node_info(session_context->load_balance_node_id);
+ }
+
+ /*
+ * As streaming replication delay is too much, if
+ * prefer_lower_delay_standby is true then elect new
+ * load balance node which is lowest delayed,
+ * false then send to the primary.
+ */
+ if (STREAM &&
+ pool_config->delay_threshold &&
+ bkinfo->standby_delay > pool_config->delay_threshold)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of too much replication delay"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ int new_load_balancing_node = select_load_balancing_node();
+
+ session_context->load_balance_node_id = new_load_balancing_node;
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context,
+ session_context->query_context->load_balance_node_id);
+ }
+ }
+ }
+ else
+ {
+ /* Send to the primary only */
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ /* Send to the primary only */
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+}
+
+/*
+ * Decide the backend node to be sent in replication mode and snapshot
+ * isolation mode.
+ * Called by pool_where_to_send.
+ */
+static void
+where_to_send_native_replication(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
+{
+ POOL_SESSION_CONTEXT *session_context;
+ POOL_CONNECTION_POOL *backend;
+
+ session_context = pool_get_session_context(false);
+ backend = session_context->backend;
+
+ if (pool_config->load_balance_mode &&
+ is_select_query(node, query) &&
+ MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ /*
+ * In snapshot isolation mode, we always load balance if current
+ * transaction is read only unless load balance mode is off.
+ */
+ if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
+ pool_config->load_balance_mode)
+ {
+ if (TSTATE(backend, MAIN_NODE_ID) == 'T')
+ {
+ /*
+ * We are in an explicit transaction. If the transaction is
+ * read only, we can load balance.
+ */
+ if (session_context->transaction_read_only)
+ {
+ /* Ok, we can load balance. We are done! */
+ set_load_balance_info(query_context);
+ set_virtual_main_node(query_context);
+ return;
+ }
+ }
+ else if (TSTATE(backend, MAIN_NODE_ID) == 'I')
+ {
+ /*
+ * We are out side transaction. If default transaction is read only,
+ * we can load balance.
+ */
+ static char *si_query = "SELECT current_setting('transaction_read_only')";
+ POOL_SELECT_RESULT *res;
+ bool load_balance = false;
+
+ do_query(CONNECTION(backend, MAIN_NODE_ID), si_query, &res, MAJOR(backend));
+ if (res)
+ {
+ if (res->data[0] && !strcmp(res->data[0], "on"))
+ {
+ load_balance = true;
+ }
+ free_select_result(res);
+ }
+
+ per_node_statement_log(backend, MAIN_NODE_ID, si_query);
+
+ if (load_balance)
+ {
+ /* Ok, we can load balance. We are done! */
+ set_load_balance_info(query_context);
+ set_virtual_main_node(query_context);
+ return;
+ }
+ }
+ }
+
+ /*
+ * If a writing function call is used or replicate_select is true,
+ * we prefer to send to all nodes.
+ */
+ if (pool_has_function_call(node) || pool_config->replicate_select)
+ {
+ pool_setall_node_to_be_sent(query_context);
+ }
+
+ /*
+ * If (we are outside of an explicit transaction) OR (the
+ * transaction has not issued a write query yet, AND transaction
+ * isolation level is not SERIALIZABLE) we might be able to load
+ * balance.
+ */
+ else if (TSTATE(backend, MAIN_NODE_ID) == 'I' ||
+ (!pool_is_writing_transaction() &&
+ !pool_is_failed_transaction() &&
+ pool_get_transaction_isolation() != POOL_SERIALIZABLE))
+ {
+ set_load_balance_info(query_context);
+ }
+ else
+ {
+ /* only send to main node */
+ pool_set_node_to_be_sent(query_context, REAL_MAIN_NODE_ID);
+ }
+ }
+ else
+ {
+ if (is_select_query(node, query) && !pool_config->replicate_select &&
+ !pool_has_function_call(node))
+ {
+ /* only send to main node */
+ pool_set_node_to_be_sent(query_context, REAL_MAIN_NODE_ID);
+ }
+ else
+ {
+ /* send to all nodes */
+ pool_setall_node_to_be_sent(query_context);
+ }
+ }
+}