Fix possible deadlock during failover when using three pgpools with watchdog.
authorYugo Nagata <nagata@sraoss.co.jp>
Fri, 31 May 2013 08:11:31 +0000 (17:11 +0900)
committerYugo Nagata <nagata@sraoss.co.jp>
Fri, 31 May 2013 08:11:31 +0000 (17:11 +0900)
- In interlocking of failover command, wait for contactable master pgpool
  before assuming lock holder (= command executor) because master pgpool
  decide who should become the lock holder.
- Reject failover request from other pgpool if failover has started already.

main.c
pool_config.c
watchdog/watchdog.c
watchdog/watchdog.h
watchdog/wd_child.c
watchdog/wd_ext.h
watchdog/wd_heartbeat.c
watchdog/wd_interlock.c
watchdog/wd_lifecheck.c
watchdog/wd_list.c
watchdog/wd_packet.c

diff --git a/main.c b/main.c
index 61b6ce91bd0c03aaa78842d509a8a3a9b1625e59..6ace3e235403efa48029103184b384e0f664ff49 100644 (file)
--- a/main.c
+++ b/main.c
@@ -219,7 +219,7 @@ int main(int argc, char **argv)
                {"version", no_argument, NULL, 'v'},
                {NULL, 0, NULL, 0}
        };
-       
+
        myargc = argc;
        myargv = argv;
 
@@ -574,7 +574,7 @@ int main(int argc, char **argv)
                if (pool_is_shmem_cache())
                {
                        size_t size;
-                       
+
                        size = pool_shared_memory_cache_size();
                        if (size == 0)
                        {
@@ -1051,7 +1051,7 @@ static void write_pid_file(void)
                close(fd);
                pool_shmem_exit(1);
                exit(1);
-       }       
+       }
        if (close(fd) == -1)
        {
                pool_error("could not close pid file as %s. reason: %s",
@@ -1499,6 +1499,11 @@ void degenerate_backend_set(int *node_id_set, int count)
                {
                        kill(parent, SIGUSR1);
                }
+               else
+               {
+                       pool_log("degenerate_backend_set: failover request from pid %d is canceled by other pgpool", getpid());
+                       memset(Req_info->node_id, -1, sizeof(int) * MAX_NUM_BACKENDS);
+               }
        }
 
        pool_semaphore_unlock(REQUEST_INFO_SEM);
@@ -1530,6 +1535,12 @@ void promote_backend(int node_id)
        {
                kill(parent, SIGUSR1);
        }
+       else
+       {
+               pool_log("promote_backend: promote request from pid %d is canceled by other pgpool", getpid());
+               Req_info->node_id[0] = -1;
+       }
+
        pool_semaphore_unlock(REQUEST_INFO_SEM);
 }
 
@@ -1542,15 +1553,18 @@ void send_failback_request(int node_id)
        Req_info->kind = NODE_UP_REQUEST;
        Req_info->node_id[0] = node_id;
 
-    if (node_id < 0 || node_id >= MAX_NUM_BACKENDS || 
+    if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
                (RAW_MODE && BACKEND_INFO(node_id).backend_status != CON_DOWN && VALID_BACKEND(node_id)))
        {
                pool_error("send_failback_request: node %d is alive.", node_id);
+               Req_info->node_id[0] = -1;
                return;
        }
 
        if (pool_config->use_watchdog && WD_OK != wd_send_failback_request(node_id))
        {
+               pool_log("send_failback_request: failback request from pid %d is canceled by other pgpool", getpid());
+               Req_info->node_id[0] = -1;
                return;
        }
        kill(parent, SIGUSR1);
@@ -1764,7 +1778,7 @@ static void failover(void)
                        switching = 0;
                        Req_info->switching = false;
 
-                       /* end of command inter-lock */ 
+                       /* end of command inter-lock */
                        if (pool_config->use_watchdog)
                                wd_leave_interlock();
 
@@ -1808,7 +1822,7 @@ static void failover(void)
                        switching = 0;
                        Req_info->switching = false;
 
-                       /* end of command inter-lock */ 
+                       /* end of command inter-lock */
                        if (pool_config->use_watchdog)
                                wd_leave_interlock();
 
@@ -1844,7 +1858,7 @@ static void failover(void)
                        switching = 0;
                        Req_info->switching = false;
 
-                       /* end of command inter-lock */ 
+                       /* end of command inter-lock */
                        if (pool_config->use_watchdog)
                                wd_leave_interlock();
 
