From ebb8a8b8699fbbe72cfa76f9cccc74d2c01b1053 Mon Sep 17 00:00:00 2001
From: Greg Sabino Mullane
Date: Sun, 23 Aug 2015 10:54:43 -0400
Subject: [PATCH] Cleanup and partial rewrite of push_rows()

A major change is to use ANY(?) when possible, and to avoid doing any
quoting ourselves.
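
The single-key case now hands the whole list of primary key values to
DBD::Pg as one array parameter bound to ANY(?), so the driver does all
the quoting for us. A minimal sketch of the idea (table and column
names are invented for illustration):

    use DBI;

    my $dbh = DBI->connect('dbi:Pg:dbname=test', '', '', { RaiseError => 1 });

    ## Instead of interpolating quoted values ourselves, e.g.
    ##   SELECT * FROM foo WHERE id IN ('1','2','3')
    ## we bind the entire list as a single array parameter:
    my $sth = $dbh->prepare('SELECT * FROM foo WHERE id = ANY(?)');
    $sth->execute( [1, 2, 3] );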
---
 Bucardo.pm | 320 ++++++++++++++++++++++++++++++-----------------------
 1 file changed, 180 insertions(+), 140 deletions(-)

diff --git a/Bucardo.pm b/Bucardo.pm
index d4373b72c..330862582 100644
--- a/Bucardo.pm
+++ b/Bucardo.pm
@@ -9576,8 +9576,11 @@ sub push_rows {

     my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB) = @_;

+    ## Build a list of all PK values to feed to IN clauses
+    ## This is an array in case we go over $chunksize
+    my @pkvals = [];
+
     ## If fullcopy, $rows will be the scalar 'fullcopy' instead of a hashref
-    ## In which case, we create a dummy hashref
     my $fullcopy = 0;
     if (! ref $rows) {
         if ($rows eq 'fullcopy') {
@@ -9588,142 +9591,145 @@ sub push_rows {
             die "Invalid rows passed to push_rows: $rows\n";
         }
         $rows = {};
+        $pkvals[0] = 'fullcopy';
     }

-    ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
-    if (ref $TargetDB ne 'ARRAY') {
-        $TargetDB = [$TargetDB];
+    ## This will be zero for fullcopy of course
+    my $total_rows = keys %$rows;
+
+    if (!$total_rows and !$fullcopy) {
+        return 0; ## Can happen on a truncation
     }

-    ## This will be zero for fullcopy of course
-    my $total = keys %$rows;
+    my $syncname = $Sync->{name} || '';
+
+    ## We may want to change the target table based on the customname table
+    ## It is up to the caller to populate these, even if the syncname is ''
+    my $customname = $Table->{newname}{$syncname} || {};

-    ## Total number of rows written
-    $count = 0;
+    ## We may want to change the SELECT based on the customcols table
+    my $customcols = $Table->{newcols}{$syncname} || {};

-    my $syncname = $Sync->{name};
-    my $newname = $Table->{newname}{$syncname};
     my $numpks = $Table->{numpkcols};

     ## As with delete, we may break this into more than one step
     ## Should only be a factor for very large numbers of keys
     my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;

-    ## Build a list of all PK values to feed to IN clauses
-    my @pkvals;
-    my $round = 0;
-    my $roundtotal = 0;
-    for my $key (keys %$rows) {
-        my $inner = length $key
-            ? (join ',' => map { s{\'}{''}go; s{\\}{\\\\}go; qq{'$_'}; } split '\0', $key, -1)
-            : q{''};
-        push @{ $pkvals[$round] ||= [] } => $numpks > 1 ? "($inner)" : $inner;
-        if (++$roundtotal >= $chunksize) {
-            $roundtotal = 0;
-            $round++;
+    ## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
+    if (! $fullcopy) {
+        if ($numpks == 1 and $total_rows < $chunksize) {
+            $pkvals[0] = 'allrows';
         }
-    }
+        ## Otherwise, we push the raw primary key values into bins
+        else {
+            my $pkrounds = 1;
+            my $current_row = 1;

-    ## Example: 1234, 221
-    ## Example MCPK: ('1234','Don''t Stop','2008-01-01'),('221','foobar','2008-11-01')
+            ## Loop through each row and add its primary key values to the current bin
+            for my $key (keys %$rows) {

-    ## This can happen if we truncated but had no delta activity
-    return 0 if (! $pkvals[0] or ! length $pkvals[0]->[0] ) and ! $fullcopy;
+                push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1;

-    ## Put dummy data into @pkvals if using fullcopy
-    if ($fullcopy) {
-        push @pkvals => ['fullcopy'];
+                ## Make sure our SQL statement doesn't grow too large
+                if (++$current_row > $chunksize) {
+                    $current_row = 1;
+                    $pkrounds++;
+                }
+            }
+        }
     }
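+
+    ## For example, six changed rows with a two-column primary key and a
+    ## chunksize of three would end up as two bins of raw values
+    ## (data invented for illustration):
+    ##   @pkvals = ( [1,'a',2,'b',3,'c'], [4,'d',5,'e',6,'f'] );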

-    ## Get ready to export from the source
-    ## This may have multiple versions depending on the customcols table
-    my $newcols = $Table->{newcols}{$syncname} || {};
+    ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
+    if (ref $TargetDB ne 'ARRAY') {
+        $TargetDB = [$TargetDB];
+    }

-    ## Walk through and grab which SQL is needed for each target
-    ## Cache this earlier on - controller?
+    ## Figure out the different SELECT clauses, and assign targets to them
     my %srccmd;
     for my $t (@$TargetDB ) {

-        ## The SELECT clause we use (may be empty)
-        my $clause = $newcols->{$t->{name}};
+        ## The SELECT clause we use (usually an empty string unless customcols is being used)
+        my $select_clause = $customcols->{$t->{name}} || '';

         ## Associate this target with this clause
-        push @{$srccmd{$clause}} => $t;
+        push @{$srccmd{$select_clause}} => $t;
     }

-    my $tablename = "$Table->{safeschema}.$Table->{safetable}";
-    my $fromdbh = $SourceDB->{dbh};
+    my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
+    my $sourcedbh = $SourceDB->{dbh};
+
+    ## The total number of source rows returned
+    my $total_source_rows = 0;

-    ## Loop through each source command and push it out to all targets
-    ## that are associated with it
-    for my $clause (sort keys %srccmd) {
+    ## Loop through each select command and push it out to all targets that are associated with it
+    for my $select_clause (sort keys %srccmd) {

         ## Build the clause (cache) and kick it off
-        my $SELECT = $clause || 'SELECT *';
+        my $SELECT = $select_clause || 'SELECT *';

-        ## Prepare each target in turn
-        for my $t (@{ $srccmd{$clause} }) {
+        ## Prepare each target that is using this select clause
+        for my $target (@{ $srccmd{$select_clause} }) {

             ## Internal name of this target
-            my $targetname = $t->{name};
+            my $targetname = $target->{name};

-            ## Name of the table we are pushing to on this target
-            my $tname = $newname->{$targetname};
+            ## The actual target table name. Depends on dbtype and customname table entries
+            my $target_tablename = $customname->{$targetname};

             ## The columns we are pushing to, both as an arrayref and a CSV:
             my $cols = $Table->{tcolumns}{$SELECT};
-            my $columnlist = $t->{does_sql} ?
-                ('(' . (join ',', map { $t->{dbh}->quote_identifier($_) } @$cols) . ')')
+            my $columnlist = $target->{does_sql} ?
+                ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')')
               : ('(' . (join ',', map { $_ } @$cols) . ')');

-            my $type = $t->{dbtype};
+            my $type = $target->{dbtype};

             ## Use columnlist below so we never have to worry about the order
             ## of the columns on the target
             if ('postgres' eq $type) {
-                my $tgtcmd = "$self->{sqlprefix}COPY $tname$columnlist FROM STDIN";
-                $t->{dbh}->do($tgtcmd);
+                my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
+                $target->{dbh}->do($tgtcmd);
             }
             elsif ('flatpg' eq $type) {
-                print {$t->{filehandle}} "COPY $tname$columnlist FROM STDIN;\n";
-                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+                print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
+                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
             }
             elsif ('flatsql' eq $type) {
-                print {$t->{filehandle}} "INSERT INTO $tname$columnlist VALUES\n";
-                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+                print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
+                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
             }
             elsif ('mongo' eq $type) {
-                $self->{collection} = $t->{dbh}->get_collection($tname);
+                $self->{collection} = $target->{dbh}->get_collection($target_tablename);
             }
             elsif ('redis' eq $type) {
-                ## No prep needed, other than to reset our count of changes
-                $t->{redis} = 0;
+                ## No setup needed
             }
             elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             elsif ('oracle' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             elsif ('sqlite' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             else {
                 die qq{No support for database type "$type" yet!};
             }

-        } ## end preparing each target for this clause
+        } ## end preparing each target for this select clause

         my $loop = 1;
         my $pcount = @pkvals;

@@ -9731,72 +9737,88 @@ sub push_rows {
         ## Loop through each chunk of primary keys to copy over
         for my $pk_values (@pkvals) {

-            my $pkvs = join ',' => @{ $pk_values };
-            ## Message to prepend to the statement if chunking
-            my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */ ";
-            $loop++;
+            ## Start streaming rows from the source
+            $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE);

-            ## Kick off the copy on the source
-            $self->glog(qq{${pre}Copying from $fromname.$tablename}, LOG_VERBOSE);
-            my $srccmd = sprintf '%s%sCOPY (%s FROM %s %s) TO STDOUT%s',
-                $pre,
+            ## If we are doing a small batch of single primary keys, use ANY
+            ## If we are doing fullcopy, leave out the WHERE clause completely
+            if (! ref $pk_values ) {
+                my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
                     $self->{sqlprefix},
                     $SELECT,
-                    $tablename,
-                    $fullcopy ? '' : " WHERE $Table->{pkcols} IN ($pkvs)",
+                    $source_tablename,
+                    $pk_values eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
                     $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
-            $fromdbh->do($srccmd);
-
-            my $buffer = '';
-
-            ## Loop through all changed rows on the source, and push to the target(s)
-            my $multirow = 0;
-
-            ## If in fullcopy mode, we don't know how many rows will get copied,
-            ## so we count as we go along
-            if ($fullcopy) {
-                $total = 0;
+                my $srcsth = $sourcedbh->prepare($srccmd);
+                $pk_values eq 'fullcopy' ? $srcsth->execute()
+                    : $srcsth->execute( [ keys %$rows ]);
             }
+            else {
+                my $baseq = '?';
+                if ($numpks > 1) {
+                    $baseq = '?,' x $numpks;
+                    $baseq =~ s/(.+?).$/\($1\)/;
+                }
+                my $number_values = @$pk_values;
+                my $placeholders = "$baseq," x ($number_values / $numpks);
+                chop $placeholders;
+                my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
+                    "/* $loop of $pcount */ ",
+                    $self->{sqlprefix},
+                    $SELECT,
+                    $source_tablename,
+                    $Table->{pkeycols},
+                    $placeholders,
+                    $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+                my $srcsth = $sourcedbh->prepare($srccmd);
+                $srcsth->execute( @$pk_values );
+                $loop++;
+            }
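+
+            ## The single-key form above generates a statement along the lines of
+            ##   COPY (SELECT * FROM public.foo WHERE id = ANY(?)) TO STDOUT
+            ## while the chunked form generates something like this for a
+            ## two-column key and two rows (names invented for illustration):
+            ##   COPY (SELECT * FROM public.foo WHERE (id,make) IN ((?,?),(?,?))) TO STDOUT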
" $Sync->{copyextra}" : ''; - $fromdbh->do($srccmd); - - my $buffer = ''; - - ## Loop through all changed rows on the source, and push to the target(s) - my $multirow = 0; - - ## If in fullcopy mode, we don't know how many rows will get copied, - ## so we count as we go along - if ($fullcopy) { - $total = 0; + my $srcsth = $sourcedbh->prepare($srccmd); + $pk_values eq 'fullcopy' ? $srcsth->execute() + : $srcsth->execute( [ keys %$rows ]); } + else { + my $baseq = '?'; + if ($numpks > 1) { + $baseq = '?,' x $numpks; + $baseq =~ s/(.+?).$/\($1\)/; + } + my $number_values = @$pk_values; + my $placeholders = "$baseq," x ($number_values / $numpks); + chop $placeholders; + my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s', + "/* $loop of $pcount */ ", + $self->{sqlprefix}, + $SELECT, + $source_tablename, + $Table->{pkeycols}, + $placeholders, + $Sync->{copyextra} ? " $Sync->{copyextra}" : ''; + my $srcsth = $sourcedbh->prepare($srccmd); + $srcsth->execute( @$pk_values ); + $loop++; + } + + ## How many rows we read from the source database this chunk + my $source_rows = 0; ## Loop through each row output from the source, storing it in $buffer - while ($fromdbh->pg_getcopydata($buffer) >= 0) { - - $total++ if $fullcopy; + ## Future optimzation: slurp in X rows at a time, then process them + my $buffer = ''; + while ($sourcedbh->pg_getcopydata($buffer) >= 0) { - ## For each target using this particular COPY statement - for my $t (@{ $srccmd{$clause} }) { + $source_rows++; - my $type = $t->{dbtype}; - my $cols = $Table->{tcolumns}{$SELECT}; - my $tname = $newname->{$t->{name}}; + ## For each target using this particular SELECT clause + for my $target (@{ $srccmd{$select_clause} }) { - chomp $buffer; + my $type = $target->{dbtype}; ## For Postgres, we simply do COPY to COPY if ('postgres' eq $type) { - $t->{dbh}->pg_putcopydata("$buffer\n"); + $target->{dbh}->pg_putcopydata($buffer); } ## For flat files destined for Postgres, just do a tab-delimited dump elsif ('flatpg' eq $type) { - print {$t->{filehandle}} "$buffer\n"; + print {$target->{filehandle}} $buffer; } ## For other flat files, make a standard VALUES list elsif ('flatsql' eq $type) { - if ($multirow++) { - print {$t->{filehandle}} ",\n"; + chomp $buffer; + if ($source_rows > 1) { + print {$target->{filehandle}} ",\n"; } - print {$t->{filehandle}} '(' . - (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')'; + print {$target->{filehandle}} '(' . + (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . 
                     ## For Mongo, do some mongomagic
                     elsif ('mongo' eq $type) {

                         ## Have to map these values back to their names
+                        chomp $buffer;
                         my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;

+                        my $targetcols = $Table->{tcolumns}{$SELECT};
+
                         ## Our object consists of the primary keys, plus all other fields
                         my $object = {};
-                        for my $cname (@{ $cols }) {
+                        for my $cname (@{ $targetcols }) {
                             $object->{$cname} = shift @cols;
                         }
                         ## Coerce non-strings into different objects
@@ -9814,7 +9836,7 @@ sub push_rows {
                                 }
                             }
                             elsif ($Table->{columnhash}{$key}{ftype} =~ /real|double|numeric/o) {
-                                 $object->{$key} = strtod($object->{$key});
+                                $object->{$key} = strtod($object->{$key});
                             }
                         }
                         $self->{collection}->insert($object, { safe => 1 });
@@ -9825,18 +9847,21 @@ sub push_rows {
                         or 'drizzle' eq $type
                         or 'oracle' eq $type
                         or 'sqlite' eq $type) {
+                        chomp $buffer;
                         my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
                         for my $cindex (0..@cols) {
                             next unless defined $cols[$cindex];
-                            if ($Table->{columnhash}{$cols->[$cindex]}{ftype} eq 'boolean') {
+                            if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
                                 # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
                                 $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
                             }
                         }
-                        $count += $t->{sth}->execute(@cols);
+                        $target->{sth}->execute(@cols);
                     }
                     elsif ('redis' eq $type) {
                         ## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
+                        chomp $buffer;
                         my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
                         my @pkey;
                         for (1 .. $Table->{numpkcols}) {
@@ -9846,66 +9871,78 @@ sub push_rows {
                         ## Build a list of non-null key/value pairs to set in the hash
                         my @add;
                         $i = $Table->{numpkcols} - 1;
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
                         for my $val (@colvals) {
                             $i++;
                             next if ! defined $val;
-                            push @add, $cols->[$i], $val;
+                            push @add, $targetcols->[$i], $val;
                         }

-                        $t->{dbh}->hmset("$tname:$pkeyval", @add);
-                        $count++;
-                        $t->{redis}++;
+                        my $target_tablename = $customname->{$target->{name}};
+                        $target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
                     }

                 } ## end each target

            } ## end each row pulled from the source

+            $total_source_rows += $source_rows;
+
         } ## end each pklist

         ## Workaround for DBD::Pg bug
         ## Once we require a minimum version of 2.18.1 or better, we can remove this!
-        if ($self->{dbdpgversion} < 21801) {
-            $fromdbh->do('SELECT 1');
+        if ($SourceDB->{dbtype} eq 'postgres' and $self->{dbdpgversion} < 21801) {
+            $sourcedbh->do('SELECT 1');
         }

         ## Perform final cleanups for each target
-        for my $t (@{ $srccmd{$clause} }) {
+        for my $target (@{ $srccmd{$select_clause} }) {

-            my $type = $t->{dbtype};
+            my $type = $target->{dbtype};

-            my $tname = $newname->{$t->{name}};
+            my $tname = $customname->{$target->{name}};

             if ('postgres' eq $type) {
-                my $dbh = $t->{dbh};
+                my $dbh = $target->{dbh};
                 $dbh->pg_putcopyend();
                 ## Same bug as above
                 if ($self->{dbdpgversion} < 21801) {
                     $dbh->do('SELECT 1');
                 }
-                $self->glog(qq{Rows copied to $t->{name}.$tname: $total}, LOG_VERBOSE);
-                $count += $total;
+                $self->glog(qq{Rows copied to $target->{name}.$tname: $total_source_rows}, LOG_VERBOSE);

                 ## If this table is set to makedelta, add rows to bucardo.delta to simulate the
                 ## normal action of a trigger and add a row to bucardo.track to indicate that
                 ## it has already been replicated here.
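+                ## e.g. with a two-column primary key and two changed rows, the
+                ## statement built below comes out roughly as
+                ##   INSERT INTO bucardo.<deltatable> (id,make) VALUES (?,?),(?,?)
+                ## with the raw key values bound to the placeholders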
-                my $d = $Sync->{db}{ $t->{name} };
-                if (!$fullcopy and $d->{does_makedelta}{$tablename}) {
-                    $self->glog("Using makedelta to populate delta and track tables for $t->{name}.$tname", LOG_VERBOSE);
-                    my $vals;
-                    if ($numpks == 1) {
-                        $vals = join ',', map { "($_)" } map { @{ $_ } } @pkvals;
-                    }
-                    else {
-                        $vals = join ',', map { @{ $_ } } @pkvals;
-                    }
+                my $d = $Sync->{db}{ $target->{name} };
+                if (!$fullcopy and $d->{does_makedelta}{$source_tablename} ) {
+
+                    $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
+
                     my $cols = join ',' => @{ $Table->{qpkey} };

-                    $dbh->do(qq{
-                        INSERT INTO bucardo.$Table->{deltatable} ($cols)
-                        VALUES $vals
-                    });
+                    for my $pk_values (@pkvals) {
+
+                        my $baseq = '?';
+                        if ($numpks > 1) {
+                            $baseq = '?,' x $numpks;
+                            chop $baseq;
+                        }
+                        my $number_values = ref $pk_values ? @$pk_values : keys %$rows;
+                        my $placeholders = "($baseq)," x ($number_values / $numpks);
+                        chop $placeholders;
+
+                        my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
+                            $Table->{deltatable},
+                            $cols,
+                            $placeholders;
+                        $self->glog("GOT $placeholders leading to $SQL", LOG_DEBUG);
+                        my $sth = $dbh->prepare($SQL);
+                        $sth->execute(ref $pk_values ? @$pk_values : (keys %$rows));
+                    }
+
                     # Make sure we track it - but only if this sync already acts as a source!
-                    if ($t->{role} eq 'source') {
+                    if ($target->{role} eq 'source') {
                         $dbh->do(qq{
                             INSERT INTO bucardo.$Table->{tracktable}
                             VALUES (NOW(), ?)
@@ -9914,7 +9951,7 @@ sub push_rows {

                     ## We want to send a kick signal to other syncs that are using this table
                     ## However, we do not want to kick unless they are set to autokick and active
-
+                    ## This works even if we do not have a real sync, as $syncname will be ''
                     $self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
                     if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
                         $SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
@@ -9929,19 +9966,22 @@ sub push_rows {
                 }
             }
             elsif ('flatpg' eq $type) {
-                print {$t->{filehandle}} "\\\.\n\n";
+                print {$target->{filehandle}} "\\\.\n\n";
             }
             elsif ('flatsql' eq $type) {
-                print {$t->{filehandle}} ";\n\n";
+                print {$target->{filehandle}} ";\n\n";
             }
             elsif ('redis' eq $type) {
-                $self->glog(qq{Rows copied to Redis $t->{name}.$tname:: $t->{redis}}, LOG_VERBOSE);
+                $self->glog(qq{Rows copied to Redis $target->{name}.$tname: $total_source_rows}, LOG_VERBOSE);
+            }
+            else {
+                ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle
             }
         }

     } ## end of each clause in the source command list

-    return $count;
+    return $total_source_rows;

 } ## end of push_rows
-- 
2.39.5