Reintroduce ddlScript_complete() at the end of EXECUTE SCRIPT
authorJan Wieck <JanWieck@Yahoo.com>
Thu, 16 Feb 2012 15:16:00 +0000 (10:16 -0500)
committerJan Wieck <JanWieck@Yahoo.com>
Thu, 16 Feb 2012 15:16:00 +0000 (10:16 -0500)
to fix relnames and log trigger arguments.

Minor changes according to code review by Steve.

src/backend/slony1_base.sql
src/backend/slony1_funcs.c
src/backend/slony1_funcs.def
src/backend/slony1_funcs.sql
src/slon/remote_worker.c
src/slonik/slonik.c

index 1fc0dc1d3557f12de88519bd2a75d8c3e5ed0c9c..83ee96fd9e7be9981d0ee5c0e525bc89fe9dced3 100644 (file)
@@ -433,6 +433,7 @@ create table @NAMESPACE@.sl_log_script (
        log_origin                      int4,
        log_txid                        bigint,
        log_actionseq           int8,
+       log_cmdtype                     "char",
        log_cmdargs                     text[]
 ) WITHOUT OIDS;
 create index sl_log_script_idx1 on @NAMESPACE@.sl_log_script
@@ -442,6 +443,7 @@ comment on table @NAMESPACE@.sl_log_script is 'Captures SQL script queries to be
 comment on column @NAMESPACE@.sl_log_script.log_origin is 'Origin name from which the change came';
 comment on column @NAMESPACE@.sl_log_script.log_txid is 'Transaction ID on the origin node';
 comment on column @NAMESPACE@.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas';
+comment on column @NAMESPACE@.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete';
 comment on column @NAMESPACE@.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.';
 
 -- ----------------------------------------------------------------------
index eb492fb022a9993f018ccedd5593e90902d486cf..9883b9587ec028d2f200b54763ae5093c5e9dab1 100644 (file)
@@ -105,6 +105,14 @@ extern DLLIMPORT Node *newNodeMacroHolder;
 #define PLAN_INSERT_LOG_STATUS (1 << 2)
 #define PLAN_APPLY_QUERIES     (1 << 3)
 