@@ -1918,7 +1932,7 @@ static void failover(void)
        if (MASTER_SLAVE && !strcmp(pool_config->master_slave_sub_mode, MODE_STREAMREP) &&
                Req_info->kind == NODE_UP_REQUEST)
        {
-               pool_log("Do not restart children because we are failbacking node id %d host%s port:%d and we are in streaming replication mode", node_id, 
+               pool_log("Do not restart children because we are failbacking node id %d host%s port:%d and we are in streaming replication mode", node_id,
                                 BACKEND_INFO(node_id).backend_hostname,
                                 BACKEND_INFO(node_id).backend_port);
 
@@ -1979,7 +1993,7 @@ static void failover(void)
        else
                new_primary =  find_primary_node_repeatedly();
 
-       /* 
+       /*
         * If follow_master_command is provided and in master/slave
         * streaming replication mode, we start degenerating all backends
         * as they are not replicated anymore.
@@ -2047,7 +2061,7 @@ static void failover(void)
                        wd_unlock(WD_FOLLOW_MASTER_COMMAND_LOCK);
        }
 
-       /* end of command inter-lock */ 
+       /* end of command inter-lock */
        if (pool_config->use_watchdog)
                wd_end_interlock();
 
@@ -2201,7 +2215,7 @@ static int health_check(void)
                        bkinfo->backend_status == CON_DOWN)
                        continue;
 
-               slot = make_persistent_db_connection(bkinfo->backend_hostname, 
+               slot = make_persistent_db_connection(bkinfo->backend_hostname,
                                                                                         bkinfo->backend_port,
                                                                                         dbname,
                                                                                         pool_config->health_check_user,
@@ -2359,7 +2373,7 @@ static void reaper(void)
                        /* Child terminated by segmentation fault. Report it */
                        pool_error("Child process %d was terminated by segmentation fault", pid);
                }
-                       
+
                /* if exiting child process was PCP handler */
                if (pid == pcp_pid)
                {
@@ -2773,7 +2787,7 @@ static int find_primary_node(void)
 {
        BackendInfo *bkinfo;
        POOL_CONNECTION_POOL_SLOT *s;
-       POOL_CONNECTION *con; 
+       POOL_CONNECTION *con;
        POOL_STATUS status;
        POOL_SELECT_RESULT *res;
        bool is_standby;
@@ -2801,7 +2815,7 @@ static int find_primary_node(void)
                is_standby = false;
 
                bkinfo = pool_get_node_info(i);
-               s = make_persistent_db_connection(bkinfo->backend_hostname, 
+               s = make_persistent_db_connection(bkinfo->backend_hostname,
                                                                                  bkinfo->backend_port,
                                                                                  "postgres",
                                                                                  pool_config->sr_check_user,
@@ -2829,7 +2843,7 @@ static int find_primary_node(void)
                if (res->data[0] && !strcmp(res->data[0], "t"))
                {
                        is_standby = true;
-               }   
+               }
                free_select_result(res);
                discard_persistent_db_connection(s);
 
index 422f7f3e1a9ca78db0510cccc5395012d3f5948f..8225d0d7d3c8bd8a32e1e73ce9931fc19b9034f5 100644 (file)
@@ -3932,7 +3932,6 @@ int pool_get_config(char *confpath, POOL_CONFIG_CONTEXT context)
                        {
                                strlcpy(WD_HB_IF(slot).if_name, str, WD_MAX_IF_NAME_LEN);
                                pool_config->num_hb_if = slot + 1;
-                               pool_log("hoge %d",pool_config->num_hb_if);
                        }
 
                }
index 50de1aa35eb2393782544283014afe2cc67ce811..3b1624a98d8767ce739ae61987ec2eb3864bf199 100644 (file)
@@ -74,6 +74,7 @@ wd_exit(int exit_signo)
        exit(0);
 }
 
+/* send signal specified by sig to watchdog processes */
 void
 wd_kill_watchdog(int sig)
 {
@@ -241,6 +242,7 @@ fork_a_lifecheck(int fork_wait_time)
        return pid;
 }
 
+/* if pid is for one of watchdog processes return 1, otherwise return 0 */
 int
 wd_is_watchdog_pid(pid_t pid)
 {
@@ -262,6 +264,7 @@ wd_is_watchdog_pid(pid_t pid)
        return 0;
 }
 
+/* restart watchdog process specified by pid */
 int
 wd_reaper_watchdog(pid_t pid, int status)
 {
index 8f4404b9f1d2d84c56c33a48fc10bbd7842f976a..746b549db0e28bda56cc0839cce212c9c0ba027c 100644 (file)
@@ -63,6 +63,7 @@ typedef enum {
 
        /* normal packet */
        WD_INVALID = 0,                         /* invalid packet no */
+       WD_INFO_REQ,                            /* information request */
        WD_ADD_REQ,                                     /* add request into the watchdog list */
        WD_ADD_ACCEPT,                          /* accept the add request */
        WD_ADD_REJECT,                          /* reject the add request */
@@ -123,7 +124,7 @@ typedef enum {
  * watchdog list
  */
 typedef struct {
-       WD_STATUS status;                                               /* status */    
+       WD_STATUS status;                                               /* status */
        struct timeval tv;                                              /* startup time value */
        char hostname[WD_MAX_HOST_NAMELEN];             /* host name */
        int pgpool_port;                                                /* pgpool port */
@@ -134,7 +135,8 @@ typedef struct {
        struct timeval hb_send_time;                    /* send time */
        struct timeval hb_last_recv_time;               /* recv time */
        bool is_lock_holder;                                    /* lock holder flag */
-       bool in_interlocking;                                   /* failover or failback is in progress */
+       bool in_interlocking;                                   /* interlocking is in progress */
+       bool is_contactable;                                    /* able to create socket and connection */
 } WdInfo;
 
 typedef struct {
index 97d6ee2ead17114771a9dff94098e66b66623b50..343cd9898df1bbac906bfb1d684793a0e9254d63 100644 (file)
@@ -147,7 +147,7 @@ wd_send_response(int sock, WdPacket * recv_pack)
                return rtn;
        }
        memset(&send_packet, 0, sizeof(WdPacket));
-       p = &(recv_pack->wd_body.wd_info);      
+       p = &(recv_pack->wd_body.wd_info);
 
        /* authentication */
        if (strlen(pool_config->wd_authkey))
@@ -167,15 +167,28 @@ wd_send_response(int sock, WdPacket * recv_pack)
        /* set response packet no */
        switch (recv_pack->packet_no)
        {
+               /* information request */
+               case WD_INFO_REQ:
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       send_packet.packet_no = WD_READY;
+                       memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
+                       break;
+
                /* add request into the watchdog list */
                case WD_ADD_REQ:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       if (wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status) > 0)
+                       p = &(recv_pack->wd_body.wd_info);
+                       if (wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port,
+                                          p->delegate_ip, &(p->tv), p->status) > 0)
                        {
+                               pool_log("wd_send_response: receive add request from %s:%d and accept it",
+                                        p->hostname, p->pgpool_port);
                                send_packet.packet_no = WD_ADD_ACCEPT;
                        }
                        else
                        {
+                               pool_log("wd_send_response: receive add request from %s:%d and reject it",
+                                        p->hostname, p->pgpool_port);
                                send_packet.packet_no = WD_ADD_REJECT;
                        }
                        memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
@@ -183,14 +196,17 @@ wd_send_response(int sock, WdPacket * recv_pack)
 
                /* announce candidacy to be the new master */
                case WD_STAND_FOR_MASTER:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
                        /* check exist master */
                        if ((q = wd_is_alive_master()) != NULL)
                        {
                                /* vote against the candidate */
                                send_packet.packet_no = WD_MASTER_EXIST;
                                memcpy(&(send_packet.wd_body.wd_info), q, sizeof(WdInfo));
+
+                               pool_log("wd_send_response: WD_STAND_FOR_MASTER received, and voting against %s:%d",
+                                        p->hostname, p->pgpool_port);
                        }
                        else
                        {
@@ -203,17 +219,20 @@ wd_send_response(int sock, WdPacket * recv_pack)
                                /* vote for the candidate */
                                send_packet.packet_no = WD_VOTE_YOU;
                                memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
+
+                               pool_log("wd_send_response: WD_STAND_FOR_MASTER received, and voting for %s:%d",
+                                        p->hostname, p->pgpool_port);
                        }
                        break;
 
                /* announce assumption to be the new master */
                case WD_DECLARE_NEW_MASTER:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
                        if (WD_MYSELF->status == WD_MASTER)
                        {
                                /* resign master server */
-                               pool_log("wd_declare_new_master: ifconfig down to resign master server");
+                               pool_log("wd_send_response: WD_DECLARE_NEW_MASTER received and resign master server");
                                wd_IP_down();
                                wd_set_myself(NULL, WD_NORMAL);
                        }
@@ -223,53 +242,79 @@ wd_send_response(int sock, WdPacket * recv_pack)
 
                /* announce to assume lock holder */
                case WD_STAND_FOR_LOCK_HOLDER:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
-                       /* only master handles lock holder privilege */
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       /* only master handles lock holder assignment */
                        if (WD_MYSELF->status == WD_MASTER)
                        {
-                               /* if there are no lock holder yet */
+                               /* if lock holder exists yet */
                                if (wd_get_lock_holder() != NULL)
                                {
+                                       pool_log("wd_send_response: WD_STAND_FOR_LOCK_HOLDER received but lock holder exists already");
                                        send_packet.packet_no = WD_LOCK_HOLDER_EXIST;
                                }
+                               else
+                               {
+                                       pool_log("wd_send_response: WD_STAND_FOR_LOCK_HOLDER received it");
+                                       wd_set_lock_holder(p, true);
+                                       send_packet.packet_no = WD_READY;
+                               }
+                       }
+                       else
+                       {
+                               send_packet.packet_no = WD_READY;
                        }
                        memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        break;
 
+               /* announce to assume lock holder */
                case WD_DECLARE_LOCK_HOLDER:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
-                       wd_set_lock_holder(p, true);
-                       send_packet.packet_no = WD_READY;
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       if (WD_MYSELF->is_lock_holder)
+                       {
+                               pool_log("wd_send_response: WD_DECLARE_LOCK_HOLDER received but lock holder exists already");
+                               send_packet.packet_no = WD_LOCK_HOLDER_EXIST;
+                       }
+                       else
+                       {
+                               wd_set_lock_holder(p, true);
+                               send_packet.packet_no = WD_READY;
+                       }
                        memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        break;
 
                /* announce to resign lock holder */
                case WD_RESIGN_LOCK_HOLDER:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
                        wd_set_lock_holder(p, false);
                        send_packet.packet_no = WD_READY;
                        memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        break;
 
+               /* announce to start interlocking */
                case WD_START_INTERLOCK:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
                        wd_set_interlocking(p, true);
+                       send_packet.packet_no = WD_READY;
+                       memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        break;
 
+               /* announce to end interlocking */
                case WD_END_INTERLOCK:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
                        wd_set_interlocking(p, false);
+                       send_packet.packet_no = WD_READY;
+                       memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        break;
 
                /* announce that server is down */
                case WD_SERVER_DOWN:
-                       p = &(recv_pack->wd_body.wd_info);      
-                       wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), WD_DOWN);
+                       p = &(recv_pack->wd_body.wd_info);
+                       wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), WD_DOWN);
                        send_packet.packet_no = WD_READY;
                        memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
                        if (wd_am_I_oldest() == WD_OK && WD_MYSELF->status != WD_MASTER)
@@ -294,32 +339,65 @@ wd_send_response(int sock, WdPacket * recv_pack)
                                }
                        }
                        break;
