bdr: prototype of DDL replication in UDR
author: Petr Jelinek <pjmodos@pjmodos.net>
Sat, 20 Sep 2014 00:08:26 +0000 (02:08 +0200)
committer: Petr Jelinek <pjmodos@pjmodos.net>
Fri, 21 Nov 2014 13:55:51 +0000 (14:55 +0100)
The actual user-facing function is currently quite ugly, so this commit is
mainly about the refactoring needed to make DDL replication in UDR possible.

bdr--0.7.1--0.8.sql
bdr--0.8.0.sql
bdr.h
bdr_executor.c

index e9976136ec0dde7e52e583ed0ea7bf7bc4740174..8829a10a3136b5f02a3c04b1a3364fc0fee2956e 100644 (file)
@@ -1,2 +1,24 @@
-ALTER TYPE bdr_conflict_resolution ADD VALUE 'apply_change' BEFORE 'unhandled_tx_abort';
-ALTER TYPE bdr_conflict_resolution ADD VALUE 'skip_change' BEFORE 'unhandled_tx_abort';
+ALTER TYPE bdr.bdr_conflict_resolution ADD VALUE 'apply_change' BEFORE 'unhandled_tx_abort';
+ALTER TYPE bdr.bdr_conflict_resolution ADD VALUE 'skip_change' BEFORE 'unhandled_tx_abort';
+
+-- C-language event trigger function; its implementation is resolved through
+-- MODULE_PATHNAME.
+CREATE OR REPLACE FUNCTION bdr.bdr_add_truncate_trigger()
+RETURNS event_trigger
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
+-- Fire bdr.bdr_add_truncate_trigger() on the ddl_command_end event (no tag
+-- filter is given here, so the function itself decides what to act on).
+CREATE EVENT TRIGGER bdr_add_truncate_trigger
+ON ddl_command_end
+EXECUTE PROCEDURE bdr.bdr_add_truncate_trigger();
+
+-- Create bdr.bdr_replicate_ddl_command(cmd TEXT) only when the last four
+-- characters of bdr_version() are '-udr'; otherwise this DO block does nothing.
+DO $DO$BEGIN
+IF right(bdr_version(), 4) = '-udr' THEN
+
+CREATE OR REPLACE FUNCTION bdr.bdr_replicate_ddl_command(cmd TEXT)
+RETURNS VOID
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
+
+END IF;
+END;$DO$;
\ No newline at end of file
index a856c99a6220e5b7eea770c804cfd625dc4d6f78..cbe5ea5d8e75dba0df0e14f6b8a0d909b291479e 100644 (file)
@@ -438,6 +438,11 @@ CREATE TABLE bdr.bdr_queued_drops (
 REVOKE ALL ON TABLE bdr_queued_drops FROM PUBLIC;
 SELECT pg_catalog.pg_extension_config_dump('bdr_queued_drops', '');
 
+DO $DO$BEGIN
+IF right(bdr_version(), 4) = '-udr' THEN
+    RETURN;
+END IF;
+
 CREATE OR REPLACE FUNCTION bdr.queue_dropped_objects()
 RETURNS event_trigger
 LANGUAGE plpgsql
@@ -484,6 +489,8 @@ CREATE EVENT TRIGGER queue_drops
 ON sql_drop
 EXECUTE PROCEDURE bdr.queue_dropped_objects();
 
+END;$DO$;
+
 CREATE OR REPLACE FUNCTION bdr_apply_pause()
 RETURNS VOID
 LANGUAGE C
diff --git a/bdr.h b/bdr.h
index 05d088f78bc938f78d456ab2e8010eef1ff58dcb..53c20b6ad65578281ff2cd95fec1f9657f817094 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -408,6 +408,7 @@ extern void init_bdr_commandfilter(void);
 
 extern void bdr_executor_init(void);
 extern void bdr_executor_always_allow_writes(bool always_allow);
+extern void bdr_queue_ddl_command(char *command_tag, char *command);
 
 extern void bdr_locks_shmem_init(Size num_used_databases);
 extern void bdr_locks_check_query(void);
index aba537f05898c66e21d0e9e2398b89d1657b50c5..18e9ff096c1ae5587b910e4d5fab3757922c6d7f 100644 (file)
 #include "access/xact.h"
 #include "access/xlog_fn.h"
 
+#include "catalog/namespace.h"
 #include "catalog/pg_trigger.h"
 
+#include "commands/event_trigger.h"
 #include "commands/trigger.h"
 
 #include "executor/executor.h"
@@ -57,8 +59,12 @@ static ExecutorStart_hook_type PrevExecutorStart_hook = NULL;
 
 static bool bdr_always_allow_writes = false;
 
-
+#ifdef BDR_MULTIMASTER
 PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
+#else
+PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+#endif
+PG_FUNCTION_INFO_V1(bdr_add_truncate_trigger);
 
 EState *
 bdr_create_rel_estate(Relation rel)
@@ -310,6 +316,111 @@ retry:
    return found;
 }
 
