From 88cbcd18e54263196012a66aa66b8f0a7da519a8 Mon Sep 17 00:00:00 2001 From: Jan Wieck Date: Thu, 19 Jan 2012 15:49:00 -0500 Subject: [PATCH] Change format of the sl_log_1/2 tables so that they contain the table name and schema as separate fields. Change the column names and values from a rudimentary SQL statement into a text array. Change the transport to use COPY protocol and move the part that does the updates to user tables into a trigger that fires on sl_log_1/2. Move DDL from sl_event into a separate sl_log_script table that is pulled into the log selection so that schema changes flow at the correct time in between data updates, not as separate events. --- src/backend/.gitignore | 1 + src/backend/slony1_base.sql | 56 +- src/backend/slony1_funcs.c | 1273 +++++++++++++++------ src/backend/slony1_funcs.sql | 586 +++++++--- src/slon/confoptions.c | 24 - src/slon/confoptions.h | 2 - src/slon/remote_worker.c | 2074 +++++++++++----------------------- src/slon/runtime_config.c | 58 + src/slon/slon.c | 13 +- src/slon/slon.h | 5 +- src/slonik/parser.y | 9 +- src/slonik/slonik.c | 89 +- src/slonik/slonik.h | 3 +- 13 files changed, 2184 insertions(+), 2009 deletions(-) diff --git a/src/backend/.gitignore b/src/backend/.gitignore index 7ef43b68..370f7569 100755 --- a/src/backend/.gitignore +++ b/src/backend/.gitignore @@ -2,3 +2,4 @@ avl_tree.c avl_tree.h slony1_funcs.o slony1_funcs.so +slony1_funcs.*.so diff --git a/src/backend/slony1_base.sql b/src/backend/slony1_base.sql index d82e69db..16ebb6ab 100644 --- a/src/backend/slony1_base.sql +++ b/src/backend/slony1_base.sql @@ -369,8 +369,11 @@ create table @NAMESPACE@.sl_log_1 ( log_txid bigint, log_tableid int4, log_actionseq int8, - log_cmdtype char, - log_cmddata text + log_tablenspname text, + log_tablerelname text, + log_cmdtype "char", + log_cmdupdncols int4, + log_cmdargs text[] ) WITHOUT OIDS; create index sl_log_1_idx1 on @NAMESPACE@.sl_log_1 (log_origin, log_txid, log_actionseq); @@ -383,8 +386,12 @@ comment on table @NAMESPACE@.sl_log_1 is 'Stores each change to be propagated to comment on column @NAMESPACE@.sl_log_1.log_origin is 'Origin node from which the change came'; comment on column @NAMESPACE@.sl_log_1.log_txid is 'Transaction ID on the origin node'; comment on column @NAMESPACE@.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; -comment on column @NAMESPACE@.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE'; -comment on column @NAMESPACE@.sl_log_1.log_cmddata is 'The data needed to perform the log action'; +comment on column @NAMESPACE@.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; +comment on column @NAMESPACE@.sl_log_1.log_tablenspname is 'The schema name of the table affected'; +comment on column @NAMESPACE@.sl_log_1.log_tablerelname is 'The table name of the table affected'; +comment on column @NAMESPACE@.sl_log_1.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; +comment on column @NAMESPACE@.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; +comment on column @NAMESPACE@.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; -- ---------------------------------------------------------------------- -- TABLE sl_log_2 @@ -394,15 +401,12 @@ create table @NAMESPACE@.sl_log_2 ( log_txid bigint, log_tableid int4, log_actionseq int8, - log_cmdtype char, - log_cmddata text + log_tablenspname text, + log_tablerelname text, + log_cmdtype "char", + log_cmdupdncols int4, + log_cmdargs text[] ) WITHOUT OIDS; -comment on table @NAMESPACE@.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; -comment on column @NAMESPACE@.sl_log_2.log_origin is 'Origin node from which the change came'; -comment on column @NAMESPACE@.sl_log_2.log_txid is 'Transaction ID on the origin node'; -comment on column @NAMESPACE@.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; -comment on column @NAMESPACE@.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE'; -comment on column @NAMESPACE@.sl_log_2.log_cmddata is 'The data needed to perform the log action'; create index sl_log_2_idx1 on @NAMESPACE@.sl_log_2 (log_origin, log_txid, log_actionseq); @@ -410,6 +414,34 @@ create index sl_log_2_idx1 on @NAMESPACE@.sl_log_2 -- create index sl_log_2_idx2 on @NAMESPACE@.sl_log_2 -- (log_txid); +comment on table @NAMESPACE@.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; +comment on column @NAMESPACE@.sl_log_2.log_origin is 'Origin node from which the change came'; +comment on column @NAMESPACE@.sl_log_2.log_txid is 'Transaction ID on the origin node'; +comment on column @NAMESPACE@.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; +comment on column @NAMESPACE@.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; +comment on column @NAMESPACE@.sl_log_2.log_tablenspname is 'The schema name of the table affected'; +comment on column @NAMESPACE@.sl_log_2.log_tablerelname is 'The table name of the table affected'; +comment on column @NAMESPACE@.sl_log_2.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; +comment on column @NAMESPACE@.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; +comment on column @NAMESPACE@.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; + +-- ---------------------------------------------------------------------- +-- TABLE sl_log_script +-- ---------------------------------------------------------------------- +create table @NAMESPACE@.sl_log_script ( + log_origin int4, + log_txid bigint, + log_actionseq int8, + log_cmdargs text[] +) WITHOUT OIDS; +create index sl_log_script_idx1 on @NAMESPACE@.sl_log_script + (log_origin, log_txid, log_actionseq); + +comment on table @NAMESPACE@.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; +comment on column @NAMESPACE@.sl_log_script.log_origin is 'Origin name from which the change came'; +comment on column @NAMESPACE@.sl_log_script.log_txid is 'Transaction ID on the origin node'; +comment on column @NAMESPACE@.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; +comment on column @NAMESPACE@.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; -- ---------------------------------------------------------------------- -- TABLE sl_registry diff --git a/src/backend/slony1_funcs.c b/src/backend/slony1_funcs.c index 6dac296f..ca4a4a96 100644 --- a/src/backend/slony1_funcs.c +++ b/src/backend/slony1_funcs.c @@ -25,10 +25,12 @@ #include "parser/keywords.h" #include "parser/parse_type.h" #include "executor/spi.h" +#include "libpq/md5.h" #include "commands/trigger.h" #include "commands/async.h" #include "catalog/pg_operator.h" #include "catalog/pg_type.h" +#include "catalog/namespace.h" #include "access/xact.h" #include "access/transam.h" #include "utils/builtins.h" @@ -36,6 +38,8 @@ #include "utils/guc.h" #include "utils/rel.h" #include "utils/relcache.h" +#include "utils/lsyscache.h" +#include "utils/hsearch.h" #ifdef HAVE_GETACTIVESNAPSHOT #include "utils/snapmgr.h" #endif @@ -62,6 +66,7 @@ PG_FUNCTION_INFO_V1(_Slony_I_getModuleVersion); PG_FUNCTION_INFO_V1(_Slony_I_logTrigger); PG_FUNCTION_INFO_V1(_Slony_I_denyAccess); +PG_FUNCTION_INFO_V1(_Slony_I_logApply); PG_FUNCTION_INFO_V1(_Slony_I_lockedSet); PG_FUNCTION_INFO_V1(_Slony_I_killBackend); PG_FUNCTION_INFO_V1(_Slony_I_seqtrack); @@ -76,6 +81,7 @@ Datum _Slony_I_getModuleVersion(PG_FUNCTION_ARGS); Datum _Slony_I_logTrigger(PG_FUNCTION_ARGS); Datum _Slony_I_denyAccess(PG_FUNCTION_ARGS); +Datum _Slony_I_logApply(PG_FUNCTION_ARGS); Datum _Slony_I_lockedSet(PG_FUNCTION_ARGS); Datum _Slony_I_killBackend(PG_FUNCTION_ARGS); Datum _Slony_I_seqtrack(PG_FUNCTION_ARGS); @@ -92,6 +98,7 @@ extern DLLIMPORT Node *newNodeMacroHolder; #define PLAN_NONE 0 #define PLAN_INSERT_EVENT (1 << 1) #define PLAN_INSERT_LOG_STATUS (1 << 2) +#define PLAN_APPLY_QUERIES (1 << 3) /* ---- @@ -112,26 +119,46 @@ typedef struct slony_I_cluster_status void *plan_insert_event; void *plan_insert_log_1; void *plan_insert_log_2; + void *plan_insert_log_script; void *plan_record_sequences; void *plan_get_logstatus; + void *plan_table_info; text *cmdtype_I; text *cmdtype_U; text *cmdtype_D; - text *cmddata_buf; - int cmddata_size; - struct slony_I_cluster_status *next; } Slony_I_ClusterStatus; + +typedef struct apply_cache_entry +{ + char key[16]; + + void *plan; + bool forward; + struct apply_cache_entry *prev; + struct apply_cache_entry *next; + struct 
apply_cache_entry *self; +} ApplyCacheEntry; + + +static HTAB *applyCacheHash = NULL; +static ApplyCacheEntry *applyCacheHead = NULL; +static ApplyCacheEntry *applyCacheTail = NULL; +static int applyCacheSize = 100; +static int applyCacheUsed = 0; + + + + /*@null@*/ static Slony_I_ClusterStatus *clusterStatusList = NULL; static Slony_I_ClusterStatus * getClusterStatus(Name cluster_name, int need_plan_mask); static const char *slon_quote_identifier(const char *ident); -static char *slon_quote_literal(char *str); static int prepareLogPlan(Slony_I_ClusterStatus * cs, int log_status); @@ -287,14 +314,22 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) TransactionId newXid = GetTopTransactionId(); Slony_I_ClusterStatus *cs; TriggerData *tg; - Datum argv[4]; + Datum argv[6]; text *cmdtype = NULL; + int32 cmdupdncols = 0; int rc; Name cluster_name; int32 tab_id; char *attkind; int attkind_idx; - int cmddata_need; + + char *olddatestyle = NULL; + Datum *cmdargs = NULL; + Datum *cmdargselem = NULL; + bool *cmdnulls = NULL; + bool *cmdnullselem = NULL; + int cmddims[1]; + int cmdlbs[1]; /* * Don't do any logging if the current session role isn't Origin. @@ -323,7 +358,7 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) * Connect to the SPI manager */ if ((rc = SPI_connect()) < 0) - elog(ERROR, "Slony-I: SPI_connect() failed in createEvent()"); + elog(ERROR, "Slony-I: SPI_connect() failed in logTrigger()"); /* * Get all the trigger arguments @@ -379,35 +414,46 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) cs->currentXid = newXid; } + /* + * Save the current datestyle setting and switch to ISO (if not already) + */ + olddatestyle = GetConfigOptionByName("DateStyle", NULL); + if (!strstr(olddatestyle, "ISO")) +#ifdef SETCONFIGOPTION_6 + set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, + true, true); +#else + set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, + true, true, 0); +#endif + /* - * Determine cmdtype and cmddata depending on the command type + * Determine cmdtype and cmdargs depending on the command type */ if (TRIGGER_FIRED_BY_INSERT(tg->tg_event)) { HeapTuple new_row = tg->tg_trigtuple; TupleDesc tupdesc = tg->tg_relation->rd_att; - char *col_ident; char *col_value; - int len_ident; - int len_value; int i; - int need_comma = false; - char *OldDateStyle; - char *cp = VARDATA(cs->cmddata_buf); /* * INSERT * - * cmdtype = 'I' cmddata = ("col" [, ...]) values ('value' [, ...]) + * cmdtype = 'I' cmdargs = colname, newval [, ...] 
*/ cmdtype = cs->cmdtype_I; + cmdargselem = cmdargs = (Datum *)palloc(sizeof(Datum) * + ((tg->tg_relation->rd_att->natts * 2) + 2)); + cmdnullselem = cmdnulls = (bool *)palloc(sizeof(bool) * + ((tg->tg_relation->rd_att->natts * 2) + 2)); + /* * Specify all the columns */ - *cp++ = '('; for (i = 0; i < tg->tg_relation->rd_att->natts; i++) { /* @@ -416,106 +462,29 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) if (tupdesc->attrs[i]->attisdropped) continue; - col_ident = (char *) slon_quote_identifier(SPI_fname(tupdesc, i + 1)); - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_ident = strlen(col_ident)); - if (cs->cmddata_size < cmddata_need) - { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; - } - - if (need_comma) - *cp++ = ','; - else - need_comma = true; - - memcpy(cp, col_ident, len_ident); - cp += len_ident; - } - - /* - * Append the string ") values (" - */ - *cp++ = ')'; - *cp++ = ' '; - *cp++ = 'v'; - *cp++ = 'a'; - *cp++ = 'l'; - *cp++ = 'u'; - *cp++ = 'e'; - *cp++ = 's'; - *cp++ = ' '; - *cp++ = '('; - - /* - * Append the values - */ - need_comma = false; - OldDateStyle = GetConfigOptionByName("DateStyle", NULL); - if (!strstr(OldDateStyle, "ISO")) -#ifdef SETCONFIGOPTION_6 - set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, true, true); -#else - set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, true, true, 0); -#endif - for (i = 0; i < tg->tg_relation->rd_att->natts; i++) - { /* - * Skip dropped columns + * Add the column name */ - if (tupdesc->attrs[i]->attisdropped) - continue; - + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(SPI_fname(tupdesc, i + 1))); + *cmdnullselem++ = false; + /* + * Add the column value + */ if ((col_value = SPI_getvalue(new_row, tupdesc, i + 1)) == NULL) { - col_value = "NULL"; + *cmdnullselem++ = true; + cmdargselem++; } else { - col_value = slon_quote_literal(col_value); + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(col_value)); + *cmdnullselem++ = false; } - - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_value = strlen(col_value)); - if (cs->cmddata_size < cmddata_need) - { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; - } - - if (need_comma) - *cp++ = ','; - else - need_comma = true; - - memcpy(cp, col_value, len_value); - cp += len_value; } - if (!strstr(OldDateStyle, "ISO")) -#ifdef SETCONFIGOPTION_6 - set_config_option("DateStyle", OldDateStyle, PGC_USERSET, PGC_S_SESSION, true, true); -#else - set_config_option("DateStyle", OldDateStyle, PGC_USERSET, PGC_S_SESSION, true, true, 0); -#endif - - /* - * Terminate and done - */ - *cp++ = ')'; - *cp = '\0'; - SET_VARSIZE(cs->cmddata_buf, - VARHDRSZ + (cp - VARDATA(cs->cmddata_buf))); } else if (TRIGGER_FIRED_BY_UPDATE(tg->tg_event)) { @@ -529,22 +498,24 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) char *col_ident; char *col_value; - int len_ident; - int len_value; int i; - int need_comma = false; - int need_and = false; - char *OldDateStyle; - - char *cp = VARDATA(cs->cmddata_buf); /* * UPDATE * - * cmdtype = 'U' cmddata = "col_ident"='value' [, ...] where - * "pk_ident" = 'value' [ and ...] + * cmdtype = 'U' cmdargs = pkcolname, oldval [, ...] 
+ * colname, newval [, ...] */ cmdtype = cs->cmdtype_U; + + cmdargselem = cmdargs = (Datum *)palloc(sizeof(Datum) * + ((tg->tg_relation->rd_att->natts * 4) + 3)); + cmdnullselem = cmdnulls = (bool *)palloc(sizeof(bool) * + ((tg->tg_relation->rd_att->natts * 4) + 3)); + + /* + * For all changed columns, add name+value pairs and count them. + */ for (i = 0; i < tg->tg_relation->rd_att->natts; i++) { /* @@ -620,103 +591,26 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) } } - if (need_comma) - *cp++ = ','; - else - need_comma = true; - - col_ident = (char *) slon_quote_identifier(SPI_fname(tupdesc, i + 1)); - if (new_isnull) - col_value = "NULL"; - else + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(SPI_fname(tupdesc, i + 1))); + *cmdnullselem++ = false; + if (new_isnull) { - OldDateStyle = GetConfigOptionByName("DateStyle", NULL); - if (!strstr(OldDateStyle, "ISO")) -#ifdef SETCONFIGOPTION_6 - set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, true, true); -#else - set_config_option("DateStyle", "ISO", PGC_USERSET, PGC_S_SESSION, true, true, 0); -#endif - col_value = slon_quote_literal(SPI_getvalue(new_row, tupdesc, i + 1)); - if (!strstr(OldDateStyle, "ISO")) -#ifdef SETCONFIGOPTION_6 - set_config_option("DateStyle", OldDateStyle, PGC_USERSET, PGC_S_SESSION, true, true); -#else - set_config_option("DateStyle", OldDateStyle, PGC_USERSET, PGC_S_SESSION, true, true, 0); -#endif + *cmdnullselem++ = true; + cmdargselem++; } - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_ident = strlen(col_ident)) + - (len_value = strlen(col_value)); - if (cs->cmddata_size < cmddata_need) + else { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(SPI_getvalue(new_row, tupdesc, i + 1))); + *cmdnullselem++ = false; } - - memcpy(cp, col_ident, len_ident); - cp += len_ident; - *cp++ = '='; - memcpy(cp, col_value, len_value); - cp += len_value; + cmdupdncols++; } /* - * It can happen that the only UPDATE an application does is to set a - * column to the same value again. In that case, we'd end up here with - * no columns in the SET clause yet. We add the first key column here - * with it's old value to simulate the same for the replication - * engine. 
+ * Add pairs of PK column names and values */ - if (!need_comma) - { - for (i = 0, attkind_idx = -1; i < tg->tg_relation->rd_att->natts; i++) - { - if (tupdesc->attrs[i]->attisdropped) - continue; - - attkind_idx++; - if (!attkind[attkind_idx]) - elog(ERROR, "Slony-I: no key columns found in logTrigger() attkind parameter"); - - if (attkind[attkind_idx] == 'k') - break; - } - col_ident = (char *) slon_quote_identifier(SPI_fname(tupdesc, i + 1)); - col_value = slon_quote_literal(SPI_getvalue(old_row, tupdesc, i + 1)); - - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_ident = strlen(col_ident)) + - (len_value = strlen(col_value)); - if (cs->cmddata_size < cmddata_need) - { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; - } - - memcpy(cp, col_ident, len_ident); - cp += len_ident; - *cp++ = '='; - memcpy(cp, col_value, len_value); - cp += len_value; - } - - *cp++ = ' '; - *cp++ = 'w'; - *cp++ = 'h'; - *cp++ = 'e'; - *cp++ = 'r'; - *cp++ = 'e'; - *cp++ = ' '; - for (i = 0, attkind_idx = -1; i < tg->tg_relation->rd_att->natts; i++) { /* @@ -730,45 +624,21 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) break; if (attkind[attkind_idx] != 'k') continue; - col_ident = (char *) slon_quote_identifier(SPI_fname(tupdesc, i + 1)); - col_value = slon_quote_literal(SPI_getvalue(old_row, tupdesc, i + 1)); + col_ident = SPI_fname(tupdesc, i + 1); + col_value = SPI_getvalue(old_row, tupdesc, i + 1); if (col_value == NULL) elog(ERROR, "Slony-I: old key column %s.%s IS NULL on UPDATE", NameStr(tg->tg_relation->rd_rel->relname), col_ident); - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_ident = strlen(col_ident)) + - (len_value = strlen(col_value)); - if (cs->cmddata_size < cmddata_need) - { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; - } - - if (need_and) - { - *cp++ = ' '; - *cp++ = 'a'; - *cp++ = 'n'; - *cp++ = 'd'; - *cp++ = ' '; - } - else - need_and = true; + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(col_ident)); + *cmdnullselem++ = false; - memcpy(cp, col_ident, len_ident); - cp += len_ident; - *cp++ = '='; - memcpy(cp, col_value, len_value); - cp += len_value; + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(col_value)); + *cmdnullselem++ = false; } - *cp = '\0'; - SET_VARSIZE(cs->cmddata_buf, - VARHDRSZ + (cp - VARDATA(cs->cmddata_buf))); + } else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event)) { @@ -776,19 +646,23 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) TupleDesc tupdesc = tg->tg_relation->rd_att; char *col_ident; char *col_value; - int len_ident; - int len_value; int i; - int need_and = false; - char *cp = VARDATA(cs->cmddata_buf); /* * DELETE * - * cmdtype = 'D' cmddata = "pk_ident"='value' [and ...] + * cmdtype = 'D' cmdargs = pkcolname, oldval [, ...] 
*/ cmdtype = cs->cmdtype_D; + cmdargselem = cmdargs = (Datum *)palloc(sizeof(Datum) * + ((tg->tg_relation->rd_att->natts * 2) + 2)); + cmdnullselem = cmdnulls = (bool *)palloc(sizeof(bool) * + ((tg->tg_relation->rd_att->natts * 2) + 2)); + + /* + * Add the PK columns + */ for (i = 0, attkind_idx = -1; i < tg->tg_relation->rd_att->natts; i++) { if (tupdesc->attrs[i]->attisdropped) @@ -799,55 +673,52 @@ _Slony_I_logTrigger(PG_FUNCTION_ARGS) break; if (attkind[attkind_idx] != 'k') continue; - col_ident = (char *) slon_quote_identifier(SPI_fname(tupdesc, i + 1)); - col_value = slon_quote_literal(SPI_getvalue(old_row, tupdesc, i + 1)); + + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(col_ident = SPI_fname(tupdesc, i + 1))); + *cmdnullselem++ = false; + + col_value = SPI_getvalue(old_row, tupdesc, i + 1); if (col_value == NULL) elog(ERROR, "Slony-I: old key column %s.%s IS NULL on DELETE", NameStr(tg->tg_relation->rd_rel->relname), col_ident); - - cmddata_need = (cp - (char *) (cs->cmddata_buf)) + 16 + - (len_ident = strlen(col_ident)) + - (len_value = strlen(col_value)); - if (cs->cmddata_size < cmddata_need) - { - int have = (cp - (char *) (cs->cmddata_buf)); - - while (cs->cmddata_size < cmddata_need) - cs->cmddata_size *= 2; - cs->cmddata_buf = realloc(cs->cmddata_buf, cs->cmddata_size); - cp = (char *) (cs->cmddata_buf) + have; - } - - if (need_and) - { - *cp++ = ' '; - *cp++ = 'a'; - *cp++ = 'n'; - *cp++ = 'd'; - *cp++ = ' '; - } - else - need_and = true; - - memcpy(cp, col_ident, len_ident); - cp += len_ident; - *cp++ = '='; - memcpy(cp, col_value, len_value); - cp += len_value; + *cmdargselem++ = DirectFunctionCall1(textin, + CStringGetDatum(col_value)); + *cmdnullselem++ = false; } - *cp = '\0'; - SET_VARSIZE(cs->cmddata_buf, - VARHDRSZ + (cp - VARDATA(cs->cmddata_buf))); } else elog(ERROR, "Slony-I: logTrigger() fired for unhandled event"); + /* + * Restore the datestyle + */ + if (!strstr(olddatestyle, "ISO")) +#ifdef SETCONFIGOPTION_6 + set_config_option("DateStyle", olddatestyle, + PGC_USERSET, PGC_S_SESSION, true, true); +#else + set_config_option("DateStyle", olddatestyle, + PGC_USERSET, PGC_S_SESSION, true, true, 0); +#endif + /* * Construct the parameter array and insert the log row. 
*/ + cmddims[0] = cmdargselem - cmdargs; + cmdlbs[0] = 1; + argv[0] = Int32GetDatum(tab_id); - argv[1] = PointerGetDatum(cmdtype); - argv[2] = PointerGetDatum(cs->cmddata_buf); + argv[1] = DirectFunctionCall1(textin, + CStringGetDatum(get_namespace_name( + RelationGetNamespace(tg->tg_relation)))); + argv[2] = DirectFunctionCall1(textin, + CStringGetDatum(RelationGetRelationName(tg->tg_relation))); + argv[3] = PointerGetDatum(cmdtype); + argv[4] = Int32GetDatum(cmdupdncols); + argv[5] = PointerGetDatum(construct_md_array(cmdargs, cmdnulls, 1, + cmddims, cmdlbs, TEXTOID, -1, false, 'i')); + SPI_execp(cs->plan_active_log, argv, NULL, 0); SPI_finish(); @@ -902,6 +773,727 @@ _Slony_I_denyAccess(PG_FUNCTION_ARGS) } +Datum +_Slony_I_logApply(PG_FUNCTION_ARGS) +{ + static char *query = NULL; + static int query_alloc = 0; + + TransactionId newXid = GetTopTransactionId(); + Slony_I_ClusterStatus *cs; + TriggerData *tg; + HeapTuple new_row; + TupleDesc tupdesc; + Name cluster_name; + int rc; + bool isnull; + Relation target_rel; + + char *query_pos; + Datum dat; + char cmdtype; + char *nspname; + char *relname; + int32 cmdupdncols; + Datum *cmdargs; + bool *cmdargsnulls; + int cmdargsn; + int querynvals = 0; + Datum *queryvals = NULL; + Oid *querytypes = NULL; + char *querynulls = NULL; + char **querycolnames = NULL; + int i; + int spi_rc; + + ApplyCacheEntry *cacheEnt; + char cacheKey[16]; + bool found; + + /* + * Get the trigger call context + */ + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "Slony-I: logApply() not called as trigger"); + tg = (TriggerData *) (fcinfo->context); + + /* + * Don't do any applying if the current session role isn't Replica. + */ + if (SessionReplicationRole != SESSION_REPLICATION_ROLE_REPLICA) + return PointerGetDatum(tg->tg_trigtuple); + + /* + * Check all logApply() calling conventions + */ + if (!TRIGGER_FIRED_BEFORE(tg->tg_event)) + elog(ERROR, "Slony-I: logApply() must be fired BEFORE"); + if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event)) + elog(ERROR, "Slony-I: logApply() must be fired FOR EACH ROW"); + if (tg->tg_trigger->tgnargs != 1) + elog(ERROR, "Slony-I: logApply() must be defined with 1 arg"); + + /* + * Connect to the SPI manager + */ + if ((rc = SPI_connect()) < 0) + elog(ERROR, "Slony-I: SPI_connect() failed in logApply()"); + + /* + * Get or create the cluster status information and make sure it has the + * SPI plans that we need here. + */ + cluster_name = DatumGetName(DirectFunctionCall1(namein, + CStringGetDatum(tg->tg_trigger->tgargs[0]))); + cs = getClusterStatus(cluster_name, PLAN_APPLY_QUERIES); + + /* + * Do the following only once per transaction. + */ + if (!TransactionIdEquals(cs->currentXid, newXid)) + { + HASHCTL hctl; + + for (cacheEnt = applyCacheHead; cacheEnt; cacheEnt = cacheEnt->next) + { + if (cacheEnt->plan != NULL) + SPI_freeplan(cacheEnt->plan); + cacheEnt->plan = NULL; + } + applyCacheHead = NULL; + applyCacheTail = NULL; + applyCacheUsed = 0; + + if (applyCacheHash != NULL) + hash_destroy(applyCacheHash); + memset(&hctl, 0, sizeof(hctl)); + hctl.keysize = 16; + hctl.entrysize = sizeof(ApplyCacheEntry); + applyCacheHash = hash_create("Slony-I apply cache", + 50, &hctl, HASH_ELEM); + + cs->currentXid = newXid; + } + + /* + * Get the cmdtype first. 
+ */ + new_row = tg->tg_trigtuple; + tupdesc = tg->tg_relation->rd_att; + + dat = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_cmdtype"), &isnull); + if (isnull) + elog(ERROR, "Slony-I: log_cmdtype is NULL"); + cmdtype = DatumGetChar(dat); + + /* + * Rows coming from sl_log_script are handled different from + * regular data log rows since they don't have all the columns. + */ + if (cmdtype == 'S') + { + char *ddl_script; + bool localNodeFound = true; + Datum script_insert_args[4]; + + dat = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_cmdargs"), &isnull); + if (isnull) + elog(ERROR, "Slony-I: log_cmdargs is NULL"); + + /* + * Turn the log_cmdargs into a plain array of Text Datums. + */ + deconstruct_array(DatumGetArrayTypeP(dat), + TEXTOID, -1, false, 'i', + &cmdargs, &cmdargsnulls, &cmdargsn); + + /* + * The first element is the DDL statement itself. + */ + ddl_script = DatumGetCString(DirectFunctionCall1( + textout, cmdargs[0])); + + /* + * If there is an optional node ID list, check that we are in it. + */ + if (cmdargsn > 1) { + localNodeFound = false; + for (i = 1; i < cmdargsn; i++) + { + int32 nodeId = DatumGetInt32( + DirectFunctionCall1(int4in, + DirectFunctionCall1(textout, cmdargs[i]))); + if (nodeId == cs->localNodeId) + { + localNodeFound = true; + break; + } + } + } + + /* + * Execute the DDL statement if the node list is empty or our + * local node ID appears in it. + */ + if (localNodeFound) + { + if (SPI_exec(ddl_script, 0) < 0) + { + elog(ERROR, "SPI_exec() failed for DDL statement '%s'", + ddl_script); + } + + /* + * Set the currentXid to invalid to flush the apply + * query cache. + */ + cs->currentXid = InvalidTransactionId; + } + + /* + * Build the parameters for the insert into sl_log_script + * and execute the query. + */ + script_insert_args[0] = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_origin"), &isnull); + script_insert_args[1] = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_txid"), &isnull); + script_insert_args[2] = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_actionseq"), &isnull); + script_insert_args[3] = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_cmdargs"), &isnull); + if (SPI_execp(cs->plan_insert_log_script, script_insert_args, NULL, 0) < 0) + elog(ERROR, "Execution of sl_log_script insert plan failed"); + + /* + * Return NULL to suppress the insert into the original sl_log_N. + */ + SPI_finish(); + return PointerGetDatum(NULL); + } + + /* + * Normal data log row. Get all the relevant data from the log row. + */ + nspname = SPI_getvalue(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_tablenspname")); + + relname = SPI_getvalue(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_tablerelname")); + + dat = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_cmdupdncols"), &isnull); + if (isnull && cmdtype == 'U') + elog(ERROR, "Slony-I: log_cmdupdncols is NULL on UPDATE"); + cmdupdncols = DatumGetInt32(dat); + + dat = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_cmdargs"), &isnull); + if (isnull) + elog(ERROR, "Slony-I: log_cmdargs is NULL"); + + /* + * Turn the log_cmdargs into a plain array of Text Datums. + */ + deconstruct_array(DatumGetArrayTypeP(dat), + TEXTOID, -1, false, 'i', + &cmdargs, &cmdargsnulls, &cmdargsn); + + /* + * Find the target relation in the system cache. We need this to + * find the data types of the target columns for casting. 
+ */ + target_rel = RelationIdGetRelation( + get_relname_relid(relname, LookupExplicitNamespace(nspname))); + if (target_rel == NULL) + elog(ERROR, "Slony-I: cannot find table %s.%s in logApply()", + slon_quote_identifier(nspname), + slon_quote_identifier(relname)); + + /* + * On first call, allocate the query string buffer. + */ + if (query == NULL) + { + if ((query = malloc(query_alloc = 8192)) == NULL) + { + elog(ERROR, "Slony-I: out of memory in logApply()"); + } + } + query_pos = query; + + /* + * Handle the log row according to its log_cmdtype + */ + switch (cmdtype) + { + case 'I': + /* + * INSERT + */ + querycolnames = (char **)palloc(sizeof(char *) * cmdargsn / 2); + queryvals = (Datum *)palloc(sizeof(Datum) * cmdargsn / 2); + querytypes = (Oid *)palloc(sizeof(Oid) * cmdargsn / 2); + querynulls = (char *)palloc(cmdargsn / 2 + 1); + + sprintf(query_pos, "INSERT INTO %s.%s (", + slon_quote_identifier(nspname), + slon_quote_identifier(relname)); + query_pos += strlen(query_pos); + + /* + * Construct the list of quoted column names. + */ + for (i = 0; i < cmdargsn; i += 2) + { + char *colname; + + /* + * Double the query buffer if we are running low. + */ + if (query_pos - query > query_alloc - 256) + { + int have = query_pos - query; + + query_alloc *= 2; + query = realloc(query, query_alloc); + query_pos = query + have; + } + + if (i > 0) + { + strcpy(query_pos, ", "); + query_pos += 2; + } + + if (cmdargsnulls[i]) + elog(ERROR, "Slony-I: column name in log_cmdargs is NULL"); + querycolnames[i / 2] = DatumGetCString(DirectFunctionCall1( + textout, cmdargs[i])); + colname = (char *)slon_quote_identifier(querycolnames[i / 2]); + strcpy(query_pos, colname); + query_pos += strlen(query_pos); + } + + /* + * Add ") VALUES (" + */ + strcpy(query_pos, ") VALUES ("); + query_pos += strlen(query_pos); + + /* + * Add $n:: placeholders for all the values. + * At the same time assemble the Datum array, nulls string + * and typeoid array for query planning and execution. + */ + for (i = 0; i < cmdargsn; i += 2) + { + char *coltype; + + /* + * Double the query buffer if we are running low. + */ + if (query_pos - query > query_alloc - 256) + { + int have = query_pos - query; + + query_alloc *= 2; + query = realloc(query, query_alloc); + query_pos = query + have; + } + + /* + * Lookup the column data type in the target relation. + */ + coltype = SPI_gettype(target_rel->rd_att, + SPI_fnumber(target_rel->rd_att, querycolnames[i / 2])); + if (coltype == NULL) + elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", + querycolnames[i / 2]); + + /* + * Add the parameter to the query string and the + * datum to the query parameter array. + */ + sprintf(query_pos, "%s$%d::%s", (i == 0) ? "" : ", ", + i / 2 + 1, coltype); + query_pos += strlen(query_pos); + + queryvals[i / 2] = cmdargs[i + 1]; + if (cmdargsnulls[i + 1]) + querynulls[i / 2] = 'n'; + else + querynulls[i / 2] = ' '; + querytypes[i / 2] = TEXTOID; + } + + /* + * Finish the query string and terminate the nulls vector. 
+ */ + strcpy(query_pos, ");"); + query_pos += 2; + querynulls[cmdargsn / 2] = '\0'; + querynvals = cmdargsn / 2; + + break; + + case 'U': + /* + * UPDATE + */ + querycolnames = (char **)palloc(sizeof(char *) * cmdargsn / 2); + queryvals = (Datum *)palloc(sizeof(Datum) * cmdargsn / 2); + querytypes = (Oid *)palloc(sizeof(Oid) * cmdargsn / 2); + querynulls = (char *)palloc(cmdargsn / 2 + 1); + + sprintf(query_pos, "UPDATE ONLY %s.%s SET ", + slon_quote_identifier(nspname), + slon_quote_identifier(relname)); + query_pos += strlen(query_pos); + + /* + * This can all be done in one pass over the cmdargs array. + * We just have to switch the behavior slightly between + * the SET clause and the WHERE clause. + */ + for (i = 0; i < cmdargsn; i += 2) + { + char *colname; + char *coltype; + + /* + * Double the query buffer if we are running low. + */ + if (query_pos - query > query_alloc - 256) + { + int have = query_pos - query; + + query_alloc *= 2; + query = realloc(query, query_alloc); + query_pos = query + have; + } + + /* + * Get the column name and data type. + */ + if (cmdargsnulls[i]) + elog(ERROR, "Slony-I: column name in log_cmdargs is NULL"); + colname = DatumGetCString(DirectFunctionCall1( + textout, cmdargs[i])); + coltype = SPI_gettype(target_rel->rd_att, + SPI_fnumber(target_rel->rd_att, colname)); + if (coltype == NULL) + elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", + colname); + + /* + * Special case if there were no columns updated. + * We tell it to set the first PK column to itself. + */ + if (cmdupdncols == 0) + { + sprintf(query_pos, "%s = %s", + slon_quote_identifier(colname), + slon_quote_identifier(colname)); + query_pos += strlen(query_pos); + } + + /* + * If we are at the transition point from SET to WHERE, + * add the WHERE keyword. + */ + if (i == cmdupdncols * 2) + { + strcpy(query_pos, " WHERE "); + query_pos += 7; + } + + if (i < cmdupdncols * 2) + { + /* + * This is inside the SET clause. + * Add the = $n:: separated by + * comma. + */ + sprintf(query_pos, "%s%s = $%d::%s", + (i > 0) ? ", " : "", + slon_quote_identifier(colname), + i / 2 + 1, coltype); + } + else + { + /* + * This is in the WHERE clause. Same as above but + * separated by AND. + */ + sprintf(query_pos, "%s%s = $%d::%s", + (i > cmdupdncols * 2) ? " AND " : "", + slon_quote_identifier(colname), + i / 2 + 1, coltype); + } + query_pos += strlen(query_pos); + + queryvals[i / 2] = cmdargs[i + 1]; + if (cmdargsnulls[i + 1]) + querynulls[i / 2] = 'n'; + else + querynulls[i / 2] = ' '; + querytypes[i / 2] = TEXTOID; + } + + strcpy(query_pos, ";"); + query_pos += 1; + querynulls[cmdargsn / 2] = '\0'; + querynvals = cmdargsn / 2; + + break; + + case 'D': + /* + * DELETE + */ + querycolnames = (char **)palloc(sizeof(char *) * cmdargsn / 2); + queryvals = (Datum *)palloc(sizeof(Datum) * cmdargsn / 2); + querytypes = (Oid *)palloc(sizeof(Oid) * cmdargsn / 2); + querynulls = (char *)palloc(cmdargsn / 2 + 1); + + sprintf(query_pos, "DELETE FROM ONLY %s.%s WHERE ", + slon_quote_identifier(nspname), + slon_quote_identifier(relname)); + query_pos += strlen(query_pos); + + for (i = 0; i < cmdargsn; i += 2) + { + char *colname; + char *coltype; + + /* + * Double the query buffer if we are running low. + */ + if (query_pos - query > query_alloc - 256) + { + int have = query_pos - query; + + query_alloc *= 2; + query = realloc(query, query_alloc); + query_pos = query + have; + } + + /* + * Add = $n:: separated by comma. 
+ */ + if (cmdargsnulls[i]) + elog(ERROR, "Slony-I: column name in log_cmdargs is NULL"); + colname = DatumGetCString(DirectFunctionCall1( + textout, cmdargs[i])); + coltype = SPI_gettype(target_rel->rd_att, + SPI_fnumber(target_rel->rd_att, colname)); + if (coltype == NULL) + elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()", + colname); + sprintf(query_pos, "%s%s = $%d::%s", + (i > 0) ? " AND " : "", + slon_quote_identifier(colname), + i / 2 + 1, coltype); + + query_pos += strlen(query_pos); + + queryvals[i / 2] = cmdargs[i + 1]; + if (cmdargsnulls[i + 1]) + querynulls[i / 2] = 'n'; + else + querynulls[i / 2] = ' '; + querytypes[i / 2] = TEXTOID; + } + + strcpy(query_pos, ";"); + query_pos += 1; + + querynulls[cmdargsn / 2] = '\0'; + querynvals = cmdargsn / 2; + + break; + + case 'T': + /* + * TRUNCATE + */ + queryvals = (Datum *)palloc(sizeof(Datum) * 2); + querytypes = (Oid *)palloc(sizeof(Oid) * 2); + querynulls = (char *)palloc(3); + + sprintf(query_pos, "SELECT %s.TruncateOnlyTable(" + "%s.slon_quote_brute($1) || '.' || " + "%s.slon_quote_brute($2));", + slon_quote_identifier(NameStr(*cluster_name)), + slon_quote_identifier(NameStr(*cluster_name)), + slon_quote_identifier(NameStr(*cluster_name))); + + queryvals[0] = DirectFunctionCall1(textin, CStringGetDatum(nspname)); + queryvals[1] = DirectFunctionCall1(textin, CStringGetDatum(relname)); + querytypes[0] = TEXTOID; + querytypes[1] = TEXTOID; + querynulls[0] = ' '; + querynulls[1] = ' '; + querynulls[2] = '\0'; + querynvals = 2; + + break; + + default: + elog(ERROR, "Slony-I: unhandled log cmdtype '%c' in logApply()", + cmdtype); + break; + } + + /* + * Close the target relation. + */ + RelationClose(target_rel); + + /* + * Check the query cache if we have an entry. + */ + pg_md5_binary(query, strlen(query), &cacheKey); + cacheEnt = hash_search(applyCacheHash, &cacheKey, HASH_ENTER, &found); + if (found) + { + /* + * We are reusing an existing query plan. Just move it + * to the end of the list. + */ + if (cacheEnt->self != cacheEnt) + elog(ERROR, "logApply(): cacheEnt != cacheEnt->self"); + if (cacheEnt != applyCacheTail) + { + /* + * Remove the entry from the list + */ + if (cacheEnt->prev == NULL) + applyCacheHead = cacheEnt->next; + else + cacheEnt->prev->next = cacheEnt->next; + if (cacheEnt->next == NULL) + applyCacheTail = cacheEnt->prev; + else + cacheEnt->next->prev = cacheEnt->prev; + + /* + * Put the entry back at the end of the list. + */ + if (applyCacheHead == NULL) + { + cacheEnt->prev = NULL; + cacheEnt->next = NULL; + applyCacheHead = cacheEnt; + applyCacheTail = cacheEnt; + } + else + { + cacheEnt->prev = applyCacheTail; + cacheEnt->next = NULL; + applyCacheTail->next = cacheEnt; + applyCacheTail = cacheEnt; + } + } + } + else + { + Datum query_args[2]; + + /* + * Query plan not found in plan cache, need to SPI_prepare() it. 
+ */ + cacheEnt->plan = SPI_saveplan( + SPI_prepare(query, querynvals, querytypes)); + if (cacheEnt->plan == NULL) + elog(ERROR, "Slony-I: SPI_prepare() failed for query '%s'", query); + + /* + * Add the plan to the double linked LRU list + */ + if (applyCacheHead == NULL) + { + cacheEnt->prev = NULL; + cacheEnt->next = NULL; + applyCacheHead = cacheEnt; + applyCacheTail = cacheEnt; + } + else + { + cacheEnt->prev = applyCacheTail; + cacheEnt->next = NULL; + applyCacheTail->next = cacheEnt; + applyCacheTail = cacheEnt; + } + cacheEnt->self = cacheEnt; + applyCacheUsed++; + + /* + * If that pushes us over the maximum allowed cached plans, + * evict the one that wasn't used the longest. + */ + if (applyCacheUsed > applyCacheSize) + { + ApplyCacheEntry *evict = applyCacheHead; + + SPI_freeplan(evict->plan); + + if (evict->prev == NULL) + applyCacheHead = evict->next; + else + evict->prev->next = evict->next; + if (evict->next == NULL) + applyCacheTail = evict->prev; + else + evict->next->prev = evict->prev; + + hash_search(applyCacheHash, &(evict->key), HASH_REMOVE, &found); + if (!found) + elog(ERROR, "Slony-I: cached queries hash entry not found " + "on evict"); + } + + /* + * We also need to determine if this table belongs to a + * set, that we are a forwarder of. + */ + query_args[0] = SPI_getbinval(new_row, tupdesc, + SPI_fnumber(tupdesc, "log_tableid"), &isnull); + query_args[1] = Int32GetDatum(cs->localNodeId); + + if (SPI_execp(cs->plan_table_info, query_args, NULL, 0) < 0) + elog(ERROR, "SPI_execp() failed for table forward lookup"); + + if (SPI_processed != 1) + elog(ERROR, "forwarding lookup for table %d failed", + DatumGetInt32(query_args[1])); + + cacheEnt->forward = DatumGetBool( + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, + SPI_fnumber(SPI_tuptable->tupdesc, "sub_forward"), &isnull)); + } + + /* + * Execute the query. + */ + if (cacheEnt->plan == NULL) + elog(ERROR, "Slony-I: cacheEnt->plan is NULL"); + if ((spi_rc = SPI_execp(cacheEnt->plan, queryvals, querynulls, 0)) < 0) + elog(ERROR, "Slony-I: SPI_execp() for query '%s' failed - rc=%d", + query, spi_rc); + + + SPI_finish(); + if (cacheEnt->forward) + return PointerGetDatum(tg->tg_trigtuple); + else + return PointerGetDatum(NULL); +} + + Datum _Slony_I_lockedSet(PG_FUNCTION_ARGS) { @@ -1034,50 +1626,6 @@ _Slony_I_seqtrack(PG_FUNCTION_ARGS) } -static char * -slon_quote_literal(char *str) -{ - char *result; - char *cp1; - char *cp2; - int len; - int wl; - - if (str == NULL) - return NULL; - - len = strlen(str); - result = palloc(len * 2 + 3); - cp1 = str; - cp2 = result; - - *cp2++ = '\''; - while (len > 0) - { - if ((wl = pg_mblen((const char *) cp1)) != 1) - { - len -= wl; - - while (wl-- > 0) - *cp2++ = *cp1++; - continue; - } - - if (*cp1 == '\'') - *cp2++ = '\''; - if (*cp1 == '\\') - *cp2++ = '\\'; - *cp2++ = *cp1++; - len--; - } - - *cp2++ = '\''; - *cp2++ = '\0'; - - return result; -} - - /* * slon_quote_identifier - Quote an identifier only if needed * @@ -1358,7 +1906,7 @@ getClusterStatus(Name cluster_name, int need_plan_mask) /* * Also create the 3 rather static text values for the log_cmdtype - * parameter and initialize the cmddata_buf. + * parameter. 
*/ cs->cmdtype_I = malloc(VARHDRSZ + 1); SET_VARSIZE(cs->cmdtype_I, VARHDRSZ + 1); @@ -1376,13 +1924,59 @@ getClusterStatus(Name cluster_name, int need_plan_mask) sprintf(query, "SELECT last_value::int4 FROM %s.sl_log_status", cs->clusterident); cs->plan_get_logstatus = SPI_saveplan(SPI_prepare(query, 0, NULL)); - - cs->cmddata_size = 8192; - cs->cmddata_buf = (text *) malloc(8192); + if (cs->plan_get_logstatus == NULL) + elog(ERROR, "Slony-I: SPI_prepare() failed"); cs->have_plan |= PLAN_INSERT_LOG_STATUS; } + /* + * Prepare and save the PLAN_APPLY_QUERIES + */ + if ((need_plan_mask & PLAN_APPLY_QUERIES) != 0 && + (cs->have_plan & PLAN_APPLY_QUERIES) == 0) + { + /* @-nullderef@ */ + + /* + * The plan to insert into sl_log_script. + */ + sprintf(query, "insert into %s.sl_log_script " + "(log_origin, log_txid, log_actionseq, log_cmdargs) " + "values ($1, $2, $3, $4);", + slon_quote_identifier(NameStr(*cluster_name))); + plan_types[0] = INT4OID; + plan_types[1] = INT8OID; + plan_types[2] = INT8OID; + plan_types[3] = TEXTARRAYOID; + + cs->plan_insert_log_script = SPI_saveplan( + SPI_prepare(query, 4, plan_types)); + if (cs->plan_insert_log_script == NULL) + elog(ERROR, "Slony-I: SPI_prepare() failed"); + + /* + * The plan to lookup table forwarding info + */ + sprintf(query, + "select sub_forward from " + " %s.sl_subscribe, %s.sl_table " + " where tab_id = $1 and tab_set = sub_set " + " and sub_receiver = $2;", + slon_quote_identifier(NameStr(*cluster_name)), + slon_quote_identifier(NameStr(*cluster_name))); + + plan_types[0] = INT4OID; + plan_types[1] = INT4OID; + + cs->plan_table_info = SPI_saveplan( + SPI_prepare(query, 2, plan_types)); + if (cs->plan_table_info == NULL) + elog(ERROR, "Slony-I: SPI_prepare() failed"); + + cs->have_plan |= PLAN_APPLY_QUERIES; + } + return cs; /* @+nullderef@ */ } @@ -1391,6 +1985,11 @@ getClusterStatus(Name cluster_name, int need_plan_mask) * prepare the plan for the curren sl_log_x insert query. 
* */ + +#ifndef TEXTARRAYOID +#define TEXTARRAYOID 1009 +#endif + int prepareLogPlan(Slony_I_ClusterStatus * cs, int log_status) { @@ -1407,15 +2006,19 @@ int prepareLogPlan(Slony_I_ClusterStatus * cs, */ sprintf(query, "INSERT INTO %s.sl_log_1 " "(log_origin, log_txid, log_tableid, log_actionseq," - " log_cmdtype, log_cmddata) " + " log_tablenspname, log_tablerelname, " + " log_cmdtype, log_cmdupdncols, log_cmdargs) " "VALUES (%d, \"pg_catalog\".txid_current(), $1, " - "nextval('%s.sl_action_seq'), $2, $3); ", + "nextval('%s.sl_action_seq'), $2, $3, $4, $5, $6); ", cs->clusterident, cs->localNodeId, cs->clusterident); plan_types[0] = INT4OID; plan_types[1] = TEXTOID; plan_types[2] = TEXTOID; + plan_types[3] = TEXTOID; + plan_types[4] = INT4OID; + plan_types[5] = TEXTARRAYOID; - cs->plan_insert_log_1 = SPI_saveplan(SPI_prepare(query, 3, plan_types)); + cs->plan_insert_log_1 = SPI_saveplan(SPI_prepare(query, 6, plan_types)); if (cs->plan_insert_log_1 == NULL) elog(ERROR, "Slony-I: SPI_prepare() failed"); } @@ -1425,15 +2028,19 @@ int prepareLogPlan(Slony_I_ClusterStatus * cs, { sprintf(query, "INSERT INTO %s.sl_log_2 " "(log_origin, log_txid, log_tableid, log_actionseq," - " log_cmdtype, log_cmddata) " + " log_tablenspname, log_tablerelname, " + " log_cmdtype, log_cmdupdncols, log_cmdargs) " "VALUES (%d, \"pg_catalog\".txid_current(), $1, " - "nextval('%s.sl_action_seq'), $2, $3); ", + "nextval('%s.sl_action_seq'), $2, $3, $4, $5, $6); ", cs->clusterident, cs->localNodeId, cs->clusterident); plan_types[0] = INT4OID; plan_types[1] = TEXTOID; plan_types[2] = TEXTOID; + plan_types[3] = TEXTOID; + plan_types[4] = INT4OID; + plan_types[5] = TEXTARRAYOID; - cs->plan_insert_log_2 = SPI_saveplan(SPI_prepare(query, 3, plan_types)); + cs->plan_insert_log_2 = SPI_saveplan(SPI_prepare(query, 6, plan_types)); if (cs->plan_insert_log_2 == NULL) elog(ERROR, "Slony-I: SPI_prepare() failed"); } @@ -1462,8 +2069,6 @@ _Slony_I_resetSession(PG_FUNCTION_ARGS) free(cs->cmdtype_D); if(cs->cmdtype_U) free(cs->cmdtype_D); - if(cs->cmddata_buf) - free(cs->cmddata_buf); free(cs->clusterident); if(cs->plan_insert_event) SPI_freeplan(cs->plan_insert_event); diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql index 6cf7f8b8..061d51c6 100644 --- a/src/backend/slony1_funcs.sql +++ b/src/backend/slony1_funcs.sql @@ -717,6 +717,20 @@ begin raise notice 'You may run into problems later!'; end if; + -- + -- Put the apply trigger onto sl_log_1 and sl_log_2 + -- + create trigger apply_trigger + before INSERT on @NAMESPACE@.sl_log_1 + for each row execute procedure @NAMESPACE@.logApply('_@CLUSTERNAME@'); + alter table @NAMESPACE@.sl_log_1 + enable replica trigger apply_trigger; + create trigger apply_trigger + before INSERT on @NAMESPACE@.sl_log_2 + for each row execute procedure @NAMESPACE@.logApply('_@CLUSTERNAME@'); + alter table @NAMESPACE@.sl_log_2 + enable replica trigger apply_trigger; + return p_local_node_id; end; $$ language plpgsql; @@ -3324,171 +3338,65 @@ Set sequence seq_id to have new value last_value. 
'; -- ---------------------------------------------------------------------- --- FUNCTION ddlScript_prepare (set_id, only_on_node) +-- FUNCTION ddlCapture (origin, statement) -- --- Generate the DDL_SCRIPT event +-- Capture DDL into sl_log_script -- ---------------------------------------------------------------------- -create or replace function @NAMESPACE@.ddlScript_prepare (p_set_id int4, p_only_on_node int4) +create or replace function @NAMESPACE@.ddlCapture (p_statement text, p_nodes text) returns integer as $$ declare - v_set_origin int4; -begin - -- ---- - -- Check that the set exists and originates here - -- ---- - select set_origin into v_set_origin - from @NAMESPACE@.sl_set - where set_id = p_set_id - for update; - if not found then - raise exception 'Slony-I: set % not found', p_set_id; - end if; - if p_only_on_node = -1 then - if v_set_origin <> @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@') then - raise exception 'Slony-I: set % does not originate on local node', - p_set_id; - end if; - -- ---- - -- Create a SYNC event - -- ---- - perform @NAMESPACE@.createEvent('_@CLUSTERNAME@', 'SYNC', NULL); - else - -- If running "ONLY ON NODE", there are two possibilities: - -- 1. Running on origin, where denyaccess() triggers are already shut off - -- 2. Running on replica, where we need the LOCAL role to suppress denyaccess() triggers - execute 'create temp table _slony1_saved_session_replication_role ( - setting text);'; - execute 'insert into _slony1_saved_session_replication_role - select setting from pg_catalog.pg_settings - where name = ''session_replication_role'';'; - if (v_set_origin <> @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@')) then - execute 'set session_replication_role to local;'; - end if; - end if; - return 1; -end; -$$ language plpgsql; - -comment on function @NAMESPACE@.ddlScript_prepare (p_set_id int4, p_only_on_node int4) is -'Prepare for DDL script execution on origin'; - --- perform @NAMESPACE@.ddlScript_int(p_set_id, p_script, p_only_on_node); - --- ---------------------------------------------------------------------- --- FUNCTION ddlScript_complete (set_id, script, only_on_node) --- --- Generate the DDL_SCRIPT event --- ---------------------------------------------------------------------- -drop function if exists @NAMESPACE@.ddlScript_complete (int4, text, int4); -- Needed because function signature has changed! 
- -create or replace function @NAMESPACE@.ddlScript_complete (p_set_id int4, p_script text, p_only_on_node int4) -returns bigint -as $$ -declare - v_set_origin int4; - v_query text; - v_row record; -begin - if p_only_on_node = -1 then - perform @NAMESPACE@.ddlScript_complete_int(p_set_id,p_only_on_node); - return @NAMESPACE@.createEvent('_@CLUSTERNAME@', 'DDL_SCRIPT', - p_set_id::text, p_script::text, p_only_on_node::text); - end if; - if p_only_on_node <> -1 then - for v_row in execute - 'select setting from _slony1_saved_session_replication_role' loop - v_query := 'set session_replication_role to ' || v_row.setting; - end loop; - execute v_query; - execute 'drop table _slony1_saved_session_replication_role'; - perform @NAMESPACE@.ddlScript_complete_int(p_set_id,p_only_on_node); - end if; - return NULL; -end; -$$ language plpgsql; - -comment on function @NAMESPACE@.ddlScript_complete(p_set_id int4, p_script text, p_only_on_node int4) is -'ddlScript_complete(set_id, script, only_on_node) - -After script has run on origin, this fixes up relnames, restores -triggers, and generates a DDL_SCRIPT event to request it to be run on -replicated slaves.'; - --- ---------------------------------------------------------------------- --- FUNCTION ddlScript_prepare_int (set_id, only_on_node) --- --- Prepare for the DDL_SCRIPT event --- ---------------------------------------------------------------------- -create or replace function @NAMESPACE@.ddlScript_prepare_int (p_set_id int4, p_only_on_node int4) -returns int4 -as $$ -declare - v_set_origin int4; - v_no_id int4; - v_row record; + c_local_node integer; + c_found_origin boolean; + c_node text; + c_cmdargs text[]; begin - -- ---- - -- Check that we either are the set origin or a current - -- subscriber of the set. - -- ---- - v_no_id := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); - select set_origin into v_set_origin - from @NAMESPACE@.sl_set - where set_id = p_set_id - for update; - if not found then - raise exception 'Slony-I: set % not found', p_set_id; - end if; - if v_set_origin <> v_no_id - and not exists (select 1 from @NAMESPACE@.sl_subscribe - where sub_set = p_set_id - and sub_receiver = v_no_id) - then - return 0; - end if; - - -- ---- - -- If execution on only one node is requested, check that - -- we are that node. 
- -- ---- - if p_only_on_node > 0 and p_only_on_node <> v_no_id then - return 0; - end if; - - return p_set_id; + c_local_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); + + c_cmdargs = array_append('{}'::text[], p_statement); + if p_nodes is not null then + c_found_origin := 'f'; + -- p_nodes list needs to consist of a list of nodes that exist + -- and that include the current node ID + for c_node in select trim(node) from + pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop + if not exists + (select 1 from @NAMESPACE@.sl_node + where no_id = (c_node::integer)) then + raise exception 'ddlcapture(%,%) - node % does not exist!', + p_statement, p_nodes, c_node; + end if; + + if c_local_node = (c_node::integer) then + c_found_origin := 't'; + end if; + + c_cmdargs = array_append(c_cmdargs, c_node); + end loop; + + if not c_found_origin then + raise exception + 'ddlcapture(%,%) - origin node % not included in ONLY ON list!', + p_statement, p_nodes, c_local_node; + end if; + end if; + + execute p_statement; + + insert into @NAMESPACE@.sl_log_script + (log_origin, log_txid, log_actionseq, log_cmdargs) + values + (c_local_node, pg_catalog.txid_current(), + nextval('@NAMESPACE@.sl_action_seq'), c_cmdargs); + + return currval('@NAMESPACE@.sl_action_seq'); end; $$ language plpgsql; -comment on function @NAMESPACE@.ddlScript_prepare_int (p_set_id int4, p_only_on_node int4) is -'ddlScript_prepare_int (set_id, only_on_node) - -Do preparatory work for a DDL script, restoring -triggers/rules to original state.'; +comment on function @NAMESPACE@.ddlCapture (p_statement text, p_nodes text) is +'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers'; --- ---------------------------------------------------------------------- --- FUNCTION ddlScript_complete_int (set_id, only_on_node) --- --- Complete the DDL_SCRIPT event --- ---------------------------------------------------------------------- -create or replace function @NAMESPACE@.ddlScript_complete_int (p_set_id int4, p_only_on_node int4) -returns int4 -as $$ -declare - v_row record; -begin - perform @NAMESPACE@.updateRelname(p_set_id, p_only_on_node); - perform @NAMESPACE@.repair_log_triggers(true); - return p_set_id; -end; -$$ language plpgsql; -comment on function @NAMESPACE@.ddlScript_complete_int(p_set_id int4, p_only_on_node int4) is -'ddlScript_complete_int(set_id, script, only_on_node) - -Complete processing the DDL_SCRIPT event. This puts tables back into -replicated mode.'; - -- ---------------------------------------------------------------------- -- FUNCTION alterTableAddTriggers (tab_id) -- ---------------------------------------------------------------------- @@ -3783,6 +3691,43 @@ begin end if; end if; + -- --- + -- Enforce that all sets from one origin are subscribed + -- using the same data provider per receiver. + -- ---- + if not exists (select 1 from @NAMESPACE@.sl_subscribe + where sub_set = p_sub_set and sub_receiver = p_sub_receiver) then + -- + -- New subscription - error out if we have any other subscription + -- from that origin with a different data provider. 
+ -- + for v_rec in select sub_provider from @NAMESPACE@.sl_subscribe + join @NAMESPACE@.sl_set on set_id = sub_set + where set_origin = v_set_origin and sub_receiver = p_sub_receiver + loop + if v_rec.sub_provider <> p_sub_provider then + raise exception 'Slony-I: subscribeSet(): wrong provider % - existing subscription from origin % users provider %', + p_sub_provider, v_set_origin, v_rec.sub_provider; + end if; + end loop; + else + -- + -- Existing subscription - in case the data provider changes and + -- there are other subscriptions, warn here. subscribeSet_int() + -- will currently change the data provider for those sets as well. + -- + for v_rec in select set_id, sub_provider from @NAMESPACE@.sl_subscribe + join @NAMESPACE@.sl_set on set_id = sub_set + where set_origin = v_set_origin and sub_receiver = p_sub_receiver + and set_id <> p_sub_set + loop + if v_rec.sub_provider <> p_sub_provider then + raise notice 'Slony-I: subscribeSet(): data provider for set % will also be changed', + v_rec.set_id; + end if; + end loop; + end if; + -- ---- -- Create the SUBSCRIBE_SET event -- ---- @@ -3821,6 +3766,16 @@ declare v_set_origin int4; v_sub_row record; begin + -- ---- + -- Lookup the set origin + -- ---- + select set_origin into v_set_origin + from @NAMESPACE@.sl_set + where set_id = p_sub_set; + if not found then + raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; + end if; + -- ---- -- Provider change is only allowed for active sets -- ---- @@ -3845,6 +3800,40 @@ begin where sub_set = p_sub_set and sub_receiver = p_sub_receiver; if found then + -- ---- + -- This is changing a subscriptoin. Make sure all sets from + -- this origin are subscribed using the same data provider. + -- For this we first check that the requested data provider + -- is subscribed to all the sets, the receiver is subscribed to. + -- ---- + for v_sub_row in select set_id from @NAMESPACE@.sl_set + join @NAMESPACE@.sl_subscribe on set_id = sub_set + where set_origin = v_set_origin + and sub_receiver = p_sub_receiver + and sub_set <> p_sub_set + loop + if not exists (select 1 from @NAMESPACE@.sl_subscribe + where sub_set = v_sub_row.set_id + and sub_receiver = p_sub_provider + and sub_active and sub_forward) + and not exists (select 1 from @NAMESPACE@.sl_set + where set_id = v_sub_row.set_id + and set_origin = p_sub_provider) + then + raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %', + p_sub_provider, v_sub_row.set_id; + end if; + + -- ---- + -- New data provider offers this set as well, change that + -- subscription too. 
+ -- ---- + update @NAMESPACE@.sl_subscribe + set sub_provider = p_sub_provider + where sub_set = v_sub_row.set_id + and sub_receiver = p_sub_receiver; + end loop; + -- ---- -- Rewrite sl_listen table -- ---- @@ -3874,13 +3863,6 @@ begin -- ---- -- If the set origin is here, then enable the subscription -- ---- - select set_origin into v_set_origin - from @NAMESPACE@.sl_set - where set_id = p_sub_set; - if not found then - raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; - end if; - if v_set_origin = @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@') then perform @NAMESPACE@.createEvent('_@CLUSTERNAME@', 'ENABLE_SUBSCRIPTION', p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, @@ -3906,6 +3888,7 @@ comment on function @NAMESPACE@.subscribeSet_int (p_sub_set int4, p_sub_provider Internal actions for subscribing receiver sub_receiver to subscription set sub_set.'; + -- ---------------------------------------------------------------------- -- FUNCTION unsubscribeSet (sub_set, sub_receiver) -- ---------------------------------------------------------------------- @@ -4222,6 +4205,7 @@ begin where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from @NAMESPACE@.sl_event where ev_type = 'SYNC' group by ev_origin) loop delete from @NAMESPACE@.sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno; + delete from @NAMESPACE@.sl_log_script where log_origin = v_origin and log_txid < v_xmin; end loop; v_rc := @NAMESPACE@.logswitch_finish(); @@ -4616,12 +4600,14 @@ begin tab_relname = PGC.relname, tab_nspname = PGN.nspname from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN where @NAMESPACE@.sl_table.tab_reloid = PGC.oid - and PGC.relnamespace = PGN.oid; + and PGC.relnamespace = PGN.oid and + (tab_relname <> PGC.relname or tab_nspname <> PGN.nspname); update @NAMESPACE@.sl_sequence set seq_relname = PGC.relname, seq_nspname = PGN.nspname from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN where @NAMESPACE@.sl_sequence.seq_reloid = PGC.oid - and PGC.relnamespace = PGN.oid; + and PGC.relnamespace = PGN.oid and + (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname); return p_set_id; end; $$ language plpgsql; @@ -4996,19 +4982,27 @@ system switches between sl_log_1 and sl_log_2.'; -- -- Called by slonik during the function upgrade process. 
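
The schema probe defined just below can also be used interactively; a sketch, again assuming the cluster schema is "_slony_example":

    -- true once the 2.2 upgrade has rewritten the log tables
    select _slony_example.check_table_field_exists(
        '_slony_example', 'sl_log_1', 'log_cmdargs');
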
-- ---------------------------------------------------------------------- +create or replace function @NAMESPACE@.check_table_field_exists (p_namespace text, p_table text, p_field text) +returns bool as $$ +BEGIN + return exists ( + select 1 from "information_schema".columns + where table_schema = p_namespace + and table_name = p_table + and column_name = p_field + ); +END;$$ language plpgsql; + +comment on function @NAMESPACE@.check_table_field_exists (p_namespace text, p_table text, p_field text) +is 'Check if a table has a specific attribute'; + create or replace function @NAMESPACE@.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) returns bool as $$ DECLARE v_row record; v_query text; BEGIN - select 1 into v_row from pg_namespace n, pg_class c, pg_attribute a - where @NAMESPACE@.slon_quote_brute(n.nspname) = p_namespace and - c.relnamespace = n.oid and - @NAMESPACE@.slon_quote_brute(c.relname) = p_table and - a.attrelid = c.oid and - @NAMESPACE@.slon_quote_brute(a.attname) = p_field; - if not found then + if not @NAMESPACE@.check_table_field_exists(p_namespace, p_table, p_field) then raise notice 'Upgrade table %.% - add field %', p_namespace, p_table, p_field; v_query := 'alter table ' || p_namespace || '.' || p_table || ' add column '; v_query := v_query || p_field || ' ' || p_type || ';'; @@ -5080,9 +5074,213 @@ create table @NAMESPACE@.sl_components ( v_query := 'create table @NAMESPACE@.sl_event_lock (dummy integer);'; execute v_query; end if; + + -- + -- On the upgrade to 2.2, we change the layout of sl_log_N by + -- adding columns log_tablenspname, log_tablerelname, and + -- log_cmdupdncols as well as changing log_cmddata into + -- log_cmdargs, which is a text array. + -- + if not @NAMESPACE@.check_table_field_exists('_@CLUSTERNAME@', 'sl_log_1', 'log_cmdargs') then + -- + -- Check that the cluster is completely caught up + -- + if @NAMESPACE@.check_unconfirmed_log() then + raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data'; + end if; + + -- + -- Drop tables sl_log_1 and sl_log_2 + -- + drop table @NAMESPACE@.sl_log_1; + drop table @NAMESPACE@.sl_log_2; + + -- + -- Create the new sl_log_1 + -- + create table @NAMESPACE@.sl_log_1 ( + log_origin int4, + log_txid bigint, + log_tableid int4, + log_actionseq int8, + log_tablenspname text, + log_tablerelname text, + log_cmdtype char, + log_cmdupdncols int4, + log_cmdargs text[] + ) without oids; + create index sl_log_1_idx1 on @NAMESPACE@.sl_log_1 + (log_origin, log_txid, log_actionseq); + + comment on table @NAMESPACE@.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; + comment on column @NAMESPACE@.sl_log_1.log_origin is 'Origin node from which the change came'; + comment on column @NAMESPACE@.sl_log_1.log_txid is 'Transaction ID on the origin node'; + comment on column @NAMESPACE@.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; + comment on column @NAMESPACE@.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; + comment on column @NAMESPACE@.sl_log_1.log_tablenspname is 'The schema name of the table affected'; + comment on column @NAMESPACE@.sl_log_1.log_tablerelname is 'The table name of the table affected'; + comment on column @NAMESPACE@.sl_log_1.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; + comment on column @NAMESPACE@.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; + comment on column @NAMESPACE@.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; + + -- + -- Create the new sl_log_2 + -- + create table @NAMESPACE@.sl_log_2 ( + log_origin int4, + log_txid bigint, + log_tableid int4, + log_actionseq int8, + log_tablenspname text, + log_tablerelname text, + log_cmdtype char, + log_cmdupdncols int4, + log_cmdargs text[] + ) without oids; + create index sl_log_2_idx1 on @NAMESPACE@.sl_log_2 + (log_origin, log_txid, log_actionseq); + + comment on table @NAMESPACE@.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; + comment on column @NAMESPACE@.sl_log_2.log_origin is 'Origin node from which the change came'; + comment on column @NAMESPACE@.sl_log_2.log_txid is 'Transaction ID on the origin node'; + comment on column @NAMESPACE@.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; + comment on column @NAMESPACE@.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; + comment on column @NAMESPACE@.sl_log_2.log_tablenspname is 'The schema name of the table affected'; + comment on column @NAMESPACE@.sl_log_2.log_tablerelname is 'The table name of the table affected'; + comment on column @NAMESPACE@.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; + comment on column @NAMESPACE@.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; + comment on column @NAMESPACE@.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; + + create table @NAMESPACE@.sl_log_script ( + log_origin int4, + log_txid bigint, + log_actionseq int8, + log_query text, + log_only_on text + ) WITHOUT OIDS; + create index sl_log_script_idx1 on @NAMESPACE@.sl_log_script + (log_origin, log_txid, log_actionseq); + + comment on table @NAMESPACE@.sl_log_script is 'Captures DDL queries to be propagated to subscriber nodes'; + comment on column @NAMESPACE@.sl_log_script.log_origin is 'Origin name from which the change came'; + comment on column @NAMESPACE@.sl_log_script.log_txid is 'Transaction ID on the origin node'; + comment on column @NAMESPACE@.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; + comment on column @NAMESPACE@.sl_log_script.log_query is 'The data needed to perform the log action on the replica.'; + comment on column @NAMESPACE@.sl_log_script.log_only_on is 'Optional list of nodes on which scripts are to be executed'; + + -- + -- Put the log apply triggers back onto sl_log_1/2 + -- + create trigger apply_trigger + before INSERT on @NAMESPACE@.sl_log_1 + for each row execute procedure @NAMESPACE@.logApply('_@CLUSTERNAME@'); + alter table @NAMESPACE@.sl_log_1 + enable replica trigger apply_trigger; + create trigger apply_trigger + before INSERT on @NAMESPACE@.sl_log_2 + for each row execute procedure @NAMESPACE@.logApply('_@CLUSTERNAME@'); + alter table @NAMESPACE@.sl_log_2 + enable replica trigger apply_trigger; + end if; + return p_old; end; -$$ language plpgsql +$$ language plpgsql; + +create or replace function @NAMESPACE@.check_unconfirmed_log () +returns bool as $$ +declare + v_rc bool = false; + v_error bool = false; + v_origin integer; + v_allconf bigint; + v_allsnap txid_snapshot; + v_count bigint; +begin + -- + -- 
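
To make the new transport concrete: the remote worker now COPYs the selected log rows directly into sl_log_1/2 on the subscriber, where the apply_trigger created above (enabled as a replica trigger, so it fires while the session is in replica mode) passes each row to logApply() for application to the replicated table. A rough sketch of the statement the worker issues, with the cluster schema "_slony_example" assumed:

    copy _slony_example.sl_log_1 (log_origin, log_txid, log_tableid,
         log_actionseq, log_tablenspname, log_tablerelname,
         log_cmdtype, log_cmdupdncols, log_cmdargs)
    from stdin;
    -- rows in COPY text format follow, terminated by \.
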
Loop over all nodes that are the origin of at least one set + -- + for v_origin in select distinct set_origin as no_id + from @NAMESPACE@.sl_set loop + -- + -- Per origin determine which is the highest event seqno + -- that is confirmed by all subscribers to any of the + -- origins sets. + -- + select into v_allconf min(max_seqno) from ( + select con_received, max(con_seqno) as max_seqno + from @NAMESPACE@.sl_confirm + where con_origin = v_origin + and con_received in ( + select distinct sub_receiver + from @NAMESPACE@.sl_set as SET, + @NAMESPACE@.sl_subscribe as SUB + where SET.set_id = SUB.sub_set + and SET.set_origin = v_origin + ) + group by con_received + ) as maxconfirmed; + if not found then + raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin; + v_error = true; + continue; + end if; + + -- + -- Get the txid snapshot that corresponds with that event + -- + select into v_allsnap ev_snapshot + from @NAMESPACE@.sl_event + where ev_origin = v_origin + and ev_seqno = v_allconf; + if not found then + raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf; + v_error = true; + continue; + end if; + + -- + -- Count the number of log rows that appeard after that event. + -- + select into v_count count(*) from ( + select 1 from @NAMESPACE@.sl_log_1 + where log_origin = v_origin + and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) + union all + select 1 from @NAMESPACE@.sl_log_1 + where log_origin = v_origin + and log_txid in ( + select * from "pg_catalog".txid_snapshot_xip(v_allsnap) + ) + union all + select 1 from @NAMESPACE@.sl_log_2 + where log_origin = v_origin + and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) + union all + select 1 from @NAMESPACE@.sl_log_2 + where log_origin = v_origin + and log_txid in ( + select * from "pg_catalog".txid_snapshot_xip(v_allsnap) + ) + ) as cnt; + + if v_count > 0 then + raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count; + v_rc = true; + end if; + end loop; + + if v_error then + raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency'; + end if; + + return v_rc; +end; +$$ language plpgsql; + +-- +-- XXX What is this doing here? 
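
The heart of check_unconfirmed_log() above is a visibility test against the snapshot of the last event confirmed by all subscribers: a log row still needs to propagate if its transaction is at or beyond that snapshot's xmax, or was still in progress (xip) when the snapshot was taken. A stand-alone sketch of that test for one log table, with an assumed snapshot literal, origin id, and schema name:

    select count(*)
      from _slony_example.sl_log_1
     where log_origin = 1
       and (log_txid >= txid_snapshot_xmax('1000:1005:1001,1003')
            or log_txid in (select txid_snapshot_xip('1000:1005:1001,1003')));
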
+-- set search_path to @NAMESPACE@ ; @@ -5305,6 +5503,11 @@ begin if @NAMESPACE@.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then return next prec; end if; + prec.nspname := '_@CLUSTERNAME@'; + prec.relname := 'sl_log_script'; + if @NAMESPACE@.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then + return next prec; + end if; prec.nspname := 'pg_catalog'; prec.relname := 'pg_listener'; if @NAMESPACE@.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then @@ -5520,23 +5723,37 @@ comment on function @NAMESPACE@.slon_node_health_check() is 'called when slon st create or replace function @NAMESPACE@.log_truncate () returns trigger as $$ declare - c_command text; + c_nspname text; + c_relname text; c_log integer; c_node integer; c_tabid integer; begin c_tabid := tg_argv[0]; c_node := @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@'); - c_command := 'TRUNCATE TABLE ONLY "' || tab_nspname || '"."' || - tab_relname || '" CASCADE' + select tab_nspname, tab_relname into c_nspname, c_relname from @NAMESPACE@.sl_table where tab_id = c_tabid; select last_value into c_log from @NAMESPACE@.sl_log_status; if c_log in (0, 2) then - insert into @NAMESPACE@.sl_log_1 (log_origin, log_txid, log_tableid, log_actionseq, log_cmdtype, log_cmddata) - values (c_node, pg_catalog.txid_current(), c_tabid, nextval('"_@CLUSTERNAME@".sl_action_seq'), 'T', c_command); + insert into @NAMESPACE@.sl_log_1 ( + log_origin, log_txid, log_tableid, + log_actionseq, log_tablenspname, + log_tablerelname, log_cmdtype, + log_cmdupdncols, log_cmdargs + ) values ( + c_node, pg_catalog.txid_current(), c_tabid, + nextval('@NAMESPACE@.sl_action_seq'), c_nspname, + c_relname, 'T', 0, array[]::text[]); else -- (1, 3) - insert into @NAMESPACE@.sl_log_2 (log_origin, log_txid, log_tableid, log_actionseq, log_cmdtype, log_cmddata) - values (c_node, pg_catalog.txid_current(), c_tabid, nextval('"_@CLUSTERNAME@".sl_action_seq'), 'T', c_command); + insert into @NAMESPACE@.sl_log_2 ( + log_origin, log_txid, log_tableid, + log_actionseq, log_tablenspname, + log_tablerelname, log_cmdtype, + log_cmdupdncols, log_cmdargs + ) values ( + c_node, pg_catalog.txid_current(), c_tabid, + nextval('@NAMESPACE@.sl_action_seq'), c_nspname, + c_relname, 'T', 0, array[]::text[]); end if; return NULL; end @@ -5676,7 +5893,18 @@ language plpgsql; comment on function @NAMESPACE@.repair_log_triggers(only_locked boolean) is ' repair the log triggers as required. If only_locked is true then only -tables that are already exclusivly locked by the current transaction are +tables that are already exclusively locked by the current transaction are repaired. Otherwise all replicated tables with outdated trigger arguments are recreated.'; + +-- ---------------------------------------------------------------------- +-- FUNCTION logApply () +-- +-- +-- ---------------------------------------------------------------------- +create or replace function @NAMESPACE@.logApply () returns trigger + as '$libdir/slony1_funcs.@MODULEVERSION@', '_Slony_I_logApply' + language C + security definer; + diff --git a/src/slon/confoptions.c b/src/slon/confoptions.c index 43e9c5d0..86f39874 100644 --- a/src/slon/confoptions.c +++ b/src/slon/confoptions.c @@ -119,30 +119,6 @@ static struct config_int ConfigureNamesInt[] = 0, 2147483647 }, - { - { - (const char *) "sync_max_rowsize", /* conf name */ - gettext_noop("sl_log_? rows larger than that are read separately"), /* short desc */ - gettext_noop("sl_log_? 
rows with octet_length(log_cmddata) larger than this are read separately"), /* long desc */ - SLON_C_INT /* config type */ - }, - &sync_max_rowsize, /* var name */ - 8192, /* default val */ - 1024, /* min val */ - 32768 /* max val */ - }, - { - { - (const char *) "sync_max_largemem", /* conf name */ - gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"), /* short desc */ - gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"), /* long desc */ - SLON_C_INT /* config type */ - }, - &sync_max_largemem, /* var name */ - 5242880, /* default val */ - 1048576, /* min val */ - 1073741824 /* max val */ - }, { { diff --git a/src/slon/confoptions.h b/src/slon/confoptions.h index 8f62a678..6fd315e9 100644 --- a/src/slon/confoptions.h +++ b/src/slon/confoptions.h @@ -18,8 +18,6 @@ extern char *archive_dir; extern int slon_log_level; extern int sync_interval; extern int sync_interval_timeout; -extern int sync_max_rowsize; -extern int sync_max_largemem; extern int remote_listen_timeout; extern int sync_group_maxsize; diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c index 7f54421e..941c5338 100644 --- a/src/slon/remote_worker.c +++ b/src/slon/remote_worker.c @@ -111,7 +111,6 @@ struct SlonWorkMsg_s typedef struct ProviderInfo_s ProviderInfo; typedef struct ProviderSet_s ProviderSet; typedef struct WorkerGroupData_s WorkerGroupData; -typedef struct WorkerGroupLine_s WorkerGroupLine; struct ProviderSet_s @@ -170,10 +169,6 @@ struct ProviderInfo_s WorkerGroupData *wd; - pthread_t helper_thread; - pthread_mutex_t helper_lock; - pthread_cond_t helper_cond; - WorkGroupStatus helper_status; SlonDString helper_query; int log_status; @@ -191,39 +186,12 @@ struct WorkerGroupData_s int active_log_table; - char *tab_forward; - char **tab_fqname; - int tab_fqname_size; - ProviderInfo *provider_head; ProviderInfo *provider_tail; - pthread_mutex_t workdata_lock; - WorkGroupStatus workgroup_status; - int workdata_largemem; - - pthread_cond_t repldata_cond; - WorkerGroupLine *repldata_head; - WorkerGroupLine *repldata_tail; - - pthread_cond_t linepool_cond; - WorkerGroupLine *linepool_head; - WorkerGroupLine *linepool_tail; }; -struct WorkerGroupLine_s -{ - WorkGroupLineCode code; - ProviderInfo *provider; - SlonDString data; - SlonDString log; - int line_largemem; - - WorkerGroupLine *prev; - WorkerGroupLine *next; -}; - /* * Global status for all remote worker threads, remembering the last seen @@ -243,8 +211,6 @@ static struct node_confirm_status *node_confirm_tail = NULL; static pthread_mutex_t node_confirm_lock = PTHREAD_MUTEX_INITIALIZER; int sync_group_maxsize; -int sync_max_rowsize; -int sync_max_largemem; int explain_interval; time_t explain_lastsec; int explain_thistime; @@ -272,10 +238,9 @@ static void start_monitored_event(PerfMon *pm); static void monitor_provider_query(PerfMon *pm); static void monitor_subscriber_query(PerfMon *pm); static void monitor_subscriber_iud(PerfMon *pm); -static void monitor_largetuples(PerfMon *pm); static void adjust_provider_info(SlonNode *node, - WorkerGroupData *wd, int cleanup); + WorkerGroupData *wd, int cleanup, int event_provider); static int query_execute(SlonNode *node, PGconn *dbconn, SlonDString *dsp); static void query_append_event(SlonDString *dsp, @@ -287,13 +252,14 @@ static int copy_set(SlonNode *node, SlonConn *local_conn, int set_id, SlonWorkMsg_event *event); static int sync_event(SlonNode *node, SlonConn *local_conn, WorkerGroupData *wd, SlonWorkMsg_event *event); -static void 
*sync_helper(void *cdata); +static int sync_helper(void *cdata,PGconn * local_dbconn); static int archive_open(SlonNode *node, char *seqbuf, PGconn *dbconn); static int archive_close(SlonNode *node); static void archive_terminate(SlonNode *node); + static int archive_append_ds(SlonNode *node, SlonDString *ds); static int archive_append_str(SlonNode *node, const char *s); static int archive_append_data(SlonNode *node, const char *s, int len); @@ -353,19 +319,9 @@ remoteWorkerThread_main(void *cdata) memset(wd, 0, sizeof(WorkerGroupData)); } - pthread_mutex_init(&(wd->workdata_lock), NULL); - pthread_cond_init(&(wd->repldata_cond), NULL); - pthread_cond_init(&(wd->linepool_cond), NULL); - pthread_mutex_lock(&(wd->workdata_lock)); - wd->workgroup_status = SLON_WG_IDLE; + wd->node = node; - wd->workdata_largemem = 0; - wd->tab_fqname_size = SLON_MAX_PATH; - wd->tab_fqname = (char **) malloc(sizeof(char *) * wd->tab_fqname_size); - memset(wd->tab_fqname, 0, sizeof(char *) * wd->tab_fqname_size); - wd->tab_forward = malloc(wd->tab_fqname_size); - memset(wd->tab_forward, 0, (size_t) (wd->tab_fqname_size)); dstring_init(&query1); dstring_init(&query2); @@ -418,7 +374,7 @@ remoteWorkerThread_main(void *cdata) if (curr_config != rtcfg_seq_get()) { - adjust_provider_info(node, wd, false); + adjust_provider_info(node, wd, false, -1); curr_config = rtcfg_seq_get(); /* @@ -575,6 +531,7 @@ remoteWorkerThread_main(void *cdata) */ sync_group[0] = event; + sync_group_size = 1; if (true) { int initial_proposed = sg_proposed; @@ -676,8 +633,7 @@ remoteWorkerThread_main(void *cdata) /* * replace query1 with the forwarding of all the grouped sync - * events and a commit. Also free all the WMSG structures except - * the last one (it's freed further down). + * events and a commit. */ dstring_reset(&query1); sg_last_grouping = 0; @@ -692,6 +648,11 @@ remoteWorkerThread_main(void *cdata) if (query_execute(node, local_dbconn, &query1) < 0) slon_retry(); + + /* + * Remember the sync snapshot in the in memory node structure + */ + rtcfg_setNodeLastSnapshot(node->no_id, event->ev_snapshot_c); } else /* not SYNC */ { @@ -1408,144 +1369,9 @@ remoteWorkerThread_main(void *cdata) need_reloadListen = true; } - else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) + else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) { - int ddl_setid = (int) strtol(event->ev_data1, NULL, 10); - char *ddl_script = event->ev_data2; - int ddl_only_on_node = (int) strtol(event->ev_data3, NULL, 10); - int num_statements = -1, - stmtno; - int node_in_set; - int localNodeId; - - PGresult *res; - ExecStatusType rstat; - - /** - * Check to make sure this node is part of the set - */ - slon_log(SLON_INFO, "Checking local node id\n"); - localNodeId = db_getLocalNodeId(local_dbconn); - slon_log(SLON_INFO, "Found local node id\n"); - node_in_set = check_set_subscriber(ddl_setid, localNodeId, local_dbconn); - - if (!node_in_set) - { - /** - * - * Node is not part of the set. - * Do not forward the DDL to the node, - * nor should it be included in the log for log-shipping. 
- */ - slon_log(SLON_INFO, "Not forwarding DDL to node %d for set %d\n", - node->no_id, ddl_setid); - - } - else - { - - slon_appendquery(&query1, - "set session_replication_role to local; " - "lock table %s.sl_config_lock;" - "select %s.ddlScript_prepare_int(%d, %d); ", - rtcfg_namespace, - rtcfg_namespace, - ddl_setid, ddl_only_on_node); - - if (query_execute(node, local_dbconn, &query1) < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %d\n", - node->no_id, ddl_setid, ddl_only_on_node); - slon_retry(); - } - - num_statements = scan_for_statements(ddl_script); - slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n", - node->no_id, num_statements); - if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", - node->no_id, num_statements); - slon_retry(); - } - - for (stmtno = 0; stmtno < num_statements; stmtno++) - { - int startpos, - endpos; - char *dest; - - if (stmtno == 0) - startpos = 0; - else - startpos = STMTS[stmtno - 1]; - - endpos = STMTS[stmtno]; - dest = (char *) malloc(endpos - startpos + 1); - if (dest == 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", - node->no_id, endpos - startpos + 1); - slon_retry(); - } - strncpy(dest, ddl_script + startpos, endpos - startpos); - dest[STMTS[stmtno] - startpos] = 0; - (void) slon_mkquery(&query1, "%s", dest); - slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", - node->no_id, stmtno, dest); - free(dest); - - res = PQexec(local_dbconn, dstring_data(&query1)); - - if (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK && - PQresultStatus(res) != PGRES_EMPTY_QUERY) - { - rstat = PQresultStatus(res); - slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat)); - PQclear(res); - dstring_free(&query1); - slon_retry(); - } - rstat = PQresultStatus(res); - slon_log(SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat)); - PQclear(res); - } - - (void) slon_mkquery(&query1, - "select %s.ddlScript_complete_int(%d, %d); " - "set session_replication_role to replica; ", - rtcfg_namespace, - ddl_setid, - ddl_only_on_node); - - /* - * DDL_SCRIPT needs to be turned into a log shipping - * script - */ - - /* - * Note that the issue about parsing that mandates - * breaking up compound statements into - * individually-processed statements does not apply to log - * shipping as psql parses and processes each statement - * individually - */ - - if (archive_dir) - { - if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) - { - - if (archive_append_str(node, "set session_replication_role to local;\n") < 0) - slon_retry(); - if (archive_append_str(node, ddl_script) < 0) - slon_retry(); - if (archive_append_str(node, "set session_replication_role to replica;\n") < 0) - slon_retry(); - } - } - } + /* don't need to do anything for this event */ } else if (strcmp(event->ev_type, "RESET_CONFIG") == 0) { @@ -1602,19 +1428,12 @@ remoteWorkerThread_main(void *cdata) * Thread exit time has arrived. 
Disconnect from all data providers and * free memory */ - adjust_provider_info(node, wd, true); - - pthread_mutex_unlock(&(wd->workdata_lock)); - pthread_mutex_destroy(&(wd->workdata_lock)); - pthread_cond_destroy(&(wd->repldata_cond)); - pthread_cond_destroy(&(wd->linepool_cond)); + adjust_provider_info(node, wd, true, -1); slon_disconnectdb(local_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); - free(wd->tab_fqname); - free(wd->tab_forward); #ifdef SLON_MEMDEBUG local_conn = NULL; memset(wd, 66, sizeof(WorkerGroupData)); @@ -1634,14 +1453,14 @@ remoteWorkerThread_main(void *cdata) * ---------- */ static void -adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup) +adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup, + int event_provider) { ProviderInfo *provider; ProviderInfo *provnext; ProviderSet *pset; SlonNode *rtcfg_node; SlonSet *rtcfg_set; - int i; slon_log(SLON_CONFIG, "remoteWorkerThread_%d: " "update provider configuration\n", @@ -1662,8 +1481,7 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup) * We create a lock here and keep it until we made our final decision * about what to do with the helper thread. */ - pthread_mutex_lock(&(provider->helper_lock)); - + while ((pset = provider->set_head) != NULL) { DLLIST_REMOVE(provider->set_head, provider->set_tail, @@ -1719,49 +1537,13 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup) provider->no_id = rtcfg_set->sub_provider; provider->wd = wd; - /* - * Also create a helper thread for this provider, which - * will actually run the log data selection for us. - */ - pthread_mutex_init(&(provider->helper_lock), NULL); - pthread_mutex_lock(&(provider->helper_lock)); - pthread_cond_init(&(provider->helper_cond), NULL); - dstring_init(&(provider->helper_query)); - provider->helper_status = SLON_WG_IDLE; - if (pthread_create(&(provider->helper_thread), NULL, - sync_helper, (void *) provider) != 0) - { - slon_log(SLON_FATAL, "remoteWorkerThread_%d: ", - "pthread_create() - %s\n", - node->no_id, strerror(errno)); - slon_retry(); - } - slon_log(SLON_CONFIG, "remoteWorkerThread_%d: " - "helper thread for provider %d created\n", - node->no_id, provider->no_id); - - /* - * Add more workgroup data lines to the pool. - */ - for (i = 0; i < SLON_WORKLINES_PER_HELPER; i++) - { - WorkerGroupLine *line; - - line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine)); - memset(line, 0, sizeof(WorkerGroupLine)); - line->line_largemem = 0; - dstring_init(&(line->data)); - dstring_init(&(line->log)); - DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail, - line); - } + dstring_init(&provider->helper_query); /* * Add the provider to our work group */ DLLIST_ADD_TAIL(wd->provider_head, wd->provider_tail, provider); - /* * Copy the runtime configurations conninfo into the * provider info. @@ -1810,41 +1592,15 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup) * If the list of currently replicated sets we receive from this * provider is empty, we don't need to maintain a connection to it. */ - if (provider->set_head == NULL) + if (provider->set_head == NULL && provider->no_id != event_provider) { /* * Tell this helper thread to exit, join him and destroy thread * related data. 
*/ - provider->helper_status = SLON_WG_EXIT; - pthread_cond_signal(&(provider->helper_cond)); - pthread_mutex_unlock(&(provider->helper_lock)); - pthread_join(provider->helper_thread, NULL); - pthread_cond_destroy(&(provider->helper_cond)); - pthread_mutex_destroy(&(provider->helper_lock)); - slon_log(SLON_CONFIG, "remoteWorkerThread_%d: " "helper thread for provider %d terminated\n", - node->no_id, provider->no_id); - - /* - * Remove the line buffers we added for this helper. - */ - for (i = 0; i < SLON_WORKLINES_PER_HELPER; i++) - { - WorkerGroupLine *line; - - if ((line = wd->linepool_head) == NULL) - break; - dstring_free(&(line->data)); - dstring_free(&(line->log)); - DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail, - line); -#ifdef SLON_MEMDEBUG - memset(line, 55, sizeof(WorkerGroupLine)); -#endif - free(line); - } + node->no_id, provider->no_id); /* * Disconnect from the database. @@ -1897,11 +1653,52 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup) else provider->pa_conninfo = strdup(rtcfg_node->pa_conninfo); } + } - /* - * Unlock the helper thread ... he should now go and wait for work. - */ - pthread_mutex_unlock(&(provider->helper_lock)); + /* + * Step 4. + * + * Make sure the event provider is in the list of providers. + */ + if (event_provider >= 0) + { + for (provider = wd->provider_head; provider; + provider = provider->next) + { + if (provider->no_id == event_provider) + break; + } + if (provider == NULL) + { + /* + * No provider entry found. Create a new one. + */ + provider = (ProviderInfo *) + malloc(sizeof(ProviderInfo)); + memset(provider, 0, sizeof(ProviderInfo)); + provider->no_id = event_provider; + provider->wd = wd; + + dstring_init(&provider->helper_query); + + /* + * Add the provider to our work group + */ + DLLIST_ADD_TAIL(wd->provider_head, wd->provider_tail, + provider); + /* + * Copy the runtime configurations conninfo into the + * provider info. + */ + rtcfg_node = rtcfg_findNode(provider->no_id); + if (rtcfg_node != NULL) + { + provider->pa_connretry = rtcfg_node->pa_connretry; + if (rtcfg_node->pa_conninfo != NULL) + provider->pa_conninfo = + strdup(rtcfg_node->pa_conninfo); + } + } } } @@ -3709,9 +3506,8 @@ sync_event(SlonNode *node, SlonConn *local_conn, PGresult *res1; int ntuples1; int num_sets = 0; - int num_providers_active = 0; - int num_errors; - WorkerGroupLine *wgline; + int num_errors=0; + int i; int rc; char seqbuf[64]; @@ -3754,6 +3550,19 @@ sync_event(SlonNode *node, SlonConn *local_conn, } } + /* + * Make sure that we have the event provider in our provider list. 
+ */ + for (provider = wd->provider_head; provider; provider = provider->next) + { + if (provider->no_id == event->event_provider) + break; + } + if (provider == NULL) + { + adjust_provider_info(node, wd, false, event->event_provider); + } + /* * Establish all required data provider connections */ @@ -3884,12 +3693,15 @@ sync_event(SlonNode *node, SlonConn *local_conn, int rc; int need_union; int sl_log_no; +slon_log(SLON_DEBUG2, + "remoteWorkerThread_%d: creating log select for provider %d\n", + node->no_id, provider->no_id); need_union = 0; provider_query = &(provider->helper_query); dstring_reset(provider_query); (void) slon_mkquery(provider_query, - "declare LOG cursor for "); + "COPY ( "); /* * Get the current sl_log_status value for this provider @@ -3934,317 +3746,342 @@ sync_event(SlonNode *node, SlonConn *local_conn, node->no_id, provider->no_id, provider->log_status); /* - * Select all sets we receive from this provider and which are not - * synced better than this SYNC already. + * Add the DDL selection to the provider_query if this is the + * event provider. In case we are subscribed to any set(s) from + * the origin, this is implicitly the data provider because we + * only listen for events on that node. */ - (void) slon_mkquery(&query, - "select SSY.ssy_setid, SSY.ssy_seqno, " - " \"pg_catalog\".txid_snapshot_xmax(SSY.ssy_snapshot), " - " SSY.ssy_snapshot, " - " SSY.ssy_action_list " - "from %s.sl_setsync SSY " - "where SSY.ssy_seqno < '%s' " - " and SSY.ssy_setid in (", - rtcfg_namespace, seqbuf); - for (pset = provider->set_head; pset; pset = pset->next) - slon_appendquery(&query, "%s%d", - (pset->prev == NULL) ? "" : ",", - pset->set_id); - slon_appendquery(&query, "); "); - - start_monitored_event(&pm); - res1 = PQexec(local_dbconn, dstring_data(&query)); - monitor_subscriber_query(&pm); - - slon_log(SLON_DEBUG1, "about to monitor_subscriber_query - pulling big actionid list for %d\n", provider->no_id); - - if (PQresultStatus(res1) != PGRES_TUPLES_OK) + if (provider->no_id == event->event_provider) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", - node->no_id, dstring_data(&query), - PQresultErrorMessage(res1)); - PQclear(res1); - dstring_free(&query); - dstring_free(&lsquery); - archive_terminate(node); - return 60; + slon_appendquery(provider_query, + "select log_origin, log_txid, " + "NULL::integer, log_actionseq, " + "NULL::text, NULL::text, 'S'::\"char\", " + "NULL::integer, log_cmdargs " + "from %s.sl_log_script " + "where log_origin = %d ", + rtcfg_namespace, node->no_id); + slon_appendquery(provider_query, + "and log_txid >= \"pg_catalog\".txid_snapshot_xmax('%s') " + "and log_txid < '%s' " + "and \"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s') ", + node->last_snapshot, + event->ev_maxtxid_c, + event->ev_snapshot_c); + + slon_appendquery(provider_query, + "union all " + "select log_origin, log_txid, " + "NULL::integer, log_actionseq, " + "NULL::text, NULL::text, 'S'::\"char\", " + "NULL::integer, log_cmdargs " + "from %s.sl_log_script " + "where log_origin = %d ", + rtcfg_namespace, node->no_id); + slon_appendquery(provider_query, + "and log_txid in (select * from " + "\"pg_catalog\".txid_snapshot_xip('%s') " + "except " + "select * from " + "\"pg_catalog\".txid_snapshot_xip('%s') )", + node->last_snapshot, + event->ev_snapshot_c); + + need_union = 1; } /* - * For every set we receive from this provider + * Only go through the trouble of looking up the setsync and tables + * if we actually use this provider for data. 
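
For orientation, the per-provider log selection assembled in this function ends up shaped roughly as follows when run against the event provider (schema name, node id, table ids, and txid bounds are illustrative assumptions; the real query also applies the txid_visible_in_snapshot() and action-sequence predicates built below):

    COPY (
        select log_origin, log_txid, NULL::integer, log_actionseq,
               NULL::text, NULL::text, 'S'::"char", NULL::integer, log_cmdargs
          from _slony_example.sl_log_script
         where log_origin = 1
           and log_txid >= '1000' and log_txid < '1005'
        union all
        select log_origin, log_txid, log_tableid, log_actionseq,
               log_tablenspname, log_tablerelname, log_cmdtype,
               log_cmdupdncols, log_cmdargs
          from _slony_example.sl_log_1
         where log_origin = 1
           and log_tableid in (1001, 1002)
           and log_txid >= '1000' and log_txid < '1005'
        order by log_actionseq
    ) TO STDOUT;
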
*/ - ntuples1 = PQntuples(res1); - if (ntuples1 == 0) - { - PQclear(res1); - slon_appendquery(provider_query,"select * FROM %s.sl_log_1" - " where false",rtcfg_namespace); - continue; - } - num_sets += ntuples1; - - for (tupno1 = 0; tupno1 < ntuples1; tupno1++) + if (provider->set_head != NULL) { - int sub_set = strtol(PQgetvalue(res1, tupno1, 0), NULL, 10); - char *ssy_maxxid = PQgetvalue(res1, tupno1, 2); - char *ssy_snapshot = PQgetvalue(res1, tupno1, 3); - char *ssy_action_list = PQgetvalue(res1, tupno1, 4); - int64 ssy_seqno; - - slon_scanint64(PQgetvalue(res1, tupno1, 1), &ssy_seqno); - if (min_ssy_seqno < 0 || ssy_seqno < min_ssy_seqno) - min_ssy_seqno = ssy_seqno; - /* - * Select the tables in that set ... + * Select all sets we receive from this provider and which are not + * synced better than this SYNC already. */ (void) slon_mkquery(&query, - "select T.tab_id, T.tab_set, " - " %s.slon_quote_brute(PGN.nspname) || '.' || " - " %s.slon_quote_brute(PGC.relname) as tab_fqname " - "from %s.sl_table T, " - " \"pg_catalog\".pg_class PGC, " - " \"pg_catalog\".pg_namespace PGN " - "where T.tab_set = %d " - " and PGC.oid = T.tab_reloid " - " and PGC.relnamespace = PGN.oid; ", - rtcfg_namespace, - rtcfg_namespace, - rtcfg_namespace, - sub_set); + "select SSY.ssy_setid, SSY.ssy_seqno, " + " \"pg_catalog\".txid_snapshot_xmax(SSY.ssy_snapshot), " + " SSY.ssy_snapshot, " + " SSY.ssy_action_list " + "from %s.sl_setsync SSY " + "where SSY.ssy_seqno < '%s' " + " and SSY.ssy_setid in (", + rtcfg_namespace, seqbuf); + for (pset = provider->set_head; pset; pset = pset->next) + slon_appendquery(&query, "%s%d", + (pset->prev == NULL) ? "" : ",", + pset->set_id); + slon_appendquery(&query, "); "); start_monitored_event(&pm); - res2 = PQexec(local_dbconn, dstring_data(&query)); + res1 = PQexec(local_dbconn, dstring_data(&query)); monitor_subscriber_query(&pm); - if (PQresultStatus(res2) != PGRES_TUPLES_OK) + slon_log(SLON_DEBUG1, "about to monitor_subscriber_query - pulling big actionid list for %d\n", provider->no_id); + + if (PQresultStatus(res1) != PGRES_TUPLES_OK) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", node->no_id, dstring_data(&query), - PQresultErrorMessage(res2)); - PQclear(res2); + PQresultErrorMessage(res1)); PQclear(res1); dstring_free(&query); dstring_free(&lsquery); archive_terminate(node); return 60; } - ntuples2 = PQntuples(res2); - slon_log(SLON_INFO, "remoteWorkerThread_%d: " - "syncing set %d with %d table(s) from provider %d\n", - node->no_id, sub_set, ntuples2, - provider->no_id); - if (ntuples2 == 0) + ntuples1 = PQntuples(res1); + if (ntuples1 == 0) { - PQclear(res2); + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " + "no setsync found for provider %d\n", + node->no_id, provider->no_id); + PQclear(res1); + if (need_union) + { + dstring_append(provider_query, + " order by log_actionseq) TO STDOUT"); + dstring_terminate(provider_query); + } + else + { + slon_mkquery(provider_query, + "COPY ( " + "select log_origin, log_txid, log_tableid, " + "log_actionseq, log_tablenspname, " + "log_tablerelname, log_cmdtype, " + "log_cmdupdncols, log_cmdargs " + "from %s.sl_log_1 " + "where false) TO STDOUT", + rtcfg_namespace); + } continue; } - ntables_total += ntuples2; + num_sets += ntuples1; /* - * ... 
and build up a the provider query + * For every set we receive from this provider */ - for (sl_log_no = 1; sl_log_no <= 2; sl_log_no++) + for (tupno1 = 0; tupno1 < ntuples1; tupno1++) { - /* - * We only need to query sl_log_1 when log_status is - * 0 or during log switching (log_status 2 and 3). - */ - if (sl_log_no == 1 && provider->log_status == 1) - continue; - /* - * Likewise we only query sl_log_2 when log_status is - * 1, 2 or 3. - */ - if (sl_log_no == 2 && provider->log_status == 0) - continue; - + int sub_set = strtol(PQgetvalue(res1, tupno1, 0), NULL, 10); + char *ssy_maxxid = PQgetvalue(res1, tupno1, 2); + char *ssy_snapshot = PQgetvalue(res1, tupno1, 3); + char *ssy_action_list = PQgetvalue(res1, tupno1, 4); + int64 ssy_seqno; - if (need_union) - { - slon_appendquery(provider_query, " union all "); - } - need_union = 1; + slon_scanint64(PQgetvalue(res1, tupno1, 1), &ssy_seqno); + if (min_ssy_seqno < 0 || ssy_seqno < min_ssy_seqno) + min_ssy_seqno = ssy_seqno; /* - * First for the big chunk that does the index - * scan with upper and lower bounds: - * - * select ... from sl_log_N - * where log_origin = X - * and log_tableid in () + * Select the tables in that set ... */ - slon_appendquery(provider_query, - "select log_origin, log_txid, log_tableid, " - "log_actionseq, log_cmdtype, " - "octet_length(log_cmddata), " - "case when octet_length(log_cmddata) <= %d " - "then log_cmddata " - "else null end " - "from %s.sl_log_%d " - "where log_origin = %d " - "and log_tableid in (", - sync_max_rowsize, - rtcfg_namespace, sl_log_no, - node->no_id); - for (tupno2 = 0; tupno2 < ntuples2; tupno2++) - { - if (tupno2 > 0) - dstring_addchar(provider_query, ','); - dstring_append(provider_query, - PQgetvalue(res2, tupno2, 0)); - } - dstring_append(provider_query, ") "); + (void) slon_mkquery(&query, + "select T.tab_id, T.tab_set, " + " %s.slon_quote_brute(PGN.nspname) || '.' 
|| " + " %s.slon_quote_brute(PGC.relname) as tab_fqname " + "from %s.sl_table T, " + " \"pg_catalog\".pg_class PGC, " + " \"pg_catalog\".pg_namespace PGN " + "where T.tab_set = %d " + " and PGC.oid = T.tab_reloid " + " and PGC.relnamespace = PGN.oid; ", + rtcfg_namespace, + rtcfg_namespace, + rtcfg_namespace, + sub_set); - /* - * and log_txid >= '' - * and log_txid < '' - * and txit_visible_in_snapshot(log_txid, '') - */ - slon_appendquery(provider_query, - "and log_txid >= '%s' " - "and log_txid < '%s' " - "and \"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s') ", - ssy_maxxid, - event->ev_maxtxid_c, - event->ev_snapshot_c); + start_monitored_event(&pm); + res2 = PQexec(local_dbconn, dstring_data(&query)); + monitor_subscriber_query(&pm); - /* - * and () - */ - actionlist_len = strlen(ssy_action_list); - slon_log(SLON_DEBUG2, "remoteWorkerThread_%d_%d: " - "ssy_action_list length: %d\n", - node->no_id, provider->no_id, - actionlist_len); - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d_%d: " - "ssy_action_list value: %s\n", - node->no_id, provider->no_id, - ssy_action_list); - if (actionlist_len > 0) + if (PQresultStatus(res2) != PGRES_TUPLES_OK) { - dstring_init(&actionseq_subquery); - compress_actionseq(ssy_action_list, &actionseq_subquery); - slon_appendquery(provider_query, - " and (%s)", - dstring_data(&actionseq_subquery)); - dstring_free(&actionseq_subquery); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query), + PQresultErrorMessage(res2)); + PQclear(res2); + PQclear(res1); + dstring_free(&query); + dstring_free(&lsquery); + archive_terminate(node); + return 60; } + ntuples2 = PQntuples(res2); + slon_log(SLON_INFO, "remoteWorkerThread_%d: " + "syncing set %d with %d table(s) from provider %d\n", + node->no_id, sub_set, ntuples2, + provider->no_id); - /* - * Now do it all over again to get the log rows - * from in-progress transactions at snapshot one - * that have committed by the time of snapshot two. - * again, we do: - * - * select ... from sl_log_N - * where log_origin = X - * and log_tableid in () - */ - slon_appendquery(provider_query, - "union all " - "select log_origin, log_txid, log_tableid, " - "log_actionseq, log_cmdtype, " - "octet_length(log_cmddata), " - "case when octet_length(log_cmddata) <= %d " - "then log_cmddata " - "else null end " - "from %s.sl_log_%d " - "where log_origin = %d " - "and log_tableid in (", - sync_max_rowsize, - rtcfg_namespace, sl_log_no, - node->no_id); - for (tupno2 = 0; tupno2 < ntuples2; tupno2++) + if (ntuples2 == 0) { - if (tupno2 > 0) - dstring_addchar(provider_query, ','); - dstring_append(provider_query, - PQgetvalue(res2, tupno2, 0)); + PQclear(res2); + continue; } - dstring_append(provider_query, ") "); - - /* - * and log_txid in (select - * txid_snapshot_xip('')) - * and txit_visible_in_snapshot(log_txid, '') - */ - slon_appendquery(provider_query, - "and log_txid in (select * from " - "\"pg_catalog\".txid_snapshot_xip('%s') " - "except " - "select * from " - "\"pg_catalog\".txid_snapshot_xip('%s') )", - ssy_snapshot, - event->ev_snapshot_c); + ntables_total += ntuples2; /* - * and () + * ... 
and build up the log selection query */ - actionlist_len = strlen(ssy_action_list); - if (actionlist_len > 0) + for (sl_log_no = 1; sl_log_no <= 2; sl_log_no++) { - dstring_init(&actionseq_subquery); - compress_actionseq(ssy_action_list, &actionseq_subquery); - slon_appendquery(provider_query, - " and (%s)", - dstring_data(&actionseq_subquery)); - dstring_free(&actionseq_subquery); - } - } - - /* Remember info about the tables in the set */ - for (tupno2 = 0; tupno2 < ntuples2; tupno2++) - { - int tab_id = strtol(PQgetvalue(res2, tupno2, 0), NULL, 10); - int tab_set = strtol(PQgetvalue(res2, tupno2, 1), NULL, 10); - SlonSet *rtcfg_set; + /* + * We only need to query sl_log_1 when log_status is + * 0 or during log switching (log_status 2 and 3). + */ + if (sl_log_no == 1 && provider->log_status == 1) + continue; + /* + * Likewise we only query sl_log_2 when log_status is + * 1, 2 or 3. + */ + if (sl_log_no == 2 && provider->log_status == 0) + continue; - /* - * Remember the fully qualified table name on the fly. This - * might have to become a hashtable someday. - */ - while (tab_id >= wd->tab_fqname_size) - { - wd->tab_fqname = (char **) realloc(wd->tab_fqname, - sizeof(char *) * wd->tab_fqname_size * 2); - memset(&(wd->tab_fqname[wd->tab_fqname_size]), 0, - sizeof(char *) * wd->tab_fqname_size); - wd->tab_forward = realloc(wd->tab_forward, - wd->tab_fqname_size * 2); - memset(&(wd->tab_forward[wd->tab_fqname_size]), 0, - wd->tab_fqname_size); - wd->tab_fqname_size *= 2; - } - wd->tab_fqname[tab_id] = strdup(PQgetvalue(res2, tupno2, 2)); + if (need_union) + { + slon_appendquery(provider_query, " union all "); + } + need_union = 1; - /* - * Also remember if the tables log data needs to be forwarded. - */ - for (rtcfg_set = rtcfg_set_list_head; rtcfg_set; - rtcfg_set = rtcfg_set->next) - { - if (rtcfg_set->set_id == tab_set) + /* + * First for the big chunk that does the index + * scan with upper and lower bounds: + * + * select ... from sl_log_N + * where log_origin = X + * and log_tableid in () + */ + slon_appendquery(provider_query, + "select log_origin, log_txid, log_tableid, " + "log_actionseq, log_tablenspname, " + "log_tablerelname, log_cmdtype, " + "log_cmdupdncols, log_cmdargs " + "from %s.sl_log_%d " + "where log_origin = %d " + "and log_tableid in (", + rtcfg_namespace, sl_log_no, + node->no_id); + for (tupno2 = 0; tupno2 < ntuples2; tupno2++) { - wd->tab_forward[tab_id] = rtcfg_set->sub_forward; - break; + if (tupno2 > 0) + dstring_addchar(provider_query, ','); + dstring_append(provider_query, + PQgetvalue(res2, tupno2, 0)); } - } - } + dstring_append(provider_query, ") "); - PQclear(res2); - } - PQclear(res1); + /* + * and log_txid >= '' + * and log_txid < '' + * and txit_visible_in_snapshot(log_txid, '') + */ + slon_appendquery(provider_query, + "and log_txid >= '%s' " + "and log_txid < '%s' " + "and \"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s') ", + ssy_maxxid, + event->ev_maxtxid_c, + event->ev_snapshot_c); - /* - * Finally add the order by clause. 
- */ - dstring_append(provider_query, " order by log_actionseq"); - dstring_terminate(provider_query); + /* + * and () + */ + actionlist_len = strlen(ssy_action_list); + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d_%d: " + "ssy_action_list length: %d\n", + node->no_id, provider->no_id, + actionlist_len); + slon_log(SLON_DEBUG4, "remoteWorkerThread_%d_%d: " + "ssy_action_list value: %s\n", + node->no_id, provider->no_id, + ssy_action_list); + if (actionlist_len > 0) + { + dstring_init(&actionseq_subquery); + compress_actionseq(ssy_action_list, &actionseq_subquery); + slon_appendquery(provider_query, + " and (%s)", + dstring_data(&actionseq_subquery)); + dstring_free(&actionseq_subquery); + } - /* + /* + * Now do it all over again to get the log rows + * from in-progress transactions at snapshot one + * that have committed by the time of snapshot two. + * again, we do: + * + * select ... from sl_log_N + * where log_origin = X + * and log_tableid in () + */ + + slon_appendquery(provider_query, + "union all " + "select log_origin, log_txid, log_tableid, " + "log_actionseq, log_tablenspname, " + "log_tablerelname, log_cmdtype, " + "log_cmdupdncols, log_cmdargs " + "from %s.sl_log_%d " + "where log_origin = %d " + "and log_tableid in (", + rtcfg_namespace, sl_log_no, + node->no_id); + for (tupno2 = 0; tupno2 < ntuples2; tupno2++) + { + if (tupno2 > 0) + dstring_addchar(provider_query, ','); + dstring_append(provider_query, + PQgetvalue(res2, tupno2, 0)); + } + dstring_append(provider_query, ") "); + + /* + * and log_txid in (select + * txid_snapshot_xip('')) + * and txit_visible_in_snapshot(log_txid, '') + */ + slon_appendquery(provider_query, + "and log_txid in (select * from " + "\"pg_catalog\".txid_snapshot_xip('%s') " + "except " + "select * from " + "\"pg_catalog\".txid_snapshot_xip('%s') )", + ssy_snapshot, + event->ev_snapshot_c); + + /* + * and () + */ + actionlist_len = strlen(ssy_action_list); + if (actionlist_len > 0) + { + dstring_init(&actionseq_subquery); + compress_actionseq(ssy_action_list, &actionseq_subquery); + slon_appendquery(provider_query, + " and (%s)", + dstring_data(&actionseq_subquery)); + dstring_free(&actionseq_subquery); + } + } + PQclear(res2); + } + PQclear(res1); + } + + /* + * Finally add the order by clause. + */ + dstring_append(provider_query, " order by log_actionseq) TO STDOUT"); + dstring_terminate(provider_query); + + /* * Check that we select something from the provider. */ if (!need_union) @@ -4254,38 +4091,17 @@ sync_event(SlonNode *node, SlonConn *local_conn, * sets that we subscribe from this node. */ slon_mkquery(provider_query, - "declare LOG cursor for " + "COPY ( " "select log_origin, log_txid, log_tableid, " - "log_actionseq, log_cmdtype, " - "octet_length(log_cmddata), " - "case when octet_length(log_cmddata) <= %d " - "then log_cmddata " - "else null end " + "log_actionseq, log_tablenspname, " + "log_tablerelname, log_cmdtype, " + "log_cmdupdncols, log_cmdargs " "from %s.sl_log_1 " - "where false", - sync_max_rowsize, rtcfg_namespace); + "where false) TO STDOUT", + rtcfg_namespace); } } - /* - * If we have found no sets needing sync at all, why bother the helpers? 
- */ - if (num_sets == 0) - { - slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: " - "no sets need syncing for this event\n", - node->no_id); - dstring_free(&query); - dstring_free(&lsquery); - if (archive_dir) - { - rc = archive_close(node); - if (rc < 0) - slon_retry(); - } - return 0; - } - /* * Get the current sl_log_status */ @@ -4351,214 +4167,19 @@ sync_event(SlonNode *node, SlonConn *local_conn, /* * Time to get the helpers busy. */ - wd->workgroup_status = SLON_WG_BUSY; - pthread_mutex_unlock(&(wd->workdata_lock)); for (provider = wd->provider_head; provider; provider = provider->next) { - pthread_mutex_lock(&(provider->helper_lock)); - slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " - "activate helper %d\n", - node->no_id, provider->no_id); - provider->helper_status = SLON_WG_BUSY; - pthread_cond_signal(&(provider->helper_cond)); - pthread_mutex_unlock(&(provider->helper_lock)); - num_providers_active++; - } - - num_errors = 0; - while (num_providers_active > 0) - { - WorkerGroupLine *lines_head = NULL; - WorkerGroupLine *wgnext = NULL; - - /* - * Consume the replication data from the providers - */ - pthread_mutex_lock(&(wd->workdata_lock)); - while (wd->repldata_head == NULL) - { - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: waiting for log data\n", - node->no_id); - pthread_cond_wait(&(wd->repldata_cond), &(wd->workdata_lock)); - } - lines_head = wd->repldata_head; - wd->repldata_head = NULL; - wd->repldata_tail = NULL; - pthread_mutex_unlock(&(wd->workdata_lock)); - - for (wgline = lines_head; wgline; wgline = wgline->next) - { - /* - * Got a line ... process content - */ - switch (wgline->code) - { - case SLON_WGLC_ACTION: - if (num_errors > 0) - break; - - if (wgline->log.n_used > 0) - { - start_monitored_event(&pm); - res1 = PQexec(local_dbconn, dstring_data(&(wgline->log))); - monitor_subscriber_iud(&pm); - - if (PQresultStatus(res1) == PGRES_EMPTY_QUERY) - { - PQclear(res1); - break; - } - if (PQresultStatus(res1) != PGRES_COMMAND_OK) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "\"%s\" %s - query was: %s\n", - node->no_id, dstring_data(&(wgline->data)), - PQresultErrorMessage(res1), - dstring_data(&(wgline->provider->helper_query))); - num_errors++; - } - PQclear(res1); - } - - start_monitored_event(&pm); - res1 = PQexec(local_dbconn, dstring_data(&(wgline->data))); - monitor_subscriber_iud(&pm); - - if (PQresultStatus(res1) == PGRES_EMPTY_QUERY) - { - PQclear(res1); - break; - } - if (PQresultStatus(res1) != PGRES_COMMAND_OK) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "\"%s\" %s - query was: %s\n", - node->no_id, dstring_data(&(wgline->data)), - PQresultErrorMessage(res1), - dstring_data(&(wgline->provider->helper_query))); - num_errors++; - } -#ifdef SLON_CHECK_CMDTUPLES - else - { - if (strtol(PQcmdTuples(res1), NULL, 10) != 1) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "replication query did not affect " - "one data row (cmdTuples = %s) - " - "query was: %s - query was: %s\n", - node->no_id, PQcmdTuples(res1), - dstring_data(&(wgline->data)), - dstring_data(&(wgline->provider->helper_query))); - num_errors++; - } - else - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: %s\n", - node->no_id, dstring_data(&(wgline->data))); - } -#endif - PQclear(res1); - - /* - * Add the user data modification part to the archive log. 
- */ - if (archive_dir) - { - rc = archive_append_ds(node, &(wgline->data)); - if (rc < 0) - slon_retry(); - } - break; - - case SLON_WGLC_DONE: - provider = wgline->provider; - slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " - "helper %d finished\n", - node->no_id, wgline->provider->no_id); - num_providers_active--; - break; - - case SLON_WGLC_ERROR: - provider = wgline->provider; - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "helper %d finished with error\n", - node->no_id, wgline->provider->no_id); - num_providers_active--; - num_errors++; - break; - } - } - - /* - * Put the line buffers back into the pool. + /** + * instead of starting the helpers we want to + * perform the COPY on each provider. */ - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: returning lines to pool\n", - node->no_id); - pthread_mutex_lock(&(wd->workdata_lock)); - for (wgline = lines_head; wgline; wgline = wgnext) - { - wgnext = wgline->next; - if (wgline->line_largemem > 0) - { - /* - * Really free the lines that contained large rows - */ - dstring_free(&(wgline->data)); - dstring_free(&(wgline->log)); - dstring_init(&(wgline->data)); - dstring_init(&(wgline->log)); - wd->workdata_largemem -= wgline->line_largemem; - wgline->line_largemem = 0; - } - else - { - /* - * just reset (and allow to grow further) the small ones - */ - dstring_reset(&(wgline->data)); - dstring_reset(&(wgline->log)); - } - DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline); - } - if (num_errors == 1) - wd->workgroup_status = SLON_WG_ABORT; - pthread_cond_broadcast(&(wd->linepool_cond)); - pthread_mutex_unlock(&(wd->workdata_lock)); + num_errors+=sync_helper((void*)provider,local_dbconn); } - /* - * Inform the helpers that the whole group is done with this SYNC. - */ - slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " - "all helpers done.\n", - node->no_id); - pthread_mutex_lock(&(wd->workdata_lock)); - for (provider = wd->provider_head; provider; provider = provider->next) - { - slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: " - "changing helper %d to IDLE\n", - node->no_id, provider->no_id); - pthread_mutex_lock(&(provider->helper_lock)); - provider->helper_status = SLON_WG_IDLE; - pthread_cond_signal(&(provider->helper_cond)); - pthread_mutex_unlock(&(provider->helper_lock)); - } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: cleanup\n", node->no_id); - /* - * Cleanup - */ - for (i = 0; i < wd->tab_fqname_size; i++) - { - if (wd->tab_fqname[i] != NULL) - { - free(wd->tab_fqname[i]); - wd->tab_fqname[i] = NULL; - } - } - memset(wd->tab_forward, 0, wd->tab_fqname_size); /* * If there have been any errors, abort the SYNC @@ -4582,6 +4203,12 @@ sync_event(SlonNode *node, SlonConn *local_conn, int tupno1; char min_ssy_seqno_buf[64]; + /* + * Skip this if the provider is only here for DDL. 
+ */ + if (provider->set_head == NULL) + continue; + sprintf(min_ssy_seqno_buf, INT64_FORMAT, min_ssy_seqno); (void) slon_mkquery(&query, @@ -4750,683 +4377,334 @@ sync_event(SlonNode *node, SlonConn *local_conn, * sync_helper * ---------- */ -static void * -sync_helper(void *cdata) +static int +sync_helper(void *cdata,PGconn * local_conn) { ProviderInfo *provider = (ProviderInfo *) cdata; - WorkerGroupData *wd = provider->wd; - SlonNode *node = wd->node; + SlonNode *node = provider->wd->node; + WorkerGroupData * wd = provider->wd; PGconn *dbconn; - WorkerGroupLine *line = NULL; SlonDString query; - SlonDString query2; + SlonDString copy_in; int errors; struct timeval tv_start; struct timeval tv_now; int first_fetch; int log_status; int rc; - - WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE]; - int data_line_alloc = 0; - int data_line_first = 0; - int data_line_last = 0; - - PGresult *res = NULL; - PGresult *res2 = NULL; - PGresult *res3 = NULL; + int rc2; int ntuples; int tupno; - - int line_no; - int line_ncmds; + PGresult *res = NULL; + PGresult *res2 = NULL; + char * buffer; PerfMon pm; dstring_init(&query); - dstring_init(&query2); - for (;;) + + /* + * OK, we got work to do. + */ + dbconn = provider->conn->dbconn; + + errors = 0; + + init_perfmon(&pm); + /* + * Start a transaction + */ + + (void) slon_mkquery(&query, "start transaction; " + "set enable_seqscan = off; " + "set enable_indexscan = on; "); + + start_monitored_event(&pm); + + if (query_execute(node, dbconn, &query) < 0) { - pthread_mutex_lock(&(provider->helper_lock)); - while (provider->helper_status == SLON_WG_IDLE) - { - slon_log(SLON_DEBUG4, "remoteHelperThread_%d_%d: " - "waiting for work\n", - node->no_id, provider->no_id); - - pthread_cond_wait(&(provider->helper_cond), &(provider->helper_lock)); - } + errors++; + dstring_terminate(&query); + return errors; + } + monitor_subscriber_query (&pm); + + /* + * Get the current sl_log_status value + */ + (void) slon_mkquery(&query, "select last_value from %s.sl_log_status", + rtcfg_namespace); + + start_monitored_event(&pm); + res2 = PQexec(dbconn, dstring_data(&query)); + monitor_provider_query(&pm); - if (provider->helper_status == SLON_WG_EXIT) + rc = PQresultStatus(res2); + if (rc != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, + "remoteWorkerThread_%d: \"%s\" %s %s\n", + node->no_id, dstring_data(&query), + PQresStatus(rc), + PQresultErrorMessage(res2)); + PQclear(res2); + errors++; + dstring_terminate(&query); + return errors; + } + if (PQntuples(res2) != 1) + { + slon_log(SLON_ERROR, + "remoteWorkerThread_%d: \"%s\" %s returned %d tuples\n", + node->no_id, dstring_data(&query), + PQresStatus(rc), PQntuples(res2)); + PQclear(res2); + errors++; + dstring_terminate(&query); + return errors; + } + log_status = strtol(PQgetvalue(res2, 0, 0), NULL, 10); + PQclear(res2); + slon_log(SLON_DEBUG2, + "remoteWorkerThread_%d_%d: current remote log_status = %d\n", + node->no_id, provider->no_id, log_status); + dstring_terminate(&query); + /* + * See if we have to run the query through EXPLAIN first + */ + if (explain_thistime) + { + SlonDString explain_query; + + /* + * Let Postgres EXPLAIN the query plan for the current + * log selection query + */ + dstring_init(&explain_query); + slon_mkquery(&explain_query, "explain %s", + dstring_data(&(provider->helper_query))); + + res = PQexec(dbconn, dstring_data(&explain_query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { - dstring_free(&query); - dstring_free(&query2); - pthread_mutex_unlock(&(provider->helper_lock)); - 
pthread_exit(NULL); + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: \"%s\" %s", + node->no_id, provider->no_id, + dstring_data(&explain_query), + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&explain_query); + errors++; + return errors; } - if (provider->helper_status != SLON_WG_BUSY) + + slon_log(SLON_INFO, + "remoteWorkerThread_%d_%d: " + "Log selection query: %s\n", + node->no_id, provider->no_id, + dstring_data(&explain_query)); + slon_log(SLON_INFO, + "remoteWorkerThread_%d_%d: Query Plan:\n", + node->no_id, provider->no_id); + + ntuples = PQntuples(res); + for (tupno = 0; tupno < ntuples; tupno++) { - provider->helper_status = SLON_WG_IDLE; - pthread_mutex_unlock(&(provider->helper_lock)); - continue; + slon_log(SLON_INFO, + "remoteWorkerThread_%d_%d: PLAN %s\n", + node->no_id, provider->no_id, + PQgetvalue(res, tupno, 0)); } - - /* - * OK, we got work to do. - */ - dbconn = provider->conn->dbconn; - pthread_mutex_unlock(&(provider->helper_lock)); - - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: got work to do\n", + slon_log(SLON_INFO, + "remoteWorkerThread_%d_%d: PLAN_END\n", node->no_id, provider->no_id); + + PQclear(res); + dstring_free(&explain_query); + } + + gettimeofday(&tv_start, NULL); + first_fetch = true; + res = NULL; + + /* + * execute the COPY to read the log data. + */ + start_monitored_event(&pm); + res = PQexec(dbconn, dstring_data(&provider->helper_query)); + if( PQresultStatus(res) != PGRES_COPY_OUT) + { + errors++; + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error executing COPY OUT: \"%s\" %s", + node->no_id, provider->no_id, + dstring_data(&provider->helper_query), + PQresultErrorMessage(res)); + return errors; + } + monitor_provider_query(&pm); - errors = 0; - do - { - init_perfmon(&pm); - /* - * Start a transaction - */ - - (void) slon_mkquery(&query, "start transaction; " - "set enable_seqscan = off; " - "set enable_indexscan = on; "); - - start_monitored_event(&pm); - - if (query_execute(node, dbconn, &query) < 0) - { - errors++; - break; - } - monitor_subscriber_query (&pm); - - /* - * Get the current sl_log_status value - */ - (void) slon_mkquery(&query, "select last_value from %s.sl_log_status", - rtcfg_namespace); - - start_monitored_event(&pm); - res3 = PQexec(dbconn, dstring_data(&query)); - monitor_provider_query(&pm); - - rc = PQresultStatus(res3); - if (rc != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, - "remoteWorkerThread_%d: \"%s\" %s %s\n", - node->no_id, dstring_data(&query), - PQresStatus(rc), - PQresultErrorMessage(res3)); - PQclear(res3); - errors++; - break; - } - if (PQntuples(res3) != 1) - { - slon_log(SLON_ERROR, - "remoteWorkerThread_%d: \"%s\" %s returned %d tuples\n", - node->no_id, dstring_data(&query), - PQresStatus(rc), PQntuples(res3)); - PQclear(res3); - errors++; - break; - } - log_status = strtol(PQgetvalue(res3, 0, 0), NULL, 10); - PQclear(res3); - slon_log(SLON_DEBUG2, - "remoteWorkerThread_%d_%d: current remote log_status = %d\n", - node->no_id, provider->no_id, log_status); - - /* - * See if we have to run the query through EXPLAIN first - */ - if (explain_thistime) - { - SlonDString explain_query; - - /* - * Let Postgres EXPLAIN the query plan for the current - * log selection query - */ - dstring_init(&explain_query); - slon_mkquery(&explain_query, "explain %s", - dstring_data(&(provider->helper_query))); - - res = PQexec(dbconn, dstring_data(&explain_query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s", - node->no_id, 
provider->no_id, - dstring_data(&explain_query), - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&explain_query); - errors++; - break; - } - - slon_log(SLON_INFO, - "remoteWorkerThread_%d_%d: " - "Log selection query: %s\n", - node->no_id, provider->no_id, - dstring_data(&explain_query)); - slon_log(SLON_INFO, - "remoteWorkerThread_%d_%d: Query Plan:\n", - node->no_id, provider->no_id); - - ntuples = PQntuples(res); - for (tupno = 0; tupno < ntuples; tupno++) - { - slon_log(SLON_INFO, - "remoteWorkerThread_%d_%d: PLAN %s\n", - node->no_id, provider->no_id, - PQgetvalue(res, tupno, 0)); - } - slon_log(SLON_INFO, - "remoteWorkerThread_%d_%d: PLAN_END\n", - node->no_id, provider->no_id); - - PQclear(res); - dstring_free(&explain_query); - } - - gettimeofday(&tv_start, NULL); - first_fetch = true; - res = NULL; + /** + * execute the COPY on the local node to write the log data. + * + */ + dstring_init(©_in); + slon_mkquery(©_in,"COPY %s.\"sl_log_%d\" ( log_origin, " \ + "log_txid,log_tableid,log_actionseq,log_tablenspname, " \ + "log_tablerelname, log_cmdtype, log_cmdupdncols," \ + "log_cmdargs) FROM STDOUT", + rtcfg_namespace, wd->active_log_table); + + res2 = PQexec(local_conn,dstring_data(©_in)); \ + if ( PQresultStatus(res2) != PGRES_COPY_IN ) + { + + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error executing COPY IN: \"%s\" %s", + node->no_id, provider->no_id, + dstring_data(©_in), + PQresultErrorMessage(res2)); + errors++; + dstring_free(©_in); + PQclear(res2); + return errors; + + } + if (archive_dir) + { + SlonDString log_copy; + dstring_init(&log_copy); + slon_mkquery(&log_copy,"COPY %s.\"sl_log_archive\" ( log_origin, " \ + "log_txid,log_tableid,log_actionseq,log_tablenspname, " \ + "log_tablerelname, log_cmdtype, log_cmdupdncols," \ + "log_cmdargs) FROM STDIN;", + rtcfg_namespace); + archive_append_ds(node,&log_copy); + dstring_terminate(&log_copy); + - /* - * Open a cursor that reads the log data. - */ - start_monitored_event(&pm); - if (query_execute(node, dbconn, &(provider->helper_query)) < 0) - { + } + dstring_free(©_in); + tupno=0; + while (!errors) + { + rc = PQgetCopyData(dbconn,&buffer,0); + if (rc < 0) { + if ( rc == -2 ) { errors++; - break; + slon_log(SLON_ERROR,"remoteWorkerThread_%d_%d: error reading copy data: %s", + node->no_id, provider->no_id,PQerrorMessage(dbconn)); } - monitor_provider_query(&pm); - - (void) slon_mkquery(&query, "fetch %d from LOG; ", - SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE); - data_line_alloc = 0; - data_line_first = 0; - data_line_last = 0; - - res = NULL; - ntuples = 0; - tupno = 0; - - while (!errors) - { - /* - * Deliver filled line buffers to the worker process. - */ - if (data_line_last > data_line_first) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: deliver %d lines to worker\n", - node->no_id, provider->no_id, - data_line_last - data_line_first); - - pthread_mutex_lock(&(wd->workdata_lock)); - while (data_line_first < data_line_last) - { - DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail, - data_line[data_line_first]); - data_line_first++; - } - pthread_cond_signal(&(wd->repldata_cond)); - pthread_mutex_unlock(&(wd->workdata_lock)); - } - - /* - * If we cycled through all the allocated line buffers, reset - * the indexes. - */ - if (data_line_first == data_line_alloc) - { - data_line_alloc = 0; - data_line_first = 0; - data_line_last = 0; - } - - /* - * Make sure we are inside memory limits and that we have - * available line buffers. 
- */ - pthread_mutex_lock(&(wd->workdata_lock)); - if (data_line_alloc == 0 || - wd->workdata_largemem > sync_max_largemem) - { - /* - * First make sure that the overall memory usage is inside - * bounds. - */ - if (wd->workdata_largemem > sync_max_largemem) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: wait for oversize memory to free\n", - node->no_id, provider->no_id); - - while (wd->workdata_largemem > sync_max_largemem && - wd->workgroup_status == SLON_WG_BUSY) - { - pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); - } - if (wd->workgroup_status != SLON_WG_BUSY) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: abort operation\n", - node->no_id, provider->no_id); - errors++; - break; - } - } - - /* - * Second make sure that we have at least 1 line buffer. - */ - if (data_line_alloc == 0) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: allocate line buffers\n", - node->no_id, provider->no_id); - while (data_line_alloc == 0 && !errors) - { - while (wd->linepool_head == NULL && - wd->workgroup_status == SLON_WG_BUSY) - { - pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); - } - if (wd->workgroup_status != SLON_WG_BUSY) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: abort operation\n", - node->no_id, provider->no_id); - errors++; - break; - } - - /* - * While we are at it, we can as well allocate up - * to FETCH_SIZE buffers. - */ - while (data_line_alloc < SLON_DATA_FETCH_SIZE && - wd->linepool_head != NULL) - { - data_line[data_line_alloc] = wd->linepool_head; - DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail, - data_line[data_line_alloc]); - data_line_alloc++; - } - } - } - } - pthread_mutex_unlock(&(wd->workdata_lock)); - - /* - * We are within memory limits and have allocated line - * buffers. Make sure that we have log lines fetched. - */ - if (tupno >= ntuples) - { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: fetch from cursor\n", - node->no_id, provider->no_id); - if (res != NULL) - PQclear(res); - - start_monitored_event(&pm); - res = PQexec(dbconn, dstring_data(&query)); - monitor_provider_query(&pm); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s", - node->no_id, provider->no_id, - dstring_data(&query), - PQresultErrorMessage(res)); - errors++; - break; - } - if (first_fetch) - { - gettimeofday(&tv_now, NULL); - slon_log(SLON_DEBUG1, - "remoteHelperThread_%d_%d: %.3f seconds delay for first row\n", - node->no_id, provider->no_id, - TIMEVAL_DIFF(&tv_start, &tv_now)); - - first_fetch = false; - } - - ntuples = PQntuples(res); - tupno = 0; - - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: fetched %d log rows\n", - node->no_id, provider->no_id, ntuples); - } - - /* - * If there are no more tuples, we're done - */ - if (ntuples == 0) - break; - - /* - * Now move tuples from the fetch result into the line - * buffers. 
- */ - line_no = data_line_last++; - line_ncmds = 0; - - line = data_line[line_no]; - line->code = SLON_WGLC_ACTION; - line->provider = provider; - dstring_reset(&(line->data)); - dstring_reset(&(line->log)); - - while (tupno < ntuples && line_no < data_line_alloc) - { - char *log_origin = PQgetvalue(res, tupno, 0); - char *log_txid = PQgetvalue(res, tupno, 1); - int log_tableid = strtol(PQgetvalue(res, tupno, 2), - NULL, 10); - char *log_actionseq = PQgetvalue(res, tupno, 3); - char *log_cmdtype = PQgetvalue(res, tupno, 4); - int log_cmdsize = strtol(PQgetvalue(res, tupno, 5), - NULL, 10); - char *log_cmddata = PQgetvalue(res, tupno, 6); - int largemem = 0; - - tupno++; - - if (log_cmdsize >= sync_max_rowsize) - { - (void) slon_mkquery(&query2, - "select log_cmddata " - "from %s.sl_log_1 " - "where log_origin = '%s' " - " and log_txid = '%s' " - " and log_actionseq = '%s' " - "UNION ALL " - "select log_cmddata " - "from %s.sl_log_2 " - "where log_origin = '%s' " - " and log_txid = '%s' " - " and log_actionseq = '%s'", - rtcfg_namespace, - log_origin, log_txid, log_actionseq, - rtcfg_namespace, - log_origin, log_txid, log_actionseq); - start_monitored_event(&pm); - res2 = PQexec(dbconn, dstring_data(&query2)); - monitor_largetuples(&pm); - - if (PQresultStatus(res2) != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s", - node->no_id, provider->no_id, - dstring_data(&query), - PQresultErrorMessage(res2)); - PQclear(res2); - errors++; - break; - } - if (PQntuples(res2) != 1) - { - slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: large log_cmddata for actionseq %s not found\n", - node->no_id, provider->no_id, - log_actionseq); - PQclear(res2); - errors++; - break; - } - - log_cmddata = PQgetvalue(res2, 0, 0); - largemem = log_cmdsize; - } - - /* - * This can happen if the table belongs to a set that - * already has a better sync status than the event we're - * currently processing as a result from another SYNC - * occuring before we had started processing the copy_set. - */ - if (log_tableid >= wd->tab_fqname_size || - wd->tab_fqname[log_tableid] == NULL) - { - if (largemem > 0) - PQclear(res2); - continue; - } - - /* - * If we are forwarding this set, add the insert into - * sl_log_? 
- */ - if (wd->tab_forward[log_tableid]) - { - slon_appendquery(&(line->log), - "insert into %s.sl_log_%d " - " (log_origin, log_txid, log_tableid, " - " log_actionseq, log_cmdtype, " - " log_cmddata) values " - " ('%s', '%s', '%d', '%s', '%q', '%q');\n", - rtcfg_namespace, wd->active_log_table, - log_origin, log_txid, log_tableid, - log_actionseq, log_cmdtype, log_cmddata); - largemem *= 2; - } - - /* - * Add the actual replicating command to the line buffer - */ - line->line_largemem += largemem; - switch (*log_cmdtype) - { - case 'I': - slon_appendquery(&(line->data), - "insert into %s %s;\n", - wd->tab_fqname[log_tableid], - log_cmddata); - pm.num_inserts++; - break; - - case 'U': - slon_appendquery(&(line->data), - "update only %s set %s;\n", - wd->tab_fqname[log_tableid], - log_cmddata); - pm.num_updates++; - break; - - case 'D': - slon_appendquery(&(line->data), - "delete from only %s where %s;\n", - wd->tab_fqname[log_tableid], - log_cmddata); - pm.num_deletes++; - break; - case 'T': - slon_appendquery(&(line->data), - "%s;\n", - log_cmddata); - pm.num_truncates++; - break; - } - line_ncmds++; - - if (line_ncmds >= SLON_COMMANDS_PER_LINE) - { - if (data_line_last >= data_line_alloc) - { - if (largemem > 0) - PQclear(res2); - break; - } - - line_no = data_line_last++; - - line = data_line[line_no]; - line->code = SLON_WGLC_ACTION; - line->provider = provider; - dstring_reset(&(line->data)); - dstring_reset(&(line->log)); - - line_ncmds = 0; - } - - /* - * If this was a large log_cmddata entry (> - * sync_max_rowsize), add this to the memory usage of the - * workgroup and check if we are exceeding limits. - */ - if (largemem > 0) - { - PQclear(res2); - pthread_mutex_lock(&(wd->workdata_lock)); - wd->workdata_largemem += largemem; - if (wd->workdata_largemem >= sync_max_largemem) - { - /* - * This is it ... we exit the loop here and wait - * for the worker to apply enough of the large - * rows first. - */ - pthread_mutex_unlock(&(wd->workdata_lock)); - break; - } - pthread_mutex_unlock(&(wd->workdata_lock)); - } - } - - /* - * Move one line back if we actually ran out of tuples on an - * exact SLON_COMMANDS_PER_LINE boundary. - */ - if (line_ncmds == 0) - { - data_line_last--; - } - } /* Cursor returned EOF */ - } while (0); - - /* - * if there are still line buffers allocated, give them back. - */ - if (data_line_first < data_line_alloc) + break; + } + tupno++; + if (first_fetch) { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: return %d unused line buffers\n", + gettimeofday(&tv_now, NULL); + slon_log(SLON_DEBUG1, + "remoteWorkerThread_%d_%d: %.3f seconds delay for first row\n", node->no_id, provider->no_id, - data_line_alloc - data_line_first); - pthread_mutex_lock(&(wd->workdata_lock)); - while (data_line_first < data_line_alloc) - { - data_line_alloc--; - DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, - data_line[data_line_alloc]); - } - pthread_cond_broadcast(&(wd->linepool_cond)); - pthread_mutex_unlock(&(wd->workdata_lock)); - - data_line_alloc = 0; - data_line_first = 0; - data_line_last = 0; + TIMEVAL_DIFF(&tv_start, &tv_now)); + + first_fetch = false; } - - if (res != NULL) + rc2 = PQputCopyData(local_conn,buffer,rc); + if (rc2 < 0 ) { - PQclear(res); - res = NULL; + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error writing" \ + " to sl_log: %s\n", + node->no_id,provider->no_id, + PQerrorMessage(local_conn)); + errors++; + if(buffer) + PQfreemem(buffer); + break; } - /* - * Close the cursor and rollback the transaction. 
- */ - (void) slon_mkquery(&query, "close LOG; "); - if (query_execute(node, dbconn, &query) < 0) - errors++; - (void) slon_mkquery(&query, "rollback transaction; " - "set enable_seqscan = default; " - "set enable_indexscan = default; "); - if (query_execute(node, dbconn, &query) < 0) - errors++; + if(archive_dir) + archive_append_data(node,buffer,rc); + if(buffer) + PQfreemem(buffer); + + }/*errors*/ + rc2 = PQputCopyEnd(local_conn, NULL); + if (rc2 < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error ending copy" + " to sl_log:%s\n", + node->no_id,provider->no_id, + PQerrorMessage(local_conn)); + errors++; + } - gettimeofday(&tv_now, NULL); - slon_log(SLON_DEBUG1, - "remoteHelperThread_%d_%d: %.3f seconds until close cursor\n", - node->no_id, provider->no_id, - TIMEVAL_DIFF(&tv_start, &tv_now)); - - slon_log(SLON_DEBUG1, "remoteHelperThread_%d_%d: inserts=%d updates=%d deletes=%d truncates=%d\n", - node->no_id, provider->no_id, pm.num_inserts, pm.num_updates, pm.num_deletes, pm.num_truncates); - - slon_log(SLON_DEBUG1, - "remoteWorkerThread_%d: sync_helper timing: " - " pqexec (s/count)" - "- provider %.3f/%d " - "- subscriber %.3f/%d\n", - node->no_id, - pm.prov_query_t, pm.prov_query_c, - pm.subscr_query_t, pm.prov_query_c); - - slon_log(SLON_DEBUG1, - "remoteWorkerThread_%d: sync_helper timing: " - " large tuples %.3f/%d\n", - node->no_id, - pm.large_tuples_t, pm.large_tuples_c); + if(archive_dir) + { + archive_append_str(node,"\\."); + } + if (res != NULL) + { + PQclear(res); + res = NULL; + } + if( res2 != NULL) + { + PQclear(res2); + res2 = NULL; + } - /* - * Change our helper status to DONE and tell the worker thread about - * it. - */ - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: change helper thread status\n", - node->no_id, provider->no_id); - pthread_mutex_lock(&(provider->helper_lock)); - provider->helper_status = SLON_WG_DONE; - dstring_reset(&provider->helper_query); - pthread_mutex_unlock(&(provider->helper_lock)); + res = PQgetResult(dbconn); + if ( PQresultStatus(res) < 0 ) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY OUT: %s", + node->no_id, provider->no_id, + PQresultErrorMessage(res)); + errors++; + } + PQclear(res); - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: send DONE/ERROR line to worker\n", - node->no_id, provider->no_id); - pthread_mutex_lock(&(wd->workdata_lock)); - while (wd->linepool_head == NULL) - { - pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); - } - line = wd->linepool_head; - DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail, line); - if (errors) - line->code = SLON_WGLC_ERROR; - else - line->code = SLON_WGLC_DONE; - line->provider = provider; - DLLIST_ADD_HEAD(wd->repldata_head, wd->repldata_tail, line); - pthread_cond_signal(&(wd->repldata_cond)); - pthread_mutex_unlock(&(wd->workdata_lock)); + res = PQgetResult(local_conn); + if ( PQresultStatus(res) < 0 ) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY IN: %s", + node->no_id, provider->no_id, + PQresultErrorMessage(res)); + errors++; + } + PQclear(res); + res = NULL; - /* - * Wait for the whole workgroup to be done. 
- */ - pthread_mutex_lock(&(provider->helper_lock)); - while (provider->helper_status == SLON_WG_DONE) - { - slon_log(SLON_DEBUG3, "remoteHelperThread_%d_%d: " - "waiting for workgroup to finish\n", - node->no_id, provider->no_id); + if (errors) + slon_log(SLON_ERROR, + "remoteWorkerThread_%d_%d: failed SYNC's log selection query was '%s'\n", + node->no_id, provider->no_id, + dstring_data(&(provider->helper_query))); + + dstring_init(&query); + (void) slon_mkquery(&query, "rollback transaction; " + "set enable_seqscan = default; " + "set enable_indexscan = default; "); + if (query_execute(node, dbconn, &query) < 0) + errors++; + + gettimeofday(&tv_now, NULL); + slon_log(SLON_DEBUG1, + "remoteWorkerThread_%d_%d: %.3f seconds until close cursor\n", + node->no_id, provider->no_id, + TIMEVAL_DIFF(&tv_start, &tv_now)); + slon_log(SLON_DEBUG1,"remoteWorkerThread_%d_%d: rows=%d\n", + node->no_id,provider->no_id,tupno); - pthread_cond_wait(&(provider->helper_cond), &(provider->helper_lock)); - } - pthread_mutex_unlock(&(provider->helper_lock)); - } + slon_log(SLON_DEBUG1, + "remoteWorkerThread_%d: sync_helper timing: " + " pqexec (s/count)" + "- provider %.3f/%d " + "- subscriber %.3f/%d\n", + node->no_id, + pm.prov_query_t, pm.prov_query_c, + pm.subscr_query_t, pm.prov_query_c); + + slon_log(SLON_DEBUG4, + "remoteWorkerThread_%d_%d: sync_helper done\n", + node->no_id, provider->no_id); + return errors; } /* ---------- @@ -5557,7 +4835,6 @@ archive_open(SlonNode *node, char *seqbuf, PGconn *dbconn) node->no_id, node->archive_temp, strerror(errno)); return -1; } - rc = fprintf(node->archive_fp, "------------------------------------------------------------------\n" "-- Slony-I log shipping archive\n" @@ -6131,10 +5408,3 @@ static void monitor_subscriber_iud(PerfMon *perf_info) { (perf_info->subscr_iud__c) ++; } -static void monitor_largetuples(PerfMon *perf_info) { - double diff; - gettimeofday(&(perf_info->now_t), NULL); - diff = TIMEVAL_DIFF(&(perf_info->prev_t), &(perf_info->now_t)); - (perf_info->large_tuples_t) += diff; - (perf_info->large_tuples_c) ++; -} diff --git a/src/slon/runtime_config.c b/src/slon/runtime_config.c index 7eb98688..516a761e 100644 --- a/src/slon/runtime_config.c +++ b/src/slon/runtime_config.c @@ -147,6 +147,7 @@ rtcfg_storeNode(int no_id, char *no_comment) node->no_id = no_id; node->no_active = false; node->no_comment = strdup(no_comment); + node->last_snapshot = strdup("1:1:"); pthread_mutex_init(&(node->message_lock), NULL); pthread_cond_init(&(node->message_cond), NULL); @@ -222,6 +223,63 @@ rtcfg_getNodeLastEvent(int no_id) } +/* ---------- + * rtcfg_setNodeLastSnapshot() + * + * Set the last_snapshot field in the node runtime structure. 
+ * ---------- + */ +void +rtcfg_setNodeLastSnapshot(int no_id, char *snapshot) +{ + SlonNode *node; + + if (snapshot == NULL || strcmp(snapshot, "") == 0) + snapshot = "1:1:"; + + rtcfg_lock(); + if ((node = rtcfg_findNode(no_id)) != NULL) + { + if (node->last_snapshot != NULL) + free (node->last_snapshot); + + node->last_snapshot = strdup(snapshot); + } + + rtcfg_unlock(); + + slon_log(SLON_DEBUG2, + "setNodeLastSnapshot: no_id=%d snapshot='%s'\n", + no_id, snapshot); +} + + +/* ---------- + * rtcfg_getNodeLastSnapshot + * + * Read the nodes last_snapshot field + * ---------- + */ +char * +rtcfg_getNodeLastSnapshot(int no_id) +{ + SlonNode *node; + char *retval; + + rtcfg_lock(); + if ((node = rtcfg_findNode(no_id)) != NULL) + { + retval = node->last_snapshot; + } + else + retval = NULL; + + rtcfg_unlock(); + + return retval; +} + + /* ---------- * rtcfg_enableNode * ---------- diff --git a/src/slon/slon.c b/src/slon/slon.c index ef9d996e..28286fb9 100644 --- a/src/slon/slon.c +++ b/src/slon/slon.c @@ -548,10 +548,18 @@ SlonMain(void) "select no_id, no_active, no_comment, " " (select coalesce(max(con_seqno),0) from %s.sl_confirm " " where con_origin = no_id and con_received = %d) " - " as last_event " + " as last_event, " + " (select ev_snapshot from %s.sl_event " + " where ev_origin = no_id " + " and ev_seqno = (select max(ev_seqno) " + " from %s.sl_event " + " where ev_origin = no_id " + " and ev_type = 'SYNC')) as last_snapshot " "from %s.sl_node " "order by no_id; ", - rtcfg_namespace, rtcfg_nodeid, rtcfg_namespace); + rtcfg_namespace, rtcfg_nodeid, + rtcfg_namespace, rtcfg_namespace, + rtcfg_namespace); res = PQexec(startup_conn, dstring_data(&query)); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -584,6 +592,7 @@ SlonMain(void) slon_scanint64(PQgetvalue(res, i, 3), &last_event); rtcfg_storeNode(no_id, no_comment); rtcfg_setNodeLastEvent(no_id, last_event); + rtcfg_setNodeLastSnapshot(no_id, PQgetvalue(res, i, 4)); /* * If it is active, remember for activation just before we start diff --git a/src/slon/slon.h b/src/slon/slon.h index 173d9fe7..9ee90065 100644 --- a/src/slon/slon.h +++ b/src/slon/slon.h @@ -120,6 +120,7 @@ struct SlonNode_s int pa_connretry; /* connection retry interval */ int64 last_event; /* last event we have received */ + char *last_snapshot; /* snapshot of last sync event */ SlonThreadStatus listen_status; /* status of the listen thread */ pthread_t listen_thread; /* thread id of listen thread */ @@ -460,6 +461,8 @@ extern void rtcfg_disableNode(int no_id); extern SlonNode *rtcfg_findNode(int no_id); extern int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq); extern int64 rtcfg_getNodeLastEvent(int no_id); +extern void rtcfg_setNodeLastSnapshot(int no_id, char *snapshot); +extern char *rtcfg_getNodeLastSnapshot(int no_id); extern void rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry); @@ -557,8 +560,6 @@ extern void *remoteListenThread_main(void *cdata); * ---------- */ extern int sync_group_maxsize; -extern int sync_max_rowsize; -extern int sync_max_largemem; extern int explain_interval; diff --git a/src/slonik/parser.y b/src/slonik/parser.y index 4852de7f..4290a082 100644 --- a/src/slonik/parser.y +++ b/src/slonik/parser.y @@ -1306,6 +1306,7 @@ stmt_ddl_script : lno K_EXECUTE K_SCRIPT option_list STMT_OPTION_INT( O_SET_ID, -1 ), STMT_OPTION_STR( O_FILENAME, NULL ), STMT_OPTION_INT( O_EVENT_NODE, -1 ), + STMT_OPTION_STR( O_EXECUTE_ONLY_ON, NULL ), STMT_OPTION_INT( O_EXECUTE_ONLY_ON, -1 ), STMT_OPTION_END }; @@ -1322,7 +1323,8 @@ 
stmt_ddl_script : lno K_EXECUTE K_SCRIPT option_list new->ddl_setid = opt[0].ival; new->ddl_fname = opt[1].str; new->ev_origin = opt[2].ival; - new->only_on_node = opt[3].ival; + new->only_on_nodes = opt[3].str; + new->only_on_node = opt[4].ival; new->ddl_fd = NULL; } else @@ -1696,6 +1698,11 @@ option_list_item : K_ID '=' option_item_id $5->opt_code = O_EXECUTE_ONLY_ON; $$ = $5; } + | K_EXECUTE K_ONLY K_ON '=' option_item_literal + { + $5->opt_code = O_EXECUTE_ONLY_ON; + $$ = $5; + } | K_SECONDS '=' option_item_id { $3->opt_code = O_SECONDS; diff --git a/src/slonik/slonik.c b/src/slonik/slonik.c index f43e397f..f6ae536d 100644 --- a/src/slonik/slonik.c +++ b/src/slonik/slonik.c @@ -994,15 +994,6 @@ script_check_stmts(SlonikScript * script, SlonikStmt * hdr) SlonikStmt_ddl_script *stmt = (SlonikStmt_ddl_script *) hdr; - if ((stmt->only_on_node > -1) && (stmt->ev_origin > -1) - && (stmt->ev_origin != stmt->only_on_node)) { - printf ("If ONLY ON NODE is given, " - "EVENT ORIGIN must be the same node"); - errors++; - } - if (stmt->only_on_node > -1) - stmt->ev_origin = stmt->only_on_node; - if (stmt->ev_origin < 0) { printf("%s:%d: Error: require EVENT NODE\n", @@ -1023,6 +1014,13 @@ script_check_stmts(SlonikScript * script, SlonikStmt * hdr) hdr->stmt_filename, hdr->stmt_lno); errors++; } + if ((stmt->only_on_node > 0) && (stmt->only_on_nodes != NULL)) + { + printf("%s:%d: Error: " + "cannot specify singular node as well as node list\n", + hdr->stmt_filename, hdr->stmt_lno); + errors++; + } if (script_check_adminfo(hdr, stmt->ev_origin) < 0) errors++; @@ -4535,7 +4533,7 @@ int slonik_ddl_script(SlonikStmt_ddl_script * stmt) { SlonikAdmInfo *adminfo1; - SlonDString query; + SlonDString query, equery; SlonDString script; int rc; int num_statements = -1, stmtno; @@ -4548,8 +4546,6 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) #define PARMCOUNT 1 const char *params[PARMCOUNT]; - int paramlens[PARMCOUNT]; - int paramfmts[PARMCOUNT]; adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->ev_origin); if (adminfo1 == NULL) @@ -4565,7 +4561,7 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) while (fgets(rex1, 256, stmt->ddl_fd) != NULL) { rc = strlen(rex1); - rex1[rc] = '\0'; + rex1[rc] = '\0'; replace_token(rex3, rex1, "@CLUSTERNAME@", stmt->hdr.script->clustername); replace_token(rex4, rex3, "@MODULEVERSION@", SLONY_I_VERSION_STRING); replace_token(buf, rex4, "@NAMESPACE@", rex2); @@ -4574,23 +4570,23 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) } dstring_terminate(&script); - dstring_init(&query); - slon_mkquery(&query, - "lock table \"_%s\".sl_event_lock, \"_%s\".sl_config_lock;" - "select \"_%s\".ddlScript_prepare(%d, %d); ", - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->hdr.script->clustername, - stmt->ddl_setid, /* dstring_data(&script), */ - stmt->only_on_node); - - if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, - stmt->hdr.script,auto_wait_disabled) < 0) - { - dstring_free(&query); - return -1; + /* This prepares the statement that will be run over and over for each DDL statement */ + dstring_init(&equery); + if ((stmt->only_on_nodes == NULL) && (stmt->only_on_node < 0)) { + slon_mkquery(&equery, + "select \"_%s\".ddlCapture($1, NULL::text);", + stmt->hdr.script->clustername); + } else { + if (stmt->only_on_node > 0) { + slon_mkquery(&equery, + "select \"_%s\".ddlCapture($1, '%d');", + stmt->hdr.script->clustername, stmt->only_on_node); + } else { /* stmt->only_on_nodes is populated */ + slon_mkquery(&equery, + "select 
\"_%s\".ddlCapture($1, '%s');", + stmt->hdr.script->clustername, stmt->only_on_nodes); + } } - /* Split the script into a series of SQL statements - each needs to be submitted separately */ num_statements = scan_for_statements (dstring_data(&script)); @@ -4599,10 +4595,12 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) { printf("DDL - number of statements invalid - %d not between 0 and %d\n", num_statements, MAXSTATEMENTS); return -1; - } + } + dstring_init(&query); for (stmtno=0; stmtno < num_statements; stmtno++) { int startpos, endpos; char *dest; + PGresult *res1; if (stmtno == 0) startpos = 0; else @@ -4618,29 +4616,20 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) slon_mkquery(&query, "%s", dest); free(dest); - if (db_exec_command((SlonikStmt *)stmt, adminfo1, &query) < 0) + params[PARMCOUNT-1] = dstring_data(&query); + + res1 = PQexecParams(adminfo1->dbconn, dstring_data(&equery), 1, NULL, params, NULL, NULL, 0); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) { - dstring_free(&query); - return -1; + fprintf(stderr, "%s [%s] - %s", + PQresStatus(PQresultStatus(res1)), + dstring_data(&query), PQresultErrorMessage(res1)); + PQclear(res1); + return -1; } + PQclear(res1); } - - slon_mkquery(&query, "select \"_%s\".ddlScript_complete(%d, $1::text, %d); ", - stmt->hdr.script->clustername, - stmt->ddl_setid, - stmt->only_on_node); - - paramlens[PARMCOUNT-1] = 0; - paramfmts[PARMCOUNT-1] = 0; - params[PARMCOUNT-1] = dstring_data(&script); - - if (db_exec_evcommand_p((SlonikStmt *)stmt, adminfo1, &query, - PARMCOUNT, NULL, params, paramlens, paramfmts, 0) < 0) - { - dstring_free(&query); - return -1; - } - + dstring_free(&equery); dstring_free(&script); dstring_free(&query); return 0; diff --git a/src/slonik/slonik.h b/src/slonik/slonik.h index b26adbbf..62ba26e8 100644 --- a/src/slonik/slonik.h +++ b/src/slonik/slonik.h @@ -407,7 +407,8 @@ struct SlonikStmt_ddl_script_s int ddl_setid; char *ddl_fname; int ev_origin; - int only_on_node; + char *only_on_nodes; + int only_on_node; FILE *ddl_fd; }; -- 2.39.5