Add function parse_before_bind() which is used in Load balance mode.
authorToshihiro Kitagawa <kitagawa at sraoss.co.jp>
Wed, 21 Jul 2010 05:31:39 +0000 (05:31 +0000)
committerToshihiro Kitagawa <kitagawa at sraoss.co.jp>
Wed, 21 Jul 2010 05:31:39 +0000 (05:31 +0000)
Add function overwrite_map_for_deallocate() which is used to overwrite
where_ot_send map by a specified query context.
Add length and contents arguments to private functions that process message.
Remove pending_function variable.
Change ReadyForQuery() so that it does not destroy a query context in
extended query protocol.
Refactor following functions.
- Parse()
- Execute()
- ProcessFrontendResponse()
Add private functions that process message.
- Bind()
- Describe()
- Close()
- FunctionCall3()
- ParseComplete()
- BindComplete()
- CloseComplete()
- CommandComplete()
- ErrorResponse3()

pool.h
pool_proto_modules.c
pool_proto_modules.h

diff --git a/pool.h b/pool.h
index 8aafe7ae977f4cd284da44fabaa5a0d0b1c31ca0..4c29eb8da493e0926363c8517361c397df461562 100644 (file)
--- a/pool.h
+++ b/pool.h
@@ -457,7 +457,7 @@ extern signed char pool_read_kind(POOL_CONNECTION_POOL *cp);
 extern int pool_read_int(POOL_CONNECTION_POOL *cp);
 
 extern POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
-extern POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
+extern POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents);
 extern POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
 
 extern int pool_init_params(ParamStatus *params);
@@ -569,6 +569,8 @@ extern POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backen
 extern POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result, int major);
 extern void free_select_result(POOL_SELECT_RESULT *result);
 extern int compare(const void *p1, const void *p2);
+extern POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major);
+extern POOL_STATUS pool_discard_packet_contents(POOL_CONNECTION_POOL *cp);
 
 /* pool_auth.c */
 extern void pool_random_salt(char *md5Salt);
index 7ee3c2d1c4ca9e50f8dd828501e489e455ed68b2..4bac4341c2e9381ebe4ca4e8084f3afbdf203c4d 100644 (file)
@@ -54,6 +54,7 @@
 #include "parser/pool_string.h"
 #include "pool_session_context.h"
 #include "pool_query_context.h"
+#include "pool_lobj.h"
 
 int force_replication;
 int replication_was_enabled;           /* replication mode was enabled */
@@ -97,16 +98,18 @@ char *parsed_query = NULL;
 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
 static void generate_error_message(char *prefix, int specific_error, char *query);
 static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
+static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
+                                                                        POOL_CONNECTION_POOL *backend,
+                                                                        PreparedStatement *ps);
+static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context);
 
 /*
  * Process Query('Q') message
  * Query messages include an SQL string.
  */
- POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
-                                                POOL_CONNECTION_POOL *backend, char *query)
+POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
+                                               POOL_CONNECTION_POOL *backend, int len, char *contents)
 {
-       char *string;
-       int len;
        static char *sq = "show pool_status";
        int commit;
        List *parse_tree_list;
@@ -124,42 +127,20 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                return POOL_END;
        }
 
-       if (query == NULL)      /* need to read query from frontend? */
-       {
-               /* read actual query */
-               if (MAJOR(backend) == PROTO_MAJOR_V3)
-               {
-                       if (pool_read(frontend, &len, sizeof(len)) < 0)
-                               return POOL_END;
-                       len = ntohl(len) - 4;
-                       string = pool_read2(frontend, len);
-               }
-               else
-                       string = pool_read_string(frontend, &len, 0);
-
-               if (string == NULL)
-                       return POOL_END;
-       }
-       else
-       {
-               len = strlen(query)+1;
-               string = query;
-       }
-
        /* save last query string for logging purpose */
-       strncpy(query_string_buffer, string, sizeof(query_string_buffer));
+       strncpy(query_string_buffer, contents, sizeof(query_string_buffer));
 
        /* show ps status */
-       query_ps_status(string, backend);
+       query_ps_status(contents, backend);
 
        /* log query to log file if necessary */
        if (pool_config->log_statement)
        {
-               pool_log("statement: %s", string);
+               pool_log("statement: %s", contents);
        }
        else
        {
-               pool_debug("statement2: %s", string);
+               pool_debug("statement2: %s", contents);
        }
 
        /* Create query context */
@@ -174,7 +155,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
        pool_set_query_in_progress();
 
        /* parse SQL string */
-       parse_tree_list = raw_parser(string);
+       parse_tree_list = raw_parser(contents);
 
        if (parse_tree_list == NIL)
        {
@@ -189,7 +170,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                 */
                char *p = "INSERT INTO foo VALUES(1)";
 
-               pool_log("SimpleQuery: Unable to parse the query: %s", string);
+               pool_log("SimpleQuery: Unable to parse the query: %s", contents);
                parse_tree_list = raw_parser(p);
        }
 
@@ -216,14 +197,14 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                /*
                 * Start query context
                 */
-               pool_start_query(query_context, string, node);
+               pool_start_query(query_context, contents, node);
 
                if (PARALLEL_MODE)
                {
                        bool parallel = true;
 
                        is_parallel_table = is_partition_table(backend,node);
-                       status = pool_do_parallel_query(frontend, backend, node, &parallel, &string, &len);
+                       status = pool_do_parallel_query(frontend, backend, node, &parallel, &contents, &len);
                        if (parallel)
                        {
                                pool_query_context_destroy(query_context);
@@ -261,7 +242,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                }
 
                /* process status reporting? */
-               if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
+               if (IsA(node, VariableShowStmt) && strncasecmp(sq, contents, strlen(sq)) == 0)
                {
                        StartupPacket *sp;
                        char psbuf[1024];
@@ -277,6 +258,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
 
                        free_parser();
                        pool_query_context_destroy(query_context);
+                       pool_set_pool_status_stmt();
                        return POOL_CONTINUE;
                }
 
@@ -293,9 +275,9 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                        if (IsA(node, PrepareStmt))
                        {
                                PreparedStatement *ps;
-#ifdef NOT_USED
+
                                ps = pool_create_prepared_statement(((PrepareStmt *)node)->name, 
-                                                                                                       0, query_context);
+                                                                                                       0, 0, NULL, query_context);
                                if (ps == NULL)
                                {
                                        pool_error("SimpleQuery: failed to create prepared statement: %s", strerror(errno));
@@ -303,19 +285,17 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                                }
 
                                session_context->pending_pstmt = ps;
-                               session_context->pending_function = pool_add_prepared_statement;
-#endif
                        }
                        else if (IsA(node, DeallocateStmt))
                        {
                                char *name;
                                PreparedStatement *ps;
-#ifdef NOT_USED
+
                                name = ((DeallocateStmt *)node)->name;
                                if (name == NULL)
-                                       ps = pool_create_prepared_statement("", 0, query_context);
+                                       ps = pool_create_prepared_statement("", 0, 0, NULL, query_context);
                                else
-                                       ps = pool_create_prepared_statement(name, 0, query_context);
+                                       ps = pool_create_prepared_statement(name, 0, 0, NULL, query_context);
                                if (ps == NULL)
                                {
                                        pool_error("SimpleQuery: failed to create prepared statement: %s", strerror(errno));
@@ -324,11 +304,11 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
 
                                session_context->pending_pstmt = ps;
 
+#ifdef NOT_USED
                                if (name == NULL)
                                        session_context->pending_function = pool_clear_prepared_statement_list;
                                else
                                        session_context->pending_function = pool_remove_prepared_statement;
-#endif
                        }
                        else if (IsA(node, DiscardStmt))
                        {
@@ -337,14 +317,14 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                                {
                                        session_context->pending_pstmt = NULL;
                                        session_context->pending_portal = NULL;
-                                       session_context->pending_function = pool_clear_prepared_statement_list;
                                }
+#endif
                        }
                }
 
                if (frontend && IsA(node, ExecuteStmt))
                {
-#ifdef NOT_USED                        
+#ifdef NOT_USED
                        PreparedStatement *ps;
 
                        ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *)node)->name);
@@ -385,10 +365,10 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                                        return POOL_END;
 
                                /* check if need lock */
