bdr: Cache information about a relation being replicated in an output plugin.
authorAndres Freund <andres@anarazel.de>
Mon, 3 Nov 2014 13:14:15 +0000 (14:14 +0100)
committerAndres Freund <andres@anarazel.de>
Mon, 3 Nov 2014 13:17:49 +0000 (14:17 +0100)
That's primarily useful because it will make it easier to maintain the
state necessary for replication set wide settings. But the new
infrastructure is also generally more efficient.

The information is added to the bdr relation descriptor - as logical
decoding maintains that cache coherently that's quite convenient.

bdr.h
bdr_output.c
bdr_relcache.c

diff --git a/bdr.h b/bdr.h
index 9fc7ba5a5c7f2fd3dc7cbacfb1d32df14d49290b..1b0d97ddb866249831768c237a380634610f619f 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -119,6 +119,11 @@ typedef struct BDRRelation
    char      **replication_sets;
    /* -1 for no configured set */
    int         num_replication_sets;
+
+   bool        computed_repl_valid;
+   bool        computed_repl_insert;
+   bool        computed_repl_update;
+   bool        computed_repl_delete;
 } BDRRelation;
 
 typedef struct BDRTupleData
@@ -392,6 +397,10 @@ bdr_establish_connection_and_slot(BdrConnectionConfig *cfg,
 /* use instead of heap_open()/heap_close() */
 extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode);
 extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode);
+extern void bdr_heap_compute_replication_settings(
+   BDRRelation *rel,
+   int         num_replication_sets,
+   char      **replication_sets);
 
 extern void bdr_parse_relation_options(const char *label, BDRRelation *rel);
 extern void bdr_parse_database_options(const char *label);
index 44a4f6ea1f7fd55eb79a1621f049b24b5dfa5fed..f1b3c351349a328fd0f1be72578720cce247338f 100644 (file)
@@ -78,7 +78,6 @@ typedef struct
 
    int num_replication_sets;
    char **replication_sets;
-   bool replication_sets_include_default;
 } BdrOutputData;
 
 /* These must be available to pg_dlsym() */
@@ -334,7 +333,6 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
    data->bdr_schema_oid = InvalidOid;
 
    data->num_replication_sets = -1;
-   data->replication_sets_include_default = false;
 
    /* parse options passed in by the client */
 
@@ -391,28 +389,6 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
            /* make it bsearch()able */
            qsort(data->replication_sets, data->num_replication_sets,
                  sizeof(char *), pg_qsort_strcmp);
