bdr: Actually andle the implicit 'all' and 'default' replication sets.
authorAndres Freund <andres@anarazel.de>
Sun, 2 Nov 2014 14:53:49 +0000 (15:53 +0100)
committerAndres Freund <andres@anarazel.de>
Sun, 2 Nov 2014 14:53:49 +0000 (15:53 +0100)
bdr.h
bdr_output.c
bdr_relcache.c

diff --git a/bdr.h b/bdr.h
index 04c56f8654e976bae2198830fe46cdee2bd87b06..727ccfea2c2883c63ac14a8ee0311707639f2ad6 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -115,6 +115,7 @@ typedef struct BDRRelation
 
    /* ordered list of replication sets of length num_* */
    char          **replication_sets;
+   /* -1 for no configured set */
    int     num_replication_sets;
 } BDRRelation;
 
index 9486e14f190372dc41d46e6efe4c0e0656876c60..44a4f6ea1f7fd55eb79a1621f049b24b5dfa5fed 100644 (file)
@@ -78,6 +78,7 @@ typedef struct
 
    int num_replication_sets;
    char **replication_sets;
+   bool replication_sets_include_default;
 } BdrOutputData;
 
 /* These must be available to pg_dlsym() */
@@ -333,6 +334,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
    data->bdr_schema_oid = InvalidOid;
 
    data->num_replication_sets = -1;
+   data->replication_sets_include_default = false;
 
    /* parse options passed in by the client */
 
@@ -379,6 +381,8 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
                                          &data->replication_sets,
                                          &data->num_replication_sets);
 
+           Assert(data->num_replication_sets >= 0);
+
            /* validate elements */
            for (i = 0; i < data->num_replication_sets; i++)
                bdr_validate_replication_set_name(data->replication_sets[i],
@@ -387,6 +391,28 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
            /* make it bsearch()able */
            qsort(data->replication_sets, data->num_replication_sets,
                  sizeof(char *), pg_qsort_strcmp);
+
+           /* handle special case/implicit sets */
+           for (i = 0; i < data->num_replication_sets; i++)
+           {
+               /* no need to perform any checks */
+               if (strcmp(data->replication_sets[i], "all") == 0)
+               {
+                   data->num_replication_sets = -1;
+                   break;
+               }
+
+               /*
+                * Simpler handling, because we don't need to search the list
+                * later.
+                */
+               if (strcmp(data->replication_sets[i], "default") == 0)
+               {
+                   data->replication_sets_include_default = true;
+                   continue;
+               }
+           }
+
        }
        else
        {
@@ -548,12 +574,19 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
        return true;
 
    /* no explicit configuration */
-   if (data->num_replication_sets == -1 ||
-       r->num_replication_sets == -1)
+   if (data->num_replication_sets == -1)
    {
        return true;
    }
 
+   /*
+    * Handle the 'default' set.
+    */
+   if (data->replication_sets_include_default &&
+       r->num_replication_sets == -1)
+   {
+       return true;
+   }
 
    /*
     * Compare the two ordered list of replication sets and find overlapping
index b53ad70ed288f2e896c9dee30f5c00cd727594f0..220a03b9ad05a9103afcc9a061f22574c1f334a3 100644 (file)
@@ -190,6 +190,9 @@ bdr_parse_relation_options(const char *label, BDRRelation *rel)
                elog(ERROR, "unexpected key: %s",
                     pnstrdup(v.val.string.val, v.val.string.len));
            parsing_sets = true;
+
+           if (rel != NULL)
+               rel->num_replication_sets = 0;
        }
        else if (r == WJB_BEGIN_ARRAY || r == WJB_BEGIN_OBJECT)
        {
@@ -274,6 +277,7 @@ bdr_heap_open(Oid reloid, LOCKMODE lockmode)
 
    entry->reloid = reloid;
    entry->rel = rel;
+   entry->num_replication_sets = -1;
 
    object.classId = RelationRelationId;
    object.objectId = reloid;