bdr: Allow to replicate insert/update/delete selectively on a per set basis.
author Andres Freund <andres@anarazel.de>
Tue, 4 Nov 2014 00:03:52 +0000 (01:03 +0100)
committer Andres Freund <andres@anarazel.de>
Tue, 4 Nov 2014 00:06:16 +0000 (01:06 +0100)
Currently the user interface for this is the
bdr.bdr_replication_set_config table. We'll need a layer on top of
that.

This commit also introduces another novelty: it starts to
create the initial version-specific extension scripts out of
components.

Makefile.in
bdr--0.8.0--0.8.0.1.sql [new file with mode: 0644]
bdr--0.8.0.sql
bdr.c
bdr.control
bdr.h
bdr_output.c
bdr_regress.conf
bdr_relcache.c
expected/ddl/replication_set.out
sql/ddl/replication_set.sql

index c476c331628dfe54e1118511d38b7a797ff9f25f..e2ab0bf5cc14e3c379ceba4ebbfc097510786ed6 100644 (file)
@@ -5,7 +5,12 @@
 MODULE_big = bdr
 
 EXTENSION = bdr
-DATA = bdr--0.8.0.sql bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql
+DATA = bdr--0.8.0.sql \
+   bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql \
+   bdr--0.8.0--0.8.0.1.sql
+
+DATA_built = bdr--0.8.0.1.sql
+
 DOCS = bdr.conf.sample README.bdr
 SCRIPTS = scripts/bdr_initial_load bdr_init_copy
 
@@ -66,6 +71,9 @@ scripts/bdr_initial_load: scripts/bdr_initial_load.in
    mkdir -p scripts
    sed -e "s/BDR_VERSION/$(BDR_VERSION)/" -e "s/PG_VERSION/$(VERSION)/" $< > $@
 
+bdr--0.8.0.1.sql: bdr--0.8.0.sql bdr--0.8.0--0.8.0.1.sql
+   cat $^ > $@
+
 all: all-lib bdr_init_copy
 
 clean: additional-clean
@@ -84,6 +92,7 @@ additional-maintainer-clean: clean
    rm -f config.status config.log
    rm -f bdr_config.h.in Makefile
    rm -f run_tests
+   rm -f bdr--0.8.0.1.sql
    @rm -rf autom4te.cache/
 
 # Disabled because these tests require "wal_level=logical", which
diff --git a/bdr--0.8.0--0.8.0.1.sql b/bdr--0.8.0--0.8.0.1.sql
new file mode 100644 (file)
index 0000000..d14dcbc
--- /dev/null
@@ -0,0 +1,9 @@
+CREATE TABLE bdr.bdr_replication_set_config
+(
+    set_name name PRIMARY KEY,
+    replicate_inserts bool NOT NULL DEFAULT true,
+    replicate_updates bool NOT NULL DEFAULT true,
+    replicate_deletes bool NOT NULL DEFAULT true
+);
+ALTER TABLE bdr.bdr_replication_set_config SET (user_catalog_table = true);
+REVOKE ALL ON TABLE bdr.bdr_replication_set_config FROM PUBLIC;
index f809917aa995a273a069da70bc67d9f3ed0f8edb..7e9427db33bc1f77e83891df96767bb52f8d0b17 100644 (file)
@@ -8,8 +8,9 @@ GRANT USAGE ON SCHEMA bdr TO public;
 
 SET LOCAL search_path = bdr;
 -- We must be able to use exclusion constraints for global sequences
-SET bdr.permit_unsafe_ddl_commands=true;
-
+SET bdr.permit_unsafe_ddl_commands = true;
+-- We don't want to replicate commands from in here
+SET bdr.skip_ddl_replication = true;
 CREATE OR REPLACE FUNCTION bdr_version()
 RETURNS TEXT
 LANGUAGE C
@@ -530,14 +531,10 @@ BEGIN
 END;
 $$;
 
-
----
---- this should always be last to avoid replicating our internal schema
----
-
 CREATE EVENT TRIGGER bdr_queue_ddl_commands
 ON ddl_command_end
 EXECUTE PROCEDURE bdr.bdr_queue_ddl_commands();
 
-SET bdr.permit_unsafe_ddl_commands = false;
+RESET bdr.permit_unsafe_ddl_commands;
+RESET bdr.skip_ddl_replication;
 RESET search_path;
diff --git a/bdr.c b/bdr.c
index de56f8ec9a54e119feb2d7ec643a470f2adc666a..02f4e7ef5a5af8e1c9507a5c76520fb5a56ebc1a 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -79,6 +79,7 @@ Oid   BdrNodesRelid;
 Oid   BdrConflictHistoryRelId;
 Oid   BdrLocksRelid;
 Oid   BdrLocksByOwnerRelid;
