Adding log apply statistics.
authorJan Wieck <JanWieck@Yahoo.com>
Sat, 21 Jan 2012 16:59:01 +0000 (11:59 -0500)
committerJan Wieck <JanWieck@Yahoo.com>
Sat, 21 Jan 2012 16:59:01 +0000 (11:59 -0500)
src/backend/slony1_base.sql
src/backend/slony1_funcs.c
src/backend/slony1_funcs.sql
src/slon/remote_worker.c

index 16ebb6ab2340b665bf075abaafa9c87d7d72529e..ba13d1bbcef507db973510c43f7e1c5ec12e637d 100644 (file)
@@ -459,6 +459,44 @@ comment on column @NAMESPACE@.sl_registry.reg_text is 'Option value if type text
 comment on column @NAMESPACE@.sl_registry.reg_timestamp is 'Option value if type timestamp';
 
 
+-- ----------------------------------------------------------------------
+-- TABLE sl_apply_stats
+-- ----------------------------------------------------------------------
+create table @NAMESPACE@.sl_apply_stats (
+       as_origin                       int4,
+       as_num_insert           int8,
+       as_num_update           int8,
+       as_num_delete           int8,
+       as_num_truncate         int8,
+       as_num_script           int8,
+       as_num_total            int8,
+       as_duration                     interval,
+       as_apply_first          timestamptz,
+       as_apply_last           timestamptz,
+       as_cache_prepare        int8,
+       as_cache_hit            int8,
+       as_cache_evict          int8,
+       as_cache_prepare_max int8
+) WITHOUT OIDS;
+
+create index sl_apply_stats_idx1 on @NAMESPACE@.sl_apply_stats
+       (as_origin);
+
+comment on table @NAMESPACE@.sl_apply_stats is 'Local SYNC apply statistics (running totals)';
+comment on column @NAMESPACE@.sl_apply_stats.as_origin is 'Origin of the SYNCs';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_insert is 'Number of INSERT operations performed';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_update is 'Number of UPDATE operations performed';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_delete is 'Number of DELETE operations performed';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_truncate is 'Number of TRUNCATE operations performed';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_script is 'Number of DDL operations performed';
+comment on column @NAMESPACE@.sl_apply_stats.as_num_total is 'Total number of operations';
+comment on column @NAMESPACE@.sl_apply_stats.as_duration is 'Processing time';
+comment on column @NAMESPACE@.sl_apply_stats.as_apply_first is 'Timestamp of first recorded SYNC';
+comment on column @NAMESPACE@.sl_apply_stats.as_apply_last is 'Timestamp of most recent recorded SYNC';
+comment on column @NAMESPACE@.sl_apply_stats.as_cache_prepare is 'Number of apply query cache prepare operations';
+comment on column @NAMESPACE@.sl_apply_stats.as_cache_hit is 'Number of apply query cache hit operations';
+comment on column @NAMESPACE@.sl_apply_stats.as_cache_evict is 'Number of apply query cache evict operations';
+comment on column @NAMESPACE@.sl_apply_stats.as_cache_prepare_max is 'Maximum number of apply queries prepared in one SYNC group';
+
+
 -- **********************************************************************
 -- * Views
 -- **********************************************************************
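
As an aside, a minimal monitoring query against the new table could look like the
sketch below (not part of this commit; @NAMESPACE@ stands for the cluster schema as
in the rest of the file, and the row is assumed to have been populated by
logApplySaveStats() at least once):

    -- Rough apply throughput per origin, derived from the running totals
    select as_origin,
           as_num_total,
           as_duration,
           case when as_duration > '0 s'::interval
                then as_num_total / extract(epoch from as_duration)
                else 0
           end as rows_per_second
      from @NAMESPACE@.sl_apply_stats
     order by as_origin;
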
index 103e44bf31cd5aa49de78aa0dfe58c8ea56f1e51..eb492fb022a9993f018ccedd5593e90902d486cf 100644 (file)
@@ -69,6 +69,7 @@ PG_FUNCTION_INFO_V1(_Slony_I_logTrigger);
 PG_FUNCTION_INFO_V1(_Slony_I_denyAccess);
 PG_FUNCTION_INFO_V1(_Slony_I_logApply);
 PG_FUNCTION_INFO_V1(_Slony_I_logApplySetCacheSize);
+PG_FUNCTION_INFO_V1(_Slony_I_logApplySaveStats);
 PG_FUNCTION_INFO_V1(_Slony_I_lockedSet);
 PG_FUNCTION_INFO_V1(_Slony_I_killBackend);
 PG_FUNCTION_INFO_V1(_Slony_I_seqtrack);
@@ -85,6 +86,7 @@ Datum         _Slony_I_logTrigger(PG_FUNCTION_ARGS);
 Datum          _Slony_I_denyAccess(PG_FUNCTION_ARGS);
 Datum          _Slony_I_logApply(PG_FUNCTION_ARGS);
 Datum          _Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS);
+Datum          _Slony_I_logApplySaveStats(PG_FUNCTION_ARGS);
 Datum          _Slony_I_lockedSet(PG_FUNCTION_ARGS);
 Datum          _Slony_I_killBackend(PG_FUNCTION_ARGS);
 Datum          _Slony_I_seqtrack(PG_FUNCTION_ARGS);
@@ -126,6 +128,8 @@ typedef struct slony_I_cluster_status
        void       *plan_record_sequences;
        void       *plan_get_logstatus;
        void       *plan_table_info;
+       void       *plan_apply_stats_update;
+       void       *plan_apply_stats_insert;
 
        text       *cmdtype_I;
        text       *cmdtype_U;
@@ -169,13 +173,22 @@ static int                                        applyCacheUsed = 0;
 static uint32 applyCache_hash(const void *kp, Size ksize);
 static int applyCache_cmp(const void *kp1, const void *kp2, Size ksize);
 
-static char                               *applyQuery = NULL;
-static char                               *applyQueryPos = NULL;
-static int                                     applyQuerySize = 8192;
+static char               *applyQuery = NULL;
+static char               *applyQueryPos = NULL;
+static int                     applyQuerySize = 8192;
 
 static void applyQueryReset(void);
 static void applyQueryIncrease(void);
 
+static int64           apply_num_insert;
+static int64           apply_num_update;
+static int64           apply_num_delete;
+static int64           apply_num_truncate;
+static int64           apply_num_script;
+static int64           apply_num_prepare;
+static int64           apply_num_hit;
+static int64           apply_num_evict;
+
 
 /*@null@*/
 static Slony_I_ClusterStatus *clusterStatusList = NULL;
@@ -876,6 +889,9 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        {
                HASHCTL         hctl;
 
+               /*
+                * Free all prepared apply queries.
+                */
                for (cacheEnt = applyCacheHead; cacheEnt; cacheEnt = cacheEnt->next)
                {
                        if (cacheEnt->plan != NULL)
@@ -886,6 +902,9 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                applyCacheTail = NULL;
                applyCacheUsed = 0;
 
+               /*
+                * Destroy and recreate the hashtable for the apply cache
+                */
                if (applyCacheHash != NULL)
                        hash_destroy(applyCacheHash);
                memset(&hctl, 0, sizeof(hctl));
@@ -897,6 +916,9 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                                        50, &hctl, 
                                        HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
 
+               /*
+                * Reset or create the apply cache key memory context.
+                */
                if (applyCacheContext == NULL)
                {
                        applyCacheContext = AllocSetContextCreate(
@@ -911,6 +933,18 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                        MemoryContextReset(applyCacheContext);
                }
 
+               /*
+                * Reset statistic counters.
+                */
+               apply_num_insert = 0;
+               apply_num_update = 0;
+               apply_num_delete = 0;
+               apply_num_truncate = 0;
+               apply_num_script = 0;
+               apply_num_prepare = 0;
+               apply_num_hit = 0;
+               apply_num_evict = 0;
+
                cs->currentXid = newXid;
        }
 
@@ -936,14 +970,16 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                bool            localNodeFound = true;
                Datum           script_insert_args[4];
 
+               apply_num_script++;
+
+               /*
+                * Turn the log_cmdargs into a plain array of Text Datums.
+                */
                dat = SPI_getbinval(new_row, tupdesc, 
                                SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
                if (isnull)
                        elog(ERROR, "Slony-I: log_cmdargs is NULL");
 
-               /*
-                * Turn the log_cmdargs into a plain array of Text Datums.
-                */
                deconstruct_array(DatumGetArrayTypeP(dat), 
                                TEXTOID, -1, false, 'i', 
                                &cmdargs, &cmdargsnulls, &cmdargsn);
@@ -1082,6 +1118,8 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        cacheEnt = hash_search(applyCacheHash, &cacheKey, HASH_ENTER, &found);
        if (found)
        {
+               apply_num_hit++;
+
                // elog(NOTICE, "cache entry for %s found", cacheKey);
                /*
                 * Free the cacheKey copy.
@@ -1140,6 +1178,8 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        {
                Datum   query_args[2];
 
+               apply_num_prepare++;
+
                // elog(NOTICE, "cache entry for %s NOT found", cacheKey);
 
 #ifdef APPLY_CACHE_VERIFY
@@ -1460,6 +1500,8 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                {
                        ApplyCacheEntry *evict = applyCacheHead;
 
+                       apply_num_evict++;
+
                        SPI_freeplan(evict->plan);
                        evict->plan = NULL;
 #ifdef APPLY_CACHE_VERIFY
@@ -1559,6 +1601,22 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        if ((spi_rc = SPI_execp(cacheEnt->plan, queryvals, querynulls, 0)) < 0)
                elog(ERROR, "Slony-I: SPI_execp() failed - rc=%d", spi_rc);
 
+       /*
+        * Count operations
+        */
+       switch(cmdtype)
+       {
+               case 'I':               apply_num_insert++;
+                                               break;
+               case 'U':               apply_num_update++;
+                                               break;
+               case 'D':               apply_num_delete++;
+                                               break;
+               case 'T':               apply_num_truncate++;
+                                               break;
+               default:                break;
+       }
+
        /*
         * Disconnect from SPI manager and return either the new tuple
         * or NULL according to the forwarding of log data.
@@ -1593,6 +1651,88 @@ _Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS)
 }
 
 
+Datum
+_Slony_I_logApplySaveStats(PG_FUNCTION_ARGS)
+{
+       Slony_I_ClusterStatus *cs;
+       Datum           params[11];
+       char       *nulls = "           ";
+       int32           rc = 0;
+       int                     spi_rc;
+
+       if (!superuser())
+               elog(ERROR, "Slony-I: insufficient privilege for logApplySaveStats()");
+
+       /*
+        * Connect to the SPI manager
+        */
+       if (SPI_connect() < 0)
+               elog(ERROR, "Slony-I: SPI_connect() failed in logApplySaveStats()");
+
+       /*
+        * Get or create the cluster status information and make sure it has the
+        * SPI plans that we need here.
+        */
+       cs = getClusterStatus(PG_GETARG_NAME(0), PLAN_APPLY_QUERIES);
+
+       /* 
+        * Set up the parameter array. Note that both queries use the
+        * same parameters in exactly the same order.
+        */
+       params[0] = Int32GetDatum(PG_GETARG_INT32(1));
+
+       params[1] = Int64GetDatum(apply_num_insert);
+       params[2] = Int64GetDatum(apply_num_update);
+       params[3] = Int64GetDatum(apply_num_delete);
+       params[4] = Int64GetDatum(apply_num_truncate);
+       params[5] = Int64GetDatum(apply_num_script);
+       params[6] = Int64GetDatum(apply_num_insert + apply_num_update +
+                                       apply_num_delete + apply_num_truncate + apply_num_script);
+       params[7] = PointerGetDatum(PG_GETARG_INTERVAL_P(2));
+       params[8] = Int64GetDatum(apply_num_prepare);
+       params[9] = Int64GetDatum(apply_num_hit);
+       params[10] = Int64GetDatum(apply_num_evict);
+
+       /*
+        * Perform the UPDATE of sl_apply_stats. If that doesn't update
+        * any row(s), try to INSERT one.
+        */
+       if ((spi_rc = SPI_execp(cs->plan_apply_stats_update, params, nulls, 0)) < 0)
+               elog(ERROR, "Slony-I: SPI_execp() to update apply stats failed"
+                               " - rc=%d", spi_rc);
+       if (SPI_processed > 0)
+       {
+               rc = 2;
+       }
+       else
+       {
+               if ((spi_rc = SPI_execp(cs->plan_apply_stats_insert, params, nulls, 0)) < 0)
+                       elog(ERROR, "Slony-I: SPI_execp() to insert apply stats failed"
+                                       " - rc=%d", spi_rc);
+               if (SPI_processed > 0)
+                       rc = 1;
+       }
+
+       /*
+        * Reset statistic counters.
+        */
+       apply_num_insert = 0;
+       apply_num_update = 0;
+       apply_num_delete = 0;
+       apply_num_truncate = 0;
+       apply_num_script = 0;
+       apply_num_prepare = 0;
+       apply_num_hit = 0;
+       apply_num_evict = 0;
+
+       /*
+        * That's it.
+        */
+       SPI_finish();
+       PG_RETURN_INT32(rc);
+}
+
+
 static uint32
 applyCache_hash(const void *kp, Size ksize)
 {
@@ -1903,7 +2043,7 @@ getClusterStatus(Name cluster_name, int need_plan_mask)
        int                     rc;
        char            query[1024];
        bool            isnull;
-       Oid                     plan_types[9];
+       Oid                     plan_types[16];
        TypeName   *txid_snapshot_typname;
 
        /*
@@ -2121,6 +2261,79 @@ getClusterStatus(Name cluster_name, int need_plan_mask)
                if (cs->plan_table_info == NULL)
                        elog(ERROR, "Slony-I: SPI_prepare() failed");
 
+               /*
+                * The plan to update the apply stats
+                */
+               sprintf(query,
+                               "update %s.sl_apply_stats set "
+                               " as_num_insert = as_num_insert + $2, "
+                               " as_num_update = as_num_update + $3, "
+                               " as_num_delete = as_num_delete + $4, "
+                               " as_num_truncate = as_num_truncate + $5, "
+                               " as_num_script = as_num_script + $6, "
+                               " as_num_total = as_num_total + $7, "
+                               " as_duration = as_duration + $8, "
+                               " as_apply_last = \"pg_catalog\".timeofday()::timestamptz, "
+                               " as_cache_prepare = as_cache_prepare + $9, "
+                               " as_cache_hit = as_cache_hit + $10, "
+                               " as_cache_evict = as_cache_evict + $11, "
+                               " as_cache_prepare_max = case "
+                               "     when $9 > as_cache_prepare_max then $9 "
+                               "     else as_cache_prepare_max end "
+                               " where as_origin = $1;",
+                               slon_quote_identifier(NameStr(*cluster_name)));
+
+               plan_types[0] = INT4OID;
+               plan_types[1] = INT8OID;
+               plan_types[2] = INT8OID;
+               plan_types[3] = INT8OID;
+               plan_types[4] = INT8OID;
+               plan_types[5] = INT8OID;
+               plan_types[6] = INT8OID;
+               plan_types[7] = INTERVALOID;
+               plan_types[8] = INT8OID;
+               plan_types[9] = INT8OID;
+               plan_types[10] = INT8OID;
+
+               cs->plan_apply_stats_update = SPI_saveplan(
+                               SPI_prepare(query, 11, plan_types));
+               if (cs->plan_apply_stats_update == NULL)
+                       elog(ERROR, "Slony-I: SPI_prepare() failed");
+
+               /*
+                * The plan to insert the apply stats, if update misses
+                */
+               sprintf(query,
+                               "insert into %s.sl_apply_stats ("
+                               " as_origin, as_num_insert, as_num_update, as_num_delete, "
+                               " as_num_truncate, as_num_script, as_num_total, "
+                               " as_duration, as_apply_first, as_apply_last, "
+                               " as_cache_prepare, as_cache_hit, as_cache_evict, "
+                               " as_cache_prepare_max) "
+                               "values "
+                               "($1, $2, $3, $4, $5, $6, $7, $8, "
+                               "\"pg_catalog\".timeofday()::timestamptz, "
+                               "\"pg_catalog\".timeofday()::timestamptz, "
+                               "$9, $10, $11, $9);",
+                               slon_quote_identifier(NameStr(*cluster_name)));
+
+               plan_types[0] = INT4OID;
+               plan_types[1] = INT8OID;
+               plan_types[2] = INT8OID;
+               plan_types[3] = INT8OID;
+               plan_types[4] = INT8OID;
+               plan_types[5] = INT8OID;
+               plan_types[6] = INT8OID;
+               plan_types[7] = INTERVALOID;
+               plan_types[8] = INT8OID;
+               plan_types[9] = INT8OID;
+               plan_types[10] = INT8OID;
+
+               cs->plan_apply_stats_insert = SPI_saveplan(
+                               SPI_prepare(query, 11, plan_types));
+               if (cs->plan_apply_stats_insert == NULL)
+                       elog(ERROR, "Slony-I: SPI_prepare() failed");
+
                cs->have_plan |= PLAN_APPLY_QUERIES;
        }
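
For monitoring the prepared-plan cache that these counters describe, a query along
the following lines could be used (a sketch only, not part of this commit;
@NAMESPACE@ again stands for the cluster schema):

    -- Apply query cache effectiveness per origin
    select as_origin,
           as_cache_hit,
           as_cache_prepare,
           as_cache_evict,
           as_cache_prepare_max,
           round(100.0 * as_cache_hit
                 / nullif(as_cache_hit + as_cache_prepare, 0), 1) as cache_hit_pct
      from @NAMESPACE@.sl_apply_stats;

Because logApplySaveStats() falls back to an INSERT when its UPDATE finds no row, an
individual origin's counters can be reset by simply deleting that row; the next SYNC
group re-creates it.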
 
index cb390003a1c647cbed7d52ba335ee1c6befcab81..a386ee066b3b18c9bb7b274d4c7f593ec3498078 100644 (file)
@@ -5070,11 +5070,35 @@ create table @NAMESPACE@.sl_components (
 ';
           execute v_query;
        end if;
+
        if not exists (select 1 from information_schema.tables t where table_schema = '_@CLUSTERNAME@' and table_name = 'sl_event_lock') then
           v_query := 'create table @NAMESPACE@.sl_event_lock (dummy integer);';
           execute v_query;
         end if;
        
+       if not exists (select 1 from information_schema.tables t 
+                       where table_schema = '_@CLUSTERNAME@' 
+                       and table_name = 'sl_apply_stats') then
+               v_query := '
+                       create table @NAMESPACE@.sl_apply_stats (
+                               as_origin                       int4,
+                               as_num_insert           int8,
+                               as_num_update           int8,
+                               as_num_delete           int8,
+                               as_num_truncate         int8,
+                               as_num_script           int8,
+                               as_num_total            int8,
+                               as_duration                     interval,
+                               as_apply_first          timestamptz,
+                               as_apply_last           timestamptz,
+                               as_cache_prepare        int8,
+                               as_cache_hit            int8,
+                               as_cache_evict          int8,
+                               as_cache_prepare_max int8
+                       ) WITHOUT OIDS;';
+               execute v_query;
+       end if;
+       
        --
        -- On the upgrade to 2.2, we change the layout of sl_log_N by
        -- adding columns log_tablenspname, log_tablerelname, and
@@ -5915,8 +5939,19 @@ create or replace function @NAMESPACE@.logApply () returns trigger
 --     A control function for the prepared query plan cache size used
 --     in the logApply() trigger.
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApplySetCacheSize (int4) 
+create or replace function @NAMESPACE@.logApplySetCacheSize (p_size int4) 
 returns int4
     as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySetCacheSize'
        language C;
 
+-- ----------------------------------------------------------------------
+-- FUNCTION logApplySaveStats ()
+--
+--     A function used by the remote worker to update sl_apply_stats after
+--     performing a SYNC.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 
+returns int4
+    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySaveStats'
+       language C;
+
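
For reference, a manual invocation of the new function (it must be run as a
superuser) would look roughly like this; the cluster name "_mycluster", origin node
1, and the duration are made-up example values. The function returns 2 when an
existing sl_apply_stats row was updated, 1 when a new row had to be inserted, and 0
otherwise:

    select "_mycluster".logApplySaveStats('_mycluster', 1, '0.412 s'::interval);
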
index 01ea0fdf0d428949f599862925fbdcad7e82463d..9a29adc8b05430d37bf675d84e237a273f9e2fa4 100644 (file)
@@ -189,6 +189,7 @@ struct WorkerGroupData_s
        ProviderInfo *provider_head;
        ProviderInfo *provider_tail;
 
+       char            duration_buf[64];
 };
 
 
@@ -317,6 +318,7 @@ remoteWorkerThread_main(void *cdata)
        else
        {
                memset(wd, 0, sizeof(WorkerGroupData));
+               strcpy(wd->duration_buf, "0 s");
        }
 
 
@@ -641,7 +643,7 @@ remoteWorkerThread_main(void *cdata)
 
                        /*
                         * replace query1 with the forwarding of all the grouped sync
-                        * events and a commit.
+                        * events, the call to logApplySaveStats(), and a commit.
                         */
                        dstring_reset(&query1);
                        sg_last_grouping = 0;
@@ -652,6 +654,13 @@ remoteWorkerThread_main(void *cdata)
                                                        free(sync_group[i]);
                                        sg_last_grouping++;
                        }
+
+                       slon_appendquery(&query1, "select %s.logApplySaveStats("
+                                       "'_%s', %d, '%s'::interval); ",
+                                       rtcfg_namespace, rtcfg_cluster_name,
+                                       node->no_id, wd->duration_buf);
+                       strcpy(wd->duration_buf, "0 s");
+
                        slon_appendquery(&query1, "commit transaction;");
 
                        if (query_execute(node, local_dbconn, &query1) < 0)
@@ -4365,6 +4374,7 @@ slon_log(SLON_DEBUG2,
                         INT64_FORMAT " done in %.3f seconds\n",
                         node->no_id, event->ev_seqno,
                         TIMEVAL_DIFF(&tv_start, &tv_now));
+       sprintf(wd->duration_buf, "%.3f s", TIMEVAL_DIFF(&tv_start, &tv_now));
 
        slon_log(SLON_DEBUG1, 
                         "remoteWorkerThread_%d: SYNC " INT64_FORMAT " sync_event timing: "