bdr: Overhaul of bdr_replicate_ddl_command
authorPetr Jelinek <pjmodos@pjmodos.net>
Fri, 17 Oct 2014 08:50:31 +0000 (10:50 +0200)
committerPetr Jelinek <pjmodos@pjmodos.net>
Fri, 21 Nov 2014 17:21:56 +0000 (18:21 +0100)
* execute the ddl on local node
* force the input query to be fully qualified
* use command filter to disallow dangerous DDL commands even when using
  bdr_replicate_ddl_command unless overridden by the GUC
* make it work in BDR and make sure it does not do double replication of
  the DDL
* rewrite of queue_dropped_objects into C
* rename the drop trigger so that the order of execution is correct
* recognize bdr node by replication slot with bdr output plugin

Makefile.in
bdr--0.8.0.sql
bdr.c
bdr.h
bdr_apply.c
bdr_commandfilter.c
bdr_executor.c
bdr_locks.c
expected/ddl/create.out

index 9b8cce56d8b48f9269435f12034a8da6dca6a1f1..dee8cda70cbadc2be4d69e914daa76c7a5f9a86b 100644 (file)
@@ -23,18 +23,18 @@ OBJS = \
    bdr_catalogs.o \
    bdr_conflict_handlers.o \
    bdr_conflict_logging.o \
+   bdr_commandfilter.o \
    bdr_compat.o \
    bdr_count.o \
    bdr_executor.o \
    bdr_init_replica.o \
    bdr_label.o \
+   bdr_locks.o \
    bdr_output.o \
    bdr_relcache.o
 
 ifeq "@BUILDING_BDR@" "1"
 OBJS += \
-   bdr_commandfilter.o \
-   bdr_locks.o \
    bdr_seq.o
 else
 OBJS += \
index d114d9ef3191bb86e73d4c2307a894505bf7d662..713360f03eb666388eafab44869fcb0cce05ee48 100644 (file)
@@ -249,6 +249,8 @@ CREATE TYPE bdr_conflict_resolution AS ENUM
     'conflict_trigger_returned_tuple',
     'last_update_wins_keep_local',
     'last_update_wins_keep_remote',
+   'apply_change',
+   'skip_change',
     'unhandled_tx_abort'
 );
 
@@ -418,6 +420,12 @@ BEGIN
 END;
 $function$;
 
+CREATE OR REPLACE FUNCTION bdr.bdr_replicate_ddl_command(cmd TEXT)
+RETURNS VOID
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
 DO $DO$BEGIN
 IF bdr.bdr_variant() = 'BDR' THEN
 
@@ -426,8 +434,15 @@ IF bdr.bdr_variant() = 'BDR' THEN
    LANGUAGE C
    AS 'MODULE_PATHNAME';
 
+END IF;
 END;$DO$;
 
+CREATE OR REPLACE FUNCTION bdr.bdr_truncate_trigger_add()
+RETURNS event_trigger
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
 -- This type is tailored to use as input to get_object_address
 CREATE TYPE bdr.dropped_object AS (
     objtype text,
@@ -448,50 +463,14 @@ IF bdr.bdr_variant() = 'BDR' THEN
 
    CREATE OR REPLACE FUNCTION bdr.queue_dropped_objects()
    RETURNS event_trigger
-   LANGUAGE plpgsql
-   AS $function$
-   DECLARE
-       r RECORD;
-       dropped bdr.dropped_object;
-       otherobjs bdr.dropped_object[] = '{}';
-   BEGIN
-       -- don't recursively log drop commands
-       IF bdr.bdr_replication_identifier_is_replaying() THEN
-          RETURN;
-       END IF;
-
-       -- don't replicate if disabled
-       IF  current_setting('bdr.skip_ddl_replication')::bool THEN
-          RETURN;
-       END IF;
-
-       FOR r IN SELECT * FROM pg_event_trigger_dropped_objects()
-       LOOP
-           IF r.original OR r.normal THEN
-               dropped.objtype = r.object_type;
-               dropped.objnames = r.address_names;
-               dropped.objargs = r.address_args;
-               otherobjs := otherobjs || dropped;
-               RAISE LOG 'object is: %', dropped;
-           END IF;
-       END LOOP;
-
-       IF otherobjs <> '{}' THEN
-           INSERT INTO bdr.bdr_queued_drops (
-               lsn, queued_at, dropped_objects
-           )
-           VALUES (pg_current_xlog_location(),
-               NOW(),
-               otherobjs
-           );
-       END IF;
-   END;
-   $function$;
+   LANGUAGE C
+   AS 'MODULE_PATHNAME', 'bdr_queue_dropped_objects';
 
    CREATE EVENT TRIGGER queue_drops
    ON sql_drop
    EXECUTE PROCEDURE bdr.queue_dropped_objects();
 
+END IF;
 END;$DO$;
 
 CREATE OR REPLACE FUNCTION bdr_apply_pause()
@@ -506,6 +485,9 @@ LANGUAGE C
 AS 'MODULE_PATHNAME'
 ;
 
+---
+--- Replication identifier emulation
+---
 DO $DO$BEGIN
 IF bdr.bdr_variant() = 'UDR' THEN
 
@@ -516,6 +498,8 @@ IF bdr.bdr_variant() = 'UDR' THEN
        rilocal_lsn pg_lsn
    );
 
