bdr: Initial implementation of replication sets.
authorAndres Freund <andres@anarazel.de>
Mon, 20 Oct 2014 07:19:12 +0000 (09:19 +0200)
committerAndres Freund <andres@anarazel.de>
Mon, 20 Oct 2014 07:19:12 +0000 (09:19 +0200)
Tables can be added to replication sets using
SELECT bdr.table_set_replication_sets('do_replicate_tbl', '{do_replicate}');
and viewed
SELECT bdr.table_get_replication_sets('do_replicate_tbl');

Nodes by default replicate all replication sets, but can be configured
to only consume specific ones using the bdr.%s_replication_sets GUC.

Tests are sorely needed.

12 files changed:
Makefile.in
bdr--0.7.1--0.8.0.sql [new file with mode: 0644]
bdr--0.8.0.sql [moved from bdr--0.7.1.sql with 91% similarity]
bdr.c
bdr.control
bdr.h
bdr_commandfilter.c
bdr_internal.h
bdr_label.c [new file with mode: 0644]
bdr_label.h [new file with mode: 0644]
bdr_output.c
bdr_relcache.c

index 4e9d7d87ca69e32a92f13bbf5e9c45ff9637629b..55683d7a55ec2169db96d17bc2c57b94c08ea84c 100644 (file)
@@ -5,7 +5,7 @@
 MODULE_big = bdr
 
 EXTENSION = bdr
-DATA = bdr--0.7.1.sql bdr--0.7--0.7.1.sql
+DATA = bdr--0.8.0.sql bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql
 DOCS = bdr.conf.sample README.bdr
 SCRIPTS = scripts/bdr_initial_load bdr_init_copy
 
@@ -23,6 +23,7 @@ OBJS = \
    bdr_count.o \
    bdr_executor.o \
    bdr_init_replica.o \
+   bdr_label.o \
    bdr_locks.o \
    bdr_output.o \
    bdr_relcache.o \
diff --git a/bdr--0.7.1--0.8.0.sql b/bdr--0.7.1--0.8.0.sql
new file mode 100644 (file)
index 0000000..8679dc6
--- /dev/null
@@ -0,0 +1,56 @@
+CREATE OR REPLACE FUNCTION bdr.table_get_replication_sets(relation regclass, OUT sets text[])
+  VOLATILE
+  STRICT
+  LANGUAGE 'sql'
+  AS $$
+    SELECT
+        ARRAY(
+            SELECT *
+            FROM json_array_elements_text(COALESCE((
+                SELECT label::json->'sets'
+                FROM pg_seclabel
+                WHERE provider = 'bdr'
+                     AND classoid = 'pg_class'::regclass
+                     AND objoid = $1::regclass
+                ), '["default"]'))
+        )|| '{all}';
+$$;
+
+
+CREATE OR REPLACE FUNCTION bdr.table_set_replication_sets(p_relation regclass, p_sets text[])
+  RETURNS void
+  VOLATILE
+  LANGUAGE 'plpgsql'
+  AS $$
+DECLARE
+    v_label json;
+BEGIN
+    -- emulate STRICT for p_relation parameter
+    IF p_relation IS NULL THEN
+        RETURN;
+    END IF;
+
+    -- query current label
+    SELECT label::json INTO v_label
+    FROM pg_seclabel
+    WHERE provider = 'bdr'
+        AND classoid = 'pg_class'::regclass
+        AND objoid = p_relation;
+
+    -- replace old 'sets' parameter with new value
+    SELECT json_object_agg(key, value) INTO v_label
+    FROM (
+        SELECT key, value
+        FROM json_each(v_label)
+        WHERE key <> 'sets'
+      UNION ALL
+        SELECT
+            'sets', to_json(p_sets)
+        WHERE p_sets IS NOT NULL
+    ) d;
+
+    -- and now set the appropriate label
+    EXECUTE format('SECURITY LABEL FOR bdr ON TABLE %I IS %L',
+                   p_relation, v_label) ;
+END;
+$$;
similarity index 91%
rename from bdr--0.7.1.sql
rename to bdr--0.8.0.sql
index da797e164a5619ad0c71c02233efa5b3938f9614..f809917aa995a273a069da70bc67d9f3ed0f8edb 100644 (file)
@@ -471,6 +471,66 @@ LANGUAGE C
 AS 'MODULE_PATHNAME'
 ;
 
