*/
return POOL_PRIMARY;
}
+ /*
+ * SHOW
+ */
+ else if (IsA(node, VariableShowStmt))
+ {
+ return POOL_EITHER;
+ }
/*
* Other statements are sent to primary
/*
* If operated in streaming replication mode and doing an extended query,
* read backend message according to the query context.
+ * Also we set the transaction state at this point.
*/
if (STREAM && pool_is_doing_extended_query_message())
{
return POOL_END;
p1 = palloc(len);
memcpy(p1, p, len);
+
+ if (session_context->query_context->parse_tree &&
+ is_start_transaction_query(session_context->query_context->parse_tree))
+ TSTATE(backend, i) ='T'; /* we are inside a transaction */
+
+ ereport(DEBUG1,
+ (errmsg("processing command complete"),
+ errdetail("set transaction state to T")));
}
}
}
* - COPY TO STDOUT
* - EXPLAIN
* - EXPLAIN ANALYZE and query is SELECT not including writing functions
+ * - SHOW
*
* Note that for SELECT this function returns false.
*/
return true;
}
}
+ else if (IsA(node, VariableShowStmt)) /* SHOW command */
+ return true;
+
return false;
}
pool_set_ignore_till_sync();
pool_unset_query_in_progress();
- /* Remove all pending messages */
- while (pool_pending_message_pull_out())
- ;
- pool_pending_message_reset_previous_message();
+ if (STREAM)
+ {
+ POOL_PENDING_MESSAGE *pmsg;
+ int i;
+
+ /* Remove all pending messages */
+ do
+ {
+ pmsg = pool_pending_message_pull_out();
+ pool_pending_message_free_pending_message(pmsg);
+ }
+ while (pmsg);
+
+ pool_pending_message_reset_previous_message();
+
+ /* Discard read buffer */
+ for (i=0;i<NUM_BACKENDS;i++)
+ {
+ if (VALID_BACKEND(i))
+ {
+ pool_discard_read_buffer(CONNECTION(backend, i));
+ }
+ }
+ }
}
break;