Fix for bug-545: Quorum lost and not recovered
authorMuhammad Usama <m.usama@gmail.com>
Sat, 28 Sep 2019 20:15:11 +0000 (01:15 +0500)
committerMuhammad Usama <m.usama@gmail.com>
Sat, 28 Sep 2019 20:15:11 +0000 (01:15 +0500)
The master watchdog node was not adding the lost standby node back to its list
of valid standby nodes after it was rediscovered by the lifecheck. The fix is to
ask the node to rejoin the master node when it gets rediscovered by the lifecheck.
As part of this commit, I have also added the watchdog data version and the
Pgpool-II version to the watchdog info packet, to make future extensions to the
watchdog messages easier.

Thanks to Guille (the reporter of this bug) for providing lots of help in testing the fix.

src/include/watchdog/watchdog.h
src/main/pgpool_main.c
src/watchdog/watchdog.c
src/watchdog/wd_json_data.c

index f7753b78888b087f4a5c4ee3026a949c5d6c6848..89f323c35b9f07c5b346b0e98946f77672735cf9 100644 (file)
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2017     PgPool Global Development Group
+ * Copyright (c) 2003-2019     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
 #define WD_TIME_DIFF_SEC(a,b) (int)(((a).tv_sec - (b).tv_sec) + \
                                     ((a).tv_usec - (b).tv_usec) / 1000000.0)
 
+/*
+* Data version number of watchdog messages
+* The version number is in major.minor format
+* The major versions are always kept compatible.
+*
+* Increment the minor version whenever a minor change is
+* made to message data, where the older versions can still
+* work even when that change is not present in it.
+*
+* while incrementing the major version would mean that
+* the watchdog node with older major version will not be
+* allowed to join the cluster
+*
+* Since the message data version was not present from the
+* beginning, the default version is considered to be 1.0,
+* meaning that if the data version number is not present in the
+* watchdog node info, the node is treated as version 1.0.
+*/
+
+#define WD_MESSAGE_DATA_VERSION_MAJOR  "1"
+#define WD_MESSAGE_DATA_VERSION_MINOR  "1"
+#define WD_MESSAGE_DATA_VERSION        WD_MESSAGE_DATA_VERSION_MAJOR "." WD_MESSAGE_DATA_VERSION_MINOR
+#define MAX_VERSION_STR_LEN            10
+
 /*
  * watchdog state
  */
@@ -86,9 +110,9 @@ typedef enum {
 
        WD_EVENT_NODE_CON_LOST,
        WD_EVENT_NODE_CON_FOUND,
-       WD_EVENT_CLUSTER_QUORUM_CHANGED
-
-} WD_EVENTS;
+       WD_EVENT_CLUSTER_QUORUM_CHANGED,
+       WD_EVENT_WD_STATE_REQUIRE_RELOAD
+}                      WD_EVENTS;
 
 typedef struct SocketConnection
 {
@@ -109,6 +133,10 @@ typedef struct WatchdogNode
        struct timeval last_sent_time;                  /* timestamp when last packet
                                                                                         * was sent on the node
                                                                                         */
+       char            pgp_version[MAX_VERSION_STR_LEN];               /* Pgpool-II version */
+       int                     wd_data_major_version;  /* watchdog messaging version major*/
+       int                     wd_data_minor_version;  /* watchdog messaging version minor*/
+
        char nodeName[WD_MAX_HOST_NAMELEN];             /* name of this node */
        char hostname[WD_MAX_HOST_NAMELEN];             /* host name */
        int wd_port;                                                    /* watchdog port */
index bd80f8b146bc1ee58f3fcb32c29c6a013842f196..ec53867e1a44c693f133c2ce2e4e0217216b1914 100644 (file)
@@ -315,7 +315,8 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
                }
 
                ereport (LOG,
-                                (errmsg("watchdog process is initialized")));
+                                (errmsg("watchdog process is initialized"),
+                                errdetail("watchdog messaging data version: %s",WD_MESSAGE_DATA_VERSION)));
                /*
                 * initialize the lifecheck process
                 */
index 4a705e7ff33b1da115e7bbdb97eb437cc8a03013..06dc74b865b7fec1f0e0924b8233b116ed1845a2 100644 (file)
@@ -129,6 +129,7 @@ typedef enum IPC_CMD_PREOCESS_RES
 #define CLUSTER_IAM_NOT_TRUE_MASTER                    'X'
 #define CLUSTER_IAM_RESIGNING_FROM_MASTER      'R'
 #define CLUSTER_NODE_INVALID_VERSION           'V'
+#define CLUSTER_NODE_REQUIRE_TO_RELOAD         'I'
 
 #define WD_MASTER_NODE getMasterWatchdogNode()
 
