From: Christopher Browne Date: Fri, 1 Aug 2008 20:18:17 +0000 (+0000) Subject: Add in additional performance monitoring, indicating how much time X-Git-Tag: REL_2_0_0_RC2~15 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=768b49949c4c651b471f5eed26acbbecd09bc639;p=slony1-engine.git Add in additional performance monitoring, indicating how much time and how many queries are being spent against the provider + subscriber. This should enable people to more readily infer where bottlenecks are themselves, rather than having to go to Jan & Chris and have them infer things *really* indirectly. --- diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c index 99edb887..05d48e38 100644 --- a/src/slon/remote_worker.c +++ b/src/slon/remote_worker.c @@ -6,7 +6,7 @@ * Copyright (c) 2003-2004, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * - * $Id: remote_worker.c,v 1.173 2008-06-27 20:16:04 cbbrowne Exp $ + * $Id: remote_worker.c,v 1.174 2008-08-01 20:18:17 cbbrowne Exp $ *------------------------------------------------------------------------- */ @@ -27,8 +27,10 @@ extern int STMTS[MAXSTATEMENTS]; #define MAXGROUPSIZE 10000 /* What is the largest number of SYNCs we'd - * want to group together??? */ + * want to group together??? */ + +void slon_terminate_worker(void); /* ---------- * Local definitions * ---------- @@ -138,6 +140,23 @@ typedef enum SLON_WGLC_ERROR } WorkGroupLineCode; +typedef struct PerfMon_s PerfMon; /* Structure for doing performance monitoring */ +struct PerfMon_s +{ + struct timeval prev_t; + struct timeval now_t; + double prov_query_t; /* Time spent running queries against the provider */ + int prov_query_c; /* Number of queries run against the provider */ + double subscr_query_t; /* Time spent running prep queries against the subscriber */ + int subscr_query_c; /* Number of prep queries run against the subscriber */ + double subscr_iud__t; /* Time spent running IUD against subscriber */ + int subscr_iud__c; /* Number of IUD requests run against subscriber */ + double large_tuples_t; /* Number of large tuples processed */ + int large_tuples_c; /* Number of large tuples processed */ + int num_inserts; + int num_updates; + int num_deletes; +}; struct ProviderInfo_s { @@ -240,6 +259,17 @@ int quit_sync_finalsync; * Local functions * ---------- */ +/* + * Monitoring data structure + */ + +static void init_perfmon(PerfMon *pm); +static void start_monitored_event(PerfMon *pm); +static void monitor_provider_query(PerfMon *pm); +static void monitor_subscriber_query(PerfMon *pm); +static void monitor_subscriber_iud(PerfMon *pm); +static void monitor_largetuples(PerfMon *pm); + static void adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup); static int query_execute(SlonNode *node, PGconn *dbconn, @@ -2102,12 +2132,12 @@ remoteWorker_confirm(int no_id, /* * Move the message to the head of the queue */ - if (oldmsg != node->message_head) + if ((SlonWorkMsg *) oldmsg != node->message_head) { DLLIST_REMOVE(node->message_head, - node->message_tail, oldmsg); + node->message_tail, (SlonWorkMsg *) oldmsg); DLLIST_ADD_HEAD(node->message_head, - node->message_tail, oldmsg); + node->message_tail, (SlonWorkMsg *) oldmsg); } } pthread_mutex_unlock(&(node->message_lock)); @@ -2419,6 +2449,23 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id, break; } } + if (sub_provider < 0) { + rtcfg_unlock(); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node for set %" + "not found in runtime configuration\n", + set_id); + slon_terminate_worker(); + return -1; + + } + if (set_origin < 0) { + rtcfg_unlock(); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: origin node for set %" + "not found in runtime configuration\n", + set_id); + slon_terminate_worker(); + return -1; + } if (set == NULL) { rtcfg_unlock(); @@ -2430,7 +2477,7 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id, if ((sub_node = rtcfg_findNode(sub_provider)) == NULL) { rtcfg_unlock(); - slon_log(SLON_ERROR, "remoteWorkerThread_%d: node %d " + slon_log(SLON_ERROR, "remoteWorkerThread_%d: provider node %d " "not found in runtime configuration\n", node->no_id, sub_provider); return -1; @@ -3498,6 +3545,7 @@ copy_set(SlonNode *node, SlonConn *local_conn, int set_id, gettimeofday(&tv_now, NULL); slon_log(SLON_INFO, "copy_set %d done in %.3f seconds\n", set_id, TIMEVAL_DIFF(&tv_start, &tv_now)); + return 0; } @@ -3536,8 +3584,10 @@ sync_event(SlonNode *node, SlonConn *local_conn, int actionlist_len; int64 min_ssy_seqno; + PerfMon pm; gettimeofday(&tv_start, NULL); + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT " processing\n", node->no_id, event->ev_seqno); @@ -3546,6 +3596,8 @@ sync_event(SlonNode *node, SlonConn *local_conn, dstring_init(&query); dstring_init(&lsquery); + init_perfmon(&pm); + /* * If this slon is running in log archiving mode, open a temporary file * for it. @@ -3589,6 +3641,8 @@ sync_event(SlonNode *node, SlonConn *local_conn, } sprintf(conn_symname, "subscriber_%d_provider_%d", node->no_id, provider->no_id); + + provider->conn = slon_connectdb(provider->pa_conninfo, conn_symname); if (provider->conn == NULL) @@ -3609,6 +3663,7 @@ sync_event(SlonNode *node, SlonConn *local_conn, (void) slon_mkquery(&query, "select %s.registerNodeConnection(%d); ", rtcfg_namespace, rtcfg_nodeid); + start_monitored_event(&pm); if (query_execute(node, provider->conn->dbconn, &query) < 0) { dstring_free(&query); @@ -3618,6 +3673,8 @@ sync_event(SlonNode *node, SlonConn *local_conn, provider->conn = NULL; return provider->pa_connretry; } + monitor_provider_query(&pm); + slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: " "connected to data provider %d on '%s'\n", node->no_id, provider->no_id, @@ -3717,7 +3774,11 @@ sync_event(SlonNode *node, SlonConn *local_conn, pset->set_id); slon_appendquery(&query, "); "); + start_monitored_event(&pm); res1 = PQexec(local_dbconn, dstring_data(&query)); + slon_log(SLON_INFO, "about to monitor_subscriber_query - pulling big actionid list %d\n", provider); + monitor_subscriber_query(&pm); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", @@ -3772,7 +3833,9 @@ sync_event(SlonNode *node, SlonConn *local_conn, rtcfg_namespace, rtcfg_namespace, sub_set); + start_monitored_event(&pm); res2 = PQexec(local_dbconn, dstring_data(&query)); + monitor_subscriber_query(&pm); if (PQresultStatus(res2) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", @@ -3945,6 +4008,7 @@ sync_event(SlonNode *node, SlonConn *local_conn, (void) slon_mkquery(&query, "select last_value from %s.sl_log_status", rtcfg_namespace); res1 = PQexec(local_dbconn, dstring_data(&query)); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", @@ -4030,7 +4094,10 @@ sync_event(SlonNode *node, SlonConn *local_conn, if (wgline->log.n_used > 0) { + start_monitored_event(&pm); res1 = PQexec(local_dbconn, dstring_data(&(wgline->log))); + + monitor_subscriber_iud(&pm); if (PQresultStatus(res1) == PGRES_EMPTY_QUERY) { PQclear(res1); @@ -4048,7 +4115,9 @@ sync_event(SlonNode *node, SlonConn *local_conn, PQclear(res1); } + start_monitored_event(&pm); res1 = PQexec(local_dbconn, dstring_data(&(wgline->data))); + monitor_subscriber_iud(&pm); if (PQresultStatus(res1) == PGRES_EMPTY_QUERY) { PQclear(res1); @@ -4227,7 +4296,9 @@ sync_event(SlonNode *node, SlonConn *local_conn, slon_appendquery(&query, ") " " group by 1; "); + start_monitored_event(&pm); res1 = PQexec(provider->conn->dbconn, dstring_data(&query)); + monitor_subscriber_iud(&pm); if (PQresultStatus(res1) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n", @@ -4251,6 +4322,7 @@ sync_event(SlonNode *node, SlonConn *local_conn, "select %s.sequenceSetValue(%s,%d,'%s','%s'); ", rtcfg_namespace, seql_seqid, node->no_id, seqbuf, seql_last_value); + start_monitored_event(&pm); if (query_execute(node, local_dbconn, &query) < 0) { PQclear(res1); @@ -4259,6 +4331,7 @@ sync_event(SlonNode *node, SlonConn *local_conn, archive_terminate(node); return 60; } + monitor_subscriber_iud(&pm); /* * Add the sequence number adjust call to the archive log. @@ -4343,6 +4416,18 @@ sync_event(SlonNode *node, SlonConn *local_conn, INT64_FORMAT " done in %.3f seconds\n", node->no_id, event->ev_seqno, TIMEVAL_DIFF(&tv_start, &tv_now)); + + slon_log(SLON_INFO, + "remoteWorkerThread_%d: SYNC " INT64_FORMAT " Timing: " + " pqexec (s/count)" + "- provider %.3f/%d " + "- subscriber %.3f/%d " + "- IUD %.3f/%d\n", + node->no_id, event->ev_seqno, + pm.prov_query_t, pm.prov_query_c, + pm.subscr_query_t, pm.prov_query_c, + pm.subscr_iud__t, pm.subscr_iud__c); + return 0; } @@ -4382,13 +4467,9 @@ sync_helper(void *cdata) int line_no; int line_ncmds; - int num_inserts, - num_deletes, - num_updates; + PerfMon pm; - num_inserts = 0; - num_deletes = 0; - num_updates = 0; + init_perfmon(&pm); dstring_init(&query); dstring_init(&query2); @@ -4617,7 +4698,7 @@ sync_helper(void *cdata) { /* * First make sure that the overall memory usage is inside - * bouds. + * bounds. */ if (wd->workdata_largemem > sync_max_largemem) { @@ -4758,6 +4839,7 @@ sync_helper(void *cdata) if (log_cmdsize >= sync_max_rowsize) { + start_monitored_event(&pm); (void) slon_mkquery(&query2, "select log_cmddata " "from %s.sl_log_1 " @@ -4775,6 +4857,7 @@ sync_helper(void *cdata) rtcfg_namespace, log_origin, log_txid, log_actionseq); res2 = PQexec(dbconn, dstring_data(&query2)); + monitor_largetuples(&pm); if (PQresultStatus(res2) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s", @@ -4838,7 +4921,7 @@ sync_helper(void *cdata) "insert into %s %s;\n", wd->tab_fqname[log_tableid], log_cmddata); - num_inserts++; + pm.num_inserts++; break; case 'U': @@ -4846,7 +4929,7 @@ sync_helper(void *cdata) "update only %s set %s;\n", wd->tab_fqname[log_tableid], log_cmddata); - num_updates++; + pm.num_updates++; break; case 'D': @@ -4854,7 +4937,7 @@ sync_helper(void *cdata) "delete from only %s where %s;\n", wd->tab_fqname[log_tableid], log_cmddata); - num_deletes++; + pm.num_deletes++; break; } line_ncmds++; @@ -4958,11 +5041,13 @@ sync_helper(void *cdata) TIMEVAL_DIFF(&tv_start, &tv_now)); slon_log(SLON_INFO, "remoteHelperThread_%d_%d: inserts=%d updates=%d deletes=%d\n", - node->no_id, provider->no_id, num_inserts, num_updates, num_deletes); + node->no_id, provider->no_id, pm.num_inserts, pm.num_updates, pm.num_deletes); - num_inserts = 0; - num_deletes = 0; - num_updates = 0; + slon_log(SLON_INFO, + "remoteWorkerThread_%d: sync_helper timing: " + " large tuples %.3f/%d\n", + node->no_id, + pm.large_tuples_t, pm.large_tuples_c); /* * Change our helper status to DONE and tell the worker thread about @@ -5649,8 +5734,6 @@ compress_actionseq(const char *ssy_actionlist, SlonDString *action_subquery) static int check_set_subscriber(int set_id, int node_id, PGconn *local_dbconn) { - - SlonDString query1; PGresult *res; @@ -5674,6 +5757,50 @@ check_set_subscriber(int set_id, int node_id, PGconn *local_dbconn) } PQclear(res); return 1; +} +static void init_perfmon(PerfMon *perf_info) { + perf_info->prov_query_t = 0.0; + perf_info->prov_query_c = 0; + perf_info->subscr_query_t = 0.0; + perf_info->subscr_query_c = 0; + perf_info->subscr_iud__t = 0.0; + perf_info->subscr_iud__c = 0; + perf_info->large_tuples_t = 0; + perf_info->large_tuples_c = 0; + perf_info->num_inserts = 0; + perf_info->num_updates = 0; + perf_info->num_deletes = 0; +} +static void start_monitored_event(PerfMon *perf_info) { + gettimeofday(&(perf_info->prev_t), NULL); +} +static void monitor_subscriber_query(PerfMon *perf_info) { + double diff; + gettimeofday(&(perf_info->now_t), NULL); + diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); + (perf_info->subscr_query_t) += diff; + (perf_info->subscr_query_c) ++; +} +static void monitor_provider_query(PerfMon *perf_info) { + double diff; + gettimeofday(&(perf_info->now_t), NULL); + diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); + (perf_info->prov_query_t) += diff; + (perf_info->prov_query_c) ++; +} +static void monitor_subscriber_iud(PerfMon *perf_info) { + double diff; + gettimeofday(&(perf_info->now_t), NULL); + diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); + (perf_info->subscr_iud__t) += diff; + (perf_info->subscr_iud__c) ++; +} +static void monitor_largetuples(PerfMon *perf_info) { + double diff; + gettimeofday(&(perf_info->now_t), NULL); + diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); + (perf_info->large_tuples_t) += diff; + (perf_info->large_tuples_c) ++; }