-
-           /* handle special case/implicit sets */
-           for (i = 0; i < data->num_replication_sets; i++)
-           {
-               /* no need to perform any checks */
-               if (strcmp(data->replication_sets[i], "all") == 0)
-               {
-                   data->num_replication_sets = -1;
-                   break;
-               }
-
-               /*
-                * Simpler handling, because we don't need to search the list
-                * later.
-                */
-               if (strcmp(data->replication_sets[i], "default") == 0)
-               {
-                   data->replication_sets_include_default = true;
-                   continue;
-               }
-           }
-
        }
        else
        {
@@ -560,10 +536,8 @@ should_forward_changeset(LogicalDecodingContext *ctx, BdrOutputData *data,
 
 static inline bool
 should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
-                     BDRRelation *r)
+                     BDRRelation *r, enum ReorderBufferChangeType change)
 {
-   int i, j;
-
    /* internal bdr relations that may not be replicated */
    if(RelationGetRelid(r->rel) == data->bdr_conflict_handlers_reloid ||
       RelationGetRelid(r->rel) == data->bdr_locks_reloid)
@@ -573,43 +547,23 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
    if (r->rel->rd_rel->relnamespace == data->bdr_schema_oid)
        return true;
 
-   /* no explicit configuration */
-   if (data->num_replication_sets == -1)
-   {
-       return true;
-   }
-
-   /*
-    * Handle the 'default' set.
-    */
-   if (data->replication_sets_include_default &&
-       r->num_replication_sets == -1)
-   {
-       return true;
-   }
+   if (!r->computed_repl_valid)
+       bdr_heap_compute_replication_settings(r,
+                                             data->num_replication_sets,
+                                             data->replication_sets);
 
-   /*
-    * Compare the two ordered list of replication sets and find overlapping
-    * elements.
-    *
-    * XXX: At some point in the future we probably want to cache this
-    * computation in the bdr relcache entry.
-    */
-   i = j = 0;
-   while (i < data->num_replication_sets && j < r->num_replication_sets)
+   /* Check whether the current action is configured to be replicated */
+   switch (change)
    {
-       int cmp = strcmp(data->replication_sets[i],
-                        r->replication_sets[j]);
-
-       if (cmp < 0)
-           i++;
-       else if (cmp == 0)
-           return true;
-       else
-           j++;
+       case REORDER_BUFFER_CHANGE_INSERT:
+           return r->computed_repl_insert;
+       case REORDER_BUFFER_CHANGE_UPDATE:
+           return r->computed_repl_update;
+       case REORDER_BUFFER_CHANGE_DELETE:
+           return r->computed_repl_delete;
+       default:
+           elog(ERROR, "should be unreachable");
    }
-
-   return false;
 }
 
 /*
@@ -729,7 +683,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    if (!should_forward_changeset(ctx, data, txn))
        return;
 
-   if (!should_forward_change(ctx, data, bdr_relation))
+   if (!should_forward_change(ctx, data, bdr_relation, change->action))
        return;
 
    OutputPluginPrepareWrite(ctx, true);
index 1e19b83146d3d6d618496ac22eff2cd2b5c73ec5..5849d82d85751009e42ebb8ceb3aaaf4f8af0847 100644 (file)
@@ -298,3 +298,85 @@ bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode)
    heap_close(rel->rel, lockmode);
    rel->rel = NULL;
 }
+
+
+static bool
+relation_in_replication_set(BDRRelation *r, const char *setname)
+{
+   /* "all" set contains, surprise, all relations */
+   if (strcmp(setname, "all") == 0)
+       return true;
+
+   /* "default" set contains all relations without a replication set configuration */
+   if (strcmp(setname, "default") == 0 && r->num_replication_sets == -1)
+       return true;
+
+   /* if no set is configured, it's not in there */
+   if (r->num_replication_sets <= 0)
+       return false;
+
+   /* look whether the relation is the named set */
+   if (bsearch(&setname,
+               r->replication_sets, r->num_replication_sets, sizeof(char *),
+               pg_qsort_strcmp))
+       return true;
+
+   return false;
+}
+
+/*
+ * Compute whether modifications to this relation should be replicated or not
+ * and cache the result in the relation descriptor.
+ *
+ * NB: This can only sensibly used from inside logical decoding as we require
+ * a constant set of 'to be replicated' sets to be passed in - which happens
+ * to be what we need for logical decoding. As there really isn't another need
+ * for this functionality so far...
+ */
+void
+bdr_heap_compute_replication_settings(BDRRelation *r,
+                                     int          conf_num_replication_sets,
+                                     char       **conf_replication_sets)
+{
+   int i;
+
+   Assert(!r->computed_repl_valid);
+
+   /* Implicit "replicate everything" configuration */
+   if (conf_num_replication_sets == -1)
+   {
+       r->computed_repl_insert = true;
+       r->computed_repl_update = true;
+       r->computed_repl_delete = true;
+
+       r->computed_repl_valid = true;
+       return;
+   }
+
+   /*
+    * Build the union of all replicated actions across all configured
+    * replication sets.
+    */
+   for (i = 0; i < conf_num_replication_sets; i++)
+   {
+       const char* setname = conf_replication_sets[i];
+
+       if (!relation_in_replication_set(r, setname))
+           continue;
+
+       /*
+        * In the future we'll lookup configuration for individual sets here.
+        */
+       r->computed_repl_insert = true;
+       r->computed_repl_update = true;
+       r->computed_repl_delete = true;
+
+       /* no need to look any further, we replicate everything */
+       if (r->computed_repl_insert &&
+           r->computed_repl_update &&
+           r->computed_repl_delete)
+           break;
+   }
+
+   r->computed_repl_valid = true;
+}