structure into POOL_SENT_MESSAGE structure.
Portal and PreparedStatement structure were used by only extended query
protocol before. but POOL_SENT_MESSAGE is used by simple query protocol too.
This change fixes issues that happen when two protocols were mixed in
one session.
PHP example)
// PREPARE using simple query protocol
pg_query($dbconn, 'PREPARE my_query(TEXT) AS SELECT $1::TEXT');
// EXECUTE using exetended query protocol
pg_execute($dbconn, "my_query", array('test'));
static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
static char *get_insert_command_table_name(InsertStmt *node);
-static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList p, int n);
+static int send_deallocate(POOL_CONNECTION_POOL *backend, POOL_SENT_MESSAGE_LIST msglist, int n);
static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend);
static bool is_panic_or_fatal_error(const char *message, int major);
void reset_connection(void)
{
reset_variables();
- pool_clear_prepared_statement_list();
+ pool_clear_sent_message_list();
}
*/
if (qcnt >= qn)
{
- if (session_context->pstmt_list.size == 0)
+ if (session_context->message_list.size == 0)
return 2;
- char *name = session_context->pstmt_list.pstmts[0]->name;
+ char kind = session_context->message_list.sent_messages[0]->kind;
+ char *name = session_context->message_list.sent_messages[0]->name;
- /* Delete from prepared list */
- if (send_deallocate(backend, session_context->pstmt_list, 0))
+ if ((kind == 'P' || kind == 'Q') && *name != '\0')
{
- /* Deallocate failed. We are in unknown state. Ask caller
- * to reset backend connection.
- */
+ /* Deallocate a prepared statement */
+ if (send_deallocate(backend, session_context->message_list, 0))
+ {
+ /* Deallocate failed. We are in unknown state. Ask caller
+ * to reset backend connection.
+ */
#ifdef NOT_USED
- reset_prepared_list(&prepared_list);
+ reset_prepared_list(&prepared_list);
#endif
- pool_remove_prepared_statement_by_pstmt_name(name);
- return -1;
- }
- /*
- * If DEALLOCATE returns ERROR response, instead of
- * CommandComplete, del_prepared_list is not called and the
- * prepared object keeps on sitting on the prepared list. This
- * will cause infinite call to reset_backend. So we call
- * del_prepared_list() again. This is harmless since trying to
- * remove same prepared object will be ignored.
- */
+ pool_remove_sent_message(kind, name);
+ return -1;
+ }
+ /*
+ * If DEALLOCATE returns ERROR response, instead of
+ * CommandComplete, del_prepared_list is not called and the
+ * prepared object keeps on sitting on the prepared list. This
+ * will cause infinite call to reset_backend. So we call
+ * del_prepared_list() again. This is harmless since trying to
+ * remove same prepared object will be ignored.
+ */
#ifdef NOT_USED
- del_prepared_list(&prepared_list, prepared_list.portal_list[0]);
+ del_prepared_list(&prepared_list, prepared_list.portal_list[0]);
#endif
- pool_remove_prepared_statement_by_pstmt_name(name);
- return 1;
+ pool_remove_sent_message(kind, name);
+ return 1;
+ }
+ else
+ {
+ pool_remove_sent_message(kind, name);
+ return 0;
+ }
}
query = pool_config->reset_query_list[qcnt];
if (IsA(node, DeallocateStmt))
{
- PreparedStatement *ps;
+ POOL_SENT_MESSAGE *sent_msg;
DeallocateStmt *d = (DeallocateStmt *)node;
- ps = pool_get_prepared_statement_by_pstmt_name(d->name);
- if (ps && ps->qctxt->original_query)
+ sent_msg = pool_get_sent_message('Q', d->name);
+ if (!sent_msg)
+ sent_msg = pool_get_sent_message('P', d->name);
+ if (sent_msg)
{
- string_append_char(msg, "[");
- string_append_char(msg, ps->qctxt->original_query);
- string_append_char(msg, "]");
+ if (sent_msg->query_context->original_query)
+ {
+ string_append_char(msg, "[");
+ string_append_char(msg, sent_msg->query_context->original_query);
+ string_append_char(msg, "]");
+ }
}
}
}
/*
* Send DEALLOCATE message to backend by using SimpleQuery.
*/
-static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList p,
- int n)
+static int send_deallocate(POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE_LIST msglist, int n)
{
- char *query;
int len;
- PrepareStmt *p_stmt;
+ char *name;
+ char *query;
- if (p.size <= n)
+ if (msglist.size <= n)
return 1;
- p_stmt = (PrepareStmt *)p.pstmts[n]->qctxt->parse_tree;
- len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */
+ name = msglist.sent_messages[n]->name;
+
+ len = strlen(name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */
query = malloc(len);
if (query == NULL)
{
pool_error("send_deallocate: malloc failed");
return -1;
}
- sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name);
+ sprintf(query, "DEALLOCATE \"%s\"", name);
if (SimpleQuery(NULL, backend, strlen(query)+1, query) != POOL_CONTINUE)
{
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2010 PgPool Global Development Group
+ * Copyright (c) 2003-2011 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
static void generate_error_message(char *prefix, int specific_error, char *query);
static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend,
- PreparedStatement *ps);
-static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context);
+ POOL_SENT_MESSAGE *message);
static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes);
static int extract_ntuples(char *message);
return POOL_ERROR;
}
}
+
if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
{
int i;
}
}
+ if (node)
+ {
+ POOL_SENT_MESSAGE *msg = NULL;
+
+ if (IsA(node, PrepareStmt))
+ {
+ msg = pool_create_sent_message('Q', len, contents, 0,
+ ((PrepareStmt *)node)->name,
+ query_context);
+ if (!msg)
+ {
+ pool_error("SimpleQuery: cannot create query message: %s", strerror(errno));
+ return POOL_END;
+ }
+ session_context->uncompleted_message = msg;
+ }
+ }
+
string = query_context->original_query;
if (!RAW_MODE)
if (node)
{
- PreparedStatement *ps = NULL;
+ POOL_SENT_MESSAGE *msg = NULL;
- if (IsA(node, PrepareStmt))
+ if (IsA(node, ExecuteStmt))
{
-#ifdef NOT_USED
- ps = session_context->pending_pstmt;
- ps->num_tsparams = 0;
-#endif
+ msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name);
+ if (!msg)
+ msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name);
}
- else if (IsA(node, ExecuteStmt))
- ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *) node)->name);
/* rewrite `now()' to timestamp literal */
- rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, ps);
+ rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg);
if (rewrite_query != NULL)
{
query_context->rewritten_query = rewrite_query;
int len, char *contents)
{
int commit = 0;
- Portal *portal;
char *query = NULL;
Node *node;
int specific_error = 0;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
+ POOL_SENT_MESSAGE *msg;
/* Get session context */
session_context = pool_get_session_context();
pool_debug("Execute: portal name <%s>", contents);
- portal = pool_get_portal_by_portal_name(contents);
- if (portal == NULL)
+ msg = pool_get_sent_message('B', contents);
+ if (!msg)
{
- pool_error("Execute: cannot get portal");
+ pool_error("Execute: cannot get bind message");
return POOL_END;
}
- if (portal->pstmt == NULL)
+ if(!msg->query_context)
{
- pool_error("Execute: cannot get prepared statement");
+ pool_error("Execute: cannot get query context");
return POOL_END;
}
- if (portal->pstmt->qctxt == NULL)
+ if (!msg->query_context->original_query)
{
- pool_error("Execute: cannot get query context");
+ pool_error("Execute: cannot get original query");
return POOL_END;
}
- if (portal->pstmt->qctxt->parse_tree== NULL)
+ if (!msg->query_context->parse_tree)
{
pool_error("Execute: cannot get parse tree");
return POOL_END;
}
- query_context = portal->pstmt->qctxt;
- node = query_context->parse_tree;
- query = portal->pstmt->qctxt->original_query;
+ query_context = msg->query_context;
+ node = msg->query_context->parse_tree;
+ query = msg->query_context->original_query;
pool_debug("Execute: query: %s", query);
strncpy(query_string_buffer, query, sizeof(query_string_buffer));
session_context->query_context = query_context;
pool_where_to_send(query_context, query, node);
-
- if (IsA(query_context->parse_tree, DeallocateStmt))
- overwrite_map_for_deallocate(query_context);
-
/* check if query is "COMMIT" or "ROLLBACK" */
commit = is_commit_query(node);
char *stmt;
List *parse_tree_list;
Node *node = NULL;
- PreparedStatement *ps;
+ POOL_SENT_MESSAGE *msg;
POOL_STATUS status;
POOL_MEMORY_POOL *old_context;
POOL_SESSION_CONTEXT *session_context;
*/
pool_start_query(query_context, pstrdup(stmt), node);
- ps = pool_create_prepared_statement(name, 0, len, contents, query_context);
- session_context->pending_pstmt = ps;
+ msg = pool_create_sent_message('P', len, contents, 0, name, query_context);
+ if (!msg)
+ {
+ pool_error("Parse: cannot create parse message: %s", strerror(errno));
+ return POOL_END;
+ }
+
+ session_context->uncompleted_message = msg;
/*
* Decide where to send query
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (IsA(query_context->parse_tree, DeallocateStmt))
- overwrite_map_for_deallocate(query_context);
-
if (REPLICATION)
{
char *rewrite_query;
*/
if (*name == '\0')
rewrite_to_params = false;
- ps->num_tsparams = 0;
- rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, ps);
+ msg->num_tsparams = 0;
+ rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg);
if (rewrite_query != NULL)
{
int alloc_len = len - strlen(stmt) + strlen(rewrite_query);
stmt = contents + strlen(name) + 1;
pool_debug("Parse: rewrite query %s %s len=%d", name, stmt, len);
- ps->parse_len = len;
- ps->parse_contents = contents;
+ msg->len = len;
+ msg->contents = contents;
}
}
}
/* free_parser(); */
return POOL_END;
}
+
+ /*
+ * set in_progress flag, because ReadyForQuery unset it.
+ * in_progress flag influences VALID_BACKEND.
+ */
+ if (!pool_is_query_in_progress())
+ pool_set_query_in_progress();
}
if (is_strict_query(query_context->parse_tree))
char *pstmt_name;
char *portal_name;
char *rewrite_msg;
- Portal *portal = NULL;
- PreparedStatement *pstmt = NULL;
+ POOL_SENT_MESSAGE *parse_msg;
+ POOL_SENT_MESSAGE *bind_msg;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
portal_name = contents;
pstmt_name = contents + strlen(portal_name) + 1;
- pstmt = pool_get_prepared_statement_by_pstmt_name(pstmt_name);
- if (pstmt == NULL)
+ parse_msg = pool_get_sent_message('Q', pstmt_name);
+ if (!parse_msg)
+ parse_msg = pool_get_sent_message('P', pstmt_name);
+ if (!parse_msg)
{
- pool_error("Bind: cannot get prepared statement \"%s\"", pstmt_name);
+ pool_error("Bind: cannot get parse message \"%s\"", pstmt_name);
return POOL_END;
}
- portal = pool_create_portal(portal_name, pstmt->num_tsparams, pstmt);
- if (portal == NULL)
+ bind_msg = pool_create_sent_message('B', len, contents,
+ parse_msg->num_tsparams, portal_name,
+ parse_msg->query_context);
+ if (!bind_msg)
{
- pool_error("Bind: cannot create portal: %s", strerror(errno));
+ pool_error("Bind: cannot create bind message: %s", strerror(errno));
return POOL_END;
}
- query_context = pstmt->qctxt;
- if (query_context == NULL)
+ query_context = parse_msg->query_context;
+ if (!query_context)
{
pool_error("Bind: cannot get query context");
return POOL_END;
}
- session_context->pending_portal = portal;
+ session_context->uncompleted_message = bind_msg;
/* rewrite bind message */
- if (REPLICATION && portal->num_tsparams > 0)
+ if (REPLICATION && bind_msg->num_tsparams > 0)
{
- rewrite_msg = bind_rewrite_timestamp(backend, portal, contents, &len);
+ rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len);
if (rewrite_msg != NULL)
contents = rewrite_msg;
}
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (IsA(query_context->parse_tree, DeallocateStmt))
- overwrite_map_for_deallocate(query_context);
-
if (pool_config->load_balance_mode && pool_is_writing_transaction())
{
- if(parse_before_bind(frontend, backend, pstmt) != POOL_CONTINUE)
+ if(parse_before_bind(frontend, backend, parse_msg) != POOL_CONTINUE)
return POOL_END;
}
POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
- Portal *portal = NULL;
- PreparedStatement *pstmt = NULL;
+ POOL_SENT_MESSAGE *msg;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
/* Prepared Statement */
if (*contents == 'S')
{
- pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
+ msg = pool_get_sent_message('Q', contents+1);
+ if (!msg)
+ msg = pool_get_sent_message('P', contents+1);
+ if (!msg)
+ {
+ pool_error("Describe: cannot get parse message");
+ return POOL_END;
+ }
}
/* Portal */
else
{
- portal = pool_get_portal_by_portal_name(contents+1);
- if (portal == NULL)
+ msg = pool_get_sent_message('B', contents+1);
+ if (!msg)
{
- pool_error("Describe: cannot get portal \"%s\"", contents+1);
+ pool_error("Describe: cannot get bind message");
return POOL_END;
}
-
- pstmt = portal->pstmt;
}
- if (pstmt == NULL)
- {
- pool_error("Describe: cannot get prepared statement");
- return POOL_END;
- }
+ query_context = msg->query_context;
- query_context = pstmt->qctxt;
if (query_context == NULL)
{
pool_error("Describe: cannot get query context");
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (IsA(query_context->parse_tree, DeallocateStmt))
- overwrite_map_for_deallocate(query_context);
-
pool_debug("Describe: waiting for master completing the query");
if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "D")
!= POOL_CONTINUE)
POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
- Portal *portal = NULL;
- PreparedStatement *pstmt = NULL;
+ POOL_SENT_MESSAGE *msg;
POOL_SESSION_CONTEXT *session_context;
POOL_QUERY_CONTEXT *query_context;
/* Prepared Statement */
if (*contents == 'S')
{
- pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
- if (pstmt == NULL)
+ msg = pool_get_sent_message('Q', contents+1);
+ if (!msg)
+ msg = pool_get_sent_message('P', contents+1);
+ if (!msg)
{
- pool_error("Close: cannot get prepared statement");
+ pool_error("Close: cannot get parse message");
return POOL_END;
}
-
- session_context->pending_pstmt = pstmt;
- query_context = pstmt->qctxt;
}
/* Portal */
else if (*contents == 'P')
{
- portal = pool_get_portal_by_portal_name(contents+1);
- if (portal == NULL)
+ msg = pool_get_sent_message('B', contents+1);
+ if (!msg)
{
- pool_error("Close: cannot get portal");
+ pool_error("Close: cannot get bind message");
return POOL_END;
}
-
- session_context->pending_portal = portal;
- query_context = portal->qctxt;
}
else
{
return POOL_END;
}
- if (query_context == NULL)
+ session_context->uncompleted_message = msg;
+ query_context = msg->query_context;
+
+ if (!query_context)
{
pool_error("Close: cannot get query context");
return POOL_END;
signed char kind;
signed char state;
POOL_SESSION_CONTEXT *session_context;
+ Node *node = NULL;
+ char *query = NULL;
/* Get session context */
session_context = pool_get_session_context();
if (pool_is_query_in_progress() && pool_is_command_success())
{
- Node *node;
- char *query;
-
node = pool_get_parse_tree();
query = pool_get_query_string();
}
if (!pool_is_doing_extended_query_message())
- pool_query_context_destroy(pool_get_session_context()->query_context);
+ {
+ if (!(node && IsA(node, PrepareStmt)))
+ pool_query_context_destroy(pool_get_session_context()->query_context);
+ }
sp = MASTER_CONNECTION(backend)->sp;
if (MASTER(backend)->tstate == 'T')
return POOL_END;
}
- if (session_context->pending_pstmt != NULL)
+ if (session_context->uncompleted_message)
{
POOL_QUERY_CONTEXT *qc;
- pool_add_prepared_statement();
+ pool_add_sent_message(session_context->uncompleted_message);
- /* Set "parse done" to query_state */
- qc = session_context->pending_pstmt->qctxt;
- if (qc != NULL)
- pool_set_query_state(qc, 1);
+ qc = session_context->uncompleted_message->query_context;
+ if (qc)
+ pool_set_query_state(qc, POOL_PARSE_COMPLETE);
- session_context->pending_pstmt = NULL;
+ session_context->uncompleted_message = NULL;
}
return SimpleForwardToFrontend('1', frontend, backend);
return POOL_END;
}
- if (session_context->pending_portal != NULL)
+ if (session_context->uncompleted_message)
{
- PreparedStatement *pstmt;
+ POOL_QUERY_CONTEXT *qc;
- pool_add_portal();
+ pool_add_sent_message(session_context->uncompleted_message);
- /* Set "bind done" to query_state */
- pstmt = session_context->pending_portal->pstmt;
- if (pstmt != NULL && pstmt->qctxt != NULL)
- pool_set_query_state(pstmt->qctxt, 2);
+ qc = session_context->uncompleted_message->query_context;
+ if (qc)
+ pool_set_query_state(qc, POOL_BIND_COMPLETE);
- session_context->pending_portal = NULL;
+ session_context->uncompleted_message = NULL;
}
return SimpleForwardToFrontend('2', frontend, backend);
return POOL_END;
}
- if (session_context->pending_pstmt != NULL)
+ if (session_context->uncompleted_message)
{
- pool_remove_prepared_statement();
- session_context->pending_pstmt = NULL;
- }
- else if (session_context->pending_portal != NULL)
- {
- pool_remove_portal();
- session_context->pending_portal = NULL;
+ pool_remove_sent_message(session_context->uncompleted_message->kind,
+ session_context->uncompleted_message->name);
+ session_context->uncompleted_message = NULL;
}
else
{
- pool_error("CloseComplete: pending object not found");
+ pool_error("CloseComplete: uncompleted message not found");
return POOL_END;
}
if (IsA(node, PrepareStmt))
{
- pool_add_prepared_statement();
- session_context->pending_pstmt = NULL;
+ if (session_context->uncompleted_message)
+ {
+ pool_add_sent_message(session_context->uncompleted_message);
+ session_context->uncompleted_message = NULL;
+ }
}
else if (IsA(node, DeallocateStmt))
{
name = ((DeallocateStmt *)node)->name;
if (name == NULL)
- pool_clear_prepared_statement_list();
+ {
+ pool_remove_sent_messages('Q');
+ pool_remove_sent_messages('P');
+ }
else
- pool_remove_prepared_statement();
- session_context->pending_pstmt = NULL;
+ {
+ pool_remove_sent_message('Q', name);
+ pool_remove_sent_message('P', name);
+ }
}
else if (IsA(node, DiscardStmt))
{
DiscardStmt *stmt = (DiscardStmt *)node;
- if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
+
+ if (stmt->target == DISCARD_PLANS)
+ {
+ pool_remove_sent_messages('Q');
+ pool_remove_sent_messages('P');
+ }
+ else if (stmt->target == DISCARD_ALL)
{
- pool_remove_pending_objects();
- pool_clear_prepared_statement_list();
+ pool_clear_sent_message_list();
}
}
/*
POOL_CONNECTION_POOL *backend)
{
POOL_STATUS ret;
+ POOL_SESSION_CONTEXT *session_context;
ret = SimpleForwardToFrontend('E', frontend, backend);
if (ret != POOL_CONTINUE)
/* An error occurred with PREPARE or DEALLOCATE command.
* Free pending portal object.
*/
- pool_remove_pending_objects();
+ session_context = pool_get_session_context();
+ if (session_context)
+ {
+ if (session_context->uncompleted_message)
+ {
+ pool_add_sent_message(session_context->uncompleted_message);
+ pool_remove_sent_message(session_context->uncompleted_message->kind,
+ session_context->uncompleted_message->name);
+ session_context->uncompleted_message = NULL;
+ }
+ }
return POOL_CONTINUE;
}
case 'E': /* ErrorResponse */
status = ErrorResponse3(frontend, backend);
+ pool_set_command_success();
if (TSTATE(backend, REAL_MASTER_NODE_ID) != 'I')
pool_set_failed_transaction();
if (pool_is_doing_extended_query_message())
static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend,
- PreparedStatement *ps)
+ POOL_SENT_MESSAGE *message)
{
int i;
- int len = ps->parse_len;
+ int len = message->len;
char kind = '\0';
- char *contents = ps->parse_contents;
+ char *contents = message->contents;
bool parse_was_sent = false;
bool backup[MAX_NUM_BACKENDS];
POOL_STATUS status;
- POOL_QUERY_CONTEXT *qc = ps->qctxt;
+ POOL_QUERY_CONTEXT *qc = message->query_context;
memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
/* expect to send to master node only */
for (i = 0; i < NUM_BACKENDS; i++)
{
- if (qc->where_to_send[i] && qc->query_state[i] < 1) /* 1: parse done */
+ if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
{
pool_debug("parse_before_bind: waiting for backend %d completing parse", i);
if (pool_send_and_wait(qc, contents, len, 1, i, "P") != POOL_CONTINUE)
return POOL_CONTINUE;
}
-static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context)
-{
- char *name;
- PreparedStatement *ps;
- POOL_QUERY_CONTEXT *qc;
-
- if (IsA(query_context->parse_tree, DeallocateStmt)) {
- name = ((DeallocateStmt *)query_context->parse_tree)->name;
- if (name != NULL) /* NULL is "DEALLOCATE ALL" */
- {
- ps = pool_get_prepared_statement_by_pstmt_name(name);
- if (ps != NULL)
- {
- qc = ps->qctxt;
- if (qc != NULL)
- {
- memcpy(query_context->where_to_send, qc->where_to_send,
- sizeof(query_context->where_to_send));
- }
- }
- }
- }
-}
-
/*
* Find victim nodes by "decide by majority" rule and returns array
* of victim node ids. If no victim is found, return NULL.
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
}
-
- /* PREPARE? */
- if (IsA(node, PrepareStmt))
- {
- /* Make sure that same prepared statement does not exist */
- if (pool_get_prep_where(((PrepareStmt *)node)->name) == NULL)
- {
- /* Save the send map */
- pool_add_prep_where(((PrepareStmt *)node)->name, query_context->where_to_send);
- }
- }
-
- /*
- * EXECUTE?
- */
- else if (IsA(node, ExecuteStmt))
- {
- bool *wts;
-
- wts = pool_get_prep_where(((ExecuteStmt *)node)->name);
- if (wts)
- {
- /* Inherit same map from PREPARE */
- pool_copy_prep_where(wts, query_context->where_to_send);
- }
- }
-
- /*
- * DEALLOCATE?
- */
- else if (IsA(node, DeallocateStmt))
- {
- where_to_send_deallocate(query_context, node);
- }
}
else if (REPLICATION || PARALLEL_MODE)
{
pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
}
}
- /*
- * DEALLOCATE?
- */
- else if (IsA(node, DeallocateStmt))
- {
- where_to_send_deallocate(query_context, node);
- }
else
{
/* send to all nodes */
return;
}
+ /*
+ * EXECUTE?
+ */
+ if (IsA(node, ExecuteStmt))
+ {
+ POOL_SENT_MESSAGE *msg;
+
+ msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name);
+ if (!msg)
+ msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name);
+ if (msg)
+ pool_copy_prep_where(msg->query_context->where_to_send,
+ query_context->where_to_send);
+ }
+
+ /*
+ * DEALLOCATE?
+ */
+ else if (IsA(node, DeallocateStmt))
+ {
+ where_to_send_deallocate(query_context, node);
+ }
+
for (i=0;i<NUM_BACKENDS;i++)
{
if (query_context->where_to_send[i])
break;
}
}
+
return;
}
-
/*
* Send query and wait for response
* string:
void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node)
{
DeallocateStmt *d = (DeallocateStmt *)node;
- bool *wts;
+ POOL_SENT_MESSAGE *msg;
/* DELLOCATE ALL? */
if (d->name == NULL)
}
else
{
- wts = pool_get_prep_where(d->name);
- if (wts)
+ msg = pool_get_sent_message('Q', d->name);
+ if (!msg)
+ msg = pool_get_sent_message('P', d->name);
+ if (msg)
{
- /* Inherit same map from PREPARE */
- pool_copy_prep_where(wts, query_context->where_to_send);
- return;
- }
- else
- {
- PreparedStatement *ps;
-
- ps = pool_get_prepared_statement_by_pstmt_name(d->name);
- if (ps && ps->qctxt)
- {
- pool_copy_prep_where(ps->qctxt->where_to_send, query_context->where_to_send);
- return;
- }
+ /* Inherit same map from PREPARE or PARSE */
+ pool_copy_prep_where(msg->query_context->where_to_send,
+ query_context->where_to_send);
}
+ return;
}
/* prepared statement was not found */
pool_setall_node_to_be_sent(query_context);
}
/*
- * Set query state, if specified state less than current state
- * state:
- * 0: before parse 1: parse done 2: bind done
- * 3: describe done 4: execute done -1: in error
+ * Set query state, if a current state is before it than the specified state.
*/
-void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, short state)
+void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state)
{
int i;
return;
}
- if (state < -1 || state > 4)
- {
- pool_error("pool_set_query_state: invalid query state: %d", state);
- return;
- }
-
for (i = 0; i < NUM_BACKENDS; i++)
{
if (query_context->where_to_send[i] &&
- query_context->query_state[i] < state)
+ statecmp(query_context->query_state[i], state) < 0)
query_context->query_state[i] = state;
}
}
+
+int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2)
+{
+ int ret;
+
+ switch (s2) {
+ case POOL_UNPARSED:
+ ret = (s1 == s2) ? 0 : 1;
+ break;
+ case POOL_PARSE_COMPLETE:
+ if (s1 == POOL_UNPARSED)
+ ret = -1;
+ else
+ ret = (s1 == s2) ? 0 : 1;
+ break;
+ case POOL_BIND_COMPLETE:
+ if (s1 == POOL_UNPARSED || s1 == POOL_PARSE_COMPLETE)
+ ret = -1;
+ else
+ ret = (s1 == s2) ? 0 : 1;
+ break;
+ case POOL_EXECUTE_COMPLETE:
+ ret = (s1 == s2) ? 0 : -1;
+ break;
+ default:
+ ret = -2;
+ break;
+ }
+
+ return ret;
+}
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2010 PgPool Global Development Group
+ * Copyright (c) 2003-2011 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
#include "parser/nodes.h"
#include "parser/pool_memory.h"
+typedef enum {
+ POOL_UNPARSED,
+ POOL_PARSE_COMPLETE,
+ POOL_BIND_COMPLETE,
+ POOL_EXECUTE_COMPLETE
+} POOL_QUERY_STATE;
+
/*
* Query context:
* Manages per query context
typedef struct {
char *original_query; /* original query string */
char *rewritten_query; /* rewritten query string if any */
- Node *parse_tree; /* raw parser output if any */
+ Node *parse_tree; /* raw parser output if any */
Node *rewritten_parse_tree; /* rewritten raw parser output if any */
- bool where_to_send[MAX_NUM_BACKENDS]; /* DB node map to send query */
- int virtual_master_node_id; /* the first DB node to send query */
- POOL_MEMORY_POOL *memory_context; /* memory context for query */
-
- /*
- * query_state:
- * 0: before parse 1: parse done 2: bind done
- * 3: describe done 4: execute done -1: in error
- */
- short query_state[MAX_NUM_BACKENDS];
-
+ bool where_to_send[MAX_NUM_BACKENDS]; /* DB node map to send query */
+ int virtual_master_node_id; /* the 1st DB node to send query */
+ POOL_MEMORY_POOL *memory_context; /* memory context for query */
+ POOL_QUERY_STATE query_state[MAX_NUM_BACKENDS]; /* for extended query protocol */
} POOL_QUERY_CONTEXT;
extern POOL_QUERY_CONTEXT *pool_init_query_context(void);
extern char *pool_get_query_string(void);
extern bool is_set_transaction_serializable(Node *ndoe, char *query);
extern bool is_2pc_transaction_query(Node *node, char *query);
-extern void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, short state);
+extern void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state);
+extern int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2);
#endif /* POOL_QUERY_CONTEXT_H */
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2010 PgPool Global Development Group
+ * Copyright (c) 2003-2011 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
static POOL_SESSION_CONTEXT session_context_d;
static POOL_SESSION_CONTEXT *session_context = NULL;
-static void init_prepared_statement_list(void);
-static void init_portal_list(void);
-static bool can_prepared_statement_destroy(POOL_QUERY_CONTEXT *qc);
-static bool can_portal_destroy(POOL_QUERY_CONTEXT *qc);
+static void init_sent_message_list(void);
+static bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc);
/*
* Initialize per session context
/* Initialize local session id */
pool_incremnet_local_session_id();
- /* Initialize prepared statement list */
- init_prepared_statement_list();
-
- /* Initialize portal list */
- init_portal_list();
+ /* Initialize sent message list */
+ init_sent_message_list();
/* Create memory context */
session_context->memory_context = pool_memory_create(PREPARE_BLOCK_SIZE);
pool_unset_ignore_till_sync();
/* Initialize where to send map for PREPARE statemets */
+#ifdef NOT_USED
memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS;
-
+#endif /* NOT_USED */
/* Reset flag to indicate difference in number of affected tuples
* in UPDATE/DELETE.
*/
{
if (session_context)
{
- free(session_context->pstmt_list.pstmts);
- free(session_context->portal_list.portals);
+ pool_clear_sent_message_list();
+ free(session_context->message_list.sent_messages);
pool_memory_delete(session_context->memory_context, 0);
}
/* XXX For now, just zap memory */
}
/*
- * Remove a portal by portal name
+ * Remove a sent message
*/
-static void pool_remove_portal_by_portal_name(const char *name)
+bool pool_remove_sent_message(char kind, const char *name)
{
int i;
- PortalList *plist;
+ POOL_SENT_MESSAGE_LIST *msglist;
- if (*name == '\0')
+ if (!session_context)
{
- if (session_context->unnamed_portal)
- {
- pool_memory_free(session_context->memory_context,
- session_context->unnamed_portal);
- session_context->unnamed_portal = NULL;
- }
- return;
+ pool_error("pool_remove_sent_message: session context is not initialized");
+ return false;
}
- plist = &session_context->portal_list;
+ msglist = &session_context->message_list;
- for (i = 0; i < plist->size; i++)
+ for (i = 0; i < msglist->size; i++)
{
- if (strcmp(plist->portals[i]->name, name) == 0)
+ if (msglist->sent_messages[i]->kind == kind &&
+ !strcmp(msglist->sent_messages[i]->name, name))
{
- if (can_portal_destroy(plist->portals[i]->qctxt))
- pool_query_context_destroy(plist->portals[i]->qctxt);
- pool_memory_free(session_context->memory_context, plist->portals[i]);
+ pool_sent_message_destroy(msglist->sent_messages[i]);
break;
}
}
-
- /* portal not found */
- if (i == plist->size)
- return;
-
- if (i != plist->size - 1)
- {
- memmove(&plist->portals[i], &plist->portals[i+1],
- sizeof(Portal *) * (plist->size - i - 1));
- }
- plist->size--;
-}
-/*
- * Remove portals by prepared statement name
- * prepared statement : portal = 1 : N
- */
-#ifdef NOT_USED
-static void pool_remove_portal_by_pstmt_name(const char *name)
-{
- int i;
- PortalList *plist;
+ /* sent message not found */
+ if (i == msglist->size)
+ return false;
- if (*name == '\0')
+ if (i != msglist->size - 1)
{
- if (session_context->unnamed_portal)
- {
- pool_memory_free(session_context->memory_context,
- session_context->unnamed_portal);
- session_context->unnamed_portal = NULL;
- }
- return;
+ memmove(&msglist->sent_messages[i], &msglist->sent_messages[i+1],
+ sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1));
}
- plist = &session_context->portal_list;
+ msglist->size--;
- for (i = 0; i < plist->size; i++)
- {
- if (strcmp(plist->portals[i]->pstmt->name, name) == 0)
- pool_remove_portal_by_portal_name(plist->portals[i]->name);
- }
+ return true;
}
-#endif
/*
- * Remove a prepared statement by prepared statement name
+ * Remove same kind of sent messages
*/
-void pool_remove_prepared_statement_by_pstmt_name(const char *name)
+void pool_remove_sent_messages(char kind)
{
int i;
- PreparedStatementList *pslist;
- bool in_progress;
-
- in_progress = pool_is_query_in_progress();
+ POOL_SENT_MESSAGE_LIST *msglist;
if (!session_context)
{
- pool_error("pool_remove_prepared_statement_by_pstmt_name: session context is not initialized");
- return;
- }
-
- if (*name == '\0')
- {
- if (session_context->unnamed_pstmt)
- {
- pool_query_context_destroy(session_context->unnamed_pstmt->qctxt);
- pool_memory_free(session_context->memory_context,
- session_context->unnamed_pstmt);
- session_context->unnamed_pstmt = NULL;
- }
- if (in_progress)
- pool_set_query_in_progress();
+ pool_error("pool_remove_sent_messages: session context is not initialized");
return;
}
- pslist = &session_context->pstmt_list;
+ msglist = &session_context->message_list;
- for (i = 0; i < pslist->size; i++)
+ for (i = 0; i < msglist->size; i++)
{
- if (strcmp(pslist->pstmts[i]->name, name) == 0)
+ if (msglist->sent_messages[i]->kind == kind)
{
- if (can_prepared_statement_destroy(pslist->pstmts[i]->qctxt))
- pool_query_context_destroy(pslist->pstmts[i]->qctxt);
- pool_memory_free(session_context->memory_context, pslist->pstmts[i]);
- break;
+ if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name))
+ i--; /* for relocation by removing */
}
}
-
- /* prepared statement not found */
- if (i == pslist->size)
- {
- if (in_progress)
- pool_set_query_in_progress();
- return;
- }
-
- if (i != pslist->size - 1)
- {
- memmove(&pslist->pstmts[i], &pslist->pstmts[i+1],
- sizeof(PreparedStatement *) * (pslist->size - i - 1));
- }
- pslist->size--;
-
- /*
- * prepared statements and portals are closed separately
- * by a frontend.
- */
- /* pool_remove_portal_by_pstmt_name(name); */
-
- if (in_progress)
- pool_set_query_in_progress();
-}
-
-/*
- * Remove a pending prepared statement from prepared statement list
- */
-void pool_remove_prepared_statement(void)
-{
- char *name;
-
- if (!session_context)
- {
- pool_error("pool_remove_prepared_statement: session context is not initialized");
- return;
- }
-
- if (session_context->pending_pstmt)
- {
- name = session_context->pending_pstmt->name;
- pool_remove_prepared_statement_by_pstmt_name(name);
- }
- else
- {
- pool_debug("pool_remove_prepared_statement: pending prepared statement is NULL");
- }
}
-
/*
- * Remove a pending portal from portal list
+ * Destroy sent message
*/
-void pool_remove_portal(void)
+void pool_sent_message_destroy(POOL_SENT_MESSAGE *message)
{
- char *name;
+ bool in_progress;
+ POOL_QUERY_CONTEXT *qc = NULL;
if (!session_context)
{
- pool_error("pool_remove_portal: session context is not initialized");
+ pool_error("pool_sent_message_destroy: session context is not initialized");
return;
}
- if (session_context->pending_portal)
- {
- name = session_context->pending_portal->name;
- pool_remove_portal_by_portal_name(name);
- }
-}
-
-/*
- * Remove pending objects
- */
-void pool_remove_pending_objects(void)
-{
- PreparedStatement *ps;
- Portal *p;
-
- ps = session_context->pending_pstmt;
-
- if (ps && ps->name)
- pool_memory_free(session_context->memory_context, ps->name);
+ in_progress = pool_is_query_in_progress();
- if (ps && ps->qctxt)
+ if (message)
{
- if (can_prepared_statement_destroy(ps->qctxt) &&
- can_portal_destroy(ps->qctxt))
- pool_query_context_destroy(ps->qctxt);
- }
-
- if (ps)
- pool_memory_free(session_context->memory_context, ps);
-
- p = session_context->pending_portal;
+ if (message->contents)
+ pool_memory_free(session_context->memory_context, message->contents);
+
+ if (message->name)
+ pool_memory_free(session_context->memory_context, message->name);
- if (p && p->name)
- pool_memory_free(session_context->memory_context, p->name);
+ if (message->query_context)
+ {
+ if (session_context->query_context != message->query_context)
+ qc = session_context->query_context;
- if (p && p->pstmt)
- pool_memory_free(session_context->memory_context, p->pstmt);
+ if (can_query_context_destroy(message->query_context))
+ {
+ pool_query_context_destroy(message->query_context);
+ /*
+ * set in_progress flag, because pool_query_context_destroy()
+ * unsets in_progress flag
+ */
+ if (in_progress)
+ pool_set_query_in_progress();
+ /*
+ * set query_context of session_context, because
+ * pool_query_context_destroy() sets it to NULL.
+ */
+ if (qc)
+ session_context->query_context = qc;
+ }
+ }
- if (p && p->qctxt)
- {
- if (can_portal_destroy(p->qctxt) &&
- can_prepared_statement_destroy(p->qctxt))
- pool_query_context_destroy(p->qctxt);
+ if (session_context->memory_context)
+ pool_memory_free(session_context->memory_context, message);
}
-
- if (p)
- pool_memory_free(session_context->memory_context, p);
-
- session_context->pending_pstmt = NULL;
- session_context->pending_portal = NULL;
}
/*
- * Clear prepared statement list and portal list
+ * Clear sent message list
*/
-void pool_clear_prepared_statement_list(void)
+void pool_clear_sent_message_list(void)
{
- PreparedStatementList *pslist;
+ POOL_SENT_MESSAGE_LIST *msglist;
if (!session_context)
{
- pool_error("pool_clear_prepared_statement_list: session context is not initialized");
+ pool_error("pool_clear_sent_message_list: session context is not initialized");
return;
}
- pslist = &session_context->pstmt_list;
+ msglist = &session_context->message_list;
- while (pslist->size > 0)
+ while (msglist->size > 0)
{
- pool_remove_prepared_statement_by_pstmt_name(pslist->pstmts[0]->name);
+ pool_remove_sent_messages(msglist->sent_messages[0]->kind);
}
}
/*
- * Create a prepared statement
- * len: the length of parse message which is not network byte order
- * contents: the contents of parse message
+ * Create a sent message
+ * kind: one of 'P':Parse, 'B':Bind or'Q':Query(PREPARE)
+ * len: message length that is not network byte order
+ * contents: message contents
+ * num_tsparams: number of timestamp parameters
+ * name: prepared statement name or portal name
*/
-PreparedStatement *pool_create_prepared_statement(const char *name,
- int num_tsparams,
- int len, char *contents,
- POOL_QUERY_CONTEXT *qc)
+POOL_SENT_MESSAGE *pool_create_sent_message(char kind, int len, char *contents,
+ int num_tsparams, const char *name,
+ POOL_QUERY_CONTEXT *query_context)
{
- PreparedStatement *ps;
+ POOL_SENT_MESSAGE *msg;
if (!session_context)
{
- pool_error("pool_create_prepared_statement: session context is not initialized");
+ pool_error("pool_create_sent_message: session context is not initialized");
return NULL;
}
- ps = pool_memory_alloc(session_context->memory_context,
- sizeof(PreparedStatement));
- ps->name = pool_memory_strdup(session_context->memory_context, name);
- ps->num_tsparams = num_tsparams;
- ps->parse_len = len;
- ps->parse_contents = pool_memory_alloc(session_context->memory_context, len);
- memcpy(ps->parse_contents, contents, len);
+ msg = pool_memory_alloc(session_context->memory_context,
+ sizeof(POOL_SENT_MESSAGE));
+ msg->kind = kind;
+ msg->len = len;
+ msg->contents = pool_memory_alloc(session_context->memory_context, len);
+ memcpy(msg->contents, contents, len);
+ msg->num_tsparams = num_tsparams;
+ msg->name = pool_memory_strdup(session_context->memory_context, name);
+ msg->query_context = query_context;
-#ifdef NOT_USED
- /*
- * duplicate query_context because session_context->query_context is
- * freed by pool_query_context_destroy()
- */
- q = malloc(sizeof(POOL_QUERY_CONTEXT));
- if (q == NULL)
- {
- pool_error("pool_create_prepared_statement: malloc failed: %s", strerror(errno));
- exit(1);
- }
- ps->qctxt = memcpy(q, qc, sizeof(POOL_QUERY_CONTEXT));
-#endif
- ps->qctxt = qc;
-
- return ps;
-}
-
-/*
- * Create a portal
- */
-Portal *pool_create_portal(const char *name, int num_tsparams, PreparedStatement *pstmt)
-{
- Portal *portal;
-
- if (!session_context)
- {
- pool_error("pool_create_portal: session context is not initialized");
- return NULL;
- }
-
- portal = pool_memory_alloc(session_context->memory_context, sizeof(Portal));
- portal->name = pool_memory_strdup(session_context->memory_context, name);
- portal->num_tsparams = num_tsparams;
- portal->pstmt = pstmt;
- portal->qctxt = pstmt->qctxt;
-
- return portal;
+ return msg;
}
/*
- * Add a prepared statement to prepared statement list
+ * Add a sent message to sent message list
*/
-void pool_add_prepared_statement(void)
+void pool_add_sent_message(POOL_SENT_MESSAGE *message)
{
- PreparedStatement *ps;
- PreparedStatementList *pslist;
+ POOL_SENT_MESSAGE *old_msg;
+ POOL_SENT_MESSAGE_LIST *msglist;
if (!session_context)
{
- pool_error("pool_add_prepared_statement: session context is not initialized");
+ pool_error("pool_add_sent_message: session context is not initialized");
return;
}
- if (!session_context->pending_pstmt)
+ if (!message)
{
- pool_debug("pool_add_prepared_statement: pending prepared statement is NULL");
+ pool_debug("pool_add_sent_message: message is NULL");
return;
}
- ps = pool_get_prepared_statement_by_pstmt_name(session_context->pending_pstmt->name);
- pslist = &session_context->pstmt_list;
+ old_msg = pool_get_sent_message(message->kind, message->name);
+ msglist = &session_context->message_list;
- if (ps)
- {
- pool_remove_prepared_statement_by_pstmt_name(ps->name);
- if (*session_context->pending_pstmt->name == '\0')
- {
- session_context->unnamed_pstmt = session_context->pending_pstmt;
- session_context->query_context = session_context->pending_pstmt->qctxt;
- }
- else
- {
- pool_error("pool_add_prepared_statement: prepared statement \"%s\" already exists",
- session_context->pending_pstmt->name);
- }
- }
- else
+ if (old_msg)
{
- if (*session_context->pending_pstmt->name == '\0')
- {
- session_context->unnamed_pstmt = session_context->pending_pstmt;
- }
+ if (message->kind == 'B')
+ pool_debug("pool_add_sent_message: portal \"%s\" already exists",
+ message->name);
else
- {
- if (pslist->size == pslist->capacity)
- {
- pslist->capacity *= 2;
- pslist->pstmts = realloc(pslist->pstmts, sizeof(PreparedStatement *) * pslist->capacity);
- if (pslist->pstmts == NULL)
- {
- pool_error("pool_add_prepared_statement: realloc failed: %s", strerror(errno));
- exit(1);
- }
- }
- pslist->pstmts[pslist->size++] = session_context->pending_pstmt;
- }
- }
-}
+ pool_debug("pool_add_sent_message: prepared statement \"%s\" already exists",
+ message->name);
-/*
- * Add a portal to portal list
- */
-void pool_add_portal(void)
-{
- Portal *p;
- PortalList *plist;
-
- if (!session_context)
- {
- pool_error("pool_add_portal: session context is not initialized");
- return;
+ if (*message->name == '\0')
+ pool_remove_sent_message(old_msg->kind, old_msg->name);
+ else
+ return;
}
- if (!session_context->pending_portal)
+ if (msglist->size == msglist->capacity)
{
- pool_debug("pool_add_portal: pending portal is NULL");
- return;
- }
-
- p = pool_get_portal_by_portal_name(session_context->pending_portal->name);
- plist = &session_context->portal_list;
-
- if (p)
- {
- pool_remove_portal_by_portal_name(p->name);
- if (*session_context->pending_portal->name == '\0')
- {
- session_context->unnamed_portal = session_context->pending_portal;
- }
- else
+ msglist->capacity *= 2;
+ msglist->sent_messages = realloc(msglist->sent_messages,
+ sizeof(POOL_SENT_MESSAGE *) * msglist->capacity);
+ if (!msglist->sent_messages)
{
- pool_error("pool_add_portal: portal \"%s\" already exists",
- session_context->pending_portal->name);
+ pool_error("pool_add_sent_message: realloc failed: %s", strerror(errno));
+ exit(1);
}
}
- else
- {
- if (*session_context->pending_portal->name == '\0')
- {
- session_context->unnamed_portal = session_context->pending_portal;
- }
- else
- {
- if (plist->size == plist->capacity)
- {
- plist->capacity *= 2;
- plist->portals = realloc(plist->portals, sizeof(Portal *) * plist->capacity);
- if (plist->portals == NULL)
- {
- pool_error("pool_add_portal: realloc failed: %s", strerror(errno));
- exit(1);
- }
- }
- plist->portals[plist->size++] = session_context->pending_portal;
- }
- }
-}
-
-/*
- * Get a prepared statement by prepared statement name
- */
-PreparedStatement *pool_get_prepared_statement_by_pstmt_name(const char *name)
-{
- int i;
- PreparedStatementList *pslist;
- if (!session_context)
- {
- pool_error("pool_get_prepared_statement_by_pstmt_name: session context is not initialized");
- return NULL;
- }
-
- if (*name == '\0')
- return session_context->unnamed_pstmt;
-
- pslist = &session_context->pstmt_list;
-
- for (i = 0; i < pslist->size; i++)
- {
- if (strcmp(pslist->pstmts[i]->name, name) == 0)
- return pslist->pstmts[i];
- }
-
- return NULL;
+ msglist->sent_messages[msglist->size++] = message;
}
/*
- * Get a portal by portal name
+ * Get a sent message
*/
-Portal *pool_get_portal_by_portal_name(const char *name)
+POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name)
{
int i;
- PortalList *plist;
+ POOL_SENT_MESSAGE_LIST *msglist;
if (!session_context)
{
- pool_error("pool_get_portal_by_portal_name: session context is not initialized");
+ pool_error("pool_get_sent_message: session context is not initialized");
return NULL;
}
- if (*name == '\0')
- return session_context->unnamed_portal;
-
- plist = &session_context->portal_list;
+ msglist = &session_context->message_list;
- for (i = 0; i < plist->size; i++)
+ for (i = 0; i < msglist->size; i++)
{
- if (strcmp(plist->portals[i]->name, name) == 0)
- return plist->portals[i];
+ if (msglist->sent_messages[i]->kind == kind &&
+ !strcmp(msglist->sent_messages[i]->name, name))
+ return msglist->sent_messages[i];
}
return NULL;
{
memcpy(dest, src, sizeof(bool)*MAX_NUM_BACKENDS);
}
-
+#ifdef NOT_USED
/*
* Add to send map a PREPARED statement
*/
}
}
}
-
-/*
- * Initialize prepared statement list
- */
-static void init_prepared_statement_list(void)
-{
- PreparedStatementList *pslist;
-
- pslist = &session_context->pstmt_list;
- pslist->size = 0;
- pslist->capacity = INIT_LIST_SIZE;
- pslist->pstmts = malloc(sizeof(PreparedStatement *) * INIT_LIST_SIZE);
- if (pslist->pstmts == NULL)
- {
- pool_error("init_prepared_statement_list: malloc failed: %s", strerror(errno));
- exit(1);
- }
-}
-
+#endif /* NOT_USED */
/*
- * Initialize portal list
+ * Initialize sent message list
*/
-static void init_portal_list(void)
+static void init_sent_message_list(void)
{
- PortalList *plist;
+ POOL_SENT_MESSAGE_LIST *msglist;
- plist = &session_context->portal_list;
- plist->size = 0;
- plist->capacity = INIT_LIST_SIZE;
- plist->portals = malloc(sizeof(Portal *) * INIT_LIST_SIZE);
- if (plist->portals == NULL)
+ msglist = &session_context->message_list;
+ msglist->size = 0;
+ msglist->capacity = INIT_LIST_SIZE;
+ msglist->sent_messages = malloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE);
+ if (!msglist->sent_messages)
{
- pool_error("init_portal_list: malloc failed: %s", strerror(errno));
+ pool_error("init_sent_message_list: malloc failed: %s", strerror(errno));
exit(1);
}
}
-static bool can_prepared_statement_destroy(POOL_QUERY_CONTEXT *qc)
+static bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc)
{
int i;
- PortalList *plist;
+ int count = 0;
+ POOL_SENT_MESSAGE_LIST *msglist;
- plist = &session_context->portal_list;
+ msglist = &session_context->message_list;
- for (i = 0; i < plist->size; i++)
+ for (i = 0; i < msglist->size; i++)
{
- if (plist->portals[i]->qctxt == qc)
- {
- pool_debug("can_prepared_statement_destroy: query context is still used.");
- return false;
- }
+ if (msglist->sent_messages[i]->query_context == qc)
+ count++;
}
-
- return true;
-}
-
-static bool can_portal_destroy(POOL_QUERY_CONTEXT *qc)
-{
- int i;
- PreparedStatementList *pslist;
-
- pslist = &session_context->pstmt_list;
-
- for (i = 0; i < pslist->size; i++)
+ if (count > 1)
{
- if (pslist->pstmts[i]->qctxt == qc)
- {
- pool_debug("can_portal_destroy: query context is still used.");
- return false;
- }
+ pool_debug("can_query_context_destroy: query context is still used.");
+ return false;
}
return true;
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2010 PgPool Global Development Group
+ * Copyright (c) 2003-2011 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
#include "pool_query_context.h"
#include "parser/pool_memory.h"
-/*
- * Prepared Statement:
- */
-typedef struct {
- char *name; /* prepared statement name */
- int num_tsparams;
- int parse_len; /* the length of parse message which is
- not network byte order */
- char *parse_contents; /* contents of parse message */
- POOL_QUERY_CONTEXT *qctxt;
-} PreparedStatement;
-
-/*
- * Prepared statement list:
- */
-typedef struct {
- int capacity; /* capacity of list */
- int size; /* number of PreparedStatement */
- PreparedStatement **pstmts; /* prepared statement list */
-} PreparedStatementList;
-
-/*
- * Portal:
- */
-typedef struct {
- char *name; /* portal name */
- int num_tsparams;
- PreparedStatement *pstmt;
- POOL_QUERY_CONTEXT *qctxt;
-} Portal;
-
-/*
- * Portal list:
- */
-typedef struct {
- int capacity; /* capacity of list */
- int size; /* number of portal */
- Portal **portals; /* portal list */
-} PortalList;
-
/*
* Transaction isolation mode
*/
POOL_READ_COMMITTED, /* Read committed */
POOL_SERIALIZABLE /* Serializable */
} POOL_TRANSACTION_ISOLATION;
-
+#ifdef NOT_USED
/*
* where to send map for PREPARE/EXECUTE/DEALLOCATE
*/
char name[POOL_MAX_PREPARED_STATEMENTS][POOL_MAX_PREPARED_NAME]; /* Prepared statement name */
bool where_to_send[POOL_MAX_PREPARED_STATEMENTS][MAX_NUM_BACKENDS];
} POOL_PREPARED_SEND_MAP;
-
+#endif /* NOT_USED */
+typedef struct {
+ char kind; /* one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE) */
+ int len; /* not network byte order */
+ char *contents;
+ int num_tsparams;
+ char *name; /* object name of prepared statement or portal */
+ POOL_QUERY_CONTEXT *query_context;
+} POOL_SENT_MESSAGE;
+
+typedef struct {
+ int capacity; /* capacity of list */
+ int size; /* number of elements */
+ POOL_SENT_MESSAGE **sent_messages;
+} POOL_SENT_MESSAGE_LIST;
+
/*
* Per session context:
*/
* "PreparedStatementList *pstmt_list" (see below).
*/
POOL_QUERY_CONTEXT *query_context;
-
+#ifdef NOT_USED
/* where to send map for PREPARE/EXECUTE/DEALLOCATE */
POOL_PREPARED_SEND_MAP prep_where;
-
+#endif /* NOT_USED */
POOL_MEMORY_POOL *memory_context; /* memory context for session */
- PreparedStatement *unnamed_pstmt; /* unnamed statement */
- PreparedStatement *pending_pstmt; /* used until receive backend response */
- Portal *unnamed_portal; /* unnamed portal */
- Portal *pending_portal; /* used until receive backend response */
- PreparedStatementList pstmt_list; /* named statement list */
- PortalList portal_list; /* named portal list */
+
+ /* message which does'nt receive complete message */
+ POOL_SENT_MESSAGE *uncompleted_message;
+
+ POOL_SENT_MESSAGE_LIST message_list;
int load_balance_node_id; /* selected load balance node id */
extern bool pool_is_ignore_till_sync(void);
extern void pool_set_ignore_till_sync(void);
extern void pool_unset_ignore_till_sync(void);
-extern void pool_remove_prepared_statement_by_pstmt_name(const char *name);
-extern void pool_remove_prepared_statement(void);
-extern void pool_remove_portal(void);
-extern void pool_remove_pending_objects(void);
-extern void pool_clear_prepared_statement_list(void);
-extern PreparedStatement *pool_create_prepared_statement(const char *name, int num_tsparams,
- int len, char *contents,
- POOL_QUERY_CONTEXT *qc);
-extern Portal *pool_create_portal(const char *name, int num_tsparams, PreparedStatement *pstmt);
-extern void pool_add_prepared_statement(void);
-extern void pool_add_portal(void);
-extern PreparedStatement *pool_get_prepared_statement_by_pstmt_name(const char *name);
-extern Portal *pool_get_portal_by_portal_name(const char *name);
-
+extern POOL_SENT_MESSAGE *pool_create_sent_message(char kind, int len, char *contents,
+ int num_tsparams, const char *name,
+ POOL_QUERY_CONTEXT *query_context);
+extern void pool_add_sent_message(POOL_SENT_MESSAGE *message);
+extern bool pool_remove_sent_message(char kind, const char *name);
+extern void pool_remove_sent_messages(char kind);
+extern void pool_clear_sent_message_list(void);
+extern void pool_sent_message_destroy(POOL_SENT_MESSAGE *message);
+extern POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name);
extern void pool_unset_writing_transaction(void);
extern void pool_set_writing_transaction(void);
extern bool pool_is_writing_transaction(void);
extern void pool_set_command_success(void);
extern bool pool_is_command_success(void);
extern void pool_copy_prep_where(bool *src, bool *dest);
+#ifdef NOT_USED
extern void pool_add_prep_where(char *name, bool *map);
extern bool *pool_get_prep_where(char *name);
extern void pool_delete_prep_where(char *name);
-
+#endif /* NOT_USED */
#endif /* POOL_SESSION_CONTEXT_H */
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2010 PgPool Global Development Group
+ * Copyright (c) 2003-2011 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
*/
char *
rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node,
- bool rewrite_to_params, PreparedStatement *pstmt)
+ bool rewrite_to_params, POOL_SENT_MESSAGE *message)
{
TSRewriteContext ctx;
Node *stmt;
rewrite = ctx.rewrite;
/* add params */
- if (pstmt)
+ if (message)
{
int i;
- for (i = 0; i < pstmt->num_tsparams; i++)
+ for (i = 0; i < message->num_tsparams; i++)
{
e_stmt->params = lappend(e_stmt->params, ctx.ts_const);
rewrite = true;
if (!rewrite)
return NULL;
- if (ctx.rewrite_to_params && pstmt)
+ if (ctx.rewrite_to_params && message)
{
ListCell *lc;
int num = ctx.num_params + 1;
}
/* save to portal */
- pstmt->num_tsparams = list_length(ctx.params);
+ message->num_tsparams = list_length(ctx.params);
/* add param type */
if (IsA(node, PrepareStmt))
int i;
PrepareStmt *p_stmt = (PrepareStmt *) node;
- for (i = 0; i < pstmt->num_tsparams; i++)
+ for (i = 0; i < message->num_tsparams; i++)
p_stmt->argtypes =
lappend(p_stmt->argtypes, SystemTypeName("timestamptz"));
}
* rewrite Bind message to add parameter
*/
char *
-bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, Portal *portal,
- const char *orig_msg, int *len)
+bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE *message,
+ const char *orig_msg, int *len)
{
int16 tmp2,
num_params,
ts_len = strlen(ts);
- *len += (strlen(ts) + sizeof(int32)) * portal->num_tsparams;
- new_msg = copy_to = (char *) malloc(*len + portal->num_tsparams * sizeof(int16));
+ *len += (strlen(ts) + sizeof(int32)) * message->num_tsparams;
+ new_msg = copy_to = (char *) malloc(*len + message->num_tsparams * sizeof(int16));
copy_from = orig_msg;
/* portal_name */
if (num_formats > 1)
{
/* enlarge message length */
- *len += portal->num_tsparams * sizeof(int16);
- tmp2 += portal->num_tsparams;
+ *len += message->num_tsparams * sizeof(int16);
+ tmp2 += message->num_tsparams;
}
tmp2 = htons(tmp2);
memcpy(copy_to, &tmp2, copy_len); /* copy number of format codes */
if (num_formats > 1)
{
/* set format codes to zero(text) */
- memset(copy_to, 0, portal->num_tsparams * 2);
- copy_to += sizeof(int16) * portal->num_tsparams;
+ memset(copy_to, 0, message->num_tsparams * 2);
+ copy_to += sizeof(int16) * message->num_tsparams;
}
/* num params */
memcpy(&tmp2, copy_from, sizeof(int16));
copy_len = sizeof(int16);
num_params = ntohs(tmp2);
- tmp2 = htons(num_params + portal->num_tsparams);
+ tmp2 = htons(num_params + message->num_tsparams);
memcpy(copy_to, &tmp2, sizeof(int16));
copy_to += copy_len; copy_from += copy_len;
copy_to += copy_len; copy_from += copy_len;
tmp4 = htonl(ts_len);
- for (i = 0; i < portal->num_tsparams; i++)
+ for (i = 0; i < message->num_tsparams; i++)
{
memcpy(copy_to, &tmp4, sizeof(int32));
copy_to += sizeof(int32);
+/* -*-pgsql-c-*- */
+/*
+ *
+ * $Header$
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2011 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_timestamp.h.: header file for pool_timestamp.c,
+ * pool_process_query.c, pool_proto_modules.c
+ *
+ */
+
#ifndef POOL_TIMESTAMP_H
#define POOL_TIMESTAMP_H
#include "pool.h"
#include "parser/nodes.h"
#include "pool_session_context.h"
-char *rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node, bool rewrite_to_params, PreparedStatement *pstmt);
-char *bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, Portal *portal, const char *orig_msg, int *len);
+char *rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node, bool rewrite_to_params, POOL_SENT_MESSAGE *message);
+char *bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, POOL_SENT_MESSAGE *message, const char *orig_msg, int *len);
#endif /* POOL_TIMESTAMP_H */