return true;
}
+static void
+flush_connection(ProxyFunction *func, ProxyConnection *conn)
+{
+ int res;
+
+ /* flush it down */
+ res = PQflush(conn->db);
+
+ /* set actual state */
+ if (res > 0)
+ conn->state = C_QUERY_WRITE;
+ else if (res == 0)
+ conn->state = C_QUERY_READ;
+ else
+ conn_error(func, conn, "PQflush");
+}
+
/*
* Small sanity checking for new connections.
*
* - Does there happen any encoding conversations?
* - Difference in standard_conforming_strings.
*/
-static void
-check_new_connection(ProxyConnection *conn)
+static int
+tune_connection(ProxyFunction *func, ProxyConnection *conn)
{
- const char *px_client,
- *px_server,
- *dst_client,
- *dst_server;
+ ProxyConfig *cf = &func->cur_cluster->config;
+ const char *this_enc, *dst_enc;
const char *q_server;
const char *dst_ver;
int srvquotes = 0;
+ StringInfo sql = NULL;
dst_ver = PQparameterStatus(conn->db, "server_version");
conn->same_ver = cmp_branch(dst_ver, PG_VERSION);
+ /*
+ * sync standard_conforming_strings
+ */
q_server = PQparameterStatus(conn->db, "standard_conforming_strings");
if (q_server && strcasecmp(q_server, "off") != 0)
srvquotes = 1;
if (standard_conforming_strings != srvquotes)
- elog(WARNING, "PL/Proxy: different setting of"
- " standard_conforming_strings");
-
- px_client = pg_get_client_encoding_name();
- px_server = GetDatabaseEncodingName();
- dst_client = PQparameterStatus(conn->db, "client_encoding");
- dst_server = PQparameterStatus(conn->db, "server_encoding");
- if (strcmp(px_client, px_server)
- || strcmp(px_client, dst_client)
- || (dst_server && strcmp(px_client, dst_server)))
{
- elog(WARNING, "PL/Proxy: encoding mismatch:"
- " proxy client/server: %s/%s,"
- " partition client/server: %s/%s",
- px_client, px_server, dst_client, dst_server);
+ if (!sql)
+ sql = makeStringInfo();
+ appendStringInfo(sql, "set standard_conforming_strings = '%s'; ",
+ standard_conforming_strings ? "on" : "off");
+ }
+
+ /*
+ * sync client_encoding
+ */
+ this_enc = pg_get_client_encoding_name();
+ dst_enc = PQparameterStatus(conn->db, "client_encoding");
+ if (dst_enc && strcmp(this_enc, dst_enc))
+ {
+ if (!sql)
+ sql = makeStringInfo();
+ appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
+ }
+
+ /*
+ * if second time in this function, there should have been
+ * active already.
+ */
+ if (sql && conn->tuning)
+ {
+ appendStringInfo(sql, "-- those parameters does not seem to apply");
+ conn_error(func, conn, sql->data);
+ }
+
+ /* add statement_timeout to query */
+ if (cf->statement_timeout >= 0 && !conn->tuning)
+ {
+ if (!sql)
+ sql = makeStringInfo();
+ appendStringInfo(sql, "set statement_timeout = %d; ",
+ cf->statement_timeout);
+ }
+
+ /*
+ * send tuning query
+ */
+ if (sql)
+ {
+ conn->tuning = 1;
+ conn->state = C_QUERY_WRITE;
+ if (!PQsendQuery(conn->db, sql->data))
+ conn_error(func, conn, "PQsendQuery");
+ pfree(sql->data);
+ pfree(sql);
+
+ flush_connection(func, conn);
+ return 1;
}
+
+ conn->tuning = 0;
+ return 0;
}
/* send the query to server connection */
int res;
struct timeval now;
ProxyQuery *q = func->remote_sql;
- const char *sql;
ProxyConfig *cf = &func->cur_cluster->config;
int binary_result = 0;
- StringInfoData buf;
gettimeofday(&now, NULL);
conn->query_time = now.tv_sec;
- /* change state to tag conn unclean */
- conn->state = C_QUERY_WRITE;
+ tune_connection(func, conn);
+ if (conn->tuning)
+ return;
/* use binary result only on same backend ver */
if (cf->disable_binary == 0 && conn->same_ver)
}
}
- /* prepared sql, no buffer */
- sql = q->sql;
- buf.data = NULL;
-
- /* add statement_timeout to query */
- if (cf->statement_timeout >= 0)
- {
- initStringInfo(&buf);
- appendStringInfo(&buf, "SET statement_timeout=%d; %s",
- cf->statement_timeout, q->sql);
- sql = buf.data;
- }
-
/* send query */
+ conn->state = C_QUERY_WRITE;
res = PQsendQueryParams(conn->db, q->sql, q->arg_count,
NULL, /* paramTypes */
values, /* paramValues */
conn_error(func, conn, "PQsendQueryParams");
/* flush it down */
- res = PQflush(conn->db);
-
- /* set actual state */
- if (res > 0)
- conn->state = C_QUERY_WRITE;
- else if (res == 0)
- conn->state = C_QUERY_READ;
- else
- conn_error(func, conn, "PQflush");
-
- /* if buffer, free it */
- if (buf.data)
- pfree(buf.data);
+ flush_connection(func, conn);
}
/* returns false of conn should be dropped */
res = PQgetResult(conn->db);
if (res == NULL)
{
- conn->state = C_DONE;
+ if (conn->tuning)
+ conn->state = C_READY;
+ else
+ conn->state = C_DONE;
return false;
}
break;
case PGRES_POLLING_OK:
conn->state = C_READY;
- check_new_connection(conn);
break;
case PGRES_POLLING_ACTIVE:
case PGRES_POLLING_FAILED:
}
break;
case C_QUERY_WRITE:
- res = PQflush(conn->db);
- if (res > 0)
- conn->state = C_QUERY_WRITE;
- else if (res == 0)
- conn->state = C_QUERY_READ;
- else
- conn_error(func, conn, "PQflush");
+ flush_connection(func, conn);
break;
case C_QUERY_READ:
res = PQconsumeInput(conn->db);