-                               if (need_insert_lock(backend, string, node))
+                               if (need_insert_lock(backend, contents, node))
                                {
                                        /* if so, issue lock command */
-                                       status = insert_lock(frontend, backend, string, (InsertStmt *)node);
+                                       status = insert_lock(frontend, backend, contents, (InsertStmt *)node);
                                        if (status != POOL_CONTINUE)
                                        {
                                                free_parser();
@@ -397,7 +377,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                                }
                        }
                }
-               else if (REPLICATION && query == NULL && start_internal_transaction(frontend, backend, node))
+               else if (REPLICATION && contents == NULL && start_internal_transaction(frontend, backend, node))
                {
                        free_parser();
                        return POOL_ERROR;
@@ -423,7 +403,8 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                 */
                if (!commit)
                {
-                       char            *rewrite_query;
+                       char *rewrite_query;
+                       char *string;
 
                        if (node)
                        {
@@ -431,10 +412,8 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
 
                                if (IsA(node, PrepareStmt))
                                {
-#ifdef NOT_USED
                                        ps = session_context->pending_pstmt;
                                        ps->num_tsparams = 0;
-#endif
                                }
                                else if (IsA(node, ExecuteStmt))
                                        ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *) node)->name);
@@ -444,7 +423,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                                if (rewrite_query != NULL)
                                {
                                        query_context->rewritten_query = rewrite_query;
-                                       len = strlen(string) + 1;
+                                       len = strlen(rewrite_query) + 1;
                                }
 
                        }
@@ -480,7 +459,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                /*
                 * Send the query to other than master node.
                 */
-               if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+               if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "") != POOL_CONTINUE)
                {
                        free_parser();
                        return POOL_END;
@@ -489,7 +468,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
                /* Send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
                if (commit)
                {
-                       if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+                       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
                        {
 /*
                                free_parser();
@@ -501,7 +480,7 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
        }
        else
        {
-               if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
+               if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "") != POOL_CONTINUE)
                {
                        free_parser();
                        return POOL_END;
@@ -516,18 +495,18 @@ static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
 /*
  * process EXECUTE (V3 only)
  */
-POOL_STATUS Execute(POOL_CONNECTION *frontend,
-                                                  POOL_CONNECTION_POOL *backend)
+POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                       int len, char *contents)
 {
-       char *string;           /* portal name + null terminate + max_tobe_returned_rows */
-       int len;
-       char kind;
-       int status, commit = 0;
-       PreparedStatement *ps;
+//     char kind;
+//     int status;
+       int commit = 0;
+//     PreparedStatement *ps;
        Portal *portal;
-       char *string1 = NULL;
-       PrepareStmt *p_stmt;
-       POOL_STATUS ret;
+       char *query = NULL;
+       Node *node;
+//     PrepareStmt *p_stmt;
+//     POOL_STATUS ret;
        int specific_error = 0;
        POOL_SESSION_CONTEXT *session_context;
        POOL_QUERY_CONTEXT *query_context;
@@ -540,76 +519,71 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
                return POOL_END;
        }
 
-       /* read Execute packet */
-       if (pool_read(frontend, &len, sizeof(len)) < 0)
-               return POOL_END;
-
-       len = ntohl(len) - 4;
-       string = pool_read2(frontend, len);
-
-       pool_debug("Execute: portal name <%s>", string);
-
-       portal = pool_get_portal_by_portal_name(string);
-       ps = pool_get_prepared_statement_by_pstmt_name(portal->pstmt->name);
-       query_context = ps->qctxt;
+       pool_debug("Execute: portal name <%s>", contents);
 
-       /* load balance trick */
-       if (portal)
+       portal = pool_get_portal_by_portal_name(contents);
+       if (portal == NULL)
        {
-               Node *node;
+               pool_error("Execute: cannot get portal");
+               return POOL_END;
+       }
+       if (portal->pstmt == NULL)
+       {
+               pool_error("Execute: cannot get prepared statement");
+               return POOL_END;
+       }
+       if (portal->pstmt->qctxt == NULL)
+       {
+               pool_error("Execute: cannot get query context");
+               return POOL_END;
+       }
+       if (portal->pstmt->qctxt->parse_tree== NULL)
+       {
+               pool_error("Execute: cannot get parse tree");
+               return POOL_END;
+       }
 
-               p_stmt = (PrepareStmt *)portal->pstmt->qctxt->parse_tree;
+       query_context = portal->pstmt->qctxt;
+       node = query_context->parse_tree;
+       query = portal->pstmt->qctxt->original_query;
+       pool_debug("Execute: query: %s", query);
+       strncpy(query_string_buffer, query, sizeof(query_string_buffer));
 
-               string1 = portal->pstmt->qctxt->original_query;
-               pool_debug("Execute: query: %s", string1);
-               node = (Node *)p_stmt;
-               strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
+//     pool_set_query_in_progress();
 
-               /*
-                * Start query context
-                */
-               pool_start_query(query_context, string1, node);
+       /*
+        * Decide where to send query
+        */
+       session_context->query_context = query_context;
+       pool_where_to_send(query_context, query, node);
 
-               /*
-                * Decide where to send query
-                */
-               pool_where_to_send(query_context, query_context->original_query, node);
 
-               if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
-                        IsA(node, VariableSetStmt)) &&
-                       MASTER_SLAVE && TSTATE(backend, MASTER_NODE_ID) != 'E')
-               {
-                       /*
-                        * PREPARE, DEALLOCATE, SET, DISCARD
-                        * should be executed on all nodes.  So we set
-                        * force_replication.
-                        */
-                       force_replication = 1;
-               }
-               /*
-                * JDBC driver sends "BEGIN" query internally if
-                * setAutoCommit(false).  But it does not send Sync message
-                * after "BEGIN" query.  In extended query protocol,
-                * PostgreSQL returns ReadyForQuery when a client sends Sync
-                * message.  Problem is, pgpool can't know the transaction
-                * state without receiving ReadyForQuery. So we remember that
-                * we need to send Sync message internally afterward, whenever
-                * we receive BEGIN in extended protocol.
-                */
-               else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
-               {
-                       TransactionStmt *stmt = (TransactionStmt *) node;
+       if (IsA(query_context->parse_tree, DeallocateStmt))
+               overwrite_map_for_deallocate(query_context);
 
-                       if (stmt->kind == TRANS_STMT_BEGIN ||
-                               stmt->kind == TRANS_STMT_START)
-                               /* Remember we need to send sync later in extended protocol */
-                               receive_extended_begin = 1;
-               }
+#ifdef NOT_USED
+       /*
+        * JDBC driver sends "BEGIN" query internally if
+        * setAutoCommit(false).  But it does not send Sync message
+        * after "BEGIN" query.  In extended query protocol,
+        * PostgreSQL returns ReadyForQuery when a client sends Sync
+        * message.  Problem is, pgpool can't know the transaction
+        * state without receiving ReadyForQuery. So we remember that
+        * we need to send Sync message internally afterward, whenever
+        * we receive BEGIN in extended protocol.
+        */
+       if (IsA(node, TransactionStmt) && MASTER_SLAVE)
+       {
+               TransactionStmt *stmt = (TransactionStmt *) node;
 
-               /* check if query is "COMMIT" or "ROLLBACK" */
-               commit = is_commit_query((Node *)p_stmt->query);
+               if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
+                       /* Remember we need to send sync later in extended protocol */
+                       receive_extended_begin = 1;
        }
+#endif
 
+       /* check if query is "COMMIT" or "ROLLBACK" */
+       commit = is_commit_query(node);
 
        if (REPLICATION || PARALLEL_MODE)
        {
@@ -619,7 +593,7 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
                if (!commit)
                {
                        /* Send the query to master node */
-                       if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+                       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
                        {
                                return POOL_END;
                        }
@@ -630,7 +604,7 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
                        if (specific_error)
                        {
                                /* log error message */
-                               generate_error_message("Execute: ", specific_error, string);
+                               generate_error_message("Execute: ", specific_error, contents);
                        }
                }
 
@@ -645,16 +619,16 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
                        if (pool_send_and_wait(query_context, msg, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
                                return POOL_END;
                }
-               else
-               {
-                       if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+//             else
+//             {
+                       if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
                                return POOL_END;
-               }
+//             }
                
                /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
                if (commit)
                {
-                       if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+                       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
                        {
                                return POOL_END;
                        }
@@ -662,12 +636,17 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
        }
        else
        {
-               if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+               if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
+               {
+                       return POOL_END;
+               }
+               if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "E") != POOL_CONTINUE)
                {
                        return POOL_END;
                }
        }
 