+
+               /* announce end online recovery */
                case WD_END_RECOVERY:
                        send_packet.packet_no = WD_NODE_READY;
                        *InRecovery = RECOVERY_INIT;
                        kill(wd_ppid, SIGUSR2);
                        break;
+
+               /* announce failback request */
                case WD_FAILBACK_REQUEST:
-                       node = &(recv_pack->wd_body.wd_node_info);      
-                       wd_set_node_mask(WD_FAILBACK_REQUEST,node->node_id_set,node->node_num);
-                       is_node_packet = true;
-                       send_packet.packet_no = WD_NODE_READY;
+                       if (Req_info->switching)
+                       {
+                               pool_log("wd_send_response: failback request from other pgpool is canceled because it's while switching");
+                               send_packet.packet_no = WD_NODE_FAILED;
+                       }
+                       else
+                       {
+                               node = &(recv_pack->wd_body.wd_node_info);
+                               wd_set_node_mask(WD_FAILBACK_REQUEST,node->node_id_set,node->node_num);
+                               is_node_packet = true;
+                               send_packet.packet_no = WD_NODE_READY;
+                       }
                        break;
+
+               /* announce degenerate backend */
                case WD_DEGENERATE_BACKEND:
-                       node = &(recv_pack->wd_body.wd_node_info);      
-                       wd_set_node_mask(WD_DEGENERATE_BACKEND,node->node_id_set, node->node_num);
-                       is_node_packet = true;
-                       send_packet.packet_no = WD_NODE_READY;
+                       if (Req_info->switching)
+                       {
+                               pool_log("wd_send_response: failover request from other pgpool is canceled because it's while switching");
+                               send_packet.packet_no = WD_NODE_FAILED;
+                       }
+                       else
+                       {
+                               node = &(recv_pack->wd_body.wd_node_info);
+                               wd_set_node_mask(WD_DEGENERATE_BACKEND,node->node_id_set, node->node_num);
+                               is_node_packet = true;
+                               send_packet.packet_no = WD_NODE_READY;
+                       }
                        break;
+
+               /* announce promote backend */
                case WD_PROMOTE_BACKEND:
-                       node = &(recv_pack->wd_body.wd_node_info);      
-                       wd_set_node_mask(WD_PROMOTE_BACKEND,node->node_id_set, node->node_num);
-                       is_node_packet = true;
-                       send_packet.packet_no = WD_NODE_READY;
+                       if (Req_info->switching)
+                       {
+                               pool_log("wd_send_response: promote request from other pgpool is canceled because it's while switching");
+                               send_packet.packet_no = WD_NODE_FAILED;
+                       }
+                       else
+                       {
+                               node = &(recv_pack->wd_body.wd_node_info);
+                               wd_set_node_mask(WD_PROMOTE_BACKEND,node->node_id_set, node->node_num);
+                               is_node_packet = true;
+                               send_packet.packet_no = WD_NODE_READY;
+                       }
                        break;
 