+---
+--- Funtions for manipulating/displaying replications sets
+---
+CREATE OR REPLACE FUNCTION bdr.table_get_replication_sets(relation regclass, OUT sets text[])
+  VOLATILE
+  STRICT
+  LANGUAGE 'sql'
+  AS $$
+    SELECT
+        ARRAY(
+            SELECT *
+            FROM json_array_elements_text(COALESCE((
+                SELECT label::json->'sets'
+                FROM pg_seclabel
+                WHERE provider = 'bdr'
+                     AND classoid = 'pg_class'::regclass
+                     AND objoid = $1::regclass
+                ), '["default"]'))
+        )|| '{all}';
+$$;
+
+CREATE OR REPLACE FUNCTION bdr.table_set_replication_sets(p_relation regclass, p_sets text[])
+  RETURNS void
+  VOLATILE
+  LANGUAGE 'plpgsql'
+  AS $$
+DECLARE
+    v_label json;
+BEGIN
+    -- emulate STRICT for p_relation parameter
+    IF p_relation IS NULL THEN
+        RETURN;
+    END IF;
+
+    -- query current label
+    SELECT label::json INTO v_label
+    FROM pg_seclabel
+    WHERE provider = 'bdr'
+        AND classoid = 'pg_class'::regclass
+        AND objoid = p_relation;
+
+    -- replace old 'sets' parameter with new value
+    SELECT json_object_agg(key, value) INTO v_label
+    FROM (
+        SELECT key, value
+        FROM json_each(v_label)
+        WHERE key <> 'sets'
+      UNION ALL
+        SELECT
+            'sets', to_json(p_sets)
+        WHERE p_sets IS NOT NULL
+    ) d;
+
+    -- and now set the appropriate label
+    EXECUTE format('SECURITY LABEL FOR bdr ON TABLE %I IS %L',
+                   p_relation, v_label) ;
+END;
+$$;
+
+
 ---
 --- this should always be last to avoid replicating our internal schema
 ---
diff --git a/bdr.c b/bdr.c
index 43e1b11b501b4fa72a0b8a5ec1cd23d3186a9766..0dd495ba88c2404bd9871cc0874c3a1bf3d7919a 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -16,6 +16,7 @@
 
 #include "bdr.h"
 #include "bdr_locks.h"
+#include "bdr_label.h"
 
 #include "libpq-fe.h"
 #include "funcapi.h"
@@ -574,6 +575,12 @@ bdr_apply_main(Datum main_arg)
    appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
    appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
    appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
+   appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+   if (bdr_apply_config->replication_sets != NULL &&
+       bdr_apply_config->replication_sets[0] != 0)
+       appendStringInfo(&query, ", replication_sets '%s'",
+                        bdr_apply_config->replication_sets);
+
    appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
    if (bdr_apply_worker->forward_changesets)
        appendStringInfo(&query, ", forward_changesets 't'");
