Add in additional performance monitoring, indicating how much time
authorChristopher Browne <cbbrowne@ca.afilias.info>
Fri, 1 Aug 2008 20:18:17 +0000 (20:18 +0000)
committerChristopher Browne <cbbrowne@ca.afilias.info>
Fri, 1 Aug 2008 20:18:17 +0000 (20:18 +0000)
and how many queries are being spent against the provider + subscriber.

This should enable people to more readily infer where bottlenecks are
themselves, rather than having to go to Jan & Chris and have them infer
things *really* indirectly.

src/slon/remote_worker.c

index 99edb887bda11751a4b0d54de014b28e2c7a062c..05d48e38f84706dc838622d8a125ebf783fb62b4 100644 (file)
@@ -6,7 +6,7 @@
  *     Copyright (c) 2003-2004, PostgreSQL Global Development Group
  *     Author: Jan Wieck, Afilias USA INC.
  *
- *     $Id: remote_worker.c,v 1.173 2008-06-27 20:16:04 cbbrowne Exp $
+ *     $Id: remote_worker.c,v 1.174 2008-08-01 20:18:17 cbbrowne Exp $
  *-------------------------------------------------------------------------
  */
 
 extern int     STMTS[MAXSTATEMENTS];
 
 #define MAXGROUPSIZE 10000             /* What is the largest number of SYNCs we'd
-                                                                * want to group together??? */
+                                        * want to group together??? */
 
+
+void slon_terminate_worker(void);
 /* ----------
  * Local definitions
  * ----------
@@ -138,6 +140,23 @@ typedef enum
        SLON_WGLC_ERROR
 } WorkGroupLineCode;
 
+typedef struct PerfMon_s PerfMon;   /* Structure for doing performance monitoring */
+struct PerfMon_s 
+{
+       struct timeval prev_t;
+       struct timeval now_t;
+       double prov_query_t;      /* Time spent running queries against the provider */
+       int prov_query_c;         /* Number of queries run against the provider */
+       double subscr_query_t;      /* Time spent running prep queries against the subscriber */
+       int subscr_query_c;         /* Number of prep queries run against the subscriber */
+       double subscr_iud__t;    /* Time spent running IUD against subscriber */
+       int subscr_iud__c;       /* Number of IUD requests run against subscriber */
+       double large_tuples_t;          /* Number of large tuples processed */
+       int large_tuples_c;          /* Number of large tuples processed */
+       int num_inserts;
+       int num_updates;
+       int num_deletes;
+};
 
 struct ProviderInfo_s
 {
@@ -240,6 +259,17 @@ int                        quit_sync_finalsync;
  * Local functions
  * ----------
  */
+/*
+ * Monitoring data structure
+ */
+
+static void init_perfmon(PerfMon *pm);
+static void start_monitored_event(PerfMon *pm);
+static void monitor_provider_query(PerfMon *pm);
+static void monitor_subscriber_query(PerfMon *pm);
+static void monitor_subscriber_iud(PerfMon *pm);
+static void monitor_largetuples(PerfMon *pm);
+
 static void adjust_provider_info(SlonNode *node,
                                         WorkerGroupData *wd, int cleanup);
 static int query_execute(SlonNode *node, PGconn *dbconn,
@@ -2102,12 +2132,12 @@ remoteWorker_confirm(int no_id,
                                        /*
                                         * Move the message to the head of the queue
                                         */
-                                       if (oldmsg != node->message_head)
+                                       if ((SlonWorkMsg *) oldmsg != node->message_head)
                                        {
                                                DLLIST_REMOVE(node->message_head,
-                                                               node->message_tail, oldmsg);
+                                                                         node->message_tail, (SlonWorkMsg *) oldmsg);
                                                DLLIST_ADD_HEAD(node->message_head,
-                                                               node->message_tail, oldmsg);
+                                                               node->message_tail, (SlonWorkMsg *) oldmsg);
                                        }
                                }
                                pthread_mutex_unlock(&(node->message_lock));
@@ -2419,6 +2449,23 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id,
                        break;
                }
        }
