Introduce the concept of a "quick delta" check.
authorGreg Sabino Mullane <greg@endpoint.com>
Fri, 8 Nov 2013 04:53:09 +0000 (23:53 -0500)
committerGreg Sabino Mullane <greg@endpoint.com>
Fri, 8 Nov 2013 04:53:09 +0000 (23:53 -0500)
By default, this is turned on, but is left as a config variable in case this turns out to be a very bad idea. :)

This is a huge win on slow networks. Rather than getting a distinct pk count for every table in the sync, we ask the remote
database for a list of all tables and whether or not they have any changes. If they don't, we do not have to bother
with the normal "select distinct" query. This reduces the number of database calls drastically for syncs with a
large number of tables. In addition, the delta quick queries inside the function are extraorfinarily fast compared
to their distinct counterparts, as they do a select 1 ... limit 1.

All of this is accomplished by a new table on each source database called bucardo_delta_names, which stores a
list of all tables for a sync, along with their delta and track table names. This table's information is rewritten
on Bucardo startup each time. The new function uses this table to dynamically generate the quick delta queries
for each table, and then returns it in a simple text format.

This is especially noticeable on slow networks with a large number of tables in a sync. Testing in the case
that drove all this improved the sync run speed from 42 seconds to less than a second (for no rows - the
number of rows is a constant cost limited by how fast COPY goes, in most cases).

Bucardo.pm
bucardo.schema