@@ -657,6 +664,7 @@ bdr_create_con_gucs(char  *name,
    char       *optname_replica = palloc(strlen(name) + 30);
    char       *optname_local_dsn = palloc(strlen(name) + 30);
    char       *optname_local_dbname = palloc(strlen(name) + 30);
+   char       *optname_replication_sets = palloc(strlen(name) + 30);
 
    Assert(process_shared_preload_libraries_in_progress);
 
@@ -722,6 +730,15 @@ bdr_create_con_gucs(char  *name,
                               GUC_NOT_IN_SAMPLE,
                               NULL, NULL, NULL);
 
+   sprintf(optname_replication_sets, "bdr.%s_replication_sets", name);
+   DefineCustomStringVariable(optname_replication_sets,
+                              optname_replication_sets,
+                              NULL,
+                              &opts->replication_sets,
+                              NULL, PGC_POSTMASTER,
+                              GUC_LIST_INPUT | GUC_LIST_QUOTE,
+                              NULL, NULL, NULL);
+
 
    if (!opts->dsn)
    {
@@ -966,7 +983,7 @@ bdr_perdb_worker_main(Datum main_arg)
     */
    foreach(c, apply_workers)
    {
-       BackgroundWorkerHandle *h = (BackgroundWorkerHandle*)lfirst(c);
+       BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
        pfree(h);
    }
 
@@ -1523,6 +1540,7 @@ _PG_init(void)
                             PGC_BACKEND,
                             0,
                             NULL, NULL, NULL);
+   bdr_label_init();
 
    /* if nothing is configured, we're done */
    if (connections == NULL)
index 62657cd73c0c2b9cb1e1e5306a323ac21ef5a2f4..2b35a48ccb70a82ba907981320867bae284b7baf 100644 (file)
@@ -1,6 +1,6 @@
 # bdr extension
 comment = 'bdr support functions'
-default_version = '0.7.1'
+default_version = '0.8.0'
 module_pathname = '$libdir/bdr'
 relocatable = false
 requires = btree_gist
diff --git a/bdr.h b/bdr.h
index 8cc95f404591f79a10fa00b6345130e9da3ebeb2..a4fc9c07292cdcbe9a558cefd2589f31e67db826 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -112,6 +112,10 @@ typedef struct BDRRelation
 
    BDRConflictHandler *conflict_handlers;
    size_t      conflict_handlers_len;
+
+   /* ordered list of replication sets of length num_* */
+   char          **replication_sets;
+   int     num_replication_sets;
 } BDRRelation;
 
 typedef struct BDRTupleData
@@ -386,6 +390,10 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
 extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode);
 extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode);
 
+extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
+extern void bdr_parse_database_options(const char *label);
+
+
 /* conflict handlers API */
 extern void bdr_conflict_handlers_init(void);
 
index fabb2c6a1d389e07348c76581db8339609ef3cd9..600a8d660e31eaef4bd4856eee1618947ef483fc 100644 (file)
@@ -713,9 +713,16 @@ bdr_commandfilter(Node *parsetree,
            break;
 
        case T_SecLabelStmt:
-           error_unsupported_command(CreateCommandTag(parsetree));
-           break;
+           {
+               SecLabelStmt *sstmt;
+               sstmt = (SecLabelStmt *) parsetree;
 
+               if (sstmt->provider == NULL ||
+                   strcmp(sstmt->provider, "bdr") == 0)
+                   break;
+               error_unsupported_command(CreateCommandTag(parsetree));
+               break;
+           }
        default:
            elog(ERROR, "unrecognized node type: %d",
                 (int) nodeTag(parsetree));
index 3c392e15d8e872d9435815ff4943093b3b102182..27968f5050e60b6d602c0845ef04ed40a50fc352 100644 (file)
@@ -24,12 +24,15 @@ typedef struct BdrConnectionConfig
    int   apply_delay;
    bool  init_replica;
    char *replica_local_dsn;
+   char *replication_sets;
+
    /*
     * These aren't technically GUCs, but are per-connection config
     * information obtained from the GUCs.
     */
    char *name;
    char *dbname;
+
    /* Connection config might be broken (blank dsn, etc) */
    bool is_valid;
 } BdrConnectionConfig;
