Make DROP IF EXISTS more consistently not fail
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 23 Jan 2014 17:40:29 +0000 (14:40 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 23 Jan 2014 17:40:29 +0000 (14:40 -0300)
Some cases were still reporting errors and aborting, instead of a NOTICE
that the object was being skipped.  This makes it more difficult to
cleanly handle pg_dump --clean, so change that to instead skip missing
objects properly.

Per bug #7873 reported by Dave Rolsky; apparently this affects a large
number of users.

Authors: Pavel Stehule and Dean Rasheed.  Some tweaks by Álvaro Herrera

17 files changed:
src/backend/catalog/namespace.c
src/backend/catalog/objectaddress.c
src/backend/commands/dropcmds.c
src/backend/commands/functioncmds.c
src/backend/commands/opclasscmds.c
src/backend/commands/tablecmds.c
src/backend/commands/typecmds.c
src/backend/parser/parse_func.c
src/backend/parser/parse_oper.c
src/backend/parser/parse_type.c
src/backend/utils/adt/regproc.c
src/include/catalog/namespace.h
src/include/parser/parse_type.h
src/pl/plpgsql/src/pl_comp.c
src/test/regress/expected/drop_if_exists.out
src/test/regress/expected/event_trigger.out
src/test/regress/sql/drop_if_exists.sql

index 08b26b462a7b059a86175be7de2e1a9c89929350..6c2a5d2af5aef659e2196210fab535c1324b18c6 100644 (file)
@@ -906,10 +906,15 @@ TypeIsVisible(Oid typid)
  * with oid = 0 that represents a set of such conflicting candidates.
  * The caller might end up discarding such an entry anyway, but if it selects
  * such an entry it should react as though the call were ambiguous.
+ *
+ * If missing_ok is true, an empty list (NULL) is returned if the name was
+ * schema- qualified with a schema that does not exist.  Likewise if no
+ * candidate is found for other reasons.
  */
 FuncCandidateList
 FuncnameGetCandidates(List *names, int nargs, List *argnames,
-                                         bool expand_variadic, bool expand_defaults)
+                                         bool expand_variadic, bool expand_defaults,
+                                         bool missing_ok)
 {
        FuncCandidateList resultList = NULL;
        bool            any_special = false;
@@ -928,7 +933,9 @@ FuncnameGetCandidates(List *names, int nargs, List *argnames,
        if (schemaname)
        {
                /* use exact schema given */
-               namespaceId = LookupExplicitNamespace(schemaname, false);
+               namespaceId = LookupExplicitNamespace(schemaname, missing_ok);
+               if (!OidIsValid(namespaceId))
+                       return NULL;
        }
        else
        {
@@ -1414,7 +1421,7 @@ FunctionIsVisible(Oid funcid)
                visible = false;
 
                clist = FuncnameGetCandidates(list_make1(makeString(proname)),
-                                                                         nargs, NIL, false, false);
+                                                                         nargs, NIL, false, false, false);
 
                for (; clist; clist = clist->next)
                {
@@ -1443,7 +1450,8 @@ FunctionIsVisible(Oid funcid)
  * a postfix op.
  *
  * If the operator name is not schema-qualified, it is sought in the current
- * namespace search path.
+ * namespace search path.  If the name is schema-qualified and the given
+ * schema does not exist, InvalidOid is returned.
  */
 Oid
 OpernameGetOprid(List *names, Oid oprleft, Oid oprright)
@@ -1460,21 +1468,26 @@ OpernameGetOprid(List *names, Oid oprleft, Oid oprright)
        {
                /* search only in exact schema given */
                Oid                     namespaceId;
-               HeapTuple       opertup;
 
-               namespaceId = LookupExplicitNamespace(schemaname, false);
-               opertup = SearchSysCache4(OPERNAMENSP,
-                                                                 CStringGetDatum(opername),
-                                                                 ObjectIdGetDatum(oprleft),
-                                                                 ObjectIdGetDatum(oprright),
-                                                                 ObjectIdGetDatum(namespaceId));
-               if (HeapTupleIsValid(opertup))
+               namespaceId = LookupExplicitNamespace(schemaname, true);
+               if (OidIsValid(namespaceId))
                {
-                       Oid                     result = HeapTupleGetOid(opertup);
+                       HeapTuple       opertup;
+
+                       opertup = SearchSysCache4(OPERNAMENSP,
+                                                                         CStringGetDatum(opername),
+                                                                         ObjectIdGetDatum(oprleft),
+                                                                         ObjectIdGetDatum(oprright),
+                                                                         ObjectIdGetDatum(namespaceId));
+                       if (HeapTupleIsValid(opertup))
+                       {
+                               Oid                     result = HeapTupleGetOid(opertup);
 
-                       ReleaseSysCache(opertup);
-                       return result;
+                               ReleaseSysCache(opertup);
+                               return result;
+                       }
                }
+
                return InvalidOid;
        }
 
@@ -1729,7 +1742,7 @@ OperatorIsVisible(Oid oprid)
                 * If it is in the path, it might still not be visible; it could be
                 * hidden by another operator of the same name and arguments earlier
                 * in the path.  So we must do a slow check to see if this is the same
-                * operator that would be found by OpernameGetOprId.
+                * operator that would be found by OpernameGetOprid.
                 */
                char       *oprname = NameStr(oprform->oprname);
 
index a6ec6aa04bdd952354d9d6c8b7115766fca4d341..ea812238be075e0028a3803a19c38933cff31926 100644 (file)
@@ -450,7 +450,7 @@ static void getRelationIdentity(StringInfo buffer, Oid relid);
  * sub-object is looked up, the parent object will be locked instead.
  *
  * If the object is a relation or a child object of a relation (e.g. an
- * attribute or contraint), the relation is also opened and *relp receives
+ * attribute or constraint), the relation is also opened and *relp receives
  * the open relcache entry pointer; otherwise, *relp is set to NULL.  This
  * is a bit grotty but it makes life simpler, since the caller will
  * typically need the relcache entry too.  Caller must close the relcache
@@ -458,9 +458,20 @@ static void getRelationIdentity(StringInfo buffer, Oid relid);
  * if the target object is the relation itself or an attribute, but for other
  * child objects, only AccessShareLock is acquired on the relation.
  *
+ * If the object is not found, an error is thrown, unless missing_ok is
+ * true.  In this case, no lock is acquired, relp is set to NULL, and the
+ * returned address has objectId set to InvalidOid.
+ *
  * We don't currently provide a function to release the locks acquired here;
  * typically, the lock must be held until commit to guard against a concurrent
  * drop operation.
+ *
+ * Note: If the object is not found, we don't give any indication of the
+ * reason.     (It might have been a missing schema if the name was qualified, or
+ * an inexistant type name in case of a cast, function or operator; etc).
+ * Currently there is only one caller that might be interested in such info, so
+ * we don't spend much effort here.  If more callers start to care, it might be
+ * better to add some support for that in this function.
  */
 ObjectAddress
 get_object_address(ObjectType objtype, List *objname, List *objargs,
@@ -580,9 +591,11 @@ get_object_address(ObjectType objtype, List *objname, List *objargs,
                                {
                                        TypeName   *sourcetype = (TypeName *) linitial(objname);
                                        TypeName   *targettype = (TypeName *) linitial(objargs);
-                                       Oid                     sourcetypeid = typenameTypeId(NULL, sourcetype);
-                                       Oid                     targettypeid = typenameTypeId(NULL, targettype);
+                                       Oid                     sourcetypeid;
+                                       Oid                     targettypeid;
 
+                                       sourcetypeid = LookupTypeNameOid(NULL, sourcetype, missing_ok);
+                                       targettypeid = LookupTypeNameOid(NULL, targettype, missing_ok);
                                        address.classId = CastRelationId;
                                        address.objectId =
                                                get_cast_oid(sourcetypeid, targettypeid, missing_ok);
@@ -942,26 +955,31 @@ get_object_address_relobject(ObjectType objtype, List *objname,
 
                /* Extract relation name and open relation. */
                relname = list_truncate(list_copy(objname), nnames - 1);
-               relation = heap_openrv(makeRangeVarFromNameList(relname),
-                                                          AccessShareLock);
-               reloid = RelationGetRelid(relation);
+               relation = heap_openrv_extended(makeRangeVarFromNameList(relname),
+                                                                               AccessShareLock,
+                                                                               missing_ok);
+
+               reloid = relation ? RelationGetRelid(relation) : InvalidOid;
 
                switch (objtype)
                {
                        case OBJECT_RULE:
                                address.classId = RewriteRelationId;
-                               address.objectId = get_rewrite_oid(reloid, depname, missing_ok);
+                               address.objectId = relation ?
+                                       get_rewrite_oid(reloid, depname, missing_ok) : InvalidOid;
                                address.objectSubId = 0;
                                break;
                        case OBJECT_TRIGGER:
                                address.classId = TriggerRelationId;
-                               address.objectId = get_trigger_oid(reloid, depname, missing_ok);
+                               address.objectId = relation ?
+                                       get_trigger_oid(reloid, depname, missing_ok) : InvalidOid;
                                address.objectSubId = 0;
                                break;
                        case OBJECT_CONSTRAINT:
                                address.classId = ConstraintRelationId;
-                               address.objectId =
-                                       get_relation_constraint_oid(reloid, depname, missing_ok);
+                               address.objectId = relation ?
+                                       get_relation_constraint_oid(reloid, depname, missing_ok) :
+                                       InvalidOid;
                                address.objectSubId = 0;
                                break;
                        default:
@@ -975,7 +993,9 @@ get_object_address_relobject(ObjectType objtype, List *objname,
                /* Avoid relcache leak when object not found. */
                if (!OidIsValid(address.objectId))
                {
-                       heap_close(relation, AccessShareLock);
+                       if (relation != NULL)
+                               heap_close(relation, AccessShareLock);
+
                        relation = NULL;        /* department of accident prevention */
                        return address;
                }
@@ -1008,6 +1028,7 @@ get_object_address_attribute(ObjectType objtype, List *objname,
                                 errmsg("column name must be qualified")));
        attname = strVal(lfirst(list_tail(objname)));
        relname = list_truncate(list_copy(objname), list_length(objname) - 1);
+       /* XXX no missing_ok support here */
        relation = relation_openrv(makeRangeVarFromNameList(relname), lockmode);
        reloid = RelationGetRelid(relation);
 
@@ -1053,7 +1074,7 @@ get_object_address_type(ObjectType objtype,
        address.objectId = InvalidOid;
        address.objectSubId = 0;
 
-       tup = LookupTypeName(NULL, typename, NULL);
+       tup = LookupTypeName(NULL, typename, NULL, missing_ok);
        if (!HeapTupleIsValid(tup))
        {
                if (!missing_ok)
@@ -1090,6 +1111,7 @@ get_object_address_opcf(ObjectType objtype,
        ObjectAddress address;
 
        Assert(list_length(objargs) == 1);
+       /* XXX no missing_ok support here */
        amoid = get_am_oid(strVal(linitial(objargs)), false);
 
        switch (objtype)
index b25317ebad427b7cc8f16f2cb49669ccce28d4a3..e64ad8027e22f9455063b6b9b87b74edee9109ba 100644 (file)
@@ -12,7 +12,6 @@
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/heapam.h"
 #include "utils/builtins.h"
 #include "utils/syscache.h"
 
+
 static void does_not_exist_skipping(ObjectType objtype,
                                                List *objname, List *objargs);
+static bool owningrel_does_not_exist_skipping(List *objname,
+                                                                 const char **msg, char **name);
+static bool schema_does_not_exist_skipping(List *objname,
+                                                          const char **msg, char **name);
+static bool type_in_list_does_not_exist_skipping(List *typenames,
+                                                                        const char **msg, char **name);
+
 
 /*
  * Drop one or more objects.
@@ -73,9 +80,14 @@ RemoveObjects(DropStmt *stmt)
                                                                         AccessExclusiveLock,
                                                                         stmt->missing_ok);
 
-               /* Issue NOTICE if supplied object was not found. */
+               /*
+                * Issue NOTICE if supplied object was not found.  Note this is only
+                * relevant in the missing_ok case, because otherwise
+                * get_object_address would have thrown an error.
+                */
                if (!OidIsValid(address.objectId))
                {
+                       Assert(stmt->missing_ok);
                        does_not_exist_skipping(stmt->removeType, objname, objargs);
                        continue;
                }
@@ -125,9 +137,121 @@ RemoveObjects(DropStmt *stmt)
 }
 
 /*
+ * owningrel_does_not_exist_skipping
+ *             Subroutine for RemoveObjects
+ *
+ * After determining that a specification for a rule or trigger returns that
+ * the specified object does not exist, test whether its owning relation, and
+ * its schema, exist or not; if they do, return false --- the trigger or rule
+ * itself is missing instead.  If the owning relation or its schema do not
+ * exist, fill the error message format string and name, and return true.
+ */
+static bool
+owningrel_does_not_exist_skipping(List *objname, const char **msg, char **name)
+{
+       List       *parent_objname;
+       RangeVar   *parent_rel;
+
+       parent_objname = list_truncate(list_copy(objname),
+                                                                  list_length(objname) - 1);
+
+       if (schema_does_not_exist_skipping(parent_objname, msg, name))
+               return true;
+
+       parent_rel = makeRangeVarFromNameList(parent_objname);
+
+       if (!OidIsValid(RangeVarGetRelid(parent_rel, NoLock, true)))
+       {
+               *msg = gettext_noop("relation \"%s\" does not exist, skipping");
+               *name = NameListToString(parent_objname);
+
+               return true;
+       }
+
+       return false;
+}
+
+/*
+ * schema_does_not_exist_skipping
+ *             Subroutine for RemoveObjects
+ *
+ * After determining that a specification for a schema-qualifiable object
+ * refers to an object that does not exist, test whether the specified schema
+ * exists or not.  If no schema was specified, or if the schema does exist,
+ * return false -- the object itself is missing instead.  If the specified
+ * schema does not exist, fill the error message format string and the
+ * specified schema name, and return true.
+ */
+static bool
+schema_does_not_exist_skipping(List *objname, const char **msg, char **name)
+{
+       RangeVar   *rel;
+
+       rel = makeRangeVarFromNameList(objname);
+
+       if (rel->schemaname != NULL &&
+               !OidIsValid(LookupNamespaceNoError(rel->schemaname)))
+       {
+               *msg = gettext_noop("schema \"%s\" does not exist, skipping");
+               *name = rel->schemaname;
+
+               return true;
+       }
+
+       return false;
+}
+
+/*
+ * type_in_list_does_not_exist_skipping
+ *             Subroutine for RemoveObjects
+ *
+ * After determining that a specification for a function, cast, aggregate or
+ * operator returns that the specified object does not exist, test whether the
+ * involved datatypes, and their schemas, exist or not; if they do, return
+ * false --- the original object itself is missing instead.  If the datatypes
+ * or schemas do not exist, fill the error message format string and the
+ * missing name, and return true.
+ *
+ * First parameter is a list of TypeNames.
+ */
+static bool
+type_in_list_does_not_exist_skipping(List *typenames, const char **msg,
+                                                                        char **name)
+{
+       ListCell   *l;
+
+       foreach(l, typenames)
+       {
+               TypeName   *typeName = (TypeName *) lfirst(l);
+
+               if (typeName != NULL)
+               {
+                       Assert(IsA(typeName, TypeName));
+
+                       if (!OidIsValid(LookupTypeNameOid(NULL, typeName, true)))
+                       {
+                               /* type doesn't exist, try to find why */
+                               if (schema_does_not_exist_skipping(typeName->names, msg, name))
+                                       return true;
+
+                               *msg = gettext_noop("type \"%s\" does not exist, skipping");
+                               *name = TypeNameToString(typeName);
+
+                               return true;
+                       }
+               }
+       }
+
+       return false;
+}
+
+/*
+ * does_not_exist_skipping
+ *             Subroutine for RemoveObjects
+ *
  * Generate a NOTICE stating that the named object was not found, and is
  * being skipped.  This is only relevant when "IF EXISTS" is used; otherwise,
- * get_object_address() will throw an ERROR.
+ * get_object_address() in RemoveObjects would have thrown an ERROR.
  */
 static void
 does_not_exist_skipping(ObjectType objtype, List *objname, List *objargs)
@@ -140,81 +264,125 @@ does_not_exist_skipping(ObjectType objtype, List *objname, List *objargs)
        {
                case OBJECT_TYPE:
                case OBJECT_DOMAIN:
-                       msg = gettext_noop("type \"%s\" does not exist, skipping");
-                       name = TypeNameToString(makeTypeNameFromNameList(objname));
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("type \"%s\" does not exist, skipping");
+                               name = TypeNameToString(makeTypeNameFromNameList(objname));
+                       }
                        break;
                case OBJECT_COLLATION:
-                       msg = gettext_noop("collation \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("collation \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_CONVERSION:
-                       msg = gettext_noop("conversion \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("conversion \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_SCHEMA:
                        msg = gettext_noop("schema \"%s\" does not exist, skipping");
                        name = NameListToString(objname);
                        break;
                case OBJECT_TSPARSER:
-                       msg = gettext_noop("text search parser \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("text search parser \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_TSDICTIONARY:
-                       msg = gettext_noop("text search dictionary \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("text search dictionary \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_TSTEMPLATE:
-                       msg = gettext_noop("text search template \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("text search template \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_TSCONFIGURATION:
-                       msg = gettext_noop("text search configuration \"%s\" does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("text search configuration \"%s\" does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_EXTENSION:
                        msg = gettext_noop("extension \"%s\" does not exist, skipping");
                        name = NameListToString(objname);
                        break;
                case OBJECT_FUNCTION:
-                       msg = gettext_noop("function %s(%s) does not exist, skipping");
-                       name = NameListToString(objname);
-                       args = TypeNameListToString(objargs);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name) &&
+                               !type_in_list_does_not_exist_skipping(objargs, &msg, &name))
+                       {
+                               msg = gettext_noop("function %s(%s) does not exist, skipping");
+                               name = NameListToString(objname);
+                               args = TypeNameListToString(objargs);
+                       }
                        break;
                case OBJECT_AGGREGATE:
-                       msg = gettext_noop("aggregate %s(%s) does not exist, skipping");
-                       name = NameListToString(objname);
-                       args = TypeNameListToString(objargs);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name) &&
+                               !type_in_list_does_not_exist_skipping(objargs, &msg, &name))
+                       {
+                               msg = gettext_noop("aggregate %s(%s) does not exist, skipping");
+                               name = NameListToString(objname);
+                               args = TypeNameListToString(objargs);
+                       }
                        break;
                case OBJECT_OPERATOR:
-                       msg = gettext_noop("operator %s does not exist, skipping");
-                       name = NameListToString(objname);
+                       if (!schema_does_not_exist_skipping(objname, &msg, &name) &&
+                               !type_in_list_does_not_exist_skipping(objargs, &msg, &name))
+                       {
+                               msg = gettext_noop("operator %s does not exist, skipping");
+                               name = NameListToString(objname);
+                       }
                        break;
                case OBJECT_LANGUAGE:
                        msg = gettext_noop("language \"%s\" does not exist, skipping");
                        name = NameListToString(objname);
                        break;
                case OBJECT_CAST:
-                       msg = gettext_noop("cast from type %s to type %s does not exist, skipping");
-                       name = format_type_be(typenameTypeId(NULL,
-                                                                                       (TypeName *) linitial(objname)));
-                       args = format_type_be(typenameTypeId(NULL,
-                                                                                       (TypeName *) linitial(objargs)));
+                       {
+                               if (!type_in_list_does_not_exist_skipping(objname, &msg, &name) &&
+                                !type_in_list_does_not_exist_skipping(objargs, &msg, &name))
+                               {
+                                       /* XXX quote or no quote? */
+                                       msg = gettext_noop("cast from type %s to type %s does not exist, skipping");
+                                       name = TypeNameToString((TypeName *) linitial(objname));
+                                       args = TypeNameToString((TypeName *) linitial(objargs));
+                               }
+                       }
                        break;
                case OBJECT_TRIGGER:
-                       msg = gettext_noop("trigger \"%s\" for table \"%s\" does not exist, skipping");
-                       name = strVal(llast(objname));
-                       args = NameListToString(list_truncate(list_copy(objname),
+                       if (!owningrel_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("trigger \"%s\" for relation \"%s\" does not exist, skipping");
+                               name = strVal(llast(objname));
+                               args = NameListToString(list_truncate(list_copy(objname),
                                                                                                  list_length(objname) - 1));
+                       }
                        break;
                case OBJECT_EVENT_TRIGGER:
                        msg = gettext_noop("event trigger \"%s\" does not exist, skipping");
                        name = NameListToString(objname);
                        break;
                case OBJECT_RULE:
-                       msg = gettext_noop("rule \"%s\" for relation \"%s\" does not exist, skipping");
-                       name = strVal(llast(objname));
-                       args = NameListToString(list_truncate(list_copy(objname),
+                       if (!owningrel_does_not_exist_skipping(objname, &msg, &name))
+                       {
+                               msg = gettext_noop("rule \"%s\" for relation \"%s\" does not exist, skipping");
+                               name = strVal(llast(objname));
+