int len;
int sendlen;
int i;
+ char kind0 = 0;
+ char kind;
+ int status;
/* read Execute packet */
if (pool_read(frontend, &len, sizeof(len)) < 0)
sendlen = htonl(4);
if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
{
- return POOL_END;
+ return POOL_ERROR;
}
- if (!REPLICATION)
- break;
+ status = pool_read(cp, &kind, sizeof(kind));
+ if (status < 0)
+ {
+ pool_debug("Execute: pool_read returns error");
+ return POOL_ERROR;
+ }
- /*
- * in "strict mode" we need to wait for backend completing the query.
- */
- if (pool_config.replication_strict)
+ if (i == 0)
+ kind0 = kind;
+ else
{
- pool_debug("waiting for backend[%d] completing the query", i);
- if (synchronize(cp))
- return POOL_END;
+ if (kind != kind0)
+ {
+ pool_error("Execute: kind does not match kind0:%c kind:%c", kind0, kind);
+ return POOL_ERROR;
+ }
}
+ if (!REPLICATION)
+ break;
}
+ if (kind == 0)
+ {
+ pool_error("Execute: kind is 0!");
+ return POOL_ERROR;
+ }
+
+ SimpleForwardToFrontend(kind, frontend, backend);
+ status = pool_flush(frontend);
+ if (status != POOL_CONTINUE)
+ return POOL_ERROR;
return POOL_CONTINUE;
}
if (pool_read(frontend, &fkind, 1) < 0)
{
pool_error("ProcessFrontendResponse: failed to read kind from frontend. fronend abnormally exited");
- return POOL_END;
+ return POOL_ERROR;
}
pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
int len;
pool_read(frontend, &len, sizeof(len));
}
- status = POOL_END;
- break;
+ return POOL_END;
case 'Q':
status = Query(frontend, backend, NULL);
status = Sync(frontend, backend);
break;
*/
-
case 'E':
status = Execute(frontend, backend);
break;
break;
}
+ if (status != POOL_CONTINUE)
+ status = POOL_ERROR;
return status;
}
pool_write(frontend, &kind, 1);
+ /*
+ * Check if packet kind == 'C'(Command complete), '1'(Parse
+ * complete), '3'(Close complete). If so, then register or unregister
+ * pending prepared statement.
+ */
if ((kind == 'C' || kind == '1' || kind == '3') &&
pending_function && pending_prepared_name)
{
pending_function(&prepared_list, pending_prepared_name);
}
+
free(pending_prepared_name);
pending_function = NULL;
pending_prepared_name = NULL;
if (pool_read2(SECONDARY(backend), len) == NULL)
return POOL_END;
+ if (kind == 'E') /* error response? */
+ {
+ int i, k;
+ int res1, res2;
+ char *p1;
+
+ for (i = 0;i < backend->num;i++)
+ {
+ POOL_CONNECTION *cp = backend->slots[i]->con;
+
+ /* We need to send "sync" message to backend in extend mode
+ * so that it accepts next command.
+ * Note that this may be overkill since client may send
+ * it by itself. Moreover we do not need it in non-extend mode.
+ * At this point we regard it harmfull since error resonse
+ * will not be sent so frequently.
+ */
+ pool_write(cp, "S", 1);
+ res1 = htonl(4);
+ if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
+ {
+ return POOL_END;
+ }
+
+ if (!REPLICATION)
+ break;
+ }
+
+ k = pool_read_kind(backend);
+ if (k < 0)
+ {
+ pool_error("SimpleForwardToBackend: pool_read_kind error");
+ return POOL_ERROR;
+ }
+
+ status = pool_read(MASTER(backend), &res1, sizeof(res1));
+ if (status < 0)
+ {
+ pool_error("SimpleForwardToFrontend: error while reading message length");
+ return POOL_END;
+ }
+ res1 = ntohl(res1) - sizeof(res1);
+ p1 = pool_read2(MASTER(backend), res1);
+ if (p1 == NULL)
+ return POOL_END;
+
+ if (REPLICATION)
+ {
+ status = pool_read(SECONDARY(backend), &res2, sizeof(res2));
+ if (status < 0)
+ {
+ pool_error("SimpleForwardToFrontend: error while reading message length from secondary backend");
+ return POOL_END;
+ }
+
+ res2 = ntohl(res2) - sizeof(res2);
+ p1 = pool_read2(SECONDARY(backend), res2);
+ if (p1 == NULL)
+ return POOL_END;
+
+ if (res1 != res2)
+ {
+ pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) secondary(%d) kind:(%c)",
+ ntohl(res1), ntohl(res2), k);
+ }
+ }
+ }
+
return pool_write(frontend, p, len);
}
if (p == NULL)
return POOL_END;
- if (pool_write(MASTER(backend), p, len))
+ if (pool_write_and_flush(MASTER(backend), p, len))
return POOL_END;
if (REPLICATION)
- if (pool_write(SECONDARY(backend), p, len))
+ if (pool_write_and_flush(SECONDARY(backend), p, len))
return POOL_END;
if (kind == 'P' && *p)
pending_prepared_name = name;
}
+ if (kind == 'P' || kind == 'B' || kind == 'D')
+ {
+ int i;
+
+ for (i = 0;i < backend->num;i++)
+ {
+ POOL_CONNECTION *cp = backend->slots[i]->con;
+
+ /*
+ * send "Flush" message so that backend notices us
+ * the completion of the command
+ */
+ pool_write(cp, "H", 1);
+ sendlen = htonl(4);
+ if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
+ {
+ return POOL_END;
+ }
+
+ if (!REPLICATION)
+ break;
+ }
+
+ kind = pool_read_kind(backend);
+ if (kind <= 0)
+ {
+ pool_error("SimpleForwardToBackend: pool_read_kind error");
+ return POOL_ERROR;
+ }
+ SimpleForwardToFrontend(kind, frontend, backend);
+ return pool_flush(frontend);
+ }
+
return POOL_CONTINUE;
}
if (i == p->cnt)
return;
- else if (i == p->cnt - 1)
+
+ free(p->stmt_list[i]);
+ if (i == p->cnt - 1)
p->cnt--;
else
{