bdr: Increase validation around replication sets.
authorAndres Freund <andres@anarazel.de>
Sun, 2 Nov 2014 14:38:44 +0000 (15:38 +0100)
committerAndres Freund <andres@anarazel.de>
Sun, 2 Nov 2014 14:38:44 +0000 (15:38 +0100)
bdr.h
bdr_output.c
bdr_relcache.c

diff --git a/bdr.h b/bdr.h
index a4fc9c07292cdcbe9a558cefd2589f31e67db826..04c56f8654e976bae2198830fe46cdee2bd87b06 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -404,6 +404,9 @@ extern HeapTuple bdr_conflict_handlers_resolve(BDRRelation * rel,
                                               BdrConflictType event_type,
                                               uint64 timeframe, bool *skip);
 
+/* replication set stuff */
+void bdr_validate_replication_set_name(const char *name, bool allow_implicit);
+
 /*
  * Global to identify the type of BDR worker the current process is. Primarily
  * useful for assertions and debugging.
index 016150d8562b4c6eea64fc88b15ebd6103a32655..9486e14f190372dc41d46e6efe4c0e0656876c60 100644 (file)
@@ -372,9 +372,19 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
            bdr_parse_bool(elem, &data->forward_changesets);
        else if (strcmp(elem->defname, "replication_sets") == 0)
        {
+           int i;
+
+           /* parse list */
            bdr_parse_identifier_list_arr(elem,
                                          &data->replication_sets,
                                          &data->num_replication_sets);
+
+           /* validate elements */
+           for (i = 0; i < data->num_replication_sets; i++)
+               bdr_validate_replication_set_name(data->replication_sets[i],
+                                                 true);
+
+           /* make it bsearch()able */
            qsort(data->replication_sets, data->num_replication_sets,
                  sizeof(char *), pg_qsort_strcmp);
        }
@@ -548,6 +558,9 @@ should_forward_change(LogicalDecodingContext *ctx, BdrOutputData *data,
    /*
     * Compare the two ordered list of replication sets and find overlapping
     * elements.
+    *
+    * XXX: At some point in the future we probably want to cache this
+    * computation in the bdr relcache entry.
     */
    i = j = 0;
    while (i < data->num_replication_sets && j < r->num_replication_sets)
index c425926299171db650edccdb56c5e529a7fb7059..b53ad70ed288f2e896c9dee30f5c00cd727594f0 100644 (file)
@@ -23,6 +23,9 @@
 
 #include "utils/catcache.h"
 #include "utils/inval.h"
+#include "utils/jsonapi.h"
+#include "utils/json.h"
+#include "utils/jsonb.h"
 
 static HTAB *BDRRelcacheHash = NULL;
 
@@ -106,9 +109,54 @@ bdr_initialize_cache()
                                  (Datum) 0);
 }
 
-#include "utils/jsonapi.h"
-#include "utils/json.h"
-#include "utils/jsonb.h"
+void
+bdr_validate_replication_set_name(const char *name,
+                                 bool allow_implicit)
+{
+   const char *cp;
+
+   if (strlen(name) == 0)
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_NAME),
+                errmsg("replication set name \"%s\" is too short",
+                       name)));
+   }
+
+   if (strlen(name) >= NAMEDATALEN)
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_NAME_TOO_LONG),
+                errmsg("replication set name \"%s\" is too long",
+                       name)));
+   }
+
+   for (cp = name; *cp; cp++)
+   {
+       if (!((*cp >= 'a' && *cp <= 'z')
+             || (*cp >= '0' && *cp <= '9')
+             || (*cp == '_')
+             || (*cp == '-')))
+       {
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_NAME),
+                    errmsg("replication set name \"%s\" contains invalid character",
+                           name),
+                    errhint("Replication set names may only contain letters, numbers, and the underscore character.")));
+       }
+   }
+
+   if (!allow_implicit && (
+           strcmp(name, "default") == 0 ||
+           strcmp(name, "all") == 0
+           ))
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_NAME_TOO_LONG),
+                errmsg("replication set name \"%s\" is reserved",
+                       name)));
+   }
+}
 
 void
 bdr_parse_relation_options(const char *label, BDRRelation *rel)
@@ -161,21 +209,26 @@ bdr_parse_relation_options(const char *label, BDRRelation *rel)
        else if (parsing_sets)
        {
            char *setname;
+           MemoryContext oldcontext;
 
            if (r != WJB_ELEM)
                elog(ERROR, "unexpected element type %u", r);
            if (level != 2)
                elog(ERROR, "unexpected level for set %d", level);
 
+           oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+
+           setname = pnstrdup(v.val.string.val, v.val.string.len);
+           bdr_validate_replication_set_name(setname, false);
+
            if (rel != NULL)
            {
-               MemoryContext oldcontext;
-
-               oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
-               setname = pnstrdup(v.val.string.val, v.val.string.len);
                rel->replication_sets[rel->num_replication_sets++] = setname;
-               MemoryContextSwitchTo(oldcontext);
            }
+           else
+               pfree(setname);
+
+           MemoryContextSwitchTo(oldcontext);
        }
        else
            elog(ERROR, "unexpected content: %u at level %d", r, level);