char **replication_sets;
/* -1 for no configured set */
int num_replication_sets;
+
+ bool computed_repl_valid;
+ bool computed_repl_insert;
+ bool computed_repl_update;
+ bool computed_repl_delete;
} BDRRelation;
typedef struct BDRTupleData
/* use instead of heap_open()/heap_close() */
extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode);
extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode);
+extern void bdr_heap_compute_replication_settings(
+ BDRRelation *rel,
+ int num_replication_sets,
+ char **replication_sets);
extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
extern void bdr_parse_database_options(const char *label);
int num_replication_sets;
char **replication_sets;
- bool replication_sets_include_default;
} BdrOutputData;
/* These must be available to pg_dlsym() */
data->bdr_schema_oid = InvalidOid;
data->num_replication_sets = -1;
- data->replication_sets_include_default = false;
/* parse options passed in by the client */
/* make it bsearch()able */
qsort(data->replication_sets, data->num_replication_sets,
sizeof(char *), pg_qsort_strcmp);
-
- /* handle special case/implicit sets */
- for (i = 0; i < data->num_replication_sets; i++)
- {
- /* no need to perform any checks */
- if (strcmp(data->replication_sets[i], "all") == 0)
- {
- data->num_replication_sets = -1;
- break;
- }
-
- /*
- * Simpler handling, because we don't need to search the list
- * later.
- */
- if (strcmp(data->replication_sets[i], "default") == 0)
- {
- data->replication_sets_include_default = true;
- continue;
- }
- }
-
}
else
{
static inline bool
should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
- BDRRelation *r)
+ BDRRelation *r, enum ReorderBufferChangeType change)
{
- int i, j;
-
/* internal bdr relations that may not be replicated */
if(RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
RelationGetRelid(r->rel) == data->bdr_locks_reloid)
if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
return true;
- /* no explicit configuration */
- if (data->num_replication_sets == -1)
- {
- return true;
- }
-
- /*
- * Handle the 'default' set.
- */
- if (data->replication_sets_include_default &&
- r->num_replication_sets == -1)
- {
- return true;
- }
+ if (!r->computed_repl_valid)
+ bdr_heap_compute_replication_settings(r,
+ data->num_replication_sets,
+ data->replication_sets);
- /*
- * Compare the two ordered list of replication sets and find overlapping
- * elements.
- *
- * XXX: At some point in the future we probably want to cache this
- * computation in the bdr relcache entry.
- */
- i = j = 0;
- while (i < data->num_replication_sets && j < r->num_replication_sets)
+ /* Check whether the current action is configured to be replicated */
+ switch (change)
{
- int cmp = strcmp(data->replication_sets[i],
- r->replication_sets[j]);
-
- if (cmp < 0)
- i++;
- else if (cmp == 0)
- return true;
- else
- j++;
+ case REORDER_BUFFER_CHANGE_INSERT:
+ return r->computed_repl_insert;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ return r->computed_repl_update;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ return r->computed_repl_delete;
+ default:
+ elog(ERROR, "should be unreachable");
}
-
- return false;
}
/*
if (!should_forward_changeset(ctx, data, txn))
return;
- if (!should_forward_change(ctx, data, bdr_relation))
+ if (!should_forward_change(ctx, data, bdr_relation, change->action))
return;
OutputPluginPrepareWrite(ctx, true);
heap_close(rel->rel, lockmode);
rel->rel = NULL;
}
+
+
+static bool
+relation_in_replication_set(BDRRelation *r, const char *setname)
+{
+ /* "all" set contains, surprise, all relations */
+ if (strcmp(setname, "all") == 0)
+ return true;
+
+ /* "default" set contains all relations without a replication set configuration */
+ if (strcmp(setname, "default") == 0 && r->num_replication_sets == -1)
+ return true;
+
+ /* if no set is configured, it's not in there */
+ if (r->num_replication_sets <= 0)
+ return false;
+
+ /* look whether the relation is the named set */
+ if (bsearch(&setname,
+ r->replication_sets, r->num_replication_sets, sizeof(char *),
+ pg_qsort_strcmp))
+ return true;
+
+ return false;
+}
+
+/*
+ * Compute whether modifications to this relation should be replicated or not
+ * and cache the result in the relation descriptor.
+ *
+ * NB: This can only sensibly used from inside logical decoding as we require
+ * a constant set of 'to be replicated' sets to be passed in - which happens
+ * to be what we need for logical decoding. As there really isn't another need
+ * for this functionality so far...
+ */
+void
+bdr_heap_compute_replication_settings(BDRRelation *r,
+ int conf_num_replication_sets,
+ char **conf_replication_sets)
+{
+ int i;
+
+ Assert(!r->computed_repl_valid);
+
+ /* Implicit "replicate everything" configuration */
+ if (conf_num_replication_sets == -1)
+ {
+ r->computed_repl_insert = true;
+ r->computed_repl_update = true;
+ r->computed_repl_delete = true;
+
+ r->computed_repl_valid = true;
+ return;
+ }
+
+ /*
+ * Build the union of all replicated actions across all configured
+ * replication sets.
+ */
+ for (i = 0; i < conf_num_replication_sets; i++)
+ {
+ const char* setname = conf_replication_sets[i];
+
+ if (!relation_in_replication_set(r, setname))
+ continue;
+
+ /*
+ * In the future we'll lookup configuration for individual sets here.
+ */
+ r->computed_repl_insert = true;
+ r->computed_repl_update = true;
+ r->computed_repl_delete = true;
+
+ /* no need to look any further, we replicate everything */
+ if (r->computed_repl_insert &&
+ r->computed_repl_update &&
+ r->computed_repl_delete)
+ break;
+ }
+
+ r->computed_repl_valid = true;
+}