#include "parser/pool_string.h"
#include "pool_session_context.h"
#include "pool_query_context.h"
+#include "pool_lobj.h"
int force_replication;
int replication_was_enabled; /* replication mode was enabled */
static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
static void generate_error_message(char *prefix, int specific_error, char *query);
static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
+static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ PreparedStatement *ps);
+static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context);
/*
* Process Query('Q') message
* Query messages include an SQL string.
*/
- POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
- POOL_CONNECTION_POOL *backend, char *query)
+POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, int len, char *contents)
{
- char *string;
- int len;
static char *sq = "show pool_status";
int commit;
List *parse_tree_list;
return POOL_END;
}
- if (query == NULL) /* need to read query from frontend? */
- {
- /* read actual query */
- if (MAJOR(backend) == PROTO_MAJOR_V3)
- {
- if (pool_read(frontend, &len, sizeof(len)) < 0)
- return POOL_END;
- len = ntohl(len) - 4;
- string = pool_read2(frontend, len);
- }
- else
- string = pool_read_string(frontend, &len, 0);
-
- if (string == NULL)
- return POOL_END;
- }
- else
- {
- len = strlen(query)+1;
- string = query;
- }
-
/* save last query string for logging purpose */
- strncpy(query_string_buffer, string, sizeof(query_string_buffer));
+ strncpy(query_string_buffer, contents, sizeof(query_string_buffer));
/* show ps status */
- query_ps_status(string, backend);
+ query_ps_status(contents, backend);
/* log query to log file if necessary */
if (pool_config->log_statement)
{
- pool_log("statement: %s", string);
+ pool_log("statement: %s", contents);
}
else
{
- pool_debug("statement2: %s", string);
+ pool_debug("statement2: %s", contents);
}
/* Create query context */
pool_set_query_in_progress();
/* parse SQL string */
- parse_tree_list = raw_parser(string);
+ parse_tree_list = raw_parser(contents);
if (parse_tree_list == NIL)
{
*/
char *p = "INSERT INTO foo VALUES(1)";
- pool_log("SimpleQuery: Unable to parse the query: %s", string);
+ pool_log("SimpleQuery: Unable to parse the query: %s", contents);
parse_tree_list = raw_parser(p);
}
/*
* Start query context
*/
- pool_start_query(query_context, string, node);
+ pool_start_query(query_context, contents, node);
if (PARALLEL_MODE)
{
bool parallel = true;
is_parallel_table = is_partition_table(backend,node);
- status = pool_do_parallel_query(frontend, backend, node, ¶llel, &string, &len);
+ status = pool_do_parallel_query(frontend, backend, node, ¶llel, &contents, &len);
if (parallel)
{
pool_query_context_destroy(query_context);
}
/* process status reporting? */
- if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
+ if (IsA(node, VariableShowStmt) && strncasecmp(sq, contents, strlen(sq)) == 0)
{
StartupPacket *sp;
char psbuf[1024];
free_parser();
pool_query_context_destroy(query_context);
+ pool_set_pool_status_stmt();
return POOL_CONTINUE;
}
if (IsA(node, PrepareStmt))
{
PreparedStatement *ps;
-#ifdef NOT_USED
+
ps = pool_create_prepared_statement(((PrepareStmt *)node)->name,
- 0, query_context);
+ 0, 0, NULL, query_context);
if (ps == NULL)
{
pool_error("SimpleQuery: failed to create prepared statement: %s", strerror(errno));
}
session_context->pending_pstmt = ps;
- session_context->pending_function = pool_add_prepared_statement;
-#endif
}
else if (IsA(node, DeallocateStmt))
{
char *name;
PreparedStatement *ps;
-#ifdef NOT_USED
+
name = ((DeallocateStmt *)node)->name;
if (name == NULL)
- ps = pool_create_prepared_statement("", 0, query_context);
+ ps = pool_create_prepared_statement("", 0, 0, NULL, query_context);
else
- ps = pool_create_prepared_statement(name, 0, query_context);
+ ps = pool_create_prepared_statement(name, 0, 0, NULL, query_context);
if (ps == NULL)
{
pool_error("SimpleQuery: failed to create prepared statement: %s", strerror(errno));
session_context->pending_pstmt = ps;
+#ifdef NOT_USED
if (name == NULL)
session_context->pending_function = pool_clear_prepared_statement_list;
else
session_context->pending_function = pool_remove_prepared_statement;
-#endif
}
else if (IsA(node, DiscardStmt))
{
{
session_context->pending_pstmt = NULL;
session_context->pending_portal = NULL;
- session_context->pending_function = pool_clear_prepared_statement_list;
}
+#endif
}
}
if (frontend && IsA(node, ExecuteStmt))
{
-#ifdef NOT_USED
+#ifdef NOT_USED
PreparedStatement *ps;
ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *)node)->name);
return POOL_END;
/* check if need lock */
- if (need_insert_lock(backend, string, node))
+ if (need_insert_lock(backend, contents, node))
{
/* if so, issue lock command */
- status = insert_lock(frontend, backend, string, (InsertStmt *)node);
+ status = insert_lock(frontend, backend, contents, (InsertStmt *)node);
if (status != POOL_CONTINUE)
{
free_parser();
}
}
}
- else if (REPLICATION && query == NULL && start_internal_transaction(frontend, backend, node))
+ else if (REPLICATION && contents == NULL && start_internal_transaction(frontend, backend, node))
{
free_parser();
return POOL_ERROR;
*/
if (!commit)
{
- char *rewrite_query;
+ char *rewrite_query;
+ char *string;
if (node)
{
if (IsA(node, PrepareStmt))
{
-#ifdef NOT_USED
ps = session_context->pending_pstmt;
ps->num_tsparams = 0;
-#endif
}
else if (IsA(node, ExecuteStmt))
ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *) node)->name);
if (rewrite_query != NULL)
{
query_context->rewritten_query = rewrite_query;
- len = strlen(string) + 1;
+ len = strlen(rewrite_query) + 1;
}
}
/*
* Send the query to other than master node.
*/
- if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "") != POOL_CONTINUE)
{
free_parser();
return POOL_END;
/* Send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
if (commit)
{
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
{
/*
free_parser();
}
else
{
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
{
free_parser();
return POOL_END;
/*
* process EXECUTE (V3 only)
*/
-POOL_STATUS Execute(POOL_CONNECTION *frontend,
- POOL_CONNECTION_POOL *backend)
+POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
{
- char *string; /* portal name + null terminate + max_tobe_returned_rows */
- int len;
- char kind;
- int status, commit = 0;
- PreparedStatement *ps;
+// char kind;
+// int status;
+ int commit = 0;
+// PreparedStatement *ps;
Portal *portal;
- char *string1 = NULL;
- PrepareStmt *p_stmt;
- POOL_STATUS ret;
+ char *query = NULL;
+ Node *node;
+// PrepareStmt *p_stmt;
+// POOL_STATUS ret;
int specific_error = 0;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
return POOL_END;
}
- /* read Execute packet */
- if (pool_read(frontend, &len, sizeof(len)) < 0)
- return POOL_END;
-
- len = ntohl(len) - 4;
- string = pool_read2(frontend, len);
-
- pool_debug("Execute: portal name <%s>", string);
-
- portal = pool_get_portal_by_portal_name(string);
- ps = pool_get_prepared_statement_by_pstmt_name(portal->pstmt->name);
- query_context = ps->qctxt;
+ pool_debug("Execute: portal name <%s>", contents);
- /* load balance trick */
- if (portal)
+ portal = pool_get_portal_by_portal_name(contents);
+ if (portal == NULL)
{
- Node *node;
+ pool_error("Execute: cannot get portal");
+ return POOL_END;
+ }
+ if (portal->pstmt == NULL)
+ {
+ pool_error("Execute: cannot get prepared statement");
+ return POOL_END;
+ }
+ if (portal->pstmt->qctxt == NULL)
+ {
+ pool_error("Execute: cannot get query context");
+ return POOL_END;
+ }
+ if (portal->pstmt->qctxt->parse_tree== NULL)
+ {
+ pool_error("Execute: cannot get parse tree");
+ return POOL_END;
+ }
- p_stmt = (PrepareStmt *)portal->pstmt->qctxt->parse_tree;
+ query_context = portal->pstmt->qctxt;
+ node = query_context->parse_tree;
+ query = portal->pstmt->qctxt->original_query;
+ pool_debug("Execute: query: %s", query);
+ strncpy(query_string_buffer, query, sizeof(query_string_buffer));
- string1 = portal->pstmt->qctxt->original_query;
- pool_debug("Execute: query: %s", string1);
- node = (Node *)p_stmt;
- strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
+// pool_set_query_in_progress();
- /*
- * Start query context
- */
- pool_start_query(query_context, string1, node);
+ /*
+ * Decide where to send query
+ */
+ session_context->query_context = query_context;
+ pool_where_to_send(query_context, query, node);
- /*
- * Decide where to send query
- */
- pool_where_to_send(query_context, query_context->original_query, node);
- if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
- IsA(node, VariableSetStmt)) &&
- MASTER_SLAVE && TSTATE(backend, MASTER_NODE_ID) != 'E')
- {
- /*
- * PREPARE, DEALLOCATE, SET, DISCARD
- * should be executed on all nodes. So we set
- * force_replication.
- */
- force_replication = 1;
- }
- /*
- * JDBC driver sends "BEGIN" query internally if
- * setAutoCommit(false). But it does not send Sync message
- * after "BEGIN" query. In extended query protocol,
- * PostgreSQL returns ReadyForQuery when a client sends Sync
- * message. Problem is, pgpool can't know the transaction
- * state without receiving ReadyForQuery. So we remember that
- * we need to send Sync message internally afterward, whenever
- * we receive BEGIN in extended protocol.
- */
- else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
- {
- TransactionStmt *stmt = (TransactionStmt *) node;
+ if (IsA(query_context->parse_tree, DeallocateStmt))
+ overwrite_map_for_deallocate(query_context);
- if (stmt->kind == TRANS_STMT_BEGIN ||
- stmt->kind == TRANS_STMT_START)
- /* Remember we need to send sync later in extended protocol */
- receive_extended_begin = 1;
- }
+#ifdef NOT_USED
+ /*
+ * JDBC driver sends "BEGIN" query internally if
+ * setAutoCommit(false). But it does not send Sync message
+ * after "BEGIN" query. In extended query protocol,
+ * PostgreSQL returns ReadyForQuery when a client sends Sync
+ * message. Problem is, pgpool can't know the transaction
+ * state without receiving ReadyForQuery. So we remember that
+ * we need to send Sync message internally afterward, whenever
+ * we receive BEGIN in extended protocol.
+ */
+ if (IsA(node, TransactionStmt) && MASTER_SLAVE)
+ {
+ TransactionStmt *stmt = (TransactionStmt *) node;
- /* check if query is "COMMIT" or "ROLLBACK" */
- commit = is_commit_query((Node *)p_stmt->query);
+ if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
+ /* Remember we need to send sync later in extended protocol */
+ receive_extended_begin = 1;
}
+#endif
+ /* check if query is "COMMIT" or "ROLLBACK" */
+ commit = is_commit_query(node);
if (REPLICATION || PARALLEL_MODE)
{
if (!commit)
{
/* Send the query to master node */
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
{
return POOL_END;
}
if (specific_error)
{
/* log error message */
- generate_error_message("Execute: ", specific_error, string);
+ generate_error_message("Execute: ", specific_error, contents);
}
}
if (pool_send_and_wait(query_context, msg, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
return POOL_END;
}
- else
- {
- if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+// else
+// {
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
return POOL_END;
- }
+// }
/* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
if (commit)
{
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
{
return POOL_END;
}
}
else
{
- if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+ {
+ return POOL_END;
+ }
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
{
return POOL_END;
}
}
+#ifdef NOT_USED
while ((ret = read_kind_from_backend(frontend, backend, &kind)) == POOL_CONTINUE)
{
/*
status = SimpleForwardToFrontend(kind, frontend, backend);
if (status != POOL_CONTINUE)
return status;
+#endif
return POOL_CONTINUE;
}
/*
* process Parse (V3 only)
*/
-POOL_STATUS Parse(POOL_CONNECTION *frontend,
- POOL_CONNECTION_POOL *backend)
+POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
{
- char kind;
- int len;
- char *string;
- int i;
- POOL_MEMORY_POOL *old_context;
- PrepareStmt *p_stmt;
- char *name, *stmt;
- List *parse_tree_list;
- Node *node = NULL;
int deadlock_detected = 0;
int insert_stmt_with_lock = 0;
- POOL_STATUS status;
- char per_node_statement_log_buffer[1024];
+ char *name;
+ char *stmt;
+ List *parse_tree_list;
+ Node *node = NULL;
+// PrepareStmt *p_stmt;
PreparedStatement *ps;
+ POOL_STATUS status;
+ POOL_MEMORY_POOL *old_context;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
/* Create query context */
query_context = pool_init_query_context();
- /* read Parse packet */
- if (pool_read(frontend, &len, sizeof(len)) < 0)
- return POOL_END;
-
- len = ntohl(len) - 4;
- string = pool_read2(frontend, len);
+ pool_debug("Parse: statement name <%s>", contents);
- pool_debug("Parse: statement name <%s>", string);
+ name = contents;
+ stmt = contents + strlen(contents) + 1;
- name = string;
- stmt = string + strlen(string) + 1;
+ /* switch memory context */
+ old_context = pool_memory;
+ pool_memory = query_context->memory_context;
parse_tree_list = raw_parser(stmt);
if (parse_tree_list == NIL)
insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
- /* Special treatment for master/slave + temp tables */
- if (MASTER_SLAVE)
- {
- /* Is there "NO LOAD BALANCE" comment? */
- if (!strncasecmp(stmt, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ) ||
- /* or the table used in a query is a temporary one ? */
- is_temp_table(backend, node))
- {
- /*
- * From now on, let only master handle queries. This is
- * typically usefull for using temp tables in master/slave
- * mode
- */
- master_slave_was_enabled = 1;
- MASTER_SLAVE = 0;
- master_slave_dml = 1;
- }
- }
-
- /* switch memory context */
- old_context = pool_memory;
- pool_memory = session_context->memory_context;
-
- /* translate Parse message to PrepareStmt */
- p_stmt = palloc(sizeof(PrepareStmt));
- p_stmt->type = T_PrepareStmt;
-
- /* XXX: there's a confusion here. Someone mixed up statement
- * name with portal name. It is regarded that statment name ==
- * portal name. Someday we should fix this. Sigh.
- */
- p_stmt->name = pstrdup(name);
- p_stmt->query = copyObject(node);
- ps = pool_create_prepared_statement(name, 0, query_context);
-
/*
* Start query context
*/
- pool_start_query(query_context, pstrdup(stmt), (Node *)p_stmt);
+ pool_start_query(query_context, pstrdup(stmt), node);
+
+ ps = pool_create_prepared_statement(name, 0, len, contents, query_context);
+ session_context->pending_pstmt = ps;
/*
* Decide where to send query
*/
- pool_where_to_send(query_context, query_context->original_query, (Node *)p_stmt);
-
- if (*name)
- {
- session_context->pending_pstmt = ps;
- session_context->pending_function = pool_add_prepared_statement;
- }
- else /* unnamed statement */
- {
- pfree(p_stmt->name);
- ps->name = "";
- session_context->pending_pstmt = ps;
- session_context->pending_function = pool_add_prepared_statement;
- }
+ pool_where_to_send(query_context, query_context->original_query,
+ query_context->parse_tree);
- /*
- * Switch to old memory context. Caution. Now we are in parser
- * memory context.
- * Palloced memories will be gone if free_parser() called!
- */
- pool_memory = old_context;
+ if (IsA(query_context->parse_tree, DeallocateStmt))
+ overwrite_map_for_deallocate(query_context);
if (REPLICATION)
{
- char *rewrite_query;
- bool rewrite_to_params = true;
+ char *rewrite_query;
+ bool rewrite_to_params = true;
/*
* rewrite `now()'.
if (rewrite_query != NULL)
{
int alloc_len = len - strlen(stmt) + strlen(rewrite_query);
- string = palloc(alloc_len);
- strcpy(string, name);
- strcpy(string + strlen(name) + 1, rewrite_query);
- memcpy(string + strlen(name) + strlen(rewrite_query) + 2,
- stmt + strlen(stmt) + 1,
- len - (strlen(name) + strlen(stmt) + 2));
-
- len = len - strlen(stmt) + strlen(rewrite_query);
- name = string;
- stmt = string + strlen(name) + 1;
- pool_debug("rewrite query %s %s len=%d", name, stmt, len);
+ contents = palloc(alloc_len);
+ strcpy(contents, name);
+ strcpy(contents + strlen(name) + 1, rewrite_query);
+ memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
+ stmt + strlen(stmt) + 1,
+ len - (strlen(name) + strlen(stmt) + 2));
+
+ len = alloc_len;
+ name = contents;
+ stmt = contents + strlen(name) + 1;
+ pool_debug("Parse: rewrite query %s %s len=%d", name, stmt, len);
+
+ ps->parse_len = len;
+ ps->parse_contents = contents;
}
}
+ }
+ pool_memory = old_context;
- if (REPLICATION)
+ if (REPLICATION)
+ {
+ char kind;
+
+ if (TSTATE(backend, MASTER_NODE_ID) != 'T')
{
- char kind;
+ int i;
- if (TSTATE(backend, MASTER_NODE_ID) != 'T')
+ /* synchronize transaction state */
+ for (i = 0; i < NUM_BACKENDS; i++)
{
- /* synchronize transaction state */
- for (i = 0; i < NUM_BACKENDS; i++)
- {
- if (!VALID_BACKEND(i))
- continue;
+ if (!VALID_BACKEND(i))
+ continue;
- /* send sync message */
- send_extended_protocol_message(backend, i, "S", 0, "");
- }
+ /* send sync message */
+ send_extended_protocol_message(backend, i, "S", 0, "");
+ }
- kind = pool_read_kind(backend);
- if (kind != 'Z')
- {
- free_parser();
- return POOL_END;
- }
+ kind = pool_read_kind(backend);
+ if (kind != 'Z')
+ {
+// free_parser();
+ return POOL_END;
+ }
- if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
- {
- free_parser();
- return POOL_END;
- }
+ if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
+ {
+// free_parser();
+ return POOL_END;
}
+ }
- if (is_strict_query(node))
- start_internal_transaction(frontend, backend, node);
+ if (is_strict_query(query_context->parse_tree))
+ start_internal_transaction(frontend, backend, query_context->parse_tree);
- if (insert_stmt_with_lock)
+ if (insert_stmt_with_lock)
+ {
+ /* start a transaction if needed and lock the table */
+ status = insert_lock(frontend, backend, stmt, (InsertStmt *)query_context->parse_tree);
+ if (status != POOL_CONTINUE)
{
- /* start a transaction if needed and lock the table */
- status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
- if (status != POOL_CONTINUE)
- {
- free_parser();
- return status;
- }
+// free_parser();
+ return status;
}
}
}
- /* send to master node */
- snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
-
/*
* Cannot call free_parser() here. Since "string" might be allocated in parser context.
* free_parser();
* locks.
*/
pool_debug("Parse: waiting for master completing the query");
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
{
- free_parser();
+// free_parser();
return POOL_END;
}
deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
if (deadlock_detected < 0)
{
- free_parser();
+// free_parser();
return POOL_END;
}
else
* this case, PostgreSQL does not report what statement caused
* that error and make users confused.
*/
- per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse(): Error or notice message from backend: ", true);
+ per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);
}
if (deadlock_detected)
strlen(POOL_ERROR_QUERY)+1, -1,
MASTER_NODE_ID, "") != POOL_CONTINUE)
{
- free_parser();
+// free_parser();
return POOL_END;
}
}
else
{
- snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
- if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
{
- free_parser();
+// free_parser();
return POOL_END;
}
}
}
else
{
- if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
{
- free_parser();
+// free_parser();
return POOL_END;
}
}
/*
* Ok. we are safe to call free_parser();
*/
- free_parser();
+// free_parser();
+#ifdef NOT_USED
for (;;)
{
+ char kind;
POOL_STATUS ret;
ret = read_kind_from_backend(frontend, backend, &kind);
if (kind != 'N')
break;
}
+#endif
+
return POOL_CONTINUE;
+
}
-/*
- * Process ReadyForQuery('Z') message.
- *
- * - if the global error status "mismatch_ntuples" is set, send an error query
- * to all DB nodes to abort transaction.
- * - internal transaction is closed
- */
-POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
- POOL_CONNECTION_POOL *backend, int send_ready)
+POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
{
- StartupPacket *sp;
- char psbuf[1024];
- int i;
- int len;
- signed char kind;
- signed char state;
+ char *pstmt_name;
+ char *portal_name;
+ char *rewrite_msg;
+ Portal *portal = NULL;
+ PreparedStatement *pstmt = NULL;
POOL_SESSION_CONTEXT *session_context;
+ POOL_QUERY_CONTEXT *query_context;
/* Get session context */
session_context = pool_get_session_context();
if (!session_context)
{
- pool_error("ReadyForQuery: cannot get session context");
+ pool_error("Bind: cannot get session context");
return POOL_END;
}
/*
- * If the numbers of update tuples are differ, we need to abort transaction
- * by using do_error_command. This only works with PROTO_MAJOR_V3.
+ * Rewrite message
*/
- if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
- {
- int i;
- signed char state;
- char kind;
+ portal_name = contents;
+ pstmt_name = contents + strlen(portal_name) + 1;
- /*
- * XXX: discard rest of ReadyForQuery packet
- */
- if (pool_read_message_length(backend) < 0)
- return POOL_END;
+ pstmt = pool_get_prepared_statement_by_pstmt_name(pstmt_name);
+ if (pstmt == NULL)
+ {
+ pool_error("Bind: cannot get prepared statement \"%s\"", pstmt_name);
+ return POOL_END;
+ }
- state = pool_read_kind(backend);
- if (state < 0)
- return POOL_END;
+ portal = pool_create_portal(portal_name, pstmt->num_tsparams, pstmt);
+ if (portal == NULL)
+ {
+ pool_error("Bind: cannot create portal: %s", strerror(errno));
+ return POOL_END;
+ }
- pool_debug("ReadyForQuery: transaction state: %c", state);
+ query_context = pstmt->qctxt;
+ if (query_context == NULL)
+ {
+ pool_error("Bind: cannot get query context");
+ return POOL_END;
+ }
- for (i = 0; i < NUM_BACKENDS; i++)
- {
- if (VALID_BACKEND(i))
- {
- /* abort transaction on all nodes. */
- do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
- }
- }
+ session_context->pending_portal = portal;
- /* loop through until we get ReadyForQuery */
- for(;;)
- {
- kind = pool_read_kind(backend);
- if (kind < 0)
- return POOL_END;
+ /* rewrite bind message */
+ if (REPLICATION && portal->num_tsparams > 0)
+ {
+ rewrite_msg = bind_rewrite_timestamp(backend, portal, contents, &len);
+ if (rewrite_msg != NULL)
+ contents = rewrite_msg;
+ }
- if (kind == 'Z')
- break;
+ session_context->query_context = query_context;
+ pool_where_to_send(query_context, query_context->original_query,
+ query_context->parse_tree);
- /* put the message back to read buffer */
- for (i=0;i<NUM_BACKENDS;i++)
- {
- if (VALID_BACKEND(i))
- {
- pool_unread(CONNECTION(backend,i), &kind, 1);
- }
- }
+ if (IsA(query_context->parse_tree, DeallocateStmt))
+ overwrite_map_for_deallocate(query_context);
- /* discard rest of the packet */
- if (pool_discard_packet(backend) != POOL_CONTINUE)
- {
- pool_error("ReadyForQuery: pool_discard_packet failed");
- return POOL_END;
- }
- }
- mismatch_ntuples = 0;
+ if (pool_config->load_balance_mode && pool_is_writing_transaction())
+ {
+ if(parse_before_bind(frontend, backend, pstmt) != POOL_CONTINUE)
+ return POOL_END;
}
- /*
- * if a transaction is started for insert lock, we need to close
- * the transaction.
- */
- if (pool_is_query_in_progress() && allow_close_transaction)
+ pool_debug("Bind: waiting for master completing the query");
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "B")
+ != POOL_CONTINUE)
{
- if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
- return POOL_END;
+// if (rewrite_msg != NULL)
+// free(rewrite_msg);
+ return POOL_END;
}
- if (MAJOR(backend) == PROTO_MAJOR_V3)
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "B")
+ != POOL_CONTINUE)
{
- if ((len = pool_read_message_length(backend)) < 0)
- return POOL_END;
+// if (rewrite_msg != NULL)
+// free(rewrite_msg);
+ return POOL_END;
+ }
-#ifdef NOT_USED
- pool_debug("ReadyForQuery: message length: %d", len);
+// if (rewrite_msg != NULL)
+// free(rewrite_msg);
- /*
- * Do not check transaction state in master/slave mode.
- * Because SET, PREPARE, DEALLOCATE are replicated.
- * If these queries are executed inside a transaction block,
- * transation state will be inconsistent. But it is no problem.
- */
- if (master_slave_dml)
- {
- char kind, kind1;
+ return POOL_CONTINUE;
+}
+
+POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
+{
+ Portal *portal = NULL;
+ PreparedStatement *pstmt = NULL;
+ POOL_SESSION_CONTEXT *session_context;
+ POOL_QUERY_CONTEXT *query_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("Describe: cannot get session context");
+ return POOL_END;
+ }
+
+ /* Prepared Statement */
+ if (*contents == 'S')
+ {
+ pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
+ }
+ /* Portal */
+ else
+ {
+ portal = pool_get_portal_by_portal_name(contents+1);
+ if (portal == NULL)
+ {
+ pool_error("Describe: cannot get portal \"%s\"", contents+1);
+ return POOL_END;
+ }
+
+ pstmt = portal->pstmt;
+ }
+
+ if (pstmt == NULL)
+ {
+ pool_error("Describe: cannot get prepared statement");
+ return POOL_END;
+ }
+
+ query_context = pstmt->qctxt;
+ if (query_context == NULL)
+ {
+ pool_error("Describe: cannot get query context");
+ return POOL_END;
+ }
+
+ session_context->query_context = query_context;
+ pool_where_to_send(query_context, query_context->original_query,
+ query_context->parse_tree);
+
+ if (IsA(query_context->parse_tree, DeallocateStmt))
+ overwrite_map_for_deallocate(query_context);
+
+ pool_debug("Describe: waiting for master completing the query");
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "D")
+ != POOL_CONTINUE)
+ return POOL_END;
+
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "D")
+ != POOL_CONTINUE)
+ return POOL_END;
+
+ return POOL_CONTINUE;
+}
+
+POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
+{
+ Portal *portal = NULL;
+ PreparedStatement *pstmt = NULL;
+ POOL_SESSION_CONTEXT *session_context;
+ POOL_QUERY_CONTEXT *query_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("Close: cannot get session context");
+ return POOL_END;
+ }
+
+ /* Prepared Statement */
+ if (*contents == 'S')
+ {
+ pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
+ if (pstmt == NULL)
+ {
+ pool_error("Close: cannot get prepared statement");
+ return POOL_END;
+ }
+
+ session_context->pending_pstmt = pstmt;
+ }
+ /* Portal */
+ else if (*contents == 'P')
+ {
+ portal = pool_get_portal_by_portal_name(contents+1);
+ if (portal == NULL)
+ {
+ pool_error("Close: cannot get portal");
+ return POOL_END;
+ }
+
+ session_context->pending_portal = portal;
+ }
+ else
+ {
+ pool_error("Close: invalid message");
+ return POOL_END;
+ }
+
+ query_context = pstmt->qctxt;
+ if (query_context == NULL)
+ {
+ pool_error("Close: cannot get query context");
+ return POOL_END;
+ }
+
+ pool_where_to_send(query_context, query_context->original_query,
+ query_context->parse_tree);
+
+ pool_debug("Close: waiting for master completing the query");
+ if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "C")
+ != POOL_CONTINUE)
+ return POOL_END;
+
+ if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "C")
+ != POOL_CONTINUE)
+ return POOL_END;
+
+ return POOL_CONTINUE;
+}
+
+
+POOL_STATUS FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+ int len, char *contents)
+{
+ /*
+ * If Function call message for lo_creat, rewrite it
+ */
+ char *rewrite_lo;
+ int rewrite_len;
+
+ rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
+ backend, &rewrite_len);
+
+ if (rewrite_lo != NULL)
+ {
+ contents = rewrite_lo;
+ len = rewrite_len;
+ }
+ return SimpleForwardToBackend('F', frontend, backend, len, contents);
+}
+
+/*
+ * Process ReadyForQuery('Z') message.
+ *
+ * - if the global error status "mismatch_ntuples" is set, send an error query
+ * to all DB nodes to abort transaction.
+ * - internal transaction is closed
+ */
+POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, int send_ready)
+{
+ StartupPacket *sp;
+ char psbuf[1024];
+ int i;
+ int len;
+ signed char kind;
+ signed char state;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("ReadyForQuery: cannot get session context");
+ return POOL_END;
+ }
+
+ /*
+ * If the numbers of update tuples are differ, we need to abort transaction
+ * by using do_error_command. This only works with PROTO_MAJOR_V3.
+ */
+ if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ int i;
+ signed char state;
+ char kind;
+
+ /*
+ * XXX: discard rest of ReadyForQuery packet
+ */
+ if (pool_read_message_length(backend) < 0)
+ return POOL_END;
+
+ state = pool_read_kind(backend);
+ if (state < 0)
+ return POOL_END;
+
+ pool_debug("ReadyForQuery: transaction state: %c", state);
+
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (VALID_BACKEND(i))
+ {
+ /* abort transaction on all nodes. */
+ do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
+ }
+ }
+
+ /* loop through until we get ReadyForQuery */
+ for(;;)
+ {
+ kind = pool_read_kind(backend);
+ if (kind < 0)
+ return POOL_END;
+
+ if (kind == 'Z')
+ break;
+
+ /* put the message back to read buffer */
+ for (i=0;i<NUM_BACKENDS;i++)
+ {
+ if (VALID_BACKEND(i))
+ {
+ pool_unread(CONNECTION(backend,i), &kind, 1);
+ }
+ }
+
+ /* discard rest of the packet */
+ if (pool_discard_packet(backend) != POOL_CONTINUE)
+ {
+ pool_error("ReadyForQuery: pool_discard_packet failed");
+ return POOL_END;
+ }
+ }
+ mismatch_ntuples = 0;
+ }
+
+ /*
+ * if a transaction is started for insert lock, we need to close
+ * the transaction.
+ */
+// if (pool_is_query_in_progress() && allow_close_transaction)
+ if (allow_close_transaction)
+ {
+ if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
+ return POOL_END;
+ }
+
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ if ((len = pool_read_message_length(backend)) < 0)
+ return POOL_END;
+
+#ifdef NOT_USED
+ pool_debug("ReadyForQuery: message length: %d", len);
+
+ /*
+ * Do not check transaction state in master/slave mode.
+ * Because SET, PREPARE, DEALLOCATE are replicated.
+ * If these queries are executed inside a transaction block,
+ * transation state will be inconsistent. But it is no problem.
+ */
+ if (master_slave_dml)
+ {
+ char kind, kind1;
if (pool_read(MASTER(backend), &kind, sizeof(kind)))
return POOL_END;
* If the query was not READ SELECT, remember that we had
* a write query in this transaction.
*/
- else if (!is_select_query(node, query))
+ else if (!is_select_query(node, query))
+ {
+ pool_set_writing_transaction();
+ }
+ }
+ pool_unset_query_in_progress();
+ }
+
+ if (!pool_is_doing_extended_query_message())
+ pool_query_context_destroy(pool_get_session_context()->query_context);
+
+ sp = MASTER_CONNECTION(backend)->sp;
+ if (MASTER(backend)->tstate == 'T')
+ snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
+ sp->user, sp->database, remote_ps_data);
+ else
+ snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
+ sp->user, sp->database, remote_ps_data);
+ set_ps_display(psbuf, false);
+
+ return POOL_CONTINUE;
+}
+
+POOL_STATUS ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("ParseComplete: cannot get session context");
+ return POOL_END;
+ }
+
+ if (session_context->pending_pstmt != NULL)
+ {
+ POOL_QUERY_CONTEXT *qc;
+
+ pool_add_prepared_statement();
+
+ /* Set "parse done" to query_state */
+ qc = session_context->pending_pstmt->qctxt;
+ if (qc != NULL)
+ pool_set_query_state(qc, 1);
+
+ session_context->pending_pstmt = NULL;
+ }
+
+ return SimpleForwardToFrontend('1', frontend, backend);
+}
+
+POOL_STATUS BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("BindComplete: cannot get session context");
+ return POOL_END;
+ }
+
+ if (session_context->pending_portal != NULL)
+ {
+ PreparedStatement *pstmt;
+
+ pool_add_portal();
+
+ /* Set "bind done" to query_state */
+ pstmt = session_context->pending_portal->pstmt;
+ if (pstmt != NULL && pstmt->qctxt != NULL)
+ pool_set_query_state(pstmt->qctxt, 2);
+
+ session_context->pending_portal = NULL;
+ }
+
+ return SimpleForwardToFrontend('2', frontend, backend);
+}
+
+POOL_STATUS CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("CloseComplete: cannot get session context");
+ return POOL_END;
+ }
+
+ if (session_context->pending_pstmt != NULL)
+ {
+ pool_remove_prepared_statement();
+ session_context->pending_pstmt = NULL;
+ }
+ else if (session_context->pending_portal != NULL)
+ {
+ pool_remove_portal();
+ session_context->pending_portal = NULL;
+ }
+ else
+ {
+ pool_error("CloseComplete: pending object not found");
+ return POOL_END;
+ }
+
+ return SimpleForwardToFrontend('3', frontend, backend);
+}
+
+POOL_STATUS CommandComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("BindComplete: cannot get session context");
+ return POOL_END;
+ }
+
+ if (session_context->query_context != NULL)
+ {
+ Node *node = session_context->query_context->parse_tree;
+
+ if (IsA(node, PrepareStmt))
+ {
+ pool_add_prepared_statement();
+ session_context->pending_pstmt = NULL;
+ }
+ else if (IsA(node, DeallocateStmt))
+ {
+ char *name;
+
+ name = ((DeallocateStmt *)node)->name;
+ if (name == NULL)
+ pool_clear_prepared_statement_list();
+ else
+ pool_remove_prepared_statement();
+ session_context->pending_pstmt = NULL;
+ }
+ else if (IsA(node, DiscardStmt))
+ {
+ DiscardStmt *stmt = (DiscardStmt *)node;
+ if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
+ {
+ pool_remove_pending_objects();
+ pool_clear_prepared_statement_list();
+ }
+ }
+ else if (IsA(node, TransactionStmt))
+ {
+ TransactionStmt *stmt = (TransactionStmt *) node;
+
+ if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
+ {
+ int i;
+
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (!VALID_BACKEND(i))
+ continue;
+
+ TSTATE(backend, i) = 'T';
+ }
+ }
+ }
+ }
+
+ return SimpleForwardToFrontend('C', frontend, backend);
+}
+
+POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
+{
+// int i;
+ int ret;
+// int res1;
+// int status;
+// char *p1;
+// char kind1;
+
+ /* An error occurred with PREPARE or DEALLOCATE command.
+ * Free pending portal object.
+ */
+ pool_remove_pending_objects();
+
+ ret = SimpleForwardToFrontend('E', frontend, backend);
+ if (ret != POOL_CONTINUE)
+ return ret;
+
+ if (select_in_transaction)
+ {
+ int i;
+
+ in_load_balance = 0;
+ REPLICATION = 1;
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
+ {
+ /*
+ * We must abort transaction to sync transaction state.
+ * If the error was caused by an Execute message,
+ * we must send invalid Execute message to abort
+ * transaction.
+ *
+ * Because extended query protocol ignores all
+ * messages before receiving Sync message inside error state.
+ */
+ if (execute_select)
+ do_error_execute_command(backend, i, PROTO_MAJOR_V3);
+ else
+ do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
+ }
+ }
+ select_in_transaction = 0;
+ execute_select = 0;
+ }
+
+#ifdef NOT_USED
+ for (i = 0;i < NUM_BACKENDS; i++)
+ {
+ if (VALID_BACKEND(i))
+ {
+ POOL_CONNECTION *cp = CONNECTION(backend, i);
+
+ /* We need to send "sync" message to backend in extend mode
+ * so that it accepts next command.
+ * Note that this may be overkill since client may send
+ * it by itself. Moreover we do not need it in non-extend mode.
+ * At this point we regard it is not harmful since error response
+ * will not be sent too frequently.
+ */
+ pool_write(cp, "S", 1);
+ res1 = htonl(4);
+ if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
{
- pool_set_writing_transaction();
+ return POOL_END;
}
}
- pool_unset_query_in_progress();
}
- pool_query_context_destroy(pool_get_session_context()->query_context);
+ while ((ret = read_kind_from_backend(frontend, backend, &kind1)) == POOL_CONTINUE)
+ {
+ if (kind1 == 'Z') /* ReadyForQuery? */
+ break;
- sp = MASTER_CONNECTION(backend)->sp;
- if (MASTER(backend)->tstate == 'T')
- snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
- sp->user, sp->database, remote_ps_data);
- else
- snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
- sp->user, sp->database, remote_ps_data);
- set_ps_display(psbuf, false);
+ ret = SimpleForwardToFrontend(kind1, frontend, backend);
+ if (ret != POOL_CONTINUE)
+ return ret;
+ pool_flush(frontend);
+ }
+
+ if (ret != POOL_CONTINUE)
+ return ret;
+
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (VALID_BACKEND(i))
+ {
+ status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1));
+ if (status < 0)
+ {
+ pool_error("SimpleForwardToFrontend: error while reading message length");
+ return POOL_END;
+ }
+ res1 = ntohl(res1) - sizeof(res1);
+ p1 = pool_read2(CONNECTION(backend, i), res1);
+ if (p1 == NULL)
+ return POOL_END;
+ }
+ }
+#endif
return POOL_CONTINUE;
}
-
POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend)
{
}
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
- POOL_CONNECTION_POOL *backend)
+ POOL_CONNECTION_POOL *backend)
{
char fkind;
- char kind;
+// char kind;
+ char *contents = NULL;
POOL_STATUS status;
- int i;
+// int i;
+ int len;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("Parse: cannot get session context");
+ return POOL_END;
+ }
if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
return POOL_CONTINUE;
return POOL_END;
}
- pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
+ pool_debug("ProcessFrontendResponse: kind from frontend %c(%02x)", fkind, fkind);
+
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ if (pool_read(frontend, &len, sizeof(len)) < 0)
+ return POOL_END;
+ len = ntohl(len) - 4;
+ if (len > 0)
+ contents = pool_read2(frontend, len);
+ }
+ else
+ {
+ if (fkind != 'F')
+ contents = pool_read_string(frontend, &len, 0);
+ }
+
+ if (len > 0 && contents == NULL)
+ return POOL_END;
+
+ if (fkind != 'S' && pool_is_ignore_till_sync())
+ return POOL_CONTINUE;
+#ifdef NOT_USED
/*
* If we have received BEGIN in extended protocol before, we need
* to send a sync message to know the transaction stare.
if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
return POOL_END;
}
+#endif
+
+ pool_unset_doing_extended_query_message();
switch (fkind)
{
- case 'X': /* Terminate message*/
- if (MAJOR(backend) == PROTO_MAJOR_V3)
- {
- int len;
- pool_read(frontend, &len, sizeof(len));
- }
+ case 'X': /* Terminate */
return POOL_END;
- case 'Q': /* Query message*/
+ case 'Q': /* Query */
allow_close_transaction = 1;
- status = SimpleQuery(frontend, backend, NULL);
+ status = SimpleQuery(frontend, backend, len, contents);
break;
- case 'E': /* Execute message */
+ case 'E': /* Execute */
allow_close_transaction = 1;
- status = Execute(frontend, backend);
+ pool_set_doing_extended_query_message();
+ if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+ pool_set_query_in_progress();
+ status = Execute(frontend, backend, len, contents);
break;
- case 'P': /* Parse message */
+ case 'P': /* Parse */
allow_close_transaction = 0;
+ pool_set_doing_extended_query_message();
+ if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+ pool_set_query_in_progress();
+ status = Parse(frontend, backend, len, contents);
+ break;
- if (MASTER_SLAVE &&
- (TSTATE(backend, MASTER_NODE_ID) != 'I' || receive_extended_begin))
- {
- pool_debug("kind: %c master_slave_dml enabled", fkind);
- master_slave_was_enabled = 1;
- MASTER_SLAVE = 0;
- master_slave_dml = 1;
- }
+ case 'B': /* Bind */
+ pool_set_doing_extended_query_message();
+ if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+ pool_set_query_in_progress();
+ status = Bind(frontend, backend, len, contents);
+ break;
+
+ case 'C': /* Close */
+ if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+ pool_set_query_in_progress();
+ status = Close(frontend, backend, len, contents);
+ break;
- status = Parse(frontend, backend);
+ case 'D': /* Describe */
+ pool_set_doing_extended_query_message();
+ if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+ pool_set_query_in_progress();
+ status = Describe(frontend, backend, len, contents);
break;
- case 'S': /* Sync message */
+ case 'S': /* Sync */
+ pool_set_doing_extended_query_message();
receive_extended_begin = 0;
- /* fall through */
+ if (pool_is_ignore_till_sync())
+ pool_unset_ignore_till_sync();
+ if (!pool_is_query_in_progress())
+ pool_set_query_in_progress();
+ status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+ break;
- default:
- if ((MAJOR(backend) == PROTO_MAJOR_V3) &&
- (fkind == 'S' || fkind == 'H' || fkind == 'D' || fkind == 'f'||
- fkind == 'C' || fkind == 'B' || fkind == 'F' || fkind == 'd' || fkind == 'c'))
+ case 'F': /* FunctionCall */
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
{
- /* Function call? */
- if (fkind == 'F' && MASTER_SLAVE)
- {
- /*
- * Send to primary/master node only.
- * For this we treat function call as if INSERT.
- */
- POOL_QUERY_CONTEXT *query_context;
- char *query = "INSERT INTO foo VALUES(1)";
- Node *node;
- List *parse_tree_list;
-
- /* Create query context */
- query_context = pool_init_query_context();
- if (!query_context)
- {
- pool_error("ProcessFrontendResponse: pool_init_query_context failed");
- return POOL_END;
- }
- parse_tree_list = raw_parser(query);
- node = (Node *) lfirst(list_head(parse_tree_list));
- pool_start_query(query_context, query, node);
- pool_where_to_send(query_context, query_context->original_query,
- query_context->parse_tree);
- }
-
-#ifdef NOT_USED
- if (MASTER_SLAVE &&
- (TSTATE(backend, MASTER_NODE_ID) != 'I' || receive_extended_begin))
- {
- pool_debug("kind: %c master_slave_dml enabled", fkind);
- master_slave_was_enabled = 1;
- MASTER_SLAVE = 0;
- master_slave_dml = 1;
- }
-#endif
-
- status = SimpleForwardToBackend(fkind, frontend, backend);
- for (i=0;i<NUM_BACKENDS;i++)
- {
- if (VALID_BACKEND(i))
- {
- if (pool_flush(CONNECTION(backend, i)))
- status = POOL_ERROR;
- }
- }
+ status = FunctionCall3(frontend, backend, len, contents);
+ break;
}
- else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
+ else
+ {
status = FunctionCall(frontend, backend);
+ break;
+ }
- else
+ case 'c': /* CopyDone */
+ case 'd': /* CopyData */
+ case 'f': /* CopyFail */
+ case 'H': /* Flush */
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
{
- pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
- status = POOL_ERROR;
+ status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+ break;
}
- break;
+
+ default:
+ pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
+ status = POOL_ERROR;
+ }
+
+ if (status != POOL_CONTINUE)
+ status = POOL_ERROR;
+ return status;
+}
+
+POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, int *state)
+{
+ int status;
+ char kind;
+ short num_fields = 0;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context();
+ if (!session_context)
+ {
+ pool_error("ProcessBackendResponse: cannot get session context");
+ return POOL_END;
+ }
+
+ if (pool_is_ignore_till_sync())
+ {
+ return POOL_CONTINUE;
+ }
+
+ if (pool_is_pool_status_stmt())
+ {
+ pool_unset_pool_status_stmt();
+ return POOL_CONTINUE;
+ }
+
+ status = read_kind_from_backend(frontend, backend, &kind);
+ if (status != POOL_CONTINUE)
+ return status;
+
+ /*
+ * Sanity check
+ */
+ if (kind == 0)
+ {
+ pool_error("ProcessBackendResponse: kind is 0!");
+ return POOL_ERROR;
+ }
+
+ pool_debug("ProcessBackendResponse: kind from backend: %c", kind);
+
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ switch (kind)
+ {
+ case 'G': /* CopyInResponse */
+ status = CopyInResponse(frontend, backend);
+ break;
+
+ case 'S': /* ParameterStatus */
+ status = ParameterStatus(frontend, backend);
+ break;
+
+ case 'Z': /* ReadyForQuery */
+ status = ReadyForQuery(frontend, backend, 1);
+ break;
+
+ case '1': /* ParseComplete */
+ status = ParseComplete(frontend, backend);
+ pool_unset_query_in_progress();
+ break;
+
+ case '2': /* BindComplete */
+ status = BindComplete(frontend, backend);
+ pool_unset_query_in_progress();
+ break;
+
+ case '3': /* CloseComplete */
+ status = CloseComplete(frontend, backend);
+ pool_unset_query_in_progress();
+ break;
+
+ case 'E': /* ErrorResponse */
+ status = ErrorResponse3(frontend, backend);
+ if (pool_is_doing_extended_query_message())
+ pool_set_ignore_till_sync();
+ pool_unset_query_in_progress();
+ break;
+
+ case 'C': /* CommandComplete */
+ status = CommandComplete(frontend, backend);
+ if (pool_is_doing_extended_query_message())
+ pool_unset_query_in_progress();
+ break;
+
+ case 'T': /* RowDescription */
+ status = SimpleForwardToFrontend(kind, frontend, backend);
+ if (pool_is_doing_extended_query_message())
+ pool_unset_query_in_progress();
+ break;
+
+ case 'n': /* NoData */
+ status = SimpleForwardToFrontend(kind, frontend, backend);
+ pool_unset_query_in_progress();
+ break;
+
+ default:
+ status = SimpleForwardToFrontend(kind, frontend, backend);
+ if (pool_flush(frontend))
+ return POOL_END;
+ break;
+ }
+ }
+ else
+ {
+ switch (kind)
+ {
+ case 'A': /* NotificationResponse */
+ status = NotificationResponse(frontend, backend);
+ break;
+
+ case 'B': /* BinaryRow */
+ status = BinaryRow(frontend, backend, num_fields);
+ break;
+
+ case 'C': /* CompletedResponse */
+ status = CompletedResponse(frontend, backend);
+ break;
+
+ case 'D': /* AsciiRow */
+ status = AsciiRow(frontend, backend, num_fields);
+ break;
+
+ case 'E': /* ErrorResponse */
+ status = ErrorResponse(frontend, backend);
+ break;
+
+ case 'G': /* CopyInResponse */
+ status = CopyInResponse(frontend, backend);
+ break;
+
+ case 'H': /* CopyOutResponse */
+ status = CopyOutResponse(frontend, backend);
+ break;
+
+ case 'I': /* EmptyQueryResponse */
+ status = EmptyQueryResponse(frontend, backend);
+ break;
+
+ case 'N': /* NoticeResponse */
+ status = NoticeResponse(frontend, backend);
+ break;
+
+ case 'P': /* CursorResponse */
+ status = CursorResponse(frontend, backend);
+ break;
+
+ case 'T': /* RowDescription */
+ status = RowDescription(frontend, backend, &num_fields);
+ break;
+
+ case 'V': /* FunctionResultResponse and FunctionVoidResponse */
+ status = FunctionResultResponse(frontend, backend);
+ break;
+
+ case 'Z': /* ReadyForQuery */
+ status = ReadyForQuery(frontend, backend, 1);
+ break;
+
+ default:
+ pool_error("Unknown message type %c(%02x)", kind, kind);
+ exit(1);
+ }
+ }
+
+ /* Do we receive ready for query while processing reset
+ * request?
+ */
+ if (kind == 'Z' && frontend->no_forward && *state == 1)
+ {
+ *state = 0;
}
if (status != POOL_CONTINUE)
}
else
{
- SimpleForwardToBackend(kind, frontend, backend);
+ char *contents = NULL;
+
+ if (pool_read(frontend, &kind, 1) < 0)
+ {
+ pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
+ return POOL_END;
+ }
+
+ pool_debug("read kind from frontend %c(%02x)", kind, kind);
+
+ if (pool_read(frontend, &len, sizeof(len)) < 0)
+ return POOL_END;
+ len = ntohl(len) - 4;
+ if (len > 0)
+ contents = pool_read2(frontend, len);
+
+ SimpleForwardToBackend(kind, frontend, backend, len, contents);
}
/* CopyData? */
prefix, node_id, ntohl(slot->pid), query, message);
}
}
+
+static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ PreparedStatement *ps)
+{
+ int i;
+ int len = ps->parse_len;
+ char kind = '\0';
+ char *contents = ps->parse_contents;
+ bool parse_was_sent = false;
+ bool backup[MAX_NUM_BACKENDS];
+ POOL_STATUS status;
+ POOL_QUERY_CONTEXT *qc = ps->qctxt;
+
+ memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
+
+ /* expect to send to master node only */
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (qc->where_to_send[i] && qc->query_state[i] < 1) /* 1: parse done */
+ {
+ pool_debug("parse_before_bind: waiting for backend %d completing parse", i);
+ if (pool_send_and_wait(qc, contents, len, 1, i, "P") != POOL_CONTINUE)
+ return POOL_END;
+ }
+ else
+ {
+ qc->where_to_send[i] = 0;
+ }
+ }
+
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (qc->where_to_send[i])
+ {
+ parse_was_sent = true;
+ break;
+ }
+ }
+
+ if (parse_was_sent)
+ {
+ while (kind != '1')
+ {
+ status = read_kind_from_backend(frontend, backend, &kind);
+ if (status != POOL_CONTINUE)
+ {
+ memcpy(qc->where_to_send, backup, sizeof(backup));
+ return status;
+ }
+ status = pool_discard_packet_contents(backend);
+ if (status != POOL_CONTINUE)
+ {
+ memcpy(qc->where_to_send, backup, sizeof(backup));
+ return status;
+ }
+ }
+ }
+
+ memcpy(qc->where_to_send, backup, sizeof(backup));
+ return POOL_CONTINUE;
+}
+
+static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context)
+{
+ char *name;
+ PreparedStatement *ps;
+ POOL_QUERY_CONTEXT *qc;
+
+ if (IsA(query_context->parse_tree, DeallocateStmt)) {
+ name = ((DeallocateStmt *)query_context->parse_tree)->name;
+ if (name != NULL) /* NULL is "DEALLOCATE ALL" */
+ {
+ ps = pool_get_prepared_statement_by_pstmt_name(name);
+ if (ps != NULL)
+ {
+ qc = ps->qctxt;
+ if (qc != NULL)
+ {
+ memcpy(query_context->where_to_send, qc->where_to_send,
+ sizeof(query_context->where_to_send));
+ }
+ }
+ }
+ }
+}