+   PERFORM pg_catalog.pg_extension_config_dump('bdr_replication_identifier', '');
+
    CREATE UNIQUE INDEX bdr_replication_identifier_riiident_index ON bdr_replication_identifier(riident);
    CREATE UNIQUE INDEX bdr_replication_identifier_riname_index ON bdr_replication_identifier(riname varchar_pattern_ops);
 
@@ -525,6 +509,8 @@ IF bdr.bdr_variant() = 'UDR' THEN
        rilocal_lsn pg_lsn
    );
 
+   PERFORM pg_catalog.pg_extension_config_dump('bdr_replication_identifier_pos', '');
+
    CREATE UNIQUE INDEX bdr_replication_identifier_pos_riiident_index ON bdr_replication_identifier_pos(riident);
 
    CREATE OR REPLACE FUNCTION bdr_replication_identifier_create(i_riname text) RETURNS Oid
@@ -651,6 +637,10 @@ IF bdr.bdr_variant() = 'BDR' THEN
 END IF;
 END;$DO$;
 
+CREATE EVENT TRIGGER bdr_truncate_trigger_add
+ON ddl_command_end
+EXECUTE PROCEDURE bdr.bdr_truncate_trigger_add();
+
 RESET bdr.permit_unsafe_ddl_commands;
 RESET bdr.skip_ddl_replication;
 RESET search_path;
diff --git a/bdr.c b/bdr.c
index 455847a968cab3593c85b8b607733dc682ed0d00..6db570fb047cf465e342a5dd85320dd02d1a0ba8 100644 (file)
--- a/bdr.c
+++ b/bdr.c
@@ -988,7 +988,7 @@ bdr_launch_apply_workers(char *dbname)
 static void
 bdr_perdb_worker_main(Datum main_arg)
 {
-   int               rc;
+   int               rc = 0;
    List             *apply_workers;
    ListCell         *c;
    BdrPerdbWorker   *bdr_perdb_worker;
@@ -1016,10 +1016,8 @@ bdr_perdb_worker_main(Datum main_arg)
    bdr_saved_resowner = CurrentResourceOwner;
 
    /* need to be able to perform writes ourselves */
-#ifdef BUILDING_BDR
    bdr_executor_always_allow_writes(true);
    bdr_locks_startup(bdr_perdb_worker->nnodes);
-#endif
 
    /*
     * Do we need to init the local DB from a remote node?
@@ -1437,7 +1435,7 @@ bdr_is_bdr_activated_db(void)
    else
        mydb = get_database_name(MyDatabaseId);
 
-   /* look for the perdb worker's entries, they have the database name */
+   /* Look for the perdb worker's entries, they have the database name */
    for (i = 0; i < bdr_max_workers; i++)
    {
        BdrWorker *worker;
@@ -1457,6 +1455,48 @@ bdr_is_bdr_activated_db(void)
        }
    }
 