diff --git a/bdr_label.c b/bdr_label.c
new file mode 100644 (file)
index 0000000..139a83b
--- /dev/null
@@ -0,0 +1,56 @@
+/* -------------------------------------------------------------------------
+ *
+ * bdr_label.c
+ *
+ * Provide object metadata for bdr using the security label
+ * infrastructure.
+ *
+ * Copyright (c) 2014, PostgreSQL Global Development Group
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_label.h"
+
+#include "commands/seclabel.h"
+#include "miscadmin.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+
+static void bdr_object_relabel(const ObjectAddress *object, const char *seclabel);
+
+/*
+ * Needs to call at postmaster init (or backend init for EXEC_BACKEND).
+ */
+void
+bdr_label_init(void)
+{
+   /* Security label provider hook */
+   register_label_provider("bdr", bdr_object_relabel);
+}
+
+static void
+bdr_object_relabel(const ObjectAddress *object, const char *seclabel)
+{
+   switch (object->classId)
+   {
+       case RelationRelationId:
+
+           if (!pg_class_ownercheck(object->objectId, GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                              get_rel_name(object->objectId));
+
+           if (seclabel != NULL)
+               bdr_parse_relation_options(seclabel, NULL);
+
+           CacheInvalidateRelcacheByRelid(object->objectId);
+           break;
+       default:
+           elog(ERROR, "unsupported object type: %s",
+                getObjectDescription(object));
+           break;
+   }
+}
diff --git a/bdr_label.h b/bdr_label.h
new file mode 100644 (file)
index 0000000..21dcaa0
--- /dev/null
@@ -0,0 +1,11 @@
+/*
+ * bdr_label.h
+ *
+ * BiDirectionalReplication
+ *
+ * Copyright (c) 2014, PostgreSQL Global Development Group
+ *
+ * bdr_label.h
+ */
+
+extern void bdr_label_init(void);
index 9cd273e97afbc414544113342bb4bf8a123a9da3..016150d8562b4c6eea64fc88b15ebd6103a32655 100644 (file)
@@ -72,8 +72,12 @@ typedef struct
    bool client_float8_byval;
    bool client_int_datetime;
    char *client_db_encoding;
+   Oid bdr_schema_oid;
    Oid bdr_conflict_handlers_reloid;
    Oid bdr_locks_reloid;
+
+   int num_replication_sets;
+   char **replication_sets;
 } BdrOutputData;
 
 /* These must be available to pg_dlsym() */
@@ -161,6 +165,33 @@ bdr_parse_bool(DefElem *elem, bool *res)
                        strVal(elem->arg), elem->defname)));
 }
 