+Oid   BdrReplicationSetConfigRelid;
 
 BdrConnectionConfig  **bdr_connection_configs;
 /* All databases for which BDR is configured, valid after _PG_init */
@@ -1815,6 +1816,8 @@ bdr_maintain_schema(void)
        bdr_lookup_relid("bdr_global_locks", schema_oid);
    BdrLocksByOwnerRelid =
        bdr_lookup_relid("bdr_global_locks_byowner", schema_oid);
+   BdrReplicationSetConfigRelid  =
+       bdr_lookup_relid("bdr_replication_set_config", schema_oid);
 
    bdr_conflict_handlers_init();
 
index 2b35a48ccb70a82ba907981320867bae284b7baf..1ec698cf13d51a50d705322ad59f1d73c695b7bd 100644 (file)
@@ -1,6 +1,6 @@
 # bdr extension
 comment = 'bdr support functions'
-default_version = '0.8.0'
+default_version = '0.8.0.1'
 module_pathname = '$libdir/bdr'
 relocatable = false
 requires = btree_gist
diff --git a/bdr.h b/bdr.h
index ecfc4d6ab474655e6f73bb8868949b9bb2a716ad..c4fce6d1540516227ad24c50c5d3c3992550dbd5 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -272,6 +272,7 @@ extern Oid  BdrVotesRelid;
 extern Oid BdrLocksRelid;
 extern Oid BdrLocksByOwnerRelid;
 
+extern Oid  BdrReplicationSetConfigRelid;
 
 /* apply support */
 extern void bdr_process_remote_action(StringInfo s);
@@ -402,6 +403,7 @@ extern void bdr_heap_compute_replication_settings(
    BDRRelation *rel,
    int         num_replication_sets,
    char      **replication_sets);
+extern void BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid);
 
 extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
 extern void bdr_parse_database_options(const char *label);
index f11d75496a9cbb399070746a4e44ea630490002c..7ce3fd2aaf0b0f2fba4ed042f951b2f632f5686b 100644 (file)
@@ -491,13 +491,11 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
            StartTransactionCommand();
        }
 
-       schema_oid = get_namespace_oid("bdr", true);
-       if (schema_oid == InvalidOid)
-       {
-           bdr_maintain_schema();
-           schema_oid = get_namespace_oid("bdr", true);
-       }
-       data->bdr_schema_oid = schema_oid;
+       bdr_maintain_schema();
+
+       data->bdr_schema_oid = get_namespace_oid("bdr", true);
+       schema_oid = data->bdr_schema_oid;
+
        if (schema_oid != InvalidOid)
        {
            data->bdr_conflict_handlers_reloid =
@@ -548,6 +546,13 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
       RelationGetRelid(r->rel) == data->bdr_locks_reloid)
        return false;
 
+   /*
+    * Quite ugly, but there's no neat way right now: Flush replication set
+    * configuration from bdr's relcache.
+    */
+   if (RelationGetRelid(r->rel) == BdrReplicationSetConfigRelid)
+       BDRRelcacheHashInvalidateCallback(0, InvalidOid);
+
    /* always replicate other stuff in the bdr schema */
    if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
        return true;
index 9d5d7f7ccd4f80fe8e6f63f309f1be7f40374999..06b41f52428cd981c408ba67bbbda6e337bb1f55 100644 (file)
@@ -8,4 +8,4 @@ bdr.node1_replication_sets = 'default, important, for-node-1'
 
 bdr.node2_dsn = 'dbname=regression'
 bdr.node2_local_dbname = 'postgres'
-bdr.node2_replication_sets = 'default, important, for-node-2'
+bdr.node2_replication_sets = 'default, important, for-node-2, for-node-2-insert, for-node-2-update, for-node-2-delete'
index 5849d82d85751009e42ebb8ceb3aaaf4f8af0847..eca8fa5b49fc838e18358784178ab964e2500060 100644 (file)
@@ -46,7 +46,7 @@ BDRRelcacheHashInvalidateEntry(BDRRelation *entry)
    }
 }
 
-static void
+void
 BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
 {
    HASH_SEQ_STATUS status;
@@ -324,6 +324,41 @@ relation_in_replication_set(BDRRelation *r, const char *setname)
    return false;
 }
 
