From: Greg Sabino Mullane Date: Fri, 4 Jul 2014 17:39:06 +0000 (-0400) Subject: More overhaul of the conflict code. X-Git-Tag: 5.1.0~12 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=09389c289d4b5e3642049fd654c6a5787517a387;p=bucardo.git More overhaul of the conflict code. Use bucardo_latest and bucardo_latest_all_table to generate a list of winning databases. We then fall through and populate the conflict hash. Add a shared area for persistence to help custom code. --- diff --git a/Bucardo.pm b/Bucardo.pm index 034b76896..75d71dabd 100644 --- a/Bucardo.pm +++ b/Bucardo.pm @@ -3813,12 +3813,12 @@ sub start_kid { ## Only need to turn off triggers and rules once via pg_class my $disabled_via_pg_class = 0; - ## The overall winning database for conflicts - delete $self->{conflictwinner}; + ## Reset all of our non-persistent conflict information + $self->{conflictinfo} = {}; ## Custom conflict handler may have told us to always use the same winner - if (exists $self->{conflictwinneralways}) { - $self->{conflictwinner} = $self->{conflictwinneralways}; + if (exists $self->{conflictinfo}{winneralways}) { + $self->{conflictinfo}{winner} = $self->{conflictinfo}{winneralways}; } ## Do each goat in turn @@ -3955,83 +3955,68 @@ sub start_kid { $self->glog("Conflicts for $S.$T: $count", LOG_NORMAL); ## If we have a custom conflict handler for this goat, invoke it - if ($g->{code_conflict} and ! defined $self->{conflictwinner}) { + if ($g->{code_conflict} and ! exists $self->{conflictinfo}{winner}) { $self->glog('Starting code_conflict', LOG_VERBOSE); - my $conflict_resolved = 0; - - ## We pass it %conflict, and assume it will modify all the values therein + ## Give each piece of code a chance to resolve the conflict + ## We walk through each unless they tell is otherwise for my $code (@{ $g->{code_conflict} }) { - my $c = $code->{name}; + + ## The all important conflict hash, which the caller may change $code->{info}{conflicts} = \%conflict; - $code->{info}{safeschemaname} = $S; - $code->{info}{safetablename} = $T; + + ## Provide the current schema and table name + $code->{info}{schemaname} = $S; + $code->{info}{tablename} = $T; + + ## Provide detailed information on all databases, but elide the dbh for my $dbname (@dbs_connectable) { $x = $sync->{db}{$dbname}; ## Make a shallow copy, excluding the actual dbh handle for my $name (keys %$x) { - ## We provide DBIx::Safe versions elsehwere + ## We provide DBIx::Safe versions elsewhere next if $name eq 'dbh'; $code->{info}{dbinfo}{$dbname}{$name} = $x->{$name}; } } + my $cname = $code->{name}; + + ## Run the conflict handler customcode, get the result my $result = $self->run_kid_custom_code($sync, $code); - $self->glog("Result of custom code is $result", LOG_DEBUG); + $self->glog("Result of custom code $cname is $result", LOG_DEBUG); - ## We may want to simply skip this code altogether + ## Code has asked us to do nothing next if 'skip' eq $result; - ## Allow it to set permanent winner for this round + ## Allow it to set permanent winners for all rows this round if ($result =~ /winner: (.+)/o) { - my $winner = $1; - $self->glog("Custom conflict says winner should be: $winner"); - if (! exists $deltabin{$winner}) { - $self->pause_and_exit(qq{Conflict handler $c provided an invalid winner for $S.$T: $winner}); - } - $self->{conflictwinner} = $winner; - $conflict_resolved = 1; + my $winners = $1; + $self->glog("Custom code $cname says winners should be: $winners"); + $self->{conflictinfo}{winners} = $winners; last; ## No sense in going on } ## Allow it to set permanent winner for all rounds until we restart + ## Need an intermediate levels: row, table, all tables. one or all if ($result =~ /winner_always: (.+)/o) { - my $winner = $1; - $self->glog("Conflict handler $c says winner should ALWAYS be: $winner"); - if (! exists $deltabin{$winner}) { - $self->pause_and_exit(qq{Conflict handler $c provided an invalid winner_always for $S.$T: $winner}); - } - $self->{conflictwinner} = $self->{conflictwinneralways} = $winner; - $conflict_resolved = 1; + my $winners = $1; + $self->glog("Custom code $cname says winners should ALWAYS be: $winners"); + $self->{conflictinfo}{winners} = $self->{conflictinfo}{winneralways} = $winners; last; ## No sense in going on } - ## Allow it to set permanent winner for all rounds! - - ## Loop through and make sure the conflict handler has done its job - while (my ($key, $winner) = each %conflict) { - if (! defined $winner or ref $winner or ! exists $deltabin{$winner}) { - ($pkval = $key) =~ s/\0/\|/go; - $self->pause_and_exit(qq{Conflict handler $c provided an invalid winner for $S.$T.$pkval: $winner}); - } - $self->glog("Conflict handler $c says winner for $key is $winner", LOG_VERBOSE); - } - $conflict_resolved = 1; + ## We assume that some or all keys in %conflict have been changed, + ## from a hashref to a scalar. + ## We don't do checks here, as it will get caught down below. ## If info->{lastcode} has been set, we don't call any other codes - ## Kind of moot last if $result eq 'last'; } ## end each code_conflict - - if (! $conflict_resolved) { - $self->glog(qq{Could not resolve the conflicts! Pausing sync $syncname}, LOG_WARN); - $self->pause_and_exit(qq{Conflict handlers failed to resolve the conflict for $S.$T}); - } - } ## If conflict_strategy is abort, simply die right away elsif ('bucardo_abort' eq $g->{conflict_strategy}) { @@ -4042,174 +4027,123 @@ sub start_kid { elsif ('bucardo_custom' eq $g->{conflict_strategy}) { $self->pause_and_exit(qq{Aborting sync due to lack of custom conflict handler for $S.$T}); } - - ## If we are grabbing the 'latest', figure out which it is - ## For this handler, we want to treat all the tables in the sync - ## as deeply linked to each other, and this we have one winning - ## database for *all* tables in the sync. - ## Thus, the only things arriving from other databases will be inserts - elsif ('bucardo_latest' eq $g->{conflict_strategy}) { - - ## We only need to figure out the winning database once - ## The winner is the latest one to touch any of our tables - ## In theory, this is a little crappy. - ## In practice, it works out quite well. :) - - $self->glog(q{Starting 'bucardo_latest' conflict strategy}, LOG_VERBOSE); - - if (! exists $self->{conflictwinner}) { - - for my $dbname (@dbs_delta) { - - $x = $sync->{db}{$dbname}; - - ## Find the maximum txntime across all tables in this sync - my $maxsql = 'SELECT extract(epoch FROM MAX(txntime)) FROM'; + elsif ($g->{conflict_strategy} =~ /^bucardo_latest/o) { + + ## For bucardo_latest*, we want to check the transaction times across + ## all databases in this sync that may conflict - in other words, + ## source databases that have deltas. We then sort that list and set it + ## as the list of preferred databases + ## Right now, there are two variants: + ## bucardo_latest: check this table only + ## bucardo_latest_all_tables: check all tables in the sync + + $self->glog(qq{Starting conflict strategy $g->{conflict_strategy}}, LOG_VERBOSE); + + ## If we are doing all tables, we can cache the information + if ($g->{conflict_strategy} eq 'bucardo_latest' + or ! exists $self->{conflictinfo}{bucardo_latest_winners}) { + + ## Find the maximum txntime across all databases + my $maxsql = 'SELECT extract(epoch FROM MAX(txntime)) FROM'; + if ($g->{conflict_strategy} eq 'bucardo_latest') { + $SQL = "$maxsql bucardo.$g->{deltatable}"; + } + elsif ($g->{conflict_strategy} eq 'bucardo_latest_all_tables') { $SQL = join " UNION\n" => map { "$maxsql bucardo.$_->{deltatable}" } - grep { $_->{reltype} eq 'table'} - @$goatlist; - $SQL .= ' ORDER BY 1 DESC NULLS LAST LIMIT 1'; + grep { $_->{reltype} eq 'table'} + @$goatlist; + } + else { + $self->pause_and_exit(qq{Unknown conflict_strategy $g->{conflict_strategy}!}); + } + $SQL .= ' ORDER BY 1 DESC NULLS LAST LIMIT 1'; + ## Check every database that generates deltas + for my $dbname (@dbs_delta) { + $x = $sync->{db}{$dbname}; $sth = $x->{dbh}->prepare($SQL); $sth->execute(); $x->{lastmod} = $sth->fetchall_arrayref()->[0][0] || 0; + $self->glog("ZZZ db $dbname table $T lastmod is $x->{lastmod}"); + } - } ## end checking each source database - - ## Now we declare the overall winner - ## We sort the database names so even in the (very!) unlikely - ## chance of a tie, the same database always wins - my $highest = -1; - for my $dbname (sort @dbs_delta) { - - $x = $sync->{db}{$dbname}; - - $self->glog("Conflict check lastmod for $dbname is $x->{lastmod}", LOG_DEBUG); + ## Now we can put them in rank order + ## The last modification time is the main key + ## In the unlikely chance of a tie, we go by alphabetical database name + $self->{conflictinfo}{bucardo_latest_winner} + = join ' ' => + map { $_->[0] } + sort { $b->[1] <=> $a->[1] or $a->[0] cmp $b->[0] } + map { [$_, $sync->{db}{$_}{lastmod} ] } + @dbs_delta; - if ($x->{lastmod} > $highest) { - $highest = $x->{lastmod}; - $self->{conflictwinner} = $dbname; - } - } + $self->glog("Set conflict winners to: $self->{conflictinfo}{bucardo_latest_winner}", LOG_VERBOSE); - ## We now have a winning database inside self -> conflictwinner - ## This means we do not need to update %conflict at all - $self->glog("Conflict winner is $self->{conflictwinner} with $highest", LOG_VERBOSE); + } ## end bucardo_latest_winner not set yet - } ## end conflictwinner not set yet + $self->{conflictinfo}{winners} = $self->{conflictinfo}{bucardo_latest_winner}; - } + } ## end of bucardo_latest* else { + ## Not a built-in, so assume a list of databases: + $self->{conflictinfo}{winners} = $g->{conflict_strategy}; + } - ## Use the standard conflict: a list of database names - ## Basically, we use the first valid one we find - ## The only reason *not* to use an entry is if it had - ## no updates at all for this run. Note: this does not - ## mean no conflicts, it means no insert/update/delete - - $self->glog(qq{Starting conflict strategy "$g->{conflict_strategy}"}, LOG_VERBOSE); + ## At this point, conflictinfo{winners} should contain a list of databases + ## This can come from the built-in strategies, from a customcode, + ## or directly as a list supplied by the user + ## We walk through all of the conflicting rows, and set the winner as the + ## database highest in the supplied list - if (exists $self->{conflictwinner}) { - $self->glog("Using previous conflict winner: $self->{conflictwinner}", LOG_VERBOSE); + ## Optimize for a single database name + my $sc = $self->{conflictinfo}{winners} + or pause_and_exit(q{Invalid conflict winners list given}); + if (index($sc, ' ') < 1) { + ## Sanity check + if (! exists $deltacount{$sc}) { + $self->pause_and_exit(qq{Invalid conflict strategy '$sc' used for $S.$T: no such database}); } - else { - - ## Optimize for a single database name - my $sc = $g->{conflict_strategy}; - if (index($sc, ' ') < 1) { - ## Sanity check - if (! exists $deltacount{$sc}) { - $self->pause_and_exit(qq{Invalid conflict_strategy '$sc' used for $S.$T}); - } - $self->{conflictwinner} = $sc; - } - else { - ## Have more than one, so figure out the best one to use - my @dbs = split / +/ => $sc; - ## Make sure they all exist - for my $dbname (@dbs) { - if (! exists $deltacount{$dbname}) { - $self->pause_and_exit(qq{Invalid database "$dbname" found in standard conflict for $S.$T}); - } - } - - ## Check each candidate in turn - ## It wins, unless it has no changes at all - for my $dbname (@dbs) { - - my $found_delta = 0; - ## Walk through but stop at the first found delta - for my $g (@$goatlist) { - - ## This only makes sense for tables - next if $g->{reltype} ne 'table'; - - ## Prep our SQL: find the epoch of the latest transaction for this table - if (!exists $g->{sql_got_delta}) { - ## We need to know if any have run since the last time we ran this sync - ## In other words, any deltas newer than the highest track entry - $SQL = qq{SELECT COUNT(*) FROM bucardo.$g->{deltatable} d } - . q{WHERE d.txntime > } - . q{(SELECT COALESCE(MAX(txntime),'1999-12-31') } - . qq{FROM bucardo.$g->{tracktable} } - . qq{WHERE target = '$x->{DBGROUPNAME}')}; - $g->{sql_got_delta} = $SQL; - } - - $x = $sync->{db}{$dbname}; - $sth = $x->{dbh}->prepare($g->{sql_got_delta}); - $sth->execute(); - $count = $sth->fetch()->[0]; - if ($count >= 1) { - $self->glog("Found a delta for db $dbname, table $g->{tablename}", LOG_DEBUG); - $found_delta = 1; - last; - } - } - - if (! $found_delta) { - $self->glog("No rows changed, so discarding conflict winner '$dbname'", LOG_VERBOSE); - next; - } - - $self->{conflictwinner} = $dbname; - $self->glog("Set conflict winner to database $dbname", LOG_NORMAL); - last; - - } - - ## No match at all? Must be a non-inclusive list - if (! exists $self->{conflictwinner}) { - $self->pause_and_exit(qq{Invalid standard conflict '$sc': no matching database found!}); - } + for my $pkval (keys %conflict) { + ## May have already been set by customcode, so only change if a ref + $conflict{$pkval} = $sc if ref $conflict{$pkval}; + } + } + else { + ## Have more than one, so figure out the best one to use + my @dbs = split / +/ => $sc; + ## Make sure they all exist + for my $dbname (@dbs) { + if (! exists $deltacount{$dbname}) { + $self->pause_and_exit(qq{Invalid conflict strategy '$sc' used for $S.$T: no such database '$dbname'});; } - } ## end conflictwinner not set yet - - } ## end standard conflict + } + + ## Fill in each conflict with first found database + for my $pkval (keys %conflict) { + ## As above, we only change if currently a ref + next if ! ref $conflict{$pkval}; + $conflict{$pkval} = first { exists $conflict{$pkval}{$_} } split ' ' => $sc; + } + } - ## At this point, conflictwinner should be set, OR - ## %conflict should hold the winning database per key + ## At this point, the conflict hash should consist of keys with + ## the winning database as the value ## Walk through and apply to the %deltabin hash - ## We want to walk through each primary key for this table - ## We figure out who the winning database is - ## Then we remove all rows for all databases with this key - ## Finally, we add the winning databases/key combo to deltabin - ## We do it this way as we cannot be sure that the combo existed. - ## It could be the case that the winning database made - ## no changes to this table! - for my $key (keys %conflict) { - my $winner = $self->{conflictwinner} || $conflict{$key}; + for my $pkey (keys %conflict) { ## Delete everyone for this primary key for my $dbname (keys %deltabin) { - delete $deltabin{$dbname}{$key}; + delete $deltabin{$dbname}{$pkey}; } ## Add (or re-add) the winning one - $deltabin{$winner}{$key} = 1; + ## We do it this way as we cannot be sure that the combo existed. + ## It could be the case that the winning database made + ## no changes to this table! + $deltabin{ $conflict{$pkey} }{$pkey} = 1; } $self->glog('Conflicts have been resolved', LOG_NORMAL); @@ -8641,6 +8575,11 @@ sub run_kid_custom_code { $self->glog("Running $c->{whenrun} custom code $c->{id}: $c->{name}", LOG_NORMAL); + ## Allow the caller to maintain some state by providing a hash + if (! exists $self->{kid_customcode_shared}) { + $self->{kid_customcode_shared} = {}; + } + ## Create a hash of information common to all customcodes my $info = { syncname => $sync->{name}, @@ -8653,6 +8592,7 @@ sub run_kid_custom_code { lastcode => '', ## Tells the caller to skip any other codes of this type endsync => '', ## Tells the caller to cancel the whole sync sendmail => sub { $self->send_mail(@_) }, + shared => $self->{kid_customcode_shared}, }; ## Add in any items custom to this code