## Reset the numbers to track total bucardo_delta matches
undef %deltacount;
$deltacount{all} = 0;
+ $deltacount{table} = {};
## Reset our counts of total inserts, deletes, truncates, and conflicts
undef %dmlcount;
## so this tracks the largest number returned
my $maxcount = 0;
+ ## Use the bucardo_delta_check function on each database, which gives us
+ ## a quick summary of whether each table has any active delta rows
+ ## This is a big win on slow networks!
+ if ($config{quick_delta_check}) {
+ for my $dbname (@dbs_source) {
+
+ $x = $sync->{db}{$dbname};
+
+ $SQL = 'SELECT * FROM bucardo.bucardo_delta_check(?,?)';
+ $sth = $x->{dbh}->prepare($SQL);
+ $sth->execute($syncname, $x->{DBGROUPNAME});
+ $x->{deltazero} = 0;
+ for my $row (@{$sth->fetchall_arrayref()}) {
+ my ($number,$tablename) = split /,/ => $row->[0], 2;
+ $x->{deltaquick}{$tablename} = $number;
+ if ($number) {
+ $x->{deltatotal}++;
+ $deltacount{table}{$tablename}++;
+ }
+ else {
+ $x->{deltazero}++;
+ }
+ }
+ $self->glog("Tables with deltas on $dbname: $x->{deltatotal} Without: $x->{deltazero}", LOG_VERBOSE);
+
+ } ## end quick delta check for each database
+ } ## end quick delta check
+
## Grab the delta information for each table from each source database
## While we could do this as per-db/per-goat instead of per-goat/per-db,
## we want to take advantage of the async requests as much as possible,
## We still need these, as we want to respect changes made after the truncation!
next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
+ $x = $sync->{db}{$dbname};
+
+ ## No need to grab information if we know there are no deltas in this database,
+ ## or for just this database/table combination
+ if ($config{quick_delta_check}) {
+ next if ! $x->{deltatotal} or ! $x->{deltaquick}{"$S.$T"};
+ }
+
## Gets all relevant rows from bucardo_deltas: runs asynchronously
$sth{getdelta}{$dbname}{$g}->execute();
$x = $sync->{db}{$dbname};
+ ## If we skipped this, set the deltacount to zero and move on
+ if ($config{quick_delta_check}) {
+ if (! $x->{deltatotal} or ! $x->{deltaquick}{"$S.$T"}) {
+ $self->glog("Skipping $S.$T: no delta rows", LOG_DEBUG);
+ $deltacount{dbtable}{$dbname}{$S}{$T} = 0;
+ next;
+ }
+ }
+
## pg_result tells us to wait for the query to finish
$count = $x->{dbh}->pg_result();
} ## end each goat
+ ## Get sizing for the next printout
+ my $maxsize = 10;
+ my $maxcount2 = 1;
+
+ for my $g (@$goatlist) {
+ next if $g->{reltype} ne 'table';
+ ($S,$T) = ($g->{safeschema},$g->{safetable});
+ for my $dbname (keys %{ $sync->{db} }) {
+ $x = $sync->{db}{$dbname};
+ next if ! $deltacount{dbtable}{$dbname}{$S}{$T};
+ $maxsize = length " $dbname.$S.$T" if length " $dbname.$S.$T" > $maxsize;
+ $maxcount2 = length $count if length $count > $maxcount2;
+ }
+ }
+
## Pretty print the number of rows per db/table
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
$count = $self->{insertcount}{dbname}{$S}{$T};
$self->glog((sprintf 'Rows inserted to bucardo_%s for %-*s: %*d',
$x->{trackstage} ? 'stage' : 'track',
- $self->{maxdbstname},
+ $maxsize,
"$dbname.$S.$T",
- length $maxcount,
+ length $maxcount2,
$count),
LOG_DEBUG);
}
## Put a note in the logs for how long this took
my $synctime = sprintf '%.2f', tv_interval($kid_start_time);
- $self->glog((sprintf 'Total time for sync "%s" (%s rows): %s%s',
+ $self->glog((sprintf 'Total time for sync "%s" (%s %s, %s %s): %s%s',
$syncname,
$dmlcount{inserts},
+ (1==$dmlcount{inserts} ? 'row' : 'rows'),
+ scalar keys %{$deltacount{table}},
+ (1== keys %{$deltacount{table}} ? 'table' : 'tables'),
pretty_time($synctime),
$synctime < 120 ? '' : " ($synctime seconds)",),
## We don't want to output a "finished" if no changes made unless verbose
my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */";
$loop++;
- ## Kick off the copy on the source
+ ## Kick off the copy on the source
+ $self->glog(qq{${pre}Copying from $fromname.$S.$T}, LOG_VERBOSE);
my $srccmd = sprintf '%s%sCOPY (%s FROM %s.%s%s) TO STDOUT%s',
$pre,
$self->{sqlprefix},
$fromdbh->do($srccmd);
my $buffer = '';
- $self->glog(qq{${pre}Copying from $fromname.$S.$T}, LOG_VERBOSE);
## Loop through all changed rows on the source, and push to the target(s)
my $multirow = 0;
host_safety_check||Regex to make sure we don't accidentally run where we should not
isolation_level|repeatable read|Default isolation level: can be serializable or repeatable read
piddir|/var/run/bucardo|Directory holding Bucardo PID files
+quick_delta_check|1|Whether to do a quick scan of delta activity
reason_file|bucardo.restart.reason.txt|File to hold reasons for stopping and starting
semaphore_table|bucardo_status|Table to let apps know a sync is ongoing
statement_chunk_size|8000|How many primary keys to shove into a single statement
$run_sql->($SQL,$dbh);
}
+ if (! exists $bfunctionoid{'bucardo_delta_names_helper'}) {
+ $SQL = q{
+CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_names_helper()
+RETURNS TRIGGER
+LANGUAGE plpgsql
+IMMUTABLE
+AS $bcc$
+BEGIN
+IF NEW.deltaname IS NULL THEN
+ NEW.deltaname = 'delta_' || bucardo.bucardo_tablename_maker(NEW.tablename);
+END IF;
+IF NEW.trackname IS NULL THEN
+ NEW.trackname = 'track_' || bucardo.bucardo_tablename_maker(NEW.tablename);
+END IF;
+RETURN NEW;
+END;
+$bcc$;
+ };
+ $run_sql->($SQL,$dbh);
+ }
+
+ ## Create the bucardo_delta_names table as needed
+ if (! exists $btableoid{'bucardo_delta_names'}) {
+ $SQL = qq{
+ CREATE TABLE bucardo.bucardo_delta_names (
+ sync TEXT,
+ tablename TEXT,
+ deltaname TEXT,
+ trackname TEXT,
+ cdate TIMESTAMPTZ NOT NULL DEFAULT now()
+ );
+ };
+ $run_sql->($SQL,$dbh);
+
+ $SQL = qq{CREATE UNIQUE INDEX bucardo_delta_names_unique ON bucardo.bucardo_delta_names (sync,tablename)};
+ $run_sql->($SQL,$dbh);
+
+ $SQL = qq{
+CREATE TRIGGER bucardo_delta_namemaker
+BEFORE INSERT OR UPDATE
+ON bucardo.bucardo_delta_names
+FOR EACH ROW EXECUTE PROCEDURE bucardo.bucardo_delta_names_helper();
+ };
+ $run_sql->($SQL,$dbh);
+ }
+
+ ## Function to do a quick check of all deltas for a given sync
+ if (! exists $bfunctionoid{'bucardo_delta_check'}) {
+ $SQL = q{
+CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_check(TEXT,TEXT)
+RETURNS SETOF TEXT
+LANGUAGE plpgsql
+AS $bcc$
+DECLARE
+ myst TEXT;
+ myrec RECORD;
+ mycount INT;
+BEGIN
+ FOR myrec IN
+ SELECT * FROM bucardo_delta_names
+ WHERE sync = $1
+ ORDER BY tablename
+ LOOP
+
+ RAISE DEBUG 'GOT % and %', myrec.deltaname, myrec.tablename;
+
+ myst = $$
+ SELECT 1
+ FROM bucardo.$$ || myrec.deltaname || $$ d
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM bucardo.$$ || myrec.trackname || $$ t
+ WHERE d.txntime = t.txntime
+ AND (t.target = '$$ || $2 || $$'::text OR t.target ~ '^T:')
+ ) LIMIT 1$$;
+ EXECUTE myst;
+ GET DIAGNOSTICS mycount = ROW_COUNT;
+
+ IF mycount>=1 THEN
+ RETURN NEXT '1,' || myrec.tablename;
+ ELSE
+ RETURN NEXT '0,' || myrec.tablename;
+ END IF;
+
+ END LOOP;
+ RETURN;
+END;
+$bcc$;
+ };
+ $run_sql->($SQL,$dbh);
+ }
+
## Create the bucardo_delta_targets table as needed
if (! exists $btableoid{'bucardo_delta_targets'}) {
$SQL = qq{
## Build another list of information for each table
## This saves us multiple lookups
$SQL = q{SELECT n.nspname,c.relname,relkind,c.oid FROM pg_class c JOIN pg_namespace n ON (n.oid = c.relnamespace) WHERE };
- my @args;
+ my $SQL2 = q{INSERT INTO bucardo.bucardo_delta_names VALUES };
+
+ my (@args,@args2);
for my $schema (sort keys %goat) {
for my $table (sort keys %{$goat{$schema}}) {
$SQL .= '(nspname = ? AND relname = ?) OR ';
push @args => $remoteschema, $remotetable;
+ push @args2 => $syncname, "$remoteschema.$remotetable";
+
} ## end each table
} ## end each schema
my (%goatoid,@tableoids);
for my $row (@{$sth->fetchall_arrayref()}) {
$goatoid{"$row->[0].$row->[1]"} = [$row->[2],$row->[3]];
- push @tableoids => $row->[3] if $row->[2] eq 'r';
+ push @tableoids => $row->[3] if $row->[2] eq 'r';
}
+ ## Populate the bucardo_delta_names table for this sync
+ $SQL = 'DELETE FROM bucardo.bucardo_delta_names WHERE sync = ?';
+ $sth = $dbh->prepare($SQL);
+ $sth->execute($syncname);
+ $SQL = $SQL2;
+ my $number = @args2 / 2;
+ $SQL .= '(?,?),' x $number;
+ chop $SQL;
+ $sth = $dbh->prepare($SQL);
+ $sth->execute(@args2);
+
## Get column information about all of our tables
$SQL = q{
SELECT attrelid, attname, quote_ident(attname) AS qattname, atttypid, format_type(atttypid, atttypmod) AS ftype,
$dbh->do('RESET search_path');
- my $colinfo = $columninfo->{$oid};
+ my $colinfo = $columninfo->{$oid};
## Allow for 'dead' columns in the attnum ordering
## Turn the old keys (attname) into new keys (number)
$x=1;