+#ifdef NOT_USED
        while ((ret = read_kind_from_backend(frontend, backend, &kind)) == POOL_CONTINUE)
        {
                /*
@@ -687,6 +666,7 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
        status = SimpleForwardToFrontend(kind, frontend, backend);
        if (status != POOL_CONTINUE)
                return status;
+#endif
 
        return POOL_CONTINUE;
 }
@@ -694,23 +674,19 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend,
 /*
  * process Parse (V3 only)
  */
-POOL_STATUS Parse(POOL_CONNECTION *frontend,
-                                                POOL_CONNECTION_POOL *backend)
+POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                 int len, char *contents)
 {
-       char kind;
-       int len;
-       char *string;
-       int i;
-       POOL_MEMORY_POOL *old_context;
-       PrepareStmt *p_stmt;
-       char *name, *stmt;
-       List *parse_tree_list;
-       Node *node = NULL;
        int deadlock_detected = 0;
        int insert_stmt_with_lock = 0;
-       POOL_STATUS status;
-       char per_node_statement_log_buffer[1024];
+       char *name;
+       char *stmt;
+       List *parse_tree_list;
+       Node *node = NULL;
+//     PrepareStmt *p_stmt;
        PreparedStatement *ps;
+       POOL_STATUS status;
+       POOL_MEMORY_POOL *old_context;
        POOL_SESSION_CONTEXT *session_context;
        POOL_QUERY_CONTEXT *query_context;
 
@@ -725,17 +701,14 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
        /* Create query context */
        query_context = pool_init_query_context();
 
-       /* read Parse packet */
-       if (pool_read(frontend, &len, sizeof(len)) < 0)
-               return POOL_END;
-
-       len = ntohl(len) - 4;
-       string = pool_read2(frontend, len);
+       pool_debug("Parse: statement name <%s>", contents);
 
-       pool_debug("Parse: statement name <%s>", string);
+       name = contents;
+       stmt = contents + strlen(contents) + 1;
 
-       name = string;
-       stmt = string + strlen(string) + 1;
+       /* switch memory context */
+       old_context = pool_memory;
+       pool_memory = query_context->memory_context;
 
        parse_tree_list = raw_parser(stmt);
        if (parse_tree_list == NIL)
@@ -752,75 +725,27 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
 
                insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
 
-               /* Special treatment for master/slave + temp tables */
-               if (MASTER_SLAVE)
-               {
-                       /* Is there "NO LOAD BALANCE" comment? */
-                       if (!strncasecmp(stmt, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ) ||
-                               /* or the table used in a query is a temporary one ? */
-                               is_temp_table(backend, node))
-                       {
-                               /*
-                                * From now on, let only master handle queries.  This is
-                                * typically usefull for using temp tables in master/slave
-                                * mode
-                                */
-                               master_slave_was_enabled = 1;
-                               MASTER_SLAVE = 0;
-                               master_slave_dml = 1;
-                       }
-               }
-
-               /* switch memory context */
-               old_context = pool_memory;
-               pool_memory = session_context->memory_context;
-
-               /* translate Parse message to PrepareStmt */
-               p_stmt = palloc(sizeof(PrepareStmt));
-               p_stmt->type = T_PrepareStmt;
-
-               /* XXX: there's a confusion here. Someone mixed up statement
-                * name with portal name. It is regarded that statment name ==
-                * portal name. Someday we should fix this. Sigh.
-                */
-               p_stmt->name = pstrdup(name);
-               p_stmt->query = copyObject(node);
-               ps = pool_create_prepared_statement(name, 0, query_context);
-
                /*
                 * Start query context
                 */
-               pool_start_query(query_context, pstrdup(stmt), (Node *)p_stmt);
+               pool_start_query(query_context, pstrdup(stmt), node);
+
+               ps = pool_create_prepared_statement(name, 0, len, contents, query_context);
+               session_context->pending_pstmt = ps;
 
                /*
                 * Decide where to send query
                 */
-               pool_where_to_send(query_context, query_context->original_query, (Node *)p_stmt);
-               
-               if (*name)
-               {
-                       session_context->pending_pstmt = ps;
-                       session_context->pending_function = pool_add_prepared_statement;
-               }
-               else /* unnamed statement */
-               {
-                       pfree(p_stmt->name);
-                       ps->name = "";
-                       session_context->pending_pstmt = ps;
-                       session_context->pending_function = pool_add_prepared_statement;
-               }
+               pool_where_to_send(query_context, query_context->original_query,
+                                                  query_context->parse_tree);
 
-               /*
-                * Switch to old memory context. Caution. Now we are in parser
-                * memory context.
-                * Palloced memories will be gone if free_parser() called!
-                */
-               pool_memory = old_context;
+               if (IsA(query_context->parse_tree, DeallocateStmt))
+                       overwrite_map_for_deallocate(query_context);
 
                if (REPLICATION)
                {
-                       char            *rewrite_query;
-                       bool             rewrite_to_params = true;
+                       char *rewrite_query;
+                       bool rewrite_to_params = true;
 
                        /*
                         * rewrite `now()'.
@@ -835,69 +760,72 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                        if (rewrite_query != NULL)
                        {
                                int alloc_len = len - strlen(stmt) + strlen(rewrite_query);
-                               string = palloc(alloc_len);
-                               strcpy(string, name);
-                               strcpy(string + strlen(name) + 1, rewrite_query);
-                               memcpy(string + strlen(name) + strlen(rewrite_query) + 2,
-                                               stmt + strlen(stmt) + 1,
-                                               len - (strlen(name) + strlen(stmt) + 2));
-
-                               len = len - strlen(stmt) + strlen(rewrite_query);
-                               name = string;
-                               stmt = string + strlen(name) + 1;
-                               pool_debug("rewrite query  %s %s len=%d", name, stmt, len);
+                               contents = palloc(alloc_len);
+                               strcpy(contents, name);
+                               strcpy(contents + strlen(name) + 1, rewrite_query);
+                               memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
+                                          stmt + strlen(stmt) + 1,
+                                          len - (strlen(name) + strlen(stmt) + 2));
+
+                               len = alloc_len;
+                               name = contents;
+                               stmt = contents + strlen(name) + 1;
+                               pool_debug("Parse: rewrite query  %s %s len=%d", name, stmt, len);
+
+                               ps->parse_len = len;
+                               ps->parse_contents = contents;
                        }
                }
+       }
+       pool_memory = old_context;
 
-               if (REPLICATION)
+       if (REPLICATION)
+       {
+               char kind;
+
+               if (TSTATE(backend, MASTER_NODE_ID) != 'T')
                {
-                       char kind;
+                       int i;
 
-                       if (TSTATE(backend, MASTER_NODE_ID) != 'T')
+                       /* synchronize transaction state */
+                       for (i = 0; i < NUM_BACKENDS; i++)
                        {
-                               /* synchronize transaction state */
-                               for (i = 0; i < NUM_BACKENDS; i++)
-                               {
-                                       if (!VALID_BACKEND(i))
-                                               continue;
+                               if (!VALID_BACKEND(i))
+                                       continue;
 
-                                       /* send sync message */
-                                       send_extended_protocol_message(backend, i, "S", 0, "");
-                               }
+                               /* send sync message */
+                               send_extended_protocol_message(backend, i, "S", 0, "");
+                       }
 
-                               kind = pool_read_kind(backend);
-                               if (kind != 'Z')
-                               {
-                                       free_parser();
-                                       return POOL_END;
-                               }
+                       kind = pool_read_kind(backend);
+                       if (kind != 'Z')
+                       {
+//                             free_parser();
+                               return POOL_END;
+                       }
 
-                               if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
-                               {
-                                       free_parser();
-                                       return POOL_END;
-                               }
+                       if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
+                       {
+//                             free_parser();
+                               return POOL_END;
                        }
+               }
 
-                       if (is_strict_query(node))
-                               start_internal_transaction(frontend, backend, node);
+               if (is_strict_query(query_context->parse_tree))
+                       start_internal_transaction(frontend, backend, query_context->parse_tree);
 
-                       if (insert_stmt_with_lock)
+               if (insert_stmt_with_lock)
+               {
+                       /* start a transaction if needed and lock the table */
+                       status = insert_lock(frontend, backend, stmt, (InsertStmt *)query_context->parse_tree);
+                       if (status != POOL_CONTINUE)
                        {
-                               /* start a transaction if needed and lock the table */
-                               status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
-                               if (status != POOL_CONTINUE)
-                               {
-                                       free_parser();
-                                       return status;
-                               }
+//                             free_parser();
+                               return status;
                        }
                }
        }
 
