From f9941532e886bf3ab9cf309acea55f8fcf3b3a3e Mon Sep 17 00:00:00 2001 From: Greg Sabino Mullane Date: Wed, 20 May 2015 12:46:20 -0400 Subject: [PATCH] Have validate_sync check the contents of functions, not just their existence, to allow us to update functions on remote targets on an upgrade. --- bucardo.schema | 772 ++++++++++++++++++++++--------------------------- 1 file changed, 351 insertions(+), 421 deletions(-) diff --git a/bucardo.schema b/bucardo.schema index fac5ab364..4e43346da 100644 --- a/bucardo.schema +++ b/bucardo.schema @@ -1261,16 +1261,11 @@ if (! $is_fullcopy) { my $run_sql = sub { my ($sql,$dbh) = @_; - $sql =~ s/\t/ /gsm; - if ($sql =~ /^(\s+)/m) { - (my $ws = $1) =~ s/[^ ]//g; - my $leading = length($ws); - $sql =~ s/^\s{$leading}//gsm; - } elog(DEBUG, "SQL: $sql"); $dbh->do($sql); }; + my $fetch1_sql = sub { my ($sql,$dbh,@items) = @_; $sql =~ s/\t/ /gsm; @@ -1420,51 +1415,16 @@ for my $dbname (sort { ($db{$b}{role} eq 'source') <=> ($db{$a}{role} eq 'source } my $newschema = $count < 1 ? 1 : 0; - ## Create the 'kickfunc' function as needed - if (exists $bfunctionoid{$kickfunc}) { - ## We may want to recreate this function - if ($force{all} or $force{funcs} or $force{kickfunc}) { - $dbh->do(qq{DROP FUNCTION bucardo."$kickfunc"()}); - delete $bfunctionoid{$kickfunc}; - } - } - - if (! exists $bfunctionoid{$kickfunc}) { - ## We may override this later on with a custom function from bucardo_custom_trigger - ## and we may not even use it all, but no harm in creating the stock one here - my $notice = $dbh->{pg_server_version} >= 90000 - ? qq{bucardo, 'kick_sync_$syncname'} - : qq{"bucardo_kick_sync_$syncname"}; - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo."$kickfunc"() - RETURNS TRIGGER - VOLATILE - LANGUAGE plpgsql - AS \$notify\$ - BEGIN - EXECUTE \$nn\$NOTIFY $notice\$nn\$; - RETURN NEW; - END; - \$notify\$; - }; - $run_sql->($SQL,$dbh); - } +my @functions = ( - ## Create the bucardo_tablename_maker function as needed - if (! exists $bfunctionoid{'bucardo_tablename_maker'}) { - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_tablename_maker(TEXT) -RETURNS TEXT -LANGUAGE plpgsql -IMMUTABLE -AS \$clone\$ +{ name => 'bucardo_tablename_maker', args => 'text', returns => 'text', vol => 'immutable', body => q{ DECLARE tname TEXT; newname TEXT; hashed TEXT; BEGIN -- Change the first period to an underscore - SELECT INTO tname REPLACE(\$1, '.', '_'); + SELECT INTO tname REPLACE($1, '.', '_'); -- Assumes max_identifier_length is 63 -- Because even if not, we'll still abbreviate for consistency and portability SELECT INTO newname SUBSTRING(tname FROM 1 FOR 57); @@ -1477,95 +1437,47 @@ BEGIN SELECT INTO newname quote_ident(newname); RETURN newname; END; -\$clone\$; - - }; - $run_sql->($SQL,$dbh); +} +}, - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_tablename_maker(TEXT,TEXT) -RETURNS TEXT -LANGUAGE plpgsql -IMMUTABLE -AS \$clone\$ +{ name => 'bucardo_tablename_maker', args => 'text, text', returns => 'text', vol => 'immutable', body => q{ DECLARE newname TEXT; BEGIN - SELECT INTO newname bucardo.bucardo_tablename_maker(\$1); + SELECT INTO newname bucardo.bucardo_tablename_maker($1); -- If it has quotes around it, we expand the quotes to include the prefix IF (POSITION('"' IN newname) >= 1) THEN newname = REPLACE(newname, '"', ''); - newname = '"' || \$2 || newname || '"'; + newname = '"' || $2 || newname || '"'; ELSE - newname = \$2 || newname; + newname = $2 || newname; END IF; RETURN newname; END; -\$clone\$; - - }; - $run_sql->($SQL,$dbh); - } +} +}, - if (! exists $bfunctionoid{'bucardo_delta_names_helper'}) { - $SQL = q{ -CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_names_helper() -RETURNS TRIGGER -LANGUAGE plpgsql -IMMUTABLE -AS $bcc$ +{ name => 'bucardo_delta_names_helper', args => '', returns => 'trigger', vol => 'immutable', body => q{ BEGIN -IF NEW.deltaname IS NULL THEN - NEW.deltaname = bucardo.bucardo_tablename_maker(NEW.tablename, 'delta_'); -END IF; -IF NEW.trackname IS NULL THEN - NEW.trackname = bucardo.bucardo_tablename_maker(NEW.tablename, 'track_'); -END IF; -RETURN NEW; + IF NEW.deltaname IS NULL THEN + NEW.deltaname = bucardo.bucardo_tablename_maker(NEW.tablename, 'delta_'); + END IF; + IF NEW.trackname IS NULL THEN + NEW.trackname = bucardo.bucardo_tablename_maker(NEW.tablename, 'track_'); + END IF; + RETURN NEW; END; -$bcc$; - }; - $run_sql->($SQL,$dbh); - } - - ## Create the bucardo_delta_names table as needed - if (! exists $btableoid{'bucardo_delta_names'}) { - $SQL = qq{ - CREATE TABLE bucardo.bucardo_delta_names ( - sync TEXT, - tablename TEXT, - deltaname TEXT, - trackname TEXT, - cdate TIMESTAMPTZ NOT NULL DEFAULT now() - ); - }; - $run_sql->($SQL,$dbh); - - $SQL = qq{CREATE UNIQUE INDEX bucardo_delta_names_unique ON bucardo.bucardo_delta_names (sync,tablename)}; - $run_sql->($SQL,$dbh); - - $SQL = qq{ -CREATE TRIGGER bucardo_delta_namemaker -BEFORE INSERT OR UPDATE -ON bucardo.bucardo_delta_names -FOR EACH ROW EXECUTE PROCEDURE bucardo.bucardo_delta_names_helper(); - }; - $run_sql->($SQL,$dbh); - } +} +}, - ## Function to do a quick check of all deltas for a given sync - if (! exists $bfunctionoid{'bucardo_delta_check'}) { - $SQL = q{ -CREATE OR REPLACE FUNCTION bucardo.bucardo_delta_check(TEXT,TEXT) -RETURNS SETOF TEXT -LANGUAGE plpgsql -AS $bcc$ +## Function to do a quick check of all deltas for a given sync +{ name => 'bucardo_delta_check', args => 'text, text', returns => 'SETOF TEXT', body => q{ DECLARE - myst TEXT; - myrec RECORD; - mycount INT; + myst TEXT; + myrec RECORD; + mycount INT; BEGIN FOR myrec IN SELECT * FROM bucardo.bucardo_delta_names @@ -1596,7 +1508,330 @@ BEGIN END LOOP; RETURN; END; -$bcc$; +} +}, + +## Function to write to the tracking table upon a truncation +{ name => 'bucardo_note_truncation', args => '', returns => 'trigger', body => q{ +DECLARE + mytable TEXT; + myst TEXT; +BEGIN + INSERT INTO bucardo.bucardo_truncate_trigger(tablename,sname,tname,sync) + VALUES (TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_ARGV[0]); + + SELECT INTO mytable + bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'delta_'); + myst = 'TRUNCATE TABLE bucardo.' || mytable; + EXECUTE myst; + + SELECT INTO mytable + bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'track_'); + myst = 'TRUNCATE TABLE bucardo.' || mytable; + EXECUTE myst; + + -- Not strictly necessary, but nice to have a clean slate + SELECT INTO mytable + bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'stage_'); + myst = 'TRUNCATE TABLE bucardo.' || mytable; + EXECUTE myst; + + RETURN NEW; +END; +} +}, + +## Function to remove duplicated entries from the bucardo_delta tables +{ name => 'bucardo_compress_delta', args => 'text, text', returns => 'text', body => q{ +DECLARE + mymode TEXT; + myoid OID; + myst TEXT; + got2 bool; + drows BIGINT = 0; + trows BIGINT = 0; + rnames TEXT; + rname TEXT; + rnamerec RECORD; + ids_where TEXT; + ids_sel TEXT; + ids_grp TEXT; + idnum TEXT; +BEGIN + + -- Are we running in a proper mode? + SELECT INTO mymode current_setting('transaction_isolation'); + IF (mymode <> 'serializable' AND mymode <> 'repeatable read') THEN + RAISE EXCEPTION 'This function must be run in repeatable read mode'; + END IF; + + -- Grab the oid of this schema/table combo + SELECT INTO myoid + c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE nspname = $1 AND relname = $2; + + IF NOT FOUND THEN + RAISE EXCEPTION 'No such table: %.%', $1, $2; + END IF; + + ids_where = 'COALESCE(rowid,''NULL'') = COALESCE(id, ''NULL'')'; + ids_sel = 'rowid AS id'; + ids_grp = 'rowid'; + FOR rnamerec IN SELECT attname FROM pg_attribute WHERE attrelid = + (SELECT oid FROM pg_class WHERE relname = 'bucardo_delta' + AND relnamespace = + (SELECT oid FROM pg_namespace WHERE nspname = 'bucardo') AND attname ~ '^rowid' + ) LOOP + rname = rnamerec.attname; + rnames = COALESCE(rnames || ' ', '') || rname ; + SELECT INTO idnum SUBSTRING(rname FROM '[[:digit:]]+'); + IF idnum IS NOT NULL THEN + ids_where = ids_where + || ' AND (' + || rname + || ' = id' + || idnum + || ' OR (' + || rname + || ' IS NULL AND id' + || idnum + || ' IS NULL))'; + ids_sel = ids_sel + || ', ' + || rname + || ' AS id' + || idnum; + ids_grp = ids_grp + || ', ' + || rname; + END IF; + END LOOP; + + myst = 'DELETE FROM bucardo.bucardo_delta + USING (SELECT MAX(txntime) AS maxt, '||ids_sel||' + FROM bucardo.bucardo_delta + WHERE tablename = '||myoid||' + GROUP BY ' || ids_grp || ') AS foo + WHERE tablename = '|| myoid || ' AND ' || ids_where ||' AND txntime <> maxt'; + RAISE DEBUG 'Running %', myst; + EXECUTE myst; + + GET DIAGNOSTICS drows := row_count; + + myst = 'DELETE FROM bucardo.bucardo_track' + || ' WHERE NOT EXISTS (SELECT 1 FROM bucardo.bucardo_delta d WHERE d.txntime = bucardo_track.txntime)'; + EXECUTE myst; + + GET DIAGNOSTICS trows := row_count; + + RETURN 'Compressed '||$1||'.'||$2||'. Rows deleted from bucardo_delta: '||drows|| + ' Rows deleted from bucardo_track: '||trows; +END; +} ## end of bucardo_compress_delta body +}, + +{ name => 'bucardo_compress_delta', args => 'text', returns => 'text', language => 'sql', body => q{ +SELECT bucardo.bucardo_compress_delta(n.nspname, c.relname) + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE relname = $1 AND pg_table_is_visible(c.oid); +} +}, + +{ name => 'bucardo_compress_delta', args => 'oid', returns => 'text', language => 'sql', body => q{ +SELECT bucardo.bucardo_compress_delta(n.nspname, c.relname) + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.oid = $1; +} +}, + +## The main vacuum function to clean up the delta and track tables +{ name => 'bucardo_purge_delta_oid', 'args' => 'text, oid', returns => 'text', body => q{ +DECLARE + deltatable TEXT; + tracktable TEXT; + dtablename TEXT; + myst TEXT; + drows BIGINT = 0; + trows BIGINT = 0; +BEGIN + -- Store the schema and table name + SELECT INTO dtablename + quote_ident(nspname)||'.'||quote_ident(relname) + FROM pg_class c JOIN pg_namespace n ON (n.oid = c.relnamespace) + WHERE c.oid = $2; + + -- See how many dbgroups are being used by this table + SELECT INTO drows + COUNT(DISTINCT target) + FROM bucardo.bucardo_delta_targets + WHERE tablename = $2; + RAISE DEBUG 'delta_targets rows found for %: %', dtablename, drows; + + -- If no dbgroups, no point in going on, as we will never purge anything + IF drows < 1 THEN + RETURN 'Nobody is using table '|| dtablename ||', according to bucardo_delta_targets'; + END IF; + + -- Figure out the names of the delta and track tables for this relation + SELECT INTO deltatable + bucardo.bucardo_tablename_maker(dtablename, 'delta_'); + SELECT INTO tracktable + bucardo.bucardo_tablename_maker(dtablename, 'track_'); + + -- Delete all txntimes from the delta table that: + -- 1) Have been used by all dbgroups listed in bucardo_delta_targets + -- 2) Have a matching txntime from the track table + -- 3) Are older than the first argument interval + myst = 'DELETE FROM bucardo.' + || deltatable + || ' USING (SELECT txntime AS tt FROM bucardo.' + || tracktable + || ' GROUP BY 1 HAVING COUNT(*) = ' + || drows + || ') AS foo' + || ' WHERE txntime = tt' + || ' AND txntime < now() - interval ' + || quote_literal($1); + + EXECUTE myst; + + GET DIAGNOSTICS drows := row_count; + + -- Now that we have done that, we can remove rows from the track table + -- which have no match at all in the delta table + myst = 'DELETE FROM bucardo.' + || tracktable + || ' WHERE NOT EXISTS (SELECT 1 FROM bucardo.' + || deltatable + || ' d WHERE d.txntime = bucardo.' + || tracktable + || '.txntime)'; + + EXECUTE myst; + + GET DIAGNOSTICS trows := row_count; + + RETURN 'Rows deleted from ' + || deltatable + || ': ' + || drows + || ' Rows deleted from ' + || tracktable + || ': ' + || trows; + +END; +} ## end of bucardo_purge_delta_oid body +}, + +{ name => 'bucardo_purge_delta', args => 'text', returns => 'text', body => q{ +DECLARE + myrec RECORD; + myrez TEXT; + total INTEGER = 0; +BEGIN + + SET LOCAL search_path = pg_catalog; + + -- Grab all potential tables to be vacuumed by looking at bucardo_delta_targets + FOR myrec IN SELECT DISTINCT tablename FROM bucardo.bucardo_delta_targets LOOP + SELECT INTO myrez + bucardo.bucardo_purge_delta_oid($1, myrec.tablename); + RAISE NOTICE '%', myrez; + total = total + 1; + END LOOP; + + RESET search_path; + + RETURN 'Tables processed: ' || total; + +END; +} ## end of bucardo_purge_delta body +}, + + +); ## end of %functions + + for my $info (@functions) { + my $funcname = $info->{name}; + my ($oldmd5,$newmd5) = (0,1); + $SQL = 'SELECT md5(prosrc), md5(?) FROM pg_proc WHERE proname=? AND oidvectortypes(proargtypes)=?'; + my $sthmd5 = $dbh->prepare($SQL); + $count = $sthmd5->execute(" $info->{body} ", $funcname, $info->{args}); + if ($count < 1) { + $sthmd5->finish(); + } + else { + ($oldmd5,$newmd5) = @{$sthmd5->fetchall_arrayref()->[0]}; + } + if ($oldmd5 ne $newmd5) { + my $language = $info->{language} || 'plpgsql'; + my $volatility = $info->{vol} || 'VOLATILE'; + $SQL = " +CREATE OR REPLACE FUNCTION bucardo.$funcname($info->{args}) +RETURNS $info->{returns} +LANGUAGE $language +$volatility +SECURITY DEFINER +AS \$clone\$ $info->{body} \$clone\$"; + elog(INFO, "Writing function $funcname($info->{args})"); + $run_sql->($SQL,$dbh); + } + } + + ## Create the 'kickfunc' function as needed + if (exists $bfunctionoid{$kickfunc}) { + ## We may want to recreate this function + if ($force{all} or $force{funcs} or $force{kickfunc}) { + $dbh->do(qq{DROP FUNCTION bucardo."$kickfunc"()}); + delete $bfunctionoid{$kickfunc}; + } + } + + if (! exists $bfunctionoid{$kickfunc}) { + ## We may override this later on with a custom function from bucardo_custom_trigger + ## and we may not even use it all, but no harm in creating the stock one here + my $notice = $dbh->{pg_server_version} >= 90000 + ? qq{bucardo, 'kick_sync_$syncname'} + : qq{"bucardo_kick_sync_$syncname"}; + $SQL = qq{ + CREATE OR REPLACE FUNCTION bucardo."$kickfunc"() + RETURNS TRIGGER + VOLATILE + LANGUAGE plpgsql + AS \$notify\$ + BEGIN + EXECUTE \$nn\$NOTIFY $notice\$nn\$; + RETURN NEW; + END; + \$notify\$; + }; + $run_sql->($SQL,$dbh); + } + + ## Create the bucardo_delta_names table as needed + if (! exists $btableoid{'bucardo_delta_names'}) { + $SQL = qq{ + CREATE TABLE bucardo.bucardo_delta_names ( + sync TEXT, + tablename TEXT, + deltaname TEXT, + trackname TEXT, + cdate TIMESTAMPTZ NOT NULL DEFAULT now() + ); + }; + $run_sql->($SQL,$dbh); + + $SQL = qq{CREATE UNIQUE INDEX bucardo_delta_names_unique ON bucardo.bucardo_delta_names (sync,tablename)}; + $run_sql->($SQL,$dbh); + + $SQL = qq{ +CREATE TRIGGER bucardo_delta_namemaker +BEFORE INSERT OR UPDATE +ON bucardo.bucardo_delta_names +FOR EACH ROW EXECUTE PROCEDURE bucardo.bucardo_delta_names_helper(); }; $run_sql->($SQL,$dbh); } @@ -1690,44 +1925,6 @@ $bcc$; WHERE tablename NOT IN (select oid from pg_class) }); - ## Create the bucardo_note_truncation function as needed - if (! exists $bfunctionoid{'bucardo_note_truncation'}) { - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_note_truncation() - RETURNS TRIGGER - LANGUAGE plpgsql - SECURITY DEFINER - AS \$clone\$ - DECLARE - mytable TEXT; - myst TEXT; - BEGIN - INSERT INTO bucardo.bucardo_truncate_trigger(tablename,sname,tname,sync) - VALUES (TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_ARGV[0]); - - SELECT INTO mytable - bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'delta_'); - myst = 'TRUNCATE TABLE bucardo.' || mytable; - EXECUTE myst; - - SELECT INTO mytable - bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'track_'); - myst = 'TRUNCATE TABLE bucardo.' || mytable; - EXECUTE myst; - - -- Not strictly necessary, but nice to have a clean slate - SELECT INTO mytable - bucardo.bucardo_tablename_maker(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'stage_'); - myst = 'TRUNCATE TABLE bucardo.' || mytable; - EXECUTE myst; - - RETURN NEW; - END; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - } - ## Create the bucardo_truncate_trigger table as needed if (! exists $btableoid{'bucardo_truncate_trigger'}) { $SQL = qq{ @@ -1763,268 +1960,6 @@ $bcc$; $run_sql->($SQL,$dbh); } - ## Create the bucardo_compress_delta function as needed - if (! exists $bfunctionoid{'bucardo_compress_delta'}) { - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_compress_delta(text, text) - RETURNS TEXT - LANGUAGE plpgsql - SECURITY DEFINER - AS \$clone\$ - DECLARE - mymode TEXT; - myoid OID; - myst TEXT; - got2 bool; - drows BIGINT = 0; - trows BIGINT = 0; - rnames TEXT; - rname TEXT; - rnamerec RECORD; - ids_where TEXT; - ids_sel TEXT; - ids_grp TEXT; - idnum TEXT; - BEGIN - - -- Are we running in a proper mode? - SELECT INTO mymode current_setting('transaction_isolation'); - IF (mymode <> 'serializable' AND mymode <> 'repeatable read') THEN - RAISE EXCEPTION 'This function must be run in repeatable read mode'; - END IF; - - -- Grab the oid of this schema/table combo - SELECT INTO myoid - c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE nspname = \$1 AND relname = \$2; - - IF NOT FOUND THEN - RAISE EXCEPTION 'No such table: %.%', \$1, \$2; - END IF; - - ids_where = 'COALESCE(rowid,''NULL'') = COALESCE(id, ''NULL'')'; - ids_sel = 'rowid AS id'; - ids_grp = 'rowid'; - FOR rnamerec IN SELECT attname FROM pg_attribute WHERE attrelid = - (SELECT oid FROM pg_class WHERE relname = 'bucardo_delta' - AND relnamespace = - (SELECT oid FROM pg_namespace WHERE - nspname = 'bucardo') AND attname ~ '^rowid' - ) LOOP - rname = rnamerec.attname; - rnames = COALESCE(rnames || ' ', '') || rname ; - SELECT INTO idnum SUBSTRING(rname FROM '[[:digit:]]+'); - IF idnum IS NOT NULL THEN - ids_where = ids_where - || ' AND (' - || rname - || ' = id' - || idnum - || ' OR (' - || rname - || ' IS NULL AND id' - || idnum - || ' IS NULL))'; - ids_sel = ids_sel - || ', ' - || rname - || ' AS id' - || idnum; - ids_grp = ids_grp - || ', ' - || rname; - END IF; - END LOOP; - - myst = 'DELETE FROM bucardo.bucardo_delta - USING (SELECT MAX(txntime) AS maxt, '||ids_sel||' - FROM bucardo.bucardo_delta - WHERE tablename = '||myoid||' - GROUP BY ' || ids_grp || ') AS foo - WHERE tablename = '|| myoid || ' AND ' || ids_where ||' AND txntime <> maxt'; - RAISE DEBUG 'Running %', myst; - EXECUTE myst; - - GET DIAGNOSTICS drows := row_count; - - myst = 'DELETE FROM bucardo.bucardo_track' - || ' WHERE NOT EXISTS (SELECT 1 FROM bucardo.bucardo_delta d WHERE d.txntime = bucardo_track.txntime)'; - EXECUTE myst; - - GET DIAGNOSTICS trows := row_count; - - RETURN 'Compressed '||\$1||'.'||\$2||'. Rows deleted from bucardo_delta: '||drows|| - ' Rows deleted from bucardo_track: '||trows; - END; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_compress_delta(text) - RETURNS TEXT - LANGUAGE SQL - SECURITY DEFINER - AS \$clone\$ - SELECT bucardo.bucardo_compress_delta(n.nspname, c.relname) FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE relname = \$1 AND pg_table_is_visible(c.oid); - \$clone\$; - }; - $run_sql->($SQL,$dbh); - - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_compress_delta(oid) - RETURNS TEXT - LANGUAGE SQL - SECURITY DEFINER - AS \$clone\$ - SELECT bucardo.bucardo_compress_delta(n.nspname, c.relname) FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE c.oid = \$1; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_compress_delta() - RETURNS SETOF TEXT - LANGUAGE SQL - SECURITY DEFINER - AS \$clone\$ --- SELECT bucardo.bucardo_compress_delta(n.nspname, c.relname) FROM pg_class c --- JOIN pg_namespace n ON n.oid = c.relnamespace --- WHERE c.oid IN (SELECT DISTINCT tablename FROM bucardo.bucardo_delta); -SELECT 'Fixme'::TEXT; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - - } ## end of bucardo_compress_delta creations - - ## Create the bucardo_purge_delta functions as needed - if (! exists $bfunctionoid{'bucardo_purge_delta_oid'}) { - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_purge_delta_oid(text,oid) - RETURNS TEXT - LANGUAGE plpgsql - VOLATILE - SECURITY DEFINER - AS - \$clone\$ - DECLARE - deltatable TEXT; - tracktable TEXT; - dtablename TEXT; - myst TEXT; - drows BIGINT = 0; - trows BIGINT = 0; - BEGIN - -- Store the schema and table name - SELECT INTO dtablename - quote_ident(nspname)||'.'||quote_ident(relname) - FROM pg_class c JOIN pg_namespace n ON (n.oid = c.relnamespace) - WHERE c.oid = \$2; - - -- See how many dbgroups are being used by this table - SELECT INTO drows - COUNT(DISTINCT target) - FROM bucardo.bucardo_delta_targets - WHERE tablename = \$2; - RAISE DEBUG 'delta_targets rows found for %: %', dtablename, drows; - - -- If no dbgroups, no point in going on, as we will never purge anything - IF drows < 1 THEN - RETURN 'Nobody is using table '|| dtablename ||', according to bucardo_delta_targets'; - END IF; - - -- Figure out the names of the delta and track tables for this relation - SELECT INTO deltatable - bucardo.bucardo_tablename_maker(dtablename, 'delta_'); - SELECT INTO tracktable - bucardo.bucardo_tablename_maker(dtablename, 'track_'); - - -- Delete all txntimes from the delta table that: - -- 1) Have been used by all dbgroups listed in bucardo_delta_targets - -- 2) Have a matching txntime from the track table - -- 3) Are older than the first argument interval - myst = 'DELETE FROM bucardo.' - || deltatable - || ' USING (SELECT txntime AS tt FROM bucardo.' - || tracktable - || ' GROUP BY 1 HAVING COUNT(*) = ' - || drows - || ') AS foo' - || ' WHERE txntime = tt' - || ' AND txntime < now() - interval ' - || quote_literal(\$1); - - EXECUTE myst; - - GET DIAGNOSTICS drows := row_count; - - -- Now that we have done that, we can remove rows from the track table - -- which have no match at all in the delta table - myst = 'DELETE FROM bucardo.' - || tracktable - || ' WHERE NOT EXISTS (SELECT 1 FROM bucardo.' - || deltatable - || ' d WHERE d.txntime = bucardo.' - || tracktable - || '.txntime)'; - - EXECUTE myst; - - GET DIAGNOSTICS trows := row_count; - - RETURN 'Rows deleted from ' - || deltatable - || ': ' - || drows - || ' Rows deleted from ' - || tracktable - || ': ' - || trows; - - END; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - - $SQL = qq{ - CREATE OR REPLACE FUNCTION bucardo.bucardo_purge_delta(text) - RETURNS TEXT - LANGUAGE plpgsql - VOLATILE - SECURITY DEFINER - AS - \$clone\$ - DECLARE - myrec RECORD; - myrez TEXT; - total INTEGER = 0; - BEGIN - - SET LOCAL search_path = pg_catalog; - - -- Grab all potential tables to be vacuumed by looking at bucardo_delta_targets - FOR myrec IN SELECT DISTINCT tablename - FROM bucardo.bucardo_delta_targets LOOP - SELECT INTO myrez - bucardo.bucardo_purge_delta_oid(\$1, myrec.tablename); - RAISE NOTICE '%', myrez; - total = total + 1; - END LOOP; - - RESET search_path; - - RETURN 'Tables processed: ' || total; - END; - \$clone\$; - }; - $run_sql->($SQL,$dbh); - } - if (exists $btableoid{'bucardo_sequences'}) { ## Check for older version of bucardo_sequences table $SQL = q{SELECT count(*) FROM pg_attribute WHERE attname = 'targetname' } @@ -2060,11 +1995,6 @@ SELECT 'Fixme'::TEXT; $run_sql->($SQL,$dbh); } - ## Create the bucardo_audit function as needed - if (! exists $bfunctionoid{'bucardo_audit'}) { - $SQL = q{fillin later fixme}; - } - } ## end not fullcopy / all global items ## Build another list of information for each table -- 2.39.5