From: Andres Freund Date: Tue, 4 Nov 2014 00:03:52 +0000 (+0100) Subject: bdr: Allow to replicate insert/update/delete selectively on a per set basis. X-Git-Tag: bdr-plugin/dynconf-before-global-add~100 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=dac330e8ec30e79284de229fd16ff1a5ae46d7cf;p=2ndquadrant_bdr.git bdr: Allow to replicate insert/update/delete selectively on a per set basis. Currently the user interface for this is the bdr.bdr_replication_set_config table. We'll need a layer ontop of that. This commit also introduces some other novelty because it starts to create the initial version specific extension scripts out of components. --- diff --git a/Makefile.in b/Makefile.in index c476c33162..e2ab0bf5cc 100644 --- a/Makefile.in +++ b/Makefile.in @@ -5,7 +5,12 @@ MODULE_big = bdr EXTENSION = bdr -DATA = bdr--0.8.0.sql bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql +DATA = bdr--0.8.0.sql \ + bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql \ + bdr--0.8.0--0.8.0.1.sql + +DATA_built = bdr--0.8.0.1.sql + DOCS = bdr.conf.sample README.bdr SCRIPTS = scripts/bdr_initial_load bdr_init_copy @@ -66,6 +71,9 @@ scripts/bdr_initial_load: scripts/bdr_initial_load.in mkdir -p scripts sed -e "s/BDR_VERSION/$(BDR_VERSION)/" -e "s/PG_VERSION/$(VERSION)/" $< > $@ +bdr--0.8.0.1.sql: bdr--0.8.0.sql bdr--0.8.0--0.8.0.1.sql + cat $^ > $@ + all: all-lib bdr_init_copy clean: additional-clean @@ -84,6 +92,7 @@ additional-maintainer-clean: clean rm -f config.status config.log rm -f bdr_config.h.in Makefile rm -f run_tests + rm -f bdr--0.8.0.1.sql @rm -rf autom4te.cache/ # Disabled because these tests require "wal_level=logical", which diff --git a/bdr--0.8.0--0.8.0.1.sql b/bdr--0.8.0--0.8.0.1.sql new file mode 100644 index 0000000000..d14dcbc212 --- /dev/null +++ b/bdr--0.8.0--0.8.0.1.sql @@ -0,0 +1,9 @@ +CREATE TABLE bdr.bdr_replication_set_config +( + set_name name PRIMARY KEY, + replicate_inserts bool NOT NULL DEFAULT true, + replicate_updates bool NOT NULL DEFAULT true, + replicate_deletes bool NOT NULL DEFAULT true +); +ALTER TABLE bdr.bdr_replication_set_config SET (user_catalog_table = true); +REVOKE ALL ON TABLE bdr.bdr_replication_set_config FROM PUBLIC; diff --git a/bdr--0.8.0.sql b/bdr--0.8.0.sql index f809917aa9..7e9427db33 100644 --- a/bdr--0.8.0.sql +++ b/bdr--0.8.0.sql @@ -8,8 +8,9 @@ GRANT USAGE ON SCHEMA bdr TO public; SET LOCAL search_path = bdr; -- We must be able to use exclusion constraints for global sequences -SET bdr.permit_unsafe_ddl_commands=true; - +SET bdr.permit_unsafe_ddl_commands = true; +-- We don't want to replicate commands from in here +SET bdr.skip_ddl_replication = true; CREATE OR REPLACE FUNCTION bdr_version() RETURNS TEXT LANGUAGE C @@ -530,14 +531,10 @@ BEGIN END; $$; - ---- ---- this should always be last to avoid replicating our internal schema ---- - CREATE EVENT TRIGGER bdr_queue_ddl_commands ON ddl_command_end EXECUTE PROCEDURE bdr.bdr_queue_ddl_commands(); -SET bdr.permit_unsafe_ddl_commands = false; +RESET bdr.permit_unsafe_ddl_commands; +RESET bdr.skip_ddl_replication; RESET search_path; diff --git a/bdr.c b/bdr.c index de56f8ec9a..02f4e7ef5a 100644 --- a/bdr.c +++ b/bdr.c @@ -79,6 +79,7 @@ Oid BdrNodesRelid; Oid BdrConflictHistoryRelId; Oid BdrLocksRelid; Oid BdrLocksByOwnerRelid; +Oid BdrReplicationSetConfigRelid; BdrConnectionConfig **bdr_connection_configs; /* All databases for which BDR is configured, valid after _PG_init */ @@ -1815,6 +1816,8 @@ bdr_maintain_schema(void) bdr_lookup_relid("bdr_global_locks", schema_oid); BdrLocksByOwnerRelid = bdr_lookup_relid("bdr_global_locks_byowner", schema_oid); + BdrReplicationSetConfigRelid = + bdr_lookup_relid("bdr_replication_set_config", schema_oid); bdr_conflict_handlers_init(); diff --git a/bdr.control b/bdr.control index 2b35a48ccb..1ec698cf13 100644 --- a/bdr.control +++ b/bdr.control @@ -1,6 +1,6 @@ # bdr extension comment = 'bdr support functions' -default_version = '0.8.0' +default_version = '0.8.0.1' module_pathname = '$libdir/bdr' relocatable = false requires = btree_gist diff --git a/bdr.h b/bdr.h index ecfc4d6ab4..c4fce6d154 100644 --- a/bdr.h +++ b/bdr.h @@ -272,6 +272,7 @@ extern Oid BdrVotesRelid; extern Oid BdrLocksRelid; extern Oid BdrLocksByOwnerRelid; +extern Oid BdrReplicationSetConfigRelid; /* apply support */ extern void bdr_process_remote_action(StringInfo s); @@ -402,6 +403,7 @@ extern void bdr_heap_compute_replication_settings( BDRRelation *rel, int num_replication_sets, char **replication_sets); +extern void BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid); extern void bdr_parse_relation_options(const char *label, BDRRelation *rel); extern void bdr_parse_database_options(const char *label); diff --git a/bdr_output.c b/bdr_output.c index f11d75496a..7ce3fd2aaf 100644 --- a/bdr_output.c +++ b/bdr_output.c @@ -491,13 +491,11 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i StartTransactionCommand(); } - schema_oid = get_namespace_oid("bdr", true); - if (schema_oid == InvalidOid) - { - bdr_maintain_schema(); - schema_oid = get_namespace_oid("bdr", true); - } - data->bdr_schema_oid = schema_oid; + bdr_maintain_schema(); + + data->bdr_schema_oid = get_namespace_oid("bdr", true); + schema_oid = data->bdr_schema_oid; + if (schema_oid != InvalidOid) { data->bdr_conflict_handlers_reloid = @@ -548,6 +546,13 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data, RelationGetRelid(r->rel) == data->bdr_locks_reloid) return false; + /* + * Quite ugly, but there's no neat way right now: Flush replication set + * configuration from bdr's relcache. + */ + if (RelationGetRelid(r->rel) == BdrReplicationSetConfigRelid) + BDRRelcacheHashInvalidateCallback(0, InvalidOid); + /* always replicate other stuff in the bdr schema */ if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid) return true; diff --git a/bdr_regress.conf b/bdr_regress.conf index 9d5d7f7ccd..06b41f5242 100644 --- a/bdr_regress.conf +++ b/bdr_regress.conf @@ -8,4 +8,4 @@ bdr.node1_replication_sets = 'default, important, for-node-1' bdr.node2_dsn = 'dbname=regression' bdr.node2_local_dbname = 'postgres' -bdr.node2_replication_sets = 'default, important, for-node-2' +bdr.node2_replication_sets = 'default, important, for-node-2, for-node-2-insert, for-node-2-update, for-node-2-delete' diff --git a/bdr_relcache.c b/bdr_relcache.c index 5849d82d85..eca8fa5b49 100644 --- a/bdr_relcache.c +++ b/bdr_relcache.c @@ -46,7 +46,7 @@ BDRRelcacheHashInvalidateEntry(BDRRelation *entry) } } -static void +void BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid) { HASH_SEQ_STATUS status; @@ -324,6 +324,41 @@ relation_in_replication_set(BDRRelation *r, const char *setname) return false; } +#include "access/genam.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" + +static HeapTuple +replset_lookup(Relation rel, const char *cname) +{ + ScanKey key; + NameData name; + SysScanDesc scan; + HeapTuple tuple = NULL; + + namestrcpy(&name, cname); + + key = (ScanKey) palloc(sizeof(ScanKeyData) * 1); + + ScanKeyInit(&key[0], + 1, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(&name) + ); + + /* FIXME: should use index */ + scan = systable_beginscan(rel, 0, true, NULL, 1, key); + + while ((tuple = systable_getnext(scan)) != NULL) + { + tuple = heap_copytuple(tuple); + break; + } + + systable_endscan(scan); + return tuple; +} + /* * Compute whether modifications to this relation should be replicated or not * and cache the result in the relation descriptor. @@ -332,6 +367,8 @@ relation_in_replication_set(BDRRelation *r, const char *setname) * a constant set of 'to be replicated' sets to be passed in - which happens * to be what we need for logical decoding. As there really isn't another need * for this functionality so far... + * Another reason restricting this to backends in decoding is that we + * currently can't invalidate the czache correctly otherwise. */ void bdr_heap_compute_replication_settings(BDRRelation *r, @@ -340,6 +377,8 @@ bdr_heap_compute_replication_settings(BDRRelation *r, { int i; + Assert(MyReplicationSlot); /* in decoding */ + Assert(!r->computed_repl_valid); /* Implicit "replicate everything" configuration */ @@ -359,17 +398,42 @@ bdr_heap_compute_replication_settings(BDRRelation *r, */ for (i = 0; i < conf_num_replication_sets; i++) { - const char* setname = conf_replication_sets[i]; + Relation repl_sets; + HeapTuple tuple; + const char* setname; + + setname = conf_replication_sets[i]; if (!relation_in_replication_set(r, setname)) continue; - /* - * In the future we'll lookup configuration for individual sets here. - */ - r->computed_repl_insert = true; - r->computed_repl_update = true; - r->computed_repl_delete = true; + repl_sets = heap_open(BdrReplicationSetConfigRelid, AccessShareLock); + tuple = replset_lookup(repl_sets, setname); + + if (tuple != NULL) + { + bool isnull; + TupleDesc desc = RelationGetDescr(repl_sets); + + if (DatumGetBool(fastgetattr(tuple, 2, desc, &isnull))) + r->computed_repl_insert = true; + + if (DatumGetBool(fastgetattr(tuple, 3, desc, &isnull))) + r->computed_repl_update = true; + + if (DatumGetBool(fastgetattr(tuple, 4, desc, &isnull))) + r->computed_repl_delete = true; + + pfree(tuple); + } + else + { + r->computed_repl_insert = true; + r->computed_repl_update = true; + r->computed_repl_delete = true; + } + + heap_close(repl_sets, AccessShareLock); /* no need to look any further, we replicate everything */ if (r->computed_repl_insert && diff --git a/expected/ddl/replication_set.out b/expected/ddl/replication_set.out index 637865c741..b8c149b9cb 100644 --- a/expected/ddl/replication_set.out +++ b/expected/ddl/replication_set.out @@ -210,3 +210,125 @@ SELECT * FROM settest_1 ORDER BY data; should-replicate-via-for-node-2-even-though-unknown (4 rows) +\c regression +DROP TABLE settest_1; +/* + * Now test configurations where only some actions are replicated. + */ +CREATE TABLE settest_2(data text primary key); +-- Test 1: ensure that inserts are replicated while update/delete are filtered +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}'); + table_set_replication_sets +---------------------------- + +(1 row) + +INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes) +VALUES ('for-node-2-insert', true, false, false), + ('for-node-2-update', false, true, false), + ('for-node-2-delete', false, false, true); +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1'); +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete'); +UPDATE settest_2 +SET data = 'repl-insert--insert-#2-update' +WHERE data = 'repl-insert--insert-#2-then-update'; +DELETE FROM settest_2 +WHERE data = 'repl-insert--insert-#3-then-delete'; +-- Test 2: ensure that updates are replicated while inserts/deletes are filtered +-- insert before filtering +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete'); +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}'); + table_set_replication_sets +---------------------------- + +(1 row) + +UPDATE settest_2 +SET data = 'repl-update--insert-#1-update' +WHERE data = 'repl-update--insert-#1-then-update'; +DELETE FROM settest_2 +WHERE data = 'repl-update--insert-#2-then-delete'; +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3'); +-- verify that changing the pg_replication_sets row has effects +UPDATE bdr.bdr_replication_set_config +SET replicate_inserts = true +WHERE set_name = 'for-node-2-update'; +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4'); +-- but reset to normal meaning afterwards +UPDATE bdr.bdr_replication_set_config +SET replicate_inserts = false +WHERE set_name = 'for-node-2-update'; +-- Test 3: ensure that deletes are replicated while inserts/updates are filtered +-- insert before filtering +SELECT bdr.table_set_replication_sets('settest_2', NULL); + table_set_replication_sets +---------------------------- + +(1 row) + +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete'); +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}'); + table_set_replication_sets +---------------------------- + +(1 row) + +UPDATE settest_2 +SET data = 'repl-delete--insert-#1-update' +WHERE data = 'repl-delete--insert-#1-then-update'; +DELETE FROM settest_2 +WHERE data = 'repl-delete--insert-#2-then-delete'; +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3'); +-- Test 4: ensure that all partial sets together replicate everything +SELECT bdr.table_set_replication_sets('settest_2', + '{for-node-2-insert,for-node-2-update,for-node-2-delete}'); + table_set_replication_sets +---------------------------- + +(1 row) + +INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete'); +UPDATE settest_2 +SET data = 'repl-combined--insert-#1-update' +WHERE data = 'repl-combined--insert-#1-then-update'; +DELETE FROM settest_2 +WHERE data = 'repl-combined--insert-#2-then-delete'; +SELECT * FROM settest_2 ORDER BY data; + data +--------------------------------- + repl-combined--insert-#1-update + repl-delete--insert-#1-update + repl-delete--insert-#3 + repl-insert--insert-#1 + repl-insert--insert-#2-update + repl-update--insert-#1-update + repl-update--insert-#3 + repl-update--insert-#4 +(8 rows) + +SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication; + pg_xlog_wait_remote_apply +--------------------------- + + +(2 rows) + +\c postgres +SELECT * FROM settest_2 ORDER BY data; + data +------------------------------------ + repl-combined--insert-#1-update + repl-delete--insert-#1-then-update + repl-insert--insert-#1 + repl-insert--insert-#2-then-update + repl-insert--insert-#3-then-delete + repl-update--insert-#1-update + repl-update--insert-#2-then-delete + repl-update--insert-#4 +(8 rows) + +\c regression diff --git a/sql/ddl/replication_set.sql b/sql/ddl/replication_set.sql index a3ba5e1e2b..4b1a120a5d 100644 --- a/sql/ddl/replication_set.sql +++ b/sql/ddl/replication_set.sql @@ -77,3 +77,93 @@ SELECT * FROM settest_1 ORDER BY data; SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication; \c postgres SELECT * FROM settest_1 ORDER BY data; +\c regression +DROP TABLE settest_1; + + +/* + * Now test configurations where only some actions are replicated. + */ +CREATE TABLE settest_2(data text primary key); + +-- Test 1: ensure that inserts are replicated while update/delete are filtered +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}'); +INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes) +VALUES ('for-node-2-insert', true, false, false), + ('for-node-2-update', false, true, false), + ('for-node-2-delete', false, false, true); + +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1'); +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete'); +UPDATE settest_2 +SET data = 'repl-insert--insert-#2-update' +WHERE data = 'repl-insert--insert-#2-then-update'; + +DELETE FROM settest_2 +WHERE data = 'repl-insert--insert-#3-then-delete'; + +-- Test 2: ensure that updates are replicated while inserts/deletes are filtered +-- insert before filtering +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete'); + +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}'); + +UPDATE settest_2 +SET data = 'repl-update--insert-#1-update' +WHERE data = 'repl-update--insert-#1-then-update'; + +DELETE FROM settest_2 +WHERE data = 'repl-update--insert-#2-then-delete'; + +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3'); + + +-- verify that changing the pg_replication_sets row has effects +UPDATE bdr.bdr_replication_set_config +SET replicate_inserts = true +WHERE set_name = 'for-node-2-update'; +INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4'); + +-- but reset to normal meaning afterwards +UPDATE bdr.bdr_replication_set_config +SET replicate_inserts = false +WHERE set_name = 'for-node-2-update'; + + +-- Test 3: ensure that deletes are replicated while inserts/updates are filtered +-- insert before filtering +SELECT bdr.table_set_replication_sets('settest_2', NULL); +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete'); +SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}'); + +UPDATE settest_2 +SET data = 'repl-delete--insert-#1-update' +WHERE data = 'repl-delete--insert-#1-then-update'; + +DELETE FROM settest_2 +WHERE data = 'repl-delete--insert-#2-then-delete'; + +INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3'); + + +-- Test 4: ensure that all partial sets together replicate everything +SELECT bdr.table_set_replication_sets('settest_2', + '{for-node-2-insert,for-node-2-update,for-node-2-delete}'); +INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update'); +INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete'); + +UPDATE settest_2 +SET data = 'repl-combined--insert-#1-update' +WHERE data = 'repl-combined--insert-#1-then-update'; + +DELETE FROM settest_2 +WHERE data = 'repl-combined--insert-#2-then-delete'; + +SELECT * FROM settest_2 ORDER BY data; +SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication; +\c postgres +SELECT * FROM settest_2 ORDER BY data; +\c regression