experimantal server parameter syncing
authorMarko Kreen <markokr@gmail.com>
Thu, 11 Oct 2007 12:03:51 +0000 (12:03 +0000)
committerMarko Kreen <markokr@gmail.com>
Thu, 11 Oct 2007 12:03:51 +0000 (12:03 +0000)
src/execute.c
src/plproxy.h

index 1b8a123db4328a57e93a1d3afffed22d63884131..faeeec7a7e849f811ee380e748a82f8b293e92e9 100644 (file)
@@ -68,6 +68,23 @@ cmp_branch(const char *this, const char *that)
        return true;
 }
 
+static void
+flush_connection(ProxyFunction *func, ProxyConnection *conn)
+{
+       int res;
+
+       /* flush it down */
+       res = PQflush(conn->db);
+
+       /* set actual state */
+       if (res > 0)
+               conn->state = C_QUERY_WRITE;
+       else if (res == 0)
+               conn->state = C_QUERY_READ;
+       else
+               conn_error(func, conn, "PQflush");
+}
+
 /*
  * Small sanity checking for new connections.
  *
@@ -75,40 +92,82 @@ cmp_branch(const char *this, const char *that)
  * - Does there happen any encoding conversations?
  * - Difference in standard_conforming_strings.
  */
-static void
-check_new_connection(ProxyConnection *conn)
+static int
+tune_connection(ProxyFunction *func, ProxyConnection *conn)
 {
-       const char *px_client,
-                          *px_server,
-                          *dst_client,
-                          *dst_server;
+       ProxyConfig *cf = &func->cur_cluster->config;
+       const char *this_enc, *dst_enc;
        const char *q_server;
        const char *dst_ver;
        int                     srvquotes = 0;
+       StringInfo      sql = NULL;
 
        dst_ver = PQparameterStatus(conn->db, "server_version");
        conn->same_ver = cmp_branch(dst_ver, PG_VERSION);
 
+       /*
+        * sync standard_conforming_strings
+        */
        q_server = PQparameterStatus(conn->db, "standard_conforming_strings");
        if (q_server && strcasecmp(q_server, "off") != 0)
                srvquotes = 1;
        if (standard_conforming_strings != srvquotes)
-               elog(WARNING, "PL/Proxy: different setting of"
-                        " standard_conforming_strings");
-
-       px_client = pg_get_client_encoding_name();
-       px_server = GetDatabaseEncodingName();
-       dst_client = PQparameterStatus(conn->db, "client_encoding");
-       dst_server = PQparameterStatus(conn->db, "server_encoding");
-       if (strcmp(px_client, px_server)
-               || strcmp(px_client, dst_client)
-               || (dst_server && strcmp(px_client, dst_server)))
        {
-               elog(WARNING, "PL/Proxy: encoding mismatch:"
-                        " proxy client/server: %s/%s,"
-                        " partition client/server: %s/%s",
-                        px_client, px_server, dst_client, dst_server);
+               if (!sql)
+                       sql = makeStringInfo();
+               appendStringInfo(sql, "set standard_conforming_strings = '%s'; ",
+                                                standard_conforming_strings ? "on" : "off");
+       }
+
+       /*
+        * sync client_encoding
+        */
+       this_enc = pg_get_client_encoding_name();
+       dst_enc = PQparameterStatus(conn->db, "client_encoding");
+       if (dst_enc && strcmp(this_enc, dst_enc))
+       {
+               if (!sql)
+                       sql = makeStringInfo();
+               appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
+       }
+
+       /*
+        * if second time in this function, there should have been
+        * active already.
+        */
+       if (sql && conn->tuning)
+       {
+               appendStringInfo(sql, "-- those parameters does not seem to apply");
+               conn_error(func, conn, sql->data);
+       }
+
+       /* add statement_timeout to query */
+       if (cf->statement_timeout >= 0 && !conn->tuning)
+       {
+               if (!sql)
+                       sql = makeStringInfo();
+               appendStringInfo(sql, "set statement_timeout = %d; ",
+                                                cf->statement_timeout);
+       }
+
+       /*
+        * send tuning query
+        */
+       if (sql)
+       {
+               conn->tuning = 1;
+               conn->state = C_QUERY_WRITE;
+               if (!PQsendQuery(conn->db, sql->data))
+                       conn_error(func, conn, "PQsendQuery");
+               pfree(sql->data);
+               pfree(sql);
+
+               flush_connection(func, conn);
+               return 1;
        }
+
+       conn->tuning = 0;
+       return 0;
 }
 
 /* send the query to server connection */
