Make sequence pernode cache configurable
authorPetr Jelinek <petr@2ndquadrant.com>
Sun, 28 Jun 2015 18:58:57 +0000 (20:58 +0200)
committerPetr Jelinek <petr@2ndquadrant.com>
Tue, 30 Jun 2015 10:02:03 +0000 (12:02 +0200)
Reloption cache_chunks added which sets how many chunks should be voted
for and cached on each node.

bdr_seq.c

index c89fbbbd1d4db63c5d3a1770390bd4fa956fc5f0..d878b244d9954276d0c273f4fe84cba13b13ce4a 100644 (file)
--- a/bdr_seq.c
+++ b/bdr_seq.c
@@ -77,6 +77,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
 
 static bool bdr_seq_pending_wakeup = false;
 
+static relopt_kind bdr_seq_relopt_kind = RELOPT_KIND_SEQUENCE;
+
 /* vote, the logic is in a function */
 const char* vote_sql = ""
 "SELECT bdr.bdr_sequencer_vote($1, $2, $3, $4);\n";
@@ -85,39 +87,47 @@ const char *start_elections_sql =
 "WITH to_be_updated_sequences AS (\n"
 "    SELECT\n"
 "        pg_namespace.nspname AS seqschema,\n"
-"        pg_class.relname AS seqname,\n"
+"        seq.relname AS seqname,\n"
 "        COUNT(bdr_sequence_values) AS open_seq_chunks,\n"
 "        GREATEST(COALESCE((\n"
 "            SELECT max(upper(seqrange))\n"
 "            FROM bdr_sequence_values max_val\n"
 "            WHERE\n"
 "                max_val.seqschema = pg_namespace.nspname\n"
-"                AND max_val.seqname = pg_class.relname\n"
-"        ), 0), (SELECT start_value FROM pg_sequence_parameters(pg_class.oid)))\n"
-"        AS current_max\n"
+"                AND max_val.seqname = seq.relname\n"
+"        ), 0), (SELECT start_value FROM pg_sequence_parameters(seq.oid)))\n"
+"        AS current_max,\n"
+"        seq.cache_chunks\n"
 "    FROM\n"
-"        pg_class\n"
-"        JOIN pg_namespace ON (pg_class.relnamespace = pg_namespace.oid)\n"
+"        (SELECT\n"
+"            pg_class.oid,\n"
+"            pg_class.relnamespace,\n"
+"            pg_class.relname,\n"
+"            COALESCE((SELECT split_part(o, '=', 2)::int\n"
+"                      FROM unnest(pg_class.reloptions) o\n"
+"                      WHERE split_part(o, '=', 1) = 'cache_chunks'), 5)\n"
+"            AS cache_chunks\n"
+"        FROM\n"
+"            pg_class\n"
+"        WHERE\n"
+"            pg_class.relkind = 'S' AND\n"
+"            pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n"
+"        ) seq\n"
+"        JOIN pg_namespace ON (seq.relnamespace = pg_namespace.oid)\n"
 "        LEFT JOIN bdr_sequence_values ON (\n"
 "            bdr_sequence_values.seqschema = pg_namespace.nspname\n"
-"            AND bdr_sequence_values.seqname = pg_class.relname\n"
+"            AND bdr_sequence_values.seqname = seq.relname\n"
 "            AND bdr_sequence_values.emptied = false\n"
 "            AND bdr_sequence_values.in_use = false\n"
 "            AND bdr_sequence_values.failed = false\n"
-"            AND bdr_sequence_values.owning_sysid = $1\n"
-"            AND bdr_sequence_values.owning_tlid = $2\n"
-"            AND bdr_sequence_values.owning_dboid = $3\n"
-"            AND bdr_sequence_values.owning_riname = $4\n"
 "        )\n"
-"    WHERE\n"
-"        pg_class.relkind = 'S'\n"
-"        AND pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n"
 "    GROUP BY\n"
-"        pg_class.relname,\n"
+"        seq.relname,\n"
 "        pg_namespace.nspname,\n"
-"        pg_class.oid\n"
+"        seq.oid,\n"
+"        seq.cache_chunks\n"
 "    HAVING\n"
-"        count(bdr_sequence_values) <= 5\n"
+"        count(bdr_sequence_values) <= cache_chunks\n"
 "),\n"
 "to_be_inserted_chunks AS (\n"
 "    SELECT\n"
@@ -127,7 +137,7 @@ const char *start_elections_sql =
 "        generate_series(\n"
 "            current_max,\n"
 "            -- 1000 is the chunk size, -1 is to get < instead <= out of generate_series\n"
-"            current_max + 1000 * (5 - open_seq_chunks) - 1,\n"
+"            current_max + 1000 * (cache_chunks - open_seq_chunks) - 1,\n"
 "            1000) chunk_start\n"
 "    FROM to_be_updated_sequences\n"
 "    LIMIT 500\n"
@@ -449,6 +459,11 @@ bdr_sequencer_shmem_init(int sequencers)
 
    prev_shmem_startup_hook = shmem_startup_hook;
    shmem_startup_hook = bdr_sequencer_shmem_startup;
+
+   bdr_seq_relopt_kind = add_reloption_kind();
+   add_int_reloption(bdr_seq_relopt_kind, "cache_chunks",
+                     "Sets how many chunks shoult be cached on each node.",
+                     5, 5, 100);
 }
 
 /*
@@ -1263,21 +1278,43 @@ PGDLLEXPORT Datum bdr_sequence_options(PG_FUNCTION_ARGS);
 
 PG_FUNCTION_INFO_V1(bdr_sequence_options);
 
+typedef struct BDRSeqOptions
+{
+   int32       vl_len_;        /* varlena header (do not touch directly!) */
+   int         cache_chunks;
+} BDRSeqOptions;
+
 Datum
 bdr_sequence_options(PG_FUNCTION_ARGS)
 {
    Datum       reloptions = PG_GETARG_DATUM(0);
    bool        validate = PG_GETARG_BOOL(1);
-   bytea      *result;
+   relopt_value *options;
+   BDRSeqOptions *sopts;
+   int         numoptions;
+   static const relopt_parse_elt tab[] = {
+       {"cache_chunks", RELOPT_TYPE_INT,
+       offsetof(BDRSeqOptions, cache_chunks)}
+   };
 
-   result = default_reloptions(reloptions, validate, RELOPT_KIND_SEQUENCE);
-   if (result)
-       PG_RETURN_BYTEA_P(result);
+   options = parseRelOptions(reloptions, validate, bdr_seq_relopt_kind, &numoptions);
 
-   PG_RETURN_NULL();
+   /* if none set, we're done */
+   if (numoptions == 0)
+       PG_RETURN_NULL();
+
+   sopts = allocateReloptStruct(sizeof(BDRSeqOptions), options, numoptions);
+
+   fillRelOptions((void *) sopts, sizeof(BDRSeqOptions), options, numoptions,
+                  validate, tab, lengthof(tab));
+
+   pfree(options);
 
    /* schedule wakeup as soon as other xacts can see the seuqence */
    bdr_schedule_eoxact_sequencer_wakeup();
 
-   PG_RETURN_VOID();
+   if (sopts)
+       PG_RETURN_BYTEA_P((bytea *)sopts);
+
+   PG_RETURN_NULL();
 }