+static void
+bdr_parse_identifier_list_arr(DefElem *elem, char ***list, int *len)
+{
+   List       *namelist;
+   ListCell   *c;
+
+   bdr_parse_notnull(elem, "list");
+
+   if (!SplitIdentifierString(pstrdup(strVal(elem->arg)),
+                             ',', &namelist))
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                errmsg("could not identifier list value \"%s\" for parameter \"%s\": %m",
+                       strVal(elem->arg), elem->defname)));
+   }
+
+   *len = 0;
+   *list = palloc(list_length(namelist) * sizeof(char *));
+
+   foreach(c, namelist)
+   {
+       (*list)[(*len)++] = pstrdup(lfirst(c));
+   }
+   list_free(namelist);
+}
+
 static void
 bdr_req_param(const char *param)
 {
@@ -299,6 +330,9 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
 
    data->bdr_conflict_handlers_reloid = InvalidOid;
    data->bdr_locks_reloid = InvalidOid;
+   data->bdr_schema_oid = InvalidOid;
+
+   data->num_replication_sets = -1;
 
    /* parse options passed in by the client */
 
@@ -336,6 +370,14 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
            data->client_db_encoding = pstrdup(strVal(elem->arg));
        else if (strcmp(elem->defname, "forward_changesets") == 0)
            bdr_parse_bool(elem, &data->forward_changesets);
+       else if (strcmp(elem->defname, "replication_sets") == 0)
+       {
+           bdr_parse_identifier_list_arr(elem,
+                                         &data->replication_sets,
+                                         &data->num_replication_sets);
+           qsort(data->replication_sets, data->num_replication_sets,
+                 sizeof(char *), pg_qsort_strcmp);
+       }
        else
        {
            ereport(ERROR,
@@ -438,6 +480,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
        }
 
        schema_oid = get_namespace_oid("bdr", true);
+       data->bdr_schema_oid = schema_oid;
        if (schema_oid != InvalidOid)
        {
            data->bdr_conflict_handlers_reloid =
@@ -473,10 +516,54 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
  * to the client unless we're in changeset forwarding mode.
  */
 static inline bool
-should_forward_changeset(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+should_forward_changeset(LogicalDecodingContext *ctx, BdrOutputData *data,
+                        ReorderBufferTXN *txn)
 {
-   return (txn->origin_id == InvalidRepNodeId)
-          || ((BdrOutputData*)ctx->output_plugin_private)->forward_changesets;
+   return txn->origin_id == InvalidRepNodeId || data->forward_changesets;
+}
+
+static inline bool
+should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
+                     BDRRelation *r)
+{
+   int i, j;
+
+   /* internal bdr relations that may not be replicated */
+   if(RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
+      RelationGetRelid(r->rel) == data->bdr_locks_reloid)
+       return false;
+
+   /* always replicate other stuff in the bdr schema */
+   if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
+       return true;
+
+   /* no explicit configuration */
+   if (data->num_replication_sets == -1 ||
+       r->num_replication_sets == -1)
+   {
+       return true;
+   }
+
+
+   /*
+    * Compare the two ordered list of replication sets and find overlapping
+    * elements.
+    */
+   i = j = 0;
+   while (i < data->num_replication_sets && j < r->num_replication_sets)
+   {
+       int cmp = strcmp(data->replication_sets[i],
+                        r->replication_sets[j]);
+
+       if (cmp < 0)
+           i++;
+       else if (cmp == 0)
+           return true;
+       else
+           j++;
+   }
+
+   return false;
 }
 
 /*
@@ -493,7 +580,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
    AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
 
-   if (!should_forward_changeset(ctx, txn))
+   if (!should_forward_changeset(ctx, data, txn))
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -557,13 +644,11 @@ void
 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr commit_lsn)
 {
-#ifdef NOT_YET
    BdrOutputData *data = ctx->output_plugin_private;
-#endif
 
    int flags = 0;
 
-   if (!should_forward_changeset(ctx, txn))
+   if (!should_forward_changeset(ctx, data, txn))
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -586,17 +671,19 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
    BdrOutputData *data;
    MemoryContext old;
+   BDRRelation *bdr_relation;
+
+   bdr_relation = bdr_heap_open(RelationGetRelid(relation), NoLock);
 
    data = ctx->output_plugin_private;
 
    /* Avoid leaking memory by using and resetting our own context */
    old = MemoryContextSwitchTo(data->context);
 
-   if (!should_forward_changeset(ctx, txn))
+   if (!should_forward_changeset(ctx, data, txn))
        return;
 
-   if(RelationGetRelid(relation) == data->bdr_conflict_handlers_reloid ||
-      RelationGetRelid(relation) == data->bdr_locks_reloid)
+   if (!should_forward_change(ctx, data, bdr_relation))
        return;
 
    OutputPluginPrepareWrite(ctx, true);
@@ -641,6 +728,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
    MemoryContextSwitchTo(old);
    MemoryContextReset(data->context);
+
+   bdr_heap_close(bdr_relation, NoLock);
 }
 
 /*
index 067f1bccb10e530da623d7c79c48a75125b80c0b..c425926299171db650edccdb56c5e529a7fb7059 100644 (file)
 #include "access/heapam.h"
 #include "access/xact.h"
 
+#include "commands/seclabel.h"
+
 #include "utils/catcache.h"
 #include "utils/inval.h"
 
 static HTAB *BDRRelcacheHash = NULL;
 
+static void
+BDRRelcacheHashInvalidateEntry(BDRRelation *entry)
+{
+   int i;
+
+   if (entry->conflict_handlers)
+       pfree(entry->conflict_handlers);
+
+   if (entry->num_replication_sets > 0)
+   {
+       for (i = 0; i < entry->num_replication_sets; i++)
+           pfree(entry->replication_sets[i]);
+
+       pfree(entry->replication_sets);
+   }
+}
+
 static void
 BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
 {
@@ -43,21 +62,23 @@ BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
 
        while ((entry = (BDRRelation *) hash_seq_search(&status)) != NULL)
        {
-           if (entry->conflict_handlers)
-               pfree(entry->conflict_handlers);
+           BDRRelcacheHashInvalidateEntry(entry);
 
-           if (hash_search(BDRRelcacheHash, (void *) &entry->reloid,
+           if (hash_search(BDRRelcacheHash, &entry->reloid,
                            HASH_REMOVE, NULL) == NULL)
                elog(ERROR, "hash table corrupted");
        }
    }
    else
    {
-       entry = hash_search(BDRRelcacheHash, (void *) &relid,
-                           HASH_REMOVE, NULL);
+       if ((entry = hash_search(BDRRelcacheHash, &relid,
+                                HASH_FIND, NULL)) != NULL)
+       {
+           BDRRelcacheHashInvalidateEntry(entry);
 
-       if (entry && entry->conflict_handlers)
-           pfree(entry->conflict_handlers);
+           hash_search(BDRRelcacheHash, &relid,
+                       HASH_REMOVE, NULL);
+       }
    }
 }
 
@@ -85,12 +106,97 @@ bdr_initialize_cache()
                                  (Datum) 0);
 }
 
+#include "utils/jsonapi.h"
+#include "utils/json.h"
+#include "utils/jsonb.h"
+
+void
+bdr_parse_relation_options(const char *label, BDRRelation *rel)
+{
+   JsonbIterator *it;
+   JsonbValue  v;
+   int         r;
+   bool        parsing_sets = false;
+   int         level = 0;
+   Jsonb   *data = NULL;
+
+   if (label == NULL)
+       return;
+
+   data = DatumGetJsonb(
+       DirectFunctionCall1(jsonb_in, CStringGetDatum(label)));
+
+   if (!JB_ROOT_IS_OBJECT(data))
+       elog(ERROR, "root needs to be an object");
+
+   it = JsonbIteratorInit(&data->root);
+   while ((r = JsonbIteratorNext(&it, &v, false)) != WJB_DONE)
+   {
+       if (level == 0 && r != WJB_BEGIN_OBJECT)
+           elog(ERROR, "root element needs to be an object");
+       else if (level == 0 && it->nElems > 1)
+           elog(ERROR, "only 'sets' allowed on root level");
+       else if (level == 1 && r == WJB_KEY)
+       {
+           if (strncmp(v.val.string.val, "sets", v.val.string.len) != 0)
+               elog(ERROR, "unexpected key: %s",
+                    pnstrdup(v.val.string.val, v.val.string.len));
+           parsing_sets = true;
+       }
+       else if (r == WJB_BEGIN_ARRAY || r == WJB_BEGIN_OBJECT)
+       {
+           if (parsing_sets && rel != NULL)
+           {
+               rel->replication_sets =
+                   MemoryContextAlloc(CacheMemoryContext,
+                                      sizeof(char *) * it->nElems);
+           }
+           level++;
+       }
+       else if (r == WJB_END_ARRAY || r == WJB_END_OBJECT)
+       {
+           level--;
+           parsing_sets = false;
+       }
+       else if (parsing_sets)
+       {
+           char *setname;
+
+           if (r != WJB_ELEM)
+               elog(ERROR, "unexpected element type %u", r);
+           if (level != 2)
+               elog(ERROR, "unexpected level for set %d", level);
+
+           if (rel != NULL)
+           {
+               MemoryContext oldcontext;
+
+               oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+               setname = pnstrdup(v.val.string.val, v.val.string.len);
+               rel->replication_sets[rel->num_replication_sets++] = setname;
+               MemoryContextSwitchTo(oldcontext);
+           }
+       }
+       else
+           elog(ERROR, "unexpected content: %u at level %d", r, level);
+   }
+
+   if (rel != NULL && rel->num_replication_sets > 0)
+   {
+           qsort(rel->replication_sets, rel->num_replication_sets,
+                 sizeof(char *), pg_qsort_strcmp);
+   }
+
+}
+
 BDRRelation *
 bdr_heap_open(Oid reloid, LOCKMODE lockmode)
 {
    BDRRelation *entry;
    bool        found;
    Relation    rel;
+   ObjectAddress object;
+   const char *label;
 
    rel = heap_open(reloid, lockmode);
 
@@ -116,6 +222,13 @@ bdr_heap_open(Oid reloid, LOCKMODE lockmode)
    entry->reloid = reloid;
    entry->rel = rel;
 
+   object.classId = RelationRelationId;
+   object.objectId = reloid;
+   object.objectSubId = 0;
+
+   label = GetSecurityLabel(&object, "bdr");
+   bdr_parse_relation_options(label, entry);
+
    return entry;
 }