static bool bdr_seq_pending_wakeup = false;
+static relopt_kind bdr_seq_relopt_kind = RELOPT_KIND_SEQUENCE;
+
/* vote, the logic is in a function */
const char* vote_sql = ""
"SELECT bdr.bdr_sequencer_vote($1, $2, $3, $4);\n";
"WITH to_be_updated_sequences AS (\n"
" SELECT\n"
" pg_namespace.nspname AS seqschema,\n"
-" pg_class.relname AS seqname,\n"
+" seq.relname AS seqname,\n"
" COUNT(bdr_sequence_values) AS open_seq_chunks,\n"
" GREATEST(COALESCE((\n"
" SELECT max(upper(seqrange))\n"
" FROM bdr_sequence_values max_val\n"
" WHERE\n"
" max_val.seqschema = pg_namespace.nspname\n"
-" AND max_val.seqname = pg_class.relname\n"
-" ), 0), (SELECT start_value FROM pg_sequence_parameters(pg_class.oid)))\n"
-" AS current_max\n"
+" AND max_val.seqname = seq.relname\n"
+" ), 0), (SELECT start_value FROM pg_sequence_parameters(seq.oid)))\n"
+" AS current_max,\n"
+" seq.cache_chunks\n"
" FROM\n"
-" pg_class\n"
-" JOIN pg_namespace ON (pg_class.relnamespace = pg_namespace.oid)\n"
+" (SELECT\n"
+" pg_class.oid,\n"
+" pg_class.relnamespace,\n"
+" pg_class.relname,\n"
+" COALESCE((SELECT split_part(o, '=', 2)::int\n"
+" FROM unnest(pg_class.reloptions) o\n"
+" WHERE split_part(o, '=', 1) = 'cache_chunks'), 5)\n"
+" AS cache_chunks\n"
+" FROM\n"
+" pg_class\n"
+" WHERE\n"
+" pg_class.relkind = 'S' AND\n"
+" pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n"
+" ) seq\n"
+" JOIN pg_namespace ON (seq.relnamespace = pg_namespace.oid)\n"
" LEFT JOIN bdr_sequence_values ON (\n"
" bdr_sequence_values.seqschema = pg_namespace.nspname\n"
-" AND bdr_sequence_values.seqname = pg_class.relname\n"
+" AND bdr_sequence_values.seqname = seq.relname\n"
" AND bdr_sequence_values.emptied = false\n"
" AND bdr_sequence_values.in_use = false\n"
" AND bdr_sequence_values.failed = false\n"
-" AND bdr_sequence_values.owning_sysid = $1\n"
-" AND bdr_sequence_values.owning_tlid = $2\n"
-" AND bdr_sequence_values.owning_dboid = $3\n"
-" AND bdr_sequence_values.owning_riname = $4\n"
" )\n"
-" WHERE\n"
-" pg_class.relkind = 'S'\n"
-" AND pg_class.relam = (SELECT oid FROM pg_seqam WHERE seqamname = 'bdr')\n"
" GROUP BY\n"
-" pg_class.relname,\n"
+" seq.relname,\n"
" pg_namespace.nspname,\n"
-" pg_class.oid\n"
+" seq.oid,\n"
+" seq.cache_chunks\n"
" HAVING\n"
-" count(bdr_sequence_values) <= 5\n"
+" count(bdr_sequence_values) <= cache_chunks\n"
"),\n"
"to_be_inserted_chunks AS (\n"
" SELECT\n"
" generate_series(\n"
" current_max,\n"
" -- 1000 is the chunk size, -1 is to get < instead <= out of generate_series\n"
-" current_max + 1000 * (5 - open_seq_chunks) - 1,\n"
+" current_max + 1000 * (cache_chunks - open_seq_chunks) - 1,\n"
" 1000) chunk_start\n"
" FROM to_be_updated_sequences\n"
" LIMIT 500\n"
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = bdr_sequencer_shmem_startup;
+
+ bdr_seq_relopt_kind = add_reloption_kind();
+ add_int_reloption(bdr_seq_relopt_kind, "cache_chunks",
+ "Sets how many chunks shoult be cached on each node.",
+ 5, 5, 100);
}
/*
PG_FUNCTION_INFO_V1(bdr_sequence_options);
+typedef struct BDRSeqOptions
+{
+ int32 vl_len_; /* varlena header (do not touch directly!) */
+ int cache_chunks;
+} BDRSeqOptions;
+
Datum
bdr_sequence_options(PG_FUNCTION_ARGS)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
- bytea *result;
+ relopt_value *options;
+ BDRSeqOptions *sopts;
+ int numoptions;
+ static const relopt_parse_elt tab[] = {
+ {"cache_chunks", RELOPT_TYPE_INT,
+ offsetof(BDRSeqOptions, cache_chunks)}
+ };
- result = default_reloptions(reloptions, validate, RELOPT_KIND_SEQUENCE);
- if (result)
- PG_RETURN_BYTEA_P(result);
+ options = parseRelOptions(reloptions, validate, bdr_seq_relopt_kind, &numoptions);
- PG_RETURN_NULL();
+ /* if none set, we're done */
+ if (numoptions == 0)
+ PG_RETURN_NULL();
+
+ sopts = allocateReloptStruct(sizeof(BDRSeqOptions), options, numoptions);
+
+ fillRelOptions((void *) sopts, sizeof(BDRSeqOptions), options, numoptions,
+ validate, tab, lengthof(tab));
+
+ pfree(options);
/* schedule wakeup as soon as other xacts can see the seuqence */
bdr_schedule_eoxact_sequencer_wakeup();
- PG_RETURN_VOID();
+ if (sopts)
+ PG_RETURN_BYTEA_P((bytea *)sopts);
+
+ PG_RETURN_NULL();
}