From: Steve Singer Date: Thu, 1 Aug 2013 19:09:45 +0000 (-0400) Subject: bug 304 - record sequence values as part of a EXECUTE_SCRIPT X-Git-Tag: REL_2_2_0~16^2~6 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=94521f91ae32c129ebb28ca0b4ca4fcabb2bbc00;p=slony1-engine.git bug 304 - record sequence values as part of a EXECUTE_SCRIPT Prior to 2.2 the EXECUTE_SCRIPT command result in a DDL_SCRIPT event and a SYNC event generated as part of the same transaction. That SYNC event also replicates the value of any sequences to the replica before the DDL SCRIPT command. In 2.2 the DDL script is replicated as part of a normal SYNC as rows in sl_log_script. This commit stores any sequence changes as extra elements in the cmdargs array. These sequence values are then set before the DDL_SCRIPT is executed on the replica This commit includes a unit test change to reproduce this issue the test now passes --- diff --git a/clustertest/regression/testddl/init_schema.sql b/clustertest/regression/testddl/init_schema.sql index ee951881..1e940753 100644 --- a/clustertest/regression/testddl/init_schema.sql +++ b/clustertest/regression/testddl/init_schema.sql @@ -60,3 +60,13 @@ zone_id integer ALTER TABLE ONLY billing_discount ADD CONSTRAINT billing_discount_pkey PRIMARY KEY (billing_discount_id); +CREATE OR REPLACE FUNCTION insert_table1() returns trigger +as $$ +declare + +begin + insert into table1(data) values (NEW.data); + return NEW; +end; +$$ +language plpgsql; \ No newline at end of file diff --git a/clustertest/regression/testddl/testddl.js b/clustertest/regression/testddl/testddl.js index 4288f49a..7ecf4755 100644 --- a/clustertest/regression/testddl/testddl.js +++ b/clustertest/regression/testddl/testddl.js @@ -29,6 +29,9 @@ function init_tables() { +"set add table (id=4, set id=1, origin=1, fully qualified name = 'public.table4');\n" +"set add table (id=5, set id=1, origin=1, fully qualified name = 'public.table5');\n" +"set add table (id=6, set id=1, origin=1, fully qualified name = 'public.billing_discount');\n" + + "set add sequence(id=1,set id=1, origin=1, fully qualified name= 'public.table1_id_seq');\n" + + "set add sequence(id=2,set id=1, origin=1, fully qualified name= 'public.table5_id_seq');\n" + return script; } @@ -121,6 +124,33 @@ function individual_ddl(coordinator, nodenum) { run_slonik('update ddl',coordinator,preamble,slonikScript); } +function trigger_function(coordinator) { + /** + * We stop the slons because we want to make sure that a SYNC does not + * happen in between the EXECUTE_SCRIPT and the next SYNC. + */ + terminate_slon(coordinator); + var sql = ''; + for(var idx=0; idx < 1000; idx++) { + sql = sql + "insert into table5(data) values ('seqtest');\n" + } + var psql = coordinator.createPsqlCommand('db1',sql); + psql.run(); + coordinator.join(psql); + premable = get_slonik_preamble(); + slonikScript = "EXECUTE SCRIPT( SQL='alter table table1 drop column seqed;create trigger table5_trigger " + + " before INSERT on public.table5 for each row execute procedure " + + " insert_table1();'" + + ' ,EVENT NODE=1 );'; + run_slonik('add trigger ddl',coordinator,preamble,slonikScript); + slonikScript = "EXECUTE SCRIPT( SQL='insert into table5(data) values (9);'" + + ' ,EVENT NODE=1);'; + run_slonik('add trigger ddl',coordinator,preamble,slonikScript); + + launch_slon(coordinator); + +} + function inline_ddl(coordinator) { premable = get_slonik_preamble(); @@ -166,6 +196,11 @@ function do_test(coordinator) { inline_ddl(coordinator); wait_for_sync(coordinator); + + + trigger_function(coordinator); + wait_for_sync(coordinator); + } function get_compare_queries() { diff --git a/doc/adminguide/ddlchanges.sgml b/doc/adminguide/ddlchanges.sgml index 549e576c..21f9f342 100644 --- a/doc/adminguide/ddlchanges.sgml +++ b/doc/adminguide/ddlchanges.sgml @@ -66,6 +66,20 @@ replica nodes. It is advisiable to not be concurrently inserting,deleting or updating rows to a table while a script changing that table (adding or deleting columns) is also running. +&slony1; 2.2.x and higher will replicate the SQL +of an EXECUTE SCRIPT as part of a SYNC. Scripts that perfrom an "ALTER TABLE" +to a replicated table will be replicated in the correct order with respect +to other concurrent activities on that table because of the exclusive lock +that the alter table received on the origin node. If your EXECUTE SCRIPT +does not obtain exclusive locks on all of the tables it uses then +you need to make sure that any transactions running concurrently with +the script are not making changes that can effect the results of the script. +For example, if your script does a nextval('some_replicated_seq') and +that sequence is being incremented by another transactions at the same time then +it is possible that script sees a different value from the sequence when +it runs on a replica than it did on the origin. + + diff --git a/src/backend/slony1_base.sql b/src/backend/slony1_base.sql index 83ee96fd..9be53974 100644 --- a/src/backend/slony1_base.sql +++ b/src/backend/slony1_base.sql @@ -728,4 +728,3 @@ comment on column @NAMESPACE@.sl_components.co_event is 'which event have I star -- ---------------------------------------------------------------------- grant usage on schema @NAMESPACE@ to public; - diff --git a/src/backend/slony1_funcs.c b/src/backend/slony1_funcs.c index 0699cbc9..cb783055 100644 --- a/src/backend/slony1_funcs.c +++ b/src/backend/slony1_funcs.c @@ -42,6 +42,7 @@ #include "utils/memutils.h" #include "utils/hsearch.h" #include "utils/timestamp.h" +#include "utils/int8.h" #ifdef HAVE_GETACTIVESNAPSHOT #include "utils/snapmgr.h" #endif @@ -989,7 +990,14 @@ versionFunc(logApply)(PG_FUNCTION_ARGS) char *ddl_script; bool localNodeFound = true; Datum script_insert_args[5]; - char query[1024]; + Datum *nodeargs; + bool *nodeargsnulls; + int nodeargsn; + Datum *seqargs; + bool *seqargsnulls; + int seqargsn; + Datum array_holder; + Datum delim_text; apply_num_script++; @@ -1001,29 +1009,64 @@ versionFunc(logApply)(PG_FUNCTION_ARGS) SPI_fnumber(tupdesc, "log_cmdargs"), &isnull); if (isnull) elog(ERROR, "Slony-I: log_cmdargs is NULL"); - deconstruct_array(DatumGetArrayTypeP(dat), TEXTOID, -1, false, 'i', - &cmdargs, &cmdargsnulls, &cmdargsn); + &cmdargs, &cmdargsnulls, &cmdargsn); + + if( cmdargsn < 3 ) + { + elog(ERROR,"Slony-I: DDL_SCRIPT events require at least 3 elements"\ + "in the argument array"); + } + nodeargs=NULL; + nodeargsn=0; + delim_text=DirectFunctionCall1(textin,CStringGetDatum(",")); + if ( (! cmdargsnulls[1]) ) + { + char * astr=DatumGetCString(DirectFunctionCall1(textout, + cmdargs[1])); + + if ( strcmp(astr,"")) + { + array_holder = DirectFunctionCall2(text_to_array,cmdargs[1], + delim_text); + deconstruct_array(DatumGetArrayTypeP(array_holder), + TEXTOID, -1, false, 'i', + &nodeargs, &nodeargsnulls, &nodeargsn); + } + } + seqargs=NULL; + seqargsn=0; + if ( (! cmdargsnulls[2]) ) + { + char * astr=DatumGetCString(DirectFunctionCall1(textout, + cmdargs[2])); + if( strcmp(astr,"") ) + { + array_holder = DirectFunctionCall2(text_to_array,cmdargs[2], + delim_text); + deconstruct_array(DatumGetArrayTypeP(array_holder), + TEXTARRAYOID, -1, false, 'i', + &seqargs, &seqargsnulls, &seqargsn); + } + } /* * The first element is the DDL statement itself. */ ddl_script = DatumGetCString(DirectFunctionCall1( textout, cmdargs[0])); - /* * If there is an optional node ID list, check that we are in it. */ - if (cmdargsn > 1) + if (nodeargsn > 0) { localNodeFound = false; - for (i = 1; i < cmdargsn; i++) + for (i = 0; i < nodeargsn; i++) { int32 nodeId = DatumGetInt32( DirectFunctionCall1(int4in, - DirectFunctionCall1(textout, cmdargs[i]))); - + DirectFunctionCall1(textout, nodeargs[i]))); if (nodeId == cs->localNodeId) { localNodeFound = true; @@ -1038,6 +1081,53 @@ versionFunc(logApply)(PG_FUNCTION_ARGS) */ if (localNodeFound) { + + char query[1024]; + Oid argtypes[3]; + argtypes[0] = INT4OID; + argtypes[1] = INT4OID; + argtypes[2] = INT8OID; + + snprintf(query,1023,"select %s.sequenceSetValue($1," \ + "$2,NULL,$3); ",tg->tg_trigger->tgargs[0]); + void * plan = SPI_prepare(query,3,argtypes); + if ( plan == NULL ) + { + + elog(ERROR,"could not prepare plan to call sequenceSetValue"); + } + /** + * before we execute the DDL we need to update the sequences. + */ + if ( seqargsn > 0 ) + { + + for( i = 0; (i+2) < seqargsn; i=i+3 ) + { + Datum call_args[3]; + bool call_nulls[3]; + call_args[0] = DirectFunctionCall1(int4in, + DirectFunctionCall1(textout,seqargs[i])); + call_args[1] = DirectFunctionCall1(int4in, + DirectFunctionCall1(textout,seqargs[i+1])); + call_args[2] = DirectFunctionCall1(int8in, + DirectFunctionCall1(textout,seqargs[i+2])); + + call_nulls[0]=0; + call_nulls[1]=0; + call_nulls[2]=0; + + if ( SPI_execp(plan,call_args,call_nulls,0) < 0 ) + { + elog(ERROR,"error executing sequenceSetValue plan"); + + } + + } + + + } + sprintf(query,"set session_replication_role to local;"); if(SPI_exec(query,0) < 0) { @@ -1606,7 +1696,6 @@ versionFunc(logApply)(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldContext); cacheEnt->typmod[i / 2] = target_rel->rd_att->attrs[colnum - 1]->atttypmod; - sprintf(applyQueryPos, "%s%s = $%d", (i > 0) ? " AND " : "", slon_quote_identifier(colname), diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql index 00aad224..dcbc9d91 100644 --- a/src/backend/slony1_funcs.sql +++ b/src/backend/slony1_funcs.sql @@ -3366,10 +3366,11 @@ begin execute 'select setval(''' || v_fqname || ''', ' || p_last_value::text || ')'; - insert into @NAMESPACE@.sl_seqlog + if p_ev_seqno is not null then + insert into @NAMESPACE@.sl_seqlog (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value); - + end if; return p_seq_id; end; $$ language plpgsql; @@ -3391,10 +3392,13 @@ declare c_found_origin boolean; c_node text; c_cmdargs text[]; + c_nodeargs text; + c_delim text; begin c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); c_cmdargs = array_append('{}'::text[], p_statement); + c_nodeargs = ''; if p_nodes is not null then c_found_origin := 'f'; -- p_nodes list needs to consist of a list of nodes that exist @@ -3411,8 +3415,11 @@ begin if c_local_node = (c_node::integer) then c_found_origin := 't'; end if; - - c_cmdargs = array_append(c_cmdargs, c_node); + if length(c_nodeargs)>0 then + c_nodeargs = c_nodeargs ||','|| c_node; + else + c_nodeargs=c_node; + end if; end loop; if not c_found_origin then @@ -3421,15 +3428,24 @@ begin p_statement, p_nodes, c_local_node; end if; end if; - - execute p_statement; - + c_cmdargs = array_append(c_cmdargs,c_nodeargs); + c_delim=','; + c_cmdargs = array_append(c_cmdargs, + + (select @NAMESPACE@.string_agg( seq_id::text || c_delim + || c_local_node || + c_delim || seq_last_value) + FROM ( + select seq_id, + seq_last_value from @NAMESPACE@.sl_seqlastvalue + where seq_origin = c_local_node) as FOO + where NOT @NAMESPACE@.seqtrack(seq_id,seq_last_value) is NULL)); insert into @NAMESPACE@.sl_log_script (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) values (c_local_node, pg_catalog.txid_current(), nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs); - + execute p_statement; return currval('@NAMESPACE@.sl_action_seq'); end; $$ language plpgsql; @@ -6224,3 +6240,36 @@ begin return v_seq_id; end $$ language plpgsql; + + + +-- +-- we create a function + aggregate for string_agg to aggregate strings +-- some versions of PG (ie prior to 9.0) don't support this +CREATE OR replace function @NAMESPACE@.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS +$BODY$ +DECLARE + c_delim text; +BEGIN + c_delim = ','; + IF (txt_before IS NULL or txt_before='') THEN + RETURN txt_new; + END IF; + RETURN txt_before || c_delim || txt_new; +END; +$BODY$ +LANGUAGE plpgsql; +comment on function @NAMESPACE@.agg_text_sum(text,text) is +'An accumulator function used by the slony string_agg function to +aggregate rows into a string'; +-- +-- create a string_agg function in the slony schema. +-- PG 8.3 does not have this function so we make our own +-- when slony stops supporting PG 8.3 we can switch to +-- the PG 9.0+ provided version of string_agg +-- +CREATE AGGREGATE @NAMESPACE@.string_agg(text) ( +SFUNC=@NAMESPACE@.agg_text_sum, +STYPE=text, +INITCOND='' +); \ No newline at end of file