This is a merge of the auto_wait_for feature branch.
This change modifies slonik so that, in normal circumstances, it
automatically waits for events to be confirmed before proceeding
wherever it detects that this is required.
See the documentation changes included in this patch for more details.
slonikScript += ' subscribe set(id=' + setid + ', provider='
+ provider_node + ', receiver=' + subscriber_node
+ ', forward=yes);\n';
- slonikScript += this.generateSlonikWait(origin_node);
- slonikScript += ' echo \'syncing\';\n';
- slonikScript += ' sync(id=' + provider_node + ');\n';
- slonikScript += ' echo \'waiting for event\';\n';
- slonikScript += this.generateSlonikWait(provider_node);
+ //slonikScript += this.generateSlonikWait(origin_node);
+ //slonikScript += ' echo \'syncing\';\n';
+ //slonikScript += ' sync(id=' + provider_node + ');\n';
+ //slonikScript += ' echo \'waiting for event\';\n';
+ //slonikScript += this.generateSlonikWait(provider_node);
slonikScript += ' echo \'finished subscribing ' + subscriber_node +'\' ;\n';
var slonik = this.coordinator.createSlonik('subscribe ', preamble,
*/
this.subscribeSet(1, 1,1, [ 2, 3 ]);
this.subscribeSet(1, 1,3, [ 4, 5 ]);
+ this.slonikSync(1,1);
this.testAddDropColumn(1, 1, false);
this.subscribeSet(2, 2,3, [ 4, 5 ]);
this.coordinator.log("ExecuteScript.prototype.runTest - move set to node 1");
+
+
/**
* Move the set to node 1. We want to do this for the next test.
*/
*/
var load = this.generateLoad();
- this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders");
+ this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders - expecting failure:" + expectFailure);
/**
* Now add a column to orders. We will do this via EXECUTE SCRIPT.
*/
var lag = this.measureLag(1,2);
this.coordinator.log('we do not expect lag, we measure it as ' + lag);
this.testResults.assertCheck('node is not lagged', lag < 10,true);
-
+ this.slonikSync(1,1);
this.dropTestTable(1,1,false);
statement.close();
* DROP 3 node.
* We expect it to fail since node 3 is still a provider
* for nodes 4,5.
+ *
+ * Perform the slonikSync to make sure nodes 4 and 5 are caught up.
+ * Otherwise slonik will wait before actually submitting the
+ * dropNode, but since the slon will be stopped by this.failNode()
+ * the event might never get propagated.
*/
+ this.slonikSync(1,1);
this.coordinator.log('failing node 3');
- this.failNode(3,true);
-
+
/**
* Readd all paths, some might have been deleted and not re-added
* when we deleted nodes above.
* We also have to restart the slons because of bug # 120
*/
this.addCompletePaths();
+
+ this.failNode(3,true);
+
/**
* Sleep a bit.
* Do we need to do this for the paths to propogate????
}
this.coordinator.log("FailNodeTest.prototype.runTest - sleeping 60x1000");
java.lang.Thread.sleep(60*1000);
- /**
- * Replace the generateSlonikWait function with a version that
- * does individual wait for event(..) statements instead of
- * a confirmed=all since we do not want to be waiting on node 3
- * since we just destroyed it.
- */
- var originalGenerateWait = this.generateSlonikWait;
- this.generateSlonikWait=function(event_node) {
- var script='';
- for(var idx=1; idx <= this.getNodeCount(); idx ++) {
- if(idx==3||idx==event_node) {
- continue;
- }
- script += "echo 'waiting on confirm from " + idx + "';\n";
- script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n';
- }
- return script;
- }
+
/**
* SUBSCRIBE nodes 4,5 via node 1 directly.
*/
this.coordinator.log("FailNodeTest.prototype.runTest - subscribe 4,5 via 1");
- this.subscribeSet(1,1,1,[4,5]);
-
+ this.subscribeSet(1,1,1,[4,5]);
/**
* Now we should be able to drop node 3.
*/
- this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3");
+ this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3");
this.failNode(3,false);
- this.generateSlonikWait=originalGenerateWait;
load.stop();
java.lang.Thread.sleep(10*1000);
load.stop();
this.coordinator.join(load);
- this.addCompletePaths();
+ //this.addCompletePaths();
this.slonikSync(1,1);
this.slonikSync(1,2);
this.slonikSync(1,4);
this.coordinator.log("FailNodeTest.prototype.runTest - drop DB2");
//Now DROP the database. This lets us simulate a hard failure.
this.dropDb(['db2']);
- /**
- * Replace the generateSlonikWait function with a version that
- * does individual wait for event(..) statements instead of
- * a confirmed=all since we do not want to be waiting on node 3
- * since we just destroyed it.
- */
- var originalGenerateWait = this.generateSlonikWait;
- this.generateSlonikWait=function(event_node) {
- var script='';
- for(var idx=1; idx <= this.getNodeCount(); idx ++) {
- if(idx==2|| idx==3|| idx==event_node) {
- continue;
- }
- script += "echo 'waiting on confirm from " + idx + "';\n";
- script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n';
- }
- return script;
- }
-
+
this.coordinator.log("FailNodeTest.prototype.runTest - reshape cluster");
//Now reshape the cluster.
this.subscribeSet(1,1,1,[4,5]);
*/
FailNodeTest.prototype.failNode=function(nodeId, expectFailure) {
this.coordinator.log("FailNodeTest.prototype.FailNodeTest - begin");
- this.slonArray[nodeId-1].stop();
- this.coordinator.join(this.slonArray[nodeId-1]);
+
var slonikPreamble = this.getSlonikPreamble();
var slonikScript = 'echo \'FailNodeTest.prototype.failNode\';\n';
- slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n';
+ slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n'
+ + 'uninstall node(id=' + nodeId + ');\n';
+
for(var idx=2; idx <= this.getNodeCount(); idx++) {
if(idx == nodeId) {
continue;
}
- slonikScript += 'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n';
+ // slonikScript += 'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n';
}
var slonik=this.coordinator.createSlonik('drop node',slonikPreamble,slonikScript);
else {
this.testResults.assertCheck('drop node okay',slonik.getReturnCode(),0);
}
+ this.slonArray[nodeId-1].stop();
+ this.coordinator.join(this.slonArray[nodeId-1]);
this.coordinator.log("FailNodeTest.prototype.FailNodeTest - complete");
}
*/
this.subscribeSet(1,1, 1, [ 2, 3 ]);
this.subscribeSet(1,1, 3, [ 4, 5 ]);
-
+ this.slonikSync(1,1);
var load = this.generateLoad();
*
* Re resubscribe node 2 to receive from node 3 before the
* FAILOVER this should test the more simple case.
- */
- this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3');
+	 */
+	this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3');
this.addCompletePaths();
this.subscribeSet(1,1,3,[2]);
-
+ this.slonikSync(1,1);
this.failNode(1,3,true);
java.lang.Thread.sleep(10*1000);
load.stop();
this.dropNode(1,3);
this.reAddNode(1,3,3);
-
this.addCompletePaths();
this.moveSet(1,3,1);
//this.generateSlonikWait=oldWait;
load.stop();
this.coordinator.join(load);
-
this.dropNode(1,3);
//??this.coordinator.join(subscribeSlonik[0]);
this.reAddNode(1,3,3);
- /**
- * Now Shutdown the slon for node 4.
- */
- this.coordinator.log('PROGRESS:Shutting down node 4');
- this.slonArray[3].stop();
- this.coordinator.join(this.slonArray[3]);
- /**
- * How does the failure behave when the slon for node 4 is down?
- *
- * Well for 1 the 'wait for event' to node 4 won't recover.
- *
- */
- this.failNode(1,3,false);
-
- this.slonArray[3] = this.coordinator.createSlonLauncher('db4');
- this.slonArray[3].run();
- java.lang.Thread.sleep(10*1000);
- this.dropNode(1,3);
- this.reAddNode(1,3,3);
+ this.slonikSync(1,1);
this.compareDb('db1', 'db2');
this.compareDb('db1', 'db3');
this.compareDb('db1', 'db4');
this.addCompletePaths();
this.moveSet(1,3,1)
- load = this.generateLoad();
+
/**
* Now shutdown the slon for node 3, see how a failover to node 3 behaves.
*/
this.coordinator.log('PROGRESS:shutting down node 3 for a failover test');
this.slonArray[2].stop();
this.coordinator.join(this.slonArray[2]);
- load.stop();
- this.coordinator.join(load);
- this.failNode(1,3,false);
- this.coordinator.log('PROGRESS:starting slon 3 back up');
- this.slonArray[2] = this.coordinator.createSlonLauncher('db3');
- this.slonArray[2].run();
+	/**
+	 * Create a timer event.
+	 * When the timer below fires we start the slon up again.
+	 * The failover should not complete while the slon is shut down
+	 * (at least not with the 2.1 version of failover).
+	 */
+
+
+ this.coordinator.log('PROGRESS:load has stopped');
+ var thisRef=this;
/**
* The failover needs to propogate before the DROP NODE or things can fail.
*/
- java.lang.Thread.sleep(10*1000);
+	var onTimeout = {
+		onEvent : function(object, event) {
+			thisRef.coordinator.log('PROGRESS:starting slon 3 back up');
+			thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3');
+			thisRef.slonArray[2].run();
+		}
+	};
+ var timeoutObserver = new Packages.info.slony.clustertest.testcoordinator.script.ExecutionObserver(onTimeout);
+ var timer = this.coordinator.addTimerTask('restart slon', 120,
+ timeoutObserver);
+ this.failNode(1,3,true);
+ this.coordinator.removeObserver(timer,
+ Packages.info.slony.clustertest.testcoordinator.Coordinator.EVENT_TIMER,
+ timeoutObserver);
+ if(this.slonArray[2].isFinished()) {
+ thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3');
+ thisRef.slonArray[2].run();
+ }
+
this.dropNode(1,3);
this.reAddNode(1,3,3);
+ this.slonikSync(1,1);
this.compareDb('db1', 'db2');
this.compareDb('db1', 'db3');
this.compareDb('db1', 'db4');
slonik.run();
this.coordinator.join(slonik);
this.testResults.assertCheck('drop path from 1 to 4',slonik.getReturnCode(),0);
+
+ this.slonikSync(1,1);
this.failNode(1,4,true);
this.compareDb('db2','db4');
this.reAddNode(1,4,4);
-
+ this.slonikSync(1,1);
for ( var idx = 1; idx <= this.getNodeCount(); idx++) {
this.slonArray[idx - 1].stop();
this.coordinator.join(this.slonArray[idx - 1]);
}
-
- this.compareDb('db1', 'db2');
+
+ this.compareDb('db1','db2');
this.compareDb('db1', 'db3');
this.compareDb('db1', 'db4');
this.compareDb('db4','db3');
}
Failover.prototype.failNode=function(node_id,backup_id, expect_success) {
-
+
this.slonArray[node_id-1].stop();
- this.coordinator.join(this.slonArray[node_id-1]);
+ if(!this.slonArray[node_id-1].isFinished()) {
+ this.coordinator.join(this.slonArray[node_id-1]);
+ }
+
var slonikPreamble = this.getSlonikPreamble();
var slonikScript = 'echo \'Failover.prototype.failNode\';\n';
if(waitIdx==idx || waitIdx==node_id) {
continue;
}
- slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n";
- slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n";
+ //autowaitfor slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n";
+ //autowaitfor slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n";
}
}
//+ 'sync(id=' + backup_id + ');\n'
}
Failover.prototype.dropNode=function(node_id,event_node) {
- this.coordinator.log("Failover.prototype.dropNode - begin");
+ this.coordinator.log("Failover.prototype.dropNode - begin");
var slonikPreamble = this.getSlonikPreamble();
var slonikScript = 'echo \'Failover.prototype.dropNode\';\n';
slonikScript += 'DROP NODE(id=' + node_id + ',event node=' + event_node +');\n';
slonik.run();
this.coordinator.join(slonik);
this.testResults.assertCheck('slonik drop node status okay',slonik.getReturnCode(),0);
- this.coordinator.log("Failover.prototype.dropNode - complete");
+ this.coordinator.log("Failover.prototype.dropNode - complete");
}
var dumpFile = java.io.File.createTempFile('slon_HeavyLoadTest','.sql');
dumpFile.deleteOnExit();
+ //wait until db4 is subscribed before creating the dump
+ this.slonikSync(1,1);
var dumpProcess = this.coordinator.createLogShippingDump('db4',dumpFile);
dumpProcess.run();
this.coordinator.join(dumpProcess);
this.subscribeSet(1,1,1,[3]);
java.lang.Thread.sleep(10*1000);
this.subscribeSet(1,1,3,[4]);
-
+ this.slonikSync(1,1);
this.coordinator.log("LogShipping.prototype.runTest - generate load");
//Generate some load.
var populate=this.generateLoad();
slonik.run();
java.lang.Thread.sleep(10*1000);
this.testResults.assertCheck('transaction is blocking add table command',slonik.isFinished(),false);
+
+
+
+
txnConnection.rollback();
txnConnection.close();
this.coordinator.join(slonik);
}
//A transaction should not block the subscription.
//make sure this is the case.
+ txnConnection = this.startTransaction();
this.coordinator.log("LongTransaction.prototype.runTest - sleep 3x60x1000");
java.lang.Thread.sleep(3*60*1000);
for(var idx=0; idx < subs.length; idx++) {
LongTransaction.prototype.startTransaction=function() {
var dbCon = this.coordinator.createJdbcConnection('db1');
var stat = dbCon.createStatement();
- stat.execute('BEGIN;');
+ dbCon.setAutoCommit(false);
var rs= stat.executeQuery('SELECT COUNT(*) FROM disorder.do_customer;');
rs.close();
stat.close();
--- /dev/null
+/**
+ * Tests a basic merge set.
+ */
+coordinator.includeFile("disorder/tests/BasicTest.js");
+
+
+function MergeSet(coordinator,results) {
+ BasicTest.call(this,coordinator,results);
+ this.syncWaitTime = 60;
+ this.testDescription = 'This test exercises the merge set command \n';
+}
+
+MergeSet.prototype = new BasicTest();
+MergeSet.prototype.constructor=MergeSet;
+MergeSet.prototype.getNodeCount = function() {
+ return 5;
+}
+
+MergeSet.prototype.runTest = function() {
+ this.coordinator.log("MergeSet.prototype.runTest - begin");
+
+ this.testResults.newGroup("merge set");
+ //this.prepareDb(['db1','db2']);
+
+
+//First set up Slony
+ this.coordinator.log("MergeSet.prototype.runTest - set up replication");
+ this.setupReplication();
+ this.addCompletePaths();
+ this.addTables();
+
+ this.coordinator.log("MergeSet.prototype.runTest - start slons");
+ //Start the slons.
+ //These must be started before slonik runs or the subscribe won't happen
+ //thus slonik won't finish.
+ var slonArray=[];
+ for(var idx=1; idx <= this.getNodeCount(); idx++) {
+ slonArray[idx-1] = this.coordinator.createSlonLauncher('db' + idx);
+ slonArray[idx-1].run();
+ }
+ this.createSecondSet(1);
+ var load = this.generateLoad();
+ java.lang.Thread.sleep(5*1000);
+ this.subscribeSet(1,1,1,[2,3]);
+ this.subscribeSet(1,1,3,[4,5]);
+ this.subscribeSet(2,1,1,[2,3]);
+ this.subscribeSet(2,1,3,[4,5]);
+ var mergeScript="merge set(id=1,add id=2,origin=1);";
+ var slonikPreamble=this.getSlonikPreamble();
+ var slonik = this.coordinator.createSlonik('merge set',slonikPreamble,mergeScript);
+ slonik.run();
+ this.coordinator.join(slonik);
+ this.testResults.assertCheck('merge set okay',slonik.getReturnCode(),
+ 0);
+
+ load.stop();
+ this.coordinator.join(load);
+ this.coordinator.log("MergeSet.prototype.runTest - subscriptions complete");
+ this.slonikSync(1,1);
+ this.coordinator.log("MergeSet.prototype.runTest - syncing complete");
+ this.compareDb('db1','db2');
+ this.compareDb('db1','db3');
+ this.compareDb('db1','db4');
+ this.compareDb('db1','db5');
+
+ for(var idx=1; idx <= this.getNodeCount(); idx++) {
+ slonArray[idx-1].stop();
+ this.coordinator.join(slonArray[idx-1]);
+ }
+ this.coordinator.log("MergeSet.prototype.runTest - complete");
+}
+
+MergeSet.prototype.getSyncWaitTime = function () {
+ return this.syncWaitTime;
+}
this.compareDb('db1','db3');
- this.moveSet(1,4,1);
+ this.moveSet(1,4,1);
+ this.slonikSync(1,1);
+ this.slonikSync(1,4);
this.failNode(1,4,true);
for(var idx=1; idx <= this.getNodeCount(); idx++) {
var slonikPreamble = this.getSlonikPreamble();
var slonikScript = 'echo \'OmitCopy.prototype.subscribeOmitCopy\';\n';
slonikScript += "subscribe set(id=1, provider=" + provider+", receiver=" + subscriberNodeId+", omit copy=true, forward=yes);\n";
- slonikScript += ' wait for event (origin='+origin+', wait on='+provider+',confirmed=all);\n';
var slonik=this.coordinator.createSlonik('omit copy subscribe',slonikPreamble,slonikScript);
slonik.run();
* due to the lock on set 1.
*/
var slonikPreamble = this.getSlonikPreamble();
- var slonikScript = 'drop set (id=2, origin=1);\n'
- + 'wait for event(origin=1, confirmed=2,wait on=1);\n';
+ var slonikScript = 'drop set (id=2, origin=1);\n';
var slonik=this.coordinator.createSlonik('drop set 1',slonikPreamble,slonikScript);
slonik.run();
this.subscribeSet(1,1, 1, [ 2, 3 ]);
this.subscribeSet(1,1, 3, [ 4, 5 ]);
-
+
/**
* Create the test_transient table.
*/
this.createAndReplicateTestTable();
-
/**
* Add a row to the table.
*/
* Ensure that changes to the table still get
* replicated.
*/
-
+ this.slonikSync(1,1);
this.executeScript("ALTER TABLE disorder.test_transient RENAME TO test_transient2;\n");
this.coordinator.log("Unsubscribe.prototype.unsubscribe - begin");
var slonikPreamble = this.getSlonikPreamble();
var slonikScript = 'echo \'Unsubscribe.prototype.unsubscribe\';\n';
- slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n'
- + 'wait for event(origin=' + node_id + ',wait on=' + node_id + ',confirmed=all);\n';
+ slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n';
+
var slonik = this.coordinator.createSlonik('unsubscribe ' , slonikPreamble,slonikScript);
slonik.run();
this.coordinator.join(slonik);
coordinator.includeFile('disorder/tests/RenameTests.js');
coordinator.includeFile('disorder/tests/CleanupTest.js');
coordinator.includeFile('disorder/tests/RecreateSet.js');
+coordinator.includeFile('disorder/tests/MergeSet.js');
var tests =
[new EmptySet(coordinator,results)
,new OmitCopy(coordinator,results)
,new BigBacklogTest(coordinator,results)
,new LongTransaction(coordinator,results)
,new RenameTests(coordinator,results)
-
+ ,new MergeSet(coordinator,results)
//Below tests are known to fail.
,new UnsubscribeBeforeEnable(coordinator,results)
,new DropSet(coordinator,results) //fails bug 133
,new CleanupTest(coordinator,results) //cleanup_interval does not (yet) do what the test wants
];
-//tests=[new CleanupTest(coordinator,results)];
+//tests=[new MergeSet(coordinator,results)];
+
var basicTest = new BasicTest(coordinator,results);
//Setup the schema.
create set (id=2, origin=1, comment='a second replication set');
set add table (set id=2, origin=1, id=5, fully qualified name = 'public.newtable', comment='some new table');
subscribe set(id=1, provider=1,receiver=2);
-wait for event(origin=1, confirmed=all, wait on=1);
merge set(id=1, add id=2,origin=1);
</programlisting>
clustername=testcluster;
store node(id=5,comment='some slave node',event node=1);
-wait for event(origin=1, confirmed=all, wait on=1);
</programlisting>
</para></listitem>
node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony';
clustername=testcluster;
node 1 admin conninfo='host=masterhost dbname=masterdb user=slony password=slony';
+# also include the admin conninfo lines for any other nodes in your cluster.
+#
+#
clustername=testcluster;
store path(server=1,client=5,conninfo='host=masterhost,dbname=masterdb,user=slony,password=slony');
store path(server=5,client=1,conninfo='host=slavehost,dbname=masterdb,user=slony,password=slony');
-wait for event(origin=1,confirmed=all,wait on=1);
-wait for event(origin=5,confirmed=all,wait on=1);
</programlisting>
</para></listitem>
node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony';
clustername=testcluster;
node 1 admin conninfo='host=masterhost dbname=slavedb user=slony password=slony';
+#
+# also include the admin conninfo lines for any other nodes in the cluster
+#
+#
clustername=testcluster;
subscribe set(id=1,provider=1, receiver=5,forward=yes);
-wait for event(origin=1, confirmed=all, wait on=1);
</programlisting>
</orderedlist>
store path(server=2,client=3,conninfo='host=slave2host,dbname=slave2db,user=slony,password=slony');
subscribe set(set id=1, provider=1, receiver=2,forward=yes);
-wait for event(origin=1, confirmed=all, wait on=1);
subscribe set (set id=1,provider=2, receiver=3,forward=yes);
-wait for event(origin=1,confirmed=all,wait on=1);
+wait for event(origin=1, confirmed=all, wait on=1);
</programlisting>
<para>
In the above example we define paths from 1==>2 and from 2==>3 but do
<!-- -->
<sect1 id="events">
<title>Events & Confirmations</title>
+<indexterm><primary>events and confirmations</primary></indexterm>
<para>
&slony1; transfers configuration changes and application data through
events. Events in &slony1; have an origin, a type and some parameters.
When an event is created it is inserted
into the event queue (the <xref linkend="table.sl-event"> table) on the node the event
-originates on. The remoteListener threads of slon processes then pick up
-that event (by quering the sl_event table) and passing the event to the
-slons remoteWorker thread for processing.
+originates on. The remoteListener thread of each remote &lslon; process then picks up
+that event (by querying the &slevent; table) and passes it to the
+&lslon;'s remoteWorker thread for processing.
</para>
<para>
-An event is uniquely identified by taking both the node id of the node the
+An event is uniquely identified by the combination of the id of the node the
event originates on and the event sequence number for that node.
-For example (1,5000001) identifies event 5000001 originating from node 1.
-While (3,5000001) identifies a different event that originated on a
+For example, (1,5000001) identifies event 5000001 originating from node 1.
+In contrast, (3,5000001) identifies a different event that originated on a
different node.
</para>
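+<para>
+For illustration, here is a minimal sketch (assuming a cluster named
+testcluster, so the replication schema is _testcluster) of looking up
+a specific event by that (origin, sequence number) pair:
+</para>
+<programlisting>
+-- A sketch only; the cluster/schema name is just an example.
+select ev_origin, ev_seqno, ev_type
+  from "_testcluster".sl_event
+ where ev_origin = 1
+   and ev_seqno = 5000001;
+</programlisting>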
<title>SYNC Events</title>
<para>
SYNC events are used to transfer application data for one node to the next.
-When data in a replicated table changes a trigger fires that records information
-about the change in the <xref linkend="table.sl-log-1"> or <xref linkend="table.sl-log-2"> tables. The localListener thread
+When data in a replicated table changes, a trigger fires that records information
+about the change in the &sllog1; or &sllog2; tables. The localListener thread
in the slon processes will then periodically generate a SYNC event. When the
-SYNC event is created &slony1; will find the heighest log_seqid assigned so far
+SYNC event is created, &slony1; will determine the highest log_seqid assigned so far
along with a list of log_seqid's that were assigned to transactions that
-have not yet been committed. This information is stored as part of the SYNC
+have not yet been committed. This information is all stored as part of the SYNC
event.
</para>
<para>
-When a slon processes remoteWorker processes a SYNC it will query the rows
-from sl_log_1 and sl_log2 that are covered by the SYNC (log_seqid rows
-that had been committed at the time the SYNC was generated). The modifications
-described by these rows are then made on the subscriber.
+When the remoteWorker thread for a &lslon; processes a SYNC, it
+queries the rows from &sllog1; and &sllog2; that are covered by the
+SYNC (<emphasis>i.e.</emphasis> log_seqid rows that had been
+committed at the time the SYNC was generated). The modifications
+described by those log rows are then applied to the subscriber.
</para>
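+<para>
+As an illustrative sketch (the cluster/schema name is only an
+example), the number of SYNC events from an origin that a given
+receiver has not yet confirmed can be derived from &slevent; and
+&slconfirm;:
+</para>
+<programlisting>
+-- SYNC events originating on node 1 not yet confirmed by node 2.
+select count(*)
+  from "_testcluster".sl_event e
+ where e.ev_origin = 1
+   and e.ev_type = 'SYNC'
+   and e.ev_seqno > (select coalesce(max(c.con_seqno), 0)
+                       from "_testcluster".sl_confirm c
+                      where c.con_origin = 1
+                        and c.con_received = 2);
+</programlisting>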
</sect2>
<sect2>
<title>Event Confirmations</title>
<para>
-When an event is processed by the slon for a remote node a CONFIRM message
-is generated by inserting a row into the sl_confirm table. This row indicates
-that a particular event was confirmed by a particular receiver node.
-Confirmation messages are the transfered back to all other nodes in the cluster.
+When an event is processed by the &lslon; process for a remote node, a CONFIRM message
+is generated by inserting a tuple into the &slconfirm; table. This tuple indicates
+that a particular event has been confirmed by a particular receiver node.
+Confirmation messages are then transferred back to all other nodes in the cluster.
</para>
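+<para>
+As another illustrative sketch (cluster/schema name again just an
+example), the latest event each receiver has confirmed for each
+origin can be read directly from &slconfirm;:
+</para>
+<programlisting>
+-- Highest confirmed event per origin/receiver pair.
+select con_origin, con_received, max(con_seqno) as last_confirmed
+  from "_testcluster".sl_confirm
+ group by con_origin, con_received;
+</programlisting>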
</sect2>
<sect2>
<title>Event cleanup</title>
<para>
-The slon cleanupThread will periodically run the
-<xref linkend="function.cleanupevent-p-interval-interval"> database
-function that will delete all but the most recently confirmed event for
-each origin/receiver pair (because if an event has been confirmed by a
-receiver then all older events from that origin have also been confirmed
-by the receiver). Then the function will delete any SYNC events that are
-older than the oldest row left in sl_confirm (for each origin). The data
-for these deleted events will also be removed from the sl_log_1 and sl_log_2
-tables.
+The &lslon; cleanupThread periodically runs the <xref
+linkend="function.cleanupevent-p-interval-interval"> database function
+that deletes all but the most recently confirmed event for each
+origin/receiver pair (this is safe to do because if an event has been
+confirmed by a receiver, then we know that all older events from that
+origin have also been confirmed by the receiver). Then the function
+deletes all SYNC events that are older than the oldest row left in
+&slconfirm; (for each origin). The data for these deleted events will
+also be removed from the &sllog1; and &sllog2; tables.
</para>
+
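+<para>
+For illustration, a sketch of invoking that cleanup function by hand
+(the schema name and interval value are only examples; normally the
+cleanupThread calls it on its own schedule):
+</para>
+<programlisting>
+-- A sketch only; manual invocation is not normally required.
+select "_testcluster".cleanupEvent('10 minutes'::interval);
+</programlisting>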
<para>
-When &slony1; is first enabled it will log the data to replicate to the
-sl_log_1 table. After a while it will stop logging to sl_log_1 and switch
-to logging on sl_log_2. When all of rows in in sl_log_1 have been replicated
-&slony1; will TRUNCATE the sl_log_1 table (to clear out the dead tuples),
-stop logging to sl_log_2 and switch back to logging to the newly truncated
-sl_log_1 table. This process will be periodically repeated as &slony1; runs.
+When &slony1; is first enabled it will log the data to replicate to
+the &sllog1; table. After a while it will stop logging to &sllog1;
+and switch to logging to &sllog2;. When all the data in &sllog1; is
+known to have been replicated to all the other nodes, &slony1; will
+TRUNCATE the &sllog1; table, clearing out this now-obsolete
+replication data. Then, it stops logging to &sllog2;, switching back
+to logging to the freshly truncated &sllog1; table. This process is
+repeated periodically as &slony1; runs, keeping these tables from
+growing uncontrollably. By using TRUNCATE, we guarantee that the
+tables are properly emptied out.
</para>
</sect2>
+<sect2>
+<title>Slonik and Event Confirmations</title>
+<para>
+
+&lslonik; can submit configuration commands to different event nodes,
+as controlled by the parameters of each slonik command. If two
+commands are submitted to different nodes, it might be important to
+ensure they are processed by other nodes in a consistent order. The
+&lslonik; <xref linkend="stmtwaitevent"> command may be used to
+accomplish this, but as of &slony1; 2.1 this consistency is handled
+automatically by &lslonik; in the following circumstances:
+</para>
+<orderedlist>
+<listitem><para>Before slonik submits an event to a node,
+it waits until that node has confirmed the last configuration event
+from the previous event node.</para></listitem>
+
+<listitem><para>Before slonik submits a <xref
+linkend="stmtsubscribeset"> command, it verifies that the provider
+node has confirmed all configuration events from all other
+nodes.</para></listitem>
+
+<listitem><para>Before &lslonik; submits a <xref
+linkend="stmtdropNode"> event, it verifies that all nodes in the
+cluster (aside from the one being dropped, of course!) have already
+caught up with all other nodes.</para></listitem>
+
+<listitem><para>Before slonik submits a <xref linkend="stmtcloneprepare">
+command, it verifies that the node being cloned is caught up with all other
+nodes in the cluster.</para></listitem>
+
+<listitem><para>Before slonik submits a <xref linkend="stmtcreateset"> command,
+it verifies that any <xref linkend="stmtdropset"> commands have been confirmed by
+all nodes.</para></listitem>
+
+</orderedlist>
+<para>
+
+When &lslonik; starts up, it contacts all nodes for which it has <xref
+linkend="admconninfo"> information, to find the last non-SYNC event
+from each node. Submitting commands from multiple &lslonik; instances
+at the same time will confuse &lslonik; and is not recommended.
+Whenever &lslonik; is waiting for an event confirmation, it displays a
+message every 10 seconds indicating which events are still
+outstanding. Any command that might require slonik to wait for event
+confirmations may not validly be executed within a <link
+linkend="tryblock">try block</link>, for the same reason that the
+<xref linkend="stmtwaitevent"> command may not be used within a <link
+linkend="tryblock">try block</link>: it is not reasonable
+to ask &slony1; to try to roll back events.
+</para>
+
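+<para>
+Conceptually, the <quote>caught up</quote> test that &lslonik;
+applies boils down to a check against &slconfirm; on the node it is
+waiting on. A sketch of that check (not &lslonik;'s literal query;
+the schema name and event numbers are placeholders):
+</para>
+<programlisting>
+-- Has node 2 confirmed event 5000001 originating on node 1 yet?
+select exists (
+   select 1
+     from "_testcluster".sl_confirm
+    where con_origin = 1
+      and con_received = 2
+      and con_seqno >= 5000001
+) as caught_up;
+</programlisting>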
+<para>
+Automatic waiting for confirmations may be disabled in &lslonik; by
+running &lslonik; with the <option>-w</option> option.</para>
+
+</sect2>
</sect1>
<refsynopsisdiv>
<cmdsynopsis>
<command>slonik</command>
+ <arg><replaceable class="parameter">options</replaceable></arg>
<arg><replaceable class="parameter">filename</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+<refsect1>
+ <title>Options</title>
+ <variablelist>
+ <varlistentry>
+ <term><option>-w</option></term>
+ <listitem>
+ <para>
+ Suppress slonik's behaviour of automatically waiting
+ for event confirmations before submitting events to
+ a different node. If this option is specified, your
+ slonik script may require explicit <xref
+	    properly, as was the case with slonik prior to
+ properly, as was the behaviour of slonik prior to
+ version 2.1.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
<refsect1>
<title>Description</title>
<para>
Comments begin at a hash sign (#) and extend to the end of the line.
</para>
- <sect2><title>Command groups</title>
+ <sect2 id="tryblock"><title>Command groups</title>
<para>
Commands can be combined into groups of commands with optional
<command>on error</command> and <command>on success</command> conditionals.
therein; no public objects should be locked during the duration of
this.</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
therein; no public objects should be locked during the duration of
this.</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command. </para>
+ </refsect1>
+
<refsect1> <title> Version Information </title> <para> This command
was introduced in &slony1; 1.0. The <envar>SPOOLNODE</envar>
parameter was introduced in version 1.1, but was vestigial in that
cluster. </para>
</refsect1>
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits until nodes (other than the one being dropped)
+ are caught up with non-SYNC events from all other nodes before
+ submitting the DROP NODE command.
+ </para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
<para> In version 2.0, the default value for <envar>EVENT NODE</envar> was removed, so a node must be specified.</para>
<para>After dropping a node, you may also need to recycle
connections in your application.</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+   performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+   performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title> <para> This command
was introduced in &slony1; 1.0; frequent use became unnecessary as
of version 1.0.5. There are, however, occasional cases where it is
<para> No application-visible locking should take place. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+   performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+   performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title> <para> This command
was introduced in &slony1; 1.0. As of version 1.1, you <emphasis>should</emphasis> no
longer need to use this command, as listen paths are generated automatically. </para>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title> <para> This command
was introduced in &slony1; 1.0. As of version 1.1, you should not
need to use it anymore. </para>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command. Slonik will also wait until any outstanding
+ DROP SET commands are confirmed by all nodes before it submits
+ the CREATE SET command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
<para> Until version 1.2, it would crash if no comment was provided. </para>
on each replicated table in order to modify the table schema to
clean up the triggers and rules. </para>
</refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
<programlisting>
# Assuming that node 1 is the origin of set 999 that has direct subscribers 2 and 3
SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 2);
- SYNC (ID=1);
- WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 2, WAIT ON=1);
SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 3);
- SYNC (ID=1);
- WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 3, WAIT ON=1);
MERGE SET ( ID = 1, ADD ID = 999, ORIGIN = 1 );
</programlisting>
</refsect1>
</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+   submitting this command. Slonik will also wait for any
+   in-progress subscriptions involving the ADD ID set to complete
+   before submitting the MERGE SET command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title> <para> This command
was introduced in &slony1; 1.0.5. In 1.2.1, a race condition was
rectified where the merge request would be submitted while
takes place at the time of the <command>SUBSCRIBE_SET</command>
event. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
</programlisting>
</refsect1>
- <refsect1> <title> Locking Behaviour </title>
+ <refsect1> <title> Locking Behaviour </title>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
replication trigger. On subscriber nodes, this also involves
adding back any rules/triggers that have been hidden. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0.5 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0.5 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0.5 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0.5 </para>
</refsect1>
acquiring any locks on the provider node.</para>
<para> On the subscriber node, it will have the effect of locking
- every table in the replication set. In version 1.2 or later, exclusive
+ every table in the replication set. In version 1.2 and later, exclusive
locks are acquired at the beginning of the process.
</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits until the provider has confirmed all
+ outstanding configuration events from any other node before
+ contacting the provider to determine the set origin. Slonik
+ then waits for the command submitted to the previous event
+ node to be confirmed on the origin before submitting this
+ command to the origin.</para>
+</refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
<para> The <command>OMIT COPY</command> option was introduced in &slony1; 2.0.3.</para>
tables and restore other triggers/rules. </para>
</refsect1>
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
+
<refsect1><title> Dangerous/Unintuitive Behaviour </title>
<para> Resubscribing an unsubscribed set requires a
on the origin node, and triggers are added to each such table that
reject table updates. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
on the origin node, as the triggers are removed from each table
that reject table updates. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
denyaccess trigger is dropped and a logtrigger trigger
added. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+  <para>Slonik submits the FAILOVER_EVENT without waiting, but then
+  waits until the most ahead node has received confirmations
+  of the FAILOVER_EVENT from all nodes before completing.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
<para> In version 2.0, the default <envar>BACKUP NODE</envar> value of 1 was removed, so it is mandatory to provide a value for this parameter.</para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para> Slonik waits for the command submitted to the previous
+ event node to be confirmed on the specified event node before
+ submitting this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0. </para>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.0 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.1 </para>
</refsect1>
<para> No application-visible locking should take place. </para>
</refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.1.6 / 1.2.1 </para>
</refsect1>
sleep (seconds = 5);
</Programlisting>
</Refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 1.1.6 / 1.2.1. </para>
</refsect1>
sync (id=22);
</Programlisting>
</Refsect1>
+
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik will wait until the node being cloned (the provider)
+ is caught up with all other nodes before submitting the clone prepare
+   command.</para>
+</refsect1>
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 2.0. </para>
</refsect1>
clone finish (id = 33, provider = 22);
</Programlisting>
</Refsect1>
+
+ <refsect1> <title> Slonik Event Confirmation Behaviour </title>
+ <para>Slonik does not wait for event confirmations before
+ performing this command.</para>
+ </refsect1>
+
+
<refsect1> <title> Version Information </title>
<para> This command was introduced in &slony1; 2.0. </para>
</refsect1>
/* @+nullderef@ */
}
+/*
+ * Provide a way to reset the per-session data structure that stores
+ * the cluster status in the C functions.
+ *
+ * This is used to rectify the case where CLONE NODE updates the node
+ * ID, but calls to getLocalNodeId() could continue to return the old
+ * value.
+ */
Datum
_Slony_I_resetSession(PG_FUNCTION_ARGS)
{
--
-- Generate the MERGE_SET event.
-- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4)
+create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4)
returns bigint
as $$
declare
v_origin int4;
+ in_progress boolean;
begin
-- ----
-- Grab the central configuration lock
-- ----
-- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed
-- ----
- if exists (select true from @NAMESPACE@.sl_event
- where ev_type = 'ENABLE_SUBSCRIPTION'
- and ev_data1 = p_add_id::text
- and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm
- where con_origin = ev_origin
- and con_received::text = ev_data3))
- then
+	select @NAMESPACE@.isSubscriptionInProgress(p_add_id) into in_progress;
+
+ if in_progress then
raise exception 'Slony-I: set % has subscriptions in progress - cannot merge',
p_add_id;
end if;
Both sets must exist, and originate on the same node. They must be
subscribed by the same set of nodes.';
+
+create or replace function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4)
+returns boolean
+as $$
+begin
+ if exists (select true from @NAMESPACE@.sl_event
+ where ev_type = 'ENABLE_SUBSCRIPTION'
+ and ev_data1 = p_add_id::text
+ and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm
+ where con_origin = ev_origin
+ and con_received::text = ev_data3))
+ then
+ return true;
+ else
+ return false;
+ end if;
+end;
+$$ language plpgsql;
+comment on function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4) is
+'Checks to see if a subscription for the indicated set is in progress.
+Returns true if a subscription is in progress, otherwise false.';
+
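+-- Example usage (illustrative only; assumes the @NAMESPACE@
+-- substitution is _testcluster):
+--
+--   select "_testcluster".isSubscriptionInProgress(2);
+--
+-- Returns true while an ENABLE_SUBSCRIPTION event for set 2 is still
+-- unconfirmed by its receiver, false otherwise.
+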
-- ----------------------------------------------------------------------
-- FUNCTION mergeSet_int (set_id, add_id)
--
comment on function @NAMESPACE@.store_application_name (i_name text) is
'Set application_name GUC, if possible. Returns NULL if it fails to work.';
+create or replace function @NAMESPACE@.is_node_reachable(origin_node_id integer,
+ receiver_node_id integer) returns boolean as $$
+declare
+ listen_row record;
+ reachable boolean;
+begin
+ reachable:=false;
+ select * into listen_row from @NAMESPACE@.sl_listen where
+ li_origin=origin_node_id and li_receiver=receiver_node_id;
+ if found then
+ reachable:=true;
+ end if;
+ return reachable;
+end $$ language plpgsql;
+
+comment on function @NAMESPACE@.is_node_reachable(origin_node_id integer, receiver_node_id integer)
+is 'Is the receiver node reachable from the origin, via any of the listen paths?';
+
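+-- Example usage (illustrative only; assumes the @NAMESPACE@
+-- substitution is _testcluster):
+--
+--   select "_testcluster".is_node_reachable(1, 5);
+--
+-- True when sl_listen contains a row with li_origin = 1 and
+-- li_receiver = 5, i.e. node 5 can receive events originating on
+-- node 1 via some listen path.
+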
create or replace function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$
begin
-- Trim out old state for this component
comment on function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is
'Store state of a Slony component. Useful for monitoring';
+
int parser_errors = 0;
int current_try_level;
+int last_event_node=-1;
+int auto_wait_disabled=0;
+
static char myfull_path[MAXPGPATH];
static char share_path[MAXPGPATH];
const char * table_name);
static int slonik_is_slony_installed(SlonikStmt * stmt,
SlonikAdmInfo * adminfo);
+static int slonik_submitEvent(SlonikStmt * stmt,
+ SlonikAdmInfo * adminfo,
+ SlonDString * query,
+ SlonikScript * script,
+ int suppress_wait_for);
+
+static size_t slonik_get_last_event_id(SlonikStmt* stmt,
+ SlonikScript * script,
+ const char * event_filter,
+ int64 ** events);
+static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1,
+ SlonikStmt * stmt,
+ int ignore_node);
/* ----------
* main
* ----------
extern int optind;
int opt;
- while ((opt = getopt(argc, (char **)argv, "hv")) != EOF)
+ while ((opt = getopt(argc, (char **)argv, "hvw")) != EOF)
{
switch (opt)
{
printf("slonik version %s\n", SLONY_I_VERSION_STRING);
exit(0);
break;
+ case 'w':
+ auto_wait_disabled=1;
+ break;
default:
printf("unknown option '%c'\n", opt);
script_exec_stmts(SlonikScript * script, SlonikStmt * hdr)
{
int errors = 0;
+ int64 * events;
+ size_t event_length;
+ int idx=0;
+ SlonikAdmInfo * curAdmInfo;
+ event_length=slonik_get_last_event_id(hdr,script,"ev_type <> 'SYNC' ",
+ &events);
+ for( curAdmInfo = script->adminfo_list;
+ curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+ {
+ curAdmInfo->last_event=events[idx];
+ idx++;
+		if(idx >= event_length)
+ break;
+ }
+ free(events);
+
while (hdr && errors == 0)
{
hdr->script = script;
"select \"_%s\".enableNode(%d); ",
stmt->hdr.script->clustername, stmt->no_id, stmt->no_comment,
stmt->hdr.script->clustername, stmt->no_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
{
SlonikAdmInfo *adminfo1;
SlonDString query;
+ SlonikAdmInfo * curAdmInfo;
adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->ev_origin);
if (adminfo1 == NULL)
if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
return -1;
+ if(!auto_wait_disabled)
+ {
+ for(curAdmInfo = stmt->hdr.script->adminfo_list;
+ curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+ {
+ if(curAdmInfo->no_id == stmt->no_id)
+ continue;
+ if(slonik_is_slony_installed((SlonikStmt*)stmt,curAdmInfo) > 0 )
+ {
+ slonik_wait_config_caughtup(curAdmInfo,(SlonikStmt*)stmt,
+ stmt->no_id);
+ }
+
+ }
+
+ }
+
dstring_init(&query);
slon_mkquery(&query,
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->no_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ /**
+ * we disable auto wait because we perform a wait
+ * above ignoring the node being dropped.
+ */
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,true) < 0)
{
dstring_free(&query);
return -1;
}
+ /**
+ * if we have a conninfo for the node being dropped
+ * we want to clear out the last seqid.
+ */
+ adminfo1 = get_adminfo(&stmt->hdr,stmt->no_id);
+ if(adminfo1 != NULL) {
+ adminfo1->last_event=-1;
+ }
dstring_free(&query);
return 0;
"select \"_%s\".failedNode(%d, %d); ",
stmt->hdr.script->clustername,
stmt->no_id, stmt->backup_node);
+ printf("executing failedNode() on %d\n",adminfo1->no_id);
if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0)
{
free(configbuf);
if (use_node != stmt->backup_node)
{
+
+ /**
+ * commit the transaction so a new transaction
+ * is ready for the lock table
+ */
+ if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0)
+ {
+ free(configbuf);
+ dstring_free(&query);
+ return -1;
+ }
slon_mkquery(&query,
+ "lock table \"_%s\".sl_event_lock; "
"select \"_%s\".storeListen(%d,%d,%d); "
"select \"_%s\".subscribeSet_int(%d,%d,%d,'t','f'); ",
stmt->hdr.script->clustername,
+ stmt->hdr.script->clustername,
stmt->no_id, use_node, stmt->backup_node,
stmt->hdr.script->clustername,
setinfo[i].set_id, use_node, stmt->backup_node);
sprintf(ev_seqno_c, INT64_FORMAT, setinfo[i].max_seqno);
sprintf(ev_seqfake_c, INT64_FORMAT, ++max_seqno_total);
-
+ if (db_commit_xact((SlonikStmt *) stmt, max_node_total->adminfo)
+ < 0)
+ {
+ return -1;
+ }
slon_mkquery(&query,
"lock table \"_%s\".sl_event_lock; "
"select \"_%s\".failedNode2(%d,%d,%d,'%s','%s'); ",
stmt->hdr.script->clustername,
stmt->no_id, stmt->backup_node,
setinfo[i].set_id, ev_seqno_c, ev_seqfake_c);
+ printf("NOTICE: executing \"_%s\".failedNode2 on node %d\n",
+ stmt->hdr.script->clustername,
+ max_node_total->adminfo->no_id);
if (db_exec_command((SlonikStmt *) stmt,
max_node_total->adminfo, &query) < 0)
- rc = -1;
+ rc = -1;
+ else
+ {
+ SlonikAdmInfo * failed_conn_info=NULL;
+ SlonikAdmInfo * last_conn_info=NULL;
+ bool temp_conn_info=false;
+			/**
+			 * Now wait for the FAILOVER to finish.
+			 * To do this we must wait for the FAILOVER_EVENT,
+			 * which has ev_origin=stmt->no_id (the failed node)
+			 * but was injected into the sl_event table on the
+			 * most ahead node (max_node_total->adminfo),
+			 * to be confirmed by the backup node.
+			 *
+			 * Then we wait for the backup node to send an event
+			 * and be confirmed elsewhere.
+			 *
+			 */
+
+
+ SlonikStmt_wait_event wait_event;
+ wait_event.hdr=*(SlonikStmt*)stmt;
+ wait_event.wait_origin=stmt->no_id; /*failed node*/
+ wait_event.wait_on=max_node_total->adminfo->no_id;
+ wait_event.wait_confirmed=-1;
+ wait_event.wait_timeout=0;
+
+ /**
+ * see if we can find a admconninfo
+ * for the failed node.
+ */
+
+ for(failed_conn_info = stmt->hdr.script->adminfo_list;
+ failed_conn_info != NULL;
+ failed_conn_info=failed_conn_info->next)
+ {
+
+ if(failed_conn_info->no_id==stmt->no_id)
+ {
+ break;
+ }
+ last_conn_info=failed_conn_info;
+ }
+ if(failed_conn_info == NULL)
+ {
+ temp_conn_info=true;
+ last_conn_info->next = malloc(sizeof(SlonikAdmInfo));
+ memset(last_conn_info->next,0,sizeof(SlonikAdmInfo));
+ failed_conn_info=last_conn_info->next;
+ failed_conn_info->no_id=stmt->no_id;
+ failed_conn_info->stmt_filename="slonik generated";
+ failed_conn_info->stmt_lno=-1;
+ failed_conn_info->conninfo="";
+ failed_conn_info->script=last_conn_info->script;
+ }
+
+ failed_conn_info->last_event=max_seqno_total;
+
+			/*
+			 * Commit all open transactions despite any errors;
+			 * otherwise the WAIT FOR will not work.
+			 */
+ for (i = 0; i < num_nodes; i++)
+ {
+ if (db_commit_xact((SlonikStmt *) stmt,
+ nodeinfo[i].adminfo) < 0)
+ rc = -1;
+ }
+
+
+ rc = slonik_wait_event(&wait_event);
+ if(rc < 0)
+ {
+ /**
+ * pretty serious? how do we recover?
+ */
+ printf("%s:%d error waiting for event\n",
+ stmt->hdr.stmt_filename, stmt->hdr.stmt_lno);
+ }
+
+ if(temp_conn_info)
+ {
+ last_conn_info->next=failed_conn_info->next;
+ free(failed_conn_info);
+
+ }
+
+ slon_mkquery(&query,
+ "lock table \"_%s\".sl_event_lock; "
+ "select \"_%s\".createEvent('_%s', 'SYNC'); ",
+ stmt->hdr.script->clustername,
+ stmt->hdr.script->clustername,
+ stmt->hdr.script->clustername);
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,1) < 0)
+ {
+				printf("%s:%d: error submitting SYNC event to backup node\n"
+ ,stmt->hdr.stmt_filename, stmt->hdr.stmt_lno);
+ }
+
+
+
+ }/*else*/
+
}
}
-
+
/*
* commit all open transactions despite of all possible errors
*/
for (i = 0; i < num_nodes; i++)
{
- if (db_commit_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0)
+ if (db_commit_xact((SlonikStmt *) stmt,
+ nodeinfo[i].adminfo) < 0)
rc = -1;
}
+
free(configbuf);
dstring_free(&query);
if (adminfo1 == NULL)
return -1;
- dstring_init(&query);
+ if(!auto_wait_disabled)
+ slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
+ dstring_init(&query);
+
if (stmt->no_comment == NULL)
slon_mkquery(&query,
"select \"_%s\".cloneNodePrepare(%d, %d, 'Node %d'); ",
stmt->hdr.script->clustername,
stmt->no_id, stmt->no_provider,
stmt->no_comment);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->pa_server, stmt->pa_client,
stmt->pa_conninfo, stmt->pa_connretry);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,1) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->pa_server, stmt->pa_client);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->li_origin, stmt->li_provider,
stmt->li_receiver);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->li_origin, stmt->li_provider,
stmt->li_receiver);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
SlonikAdmInfo *adminfo1;
SlonDString query;
const char *comment;
+ SlonikAdmInfo * curAdmInfo;
+ int64 * drop_set_events;
+ int64 * cached_events;
+ size_t event_size;
+ int adm_idx;
adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin);
if (adminfo1 == NULL)
return -1;
+ if( ! auto_wait_disabled)
+ {
+		/**
+		 * loop through each node and make sure there are no
+		 * pending DROP SET commands.
+		 *
+		 * if a DROP SET event from a node is still in sl_event,
+		 * wait until all other nodes are caught up to it before
+		 * proceeding.
+		 */
+ event_size = slonik_get_last_event_id((SlonikStmt*)stmt,
+ stmt->hdr.script,
+ "ev_type='DROP_SET'",
+ &drop_set_events);
+
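+		/**
+		 * slonik_get_last_event_id() returns one entry per admin
+		 * conninfo, in list order, with -1 for nodes that could
+		 * not be queried.
+		 */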
+ /**
+ * copy the 'real' last event to storage
+ * and update the AdmInfo structure with the last 'DROP SET' id.
+ */
+ cached_events=malloc(sizeof(int64)*event_size);
+ adm_idx=0;
+ for(curAdmInfo = stmt->hdr.script->adminfo_list;
+ curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+ {
+ cached_events[adm_idx]=curAdmInfo->last_event;
+ curAdmInfo->last_event=drop_set_events[adm_idx];
+ adm_idx++;
+ }
+ slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
+ /***
+ * reset the last_event values in the AdmInfo to
+ * the values we saved above.
+ */
+ adm_idx=0;
+ for(curAdmInfo = stmt->hdr.script->adminfo_list;
+ curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next)
+ {
+ curAdmInfo->last_event=cached_events[adm_idx];
+ adm_idx++;
+ }
+ free(cached_events);
+ free(drop_set_events);
+
+ }
+
if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->set_id, comment);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->set_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
{
SlonikAdmInfo *adminfo1;
SlonDString query;
+ PGresult *res;
+ bool in_progress=1;
adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin);
if (adminfo1 == NULL)
return -1;
- if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
- return -1;
+
+	/**
+	 * Before submitting a MERGE SET, make sure no subscriptions
+	 * involving the set being merged are still in progress on
+	 * the origin.
+	 *
+	 * (we could instead look up the event number of any
+	 * unconfirmed subscription and wait for that, but we simply
+	 * poll isSubscriptionInProgress() below.)
+	 */
+
+
dstring_init(&query);
+ slon_mkquery(&query,"select \"_%s\".isSubscriptionInProgress(%d)"
+ ,stmt->hdr.script->clustername,
+ stmt->add_id);
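+	/**
+	 * poll the origin until isSubscriptionInProgress() reports
+	 * false, rolling the transaction back between attempts so
+	 * we do not sit in an open transaction while sleeping.
+	 */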
+ while(in_progress)
+ {
+ char *result;
+
+ if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0)
+ return -1;
+
+ res = db_exec_select((SlonikStmt*) stmt,adminfo1,&query);
+ if (res == NULL)
+ {
+ dstring_free(&query);
+ return -1;
+
+ }
+ result = PQgetvalue(res,0,0);
+ if(result != NULL && (*result=='t' ||
+ *result=='T'))
+ {
+			printf("%s:%d subscription in progress before mergeSet. waiting\n",
+ stmt->hdr.stmt_filename,stmt->hdr.stmt_lno);
+ db_rollback_xact((SlonikStmt *) stmt, adminfo1);
+ sleep(5);
+ }
+ else
+ in_progress=false;
+		PQclear(res);
+ }
+
slon_mkquery(&query,
"lock table \"_%s\".sl_event_lock;"
"select \"_%s\".mergeSet(%d, %d); ",
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->set_id, stmt->add_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
slon_mkquery(&query,
"select \"_%s\".setAddTable(%d, %d, '%q', '%q', '%q'); ",
stmt->hdr.script->clustername,
- stmt->set_id, tab_id,
- fqname, idxname, stmt->tab_comment);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ stmt->set_id, stmt->tab_id,
+ stmt->tab_fqname, idxname, stmt->tab_comment);
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
PQclear(res);
dstring_free(&query);
dstring_terminate(&query);
}
else
- rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
+ rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
stmt->seq_fqname,
stmt->set_id,stmt->seq_comment,
stmt->seq_id);
if(seq_id < 0)
{
- seq_id = slonik_get_next_sequence_id(stmt);
+ seq_id = slonik_get_next_sequence_id((SlonikStmt*)stmt);
if(seq_id < 0)
return -1;
}
stmt->script->clustername,
set_id, seq_id, seq_name,
seq_comment);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->script,auto_wait_disabled) < 0)
{
db_notice_silent = false;
dstring_free(&query);
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->tab_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
"select \"_%s\".setDropSequence(%d); ",
stmt->hdr.script->clustername,
stmt->seq_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
db_notice_silent = false;
dstring_free(&query);
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->tab_id, stmt->new_set_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->seq_id, stmt->new_set_id);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
* about this change directly.
*/
+	/**
+	 * we don't actually want to execute that query until the
+	 * provider node is caught up with configuration data from
+	 * all other nodes; otherwise we might pick the origin based
+	 * on stale data.
+	 *
+	 * @note an alternative would be to contact every admin
+	 * conninfo node looking for the set origin and submit
+	 * directly to that node.  That would avoid the wait and is
+	 * probably what we should do.
+	 */
+ if(!auto_wait_disabled)
+ slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1);
+
slon_mkquery(&query,"select count(*) FROM \"_%s\".sl_subscribe " \
"where sub_set=%d AND sub_receiver=%d " \
" and sub_active=true and sub_provider<>%d",
stmt->sub_setid,stmt->sub_receiver,
stmt->sub_provider);
- res1 = db_exec_select((SlonikStmt*) stmt,adminfo1,&query);
+ res1 = db_exec_select(&stmt->hdr,adminfo1,&query);
if(res1 == NULL) {
dstring_free(&query);
return -1;
}
- if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0)
+ if(PQntuples(res1) > 0)
{
- reshape=1;
+ if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0)
+ {
+ reshape=1;
+ }
}
PQclear(res1);
dstring_reset(&query);
" set_id=%d",stmt->hdr.script->clustername,
stmt->sub_setid);
res1 = db_exec_select((SlonikStmt*)stmt,adminfo1,&query);
- if(res1==NULL)
+ if(res1==NULL || PQntuples(res1) <= 0 )
{
- PQclear(res1);
+		printf("%s:%d error: cannot determine set origin for set %d\n",
+ stmt->hdr.stmt_filename,stmt->hdr.stmt_lno,stmt->sub_setid);
+ if(res1 != NULL)
+ PQclear(res1);
dstring_free(&query);
return -1;
stmt->sub_receiver,
(stmt->sub_forward) ? "t" : "f",
(stmt->omit_copy) ? "t" : "f");
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->sub_setid, stmt->sub_receiver);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->set_id, stmt->new_origin);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
stmt->ddl_setid, /* dstring_data(&script), */
stmt->only_on_node);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,auto_wait_disabled) < 0)
{
dstring_free(&query);
return -1;
time_t now;
int all_confirmed = 0;
char seqbuf[64];
+ int loop_count=0;
+ SlonDString outstanding_nodes;
+ int tupindex;
adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->wait_on);
if (adminfo1 == NULL)
time(&timeout);
timeout += stmt->wait_timeout;
dstring_init(&query);
-
+ dstring_init(&outstanding_nodes);
while (!all_confirmed)
{
all_confirmed = 1;
return -1;
}
if (PQntuples(res) > 0)
- all_confirmed = 0;
+ {
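+			/*
+			 * collect, for the progress message below, the nodes
+			 * that have not yet confirmed the event and the last
+			 * event each of them has reached.
+			 */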
+ all_confirmed = 0;
+ dstring_reset(&outstanding_nodes);
+ for(tupindex=0; tupindex < PQntuples(res); tupindex++)
+ {
+ char * node = PQgetvalue(res,tupindex,0);
+ char * last_event = PQgetvalue(res,tupindex,1);
+ if( last_event == 0)
+ last_event="null";
+ slon_appendquery(&outstanding_nodes,"%snode %s only on event %s"
+ , tupindex==0 ? "" : ", "
+ , node,last_event);
+
+ }
+ }
PQclear(res);
if (!all_confirmed)
return -1;
}
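+		/*
+		 * print a progress message roughly every 10 iterations
+		 * (about every 10 seconds, since the loop sleeps for one
+		 * second at the bottom).
+		 */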
+ loop_count++;
+ if(loop_count % 10 == 0 && stmt->wait_confirmed >= 0)
+ {
+ sprintf(seqbuf, INT64_FORMAT, adminfo->last_event);
+ printf("%s:%d: waiting for event (%d,%s) to be confirmed on node %d\n"
+ ,stmt->hdr.stmt_filename,stmt->hdr.stmt_lno
+ ,stmt->wait_origin,seqbuf,
+ stmt->wait_confirmed);
+ }
+ else if (loop_count % 10 ==0 )
+ {
+ sprintf(seqbuf, INT64_FORMAT, adminfo->last_event);
+ printf("%s:%d: waiting for event (%d,%s). %s\n",
+ stmt->hdr.stmt_filename,stmt->hdr.stmt_lno,
+ stmt->wait_origin,seqbuf,
+ dstring_data(&outstanding_nodes));
+
+ }
sleep(1);
}
-
+ dstring_free(&outstanding_nodes);
dstring_free(&query);
return 0;
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->hdr.script->clustername);
- if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0)
+ if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query,
+ stmt->hdr.script,1) < 0)
{
dstring_free(&query);
return -1;
}
/**
- * checks all nodes (that an admin conninfo exists for)
- * to find the next available table id.
- *
- *
- */
+* checks all nodes (that an admin conninfo exists for)
+* to find the next available table id.
+*
+*
+*/
static int
slonik_get_next_tab_id(SlonikStmt * stmt)
{
int tab_id=0;
char * tab_id_str;
PGresult* res;
-
+
dstring_init(&query);
slon_mkquery(&query,
"select max(tab_id) FROM \"_%s\".sl_table",
res = db_exec_select((SlonikStmt*)stmt,adminfo,&query);
if(res == NULL )
{
- printf("%s:%d: Error:could not query node %d for next table id",
- stmt->stmt_filename,stmt->stmt_lno,
- adminfo->no_id);
- if( res != NULL)
- PQclear(res);
- dstring_terminate(&query);
- return -1;
+ printf("%s:%d: Error:could not query node %d for next table id",
+ stmt->stmt_filename,stmt->stmt_lno,
+ adminfo->no_id);
+ if( res != NULL)
+ PQclear(res);
+ dstring_terminate(&query);
+ return -1;
}
}
else
{
tab_id_str = PQgetvalue(res,0,0);
if(tab_id_str != NULL)
- tab_id=strtol(tab_id_str,NULL,10);
+ tab_id=strtol(tab_id_str,NULL,10);
else
{
PQclear(res);
/**
- * checks all nodes (that an admin conninfo exists for)
- * to find the next available sequence id.
- *
- *
- */
+* checks all nodes (that an admin conninfo exists for)
+* to find the next available sequence id.
+*
+*
+*/
static int
slonik_get_next_sequence_id(SlonikStmt * stmt)
{
char * seq_id_str;
PGresult* res;
int rc;
-
+
dstring_init(&query);
slon_mkquery(&query,
"select max(seq_id) FROM \"_%s\".sl_sequence",
{
seq_id_str = PQgetvalue(res,0,0);
if(seq_id_str != NULL)
- seq_id=strtol(seq_id_str,NULL,10);
+ seq_id=strtol(seq_id_str,NULL,10);
else
{
PQclear(res);
}
/**
- * find the origin node for a particular set.
- * This function will query the first admin node it
- * finds to determine the origin of the set.
- *
- * If the node doesn't know about the set then
- * it will query the next admin node until it finds
- * one that does.
- *
- */
+* find the origin node for a particular set.
+* This function will query the first admin node it
+* finds to determine the origin of the set.
+*
+* If the node doesn't know about the set then
+* it will query the next admin node until it finds
+* one that does.
+*
+*/
static int find_origin(SlonikStmt * stmt,int set_id)
{
-
+
SlonikAdmInfo *adminfo_def;
SlonDString query;
PGresult * res;
slon_mkquery(&query,
"select set_origin from \"_%s\".\"sl_set\" where set_id=%d",
stmt->script->clustername,set_id);
-
+
for (adminfo_def = stmt->script->adminfo_list;
adminfo_def; adminfo_def = adminfo_def->next)
{
origin_id_str = PQgetvalue(res,0,0);
if(origin_id_str != NULL)
{
- origin_id=strtol(origin_id_str,NULL,10);
- PQclear(res);
+ origin_id=strtol(origin_id_str,NULL,10);
+ PQclear(res);
}
else
{
PQclear(res);
continue;
-
+
}
}
if(origin_id >= 0)
dstring_terminate(&query);
-
+
return origin_id;
}
/**
- * adds any sequences that table_name depends on to the replication
- * set.
- *
- *
- *
- */
+* adds any sequences that table_name depends on to the replication
+* set.
+*
+*
+*
+*/
int
slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt,
- SlonikAdmInfo * adminfo1,
- const char * table_name)
+ SlonikAdmInfo * adminfo1,
+ const char * table_name)
{
SlonDString query;
const char * seq_name;
char * comment;
int rc;
-
+
dstring_init(&query);
slon_mkquery(&query,
"select pg_get_serial_sequence('%s',column_name) "
}
for(idx=0; idx < PQntuples(result);idx++)
{
-
+
if(!PQgetisnull(result,idx,0) )
{
seq_name=PQgetvalue(result,idx,0);
comment=malloc(strlen(table_name)+strlen("sequence for"+1));
sprintf(comment,"sequence for %s",table_name);
rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1,
- seq_name,
- stmt->set_id,
- comment,-1);
+ seq_name,
+ stmt->set_id,
+ comment,-1);
free(comment);
if(rc < 0 )
{
dstring_terminate(&query);
return rc;
}
-
+
}
}/*for*/
PQclear(result);
dstring_terminate(&query);
return 0;
-
+
}
/**
- * checks to see if slony is installed on the given node.
- *
- * this function will check to see if slony tables exist
- * on the node by querying the information_schema.
- *
- * returns:
- * -1 => could not query information schema
- * 0 => slony not installed
- * 1 => slony is installed.
- */
+* checks to see if slony is installed on the given node.
+*
+* this function will check to see if slony tables exist
+* on the node by querying the information_schema.
+*
+* returns:
+* -1 => could not query information schema
+* 0 => slony not installed
+* 1 => slony is installed.
+*/
static int
slonik_is_slony_installed(SlonikStmt * stmt,
- SlonikAdmInfo * adminfo)
+ SlonikAdmInfo * adminfo)
{
- SlonDString query;
+ SlonDString query;
PGresult * res;
dstring_init(&query);
int rc=-1;
if(res != NULL)
PQclear(res);
+ db_rollback_xact(stmt, adminfo);
dstring_terminate(&query);
return rc;
+
+}
+
+/* slonik_submitEvent(stmt, adminfo, query, script, suppress_wait_for)
+ *
+ * Wraps former requests to generate events, folding together the
+ * logic for whether or not to do auto wait for or suppress this into
+ * one place.
+ */
+
+static int slonik_submitEvent(SlonikStmt * stmt,
+ SlonikAdmInfo * adminfo,
+ SlonDString * query,
+ SlonikScript * script,
+ int suppress_wait_for)
+{
+ int rc;
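+	/*
+	 * if the previous event originated on a different node than
+	 * the one this event will be submitted to, first wait for
+	 * that earlier event to be confirmed on the new event node
+	 * (unless the caller asked to suppress the wait).
+	 */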
+ if ( last_event_node >= 0 &&
+ last_event_node != adminfo->no_id
+ && ! suppress_wait_for )
+ {
+ /**
+ * the last event node is not the current event node.
+ * time to wait.
+ */
+
+ if( current_try_level != 0)
+ {
+			printf("%s:%d Error: the event origin cannot be changed "
+				   "inside of a try block\n",
+ stmt->stmt_filename, stmt->stmt_lno);
+ return -1;
+ }
+
+		/**
+		 * for now we generate a 'fake' SlonikStmt_wait_event
+		 * structure and run it through slonik_wait_event().
+		 */
+ SlonikStmt_wait_event wait_event;
+ wait_event.hdr=*stmt;
+ wait_event.wait_origin=last_event_node;
+ wait_event.wait_on=last_event_node;
+ wait_event.wait_confirmed=adminfo->no_id;
+ wait_event.wait_timeout=0;
+ rc = slonik_wait_event(&wait_event);
+ if(rc < 0)
+ return rc;
+
+ }
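+	/*
+	 * submit the event and remember which node it was submitted
+	 * to (unless waits are suppressed) so the next submission can
+	 * decide whether it needs to wait first.
+	 */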
+ rc= db_exec_evcommand(stmt,adminfo,query);
+ if(! suppress_wait_for)
+ last_event_node=adminfo->no_id;
+ return rc;
+
+}
+
+/**
+ * slonik_get_last_event_id(stmt, script, event_filter, events)
+ *
+ * query every node we have admin conninfo data for and find
+ * the last event id originating on that node that matches
+ * event_filter (for example "ev_type='DROP_SET'").
+ *
+ * the ids are returned in the events array (one entry per
+ * admin conninfo, -1 for nodes that could not be queried) so
+ * the caller can store them in the SlonikAdmInfo structures
+ * and use them as part of a later wait for.
+ */
+static size_t slonik_get_last_event_id(SlonikStmt *stmt,
+ SlonikScript * script,
+ const char * event_filter,
+ int64 ** events)
+{
+
+ SlonDString query;
+ PGresult * result;
+ char * event_id;
+ SlonikAdmInfo * curAdmInfo=NULL;
+ int node_count=0;
+ int node_idx;
+ int rc;
+
+ dstring_init(&query);
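+	/*
+	 * ask each node for the highest-numbered event it originated
+	 * that matches event_filter.
+	 */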
+ slon_mkquery(&query,"select max(ev_seqno) FROM \"_%s\".sl_event"
+ " , \"_%s\".sl_node "
+ " where ev_origin=\"_%s\".getLocalNodeId('_%s') "
+ " AND %s AND sl_node.no_id="
+ " ev_origin"
+ , script->clustername,script->clustername,
+ script->clustername,script->clustername,event_filter);
+ node_count=0;
+ for( curAdmInfo = script->adminfo_list;
+ curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+ {
+ node_count++;
+ }
+ *events = malloc(sizeof(int64)*(node_count+1));
+ node_idx=0;
+ for( curAdmInfo = script->adminfo_list;
+ curAdmInfo != NULL; curAdmInfo = curAdmInfo->next,node_idx++)
+ {
+ SlonikAdmInfo * activeAdmInfo =
+ get_active_adminfo(stmt,curAdmInfo->no_id);
+		if( activeAdmInfo == NULL)
+		{
+			/**
+			 * could not connect to this node; mark its slot as
+			 * unknown instead of leaving it uninitialized.
+			 */
+			(*events)[node_idx]=-1;
+			continue;
+		}
+ rc = slonik_is_slony_installed(stmt,activeAdmInfo);
+ if(rc == 1)
+ {
+ result = db_exec_select(stmt,activeAdmInfo,&query);
+ if(result == NULL || PQntuples(result) != 1 )
+ {
+ printf("error: unable to query event history on node %d\n",
+ curAdmInfo->no_id);
+ if(result != NULL)
+ PQclear(result);
+ return -1;
+ }
+ event_id = PQgetvalue(result,0,0);
+ db_rollback_xact(stmt, activeAdmInfo);
+ if(event_id != NULL)
+ (*events)[node_idx]=strtoll(event_id,NULL,10);
+ else
+ (*events)[node_idx]=-1;
+ PQclear(result);
+ }
+ else {
+ (*events)[node_idx]=-1;
+ }
+
+ }
+
+
+ dstring_terminate(&query);
+ return node_count;
+}
+
+/**
+ * waits until adminfo1 is caught up with config events from
+ * all other nodes.
+ *
+ * adminfo1 - The node that we are waiting to be caught up
+ * stmt - The statement that is currently being executed
+ * ignore_node - allows 1 node to be ignored (don't wait for
+ * adminfo1 to be caught up with that node)
+ * -1 means don't ignore any nodes.
+ */
+static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1,
+ SlonikStmt * stmt,
+ int ignore_node)
+{
+ SlonDString event_list;
+ PGresult * result=NULL;
+ SlonikAdmInfo * curAdmInfo=NULL;
+ int first_event=1;
+ int confirm_count=0;
+ SlonDString is_caughtup_query;
+ SlonDString node_list;
+ int wait_count=0;
+ int node_list_size=0;
+ int sleep_count=0;
+ int64* behind_nodes=NULL;
+ int idx;
+ int cur_array_idx;
+
+	/**
+	 * an array of (node_id, last_event) pairs: the last event
+	 * we need to see confirmed for each admin conninfo node.
+	 */
+ int64 * last_event_array=NULL;
+
+
+ dstring_init(&event_list);
+ dstring_init(&node_list);
+
+ if( current_try_level != 0)
+ {
+		printf("%s:%d Error: waiting operation not allowed "
+			   "inside of a try block\n",
+			   stmt->stmt_filename, stmt->stmt_lno);
+ return -1;
+ }
+
+ for( curAdmInfo = stmt->script->adminfo_list;
+ curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+ {
+ node_list_size++;
+ }
+	last_event_array = malloc(node_list_size * sizeof(int64) * 2);
+	memset(last_event_array,0,node_list_size * sizeof(int64) * 2);
+
+ for( curAdmInfo = stmt->script->adminfo_list;
+ curAdmInfo != NULL; curAdmInfo = curAdmInfo->next)
+ {
+ if(curAdmInfo->last_event < 0 ||
+ curAdmInfo->no_id==adminfo1->no_id ||
+ curAdmInfo->no_id == ignore_node )
+ continue;
+
+		slon_appendquery(&event_list,
+						 "%s (node_list.no_id=%d)"
+						 ,first_event ? " " : " OR "
+						 ,curAdmInfo->no_id
+						 );
+ slon_appendquery(&node_list,"%s (%d) ",
+ first_event ? " " : ",",
+ curAdmInfo->no_id);
+ last_event_array[wait_count*2]=curAdmInfo->no_id;
+ last_event_array[wait_count*2+1]=curAdmInfo->last_event;
+ first_event=0;
+ wait_count++;
+ }
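+	/*
+	 * at this point last_event_array holds (node id, target event)
+	 * pairs for every node we must wait on, and node_list is the
+	 * VALUES list used in the confirm query below.
+	 */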
+
+
+
+ dstring_init(&is_caughtup_query);
+	/**
+	 * we still need a row for nodes that have no entry in
+	 * sl_confirm because they are disabled or have been
+	 * deleted, hence the LEFT JOINs below.
+	 */
+ slon_mkquery(&is_caughtup_query,
+ "select node_list.no_id,max(con_seqno),no_active FROM "
+ " (VALUES %s) as node_list (no_id) LEFT JOIN "
+ "\"_%s\".sl_confirm ON(sl_confirm.con_origin=node_list.no_id"
+ " AND sl_confirm.con_received=%d)"
+ " LEFT JOIN \"_%s\".sl_node ON (node_list.no_id=sl_node.no_id) "
+ "GROUP BY node_list.no_id,no_active"
+ ,dstring_data(&node_list)
+ ,stmt->script->clustername
+ ,adminfo1->no_id
+ ,stmt->script->clustername);
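+	/*
+	 * each result row gives, for one node in node_list, the highest
+	 * event from that node that adminfo1 has confirmed plus the
+	 * node's active flag; keep polling until every node has either
+	 * confirmed its target event or is no longer active in sl_node.
+	 */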
+
+ while(confirm_count != wait_count)
+ {
+ result = db_exec_select(stmt,
+ adminfo1,&is_caughtup_query);
+		if (result == NULL)
+		{
+			/**
+			 * the query failed; clean up and give up rather
+			 * than dereferencing a NULL result below.
+			 */
+			dstring_terminate(&event_list);
+			dstring_terminate(&node_list);
+			dstring_terminate(&is_caughtup_query);
+			free(last_event_array);
+			return -1;
+		}
+ confirm_count = PQntuples(result);
+
+ db_rollback_xact(stmt, adminfo1);
+
+ /**
+ * find nodes that are missing.
+ *
+ */
+ behind_nodes=malloc(node_list_size * sizeof(int64));
+ memset(behind_nodes,0,node_list_size*sizeof(int64));
+ confirm_count=0;
+ for(idx = 0; idx < PQntuples(result); idx++)
+ {
+ char * n_id_c = PQgetvalue(result,idx,0);
+ int n_id = atoi(n_id_c);
+ char * seqno_c = PQgetvalue(result,idx,1);
+ int64 seqno=strtoll(seqno_c,NULL,10);
+ char * node_active = PQgetvalue(result,idx,2);
+ for(cur_array_idx=0;
+ cur_array_idx < wait_count; cur_array_idx++)
+ {
+ if(last_event_array[cur_array_idx*2]==n_id)
+ {
+ /*
+ * found.
+ */
+					if(node_active == NULL || *node_active=='\0'
+					   || *node_active=='f')
+					{
+						/**
+						 * if no_active comes back NULL or empty the
+						 * node has no entry in sl_node, so we assume
+						 * it has been deleted and stop waiting on it.
+						 */
+ behind_nodes[cur_array_idx]=-1;
+ confirm_count++;
+ }
+ else if(last_event_array[cur_array_idx*2+1]>seqno)
+ {
+ behind_nodes[cur_array_idx]=seqno;
+ }
+ else
+ {
+ behind_nodes[cur_array_idx]=-1;
+ confirm_count++;
+ }
+
+ }
+
+ }
+ }/*for .. PQntuples*/
+ if(confirm_count < wait_count )
+ {
+ sleep_count++;
+ if(sleep_count % 10 == 0)
+ {
+					/**
+					 * any element of behind_nodes that is still >= 0
+					 * identifies a node (see last_event_array) that
+					 * has not yet confirmed the event we are waiting
+					 * for; a value of 0 usually means the node did
+					 * not show up in the query result at all.
+					 */
+ SlonDString outstanding;
+ dstring_init(&outstanding);
+ first_event=1;
+ for(cur_array_idx=0; cur_array_idx < wait_count;
+ cur_array_idx++)
+ {
+ if(behind_nodes[cur_array_idx] >= 0)
+ {
+ char tmpbuf[96];
+ sprintf(tmpbuf, "(" INT64_FORMAT "," INT64_FORMAT
+ ") only at (" INT64_FORMAT "," INT64_FORMAT
+ ")"
+ ,
+ last_event_array[cur_array_idx*2]
+ ,last_event_array[cur_array_idx*2+1],
+ last_event_array[cur_array_idx*2],
+ behind_nodes[cur_array_idx] );
+ slon_appendquery(&outstanding,"%s %s"
+ , first_event ? "" : ",",tmpbuf);
+ first_event=0;
+ }
+
+ }
+ printf("waiting for events %s to be confirmed on node %d\n",
+ dstring_data(&outstanding),adminfo1->no_id);
+ dstring_terminate(&outstanding);
+
+ }/* every 10 iterations */
+ sleep(1);
+ }
+ free(behind_nodes);
+
+ }/*while*/
+ if(result != NULL)
+ PQclear(result);
+	dstring_terminate(&event_list);
+	dstring_terminate(&node_list);
+	dstring_terminate(&is_caughtup_query);
+	free(last_event_array);
+ return 0;
+
+}
-}
/*
* Local Variables: