MODULE_big = bdr
EXTENSION = bdr
-DATA = bdr--0.7.1.sql bdr--0.7--0.7.1.sql
+DATA = bdr--0.8.0.sql bdr--0.7--0.7.1.sql bdr--0.7.1--0.8.0.sql
DOCS = bdr.conf.sample README.bdr
SCRIPTS = scripts/bdr_initial_load bdr_init_copy
bdr_count.o \
bdr_executor.o \
bdr_init_replica.o \
+ bdr_label.o \
bdr_locks.o \
bdr_output.o \
bdr_relcache.o \
--- /dev/null
+CREATE OR REPLACE FUNCTION bdr.table_get_replication_sets(relation regclass, OUT sets text[])
+ VOLATILE
+ STRICT
+ LANGUAGE 'sql'
+ AS $$
+ SELECT
+ ARRAY(
+ SELECT *
+ FROM json_array_elements_text(COALESCE((
+ SELECT label::json->'sets'
+ FROM pg_seclabel
+ WHERE provider = 'bdr'
+ AND classoid = 'pg_class'::regclass
+ AND objoid = $1::regclass
+ ), '["default"]'))
+ )|| '{all}';
+$$;
+
+
+CREATE OR REPLACE FUNCTION bdr.table_set_replication_sets(p_relation regclass, p_sets text[])
+ RETURNS void
+ VOLATILE
+ LANGUAGE 'plpgsql'
+ AS $$
+DECLARE
+ v_label json;
+BEGIN
+ -- emulate STRICT for p_relation parameter
+ IF p_relation IS NULL THEN
+ RETURN;
+ END IF;
+
+ -- query current label
+ SELECT label::json INTO v_label
+ FROM pg_seclabel
+ WHERE provider = 'bdr'
+ AND classoid = 'pg_class'::regclass
+ AND objoid = p_relation;
+
+ -- replace old 'sets' parameter with new value
+ SELECT json_object_agg(key, value) INTO v_label
+ FROM (
+ SELECT key, value
+ FROM json_each(v_label)
+ WHERE key <> 'sets'
+ UNION ALL
+ SELECT
+ 'sets', to_json(p_sets)
+ WHERE p_sets IS NOT NULL
+ ) d;
+
+ -- and now set the appropriate label
+ EXECUTE format('SECURITY LABEL FOR bdr ON TABLE %I IS %L',
+ p_relation, v_label) ;
+END;
+$$;
AS 'MODULE_PATHNAME'
;
+---
+--- Funtions for manipulating/displaying replications sets
+---
+CREATE OR REPLACE FUNCTION bdr.table_get_replication_sets(relation regclass, OUT sets text[])
+ VOLATILE
+ STRICT
+ LANGUAGE 'sql'
+ AS $$
+ SELECT
+ ARRAY(
+ SELECT *
+ FROM json_array_elements_text(COALESCE((
+ SELECT label::json->'sets'
+ FROM pg_seclabel
+ WHERE provider = 'bdr'
+ AND classoid = 'pg_class'::regclass
+ AND objoid = $1::regclass
+ ), '["default"]'))
+ )|| '{all}';
+$$;
+
+CREATE OR REPLACE FUNCTION bdr.table_set_replication_sets(p_relation regclass, p_sets text[])
+ RETURNS void
+ VOLATILE
+ LANGUAGE 'plpgsql'
+ AS $$
+DECLARE
+ v_label json;
+BEGIN
+ -- emulate STRICT for p_relation parameter
+ IF p_relation IS NULL THEN
+ RETURN;
+ END IF;
+
+ -- query current label
+ SELECT label::json INTO v_label
+ FROM pg_seclabel
+ WHERE provider = 'bdr'
+ AND classoid = 'pg_class'::regclass
+ AND objoid = p_relation;
+
+ -- replace old 'sets' parameter with new value
+ SELECT json_object_agg(key, value) INTO v_label
+ FROM (
+ SELECT key, value
+ FROM json_each(v_label)
+ WHERE key <> 'sets'
+ UNION ALL
+ SELECT
+ 'sets', to_json(p_sets)
+ WHERE p_sets IS NOT NULL
+ ) d;
+
+ -- and now set the appropriate label
+ EXECUTE format('SECURITY LABEL FOR bdr ON TABLE %I IS %L',
+ p_relation, v_label) ;
+END;
+$$;
+
+
---
--- this should always be last to avoid replicating our internal schema
---
#include "bdr.h"
#include "bdr_locks.h"
+#include "bdr_label.h"
#include "libpq-fe.h"
#include "funcapi.h"
appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
+ appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+ if (bdr_apply_config->replication_sets != NULL &&
+ bdr_apply_config->replication_sets[0] != 0)
+ appendStringInfo(&query, ", replication_sets '%s'",
+ bdr_apply_config->replication_sets);
+
appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
if (bdr_apply_worker->forward_changesets)
appendStringInfo(&query, ", forward_changesets 't'");
char *optname_replica = palloc(strlen(name) + 30);
char *optname_local_dsn = palloc(strlen(name) + 30);
char *optname_local_dbname = palloc(strlen(name) + 30);
+ char *optname_replication_sets = palloc(strlen(name) + 30);
Assert(process_shared_preload_libraries_in_progress);
GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
+ sprintf(optname_replication_sets, "bdr.%s_replication_sets", name);
+ DefineCustomStringVariable(optname_replication_sets,
+ optname_replication_sets,
+ NULL,
+ &opts->replication_sets,
+ NULL, PGC_POSTMASTER,
+ GUC_LIST_INPUT | GUC_LIST_QUOTE,
+ NULL, NULL, NULL);
+
if (!opts->dsn)
{
*/
foreach(c, apply_workers)
{
- BackgroundWorkerHandle *h = (BackgroundWorkerHandle*)lfirst(c);
+ BackgroundWorkerHandle *h = (BackgroundWorkerHandle *) lfirst(c);
pfree(h);
}
PGC_BACKEND,
0,
NULL, NULL, NULL);
+ bdr_label_init();
/* if nothing is configured, we're done */
if (connections == NULL)
# bdr extension
comment = 'bdr support functions'
-default_version = '0.7.1'
+default_version = '0.8.0'
module_pathname = '$libdir/bdr'
relocatable = false
requires = btree_gist
BDRConflictHandler *conflict_handlers;
size_t conflict_handlers_len;
+
+ /* ordered list of replication sets of length num_* */
+ char **replication_sets;
+ int num_replication_sets;
} BDRRelation;
typedef struct BDRTupleData
extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode);
extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode);
+extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
+extern void bdr_parse_database_options(const char *label);
+
+
/* conflict handlers API */
extern void bdr_conflict_handlers_init(void);
break;
case T_SecLabelStmt:
- error_unsupported_command(CreateCommandTag(parsetree));
- break;
+ {
+ SecLabelStmt *sstmt;
+ sstmt = (SecLabelStmt *) parsetree;
+ if (sstmt->provider == NULL ||
+ strcmp(sstmt->provider, "bdr") == 0)
+ break;
+ error_unsupported_command(CreateCommandTag(parsetree));
+ break;
+ }
default:
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(parsetree));
int apply_delay;
bool init_replica;
char *replica_local_dsn;
+ char *replication_sets;
+
/*
* These aren't technically GUCs, but are per-connection config
* information obtained from the GUCs.
*/
char *name;
char *dbname;
+
/* Connection config might be broken (blank dsn, etc) */
bool is_valid;
} BdrConnectionConfig;
--- /dev/null
+/* -------------------------------------------------------------------------
+ *
+ * bdr_label.c
+ *
+ * Provide object metadata for bdr using the security label
+ * infrastructure.
+ *
+ * Copyright (c) 2014, PostgreSQL Global Development Group
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "bdr.h"
+#include "bdr_label.h"
+
+#include "commands/seclabel.h"
+#include "miscadmin.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+
+static void bdr_object_relabel(const ObjectAddress *object, const char *seclabel);
+
+/*
+ * Needs to call at postmaster init (or backend init for EXEC_BACKEND).
+ */
+void
+bdr_label_init(void)
+{
+ /* Security label provider hook */
+ register_label_provider("bdr", bdr_object_relabel);
+}
+
+static void
+bdr_object_relabel(const ObjectAddress *object, const char *seclabel)
+{
+ switch (object->classId)
+ {
+ case RelationRelationId:
+
+ if (!pg_class_ownercheck(object->objectId, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ get_rel_name(object->objectId));
+
+ if (seclabel != NULL)
+ bdr_parse_relation_options(seclabel, NULL);
+
+ CacheInvalidateRelcacheByRelid(object->objectId);
+ break;
+ default:
+ elog(ERROR, "unsupported object type: %s",
+ getObjectDescription(object));
+ break;
+ }
+}
--- /dev/null
+/*
+ * bdr_label.h
+ *
+ * BiDirectionalReplication
+ *
+ * Copyright (c) 2014, PostgreSQL Global Development Group
+ *
+ * bdr_label.h
+ */
+
+extern void bdr_label_init(void);
bool client_float8_byval;
bool client_int_datetime;
char *client_db_encoding;
+ Oid bdr_schema_oid;
Oid bdr_conflict_handlers_reloid;
Oid bdr_locks_reloid;
+
+ int num_replication_sets;
+ char **replication_sets;
} BdrOutputData;
/* These must be available to pg_dlsym() */
strVal(elem->arg), elem->defname)));
}
+static void
+bdr_parse_identifier_list_arr(DefElem *elem, char ***list, int *len)
+{
+ List *namelist;
+ ListCell *c;
+
+ bdr_parse_notnull(elem, "list");
+
+ if (!SplitIdentifierString(pstrdup(strVal(elem->arg)),
+ ',', &namelist))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not identifier list value \"%s\" for parameter \"%s\": %m",
+ strVal(elem->arg), elem->defname)));
+ }
+
+ *len = 0;
+ *list = palloc(list_length(namelist) * sizeof(char *));
+
+ foreach(c, namelist)
+ {
+ (*list)[(*len)++] = pstrdup(lfirst(c));
+ }
+ list_free(namelist);
+}
+
static void
bdr_req_param(const char *param)
{
data->bdr_conflict_handlers_reloid = InvalidOid;
data->bdr_locks_reloid = InvalidOid;
+ data->bdr_schema_oid = InvalidOid;
+
+ data->num_replication_sets = -1;
/* parse options passed in by the client */
data->client_db_encoding = pstrdup(strVal(elem->arg));
else if (strcmp(elem->defname, "forward_changesets") == 0)
bdr_parse_bool(elem, &data->forward_changesets);
+ else if (strcmp(elem->defname, "replication_sets") == 0)
+ {
+ bdr_parse_identifier_list_arr(elem,
+ &data->replication_sets,
+ &data->num_replication_sets);
+ qsort(data->replication_sets, data->num_replication_sets,
+ sizeof(char *), pg_qsort_strcmp);
+ }
else
{
ereport(ERROR,
}
schema_oid = get_namespace_oid("bdr", true);
+ data->bdr_schema_oid = schema_oid;
if (schema_oid != InvalidOid)
{
data->bdr_conflict_handlers_reloid =
* to the client unless we're in changeset forwarding mode.
*/
static inline bool
-should_forward_changeset(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+should_forward_changeset(LogicalDecodingContext *ctx, BdrOutputData *data,
+ ReorderBufferTXN *txn)
{
- return (txn->origin_id == InvalidRepNodeId)
- || ((BdrOutputData*)ctx->output_plugin_private)->forward_changesets;
+ return txn->origin_id == InvalidRepNodeId || data->forward_changesets;
+}
+
+static inline bool
+should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
+ BDRRelation *r)
+{
+ int i, j;
+
+ /* internal bdr relations that may not be replicated */
+ if(RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
+ RelationGetRelid(r->rel) == data->bdr_locks_reloid)
+ return false;
+
+ /* always replicate other stuff in the bdr schema */
+ if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
+ return true;
+
+ /* no explicit configuration */
+ if (data->num_replication_sets == -1 ||
+ r->num_replication_sets == -1)
+ {
+ return true;
+ }
+
+
+ /*
+ * Compare the two ordered list of replication sets and find overlapping
+ * elements.
+ */
+ i = j = 0;
+ while (i < data->num_replication_sets && j < r->num_replication_sets)
+ {
+ int cmp = strcmp(data->replication_sets[i],
+ r->replication_sets[j]);
+
+ if (cmp < 0)
+ i++;
+ else if (cmp == 0)
+ return true;
+ else
+ j++;
+ }
+
+ return false;
}
/*
AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
- if (!should_forward_changeset(ctx, txn))
+ if (!should_forward_changeset(ctx, data, txn))
return;
OutputPluginPrepareWrite(ctx, true);
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
-#ifdef NOT_YET
BdrOutputData *data = ctx->output_plugin_private;
-#endif
int flags = 0;
- if (!should_forward_changeset(ctx, txn))
+ if (!should_forward_changeset(ctx, data, txn))
return;
OutputPluginPrepareWrite(ctx, true);
{
BdrOutputData *data;
MemoryContext old;
+ BDRRelation *bdr_relation;
+
+ bdr_relation = bdr_heap_open(RelationGetRelid(relation), NoLock);
data = ctx->output_plugin_private;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- if (!should_forward_changeset(ctx, txn))
+ if (!should_forward_changeset(ctx, data, txn))
return;
- if(RelationGetRelid(relation) == data->bdr_conflict_handlers_reloid ||
- RelationGetRelid(relation) == data->bdr_locks_reloid)
+ if (!should_forward_change(ctx, data, bdr_relation))
return;
OutputPluginPrepareWrite(ctx, true);
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
+
+ bdr_heap_close(bdr_relation, NoLock);
}
/*
#include "access/heapam.h"
#include "access/xact.h"
+#include "commands/seclabel.h"
+
#include "utils/catcache.h"
#include "utils/inval.h"
static HTAB *BDRRelcacheHash = NULL;
+static void
+BDRRelcacheHashInvalidateEntry(BDRRelation *entry)
+{
+ int i;
+
+ if (entry->conflict_handlers)
+ pfree(entry->conflict_handlers);
+
+ if (entry->num_replication_sets > 0)
+ {
+ for (i = 0; i < entry->num_replication_sets; i++)
+ pfree(entry->replication_sets[i]);
+
+ pfree(entry->replication_sets);
+ }
+}
+
static void
BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
{
while ((entry = (BDRRelation *) hash_seq_search(&status)) != NULL)
{
- if (entry->conflict_handlers)
- pfree(entry->conflict_handlers);
+ BDRRelcacheHashInvalidateEntry(entry);
- if (hash_search(BDRRelcacheHash, (void *) &entry->reloid,
+ if (hash_search(BDRRelcacheHash, &entry->reloid,
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
}
}
else
{
- entry = hash_search(BDRRelcacheHash, (void *) &relid,
- HASH_REMOVE, NULL);
+ if ((entry = hash_search(BDRRelcacheHash, &relid,
+ HASH_FIND, NULL)) != NULL)
+ {
+ BDRRelcacheHashInvalidateEntry(entry);
- if (entry && entry->conflict_handlers)
- pfree(entry->conflict_handlers);
+ hash_search(BDRRelcacheHash, &relid,
+ HASH_REMOVE, NULL);
+ }
}
}
(Datum) 0);
}
+#include "utils/jsonapi.h"
+#include "utils/json.h"
+#include "utils/jsonb.h"
+
+void
+bdr_parse_relation_options(const char *label, BDRRelation *rel)
+{
+ JsonbIterator *it;
+ JsonbValue v;
+ int r;
+ bool parsing_sets = false;
+ int level = 0;
+ Jsonb *data = NULL;
+
+ if (label == NULL)
+ return;
+
+ data = DatumGetJsonb(
+ DirectFunctionCall1(jsonb_in, CStringGetDatum(label)));
+
+ if (!JB_ROOT_IS_OBJECT(data))
+ elog(ERROR, "root needs to be an object");
+
+ it = JsonbIteratorInit(&data->root);
+ while ((r = JsonbIteratorNext(&it, &v, false)) != WJB_DONE)
+ {
+ if (level == 0 && r != WJB_BEGIN_OBJECT)
+ elog(ERROR, "root element needs to be an object");
+ else if (level == 0 && it->nElems > 1)
+ elog(ERROR, "only 'sets' allowed on root level");
+ else if (level == 1 && r == WJB_KEY)
+ {
+ if (strncmp(v.val.string.val, "sets", v.val.string.len) != 0)
+ elog(ERROR, "unexpected key: %s",
+ pnstrdup(v.val.string.val, v.val.string.len));
+ parsing_sets = true;
+ }
+ else if (r == WJB_BEGIN_ARRAY || r == WJB_BEGIN_OBJECT)
+ {
+ if (parsing_sets && rel != NULL)
+ {
+ rel->replication_sets =
+ MemoryContextAlloc(CacheMemoryContext,
+ sizeof(char *) * it->nElems);
+ }
+ level++;
+ }
+ else if (r == WJB_END_ARRAY || r == WJB_END_OBJECT)
+ {
+ level--;
+ parsing_sets = false;
+ }
+ else if (parsing_sets)
+ {
+ char *setname;
+
+ if (r != WJB_ELEM)
+ elog(ERROR, "unexpected element type %u", r);
+ if (level != 2)
+ elog(ERROR, "unexpected level for set %d", level);
+
+ if (rel != NULL)
+ {
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+ setname = pnstrdup(v.val.string.val, v.val.string.len);
+ rel->replication_sets[rel->num_replication_sets++] = setname;
+ MemoryContextSwitchTo(oldcontext);
+ }
+ }
+ else
+ elog(ERROR, "unexpected content: %u at level %d", r, level);
+ }
+
+ if (rel != NULL && rel->num_replication_sets > 0)
+ {
+ qsort(rel->replication_sets, rel->num_replication_sets,
+ sizeof(char *), pg_qsort_strcmp);
+ }
+
+}
+
BDRRelation *
bdr_heap_open(Oid reloid, LOCKMODE lockmode)
{
BDRRelation *entry;
bool found;
Relation rel;
+ ObjectAddress object;
+ const char *label;
rel = heap_open(reloid, lockmode);
entry->reloid = reloid;
entry->rel = rel;
+ object.classId = RelationRelationId;
+ object.objectId = reloid;
+ object.objectSubId = 0;
+
+ label = GetSecurityLabel(&object, "bdr");
+ bdr_parse_relation_options(label, entry);
+
return entry;
}