#include "access/xact.h"
#include "access/xlog_fn.h"
+#include "catalog/namespace.h"
#include "catalog/pg_trigger.h"
+#include "commands/event_trigger.h"
#include "commands/trigger.h"
#include "executor/executor.h"
static bool bdr_always_allow_writes = false;
-
+#ifdef BDR_MULTIMASTER
PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
+#else
+PG_FUNCTION_INFO_V1(bdr_replicate_ddl_command);
+#endif
+PG_FUNCTION_INFO_V1(bdr_add_truncate_trigger);
EState *
bdr_create_rel_estate(Relation rel)
return found;
}
+/*
+ * bdr_queue_ddl_command
+ *
+ * Insert DDL command into the bdr.bdr_queued_commands table.
+ */
+void
+bdr_queue_ddl_command(char *command_tag, char *command)
+{
+ EState *estate;
+ TupleTableSlot *slot;
+ RangeVar *rv;
+ Relation queuedcmds;
+ HeapTuple newtup = NULL;
+ Datum values[5];
+ bool nulls[5];
+
+ /* prepare bdr.bdr_queued_commands for insert */
+ rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
+ queuedcmds = heap_openrv(rv, RowExclusiveLock);
+ slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
+ estate = bdr_create_rel_estate(queuedcmds);
+ ExecOpenIndices(estate->es_result_relation_info);
+
+ /* lsn, queued_at, perpetrator, command_tag, command */
+ values[0] = pg_current_xlog_location(NULL);
+ values[1] = now(NULL);
+ values[2] = PointerGetDatum(cstring_to_text(GetUserNameFromId(GetUserId())));
+ values[3] = CStringGetTextDatum(command_tag);
+ values[4] = CStringGetTextDatum(command);
+ MemSet(nulls, 0, sizeof(nulls));
+
+ newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
+ simple_heap_insert(queuedcmds, newtup);
+ ExecStoreTuple(newtup, slot, InvalidBuffer, false);
+ UserTableUpdateOpenIndexes(estate, slot);
+
+ ExecCloseIndices(estate->es_result_relation_info);
+ ExecDropSingleTupleTableSlot(slot);
+ heap_close(queuedcmds, RowExclusiveLock);
+}
+
+
+/*
+ * bdr_add_truncate_trigger
+ *
+ * This function adds TRUNCATE trigger to newly created tables.
+ */
+Datum
+bdr_add_truncate_trigger(PG_FUNCTION_ARGS)
+{
+ EventTriggerData *trigdata;
+ char *skip_ddl;
+
+ if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) /* internal error */
+ elog(ERROR, "not fired by event trigger manager");
+
+ /*
+ * If we're currently replaying something from a remote node, don't queue
+ * the commands; that would cause recursion.
+ */
+ if (replication_origin_id != InvalidRepNodeId)
+ PG_RETURN_VOID(); /* XXX return type? */
+
+ /*
+ * Similarly, if configured to skip queueing DDL, don't queue. This is
+ * mostly used when pg_restore brings a remote node state, so all objects
+ * will be copied over in the dump anyway.
+ */
+ skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+ if (strcmp(skip_ddl, "on") == 0)
+ PG_RETURN_VOID();
+
+ trigdata = (EventTriggerData *) fcinfo->context;
+
+ if (strcmp(trigdata->tag, "CREATE TABLE") == 0 &&
+ IsA(trigdata->parsetree, CreateStmt))
+ {
+ CreateStmt *stmt = (CreateStmt *)trigdata->parsetree;
+ char *nspname;
+ char *query;
+ int res;
+
+ /* Skip temporary and unlogged tables */
+ if (stmt->relation->relpersistence != RELPERSISTENCE_PERMANENT)
+ PG_RETURN_VOID();
+
+ nspname = get_namespace_name(RangeVarGetCreationNamespace(stmt->relation));
+
+ SPI_connect();
+
+ query = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
+ "ON %s.%s FOR EACH STATEMENT EXECUTE PROCEDURE "
+ "bdr.queue_truncate()",
+ quote_identifier(nspname),
+ quote_identifier(stmt->relation->relname));
+ res = SPI_execute(query, false, 0);
+ if (res != SPI_OK_UTILITY)
+ elog(ERROR, "SPI failure: %d", res);
+
+ SPI_finish();
+ }
+
+ PG_RETURN_VOID();
+}
+
#ifdef BDR_MULTIMASTER
/*
* bdr_queue_ddl_commands
Datum
bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
{
- EState *estate;
- TupleTableSlot *slot;
- RangeVar *rv;
- Relation queuedcmds;
char *skip_ddl;
int res;
int i;
if (res != SPI_OK_SELECT)
elog(ERROR, "SPI query failed: %d", res);
- /* prepare bdr.bdr_queued_commands for insert */
- rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
- queuedcmds = heap_openrv(rv, RowExclusiveLock);
- slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
- estate = bdr_create_rel_estate(queuedcmds);
- ExecOpenIndices(estate->es_result_relation_info);
-
/*
* For each command row reported by the event trigger facility, insert zero
* or one row in the BDR queued commands table specifying how to replicate
if (DatumGetBool(cmdvalues[4]))
continue;
- /* lsn, queued_at, perpetrator, command_tag, command */
- values[0] = pg_current_xlog_location(NULL);
- values[1] = now(NULL);
- values[2] = PointerGetDatum(cstring_to_text(GetUserNameFromId(GetUserId())));
- values[3] = cmdvalues[0];
- values[4] = cmdvalues[5];
- MemSet(nulls, 0, sizeof(nulls));
+ bdr_queue_ddl_command(TextDatumGetCString(cmdvalues[0]),
+ TextDatumGetCString(cmdvalues[5]));
+ }
- newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
- simple_heap_insert(queuedcmds, newtup);
- ExecStoreTuple(newtup, slot, InvalidBuffer, false);
- UserTableUpdateOpenIndexes(estate, slot);
+ SPI_finish();
- /*
- * If we're creating a table, attach a per-stmt trigger to it too, so
- * that whenever a TRUNCATE is executed in a node, it's replicated to
- * all other nodes.
- */
- if ((strcmp(TextDatumGetCString(cmdvalues[0]), "CREATE TABLE") == 0) &&
- (strcmp(TextDatumGetCString(cmdvalues[1]), "table") == 0))
- {
- char *stmt;
-
- /* The table identity is already quoted */
- stmt = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
- "ON %s FOR EACH STATEMENT EXECUTE PROCEDURE "
- "bdr.queue_truncate()",
- TextDatumGetCString(cmdvalues[3]));
- res = SPI_execute(stmt, false, 0);
- if (res != SPI_OK_UTILITY)
- elog(ERROR, "SPI failure: %d", res);
- }
- }
+ PG_RETURN_VOID();
+}
+#else
+/*
+ * bdr_replicate_ddl_command
+ *
+ * Queues the input SQL for replication.
+ */
+Datum
+bdr_replicate_ddl_command(PG_FUNCTION_ARGS)
+{
+ text *command = PG_GETARG_TEXT_PP(0);
+ char *query;
- ExecCloseIndices(estate->es_result_relation_info);
- ExecDropSingleTupleTableSlot(slot);
- heap_close(queuedcmds, RowExclusiveLock);
+ /* XXX: handle more nicely */
+ query = psprintf("SET LOCAL search_path TO %s;\n %s",
+ GetConfigOptionByName("search_path", NULL),
+ TextDatumGetCString(command));
- SPI_finish();
+ bdr_queue_ddl_command("SQL", query);
PG_RETURN_VOID();
}