bug 304 - record sequence values as part of a EXECUTE_SCRIPT
authorSteve Singer <ssinger@ca.afilias.info>
Thu, 1 Aug 2013 19:09:45 +0000 (15:09 -0400)
committerSteve Singer <ssinger@ca.afilias.info>
Thu, 1 Aug 2013 19:09:45 +0000 (15:09 -0400)
Prior to 2.2 the EXECUTE_SCRIPT command result in a DDL_SCRIPT event
and a SYNC event generated as part of the same transaction. That
SYNC event also replicates the value of any sequences to the replica
before the DDL SCRIPT command.

In 2.2 the DDL script is replicated as part of a normal SYNC
as rows in sl_log_script.  This commit stores any sequence changes
as extra elements in the cmdargs array.  These sequence values are
then set before the DDL_SCRIPT is executed on the replica

This commit includes a unit test change to reproduce this issue
the test now passes

clustertest/regression/testddl/init_schema.sql
clustertest/regression/testddl/testddl.js
doc/adminguide/ddlchanges.sgml
src/backend/slony1_base.sql
src/backend/slony1_funcs.c
src/backend/slony1_funcs.sql

index ee951881d126c95b177dd81c9e1942582aea3696..1e9407535d66a4150555db0ab1815bb0b1192a36 100644 (file)
@@ -60,3 +60,13 @@ zone_id integer
 ALTER TABLE ONLY billing_discount
     ADD CONSTRAINT billing_discount_pkey PRIMARY KEY (billing_discount_id);
 
+CREATE OR REPLACE FUNCTION insert_table1() returns trigger
+as $$
+declare
+
+begin
+ insert into table1(data) values (NEW.data);
+  return NEW;
+end;
+$$
+language plpgsql;
\ No newline at end of file
index 4288f49a5f4fd3a5ea72da53f247adcd466ba285..7ecf4755c137fb790a146281c80652fea7a85937 100644 (file)
@@ -29,6 +29,9 @@ function init_tables() {
        +"set add table (id=4, set id=1, origin=1, fully qualified name = 'public.table4');\n"
        +"set add table (id=5, set id=1, origin=1, fully qualified name = 'public.table5');\n"
        +"set add table (id=6, set id=1, origin=1, fully qualified name = 'public.billing_discount');\n"
+       + "set add sequence(id=1,set id=1, origin=1, fully qualified name= 'public.table1_id_seq');\n"
+       + "set add sequence(id=2,set id=1, origin=1, fully qualified name= 'public.table5_id_seq');\n"
+       
        
        return script;
 }
@@ -121,6 +124,33 @@ function individual_ddl(coordinator, nodenum) {
        run_slonik('update ddl',coordinator,preamble,slonikScript);
 }
 