+               /* announce to unlock command */
                case WD_UNLOCK_REQUEST:
-                       lock = &(recv_pack->wd_body.wd_lock_info);      
+                       lock = &(recv_pack->wd_body.wd_lock_info);
                        wd_set_lock(lock->lock_id, false);
                        send_packet.packet_no = WD_LOCK_READY;
                        break;
@@ -343,7 +421,7 @@ wd_send_response(int sock, WdPacket * recv_pack)
        return rtn;
 }
 
-/* send node request signal */
+/* send node request signal for other pgpool */
 static void
 wd_node_request_signal(WD_PACKET_NO packet_no, WdNodeInfo *node)
 {
index 2c0d5c4bafda025e41296ea9afa7c56c626ad6fd..7491ff7aa19179db60b37f0559e32bcf2862479d 100644 (file)
@@ -51,15 +51,18 @@ extern WdInfo * wd_is_alive_master(void);
 
 extern WdInfo * wd_get_lock_holder(void);
 extern WdInfo * wd_get_interlocking(void);
+extern bool wd_is_interlocking_all(void);
 extern void wd_set_lock_holder(WdInfo *p, bool value);
 extern void wd_set_interlocking(WdInfo *info, bool value);
 extern void wd_clear_interlocking_info(void);
+extern bool wd_is_contactable_master(void);
 
 /* wd_packet.c */
 extern int wd_startup(void);
 extern int wd_declare(void);
 extern int wd_stand_for_master(void);
 extern int wd_notice_server_down(void);
+extern int wd_update_info(void);
 extern int wd_authentication_failed(int sock);
 extern int wd_create_send_socket(char * hostname, int port);
 extern int wd_create_recv_socket(int port);
@@ -90,6 +93,7 @@ extern int wd_get_cmd(char * buf, char * cmd);
 /* wd_lifecheck.c */
 extern int is_wd_lifecheck_ready(void);
 extern int wd_lifecheck(void);
+extern int wd_check_heartbeat(WdInfo * pgpool);
 extern int wd_ping_pgpool(WdInfo * pgpool);
 
 /* wd_hearbeat.c */
index 4b6c6cc0f7768a8f789c15611143f0acbf5fb406..11df1ec12b9e302a9731dee79f27d43b41f709cd 100644 (file)
@@ -58,6 +58,7 @@ static int hton_wd_hb_packet(WdHbPacket * to, WdHbPacket * from);
 static int ntoh_wd_hb_packet(WdHbPacket * to, WdHbPacket * from);
 static int packet_to_string_hb(WdHbPacket pkt, char *str, int maxlen);
 
+/* create socket for sending heartbeat */
 int
 wd_create_hb_send_socket(WdHbIf hb_if)
 {
@@ -82,7 +83,7 @@ wd_create_hb_send_socket(WdHbIf hb_if)
                close(sock);
                return -1;
        }
-
+/*
 #if defined(SO_BINDTODEVICE)
        {
                struct ifreq i;
@@ -98,6 +99,7 @@ wd_create_hb_send_socket(WdHbIf hb_if)
                pool_log("wd_create_hb_send_socket: bind send socket to device: %s", i.ifr_name);
        }
 #endif
+*/
 #if defined(SO_REUSEPORT)
        {
                int one = 1;
@@ -122,6 +124,7 @@ wd_create_hb_send_socket(WdHbIf hb_if)
        return sock;
 }
 
+/* create socket for receiving heartbeat */
 int
 wd_create_hb_recv_socket(WdHbIf hb_if)
 {
@@ -152,7 +155,7 @@ wd_create_hb_recv_socket(WdHbIf hb_if)
                close(sock);
                return -1;
        }
-
+/*
 #if defined(SO_BINDTODEVICE)
        {
                struct ifreq i;
@@ -168,6 +171,7 @@ wd_create_hb_recv_socket(WdHbIf hb_if)
                pool_log("wd_create_hb_recv_socket: bind receive socket to device: %s", i.ifr_name);
        }
 #endif
+*/
 
 #if defined(SO_REUSEPORT)
        {
@@ -216,6 +220,7 @@ wd_create_hb_recv_socket(WdHbIf hb_if)
        return sock;
 }
 
+/* send heartbeat signal */
 int
 wd_hb_send(int sock, WdHbPacket * pkt, int len, const char * host)
 {
@@ -255,6 +260,7 @@ wd_hb_send(int sock, WdHbPacket * pkt, int len, const char * host)
        return WD_OK;
 }
 
+/* receive heartbeat signal */
 int
 wd_hb_recv(int sock, WdHbPacket * pkt)
 {
@@ -287,6 +293,7 @@ wd_hb_recv(int sock, WdHbPacket * pkt)
        return WD_OK;
 }
 
+/* fork heartbeat receiver child */
 pid_t wd_hb_receiver(int fork_wait_time, WdHbIf hb_if)
 {
        int sock;
@@ -340,6 +347,7 @@ pid_t wd_hb_receiver(int fork_wait_time, WdHbIf hb_if)
 
        for(;;)
        {
+               /* receive heartbeat signal */
                if (wd_hb_recv(sock, &pkt) == WD_OK)
                {
                        /* authentication */
@@ -385,6 +393,7 @@ pid_t wd_hb_receiver(int fork_wait_time, WdHbIf hb_if)
        return pid;
 }
 
+/* fork heartbeat sender child */
 pid_t wd_hb_sender(int fork_wait_time, WdHbIf hb_if)
 {
        int sock;
@@ -434,11 +443,12 @@ pid_t wd_hb_sender(int fork_wait_time, WdHbIf hb_if)
 
        for(;;)
        {
+               /* contents of packet */
                gettimeofday(&pkt.send_time, NULL);
                strlcpy(pkt.from, pool_config->wd_hostname, sizeof(pkt.from));
-
                pkt.status = p->status;
 
+               /* authentication key */
                if (strlen(pool_config->wd_authkey))
                {
                        /* calculate hash from packet */
@@ -446,6 +456,7 @@ pid_t wd_hb_sender(int fork_wait_time, WdHbIf hb_if)
                        wd_calc_hash(pack_str, pack_str_len, pkt.hash);
                }
 
+               /* send heartbeat signal */
                wd_hb_send(sock, &pkt, sizeof(pkt), hb_if.addr);
                pool_debug("wd_hb_sender: send heartbeat signal to %s", hb_if.addr);
                sleep(pool_config->wd_heartbeat_keepalive);
index 7000d94acfb090aac4c5a9f8170f0c0e3ef48f2b..7fc70520bb6f004643a52ed8aeda5b1435982b90 100644 (file)
@@ -47,6 +47,7 @@ static void wd_update_lock_holder(void);
 static int wd_assume_lock_holder(void);
 static void wd_resign_lock_holder(void);
 static void wd_lock_all(void);
+static void sleep_in_waiting(void);
 
 /* initialize interlock */
 int
@@ -72,14 +73,32 @@ void wd_start_interlock(void)
 {
        pool_log("wd_start_interlock: start interlocking");
 
+       /* lock all the resource */
+       wd_lock_all();
+
        wd_set_interlocking(WD_MYSELF, true);
        wd_send_packet_no(WD_START_INTERLOCK);
 
        /* try to assume lock holder */
        wd_assume_lock_holder();
 
-       /* lock all the resource */
-       wd_lock_all();
+       /* wait for all pgpools starting interlock */
+       while (wd_is_locked(WD_FAILOVER_START_LOCK))
+       {
+               if (WD_MYSELF->status == WD_DOWN)
+               {
+                       wd_set_lock_holder(WD_MYSELF, false);
+                       break;
+               }
+
+               if (wd_am_I_lock_holder() && wd_is_interlocking_all())
+               {
+                       wd_unlock(WD_FAILOVER_START_LOCK);
+               }
+
+               sleep_in_waiting();
+       }
+
 }
 
 /* notify to end interlocking */
@@ -93,13 +112,18 @@ void wd_end_interlock(void)
        /* wait for all pgpools ending interlock */
        while (wd_is_locked(WD_FAILOVER_END_LOCK))
        {
-               struct timeval t = {1, 0};
-               select(0, NULL, NULL, NULL, &t);
+               if (WD_MYSELF->status == WD_DOWN)
+               {
+                       wd_set_lock_holder(WD_MYSELF, false);
+                       break;
+               }
 
                if (wd_am_I_lock_holder() && !wd_get_interlocking())
                {
                        wd_unlock(WD_FAILOVER_END_LOCK);
                }
+
+               sleep_in_waiting();
        }
 
        wd_clear_interlocking_info();
@@ -128,11 +152,16 @@ void wd_wait_for_lock(WD_LOCK_ID lock_id)
 {
        while (wd_is_locked(lock_id))
        {
-               struct timeval t = {1, 0};
-               select(0, NULL, NULL, NULL, &t);
+               if (WD_MYSELF->status == WD_DOWN)
+               {
+                       wd_set_lock_holder(WD_MYSELF, false);
+                       break;
+               }
 
                if (wd_am_I_lock_holder())
                        break;
+
+               sleep_in_waiting();
        }
 }
 
@@ -147,13 +176,30 @@ wd_assume_lock_holder(void)
 
        wd_set_lock_holder(WD_MYSELF, false);
 
-       /* (master and lock holder) or (succeeded to become lock holder) */
+       if (WD_MYSELF->status == WD_DOWN)
+               return WD_NG;
+
+       /*
+        * confirm a contactable master exists,
+        * otherwise WD_STAND_FOR_LOCK_HOLDER always returns WD_OK,
+        * eventually multiple pgpools become lock_holder
+        */
+       while (!wd_is_contactable_master())
+       {
+               if (WD_MYSELF->status == WD_DOWN)
+                       break;
+       }
+
+       /* I'm master and not lock holder, or I succeeded to become lock holder */
        if ((WD_MYSELF->status == WD_MASTER && wd_get_lock_holder() == NULL) ||
            (wd_send_packet_no(WD_STAND_FOR_LOCK_HOLDER) == WD_OK))
        {
-               wd_set_lock_holder(WD_MYSELF, true);
-               wd_send_packet_no(WD_DECLARE_LOCK_HOLDER);
-               rtn = WD_OK;
+               if (wd_send_packet_no(WD_DECLARE_LOCK_HOLDER) == WD_OK)
+               {
+                       wd_set_lock_holder(WD_MYSELF, true);
+                       pool_log("wd_assume_lock_holder: become a new lock holder");
+                       rtn = WD_OK;
+               }
        }
 
        return rtn;
@@ -167,8 +213,13 @@ static void
 wd_update_lock_holder(void)
 {
        /* assume lock holder if not exist */
-       if (wd_get_lock_holder() == NULL)
+       while (wd_get_lock_holder() == NULL)
+       {
+               if (WD_MYSELF->status == WD_DOWN)
+                       break;
+
                wd_assume_lock_holder();
+       }
 }
 
 /* resign lock holder */
@@ -215,3 +266,10 @@ wd_unlock(WD_LOCK_ID lock_id)
 
        return rtn;
 }
+
+static void
+sleep_in_waiting(void)
+{
+       struct timeval t = {0, 100000};
+       select(0, NULL, NULL, NULL, &t);
+}
index 14c31d85d2d227cd74d32204aa2a9b426c99b859..08c47e276f6c9737fe496f932396dfa4c2588852 100644 (file)
@@ -110,6 +110,13 @@ wd_lifecheck(void)
 {
        struct timeval tv;
 
+       /* I'm in down.... */
+       if (WD_MYSELF->status == WD_DOWN)
+       {
+               pool_error("wd_lifecheck: watchdog status is DOWN. You need to restart pgpool");
+               return WD_NG;
+       }
+
        /* set startup time */
        gettimeofday(&tv, NULL);
 
@@ -119,13 +126,14 @@ wd_lifecheck(void)
        {
                pool_error("wd_lifecheck: failed to connect to any trusted servers");
 
-               /* This server connection may be downed */
                if (WD_MYSELF->status == WD_MASTER)
                {
                        wd_IP_down();
                }
+
                wd_set_myself(&tv, WD_DOWN);
                wd_notice_server_down();
+
                return WD_NG;
        }
 
@@ -135,20 +143,14 @@ wd_lifecheck(void)
                return WD_OK;
        }
 
-       /* check pgpool status */
+       /* check and update pgpool status */
        check_pgpool_status();
 
-       /* I'm in down.... */
-       if (WD_MYSELF->status == WD_DOWN)
-       {
-               pool_error("wd_lifecheck: watchdog status is DOWN. You need to restart this for recovery.");
-       }
-
        return WD_OK;
 }
 
 /*
- * check pgpool status and modify WDList
+ * check and update pgpool status
  */
 static void
 check_pgpool_status()
@@ -170,7 +172,6 @@ check_pgpool_status_by_hb(void)
 {
        int cnt;
        WdInfo * p = WD_List;
-       int interval;
        struct timeval tv;
 
        gettimeofday(&tv, NULL);
@@ -181,8 +182,10 @@ check_pgpool_status_by_hb(void)
                pool_debug("check_pgpool_status_by_hb: checking pgpool %d (%s:%d)",
                           cnt, p->hostname, p->pgpool_port);
 
+               /* about myself */
                if (p == WD_MYSELF)
                {
+                       /* parent is dead so it's an orphan.... */
                        if (is_parent_alive() == WD_NG && WD_MYSELF->status != WD_DOWN)
                        {
                                pool_debug("check_pgpool_status_by_hb: NG; the main pgpool process does't exist.");
@@ -191,27 +194,28 @@ check_pgpool_status_by_hb(void)
                                wd_set_myself(&tv, WD_DOWN);
                                wd_notice_server_down();
                        }
+                       /* otherwise, the parent would take care of children. */
                        else
                        {
                                pool_debug("check_pgpool_status_by_hb: OK; status %d", p->status);
                        }
                }
+
+               /*  about other pgpools, check the latest heartbeat. */
                else
                {
-                       interval = WD_TIME_DIFF_SEC(tv, p->hb_last_recv_time);
-
-                       if (interval > pool_config->wd_heartbeat_deadtime)
+                       if (wd_check_heartbeat(p) == WD_NG)
                        {
-                               pool_debug("check_pgpool_status_by_hb: the latest heartbeat received %d seconds ago", interval);
                                pool_debug("check_pgpool_status_by_hb: NG; status %d", p->status);
 
                                pool_log("check_pgpool_status_by_hb: lifecheck failed. pgpool %d (%s:%d) seems not to be working",
                                 cnt, p->hostname, p->pgpool_port);
-                               pgpool_down(p);
+
+                               if (p->status != WD_DOWN)
+                                       pgpool_down(p);
                        }
                        else
                        {
-                               pool_debug("check_pgpool_status_by_hb: the latest heartbeat received %d secconds ago", interval);
                                pool_debug("check_pgpool_status_by_hb: OK; status %d", p->status);
                        }
                }
@@ -244,7 +248,7 @@ check_pgpool_status_by_query(void)
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
-       /* send packet to all watchdogs */
+       /* send queries to all pgpools using threads */
        cnt = 0;
        while (p->status != WD_END)
        {
@@ -260,7 +264,7 @@ check_pgpool_status_by_query(void)
        }
        pthread_attr_destroy(&attr);
 
-       /* check results */
+       /* check results of queries */
        p = WD_List;
        for (i = 0; i < cnt; )
        {
@@ -291,21 +295,24 @@ check_pgpool_status_by_query(void)
                        {
                                p->life --;
                        }
+
+                       /* pgpool goes down */
                        if (p->life <= 0)
                        {
                                pool_log("check_pgpool_status_by_query: lifecheck failed %d times. pgpool %d (%s:%d) seems not to be working",
                                         pool_config->wd_life_point, i, p->hostname, p->pgpool_port);
 
+                               /* It's me! */
                                if ((i == 0) &&
                                        (WD_MYSELF->status != WD_DOWN))
                                {
                                        wd_set_myself(&tv, WD_DOWN);
                                        wd_notice_server_down();
                                }
-                               else
-                               {
+
+                               /* It's other pgpool */
+                               else if (p->status != WD_DOWN)
                                        pgpool_down(p);
-                               }
                        }
                }
                i++;
@@ -326,7 +333,7 @@ thread_ping_pgpool(void * arg)
 
        thread_arg = (WdPgpoolThreadArg *)arg;
        conn = thread_arg->conn;
-       rtn = (uintptr_t)ping_pgpool(conn);     
+       rtn = (uintptr_t)ping_pgpool(conn);
 
        pthread_exit((void *)rtn);
 }
@@ -375,15 +382,21 @@ create_conn(char * hostname, int port)
 static int
 pgpool_down(WdInfo * pool)
 {
-       int rtn = WD_DOWN;
+       int rtn = WD_OK;
+       WD_STATUS prev_status;
 
-       /* the active pgpool goes down */
-       if ((WD_MYSELF->status == WD_NORMAL) &&
-               (pool->status == WD_MASTER))
+       pool_log("pgpool_down: %s:%d is going down",
+                pool->hostname, pool->pgpool_port);
+
+       prev_status = pool->status;
+       pool->status = WD_DOWN;
+
+       /* the active pgpool goes down and I'm a standby pgpool */
+       if (prev_status == WD_MASTER && WD_MYSELF->status == WD_NORMAL)
        {
-               pool->status = WD_DOWN;
                if (wd_am_I_oldest() == WD_OK)
                {
+                       pool_log("pgpool_down: I'm oldest so standing for master");
                        /* stand for master */
                        rtn = wd_stand_for_master();
                        if (rtn == WD_OK)
@@ -391,12 +404,38 @@ pgpool_down(WdInfo * pool)
                                /* win */
                                wd_escalation();
                        }
+                       else
+                       {
+                               /* rejected by others */
+                               pool->status = prev_status;
+                       }
                }
        }
-       pool->status = WD_DOWN;
+
        return rtn;
 }
 
+/*
+ * Check if pgpool is alive using heartbeat signal.
+ */
+int
+wd_check_heartbeat(WdInfo * pgpool)
+{
+       int interval;
+       struct timeval tv;
+
+       gettimeofday(&tv, NULL);
+
+       interval = WD_TIME_DIFF_SEC(tv, pgpool->hb_last_recv_time);
+       pool_debug("wd_check_heartbeat: the latest heartbeat from %s:%d received %d seconds ago",
+                  pgpool->hostname, pgpool->pgpool_port, interval);
+
+       if (interval > pool_config->wd_heartbeat_deadtime)
+               return WD_NG;
+       else
+               return WD_OK;
+}
+
 /*
  * Check if pgpool can accept the lifecheck query.
  */
@@ -412,6 +451,7 @@ wd_ping_pgpool(WdInfo * pgpool)
        return rtn;
 }
 
+/* inner function for issuing the lifecheck query */
 static int
 ping_pgpool(PGconn * conn)
 {
index 56e73c3b21171a5539dd64b7ca65f151c5afa7fb..f82b44f0c79c358b1efc295fab5e50cb118a9555 100644 (file)
@@ -36,12 +36,14 @@ int wd_set_wd_info(WdInfo * info);
 WdInfo * wd_is_exist_master(void);
 WdInfo * wd_get_lock_holder(void);
 WdInfo * wd_get_interlocking(void);
+bool wd_is_interlocking_all(void);
 void wd_set_lock_holder(WdInfo *info, bool value);
 void wd_set_interlocking(WdInfo *info, bool value);
 void wd_clear_interlocking_info(void);
 int wd_am_I_oldest(void);
 int wd_set_myself(struct timeval * tv, int status);
 WdInfo * wd_is_alive_master(void);
+bool wd_is_contactable_master(void);
 
 /* add or modify watchdog information list */
 int
@@ -64,7 +66,7 @@ wd_set_wd_list(char * hostname, int pgpool_port, int wd_port, char * delegate_ip
 
        for ( i = 0 ; i < MAX_WATCHDOG_NUM ; i ++)
        {
-               p = (WD_List+i);        
+               p = (WD_List+i);
 
                if( p->status != WD_END)
                {
@@ -148,7 +150,6 @@ wd_is_exist_master(void)
 {
        WdInfo * p = WD_List;
 
-       p++;
        while (p->status != WD_END)
        {
                /* find master pgpool in the other servers */
@@ -160,7 +161,7 @@ wd_is_exist_master(void)
                p++;
        }
        /* not found */
-       return NULL;    
+       return NULL;
 }
 
 /* set or unset in_interlocking flag */
@@ -176,7 +177,7 @@ wd_set_interlocking(WdInfo *info, bool value)
                        (p->wd_port == info->wd_port))
                {
                        p->in_interlocking = value;
-                       
+
                        return;
                }
                p++;
@@ -222,7 +223,7 @@ wd_get_lock_holder(void)
        }
 
        /* not found */
-       return NULL;    
+       return NULL;
 }
 
 /* return the pgpool in interlocking found in first, NULL if not found */
@@ -231,8 +232,18 @@ wd_get_interlocking(void)
 {
        WdInfo * p = WD_List;
 
+       /* for updating contactable flag */
+       wd_update_info();
+
        while (p->status != WD_END)
        {
+               /* skip if not contactable */
+               if (!p->is_contactable)
+               {
+                       p++;
+                       continue;
+               }
+
                /* find pgpool in interlocking */
                if ((p->status == WD_NORMAL || p->status == WD_MASTER) &&
                    p->in_interlocking)
@@ -244,7 +255,38 @@ wd_get_interlocking(void)
        }
 
        /* not found */
-       return NULL;    
+       return NULL;
+}
+
+/* if all pgpools are in interlocking, return true; otherwise false */
+bool
+wd_is_interlocking_all(void)
+{
+       WdInfo * p = WD_List;
+       bool rtn = true;
+
+       /* for updating contactable flag */
+       wd_update_info();
+
+       while (p->status != WD_END)
+       {
+               /* skip if not contactable */
+               if (!p->is_contactable)
+               {
+                       p++;
+                       continue;
+               }
+
+               /* find pgpool not in interlocking */
+               if ((p->status == WD_NORMAL || p->status == WD_MASTER) &&
+                   !p->in_interlocking)
+               {
+                       rtn = false;
+               }
+               p++;
+       }
+
+       return rtn;
 }
 
 /* clear flags for interlocking */
@@ -300,21 +342,55 @@ wd_set_myself(struct timeval * tv, int status)
        return WD_OK;
 }
 
+/*
+ * if master exists and it is alive actually return the master,
+ * otherwise return NULL
+ */
 WdInfo *
 wd_is_alive_master(void)
 {
        WdInfo * master = NULL;
 
+       if (WD_MYSELF->status == WD_MASTER)
+               return WD_MYSELF;
+
        master = wd_is_exist_master();
        if (master != NULL)
        {
-               if (!strcmp(pool_config->wd_lifecheck_method, MODE_HEARTBEAT) ||
-                   (!strcmp(pool_config->wd_lifecheck_method, MODE_QUERY) &&
-                        wd_ping_pgpool(master) == WD_OK))
+               if ((!strcmp(pool_config->wd_lifecheck_method, MODE_HEARTBEAT)
+                        && wd_check_heartbeat(master) == WD_OK) ||
+                   (!strcmp(pool_config->wd_lifecheck_method, MODE_QUERY)
+                            && wd_ping_pgpool(master) == WD_OK))
                {
                        return master;
                }
        }
+
+       pool_debug("wd_is_alive_master: alive master not found");
+
        return NULL;
 }
 
+/*
+ * if master exists and it is contactable return true,
+ * otherwise return false
+ */
+bool
+wd_is_contactable_master(void)
+{
+       WdInfo * master = NULL;
+
+       if (WD_MYSELF->status == WD_MASTER)
+               return true;
+
+       /* for updating contactable flag */
+       wd_update_info();
+
+       master = wd_is_alive_master();
+       if (master != NULL)
+       {
+               return master->is_contactable;
+       }
+
+       return false;
+}
index fe437414273fc9eac72e42425ccb389933e9333e..0f04fc9ced3d74a7f7f5db99a35b49fb2ecf4844 100644 (file)
@@ -56,6 +56,7 @@ int wd_startup(void);
 int wd_declare(void);
 int wd_stand_for_master(void);
 int wd_notice_server_down(void);
+int wd_update_info(void);
 int wd_authentication_failed(int sock);
 int wd_create_send_socket(char * hostname, int port);
 int wd_create_recv_socket(int port);
@@ -79,6 +80,7 @@ void wd_calc_hash(const char *str, int len, char *buf);
 int wd_packet_to_string(WdPacket pkt, char *str, int maxlen);
 
 static void * wd_thread_negotiation(void * arg);
+static int send_packet_for_all(WdPacket *packet);
 static int send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type);
 static int hton_wd_packet(WdPacket * to, WdPacket * from);
 static int ntoh_wd_packet(WdPacket * to, WdPacket * from);