+/*
+ * This OID definition is missing in 8.3, although the data type
+ * does exist.
+ */
+#ifndef TEXTARRAYOID
+#define TEXTARRAYOID 1009
+#endif
+
 
 /* ----
  * Slony_I_ClusterStatus -
@@ -351,7 +359,7 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS)
        TransactionId newXid = GetTopTransactionId();
        Slony_I_ClusterStatus *cs;
        TriggerData *tg;
-       Datum           argv[6];
+       Datum           log_param[6];
        text       *cmdtype = NULL;
        int32           cmdupdncols = 0;
        int                     rc;
@@ -745,18 +753,18 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS)
        cmddims[0] = cmdargselem - cmdargs;
        cmdlbs[0] = 1;
 
-       argv[0] = Int32GetDatum(tab_id);
-       argv[1] = DirectFunctionCall1(textin, 
+       log_param[0] = Int32GetDatum(tab_id);
+       log_param[1] = DirectFunctionCall1(textin, 
                                CStringGetDatum(get_namespace_name(
                                        RelationGetNamespace(tg->tg_relation))));
-       argv[2] = DirectFunctionCall1(textin,
+       log_param[2] = DirectFunctionCall1(textin,
                                CStringGetDatum(RelationGetRelationName(tg->tg_relation)));
-       argv[3] = PointerGetDatum(cmdtype);
-       argv[4] = Int32GetDatum(cmdupdncols);
-       argv[5] = PointerGetDatum(construct_md_array(cmdargs, cmdnulls, 1,
+       log_param[3] = PointerGetDatum(cmdtype);
+       log_param[4] = Int32GetDatum(cmdupdncols);
+       log_param[5] = PointerGetDatum(construct_md_array(cmdargs, cmdnulls, 1,
                        cmddims, cmdlbs, TEXTOID, -1, false, 'i'));
 
-       SPI_execp(cs->plan_active_log, argv, NULL, 0);
+       SPI_execp(cs->plan_active_log, log_param, NULL, 0);
 
        SPI_finish();
        return PointerGetDatum(NULL);
@@ -968,7 +976,7 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        {
                char       *ddl_script;
                bool            localNodeFound = true;
-               Datum           script_insert_args[4];
+               Datum           script_insert_args[5];
 
                apply_num_script++;
 
@@ -1038,6 +1046,96 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
                script_insert_args[2] = SPI_getbinval(new_row, tupdesc, 
                                SPI_fnumber(tupdesc, "log_actionseq"), &isnull);
                script_insert_args[3] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_cmdtype"), &isnull);
+               script_insert_args[4] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
+               if (SPI_execp(cs->plan_insert_log_script, script_insert_args, NULL, 0) < 0)
+                       elog(ERROR, "Execution of sl_log_script insert plan failed");
+
+               /*
+                * Return NULL to suppress the insert into the original sl_log_N.
+                */
+               SPI_finish();
+               return PointerGetDatum(NULL);
+       }
+
+       /*
+        * Process the special ddlScript_complete row.
+        */
+       if (cmdtype == 's')
+       {
+               bool            localNodeFound = true;
+               Datum           script_insert_args[4];
+
+               apply_num_script++;
+
+               /*
+                * Turn the log_cmdargs into a plain array of Text Datums.
+                */
+               dat = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
+               if (isnull)
+                       elog(ERROR, "Slony-I: log_cmdargs is NULL");
+
+               deconstruct_array(DatumGetArrayTypeP(dat), 
+                               TEXTOID, -1, false, 'i', 
+                               &cmdargs, &cmdargsnulls, &cmdargsn);
+               
+               /*
+                * If there is an optional node ID list, check that we are in it.
+                */
+               if (cmdargsn > 1) {
+                       localNodeFound = false;
+                       for (i = 1; i < cmdargsn; i++)
+                       {
+                               int32   nodeId = DatumGetInt32(
+                                                                       DirectFunctionCall1(int4in,
+                                                                       DirectFunctionCall1(textout, cmdargs[i])));
+                               if (nodeId == cs->localNodeId)
+                               {
+                                       localNodeFound = true;
+                                       break;
+                               }
+                       }
+               }
+
+               /*
+                * Execute the ddlScript_complete_int() call if we have to.
+                */
+               if (localNodeFound)
+               {
+                       char    query[1024];
+
+                       snprintf(query, sizeof(query),
+                               "select %s.ddlScript_complete_int();",
+                               cs->clusterident);
+
+                       if (SPI_exec(query, 0) < 0)
+                       {
+                               elog(ERROR, "SPI_exec() failed for statement '%s'",
+                                       query);
+                       }
+
+                       /*
+                        * Set the currentXid to invalid to flush the apply
+                        * query cache.
+                        */
+                       cs->currentXid = InvalidTransactionId;
+               }
+
+               /*
+                * Build the parameters for the insert into sl_log_script
+                * and execute the query.
+                */
+               script_insert_args[0] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_origin"), &isnull);
+               script_insert_args[1] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_txid"), &isnull);
+               script_insert_args[2] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_actionseq"), &isnull);
+               script_insert_args[3] = SPI_getbinval(new_row, tupdesc, 
+                               SPI_fnumber(tupdesc, "log_cmdtype"), &isnull);
+               script_insert_args[4] = SPI_getbinval(new_row, tupdesc, 
                                SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
                if (SPI_execp(cs->plan_insert_log_script, script_insert_args, NULL, 0) < 0)
                        elog(ERROR, "Execution of sl_log_script insert plan failed");
@@ -1059,9 +1157,13 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
        tableid = DatumGetInt32(dat);
        nspname = SPI_getvalue(new_row, tupdesc, 
                        SPI_fnumber(tupdesc, "log_tablenspname"));
+       if (nspname == NULL)
+               elog(ERROR, "Slony-I: log_tablenspname is NULL on INSERT/UPDATE/DELETE");
 
        relname = SPI_getvalue(new_row, tupdesc,
                        SPI_fnumber(tupdesc, "log_tablerelname"));
+       if (relname == NULL)
+               elog(ERROR, "Slony-I: log_tablerelname is NULL on INSERT/UPDATE/DELETE");
 
        dat = SPI_getbinval(new_row, tupdesc, 
                        SPI_fnumber(tupdesc, "log_cmdupdncols"), &isnull);
@@ -1100,7 +1202,8 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
 
                        colname = DatumGetCString(DirectFunctionCall1(
                                                        textout, cmdargs[i]));
-                       sprintf(applyQueryPos, ",%s", slon_quote_identifier(colname));
+                       snprintf(applyQueryPos, applyQuerySize - (applyQueryPos - applyQuery),
+                               ",%s", slon_quote_identifier(colname));
                        applyQueryPos += strlen(applyQueryPos);
                }
        }
