Automatic Wait For.
authorSteve Singer <ssinger@ca.afilias.info>
Wed, 4 May 2011 15:14:25 +0000 (11:14 -0400)
committerSteve Singer <ssinger@ca.afilias.info>
Wed, 4 May 2011 15:14:25 +0000 (11:14 -0400)
This is a merge of the auto_wait_for feature branch.

This change modifies slonik so that it will (in normal circumstances)
automatically wait for events to be confirmed before proceeding in
cases where it detects that this is required.

See the documentation changes included in this patch for more details.

21 files changed:
clustertest/disorder/tests/BasicTest.js
clustertest/disorder/tests/ExecuteScript.js
clustertest/disorder/tests/FailNodeTest.js
clustertest/disorder/tests/Failover.js
clustertest/disorder/tests/HeavyLoadTest.js
clustertest/disorder/tests/LogShipping.js
clustertest/disorder/tests/LongTransaction.js
clustertest/disorder/tests/MergeSet.js [new file with mode: 0644]
clustertest/disorder/tests/MultipleOrigins.js
clustertest/disorder/tests/OmitCopy.js
clustertest/disorder/tests/RecreateSet.js
clustertest/disorder/tests/RenameTests.js
clustertest/disorder/tests/Unsubscribe.js
clustertest/disorder/tests/disorder_tests.js
doc/adminguide/addthings.sgml
doc/adminguide/events.sgml
doc/adminguide/slonik.sgml
doc/adminguide/slonik_ref.sgml
src/backend/slony1_funcs.c
src/backend/slony1_funcs.sql
src/slonik/slonik.c

