bdr: Try to make bdr_relcache.c's logic works correctly for recursive flushes.
authorAndres Freund <andres@anarazel.de>
Mon, 3 Nov 2014 11:58:31 +0000 (12:58 +0100)
committerAndres Freund <andres@anarazel.de>
Mon, 3 Nov 2014 12:04:57 +0000 (13:04 +0100)
bdr.h
bdr_relcache.c

diff --git a/bdr.h b/bdr.h
index 727ccfea2c2883c63ac14a8ee0311707639f2ad6..9fc7ba5a5c7f2fd3dc7cbacfb1d32df14d49290b 100644 (file)
--- a/bdr.h
+++ b/bdr.h
@@ -108,15 +108,17 @@ typedef struct BDRRelation
    /* hash key */
    Oid         reloid;
 
+   bool        valid;
+
    Relation    rel;
 
    BDRConflictHandler *conflict_handlers;
    size_t      conflict_handlers_len;
 
    /* ordered list of replication sets of length num_* */
-   char          **replication_sets;
+   char      **replication_sets;
    /* -1 for no configured set */
-   int     num_replication_sets;
+   int         num_replication_sets;
 } BDRRelation;
 
 typedef struct BDRTupleData
index 220a03b9ad05a9103afcc9a061f22574c1f334a3..1e19b83146d3d6d618496ac22eff2cd2b5c73ec5 100644 (file)
@@ -52,12 +52,18 @@ BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
    HASH_SEQ_STATUS status;
    BDRRelation *entry;
 
-   /* callback only gets registered after creating the hash */
-   Assert(BDRRelcacheHash != NULL);
+   /*
+    * We sometimes explicitly invalidate the entire bdr relcache -
+    * independent of actual system caused invalidations. Without that this
+    * situation could not happen as the normall inval callback only gets
+    * registered after creating the hash.
+    */
+   if (BDRRelcacheHash == NULL)
+       return;
 
    /*
     * If relid is InvalidOid, signalling a complete reset, we have to remove
-    * all entries, otherwise just remove the specific relation's entry.
+    * all entries, otherwise just invalidate the specific relation's entry.
     */
    if (relid == InvalidOid)
    {
@@ -65,11 +71,7 @@ BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
 
        while ((entry = (BDRRelation *) hash_seq_search(&status)) != NULL)
        {
-           BDRRelcacheHashInvalidateEntry(entry);
-
-           if (hash_search(BDRRelcacheHash, &entry->reloid,
-                           HASH_REMOVE, NULL) == NULL)
-               elog(ERROR, "hash table corrupted");
+           entry->valid = false;
        }
    }
    else
@@ -77,10 +79,7 @@ BDRRelcacheHashInvalidateCallback(Datum arg, Oid relid)
        if ((entry = hash_search(BDRRelcacheHash, &relid,
                                 HASH_FIND, NULL)) != NULL)
        {
-           BDRRelcacheHashInvalidateEntry(entry);
-
-           hash_search(BDRRelcacheHash, &relid,
-                       HASH_REMOVE, NULL);
+           entry->valid = false;
        }
    }
 }
@@ -265,18 +264,20 @@ bdr_heap_open(Oid reloid, LOCKMODE lockmode)
    entry = hash_search(BDRRelcacheHash, (void *) &reloid,
                        HASH_ENTER, &found);
 
-   if (found)
-   {
-       entry->rel = rel;
+   /* possibly a new relcache.c relcache entry */
+   entry->rel = rel;
+
+   if (found && entry->valid)
        return entry;
-   }
+   else if (found)
+       BDRRelcacheHashInvalidateEntry(entry);
 
    /* zero out data part of the entry */
-   memset(((char *) entry) + sizeof(Oid), 0,
-          sizeof(BDRRelation) - sizeof(Oid));
+   memset(((char *) entry) + offsetof(BDRRelation, conflict_handlers),
+          0,
+          sizeof(BDRRelation) - offsetof(BDRRelation, conflict_handlers));
 
    entry->reloid = reloid;
-   entry->rel = rel;
    entry->num_replication_sets = -1;
 
    object.classId = RelationRelationId;
@@ -286,6 +287,8 @@ bdr_heap_open(Oid reloid, LOCKMODE lockmode)
    label = GetSecurityLabel(&object, "bdr");
    bdr_parse_relation_options(label, entry);
 
+   entry->valid = true;
+
    return entry;
 }