Switch connections after processing PGXLRemoteFetchSize rows
authorPavan Deolasee <pavan.deolasee@gmail.com>
Mon, 5 Jun 2017 03:59:24 +0000 (09:29 +0530)
committerPavan Deolasee <pavan.deolasee@gmail.com>
Wed, 7 Jun 2017 05:54:34 +0000 (11:24 +0530)
Fast-query-shipping consumes all rows produced by one datanode (connection)
before moving to the next connection. This leads to suboptimal performance
when the datanodes can't produce tuples at a desired pace. Instead, we switch
between connections after every PGXLRemoteFetchSize (pgx_remote_fetch_size)
rows are fetched. This gives the datanode a chance to produce more tuples while
the coordinator consumes tuples already produced and sent across by another
datanode.

This seems to improve performance for FQS-ed queries significantly when they
are returning a large number of rows from more than one datanode.

Report by Pilar de Teodoro <pteodoro@sciops.esa.int>, initial analysis and
performance tests by Tomas Vondra, further analysis and patch by me.

src/backend/pgxc/pool/execRemote.c
src/include/pgxc/execRemote.h

index 3f46ef2fe207ac6ff36305f467dfb21fba1d396e..2acc4de3c796acd7c6a1b45a6dfefed25c4b3c31 100644 (file)
@@ -221,6 +221,7 @@ InitResponseCombiner(ResponseCombiner *combiner, int node_count,
        combiner->connections = NULL;
        combiner->conn_count = 0;
        combiner->combine_type = combine_type;
+       combiner->current_conn_rows_consumed = 0;
        combiner->command_complete_count = 0;
        combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
        combiner->description_count = 0;
@@ -1377,6 +1378,22 @@ FetchTuple(ResponseCombiner *combiner)
                {
                        slot = combiner->ss.ps.ps_ResultTupleSlot;
                        CopyDataRowTupleToSlot(combiner, slot);
+                       combiner->current_conn_rows_consumed++;
+
+                       /*
+                        * If we are running simple query protocol, yield the connection
+                        * after we process PGXLRemoteFetchSize rows from the connection.
+                        * This should allow us to consume rows quickly from other
+                        * connections, while this node gets chance to generate more rows
+                        * which would then be processed in the next iteration.
+                        */
+                       if (!combiner->extended_query &&
+                               combiner->current_conn_rows_consumed >= PGXLRemoteFetchSize)
+                       {
+                               if (++combiner->current_conn >= combiner->conn_count)
+                                       combiner->current_conn = 0;
+                               combiner->current_conn_rows_consumed = 0;
+                       }
                        return slot;
                }
                else if (res == RESPONSE_EOF)
@@ -1432,6 +1449,7 @@ FetchTuple(ResponseCombiner *combiner)
 
                        if (++combiner->current_conn >= combiner->conn_count)
                                combiner->current_conn = 0;
+                       combiner->current_conn_rows_consumed = 0;
                        conn = combiner->connections[combiner->current_conn];
                }
                else if (res == RESPONSE_COMPLETE)
@@ -1453,7 +1471,10 @@ FetchTuple(ResponseCombiner *combiner)
                                }
                                REMOVE_CURR_CONN(combiner);
                                if (combiner->conn_count > 0)
+                               {
                                        conn = combiner->connections[combiner->current_conn];
+                                       combiner->current_conn_rows_consumed = 0;
+                               }
                                else
                                        return NULL;
                        }
index 928e55d62760f498f1067c1283334761f34fb54f..2a465e3e12ff47e3e2963d451e9089c4bb4f4564 100644 (file)
@@ -87,6 +87,7 @@ typedef struct ResponseCombiner
        PGXCNodeHandle **connections;           /* Datanode connections being combined */
        int                     conn_count;                             /* count of active connections */
        int                     current_conn;                   /* used to balance load when reading from connections */
+       long            current_conn_rows_consumed;
        CombineType combine_type;                       /* see CombineType enum */
        int                     command_complete_count; /* count of received CommandComplete messages */
        RequestType request_type;                       /* see RequestType enum */