don't process a set if sl_setsync has the initial :1:1: snapshot id
authorSteve Singer <ssinger@ca.afilias.info>
Thu, 18 Jul 2013 18:03:08 +0000 (14:03 -0400)
committerSteve Singer <ssinger@ca.afilias.info>
Thu, 18 Jul 2013 18:03:08 +0000 (14:03 -0400)
this snapshot id is reserved for setting up the value indicates that
an even (ie the ACCEPT_SET) is still pending.

Also be explicit about the origin this remote worker is querying
sl_setsync for.

src/slon/remote_worker.c

index 3936b133dad03f87b95fc864a3ee5aa675e04d60..5411656c92897ce5e0d204ab34f222010eed8ae6 100644 (file)
@@ -3923,7 +3923,7 @@ sync_event(SlonNode * node, SlonConn * local_conn,
                                slon_appendquery(&query, "%s%d",
                                                                 (pset->prev == NULL) ? "" : ",",
                                                                 pset->set_id);
-                       slon_appendquery(&query, "); ");
+                       slon_appendquery(&query, ") and SSY.ssy_origin=%d; ",node->no_id);
 
                        start_monitored_event(&pm);
                        res1 = PQexec(local_dbconn, dstring_data(&query));
@@ -3984,6 +3984,19 @@ sync_event(SlonNode * node, SlonConn * local_conn,
                                char       *ssy_action_list = PQgetvalue(res1, tupno1, 4);
                                int64           ssy_seqno;
 
+                               if (strcmp(ssy_snapshot,"1:1:")==0 &&
+                                       ssy_seqno==0)
+                               {
+                                       /**
+                                        * we don't yet have a row in setsync with real data
+                                        * this means the ACCEPT_SET has not yet come in.
+                                        * ignore this set.
+                                        */
+                                       slon_log(SLON_WARN, "remoteWorkerThread_%d: skipping set %d ACCEPT_SET not yet received\n",
+                                                        node->no_id, sub_set);
+                                       continue;
+                               }
+
                                slon_scanint64(PQgetvalue(res1, tupno1, 1), &ssy_seqno);
                                if (min_ssy_seqno < 0 || ssy_seqno < min_ssy_seqno)
                                        min_ssy_seqno = ssy_seqno;