TransactionId xid);
static void init_tuple_slot(PGOutputData *data, Relation relation,
RelationSyncEntry *entry);
+static void pgoutput_memory_context_reset(void *arg);
/* row filter routines */
static EState *create_estate_for_relation(Relation rel);
errmsg("option \"%s\" missing", "publication_names"));
}
+/*
+ * Memory context reset callback of PGOutputData->context.
+ */
+static void
+pgoutput_memory_context_reset(void *arg)
+{
+ if (RelationSyncCache)
+ {
+ hash_destroy(RelationSyncCache);
+ RelationSyncCache = NULL;
+ }
+}
+
/*
* Initialize this plugin
*/
{
PGOutputData *data = palloc0(sizeof(PGOutputData));
static bool publication_callback_registered = false;
+ MemoryContextCallback *mcallback;
/* Create our memory context for private allocations. */
data->context = AllocSetContextCreate(ctx->context,
"logical replication publication list context",
ALLOCSET_SMALL_SIZES);
+ /*
+ * Ensure to cleanup RelationSyncCache even when logical decoding invoked
+ * via SQL interface ends up with an error.
+ */
+ mcallback = palloc0(sizeof(MemoryContextCallback));
+ mcallback->func = pgoutput_memory_context_reset;
+ MemoryContextRegisterResetCallback(ctx->context, mcallback);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
{
- if (RelationSyncCache)
- {
- hash_destroy(RelationSyncCache);
- RelationSyncCache = NULL;
- }
+ pgoutput_memory_context_reset(NULL);
}
/*