+       if (sub_provider < 0) {
+               rtcfg_unlock();
+               slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node for set %"
+                                "not found in runtime configuration\n",
+                                set_id);
+               slon_terminate_worker();
+               return -1;
+               
+       }
+       if (set_origin < 0) {
+               rtcfg_unlock();
+               slon_log(SLON_ERROR, "remoteWorkerThread_%d: origin node for set %"
+                                "not found in runtime configuration\n",
+                                set_id);
+               slon_terminate_worker();
+               return -1;
+       }
        if (set == NULL)
        {
                rtcfg_unlock();
@@ -2430,7 +2477,7 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id,
        if ((sub_node = rtcfg_findNode(sub_provider)) == NULL)
        {
                rtcfg_unlock();
-               slon_log(SLON_ERROR, "remoteWorkerThread_%d: node %d "
+               slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node %d "
                                 "not found in runtime configuration\n",
                                 node->no_id, sub_provider);
                return -1;
@@ -3498,6 +3545,7 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id,
        gettimeofday(&tv_now, NULL);
        slon_log(SLON_INFO, "copy_set %d done in %.3f seconds\n", set_id,
                         TIMEVAL_DIFF(&tv_start, &tv_now));
+
        return 0;
 }
 
@@ -3536,8 +3584,10 @@ sync_event(SlonNode *node, SlonConn *local_conn,
 
        int                     actionlist_len;
        int64           min_ssy_seqno;
+       PerfMon pm;
 
        gettimeofday(&tv_start, NULL);
+
        slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
                         " processing\n",
                         node->no_id, event->ev_seqno);
@@ -3546,6 +3596,8 @@ sync_event(SlonNode *node, SlonConn *local_conn,
        dstring_init(&query);
        dstring_init(&lsquery);
 
+       init_perfmon(&pm);
+
        /*
         * If this slon is running in log archiving mode, open a temporary file
         * for it.
@@ -3589,6 +3641,8 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                        }
                        sprintf(conn_symname, "subscriber_%d_provider_%d",
                                        node->no_id, provider->no_id);
+
+
                        provider->conn = slon_connectdb(provider->pa_conninfo,
                                                                                        conn_symname);
                        if (provider->conn == NULL)
@@ -3609,6 +3663,7 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                        (void) slon_mkquery(&query,
                                                                "select %s.registerNodeConnection(%d); ",
                                                                rtcfg_namespace, rtcfg_nodeid);
+                       start_monitored_event(&pm);
                        if (query_execute(node, provider->conn->dbconn, &query) < 0)
                        {
                                dstring_free(&query);
@@ -3618,6 +3673,8 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                provider->conn = NULL;
                                return provider->pa_connretry;
                        }
+                       monitor_provider_query(&pm);
+
                        slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
                                         "connected to data provider %d on '%s'\n",
                                         node->no_id, provider->no_id,
@@ -3717,7 +3774,11 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                                         pset->set_id);
                slon_appendquery(&query, "); ");
 
+               start_monitored_event(&pm);
                res1 = PQexec(local_dbconn, dstring_data(&query));
+               slon_log(SLON_INFO, "about to monitor_subscriber_query - pulling big actionid list %d\n", provider);
+               monitor_subscriber_query(&pm);
+
                if (PQresultStatus(res1) != PGRES_TUPLES_OK)
                {
                        slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
@@ -3772,7 +3833,9 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                                                rtcfg_namespace,
                                                                rtcfg_namespace,
                                                                sub_set);
+                       start_monitored_event(&pm);
                        res2 = PQexec(local_dbconn, dstring_data(&query));
+                       monitor_subscriber_query(&pm);
                        if (PQresultStatus(res2) != PGRES_TUPLES_OK)
                        {
                                slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
@@ -3945,6 +4008,7 @@ sync_event(SlonNode *node, SlonConn *local_conn,
        (void) slon_mkquery(&query, "select last_value from %s.sl_log_status",
                                                rtcfg_namespace);
        res1 = PQexec(local_dbconn, dstring_data(&query));
+
        if (PQresultStatus(res1) != PGRES_TUPLES_OK)
        {
                slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
@@ -4030,7 +4094,10 @@ sync_event(SlonNode *node, SlonConn *local_conn,
 
                                        if (wgline->log.n_used > 0)
                                        {
+                                         start_monitored_event(&pm);
                                                res1 = PQexec(local_dbconn, dstring_data(&(wgline->log)));
+
+                                         monitor_subscriber_iud(&pm);
                                                if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
                                                {
                                                        PQclear(res1);
@@ -4048,7 +4115,9 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                                PQclear(res1);
                                        }
 
+                                       start_monitored_event(&pm);
                                        res1 = PQexec(local_dbconn, dstring_data(&(wgline->data)));
+                                       monitor_subscriber_iud(&pm);
                                        if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
                                        {
                                                PQclear(res1);
@@ -4227,7 +4296,9 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                slon_appendquery(&query, ") "
                                                 "  group by 1; ");
 
+               start_monitored_event(&pm);
                res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
+               monitor_subscriber_iud(&pm);
                if (PQresultStatus(res1) != PGRES_TUPLES_OK)
                {
                        slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
@@ -4251,6 +4322,7 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                                         "select %s.sequenceSetValue(%s,%d,'%s','%s'); ",
                                                                rtcfg_namespace,
                                                   seql_seqid, node->no_id, seqbuf, seql_last_value);
+                       start_monitored_event(&pm);
                        if (query_execute(node, local_dbconn, &query) < 0)
                        {
                                PQclear(res1);
@@ -4259,6 +4331,7 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                                archive_terminate(node);
                                return 60;
                        }
+                       monitor_subscriber_iud(&pm);
 
                        /*
                         * Add the sequence number adjust call to the archive log.
@@ -4343,6 +4416,18 @@ sync_event(SlonNode *node, SlonConn *local_conn,
                         INT64_FORMAT " done in %.3f seconds\n",
                         node->no_id, event->ev_seqno,
                         TIMEVAL_DIFF(&tv_start, &tv_now));
+
+       slon_log(SLON_INFO, 
+                        "remoteWorkerThread_%d: SYNC " INT64_FORMAT " Timing: " 
+                        " pqexec (s/count)" 
+                        "- provider %.3f/%d " 
+                        "- subscriber %.3f/%d " 
+                        "- IUD %.3f/%d\n",
+                        node->no_id, event->ev_seqno, 
+                        pm.prov_query_t, pm.prov_query_c, 
+                        pm.subscr_query_t, pm.prov_query_c,
+                        pm.subscr_iud__t, pm.subscr_iud__c);
+
        return 0;
 }
 
@@ -4382,13 +4467,9 @@ sync_helper(void *cdata)
        int                     line_no;
        int                     line_ncmds;
 
-       int                     num_inserts,
-                               num_deletes,
-                               num_updates;
+       PerfMon pm;
 
-       num_inserts = 0;
-       num_deletes = 0;
-       num_updates = 0;
+       init_perfmon(&pm);
 
        dstring_init(&query);
        dstring_init(&query2);
@@ -4617,7 +4698,7 @@ sync_helper(void *cdata)
                                {
                                        /*
                                         * First make sure that the overall memory usage is inside
-                                        * bouds.
+                                        * bounds.
                                         */
                                        if (wd->workdata_largemem > sync_max_largemem)
                                        {
@@ -4758,6 +4839,7 @@ sync_helper(void *cdata)
 
                                        if (log_cmdsize >= sync_max_rowsize)
                                        {
+                                               start_monitored_event(&pm);
                                                (void) slon_mkquery(&query2,
                                                                                        "select log_cmddata "
                                                                                        "from %s.sl_log_1 "
@@ -4775,6 +4857,7 @@ sync_helper(void *cdata)
                                                                                        rtcfg_namespace,
                                                                                log_origin, log_txid, log_actionseq);
                                                res2 = PQexec(dbconn, dstring_data(&query2));
+                                               monitor_largetuples(&pm);
                                                if (PQresultStatus(res2) != PGRES_TUPLES_OK)
                                                {
                                                        slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s",
@@ -4838,7 +4921,7 @@ sync_helper(void *cdata)
                                                                                         "insert into %s %s;\n",
                                                                                         wd->tab_fqname[log_tableid],
                                                                                         log_cmddata);
-                                                       num_inserts++;
+                                                       pm.num_inserts++;
                                                        break;
 
                                                case 'U':
@@ -4846,7 +4929,7 @@ sync_helper(void *cdata)
                                                                                         "update only %s set %s;\n",
                                                                                         wd->tab_fqname[log_tableid],
                                                                                         log_cmddata);
-                                                       num_updates++;
+                                                       pm.num_updates++;
                                                        break;
 
                                                case 'D':
@@ -4854,7 +4937,7 @@ sync_helper(void *cdata)
                                                                                   "delete from only %s where %s;\n",
                                                                                         wd->tab_fqname[log_tableid],
                                                                                         log_cmddata);
-                                                       num_deletes++;
+                                                       pm.num_deletes++;
                                                        break;
                                        }
                                        line_ncmds++;
@@ -4958,11 +5041,13 @@ sync_helper(void *cdata)
                                 TIMEVAL_DIFF(&tv_start, &tv_now));
 
                slon_log(SLON_INFO, "remoteHelperThread_%d_%d: inserts=%d updates=%d deletes=%d\n",
-               node->no_id, provider->no_id, num_inserts, num_updates, num_deletes);
+                                node->no_id, provider->no_id, pm.num_inserts, pm.num_updates, pm.num_deletes);
 
