Add test for pg_recvlogical reconnection behavior.
authorFujii Masao <fujii@postgresql.org>
Fri, 16 Jan 2026 03:36:34 +0000 (12:36 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 16 Jan 2026 03:36:34 +0000 (12:36 +0900)
This commit adds a test to verify that data already received and flushed by
pg_recvlogical is not streamed again even after the connection is lost,
reestablished, and logical replication is restarted.

Author: Mircea Cadariu <cadariu.mircea@gmail.com>
Co-authored-by: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAHGQGwFeTymZQ7RLvMU6WuDGar8bUQCazg=VOfA-9GeBkg-FzA@mail.gmail.com

src/bin/pg_basebackup/t/030_pg_recvlogical.pl

index e4ef44c4d4f5eb3b86b388643631aec4361249d0..063ad96b9be512f21342f03ade583dc90b07b5c0 100644 (file)
@@ -6,6 +6,7 @@ use warnings FATAL => 'all';
 use PostgreSQL::Test::Utils;
 use PostgreSQL::Test::Cluster;
 use Test::More;
+use Config;
 
 program_help_ok('pg_recvlogical');
 program_version_ok('pg_recvlogical');
@@ -151,4 +152,97 @@ my $result = $node->safe_psql('postgres',
 );
 is($result, 't', "failover is enabled for the new slot");
 
+# Test that when pg_recvlogical reconnects, it does not write duplicate
+# records to the output file
+my $outfile = $node->basedir . '/reconnect.out';
+
+$node->command_ok(
+   [
+       'pg_recvlogical',
+       '--slot' => 'reconnect_test',
+       '--dbname' => $node->connstr('postgres'),
+       '--create-slot',
+   ],
+   'slot created for reconnection test');
+
+# Insert the first record for this test
+$node->safe_psql('postgres', 'INSERT INTO test_table VALUES (1)');
+
+my @pg_recvlogical_cmd = (
+   'pg_recvlogical',
+   '--slot' => 'reconnect_test',
+   '--dbname' => $node->connstr('postgres'),
+   '--start',
+   '--file' => $outfile,
+   '--fsync-interval' => '1',
+   '--status-interval' => '100',
+   '--verbose');
+
+# On Windows, specify --endpos so pg_recvlogical can terminate, since
+# signals cannot be used. Use the current LSN plus 32MB as endpos, which
+# would be sufficient to cover the WAL generated by the test INSERTs.
+if ($Config{osname} eq 'MSWin32')
+{
+   $nextlsn = $node->safe_psql('postgres',
+       "SELECT pg_current_wal_insert_lsn() + pg_size_bytes('32MB')");
+   chomp($nextlsn);
+   push(@pg_recvlogical_cmd, '--endpos' => $nextlsn);
+}
+
+my ($stdout, $stderr);
+my $recv = IPC::Run::start(
+   [@pg_recvlogical_cmd],
+   '>' => \$stdout,
+   '2>' => \$stderr);
+
+# Wait for pg_recvlogical to receive and write the first INSERT
+my $first_ins = wait_for_file($outfile, qr/INSERT/);
+
+# Terminate the walsender to force pg_recvlogical to reconnect
+my $backend_pid = $node->safe_psql('postgres',
+   "SELECT active_pid FROM pg_replication_slots WHERE slot_name = 'reconnect_test'"
+);
+$node->safe_psql('postgres', "SELECT pg_terminate_backend($backend_pid)");
+
+# Wait for pg_recvlogical to reconnect
+$node->poll_query_until('postgres',
+   "SELECT active_pid IS NOT NULL AND active_pid != $backend_pid FROM pg_replication_slots WHERE slot_name = 'reconnect_test'"
+) or die "Timed out while waiting for pg_recvlogical to reconnect";
+
+# Insert the second record for this test
+$node->safe_psql('postgres', 'INSERT INTO test_table VALUES (2)');
+
+# Wait for pg_recvlogical to receive and write the second INSERT
+wait_for_file($outfile, qr/INSERT/, $first_ins);
+
+# Terminate pg_recvlogical by generating WAL until the current position
+# reaches the specified --endpos on Windows, or by sending a TERM signal
+# on other platforms.
+if ($Config{osname} eq 'MSWin32')
+{
+   $node->poll_query_until('postgres',
+       "SELECT pg_switch_wal() >= '$nextlsn' FROM pg_logical_emit_message(false, 'test', 'test')"
+   ) or die "Timed out while waiting for pg_recvlogical to end";
+}
+else
+{
+   $recv->signal('TERM');
+}
+
+$recv->finish();
+
+my $outfiledata = slurp_file("$outfile");
+my $count = (() = $outfiledata =~ /INSERT/g);
+cmp_ok($count, '==', 2,
+   'pg_recvlogical has received and written two INSERTs');
+
+$node->command_ok(
+   [
+       'pg_recvlogical',
+       '--slot' => 'reconnect_test',
+       '--dbname' => $node->connstr('postgres'),
+       '--drop-slot'
+   ],
+   'reconnect_test slot dropped');
+
 done_testing();