+   /*
+    * Make sure nobody changes the replication slot list concurrently
+    */
+   LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+   /* If no worker was found, try searching for slot with bdr output plugin */
+   for (i = 0; i < max_replication_slots; i++)
+   {
+       ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
+
+       Oid         database;
+       NameData    plugin;
+
+       /* XXX: is the spinlock necessary? */
+       SpinLockAcquire(&slot->mutex);
+       if (!slot->in_use)
+       {
+           SpinLockRelease(&slot->mutex);
+           continue;
+       }
+       else
+       {
+           database = slot->data.database;
+           namecpy(&plugin, &slot->data.plugin);
+       }
+       SpinLockRelease(&slot->mutex);
+
+       if (database == MyDatabaseId && strcmp(NameStr(plugin), "bdr") == 0)
+       {
+           LWLockRelease(ReplicationSlotControlLock);
+
+           /*
+            * XXX: we might not want to set is_bdr_db here because slot can
+            * be dropped without restart.
+            */
+           is_bdr_db = true;
+           return true;
+       }
+   }
+
+   LWLockRelease(ReplicationSlotControlLock);
+
    is_bdr_db = false;
    return false;
 }
@@ -1567,7 +1607,6 @@ _PG_init(void)
                            NULL, NULL, NULL);
 
 
-#ifdef BUILDING_BDR
    DefineCustomBoolVariable("bdr.permit_unsafe_ddl_commands",
                             "Allow commands that might cause data or " \
                             "replication problems under BDR to run",
@@ -1576,7 +1615,6 @@ _PG_init(void)
                             false, PGC_SUSET,
                             0,
                             NULL, NULL, NULL);
-#endif
 
    DefineCustomBoolVariable("bdr.skip_ddl_replication",
                             "Internal. Set during local restore during init_replica only",
@@ -1779,10 +1817,10 @@ out:
    bdr_executor_init();
 #ifdef BUILDING_BDR
    bdr_sequencer_shmem_init(bdr_max_workers, bdr_distinct_dbnames_count);
+#endif
    bdr_locks_shmem_init(bdr_distinct_dbnames_count);
    /* Set up a ProcessUtility_hook to stop unsupported commands being run */
    init_bdr_commandfilter();
-#endif
 
    MemoryContextSwitchTo(old_context);
 }
@@ -1863,7 +1901,7 @@ bdr_maintain_schema(void)
        create_stmt.extname = (char *)"bdr";
        CreateExtension(&create_stmt);
    }
-
+   else
    {
        AlterExtensionStmt alter_stmt;
 
diff --git a/bdr.h b/bdr.h
index 08c8af19914dada0ec26ec1564609b15d17f3aa1..a5d6c0bc76d5220be29eca107c7f7fb3dbc23528 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -242,9 +242,8 @@ extern char *bdr_temp_dump_directory;
 extern bool bdr_init_from_basedump;
 extern bool bdr_log_conflicts_to_table;
 extern bool bdr_conflict_logging_include_tuples;
-#ifdef BUILDING_BDR
 extern bool bdr_permit_unsafe_commands;
-#else
+#ifdef BUILDING_UDR
 extern bool bdr_conflict_default_apply;
 #endif
 
@@ -266,6 +265,9 @@ extern BdrWorkerControl *BdrWorkerCtl;
 
 extern ResourceOwner bdr_saved_resowner;
 
+/* DDL executor/filtering support */
+extern bool in_bdr_replicate_ddl_command;
+
 /* bdr_nodes table oid */
 extern Oid BdrNodesRelid;
 extern Oid  BdrConflictHistoryRelId;
@@ -407,10 +409,12 @@ extern bool bdr_is_bdr_activated_db(void);
 
 /* forbid commands we do not support currently (or never will) */
 extern void init_bdr_commandfilter(void);
+extern void bdr_commandfilter_always_allow_ddl(bool always_allow);
 
 extern void bdr_executor_init(void);
 extern void bdr_executor_always_allow_writes(bool always_allow);
 extern void bdr_queue_ddl_command(char *command_tag, char *command);
+extern void bdr_execute_ddl_command(char *cmdstr, char *perpetrator, bool tx_just_started);
 
 extern void bdr_locks_shmem_init(Size num_used_databases);
 extern void bdr_locks_check_query(void);
index b76f2cc2f47f39598fb18e9836b313406d344c23..182a4234a95d7b00cf24c8d60a1416b8e06a4ed5 100644 (file)
@@ -1385,63 +1385,17 @@ queued_command_error_callback(void *arg)
    errcontext("during DDL replay of ddl statement: %s", (char *) arg);
 }
 