+/*
+ * bdr_queue_ddl_command
+ *
+ * Append one row describing a DDL command to bdr.bdr_queued_commands.
+ *
+ * command_tag and command are converted to text and stored in the last two
+ * columns; the first three (lsn, queued_at, perpetrator) are taken from
+ * pg_current_xlog_location(), now() and the current user's name.
+ */
+void
+bdr_queue_ddl_command(char *command_tag, char *command)
+{
+   RangeVar       *queue_rv;
+   Relation        queue_rel;
+   TupleDesc       queue_desc;
+   TupleTableSlot *tup_slot;
+   EState         *exec_state;
+   HeapTuple       queued_tup;
+   Datum           col_values[5];
+   bool            col_nulls[5];
+
+   /* open bdr.bdr_queued_commands and its indexes for the insert */
+   queue_rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
+   queue_rel = heap_openrv(queue_rv, RowExclusiveLock);
+   queue_desc = RelationGetDescr(queue_rel);
+   tup_slot = MakeSingleTupleTableSlot(queue_desc);
+   exec_state = bdr_create_rel_estate(queue_rel);
+   ExecOpenIndices(exec_state->es_result_relation_info);
+
+   /* every column gets a value, so none is marked NULL */
+   MemSet(col_nulls, 0, sizeof(col_nulls));
+
+   /* column order: lsn, queued_at, perpetrator, command_tag, command */
+   col_values[0] = pg_current_xlog_location(NULL);
+   col_values[1] = now(NULL);
+   col_values[2] = CStringGetTextDatum(GetUserNameFromId(GetUserId()));
+   col_values[3] = CStringGetTextDatum(command_tag);
+   col_values[4] = CStringGetTextDatum(command);
+
+   /* insert the tuple, then pass it on via the slot for the open indexes */
+   queued_tup = heap_form_tuple(queue_desc, col_values, col_nulls);
+   simple_heap_insert(queue_rel, queued_tup);
+   ExecStoreTuple(queued_tup, tup_slot, InvalidBuffer, false);
+   UserTableUpdateOpenIndexes(exec_state, tup_slot);
+
+   /* close the indexes, drop the slot and close the relation */
+   ExecCloseIndices(exec_state->es_result_relation_info);
+   ExecDropSingleTupleTableSlot(tup_slot);
+   heap_close(queue_rel, RowExclusiveLock);
+}
+
+
+/*
+ * bdr_add_truncate_trigger
+ *
+ * Event trigger function that adds a TRUNCATE trigger to newly created
+ * tables.
+ *
+ * When fired with the tag "CREATE TABLE" and a CreateStmt parse tree for a
+ * permanent table, it runs, through SPI,
+ *     CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON <schema>.<table>
+ *     FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()
+ * with both identifiers quoted.  It does nothing when replication_origin_id
+ * is set or when bdr.skip_ddl_replication is "on".
+ *
+ * Raises an ERROR when not called by the event trigger manager or when any
+ * SPI call reports failure.
+ */
+Datum
+bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
+{
+   EventTriggerData   *trigdata;
+   char               *skip_ddl;
+
+   if (!CALLED_AS_EVENT_TRIGGER(fcinfo))  /* internal error */
+       elog(ERROR, "not fired by event trigger manager");
+
+   /*
+    * If we're currently replaying something from a remote node, don't add
+    * the trigger here.
+    */
+   if (replication_origin_id != InvalidRepNodeId)
+       PG_RETURN_VOID();   /* XXX return type? */
+
+   /*
+    * Similarly, if configured to skip DDL replication, do nothing.  This is
+    * mostly used when pg_restore brings a remote node state, so all objects
+    * will be copied over in the dump anyway.
+    */
+   skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+   if (strcmp(skip_ddl, "on") == 0)
+       PG_RETURN_VOID();
+
+   trigdata = (EventTriggerData *) fcinfo->context;
+
+   if (strcmp(trigdata->tag, "CREATE TABLE") == 0 &&
+       IsA(trigdata->parsetree, CreateStmt))
+   {
+       CreateStmt *stmt = (CreateStmt *) trigdata->parsetree;
+       char       *nspname;
+       char       *query;
+       int         res;
+
+       /* Skip temporary and unlogged tables */
+       if (stmt->relation->relpersistence != RELPERSISTENCE_PERMANENT)
+           PG_RETURN_VOID();
+
+       nspname = get_namespace_name(RangeVarGetCreationNamespace(stmt->relation));
+
+       /* don't run the statement on a connection that failed to open */
+       res = SPI_connect();
+       if (res != SPI_OK_CONNECT)
+           elog(ERROR, "SPI_connect failed: %d", res);
+
+       query = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
+                        "ON %s.%s FOR EACH STATEMENT EXECUTE PROCEDURE "
+                        "bdr.queue_truncate()",
+                        quote_identifier(nspname),
+                        quote_identifier(stmt->relation->relname));
+       res = SPI_execute(query, false, 0);
+       if (res != SPI_OK_UTILITY)
+           elog(ERROR, "SPI failure: %d", res);
+
+       res = SPI_finish();
+       if (res != SPI_OK_FINISH)
+           elog(ERROR, "SPI_finish failed: %d", res);
+   }
+
+   PG_RETURN_VOID();
+}
+
 #ifdef BDR_MULTIMASTER
 /*
  * bdr_queue_ddl_commands
@@ -321,10 +432,6 @@ retry:
 Datum
 bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
 {
-   EState  *estate;
-   TupleTableSlot  *slot;
-   RangeVar *rv;
-   Relation queuedcmds;
    char   *skip_ddl;
    int     res;
    int     i;
@@ -372,13 +479,6 @@ bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
    if (res != SPI_OK_SELECT)
        elog(ERROR, "SPI query failed: %d", res);
 
-   /* prepare bdr.bdr_queued_commands for insert */
-   rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
-   queuedcmds = heap_openrv(rv, RowExclusiveLock);
-   slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
-   estate = bdr_create_rel_estate(queuedcmds);
-   ExecOpenIndices(estate->es_result_relation_info);
-
    /*
     * For each command row reported by the event trigger facility, insert zero
     * or one row in the BDR queued commands table specifying how to replicate
@@ -410,45 +510,32 @@ bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
        if (DatumGetBool(cmdvalues[4]))
            continue;
 
-       /* lsn, queued_at, perpetrator, command_tag, command */
-       values[0] = pg_current_xlog_location(NULL);
-       values[1] = now(NULL);
-       values[2] = PointerGetDatum(cstring_to_text(GetUserNameFromId(GetUserId())));
-       values[3] = cmdvalues[0];
-       values[4] = cmdvalues[5];
-       MemSet(nulls, 0, sizeof(nulls));
+       bdr_queue_ddl_command(TextDatumGetCString(cmdvalues[0]),
+                             TextDatumGetCString(cmdvalues[5]));
+   }
 
