ALTER TABLE ONLY billing_discount
ADD CONSTRAINT billing_discount_pkey PRIMARY KEY (billing_discount_id);
+CREATE OR REPLACE FUNCTION insert_table1() returns trigger
+as $$
+declare
+
+begin
+ insert into table1(data) values (NEW.data);
+ return NEW;
+end;
+$$
+language plpgsql;
\ No newline at end of file
+"set add table (id=4, set id=1, origin=1, fully qualified name = 'public.table4');\n"
+"set add table (id=5, set id=1, origin=1, fully qualified name = 'public.table5');\n"
+"set add table (id=6, set id=1, origin=1, fully qualified name = 'public.billing_discount');\n"
+ + "set add sequence(id=1,set id=1, origin=1, fully qualified name= 'public.table1_id_seq');\n"
+ + "set add sequence(id=2,set id=1, origin=1, fully qualified name= 'public.table5_id_seq');\n"
+
return script;
}
run_slonik('update ddl',coordinator,preamble,slonikScript);
}
+function trigger_function(coordinator) {
+ /**
+ * We stop the slons because we want to make sure that a SYNC does not
+ * happen in between the EXECUTE_SCRIPT and the next SYNC.
+ */
+ terminate_slon(coordinator);
+ var sql = '';
+ for(var idx=0; idx < 1000; idx++) {
+ sql = sql + "insert into table5(data) values ('seqtest');\n"
+ }
+ var psql = coordinator.createPsqlCommand('db1',sql);
+ psql.run();
+ coordinator.join(psql);
+ premable = get_slonik_preamble();
+ slonikScript = "EXECUTE SCRIPT( SQL='alter table table1 drop column seqed;create trigger table5_trigger "
+ + " before INSERT on public.table5 for each row execute procedure "
+ + " insert_table1();'"
+ + ' ,EVENT NODE=1 );';
+ run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+ slonikScript = "EXECUTE SCRIPT( SQL='insert into table5(data) values (9);'"
+ + ' ,EVENT NODE=1);';
+ run_slonik('add trigger ddl',coordinator,preamble,slonikScript);
+
+ launch_slon(coordinator);
+
+}
+
function inline_ddl(coordinator) {
premable = get_slonik_preamble();
inline_ddl(coordinator);
wait_for_sync(coordinator);
+
+
+ trigger_function(coordinator);
+ wait_for_sync(coordinator);
+
}
function get_compare_queries() {
or updating rows to a table while a script changing that table (adding or
deleting columns) is also running.</para></listitem>
+<listitem><para>&slony1; 2.2.x and higher will replicate the SQL
+of an EXECUTE SCRIPT as part of a SYNC. Scripts that perfrom an "ALTER TABLE"
+to a replicated table will be replicated in the correct order with respect
+to other concurrent activities on that table because of the exclusive lock
+that the alter table received on the origin node. If your EXECUTE SCRIPT
+does not obtain exclusive locks on all of the tables it uses then
+you need to make sure that any transactions running concurrently with
+the script are not making changes that can effect the results of the script.
+For example, if your script does a nextval('some_replicated_seq') and
+that sequence is being incremented by another transactions at the same time then
+it is possible that script sees a different value from the sequence when
+it runs on a replica than it did on the origin.</para></listitem>
+
+
</itemizedlist>
</sect2>
-- ----------------------------------------------------------------------
grant usage on schema @NAMESPACE@ to public;
-
#include "utils/memutils.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
+#include "utils/int8.h"
#ifdef HAVE_GETACTIVESNAPSHOT
#include "utils/snapmgr.h"
#endif
char *ddl_script;
bool localNodeFound = true;
Datum script_insert_args[5];
- char query[1024];
+ Datum *nodeargs;
+ bool *nodeargsnulls;
+ int nodeargsn;
+ Datum *seqargs;
+ bool *seqargsnulls;
+ int seqargsn;
+ Datum array_holder;
+ Datum delim_text;
apply_num_script++;
SPI_fnumber(tupdesc, "log_cmdargs"), &isnull);
if (isnull)
elog(ERROR, "Slony-I: log_cmdargs is NULL");
-
deconstruct_array(DatumGetArrayTypeP(dat),
TEXTOID, -1, false, 'i',
- &cmdargs, &cmdargsnulls, &cmdargsn);
+ &cmdargs, &cmdargsnulls, &cmdargsn);
+
+ if( cmdargsn < 3 )
+ {
+ elog(ERROR,"Slony-I: DDL_SCRIPT events require at least 3 elements"\
+ "in the argument array");
+ }
+ nodeargs=NULL;
+ nodeargsn=0;
+ delim_text=DirectFunctionCall1(textin,CStringGetDatum(","));
+ if ( (! cmdargsnulls[1]) )
+ {
+ char * astr=DatumGetCString(DirectFunctionCall1(textout,
+ cmdargs[1]));
+
+ if ( strcmp(astr,""))
+ {
+ array_holder = DirectFunctionCall2(text_to_array,cmdargs[1],
+ delim_text);
+ deconstruct_array(DatumGetArrayTypeP(array_holder),
+ TEXTOID, -1, false, 'i',
+ &nodeargs, &nodeargsnulls, &nodeargsn);
+ }
+ }
+ seqargs=NULL;
+ seqargsn=0;
+ if ( (! cmdargsnulls[2]) )
+ {
+ char * astr=DatumGetCString(DirectFunctionCall1(textout,
+ cmdargs[2]));
+ if( strcmp(astr,"") )
+ {
+ array_holder = DirectFunctionCall2(text_to_array,cmdargs[2],
+ delim_text);
+ deconstruct_array(DatumGetArrayTypeP(array_holder),
+ TEXTARRAYOID, -1, false, 'i',
+ &seqargs, &seqargsnulls, &seqargsn);
+ }
+ }
/*
* The first element is the DDL statement itself.
*/
ddl_script = DatumGetCString(DirectFunctionCall1(
textout, cmdargs[0]));
-
/*
* If there is an optional node ID list, check that we are in it.
*/
- if (cmdargsn > 1)
+ if (nodeargsn > 0)
{
localNodeFound = false;
- for (i = 1; i < cmdargsn; i++)
+ for (i = 0; i < nodeargsn; i++)
{
int32 nodeId = DatumGetInt32(
DirectFunctionCall1(int4in,
- DirectFunctionCall1(textout, cmdargs[i])));
-
+ DirectFunctionCall1(textout, nodeargs[i])));
if (nodeId == cs->localNodeId)
{
localNodeFound = true;
*/
if (localNodeFound)
{
+
+ char query[1024];
+ Oid argtypes[3];
+ argtypes[0] = INT4OID;
+ argtypes[1] = INT4OID;
+ argtypes[2] = INT8OID;
+
+ snprintf(query,1023,"select %s.sequenceSetValue($1," \
+ "$2,NULL,$3); ",tg->tg_trigger->tgargs[0]);
+ void * plan = SPI_prepare(query,3,argtypes);
+ if ( plan == NULL )
+ {
+
+ elog(ERROR,"could not prepare plan to call sequenceSetValue");
+ }
+ /**
+ * before we execute the DDL we need to update the sequences.
+ */
+ if ( seqargsn > 0 )
+ {
+
+ for( i = 0; (i+2) < seqargsn; i=i+3 )
+ {
+ Datum call_args[3];
+ bool call_nulls[3];
+ call_args[0] = DirectFunctionCall1(int4in,
+ DirectFunctionCall1(textout,seqargs[i]));
+ call_args[1] = DirectFunctionCall1(int4in,
+ DirectFunctionCall1(textout,seqargs[i+1]));
+ call_args[2] = DirectFunctionCall1(int8in,
+ DirectFunctionCall1(textout,seqargs[i+2]));
+
+ call_nulls[0]=0;
+ call_nulls[1]=0;
+ call_nulls[2]=0;
+
+ if ( SPI_execp(plan,call_args,call_nulls,0) < 0 )
+ {
+ elog(ERROR,"error executing sequenceSetValue plan");
+
+ }
+
+ }
+
+
+ }
+
sprintf(query,"set session_replication_role to local;");
if(SPI_exec(query,0) < 0)
{
MemoryContextSwitchTo(oldContext);
cacheEnt->typmod[i / 2] =
target_rel->rd_att->attrs[colnum - 1]->atttypmod;
-
sprintf(applyQueryPos, "%s%s = $%d",
(i > 0) ? " AND " : "",
slon_quote_identifier(colname),
execute 'select setval(''' || v_fqname ||
''', ' || p_last_value::text || ')';
- insert into @NAMESPACE@.sl_seqlog
+ if p_ev_seqno is not null then
+ insert into @NAMESPACE@.sl_seqlog
(seql_seqid, seql_origin, seql_ev_seqno, seql_last_value)
values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value);
-
+ end if;
return p_seq_id;
end;
$$ language plpgsql;
c_found_origin boolean;
c_node text;
c_cmdargs text[];
+ c_nodeargs text;
+ c_delim text;
begin
c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@');
c_cmdargs = array_append('{}'::text[], p_statement);
+ c_nodeargs = '';
if p_nodes is not null then
c_found_origin := 'f';
-- p_nodes list needs to consist of a list of nodes that exist
if c_local_node = (c_node::integer) then
c_found_origin := 't';
end if;
-
- c_cmdargs = array_append(c_cmdargs, c_node);
+ if length(c_nodeargs)>0 then
+ c_nodeargs = c_nodeargs ||','|| c_node;
+ else
+ c_nodeargs=c_node;
+ end if;
end loop;
if not c_found_origin then
p_statement, p_nodes, c_local_node;
end if;
end if;
-
- execute p_statement;
-
+ c_cmdargs = array_append(c_cmdargs,c_nodeargs);
+ c_delim=',';
+ c_cmdargs = array_append(c_cmdargs,
+
+ (select @NAMESPACE@.string_agg( seq_id::text || c_delim
+ || c_local_node ||
+ c_delim || seq_last_value)
+ FROM (
+ select seq_id,
+ seq_last_value from @NAMESPACE@.sl_seqlastvalue
+ where seq_origin = c_local_node) as FOO
+ where NOT @NAMESPACE@.seqtrack(seq_id,seq_last_value) is NULL));
insert into @NAMESPACE@.sl_log_script
(log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs)
values
(c_local_node, pg_catalog.txid_current(),
nextval('@NAMESPACE@.sl_action_seq'), 'S', c_cmdargs);
-
+ execute p_statement;
return currval('@NAMESPACE@.sl_action_seq');
end;
$$ language plpgsql;
return v_seq_id;
end
$$ language plpgsql;
+
+
+
+--
+-- we create a function + aggregate for string_agg to aggregate strings
+-- some versions of PG (ie prior to 9.0) don't support this
+CREATE OR replace function @NAMESPACE@.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS
+$BODY$
+DECLARE
+ c_delim text;
+BEGIN
+ c_delim = ',';
+ IF (txt_before IS NULL or txt_before='') THEN
+ RETURN txt_new;
+ END IF;
+ RETURN txt_before || c_delim || txt_new;
+END;
+$BODY$
+LANGUAGE plpgsql;
+comment on function @NAMESPACE@.agg_text_sum(text,text) is
+'An accumulator function used by the slony string_agg function to
+aggregate rows into a string';
+--
+-- create a string_agg function in the slony schema.
+-- PG 8.3 does not have this function so we make our own
+-- when slony stops supporting PG 8.3 we can switch to
+-- the PG 9.0+ provided version of string_agg
+--
+CREATE AGGREGATE @NAMESPACE@.string_agg(text) (
+SFUNC=@NAMESPACE@.agg_text_sum,
+STYPE=text,
+INITCOND=''
+);
\ No newline at end of file