combiner->connections = NULL;
combiner->conn_count = 0;
combiner->combine_type = combine_type;
+ combiner->current_conn_rows_consumed = 0;
combiner->command_complete_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->description_count = 0;
{
slot = combiner->ss.ps.ps_ResultTupleSlot;
CopyDataRowTupleToSlot(combiner, slot);
+ combiner->current_conn_rows_consumed++;
+
+ /*
+ * If we are running simple query protocol, yield the connection
+ * after we process PGXLRemoteFetchSize rows from the connection.
+ * This should allow us to consume rows quickly from other
+ * connections, while this node gets chance to generate more rows
+ * which would then be processed in the next iteration.
+ */
+ if (!combiner->extended_query &&
+ combiner->current_conn_rows_consumed >= PGXLRemoteFetchSize)
+ {
+ if (++combiner->current_conn >= combiner->conn_count)
+ combiner->current_conn = 0;
+ combiner->current_conn_rows_consumed = 0;
+ }
return slot;
}
else if (res == RESPONSE_EOF)
if (++combiner->current_conn >= combiner->conn_count)
combiner->current_conn = 0;
+ combiner->current_conn_rows_consumed = 0;
conn = combiner->connections[combiner->current_conn];
}
else if (res == RESPONSE_COMPLETE)
}
REMOVE_CURR_CONN(combiner);
if (combiner->conn_count > 0)
+ {
conn = combiner->connections[combiner->current_conn];
+ combiner->current_conn_rows_consumed = 0;
+ }
else
return NULL;
}
PGXCNodeHandle **connections; /* Datanode connections being combined */
int conn_count; /* count of active connections */
int current_conn; /* used to balance load when reading from connections */
+ long current_conn_rows_consumed;
CombineType combine_type; /* see CombineType enum */
int command_complete_count; /* count of received CommandComplete messages */
RequestType request_type; /* see RequestType enum */