-               num_inserts = 0;
-               num_deletes = 0;
-               num_updates = 0;
+       slon_log(SLON_INFO, 
+                        "remoteWorkerThread_%d: sync_helper timing: " 
+                        " large tuples %.3f/%d\n", 
+                        node->no_id, 
+                        pm.large_tuples_t, pm.large_tuples_c);
 
                /*
                 * Change our helper status to DONE and tell the worker thread about
@@ -5649,8 +5734,6 @@ compress_actionseq(const char *ssy_actionlist, SlonDString *action_subquery)
 static int
 check_set_subscriber(int set_id, int node_id, PGconn *local_dbconn)
 {
-
-
        SlonDString query1;
        PGresult   *res;
 
@@ -5674,6 +5757,50 @@ check_set_subscriber(int set_id, int node_id, PGconn *local_dbconn)
        }
        PQclear(res);
        return 1;
+}
 
+static void init_perfmon(PerfMon *perf_info) {
+  perf_info->prov_query_t = 0.0;
+  perf_info->prov_query_c = 0;
+  perf_info->subscr_query_t = 0.0;
+  perf_info->subscr_query_c = 0;
+  perf_info->subscr_iud__t = 0.0;
+  perf_info->subscr_iud__c = 0;
+  perf_info->large_tuples_t = 0;
+  perf_info->large_tuples_c = 0;
+  perf_info->num_inserts = 0;
+  perf_info->num_updates = 0;
+  perf_info->num_deletes = 0;
+}
+static void start_monitored_event(PerfMon *perf_info) {
+  gettimeofday(&(perf_info->prev_t), NULL);
+}
+static void monitor_subscriber_query(PerfMon *perf_info) {
+  double diff;
+  gettimeofday(&(perf_info->now_t), NULL);
+  diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); 
+  (perf_info->subscr_query_t) += diff;
+  (perf_info->subscr_query_c) ++;
+}
+static void monitor_provider_query(PerfMon *perf_info) {
+  double diff;
+  gettimeofday(&(perf_info->now_t), NULL);
+  diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); 
+  (perf_info->prov_query_t) += diff;
+  (perf_info->prov_query_c) ++;
+}
+static void monitor_subscriber_iud(PerfMon *perf_info) {
+  double diff;
+  gettimeofday(&(perf_info->now_t), NULL);
+  diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); 
+  (perf_info->subscr_iud__t) += diff;
+  (perf_info->subscr_iud__c) ++;
+}
 
+static void monitor_largetuples(PerfMon *perf_info) {
+  double diff;
+  gettimeofday(&(perf_info->now_t), NULL);
+  diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); 
+  (perf_info->large_tuples_t) += diff;
+  (perf_info->large_tuples_c) ++;
 }