PG_FUNCTION_INFO_V1(_Slony_I_denyAccess);
PG_FUNCTION_INFO_V1(_Slony_I_logApply);
PG_FUNCTION_INFO_V1(_Slony_I_logApplySetCacheSize);
+PG_FUNCTION_INFO_V1(_Slony_I_logApplySaveStats);
PG_FUNCTION_INFO_V1(_Slony_I_lockedSet);
PG_FUNCTION_INFO_V1(_Slony_I_killBackend);
PG_FUNCTION_INFO_V1(_Slony_I_seqtrack);
Datum _Slony_I_denyAccess(PG_FUNCTION_ARGS);
Datum _Slony_I_logApply(PG_FUNCTION_ARGS);
Datum _Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS);
+Datum _Slony_I_logApplySaveStats(PG_FUNCTION_ARGS);
Datum _Slony_I_lockedSet(PG_FUNCTION_ARGS);
Datum _Slony_I_killBackend(PG_FUNCTION_ARGS);
Datum _Slony_I_seqtrack(PG_FUNCTION_ARGS);
void *plan_record_sequences;
void *plan_get_logstatus;
void *plan_table_info;
+ void *plan_apply_stats_update;
+ void *plan_apply_stats_insert;
text *cmdtype_I;
text *cmdtype_U;
static uint32 applyCache_hash(const void *kp, Size ksize);
static int applyCache_cmp(const void *kp1, const void *kp2, Size ksize);
-static char *applyQuery = NULL;
-static char *applyQueryPos = NULL;
-static int applyQuerySize = 8192;
+static char *applyQuery = NULL;
+static char *applyQueryPos = NULL;
+static int applyQuerySize = 8192;
static void applyQueryReset(void);
static void applyQueryIncrease(void);
+static int64 apply_num_insert;
+static int64 apply_num_update;
+static int64 apply_num_delete;
+static int64 apply_num_truncate;
+static int64 apply_num_script;
+static int64 apply_num_prepare;
+static int64 apply_num_hit;
+static int64 apply_num_evict;
+
/*@null@*/
static Slony_I_ClusterStatus *clusterStatusList = NULL;
{
HASHCTL hctl;
+ /*
+ * Free all prepared apply queries.
+ */
for (cacheEnt = applyCacheHead; cacheEnt; cacheEnt = cacheEnt->next)
{
if (cacheEnt->plan != NULL)
applyCacheTail = NULL;
applyCacheUsed = 0;
+ /*
+ * Destroy and recreate the hashtable for the apply cache
+ */
if (applyCacheHash != NULL)
hash_destroy(applyCacheHash);
memset(&hctl, 0, sizeof(hctl));
50, &hctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
+ /*
+ * Reset or create the apply cache key memory context.
+ */
if (applyCacheContext == NULL)
{
applyCacheContext = AllocSetContextCreate(
MemoryContextReset(applyCacheContext);
}
+ /*
+ * Reset statistic counters.
+ */
+ apply_num_insert = 0;
+ apply_num_update = 0;
+ apply_num_delete = 0;
+ apply_num_truncate = 0;
+ apply_num_script = 0;
+ apply_num_prepare = 0;
+ apply_num_hit = 0;
+ apply_num_evict = 0;
+
cs->currentXid = newXid;
}
bool localNodeFound = true;
Datum script_insert_args[4];
+ apply_num_script++;
+
+ /*
+ * Turn the log_cmdargs into a plain array of Text Datums.
+ */
dat = SPI_getbinval(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
if (isnull)
elog(ERROR, "Slony-I: log_cmdargs is NULL");
- /*
- * Turn the log_cmdargs into a plain array of Text Datums.
- */
deconstruct_array(DatumGetArrayTypeP(dat),
TEXTOID, -1, false, 'i',
&cmdargs, &cmdargsnulls, &cmdargsn);
cacheEnt = hash_search(applyCacheHash, &cacheKey, HASH_ENTER, &found);
if (found)
{
+ apply_num_hit++;
+
// elog(NOTICE, "cache entry for %s found", cacheKey);
/*
* Free the cacheKey copy.
{
Datum query_args[2];
+ apply_num_prepare++;
+
// elog(NOTICE, "cache entry for %s NOT found", cacheKey);
#ifdef APPLY_CACHE_VERIFY
{
ApplyCacheEntry *evict = applyCacheHead;
+ apply_num_evict++;
+
SPI_freeplan(evict->plan);
evict->plan = NULL;
#ifdef APPLY_CACHE_VERIFY
if ((spi_rc = SPI_execp(cacheEnt->plan, queryvals, querynulls, 0)) < 0)
elog(ERROR, "Slony-I: SPI_execp() failed - rc=%d", spi_rc);
+ /*
+ * Count operations
+ */
+ switch(cmdtype)
+ {
+ case 'I': apply_num_insert++;
+ break;
+ case 'U': apply_num_update++;
+ break;
+ case 'D': apply_num_delete++;
+ break;
+ case 'T': apply_num_truncate++;
+ break;
+ default: break;
+ }
+
/*
* Disconnect from SPI manager and return either the new tuple
* or NULL according to the forwarding of log data.
}
+Datum
+_Slony_I_logApplySaveStats(PG_FUNCTION_ARGS)
+{
+ Slony_I_ClusterStatus *cs;
+ Datum params[10];
+ char *nulls = " ";
+ int32 rc = 0;
+ int spi_rc;
+
+ if (!superuser())
+ elog(ERROR, "Slony-I: insufficient privilege logApplySetCacheSize");
+
+ /*
+ * Connect to the SPI manager
+ */
+ if (SPI_connect() < 0)
+ elog(ERROR, "Slony-I: SPI_connect() failed in logApply()");
+
+ /*
+ * Get or create the cluster status information and make sure it has the
+ * SPI plans that we need here.
+ */
+ cs = getClusterStatus(PG_GETARG_NAME(0), PLAN_APPLY_QUERIES);
+
+ /*
+ * Setup the parameter array. Note that both queries use the
+ * same parameters in exactly the same order.
+ */
+ params[0] = Int32GetDatum(PG_GETARG_INT32(1));
+
+ params[1] = Int64GetDatum(apply_num_insert);
+ params[2] = Int64GetDatum(apply_num_update);
+ params[3] = Int64GetDatum(apply_num_delete);
+ params[4] = Int64GetDatum(apply_num_truncate);
+ params[5] = Int64GetDatum(apply_num_script);
+ params[6] = Int64GetDatum(apply_num_insert + apply_num_update +
+ apply_num_delete + apply_num_truncate * apply_num_script);
+ params[7] = PointerGetDatum(PG_GETARG_INTERVAL_P(2));
+ params[8] = Int64GetDatum(apply_num_prepare);
+ params[9] = Int64GetDatum(apply_num_hit);
+ params[10] = Int64GetDatum(apply_num_evict);
+
+ /*
+ * Perform the UPDATE of sl_apply_stats. If that doesn't update
+ * any row(s), try to INSERT one.
+ */
+ if ((spi_rc = SPI_execp(cs->plan_apply_stats_update, params, nulls, 0)) < 0)
+ elog(ERROR, "Slony-I: SPI_execp() to update apply stats failed"
+ " - rc=%d", spi_rc);
+ if (SPI_processed > 0)
+ {
+ rc = 2;
+ }
+ else
+ {
+ if ((spi_rc = SPI_execp(cs->plan_apply_stats_insert, params, nulls, 0)) < 0)
+ elog(ERROR, "Slony-I: SPI_execp() to insert apply stats failed"
+ " - rc=%d", spi_rc);
+ if (SPI_processed > 0)
+ rc = 1;
+ }
+
+ /*
+ * Reset statistic counters.
+ */
+ apply_num_insert = 0;
+ apply_num_update = 0;
+ apply_num_delete = 0;
+ apply_num_truncate = 0;
+ apply_num_script = 0;
+ apply_num_prepare = 0;
+ apply_num_hit = 0;
+ apply_num_evict = 0;
+
+ /*
+ * That's it.
+ */
+ SPI_finish();
+ PG_RETURN_INT32(rc);
+}
+
+
static uint32
applyCache_hash(const void *kp, Size ksize)
{
int rc;
char query[1024];
bool isnull;
- Oid plan_types[9];
+ Oid plan_types[16];
TypeName *txid_snapshot_typname;
/*
if (cs->plan_table_info == NULL)
elog(ERROR, "Slony-I: SPI_prepare() failed");
+ /*
+ * The plan to update the apply stats
+ */
+ sprintf(query,
+ "update %s.sl_apply_stats set "
+ " as_num_insert = as_num_insert + $2, "
+ " as_num_update = as_num_update + $3, "
+ " as_num_delete = as_num_delete + $4, "
+ " as_num_truncate = as_num_truncate + $5, "
+ " as_num_script = as_num_script + $6, "
+ " as_num_total = as_num_total + $7, "
+ " as_duration = as_duration + $8, "
+ " as_apply_last = \"pg_catalog\".timeofday()::timestamptz, "
+ " as_cache_prepare = as_cache_prepare + $9, "
+ " as_cache_hit = as_cache_hit + $10, "
+ " as_cache_evict = as_cache_evict + $11, "
+ " as_cache_prepare_max = case "
+ " when $9 > as_cache_prepare_max then $9 "
+ " else as_cache_prepare_max end "
+ " where as_origin = $1;",
+ slon_quote_identifier(NameStr(*cluster_name)));
+
+ plan_types[0] = INT4OID;
+ plan_types[1] = INT8OID;
+ plan_types[2] = INT8OID;
+ plan_types[3] = INT8OID;
+ plan_types[4] = INT8OID;
+ plan_types[5] = INT8OID;
+ plan_types[6] = INT8OID;
+ plan_types[7] = INTERVALOID;
+ plan_types[8] = INT8OID;
+ plan_types[9] = INT8OID;
+ plan_types[10] = INT8OID;
+
+ cs->plan_apply_stats_update = SPI_saveplan(
+ SPI_prepare(query, 11, plan_types));
+ if (cs->plan_apply_stats_update == NULL)
+ elog(ERROR, "Slony-I: SPI_prepare() failed");
+
+ /*
+ * The plan to insert the apply stats, if update misses
+ */
+ sprintf(query,
+ "insert into %s.sl_apply_stats ("
+ " as_origin, as_num_insert, as_num_update, as_num_delete, "
+ " as_num_truncate, as_num_script, as_num_total, "
+ " as_duration, as_apply_first, as_apply_last, "
+ " as_cache_prepare, as_cache_hit, as_cache_evict, "
+ " as_cache_prepare_max) "
+ "values "
+ "($1, $2, $3, $4, $5, $6, $7, $8, "
+ "\"pg_catalog\".timeofday()::timestamptz, "
+ "\"pg_catalog\".timeofday()::timestamptz, "
+ "$9, $10, $11, $9);",
+ slon_quote_identifier(NameStr(*cluster_name)));
+
+ plan_types[0] = INT4OID;
+ plan_types[1] = INT8OID;
+ plan_types[2] = INT8OID;
+ plan_types[3] = INT8OID;
+ plan_types[4] = INT8OID;
+ plan_types[5] = INT8OID;
+ plan_types[6] = INT8OID;
+ plan_types[7] = INTERVALOID;
+ plan_types[8] = INT8OID;
+ plan_types[9] = INT8OID;
+ plan_types[10] = INT8OID;
+
+ cs->plan_apply_stats_insert = SPI_saveplan(
+ SPI_prepare(query, 11, plan_types));
+ if (cs->plan_apply_stats_insert == NULL)
+ elog(ERROR, "Slony-I: SPI_prepare() failed");
+
cs->have_plan |= PLAN_APPLY_QUERIES;
}