* Copyright (c) 2003-2004, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
- * $Id: remote_worker.c,v 1.173 2008-06-27 20:16:04 cbbrowne Exp $
+ * $Id: remote_worker.c,v 1.174 2008-08-01 20:18:17 cbbrowne Exp $
*-------------------------------------------------------------------------
*/
extern int STMTS[MAXSTATEMENTS];
#define MAXGROUPSIZE 10000 /* What is the largest number of SYNCs we'd
- * want to group together??? */
+ * want to group together??? */
+
+void slon_terminate_worker(void);
/* ----------
* Local definitions
* ----------
SLON_WGLC_ERROR
} WorkGroupLineCode;
+typedef struct PerfMon_s PerfMon; /* Structure for doing performance monitoring */
+struct PerfMon_s
+{
+ struct timeval prev_t;
+ struct timeval now_t;
+ double prov_query_t; /* Time spent running queries against the provider */
+ int prov_query_c; /* Number of queries run against the provider */
+ double subscr_query_t; /* Time spent running prep queries against the subscriber */
+ int subscr_query_c; /* Number of prep queries run against the subscriber */
+ double subscr_iud__t; /* Time spent running IUD against subscriber */
+ int subscr_iud__c; /* Number of IUD requests run against subscriber */
+	double		large_tuples_t; /* Time spent processing large tuples */
+ int large_tuples_c; /* Number of large tuples processed */
+ int num_inserts;
+ int num_updates;
+ int num_deletes;
+};
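+
+/*
+ * In PerfMon, each *_t field accumulates elapsed wall-clock seconds
+ * (TIMEVAL_DIFF of gettimeofday() samples) and the matching *_c field
+ * counts how many events contributed to it.
+ */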
struct ProviderInfo_s
{
* Local functions
* ----------
*/
+/*
+ * Performance monitoring functions
+ */
+
+static void init_perfmon(PerfMon *pm);
+static void start_monitored_event(PerfMon *pm);
+static void monitor_provider_query(PerfMon *pm);
+static void monitor_subscriber_query(PerfMon *pm);
+static void monitor_subscriber_iud(PerfMon *pm);
+static void monitor_largetuples(PerfMon *pm);
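+
+/*
+ * Each timed operation is bracketed by start_monitored_event() just
+ * before it and the matching monitor_*() just after it:
+ * monitor_provider_query() for queries against the data provider,
+ * monitor_subscriber_query() for preparatory queries against the
+ * subscriber, monitor_subscriber_iud() for the applied INSERT/UPDATE/
+ * DELETE batches, and monitor_largetuples() for fetching oversized
+ * log rows in the helper thread.
+ */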
+
static void adjust_provider_info(SlonNode *node,
WorkerGroupData *wd, int cleanup);
static int query_execute(SlonNode *node, PGconn *dbconn,
/*
* Move the message to the head of the queue
*/
- if (oldmsg != node->message_head)
+ if ((SlonWorkMsg *) oldmsg != node->message_head)
{
DLLIST_REMOVE(node->message_head,
- node->message_tail, oldmsg);
+ node->message_tail, (SlonWorkMsg *) oldmsg);
DLLIST_ADD_HEAD(node->message_head,
- node->message_tail, oldmsg);
+ node->message_tail, (SlonWorkMsg *) oldmsg);
}
}
pthread_mutex_unlock(&(node->message_lock));
break;
}
}
+	if (sub_provider < 0)
+	{
+		rtcfg_unlock();
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node for set %d "
+				 "not found in runtime configuration\n",
+				 node->no_id, set_id);
+		slon_terminate_worker();
+		return -1;
+	}
+	if (set_origin < 0)
+	{
+		rtcfg_unlock();
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: origin node for set %d "
+				 "not found in runtime configuration\n",
+				 node->no_id, set_id);
+		slon_terminate_worker();
+		return -1;
+	}
if (set == NULL)
{
rtcfg_unlock();
if ((sub_node = rtcfg_findNode(sub_provider)) == NULL)
{
rtcfg_unlock();
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: node %d "
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node %d "
"not found in runtime configuration\n",
node->no_id, sub_provider);
return -1;
gettimeofday(&tv_now, NULL);
slon_log(SLON_INFO, "copy_set %d done in %.3f seconds\n", set_id,
TIMEVAL_DIFF(&tv_start, &tv_now));
+
return 0;
}
int actionlist_len;
int64 min_ssy_seqno;
+ PerfMon pm;
gettimeofday(&tv_start, NULL);
+
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
" processing\n",
node->no_id, event->ev_seqno);
dstring_init(&query);
dstring_init(&lsquery);
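+	/* reset the per-SYNC performance counters; totals are logged once this SYNC completes */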
+ init_perfmon(&pm);
+
/*
* If this slon is running in log archiving mode, open a temporary file
* for it.
}
sprintf(conn_symname, "subscriber_%d_provider_%d",
node->no_id, provider->no_id);
+
provider->conn = slon_connectdb(provider->pa_conninfo,
conn_symname);
if (provider->conn == NULL)
(void) slon_mkquery(&query,
"select %s.registerNodeConnection(%d); ",
rtcfg_namespace, rtcfg_nodeid);
+ start_monitored_event(&pm);
if (query_execute(node, provider->conn->dbconn, &query) < 0)
{
dstring_free(&query);
provider->conn = NULL;
return provider->pa_connretry;
}
+ monitor_provider_query(&pm);
+
slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
"connected to data provider %d on '%s'\n",
node->no_id, provider->no_id,
pset->set_id);
slon_appendquery(&query, "); ");
+ start_monitored_event(&pm);
res1 = PQexec(local_dbconn, dstring_data(&query));
+	slon_log(SLON_INFO, "remoteWorkerThread_%d: pulling big action id list for provider %d\n",
+			 node->no_id, provider->no_id);
+ monitor_subscriber_query(&pm);
+
if (PQresultStatus(res1) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
rtcfg_namespace,
rtcfg_namespace,
sub_set);
+ start_monitored_event(&pm);
res2 = PQexec(local_dbconn, dstring_data(&query));
+ monitor_subscriber_query(&pm);
if (PQresultStatus(res2) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
(void) slon_mkquery(&query, "select last_value from %s.sl_log_status",
rtcfg_namespace);
res1 = PQexec(local_dbconn, dstring_data(&query));
+
if (PQresultStatus(res1) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
if (wgline->log.n_used > 0)
{
+ start_monitored_event(&pm);
res1 = PQexec(local_dbconn, dstring_data(&(wgline->log)));
+	monitor_subscriber_iud(&pm);
if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
{
PQclear(res1);
PQclear(res1);
}
+ start_monitored_event(&pm);
res1 = PQexec(local_dbconn, dstring_data(&(wgline->data)));
+ monitor_subscriber_iud(&pm);
if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
{
PQclear(res1);
slon_appendquery(&query, ") "
" group by 1; ");
+ start_monitored_event(&pm);
res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
+	monitor_provider_query(&pm);
if (PQresultStatus(res1) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
"select %s.sequenceSetValue(%s,%d,'%s','%s'); ",
rtcfg_namespace,
seql_seqid, node->no_id, seqbuf, seql_last_value);
+ start_monitored_event(&pm);
if (query_execute(node, local_dbconn, &query) < 0)
{
PQclear(res1);
archive_terminate(node);
return 60;
}
+ monitor_subscriber_iud(&pm);
/*
* Add the sequence number adjust call to the archive log.
INT64_FORMAT " done in %.3f seconds\n",
node->no_id, event->ev_seqno,
TIMEVAL_DIFF(&tv_start, &tv_now));
+
+	slon_log(SLON_INFO,
+			 "remoteWorkerThread_%d: SYNC " INT64_FORMAT " Timing: "
+			 "pqexec (s/count)"
+			 " - provider %.3f/%d"
+			 " - subscriber %.3f/%d"
+			 " - IUD %.3f/%d\n",
+			 node->no_id, event->ev_seqno,
+			 pm.prov_query_t, pm.prov_query_c,
+			 pm.subscr_query_t, pm.subscr_query_c,
+			 pm.subscr_iud__t, pm.subscr_iud__c);
+
return 0;
}
int line_no;
int line_ncmds;
- int num_inserts,
- num_deletes,
- num_updates;
+ PerfMon pm;
- num_inserts = 0;
- num_deletes = 0;
- num_updates = 0;
+ init_perfmon(&pm);
dstring_init(&query);
dstring_init(&query2);
{
/*
* First make sure that the overall memory usage is inside
- * bouds.
+ * bounds.
*/
if (wd->workdata_largemem > sync_max_largemem)
{
if (log_cmdsize >= sync_max_rowsize)
{
+ start_monitored_event(&pm);
(void) slon_mkquery(&query2,
"select log_cmddata "
"from %s.sl_log_1 "
rtcfg_namespace,
log_origin, log_txid, log_actionseq);
res2 = PQexec(dbconn, dstring_data(&query2));
+ monitor_largetuples(&pm);
if (PQresultStatus(res2) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s",
"insert into %s %s;\n",
wd->tab_fqname[log_tableid],
log_cmddata);
- num_inserts++;
+ pm.num_inserts++;
break;
case 'U':
"update only %s set %s;\n",
wd->tab_fqname[log_tableid],
log_cmddata);
- num_updates++;
+ pm.num_updates++;
break;
case 'D':
"delete from only %s where %s;\n",
wd->tab_fqname[log_tableid],
log_cmddata);
- num_deletes++;
+ pm.num_deletes++;
break;
}
line_ncmds++;
TIMEVAL_DIFF(&tv_start, &tv_now));
slon_log(SLON_INFO, "remoteHelperThread_%d_%d: inserts=%d updates=%d deletes=%d\n",
- node->no_id, provider->no_id, num_inserts, num_updates, num_deletes);
+ node->no_id, provider->no_id, pm.num_inserts, pm.num_updates, pm.num_deletes);
- num_inserts = 0;
- num_deletes = 0;
- num_updates = 0;
+	slon_log(SLON_INFO,
+			 "remoteHelperThread_%d_%d: sync_helper timing: "
+			 "large tuples %.3f/%d\n",
+			 node->no_id, provider->no_id,
+			 pm.large_tuples_t, pm.large_tuples_c);
/*
* Change our helper status to DONE and tell the worker thread about
static int
check_set_subscriber(int set_id, int node_id, PGconn *local_dbconn)
{
-
-
SlonDString query1;
PGresult *res;
}
PQclear(res);
return 1;
+}
+
+/* init_perfmon() - reset all performance counters and accumulators */
+static void
+init_perfmon(PerfMon *perf_info)
+{
+	perf_info->prov_query_t = 0.0;
+	perf_info->prov_query_c = 0;
+	perf_info->subscr_query_t = 0.0;
+	perf_info->subscr_query_c = 0;
+	perf_info->subscr_iud__t = 0.0;
+	perf_info->subscr_iud__c = 0;
+	perf_info->large_tuples_t = 0.0;
+	perf_info->large_tuples_c = 0;
+	perf_info->num_inserts = 0;
+	perf_info->num_updates = 0;
+	perf_info->num_deletes = 0;
+}
+
+/* start_monitored_event() - remember when the next monitored event began */
+static void
+start_monitored_event(PerfMon *perf_info)
+{
+	gettimeofday(&(perf_info->prev_t), NULL);
+}
+
+/* monitor_subscriber_query() - charge elapsed time to subscriber queries */
+static void
+monitor_subscriber_query(PerfMon *perf_info)
+{
+	double		diff;
+
+	gettimeofday(&(perf_info->now_t), NULL);
+	diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t));
+	perf_info->subscr_query_t += diff;
+	perf_info->subscr_query_c++;
+}
+
+/* monitor_provider_query() - charge elapsed time to provider queries */
+static void
+monitor_provider_query(PerfMon *perf_info)
+{
+	double		diff;
+
+	gettimeofday(&(perf_info->now_t), NULL);
+	diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t));
+	perf_info->prov_query_t += diff;
+	perf_info->prov_query_c++;
+}
+
+/* monitor_subscriber_iud() - charge elapsed time to subscriber IUD statements */
+static void
+monitor_subscriber_iud(PerfMon *perf_info)
+{
+	double		diff;
+
+	gettimeofday(&(perf_info->now_t), NULL);
+	diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t));
+	perf_info->subscr_iud__t += diff;
+	perf_info->subscr_iud__c++;
+}
+
+/* monitor_largetuples() - charge elapsed time to large tuple handling */
+static void
+monitor_largetuples(PerfMon *perf_info)
+{
+	double		diff;
+
+	gettimeofday(&(perf_info->now_t), NULL);
+	diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t));
+	perf_info->large_tuples_t += diff;
+	perf_info->large_tuples_c++;
}
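+
+/*
+ * Illustrative call pattern (the variable names below are examples
+ * only, not taken from a particular call site):
+ *
+ *		PerfMon		pm;
+ *
+ *		init_perfmon(&pm);
+ *		start_monitored_event(&pm);
+ *		res = PQexec(provider->conn->dbconn, dstring_data(&query));
+ *		monitor_provider_query(&pm);
+ *
+ * The accumulated times are wall-clock seconds from gettimeofday(),
+ * so they include network round trips as well as server execution.
+ */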