-       /* send to master node */
-       snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
-
        /*
         * Cannot call free_parser() here. Since "string" might be allocated in parser context.
         * free_parser();
@@ -910,9 +838,9 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                 * locks.
                 */
                pool_debug("Parse: waiting for master completing the query");
-               if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+               if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
                {
-                       free_parser();
+//                     free_parser();
                        return POOL_END;
                }
 
@@ -925,7 +853,7 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
                if (deadlock_detected < 0)
                {
-                       free_parser();
+//                     free_parser();
                        return POOL_END;
                }
                else
@@ -936,7 +864,7 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                         * this case, PostgreSQL does not report what statement caused
                         * that error and make users confused.
                         */
-                       per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse(): Error or notice message from backend: ", true);
+                       per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);
                }
 
                if (deadlock_detected)
@@ -946,25 +874,24 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                                                                   strlen(POOL_ERROR_QUERY)+1, -1,
                                                                   MASTER_NODE_ID, "") != POOL_CONTINUE)
                        {
-                               free_parser();
+//                             free_parser();
                                return POOL_END;
                        }
                }
                else
                {
-                       snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
-                       if (pool_send_and_wait(query_context, string, len, -1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+                       if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
                        {
-                               free_parser();
+//                             free_parser();
                                return POOL_END;
                        }
                }
        }
        else
        {
-               if (pool_send_and_wait(query_context, string, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
+               if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "P") != POOL_CONTINUE)
                {
-                       free_parser();
+//                     free_parser();
                        return POOL_END;
                }
        }
@@ -972,10 +899,12 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
        /*
         * Ok. we are safe to call free_parser();
         */
-       free_parser();
+//     free_parser();
 
+#ifdef NOT_USED
        for (;;)
        {
+               char kind;
                POOL_STATUS ret;
                ret = read_kind_from_backend(frontend, backend, &kind);
 
@@ -989,122 +918,373 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
                if (kind != 'N')
                        break;
        }
+#endif
+
        return POOL_CONTINUE;
+
 }
 
-/*
- * Process ReadyForQuery('Z') message.
- *
- * - if the global error status "mismatch_ntuples" is set, send an error query
- *      to all DB nodes to abort transaction.
- * - internal transaction is closed
- */
-POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
-                                                                POOL_CONNECTION_POOL *backend, int send_ready)
+POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                int len, char *contents)
 {
-       StartupPacket *sp;
-       char psbuf[1024];
-       int i;
-       int len;
-       signed char kind;
-       signed char state;
+       char *pstmt_name;
+       char *portal_name;
+       char *rewrite_msg;
+       Portal *portal = NULL;
+       PreparedStatement *pstmt = NULL;
        POOL_SESSION_CONTEXT *session_context;
+       POOL_QUERY_CONTEXT *query_context;
 
        /* Get session context */
        session_context = pool_get_session_context();
        if (!session_context)
        {
-               pool_error("ReadyForQuery: cannot get session context");
+               pool_error("Bind: cannot get session context");
                return POOL_END;
        }
 
        /*
-        * If the numbers of update tuples are differ, we need to abort transaction
-        * by using do_error_command. This only works with PROTO_MAJOR_V3.
+        * Rewrite message
         */
-       if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
-       {
-               int i;
-               signed char state;
-               char kind;
+       portal_name = contents;
+       pstmt_name = contents + strlen(portal_name) + 1;
 
-               /*
-                * XXX: discard rest of ReadyForQuery packet
-                */
-               if (pool_read_message_length(backend) < 0)
-                       return POOL_END;
+       pstmt = pool_get_prepared_statement_by_pstmt_name(pstmt_name);
+       if (pstmt == NULL)
+       {
+               pool_error("Bind: cannot get prepared statement \"%s\"", pstmt_name);
+               return POOL_END;
+       }
 
-               state = pool_read_kind(backend);
-               if (state < 0)
-                       return POOL_END;
+       portal = pool_create_portal(portal_name, pstmt->num_tsparams, pstmt);
+       if (portal == NULL)
+       {
+               pool_error("Bind: cannot create portal: %s", strerror(errno));
+               return POOL_END;
+       }
 
-               pool_debug("ReadyForQuery: transaction state: %c", state);
+       query_context = pstmt->qctxt;
+       if (query_context == NULL)
+       {
+               pool_error("Bind: cannot get query context");
+               return POOL_END;
+       }
 
-               for (i = 0; i < NUM_BACKENDS; i++)
-               {
-                       if (VALID_BACKEND(i))
-                       {
-                               /* abort transaction on all nodes. */
-                               do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
-                       }
-               }
+       session_context->pending_portal = portal;
 
-               /* loop through until we get ReadyForQuery */
-               for(;;)
-               {
-                       kind = pool_read_kind(backend);
-                       if (kind < 0)
-                               return POOL_END;
+       /* rewrite bind message */
+       if (REPLICATION && portal->num_tsparams > 0)
+       {
+               rewrite_msg = bind_rewrite_timestamp(backend, portal, contents, &len);
+               if (rewrite_msg != NULL)
+                       contents = rewrite_msg;
+       }
 
-                       if (kind == 'Z')
-                               break;
+       session_context->query_context = query_context;
+       pool_where_to_send(query_context, query_context->original_query,
+                                          query_context->parse_tree);
 
-                       /* put the message back to read buffer */
-                       for (i=0;i<NUM_BACKENDS;i++)
-                       {
-                               if (VALID_BACKEND(i))
-                               {
-                                       pool_unread(CONNECTION(backend,i), &kind, 1);
-                               }
-                       }
+       if (IsA(query_context->parse_tree, DeallocateStmt))
+               overwrite_map_for_deallocate(query_context);
 
-                       /* discard rest of the packet */
-                       if (pool_discard_packet(backend) != POOL_CONTINUE)
-                       {
-                               pool_error("ReadyForQuery: pool_discard_packet failed");
-                               return POOL_END;
-                       }
-               }
-               mismatch_ntuples = 0;
+       if (pool_config->load_balance_mode && pool_is_writing_transaction())
+       {
+               if(parse_before_bind(frontend, backend, pstmt) != POOL_CONTINUE)
+                       return POOL_END;
        }
 
-       /*
-        * if a transaction is started for insert lock, we need to close
-        * the transaction.
-        */
-       if (pool_is_query_in_progress() && allow_close_transaction)
+       pool_debug("Bind: waiting for master completing the query");
+       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "B")
+               != POOL_CONTINUE)
        {
-               if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
-                       return POOL_END;
+//             if (rewrite_msg != NULL)
+//                     free(rewrite_msg);
+               return POOL_END;
        }
 
-       if (MAJOR(backend) == PROTO_MAJOR_V3)
+       if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "B")
+               != POOL_CONTINUE)
        {
-               if ((len = pool_read_message_length(backend)) < 0)
-                       return POOL_END;
+//             if (rewrite_msg != NULL)
+//                     free(rewrite_msg);
+               return POOL_END;
+       }
 
-#ifdef NOT_USED
-               pool_debug("ReadyForQuery: message length: %d", len);
+//     if (rewrite_msg != NULL)
+//             free(rewrite_msg);
 