index b0e17f211ab946bcd54dbd68d513794d5b737737..34f0029fa917a75e1140c92c5585fd2d5f49b2cd 100644 (file)
@@ -480,11 +480,11 @@ BasicTest.prototype.subscribeSetBackground = function(setid, origin_node,
                slonikScript += ' subscribe set(id=' + setid + ', provider='
                                + provider_node + ', receiver=' + subscriber_node
                                + ', forward=yes);\n';
-               slonikScript += this.generateSlonikWait(origin_node);
-               slonikScript += ' echo \'syncing\';\n';
-               slonikScript += ' sync(id=' + provider_node + ');\n';
-               slonikScript += ' echo \'waiting for event\';\n';
-               slonikScript += this.generateSlonikWait(provider_node);
+               //slonikScript += this.generateSlonikWait(origin_node);
+               //slonikScript += ' echo \'syncing\';\n';
+               //slonikScript += ' sync(id=' + provider_node + ');\n';
+               //slonikScript += ' echo \'waiting for event\';\n';
+               //slonikScript += this.generateSlonikWait(provider_node);
                slonikScript += ' echo \'finished subscribing ' + subscriber_node +'\' ;\n';
 
                var slonik = this.coordinator.createSlonik('subscribe ', preamble,
index 61bcc67e8d3fa97ffa474af122a98e245adf7e21..32f8879d918c730406169950ba2a21703c71982b 100644 (file)
@@ -43,6 +43,7 @@ ExecuteScript.prototype.runTest = function() {
         */
        this.subscribeSet(1, 1,1, [ 2, 3 ]);
        this.subscribeSet(1, 1,3, [ 4, 5 ]);
+       this.slonikSync(1,1);
 
        this.testAddDropColumn(1, 1, false);
 
@@ -64,6 +65,8 @@ ExecuteScript.prototype.runTest = function() {
        this.subscribeSet(2, 2,3, [ 4, 5 ]);
 
        this.coordinator.log("ExecuteScript.prototype.runTest - move set to node 1");
+
+
        /**
         * Move the set to node 1. We want to do this for the next test.
         */
@@ -173,7 +176,7 @@ ExecuteScript.prototype.testAddDropColumn = function(setid, eventNode,
         */
        var load = this.generateLoad();
 
-       this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders");
+       this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders - expecting failure:" + expectFailure);
        /**
         * Now add a column to orders. We will do this via EXECUTE SCRIPT.
         */
@@ -406,7 +409,7 @@ ExecuteScript.prototype.testDDLFailure = function() {
        var lag = this.measureLag(1,2);
        this.coordinator.log('we do not expect lag, we measure it as ' + lag);
        this.testResults.assertCheck('node is not lagged', lag < 10,true);
-       
+       this.slonikSync(1,1);
        this.dropTestTable(1,1,false);
        
        statement.close();
index 6bd02331433b1443bdb3f180c31040aed63af40d..c3936c021c80c512648a52da8f8ff955692f531b 100644 (file)
@@ -88,10 +88,15 @@ FailNodeTest.prototype.runTest = function() {
         * DROP 3 node.
         * We expect it to fail since node 3 is still a provider
         * for nodes 4,5.
+        * 
+        * perform the slonikSync to make sure node 4,5 is caught up.
+        * otherwise slonik will wait before actually submitting the
+        * dropNode but since the slon will be stopped by this.failNode()
+        * the event might never get propogated.
         */
+       this.slonikSync(1,1);
        this.coordinator.log('failing node 3');
-       this.failNode(3,true);
-       
+
        /**
         * Readd all paths, some might have been deleted and not re-added
         * when we deleted nodes above.
@@ -99,6 +104,9 @@ FailNodeTest.prototype.runTest = function() {
         * We also have to restart the slons because of bug # 120
         */
        this.addCompletePaths();
+
+       this.failNode(3,true);
+       
        /**
         * Sleep a bit.
         * Do we need to do this for the paths to propogate????
@@ -114,39 +122,20 @@ FailNodeTest.prototype.runTest = function() {
        }
        this.coordinator.log("FailNodeTest.prototype.runTest - sleeping 60x1000");
        java.lang.Thread.sleep(60*1000);
-       /**
-        * Replace the generateSlonikWait function with a version that 
-        * does individual wait for event(..) statements instead of 
-        * a confirmed=all since we do not want to be waiting on node 3
-        * since we just destroyed it.
-        */
-       var originalGenerateWait = this.generateSlonikWait;
-       this.generateSlonikWait=function(event_node) {
-               var script='';
-               for(var idx=1; idx <= this.getNodeCount(); idx ++) {
-                       if(idx==3||idx==event_node) {
-                               continue;
-                       }
-                       script += "echo 'waiting on confirm from " + idx + "';\n";
-                       script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n';
-               }
-               return script;
-       }
+
        /**
         * SUBSCRIBE nodes 4,5 via node 1 directly. 
         */
        
        this.coordinator.log("FailNodeTest.prototype.runTest - subscribe 4,5 via 1");
-       this.subscribeSet(1,1,1,[4,5]);
-       
+       this.subscribeSet(1,1,1,[4,5]); 
        
        
        /**
         * Now we should be able to drop node 3.
         */     
-       this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3");
+       this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3");   
        this.failNode(3,false);
-       this.generateSlonikWait=originalGenerateWait;
                
        
        load.stop();
@@ -175,7 +164,7 @@ FailNodeTest.prototype.runTest = function() {
        java.lang.Thread.sleep(10*1000);
        load.stop();
        this.coordinator.join(load);
-       this.addCompletePaths();
+       //this.addCompletePaths();
        this.slonikSync(1,1);
        this.slonikSync(1,2);   
        this.slonikSync(1,4);
@@ -201,25 +190,7 @@ FailNodeTest.prototype.runTest = function() {
        this.coordinator.log("FailNodeTest.prototype.runTest - drop DB2");
        //Now DROP the database. This lets us simulate a hard failure.
        this.dropDb(['db2']);
-       /**
-        * Replace the generateSlonikWait function with a version that 
-        * does individual wait for event(..) statements instead of 
-        * a confirmed=all since we do not want to be waiting on node 3
-        * since we just destroyed it.
-        */
-       var originalGenerateWait = this.generateSlonikWait;
-       this.generateSlonikWait=function(event_node) {
-               var script='';
-               for(var idx=1; idx <= this.getNodeCount(); idx ++) {
-                       if(idx==2|| idx==3|| idx==event_node) {
-                               continue;
-                       }
-                       script += "echo 'waiting on confirm from " + idx + "';\n";
-                       script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n';
-               }
-               return script;
-       }
-       
+
        this.coordinator.log("FailNodeTest.prototype.runTest - reshape cluster");
        //Now reshape the cluster.
        this.subscribeSet(1,1,1,[4,5]);
@@ -255,16 +226,17 @@ FailNodeTest.prototype.runTest = function() {
  */
 FailNodeTest.prototype.failNode=function(nodeId, expectFailure) {
        this.coordinator.log("FailNodeTest.prototype.FailNodeTest - begin");
-       this.slonArray[nodeId-1].stop();
-       this.coordinator.join(this.slonArray[nodeId-1]);
+
        var slonikPreamble = this.getSlonikPreamble();
        var slonikScript = 'echo \'FailNodeTest.prototype.failNode\';\n';
-       slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n';
+       slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n'
+       + 'uninstall node(id=' + nodeId + ');\n';
+
        for(var idx=2; idx <= this.getNodeCount(); idx++) {
                if(idx == nodeId) {
                        continue;
                }
-               slonikScript +=  'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n';
+               //              slonikScript +=  'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n';
        }
                
        var slonik=this.coordinator.createSlonik('drop node',slonikPreamble,slonikScript);
@@ -276,6 +248,8 @@ FailNodeTest.prototype.failNode=function(nodeId, expectFailure) {
        else { 
                this.testResults.assertCheck('drop node okay',slonik.getReturnCode(),0);
        }
+       this.slonArray[nodeId-1].stop();
+       this.coordinator.join(this.slonArray[nodeId-1]);
        this.coordinator.log("FailNodeTest.prototype.FailNodeTest - complete");
 }
 
index d3e8dab526d4e8e99f34df3718a7983874b7c300..d3d0fce64a8e362f2b89a959971404c6178af154 100644 (file)
@@ -42,7 +42,7 @@ Failover.prototype.runTest = function() {
         */
        this.subscribeSet(1,1, 1, [ 2, 3 ]);
        this.subscribeSet(1,1, 3, [ 4, 5 ]);
-
+       this.slonikSync(1,1);
 
        var load = this.generateLoad();
        
@@ -81,11 +81,10 @@ Failover.prototype.runTest = function() {
         * 
         * Re resubscribe node 2 to receive from node 3 before the
         * FAILOVER this should test the more simple case.
-        */
-       this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3');
+        */     this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3');
        this.addCompletePaths();                
        this.subscribeSet(1,1,3,[2]);
-       
+       this.slonikSync(1,1);
        this.failNode(1,3,true);        
        java.lang.Thread.sleep(10*1000);
        load.stop();
@@ -93,7 +92,6 @@ Failover.prototype.runTest = function() {
        this.dropNode(1,3);
                
        this.reAddNode(1,3,3);
-       
        this.addCompletePaths();
        
        this.moveSet(1,3,1);
@@ -144,57 +142,63 @@ Failover.prototype.runTest = function() {
        //this.generateSlonikWait=oldWait;
        load.stop();
        this.coordinator.join(load);
-       
        this.dropNode(1,3);
        //??this.coordinator.join(subscribeSlonik[0]);
        
        
        this.reAddNode(1,3,3);
        
-       /**
-        * Now Shutdown the slon for node 4. 
-        */
-       this.coordinator.log('PROGRESS:Shutting down node 4');
-       this.slonArray[3].stop();
-       this.coordinator.join(this.slonArray[3]);
        
-       /**
-        * How does the failure behave when the slon for node 4 is down?
-        *  
-        * Well for 1 the 'wait for event' to node 4 won't recover.
-        * 
-        */
-       this.failNode(1,3,false);
-       
-       this.slonArray[3] = this.coordinator.createSlonLauncher('db4');
-       this.slonArray[3].run();
-       java.lang.Thread.sleep(10*1000);
-       this.dropNode(1,3);
-       this.reAddNode(1,3,3);
+       this.slonikSync(1,1);
        this.compareDb('db1', 'db2');
        this.compareDb('db1', 'db3');
        this.compareDb('db1', 'db4');
        this.addCompletePaths();
        this.moveSet(1,3,1)
-       load = this.generateLoad();
+
        /**
         * Now shutdown the slon for node 3, see how a failover to node 3 behaves.
         */
        this.coordinator.log('PROGRESS:shutting down node 3 for a failover test');
        this.slonArray[2].stop();
        this.coordinator.join(this.slonArray[2]);
-       load.stop();
-       this.coordinator.join(load);
-       this.failNode(1,3,false);
-       this.coordinator.log('PROGRESS:starting slon 3 back up');
-       this.slonArray[2] = this.coordinator.createSlonLauncher('db3');
-       this.slonArray[2].run();
+       /**
+        * create a timer event.
+        * in 60 seconds we will start up the slon again.
+        * the failover should not complete with the slon shutdown
+        * (at least not the 2.1 version of failover).
+        */
+               
+
+       this.coordinator.log('PROGRESS:load has stopped');
+       var thisRef=this;       
        /**
         * The failover needs to propogate before the DROP NODE or things can fail.
         */
-       java.lang.Thread.sleep(10*1000);
+        var onTimeout = {
+                onEvent : function(object, event) {
+                               thisRef.coordinator.log('PROGRESS:starting slon 3 back up');
+                               thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3');
+                               thisRef.slonArray[2].run();
+
+
+               }
+       };
+       var timeoutObserver = new Packages.info.slony.clustertest.testcoordinator.script.ExecutionObserver(onTimeout);
+       var timer = this.coordinator.addTimerTask('restart slon', 120,
+                                                                                timeoutObserver);
+       this.failNode(1,3,true);
+       this.coordinator.removeObserver(timer,
+                                                                               Packages.info.slony.clustertest.testcoordinator.Coordinator.EVENT_TIMER,
+                                                                               timeoutObserver);
+       if(this.slonArray[2].isFinished()) {
+                       thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3');
+                       thisRef.slonArray[2].run();
+       }
+
        this.dropNode(1,3);
        this.reAddNode(1,3,3);  
+               this.slonikSync(1,1);
        this.compareDb('db1', 'db2');
        this.compareDb('db1', 'db3');
        this.compareDb('db1', 'db4');
@@ -223,6 +227,8 @@ Failover.prototype.runTest = function() {
        slonik.run();
        this.coordinator.join(slonik);
        this.testResults.assertCheck('drop path from 1 to 4',slonik.getReturnCode(),0);
+          
+       this.slonikSync(1,1);
        this.failNode(1,4,true);
        
        this.compareDb('db2','db4');
@@ -233,13 +239,13 @@ Failover.prototype.runTest = function() {
        this.reAddNode(1,4,4);
        
        
-       
+       this.slonikSync(1,1);   
        for ( var idx = 1; idx <= this.getNodeCount(); idx++) {
                this.slonArray[idx - 1].stop();
                this.coordinator.join(this.slonArray[idx - 1]);
        }
-       
-       this.compareDb('db1', 'db2');
+
+       this.compareDb('db1','db2');
        this.compareDb('db1', 'db3');
        this.compareDb('db1', 'db4');
        this.compareDb('db4','db3');
@@ -249,9 +255,12 @@ Failover.prototype.runTest = function() {
 }
 
 Failover.prototype.failNode=function(node_id,backup_id, expect_success) {
-       
+
        this.slonArray[node_id-1].stop();
-       this.coordinator.join(this.slonArray[node_id-1]);
+       if(!this.slonArray[node_id-1].isFinished()) {
+               this.coordinator.join(this.slonArray[node_id-1]);
+       }
+       
        
        var slonikPreamble = this.getSlonikPreamble();
        var slonikScript = 'echo \'Failover.prototype.failNode\';\n';
@@ -266,8 +275,8 @@ Failover.prototype.failNode=function(node_id,backup_id, expect_success) {
                        if(waitIdx==idx || waitIdx==node_id) {
                                continue;
                        }
-                       slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n";
-                       slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n";
+                       //autowaitfor                   slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n";
+                       //autowaitfor                   slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n";
                }
        }
                //+ 'sync(id=' + backup_id + ');\n'
@@ -326,7 +335,7 @@ Failover.prototype.addCompletePaths = function() {
 }
 
 Failover.prototype.dropNode=function(node_id,event_node) {
-        this.coordinator.log("Failover.prototype.dropNode - begin");
+       this.coordinator.log("Failover.prototype.dropNode - begin");
        var slonikPreamble = this.getSlonikPreamble();
        var slonikScript = 'echo \'Failover.prototype.dropNode\';\n';
        slonikScript += 'DROP NODE(id='  + node_id  + ',event node=' + event_node +');\n';
@@ -340,5 +349,5 @@ Failover.prototype.dropNode=function(node_id,event_node) {
        slonik.run();
        this.coordinator.join(slonik);
        this.testResults.assertCheck('slonik drop node status okay',slonik.getReturnCode(),0);
-        this.coordinator.log("Failover.prototype.dropNode - complete");
+    this.coordinator.log("Failover.prototype.dropNode - complete");
 }
index b8ddd4dd7f367169539fb10ff2e7f2abc8dca778..bc6ee630f3ce852076eab06c2e429fdb24f4c3cf 100644 (file)
@@ -72,6 +72,8 @@ HeavyLoadTest.prototype.runTest = function() {
        
        var dumpFile = java.io.File.createTempFile('slon_HeavyLoadTest','.sql');
        dumpFile.deleteOnExit();
+       //wait until db4 is subscribed before creating the dump
+       this.slonikSync(1,1);   
        var dumpProcess = this.coordinator.createLogShippingDump('db4',dumpFile);
        dumpProcess.run();
        this.coordinator.join(dumpProcess);
index 9fa6b8dbc30af4859f721f8e54692824070ef27d..a08f3fc1772b911d58dc724d50d94712fb17e2aa 100644 (file)
@@ -60,7 +60,7 @@ LogShipping.prototype.runTest = function() {
        this.subscribeSet(1,1,1,[3]);
        java.lang.Thread.sleep(10*1000);
        this.subscribeSet(1,1,3,[4]);
-       
+       this.slonikSync(1,1);
         this.coordinator.log("LogShipping.prototype.runTest - generate load");
        //Generate some load.
        var populate=this.generateLoad();
index fb51939226f93591e44eb508b6b3b536b084cf16..faeb432e35865f9cce2acfaa89f0a555b53d5675 100644 (file)
@@ -44,6 +44,10 @@ LongTransaction.prototype.runTest = function() {
        slonik.run();
        java.lang.Thread.sleep(10*1000);
        this.testResults.assertCheck('transaction is blocking add table command',slonik.isFinished(),false);
+
+
+
+
        txnConnection.rollback();
        txnConnection.close();
        this.coordinator.join(slonik);
@@ -68,6 +72,7 @@ LongTransaction.prototype.runTest = function() {
        }
        //A transaction should not block the subscription.
        //make sure this is the case.
+        txnConnection = this.startTransaction();
         this.coordinator.log("LongTransaction.prototype.runTest - sleep 3x60x1000");
        java.lang.Thread.sleep(3*60*1000);
        for(var idx=0; idx < subs.length; idx++) {
@@ -109,7 +114,7 @@ LongTransaction.prototype.runTest = function() {
 LongTransaction.prototype.startTransaction=function() {
        var dbCon = this.coordinator.createJdbcConnection('db1');
        var stat = dbCon.createStatement();
-       stat.execute('BEGIN;');
+       dbCon.setAutoCommit(false);
        var  rs= stat.executeQuery('SELECT COUNT(*) FROM disorder.do_customer;');
        rs.close();
        stat.close();
diff --git a/clustertest/disorder/tests/MergeSet.js b/clustertest/disorder/tests/MergeSet.js
new file mode 100644 (file)
index 0000000..3cedbc5
--- /dev/null
@@ -0,0 +1,75 @@
+/**
+ * Tests a basic merge set.
+ */
+coordinator.includeFile("disorder/tests/BasicTest.js");
+
+
+function MergeSet(coordinator,results) {
+       BasicTest.call(this,coordinator,results);
+       this.syncWaitTime = 60;
+       this.testDescription = 'This test exercises the merge set command \n';
+}
+
+MergeSet.prototype = new BasicTest();
+MergeSet.prototype.constructor=MergeSet;
+MergeSet.prototype.getNodeCount = function() {
+       return 5;
+}
+
+MergeSet.prototype.runTest = function() {
+        this.coordinator.log("MergeSet.prototype.runTest - begin");
+
+       this.testResults.newGroup("merge set");
+       //this.prepareDb(['db1','db2']);
+
+       
+//First setup slony
+        this.coordinator.log("MergeSet.prototype.runTest - set up replication");
+       this.setupReplication();
+       this.addCompletePaths();
+       this.addTables();
+       
+        this.coordinator.log("MergeSet.prototype.runTest - start slons");
+       //Start the slons.
+       //These must be started before slonik runs or the subscribe won't happen
+       //thus slonik won't finish.
+       var slonArray=[];
+       for(var idx=1; idx <= this.getNodeCount(); idx++) {
+               slonArray[idx-1] = this.coordinator.createSlonLauncher('db' + idx);
+               slonArray[idx-1].run();
+       }
+       this.createSecondSet(1);
+       var load = this.generateLoad();
+       java.lang.Thread.sleep(5*1000);
+       this.subscribeSet(1,1,1,[2,3]);
+       this.subscribeSet(1,1,3,[4,5]);
+       this.subscribeSet(2,1,1,[2,3]);
+       this.subscribeSet(2,1,3,[4,5]);
+       var mergeScript="merge set(id=1,add id=2,origin=1);";
+       var slonikPreamble=this.getSlonikPreamble();
+       var slonik = this.coordinator.createSlonik('merge set',slonikPreamble,mergeScript);
+       slonik.run();
+       this.coordinator.join(slonik);
+       this.testResults.assertCheck('merge set okay',slonik.getReturnCode(),
+                                    0);
+
+       load.stop();
+       this.coordinator.join(load);
+        this.coordinator.log("MergeSet.prototype.runTest - subscriptions complete");
+       this.slonikSync(1,1);
+       this.coordinator.log("MergeSet.prototype.runTest - syncing complete");
+       this.compareDb('db1','db2');
+       this.compareDb('db1','db3');
+       this.compareDb('db1','db4');
+       this.compareDb('db1','db5');
+       
+       for(var idx=1; idx <= this.getNodeCount(); idx++) {
+               slonArray[idx-1].stop();
+               this.coordinator.join(slonArray[idx-1]);        
+       }
+        this.coordinator.log("MergeSet.prototype.runTest - complete");
+}
+
+MergeSet.prototype.getSyncWaitTime = function () {
+       return this.syncWaitTime;
+}
index 8860d8b0db0167742e05cf3e1223b79ad0fed450..79872bbb94d39d7d0d3bb212df974b75661fb8f2 100644 (file)
@@ -87,7 +87,9 @@ MultipleOrigins.prototype.runTest = function() {
        this.compareDb('db1','db3');
        
        
-       this.moveSet(1,4,1);    
+       this.moveSet(1,4,1);
+       this.slonikSync(1,1);
+       this.slonikSync(1,4);
        this.failNode(1,4,true);
        
        for(var idx=1; idx <= this.getNodeCount(); idx++) {             
index ce0b0b3fee99907aaf38a08187b14fbcbf9a3798..1433cf0eec7db4d650316c5e2bad04ce02377c2c 100644 (file)
@@ -137,7 +137,6 @@ OmitCopy.prototype.subscribeOmitCopy=function(origin,provider,subscriberNodeId,o
        var slonikPreamble = this.getSlonikPreamble();
         var slonikScript = 'echo \'OmitCopy.prototype.subscribeOmitCopy\';\n';
        slonikScript += "subscribe set(id=1, provider=" + provider+", receiver=" + subscriberNodeId+", omit copy=true, forward=yes);\n";
-       slonikScript += ' wait for event (origin='+origin+', wait on='+provider+',confirmed=all);\n';
        
        var slonik=this.coordinator.createSlonik('omit copy subscribe',slonikPreamble,slonikScript);
        slonik.run();
index 911610dc3e386ffe7cfa39f3ce4c1d77b0805a05..98fc0ce41f3ef83ddca0d3a816e2c0cfb307c1be 100644 (file)
@@ -67,8 +67,7 @@ RecreateSet.prototype.runTest = function() {
         * due to the lock on set 1.
         */
        var slonikPreamble = this.getSlonikPreamble();
-       var slonikScript = 'drop set (id=2, origin=1);\n'
-       + 'wait for event(origin=1, confirmed=2,wait on=1);\n';
+       var slonikScript = 'drop set (id=2, origin=1);\n';
 
        var slonik=this.coordinator.createSlonik('drop set 1',slonikPreamble,slonikScript);     
        slonik.run();
index 913fbb9fb0b362ff20eecf4029255130c9a4b3f5..818e2d85ac48b7a0e2d551073ab561def4420cd3 100644 (file)
@@ -40,7 +40,7 @@ RenameTests.prototype.runTest = function() {
        this.subscribeSet(1,1, 1, [ 2, 3 ]);
        this.subscribeSet(1,1, 3, [ 4, 5 ]);
 
-
+       
        /**
          * Create the test_transient table.
          */
@@ -48,7 +48,6 @@ RenameTests.prototype.runTest = function() {
        this.createAndReplicateTestTable();
        
        
-       
        /**
         * Add a row to the table.
         */
@@ -60,7 +59,7 @@ RenameTests.prototype.runTest = function() {
         * Ensure that changes to the table still get
         * replicated.
         */
-                       
+       this.slonikSync(1,1);           
        this.executeScript("ALTER TABLE disorder.test_transient RENAME TO test_transient2;\n");
        
         
index 9448538bcb00761ddf942213b46b091990b6b689..208fa074bc4b246bab297fc036396c33f1584864 100644 (file)
@@ -106,8 +106,8 @@ Unsubscribe.prototype.unsubscribe=function(node_id,set_id,expect_success) {
         this.coordinator.log("Unsubscribe.prototype.unsubscribe - begin");     
        var slonikPreamble = this.getSlonikPreamble();
         var slonikScript = 'echo \'Unsubscribe.prototype.unsubscribe\';\n';
-        slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n'
-               + 'wait for event(origin=' + node_id + ',wait on=' + node_id + ',confirmed=all);\n';
+        slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n';
+
        var slonik = this.coordinator.createSlonik('unsubscribe ' , slonikPreamble,slonikScript);
        slonik.run();
        this.coordinator.join(slonik);
index 521e1ddc5a7e2c2c771e5384dbf0ce112a07cf7e..4e869f12e8e27258e07d7a0a21643d448484400a 100644 (file)
@@ -22,6 +22,7 @@ coordinator.includeFile('disorder/tests/LongTransaction.js');
 coordinator.includeFile('disorder/tests/RenameTests.js');
 coordinator.includeFile('disorder/tests/CleanupTest.js');
 coordinator.includeFile('disorder/tests/RecreateSet.js');
+coordinator.includeFile('disorder/tests/MergeSet.js');
 var tests = 
     [new EmptySet(coordinator,results)
      ,new OmitCopy(coordinator,results)
@@ -44,14 +45,15 @@ var tests =
      ,new BigBacklogTest(coordinator,results)
      ,new LongTransaction(coordinator,results)
      ,new RenameTests(coordinator,results)
-
+     ,new MergeSet(coordinator,results)
         //Below tests are known to fail.
         ,new UnsubscribeBeforeEnable(coordinator,results)
      ,new DropSet(coordinator,results) //fails bug 133
      ,new CleanupTest(coordinator,results) //cleanup_interval does not (yet) do what the test wants
     ];
 
-//tests=[new CleanupTest(coordinator,results)];
+//tests=[new MergeSet(coordinator,results)];
+
 var basicTest = new BasicTest(coordinator,results);
 
 //Setup the schema.
index 557fd45e71526ced829f027697befe02bb4e0caa..76a2af7eb8203bfab44e1a87aceb890df3b486c5 100644 (file)
@@ -36,7 +36,6 @@ slonik <<_EOF_
 create set (id=2, origin=1, comment='a second replication set');
 set add table (set id=2, origin=1, id=5, fully qualified name = 'public.newtable', comment='some new table');
 subscribe set(id=1, provider=1,receiver=2);
-wait for event(origin=1, confirmed=all, wait on=1);
 merge set(id=1, add id=2,origin=1);
 
 </programlisting>
@@ -171,7 +170,6 @@ node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony';
 clustername=testcluster;
 
 store node(id=5,comment='some slave node',event node=1);
-wait for event(origin=1, confirmed=all, wait on=1);
 </programlisting>
 
 </para></listitem>
@@ -181,11 +179,12 @@ the <xref linkend="stmtstorepath"> command.
 node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony';
 clustername=testcluster;
 node 1 admin conninfo='host=masterhost dbname=masterdb user=slony password=slony';
+# also include the admin conninfo lines for any other nodes in your cluster.
+#
+#
 clustername=testcluster;
 store path(server=1,client=5,conninfo='host=masterhost,dbname=masterdb,user=slony,password=slony');
 store path(server=5,client=1,conninfo='host=slavehost,dbname=masterdb,user=slony,password=slony');
-wait for event(origin=1,confirmed=all,wait on=1);
-wait for event(origin=5,confirmed=all,wait on=1);
 </programlisting>
 
 </para></listitem>
@@ -195,9 +194,12 @@ wait for event(origin=5,confirmed=all,wait on=1);
 node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony';
 clustername=testcluster;
 node 1 admin conninfo='host=masterhost dbname=slavedb user=slony password=slony';
+#
+# also include the admin conninfo lines for any other nodes in the cluster
+#
+#
 clustername=testcluster;
 subscribe set(id=1,provider=1, receiver=5,forward=yes);
-wait for event(origin=1, confirmed=all, wait on=1);
 </programlisting>
 
 </orderedlist>
@@ -232,9 +234,8 @@ store path(server=3,client=2,conninfo='host=slave3host,dbname=slave3db,user=slon
 store path(server=2,client=3,conninfo='host=slave2host,dbname=slave2db,user=slony,password=slony');
 
 subscribe set(set id=1, provider=1, receiver=2,forward=yes);
-wait for event(origin=1, confirmed=all, wait on=1);
 subscribe set (set id=1,provider=2, receiver=3,forward=yes);
-wait for event(origin=1,confirmed=all,wait on=1);
+wait for event(origin=1, confirmed=all, wait on=1);
 </programlisting>
 <para>
 In the above example we define paths from 1==>2 and from 2==>3 but do
index 6162ce76b3c2a02bfc393baa33b34d3d696ea64f..6c4f75042a5f03b0d02afceab165b03a619d02b8 100644 (file)
@@ -1,21 +1,22 @@
 <!--  -->
 <sect1 id="events">
 <title>Events & Confirmations</title>
+<indexterm><primary>events and confirmations</primary></indexterm>
 <para>
 &slony1; transfers configuration changes and application data through
 events.   Events in &slony1; have an origin, a type and some parameters.   
 When an event is created it is inserted
 into the event queue (the <xref linkend="table.sl-event"> table) on the node the event
-originates on.  The remoteListener threads of slon processes then pick up
-that event (by quering the sl_event table) and passing the event to the
-slons remoteWorker thread for processing.
+originates on.  The remoteListener threads for each remote &lslon; process then picks up
+that event (by querying the table &slevent;) and pass the event to the
+&lslon;'s remoteWorker thread for processing.
 </para>
 
 <para>
-An event is uniquely identified by taking both the node id of the node the
+An event is uniquely identified via the combination of the node id of the node the
 event originates on and the event sequence number for that node.
-For example (1,5000001) identifies event 5000001 originating from node 1.
-While (3,5000001) identifies a different event that originated on a 
+For example, (1,5000001) identifies event 5000001 originating from node 1.
+In contrast, (3,5000001) identifies a different event that originated on a 
 different node.
 </para>
 
@@ -23,21 +24,22 @@ different node.
 <title>SYNC Events</title>
 <para>
 SYNC events are used to transfer application data for one node to the next.
-When data in a replicated table changes a trigger fires that records information
-about the change in the <xref linkend="table.sl-log-1"> or <xref linkend="table.sl-log-2"> tables.  The localListener thread
+When data in a replicated table changes, a trigger fires that records information
+about the change in the &sllog1; or &sllog2; tables.  The localListener thread
 in the slon processes will then periodically generate a SYNC event.  When the
-SYNC event is created &slony1; will find the heighest log_seqid assigned so far
+SYNC event is created, &slony1; will determine the highest log_seqid assigned so far
 along with a list of log_seqid's that were assigned to transactions that
-have not yet been committed.  This information is stored as part of the SYNC 
+have not yet been committed.  This information is all stored as part of the SYNC 
 event.
 </para>
 
 
 <para>
-When a slon processes remoteWorker processes a SYNC it will query the rows
-from sl_log_1 and sl_log2 that are covered by the SYNC (log_seqid rows
-that had been committed at the time the SYNC was generated).  The modifications
-described by these rows are then made on the subscriber.
+When the remoteWorker thread for a &lslon; processes a SYNC, it
+queries the rows from &sllog1; and &sllog2; that are covered by the
+SYNC (<emphasis>e.g.</emphasis> - log_seqid rows that had been
+committed at the time the SYNC was generated).  The data modifications
+indicated by this logged data are then applied to the subscriber.
 </para>
 
 </sect2>
@@ -46,35 +48,97 @@ described by these rows are then made on the subscriber.
 <sect2>
 <title>Event Confirmations</title>
 <para>
-When an event is processed by the slon for a remote node a CONFIRM message
-is generated by inserting a row into the sl_confirm table.  This row indicates
-that a particular event was confirmed by a particular receiver node.
-Confirmation messages are the transfered back to all other nodes in the cluster.
+When an event is processed by the &lslon; process for a remote node, a CONFIRM message
+is generated by inserting a tuple into the &slconfirm; table.  This tuple indicates
+that a particular event has been confirmed by a particular receiver node.
+Confirmation messages are then transferred back to all other nodes in the cluster.
 </para>
 </sect2>
 
 <sect2>
 <title>Event cleanup</title>
 <para>
-The slon cleanupThread will periodically run the
-<xref linkend="function.cleanupevent-p-interval-interval"> database
-function that will delete all but the most recently confirmed event for 
-each origin/receiver pair (because if an event has been confirmed by a 
-receiver then all older events from that origin have also been confirmed
-by the receiver).   Then the function will delete any SYNC events that are
-older than the oldest row left in sl_confirm (for each origin). The data 
-for these deleted events will also be removed from the sl_log_1 and sl_log_2
-tables.
+The &lslon; cleanupThread periodically runs the <xref
+linkend="function.cleanupevent-p-interval-interval"> database function
+that deletes all but the most recently confirmed event for each
+origin/receiver pair (this is safe to do because if an event has been
+confirmed by a receiver, then we know that all older events from that
+origin have also been confirmed by the receiver).  Then the function
+deletes all SYNC events that are older than the oldest row left in
+&slconfirm; (for each origin). The data for these deleted events will
+also be removed from the &sllog1; and &sllog2; tables.
 </para>
+
 <para>
-When &slony1; is first enabled it will log the data to replicate to the
-sl_log_1 table.  After a while it will stop logging to sl_log_1 and switch
-to logging on sl_log_2.  When all of rows in  in sl_log_1 have been replicated
-&slony1; will TRUNCATE the sl_log_1 table (to clear out the dead tuples),
-stop logging to sl_log_2 and switch back to logging to the newly truncated 
-sl_log_1 table.  This process will be periodically repeated as &slony1; runs.
+When &slony1; is first enabled it will log the data to replicate to
+the &sllog1; table.  After a while it will stop logging to &sllog1;
+and switch to logging in &sllog2;.  When all the data in &sllog1; is
+known to have been replicated to all the other nodes, &slony1; will
+TRUNCATE the &sllog1; table, clearing out this now-obsolete
+replication data.  Then, it stops logging to &sllog2;, switching back
+to logging to the freshly truncated &sllog1; table.  This process is
+repeated periodically as &slony1; runs, keeping these tables from
+growing uncontrollably.  By using TRUNCATE, we guarantee that the
+tables are properly emptied out.
 </para>
 
 </sect2>
 
+<sect2>
+<title>Slonik and Event Confirmations</title>
+<para>
+
+&lslonik; can submit configuration commands to different event nodes,
+as controlled by the parameters of each slonik command.  If two
+commands are submitted to different nodes, it might be important to
+ensure they are processed by other nodes in a consistent order.  The
+&lslonik; <xref linkend="stmtwaitevent"> command may be used to
+accomplish this, but as of &slony1; 2.1 this consistency is handled
+automatically by &lslonik; under a number of circumstances.
+</para>
+<orderedlist>
+<listitem><para>Before slonik submits an event to a node,
+it waits until that node has confirmed the last configuration event
+from the previous event node.</para></listitem>
+
+<listitem><para>Before slonik submits a <xref
+linkend="stmtsubscribeset"> command, it verifies that the provider
+node has confirmed all configuration events from all other
+nodes.</para></listitem>
+
+<listitem><para>Before &lslonik; submits a <xref
+linkend="stmtdropNode"> event, it verifies that all nodes in the
+cluster (aside from the one being dropped, of course!) have already
+caught up with all other nodes</para></listitem>
+
+<listitem><para>Before slonik submits a <xref linkend="stmtcloneprepare">
+it verifies that the node being cloned is caught up with all other
+nodes in the cluster.</para></listitem>
+
+<listitem><para>Before slonik submits a <xref linkend="stmtcreateset"> command
+it verifies that any <xref linkend="stmtdropset"> commands have been confirmed by
+all nodes.</para></listitem>
+
+</orderedlist>
+<para>
+
+When &lslonik; starts up, it contacts all nodes for which it has <xref
+linkend="admconninfo"> information, to find the last non-SYNC event
+from each node.  Submitting commands from multiple &lslonik; instances
+at the same time will confuse &lslonik; and is not recommended.
+Whenever &lslonik; is waiting for an event confirmation, it displays a
+message every 10 seconds indicating which events are still
+outstanding.  Any commands that might require slonik to wait for event
+confirmations may not be validly executed within a <link
+linkend="tryblock">try block</link> for the very same reasons that
+<xref linkend="stmtwaitevent"> command may not be used within a <link
+linkend="tryblock">try block</link>, namely that it is not reasonable
+to ask &slony1; to try to roll back events.
+</para>
+
+<para>
+Automatic waiting for confirmations may be disabled in &lslonik; by
+running &lslonik; with the <option>-w</option> option.</para>
+
+</sect2>
 </sect1>
index ed0ddcb566dbc87cd2404ab437b4396992a300a2..f5966672ea6f67d0bc18a5271a83456aca2e81bb 100644 (file)
  <refsynopsisdiv>
   <cmdsynopsis>
    <command>slonik</command>
+   <arg><replaceable class="parameter">options</replaceable></arg>
    <arg><replaceable class="parameter">filename</replaceable></arg>
   </cmdsynopsis>
  </refsynopsisdiv>
-
+<refsect1>
+  <title>Options</title>
+  <variablelist>
+       <varlistentry>
+         <term><option>-w</option></term>
+         <listitem>
+               <para>
+                 Suppress slonik's behaviour of automatically waiting
+                 for event confirmations before submitting events to
+                 a different node. If this option is specified, your
+                 slonik script may require explicit <xref
+                 linkend="stmtwaitevent"> commands in order to behave
+                 properly, as was the behaviour of slonik prior to
+                 version 2.1.
+                 </para>
+               </listitem>
+         </varlistentry>
+       </variablelist>
+  </refsect1>
  <refsect1>
   <title>Description</title>
 
index 2c9d053d9778bcb5249022483f3addd39db64122..79d89a37d06add0ba522be74094fc0a390692a25 100644 (file)
@@ -68,7 +68,7 @@
      <para>
       Comments begin at a hash sign (#) and extend to the end of the line.
      </para>
-    <sect2><title>Command groups</title>
+    <sect2 id="tryblock"><title>Command groups</title>
      <para>
       Commands can be combined into groups of commands with optional
       <command>on error</command> and <command>on success</command> conditionals.
@@ -523,6 +523,12 @@ INIT CLUSTER (
     therein; no public objects should be locked during the duration of
     this.</para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -604,6 +610,13 @@ INIT CLUSTER (
     therein; no public objects should be locked during the duration of
     this.</para>
    </refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command. </para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title> <para> This command
    was introduced in &slony1; 1.0.  The <envar>SPOOLNODE</envar>
    parameter was introduced in version 1.1, but was vestigial in that
@@ -681,6 +694,13 @@ INIT CLUSTER (
    cluster. </para>
    </refsect1>
 
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits until nodes (other than the one being dropped)
+            are caught up with non-SYNC events from all other nodes before
+               submitting the DROP NODE command.
+    </para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    <para> In version 2.0, the default value for <envar>EVENT NODE</envar> was removed, so a node must be specified.</para>
@@ -745,6 +765,12 @@ INIT CLUSTER (
    <para>After dropping a node, you may also need to recycle
    connections in your application.</para>
    </refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -792,6 +818,13 @@ INIT CLUSTER (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title> <para> This command
    was introduced in &slony1; 1.0; frequent use became unnecessary as
    of version 1.0.5.  There are, however, occasional cases where it is
@@ -870,6 +903,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -921,6 +961,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -991,6 +1038,14 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title> <para> This command
    was introduced in &slony1; 1.0.  As of version 1.1, you <emphasis>should</emphasis> no
    longer need to use this command, as listen paths are generated automatically. </para>
@@ -1044,6 +1099,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+      
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title> <para> This command
    was introduced in &slony1; 1.0.  As of version 1.1, you should not
    need to use it anymore. </para>
@@ -1163,6 +1225,16 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+   
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command. Slonik will also wait until any outstanding
+          DROP SET commands are confirmed by all nodes before it submits
+          the CREATE SET command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
     <para> Until version 1.2, it would crash if no comment was provided. </para>
@@ -1218,6 +1290,14 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
     on each replicated table in order to modify the table schema to
     clean up the triggers and rules.  </para>
    </refsect1>
+
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -1273,11 +1353,7 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
     <programlisting>
      # Assuming that node 1 is the origin of set 999 that has direct subscribers 2 and 3
      SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 2);
-     SYNC (ID=1);
-     WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 2, WAIT ON=1);
      SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 3);
-     SYNC (ID=1);
-     WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 3, WAIT ON=1);
      MERGE SET ( ID = 1, ADD ID = 999, ORIGIN = 1 );
     </programlisting>
    </refsect1>
@@ -1295,6 +1371,15 @@ STORE PATH ( SERVER = 1, CLIENT = 2,
    </para>
 
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+   <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.  Slonik will also wait for any 
+       in progress subscriptions involving the ADD ID to be subscribed
+       before submitting the MERGE SET command. </para>
+   </refsect1>  
+
    <refsect1> <title> Version Information </title> <para> This command
    was introduced in &slony1; 1.0.5.  In 1.2.1, a race condition was
    rectified where the merge request would be submitted while
@@ -1525,6 +1610,14 @@ SET ADD TABLE (
     takes place at the time of the <command>SUBSCRIBE_SET</command>
     event.  </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -1618,10 +1711,18 @@ SET ADD TABLE (
 
     </programlisting>
    </refsect1>
-   <refsect1> <title> Locking Behaviour </title>
+   <refsect1> <title> Locking Behaviour </title>       
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+          <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+        </refsect1>
+
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -1677,6 +1778,13 @@ SET ADD TABLE (
     replication trigger.  On subscriber nodes, this also involves
     adding back any rules/triggers that have been hidden. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>   
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0.5 </para>
    </refsect1>
@@ -1729,6 +1837,13 @@ SET ADD TABLE (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0.5 </para>
    </refsect1>
@@ -1792,6 +1907,13 @@ SET MOVE TABLE (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0.5 </para>
    </refsect1>
@@ -1861,6 +1983,13 @@ SET MOVE SEQUENCE (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0.5 </para>
    </refsect1>
@@ -2156,10 +2285,20 @@ SUBSCRIBE SET (
     acquiring any locks on the provider node.</para>
 
     <para> On the subscriber node, it will have the effect of locking
-    every table in the replication set.  In version 1.2 or later, exclusive
+    every table in the replication set.  In version 1.2 and later, exclusive
     locks are acquired at the beginning of the process.
     </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits until the provider has confirmed all
+        outstanding configuration events from any other node before
+        contacting the provider to determine the set origin.  Slonik
+        then waits for the command submitted to the previous event
+        node to be confirmed on the origin before submitting this
+        command to the origin.</para>
+</refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
     <para> The <command>OMIT COPY</command> option was introduced in &slony1; 2.0.3.</para>
@@ -2222,6 +2361,13 @@ UNSUBSCRIBE SET (
     tables and restore other triggers/rules. </para>
    </refsect1>
 
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
+
    <refsect1><title> Dangerous/Unintuitive Behaviour </title>
 
      <para> Resubscribing an unsubscribed set requires a
@@ -2306,6 +2452,12 @@ LOCK SET (
     on the origin node, and triggers are added to each such table that
     reject table updates. </para>
    </refsect1>
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -2358,6 +2510,12 @@ UNLOCK SET (
     on the origin node, as the triggers are removed from each table
     that reject table updates. </para>
    </refsect1>
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -2450,6 +2608,14 @@ MOVE SET (
     denyaccess trigger is dropped and a logtrigger trigger
     added. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -2541,6 +2707,13 @@ FAILOVER (
     </para>
 
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik will submit the FAILOVER_EVENT without waiting
+           but wait until the most ahead node has received confirmations
+           of the FAILOVER_EVENT from all nodes before completing.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
     <para> In version 2.0, the default <envar>BACKUP NODE</envar> value of 1 was removed, so it is mandatory to provide a value for this parameter.</para>
@@ -2657,6 +2830,13 @@ EXECUTE SCRIPT (
   
 
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para> Slonik waits for the command submitted to the previous
+          event node to be confirmed on the specified event node before
+          submitting this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0. </para>
 
@@ -2735,6 +2915,12 @@ UPDATE FUNCTIONS (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.0 </para>
    </refsect1>
@@ -2921,6 +3107,12 @@ REPAIR CONFIG (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+   
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.1 </para>
    </refsect1>
@@ -2961,6 +3153,12 @@ REPAIR CONFIG (
 
     <para> No application-visible locking should take place. </para>
    </refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+   
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.1.6 / 1.2.1 </para>
    </refsect1>
@@ -2992,6 +3190,12 @@ REPAIR CONFIG (
      sleep (seconds = 5);
     </Programlisting>
    </Refsect1>
+
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 1.1.6 / 1.2.1. </para>
    </refsect1>
@@ -3040,6 +3244,14 @@ REPAIR CONFIG (
      sync (id=22);
      </Programlisting>
    </Refsect1>
+
+   
+   <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik will wait until the node being cloned (the provider)
+          is caught up with all other nodes before submitting the clone prepare
+          command</para>
+</refsect1>
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 2.0. </para>
    </refsect1>
@@ -3078,6 +3290,13 @@ REPAIR CONFIG (
      clone finish (id = 33, provider = 22);
     </Programlisting>
    </Refsect1>
+
+    <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+        <para>Slonik does not wait for event confirmations before 
+          performing this command.</para>
+   </refsect1>
+   
+
    <refsect1> <title> Version Information </title>
     <para> This command was introduced in &slony1; 2.0. </para>
    </refsect1>
index 7b4b04c6ff455676533426613a4412d6dbe40636..cf8cc5d281c326058f794f80da308ec3bb24b8dd 100644 (file)
@@ -1402,6 +1402,13 @@ getClusterStatus(Name cluster_name, int need_plan_mask)
        /* @+nullderef@ */
 }
 
+/* Provide a way to reset the per-session data structure that stores
+   the cluster status in the C functions. 
+
+ * This is used to rectify the case where CLONE NODE updates the node
+ * ID, but calls to getLocalNodeId() could continue to return the old
+ * value.
+ */
 Datum
 _Slony_I_resetSession(PG_FUNCTION_ARGS)
 {
index 990c209fe5fcb7cc12e8f6463be5c4b0c78f5f59..512338bfc4c17f1d17e93f21291df5bf866c5921 100644 (file)
@@ -2483,11 +2483,12 @@ comment on function @NAMESPACE@.dropSet(p_set_id int4) is
 --
 --     Generate the MERGE_SET event.
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4)
+create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4) 
 returns bigint
 as $$
 declare
        v_origin                        int4;
+       in_progress                     boolean;
 begin
        -- ----
        -- Grab the central configuration lock
@@ -2546,13 +2547,9 @@ begin
        -- ----
        -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed
        -- ----
-       if exists (select true from @NAMESPACE@.sl_event
-                       where ev_type = 'ENABLE_SUBSCRIPTION'
-                       and ev_data1 = p_add_id::text
-                       and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm
-                                       where con_origin = ev_origin
-                                       and con_received::text = ev_data3))
-       then
+       select @NAMESPACE@.isSubscriptionInProgress(p_add_id) into in_progress ;
+       
+       if in_progress then
                raise exception 'Slony-I: set % has subscriptions in progress - cannot merge',
                                p_add_id;
        end if;
@@ -2572,6 +2569,28 @@ comment on function @NAMESPACE@.mergeSet(p_set_id int4, p_add_id int4) is
 Both sets must exist, and originate on the same node.  They must be
 subscribed by the same set of nodes.';
 
+
+create or replace function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4)
+returns boolean
+as $$
+begin
+       if exists (select true from @NAMESPACE@.sl_event
+                       where ev_type = 'ENABLE_SUBSCRIPTION'
+                       and ev_data1 = p_add_id::text
+                       and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm
+                                       where con_origin = ev_origin
+                                       and con_received::text = ev_data3))
+       then
+               return true;
+       else
+               return false;
+       end if;
+end;
+$$ language plpgsql;
+comment on function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4) is
+'Checks to see if a subscription for the indicated set is in progress.
+Returns true if a subscription is in progress. Otherwise false';
+
 -- ----------------------------------------------------------------------
 -- FUNCTION mergeSet_int (set_id, add_id)
 --
@@ -5761,6 +5780,24 @@ end $$ language plpgsql;
 comment on function @NAMESPACE@.store_application_name (i_name text) is
 'Set application_name GUC, if possible.  Returns NULL if it fails to work.';
 
+create or replace function @NAMESPACE@.is_node_reachable(origin_node_id integer,
+          receiver_node_id integer) returns boolean as $$
+declare
+               listen_row record;
+               reachable boolean;
+begin
+       reachable:=false;
+       select * into listen_row from @NAMESPACE@.sl_listen where
+                  li_origin=origin_node_id and li_receiver=receiver_node_id;
+       if found then
+          reachable:=true;
+       end if;
+  return reachable;
+end $$ language plpgsql;
+
+comment on function @NAMESPACE@.is_node_reachable(origin_node_id integer, receiver_node_id integer) 
+is 'Is the receiver node reachable from the origin, via any of the listen paths?';
+
 create or replace function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$
 begin
        -- Trim out old state for this component
@@ -5783,3 +5820,4 @@ language plpgsql;
 
 comment on function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is
 'Store state of a Slony component.  Useful for monitoring';
+
index c7b64ae06dd189a6aeb88228d553bbc7bec9a39a..ea9efeefde54af73d32ac9c70ba4a0c90ba55370 100644 (file)
@@ -43,6 +43,9 @@ SlonikScript *parser_script = NULL;
 int                    parser_errors = 0;
 int                    current_try_level;
 
+int last_event_node=-1;
+int auto_wait_disabled=0;
+
 static char myfull_path[MAXPGPATH];
 static char share_path[MAXPGPATH];
 
@@ -91,6 +94,19 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
                                                           const char * table_name);
 static int slonik_is_slony_installed(SlonikStmt * stmt,
                                                  SlonikAdmInfo * adminfo);
+static int slonik_submitEvent(SlonikStmt * stmt,
+                                                         SlonikAdmInfo * adminfo, 
+                                                         SlonDString * query,
+                                                         SlonikScript * script,
+                                                         int suppress_wait_for);
+
+static size_t slonik_get_last_event_id(SlonikStmt* stmt,
+                                                                       SlonikScript * script,
+                                                                       const char * event_filter,
+                                                                       int64 ** events);
+static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1,
+                                                                          SlonikStmt * stmt,
+                                                                          int ignore_node);
 /* ----------
  * main
  * ----------
@@ -101,7 +117,7 @@ main(int argc, const char *argv[])
        extern int      optind;
        int                     opt;
 
-       while ((opt = getopt(argc, (char **)argv, "hv")) != EOF)
+       while ((opt = getopt(argc, (char **)argv, "hvw")) != EOF)
        {
                switch (opt)
                {
@@ -113,6 +129,9 @@ main(int argc, const char *argv[])
                                printf("slonik version %s\n", SLONY_I_VERSION_STRING);
                                exit(0);
                                break;
+                       case 'w':
+                               auto_wait_disabled=1;
+                               break;
 
                        default:
                                printf("unknown option '%c'\n", opt);
@@ -1143,7 +1162,23 @@ static int
 script_exec_stmts(SlonikScript * script, SlonikStmt * hdr)
 {
        int                     errors = 0;
+       int64 * events;
+       size_t event_length;
+       int idx=0;
+       SlonikAdmInfo * curAdmInfo;
 
+       event_length=slonik_get_last_event_id(hdr,script,"ev_type <> 'SYNC' ",
+                                                                                 &events);
+       for( curAdmInfo = script->adminfo_list;
+               curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+       {
+               curAdmInfo->last_event=events[idx];
+               idx++;
+               if(idx > event_length)
+                       break;
+       }       
+       free(events);
+       
        while (hdr && errors == 0)
        {
                hdr->script = script;
@@ -2373,7 +2408,8 @@ slonik_store_node(SlonikStmt_store_node * stmt)
                                 "select \"_%s\".enableNode(%d); ",
                                 stmt->hdr.script->clustername, stmt->no_id, stmt->no_comment,
                                 stmt->hdr.script->clustername, stmt->no_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -2389,6 +2425,7 @@ slonik_drop_node(SlonikStmt_drop_node * stmt)
 {
        SlonikAdmInfo *adminfo1;
        SlonDString query;
+       SlonikAdmInfo * curAdmInfo;
 
        adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->ev_origin);
        if (adminfo1 == NULL)
@@ -2397,6 +2434,23 @@ slonik_drop_node(SlonikStmt_drop_node * stmt)
        if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
                return -1;
 
+       if(!auto_wait_disabled)
+       {
+               for(curAdmInfo = stmt->hdr.script->adminfo_list;
+                       curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+               {
+                       if(curAdmInfo->no_id == stmt->no_id)
+                               continue;
+                       if(slonik_is_slony_installed((SlonikStmt*)stmt,curAdmInfo) > 0 )
+                       {
+                               slonik_wait_config_caughtup(curAdmInfo,(SlonikStmt*)stmt,
+                                                                                       stmt->no_id);
+                       }
+
+               }
+               
+       }
+
        dstring_init(&query);
 
        slon_mkquery(&query,
@@ -2405,11 +2459,24 @@ slonik_drop_node(SlonikStmt_drop_node * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->no_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       /**
+        * we disable auto wait because we perform a wait
+        * above ignoring the node being dropped.
+        */
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,true) < 0)
        {
                dstring_free(&query);
                return -1;
        }
+       /**
+        * if we have a conninfo for the node being dropped
+        * we want to clear out the last seqid.
+        */
+       adminfo1 = get_adminfo(&stmt->hdr,stmt->no_id);
+       if(adminfo1 != NULL) {
+         adminfo1->last_event=-1;
+       }
 
        dstring_free(&query);
        return 0;
@@ -2654,6 +2721,7 @@ slonik_failed_node(SlonikStmt_failed_node * stmt)
                                 "select \"_%s\".failedNode(%d, %d); ",
                                 stmt->hdr.script->clustername,
                                 stmt->no_id, stmt->backup_node);
+       printf("executing failedNode() on %d\n",adminfo1->no_id);
        if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0)
        {
                free(configbuf);
@@ -2894,10 +2962,23 @@ slonik_failed_node(SlonikStmt_failed_node * stmt)
 
                if (use_node != stmt->backup_node)
                {
+                       
+                       /**
+                        * commit the transaction so a new transaction
+                        * is ready for the lock table
+                        */
+                       if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0)
+                       {
+                               free(configbuf);
+                               dstring_free(&query);
+                               return -1;
+                       }
                        slon_mkquery(&query,
+                                                "lock table \"_%s\".sl_event_lock; "
                                                 "select \"_%s\".storeListen(%d,%d,%d); "
                                                 "select \"_%s\".subscribeSet_int(%d,%d,%d,'t','f'); ",
                                                 stmt->hdr.script->clustername,
+                                                stmt->hdr.script->clustername,
                                                 stmt->no_id, use_node, stmt->backup_node,
                                                 stmt->hdr.script->clustername,
                                                 setinfo[i].set_id, use_node, stmt->backup_node);
@@ -2925,7 +3006,11 @@ slonik_failed_node(SlonikStmt_failed_node * stmt)
 
                        sprintf(ev_seqno_c, INT64_FORMAT, setinfo[i].max_seqno);
                        sprintf(ev_seqfake_c, INT64_FORMAT, ++max_seqno_total);
-
+                       if (db_commit_xact((SlonikStmt *) stmt, max_node_total->adminfo)
+                               < 0)
+                       {
+                               return -1;
+                       }
                        slon_mkquery(&query,
                                                 "lock table \"_%s\".sl_event_lock; "
                                                 "select \"_%s\".failedNode2(%d,%d,%d,'%s','%s'); ",
@@ -2933,20 +3018,128 @@ slonik_failed_node(SlonikStmt_failed_node * stmt)
                                                 stmt->hdr.script->clustername,
                                                 stmt->no_id, stmt->backup_node,
                                                 setinfo[i].set_id, ev_seqno_c, ev_seqfake_c);
+                       printf("NOTICE: executing \"_%s\".failedNode2 on node %d\n",
+                                  stmt->hdr.script->clustername,
+                                  max_node_total->adminfo->no_id);
                        if (db_exec_command((SlonikStmt *) stmt,
                                                                max_node_total->adminfo, &query) < 0)
-                               rc = -1;
+                               rc = -1;                        
+                       else
+                       {
+                               SlonikAdmInfo * failed_conn_info=NULL;
+                               SlonikAdmInfo * last_conn_info=NULL;
+                               bool temp_conn_info=false;
+                               /**
+                                * now wait for the FAILOVER to finish.
+                                * To do this we must wait for the FAILOVER_EVENT
+                                * which has ev_origin=stmt->no_id (the failed node)
+                                * but was incjected into the sl_event table on the
+                                * most ahead node (max_node_total->adminfo)
+                                * to be confirmed by the backup node.
+                                * 
+                                * Then we wait for the backup node to send an event
+                                * and be confirmed elsewhere.                           
+                                *
+                                */
+                               
+                       
+                               SlonikStmt_wait_event wait_event;               
+                               wait_event.hdr=*(SlonikStmt*)stmt;
+                               wait_event.wait_origin=stmt->no_id; /*failed node*/
+                               wait_event.wait_on=max_node_total->adminfo->no_id;
+                               wait_event.wait_confirmed=-1;
+                               wait_event.wait_timeout=0;
+
+                               /**
+                                * see if we can find a admconninfo
+                                * for the failed node.
+                                */
+                               
+                               for(failed_conn_info = stmt->hdr.script->adminfo_list;
+                                       failed_conn_info != NULL;
+                                       failed_conn_info=failed_conn_info->next)
+                               {
+
+                                       if(failed_conn_info->no_id==stmt->no_id)
+                                       {
+                                               break;
+                                       }
+                                       last_conn_info=failed_conn_info;
+                               }
+                               if(failed_conn_info == NULL)
+                               {
+                                       temp_conn_info=true;
+                                       last_conn_info->next = malloc(sizeof(SlonikAdmInfo));
+                                       memset(last_conn_info->next,0,sizeof(SlonikAdmInfo));
+                                       failed_conn_info=last_conn_info->next;
+                                       failed_conn_info->no_id=stmt->no_id;
+                                       failed_conn_info->stmt_filename="slonik generated";
+                                       failed_conn_info->stmt_lno=-1;
+                                       failed_conn_info->conninfo="";
+                                       failed_conn_info->script=last_conn_info->script;
+                               }
+                               
+                               failed_conn_info->last_event=max_seqno_total;
+
+                               /*
+                                * commit all open transactions despite of all possible errors
+                                * otherwise the WAIT FOR will not work.
+                                **/
+                               for (i = 0; i < num_nodes; i++)
+                               {
+                                       if (db_commit_xact((SlonikStmt *) stmt, 
+                                                                          nodeinfo[i].adminfo) < 0)
+                                               rc = -1;
+                               }
+                               
+
+                               rc = slonik_wait_event(&wait_event);
+                               if(rc < 0)  
+                               {
+                                       /**
+                                        * pretty serious? how do we recover?
+                                        */
+                                       printf("%s:%d error waiting for event\n",
+                                                  stmt->hdr.stmt_filename, stmt->hdr.stmt_lno);
+                               }
+
+                               if(temp_conn_info)
+                               {
+                                       last_conn_info->next=failed_conn_info->next;
+                                       free(failed_conn_info);                                 
+
+                               }
+
+                               slon_mkquery(&query,
+                                                        "lock table \"_%s\".sl_event_lock; "
+                                                        "select \"_%s\".createEvent('_%s', 'SYNC'); ",
+                                                        stmt->hdr.script->clustername,
+                                                        stmt->hdr.script->clustername,
+                                                        stmt->hdr.script->clustername);
+                               if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                                          stmt->hdr.script,1) < 0)
+                               {
+                                       printf("%s:%d: error submitting SYNC event to backup node"
+                                                  ,stmt->hdr.stmt_filename, stmt->hdr.stmt_lno);
+                               }
+                               
+                               
+                               
+                       }/*else*/
+                                       
                }
        }
-
+       
        /*
         * commit all open transactions despite of all possible errors
         */
        for (i = 0; i < num_nodes; i++)
        {
-               if (db_commit_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0)
+               if (db_commit_xact((SlonikStmt *) stmt, 
+                                                  nodeinfo[i].adminfo) < 0)
                        rc = -1;
        }
+       
 
        free(configbuf);
        dstring_free(&query);
@@ -3020,8 +3213,11 @@ slonik_clone_prepare(SlonikStmt_clone_prepare * stmt)
        if (adminfo1 == NULL)
                return -1;
 
-       dstring_init(&query);
+       if(!auto_wait_disabled)
+               slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
 
+       dstring_init(&query);
+       
        if (stmt->no_comment == NULL)
                slon_mkquery(&query,
                                 "select \"_%s\".cloneNodePrepare(%d, %d, 'Node %d'); ",
@@ -3034,7 +3230,8 @@ slonik_clone_prepare(SlonikStmt_clone_prepare * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->no_id, stmt->no_provider,
                                 stmt->no_comment);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3094,7 +3291,8 @@ slonik_store_path(SlonikStmt_store_path * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->pa_server, stmt->pa_client,
                                 stmt->pa_conninfo, stmt->pa_connretry);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,1) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3126,7 +3324,8 @@ slonik_drop_path(SlonikStmt_drop_path * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->pa_server, stmt->pa_client);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3159,7 +3358,8 @@ slonik_store_listen(SlonikStmt_store_listen * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->li_origin, stmt->li_provider,
                                 stmt->li_receiver);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3192,7 +3392,8 @@ slonik_drop_listen(SlonikStmt_drop_listen * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->li_origin, stmt->li_provider,
                                 stmt->li_receiver);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3210,11 +3411,62 @@ slonik_create_set(SlonikStmt_create_set * stmt)
        SlonikAdmInfo *adminfo1;
        SlonDString query;
        const char *comment;
+       SlonikAdmInfo * curAdmInfo;
+       int64 * drop_set_events;
+       int64 * cached_events;
+       size_t event_size;
+       int adm_idx;
 
        adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin);
        if (adminfo1 == NULL)
                return -1;
 
+       if( ! auto_wait_disabled)
+       {
+               /**
+                * loop through each node and make sure there are no
+                * pending DROP SET commands.
+                *
+                * if there is a DROP SET command from the node
+                * in sl_event then we wait until all other nodes are
+                * caughtup to that.
+                *
+                */
+               event_size = slonik_get_last_event_id((SlonikStmt*)stmt,
+                                                                                         stmt->hdr.script,
+                                                                                         "ev_type='DROP_SET'",
+                                                                                         &drop_set_events);
+
+               /**
+                * copy the 'real' last event to storage
+                * and update the AdmInfo structure with the last 'DROP SET' id.
+                */
+               cached_events=malloc(sizeof(int64)*event_size);         
+               adm_idx=0;
+               for(curAdmInfo = stmt->hdr.script->adminfo_list;
+                       curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+               {
+                       cached_events[adm_idx]=curAdmInfo->last_event;
+                       curAdmInfo->last_event=drop_set_events[adm_idx];
+                       adm_idx++;
+               }
+               slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
+               /***
+                * reset the last_event values in the AdmInfo to
+                * the values we saved above.
+                */
+               adm_idx=0;
+               for(curAdmInfo = stmt->hdr.script->adminfo_list;
+                       curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+               {
+                       curAdmInfo->last_event=cached_events[adm_idx];
+                       adm_idx++;
+               }
+               free(cached_events);
+               free(drop_set_events);
+               
+       }
+
        if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
                return -1;
 
@@ -3231,7 +3483,8 @@ slonik_create_set(SlonikStmt_create_set * stmt)
                     stmt->hdr.script->clustername,
                     stmt->hdr.script->clustername,
                     stmt->set_id, comment);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3263,7 +3516,8 @@ slonik_drop_set(SlonikStmt_drop_set * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->set_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3279,23 +3533,69 @@ slonik_merge_set(SlonikStmt_merge_set * stmt)
 {
        SlonikAdmInfo *adminfo1;
        SlonDString query;
+       PGresult *res;
+       bool in_progress=1;
 
        adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin);
        if (adminfo1 == NULL)
                return -1;
 
-       if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
-               return -1;
+
+       /**
+        * The event node (the origin) should be caught up
+        * with itself before submitting a merge set.
+        * this ensures no subscriptions involving the set
+        * are still in progress.
+        *
+        * (we could also check for the event number of any
+        * unconfirmed subscriptions and wait for that
+        * but we don't)
+        */
+
+       
 
        dstring_init(&query);
 
+       slon_mkquery(&query,"select \"_%s\".isSubscriptionInProgress(%d)"
+                    ,stmt->hdr.script->clustername,
+                    stmt->add_id);
+       while(in_progress)
+       {
+               char *result;
+
+               if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
+                       return -1;
+
+               res = db_exec_select((SlonikStmt*) stmt,adminfo1,&query);
+               if (res == NULL)
+               {
+                       dstring_free(&query);
+                       return -1;
+                       
+               }
+               result = PQgetvalue(res,0,0);
+               if(result != NULL && (*result=='t' ||
+                                                         *result=='T'))
+               {
+                       printf("%s:%d subscription in progress before mergeSet. waiting",
+                               stmt->hdr.stmt_filename,stmt->hdr.stmt_lno);
+                       db_rollback_xact((SlonikStmt *) stmt, adminfo1);
+                       sleep(5);
+               }
+               else
+                       in_progress=false;
+               if(result != NULL)
+                       PQclear(res);
+       }
+       
        slon_mkquery(&query,
                                 "lock table \"_%s\".sl_event_lock;"
                                 "select \"_%s\".mergeSet(%d, %d); ",
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->set_id, stmt->add_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3432,9 +3732,10 @@ slonik_set_add_single_table(SlonikStmt_set_add_table * stmt,
        slon_mkquery(&query,
                                 "select \"_%s\".setAddTable(%d, %d, '%q', '%q', '%q'); ",
                                 stmt->hdr.script->clustername,
-                                stmt->set_id, tab_id,
-                                fqname, idxname, stmt->tab_comment);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+                                stmt->set_id, stmt->tab_id,
+                                stmt->tab_fqname, idxname, stmt->tab_comment);
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                PQclear(res);
                dstring_free(&query);
@@ -3521,7 +3822,7 @@ slonik_set_add_sequence(SlonikStmt_set_add_sequence * stmt)
                dstring_terminate(&query);
        }
        else
-               rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
+         rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
                                                                                  stmt->seq_fqname,
                                                                                  stmt->set_id,stmt->seq_comment,
                                                                                  stmt->seq_id);
@@ -3545,7 +3846,7 @@ slonik_set_add_single_sequence(SlonikStmt *stmt,
        
        if(seq_id < 0)
        {
-               seq_id = slonik_get_next_sequence_id(stmt);
+         seq_id = slonik_get_next_sequence_id((SlonikStmt*)stmt);
                if(seq_id < 0)
                        return -1;
        }
@@ -3556,7 +3857,8 @@ slonik_set_add_single_sequence(SlonikStmt *stmt,
                                 stmt->script->clustername,
                                 set_id, seq_id, seq_name,
                                 seq_comment);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->script,auto_wait_disabled) < 0)
        {
                db_notice_silent = false;
                dstring_free(&query);
@@ -3589,7 +3891,8 @@ slonik_set_drop_table(SlonikStmt_set_drop_table * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->tab_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3622,7 +3925,8 @@ slonik_set_drop_sequence(SlonikStmt_set_drop_sequence * stmt)
                                 "select \"_%s\".setDropSequence(%d); ",
                                 stmt->hdr.script->clustername,
                                 stmt->seq_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                db_notice_silent = false;
                dstring_free(&query);
@@ -3655,7 +3959,8 @@ slonik_set_move_table(SlonikStmt_set_move_table * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->tab_id, stmt->new_set_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3686,7 +3991,8 @@ slonik_set_move_sequence(SlonikStmt_set_move_sequence * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->seq_id, stmt->new_set_id);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3719,6 +4025,21 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
         * about this change directly.
         */
 
+       /**
+        * we don't actually want to execute that query until
+        * the provider node is caught up with all other nodes wrt config data.
+        *
+        * this is because we don't want to pick the origin based on
+        * stale data. 
+        *
+        * @note an alternative might be to contact all adminconninfo
+        * nodes looking for the set origin and then submit the
+        * set origin to that.  This avoids the wait for and is probably
+        * what we should do.
+        */ 
+       if(!auto_wait_disabled)
+               slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
+
        slon_mkquery(&query,"select count(*) FROM \"_%s\".sl_subscribe " \
                                 "where sub_set=%d AND sub_receiver=%d " \
                                 " and sub_active=true and sub_provider<>%d",
@@ -3726,14 +4047,17 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
                                 stmt->sub_setid,stmt->sub_receiver,
                                 stmt->sub_provider);
        
-       res1 = db_exec_select((SlonikStmt*) stmt,adminfo1,&query);
+       res1 = db_exec_select(&stmt->hdr,adminfo1,&query);
        if(res1 == NULL) {
                dstring_free(&query);
                return -1;
        }
-       if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0)
+       if(PQntuples(res1) > 0)
        {
-               reshape=1;
+               if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0)
+               {
+                       reshape=1;
+               }
        }
        PQclear(res1);
        dstring_reset(&query);
@@ -3743,9 +4067,12 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
                                 " set_id=%d",stmt->hdr.script->clustername,
                                 stmt->sub_setid);
        res1 = db_exec_select((SlonikStmt*)stmt,adminfo1,&query);
-       if(res1==NULL) 
+       if(res1==NULL || PQntuples(res1) <= 0 
        {
-               PQclear(res1);
+               printf("%s:%d error: can not determine set origin for set %d\n",
+                          stmt->hdr.stmt_filename,stmt->hdr.stmt_lno,stmt->sub_setid);
+               if(res1 != NULL)
+                       PQclear(res1);
                dstring_free(&query);
                return -1;
 
@@ -3772,7 +4099,8 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt)
                                 stmt->sub_receiver,
                                 (stmt->sub_forward) ? "t" : "f",
                                 (stmt->omit_copy) ? "t" : "f");
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3824,7 +4152,8 @@ slonik_unsubscribe_set(SlonikStmt_unsubscribe_set * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->sub_setid, stmt->sub_receiver);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -3970,7 +4299,8 @@ slonik_move_set(SlonikStmt_move_set * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->set_id, stmt->new_origin);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -4033,7 +4363,8 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt)
                     stmt->ddl_setid, /* dstring_data(&script),  */ 
                     stmt->only_on_node);
 
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, 
+                                                  stmt->hdr.script,auto_wait_disabled) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -4206,6 +4537,9 @@ slonik_wait_event(SlonikStmt_wait_event * stmt)
        time_t          now;
        int                     all_confirmed = 0;
        char            seqbuf[64];
+       int loop_count=0;
+       SlonDString outstanding_nodes;
+       int tupindex;
 
        adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->wait_on);
        if (adminfo1 == NULL)
@@ -4214,7 +4548,7 @@ slonik_wait_event(SlonikStmt_wait_event * stmt)
        time(&timeout);
        timeout += stmt->wait_timeout;
        dstring_init(&query);
-
+       dstring_init(&outstanding_nodes);
        while (!all_confirmed)
        {
                all_confirmed = 1;
@@ -4288,7 +4622,21 @@ slonik_wait_event(SlonikStmt_wait_event * stmt)
                                return -1;
                        }
                        if (PQntuples(res) > 0)
-                               all_confirmed = 0;
+                       {
+                               all_confirmed = 0;              
+                               dstring_reset(&outstanding_nodes);
+                               for(tupindex=0; tupindex < PQntuples(res); tupindex++)
+                               {
+                                       char * node = PQgetvalue(res,tupindex,0);
+                                       char * last_event = PQgetvalue(res,tupindex,1);
+                                       if( last_event == 0)
+                                         last_event="null";
+                                       slon_appendquery(&outstanding_nodes,"%snode %s only on event %s"
+                                                                        , tupindex==0 ? "" : ", "
+                                                                        , node,last_event);
+                                       
+                               }
+                       }
                        PQclear(res);
 
                        if (!all_confirmed)
@@ -4310,9 +4658,27 @@ slonik_wait_event(SlonikStmt_wait_event * stmt)
                        return -1;
                }
 
+               loop_count++;
+               if(loop_count % 10 == 0 && stmt->wait_confirmed >= 0)
+               {
+                       sprintf(seqbuf, INT64_FORMAT, adminfo->last_event);
+                       printf("%s:%d: waiting for event (%d,%s) to be confirmed on node %d\n"
+                                  ,stmt->hdr.stmt_filename,stmt->hdr.stmt_lno
+                                  ,stmt->wait_origin,seqbuf,
+                                  stmt->wait_confirmed);
+               }
+               else if (loop_count % 10 ==0 )
+               {
+                       sprintf(seqbuf, INT64_FORMAT, adminfo->last_event);
+                       printf("%s:%d: waiting for event (%d,%s).  %s\n",
+                                  stmt->hdr.stmt_filename,stmt->hdr.stmt_lno,
+                                  stmt->wait_origin,seqbuf,
+                                  dstring_data(&outstanding_nodes));
+
+               }
                sleep(1);
        }
-
+       dstring_free(&outstanding_nodes);
        dstring_free(&query);
 
        return 0;
@@ -4371,7 +4737,8 @@ slonik_sync(SlonikStmt_sync * stmt)
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername,
                                 stmt->hdr.script->clustername);
-       if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+       if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+                                                  stmt->hdr.script,1) < 0)
        {
                dstring_free(&query);
                return -1;
@@ -4501,11 +4868,11 @@ replace_token(char *resout, char *lines, const char *token, const char *replacem
 }
 
 /**
- * checks all nodes (that an admin conninfo exists for)
- * to find the next available table id.
- *
- *
- */
+* checks all nodes (that an admin conninfo exists for)
+* to find the next available table id.
+*
+*
+*/
 static int 
 slonik_get_next_tab_id(SlonikStmt * stmt)
 {
@@ -4515,7 +4882,7 @@ slonik_get_next_tab_id(SlonikStmt * stmt)
        int tab_id=0;
        char * tab_id_str;
        PGresult* res;
-
+       
        dstring_init(&query);
        slon_mkquery(&query,
                                 "select max(tab_id) FROM \"_%s\".sl_table",
@@ -4541,13 +4908,13 @@ slonik_get_next_tab_id(SlonikStmt * stmt)
                        res = db_exec_select((SlonikStmt*)stmt,adminfo,&query);
                        if(res == NULL ) 
                        {
-                               printf("%s:%d: Error:could not query node %d for next table id",
-                                          stmt->stmt_filename,stmt->stmt_lno,
-                                          adminfo->no_id);
-                               if( res != NULL)
-                                       PQclear(res);
-                               dstring_terminate(&query);
-                               return -1;
+                                       printf("%s:%d: Error:could not query node %d for next table id",
+                                                  stmt->stmt_filename,stmt->stmt_lno,
+                                                  adminfo->no_id);
+                                       if( res != NULL)
+                                                       PQclear(res);
+                                       dstring_terminate(&query);
+                                       return -1;
                        }
                }
                else
@@ -4561,7 +4928,7 @@ slonik_get_next_tab_id(SlonikStmt * stmt)
                {               
                        tab_id_str = PQgetvalue(res,0,0);
                        if(tab_id_str != NULL)
-                          tab_id=strtol(tab_id_str,NULL,10);
+                               tab_id=strtol(tab_id_str,NULL,10);
                        else
                        {
                                PQclear(res);
@@ -4579,11 +4946,11 @@ slonik_get_next_tab_id(SlonikStmt * stmt)
 
 
 /**
- * checks all nodes (that an admin conninfo exists for)
- * to find the next available sequence id.
- *
- *
- */
+* checks all nodes (that an admin conninfo exists for)
+* to find the next available sequence id.
+*
+*
+*/
 static int 
 slonik_get_next_sequence_id(SlonikStmt * stmt)
 {
@@ -4594,7 +4961,7 @@ slonik_get_next_sequence_id(SlonikStmt * stmt)
        char * seq_id_str;
        PGresult* res;
        int rc;
-
+       
        dstring_init(&query);
        slon_mkquery(&query,
                                 "select max(seq_id) FROM \"_%s\".sl_sequence",
@@ -4642,7 +5009,7 @@ slonik_get_next_sequence_id(SlonikStmt * stmt)
                {               
                        seq_id_str = PQgetvalue(res,0,0);
                        if(seq_id_str != NULL)
-                          seq_id=strtol(seq_id_str,NULL,10);
+                               seq_id=strtol(seq_id_str,NULL,10);
                        else
                        {
                                PQclear(res);
@@ -4658,18 +5025,18 @@ slonik_get_next_sequence_id(SlonikStmt * stmt)
 }
 
 /**
- * find the origin node for a particular set.
- * This function will query the first admin node it
- * finds to determine the origin of the set.
- *
- * If the node doesn't know about the set then
- * it will query the next admin node until it finds
- * one that does.
- *
- */
+* find the origin node for a particular set.
+* This function will query the first admin node it
+* finds to determine the origin of the set.
+*
+* If the node doesn't know about the set then
+* it will query the next admin node until it finds
+* one that does.
+*
+*/
 static int find_origin(SlonikStmt * stmt,int set_id)
 {
-       
+
        SlonikAdmInfo *adminfo_def;
        SlonDString query;
        PGresult * res;
@@ -4679,7 +5046,7 @@ static int find_origin(SlonikStmt * stmt,int set_id)
        slon_mkquery(&query,
                                 "select set_origin from \"_%s\".\"sl_set\" where set_id=%d",
                                 stmt->script->clustername,set_id);
-
+       
        for (adminfo_def = stmt->script->adminfo_list;
                 adminfo_def; adminfo_def = adminfo_def->next)
        {       
@@ -4700,14 +5067,14 @@ static int find_origin(SlonikStmt * stmt,int set_id)
                        origin_id_str = PQgetvalue(res,0,0);
                        if(origin_id_str != NULL)
                        {
-                          origin_id=strtol(origin_id_str,NULL,10);
-                          PQclear(res);
+                               origin_id=strtol(origin_id_str,NULL,10);
+                               PQclear(res);
                        }
                        else
                        {
                                PQclear(res);
                                continue;
-
+                               
                        }
                }
                if(origin_id >= 0) 
@@ -4716,22 +5083,22 @@ static int find_origin(SlonikStmt * stmt,int set_id)
        
        dstring_terminate(&query);
        
-
+       
        return origin_id;
 }
 
 
 /**
- * adds any sequences that table_name depends on to the replication
- * set.
- *
- 
- *
- */
+* adds any sequences that table_name depends on to the replication
+* set.
+*
+* 
+*
+*/
 int
 slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
-                                                          SlonikAdmInfo * adminfo1,
-                                                          const char * table_name)
+                                                  SlonikAdmInfo * adminfo1,
+                                                  const char * table_name)
 {
 
        SlonDString query;
@@ -4740,7 +5107,7 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
        const char * seq_name;
        char * comment;
        int rc;
-
+       
        dstring_init(&query);
        slon_mkquery(&query,
                                 "select pg_get_serial_sequence('%s',column_name) "
@@ -4755,7 +5122,7 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
        }
        for(idx=0; idx < PQntuples(result);idx++)
        {
-       
+               
                if(!PQgetisnull(result,idx,0)  )
                {
                        seq_name=PQgetvalue(result,idx,0);
@@ -4765,9 +5132,9 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
                        comment=malloc(strlen(table_name)+strlen("sequence for"+1));
                        sprintf(comment,"sequence for %s",table_name);
                        rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
-                                                                                  seq_name,
-                                                                                  stmt->set_id,
-                                                                                  comment,-1);
+                                                                                         seq_name,
+                                                                                         stmt->set_id,
+                                                                                         comment,-1);
                        free(comment);
                        if(rc < 0 )
                        {
@@ -4775,33 +5142,33 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
                                dstring_terminate(&query);
                                return rc;
                        }
-                               
+                       
                }
                
        }/*for*/
        PQclear(result);
        dstring_terminate(&query);
        return 0;
-
+       
 }
 
 
 /**
- * checks to see if slony is installed on the given node.
- *
- * this function will check to see if slony tables exist
- * on the node by querying the information_schema.
- 
- * returns:
- *        -1 => could not query information schema
- *         0 => slony not installed
- *         1 => slony is installed.
- */
+* checks to see if slony is installed on the given node.
+*
+* this function will check to see if slony tables exist
+* on the node by querying the information_schema.
+* 
+* returns:
+*        -1 => could not query information schema
+*         0 => slony not installed
+*         1 => slony is installed.
+*/
 static int
 slonik_is_slony_installed(SlonikStmt * stmt,
-                                                 SlonikAdmInfo * adminfo)
+                                         SlonikAdmInfo * adminfo)
 {
-       SlonDString query;
+       SlonDString query;
        PGresult * res;
        dstring_init(&query);
        int rc=-1;
@@ -4818,10 +5185,360 @@ slonik_is_slony_installed(SlonikStmt * stmt,
        
        if(res != NULL)
                PQclear(res);
+       db_rollback_xact(stmt, adminfo);
        dstring_terminate(&query);
        return rc;
+       
+}      
+
+/* slonik_submitEvent(stmt, adminfo, query, script, suppress_wait_for)
+ *
+ * Wraps former requests to generate events, folding together the
+ * logic for whether or not to do auto wait for or suppress this into
+ * one place.
+ */
+                                          
+static int slonik_submitEvent(SlonikStmt * stmt,
+                                                         SlonikAdmInfo * adminfo, 
+                                                         SlonDString * query,
+                                                         SlonikScript * script,
+                                                         int suppress_wait_for)
+{
+       int rc;
+       if ( last_event_node >= 0 &&
+                last_event_node != adminfo->no_id
+               && ! suppress_wait_for )
+       {
+               /**
+                * the last event node is not the current event node.
+                * time to wait.
+                */
+               
+               if( current_try_level != 0)
+               {
+                       printf("%s:%d Error: the event origin can not be changed "
+                                  "inside of a try block",
+                                  stmt->stmt_filename, stmt->stmt_lno);
+                       return -1;
+               }
+
+               /**
+                * for now we generate a 'fake' Slonik_wait_event structure
+                * 
+                */
+               SlonikStmt_wait_event wait_event;               
+               wait_event.hdr=*stmt;
+               wait_event.wait_origin=last_event_node;
+               wait_event.wait_on=last_event_node;
+               wait_event.wait_confirmed=adminfo->no_id;
+               wait_event.wait_timeout=0;
+               rc = slonik_wait_event(&wait_event);
+               if(rc < 0) 
+                       return rc;
+               
+       }
+       rc= db_exec_evcommand(stmt,adminfo,query);
+       if(! suppress_wait_for)
+               last_event_node=adminfo->no_id;
+       return rc;
+  
+}
+
+/**
+ * slonik_get_last_event_id(stmt, script, event_filter, events)
+ *
+ * query all nodes we have admin conninfo data for and
+ * find the last non SYNC event id generated from that node.
+ *
+ * store this in the SlonikAdmInfo structure so it can later
+ * be used as part of a wait for.
+ *
+ */
+static size_t slonik_get_last_event_id(SlonikStmt *stmt,
+                                                                          SlonikScript * script,
+                                                                          const char * event_filter,
+                                                                          int64 ** events)
+{
+       
+       SlonDString query;
+       PGresult * result;
+       char * event_id;
+       SlonikAdmInfo * curAdmInfo=NULL;
+       int node_count=0;
+       int node_idx;
+       int rc;
+
+       dstring_init(&query);
+       slon_mkquery(&query,"select max(ev_seqno) FROM \"_%s\".sl_event"
+                                " , \"_%s\".sl_node "
+                                " where ev_origin=\"_%s\".getLocalNodeId('_%s') "
+                                " AND %s AND sl_node.no_id="
+                                " ev_origin"
+                                , script->clustername,script->clustername,
+                                script->clustername,script->clustername,event_filter);
+       node_count=0;
+       for( curAdmInfo = script->adminfo_list;
+               curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+       {
+               node_count++;
+       }
+       *events = malloc(sizeof(int64)*(node_count+1));
+       node_idx=0;
+       for( curAdmInfo = script->adminfo_list;
+                curAdmInfo != NULL; curAdmInfo = curAdmInfo->next,node_idx++)
+       {
+               SlonikAdmInfo * activeAdmInfo = 
+                       get_active_adminfo(stmt,curAdmInfo->no_id);
+               if( activeAdmInfo == NULL)
+               {
+                       /**
+                        * warning?
+                        */
+                       continue;
+               }
+               rc = slonik_is_slony_installed(stmt,activeAdmInfo);
+               if(rc == 1)
+               {
+                       result = db_exec_select(stmt,activeAdmInfo,&query);
+                       if(result == NULL || PQntuples(result) != 1 ) 
+                       {
+                               printf("error: unable to query event history on node %d\n",
+                                          curAdmInfo->no_id);
+                               if(result != NULL)
+                                       PQclear(result);
+                               return -1;
+                       }
+                       event_id = PQgetvalue(result,0,0);
+                       db_rollback_xact(stmt, activeAdmInfo);
+                       if(event_id != NULL)
+                               (*events)[node_idx]=strtoll(event_id,NULL,10);             
+                       else
+                               (*events)[node_idx]=-1;
+                       PQclear(result);
+               }
+               else {
+                       (*events)[node_idx]=-1;
+               }
+               
+       }
+       
+       
+       dstring_terminate(&query);
+       return node_count;
+}
+
+/**
+ * waits until adminfo1 is caught up with config events from
+ * all other nodes.
+ *
+ * adminfo1 - The node that we are waiting to be caught up
+ * stmt - The statement that is currently being executed
+ * ignore_node - allows 1 node to be ignored (don't wait for
+ *               adminfo1 to be caught up with that node)
+ *               -1 means don't ignore any nodes.
+ */                                                            
+static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1,
+                                                                          SlonikStmt * stmt,
+                                                                          int ignore_node)
+{
+       SlonDString event_list;
+       PGresult * result=NULL;
+       SlonikAdmInfo * curAdmInfo=NULL;
+       int first_event=1;
+       int confirm_count=0;
+       SlonDString is_caughtup_query;
+       SlonDString node_list;
+       int wait_count=0;
+       int node_list_size=0;
+       int sleep_count=0;
+       int64* behind_nodes=NULL;
+       int idx;
+       int cur_array_idx;
+       
+       /**
+        * an array that stores a node_id, last_event.
+        * or the last event seen for each admin conninfo
+        * node.
+        */
+       int64 * last_event_array=NULL;
+       
+
+       dstring_init(&event_list);
+       dstring_init(&node_list);
+
+       if( current_try_level != 0)
+       {
+         printf("%s:%d Error: waiting operation not allowed inside of "
+                        "inside of a try block",
+                        stmt->stmt_filename, stmt->stmt_lno);
+         return -1;
+       }
+
+       for( curAdmInfo = stmt->script->adminfo_list;
+                curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+       {
+               node_list_size++;
+       }
+       last_event_array = malloc(node_list_size * sizeof(int64)*2);
+       memset(last_event_array,0,sizeof(node_list_size * sizeof(int64)*2));
+       
+       for( curAdmInfo = stmt->script->adminfo_list;
+                curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+       {
+               if(curAdmInfo->last_event < 0 || 
+                  curAdmInfo->no_id==adminfo1->no_id ||
+                       curAdmInfo->no_id == ignore_node )
+                       continue;
+               
+               char  seqno[64];
+               sprintf(seqno,INT64_FORMAT,curAdmInfo->last_event);
+               slon_appendquery(&event_list, 
+                                                "%s (node_list.no_id=%d)"
+                                                ,first_event ? " " : " OR "
+                                                ,curAdmInfo->no_id
+                                                ,seqno
+                                                );             
+               slon_appendquery(&node_list,"%s (%d) ",
+                                                first_event ? " " : ",",
+                                                curAdmInfo->no_id);
+               last_event_array[wait_count*2]=curAdmInfo->no_id;
+               last_event_array[wait_count*2+1]=curAdmInfo->last_event;
+               first_event=0;
+               wait_count++;
+       }
+
+       
+
+       dstring_init(&is_caughtup_query);
+        /**
+         * I need a row for the case where a node is not in sl_confirm
+         * and the node is disabled or deleted.
+         */
+        slon_mkquery(&is_caughtup_query,
+                                 "select node_list.no_id,max(con_seqno),no_active FROM "
+                                 " (VALUES %s) as node_list (no_id) LEFT JOIN "
+                                 "\"_%s\".sl_confirm ON(sl_confirm.con_origin=node_list.no_id"
+                                 " AND sl_confirm.con_received=%d)"
+                                 " LEFT JOIN \"_%s\".sl_node ON (node_list.no_id=sl_node.no_id) "
+                                 "GROUP BY node_list.no_id,no_active"
+                                 ,dstring_data(&node_list)
+                                 ,stmt->script->clustername
+                                 ,adminfo1->no_id
+                                 ,stmt->script->clustername);
+        
+        while(confirm_count != wait_count)
+        {
+                result = db_exec_select(stmt,
+                                                                adminfo1,&is_caughtup_query);
+                if (result == NULL) 
+                {
+                        /**
+                         * error
+                         */
+                }
+                confirm_count = PQntuples(result);
+
+                db_rollback_xact(stmt, adminfo1);        
+                 
+                /**
+                 * find nodes that are missing.
+                 * 
+                 */
+                behind_nodes=malloc(node_list_size * sizeof(int64));
+                memset(behind_nodes,0,node_list_size*sizeof(int64));
+                confirm_count=0;
+                for(idx = 0; idx < PQntuples(result); idx++)
+                {
+                        char * n_id_c = PQgetvalue(result,idx,0);
+                        int n_id = atoi(n_id_c);
+                        char * seqno_c = PQgetvalue(result,idx,1);
+                        int64 seqno=strtoll(seqno_c,NULL,10);  
+                        char * node_active = PQgetvalue(result,idx,2);
+                        for(cur_array_idx=0;
+                                cur_array_idx < wait_count; cur_array_idx++)
+                        {
+                                if(last_event_array[cur_array_idx*2]==n_id)
+                                {
+                                        /*
+                                         *  found.
+                                         */
+                                        if(node_active == NULL ||  *node_active=='f')
+                                        {
+                                                /**
+                                                 * if node_active is null we assume the
+                                                 * node has been deleted since it
+                                                 * has no entry in sl_node
+                                                 */
+                                                behind_nodes[cur_array_idx]=-1;
+                                                confirm_count++;
+                                        }                                      
+                                        else if(last_event_array[cur_array_idx*2+1]>seqno)
+                                        {
+                                                behind_nodes[cur_array_idx]=seqno;
+                                        }
+                                        else
+                                        {
+                                                behind_nodes[cur_array_idx]=-1;
+                                                confirm_count++;
+                                        }
+                                        
+                                }
+                                
+                        }
+                }/*for .. PQntuples*/
+                if(confirm_count < wait_count )
+                {
+                        sleep_count++;
+                        if(sleep_count % 10 == 0)
+                        {
+                                /**
+                                 * any elements in caught_up_nodes with a value 0
+                                 * means that the cooresponding node id in
+                                 * last_event_array is not showing up in the
+                                 * query result.
+                                 */
+                                SlonDString outstanding;
+                                dstring_init(&outstanding);
+                                first_event=1;
+                                for(cur_array_idx=0; cur_array_idx < wait_count;
+                                        cur_array_idx++)
+                                {                                 
+                                        if(behind_nodes[cur_array_idx] >= 0)
+                                        {
+                                                char tmpbuf[96];
+                                                sprintf(tmpbuf,        "(" INT64_FORMAT "," INT64_FORMAT
+                                                                ") only at (" INT64_FORMAT "," INT64_FORMAT
+                                                                ")"
+                                                                ,
+                                                                last_event_array[cur_array_idx*2]
+                                                                ,last_event_array[cur_array_idx*2+1],
+                                                                last_event_array[cur_array_idx*2],
+                                                                behind_nodes[cur_array_idx] );
+                                                slon_appendquery(&outstanding,"%s %s"
+                                                                                 , first_event ? "" : ",",tmpbuf);
+                                                first_event=0;
+                                        }
+                                        
+                                }
+                                printf("waiting for events %s to be confirmed on node %d\n",
+                                               dstring_data(&outstanding),adminfo1->no_id);
+                                dstring_terminate(&outstanding);
+                          
+                        }/* every 10 iterations */                              
+                        sleep(1);
+                } 
+                free(behind_nodes);
+                
+        }/*while*/
+        if(result != NULL)
+                        PQclear(result);    
+        dstring_terminate(&event_list);
+        dstring_terminate(&is_caughtup_query);
+        free(last_event_array);
+        return 0;
+
+}
 
-}                                                 
 
 /*
  * Local Variables: