Establish a maximum number of events (sel_max_events) to be processed
authorChristopher Browne <cbbrowne@ca.afilias.info>
Mon, 26 May 2008 20:14:27 +0000 (20:14 +0000)
committerChristopher Browne <cbbrowne@ca.afilias.info>
Mon, 26 May 2008 20:14:27 +0000 (20:14 +0000)
in an iteration in remote_listen.c's pull of events from a remote node.

This means that if a node is way, way, way behind (e.g. - tens or hundreds
of thousands of events behind), it won't try to get up to date in one
swell foop.

src/slon/remote_listen.c

index 37d2af508c2b349f505b50bab551df1183a58c18..2c70e1a68e34338b41ca8f46c8701177e88cb011 100644 (file)
@@ -7,7 +7,7 @@
  *     Copyright (c) 2003-2004, PostgreSQL Global Development Group
  *     Author: Jan Wieck, Afilias USA INC.
  *
- *     $Id: remote_listen.c,v 1.43 2008-04-23 22:29:12 cbbrowne Exp $
+ *     $Id: remote_listen.c,v 1.44 2008-05-26 20:14:27 cbbrowne Exp $
  * ----------------------------------------------------------------------
  */
 
@@ -65,6 +65,8 @@ static int                    poll_sleep;
 extern char *lag_interval;
 int remote_listen_timeout;
 
+static int sel_max_events = 0;
+
 /* ----------
  * slon_remoteListenThread
  *
@@ -697,7 +699,11 @@ remoteListen_receive_events(SlonNode * node, SlonConn * conn,
        {
                slon_appendquery(&query, ")");
        }
-       slon_appendquery(&query, " order by e.ev_origin, e.ev_seqno");
+       /* Limit the result set size to:
+            sync_group_maxsize * 2, if it's set
+                       100, if sync_group_maxsize isn't set */
+       slon_appendquery(&query, " order by e.ev_origin, e.ev_seqno limit %d",
+                                        (sync_group_maxsize>0)? sync_group_maxsize * 2 : 100);
 
        rtcfg_unlock();
 
@@ -754,6 +760,13 @@ remoteListen_receive_events(SlonNode * node, SlonConn * conn,
         * Add all events found to the remote worker message queue.
         */
        ntuples = PQntuples(res);
+
+       /* If we drew in the maximum number of events */
+       if (ntuples == ((sync_group_maxsize>0)? sync_group_maxsize * 2 : 100))
+                       sel_max_events++;   /* Add to the count... */
+       else
+                       sel_max_events=0;   /* reset the count */
+
        for (tupno = 0; tupno < ntuples; tupno++)
        {
                int                     ev_origin;
@@ -785,24 +798,25 @@ remoteListen_receive_events(SlonNode * node, SlonConn * conn,
        }
 
        if (ntuples > 0) {
-               poll_sleep = 0;
-               poll_state = SLON_POLLSTATE_POLL;
+                       if ((sel_max_events > 2) && (sync_group_maxsize > 100)) {
+                                       poll_state = SLON_POLLSTATE_LISTEN;
+                                       slon_log(SLON_INFO, "remoteListenThread_%d: drew maximum # of events for %d iterations\n",
+                                                        node->no_id, sel_max_events);
+                                       slon_log(SLON_INFO, "remoteListenThread_%d: sleep %ds, return to LISTEN mode\n",
+                                                        node->no_id, 10+sel_max_events);
+                                       sched_msleep(node, 10000 + (1000 * sel_max_events));
+                       } else {
+                                       poll_sleep = 0;
+                                       poll_state = SLON_POLLSTATE_POLL;
+                       }
        } else {
-               poll_sleep = poll_sleep * 2 + sync_interval;
-               if (poll_sleep > sync_interval_timeout) {
-                       poll_sleep = sync_interval_timeout;
-                       poll_state = SLON_POLLSTATE_LISTEN;
-               }
+                       poll_sleep = poll_sleep * 2 + sync_interval;
+                       if (poll_sleep > sync_interval_timeout) {
+                                       poll_sleep = sync_interval_timeout;
+                                       poll_state = SLON_POLLSTATE_LISTEN;
+                       }
        }
        PQclear(res);
-
+       last_event_sel = ntuples;
        return 0;
 }
-
-/*
- * Local Variables:
- *     tab-width: 4
- *     c-indent-level: 4
- *     c-basic-offset: 4
- * End:
- */