-static void
-process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+void
+bdr_execute_ddl_command(char *cmdstr, char *perpetrator, bool tx_just_started)
 {
-   Relation    cmdsrel;
-   Datum       datum;
-   char       *command_tag;
-   char       *cmdstr;
-   bool        isnull;
-   char       *perpetrator;
    List       *commands;
    ListCell   *command_i;
    bool        isTopLevel;
    MemoryContext oldcontext;
    ErrorContextCallback errcallback;
 
-   /* ----
-    * We can't use spi here, because it implicitly assumes a transaction
-    * context. As we want to be able to replicate CONCURRENTLY commands,
-    * that's not going to work...
-    * So instead do all the work manually, being careful about managing the
-    * lifecycle of objects.
-    * ----
-    */
    oldcontext = MemoryContextSwitchTo(MessageContext);
 
-   cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
-
-   /* fetch the perpetrator user identifier */
-   datum = heap_getattr(cmdtup, 3,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
-            RelationGetRelationName(cmdsrel));
-   perpetrator = TextDatumGetCString(datum);
-
-   /* fetch the command tag */
-   datum = heap_getattr(cmdtup, 4,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null command tag in command tuple in \"%s\"",
-            RelationGetRelationName(cmdsrel));
-   command_tag = TextDatumGetCString(datum);
-
-   /* finally fetch and execute the command */
-   datum = heap_getattr(cmdtup, 5,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null command for \"%s\" command tuple", command_tag);
-
-   cmdstr = TextDatumGetCString(datum);
-
-   /* close relation, command execution might end/start xact */
-   heap_close(cmdsrel, NoLock);
-
    errcallback.callback = queued_command_error_callback;
    errcallback.arg = cmdstr;
    errcallback.previous = error_context_stack;
@@ -1489,7 +1443,7 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 
        PopActiveSnapshot();
 
-       portal = CreatePortal("", true, true);
+       portal = CreatePortal("bdr", true, true);
        PortalDefineQuery(portal, NULL,
                          cmdstr, commandTag,
                          plantree_list, NULL);
@@ -1516,6 +1470,65 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 }
 
 
+static void
+process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+{
+   Relation    cmdsrel;
+   Datum       datum;
+   char       *command_tag;
+   char       *cmdstr;
+   bool        isnull;
+   char       *perpetrator;
+   MemoryContext oldcontext;
+
+   /* ----
+    * We can't use spi here, because it implicitly assumes a transaction
+    * context. As we want to be able to replicate CONCURRENTLY commands,
+    * that's not going to work...
+    * So instead do all the work manually, being careful about managing the
+    * lifecycle of objects.
+    * ----
+    */
+   oldcontext = MemoryContextSwitchTo(MessageContext);
+
+   cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+
+   /* fetch the perpetrator user identifier */
+   datum = heap_getattr(cmdtup, 3,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
+            RelationGetRelationName(cmdsrel));
+   perpetrator = TextDatumGetCString(datum);
+
+   /* fetch the command tag */
+   datum = heap_getattr(cmdtup, 4,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       elog(ERROR, "null command tag in command tuple in \"%s\"",
+            RelationGetRelationName(cmdsrel));
+   command_tag = TextDatumGetCString(datum);
+
+   /* finally fetch and execute the command */
+   datum = heap_getattr(cmdtup, 5,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       elog(ERROR, "null command for \"%s\" command tuple", command_tag);
+
+   cmdstr = TextDatumGetCString(datum);
+
+   /* close relation, command execution might end/start xact */
+   heap_close(cmdsrel, NoLock);
+
+   MemoryContextSwitchTo(oldcontext);
+
+   bdr_execute_ddl_command(cmdstr, perpetrator, tx_just_started);
+}
+
+
 #ifdef BUILDING_BDR
 /*
  * ugly hack: Copied struct from dependency.c - there doesn't seem to be a
index 600a8d660e31eaef4bd4856eee1618947ef483fc..07bf342af77937fb1ca0abc791095e084102720c 100644 (file)
@@ -20,7 +20,9 @@
 #include "miscadmin.h"
 
 #include "access/heapam.h"
+#ifdef BUILDING_BDR
 #include "access/seqam.h"
+#endif
 
 #include "catalog/namespace.h"
 
@@ -39,8 +41,6 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-static void error_unsupported_command(const char *cmdtag) __attribute__((noreturn));
-
 /*
 * bdr_commandfilter.c: a ProcessUtility_hook to prevent a cluster from running
 * commands that BDR does not yet support.
@@ -51,6 +51,10 @@ static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
 /* GUCs */
 bool bdr_permit_unsafe_commands = false;
 
+bool bdr_always_allow_ddl = false;
+
+static void error_unsupported_command(const char *cmdtag) __attribute__((noreturn));
+
 /*
 * Check the passed rangevar, locking it and looking it up in the cache
 * then determine if the relation requires logging to WAL. If it does, then
@@ -357,6 +361,7 @@ filter_AlterTableStmt(Node *parsetree,
 static void
 filter_CreateSeqStmt(Node *parsetree)
 {
+#ifdef BUILDING_BDR
    ListCell       *param;
    CreateSeqStmt  *stmt;
 
@@ -375,11 +380,13 @@ filter_CreateSeqStmt(Node *parsetree)
                     errmsg("CREATE SEQUENCE ... %s is not supported for bdr sequences",
                    defel->defname)));
    }
+#endif
 }
 
 static void
 filter_AlterSeqStmt(Node *parsetree)
 {
+#ifdef BUILDING_BDR
    Oid             seqoid;
    ListCell       *param;
    AlterSeqStmt   *stmt;
@@ -425,6 +432,7 @@ filter_AlterSeqStmt(Node *parsetree)
                     errmsg("ALTER SEQUENCE ... %s is not supported for bdr sequences",
                    defel->defname)));
    }
+#endif
 }
 
 static void
@@ -450,7 +458,7 @@ bdr_commandfilter(Node *parsetree,
        goto done;
 
    /* don't filter if explicitly told so */
-   if (bdr_permit_unsafe_commands)
+   if (bdr_always_allow_ddl || bdr_permit_unsafe_commands)
        goto done;
 
    /* extension contents aren't individually replicated */
@@ -461,7 +469,7 @@ bdr_commandfilter(Node *parsetree,
    if (replication_origin_id != InvalidRepNodeId)
        goto done;
 
-   /* statements handled directly in standard_ProcessUtility */
+   /* commands we skip (for now) */
    switch (nodeTag(parsetree))
    {
        case T_TransactionStmt:
@@ -508,6 +516,20 @@ bdr_commandfilter(Node *parsetree,
        case T_ReindexStmt:
            goto done;
 
+       default:
+           break;
+   }
+
+#ifdef BUILDING_UDR
+   if (!in_bdr_replicate_ddl_command && bdr_is_bdr_activated_db())
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("DDL commands are not allowed when UDR is active unless bdr.permit_unsafe_ddl_commands is true")));
+#endif /*BUILDING_UDR*/
+
+   /* statements handled directly in standard_ProcessUtility */
+   switch (nodeTag(parsetree))
+   {
        case T_DropStmt:
            {
                DropStmt   *stmt = (DropStmt *) parsetree;
@@ -544,6 +566,7 @@ bdr_commandfilter(Node *parsetree,
                else
                    goto done;
            }
+
        default:
            break;
    }
@@ -741,6 +764,13 @@ done:
                                dest, completionTag);
 }
 
+void
+bdr_commandfilter_always_allow_ddl(bool always_allow)
+{
+   Assert(IsUnderPostmaster);
+   bdr_always_allow_ddl = always_allow;
+}
+
 /* Module load */
 void
 init_bdr_commandfilter(void)
index 7b7743a14bdaea4caf5da6bce5d6b6030deb39b0..8a18d951989d111d0e9b517fb6daa4270367c1a5 100644 (file)
@@ -33,6 +33,7 @@
 #include "executor/spi.h"
 #include "executor/tuptable.h"
 
+#include "funcapi.h"
 #include "miscadmin.h"
 
 #include "nodes/execnodes.h"
@@ -58,13 +59,14 @@ static void BdrExecutorStart(QueryDesc *queryDesc, int eflags);
 static ExecutorStart_hook_type PrevExecutorStart_hook = NULL;
 
 static bool bdr_always_allow_writes = false;
+bool in_bdr_replicate_ddl_command = false;
 
 #ifdef BUILDING_BDR
 PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
-#else
-PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+PG_FUNCTION_INFO_V1(bdr_queue_dropped_objects);
 #endif
-PG_FUNCTION_INFO_V1(bdr_add_truncate_trigger);
+PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+PG_FUNCTION_INFO_V1(bdr_truncate_trigger_add);
 
 EState *
 bdr_create_rel_estate(Relation rel)
@@ -359,12 +361,15 @@ bdr_queue_ddl_command(char *command_tag, char *command)
 
 
 /*
- * bdr_add_truncate_trigger
+ * bdr_truncate_trigger_add
  *
  * This function adds TRUNCATE trigger to newly created tables.
+ *
+ * Note: it's important that this function be named so that it comes
+ * after bdr_queue_ddl_commands when triggers are alphabetically sorted.
  */
 Datum
-bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
+bdr_truncate_trigger_add(PG_FUNCTION_ARGS)
 {
    EventTriggerData   *trigdata;
    char               *skip_ddl;
@@ -415,6 +420,16 @@ bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
        if (res != SPI_OK_UTILITY)
            elog(ERROR, "SPI failure: %d", res);
 
+       /*
+        * If this is inside manually replicated DDL, the
+        * bdr_queue_ddl_commands will skip queueing the CREATE TRIGGER
+        * command, so we have to do it ourselves.
+        *
+        * XXX: The whole in_bdr_replicate_ddl_command concept is not very nice
+        */
+       if (in_bdr_replicate_ddl_command)
+           bdr_queue_ddl_command("CREATE TRIGGER", query);
+
        SPI_finish();
    }
 
@@ -439,6 +454,13 @@ bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
    uint32  nprocessed;
    SPITupleTable *tuptable;
 
+   /*
+    * If the trigger comes from DDL executed by bdr_replicate_ddl_command,
+    * don't queue it as it would insert duplicate commands into the queue.
+    */
+   if (in_bdr_replicate_ddl_command)
+       PG_RETURN_VOID();   /* XXX return type? */
+
    /*
     * If we're currently replaying something from a remote node, don't queue
     * the commands; that would cause recursion.
@@ -489,11 +511,8 @@ bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
    tuptable = SPI_tuptable;
    for (i = 0; i < nprocessed; i++)
    {
-       HeapTuple   newtup = NULL;
        Datum       cmdvalues[6];   /* # cols returned by above query */
        bool        cmdnulls[6];
-       Datum       values[5];      /* # cols in bdr_queued_commands */
-       bool        nulls[5];
 
        MemoryContextReset(tupcxt);
 
@@ -518,36 +537,217 @@ bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
 
    PG_RETURN_VOID();
 }