+function trigger_function(coordinator) {
+       /**
+        * We stop the slons because we want to make sure that a SYNC does not
+        * happen in between the EXECUTE_SCRIPT and the next SYNC.
+        */
+       terminate_slon(coordinator);
+       var sql = '';
+       for(var idx=0; idx < 1000; idx++) {
+               sql = sql + "insert into table5(data) values ('seqtest');\n"
+       }
+       var psql = coordinator.createPsqlCommand('db1',sql);
+       psql.run();
+       coordinator.join(psql);
+       premable = get_slonik_preamble();
+       slonikScript = "EXECUTE SCRIPT(  SQL='alter table table1 drop column seqed;create trigger table5_trigger "
+               + " before INSERT on public.table5 for each row execute procedure "
+               + " insert_table1();'"
+               + ' ,EVENT NODE=1 );';
+       run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+       slonikScript = "EXECUTE SCRIPT(  SQL='insert into table5(data) values (9);'"
+               + ' ,EVENT NODE=1);';
+       run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+
+       launch_slon(coordinator);       
+       
+}
+
 
 function inline_ddl(coordinator) {
        premable = get_slonik_preamble();
@@ -166,6 +196,11 @@ function do_test(coordinator) {
 
        inline_ddl(coordinator);
        wait_for_sync(coordinator);
+
+
+       trigger_function(coordinator);
+       wait_for_sync(coordinator);
+
 }
 
 function get_compare_queries() {
index 549e576cf4f3e47da68cedfbf8d0abe657005e4f..21f9f34266c745acd011adfc661ce492fe1f995a 100644 (file)
@@ -66,6 +66,20 @@ replica nodes.  It is advisiable to not be concurrently inserting,deleting
 or updating rows to a table while a script changing that table (adding or
 deleting columns) is also running.</para></listitem>
 
+<listitem><para>&slony1; 2.2.x and higher will replicate the SQL
+of an EXECUTE SCRIPT as part of a SYNC.  Scripts that perfrom an "ALTER TABLE"
+to a replicated table will be replicated in the correct order with respect
+to other concurrent activities on that table because of the exclusive lock
+that the alter table received on the origin node.   If your EXECUTE SCRIPT
+does not obtain exclusive locks on all of the tables it uses then
+you need to make sure that any transactions running concurrently with
+the script are not making changes that can effect the results of the script.
+For example, if your script does a nextval('some_replicated_seq') and 
+that sequence is being incremented by another transactions at the same time then
+it is possible that script sees a different value from the sequence when
+it runs on a replica than it did on the origin.</para></listitem>
+
+
 </itemizedlist>
 
 </sect2>
index 83ee96fd9e7be9981d0ee5c0e525bc89fe9dced3..9be53974b7a6193361fe77f03f7d251f819bfb6a 100644 (file)
@@ -728,4 +728,3 @@ comment on column @NAMESPACE@.sl_components.co_event is 'which event have I star
 -- ----------------------------------------------------------------------
 grant usage on schema @NAMESPACE@ to public;
 
-
index 0699cbc9bd9ecdc2ae2578c58c0739ac602d51a5..cb78305531d8a83b8e73c4b1a788887126be39cb 100644 (file)
@@ -42,6 +42,7 @@
 #include "utils/memutils.h"
 #include "utils/hsearch.h"
 #include "utils/timestamp.h"
+#include "utils/int8.h"
 #ifdef HAVE_GETACTIVESNAPSHOT
 #include "utils/snapmgr.h"
 #endif
@@ -989,7 +990,14 @@ versionFunc(logApply)(PG_FUNCTION_ARGS)
                char       *ddl_script;
                bool            localNodeFound = true;
                Datum           script_insert_args[5];
-               char        query[1024];
+               Datum      *nodeargs;
+               bool       *nodeargsnulls;
+               int                     nodeargsn;
+               Datum      *seqargs;
+               bool       *seqargsnulls;
+               int                     seqargsn;
+               Datum  array_holder;
+               Datum delim_text;
 
                apply_num_script++;
 
@@ -1001,29 +1009,64 @@ versionFunc(logApply)(PG_FUNCTION_ARGS)
                                                        SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
                if (isnull)
                        elog(ERROR, "Slony-I: log_cmdargs is NULL");
-
                deconstruct_array(DatumGetArrayTypeP(dat),
                                                  TEXTOID, -1, false, 'i',
-                                                 &cmdargs, &cmdargsnulls, &cmdargsn);
+                                                 &cmdargs, &cmdargsnulls, &cmdargsn);          
+               
+               if( cmdargsn < 3 )
+               {
+                       elog(ERROR,"Slony-I: DDL_SCRIPT events require at least 3 elements"\
+                                "in the argument array");
+               }
 
+               nodeargs=NULL;
+               nodeargsn=0;
+               delim_text=DirectFunctionCall1(textin,CStringGetDatum(","));
+               if ( (! cmdargsnulls[1])  )
+               {                       
+                       char * astr=DatumGetCString(DirectFunctionCall1(textout,
+                                                                                                                       cmdargs[1]));
+                       
+                       if ( strcmp(astr,""))
+                       {
+                         array_holder = DirectFunctionCall2(text_to_array,cmdargs[1],
+                                                                                                delim_text);
+                         deconstruct_array(DatumGetArrayTypeP(array_holder),
+                                                               TEXTOID, -1, false, 'i',
+                                                                 &nodeargs, &nodeargsnulls, &nodeargsn);
+                       }
+               }
+               seqargs=NULL;
+               seqargsn=0;
+               if ( (! cmdargsnulls[2])  ) 
+               {
+                       char * astr=DatumGetCString(DirectFunctionCall1(textout,
+                                                                                                                       cmdargs[2]));   
+                       if(  strcmp(astr,"") )
+                       {
+                               array_holder = DirectFunctionCall2(text_to_array,cmdargs[2],
+                                                                                                  delim_text);
+                               deconstruct_array(DatumGetArrayTypeP(array_holder),
+                                                                 TEXTARRAYOID, -1, false, 'i',
+                                                                 &seqargs, &seqargsnulls, &seqargsn);
+                       }
+               }
                /*
                 * The first element is the DDL statement itself.
                 */
                ddl_script = DatumGetCString(DirectFunctionCall1(
                                                                                                           textout, cmdargs[0]));
-
                /*
                 * If there is an optional node ID list, check that we are in it.
                 */
-               if (cmdargsn > 1)
+               if (nodeargsn > 0)
                {
                        localNodeFound = false;
-                       for (i = 1; i < cmdargsn; i++)
+                       for (i = 0; i < nodeargsn; i++)
                        {
                                int32           nodeId = DatumGetInt32(
                                                                                                   DirectFunctionCall1(int4in,
-                                                                 DirectFunctionCall1(textout, cmdargs[i])));
-
+                                                                 DirectFunctionCall1(textout, nodeargs[i])));
                                if (nodeId == cs->localNodeId)
                                {
                                        localNodeFound = true;
@@ -1038,6 +1081,53 @@ versionFunc(logApply)(PG_FUNCTION_ARGS)
                 */
                if (localNodeFound)
                {
+
+                       char query[1024];
+                       Oid argtypes[3];
+                       argtypes[0] = INT4OID;
+                       argtypes[1] = INT4OID;
+                       argtypes[2] = INT8OID;
+
+                       snprintf(query,1023,"select %s.sequenceSetValue($1,"    \
+                                        "$2,NULL,$3); ",tg->tg_trigger->tgargs[0]);                    
+                       void * plan = SPI_prepare(query,3,argtypes);
+                       if ( plan == NULL )
+                       {
+
+                               elog(ERROR,"could not prepare plan to call sequenceSetValue");
+                       }
+                       /**
+                        * before we execute the DDL we need to update the sequences.
+                        */
+                       if ( seqargsn > 0 )
+                       {
+                               
+                               for( i = 0; (i+2) < seqargsn; i=i+3 )
+                               {
+                                       Datum           call_args[3];
+                                       bool        call_nulls[3];
+                                       call_args[0] = DirectFunctionCall1(int4in,
+                                                                                                          DirectFunctionCall1(textout,seqargs[i]));
+                                       call_args[1] = DirectFunctionCall1(int4in,
+                                                                                                          DirectFunctionCall1(textout,seqargs[i+1]));
+                                       call_args[2] = DirectFunctionCall1(int8in,
+                                                                                                          DirectFunctionCall1(textout,seqargs[i+2]));
+
+                                       call_nulls[0]=0;
+                                       call_nulls[1]=0;
+                                       call_nulls[2]=0;
+
+                                       if ( SPI_execp(plan,call_args,call_nulls,0) < 0 )
+                                       {
+                                               elog(ERROR,"error executing sequenceSetValue plan");
+
+                                       }
+                                       
+                               }
+
+                               
+                       }
+
                        sprintf(query,"set session_replication_role to local;");
                        if(SPI_exec(query,0) < 0)
                        {
@@ -1606,7 +1696,6 @@ versionFunc(logApply)(PG_FUNCTION_ARGS)
                                        MemoryContextSwitchTo(oldContext);
                                        cacheEnt->typmod[i / 2] =
                                                target_rel->rd_att->attrs[colnum - 1]->atttypmod;
-
                                        sprintf(applyQueryPos, "%s%s = $%d",
                                                        (i > 0) ? " AND " : "",
                                                        slon_quote_identifier(colname),
index 00aad224190ff9ca5000e724f584b4de0a1abcce..dcbc9d91fb580af7c60bd177a6c1cfd0b5b02a05 100644 (file)
@@ -3366,10 +3366,11 @@ begin
        execute 'select setval(''' || v_fqname ||
                        ''', ' || p_last_value::text || ')';
 
-       insert into @NAMESPACE@.sl_seqlog
+       if p_ev_seqno is not null then
+          insert into @NAMESPACE@.sl_seqlog
                        (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value)
                        values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value);
-
+       end if;
        return p_seq_id;
 end;
 $$ language plpgsql;
@@ -3391,10 +3392,13 @@ declare
        c_found_origin  boolean;
        c_node                  text;
        c_cmdargs               text[];
+       c_nodeargs      text;
+       c_delim         text;
 begin
        c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
 
        c_cmdargs = array_append('{}'::text[], p_statement);
+       c_nodeargs = '';
        if p_nodes is not null then
                c_found_origin := 'f';
                -- p_nodes list needs to consist of a list of nodes that exist
@@ -3411,8 +3415,11 @@ begin
                   if c_local_node = (c_node::integer) then
                          c_found_origin := 't';
                   end if;
-
-                  c_cmdargs = array_append(c_cmdargs, c_node);
+                  if length(c_nodeargs)>0 then
+                         c_nodeargs = c_nodeargs ||','|| c_node;
+                  else
+                               c_nodeargs=c_node;
+                       end if;
           end loop;
 
                if not c_found_origin then
@@ -3421,15 +3428,24 @@ begin
                                p_statement, p_nodes, c_local_node;
        end if;
     end if;
-
-       execute p_statement;
-
+       c_cmdargs = array_append(c_cmdargs,c_nodeargs);
+       c_delim=',';
+       c_cmdargs = array_append(c_cmdargs, 
+
+           (select @NAMESPACE@.string_agg( seq_id::text || c_delim
+                  || c_local_node ||
+                   c_delim || seq_last_value)
+                   FROM (
+                      select seq_id,
+                  seq_last_value from @NAMESPACE@.sl_seqlastvalue
+                  where seq_origin = c_local_node) as FOO
+                       where NOT @NAMESPACE@.seqtrack(seq_id,seq_last_value) is NULL));
        insert into @NAMESPACE@.sl_log_script
                        (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
                values 
                        (c_local_node, pg_catalog.txid_current(), 
                        nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
-
+       execute p_statement;
        return currval('@NAMESPACE@.sl_action_seq');
 end;
 $$ language plpgsql;
@@ -6224,3 +6240,36 @@ begin
        return v_seq_id;
 end
 $$ language plpgsql;
+
+
+
+--
+-- we create a function + aggregate for string_agg to aggregate strings
+-- some versions of PG (ie prior to 9.0) don't support this
+CREATE OR replace function @NAMESPACE@.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS
+$BODY$
+DECLARE
+  c_delim text;
+BEGIN
+    c_delim = ',';
+       IF (txt_before IS NULL or txt_before='') THEN
+          RETURN txt_new;
+       END IF;
+       RETURN txt_before || c_delim || txt_new;
+END;
+$BODY$
+LANGUAGE plpgsql;
+comment on function @NAMESPACE@.agg_text_sum(text,text) is 
+'An accumulator function used by the slony string_agg function to
+aggregate rows into a string';
+--
+-- create a string_agg function in the slony schema.
+-- PG 8.3 does not have this function so we make our own
+-- when slony stops supporting PG 8.3 we can switch to
+-- the PG 9.0+ provided version of string_agg
+--
+CREATE AGGREGATE @NAMESPACE@.string_agg(text) (
+SFUNC=@NAMESPACE@.agg_text_sum,
+STYPE=text,
+INITCOND=''
+);
\ No newline at end of file