From 071e940849a71b50d0d7cc38827b0ed6d6827079 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Sun, 28 Jun 2015 20:58:57 +0200 Subject: [PATCH] Make sequence pernode cache configurable Reloption cache_chunks added which sets how many chunks should be voted for and cached on each node. --- bdr_seq.c | 85 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/bdr_seq.c b/bdr_seq.c index c89fbbbd1d..d878b244d9 100644 --- a/bdr_seq.c +++ b/bdr_seq.c @@ -77,6 +77,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static bool bdr_seq_pending_wakeup = false; +static relopt_kind bdr_seq_relopt_kind = RELOPT_KIND_SEQUENCE; + /* vote, the logic is in a function */ const char* vote_sql = "" "SELECT bdr.bdr_sequencer_vote($1, $2, $3, $4);\n"; @@ -85,39 +87,47 @@ const char *start_elections_sql = "WITH to_be_updated_sequences AS (\n" " SELECT\n" " pg_namespace.nspname AS seqschema,\n" -" pg_class.relname AS seqname,\n" +" seq.relname AS seqname,\n" " COUNT(bdr_sequence_values) AS open_seq_chunks,\n" " GREATEST(COALESCE((\n" " SELECT max(upper(seqrange))\n" " FROM bdr_sequence_values max_val\n" " WHERE\n" " max_val.seqschema = pg_namespace.nspname\n" -" AND max_val.seqname = pg_class.relname\n" -" ), 0), (SELECT start_value FROM pg_sequence_parameters(pg_class.oid)))\n" -" AS current_max\n" +" AND max_val.seqname = seq.relname\n" +" ), 0), (SELECT start_value FROM pg_sequence_parameters(seq.oid)))\n" +" AS current_max,\n" +" seq.cache_chunks\n" " FROM\n" -" pg_class\n" -" JOIN pg_namespace ON (pg_class.relnamespace = pg_namespace.oid)\n" +" (SELECT\n" +" pg_class.oid,\n" +" pg_class.relnamespace,\n" +" pg_class.relname,\n" +" COALESCE((SELECT split_part(o, '=', 2)::int\n" +" FROM unnest(pg_class.reloptions) o\n" +" WHERE split_part(o, '=', 1) = 'cache_chunks'), 5)\n" +" AS cache_chunks\n" +" FROM\n" +" pg_class\n" +" WHERE\n" +" pg_class.relkind = 'S' AND\n" +" pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n" +" ) seq\n" +" JOIN pg_namespace ON (seq.relnamespace = pg_namespace.oid)\n" " LEFT JOIN bdr_sequence_values ON (\n" " bdr_sequence_values.seqschema = pg_namespace.nspname\n" -" AND bdr_sequence_values.seqname = pg_class.relname\n" +" AND bdr_sequence_values.seqname = seq.relname\n" " AND bdr_sequence_values.emptied = false\n" " AND bdr_sequence_values.in_use = false\n" " AND bdr_sequence_values.failed = false\n" -" AND bdr_sequence_values.owning_sysid = $1\n" -" AND bdr_sequence_values.owning_tlid = $2\n" -" AND bdr_sequence_values.owning_dboid = $3\n" -" AND bdr_sequence_values.owning_riname = $4\n" " )\n" -" WHERE\n" -" pg_class.relkind = 'S'\n" -" AND pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n" " GROUP BY\n" -" pg_class.relname,\n" +" seq.relname,\n" " pg_namespace.nspname,\n" -" pg_class.oid\n" +" seq.oid,\n" +" seq.cache_chunks\n" " HAVING\n" -" count(bdr_sequence_values) <= 5\n" +" count(bdr_sequence_values) <= cache_chunks\n" "),\n" "to_be_inserted_chunks AS (\n" " SELECT\n" @@ -127,7 +137,7 @@ const char *start_elections_sql = " generate_series(\n" " current_max,\n" " -- 1000 is the chunk size, -1 is to get < instead <= out of generate_series\n" -" current_max + 1000 * (5 - open_seq_chunks) - 1,\n" +" current_max + 1000 * (cache_chunks - open_seq_chunks) - 1,\n" " 1000) chunk_start\n" " FROM to_be_updated_sequences\n" " LIMIT 500\n" @@ -449,6 +459,11 @@ bdr_sequencer_shmem_init(int sequencers) prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = bdr_sequencer_shmem_startup; + + bdr_seq_relopt_kind = add_reloption_kind(); + add_int_reloption(bdr_seq_relopt_kind, "cache_chunks", + "Sets how many chunks shoult be cached on each node.", + 5, 5, 100); } /* @@ -1263,21 +1278,43 @@ PGDLLEXPORT Datum bdr_sequence_options(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(bdr_sequence_options); +typedef struct BDRSeqOptions +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + int cache_chunks; +} BDRSeqOptions; + Datum bdr_sequence_options(PG_FUNCTION_ARGS) { Datum reloptions = PG_GETARG_DATUM(0); bool validate = PG_GETARG_BOOL(1); - bytea *result; + relopt_value *options; + BDRSeqOptions *sopts; + int numoptions; + static const relopt_parse_elt tab[] = { + {"cache_chunks", RELOPT_TYPE_INT, + offsetof(BDRSeqOptions, cache_chunks)} + }; - result = default_reloptions(reloptions, validate, RELOPT_KIND_SEQUENCE); - if (result) - PG_RETURN_BYTEA_P(result); + options = parseRelOptions(reloptions, validate, bdr_seq_relopt_kind, &numoptions); - PG_RETURN_NULL(); + /* if none set, we're done */ + if (numoptions == 0) + PG_RETURN_NULL(); + + sopts = allocateReloptStruct(sizeof(BDRSeqOptions), options, numoptions); + + fillRelOptions((void *) sopts, sizeof(BDRSeqOptions), options, numoptions, + validate, tab, lengthof(tab)); + + pfree(options); /* schedule wakeup as soon as other xacts can see the seuqence */ bdr_schedule_eoxact_sequencer_wakeup(); - PG_RETURN_VOID(); + if (sopts) + PG_RETURN_BYTEA_P((bytea *)sopts); + + PG_RETURN_NULL(); } -- 2.39.5