@@ -119,16 +178,15 @@ send_query(ProxyFunction *func, ProxyConnection *conn,
        int                     res;
        struct timeval now;
        ProxyQuery *q = func->remote_sql;
-       const char *sql;
        ProxyConfig *cf = &func->cur_cluster->config;
        int                     binary_result = 0;
-       StringInfoData buf;
 
        gettimeofday(&now, NULL);
        conn->query_time = now.tv_sec;
 
-       /* change state to tag conn unclean */
-       conn->state = C_QUERY_WRITE;
+       tune_connection(func, conn);
+       if (conn->tuning)
+               return;
 
        /* use binary result only on same backend ver */
        if (cf->disable_binary == 0 && conn->same_ver)
@@ -146,20 +204,8 @@ send_query(ProxyFunction *func, ProxyConnection *conn,
                }
        }
 
-       /* prepared sql, no buffer */
-       sql = q->sql;
-       buf.data = NULL;
-
-       /* add statement_timeout to query */
-       if (cf->statement_timeout >= 0)
-       {
-               initStringInfo(&buf);
-               appendStringInfo(&buf, "SET statement_timeout=%d; %s",
-                                                cf->statement_timeout, q->sql);
-               sql = buf.data;
-       }
-
        /* send query */
+       conn->state = C_QUERY_WRITE;
        res = PQsendQueryParams(conn->db, q->sql, q->arg_count,
                                                        NULL,           /* paramTypes */
                                                        values,         /* paramValues */
@@ -170,19 +216,7 @@ send_query(ProxyFunction *func, ProxyConnection *conn,
                conn_error(func, conn, "PQsendQueryParams");
 
        /* flush it down */
-       res = PQflush(conn->db);
-
-       /* set actual state */
-       if (res > 0)
-               conn->state = C_QUERY_WRITE;
-       else if (res == 0)
-               conn->state = C_QUERY_READ;
-       else
-               conn_error(func, conn, "PQflush");
-
-       /* if buffer, free it */
-       if (buf.data)
-               pfree(buf.data);
+       flush_connection(func, conn);
 }
 
 /* returns false of conn should be dropped */
@@ -304,7 +338,10 @@ another_result(ProxyFunction *func, ProxyConnection *conn)
        res = PQgetResult(conn->db);
        if (res == NULL)
        {
-               conn->state = C_DONE;
+               if (conn->tuning)
+                       conn->state = C_READY;
+               else
+                       conn->state = C_DONE;
                return false;
        }
 
@@ -351,7 +388,6 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
                                        break;
                                case PGRES_POLLING_OK:
                                        conn->state = C_READY;
-                                       check_new_connection(conn);
                                        break;
                                case PGRES_POLLING_ACTIVE:
                                case PGRES_POLLING_FAILED:
@@ -359,13 +395,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
                        }
                        break;
                case C_QUERY_WRITE:
-                       res = PQflush(conn->db);
-                       if (res > 0)
-                               conn->state = C_QUERY_WRITE;
-                       else if (res == 0)
-                               conn->state = C_QUERY_READ;
-                       else
-                               conn_error(func, conn, "PQflush");
+                       flush_connection(func, conn);
                        break;
                case C_QUERY_READ:
                        res = PQconsumeInput(conn->db);
index b7756dc33e89019c5ace104182336c71d2739418..9d5ca1fe11e3ac135246686bcb5e67e10ee1725f 100644 (file)
@@ -119,6 +119,7 @@ typedef struct
        time_t          query_time;             /* When last query was sent */
        unsigned        run_on:1;               /* True it this connection should be used */
        unsigned        same_ver:1;             /* True if dest backend has same X.Y ver */
+       unsigned        tuning:1;               /* True if tuning query is running on conn */
 } ProxyConnection;
 
 /* Info about one cluster */