From: Yoshiyuki Asaba Date: Wed, 11 Jul 2007 01:40:17 +0000 (+0000) Subject: Fixed kind mismatch error when deadlock error occurred. X-Git-Tag: V3_4~26 X-Git-Url: http://git.postgresql.org/gitweb/static/connections.php?a=commitdiff_plain;h=9c3ac27bb2902830b5cc8b079faedbeb3badccee;p=pgpool1.git Fixed kind mismatch error when deadlock error occurred. The problem is the following scenario. A: BEGIN; B: BEGIN; A: LOCK TABLE t1 IN SHARE ROW EXCLUSIVE MODE; B: LOCK TABLE t2 IN SHARE ROW EXCLUSIVE MODE; A: LOCK TABLE t2 IN SHARE ROW EXCLUSIVE MODE; B: LOCK TABLE t1 IN SHARE ROW EXCLUSIVE MODE; Transaction "A" aborts on the master node, but it completes on the other nodes. This causes a wrong failover. So pgpool checks for the deadlock error (code == '40P01') and sends an error query to the other nodes. --- diff --git a/pool.h b/pool.h index e83b09f..1a0d60b 100644 --- a/pool.h +++ b/pool.h @@ -301,6 +301,7 @@ extern int pool_flush(POOL_CONNECTION *cp); extern int pool_flush_it(POOL_CONNECTION *cp); extern int pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len); extern char *pool_read_string(POOL_CONNECTION *cp, int *len, int line); +extern int pool_unread(POOL_CONNECTION *cp, void *data, int len); extern int pool_do_auth(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); extern int pool_do_reauth(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *cp); diff --git a/pool_process_query.c b/pool_process_query.c index a8f4c8d..e7b11fc 100644 --- a/pool_process_query.c +++ b/pool_process_query.c @@ -41,6 +41,9 @@ #define INIT_STATEMENT_LIST_SIZE 8 +#define DEADLOCK_ERROR_CODE "40P01" +#define POOL_ERROR_QUERY "send invalid query from pgpool to abort transaction" + typedef struct { char *statement_name; char *portal_name; @@ -155,6 +158,7 @@ static PreparedStatement *unnamed_statement = NULL; static PreparedStatement *unnamed_portal = NULL; static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */ static void query_ps_status(char *query,
POOL_CONNECTION_POOL *backend); /* show ps status */ +static int detect_deadlock_error(POOL_CONNECTION *master, int major); POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, @@ -519,6 +523,7 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend, int len; static char *sq = "show pool_status"; POOL_STATUS status; + int deadlock_detected = 0; if (query == NULL) /* need to read query from frontend? */ { @@ -747,6 +752,21 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend, pool_debug("waiting for master completing the query"); if (synchronize(MASTER(backend))) return POOL_END; + + /* + * We must check deadlock error because a aborted transaction + * by detecting deadlock isn't same on all nodes. + * If a transaction is aborted on master node, pgpool send a + * error query to another nodes. + */ + deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend)); + if (deadlock_detected < 0) + return POOL_END; + else if (deadlock_detected) + { + string = POOL_ERROR_QUERY; + len = strlen(string) + 1; + } } #define SEQUENCE_DEBUG @@ -792,6 +812,7 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, char kind; int status; PreparedStatement *stmt; + int deadlock_detected = 0; /* read Execute packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) @@ -834,18 +855,29 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, { POOL_CONNECTION *cp = backend->slots[i]->con; - /* forward the query to the backend */ - pool_write(cp, "E", 1); - sendlen = htonl(len + 4); - pool_write(cp, &sendlen, sizeof(sendlen)); - pool_write(cp, string, len); + if (deadlock_detected) + { + pool_write(cp, "Q", 1); + len = strlen(POOL_ERROR_QUERY) + 1; + sendlen = htonl(len + 4); + pool_write(cp, &sendlen, sizeof(sendlen)); + pool_write(cp, POOL_ERROR_QUERY, len); + } + else + { + /* forward the query to the backend */ + pool_write(cp, "E", 1); + sendlen = htonl(len + 4); + pool_write(cp, &sendlen, sizeof(sendlen)); + pool_write(cp, string, 
len); - /* - * send "Flush" message so that backend notices us - * the completion of the command - */ - pool_write(cp, "H", 1); - sendlen = htonl(4); + /* + * send "Flush" message so that backend notices us + * the completion of the command + */ + pool_write(cp, "H", 1); + sendlen = htonl(4); + } if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0) { return POOL_ERROR; @@ -858,6 +890,16 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, pool_debug("waiting for backend completing the query"); if (synchronize(cp)) return POOL_END; + + /* + * We must check deadlock error because a aborted transaction + * by detecting deadlock isn't same on all nodes. + * If a transaction is aborted on master node, pgpool send a + * error query to another nodes. + */ + deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend)); + if (deadlock_detected < 0) + return POOL_END; } } @@ -3821,3 +3863,91 @@ static POOL_STATUS error_kind_mismatch(POOL_CONNECTION *frontend, POOL_CONNECTIO else return POOL_ERROR; } + +static int detect_deadlock_error(POOL_CONNECTION *master, int major) +{ + int deadlock = 0; + char kind; + int readlen = 0, len; + char *buf; + char *p, *str; + + if ((buf = malloc(1024)) == NULL) + { + pool_error("detect_deadlock_error: malloc failed"); + return -1; + } + + if (pool_read(master, &kind, sizeof(kind))) + return POOL_END; + readlen += sizeof(kind); + p = buf; + memcpy(p, &kind, sizeof(kind)); + p += sizeof(kind); + + if (kind == 'E') /* deadlock error? 
*/ + { + /* read actual query */ + if (major == PROTO_MAJOR_V3) + { + char *error_code; + + if (pool_read(master, &len, sizeof(len)) < 0) + return POOL_END; + readlen += sizeof(len); + memcpy(p, &len, sizeof(len)); + p += sizeof(len); + + len = ntohl(len) - 4; + str = malloc(len); + pool_read(master, str, len); + readlen += len; + if (readlen > 1024) + { + buf = realloc(buf, readlen); + if (buf == NULL) + { + pool_error("detect_deadlock_error: malloc failed"); + return -1; + } + } + memcpy(p, str, len); + + error_code = str; + while (*error_code) + { + if (*error_code == 'C') + { + if (strcmp(error_code+1, DEADLOCK_ERROR_CODE) == 0) /* deadlock error */ + { + pool_debug("SimpleQuery: receive deadlock error from master node."); + deadlock = 1; + } + break; + } + else + error_code = error_code + strlen(error_code) + 1; + } + free(str); + } + else + { + str = pool_read_string(master, &len, 0); + readlen += len; + if (readlen > 1024) + { + buf = realloc(buf, readlen); + if (buf == NULL) + { + pool_error("detect_deadlock_error: malloc failed"); + return -1; + } + } + memcpy(p, str, len); + } + } + if (pool_unread(master, buf, readlen) != 0) + deadlock = -1; + free(buf); + return deadlock; +} diff --git a/pool_stream.c b/pool_stream.c index 6634091..04f3d11 100644 --- a/pool_stream.c +++ b/pool_stream.c @@ -764,3 +764,31 @@ static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len) return consume_size; } + +/* + * pool_unread: Put back data to input buffer + */ +int pool_unread(POOL_CONNECTION *cp, void *data, int len) +{ + void *p = cp->hp; + int n = cp->len + len; + int realloc_size; + + if (cp->bufsz < n) + { + realloc_size = (n/READBUFSZ+1)*READBUFSZ; + p = realloc(cp->hp, realloc_size); + if (p == NULL) + { + pool_error("pool_unread: realloc failed"); + return -1; + } + cp->hp = p; + } + if (cp->len != 0) + memmove(p + len, cp->hp + cp->po, cp->len); + memmove(p, data, len); + cp->len = n; + cp->po = 0; + return 0; +}