-               /*
-                * Do not check transaction state in master/slave mode.
-                * Because SET, PREPARE, DEALLOCATE are replicated.
-                * If these queries are executed inside a transaction block,
-                * transation state will be inconsistent. But it is no problem.
-                */
-               if (master_slave_dml)
-               {
-                       char kind, kind1;
+       return POOL_CONTINUE;
+}
+
+POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                                       int len, char *contents)
+{
+       Portal *portal = NULL;
+       PreparedStatement *pstmt = NULL;
+       POOL_SESSION_CONTEXT *session_context;
+       POOL_QUERY_CONTEXT *query_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("Describe: cannot get session context");
+               return POOL_END;
+       }
+
+       /* Prepared Statement */
+       if (*contents == 'S')
+       {
+               pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
+       }
+       /* Portal */
+       else
+       {
+               portal = pool_get_portal_by_portal_name(contents+1);
+               if (portal == NULL)
+               {
+                       pool_error("Describe: cannot get portal \"%s\"", contents+1);
+                       return POOL_END;
+               }
+
+               pstmt = portal->pstmt;
+       }
+
+       if (pstmt == NULL)
+       {
+               pool_error("Describe: cannot get prepared statement");
+               return POOL_END;
+       }
+
+       query_context = pstmt->qctxt;
+       if (query_context == NULL)
+       {
+               pool_error("Describe: cannot get query context");
+               return POOL_END;
+       }
+
+       session_context->query_context = query_context;
+       pool_where_to_send(query_context, query_context->original_query,
+                                          query_context->parse_tree);
+
+       if (IsA(query_context->parse_tree, DeallocateStmt))
+               overwrite_map_for_deallocate(query_context);
+
+       pool_debug("Describe: waiting for master completing the query");
+       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "D")
+               != POOL_CONTINUE)
+               return POOL_END;
+
+       if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "D")
+               != POOL_CONTINUE)
+               return POOL_END;
+
+       return POOL_CONTINUE;
+}
+
+POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                 int len, char *contents)
+{
+       Portal *portal = NULL;
+       PreparedStatement *pstmt = NULL;
+       POOL_SESSION_CONTEXT *session_context;
+       POOL_QUERY_CONTEXT *query_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("Close: cannot get session context");
+               return POOL_END;
+       }
+
+       /* Prepared Statement */
+       if (*contents == 'S')
+       {
+               pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1);
+               if (pstmt == NULL)
+               {
+                       pool_error("Close: cannot get prepared statement");
+                       return POOL_END;
+               }
+
+               session_context->pending_pstmt = pstmt;
+       }
+       /* Portal */
+       else if (*contents == 'P')
+       {
+               portal = pool_get_portal_by_portal_name(contents+1);
+               if (portal == NULL)
+               {
+                       pool_error("Close: cannot get portal");
+                       return POOL_END;
+               }
+
+               session_context->pending_portal = portal;
+       }
+       else
+       {
+               pool_error("Close: invalid message");
+               return POOL_END;
+       }
+
+       query_context = pstmt->qctxt;
+       if (query_context == NULL)
+       {
+               pool_error("Close: cannot get query context");
+               return POOL_END;
+       }
+
+       pool_where_to_send(query_context, query_context->original_query,
+                                          query_context->parse_tree);
+
+       pool_debug("Close: waiting for master completing the query");
+       if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "C")
+               != POOL_CONTINUE)
+               return POOL_END;
+
+       if (pool_send_and_wait(query_context, contents, len, -1, MASTER_NODE_ID, "C")
+               != POOL_CONTINUE)
+               return POOL_END;
+
+       return POOL_CONTINUE;
+}
+
+
+POOL_STATUS FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
+                                                 int len, char *contents)
+{
+       /*
+        * If Function call message for lo_creat, rewrite it
+        */
+       char *rewrite_lo;
+       int rewrite_len;
+
+       rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
+                                                                          backend, &rewrite_len);
+
+       if (rewrite_lo != NULL)
+       {
+               contents = rewrite_lo;
+               len = rewrite_len;
+       }
+       return  SimpleForwardToBackend('F', frontend, backend, len, contents);
+}
+
+/*
+ * Process ReadyForQuery('Z') message.
+ *
+ * - if the global error status "mismatch_ntuples" is set, send an error query
+ *      to all DB nodes to abort transaction.
+ * - internal transaction is closed
+ */
+POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
+                                                 POOL_CONNECTION_POOL *backend, int send_ready)
+{
+       StartupPacket *sp;
+       char psbuf[1024];
+       int i;
+       int len;
+       signed char kind;
+       signed char state;
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("ReadyForQuery: cannot get session context");
+               return POOL_END;
+       }
+
+       /*
+        * If the numbers of update tuples are differ, we need to abort transaction
+        * by using do_error_command. This only works with PROTO_MAJOR_V3.
+        */
+       if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
+       {
+               int i;
+               signed char state;
+               char kind;
+
+               /*
+                * XXX: discard rest of ReadyForQuery packet
+                */
+               if (pool_read_message_length(backend) < 0)
+                       return POOL_END;
+
+               state = pool_read_kind(backend);
+               if (state < 0)
+                       return POOL_END;
+
+               pool_debug("ReadyForQuery: transaction state: %c", state);
+
+               for (i = 0; i < NUM_BACKENDS; i++)
+               {
+                       if (VALID_BACKEND(i))
+                       {
+                               /* abort transaction on all nodes. */
+                               do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
+                       }
+               }
+
+               /* loop through until we get ReadyForQuery */
+               for(;;)
+               {
+                       kind = pool_read_kind(backend);
+                       if (kind < 0)
+                               return POOL_END;
+
+                       if (kind == 'Z')
+                               break;
+
+                       /* put the message back to read buffer */
+                       for (i=0;i<NUM_BACKENDS;i++)
+                       {
+                               if (VALID_BACKEND(i))
+                               {
+                                       pool_unread(CONNECTION(backend,i), &kind, 1);
+                               }
+                       }
+
+                       /* discard rest of the packet */
+                       if (pool_discard_packet(backend) != POOL_CONTINUE)
+                       {
+                               pool_error("ReadyForQuery: pool_discard_packet failed");
+                               return POOL_END;
+                       }
+               }
+               mismatch_ntuples = 0;
+       }
+
+       /*
+        * if a transaction is started for insert lock, we need to close
+        * the transaction.
+        */
+//     if (pool_is_query_in_progress() && allow_close_transaction)
+       if (allow_close_transaction)
+       {
+               if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
+                       return POOL_END;
+       }
+
+       if (MAJOR(backend) == PROTO_MAJOR_V3)
+       {
+               if ((len = pool_read_message_length(backend)) < 0)
+                       return POOL_END;
+
+#ifdef NOT_USED
+               pool_debug("ReadyForQuery: message length: %d", len);
+
+               /*
+                * Do not check transaction state in master/slave mode.
+                * Because SET, PREPARE, DEALLOCATE are replicated.
+                * If these queries are executed inside a transaction block,
+                * transation state will be inconsistent. But it is no problem.
+                */
+               if (master_slave_dml)
+               {
+                       char kind, kind1;
 
                        if (pool_read(MASTER(backend), &kind, sizeof(kind)))
                                return POOL_END;
@@ -1233,29 +1413,287 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
                         * If the query was not READ SELECT, remember that we had
                         * a write query in this transaction.
                         */
-                       else if (!is_select_query(node, query))
+                       else if (!is_select_query(node, query))
+                       {
+                               pool_set_writing_transaction();
+                       }
+               }
+               pool_unset_query_in_progress();
+       }
+
+       if (!pool_is_doing_extended_query_message())
+               pool_query_context_destroy(pool_get_session_context()->query_context);
+
+       sp = MASTER_CONNECTION(backend)->sp;
+       if (MASTER(backend)->tstate == 'T')
+               snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
+                                sp->user, sp->database, remote_ps_data);
+       else
+               snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
+                                sp->user, sp->database, remote_ps_data);
+       set_ps_display(psbuf, false);
+
+       return POOL_CONTINUE;
+}
+
+POOL_STATUS ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("ParseComplete: cannot get session context");
+               return POOL_END;
+       }
+
+       if (session_context->pending_pstmt != NULL)
+       {
+               POOL_QUERY_CONTEXT *qc;
+
+               pool_add_prepared_statement();
+
+               /* Set "parse done" to query_state */
+               qc = session_context->pending_pstmt->qctxt;
+               if (qc != NULL)
+                       pool_set_query_state(qc, 1);
+
+               session_context->pending_pstmt = NULL;
+       }
+
+       return SimpleForwardToFrontend('1', frontend, backend);
+}
+
+POOL_STATUS BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("BindComplete: cannot get session context");
+               return POOL_END;
+       }
+
+       if (session_context->pending_portal != NULL)
+       {
+               PreparedStatement *pstmt;
+
+               pool_add_portal();
+
+               /* Set "bind done" to query_state */
+               pstmt = session_context->pending_portal->pstmt;
+               if (pstmt != NULL && pstmt->qctxt != NULL)
+                       pool_set_query_state(pstmt->qctxt, 2);
+
+               session_context->pending_portal = NULL;
+       }
+
+       return SimpleForwardToFrontend('2', frontend, backend);
+}
+
+POOL_STATUS CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("CloseComplete: cannot get session context");
+               return POOL_END;
+       }
+
+       if (session_context->pending_pstmt != NULL)
+       {
+               pool_remove_prepared_statement();
+               session_context->pending_pstmt = NULL;
+       }
+       else if (session_context->pending_portal != NULL)
+       {
+               pool_remove_portal();
+               session_context->pending_portal = NULL;
+       }
+       else
+       {
+               pool_error("CloseComplete: pending object not found");
+               return POOL_END;
+       }
+
+       return SimpleForwardToFrontend('3', frontend, backend);
+}
+
+POOL_STATUS CommandComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
+{
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("BindComplete: cannot get session context");
+               return POOL_END;
+       }
+
+       if (session_context->query_context != NULL)
+       {
+               Node *node = session_context->query_context->parse_tree;
+
+               if (IsA(node, PrepareStmt))
+               {
+                       pool_add_prepared_statement();
+                       session_context->pending_pstmt = NULL;
+               }
+               else if (IsA(node, DeallocateStmt))
+               {
+                       char *name;
+                       
+                       name = ((DeallocateStmt *)node)->name;
+                       if (name == NULL)
+                               pool_clear_prepared_statement_list();
+                       else
+                               pool_remove_prepared_statement();
+                       session_context->pending_pstmt = NULL;
+               }
+               else if (IsA(node, DiscardStmt))
+               {
+                       DiscardStmt *stmt = (DiscardStmt *)node;
+                       if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
+                       {
+                               pool_remove_pending_objects();
+                               pool_clear_prepared_statement_list();
+                       }
+               }
+               else if (IsA(node, TransactionStmt))
+               {
+                       TransactionStmt *stmt = (TransactionStmt *) node;
+
+                       if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
+                       {
+                               int i;
+
+                               for (i = 0; i < NUM_BACKENDS; i++)
+                               {
+                                       if (!VALID_BACKEND(i))
+                                               continue;
+
+                                       TSTATE(backend, i) = 'T';
+                               }
+                       }
+               }
+       }
+
+       return SimpleForwardToFrontend('C', frontend, backend);
+}
+
+POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend,
+                                                  POOL_CONNECTION_POOL *backend)
+{
+//     int i;
+       int ret;
+//     int res1;
+//     int status;
+//     char *p1;
+//     char kind1;
+
+       /* An error occurred with PREPARE or DEALLOCATE command.
+        * Free pending portal object.
+        */
+       pool_remove_pending_objects();
+
+       ret = SimpleForwardToFrontend('E', frontend, backend);
+       if (ret != POOL_CONTINUE)
+               return ret;
+
+       if (select_in_transaction)
+       {
+               int i;
+
+               in_load_balance = 0;
+               REPLICATION = 1;
+               for (i = 0; i < NUM_BACKENDS; i++)
+               {
+                       if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
+                       {
+                               /*
+                                * We must abort transaction to sync transaction state.
+                                * If the error was caused by an Execute message,
+                                * we must send invalid Execute message to abort
+                                * transaction.
+                                *
+                                * Because extended query protocol ignores all
+                                * messages before receiving Sync message inside error state.
+                                */
+                               if (execute_select)
+                                       do_error_execute_command(backend, i, PROTO_MAJOR_V3);
+                               else
+                                       do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
+                       }
+               }
+               select_in_transaction = 0;
+               execute_select = 0;
+       }
+
+#ifdef NOT_USED
+       for (i = 0;i < NUM_BACKENDS; i++)
+       {
+               if (VALID_BACKEND(i))
+               {
+                       POOL_CONNECTION *cp = CONNECTION(backend, i);
+
+                       /* We need to send "sync" message to backend in extend mode
+                        * so that it accepts next command.
+                        * Note that this may be overkill since client may send
+                        * it by itself. Moreover we do not need it in non-extend mode.
+                        * At this point we regard it is not harmful since error response
+                        * will not be sent too frequently.
+                        */
+                       pool_write(cp, "S", 1);
+                       res1 = htonl(4);
+                       if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
                        {
-                               pool_set_writing_transaction();
+                               return POOL_END;
                        }
                }
-               pool_unset_query_in_progress();
        }
 
-       pool_query_context_destroy(pool_get_session_context()->query_context);
+       while ((ret = read_kind_from_backend(frontend, backend, &kind1)) == POOL_CONTINUE)
+       {
+               if (kind1 == 'Z') /* ReadyForQuery? */
+                       break;
 
-       sp = MASTER_CONNECTION(backend)->sp;
-       if (MASTER(backend)->tstate == 'T')
-               snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
-                                sp->user, sp->database, remote_ps_data);
-       else
-               snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
-                                sp->user, sp->database, remote_ps_data);
-       set_ps_display(psbuf, false);
+               ret = SimpleForwardToFrontend(kind1, frontend, backend);
+               if (ret != POOL_CONTINUE)
+                       return ret;
+               pool_flush(frontend);
+       }
+
+       if (ret != POOL_CONTINUE)
+               return ret;
+
+       for (i = 0; i < NUM_BACKENDS; i++)
+       {
+               if (VALID_BACKEND(i))
+               {
+                       status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1));
+                       if (status < 0)
+                       {
+                               pool_error("SimpleForwardToFrontend: error while reading message length");
+                               return POOL_END;
+                       }
+                       res1 = ntohl(res1) - sizeof(res1);
+                       p1 = pool_read2(CONNECTION(backend, i), res1);
+                       if (p1 == NULL)
+                               return POOL_END;
+               }
+       }
+#endif
 
        return POOL_CONTINUE;
 }
 