@@ -198,7 +199,8 @@ char *wd_event_name[] =
        "THIS NODE FOUND",
        "NODE CONNECTION LOST",
        "NODE CONNECTION FOUND",
-       "CLUSTER QUORUM STATUS CHANGED"
+       "CLUSTER QUORUM STATUS CHANGED",
+       "NODE REQUIRE TO RELOAD STATE"
 };
 
 char *wd_state_names[] = {
@@ -1454,7 +1456,10 @@ static int read_sockets(fd_set* rmask,int pending_fds_count)
                                                                close_socket_connection(&wdNode->server_socket);
                                                                strlcpy(wdNode->delegate_ip, tempNode->delegate_ip, WD_MAX_HOST_NAMELEN);
                                                                strlcpy(wdNode->nodeName, tempNode->nodeName, WD_MAX_HOST_NAMELEN);
+                                                               strlcpy(wdNode->pgp_version, tempNode->pgp_version, MAX_VERSION_STR_LEN);
                                                                wdNode->state = tempNode->state;
+                                                               wdNode->wd_data_major_version = tempNode->wd_data_major_version;
+                                                               wdNode->wd_data_minor_version = tempNode->wd_data_minor_version;
                                                                wdNode->startup_time.tv_sec = tempNode->startup_time.tv_sec;
                                                                wdNode->wd_priority = tempNode->wd_priority;
                                                                wdNode->server_socket = *conn;
@@ -1473,7 +1478,13 @@ static int read_sockets(fd_set* rmask,int pending_fds_count)
                                                {
                                                        /* reply with node info message */
                                                        ereport(LOG,
-                                                                       (errmsg("new node joined the cluster hostname:\"%s\" port:%d pgpool_port:%d",tempNode->hostname,tempNode->wd_port,tempNode->pgpool_port)));
+                                                                       (errmsg("new node joined the cluster hostname:\"%s\" port:%d pgpool_port:%d", wdNode->hostname,
+                                                                                       wdNode->wd_port,
+                                                                                       wdNode->pgpool_port),
+                                                                        errdetail("Pgpool-II version:\"%s\" watchdog messaging version: %d.%d",
+                                                                                          wdNode->pgp_version,
+                                                                                          wdNode->wd_data_major_version,
+                                                                                          wdNode->wd_data_minor_version)));
 
                                                        watchdog_state_machine(WD_EVENT_PACKET_RCV, wdNode, pkt, NULL);
                                                }
@@ -1941,10 +1952,10 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcComma
 
 static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData* ipcCommand)
 {
-       int nodeStatus;
-       int nodeID;
-       char *message;
-       bool ret;
+       int                     nodeStatus;
+       int                     nodeID;
+       char       *message = NULL;
+       bool            ret;
 
        if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL)
                return IPC_CMD_ERROR;
@@ -1960,12 +1971,13 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData*
        }
 
        if (message)
+       {
                ereport(LOG,
                                (errmsg("received node status change ipc message"),
-                                errdetail("%s",message)));
-       pfree(message);
-
-       if (fire_node_status_event(nodeID,nodeStatus) == false)
+                                errdetail("%s", message)));
+               pfree(message);
+       }
+       if (fire_node_status_event(nodeID, nodeStatus) == false)
                return IPC_CMD_ERROR;
        
        return IPC_CMD_COMPLETE;