-#else
+
+/*
+ * bdr_queue_dropped_objects
+ *         sql_drop event trigger handler for BDR
+ *
+ * This function queues DROPs for replay by other BDR nodes.
+ */
+Datum
+bdr_queue_dropped_objects(PG_FUNCTION_ARGS)
+{
+   char       *skip_ddl;
+   int         res;
+   int         i;
+   Oid         schema_oid;
+   Oid         elmtype;
+   int16       elmlen;
+   bool        elmbyval;
+   char        elmalign;
+   int         droppedcnt = 0;
+   Datum      *droppedobjs;
+   ArrayType  *droppedarr;
+   TupleDesc   tupdesc;
+   uint32      nprocessed;
+   SPITupleTable *tuptable;
+
+   if (!CALLED_AS_EVENT_TRIGGER(fcinfo))  /* internal error */
+       elog(ERROR, "%s: not fired by event trigger manager",
+            "bdr_queue_dropped_objects");
+
+   /*
+    * If the trigger comes from DDL executed by bdr_replicate_ddl_command,
+    * don't queue it as it would insert duplicate commands into the queue.
+    */
+   if (in_bdr_replicate_ddl_command)
+       PG_RETURN_VOID();   /* XXX return type? */
+
+   /*
+    * If we're currently replaying something from a remote node, don't queue
+    * the commands; that would cause recursion.
+    */
+   if (replication_origin_id != InvalidRepNodeId)
+       PG_RETURN_VOID();   /* XXX return type? */
+
+   /*
+    * Similarly, if configured to skip queueing DDL, don't queue.  This is
+    * mostly used when pg_restore brings a remote node state, so all objects
+    * will be copied over in the dump anyway.
+    */
+   skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+   if (strcmp(skip_ddl, "on") == 0)
+       PG_RETURN_VOID();
+
+   /*
+    * Connect to SPI early, so that all memory allocated in this routine is
+    * released when we disconnect.
+    */
+   SPI_connect();
+
+   res = SPI_execute("SELECT "
+                     "   original, normal, object_type, "
+                     "   address_names, address_args "
+                     "FROM pg_event_trigger_dropped_objects()",
+                     false, 0);
+   if (res != SPI_OK_SELECT)
+       elog(ERROR, "SPI query failed: %d", res);
+
+   /*
+    * Build array of dropped objects based on the results of the query.
+    */
+   nprocessed = SPI_processed;
+   tuptable = SPI_tuptable;
+
+   droppedobjs = (Datum *) palloc(sizeof(Datum) * nprocessed);
+
+   schema_oid = get_namespace_oid("bdr", false);
+   elmtype = bdr_lookup_relid("dropped_object", schema_oid);
+   elmtype = get_rel_type_id(elmtype);
+
+   get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
+   tupdesc = TypeGetTupleDesc(elmtype, NIL);
+
+   for (i = 0; i < nprocessed; i++)
+   {
+       Datum       cmdvalues[5];   /* # cols returned by above query */
+       bool        cmdnulls[5];
+       Datum       values[3];
+       bool        nulls[3];
+       HeapTuple   tuple;
+
+       /* this is the tuple reported by event triggers */
+       heap_deform_tuple(tuptable->vals[i], tuptable->tupdesc,
+                         cmdvalues, cmdnulls);
+
+       /* if not original or normal skip */
+       if ((cmdnulls[0] || !DatumGetBool(cmdvalues[0])) &&
+           (cmdnulls[1] || !DatumGetBool(cmdvalues[1])))
+           continue;
+
+       nulls[0] = cmdnulls[2];
+       nulls[1] = cmdnulls[3];
+       nulls[2] = cmdnulls[4];
+       values[0] = cmdvalues[2];
+       values[1] = cmdvalues[3];
+       values[2] = cmdvalues[4];
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       droppedobjs[droppedcnt] = HeapTupleGetDatum(tuple);
+       droppedcnt++;
+   }
+
+   SPI_finish();
+
+   /* No objects dropped? */
+   if (droppedcnt == 0)
+       PG_RETURN_VOID();
+
+   droppedarr = construct_array(droppedobjs, droppedcnt,
+                                elmtype, elmlen, elmbyval, elmalign);
+
+   /*
+    * Insert the dropped object(s) info into the bdr_queued_drops table
+    */
+   {
+       EState         *estate;
+       TupleTableSlot *slot;
+       RangeVar       *rv;
+       Relation        queuedcmds;
+       HeapTuple       newtup = NULL;
+       Datum           values[5];
+       bool            nulls[5];
+
+       /*
+        * Prepare bdr.bdr_queued_drops for insert.
+        * Can't use preloaded table oid since this method is executed under
+        * normal backends and not inside BDR worker.
+        * The tuple slot here is only needed for updating indexes.
+        */
+       rv = makeRangeVar("bdr", "bdr_queued_drops", -1);
+       queuedcmds = heap_openrv(rv, RowExclusiveLock);
+       slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
+       estate = bdr_create_rel_estate(queuedcmds);
+       ExecOpenIndices(estate->es_result_relation_info);
+
+       /* lsn, queued_at, dropped_objects */
+       values[0] = pg_current_xlog_location(NULL);
+       values[1] = now(NULL);
+       values[2] = PointerGetDatum(droppedarr);
+       MemSet(nulls, 0, sizeof(nulls));
+
+       newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
+       simple_heap_insert(queuedcmds, newtup);
+       ExecStoreTuple(newtup, slot, InvalidBuffer, false);
+       UserTableUpdateOpenIndexes(estate, slot);
+
+       ExecCloseIndices(estate->es_result_relation_info);
+       ExecDropSingleTupleTableSlot(slot);
+       heap_close(queuedcmds, RowExclusiveLock);
+   }
+
+   PG_RETURN_VOID();
+}
+#endif /*BUILDING_BDR*/
+
 /*
  * bdr_replicate_ddl_command
  *
  * Queues the input SQL for replication.
+ *
+ * Note that we don't allow CONCURRENTLY commands here, this is mainly because
+ * we queue command before we actually execute it, which we currently need
+ * to make the bdr_truncate_trigger_add work correctly. As written there
+ * the in_bdr_replicate_ddl_command concept is ugly.
  */
 Datum
 bdr_replicate_ddl_command(PG_FUNCTION_ARGS)
 {
    text    *command = PG_GETARG_TEXT_PP(0);
-   char    *query;
+   char    *query = text_to_cstring(command);
 
-   /* XXX: handle more nicely */
-   query = psprintf("SET LOCAL search_path TO %s;\n %s",
-                    GetConfigOptionByName("search_path", NULL),
-                    TextDatumGetCString(command));
+   /* Force everything in the query to be fully qualified. */
+   (void) set_config_option("search_path", "",
+                            PGC_USERSET, PGC_S_SESSION,
+                            GUC_ACTION_SAVE, true, 0);
 
-   bdr_queue_ddl_command("SQL", query);
+   /* Execute the query locally. */
+   bdr_commandfilter_always_allow_ddl(true);
+   in_bdr_replicate_ddl_command = true;
+
+   PG_TRY();
+       /* Queue the query for replication. */
+       bdr_queue_ddl_command("SQL", query);
+
+       /* Execute the query locally. */
+       bdr_execute_ddl_command(query, GetUserNameFromId(GetUserId()), false);
+   PG_CATCH();
+       in_bdr_replicate_ddl_command = false;
+       bdr_commandfilter_always_allow_ddl(false);
+       PG_RE_THROW();
+   PG_END_TRY();
+
+   in_bdr_replicate_ddl_command = false;
+   bdr_commandfilter_always_allow_ddl(false);
 
    PG_RETURN_VOID();
 }