-
 POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
                                                                POOL_CONNECTION_POOL *backend)
 {
@@ -1354,12 +1792,23 @@ POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
 }
 
 POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
-                                                                                  POOL_CONNECTION_POOL *backend)
+                                                                       POOL_CONNECTION_POOL *backend)
 {
        char fkind;
-       char kind;
+//     char kind;
+       char *contents = NULL;
        POOL_STATUS status;
-       int i;
+//     int i;
+       int len;
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("Parse: cannot get session context");
+               return POOL_END;
+       }
 
        if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
                return POOL_CONTINUE;
@@ -1370,8 +1819,29 @@ POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                return POOL_END;
        }
 
-       pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
+       pool_debug("ProcessFrontendResponse: kind from frontend %c(%02x)", fkind, fkind);
+
+       if (MAJOR(backend) == PROTO_MAJOR_V3)
+       {
+               if (pool_read(frontend, &len, sizeof(len)) < 0)
+                       return POOL_END;
+               len = ntohl(len) - 4;
+               if (len > 0)
+                       contents = pool_read2(frontend, len);
+       }
+       else
+       {
+               if (fkind != 'F')
+                       contents = pool_read_string(frontend, &len, 0);
+       }
+
+       if (len > 0 && contents == NULL)
+               return POOL_END;
+
+       if (fkind != 'S' && pool_is_ignore_till_sync())
+               return POOL_CONTINUE;
 
+#ifdef NOT_USED
        /*
         * If we have received BEGIN in extended protocol before, we need
         * to send a sync message to know the transaction stare.
@@ -1389,108 +1859,271 @@ POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
                        return POOL_END;
        }
+#endif
+
+       pool_unset_doing_extended_query_message();
 
        switch (fkind)
        {
 
-               case 'X':  /* Terminate message*/
-                       if (MAJOR(backend) == PROTO_MAJOR_V3)
-                       {
-                               int len;
-                               pool_read(frontend, &len, sizeof(len));
-                       }
+               case 'X':       /* Terminate */
                        return POOL_END;
 
-               case 'Q':  /* Query message*/
+               case 'Q':       /* Query */
                        allow_close_transaction = 1;