@@ -131,6 +133,16 @@ wd_notice_server_down(void)
        return rtn;
 }
 
+int
+wd_update_info(void)
+{
+       int rtn;
+
+       /* send info request packet */
+       rtn = wd_send_packet_no(WD_INFO_REQ);
+       return rtn;
+}
+
 /* send authentication failed packet */
 int
 wd_authentication_failed(int sock)
@@ -151,21 +163,18 @@ wd_authentication_failed(int sock)
 int
 wd_send_packet_no(WD_PACKET_NO packet_no )
 {
-       int rtn;
+       int rtn = WD_OK;
        WdPacket packet;
 
        memset(&packet, 0, sizeof(WdPacket));
 
-       /* set add request packet */
+       /* set packet no and self information */
        packet.packet_no = packet_no;
        memcpy(&(packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
 
-       /* send packet to all watchdogs */
-       rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
-       if (rtn == WD_OK)
-       {
-               rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
-       }
+       /* send packet for all watchdogs */
+       rtn = send_packet_for_all(&packet);
+
        return rtn;
 }
 
@@ -465,6 +474,7 @@ wd_recv_packet(int sock, WdPacket * recv_pack)
                                continue;
                        else
                        {
+                               pool_error("wd_recv_packet: recv failed");
                                return WD_NG;
                        }
                }
@@ -562,14 +572,15 @@ wd_thread_negotiation(void * arg)
                        }
                        break;
                case WD_STAND_FOR_LOCK_HOLDER:
