bdr_catalogs.o \
bdr_conflict_handlers.o \
bdr_conflict_logging.o \
+ bdr_commandfilter.o \
bdr_compat.o \
bdr_count.o \
bdr_executor.o \
bdr_init_replica.o \
bdr_label.o \
+ bdr_locks.o \
bdr_output.o \
bdr_relcache.o
ifeq "@BUILDING_BDR@" "1"
OBJS += \
- bdr_commandfilter.o \
- bdr_locks.o \
bdr_seq.o
else
OBJS += \
'conflict_trigger_returned_tuple',
'last_update_wins_keep_local',
'last_update_wins_keep_remote',
+ 'apply_change',
+ 'skip_change',
'unhandled_tx_abort'
);
END;
$function$;
+CREATE OR REPLACE FUNCTION bdr.bdr_replicate_ddl_command(cmd TEXT)
+RETURNS VOID
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
DO $DO$BEGIN
IF bdr.bdr_variant() = 'BDR' THEN
LANGUAGE C
AS 'MODULE_PATHNAME';
+END IF;
END;$DO$;
+CREATE OR REPLACE FUNCTION bdr.bdr_truncate_trigger_add()
+RETURNS event_trigger
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
-- This type is tailored to use as input to get_object_address
CREATE TYPE bdr.dropped_object AS (
objtype text,
CREATE OR REPLACE FUNCTION bdr.queue_dropped_objects()
RETURNS event_trigger
- LANGUAGE plpgsql
- AS $function$
- DECLARE
- r RECORD;
- dropped bdr.dropped_object;
- otherobjs bdr.dropped_object[] = '{}';
- BEGIN
- -- don't recursively log drop commands
- IF bdr.bdr_replication_identifier_is_replaying() THEN
- RETURN;
- END IF;
-
- -- don't replicate if disabled
- IF current_setting('bdr.skip_ddl_replication')::bool THEN
- RETURN;
- END IF;
-
- FOR r IN SELECT * FROM pg_event_trigger_dropped_objects()
- LOOP
- IF r.original OR r.normal THEN
- dropped.objtype = r.object_type;
- dropped.objnames = r.address_names;
- dropped.objargs = r.address_args;
- otherobjs := otherobjs || dropped;
- RAISE LOG 'object is: %', dropped;
- END IF;
- END LOOP;
-
- IF otherobjs <> '{}' THEN
- INSERT INTO bdr.bdr_queued_drops (
- lsn, queued_at, dropped_objects
- )
- VALUES (pg_current_xlog_location(),
- NOW(),
- otherobjs
- );
- END IF;
- END;
- $function$;
+ LANGUAGE C
+ AS 'MODULE_PATHNAME', 'bdr_queue_dropped_objects';
CREATE EVENT TRIGGER queue_drops
ON sql_drop
EXECUTE PROCEDURE bdr.queue_dropped_objects();
+END IF;
END;$DO$;
CREATE OR REPLACE FUNCTION bdr_apply_pause()
AS 'MODULE_PATHNAME'
;
+---
+--- Replication identifier emulation
+---
DO $DO$BEGIN
IF bdr.bdr_variant() = 'UDR' THEN
rilocal_lsn pg_lsn
);
+ PERFORM pg_catalog.pg_extension_config_dump('bdr_replication_identifier', '');
+
CREATE UNIQUE INDEX bdr_replication_identifier_riiident_index ON bdr_replication_identifier(riident);
CREATE UNIQUE INDEX bdr_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
rilocal_lsn pg_lsn
);
+ PERFORM pg_catalog.pg_extension_config_dump('bdr_replication_identifier_pos', '');
+
CREATE UNIQUE INDEX bdr_replication_identifier_pos_riiident_index ON bdr_replication_identifier_pos(riident);
CREATE OR REPLACE FUNCTION bdr_replication_identifier_create(i_riname text) RETURNS Oid
END IF;
END;$DO$;
+CREATE EVENT TRIGGER bdr_truncate_trigger_add
+ON ddl_command_end
+EXECUTE PROCEDURE bdr.bdr_truncate_trigger_add();
+
RESET bdr.permit_unsafe_ddl_commands;
RESET bdr.skip_ddl_replication;
RESET search_path;
static void
bdr_perdb_worker_main(Datum main_arg)
{
- int rc;
+ int rc = 0;
List *apply_workers;
ListCell *c;
BdrPerdbWorker *bdr_perdb_worker;
bdr_saved_resowner = CurrentResourceOwner;
/* need to be able to perform writes ourselves */
-#ifdef BUILDING_BDR
bdr_executor_always_allow_writes(true);
bdr_locks_startup(bdr_perdb_worker->nnodes);
-#endif
/*
* Do we need to init the local DB from a remote node?
else
mydb = get_database_name(MyDatabaseId);
- /* look for the perdb worker's entries, they have the database name */
+ /* Look for the perdb worker's entries, they have the database name */
for (i = 0; i < bdr_max_workers; i++)
{
BdrWorker *worker;
}
}
+ /*
+ * Make sure nobody changes the replication slot list concurrently
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+ /* If no worker was found, try searching for slot with bdr output plugin */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
+
+ Oid database;
+ NameData plugin;
+
+ /* XXX: is the spinlock necessary? */
+ SpinLockAcquire(&slot->mutex);
+ if (!slot->in_use)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ else
+ {
+ database = slot->data.database;
+ namecpy(&plugin, &slot->data.plugin);
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (database == MyDatabaseId && strcmp(NameStr(plugin), "bdr") == 0)
+ {
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * XXX: we might not wan't to set is_bdr_db here because slot can
+ * be dropped without restart.
+ */
+ is_bdr_db = true;
+ return true;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
is_bdr_db = false;
return false;
}
NULL, NULL, NULL);
-#ifdef BUILDING_BDR
DefineCustomBoolVariable("bdr.permit_unsafe_ddl_commands",
"Allow commands that might cause data or " \
"replication problems under BDR to run",
false, PGC_SUSET,
0,
NULL, NULL, NULL);
-#endif
DefineCustomBoolVariable("bdr.skip_ddl_replication",
"Internal. Set during local restore during init_replica only",
bdr_executor_init();
#ifdef BUILDING_BDR
bdr_sequencer_shmem_init(bdr_max_workers, bdr_distinct_dbnames_count);
+#endif
bdr_locks_shmem_init(bdr_distinct_dbnames_count);
/* Set up a ProcessUtility_hook to stop unsupported commands being run */
init_bdr_commandfilter();
-#endif
MemoryContextSwitchTo(old_context);
}
create_stmt.extname = (char *)"bdr";
CreateExtension(&create_stmt);
}
-
+ else
{
AlterExtensionStmt alter_stmt;
extern bool bdr_init_from_basedump;
extern bool bdr_log_conflicts_to_table;
extern bool bdr_conflict_logging_include_tuples;
-#ifdef BUILDING_BDR
extern bool bdr_permit_unsafe_commands;
-#else
+#ifdef BUILDING_UDR
extern bool bdr_conflict_default_apply;
#endif
extern ResourceOwner bdr_saved_resowner;
+/* DDL executor/filtering support */
+extern bool in_bdr_replicate_ddl_command;
+
/* bdr_nodes table oid */
extern Oid BdrNodesRelid;
extern Oid BdrConflictHistoryRelId;
/* forbid commands we do not support currently (or never will) */
extern void init_bdr_commandfilter(void);
+extern void bdr_commandfilter_always_allow_ddl(bool always_allow);
extern void bdr_executor_init(void);
extern void bdr_executor_always_allow_writes(bool always_allow);
extern void bdr_queue_ddl_command(char *command_tag, char *command);
+extern void bdr_execute_ddl_command(char *cmdstr, char *perpetrator, bool tx_just_started);
extern void bdr_locks_shmem_init(Size num_used_databases);
extern void bdr_locks_check_query(void);
errcontext("during DDL replay of ddl statement: %s", (char *) arg);
}
-static void
-process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+void
+bdr_execute_ddl_command(char *cmdstr, char *perpetrator, bool tx_just_started)
{
- Relation cmdsrel;
- Datum datum;
- char *command_tag;
- char *cmdstr;
- bool isnull;
- char *perpetrator;
List *commands;
ListCell *command_i;
bool isTopLevel;
MemoryContext oldcontext;
ErrorContextCallback errcallback;
- /* ----
- * We can't use spi here, because it implicitly assumes a transaction
- * context. As we want to be able to replicate CONCURRENTLY commands,
- * that's not going to work...
- * So instead do all the work manually, being careful about managing the
- * lifecycle of objects.
- * ----
- */
oldcontext = MemoryContextSwitchTo(MessageContext);
- cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
-
- /* fetch the perpetrator user identifier */
- datum = heap_getattr(cmdtup, 3,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
- RelationGetRelationName(cmdsrel));
- perpetrator = TextDatumGetCString(datum);
-
- /* fetch the command tag */
- datum = heap_getattr(cmdtup, 4,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null command tag in command tuple in \"%s\"",
- RelationGetRelationName(cmdsrel));
- command_tag = TextDatumGetCString(datum);
-
- /* finally fetch and execute the command */
- datum = heap_getattr(cmdtup, 5,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null command for \"%s\" command tuple", command_tag);
-
- cmdstr = TextDatumGetCString(datum);
-
- /* close relation, command execution might end/start xact */
- heap_close(cmdsrel, NoLock);
-
errcallback.callback = queued_command_error_callback;
errcallback.arg = cmdstr;
errcallback.previous = error_context_stack;
PopActiveSnapshot();
- portal = CreatePortal("", true, true);
+ portal = CreatePortal("bdr", true, true);
PortalDefineQuery(portal, NULL,
cmdstr, commandTag,
plantree_list, NULL);
}
+static void
+process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+{
+ Relation cmdsrel;
+ Datum datum;
+ char *command_tag;
+ char *cmdstr;
+ bool isnull;
+ char *perpetrator;
+ MemoryContext oldcontext;
+
+ /* ----
+ * We can't use spi here, because it implicitly assumes a transaction
+ * context. As we want to be able to replicate CONCURRENTLY commands,
+ * that's not going to work...
+ * So instead do all the work manually, being careful about managing the
+ * lifecycle of objects.
+ * ----
+ */
+ oldcontext = MemoryContextSwitchTo(MessageContext);
+
+ cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+
+ /* fetch the perpetrator user identifier */
+ datum = heap_getattr(cmdtup, 3,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
+ RelationGetRelationName(cmdsrel));
+ perpetrator = TextDatumGetCString(datum);
+
+ /* fetch the command tag */
+ datum = heap_getattr(cmdtup, 4,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null command tag in command tuple in \"%s\"",
+ RelationGetRelationName(cmdsrel));
+ command_tag = TextDatumGetCString(datum);
+
+ /* finally fetch and execute the command */
+ datum = heap_getattr(cmdtup, 5,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null command for \"%s\" command tuple", command_tag);
+
+ cmdstr = TextDatumGetCString(datum);
+
+ /* close relation, command execution might end/start xact */
+ heap_close(cmdsrel, NoLock);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ bdr_execute_ddl_command(cmdstr, perpetrator, tx_just_started);
+}
+
+
#ifdef BUILDING_BDR
/*
* ugly hack: Copied struct from dependency.c - there doesn't seem to be a
#include "miscadmin.h"
#include "access/heapam.h"
+#ifdef BUILDING_BDR
#include "access/seqam.h"
+#endif
#include "catalog/namespace.h"
#include "utils/rel.h"
#include "utils/syscache.h"
-static void error_unsupported_command(const char *cmdtag) __attribute__((noreturn));
-
/*
* bdr_commandfilter.c: a ProcessUtility_hook to prevent a cluster from running
* commands that BDR does not yet support.
/* GUCs */
bool bdr_permit_unsafe_commands = false;
+bool bdr_always_allow_ddl = false;
+
+static void error_unsupported_command(const char *cmdtag) __attribute__((noreturn));
+
/*
* Check the passed rangevar, locking it and looking it up in the cache
* then determine if the relation requires logging to WAL. If it does, then
static void
filter_CreateSeqStmt(Node *parsetree)
{
+#ifdef BUILDING_BDR
ListCell *param;
CreateSeqStmt *stmt;
errmsg("CREATE SEQUENCE ... %s is not supported for bdr sequences",
defel->defname)));
}
+#endif
}
static void
filter_AlterSeqStmt(Node *parsetree)
{
+#ifdef BUILDING_BDR
Oid seqoid;
ListCell *param;
AlterSeqStmt *stmt;
errmsg("ALTER SEQUENCE ... %s is not supported for bdr sequences",
defel->defname)));
}
+#endif
}
static void
goto done;
/* don't filter if explicitly told so */
- if (bdr_permit_unsafe_commands)
+ if (bdr_always_allow_ddl || bdr_permit_unsafe_commands)
goto done;
/* extension contents aren't individually replicated */
if (replication_origin_id != InvalidRepNodeId)
goto done;
- /* statements handled directly in standard_ProcessUtility */
+ /* commands we skip (for now) */
switch (nodeTag(parsetree))
{
case T_TransactionStmt:
case T_ReindexStmt:
goto done;
+ default:
+ break;
+ }
+
+#ifdef BUILDING_UDR
+ if (!in_bdr_replicate_ddl_command && bdr_is_bdr_activated_db())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DDL commands are not allowed when UDR is active unless bdr.permit_unsafe_ddl_commands is true")));
+#endif /*BUILDING_UDR*/
+
+ /* statements handled directly in standard_ProcessUtility */
+ switch (nodeTag(parsetree))
+ {
case T_DropStmt:
{
DropStmt *stmt = (DropStmt *) parsetree;
else
goto done;
}
+
default:
break;
}
dest, completionTag);
}
+void
+bdr_commandfilter_always_allow_ddl(bool always_allow)
+{
+ Assert(IsUnderPostmaster);
+ bdr_always_allow_ddl = always_allow;
+}
+
/* Module load */
void
init_bdr_commandfilter(void)
#include "executor/spi.h"
#include "executor/tuptable.h"
+#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
static ExecutorStart_hook_type PrevExecutorStart_hook = NULL;
static bool bdr_always_allow_writes = false;
+bool in_bdr_replicate_ddl_command = false;
#ifdef BUILDING_BDR
PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
-#else
-PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+PG_FUNCTION_INFO_V1(bdr_queue_dropped_objects);
#endif
-PG_FUNCTION_INFO_V1(bdr_add_truncate_trigger);
+PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+PG_FUNCTION_INFO_V1(bdr_truncate_trigger_add);
EState *
bdr_create_rel_estate(Relation rel)
/*
- * bdr_add_truncate_trigger
+ * bdr_truncate_trigger_add
*
* This function adds TRUNCATE trigger to newly created tables.
+ *
+ * Note: it's important that this function be named so that it comes
+ * after bdr_queue_ddl_commands when triggers are alphabetically sorted.
*/
Datum
-bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
+bdr_truncate_trigger_add(PG_FUNCTION_ARGS)
{
EventTriggerData *trigdata;
char *skip_ddl;
if (res != SPI_OK_UTILITY)
elog(ERROR, "SPI failure: %d", res);
+ /*
+ * If this is inside manually replicated DDL, the
+ * bdr_queue_ddl_commands will skip queueing the CREATE TRIGGER
+ * command, so we have to do it ourselves.
+ *
+ * XXX: The whole in_bdr_replicate_ddl_command concept is not very nice
+ */
+ if (in_bdr_replicate_ddl_command)
+ bdr_queue_ddl_command("CREATE TRIGGER", query);
+
SPI_finish();
}
uint32 nprocessed;
SPITupleTable *tuptable;
+ /*
+ * If the trigger comes from DDL executed by bdr_replicate_ddl_command,
+ * don't queue it as it would insert duplicate commands into the queue.
+ */
+ if (in_bdr_replicate_ddl_command)
+ PG_RETURN_VOID(); /* XXX return type? */
+
/*
* If we're currently replaying something from a remote node, don't queue
* the commands; that would cause recursion.
tuptable = SPI_tuptable;
for (i = 0; i < nprocessed; i++)
{
- HeapTuple newtup = NULL;
Datum cmdvalues[6]; /* # cols returned by above query */
bool cmdnulls[6];
- Datum values[5]; /* # cols in bdr_queued_commands */
- bool nulls[5];
MemoryContextReset(tupcxt);
PG_RETURN_VOID();
}
-#else
+
+/*
+ * bdr_queue_dropped_objects
+ * sql_drop event triggger handler for BDR
+ *
+ * This function queues DROPs for replay by other BDR nodes.
+ */
+Datum
+bdr_queue_dropped_objects(PG_FUNCTION_ARGS)
+{
+ char *skip_ddl;
+ int res;
+ int i;
+ Oid schema_oid;
+ Oid elmtype;
+ int16 elmlen;
+ bool elmbyval;
+ char elmalign;
+ int droppedcnt = 0;
+ Datum *droppedobjs;
+ ArrayType *droppedarr;
+ TupleDesc tupdesc;
+ uint32 nprocessed;
+ SPITupleTable *tuptable;
+
+ if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) /* internal error */
+ elog(ERROR, "%s: not fired by event trigger manager",
+ "bdr_queue_dropped_objects");
+
+ /*
+ * If the trigger comes from DDL executed by bdr_replicate_ddl_command,
+ * don't queue it as it would insert duplicate commands into the queue.
+ */
+ if (in_bdr_replicate_ddl_command)
+ PG_RETURN_VOID(); /* XXX return type? */
+
+ /*
+ * If we're currently replaying something from a remote node, don't queue
+ * the commands; that would cause recursion.
+ */
+ if (replication_origin_id != InvalidRepNodeId)
+ PG_RETURN_VOID(); /* XXX return type? */
+
+ /*
+ * Similarly, if configured to skip queueing DDL, don't queue. This is
+ * mostly used when pg_restore brings a remote node state, so all objects
+ * will be copied over in the dump anyway.
+ */
+ skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+ if (strcmp(skip_ddl, "on") == 0)
+ PG_RETURN_VOID();
+
+ /*
+ * Connect to SPI early, so that all memory allocated in this routine is
+ * released when we disconnect.
+ */
+ SPI_connect();
+
+ res = SPI_execute("SELECT "
+ " original, normal, object_type, "
+ " address_names, address_args "
+ "FROM pg_event_trigger_dropped_objects()",
+ false, 0);
+ if (res != SPI_OK_SELECT)
+ elog(ERROR, "SPI query failed: %d", res);
+
+ /*
+ * Build array of dropped objects based on the results of the query.
+ */
+ nprocessed = SPI_processed;
+ tuptable = SPI_tuptable;
+
+ droppedobjs = (Datum *) palloc(sizeof(Datum) * nprocessed);
+
+ schema_oid = get_namespace_oid("bdr", false);
+ elmtype = bdr_lookup_relid("dropped_object", schema_oid);
+ elmtype = get_rel_type_id(elmtype);
+
+ get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
+ tupdesc = TypeGetTupleDesc(elmtype, NIL);
+
+ for (i = 0; i < nprocessed; i++)
+ {
+ Datum cmdvalues[5]; /* # cols returned by above query */
+ bool cmdnulls[5];
+ Datum values[3];
+ bool nulls[3];
+ HeapTuple tuple;
+
+ /* this is the tuple reported by event triggers */
+ heap_deform_tuple(tuptable->vals[i], tuptable->tupdesc,
+ cmdvalues, cmdnulls);
+
+ /* if not original or normal skip */
+ if ((cmdnulls[0] || !DatumGetBool(cmdvalues[0])) &&
+ (cmdnulls[1] || !DatumGetBool(cmdvalues[1])))
+ continue;
+
+ nulls[0] = cmdnulls[2];
+ nulls[1] = cmdnulls[3];
+ nulls[2] = cmdnulls[4];
+ values[0] = cmdvalues[2];
+ values[1] = cmdvalues[3];
+ values[2] = cmdvalues[4];
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ droppedobjs[droppedcnt] = HeapTupleGetDatum(tuple);
+ droppedcnt++;
+ }
+
+ SPI_finish();
+
+ /* No objects dropped? */
+ if (droppedcnt == 0)
+ PG_RETURN_VOID();
+
+ droppedarr = construct_array(droppedobjs, droppedcnt,
+ elmtype, elmlen, elmbyval, elmalign);
+
+ /*
+ * Insert the dropped object(s) info into the bdr_queued_drops table
+ */
+ {
+ EState *estate;
+ TupleTableSlot *slot;
+ RangeVar *rv;
+ Relation queuedcmds;
+ HeapTuple newtup = NULL;
+ Datum values[5];
+ bool nulls[5];
+
+ /*
+ * Prepare bdr.bdr_queued_drops for insert.
+ * Can't use preloaded table oid since this method is executed under
+ * normal backends and not inside BDR worker.
+ * The tuple slot here is only needed for updating indexes.
+ */
+ rv = makeRangeVar("bdr", "bdr_queued_drops", -1);
+ queuedcmds = heap_openrv(rv, RowExclusiveLock);
+ slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
+ estate = bdr_create_rel_estate(queuedcmds);
+ ExecOpenIndices(estate->es_result_relation_info);
+
+ /* lsn, queued_at, dropped_objects */
+ values[0] = pg_current_xlog_location(NULL);
+ values[1] = now(NULL);
+ values[2] = PointerGetDatum(droppedarr);
+ MemSet(nulls, 0, sizeof(nulls));
+
+ newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
+ simple_heap_insert(queuedcmds, newtup);
+ ExecStoreTuple(newtup, slot, InvalidBuffer, false);
+ UserTableUpdateOpenIndexes(estate, slot);
+
+ ExecCloseIndices(estate->es_result_relation_info);
+ ExecDropSingleTupleTableSlot(slot);
+ heap_close(queuedcmds, RowExclusiveLock);
+ }
+
+ PG_RETURN_VOID();
+}
+#endif /*BUILDING_BDR*/
+
/*
* bdr_replicate_ddl_command
*
* Queues the input SQL for replication.
+ *
+ * Note that we don't allow CONCURRENTLY commands here, this is mainly because
+ * we queue command before we actually execute it, which we currently need
+ * to make the bdr_truncate_trigger_add work correctly. As written there
+ * the in_bdr_replicate_ddl_command concept is ugly.
*/
Datum
bdr_replicate_ddl_command(PG_FUNCTION_ARGS)
{
text *command = PG_GETARG_TEXT_PP(0);
- char *query;
+ char *query = text_to_cstring(command);
- /* XXX: handle more nicely */
- query = psprintf("SET LOCAL search_path TO %s;\n %s",
- GetConfigOptionByName("search_path", NULL),
- TextDatumGetCString(command));
+ /* Force everything in the query to be fully qualified. */
+ (void) set_config_option("search_path", "",
+ PGC_USERSET, PGC_S_SESSION,
+ GUC_ACTION_SAVE, true, 0);
- bdr_queue_ddl_command("SQL", query);
+ /* Execute the query locally. */
+ bdr_commandfilter_always_allow_ddl(true);
+ in_bdr_replicate_ddl_command = true;
+
+ PG_TRY();
+ /* Queue the query for replication. */
+ bdr_queue_ddl_command("SQL", query);
+
+ /* Execute the query locally. */
+ bdr_execute_ddl_command(query, GetUserNameFromId(GetUserId()), false);
+ PG_CATCH();
+ in_bdr_replicate_ddl_command = false;
+ bdr_commandfilter_always_allow_ddl(false);
+ PG_RE_THROW();
+ PG_END_TRY();
+
+ in_bdr_replicate_ddl_command = false;
+ bdr_commandfilter_always_allow_ddl(false);
PG_RETURN_VOID();
}
-#endif //BUILDING_BDR
void
bdr_executor_always_allow_writes(bool always_allow)
{
-#ifdef BUILDING_BDR
Assert(IsUnderPostmaster);
bdr_always_allow_writes = always_allow;
-#endif
}
/*
#include "postgres.h"
#include "bdr.h"
+
#include "bdr_locks.h"
+#ifdef BUILDING_BDR
+
#include "miscadmin.h"
#include "access/xact.h"
}
}
+
+#else
+
+/* bdr_locks are not used by UDR at the moment */
+void
+bdr_locks_startup(Size nnodes)
+{
+}
+
+void
+bdr_locks_shmem_init(Size num_used_databases)
+{
+}
+
+void
+bdr_acquire_ddl_lock(void)
+{
+}
+#endif
Column | Type | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
val | integer | | plain | |
-Triggers:
- truncate_trigger AFTER TRUNCATE ON test_tbl_unlogged_create FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()
\c postgres
\d+ test_tbl_unlogged_create
Column | Type | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
val | integer | | plain | |
-Triggers:
- truncate_trigger AFTER TRUNCATE ON test_tbl_unlogged_create FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()
DROP TABLE test_tbl_unlogged_create;
SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;