From 9a591c1bccc5edeb06b979c59f39753982131181 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 8 May 2017 12:07:59 -0400 Subject: [PATCH] Fix statistics reporting in logical replication workers This new arrangement ensures that statistics are reported right after commit of transactions. The previous arrangement didn't get this quite right and could lead to assertion failures. Author: Petr Jelinek Reported-by: Erik Rijkers --- src/backend/replication/logical/tablesync.c | 18 ++++++++++++++---- src/backend/replication/logical/worker.c | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0823000f00..7e51076b37 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; + bool started_tx = false; Assert(!IsTransactionState()); @@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) table_states = NIL; StartTransactionCommand(); + started_tx = true; /* Fetch all non-ready tables. */ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); @@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } MemoryContextSwitchTo(oldctx); - CommitTransactionCommand(); - table_states_valid = true; } @@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) { rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - StartTransactionCommand(); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } SetSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); - CommitTransactionCommand(); } } else @@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } } + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } } /* @@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); + pgstat_report_stat(false); /* * We want to do the table data sync in single diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2d7770d4dc..a61240ceee 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s) replorigin_session_origin_timestamp = commit_data.committime; CommitTransactionCommand(); + pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); } @@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); - pgstat_report_stat(false); pgstat_report_activity(STATE_IDLE, NULL); } -- 2.39.5