+               case WD_DECLARE_LOCK_HOLDER:
                        if (recv_packet.packet_no == WD_LOCK_HOLDER_EXIST)
                        {
                                rtn = WD_NG;
                        }
                        break;
                case WD_DECLARE_NEW_MASTER:
-               case WD_DECLARE_LOCK_HOLDER:
                case WD_RESIGN_LOCK_HOLDER:
+
                        if (recv_packet.packet_no != WD_READY)
                        {
                                rtn = WD_NG;
@@ -595,6 +606,26 @@ wd_thread_negotiation(void * arg)
        pthread_exit((void *)rtn);
 }
 
+static int
+send_packet_for_all(WdPacket *packet)
+{
+       int rtn = WD_OK;
+
+       WdInfo *p;
+
+       /* send packet to master watchdog */
+       if (WD_MYSELF->status != WD_MASTER)
+               rtn = send_packet_4_nodes(packet, WD_SEND_TO_MASTER );
+
+       /* send packet to other watchdogs */
+       if (rtn == WD_OK)
+       {
+               rtn = send_packet_4_nodes(packet, WD_SEND_WITHOUT_MASTER);
+       }
+
+       return rtn;
+}
+
 static int
 send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
 {
@@ -612,25 +643,26 @@ send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
                return WD_NG;
        }
 
