{"version", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
-
+
myargc = argc;
myargv = argv;
if (pool_is_shmem_cache())
{
size_t size;
-
+
size = pool_shared_memory_cache_size();
if (size == 0)
{
close(fd);
pool_shmem_exit(1);
exit(1);
- }
+ }
if (close(fd) == -1)
{
pool_error("could not close pid file as %s. reason: %s",
{
kill(parent, SIGUSR1);
}
+ else
+ {
+ pool_log("degenerate_backend_set: failover request from pid %d is canceled by other pgpool", getpid());
+ memset(Req_info->node_id, -1, sizeof(int) * MAX_NUM_BACKENDS);
+ }
}
pool_semaphore_unlock(REQUEST_INFO_SEM);
{
kill(parent, SIGUSR1);
}
+ else
+ {
+ pool_log("promote_backend: promote request from pid %d is canceled by other pgpool", getpid());
+ Req_info->node_id[0] = -1;
+ }
+
pool_semaphore_unlock(REQUEST_INFO_SEM);
}
Req_info->kind = NODE_UP_REQUEST;
Req_info->node_id[0] = node_id;
- if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
+ if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
(RAW_MODE && BACKEND_INFO(node_id).backend_status != CON_DOWN && VALID_BACKEND(node_id)))
{
pool_error("send_failback_request: node %d is alive.", node_id);
+ Req_info->node_id[0] = -1;
return;
}
if (pool_config->use_watchdog && WD_OK != wd_send_failback_request(node_id))
{
+ pool_log("send_failback_request: failback request from pid %d is canceled by other pgpool", getpid());
+ Req_info->node_id[0] = -1;
return;
}
kill(parent, SIGUSR1);
switching = 0;
Req_info->switching = false;
- /* end of command inter-lock */
+ /* end of command inter-lock */
if (pool_config->use_watchdog)
wd_leave_interlock();
switching = 0;
Req_info->switching = false;
- /* end of command inter-lock */
+ /* end of command inter-lock */
if (pool_config->use_watchdog)
wd_leave_interlock();
switching = 0;
Req_info->switching = false;
- /* end of command inter-lock */
+ /* end of command inter-lock */
if (pool_config->use_watchdog)
wd_leave_interlock();
if (MASTER_SLAVE && !strcmp(pool_config->master_slave_sub_mode, MODE_STREAMREP) &&
Req_info->kind == NODE_UP_REQUEST)
{
- pool_log("Do not restart children because we are failbacking node id %d host%s port:%d and we are in streaming replication mode", node_id,
+ pool_log("Do not restart children because we are failbacking node id %d host%s port:%d and we are in streaming replication mode", node_id,
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port);
else
new_primary = find_primary_node_repeatedly();
- /*
+ /*
* If follow_master_command is provided and in master/slave
* streaming replication mode, we start degenerating all backends
* as they are not replicated anymore.
wd_unlock(WD_FOLLOW_MASTER_COMMAND_LOCK);
}
- /* end of command inter-lock */
+ /* end of command inter-lock */
if (pool_config->use_watchdog)
wd_end_interlock();
bkinfo->backend_status == CON_DOWN)
continue;
- slot = make_persistent_db_connection(bkinfo->backend_hostname,
+ slot = make_persistent_db_connection(bkinfo->backend_hostname,
bkinfo->backend_port,
dbname,
pool_config->health_check_user,
/* Child terminated by segmentation fault. Report it */
pool_error("Child process %d was terminated by segmentation fault", pid);
}
-
+
/* if exiting child process was PCP handler */
if (pid == pcp_pid)
{
{
BackendInfo *bkinfo;
POOL_CONNECTION_POOL_SLOT *s;
- POOL_CONNECTION *con;
+ POOL_CONNECTION *con;
POOL_STATUS status;
POOL_SELECT_RESULT *res;
bool is_standby;
is_standby = false;
bkinfo = pool_get_node_info(i);
- s = make_persistent_db_connection(bkinfo->backend_hostname,
+ s = make_persistent_db_connection(bkinfo->backend_hostname,
bkinfo->backend_port,
"postgres",
pool_config->sr_check_user,
if (res->data[0] && !strcmp(res->data[0], "t"))
{
is_standby = true;
- }
+ }
free_select_result(res);
discard_persistent_db_connection(s);
{
strlcpy(WD_HB_IF(slot).if_name, str, WD_MAX_IF_NAME_LEN);
pool_config->num_hb_if = slot + 1;
- pool_log("hoge %d",pool_config->num_hb_if);
}
}
exit(0);
}
+/* send signal specified by sig to watchdog processes */
void
wd_kill_watchdog(int sig)
{
return pid;
}
+/* if pid is for one of watchdog processes return 1, otherwise return 0 */
int
wd_is_watchdog_pid(pid_t pid)
{
return 0;
}
+/* restart watchdog process specified by pid */
int
wd_reaper_watchdog(pid_t pid, int status)
{
/* normal packet */
WD_INVALID = 0, /* invalid packet no */
+ WD_INFO_REQ, /* information request */
WD_ADD_REQ, /* add request into the watchdog list */
WD_ADD_ACCEPT, /* accept the add request */
WD_ADD_REJECT, /* reject the add request */
* watchdog list
*/
typedef struct {
- WD_STATUS status; /* status */
+ WD_STATUS status; /* status */
struct timeval tv; /* startup time value */
char hostname[WD_MAX_HOST_NAMELEN]; /* host name */
int pgpool_port; /* pgpool port */
struct timeval hb_send_time; /* send time */
struct timeval hb_last_recv_time; /* recv time */
bool is_lock_holder; /* lock holder flag */
- bool in_interlocking; /* failover or failback is in progress */
+ bool in_interlocking; /* interlocking is in progress */
+ bool is_contactable; /* able to create socket and connection */
} WdInfo;
typedef struct {
return rtn;
}
memset(&send_packet, 0, sizeof(WdPacket));
- p = &(recv_pack->wd_body.wd_info);
+ p = &(recv_pack->wd_body.wd_info);
/* authentication */
if (strlen(pool_config->wd_authkey))
/* set response packet no */
switch (recv_pack->packet_no)
{
+ /* information request */
+ case WD_INFO_REQ:
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ send_packet.packet_no = WD_READY;
+ memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
+ break;
+
/* add request into the watchdog list */
case WD_ADD_REQ:
- p = &(recv_pack->wd_body.wd_info);
- if (wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status) > 0)
+ p = &(recv_pack->wd_body.wd_info);
+ if (wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port,
+ p->delegate_ip, &(p->tv), p->status) > 0)
{
+ pool_log("wd_send_response: receive add request from %s:%d and accept it",
+ p->hostname, p->pgpool_port);
send_packet.packet_no = WD_ADD_ACCEPT;
}
else
{
+ pool_log("wd_send_response: receive add request from %s:%d and reject it",
+ p->hostname, p->pgpool_port);
send_packet.packet_no = WD_ADD_REJECT;
}
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
/* announce candidacy to be the new master */
case WD_STAND_FOR_MASTER:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
/* check exist master */
if ((q = wd_is_alive_master()) != NULL)
{
/* vote against the candidate */
send_packet.packet_no = WD_MASTER_EXIST;
memcpy(&(send_packet.wd_body.wd_info), q, sizeof(WdInfo));
+
+ pool_log("wd_send_response: WD_STAND_FOR_MASTER received, and voting against %s:%d",
+ p->hostname, p->pgpool_port);
}
else
{
/* vote for the candidate */
send_packet.packet_no = WD_VOTE_YOU;
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
+
+ pool_log("wd_send_response: WD_STAND_FOR_MASTER received, and voting for %s:%d",
+ p->hostname, p->pgpool_port);
}
break;
/* announce assumption to be the new master */
case WD_DECLARE_NEW_MASTER:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
if (WD_MYSELF->status == WD_MASTER)
{
/* resign master server */
- pool_log("wd_declare_new_master: ifconfig down to resign master server");
+ pool_log("wd_send_response: WD_DECLARE_NEW_MASTER received and resign master server");
wd_IP_down();
wd_set_myself(NULL, WD_NORMAL);
}
/* announce to assume lock holder */
case WD_STAND_FOR_LOCK_HOLDER:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
- /* only master handles lock holder privilege */
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ /* only master handles lock holder assignment */
if (WD_MYSELF->status == WD_MASTER)
{
- /* if there are no lock holder yet */
+ /* if lock holder exists yet */
if (wd_get_lock_holder() != NULL)
{
+ pool_log("wd_send_response: WD_STAND_FOR_LOCK_HOLDER received but lock holder exists already");
send_packet.packet_no = WD_LOCK_HOLDER_EXIST;
}
+ else
+ {
+ pool_log("wd_send_response: WD_STAND_FOR_LOCK_HOLDER received it");
+ wd_set_lock_holder(p, true);
+ send_packet.packet_no = WD_READY;
+ }
+ }
+ else
+ {
+ send_packet.packet_no = WD_READY;
}
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
break;
+ /* announce to assume lock holder */
case WD_DECLARE_LOCK_HOLDER:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
- wd_set_lock_holder(p, true);
- send_packet.packet_no = WD_READY;
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ if (WD_MYSELF->is_lock_holder)
+ {
+ pool_log("wd_send_response: WD_DECLARE_LOCK_HOLDER received but lock holder exists already");
+ send_packet.packet_no = WD_LOCK_HOLDER_EXIST;
+ }
+ else
+ {
+ wd_set_lock_holder(p, true);
+ send_packet.packet_no = WD_READY;
+ }
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
break;
/* announce to resign lock holder */
case WD_RESIGN_LOCK_HOLDER:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
wd_set_lock_holder(p, false);
send_packet.packet_no = WD_READY;
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
break;
+ /* announce to start interlocking */
case WD_START_INTERLOCK:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
wd_set_interlocking(p, true);
+ send_packet.packet_no = WD_READY;
+ memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
break;
+ /* announce to end interlocking */
case WD_END_INTERLOCK:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), p->status);
wd_set_interlocking(p, false);
+ send_packet.packet_no = WD_READY;
+ memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
break;
/* announce that server is down */
case WD_SERVER_DOWN:
- p = &(recv_pack->wd_body.wd_info);
- wd_set_wd_list(p->hostname,p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), WD_DOWN);
+ p = &(recv_pack->wd_body.wd_info);
+ wd_set_wd_list(p->hostname, p->pgpool_port, p->wd_port, p->delegate_ip, &(p->tv), WD_DOWN);
send_packet.packet_no = WD_READY;
memcpy(&(send_packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
if (wd_am_I_oldest() == WD_OK && WD_MYSELF->status != WD_MASTER)
}
}
break;
+
+ /* announce end online recovery */
case WD_END_RECOVERY:
send_packet.packet_no = WD_NODE_READY;
*InRecovery = RECOVERY_INIT;
kill(wd_ppid, SIGUSR2);
break;
+
+ /* announce failback request */
case WD_FAILBACK_REQUEST:
- node = &(recv_pack->wd_body.wd_node_info);
- wd_set_node_mask(WD_FAILBACK_REQUEST,node->node_id_set,node->node_num);
- is_node_packet = true;
- send_packet.packet_no = WD_NODE_READY;
+ if (Req_info->switching)
+ {
+ pool_log("wd_send_response: failback request from other pgpool is canceled because it's while switching");
+ send_packet.packet_no = WD_NODE_FAILED;
+ }
+ else
+ {
+ node = &(recv_pack->wd_body.wd_node_info);
+ wd_set_node_mask(WD_FAILBACK_REQUEST,node->node_id_set,node->node_num);
+ is_node_packet = true;
+ send_packet.packet_no = WD_NODE_READY;
+ }
break;
+
+ /* announce degenerate backend */
case WD_DEGENERATE_BACKEND:
- node = &(recv_pack->wd_body.wd_node_info);
- wd_set_node_mask(WD_DEGENERATE_BACKEND,node->node_id_set, node->node_num);
- is_node_packet = true;
- send_packet.packet_no = WD_NODE_READY;
+ if (Req_info->switching)
+ {
+ pool_log("wd_send_response: failover request from other pgpool is canceled because it's while switching");
+ send_packet.packet_no = WD_NODE_FAILED;
+ }
+ else
+ {
+ node = &(recv_pack->wd_body.wd_node_info);
+ wd_set_node_mask(WD_DEGENERATE_BACKEND,node->node_id_set, node->node_num);
+ is_node_packet = true;
+ send_packet.packet_no = WD_NODE_READY;
+ }
break;
+
+ /* announce promote backend */
case WD_PROMOTE_BACKEND:
- node = &(recv_pack->wd_body.wd_node_info);
- wd_set_node_mask(WD_PROMOTE_BACKEND,node->node_id_set, node->node_num);
- is_node_packet = true;
- send_packet.packet_no = WD_NODE_READY;
+ if (Req_info->switching)
+ {
+ pool_log("wd_send_response: promote request from other pgpool is canceled because it's while switching");
+ send_packet.packet_no = WD_NODE_FAILED;
+ }
+ else
+ {
+ node = &(recv_pack->wd_body.wd_node_info);
+ wd_set_node_mask(WD_PROMOTE_BACKEND,node->node_id_set, node->node_num);
+ is_node_packet = true;
+ send_packet.packet_no = WD_NODE_READY;
+ }
break;
+ /* announce to unlock command */
case WD_UNLOCK_REQUEST:
- lock = &(recv_pack->wd_body.wd_lock_info);
+ lock = &(recv_pack->wd_body.wd_lock_info);
wd_set_lock(lock->lock_id, false);
send_packet.packet_no = WD_LOCK_READY;
break;
return rtn;
}
-/* send node request signal */
+/* send node request signal for other pgpool */
static void
wd_node_request_signal(WD_PACKET_NO packet_no, WdNodeInfo *node)
{
extern WdInfo * wd_get_lock_holder(void);
extern WdInfo * wd_get_interlocking(void);
+extern bool wd_is_interlocking_all(void);
extern void wd_set_lock_holder(WdInfo *p, bool value);
extern void wd_set_interlocking(WdInfo *info, bool value);
extern void wd_clear_interlocking_info(void);
+extern bool wd_is_contactable_master(void);
/* wd_packet.c */
extern int wd_startup(void);
extern int wd_declare(void);
extern int wd_stand_for_master(void);
extern int wd_notice_server_down(void);
+extern int wd_update_info(void);
extern int wd_authentication_failed(int sock);
extern int wd_create_send_socket(char * hostname, int port);
extern int wd_create_recv_socket(int port);
/* wd_lifecheck.c */
extern int is_wd_lifecheck_ready(void);
extern int wd_lifecheck(void);
+extern int wd_check_heartbeat(WdInfo * pgpool);
extern int wd_ping_pgpool(WdInfo * pgpool);
/* wd_hearbeat.c */
static int ntoh_wd_hb_packet(WdHbPacket * to, WdHbPacket * from);
static int packet_to_string_hb(WdHbPacket pkt, char *str, int maxlen);
+/* create socket for sending heartbeat */
int
wd_create_hb_send_socket(WdHbIf hb_if)
{
close(sock);
return -1;
}
-
+/*
#if defined(SO_BINDTODEVICE)
{
struct ifreq i;
pool_log("wd_create_hb_send_socket: bind send socket to device: %s", i.ifr_name);
}
#endif
+*/
#if defined(SO_REUSEPORT)
{
int one = 1;
return sock;
}
+/* create socket for receiving heartbeat */
int
wd_create_hb_recv_socket(WdHbIf hb_if)
{
close(sock);
return -1;
}
-
+/*
#if defined(SO_BINDTODEVICE)
{
struct ifreq i;
pool_log("wd_create_hb_recv_socket: bind receive socket to device: %s", i.ifr_name);
}
#endif
+*/
#if defined(SO_REUSEPORT)
{
return sock;
}
+/* send heartbeat signal */
int
wd_hb_send(int sock, WdHbPacket * pkt, int len, const char * host)
{
return WD_OK;
}
+/* receive heartbeat signal */
int
wd_hb_recv(int sock, WdHbPacket * pkt)
{
return WD_OK;
}
+/* fork heartbeat receiver child */
pid_t wd_hb_receiver(int fork_wait_time, WdHbIf hb_if)
{
int sock;
for(;;)
{
+ /* receive heartbeat signal */
if (wd_hb_recv(sock, &pkt) == WD_OK)
{
/* authentication */
return pid;
}
+/* fork heartbeat sender child */
pid_t wd_hb_sender(int fork_wait_time, WdHbIf hb_if)
{
int sock;
for(;;)
{
+ /* contents of packet */
gettimeofday(&pkt.send_time, NULL);
strlcpy(pkt.from, pool_config->wd_hostname, sizeof(pkt.from));
-
pkt.status = p->status;
+ /* authentication key */
if (strlen(pool_config->wd_authkey))
{
/* calculate hash from packet */
wd_calc_hash(pack_str, pack_str_len, pkt.hash);
}
+ /* send heartbeat signal */
wd_hb_send(sock, &pkt, sizeof(pkt), hb_if.addr);
pool_debug("wd_hb_sender: send heartbeat signal to %s", hb_if.addr);
sleep(pool_config->wd_heartbeat_keepalive);
static int wd_assume_lock_holder(void);
static void wd_resign_lock_holder(void);
static void wd_lock_all(void);
+static void sleep_in_waiting(void);
/* initialize interlock */
int
{
pool_log("wd_start_interlock: start interlocking");
+ /* lock all the resource */
+ wd_lock_all();
+
wd_set_interlocking(WD_MYSELF, true);
wd_send_packet_no(WD_START_INTERLOCK);
/* try to assume lock holder */
wd_assume_lock_holder();
- /* lock all the resource */
- wd_lock_all();
+ /* wait for all pgpools starting interlock */
+ while (wd_is_locked(WD_FAILOVER_START_LOCK))
+ {
+ if (WD_MYSELF->status == WD_DOWN)
+ {
+ wd_set_lock_holder(WD_MYSELF, false);
+ break;
+ }
+
+ if (wd_am_I_lock_holder() && wd_is_interlocking_all())
+ {
+ wd_unlock(WD_FAILOVER_START_LOCK);
+ }
+
+ sleep_in_waiting();
+ }
+
}
/* notify to end interlocking */
/* wait for all pgpools ending interlock */
while (wd_is_locked(WD_FAILOVER_END_LOCK))
{
- struct timeval t = {1, 0};
- select(0, NULL, NULL, NULL, &t);
+ if (WD_MYSELF->status == WD_DOWN)
+ {
+ wd_set_lock_holder(WD_MYSELF, false);
+ break;
+ }
if (wd_am_I_lock_holder() && !wd_get_interlocking())
{
wd_unlock(WD_FAILOVER_END_LOCK);
}
+
+ sleep_in_waiting();
}
wd_clear_interlocking_info();
{
while (wd_is_locked(lock_id))
{
- struct timeval t = {1, 0};
- select(0, NULL, NULL, NULL, &t);
+ if (WD_MYSELF->status == WD_DOWN)
+ {
+ wd_set_lock_holder(WD_MYSELF, false);
+ break;
+ }
if (wd_am_I_lock_holder())
break;
+
+ sleep_in_waiting();
}
}
wd_set_lock_holder(WD_MYSELF, false);
- /* (master and lock holder) or (succeeded to become lock holder) */
+ if (WD_MYSELF->status == WD_DOWN)
+ return WD_NG;
+
+ /*
+ * confirm contactable master exists,
+ * otherwise WD_STAND_FOR_LOCK_HOLDER always returns WD_OK,
+ * eventually multiple pgpools become lock_holder
+ */
+ while (!wd_is_contactable_master())
+ {
+ if (WD_MYSELF->status == WD_DOWN)
+ break;
+ }
+
+ /* I'm master and not lock holder, or I succeeded to become lock holder */
if ((WD_MYSELF->status == WD_MASTER && wd_get_lock_holder() == NULL) ||
(wd_send_packet_no(WD_STAND_FOR_LOCK_HOLDER) == WD_OK))
{
- wd_set_lock_holder(WD_MYSELF, true);
- wd_send_packet_no(WD_DECLARE_LOCK_HOLDER);
- rtn = WD_OK;
+ if (wd_send_packet_no(WD_DECLARE_LOCK_HOLDER) == WD_OK)
+ {
+ wd_set_lock_holder(WD_MYSELF, true);
+ pool_log("wd_assume_lock_holder: become a new lock holder");
+ rtn = WD_OK;
+ }
}
return rtn;
wd_update_lock_holder(void)
{
/* assume lock holder if not exist */
- if (wd_get_lock_holder() == NULL)
+ while (wd_get_lock_holder() == NULL)
+ {
+ if (WD_MYSELF->status == WD_DOWN)
+ break;
+
wd_assume_lock_holder();
+ }
}
/* resign lock holder */
return rtn;
}
+
+/* Sleep roughly 100 msec (0.1 sec) via select(); used while polling
+ * watchdog interlocking locks to avoid busy-waiting. */
+static void
+sleep_in_waiting(void)
+{
+ struct timeval t = {0, 100000};
+ select(0, NULL, NULL, NULL, &t);
+}
{
struct timeval tv;
+ /* I'm in down.... */
+ if (WD_MYSELF->status == WD_DOWN)
+ {
+ pool_error("wd_lifecheck: watchdog status is DOWN. You need to restart pgpool");
+ return WD_NG;
+ }
+
/* set startup time */
gettimeofday(&tv, NULL);
{
pool_error("wd_lifecheck: failed to connect to any trusted servers");
- /* This server connection may be downed */
if (WD_MYSELF->status == WD_MASTER)
{
wd_IP_down();
}
+
wd_set_myself(&tv, WD_DOWN);
wd_notice_server_down();
+
return WD_NG;
}
return WD_OK;
}
- /* check pgpool status */
+ /* check and update pgpool status */
check_pgpool_status();
- /* I'm in down.... */
- if (WD_MYSELF->status == WD_DOWN)
- {
- pool_error("wd_lifecheck: watchdog status is DOWN. You need to restart this for recovery.");
- }
-
return WD_OK;
}
/*
- * check pgpool status and modify WDList
+ * check and update pgpool status
*/
static void
check_pgpool_status()
{
int cnt;
WdInfo * p = WD_List;
- int interval;
struct timeval tv;
gettimeofday(&tv, NULL);
pool_debug("check_pgpool_status_by_hb: checking pgpool %d (%s:%d)",
cnt, p->hostname, p->pgpool_port);
+ /* about myself */
if (p == WD_MYSELF)
{
+ /* parent is dead so it's orphan.... */
if (is_parent_alive() == WD_NG && WD_MYSELF->status != WD_DOWN)
{
pool_debug("check_pgpool_status_by_hb: NG; the main pgpool process does't exist.");
wd_set_myself(&tv, WD_DOWN);
wd_notice_server_down();
}
+ /* otherwise, the parent would take care of children. */
else
{
pool_debug("check_pgpool_status_by_hb: OK; status %d", p->status);
}
}
+
+ /* about other pgpools, check the latest heartbeat. */
else
{
- interval = WD_TIME_DIFF_SEC(tv, p->hb_last_recv_time);
-
- if (interval > pool_config->wd_heartbeat_deadtime)
+ if (wd_check_heartbeat(p) == WD_NG)
{
- pool_debug("check_pgpool_status_by_hb: the latest heartbeat received %d seconds ago", interval);
pool_debug("check_pgpool_status_by_hb: NG; status %d", p->status);
pool_log("check_pgpool_status_by_hb: lifecheck failed. pgpool %d (%s:%d) seems not to be working",
cnt, p->hostname, p->pgpool_port);
- pgpool_down(p);
+
+ if (p->status != WD_DOWN)
+ pgpool_down(p);
}
else
{
- pool_debug("check_pgpool_status_by_hb: the latest heartbeat received %d secconds ago", interval);
pool_debug("check_pgpool_status_by_hb: OK; status %d", p->status);
}
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- /* send packet to all watchdogs */
+ /* send queries to all pgpools using threads */
cnt = 0;
while (p->status != WD_END)
{
}
pthread_attr_destroy(&attr);
- /* check results */
+ /* check results of queries */
p = WD_List;
for (i = 0; i < cnt; )
{
{
p->life --;
}
+
+ /* pgpool goes down */
if (p->life <= 0)
{
pool_log("check_pgpool_status_by_query: lifecheck failed %d times. pgpool %d (%s:%d) seems not to be working",
pool_config->wd_life_point, i, p->hostname, p->pgpool_port);
+ /* It's me! */
if ((i == 0) &&
(WD_MYSELF->status != WD_DOWN))
{
wd_set_myself(&tv, WD_DOWN);
wd_notice_server_down();
}
- else
- {
+
+ /* It's other pgpool */
+ else if (p->status != WD_DOWN)
pgpool_down(p);
- }
}
}
i++;
thread_arg = (WdPgpoolThreadArg *)arg;
conn = thread_arg->conn;
- rtn = (uintptr_t)ping_pgpool(conn);
+ rtn = (uintptr_t)ping_pgpool(conn);
pthread_exit((void *)rtn);
}
static int
pgpool_down(WdInfo * pool)
{
- int rtn = WD_DOWN;
+ int rtn = WD_OK;
+ WD_STATUS prev_status;
- /* the active pgpool goes down */
- if ((WD_MYSELF->status == WD_NORMAL) &&
- (pool->status == WD_MASTER))
+ pool_log("pgpool_down: %s:%d is going down",
+ pool->hostname, pool->pgpool_port);
+
+ prev_status = pool->status;
+ pool->status = WD_DOWN;
+
+ /* the active pgpool goes down and I'm sandby pgpool */
+ if (prev_status == WD_MASTER && WD_MYSELF->status == WD_NORMAL)
{
- pool->status = WD_DOWN;
if (wd_am_I_oldest() == WD_OK)
{
+ pool_log("pgpool_down: I'm oldest so standing for master");
/* stand for master */
rtn = wd_stand_for_master();
if (rtn == WD_OK)
/* win */
wd_escalation();
}
+ else
+ {
+ /* rejected by others */
+ pool->status = prev_status;
+ }
}
}
- pool->status = WD_DOWN;
+
return rtn;
}
+/*
+ * Check if pgpool is alive using heartbeat signal.
+ *
+ * Returns WD_OK when the latest heartbeat from 'pgpool' was received
+ * within wd_heartbeat_deadtime seconds, WD_NG otherwise.
+ */
+int
+wd_check_heartbeat(WdInfo * pgpool)
+{
+ int interval;
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+
+ /* seconds elapsed since the last heartbeat packet from this pgpool */
+ interval = WD_TIME_DIFF_SEC(tv, pgpool->hb_last_recv_time);
+ pool_debug("wd_check_heartbeat: the latest heartbeat from %s:%d received %d seconds ago",
+ pgpool->hostname, pgpool->pgpool_port, interval);
+
+ if (interval > pool_config->wd_heartbeat_deadtime)
+ return WD_NG;
+ else
+ return WD_OK;
+}
+
/*
* Check if pgpool can accept the lifecheck query.
*/
return rtn;
}
+/* inner function for issueing lifecheck query */
static int
ping_pgpool(PGconn * conn)
{
WdInfo * wd_is_exist_master(void);
WdInfo * wd_get_lock_holder(void);
WdInfo * wd_get_interlocking(void);
+bool wd_is_interlocking_all(void);
void wd_set_lock_holder(WdInfo *info, bool value);
void wd_set_interlocking(WdInfo *info, bool value);
void wd_clear_interlocking_info(void);
int wd_am_I_oldest(void);
int wd_set_myself(struct timeval * tv, int status);
WdInfo * wd_is_alive_master(void);
+bool wd_is_contactable_master(void);
/* add or modify watchdog information list */
int
for ( i = 0 ; i < MAX_WATCHDOG_NUM ; i ++)
{
- p = (WD_List+i);
+ p = (WD_List+i);
if( p->status != WD_END)
{
{
WdInfo * p = WD_List;
- p++;
while (p->status != WD_END)
{
/* find master pgpool in the other servers */
p++;
}
/* not found */
- return NULL;
+ return NULL;
}
/* set or unset in_interlocking flag */
(p->wd_port == info->wd_port))
{
p->in_interlocking = value;
-
+
return;
}
p++;
}
/* not found */
- return NULL;
+ return NULL;
}
/* return the pgpool in interlocking found in first, NULL if not found */
{
WdInfo * p = WD_List;
+ /* for updating contactable flag */
+ wd_update_info();
+
while (p->status != WD_END)
{
+ /* skip if not contactable */
+ if (!p->is_contactable)
+ {
+ p++;
+ continue;
+ }
+
/* find pgpool in interlocking */
if ((p->status == WD_NORMAL || p->status == WD_MASTER) &&
p->in_interlocking)
}
/* not found */
- return NULL;
+ return NULL;
+}
+
+/*
+ * Return true when every live (NORMAL or MASTER) pgpool in WD_List is
+ * in interlocking, otherwise false.
+ *
+ * NOTE(review): nodes flagged not contactable are skipped, so an
+ * unreachable pgpool does not prevent this from returning true.
+ */
+bool
+wd_is_interlocking_all(void)
+{
+ WdInfo * p = WD_List;
+ bool rtn = true;
+
+ /* refresh each node's is_contactable flag before inspecting it */
+ wd_update_info();
+
+ while (p->status != WD_END)
+ {
+ /* skip if not contactable */
+ if (!p->is_contactable)
+ {
+ p++;
+ continue;
+ }
+
+ /* find pgpool not in interlocking */
+ if ((p->status == WD_NORMAL || p->status == WD_MASTER) &&
+ !p->in_interlocking)
+ {
+ rtn = false;
+ }
+ p++;
+ }
+
+ return rtn;
+}
/* clear flags for interlocking */
return WD_OK;
}
+/*
+ * if master exists and it is alive actually return the master,
+ * otherwise return NULL
+ */
WdInfo *
wd_is_alive_master(void)
{
WdInfo * master = NULL;
+ if (WD_MYSELF->status == WD_MASTER)
+ return WD_MYSELF;
+
master = wd_is_exist_master();
if (master != NULL)
{
- if (!strcmp(pool_config->wd_lifecheck_method, MODE_HEARTBEAT) ||
- (!strcmp(pool_config->wd_lifecheck_method, MODE_QUERY) &&
- wd_ping_pgpool(master) == WD_OK))
+ if ((!strcmp(pool_config->wd_lifecheck_method, MODE_HEARTBEAT)
+ && wd_check_heartbeat(master) == WD_OK) ||
+ (!strcmp(pool_config->wd_lifecheck_method, MODE_QUERY)
+ && wd_ping_pgpool(master) == WD_OK))
{
return master;
}
}
+
+ pool_debug("wd_is_alive_master: alive master not found");
+
return NULL;
}
+/*
+ * Return true if a master pgpool exists and it is contactable,
+ * otherwise return false.
+ *
+ * If this node itself is the master, it is trivially contactable and
+ * true is returned without any network traffic.
+ */
+bool
+wd_is_contactable_master(void)
+{
+ WdInfo * master = NULL;
+
+ if (WD_MYSELF->status == WD_MASTER)
+ return true;
+
+ /* refresh each node's is_contactable flag before inspecting it */
+ wd_update_info();
+
+ master = wd_is_alive_master();
+ if (master != NULL)
+ {
+ return master->is_contactable;
+ }
+
+ return false;
+}
int wd_declare(void);
int wd_stand_for_master(void);
int wd_notice_server_down(void);
+int wd_update_info(void);
int wd_authentication_failed(int sock);
int wd_create_send_socket(char * hostname, int port);
int wd_create_recv_socket(int port);
int wd_packet_to_string(WdPacket pkt, char *str, int maxlen);
static void * wd_thread_negotiation(void * arg);
+static int send_packet_for_all(WdPacket *packet);
static int send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type);
static int hton_wd_packet(WdPacket * to, WdPacket * from);
static int ntoh_wd_packet(WdPacket * to, WdPacket * from);
return rtn;
}
+/*
+ * Broadcast a WD_INFO_REQ packet to the other watchdogs.  As a side
+ * effect the send path (send_packet_4_nodes) updates each node's
+ * is_contactable flag.  Returns the result of wd_send_packet_no().
+ */
+int
+wd_update_info(void)
+{
+ int rtn;
+
+ /* send info request packet */
+ rtn = wd_send_packet_no(WD_INFO_REQ);
+ return rtn;
+}
+
/* send authentication failed packet */
int
wd_authentication_failed(int sock)
int
wd_send_packet_no(WD_PACKET_NO packet_no )
{
- int rtn;
+ int rtn = WD_OK;
WdPacket packet;
memset(&packet, 0, sizeof(WdPacket));
- /* set add request packet */
+ /* set packet no and self information */
packet.packet_no = packet_no;
memcpy(&(packet.wd_body.wd_info), WD_MYSELF, sizeof(WdInfo));
- /* send packet to all watchdogs */
- rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
- if (rtn == WD_OK)
- {
- rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
- }
+ /* send packet for all watchdogs */
+ rtn = send_packet_for_all(&packet);
+
return rtn;
}
continue;
else
{
+ pool_error("wd_recv_packet: recv failed");
return WD_NG;
}
}
}
break;
case WD_STAND_FOR_LOCK_HOLDER:
+ case WD_DECLARE_LOCK_HOLDER:
if (recv_packet.packet_no == WD_LOCK_HOLDER_EXIST)
{
rtn = WD_NG;
}
break;
case WD_DECLARE_NEW_MASTER:
- case WD_DECLARE_LOCK_HOLDER:
case WD_RESIGN_LOCK_HOLDER:
+
if (recv_packet.packet_no != WD_READY)
{
rtn = WD_NG;
pthread_exit((void *)rtn);
}
+/*
+ * Send 'packet' to all watchdogs: first to the master (skipped when
+ * this node is the master itself), then to the remaining nodes.
+ * Returns WD_OK only when both sends succeed.
+ *
+ * NOTE(review): local variable 'p' is unused and could be removed.
+ */
+static int
+send_packet_for_all(WdPacket *packet)
+{
+ int rtn = WD_OK;
+
+ WdInfo *p;
+
+ /* send packet to master watchdog */
+ if (WD_MYSELF->status != WD_MASTER)
+ rtn = send_packet_4_nodes(packet, WD_SEND_TO_MASTER );
+
+ /* send packet to other watchdogs */
+ if (rtn == WD_OK)
+ {
+ rtn = send_packet_4_nodes(packet, WD_SEND_WITHOUT_MASTER);
+ }
+
+ return rtn;
+}
+
static int
send_packet_4_nodes(WdPacket *packet, WD_SEND_TYPE type)
{
return WD_NG;
}
- if ((type == WD_SEND_TO_MASTER) && (WD_MYSELF->status == WD_MASTER))
- {
- return WD_OK;
- }
-
/* thread init */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- /* send packet to all watchdogs */
+ /* skip myself */
p++;
+ WD_MYSELF->is_contactable = true;
+
+ /* send packet to other watchdogs */
cnt = 0;
while (p->status != WD_END)
{
+ /* don't send packet to pgpool in down */
if (p->status == WD_DOWN)
{
+ p->is_contactable = false;
p++;
continue;
}
+
if (type == WD_SEND_TO_MASTER )
{
if (p->status != WD_MASTER)
continue;
}
}
+
sock = wd_create_send_socket(p->hostname, p->wd_port);
if (sock == -1)
{
+ pool_log("send_packet_4_nodes: packet for %s:%d is canceled", p->hostname, p->wd_port);
+ p->is_contactable = false;
p++;
continue;
}
+ else
+ {
+ p->is_contactable = true;
+ }
+
thread_arg[cnt].sock = sock;
thread_arg[cnt].target = p;
thread_arg[cnt].packet = packet;
rc = pthread_create(&thread[cnt], &attr, wd_thread_negotiation, (void*)&thread_arg[cnt]);
+
cnt ++;
p++;
}
pthread_attr_destroy(&attr);
+ /* no packet is sent */
if (cnt == 0)
{
return WD_OK;
}
- /* init return value */
+ /* default return value */
if ((packet->packet_no == WD_STAND_FOR_MASTER) ||
(packet->packet_no == WD_STAND_FOR_LOCK_HOLDER) ||
+ (packet->packet_no == WD_DECLARE_LOCK_HOLDER) ||
(packet->packet_no == WD_START_RECOVERY))
{
rtn = WD_OK;
rtn = WD_NG;
}
+ /* receive the results */
for (i=0; i<cnt; )
{
int result;
continue;
}
+ /* aggregate results according to the packet type */
if ((packet->packet_no == WD_STAND_FOR_MASTER) ||
(packet->packet_no == WD_STAND_FOR_LOCK_HOLDER) ||
+ (packet->packet_no == WD_DECLARE_LOCK_HOLDER) ||
(packet->packet_no == WD_START_RECOVERY))
{
/* if any result is NG then return NG */
rtn = WD_NG;
}
}
+
else
{
/* if any result is OK then return OK */
packet.wd_body.wd_node_info.node_num = count;
/* send packet to all watchdogs */
- rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
- if (rtn == WD_OK)
- {
- rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
- }
+ rtn = send_packet_for_all(&packet);
+
return rtn;
}
packet.wd_body.wd_lock_info.lock_id= lock_id;
/* send packet to all watchdogs */
- rtn = send_packet_4_nodes(&packet, WD_SEND_TO_MASTER );
- if (rtn == WD_OK)
- {
- rtn = send_packet_4_nodes(&packet, WD_SEND_WITHOUT_MASTER);
- }
+ rtn = send_packet_for_all(&packet);
+
return rtn;
}