int bdr_max_workers;
static bool bdr_skip_ddl_replication;
bool bdr_skip_ddl_locking;
-/*
- * These globals are valid only for apply bgworkers, not for
- * bdr running in the postmaster or for per-db workers.
- *
- * TODO: move into bdr_apply.c when bdr_apply_main moved.
- */
-extern BdrApplyWorker *bdr_apply_worker;
-extern BdrConnectionConfig *bdr_apply_config;
/* shmem init hook to chain to on startup, if any */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
* - set search_path
*
*/
-static void
+void
bdr_worker_init(char *dbname)
{
Assert(IsBackgroundWorker);
return streamConn;
}
-/*
- * Entry point for a BDR apply worker.
- *
- * Responsible for establishing a replication connection, creating slots
- * and starting the reply loop.
- *
- * TODO: move to bdr_apply.c
- */
-void
-bdr_apply_main(Datum main_arg)
-{
- PGconn *streamConn;
- PGresult *res;
- StringInfoData query;
- char *sqlstate;
- RepNodeId replication_identifier;
- XLogRecPtr start_from;
- NameData slot_name;
- BdrWorker *bdr_worker_slot;
-
- Assert(IsBackgroundWorker);
-
- initStringInfo(&query);
-
- bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
- Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
- bdr_apply_worker = &bdr_worker_slot->worker_data.apply_worker;
- bdr_worker_type = BDR_WORKER_APPLY;
-
- bdr_apply_config = bdr_connection_configs[bdr_apply_worker->connection_config_idx];
- Assert(bdr_apply_config != NULL);
-
- bdr_worker_init(bdr_apply_config->dbname);
-
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr apply top-level resource owner");
- bdr_saved_resowner = CurrentResourceOwner;
-
- elog(DEBUG1, "%s initialized on %s",
- MyBgworkerEntry->bgw_name, bdr_apply_config->dbname);
-
- /* Set our local application_name for our SPI connections */
- resetStringInfo(&query);
- appendStringInfo(&query, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "apply");
- if (bdr_apply_worker->forward_changesets)
- appendStringInfoString(&query, " catchup");
-
- if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
- appendStringInfo(&query, " up to %X/%X",
- (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
- (uint32)bdr_apply_worker->replay_stop_lsn);
-
- SetConfigOption("application_name", query.data, PGC_USERSET, PGC_S_SESSION);
-
- /* Form an application_name string to send to the remote end */
- resetStringInfo(&query);
- appendStringInfoString(&query, "receive");
-
- if (bdr_apply_worker->forward_changesets)
- appendStringInfoString(&query, " catchup");
-
- if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
- appendStringInfo(&query, " up to %X/%X",
- (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
- (uint32)bdr_apply_worker->replay_stop_lsn);
-
- /* Make the replication connection to the remote end */
- streamConn = bdr_establish_connection_and_slot(bdr_apply_config,
- query.data, &slot_name, &origin_sysid, &origin_timeline,
- &origin_dboid, &replication_identifier, NULL);
-
-
- /* initialize stat subsystem, our id won't change further */
- bdr_count_set_current_node(replication_identifier);
-
- /*
- * tell replication_identifier.c about our identifier so it can cache the
- * search in shared memory.
- */
- SetupCachedReplicationIdentifier(replication_identifier);
-
- /*
- * Check whether we already replayed something so we don't replay it
- * multiple times.
- */
-
- start_from = RemoteCommitFromCachedReplicationIdentifier();
-
- elog(INFO, "starting up replication from %u at %X/%X",
- replication_identifier,
- (uint32) (start_from >> 32), (uint32) start_from);
-
- resetStringInfo(&query);
- appendStringInfo(&query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
- NameStr(slot_name), (uint32) (start_from >> 32),
- (uint32) start_from);
- appendStringInfo(&query, "pg_version '%u'", PG_VERSION_NUM);
- appendStringInfo(&query, ", pg_catversion '%u'", CATALOG_VERSION_NO);
- appendStringInfo(&query, ", bdr_version '%u'", BDR_VERSION_NUM);
- appendStringInfo(&query, ", bdr_variant '%s'", BDR_VARIANT);
- appendStringInfo(&query, ", min_bdr_version '%u'", BDR_MIN_REMOTE_VERSION_NUM);
- appendStringInfo(&query, ", sizeof_int '%zu'", sizeof(int));
- appendStringInfo(&query, ", sizeof_long '%zu'", sizeof(long));
- appendStringInfo(&query, ", sizeof_datum '%zu'", sizeof(Datum));
- appendStringInfo(&query, ", maxalign '%d'", MAXIMUM_ALIGNOF);
- appendStringInfo(&query, ", float4_byval '%d'", bdr_get_float4byval());
- appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
- appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
- appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
- appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
- if (bdr_apply_config->replication_sets != NULL &&
- bdr_apply_config->replication_sets[0] != 0)
- appendStringInfo(&query, ", replication_sets '%s'",
- bdr_apply_config->replication_sets);
-
- appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
- if (bdr_apply_worker->forward_changesets)
- appendStringInfo(&query, ", forward_changesets 't'");
-
- appendStringInfoChar(&query, ')');
-
- elog(DEBUG3, "Sending replication command: %s", query.data);
-
- res = PQexec(streamConn, query.data);
-
- sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
-
- if (PQresultStatus(res) != PGRES_COPY_BOTH)
- {
- elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
- query.data, PQresultErrorMessage(res), sqlstate);
- }
- PQclear(res);
-
- replication_origin_id = replication_identifier;
-
- bdr_conflict_logging_startup();
-
- PG_TRY();
- {
- bdr_apply_work(streamConn);
- }
- PG_CATCH();
- {
- if (IsTransactionState())
- bdr_count_rollback();
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- proc_exit(0);
-}
-
/*
* In postmaster, at shared_preload_libaries time, create the GUCs for a
* connection. They'll be accessed by the apply worker that uses these GUCs
extern Oid bdr_lookup_relid(const char *relname, Oid schema_oid);
/* apply support */
-extern void bdr_process_remote_action(StringInfo s);
extern void bdr_fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid,
TimeLineID *tli, Oid *remote_dboid);
extern RepNodeId bdr_fetch_node_id_via_sysid(uint64 sysid, TimeLineID tli, Oid dboid);
extern void bdr_locks_check_query(void);
/* background workers */
+extern void bdr_worker_init(char* dbname);
extern void bdr_apply_main(Datum main_arg);
-extern void bdr_apply_work(PGconn* streamConn);
/* manipulation of bdr catalogs */
extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Oid dboid);
#include "access/relscan.h"
#include "access/xact.h"
+#include "catalog/catversion.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "libpq/pqformat.h"
+#include "mb/pg_wchar.h"
+
#include "parser/parse_type.h"
#include "replication/logical.h"
dlist_head bdr_lsn_association = DLIST_STATIC_INIT(bdr_lsn_association);
static BDRRelation *read_rel(StringInfo s, LOCKMODE mode);
-extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
+static void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
static void check_apply_update(BdrConflictType conflict_type,
RepNodeId local_node_id, TimestampTz local_ts,
#endif
}
-void
+static void
read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup)
{
TupleDesc desc = RelationGetDescr(rel->rel);
*
* May set got_SIGTERM to stop processing before next record.
*/
-void
+static void
bdr_process_remote_action(StringInfo s)
{
char action = pq_getmsgbyte(s);
/*
* The actual main loop of a BDR apply worker.
*/
-void
+static void
bdr_apply_work(PGconn* streamConn)
{
int fd;
MemoryContextResetAndDeleteChildren(MessageContext);
}
}
+
+/*
+ * Entry point for a BDR apply worker.
+ *
+ * Responsible for establishing a replication connection, creating slots
+ * and starting the reply loop.
+ */
+void
+bdr_apply_main(Datum main_arg)
+{
+ PGconn *streamConn;
+ PGresult *res;
+ StringInfoData query;
+ char *sqlstate;
+ RepNodeId replication_identifier;
+ XLogRecPtr start_from;
+ NameData slot_name;
+ BdrWorker *bdr_worker_slot;
+
+ Assert(IsBackgroundWorker);
+
+ initStringInfo(&query);
+
+ bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+ Assert(bdr_worker_slot->worker_type == BDR_WORKER_APPLY);
+ bdr_apply_worker = &bdr_worker_slot->worker_data.apply_worker;
+ bdr_worker_type = BDR_WORKER_APPLY;
+
+ bdr_apply_config = bdr_connection_configs[bdr_apply_worker->connection_config_idx];
+ Assert(bdr_apply_config != NULL);
+
+ bdr_worker_init(bdr_apply_config->dbname);
+
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr apply top-level resource owner");
+ bdr_saved_resowner = CurrentResourceOwner;
+
+ elog(DEBUG1, "%s initialized on %s",
+ MyBgworkerEntry->bgw_name, bdr_apply_config->dbname);
+
+ /* Set our local application_name for our SPI connections */
+ resetStringInfo(&query);
+ appendStringInfo(&query, BDR_LOCALID_FORMAT": %s", BDR_LOCALID_FORMAT_ARGS, "apply");
+ if (bdr_apply_worker->forward_changesets)
+ appendStringInfoString(&query, " catchup");
+
+ if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
+ appendStringInfo(&query, " up to %X/%X",
+ (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
+ (uint32)bdr_apply_worker->replay_stop_lsn);
+
+ SetConfigOption("application_name", query.data, PGC_USERSET, PGC_S_SESSION);
+
+ /* Form an application_name string to send to the remote end */
+ resetStringInfo(&query);
+ appendStringInfoString(&query, "receive");
+
+ if (bdr_apply_worker->forward_changesets)
+ appendStringInfoString(&query, " catchup");
+
+ if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr)
+ appendStringInfo(&query, " up to %X/%X",
+ (uint32)(bdr_apply_worker->replay_stop_lsn >> 32),
+ (uint32)bdr_apply_worker->replay_stop_lsn);
+
+ /* Make the replication connection to the remote end */
+ streamConn = bdr_establish_connection_and_slot(bdr_apply_config,
+ query.data, &slot_name, &origin_sysid, &origin_timeline,
+ &origin_dboid, &replication_identifier, NULL);
+
+
+ /* initialize stat subsystem, our id won't change further */
+ bdr_count_set_current_node(replication_identifier);
+
+ /*
+ * tell replication_identifier.c about our identifier so it can cache the
+ * search in shared memory.
+ */
+ SetupCachedReplicationIdentifier(replication_identifier);
+
+ /*
+ * Check whether we already replayed something so we don't replay it
+ * multiple times.
+ */
+
+ start_from = RemoteCommitFromCachedReplicationIdentifier();
+
+ elog(INFO, "starting up replication from %u at %X/%X",
+ replication_identifier,
+ (uint32) (start_from >> 32), (uint32) start_from);
+
+ resetStringInfo(&query);
+ appendStringInfo(&query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
+ NameStr(slot_name), (uint32) (start_from >> 32),
+ (uint32) start_from);
+ appendStringInfo(&query, "pg_version '%u'", PG_VERSION_NUM);
+ appendStringInfo(&query, ", pg_catversion '%u'", CATALOG_VERSION_NO);
+ appendStringInfo(&query, ", bdr_version '%u'", BDR_VERSION_NUM);
+ appendStringInfo(&query, ", bdr_variant '%s'", BDR_VARIANT);
+ appendStringInfo(&query, ", min_bdr_version '%u'", BDR_MIN_REMOTE_VERSION_NUM);
+ appendStringInfo(&query, ", sizeof_int '%zu'", sizeof(int));
+ appendStringInfo(&query, ", sizeof_long '%zu'", sizeof(long));
+ appendStringInfo(&query, ", sizeof_datum '%zu'", sizeof(Datum));
+ appendStringInfo(&query, ", maxalign '%d'", MAXIMUM_ALIGNOF);
+ appendStringInfo(&query, ", float4_byval '%d'", bdr_get_float4byval());
+ appendStringInfo(&query, ", float8_byval '%d'", bdr_get_float8byval());
+ appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
+ appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
+ appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+ if (bdr_apply_config->replication_sets != NULL &&
+ bdr_apply_config->replication_sets[0] != 0)
+ appendStringInfo(&query, ", replication_sets '%s'",
+ bdr_apply_config->replication_sets);
+
+ appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+ if (bdr_apply_worker->forward_changesets)
+ appendStringInfo(&query, ", forward_changesets 't'");
+
+ appendStringInfoChar(&query, ')');
+
+ elog(DEBUG3, "Sending replication command: %s", query.data);
+
+ res = PQexec(streamConn, query.data);
+
+ sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
+ query.data, PQresultErrorMessage(res), sqlstate);
+ }
+ PQclear(res);
+
+ replication_origin_id = replication_identifier;
+
+ bdr_conflict_logging_startup();
+
+ PG_TRY();
+ {
+ bdr_apply_work(streamConn);
+ }
+ PG_CATCH();
+ {
+ if (IsTransactionState())
+ bdr_count_rollback();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ proc_exit(0);
+}