MODULE_big = bdr
EXTENSION = bdr
-DATA = bdr--0.8.0.sql bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql
+DATA = bdr--0.8.0.sql \
+ bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql \
+ bdr--0.8.0--0.8.0.1.sql
+
+DATA_built = bdr--0.8.0.1.sql
+
DOCS = bdr.conf.sample README.bdr
SCRIPTS = scripts/bdr_initial_load bdr_init_copy
mkdir -p scripts
sed -e "s/BDR_VERSION/$(BDR_VERSION)/" -e "s/PG_VERSION/$(VERSION)/" $< > $@
+bdr--0.8.0.1.sql: bdr--0.8.0.sql bdr--0.8.0--0.8.0.1.sql
+ cat $^ > $@
+
all: all-lib bdr_init_copy
clean: additional-clean
rm -f config.status config.log
rm -f bdr_config.h.in Makefile
rm -f run_tests
+ rm -f bdr--0.8.0.1.sql
@rm -rf autom4te.cache/
# Disabled because these tests require "wal_level=logical", which
--- /dev/null
+CREATE TABLE bdr.bdr_replication_set_config
+(
+ set_name name PRIMARY KEY,
+ replicate_inserts bool NOT NULL DEFAULT true,
+ replicate_updates bool NOT NULL DEFAULT true,
+ replicate_deletes bool NOT NULL DEFAULT true
+);
+ALTER TABLE bdr.bdr_replication_set_config SET (user_catalog_table = true);
+REVOKE ALL ON TABLE bdr.bdr_replication_set_config FROM PUBLIC;
SET LOCAL search_path = bdr;
-- We must be able to use exclusion constraints for global sequences
-SET bdr.permit_unsafe_ddl_commands=true;
-
+SET bdr.permit_unsafe_ddl_commands = true;
+-- We don't want to replicate commands from in here
+SET bdr.skip_ddl_replication = true;
CREATE OR REPLACE FUNCTION bdr_version()
RETURNS TEXT
LANGUAGE C
END;
$$;
-
----
---- this should always be last to avoid replicating our internal schema
----
-
CREATE EVENT TRIGGER bdr_queue_ddl_commands
ON ddl_command_end
EXECUTE PROCEDURE bdr.bdr_queue_ddl_commands();
-SET bdr.permit_unsafe_ddl_commands = false;
+RESET bdr.permit_unsafe_ddl_commands;
+RESET bdr.skip_ddl_replication;
RESET search_path;
Oid BdrConflictHistoryRelId;
Oid BdrLocksRelid;
Oid BdrLocksByOwnerRelid;
+Oid BdrReplicationSetConfigRelid;
BdrConnectionConfig **bdr_connection_configs;
/* All databases for which BDR is configured, valid after _PG_init */
bdr_lookup_relid("bdr_global_locks", schema_oid);
BdrLocksByOwnerRelid =
bdr_lookup_relid("bdr_global_locks_byowner", schema_oid);
+ BdrReplicationSetConfigRelid =
+ bdr_lookup_relid("bdr_replication_set_config", schema_oid);
bdr_conflict_handlers_init();
# bdr extension
comment = 'bdr support functions'
-default_version = '0.8.0'
+default_version = '0.8.0.1'
module_pathname = '$libdir/bdr'
relocatable = false
requires = btree_gist
extern Oid BdrLocksRelid;
extern Oid BdrLocksByOwnerRelid;
+extern Oid BdrReplicationSetConfigRelid;
/* apply support */
extern void bdr_process_remote_action(StringInfo s);
BDRRelation *rel,
int num_replication_sets,
char **replication_sets);
+extern void BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid);
extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
extern void bdr_parse_database_options(const char *label);
StartTransactionCommand();
}
- schema_oid = get_namespace_oid("bdr", true);
- if (schema_oid == InvalidOid)
- {
- bdr_maintain_schema();
- schema_oid = get_namespace_oid("bdr", true);
- }
- data->bdr_schema_oid = schema_oid;
+ bdr_maintain_schema();
+
+ data->bdr_schema_oid = get_namespace_oid("bdr", true);
+ schema_oid = data->bdr_schema_oid;
+
if (schema_oid != InvalidOid)
{
data->bdr_conflict_handlers_reloid =
RelationGetRelid(r->rel) == data->bdr_locks_reloid)
return false;
+ /*
+ * Quite ugly, but there's no neat way right now: Flush replication set
+ * configuration from bdr's relcache.
+ */
+ if (RelationGetRelid(r->rel) == BdrReplicationSetConfigRelid)
+ BDRRelcacheHashInvalidateCallback(0, InvalidOid);
+
/* always replicate other stuff in the bdr schema */
if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
return true;
bdr.node2_dsn = 'dbname=regression'
bdr.node2_local_dbname = 'postgres'
-bdr.node2_replication_sets = 'default, important, for-node-2'
+bdr.node2_replication_sets = 'default, important, for-node-2, for-node-2-insert, for-node-2-update, for-node-2-delete'
}
}
-static void
+void
BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
{
HASH_SEQ_STATUS status;
return false;
}
+#include "access/genam.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+static HeapTuple
+replset_lookup(Relation rel, const char *cname)
+{
+ ScanKey key;
+ NameData name;
+ SysScanDesc scan;
+ HeapTuple tuple = NULL;
+
+ namestrcpy(&name, cname);
+
+ key = (ScanKey) palloc(sizeof(ScanKeyData) * 1);
+
+ ScanKeyInit(&key[0],
+ 1,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(&name)
+ );
+
+ /* FIXME: should use index */
+ scan = systable_beginscan(rel, 0, true, NULL, 1, key);
+
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ tuple = heap_copytuple(tuple);
+ break;
+ }
+
+ systable_endscan(scan);
+ return tuple;
+}
+
/*
* Compute whether modifications to this relation should be replicated or not
* and cache the result in the relation descriptor.
* a constant set of 'to be replicated' sets to be passed in - which happens
* to be what we need for logical decoding. As there really isn't another need
* for this functionality so far...
+ * Another reason restricting this to backends in decoding is that we
+ * currently can't invalidate the czache correctly otherwise.
*/
void
bdr_heap_compute_replication_settings(BDRRelation *r,
{
int i;
+ Assert(MyReplicationSlot); /* in decoding */
+
Assert(!r->computed_repl_valid);
/* Implicit "replicate everything" configuration */
*/
for (i = 0; i < conf_num_replication_sets; i++)
{
- const char* setname = conf_replication_sets[i];
+ Relation repl_sets;
+ HeapTuple tuple;
+ const char* setname;
+
+ setname = conf_replication_sets[i];
if (!relation_in_replication_set(r, setname))
continue;
- /*
- * In the future we'll lookup configuration for individual sets here.
- */
- r->computed_repl_insert = true;
- r->computed_repl_update = true;
- r->computed_repl_delete = true;
+ repl_sets = heap_open(BdrReplicationSetConfigRelid, AccessShareLock);
+ tuple = replset_lookup(repl_sets, setname);
+
+ if (tuple != NULL)
+ {
+ bool isnull;
+ TupleDesc desc = RelationGetDescr(repl_sets);
+
+ if (DatumGetBool(fastgetattr(tuple, 2, desc, &isnull)))
+ r->computed_repl_insert = true;
+
+ if (DatumGetBool(fastgetattr(tuple, 3, desc, &isnull)))
+ r->computed_repl_update = true;
+
+ if (DatumGetBool(fastgetattr(tuple, 4, desc, &isnull)))
+ r->computed_repl_delete = true;
+
+ pfree(tuple);
+ }
+ else
+ {
+ r->computed_repl_insert = true;
+ r->computed_repl_update = true;
+ r->computed_repl_delete = true;
+ }
+
+ heap_close(repl_sets, AccessShareLock);
/* no need to look any further, we replicate everything */
if (r->computed_repl_insert &&
should-replicate-via-for-node-2-even-though-unknown
(4 rows)
+\c regression
+DROP TABLE settest_1;
+/*
+ * Now test configurations where only some actions are replicated.
+ */
+CREATE TABLE settest_2(data text primary key);
+-- Test 1: ensure that inserts are replicated while update/delete are filtered
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}');
+ table_set_replication_sets
+----------------------------
+
+(1 row)
+
+INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes)
+VALUES ('for-node-2-insert', true, false, false),
+ ('for-node-2-update', false, true, false),
+ ('for-node-2-delete', false, false, true);
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete');
+UPDATE settest_2
+SET data = 'repl-insert--insert-#2-update'
+WHERE data = 'repl-insert--insert-#2-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-insert--insert-#3-then-delete';
+-- Test 2: ensure that updates are replicated while inserts/deletes are filtered
+-- insert before filtering
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}');
+ table_set_replication_sets
+----------------------------
+
+(1 row)
+
+UPDATE settest_2
+SET data = 'repl-update--insert-#1-update'
+WHERE data = 'repl-update--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-update--insert-#2-then-delete';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3');
+-- verify that changing the pg_replication_sets row has effects
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = true
+WHERE set_name = 'for-node-2-update';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4');
+-- but reset to normal meaning afterwards
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = false
+WHERE set_name = 'for-node-2-update';
+-- Test 3: ensure that deletes are replicated while inserts/updates are filtered
+-- insert before filtering
+SELECT bdr.table_set_replication_sets('settest_2', NULL);
+ table_set_replication_sets
+----------------------------
+
+(1 row)
+
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}');
+ table_set_replication_sets
+----------------------------
+
+(1 row)
+
+UPDATE settest_2
+SET data = 'repl-delete--insert-#1-update'
+WHERE data = 'repl-delete--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-delete--insert-#2-then-delete';
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3');
+-- Test 4: ensure that all partial sets together replicate everything
+SELECT bdr.table_set_replication_sets('settest_2',
+ '{for-node-2-insert,for-node-2-update,for-node-2-delete}');
+ table_set_replication_sets
+----------------------------
+
+(1 row)
+
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete');
+UPDATE settest_2
+SET data = 'repl-combined--insert-#1-update'
+WHERE data = 'repl-combined--insert-#1-then-update';
+DELETE FROM settest_2
+WHERE data = 'repl-combined--insert-#2-then-delete';
+SELECT * FROM settest_2 ORDER BY data;
+ data
+---------------------------------
+ repl-combined--insert-#1-update
+ repl-delete--insert-#1-update
+ repl-delete--insert-#3
+ repl-insert--insert-#1
+ repl-insert--insert-#2-update
+ repl-update--insert-#1-update
+ repl-update--insert-#3
+ repl-update--insert-#4
+(8 rows)
+
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
+ pg_xlog_wait_remote_apply
+---------------------------
+
+
+(2 rows)
+
+\c postgres
+SELECT * FROM settest_2 ORDER BY data;
+ data
+------------------------------------
+ repl-combined--insert-#1-update
+ repl-delete--insert-#1-then-update
+ repl-insert--insert-#1
+ repl-insert--insert-#2-then-update
+ repl-insert--insert-#3-then-delete
+ repl-update--insert-#1-update
+ repl-update--insert-#2-then-delete
+ repl-update--insert-#4
+(8 rows)
+
+\c regression
SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
\c postgres
SELECT * FROM settest_1 ORDER BY data;
+\c regression
+DROP TABLE settest_1;
+
+
+/*
+ * Now test configurations where only some actions are replicated.
+ */
+CREATE TABLE settest_2(data text primary key);
+
+-- Test 1: ensure that inserts are replicated while update/delete are filtered
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-insert}');
+INSERT INTO bdr.bdr_replication_set_config(set_name, replicate_inserts, replicate_updates, replicate_deletes)
+VALUES ('for-node-2-insert', true, false, false),
+ ('for-node-2-update', false, true, false),
+ ('for-node-2-delete', false, false, true);
+
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#1');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#2-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-insert--insert-#3-then-delete');
+UPDATE settest_2
+SET data = 'repl-insert--insert-#2-update'
+WHERE data = 'repl-insert--insert-#2-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-insert--insert-#3-then-delete';
+
+-- Test 2: ensure that updates are replicated while inserts/deletes are filtered
+-- insert before filtering
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#2-then-delete');
+
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-update}');
+
+UPDATE settest_2
+SET data = 'repl-update--insert-#1-update'
+WHERE data = 'repl-update--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-update--insert-#2-then-delete';
+
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#3');
+
+
+-- verify that changing the pg_replication_sets row has effects
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = true
+WHERE set_name = 'for-node-2-update';
+INSERT INTO settest_2(data) VALUES ('repl-update--insert-#4');
+
+-- but reset to normal meaning afterwards
+UPDATE bdr.bdr_replication_set_config
+SET replicate_inserts = false
+WHERE set_name = 'for-node-2-update';
+
+
+-- Test 3: ensure that deletes are replicated while inserts/updates are filtered
+-- insert before filtering
+SELECT bdr.table_set_replication_sets('settest_2', NULL);
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#2-then-delete');
+SELECT bdr.table_set_replication_sets('settest_2', '{for-node-2-delete}');
+
+UPDATE settest_2
+SET data = 'repl-delete--insert-#1-update'
+WHERE data = 'repl-delete--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-delete--insert-#2-then-delete';
+
+INSERT INTO settest_2(data) VALUES ('repl-delete--insert-#3');
+
+
+-- Test 4: ensure that all partial sets together replicate everything
+SELECT bdr.table_set_replication_sets('settest_2',
+ '{for-node-2-insert,for-node-2-update,for-node-2-delete}');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#1-then-update');
+INSERT INTO settest_2(data) VALUES ('repl-combined--insert-#2-then-delete');
+
+UPDATE settest_2
+SET data = 'repl-combined--insert-#1-update'
+WHERE data = 'repl-combined--insert-#1-then-update';
+
+DELETE FROM settest_2
+WHERE data = 'repl-combined--insert-#2-then-delete';
+
+SELECT * FROM settest_2 ORDER BY data;
+SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;
+\c postgres
+SELECT * FROM settest_2 ORDER BY data;
+\c regression