-                       status = SimpleQuery(frontend, backend, NULL);
+                       status = SimpleQuery(frontend, backend, len, contents);
                        break;
 
-               case 'E':  /* Execute message */
+               case 'E':       /* Execute */
                        allow_close_transaction = 1;
-                       status = Execute(frontend, backend);
+                       pool_set_doing_extended_query_message();
+                       if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+                               pool_set_query_in_progress();
+                       status = Execute(frontend, backend, len, contents);
                        break;
 
-               case 'P':  /* Parse message */
+               case 'P':       /* Parse */
                        allow_close_transaction = 0;
+                       pool_set_doing_extended_query_message();
+                       if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+                               pool_set_query_in_progress();
+                       status = Parse(frontend, backend, len, contents);
+                       break;
 
-                       if (MASTER_SLAVE &&
-                               (TSTATE(backend, MASTER_NODE_ID) != 'I' || receive_extended_begin))
-                       {
-                               pool_debug("kind: %c master_slave_dml enabled", fkind);
-                               master_slave_was_enabled = 1;
-                               MASTER_SLAVE = 0;
-                               master_slave_dml = 1;
-                       }
+               case 'B':       /* Bind */
+                       pool_set_doing_extended_query_message();
+                       if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+                               pool_set_query_in_progress();
+                       status = Bind(frontend, backend, len, contents);
+                       break;
+
+               case 'C':       /* Close */
+                       if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+                               pool_set_query_in_progress();
+                       status = Close(frontend, backend, len, contents);
+                       break;
 
-                       status = Parse(frontend, backend);
+               case 'D':       /* Describe */
+                       pool_set_doing_extended_query_message();
+                       if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
+                               pool_set_query_in_progress();
+                       status = Describe(frontend, backend, len, contents);
                        break;
 
-               case 'S':  /* Sync message */
+               case 'S':  /* Sync */
+                       pool_set_doing_extended_query_message();
                        receive_extended_begin = 0;
-                       /* fall through */
+                       if (pool_is_ignore_till_sync())
+                               pool_unset_ignore_till_sync();
+                       if (!pool_is_query_in_progress())
+                               pool_set_query_in_progress();
+                       status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+                       break;
 
-               default:
-                       if ((MAJOR(backend) == PROTO_MAJOR_V3) &&
-                           (fkind == 'S' || fkind == 'H' || fkind == 'D' || fkind == 'f'||
-                                fkind == 'C' || fkind == 'B' || fkind == 'F' || fkind == 'd' || fkind == 'c'))
+               case 'F':       /* FunctionCall */
+                       if (MAJOR(backend) == PROTO_MAJOR_V3)
                        {
-                               /* Function call? */
-                               if (fkind == 'F' && MASTER_SLAVE)
-                               {
-                                       /*
-                                        * Send to primary/master node only.
-                                        * For this we treat function call as if INSERT.
-                                        */
-                                       POOL_QUERY_CONTEXT *query_context;
-                                       char *query = "INSERT INTO foo VALUES(1)";
-                                       Node *node;
-                                       List *parse_tree_list;
-
-                                       /* Create query context */
-                                       query_context = pool_init_query_context();
-                                       if (!query_context)
-                                       {
-                                               pool_error("ProcessFrontendResponse: pool_init_query_context failed");
-                                               return POOL_END;
-                                       }
-                                       parse_tree_list = raw_parser(query);
-                                       node = (Node *) lfirst(list_head(parse_tree_list));
-                                       pool_start_query(query_context, query, node);
-                                       pool_where_to_send(query_context, query_context->original_query,
-                                                                          query_context->parse_tree);
-                               }
-
-#ifdef NOT_USED
-                               if (MASTER_SLAVE &&
-                                       (TSTATE(backend, MASTER_NODE_ID) != 'I' || receive_extended_begin))
-                               {
-                                       pool_debug("kind: %c master_slave_dml enabled", fkind);
-                                       master_slave_was_enabled = 1;
-                                       MASTER_SLAVE = 0;
-                                       master_slave_dml = 1;
-                               }
-#endif
-       
-                               status = SimpleForwardToBackend(fkind, frontend, backend);
-                               for (i=0;i<NUM_BACKENDS;i++)
-                               {
-                                       if (VALID_BACKEND(i))
-                                       {
-                                               if (pool_flush(CONNECTION(backend, i)))
-                                                       status = POOL_ERROR;
-                                       }
-                               }
+                               status = FunctionCall3(frontend, backend, len, contents);
+                               break;
                        }
-                       else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
+                       else
+                       {
                                status = FunctionCall(frontend, backend);
+                               break;
+                       }
 
-                       else
+               case 'c':       /* CopyDone */
+               case 'd':       /* CopyData */
+               case 'f':       /* CopyFail */
+               case 'H':       /* Flush */
+                       if (MAJOR(backend) == PROTO_MAJOR_V3)
                        {
-                               pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
-                               status = POOL_ERROR;
+                               status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+                               break;
                        }
-                       break;
+
+               default:
+                       pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
+                       status = POOL_ERROR;
+       }
+
+       if (status != POOL_CONTINUE)
+               status = POOL_ERROR;
+       return status;
+}
+
+POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
+                                                                  POOL_CONNECTION_POOL *backend, int *state)
+{
+       int status;
+       char kind;
+       short num_fields = 0;
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context();
+       if (!session_context)
+       {
+               pool_error("ProcessBackendResponse: cannot get session context");
+               return POOL_END;
+       }
+
+       if (pool_is_ignore_till_sync())
+       {
+               return POOL_CONTINUE;
+       }
+
+       if (pool_is_pool_status_stmt())
+       {
+               pool_unset_pool_status_stmt();
+               return POOL_CONTINUE;
+       }
+
+    status = read_kind_from_backend(frontend, backend, &kind);
+    if (status != POOL_CONTINUE)
+        return status;
+
+       /*
+        * Sanity check
+        */
+       if (kind == 0)
+       {
+               pool_error("ProcessBackendResponse: kind is 0!");
+               return POOL_ERROR;
+       }
+
+       pool_debug("ProcessBackendResponse: kind from backend: %c", kind);
+
+       if (MAJOR(backend) == PROTO_MAJOR_V3)
+       {
+               switch (kind)
+               {
+                       case 'G':       /* CopyInResponse */
+                               status = CopyInResponse(frontend, backend);
+                               break;
+
+                       case 'S':       /* ParameterStatus */
+                               status = ParameterStatus(frontend, backend);
+                               break;
+
+                       case 'Z':       /* ReadyForQuery */
+                               status = ReadyForQuery(frontend, backend, 1);
+                               break;
+
+                       case '1':       /* ParseComplete */
+                               status = ParseComplete(frontend, backend);
+                               pool_unset_query_in_progress();
+                               break;
+
+                       case '2':       /* BindComplete */
+                               status = BindComplete(frontend, backend);
+                               pool_unset_query_in_progress();
+                               break;
+
+                       case '3':       /* CloseComplete */
+                               status = CloseComplete(frontend, backend);
+                               pool_unset_query_in_progress();
+                               break;
+
+                       case 'E':       /* ErrorResponse */
+                               status = ErrorResponse3(frontend, backend);
+                               if (pool_is_doing_extended_query_message())
+                                       pool_set_ignore_till_sync();
+                               pool_unset_query_in_progress();
+                               break;
+
+                       case 'C':       /* CommandComplete */                           
+                               status = CommandComplete(frontend, backend);
+                               if (pool_is_doing_extended_query_message())
+                                       pool_unset_query_in_progress();
+                               break;
+
+                       case 'T':       /* RowDescription */
+                               status = SimpleForwardToFrontend(kind, frontend, backend);
+                               if (pool_is_doing_extended_query_message())
+                                       pool_unset_query_in_progress();
+                               break;
+
+                       case 'n':       /* NoData */
+                               status = SimpleForwardToFrontend(kind, frontend, backend);
+                               pool_unset_query_in_progress();
+                               break;
+
+                       default:
+                               status = SimpleForwardToFrontend(kind, frontend, backend);
+                               if (pool_flush(frontend))
+                                       return POOL_END;
+                               break;
+               }
+       }
+       else
+       {
+               switch (kind)
+               {
+                       case 'A':       /* NotificationResponse */
+                               status = NotificationResponse(frontend, backend);
+                               break;
+
+                       case 'B':       /* BinaryRow */
+                               status = BinaryRow(frontend, backend, num_fields);
+                               break;
+
+                       case 'C':       /* CompletedResponse */
+                               status = CompletedResponse(frontend, backend);
+                               break;
+
+                       case 'D':       /* AsciiRow */
+                               status = AsciiRow(frontend, backend, num_fields);
+                               break;
+
+                       case 'E':       /* ErrorResponse */
+                               status = ErrorResponse(frontend, backend);
+                               break;
+
+                       case 'G':       /* CopyInResponse */
+                               status = CopyInResponse(frontend, backend);
+                               break;
+
+                       case 'H':       /* CopyOutResponse */
+                               status = CopyOutResponse(frontend, backend);
+                               break;
+
+                       case 'I':       /* EmptyQueryResponse */
+                               status = EmptyQueryResponse(frontend, backend);
+                               break;
+
+                       case 'N':       /* NoticeResponse */
+                               status = NoticeResponse(frontend, backend);
+                               break;
+
+                       case 'P':       /* CursorResponse */
+                               status = CursorResponse(frontend, backend);
+                               break;
+
+                       case 'T':       /* RowDescription */
+                               status = RowDescription(frontend, backend, &num_fields);
+                               break;
+
+                       case 'V':       /* FunctionResultResponse and FunctionVoidResponse */
+                               status = FunctionResultResponse(frontend, backend);
+                               break;
+
+                       case 'Z':       /* ReadyForQuery */
+                               status = ReadyForQuery(frontend, backend, 1);
+                               break;
+
+                       default:
+                               pool_error("Unknown message type %c(%02x)", kind, kind);
+                               exit(1);
+               }
+       }
+
+       /* Do we receive ready for query while processing reset
+        * request?
+        */
+       if (kind == 'Z' && frontend->no_forward && *state == 1)
+       {
+               *state = 0;
        }
 
        if (status != POOL_CONTINUE)
@@ -1645,7 +2278,23 @@ POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
                                }
                                else
                                {
-                                       SimpleForwardToBackend(kind, frontend, backend);
+                                       char *contents = NULL;
+
+                                       if (pool_read(frontend, &kind, 1) < 0)
+                                       {
+                                               pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
+                                               return POOL_END;
+                                       }
+
+                                       pool_debug("read kind from frontend %c(%02x)", kind, kind);
+
+                                       if (pool_read(frontend, &len, sizeof(len)) < 0)
+                                               return POOL_END;
+                                       len = ntohl(len) - 4;
+                                       if (len > 0)
+                                               contents = pool_read2(frontend, len);
+
+                                       SimpleForwardToBackend(kind, frontend, backend, len, contents);
                                }
 
                                /* CopyData? */
