Fix bugs in extend mode
authorTatsuo Ishii <ishii at sraoss.co.jp>
Mon, 17 Apr 2006 04:05:09 +0000 (04:05 +0000)
committerTatsuo Ishii <ishii at sraoss.co.jp>
Mon, 17 Apr 2006 04:05:09 +0000 (04:05 +0000)
pool_process_query.c

index 71c5b41f15029b2414b5a3477ab04ebf9385f09c..20352f88034219ff3ac88b819259158e836ce569 100644 (file)
@@ -739,6 +739,9 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
        int len;
        int sendlen;
        int i;
+       char kind0 = 0;
+       char kind;
+       int status;
 
        /* read Execute packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
@@ -767,24 +770,41 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
                sendlen = htonl(4);
                if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
                {
-                       return POOL_END;
+                       return POOL_ERROR;
                }
 
-               if (!REPLICATION)
-                       break;
+               status = pool_read(cp, &kind, sizeof(kind));
+               if (status < 0)
+               {
+                       pool_debug("Execute: pool_read returns error");
+                       return POOL_ERROR;
+               }
 
-               /*
-                * in "strict mode" we need to wait for backend completing the query.
-                */
-               if (pool_config.replication_strict)
+               if (i == 0)
+                       kind0 = kind;
+               else
                {
-                       pool_debug("waiting for backend[%d] completing the query", i);
-                       if (synchronize(cp))
-                               return POOL_END;
+                       if (kind != kind0)
+                       {
+                               pool_error("Execute: kind does not match kind0:%c kind:%c", kind0, kind);
+                               return POOL_ERROR;
+                       }
                }
 
+               if (!REPLICATION)
+                       break;
        }
 
+       if (kind == 0)
+       {
+               pool_error("Execute: kind is 0!");
+               return POOL_ERROR;
+       }
+
+       SimpleForwardToFrontend(kind, frontend, backend);
+       status = pool_flush(frontend);
+       if (status != POOL_CONTINUE)
+               return POOL_ERROR;
        return POOL_CONTINUE;
 }
 
@@ -1698,7 +1718,7 @@ static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
        if (pool_read(frontend, &fkind, 1) < 0)
        {
                pool_error("ProcessFrontendResponse: failed to read kind from frontend. fronend abnormally exited");
-               return POOL_END;
+               return POOL_ERROR;
        }
 
        pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
@@ -1711,8 +1731,7 @@ static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                                int len;
                                pool_read(frontend, &len, sizeof(len));
                        }
-                       status = POOL_END;
-                       break;
+                       return POOL_END;
 
                case 'Q':
                        status = Query(frontend, backend, NULL);
@@ -1723,7 +1742,6 @@ static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                        status = Sync(frontend, backend);
                        break;
 */
-
                case 'E':
                        status = Execute(frontend, backend);
                break;
@@ -1755,6 +1773,8 @@ static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                        break;
        }
 
+       if (status != POOL_CONTINUE)
+               status = POOL_ERROR;
        return status;
 }
 
@@ -2282,11 +2302,17 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C
 
        pool_write(frontend, &kind, 1);
 
+       /*
+        * Check if packet kind == 'C'(Command complete), '1'(Parse
+        * complete), '3'(Close complete). If so, then register or unregister
+        * pending prepared statement.
+        */
        if ((kind == 'C' || kind == '1' || kind == '3') &&
                pending_function &&     pending_prepared_name)
        {
                pending_function(&prepared_list, pending_prepared_name);
        }
+
        free(pending_prepared_name);
        pending_function = NULL;
        pending_prepared_name = NULL;
@@ -2329,6 +2355,74 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C
                if (pool_read2(SECONDARY(backend), len) == NULL)
                        return POOL_END;
 
+       if (kind == 'E')                /* error response? */
+       {
+               int i, k;
+               int res1, res2;
+               char *p1;
+
+               for (i = 0;i < backend->num;i++)
+               {
+                       POOL_CONNECTION *cp = backend->slots[i]->con;
+
+                       /* We need to send "sync" message to backend in extend mode
+                        * so that it accepts next command.
+                        * Note that this may be overkill since client may send
+                        * it by itself. Moreover we do not need it in non-extend mode.
+                        * At this point we regard it harmfull since error resonse
+                        * will not be sent so frequently.
+                        */
+                       pool_write(cp, "S", 1);
+                       res1 = htonl(4);
+                       if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
+                       {
+                               return POOL_END;
+                       }
+
+                       if (!REPLICATION)
+                               break;
+               }
+
+               k = pool_read_kind(backend);
+               if (k < 0)
+               {
+                       pool_error("SimpleForwardToBackend: pool_read_kind error");
+                       return POOL_ERROR;
+               }
+
+               status = pool_read(MASTER(backend), &res1, sizeof(res1));
+               if (status < 0)
+               {
+                       pool_error("SimpleForwardToFrontend: error while reading message length");
+                       return POOL_END;
+               }
+               res1 = ntohl(res1) - sizeof(res1);
+               p1 = pool_read2(MASTER(backend), res1);
+               if (p1 == NULL)
+                       return POOL_END;
+       
+               if (REPLICATION)
+               {
+                       status = pool_read(SECONDARY(backend), &res2, sizeof(res2));
+                       if (status < 0)
+                       {
+                               pool_error("SimpleForwardToFrontend: error while reading message length from secondary backend");
+                       return POOL_END;
+                       }
+
+                       res2 = ntohl(res2) - sizeof(res2);
+                       p1 = pool_read2(SECONDARY(backend), res2);
+                       if (p1 == NULL)
+                               return POOL_END;
+
+                       if (res1 != res2)
+                       {
+                               pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) secondary(%d) kind:(%c)",
+                                                  ntohl(res1), ntohl(res2), k);
+                       }
+               }
+       }
+
        return pool_write(frontend, p, len);
 }
 
@@ -2366,10 +2460,10 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
        if (p == NULL)
                return POOL_END;
 
-       if (pool_write(MASTER(backend), p, len))
+       if (pool_write_and_flush(MASTER(backend), p, len))
                return POOL_END;
        if (REPLICATION)
-               if (pool_write(SECONDARY(backend), p, len))
+               if (pool_write_and_flush(SECONDARY(backend), p, len))
                        return POOL_END;
 
        if (kind == 'P' && *p)
@@ -2399,6 +2493,39 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
                pending_prepared_name = name;
        }
 
+       if (kind == 'P' || kind == 'B' || kind == 'D')
+       {
+               int i;
+
+               for (i = 0;i < backend->num;i++)
+               {
+                       POOL_CONNECTION *cp = backend->slots[i]->con;
+
+                       /*
+                        * send "Flush" message so that backend notices us
+                        * the completion of the command
+                        */
+                       pool_write(cp, "H", 1);
+                       sendlen = htonl(4);
+                       if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
+                       {
+                               return POOL_END;
+                       }
+
+                       if (!REPLICATION)
+                               break;
+               }
+
+               kind = pool_read_kind(backend);
+               if (kind <= 0)
+               {
+                       pool_error("SimpleForwardToBackend: pool_read_kind error");
+                       return POOL_ERROR;
+               }
+               SimpleForwardToFrontend(kind, frontend, backend);
+               return pool_flush(frontend);
+       }
+
        return POOL_CONTINUE;
 }
 
@@ -2987,7 +3114,9 @@ static void del_prepared_list(PreparedStatementList *p, char *name)
        
        if (i == p->cnt)
                return;
-       else if (i == p->cnt - 1)
+
+       free(p->stmt_list[i]);
+       if (i == p->cnt - 1)
                p->cnt--;
        else
        {