index c9325cbd11ad4c0400c324b856459f59ca05446f..e7bf4618791e5dc900d1922401bb956e4e47552b 100644 (file)
@@ -2846,6 +2846,7 @@ sub start_kid {
         ## Reset the numbers to track total bucardo_delta matches
         undef %deltacount;
         $deltacount{all} = 0;
+        $deltacount{table} = {};
 
         ## Reset our counts of total inserts, deletes, truncates, and conflicts
         undef %dmlcount;
@@ -3124,6 +3125,34 @@ sub start_kid {
             ## so this tracks the largest number returned
             my $maxcount = 0;
 
+            ## Use the bucardo_delta_check function on each database, which gives us
+            ## a quick summary of whether each table has any active delta rows
+            ## This is a big win on slow networks!
+            if ($config{quick_delta_check}) {
+                for my $dbname (@dbs_source) {
+
+                    $x = $sync->{db}{$dbname};
+
+                    $SQL = 'SELECT * FROM bucardo.bucardo_delta_check(?,?)';
+                    $sth = $x->{dbh}->prepare($SQL);
+                    $sth->execute($syncname, $x->{DBGROUPNAME});
+                    $x->{deltazero} = 0;
+                    for my $row (@{$sth->fetchall_arrayref()}) {
+                        my ($number,$tablename) = split /,/ => $row->[0], 2;
+                        $x->{deltaquick}{$tablename} = $number;
+                        if ($number) {
+                            $x->{deltatotal}++;
+                            $deltacount{table}{$tablename}++;
+                        }
+                        else {
+                            $x->{deltazero}++;
+                        }
+                    }
+                    $self->glog("Tables with deltas on $dbname: $x->{deltatotal} Without: $x->{deltazero}", LOG_VERBOSE);
+
+                } ## end quick delta check for each database
+            } ## end quick delta check
+
             ## Grab the delta information for each table from each source database
             ## While we could do this as per-db/per-goat instead of per-goat/per-db,
             ## we want to take advantage of the async requests as much as possible,
@@ -3144,6 +3173,14 @@ sub start_kid {
                     ## We still need these, as we want to respect changes made after the truncation!
                     next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
 
+                    $x = $sync->{db}{$dbname};
+
+                    ## No need to grab information if we know there are no deltas in this database,
+                    ## or for just this database/table combination
+                    if ($config{quick_delta_check}) {
+                        next if ! $x->{deltatotal} or ! $x->{deltaquick}{"$S.$T"};
+                    }
+
                     ## Gets all relevant rows from bucardo_deltas: runs asynchronously
                     $sth{getdelta}{$dbname}{$g}->execute();
 
@@ -3158,6 +3195,15 @@ sub start_kid {
 
                     $x = $sync->{db}{$dbname};
 
+                    ## If we skipped this, set the deltacount to zero and move on
+                    if ($config{quick_delta_check}) {
+                        if (! $x->{deltatotal} or ! $x->{deltaquick}{"$S.$T"}) {
+                            $self->glog("Skipping $S.$T: no delta rows", LOG_DEBUG);
+                            $deltacount{dbtable}{$dbname}{$S}{$T} = 0;
+                            next;
+                        }
+                    }
+
                     ## pg_result tells us to wait for the query to finish
                     $count = $x->{dbh}->pg_result();
 
@@ -4162,6 +4208,21 @@ sub start_kid {
 
             } ## end each goat
 
+            ## Get sizing for the next printout
+            my $maxsize = 10;
+            my $maxcount2 = 1;
+
+            for my $g (@$goatlist) {
+                next if $g->{reltype} ne 'table';
+                ($S,$T) = ($g->{safeschema},$g->{safetable});
+                for my $dbname (keys %{ $sync->{db} }) {
+                    $x = $sync->{db}{$dbname};
+                    next if ! $deltacount{dbtable}{$dbname}{$S}{$T};
+                    $maxsize = length " $dbname.$S.$T" if length " $dbname.$S.$T" > $maxsize;
+                    $maxcount2 = length $count if length $count > $maxcount2;
+                }
+            }
+
             ## Pretty print the number of rows per db/table
             for my $g (@$goatlist) {
                 next if $g->{reltype} ne 'table';
@@ -4173,9 +4234,9 @@ sub start_kid {
                         $count = $self->{insertcount}{dbname}{$S}{$T};
                         $self->glog((sprintf 'Rows inserted to bucardo_%s for %-*s: %*d',
                              $x->{trackstage} ? 'stage' : 'track',
-                             $self->{maxdbstname},
+                             $maxsize,
                              "$dbname.$S.$T",
-                             length $maxcount,
+                             length $maxcount2,
                              $count),
                              LOG_DEBUG);
                     }
@@ -4639,9 +4700,12 @@ sub start_kid {
 
         ## Put a note in the logs for how long this took
         my $synctime = sprintf '%.2f', tv_interval($kid_start_time);
-        $self->glog((sprintf 'Total time for sync "%s" (%s rows): %s%s',
+        $self->glog((sprintf 'Total time for sync "%s" (%s %s, %s %s): %s%s',
                     $syncname,
                     $dmlcount{inserts},
+                    (1==$dmlcount{inserts} ? 'row' : 'rows'),
+                    scalar keys %{$deltacount{table}},
+                    (1== keys %{$deltacount{table}} ? 'table' : 'tables'),
                     pretty_time($synctime),
                     $synctime < 120 ? '' : " ($synctime seconds)",),
                     ## We don't want to output a "finished" if no changes made unless verbose
@@ -8815,7 +8879,8 @@ sub push_rows {
             my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */";
             $loop++;
 
-            ## Kick off the copy on the source 
+            ## Kick off the copy on the source
+            $self->glog(qq{${pre}Copying from $fromname.$S.$T}, LOG_VERBOSE);
             my $srccmd = sprintf '%s%sCOPY (%s FROM %s.%s%s) TO STDOUT%s',
                 $pre,
                 $self->{sqlprefix},
@@ -8827,7 +8892,6 @@ sub push_rows {
             $fromdbh->do($srccmd);
 
             my $buffer = '';
-            $self->glog(qq{${pre}Copying from $fromname.$S.$T}, LOG_VERBOSE);
 
             ## Loop through all changed rows on the source, and push to the target(s)
             my $multirow = 0;
index 4653446708fbc8b138727615110719d52c39137d..b2af86e34b2311b690f147fbb8eaefb331dc8892 100644 (file)
@@ -168,6 +168,7 @@ flatfile_dir|.|Directory to store the flatfile output inside of
 host_safety_check||Regex to make sure we don't accidentally run where we should not
 isolation_level|repeatable read|Default isolation level: can be serializable or repeatable read
 piddir|/var/run/bucardo|Directory holding Bucardo PID files
+quick_delta_check|1|Whether to do a quick scan of delta activity
 reason_file|bucardo.restart.reason.txt|File to hold reasons for stopping and starting
 semaphore_table|bucardo_status|Table to let apps know a sync is ongoing
 statement_chunk_size|8000|How many primary keys to shove into a single statement
@@ -1469,6 +1470,98 @@ END;
           $run_sql->($SQL,$dbh);
         }
 
+        if (! exists $bfunctionoid{'bucardo_delta_names_helper'}) {
+            $SQL = q{
+CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_names_helper()
+RETURNS TRIGGER
+LANGUAGE plpgsql
+IMMUTABLE
+AS $bcc$
+BEGIN
+IF NEW.deltaname IS NULL THEN
+  NEW.deltaname = 'delta_' || bucardo.bucardo_tablename_maker(NEW.tablename);
+END IF;
+IF NEW.trackname IS NULL THEN
+  NEW.trackname = 'track_' || bucardo.bucardo_tablename_maker(NEW.tablename);
+END IF;
+RETURN NEW;
+END;
+$bcc$;
+            };
+            $run_sql->($SQL,$dbh);
+        }
+
+        ## Create the bucardo_delta_names table as needed
+        if (! exists $btableoid{'bucardo_delta_names'}) {
+            $SQL = qq{
+                    CREATE TABLE bucardo.bucardo_delta_names (
+                        sync TEXT,
+                        tablename TEXT,
+                        deltaname TEXT,
+                        trackname TEXT,
+                        cdate TIMESTAMPTZ NOT NULL DEFAULT now()
+                    );
+                };
+            $run_sql->($SQL,$dbh);
+
+            $SQL = qq{CREATE UNIQUE INDEX bucardo_delta_names_unique ON bucardo.bucardo_delta_names (sync,tablename)};
+            $run_sql->($SQL,$dbh);
+
+            $SQL = qq{
+CREATE TRIGGER bucardo_delta_namemaker
+BEFORE INSERT OR UPDATE
+ON bucardo.bucardo_delta_names
+FOR EACH ROW EXECUTE PROCEDURE bucardo.bucardo_delta_names_helper();
+            };
+            $run_sql->($SQL,$dbh);
+        }
+
+        ## Function to do a quick check of all deltas for a given sync
+        if (! exists $bfunctionoid{'bucardo_delta_check'}) {
+            $SQL = q{
+CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_check(TEXT,TEXT) 
+RETURNS SETOF TEXT
+LANGUAGE plpgsql
+AS $bcc$
+DECLARE
+ myst TEXT;
+ myrec RECORD;
+ mycount INT;
+BEGIN
+  FOR myrec IN
+    SELECT * FROM bucardo_delta_names
+      WHERE sync = $1 
+      ORDER BY tablename
+  LOOP
+
+    RAISE DEBUG 'GOT % and %', myrec.deltaname, myrec.tablename;
+
+    myst = $$
+      SELECT  1
+      FROM    bucardo.$$ || myrec.deltaname || $$ d
+      WHERE   NOT EXISTS (
+        SELECT 1
+        FROM   bucardo.$$ || myrec.trackname || $$ t
+        WHERE  d.txntime = t.txntime
+        AND    (t.target = '$$ || $2 || $$'::text OR t.target ~ '^T:')
+      ) LIMIT 1$$;
+    EXECUTE myst;
+    GET DIAGNOSTICS mycount = ROW_COUNT;
+
+    IF mycount>=1 THEN
+      RETURN NEXT '1,' || myrec.tablename;
+    ELSE
+      RETURN NEXT '0,' || myrec.tablename;
+    END IF;
+
+  END LOOP;
+  RETURN;
+END;
+$bcc$;
+            };
+            $run_sql->($SQL,$dbh);
+        }
+
         ## Create the bucardo_delta_targets table as needed
         if (! exists $btableoid{'bucardo_delta_targets'}) {
             $SQL = qq{
@@ -1937,7 +2030,9 @@ SELECT 'Fixme'::TEXT;
     ## Build another list of information for each table
     ## This saves us multiple lookups
     $SQL = q{SELECT n.nspname,c.relname,relkind,c.oid FROM pg_class c JOIN pg_namespace n ON (n.oid = c.relnamespace) WHERE };
-    my @args;
+    my $SQL2 = q{INSERT INTO bucardo.bucardo_delta_names VALUES };
+
+    my (@args,@args2);
 
     for my $schema (sort keys %goat) {
         for my $table (sort keys %{$goat{$schema}}) {
@@ -1967,6 +2062,8 @@ SELECT 'Fixme'::TEXT;
 
             $SQL .= '(nspname = ? AND relname = ?) OR ';
             push @args => $remoteschema, $remotetable;
+            push @args2 => $syncname, "$remoteschema.$remotetable";
+
         } ## end each table
 
     } ## end each schema
@@ -1977,9 +2074,20 @@ SELECT 'Fixme'::TEXT;
     my (%goatoid,@tableoids);
     for my $row (@{$sth->fetchall_arrayref()}) {
         $goatoid{"$row->[0].$row->[1]"} = [$row->[2],$row->[3]];
-       push @tableoids => $row->[3] if $row->[2] eq 'r';
+        push @tableoids => $row->[3] if $row->[2] eq 'r';
     }
 
+    ## Populate the bucardo_delta_names table for this sync
+    $SQL = 'DELETE FROM bucardo.bucardo_delta_names WHERE sync = ?';
+    $sth = $dbh->prepare($SQL);
+    $sth->execute($syncname);
+    $SQL = $SQL2;
+    my $number = @args2 / 2;
+    $SQL .= '(?,?),' x $number;
+    chop $SQL;
+    $sth = $dbh->prepare($SQL);
+    $sth->execute(@args2);
+   
     ## Get column information about all of our tables
     $SQL = q{
             SELECT   attrelid, attname, quote_ident(attname) AS qattname, atttypid, format_type(atttypid, atttypmod) AS ftype,
@@ -2074,7 +2182,7 @@ SELECT 'Fixme'::TEXT;
 
          $dbh->do('RESET search_path');
 
-        my $colinfo = $columninfo->{$oid};
+         my $colinfo = $columninfo->{$oid};
             ## Allow for 'dead' columns in the attnum ordering
             ## Turn the old keys (attname) into new keys (number)
             $x=1;