to fix relnames and log trigger arguments.
Minor changes according to code review by Steve.
log_origin int4,
log_txid bigint,
log_actionseq int8,
+ log_cmdtype "char",
log_cmdargs text[]
) WITHOUT OIDS;
create index sl_log_script_idx1 on @NAMESPACE@.sl_log_script
comment on column @NAMESPACE@.sl_log_script.log_origin is 'Origin name from which the change came';
comment on column @NAMESPACE@.sl_log_script.log_txid is 'Transaction ID on the origin node';
comment on column @NAMESPACE@.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas';
+comment on column @NAMESPACE@.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete';
comment on column @NAMESPACE@.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.';
-- ----------------------------------------------------------------------
#define PLAN_INSERT_LOG_STATUS (1 << 2)
#define PLAN_APPLY_QUERIES (1 << 3)
+/*
+ * This OID definition is missing in 8.3, although the data type
+ * does exist.
+ */
+#ifndef TEXTARRAYOID
+#define TEXTARRAYOID 1009
+#endif
+
/* ----
* Slony_I_ClusterStatus -
TransactionId newXid = GetTopTransactionId();
Slony_I_ClusterStatus *cs;
TriggerData *tg;
- Datum argv[6];
+ Datum log_param[6];
text *cmdtype = NULL;
int32 cmdupdncols = 0;
int rc;
cmddims[0] = cmdargselem - cmdargs;
cmdlbs[0] = 1;
- argv[0] = Int32GetDatum(tab_id);
- argv[1] = DirectFunctionCall1(textin,
+ log_param[0] = Int32GetDatum(tab_id);
+ log_param[1] = DirectFunctionCall1(textin,
CStringGetDatum(get_namespace_name(
RelationGetNamespace(tg->tg_relation))));
- argv[2] = DirectFunctionCall1(textin,
+ log_param[2] = DirectFunctionCall1(textin,
CStringGetDatum(RelationGetRelationName(tg->tg_relation)));
- argv[3] = PointerGetDatum(cmdtype);
- argv[4] = Int32GetDatum(cmdupdncols);
- argv[5] = PointerGetDatum(construct_md_array(cmdargs, cmdnulls, 1,
+ log_param[3] = PointerGetDatum(cmdtype);
+ log_param[4] = Int32GetDatum(cmdupdncols);
+ log_param[5] = PointerGetDatum(construct_md_array(cmdargs, cmdnulls, 1,
cmddims, cmdlbs, TEXTOID, -1, false, 'i'));
- SPI_execp(cs->plan_active_log, argv, NULL, 0);
+ SPI_execp(cs->plan_active_log, log_param, NULL, 0);
SPI_finish();
return PointerGetDatum(NULL);
{
char *ddl_script;
bool localNodeFound = true;
- Datum script_insert_args[4];
+ Datum script_insert_args[5];
apply_num_script++;
script_insert_args[2] = SPI_getbinval(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_actionseq"), &isnull);
script_insert_args[3] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_cmdtype"), &isnull);
+ script_insert_args[4] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
+ if (SPI_execp(cs->plan_insert_log_script, script_insert_args, NULL, 0) < 0)
+ elog(ERROR, "Execution of sl_log_script insert plan failed");
+
+ /*
+ * Return NULL to suppress the insert into the original sl_log_N.
+ */
+ SPI_finish();
+ return PointerGetDatum(NULL);
+ }
+
+ /*
+ * Process the special ddlScript_complete row.
+ */
+ if (cmdtype == 's')
+ {
+ bool localNodeFound = true;
+ Datum script_insert_args[4];
+
+ apply_num_script++;
+
+ /*
+ * Turn the log_cmdargs into a plain array of Text Datums.
+ */
+ dat = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
+ if (isnull)
+ elog(ERROR, "Slony-I: log_cmdargs is NULL");
+
+ deconstruct_array(DatumGetArrayTypeP(dat),
+ TEXTOID, -1, false, 'i',
+ &cmdargs, &cmdargsnulls, &cmdargsn);
+
+ /*
+ * If there is an optional node ID list, check that we are in it.
+ */
+ if (cmdargsn > 1) {
+ localNodeFound = false;
+ for (i = 1; i < cmdargsn; i++)
+ {
+ int32 nodeId = DatumGetInt32(
+ DirectFunctionCall1(int4in,
+ DirectFunctionCall1(textout, cmdargs[i])));
+ if (nodeId == cs->localNodeId)
+ {
+ localNodeFound = true;
+ break;
+ }
+ }
+ }
+
+ /*
+ * Execute the ddlScript_complete_int() call if we have to.
+ */
+ if (localNodeFound)
+ {
+ char query[1024];
+
+ snprintf(query, sizeof(query),
+ "select %s.ddlScript_complete_int();",
+ cs->clusterident);
+
+ if (SPI_exec(query, 0) < 0)
+ {
+ elog(ERROR, "SPI_exec() failed for statement '%s'",
+ query);
+ }
+
+ /*
+ * Set the currentXid to invalid to flush the apply
+ * query cache.
+ */
+ cs->currentXid = InvalidTransactionId;
+ }
+
+ /*
+ * Build the parameters for the insert into sl_log_script
+ * and execute the query.
+ */
+ script_insert_args[0] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_origin"), &isnull);
+ script_insert_args[1] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_txid"), &isnull);
+ script_insert_args[2] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_actionseq"), &isnull);
+ script_insert_args[3] = SPI_getbinval(new_row, tupdesc,
+ SPI_fnumber(tupdesc, "log_cmdtype"), &isnull);
+ script_insert_args[4] = SPI_getbinval(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
if (SPI_execp(cs->plan_insert_log_script, script_insert_args, NULL, 0) < 0)
elog(ERROR, "Execution of sl_log_script insert plan failed");
tableid = DatumGetInt32(dat);
nspname = SPI_getvalue(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_tablenspname"));
+ if (nspname == NULL)
+ elog(ERROR, "Slony-I: log_tablenspname is NULL on INSERT/UPDATE/DELETE");
relname = SPI_getvalue(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_tablerelname"));
+ if (relname == NULL)
+ elog(ERROR, "Slony-I: log_tablerelname is NULL on INSERT/UPDATE/DELETE");
dat = SPI_getbinval(new_row, tupdesc,
SPI_fnumber(tupdesc, "log_cmdupdncols"), &isnull);
colname = DatumGetCString(DirectFunctionCall1(
textout, cmdargs[i]));
- sprintf(applyQueryPos, ",%s", slon_quote_identifier(colname));
+ snprintf(applyQueryPos, applyQuerySize - (applyQueryPos - applyQuery),
+ ",%s", slon_quote_identifier(colname));
applyQueryPos += strlen(applyQueryPos);
}
}
}
+/*
+ * _Slony_I_logApplySetCacheSize()
+ *
+ * Called by slon during startup to set the size of the log apply
+ * query cache according to the config parameter apply_cache_size.
+ */
Datum
_Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS)
{
}
+/*
+ * _Slony_I_logApplySaveStats()
+ *
+ * Save statistics at the end of SYNC processing.
+ */
Datum
_Slony_I_logApplySaveStats(PG_FUNCTION_ARGS)
{
{
if (applyQueryPos - applyQuery + 1024 > applyQuerySize)
{
- int offset = applyQueryPos - applyQuery;
+ size_t offset = applyQueryPos - applyQuery;
applyQuerySize *= 2;
applyQuery = realloc(applyQuery, applyQuerySize);
if (applyQuery == NULL)
* The plan to insert into sl_log_script.
*/
sprintf(query, "insert into %s.sl_log_script "
- "(log_origin, log_txid, log_actionseq, log_cmdargs) "
- "values ($1, $2, $3, $4);",
+ "(log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) "
+ "values ($1, $2, $3, $4, $5);",
slon_quote_identifier(NameStr(*cluster_name)));
plan_types[0] = INT4OID;
plan_types[1] = INT8OID;
plan_types[2] = INT8OID;
- plan_types[3] = TEXTARRAYOID;
+ plan_types[3] = CHAROID;
+ plan_types[4] = TEXTARRAYOID;
cs->plan_insert_log_script = SPI_saveplan(
- SPI_prepare(query, 4, plan_types));
+ SPI_prepare(query, 5, plan_types));
if (cs->plan_insert_log_script == NULL)
elog(ERROR, "Slony-I: SPI_prepare() failed");
*
*/
-#ifndef TEXTARRAYOID
-#define TEXTARRAYOID 1009
-#endif
-
int prepareLogPlan(Slony_I_ClusterStatus * cs,
int log_status)
{
_Slony_I_seqtrack
_Slony_I_logTrigger
_Slony_I_resetSession
-_slon_decode_tgargs
\ No newline at end of file
+_Slony_1_logApply
+_Slony_1_logApplySetCacheSize
+_Slony_1_logApplySaveStats
+_slon_decode_tgargs
as '$libdir/slony1_funcs.@MODULEVERSION@','_Slony_I_resetSession'
language C;
+-- ----------------------------------------------------------------------
+-- FUNCTION logApply ()
+--
+-- A trigger function that is placed on the tables sl_log_1/2 that
+-- does the actual work of updating the user tables.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApply () returns trigger
+ as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApply'
+ language C
+ security definer;
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logApplySetCacheSize ()
+--
+-- A control function for the prepared query plan cache size used
+-- in the logApply() trigger.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApplySetCacheSize (p_size int4)
+returns int4
+ as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySetCacheSize'
+ language C;
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logApplySaveStats ()
+--
+-- A function used by the remote worker to update sl_apply_stats after
+-- performing a SYNC.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval)
+returns int4
+ as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySaveStats'
+ language C;
+
+
create or replace function @NAMESPACE@.checkmoduleversion () returns text as $$
declare
moduleversion text;
';
-- ----------------------------------------------------------------------
--- FUNCTION ddlCapture (origin, statement)
+-- FUNCTION ddlCapture (statement, nodes)
--
-- Capture DDL into sl_log_script
-- ----------------------------------------------------------------------
execute p_statement;
insert into @NAMESPACE@.sl_log_script
- (log_origin, log_txid, log_actionseq, log_cmdargs)
+ (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
values
(c_local_node, pg_catalog.txid_current(),
- nextval('@NAMESPACE@.sl_action_seq'), c_cmdargs);
+ nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
return currval('@NAMESPACE@.sl_action_seq');
end;
'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers';
+-- ----------------------------------------------------------------------
+-- FUNCTION ddlScript_complete (p_nodes)
+--
+-- Actions to be performed after DDL script execution
+-- ----------------------------------------------------------------------
+drop function if exists @NAMESPACE@.ddlScript_complete (int4, text, int4); -- Needed because function signature has changed!
+
+create or replace function @NAMESPACE@.ddlScript_complete (p_nodes text)
+returns bigint
+as $$
+declare
+ c_local_node integer;
+ c_found_origin boolean;
+ c_node text;
+ c_cmdargs text[];
+begin
+ c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
+
+ c_cmdargs = '{}'::text[];
+ if p_nodes is not null then
+ c_found_origin := 'f';
+ -- p_nodes list needs to consist of a list of nodes that exist
+ -- and that include the current node ID
+ for c_node in select trim(node) from
+ pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop
+ if not exists
+ (select 1 from @NAMESPACE@.sl_node
+ where no_id = (c_node::integer)) then
+ raise exception 'ddlcapture(%,%) - node % does not exist!',
+ p_statement, p_nodes, c_node;
+ end if;
+
+ if c_local_node = (c_node::integer) then
+ c_found_origin := 't';
+ end if;
+
+ c_cmdargs = array_append(c_cmdargs, c_node);
+ end loop;
+
+ if not c_found_origin then
+ raise exception
+ 'ddlScript_complete(%) - origin node % not included in ONLY ON list!',
+ p_nodes, c_local_node;
+ end if;
+ end if;
+
+ perform @NAMESPACE@.ddlScript_complete_int();
+
+ insert into @NAMESPACE@.sl_log_script
+ (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
+ values
+ (c_local_node, pg_catalog.txid_current(),
+ nextval('@NAMESPACE@.sl_action_seq'), 's', c_cmdargs);
+
+ return currval('@NAMESPACE@.sl_action_seq');
+end;
+$$ language plpgsql;
+
+comment on function @NAMESPACE@.ddlScript_complete(p_nodes text) is
+'ddlScript_complete(set_id, script, only_on_node)
+
+After script has run on origin, this fixes up relnames and
+log trigger arguments and inserts the "fire ddlScript_complete_int()
+log row into sl_log_script.';
+
+-- ----------------------------------------------------------------------
+-- FUNCTION ddlScript_complete_int ()
+--
+-- Complete the DDL_SCRIPT event
+-- ----------------------------------------------------------------------
+drop function if exists @NAMESPACE@.ddlScript_complete_int(int4, int4);
+create or replace function @NAMESPACE@.ddlScript_complete_int ()
+returns int4
+as $$
+begin
+ perform @NAMESPACE@.updateRelname();
+ perform @NAMESPACE@.repair_log_triggers(true);
+ return 0;
+end;
+$$ language plpgsql;
+comment on function @NAMESPACE@.ddlScript_complete_int() is
+'ddlScript_complete_int()
+
+Complete processing the DDL_SCRIPT event.';
+
-- ----------------------------------------------------------------------
-- FUNCTION alterTableAddTriggers (tab_id)
-- ----------------------------------------------------------------------
--
-- Reset the relnames
-- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.updateRelname (p_set_id int4, p_only_on_node int4)
+drop function if exists @NAMESPACE@.updateRelname(int4, int4);
+create or replace function @NAMESPACE@.updateRelname ()
returns int4
as $$
declare
-- ----
lock table @NAMESPACE@.sl_config_lock;
- -- ----
- -- Check that we either are the set origin or a current
- -- subscriber of the set.
- -- ----
- v_no_id := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
- select set_origin into v_set_origin
- from @NAMESPACE@.sl_set
- where set_id = p_set_id
- for update;
- if not found then
- raise exception 'Slony-I: set % not found', p_set_id;
- end if;
- if v_set_origin <> v_no_id
- and not exists (select 1 from @NAMESPACE@.sl_subscribe
- where sub_set = p_set_id
- and sub_receiver = v_no_id)
- then
- return 0;
- end if;
-
- -- ----
- -- If execution on only one node is requested, check that
- -- we are that node.
- -- ----
- if p_only_on_node > 0 and p_only_on_node <> v_no_id then
- return 0;
- end if;
update @NAMESPACE@.sl_table set
tab_relname = PGC.relname, tab_nspname = PGN.nspname
from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN
where @NAMESPACE@.sl_sequence.seq_reloid = PGC.oid
and PGC.relnamespace = PGN.oid and
(seq_relname <> PGC.relname or seq_nspname <> PGN.nspname);
- return p_set_id;
+ return 0;
end;
$$ language plpgsql;
-comment on function @NAMESPACE@.updateRelname(p_set_id int4, p_only_on_node int4) is
-'updateRelname(set_id, only_on_node)';
+comment on function @NAMESPACE@.updateRelname() is
+'updateRelname()';
-- ----------------------------------------------------------------------
-- FUNCTION updateReloid (set_id, only_on_node)
are recreated.';
--- ----------------------------------------------------------------------
--- FUNCTION logApply ()
---
--- A trigger function that is placed on the tables sl_log_1/2 that
--- does the actual work of updating the user tables.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApply () returns trigger
- as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApply'
- language C
- security definer;
-
--- ----------------------------------------------------------------------
--- FUNCTION logApplySetCacheSize ()
---
--- A control function for the prepared query plan cache size used
--- in the logApply() trigger.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApplySetCacheSize (p_size int4)
-returns int4
- as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySetCacheSize'
- language C;
-
--- ----------------------------------------------------------------------
--- FUNCTION logApplySaveStats ()
---
--- A function used by the remote worker to update sl_apply_stats after
--- performing a SYNC.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval)
-returns int4
- as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySaveStats'
- language C;
-
-
create or replace function @NAMESPACE@.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint
as $$
declare
sg_last_grouping++;
}
- slon_appendquery(&query1, "select %s.logApplySaveStats("
- "'_%s', %d, '%s'::interval); ",
- rtcfg_namespace, rtcfg_cluster_name,
- node->no_id, wd->duration_buf);
+ if (monitor_threads)
+ {
+ slon_appendquery(&query1, "select %s.logApplySaveStats("
+ "'_%s', %d, '%s'::interval); ",
+ rtcfg_namespace, rtcfg_cluster_name,
+ node->no_id, wd->duration_buf);
+ }
strcpy(wd->duration_buf, "0 s");
slon_log(SLON_DEBUG2,"remoteWorkerThread_%d: committing SYNC"
need_reloadListen = true;
}
- else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
- {
- /* don't need to do anything for this event */
- }
else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
{
int reset_config_setid = (int) strtol(event->ev_data1, NULL, 10);
slon_appendquery(provider_query,
"select log_origin, log_txid, "
"NULL::integer, log_actionseq, "
- "NULL::text, NULL::text, 'S'::\"char\", "
+ "NULL::text, NULL::text, log_cmdtype, "
"NULL::integer, log_cmdargs "
"from %s.sl_log_script "
"where log_origin = %d ",
"union all "
"select log_origin, log_txid, "
"NULL::integer, log_actionseq, "
- "NULL::text, NULL::text, 'S'::\"char\", "
+ "NULL::text, NULL::text, log_cmdtype, "
"NULL::integer, log_cmdargs "
"from %s.sl_log_script "
"where log_origin = %d ",
SlonikAdmInfo *adminfo1;
SlonDString query, equery;
SlonDString script;
+ PGresult *res1;
int rc;
int num_statements = -1, stmtno;
char buf[4096];
for (stmtno=0; stmtno < num_statements; stmtno++) {
int startpos, endpos;
char *dest;
- PGresult *res1;
if (stmtno == 0)
startpos = 0;
else
}
PQclear(res1);
}
+
+ /*
+ * Finally call ddlScript_complete()
+ */
+ if ((stmt->only_on_nodes == NULL) && (stmt->only_on_node < 0)) {
+ slon_mkquery(&query,
+ "select \"_%s\".ddlScript_complete(NULL::text);",
+ stmt->hdr.script->clustername);
+ } else {
+ if (stmt->only_on_node > 0) {
+ slon_mkquery(&query,
+ "select \"_%s\".ddlScript_complete('%d');",
+ stmt->hdr.script->clustername, stmt->only_on_node);
+ } else { /* stmt->only_on_nodes is populated */
+ slon_mkquery(&query,
+ "select \"_%s\".ddlScript_complete('%s');",
+ stmt->hdr.script->clustername, stmt->only_on_nodes);
+ }
+ }
+ res1 = PQexec(adminfo1->dbconn, dstring_data(&query));
+ if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, "%s [%s] - %s",
+ PQresStatus(PQresultStatus(res1)),
+ dstring_data(&query), PQresultErrorMessage(res1));
+ PQclear(res1);
+ dstring_free(&equery);
+ dstring_free(&script);
+ dstring_free(&query);
+ return -1;
+ }
+ PQclear(res1);
+
dstring_free(&equery);
dstring_free(&script);
dstring_free(&query);