-       if ((type == WD_SEND_TO_MASTER) && (WD_MYSELF->status == WD_MASTER))
-       {
-               return WD_OK;
-       }
-
        /* thread init */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
-       /* send packet to all watchdogs */
+       /* skip myself */
        p++;
+       WD_MYSELF->is_contactable = true;
+
+       /* send packet to other watchdogs */
        cnt = 0;
        while (p->status != WD_END)
        {
+               /* don't send a packet to a pgpool that is down */
                if (p->status == WD_DOWN)
                {
+                       p->is_contactable = false;
                        p++;
                        continue;
                }
+
                if (type == WD_SEND_TO_MASTER )
                {
                        if (p->status != WD_MASTER)
@@ -647,30 +679,41 @@ send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
                                continue;
                        }
                }
+
                sock = wd_create_send_socket(p->hostname, p->wd_port);
                if (sock == -1)
                {
+                       pool_log("send_packet_4_nodes: packet for %s:%d is canceled", p->hostname, p->wd_port);
+                       p->is_contactable = false;
                        p++;
                        continue;
                }
+               else
+               {
+                       p->is_contactable = true;
+               }
+
                thread_arg[cnt].sock = sock;
                thread_arg[cnt].target = p;
                thread_arg[cnt].packet = packet;
                rc = pthread_create(&thread[cnt], &attr, wd_thread_negotiation, (void*)&thread_arg[cnt]);