@@ -3535,9 +3547,9 @@ static void cluster_service_message_processor(WatchdogNode* wdNode, WDPacketData
                        break;
 
                case CLUSTER_NEEDS_ELECTION:
-               {
-                       ereport(LOG,
-                                       (errmsg("remote node \"%s\" detected the split-brain and wants to re-initialize the cluster",wdNode->nodeName)));
+                       {
+                               ereport(LOG,
+                                               (errmsg("remote node \"%s\" detected the problem and asking us to rejoin the cluster", wdNode->nodeName)));
 
                        set_state(WD_JOINING);
                }
@@ -3569,6 +3581,12 @@ static void cluster_service_message_processor(WatchdogNode* wdNode, WDPacketData
                }
                        break;
 
+               case CLUSTER_NODE_REQUIRE_TO_RELOAD:
+               {
+                       watchdog_state_machine(WD_EVENT_WD_STATE_REQUIRE_RELOAD, NULL, NULL, NULL);
+               }
+                       break;
+
                case CLUSTER_NODE_INVALID_VERSION:
                {
                        /* this should never happen
@@ -5370,6 +5388,39 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
                }
                        break;
 
+               case WD_EVENT_REMOTE_NODE_FOUND:
+               {
+                       ereport(LOG,
+                               (errmsg("remote node \"%s\" is reachable again", wdNode->nodeName),
+                                       errdetail("trying to add it back as a standby")));
+                       /* If I am the cluster master. Ask for the node info and to re-send the join message */
+                       send_message_of_type(wdNode, WD_REQ_INFO_MESSAGE, NULL);
+                       if (wdNode->wd_data_major_version >= 1 && wdNode->wd_data_minor_version >= 1)
+                       {
+                               /*
+                                * Since data version 1.1 we support CLUSTER_NODE_REQUIRE_TO_RELOAD
+                                * which makes the standby nodes to re-send the join master node
+                                */
+                               ereport(DEBUG1,
+                                       (errmsg("asking remote node \"%s\" to rejoin master", wdNode->nodeName),
+                                               errdetail("watchdog data version %s",WD_MESSAGE_DATA_VERSION)));
+
+                               send_cluster_service_message(wdNode, pkt, CLUSTER_NODE_REQUIRE_TO_RELOAD);
+                       }
+                       else
+                       {
+                               /*
+                                * The node is on older version
+                                * So ask it to re-join the cluster
+                                */
+                               ereport(DEBUG1,
+                                       (errmsg("asking remote node \"%s\" to rejoin cluster", wdNode->nodeName),
+                                               errdetail("watchdog data version %s",WD_MESSAGE_DATA_VERSION)));
+                               send_cluster_service_message(wdNode, pkt, CLUSTER_NEEDS_ELECTION);
+                       }
+                       break;
+               }
+
                case WD_EVENT_PACKET_RCV:
                {
                        switch (pkt->type)
@@ -5852,6 +5903,14 @@ static int watchdog_state_machine_standby(WD_EVENTS event, WatchdogNode* wdNode,
                        set_timeout(5);
                        break;
 
+               case WD_EVENT_WD_STATE_REQUIRE_RELOAD:
+
+                       ereport(LOG,
+                                       (errmsg("re-sending join coordinator message to master node: \"%s\"", WD_MASTER_NODE->nodeName)));
+
+                       send_cluster_command(WD_MASTER_NODE, WD_JOIN_COORDINATOR_MESSAGE, 5);
+                       break;
+
                case WD_EVENT_COMMAND_FINISHED:
                {
                        if (clusterCommand->commandPacket.type == WD_JOIN_COORDINATOR_MESSAGE)
index dc82ab7d34b5728d233c7951ab34725250a3d2d9..a8786bb3c2f7f2e0d6226212d3c773a7852efe43 100644 (file)
@@ -419,6 +419,10 @@ char* get_watchdog_node_info_json(WatchdogNode* wdNode, char* authkey)
 
        JsonNode* jNode = jw_create_with_object(true);
 
+       jw_put_string(jNode, "PGPOOL_VERSION", VERSION);
+       jw_put_string(jNode, "DATA_VERSION_MAJOR", WD_MESSAGE_DATA_VERSION_MAJOR);
+       jw_put_string(jNode, "DATA_VERSION_MINOR", WD_MESSAGE_DATA_VERSION_MINOR);
+
        jw_put_int(jNode, "State", wdNode->state);
        jw_put_int(jNode, "WdPort", wdNode->wd_port);
        jw_put_int(jNode, "PgpoolPort", wdNode->pgpool_port);
@@ -517,6 +521,24 @@ WatchdogNode* get_watchdog_node_from_json(char* json_data, int data_len, char**
                goto ERROR_EXIT;
        strncpy(wdNode->delegate_ip, ptr, sizeof(wdNode->delegate_ip) -1);
 
+       ptr = json_get_string_value_for_key(root, "DATA_VERSION_MAJOR");
+       if (ptr == NULL)
+               wdNode->wd_data_major_version = 1;
+       else
+               wdNode->wd_data_major_version = atoi(ptr);
+
+       ptr = json_get_string_value_for_key(root, "DATA_VERSION_MINOR");
+       if (ptr == NULL)
+               wdNode->wd_data_minor_version = 0;
+       else
+               wdNode->wd_data_minor_version = atoi(ptr);
+
+       ptr = json_get_string_value_for_key(root, "PGPOOL_VERSION");
+       if (ptr != NULL)
+               strncpy(wdNode->pgp_version, ptr, sizeof(wdNode->pgp_version) - 1);
+       else
+               wdNode->pgp_version[0] = '0';
+
        if (authkey)
        {
                ptr = json_get_string_value_for_key(root, "authkey");
@@ -525,6 +547,7 @@ WatchdogNode* get_watchdog_node_from_json(char* json_data, int data_len, char**
                else
                        *authkey = NULL;
        }
+
        return wdNode;
 
        ERROR_EXIT: