From bc5af6833cd6369fc07b8501923c36e776b288d2 Mon Sep 17 00:00:00 2001 From: Greg Sabino Mullane Date: Sat, 29 Aug 2015 23:06:07 -0400 Subject: [PATCH] More cleanup of push_rows(). Now at about minimum desired code quality. :) --- Bucardo.pm | 262 ++++++++++++++++++++++++++--------------------------- 1 file changed, 127 insertions(+), 135 deletions(-) diff --git a/Bucardo.pm b/Bucardo.pm index f368076a7..8ca860a8b 100644 --- a/Bucardo.pm +++ b/Bucardo.pm @@ -9234,7 +9234,7 @@ sub delete_rows { if ('firebird' eq $type) { $goat->{pklist} =~ s/\"//g; ## not ideal: fix someday $goat->{pklist} = uc $goat->{pklist}; - $tname = qq{"$tname"} if $tname !~ /"/; + $tname = qq{"$tname"} if $tname !~ /"/; } ## Internal counters to help us break queries into chunks if needed @@ -9405,11 +9405,11 @@ sub delete_rows { ## The actual target name my $tname = $newname->{$t->{name}}; - $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG); + $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG); - if ('firebird' eq $type) { - $tname = qq{"$tname"} if $tname !~ /"/; - } + if ('firebird' eq $type) { + $tname = qq{"$tname"} if $tname !~ /"/; + } if ('mongo' eq $type) { @@ -9600,23 +9600,19 @@ sub delete_rows { sub push_rows { - ## Copy rows from one table to another + ## Copy rows from one table to others ## Typically called after delete_rows() ## Arguments: six ## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty. ## 2. Table object ## 3. Sync object (may be empty if we are not associated with a sync) ## 4. Source database object - ## 5. Target database object, or arrayref of the same + ## 5. Target database object (or an arrayref of the same) ## 6. Action mode - currently only 'copy' and 'fullcopy' ## Returns: number of rows copied my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_; - if ($mode eq 'fullcopy') { - $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG); - } - ## This will be zero for fullcopy of course my $total_rows = keys %$rows; @@ -9624,19 +9620,9 @@ sub push_rows { return 0; ## Can happen on a truncation } - my $syncname = $Sync->{name} || ''; - - ## We may want to change the target table based on the customname table - ## It is up to the caller to populate these, even if the syncname is '' - my $customname = $Table->{newname}{$syncname} || {}; - - ## We may want to change the SELECT based on the customcols table - my $customcols = $Table->{newcols}{$syncname} || {}; - my $numpks = $Table->{numpkcols}; - ## As with delete, we may break this into more than one statement - ## Should only be a factor for very large numbers of keys + ## If there are a large number of rows (and we are not using ANY) break the statement up my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size; ## Build a list of all PK values to feed to IN clauses @@ -9648,25 +9634,27 @@ sub push_rows { if ($numpks == 1 and $total_rows <= $chunksize) { $mode = 'anyclause'; } - ## Otherwise, we push our completed SQL into bins + ## Otherwise, we split up the primary key values into bins else { - my $pkrounds = 1; + my $pk_array_number = 0; my $current_row = 1; ## Loop through each row and create the needed SQL fragment for my $key (keys %$rows) { - push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1; + push @{ $pkvals[$pk_array_number] ||= [] } => split '\0', $key, -1; ## Make sure our SQL statement doesn't grow too large if (++$current_row > $chunksize) { $current_row = 1; - $pkrounds++; + $pk_array_number++; } } } } + my $syncname = $Sync->{name} || ''; + ## Make sure TargetDB is an arrayref (may come as a single TargetDB object) if (ref $TargetDB ne 'ARRAY') { $TargetDB = [$TargetDB]; @@ -9674,20 +9662,25 @@ sub push_rows { ## Figure out the different SELECT clauses, and assign targets to them my %srccmd; - for my $t (@$TargetDB ) { + for my $Target (@$TargetDB ) { ## The SELECT clause we use (usually an empty string unless customcols is being used) - my $select_clause = $customcols->{$t->{name}} || ''; + my $select_clause = $Table->{newcols}{$syncname}{$Target->{name}} || ''; ## Associate this target with this clause - push @{$srccmd{$select_clause}} => $t; + push @{$srccmd{$select_clause}} => $Target; } + ## We may want to change the target table based on the customname table + ## It is up to the caller to populate these, even if the syncname is '' + my $customname = $Table->{newname}{$syncname} || {}; + + ## Name of the table to copy. Only Postgres can be used as a source my $source_tablename = "$Table->{safeschema}.$Table->{safetable}"; my $sourcedbh = $SourceDB->{dbh}; - ## The total number of source rows returned - my $total_source_rows = 0; + ## Actual number of source rows read and copied. May be less than $total_rows + my $source_rows_read = 0; ## Loop through each select command and push it out to all targets that are associated with it for my $select_clause (sort keys %srccmd) { @@ -9696,99 +9689,90 @@ sub push_rows { my $SELECT = $select_clause || 'SELECT *'; ## Prepare each target that is using this select clause - for my $target (@{ $srccmd{$select_clause} }) { + for my $Target (@{ $srccmd{$select_clause} }) { ## Internal name of this target - my $targetname = $target->{name}; + my $targetname = $Target->{name}; ## The actual target table name. Depends on dbtype and customname table entries my $target_tablename = $customname->{$targetname}; ## The columns we are pushing to, both as an arrayref and a CSV: my $cols = $Table->{tcolumns}{$SELECT}; - my $columnlist = $target->{does_sql} ? - ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')') + my $columnlist = $Target->{does_sql} ? + ('(' . (join ',', map { $Target->{dbh}->quote_identifier($_) } @$cols) . ')') : ('(' . (join ',', map { $_ } @$cols) . ')'); - my $type = $target->{dbtype}; + my $type = $Target->{dbtype}; - ## Use columnlist below so we never have to worry about the order - ## of the columns on the target + ## Using columnlist avoids worrying about the order of columns if ('postgres' eq $type) { my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN"; - $target->{dbh}->do($tgtcmd); + $Target->{dbh}->do($tgtcmd); + } + elsif ('firebird' eq $type) { + $columnlist =~ s/\"//g; + $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/; + my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES ("; + $tgtcmd .= '?,' x @$cols; + $tgtcmd =~ s/,$/)/o; + $Target->{sth} = $Target->{dbh}->prepare($tgtcmd); } elsif ('flatpg' eq $type) { - print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n"; - $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE); + print {$Target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n"; } elsif ('flatsql' eq $type) { - print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n"; - $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE); + print {$Target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n"; } elsif ('mongo' eq $type) { - $self->{collection} = $target->{dbh}->get_collection($target_tablename); + $self->{collection} = $Target->{dbh}->get_collection($target_tablename); } elsif ('redis' eq $type) { ## No setup needed } - elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) { + elsif ('sqlite' eq $type or 'oracle' eq $type or + 'mysql' eq $type or 'mariadb' eq $type or 'drizzle' eq $type) { my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES ("; $tgtcmd .= '?,' x @$cols; $tgtcmd =~ s/,$/)/o; - $target->{sth} = $target->{dbh}->prepare($tgtcmd); - } - elsif ('firebird' eq $type) { - $columnlist =~ s/\"//g; - $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/; - my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES ("; - $tgtcmd .= '?,' x @$cols; - $tgtcmd =~ s/,$/)/o; - $target->{sth} = $target->{dbh}->prepare($tgtcmd); - } - elsif ('oracle' eq $type) { - my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES ("; - $tgtcmd .= '?,' x @$cols; - $tgtcmd =~ s/,$/)/o; - $target->{sth} = $target->{dbh}->prepare($tgtcmd); - } - elsif ('sqlite' eq $type) { - my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES ("; - $tgtcmd .= '?,' x @$cols; - $tgtcmd =~ s/,$/)/o; - $target->{sth} = $target->{dbh}->prepare($tgtcmd); + $Target->{sth} = $Target->{dbh}->prepare($tgtcmd); } else { die qq{No support for database type "$type" yet!}; } + if ($type =~ /flat/) { + $self->glog(qq{Appended to flatfile "$Target->{filename}"}, LOG_VERBOSE); + } + } ## end preparing each target for this select clause my $loop = 1; - my $pcount = @pkvals; - my $fromname = $SourceDB->{name}; + my $number_chunks = @pkvals; ## Loop through each chunk of primary keys to copy over for my $pk_values (@pkvals) { ## Start streaming rows from the source - $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE); + my $pre = $number_chunks > 1 ? "/* $loop of $number_chunks */ " : ''; + $self->glog(qq{${pre}Copying from $SourceDB->{name}.$source_tablename}, LOG_VERBOSE); ## If we are doing a small batch of single primary keys, use ANY - ## If we are doing fullcopy, leave out the WHERE clause completely + ## For a fullcopy mode, leave the WHERE clause out completely if ($mode eq 'fullcopy' or $mode eq 'anyclause') { my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s', - $self->{sqlprefix}, - $SELECT, - $source_tablename, - $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)", - $Sync->{copyextra} ? " $Sync->{copyextra}" : ''; + $self->{sqlprefix}, + $SELECT, + $source_tablename, + $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)", + $Sync->{copyextra} ? " $Sync->{copyextra}" : ''; + my $srcsth = $sourcedbh->prepare($srccmd); - $mode eq 'fullcopy' ? $srcsth->execute() - : $srcsth->execute( [ keys %$rows ]); + $mode eq 'fullcopy' ? $srcsth->execute() : $srcsth->execute( [ keys %$rows ]); } else { + ## Create the proper number of placeholders my $baseq = '?'; if ($numpks > 1) { $baseq = '?,' x $numpks; @@ -9797,53 +9781,52 @@ sub push_rows { my $number_values = @$pk_values; my $placeholders = "$baseq," x ($number_values / $numpks); chop $placeholders; + my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s', - "/* $loop of $pcount */ ", + $pre, $self->{sqlprefix}, $SELECT, $source_tablename, $Table->{pkeycols}, $placeholders, $Sync->{copyextra} ? " $Sync->{copyextra}" : ''; + my $srcsth = $sourcedbh->prepare($srccmd); $srcsth->execute( @$pk_values ); - $loop++; } - ## How many rows we read from the source database this chunk - my $source_rows = 0; - ## Loop through each row output from the source, storing it in $buffer - ## Future optimzation: slurp in X rows at a time, then process them + ## Future optimization: slurp in X rows at a time, then process them my $buffer = ''; while ($sourcedbh->pg_getcopydata($buffer) >= 0) { - $source_rows++; + $source_rows_read++; ## For each target using this particular SELECT clause - for my $target (@{ $srccmd{$select_clause} }) { + for my $Target (@{ $srccmd{$select_clause} }) { - my $type = $target->{dbtype}; + my $type = $Target->{dbtype}; ## For Postgres, we simply do COPY to COPY if ('postgres' eq $type) { - $target->{dbh}->pg_putcopydata($buffer); + $Target->{dbh}->pg_putcopydata($buffer); } ## For flat files destined for Postgres, just do a tab-delimited dump elsif ('flatpg' eq $type) { - print {$target->{filehandle}} $buffer; + print {$Target->{filehandle}} $buffer; } ## For other flat files, make a standard VALUES list elsif ('flatsql' eq $type) { chomp $buffer; - if ($source_rows > 1) { - print {$target->{filehandle}} ",\n"; + if ($source_rows_read > 1) { + print {$Target->{filehandle}} ",\n"; } - print {$target->{filehandle}} '(' . + print {$Target->{filehandle}} '(' . (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')'; } ## For Mongo, do some mongomagic elsif ('mongo' eq $type) { + ## Have to map these values back to their names chomp $buffer; my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1; @@ -9875,26 +9858,8 @@ sub push_rows { } $self->{collection}->insert($object, { safe => 1 }); } - ## For MySQL, MariaDB, Firebird, Drizzle, Oracle, and SQLite, do some basic INSERTs - elsif ('mysql' eq $type - or 'mariadb' eq $type - or 'drizzle' eq $type - or 'firebird' eq $type - or 'oracle' eq $type - or 'sqlite' eq $type) { - chomp $buffer; - my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1; - my $targetcols = $Table->{tcolumns}{$SELECT}; - for my $cindex (0..@cols) { - next unless defined $cols[$cindex]; - if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') { - # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE - $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0; - } - } - $target->{sth}->execute(@cols); - } elsif ('redis' eq $type) { + ## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue" chomp $buffer; my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1; @@ -9913,17 +9878,41 @@ sub push_rows { push @add, $targetcols->[$i], $val; } - my $target_tablename = $customname->{$target->{name}}; - $target->{dbh}->hmset("$target_tablename:$pkeyval", @add); + my $target_tablename = $customname->{$Target->{name}}; + $Target->{dbh}->hmset("$target_tablename:$pkeyval", @add); + } + ## For SQLite, MySQL, MariaDB, Firebird, Drizzle, and Oracle, do some basic INSERTs + elsif ('sqlite' eq $type + or 'oracle' eq $type + or 'mysql' eq $type + or 'mariadb' eq $type + or 'drizzle' eq $type + or 'firebird' eq $type) { + + chomp $buffer; + my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1; + my $targetcols = $Table->{tcolumns}{$SELECT}; + for my $cindex (0..@cols) { + next unless defined $cols[$cindex]; + if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') { + # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE + $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0; + } + } + $Target->{sth}->execute(@cols); + } + ## Safety valve: + else { + die qq{No support for database type "$type" yet!}; } } ## end each target } ## end each row pulled from the source - $total_source_rows += $source_rows; + $loop++; - } ## end each pklist + } ## end each chunk of primary keys ## Workaround for DBD::Pg bug ## Once we require a minimum version of 2.18.1 or better, we can remove this! @@ -9932,32 +9921,35 @@ sub push_rows { } ## Perform final cleanups for each target - for my $target (@{ $srccmd{$select_clause} }) { + for my $Target (@{ $srccmd{$select_clause} }) { + + my $target_tablename = $customname->{$Target->{name}}; - my $type = $target->{dbtype}; + my $type = $Target->{dbtype}; - my $tname = $customname->{$target->{name}}; + $self->glog(qq{Rows copied to ($type) $Target->{name}.$target_tablename: $source_rows_read}, LOG_VERBOSE); if ('postgres' eq $type) { - my $dbh = $target->{dbh}; + my $dbh = $Target->{dbh}; $dbh->pg_putcopyend(); ## Same bug as above if ($self->{dbdpgversion} < 21801) { $dbh->do('SELECT 1'); } - $self->glog(qq{Rows copied to $target->{name}.$tname: $total_rows}, LOG_VERBOSE); ## If this table is set to makedelta, add rows to bucardo.delta to simulate the ## normal action of a trigger and add a row to bucardo.track to indicate that ## it has already been replicated here. - my $d = $Sync->{db}{ $target->{name} }; + my $d = $Sync->{db}{ $Target->{name} }; if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) { - $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE); + $self->glog("Using makedelta to populate delta and track tables for $Target->{name}.$target_tablename", LOG_VERBOSE); my $cols = join ',' => @{ $Table->{qpkey} }; + ## We use the original list, not what may have actually got copied! for my $pk_values (@pkvals) { + ## Generate the correct number of placeholders my $baseq = '?'; if ($numpks > 1) { $baseq = '?,' x $numpks; @@ -9969,14 +9961,15 @@ sub push_rows { my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s', $Table->{deltatable}, - $cols, - $placeholders; + $cols, + $placeholders; + my $sth = $dbh->prepare($SQL); $sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows)); } # Make sure we track it - but only if this sync already acts as a source! - if ($target->{role} eq 'source') { + if ($Target->{role} eq 'source') { $dbh->do(qq{ INSERT INTO bucardo.$Table->{tracktable} VALUES (NOW(), ?) @@ -9987,35 +9980,34 @@ sub push_rows { ## However, we do not want to kick unless they are set to autokick and active ## This works even if we do not have a real syncs, as $syncname will be '' $self->glog('Signalling other syncs that this table has changed', LOG_DEBUG); - if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) { + if (! exists $self->{kick_othersyncs}{$syncname}{$target_tablename}) { $SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?'; $sth = $self->{masterdbh}->prepare($SQL); $sth->execute($Table->{schemaname}, $Table->{tablename}, $syncname, 'active'); - $self->{kick_othersyncs}{$syncname}{$tname} = $sth->fetchall_arrayref(); + $self->{kick_othersyncs}{$syncname}{$target_tablename} = $sth->fetchall_arrayref(); } - for my $row (@{ $self->{kick_othersyncs}{$syncname}{$tname} }) { + ## For each sync returned from the query above, send a kick request + for my $row (@{ $self->{kick_othersyncs}{$syncname}{$target_tablename} }) { my $othersync = $row->[0]; $self->db_notify($dbh, "kick_sync_$othersync", 0, '', 1); } } } elsif ('flatpg' eq $type) { - print {$target->{filehandle}} "\\\.\n\n"; + print {$Target->{filehandle}} "\\\.\n\n"; } elsif ('flatsql' eq $type) { - print {$target->{filehandle}} ";\n\n"; - } - elsif ('redis' eq $type) { - $self->glog(qq{Rows copied to Redis $target->{name}.$tname:: $total_source_rows}, LOG_VERBOSE); + print {$Target->{filehandle}} ";\n\n"; } else { - ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird + ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird, redis } - } + + } ## end each Target } ## end of each clause in the source command list - return $total_source_rows; + return $source_rows_read; } ## end of push_rows -- 2.39.5