-#endif //BUILDING_BDR
 
 void
 bdr_executor_always_allow_writes(bool always_allow)
 {
-#ifdef BUILDING_BDR
    Assert(IsUnderPostmaster);
    bdr_always_allow_writes = always_allow;
-#endif
 }
 
 /*
index 6c0cc9d1cc373fd9899c87e1988781806063c09c..89b59a3e5ec936919b0e54904b6af7bfb8237088 100644 (file)
 #include "postgres.h"
 
 #include "bdr.h"
+
 #include "bdr_locks.h"
 
+#ifdef BUILDING_BDR
+
 #include "miscadmin.h"
 
 #include "access/xact.h"
@@ -1243,3 +1246,22 @@ bdr_locks_check_query(void)
 
    }
 }
+
+#else
+
+/* bdr_locks are not used by UDR at the moment */
+void
+bdr_locks_startup(Size nnodes)
+{
+}
+
+void
+bdr_locks_shmem_init(Size num_used_databases)
+{
+}
+
+void
+bdr_acquire_ddl_lock(void)
+{
+}
+#endif
index b1dd6b655c117d586cdd7f0370e9d36e9885fc87..1e903521e4504b6f9f2b2423eb43fac602008570 100644 (file)
@@ -47,8 +47,6 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_r
  Column |  Type   | Modifiers | Storage | Stats target | Description 
 --------+---------+-----------+---------+--------------+-------------
  val    | integer |           | plain   |              | 
-Triggers:
-    truncate_trigger AFTER TRUNCATE ON test_tbl_unlogged_create FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()
 
 \c postgres
 \d+ test_tbl_unlogged_create
@@ -56,8 +54,6 @@ Triggers:
  Column |  Type   | Modifiers | Storage | Stats target | Description 
 --------+---------+-----------+---------+--------------+-------------
  val    | integer |           | plain   |              | 
-Triggers:
-    truncate_trigger AFTER TRUNCATE ON test_tbl_unlogged_create FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()
 
 DROP TABLE test_tbl_unlogged_create;
 SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_replication;