-       newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
-       simple_heap_insert(queuedcmds, newtup);
-       ExecStoreTuple(newtup, slot, InvalidBuffer, false);
-       UserTableUpdateOpenIndexes(estate, slot);
+   SPI_finish();
 
-       /*
-        * If we're creating a table, attach a per-stmt trigger to it too, so
-        * that whenever a TRUNCATE is executed in a node, it's replicated to
-        * all other nodes.
-        */
-       if ((strcmp(TextDatumGetCString(cmdvalues[0]), "CREATE TABLE") == 0) &&
-           (strcmp(TextDatumGetCString(cmdvalues[1]), "table") == 0))
-       {
-           char    *stmt;
-
-           /* The table identity is already quoted */
-           stmt = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
-                           "ON %s FOR EACH STATEMENT EXECUTE PROCEDURE "
-                           "bdr.queue_truncate()",
-                           TextDatumGetCString(cmdvalues[3]));
-           res = SPI_execute(stmt, false, 0);
-           if (res != SPI_OK_UTILITY)
-               elog(ERROR, "SPI failure: %d", res);
-       }
-   }
+   PG_RETURN_VOID();
+}
+#else
+/*
+ * bdr_replicate_ddl_command
+ *
+ * Queues the input SQL for replication.
+ *
+ * Takes the command text as its only argument.  The text is prefixed with a
+ * "SET LOCAL search_path TO ..." statement built from the current
+ * search_path setting, and the combined string is handed to
+ * bdr_queue_ddl_command() with the command tag "SQL".
+ *
+ * This is the #else branch of the BDR_MULTIMASTER conditional, i.e. the
+ * counterpart of bdr_queue_ddl_commands().
+ */
+Datum
+bdr_replicate_ddl_command(PG_FUNCTION_ARGS)
+{
+   text    *command = PG_GETARG_TEXT_PP(0);
+   char    *query;
 
-   ExecCloseIndices(estate->es_result_relation_info);
-   ExecDropSingleTupleTableSlot(slot);
-   heap_close(queuedcmds, RowExclusiveLock);
+   /*
+    * XXX: handle more nicely
+    *
+    * NOTE(review): the search_path value is interpolated exactly as
+    * GetConfigOptionByName() returns it; confirm that an empty setting
+    * still produces a valid SET statement.
+    * NOTE(review): command is a text pointer rather than a Datum; confirm
+    * TextDatumGetCString() is the intended conversion here.
+    */
+   query = psprintf("SET LOCAL search_path TO %s;\n %s",
+                    GetConfigOptionByName("search_path", NULL),
+                    TextDatumGetCString(command));
 
-   SPI_finish();
+   bdr_queue_ddl_command("SQL", query);
 
    PG_RETURN_VOID();
 }