PGDLLEXPORT Datum bdr_get_remote_nodeinfo(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum bdr_test_replication_connection(PG_FUNCTION_ARGS);
+PGDLLEXPORT Datum bdr_test_remote_connectback(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(bdr_get_remote_nodeinfo);
PG_FUNCTION_INFO_V1(bdr_test_replication_connection);
+PG_FUNCTION_INFO_V1(bdr_test_remote_connectback);
/*
* Make standard postgres connection, ERROR on failure.
PG_RETURN_DATUM(HeapTupleGetDatum(returnTuple));
}
+
+void
+bdr_test_remote_connectback_internal(PGconn *conn,
+ struct remote_node_info *ri, const char *my_dsn)
+{
+ PGresult *res;
+ int i;
+ char *remote_bdr_version_str;
+ const char * mydsn_values[1];
+ Oid mydsn_types[1] = { TEXTOID };
+
+ mydsn_values[0] = my_dsn;
+
+ /* Make sure BDR is actually present and active on the remote */
+ bdr_ensure_ext_installed(conn);
+
+ /*
+ * Ask the remote to connect back to us in replication mode, then
+ * discard the results.
+ */
+ res = PQexecParams(conn, "SELECT sysid, timeline, dboid "
+ "FROM bdr.bdr_test_replication_connection($1)",
+ 1, mydsn_types, mydsn_values, NULL, NULL, 0);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ /* TODO clone remote error to local */
+ ereport(ERROR,
+ (errmsg("connection from remote back to local in replication mode failed"),
+ errdetail("remote reported: %s", PQerrorMessage(conn))));
+ }
+
+ PQclear(res);
+
+ /*
+ * Acquire bdr_get_remote_nodeinfo's results from running it on the remote
+ * node to connect back to us.
+ */
+ res = PQexecParams(conn, "SELECT sysid, timeline, dboid, variant, version, "
+ " version_num, min_remote_version_num, is_superuser "
+ "FROM bdr.bdr_get_remote_nodeinfo($1)",
+ 1, mydsn_types, mydsn_values, NULL, NULL, 0);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ /* TODO clone remote error to local */
+ ereport(ERROR,
+ (errmsg("connection from remote back to local failed"),
+ errdetail("remote reported: %s", PQerrorMessage(conn))));
+ }
+
+ Assert(PQnfields(res) == 8);
+
+ if (PQntuples(res) != 1)
+ elog(ERROR, "Got %d tuples instead of expected 1", PQntuples(res));
+
+ for (i = 0; i < 8; i++)
+ {
+ if (PQgetisnull(res, 0, i))
+ elog(ERROR, "Unexpectedly null field %s", PQfname(res, i));
+ }
+
+ ri->sysid_str = pstrdup(PQgetvalue(res, 0, 0));
+
+ if (sscanf(ri->sysid_str, UINT64_FORMAT, &ri->sysid) != 1)
+ elog(ERROR, "could not parse sysid %s", ri->sysid_str);
+
+ ri->timeline = DatumGetObjectId(
+ DirectFunctionCall1(oidin, CStringGetDatum(PQgetvalue(res, 0, 1))));
+ ri->dboid = DatumGetObjectId(
+ DirectFunctionCall1(oidin, CStringGetDatum(PQgetvalue(res, 0, 2))));
+ ri->variant = pstrdup(PQgetvalue(res, 0, 3));
+ remote_bdr_version_str = PQgetvalue(res, 0, 4);
+ ri->version = pstrdup(remote_bdr_version_str);
+ ri->version_num = atoi(PQgetvalue(res, 0, 5));
+ ri->min_remote_version_num = atoi(PQgetvalue(res, 0, 6));
+ ri->is_superuser = DatumGetBool(
+ DirectFunctionCall1(boolin, CStringGetDatum(PQgetvalue(res, 0, 7))));
+
+ PQclear(res);
+}
+
+/*
+ * Establish a connection to a remote node and use that connection to connect
+ * back to the local node in both replication and non-replication modes.
+ *
+ * This is used during setup to make sure the local node is useable.
+ *
+ * Reports the same data as bdr_get_remote_nodeinfo, but it's reported
+ * about the local node via the remote node.
+ */
+Datum
+bdr_test_remote_connectback(PG_FUNCTION_ARGS)
+{
+ const char *remote_node_dsn;
+ const char *my_dsn;
+ Datum values[8];
+ bool isnull[8] = {false, false, false, false, false, false, false, false};
+ TupleDesc tupleDesc;
+ HeapTuple returnTuple;
+ PGconn *conn;
+
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
+ elog(ERROR, "both arguments must be non-null");
+
+ remote_node_dsn = text_to_cstring(PG_GETARG_TEXT_P(0));
+ my_dsn = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+ if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ conn = bdr_connect_nonrepl(remote_node_dsn, "bdrconnectback");
+
+ PG_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+ PointerGetDatum(&conn));
+ {
+ struct remote_node_info ri;
+
+ bdr_test_remote_connectback_internal(conn, &ri, my_dsn);
+
+ values[0] = CStringGetTextDatum(ri.sysid_str);
+ values[1] = ObjectIdGetDatum(ri.timeline);
+ values[2] = ObjectIdGetDatum(ri.dboid);
+ values[3] = CStringGetTextDatum(ri.variant);
+ values[4] = CStringGetTextDatum(ri.version);
+ values[5] = Int32GetDatum(ri.version_num);
+ values[6] = Int32GetDatum(ri.min_remote_version_num);
+ values[7] = BoolGetDatum(ri.is_superuser);
+
+ returnTuple = heap_form_tuple(tupleDesc, values, isnull);
+
+ free_remote_node_info(&ri);
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+ PointerGetDatum(&conn));
+
+ PQfinish(conn);
+
+ PG_RETURN_DATUM(HeapTupleGetDatum(returnTuple));
+}