@@ -1960,3 +2609,89 @@ void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query,
                                 prefix, node_id, ntohl(slot->pid), query, message);
        }
 }
+
+static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
+                                                                        POOL_CONNECTION_POOL *backend,
+                                                                        PreparedStatement *ps)
+{
+       int i;
+       int len = ps->parse_len;
+       char kind = '\0';
+       char *contents = ps->parse_contents;
+       bool parse_was_sent = false;
+       bool backup[MAX_NUM_BACKENDS];
+       POOL_STATUS status;
+       POOL_QUERY_CONTEXT *qc = ps->qctxt;
+
+       memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
+
+       /* expect to send to master node only */
+       for (i = 0; i < NUM_BACKENDS; i++)
+       {
+               if (qc->where_to_send[i] && qc->query_state[i] < 1)     /* 1: parse done */
+               {
+                       pool_debug("parse_before_bind: waiting for backend %d completing parse", i);
+                       if (pool_send_and_wait(qc, contents, len, 1, i, "P") != POOL_CONTINUE)
+                               return POOL_END;
+               }
+               else
+               {
+                       qc->where_to_send[i] = 0;
+               }
+       }
+
+       for (i = 0; i < NUM_BACKENDS; i++)
+       {
+               if (qc->where_to_send[i])
+               {
+                       parse_was_sent = true;
+                       break;
+               }
+       }
+       
+       if (parse_was_sent)
+       {
+               while (kind != '1')
+               {
+                       status = read_kind_from_backend(frontend, backend, &kind);
+                       if (status != POOL_CONTINUE)
+                       {
+                               memcpy(qc->where_to_send, backup, sizeof(backup));
+                               return status;
+                       }
+                       status = pool_discard_packet_contents(backend);
+                       if (status != POOL_CONTINUE)
+                       {
+                               memcpy(qc->where_to_send, backup, sizeof(backup));
+                               return status;
+                       }
+               }
+       }
+
+       memcpy(qc->where_to_send, backup, sizeof(backup));
+       return POOL_CONTINUE;
+}
+
+static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context)
+{
+       char *name;
+       PreparedStatement *ps;
+       POOL_QUERY_CONTEXT *qc;
+
+       if (IsA(query_context->parse_tree, DeallocateStmt)) {
+               name = ((DeallocateStmt *)query_context->parse_tree)->name;
+               if (name != NULL)       /* NULL is "DEALLOCATE ALL" */
+               {
+                       ps = pool_get_prepared_statement_by_pstmt_name(name);
+                       if (ps != NULL)
+                       {
+                               qc = ps->qctxt;
+                               if (qc != NULL)
+                               {
+                                       memcpy(query_context->where_to_send, qc->where_to_send,
+                                                  sizeof(query_context->where_to_send));
+                               }
+                       }
+               }
+       }
+}
index 0c189adb201f02e4931aa16190bf374f09087508..6f75d7022f6b2922da98b60b6ecb0090dd0b9163 100644 (file)
@@ -62,17 +62,51 @@ extern char *parsed_query;
  * modules defined in pool_proto_modules.c
  */
 extern POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend, 
-                                                POOL_CONNECTION_POOL *backend, char *query);
+                                                          POOL_CONNECTION_POOL *backend,
+                                                          int len, char *contents);
 
 extern POOL_STATUS Execute(POOL_CONNECTION *frontend, 
-                                                  POOL_CONNECTION_POOL *backend);
+                                                  POOL_CONNECTION_POOL *backend,
+                                                  int len, char *contents);
 
 extern POOL_STATUS Parse(POOL_CONNECTION *frontend,
-                                                POOL_CONNECTION_POOL *backend);
+                                                POOL_CONNECTION_POOL *backend,
+                                                int len, char *contents);
+
+extern POOL_STATUS Bind(POOL_CONNECTION *frontend,
+                                               POOL_CONNECTION_POOL *backend,
+                                               int len, char *contents);
+
+extern POOL_STATUS Describe(POOL_CONNECTION *frontend,
+                                                       POOL_CONNECTION_POOL *backend,
+                                                       int len, char *contents);
+
+extern POOL_STATUS Close(POOL_CONNECTION *frontend,
+                                                POOL_CONNECTION_POOL *backend,
+                                                int len, char *contents);
+
+extern POOL_STATUS FunctionCall3(POOL_CONNECTION *frontend,
+                                                                POOL_CONNECTION_POOL *backend,
+                                                                int len, char *contents);
 
 extern POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, 
                                                                 POOL_CONNECTION_POOL *backend, int send_ready);
 
+extern POOL_STATUS ParseComplete(POOL_CONNECTION *frontend,
+                                                                POOL_CONNECTION_POOL *backend);
+
+extern POOL_STATUS BindComplete(POOL_CONNECTION *frontend,
+                                                               POOL_CONNECTION_POOL *backend);
+
+extern POOL_STATUS CloseComplete(POOL_CONNECTION *frontend,
+                                                                POOL_CONNECTION_POOL *backend);
+
+extern POOL_STATUS CommandComplete(POOL_CONNECTION *frontend,
+                                                                  POOL_CONNECTION_POOL *backend);
+
+extern POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend,
+                                                                 POOL_CONNECTION_POOL *backend);
+
 extern POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend, 
                                                                  POOL_CONNECTION_POOL *backend);
 
@@ -88,6 +122,9 @@ extern POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
 extern POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend, 
                                                                                   POOL_CONNECTION_POOL *backend);
 
+extern POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend, 
+                                                                                 POOL_CONNECTION_POOL *backend, int *state);
+
 /*
  * modules defined in pool_proto2.c
  */