+#include "access/genam.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+static HeapTuple
+replset_lookup(Relation rel, const char *cname)
+{
+   ScanKey         key;
+   NameData        name;
+   SysScanDesc     scan;
+   HeapTuple       tuple = NULL;
+
+   namestrcpy(&name, cname);
+
+   key = (ScanKey) palloc(sizeof(ScanKeyData) * 1);
+
+   ScanKeyInit(&key[0],
+               1,
+               BTEqualStrategyNumber, F_NAMEEQ,
+               NameGetDatum(&name)
+       );
+
+   /* FIXME: should use index */
+   scan = systable_beginscan(rel, 0, true, NULL, 1, key);
+
+   while ((tuple = systable_getnext(scan)) != NULL)
+   {
+       tuple = heap_copytuple(tuple);
+       break;
+   }
+
+   systable_endscan(scan);
+   return tuple;
+}
+
 /*
  * Compute whether modifications to this relation should be replicated or not
  * and cache the result in the relation descriptor.
@@ -332,6 +367,8 @@ relation_in_replication_set(BDRRelation *r, const char *setname)
  * a constant set of 'to be replicated' sets to be passed in - which happens
  * to be what we need for logical decoding. As there really isn't another need
  * for this functionality so far...
+ * Another reason for restricting this to backends in decoding is that we
+ * currently can't invalidate the cache correctly otherwise.
  */
 void
 bdr_heap_compute_replication_settings(BDRRelation *r,
@@ -340,6 +377,8 @@ bdr_heap_compute_replication_settings(BDRRelation *r,
 {
    int i;
 
+   Assert(MyReplicationSlot); /* in decoding */
+
    Assert(!r->computed_repl_valid);
 
    /* Implicit "replicate everything" configuration */
@@ -359,17 +398,42 @@ bdr_heap_compute_replication_settings(BDRRelation *r,
     */
    for (i = 0; i < conf_num_replication_sets; i++)
    {
-       const char* setname = conf_replication_sets[i];
+       Relation repl_sets;
+       HeapTuple tuple;
+       const char* setname;
+
+       setname = conf_replication_sets[i];
 
        if (!relation_in_replication_set(r, setname))
            continue;
 
-       /*
-        * In the future we'll lookup configuration for individual sets here.
-        */
-       r->computed_repl_insert = true;
-       r->computed_repl_update = true;
-       r->computed_repl_delete = true;
+       repl_sets = heap_open(BdrReplicationSetConfigRelid, AccessShareLock);
+       tuple = replset_lookup(repl_sets, setname);
+
+       if (tuple != NULL)
+       {
+           bool        isnull;
+           TupleDesc   desc = RelationGetDescr(repl_sets);
+
+           if (DatumGetBool(fastgetattr(tuple, 2, desc, &isnull)))
+               r->computed_repl_insert = true;
+
+           if (DatumGetBool(fastgetattr(tuple, 3, desc, &isnull)))
+               r->computed_repl_update = true;
+
+           if (DatumGetBool(fastgetattr(tuple, 4, desc, &isnull)))
+               r->computed_repl_delete = true;
+
+           pfree(tuple);
+       }
+       else
+       {
+           r->computed_repl_insert = true;
+           r->computed_repl_update = true;
+           r->computed_repl_delete = true;
+       }
+
+       heap_close(repl_sets, AccessShareLock);
 
        /* no need to look any further, we replicate everything */
        if (r->computed_repl_insert &&
index 637865c7415f4bdb23a3915be9aaa3268cb221e0..b8c149b9cb6595b4db249ee7965714ab6421605b 100644 (file)
@@ -210,3 +210,125 @@ SELECT * FROM settest_1 ORDER BY data;
  should-replicate-via-for-node-2-even-though-unknown
 (4 rows)
 
+\c regression
+DROP TABLE settest_1;
+/*
+ * Now test configurations where only some actions are replicated.
+ */
+CREATE TABLE settest_2(data text primary key);
+-- Test 1: ensure that inserts are replicated while update/delete are filtered
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}');
+ table_set_replication_sets 
+----------------------------
+(1 row)
+
+INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes)
+VALUES ('for-node-2-insert', true, false, false),
+       ('for-node-2-update', false, true, false),
+       ('for-node-2-delete', false, false, true);
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete');
+UPDATE settest_2
+SET data = 'repl-insert--insert-#2-update'
+WHERE data = 'repl-insert--insert-#2-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-insert--insert-#3-then-delete';
+-- Test 2: ensure that updates are replicated while inserts/deletes are filtered
+-- insert before filtering
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}');
+ table_set_replication_sets 
+----------------------------
+(1 row)
+
+UPDATE settest_2
+SET data = 'repl-update--insert-#1-update'
+WHERE data = 'repl-update--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-update--insert-#2-then-delete';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3');
+-- verify that changing the pg_replication_sets row has effects
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = true
+WHERE set_name = 'for-node-2-update';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4');
+-- but reset to normal meaning afterwards
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = false
+WHERE set_name = 'for-node-2-update';
+-- Test 3: ensure that deletes are replicated while inserts/updates are filtered
+-- insert before filtering
+SELECT bdr.table_set_replication_sets('settest_2', NULL);
+ table_set_replication_sets 
+----------------------------
+(1 row)
+
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}');
+ table_set_replication_sets 
+----------------------------
+(1 row)
+
+UPDATE settest_2
+SET data = 'repl-delete--insert-#1-update'
+WHERE data = 'repl-delete--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-delete--insert-#2-then-delete';
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3');
+-- Test 4: ensure that all partial sets together replicate everything
+SELECT bdr.table_set_replication_sets('settest_2',
+    '{for-node-2-insert,for-node-2-update,for-node-2-delete}');
+ table_set_replication_sets 
+----------------------------
+(1 row)
+
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete');
+UPDATE settest_2
+SET data = 'repl-combined--insert-#1-update'
+WHERE data = 'repl-combined--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-combined--insert-#2-then-delete';
+SELECT * FROM settest_2 ORDER BY data;
+              data               
+---------------------------------
+ repl-combined--insert-#1-update
+ repl-delete--insert-#1-update
+ repl-delete--insert-#3
+ repl-insert--insert-#1
+ repl-insert--insert-#2-update
+ repl-update--insert-#1-update
+ repl-update--insert-#3
+ repl-update--insert-#4
+(8 rows)
+
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
+ pg_xlog_wait_remote_apply 
+---------------------------
+(2 rows)
+
+\c postgres
+SELECT * FROM settest_2 ORDER BY data;
+                data                
+------------------------------------
+ repl-combined--insert-#1-update
+ repl-delete--insert-#1-then-update
+ repl-insert--insert-#1
+ repl-insert--insert-#2-then-update
+ repl-insert--insert-#3-then-delete
+ repl-update--insert-#1-update
+ repl-update--insert-#2-then-delete
+ repl-update--insert-#4
+(8 rows)
+
+\c regression
index a3ba5e1e2b1f823dae3a15d069c974629b0e0a7c..4b1a120a5dc8409392bdf4eecb030d5eaba1bd79 100644 (file)
@@ -77,3 +77,93 @@ SELECT * FROM settest_1 ORDER BY data;
 SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
 \c postgres
 SELECT * FROM settest_1 ORDER BY data;
+\c regression
+DROP TABLE settest_1;
+
+
+/*
+ * Now test configurations where only some actions are replicated.
+ */
+CREATE TABLE settest_2(data text primary key);
+
+-- Test 1: ensure that inserts are replicated while update/delete are filtered
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}');
+INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes)
+VALUES ('for-node-2-insert', true, false, false),
+       ('for-node-2-update', false, true, false),
+       ('for-node-2-delete', false, false, true);
+
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete');
+UPDATE settest_2
+SET data = 'repl-insert--insert-#2-update'
+WHERE data = 'repl-insert--insert-#2-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-insert--insert-#3-then-delete';
+
+-- Test 2: ensure that updates are replicated while inserts/deletes are filtered
+-- insert before filtering
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete');
+
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}');
+
+UPDATE settest_2
+SET data = 'repl-update--insert-#1-update'
+WHERE data = 'repl-update--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-update--insert-#2-then-delete';
+
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3');
+
+
+-- verify that changing the pg_replication_sets row has effects
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = true
+WHERE set_name = 'for-node-2-update';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4');
+
+-- but reset to normal meaning afterwards
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = false
+WHERE set_name = 'for-node-2-update';
+
+
+-- Test 3: ensure that deletes are replicated while inserts/updates are filtered
+-- insert before filtering
+SELECT bdr.table_set_replication_sets('settest_2', NULL);
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}');
+
+UPDATE settest_2
+SET data = 'repl-delete--insert-#1-update'
+WHERE data = 'repl-delete--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-delete--insert-#2-then-delete';
+
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3');
+
+
+-- Test 4: ensure that all partial sets together replicate everything
+SELECT bdr.table_set_replication_sets('settest_2',
+    '{for-node-2-insert,for-node-2-update,for-node-2-delete}');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete');
+
+UPDATE settest_2
+SET data = 'repl-combined--insert-#1-update'
+WHERE data = 'repl-combined--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-combined--insert-#2-then-delete';
+
+SELECT * FROM settest_2 ORDER BY data;
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
+\c postgres
+SELECT * FROM settest_2 ORDER BY data;
+\c regression