@@ -1629,6 +1732,12 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * _Slony_I_logApplySetCacheSize()
+ *
+ *     Called by slon during startup to set the size of the log apply
+ *     query cache according to the config parameter apply_cache_size.
+ */
 Datum
 _Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS)
 {
@@ -1651,6 +1760,11 @@ _Slony_I_logApplySetCacheSize(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * _Slony_I_logApplySaveStats()
+ *
+ *     Save statistics at the end of SYNC processing.
+ */
 Datum
 _Slony_I_logApplySaveStats(PG_FUNCTION_ARGS)
 {
@@ -1771,7 +1885,7 @@ applyQueryIncrease(void)
 {
        if (applyQueryPos - applyQuery + 1024 > applyQuerySize)
        {
-               int offset = applyQueryPos - applyQuery;
+               size_t offset = applyQueryPos - applyQuery;
                applyQuerySize *= 2;
                applyQuery = realloc(applyQuery, applyQuerySize);
                if (applyQuery == NULL)
@@ -2229,16 +2343,17 @@ getClusterStatus(Name cluster_name, int need_plan_mask)
                 * The plan to insert into sl_log_script.
                 */
                sprintf(query, "insert into %s.sl_log_script "
-                               "(log_origin, log_txid, log_actionseq, log_cmdargs) "
-                               "values ($1, $2, $3, $4);",
+                               "(log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) "
+                               "values ($1, $2, $3, $4, $5);",
                                slon_quote_identifier(NameStr(*cluster_name)));
                plan_types[0] = INT4OID;
                plan_types[1] = INT8OID;
                plan_types[2] = INT8OID;
-               plan_types[3] = TEXTARRAYOID;
+               plan_types[3] = CHAROID;
+               plan_types[4] = TEXTARRAYOID;
 
                cs->plan_insert_log_script = SPI_saveplan(
-                               SPI_prepare(query, 4, plan_types));
+                               SPI_prepare(query, 5, plan_types));
                if (cs->plan_insert_log_script == NULL)
                        elog(ERROR, "Slony-I: SPI_prepare() failed");
 
@@ -2346,10 +2461,6 @@ getClusterStatus(Name cluster_name, int need_plan_mask)
  *
  */
 
-#ifndef TEXTARRAYOID
-#define TEXTARRAYOID 1009
-#endif
-
 int prepareLogPlan(Slony_I_ClusterStatus * cs,
                                int log_status)
 {
index 95e96ce7882be92a0abdc99590f1e56b7383b39b..ce8f28ed224ff95988c2cf1e3cd2a3b7246af328 100644 (file)
@@ -8,4 +8,7 @@ _Slony_I_killBackend
 _Slony_I_seqtrack
 _Slony_I_logTrigger
 _Slony_I_resetSession
-_slon_decode_tgargs
\ No newline at end of file
+_Slony_1_logApply
+_Slony_1_logApplySetCacheSize
+_Slony_1_logApplySaveStats
+_slon_decode_tgargs
index 10270ad674ec8cbc9f7797a90bc340ad180fcccc..8ac6a848c19163763dc58efe436a697f6d79c9a1 100644 (file)
@@ -184,6 +184,40 @@ create or replace function @NAMESPACE@.resetSession() returns text
           as '$libdir/slony1_funcs.@MODULEVERSION@','_Slony_I_resetSession'
           language C;
 
+-- ----------------------------------------------------------------------
+-- FUNCTION logApply ()
+--
+--     A trigger function that is placed on the tables sl_log_1/2 that
+--     does the actual work of updating the user tables.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApply () returns trigger
+    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApply'
+       language C
+       security definer;
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logApplySetCacheSize ()
+--
+--     A control function for the prepared query plan cache size used
+--     in the logApply() trigger.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApplySetCacheSize (p_size int4) 
+returns int4
+    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySetCacheSize'
+       language C;
+
+-- ----------------------------------------------------------------------
+-- FUNCTION logApplySaveStats ()
+--
+--     A function used by the remote worker to update sl_apply_stats after
+--     performing a SYNC.
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 
+returns int4
+    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySaveStats'
+       language C;
+
+
 create or replace function @NAMESPACE@.checkmoduleversion () returns text as $$
 declare
   moduleversion        text;
@@ -3322,7 +3356,7 @@ Set sequence seq_id to have new value last_value.
 ';
 
 -- ----------------------------------------------------------------------
--- FUNCTION ddlCapture (origin, statement)
+-- FUNCTION ddlCapture (statement, nodes)
 --
 --     Capture DDL into sl_log_script
 -- ----------------------------------------------------------------------
@@ -3368,10 +3402,10 @@ begin
        execute p_statement;
 
        insert into @NAMESPACE@.sl_log_script
-                       (log_origin, log_txid, log_actionseq, log_cmdargs)
+                       (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
                values 
                        (c_local_node, pg_catalog.txid_current(), 
-                       nextval('@NAMESPACE@.sl_action_seq'), c_cmdargs);
+                       nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
 
        return currval('@NAMESPACE@.sl_action_seq');
 end;
@@ -3381,6 +3415,91 @@ comment on function @NAMESPACE@.ddlCapture (p_statement text, p_nodes text) is
 'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers';
 
 
+-- ----------------------------------------------------------------------
+-- FUNCTION ddlScript_complete (p_nodes)
+--
+--     Actions to be performed after DDL script execution
+-- ----------------------------------------------------------------------
+drop function if exists @NAMESPACE@.ddlScript_complete (int4, text, int4);  -- Needed because function signature has changed!
+
+create or replace function @NAMESPACE@.ddlScript_complete (p_nodes text)
+returns bigint
+as $$
+declare
+       c_local_node    integer;
+       c_found_origin  boolean;
+       c_node                  text;
+       c_cmdargs               text[];
+begin
+       c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
+
+       c_cmdargs = '{}'::text[];
+       if p_nodes is not null then
+               c_found_origin := 'f';
+               -- p_nodes list needs to consist of a list of nodes that exist
+               -- and that include the current node ID
+               for c_node in select trim(node) from
+                               pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop
+                       if not exists 
+                                       (select 1 from @NAMESPACE@.sl_node 
+                                       where no_id = (c_node::integer)) then
+                               raise exception 'ddlcapture(%,%) - node % does not exist!', 
+                                       p_statement, p_nodes, c_node;
+                  end if;
+
+                  if c_local_node = (c_node::integer) then
+                         c_found_origin := 't';
+                  end if;
+
+                  c_cmdargs = array_append(c_cmdargs, c_node);
+          end loop;
+
+               if not c_found_origin then
+                       raise exception 
+                               'ddlScript_complete(%) - origin node % not included in ONLY ON list!',
+                               p_nodes, c_local_node;
+       end if;
+    end if;
+
+       perform @NAMESPACE@.ddlScript_complete_int();
+
+       insert into @NAMESPACE@.sl_log_script
+                       (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
+               values 
+                       (c_local_node, pg_catalog.txid_current(), 
+                       nextval('@NAMESPACE@.sl_action_seq'), 's', c_cmdargs);
+
+       return currval('@NAMESPACE@.sl_action_seq');
+end;
+$$ language plpgsql;
+
+comment on function @NAMESPACE@.ddlScript_complete(p_nodes text) is
+'ddlScript_complete(set_id, script, only_on_node)
+
+After script has run on origin, this fixes up relnames and
+log trigger arguments and inserts the "fire ddlScript_complete_int()
+log row into sl_log_script.';
+
+-- ----------------------------------------------------------------------
+-- FUNCTION ddlScript_complete_int ()
+--
+--     Complete the DDL_SCRIPT event
+-- ----------------------------------------------------------------------
+drop function if exists @NAMESPACE@.ddlScript_complete_int(int4, int4);
+create or replace function @NAMESPACE@.ddlScript_complete_int ()
+returns int4
+as $$
+begin
+       perform @NAMESPACE@.updateRelname();
+       perform @NAMESPACE@.repair_log_triggers(true);
+       return 0;
+end;
+$$ language plpgsql;
+comment on function @NAMESPACE@.ddlScript_complete_int() is
+'ddlScript_complete_int()
+
+Complete processing the DDL_SCRIPT event.';
+
 -- ----------------------------------------------------------------------
 -- FUNCTION alterTableAddTriggers (tab_id)
 -- ----------------------------------------------------------------------
@@ -4614,7 +4733,8 @@ comment on function @NAMESPACE@.generate_sync_event(p_interval interval) is
 --
 --      Reset the relnames          
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.updateRelname (p_set_id int4, p_only_on_node int4)
+drop function if exists @NAMESPACE@.updateRelname(int4, int4);
+create or replace function @NAMESPACE@.updateRelname ()
 returns int4
 as $$
 declare
@@ -4626,33 +4746,6 @@ begin
         -- ----
         lock table @NAMESPACE@.sl_config_lock;
 
-        -- ----
-        -- Check that we either are the set origin or a current
-        -- subscriber of the set.
-        -- ----
-        v_no_id := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
-        select set_origin into v_set_origin
-                        from @NAMESPACE@.sl_set
-                        where set_id = p_set_id
-                        for update;
-        if not found then
-                raise exception 'Slony-I: set % not found', p_set_id;
-        end if;
-        if v_set_origin <> v_no_id
-                and not exists (select 1 from @NAMESPACE@.sl_subscribe
-                        where sub_set = p_set_id
-                        and sub_receiver = v_no_id)
-        then
-                return 0;
-        end if;
-    
-        -- ----
-        -- If execution on only one node is requested, check that
-        -- we are that node.
-        -- ----
-        if p_only_on_node > 0 and p_only_on_node <> v_no_id then
-                return 0;
-        end if;
         update @NAMESPACE@.sl_table set 
                 tab_relname = PGC.relname, tab_nspname = PGN.nspname
                 from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 
@@ -4665,12 +4758,12 @@ begin
                 where @NAMESPACE@.sl_sequence.seq_reloid = PGC.oid
                 and PGC.relnamespace = PGN.oid and
                    (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname);
-        return p_set_id;
+        return 0;
 end;
 $$ language plpgsql;
 
-comment on function @NAMESPACE@.updateRelname(p_set_id int4, p_only_on_node int4) is
-'updateRelname(set_id, only_on_node)';
+comment on function @NAMESPACE@.updateRelname() is
+'updateRelname()';
 
 -- ----------------------------------------------------------------------
 -- FUNCTION updateReloid (set_id, only_on_node)
@@ -5979,40 +6072,6 @@ repaired. Otherwise all replicated tables with outdated trigger arguments
 are recreated.';
 
 
--- ----------------------------------------------------------------------
--- FUNCTION logApply ()
---
---     A trigger function that is placed on the tables sl_log_1/2 that
---     does the actual work of updating the user tables.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApply () returns trigger
-    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApply'
-       language C
-       security definer;
-
--- ----------------------------------------------------------------------
--- FUNCTION logApplySetCacheSize ()
---
---     A control function for the prepared query plan cache size used
---     in the logApply() trigger.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApplySetCacheSize (p_size int4) 
-returns int4
-    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySetCacheSize'
-       language C;
-
--- ----------------------------------------------------------------------
--- FUNCTION logApplySaveStats ()
---
---     A function used by the remote worker to update sl_apply_stats after
---     performing a SYNC.
--- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 
-returns int4
-    as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApplySaveStats'
-       language C;
-
-
 create or replace function @NAMESPACE@.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint
 as $$
 declare
index 3e149a9823a9370314036aa8c4335e08795113cb..e61c779151895ae7ba898df340afcbec94bcc24b 100644 (file)
@@ -659,10 +659,13 @@ remoteWorkerThread_main(void *cdata)
                                        sg_last_grouping++;
                        }
 
-                       slon_appendquery(&query1, "select %s.logApplySaveStats("
-                                       "'_%s', %d, '%s'::interval); ",
-                                       rtcfg_namespace, rtcfg_cluster_name,
-                                       node->no_id, wd->duration_buf);
+                       if (monitor_threads)
+                       {
+                               slon_appendquery(&query1, "select %s.logApplySaveStats("
+                                               "'_%s', %d, '%s'::interval); ",
+                                               rtcfg_namespace, rtcfg_cluster_name,
+                                               node->no_id, wd->duration_buf);
+                       }
                        strcpy(wd->duration_buf, "0 s");
 
                        slon_log(SLON_DEBUG2,"remoteWorkerThread_%d: committing SYNC"
@@ -1453,10 +1456,6 @@ remoteWorkerThread_main(void *cdata)
 
                                need_reloadListen = true;
                        }
-                       else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) 
-                       {
-                               /* don't need to do anything for this event */
-                       }
                        else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
                        {
                                int                     reset_config_setid = (int) strtol(event->ev_data1, NULL, 10);
@@ -3840,7 +3839,7 @@ slon_log(SLON_DEBUG2,
                        slon_appendquery(provider_query,
                                                         "select log_origin, log_txid, "
                                                         "NULL::integer, log_actionseq, "
-                                                        "NULL::text, NULL::text, 'S'::\"char\", "
+                                                        "NULL::text, NULL::text, log_cmdtype, "
                                                         "NULL::integer, log_cmdargs "
                                                         "from %s.sl_log_script "
                                                         "where log_origin = %d ",
@@ -3857,7 +3856,7 @@ slon_log(SLON_DEBUG2,
                                                        "union all "
                                                         "select log_origin, log_txid, "
                                                         "NULL::integer, log_actionseq, "
-                                                        "NULL::text, NULL::text, 'S'::\"char\", "
+                                                        "NULL::text, NULL::text, log_cmdtype, "
                                                         "NULL::integer, log_cmdargs "
                                                         "from %s.sl_log_script "
                                                         "where log_origin = %d ",
index 4766a1292ef9906fda266997e331e8ab174f4cce..869a8da01b4214e305de508b06424997ddb8b90b 100644 (file)
@@ -4628,6 +4628,7 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt)
        SlonikAdmInfo *adminfo1;
        SlonDString query, equery;
        SlonDString script;
+       PGresult   *res1;
        int                     rc;
        int                     num_statements = -1, stmtno;
        char            buf[4096];
@@ -4693,7 +4694,6 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt)
        for (stmtno=0; stmtno < num_statements;  stmtno++) {
                int startpos, endpos;
                char *dest;
-               PGresult   *res1;
                if (stmtno == 0)
                        startpos = 0;
                else
@@ -4722,6 +4722,39 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt)
                }
                PQclear(res1);
        }
+
+    /*
+        * Finally call ddlScript_complete()
+        */
+       if ((stmt->only_on_nodes == NULL) && (stmt->only_on_node < 0)) {
+                       slon_mkquery(&query,
+                                                "select \"_%s\".ddlScript_complete(NULL::text);",
+                                                stmt->hdr.script->clustername);
+       } else {
+               if (stmt->only_on_node > 0) {
+                       slon_mkquery(&query,
+                                                "select \"_%s\".ddlScript_complete('%d');",
+                                                stmt->hdr.script->clustername, stmt->only_on_node);
+               } else {  /* stmt->only_on_nodes is populated */
+                       slon_mkquery(&query,
+                                                "select \"_%s\".ddlScript_complete('%s');",
+                                                stmt->hdr.script->clustername, stmt->only_on_nodes);
+               }
+       }
+       res1 = PQexec(adminfo1->dbconn, dstring_data(&query));
+       if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+       {
+                       fprintf(stderr, "%s [%s] - %s",
+                                       PQresStatus(PQresultStatus(res1)),
+                                       dstring_data(&query), PQresultErrorMessage(res1));
+                       PQclear(res1);
+                       dstring_free(&equery);
+                       dstring_free(&script);
+                       dstring_free(&query);
+                       return -1;
+       }
+       PQclear(res1);
+
        dstring_free(&equery);
        dstring_free(&script);
        dstring_free(&query);