+
                cnt ++;
                p++;
        }
 
        pthread_attr_destroy(&attr);
 
+       /* no packet is sent */
        if (cnt == 0)
        {
                return WD_OK;
        }
 
-       /* init return value */
+       /* default return value */
        if ((packet->packet_no == WD_STAND_FOR_MASTER) ||
                (packet->packet_no == WD_STAND_FOR_LOCK_HOLDER) ||
+               (packet->packet_no == WD_DECLARE_LOCK_HOLDER) ||
                (packet->packet_no == WD_START_RECOVERY))
        {
                rtn = WD_OK;
@@ -680,6 +723,7 @@ send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
                rtn = WD_NG;
        }
 
+       /* receive the results */
        for (i=0; i<cnt; )
        {
                int result;
@@ -690,8 +734,10 @@ send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
                        continue;
                }
 
+               /*  aggregate results according to the packet type */
                if ((packet->packet_no == WD_STAND_FOR_MASTER) ||
                        (packet->packet_no == WD_STAND_FOR_LOCK_HOLDER) ||
+                       (packet->packet_no == WD_DECLARE_LOCK_HOLDER) ||
                        (packet->packet_no == WD_START_RECOVERY))
                {
                        /* if any result is NG then return NG */
@@ -700,6 +746,7 @@ send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
                                rtn = WD_NG;
                        }
                }
+
                else
                {
                        /* if any result is OK then return OK */
@@ -1006,11 +1053,8 @@ wd_send_node_packet(WD_PACKET_NO packet_no, int *node_id_set, int count)
        packet.wd_body.wd_node_info.node_num = count;
 
        /* send packet to all watchdogs */
-       rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
-       if (rtn == WD_OK)
-       {
-               rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
-       }
+       rtn = send_packet_for_all(&packet);
+
        return rtn;
 }
 
@@ -1027,11 +1071,8 @@ wd_send_lock_packet(WD_PACKET_NO packet_no,  WD_LOCK_ID lock_id)
        packet.wd_body.wd_lock_info.lock_id= lock_id;
 
        /* send packet to all watchdogs */
-       rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
-       if (rtn == WD_OK)
-       {
-               rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
-       }
+       rtn = send_packet_for_all(&packet);
+
        return rtn;
 }