Handle temp sequences so that duplicate values are not produced.
authorPavan Deolasee <pavan.deolasee@gmail.com>
Wed, 12 Apr 2017 14:07:37 +0000 (19:37 +0530)
committerPavan Deolasee <pavan.deolasee@gmail.com>
Mon, 17 Apr 2017 08:13:54 +0000 (13:43 +0530)
We used to keep the temporary sequences on the local node and generate sequence
values locally. But when nextval is pushed down to the datanodes, each node
might end up producing the same value and thus causing duplicates. Instead we
now handle the temporary sequences on the GTM too. But instead of schema
qualifying sequence names, we use coordinator name and coordinator PID to
uniquely identify the sequence.

Report by Tomas Vondra and fixes by me.

src/backend/catalog/dependency.c
src/backend/commands/sequence.c
src/backend/commands/tablecmds.c
src/test/regress/expected/sequence.out
src/test/regress/sql/sequence.sql

index 51b1d0703e545d606fc851c12cd00ddd9ad5d95a..373e6402351b3cf4b33f4764bfe7e26b12597d01 100644 (file)
@@ -419,8 +419,7 @@ doRename(const ObjectAddress *object, const char *oldname, const char *newname)
                         * An operation with GTM can just be done from a remote Coordinator.
                         */
                        if (relKind == RELKIND_SEQUENCE &&
-                               IS_PGXC_LOCAL_COORDINATOR &&
-                               !IsTempSequence(object->objectId))
+                               IS_PGXC_LOCAL_COORDINATOR)
                        {
                                Relation relseq = relation_open(object->objectId, AccessShareLock);
                                char *seqname = GetGlobalSeqName(relseq, NULL, oldname);
@@ -1268,7 +1267,6 @@ doDeletion(const ObjectAddress *object, int flags)
                                                 * Sequence is dropped on GTM by a remote Coordinator only
                                                 * for a non temporary sequence.
                                                 */
-                                               if (!IsTempSequence(object->objectId))
                                                {
                                                        /*
                                                         * The sequence has already been removed from Coordinator,
index 820c4abf0be8210e5c8a35b34de7c4fc98b2fe08..750c86f9237111ae0477acc18ca26c93d501c903 100644 (file)
@@ -331,9 +331,7 @@ DefineSequence(CreateSeqStmt *seq)
         * Remote Coordinator is in charge of creating sequence in GTM.
         * If sequence is temporary, it is not necessary to create it on GTM.
         */
-       if (IS_PGXC_LOCAL_COORDINATOR &&
-               (seq->sequence->relpersistence == RELPERSISTENCE_PERMANENT ||
-                seq->sequence->relpersistence == RELPERSISTENCE_UNLOGGED))
+       if (IS_PGXC_LOCAL_COORDINATOR)
        {
                char *seqname = GetGlobalSeqName(rel, NULL, NULL);
 
@@ -698,9 +696,6 @@ nextval_internal(Oid relid)
                                next,
                                rescnt = 0;
        bool            logit = false;
-#ifdef PGXC
-       bool            is_temp;
-#endif
 
        /* open and AccessShareLock sequence */
        init_sequence(relid, &elm, &seqrel);
@@ -716,9 +711,6 @@ nextval_internal(Oid relid)
        if (!seqrel->rd_islocaltemp)
                PreventCommandIfReadOnly("nextval()");
 
-#ifdef PGXC
-       is_temp = seqrel->rd_backend == MyBackendId;
-#endif
        /*
         * Forbid this during parallel operation because, to make it work, the
         * cooperating backends would need to share the backend-local cached
@@ -740,9 +732,6 @@ nextval_internal(Oid relid)
        seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
        page = BufferGetPage(buf);
 
-#ifdef PGXC  /* PGXC_COORD */
-       /* Allow nextval executed on datanodes */
-       if (!is_temp)
        {
                int64 range = seq->cache_value; /* how many values to ask from GTM? */
                int64 rangemax; /* the max value returned from the GTM for our request */
@@ -822,16 +811,7 @@ nextval_internal(Oid relid)
 
                last_used_seq = elm;
        }
-       else
-       {
-#endif
-       last = next = result = seq->last_value;
-       incby = seq->increment_by;
-       maxv = seq->max_value;
-       minv = seq->min_value;
-#ifdef PGXC
-       }
-#endif
+
        fetch = cache = seq->cache_value;
        log = seq->log_cnt;
 
@@ -875,168 +855,17 @@ nextval_internal(Oid relid)
                 * Check MAXVALUE for ascending sequences and MINVALUE for descending
                 * sequences
                 */
-#ifdef PGXC
-               /* Temporary sequences go through normal process */
-               if (is_temp)
-               {
-#endif
-               /* Result has been checked and received from GTM */
-               if (incby > 0)
-               {
-                       /* ascending sequence */
-                       if ((maxv >= 0 && next > maxv - incby) ||
-                               (maxv < 0 && next + incby > maxv))
-                       {
-                               if (rescnt > 0)
-                                       break;          /* stop fetching */
-                               if (!seq->is_cycled)
-                               {
-                                       char            buf[100];
-
-                                       snprintf(buf, sizeof(buf), INT64_FORMAT, maxv);
-                                       ereport(ERROR,
-                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                                  errmsg("nextval: reached maximum value of sequence \"%s\" (%s)",
-                                                                 RelationGetRelationName(seqrel), buf)));
-                               }
-                               next = minv;
-                       }
-                       else
-                               next += incby;
-               }
-               else
-               {
-                       /* descending sequence */
-                       if ((minv < 0 && next < minv - incby) ||
-                               (minv >= 0 && next + incby < minv))
-                       {
-                               if (rescnt > 0)
-                                       break;          /* stop fetching */
-                               if (!seq->is_cycled)
-                               {
-                                       char            buf[100];
-
-                                       snprintf(buf, sizeof(buf), INT64_FORMAT, minv);
-                                       ereport(ERROR,
-                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                                  errmsg("nextval: reached minimum value of sequence \"%s\" (%s)",
-                                                                 RelationGetRelationName(seqrel), buf)));
-                               }
-                               next = maxv;
-                       }
-                       else
-                               next += incby;
-               }
-#ifdef PGXC
-               }
-#endif
                fetch--;
                if (rescnt < cache)
                {
                        log--;
                        rescnt++;
-#ifdef PGXC
-                       /* Temporary sequences can go through normal process */
-                       if (is_temp)
-                       {
-                       /*
-                        * This part is not taken into account,
-                        * result has been received from GTM
-                        */
-#endif
-                       last = next;
-                       if (rescnt == 1)        /* if it's first result - */
-                               result = next;  /* it's what to return */
-#ifdef PGXC
-                       }
-#endif
                }
        }
 
        log -= fetch;                           /* adjust for any unfetched numbers */
        Assert(log >= 0);
-
-#ifdef PGXC
-       /* Temporary sequences go through normal process */
-       if (is_temp)
-       {
-       /* Result has been received from GTM */
-#endif
-       /* save info in local cache */
-       elm->last = result;                     /* last returned number */
-       elm->cached = last;                     /* last fetched number */
-       elm->last_valid = true;
-
-       last_used_seq = elm;
-
-       /*
-        * If something needs to be WAL logged, acquire an xid, so this
-        * transaction's commit will trigger a WAL flush and wait for syncrep.
-        * It's sufficient to ensure the toplevel transaction has an xid, no need
-        * to assign xids subxacts, that'll already trigger an appropriate wait.
-        * (Have to do that here, so we're outside the critical section)
-        */
-       if (logit && RelationNeedsWAL(seqrel))
-               GetTopTransactionId();
-
-       /* ready to change the on-disk (or really, in-buffer) tuple */
-       START_CRIT_SECTION();
-
-       /*
-        * We must mark the buffer dirty before doing XLogInsert(); see notes in
-        * SyncOneBuffer().  However, we don't apply the desired changes just yet.
-        * This looks like a violation of the buffer update protocol, but it is in
-        * fact safe because we hold exclusive lock on the buffer.  Any other
-        * process, including a checkpoint, that tries to examine the buffer
-        * contents will block until we release the lock, and then will see the
-        * final state that we install below.
-        */
-       MarkBufferDirty(buf);
-
-       /* XLOG stuff */
-       if (logit && RelationNeedsWAL(seqrel))
-       {
-               xl_seq_rec      xlrec;
-               XLogRecPtr      recptr;
-
-               /*
-                * We don't log the current state of the tuple, but rather the state
-                * as it would appear after "log" more fetches.  This lets us skip
-                * that many future WAL records, at the cost that we lose those
-                * sequence values if we crash.
-                */
-               XLogBeginInsert();
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
-
-               /* set values that will be saved in xlog */
-               seq->last_value = next;
-               seq->is_called = true;
-               seq->log_cnt = 0;
-
-               xlrec.node = seqrel->rd_node;
-
-               XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
-               XLogRegisterData((char *) seqtuple.t_data, seqtuple.t_len);
-
-               recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
-
-               PageSetLSN(page, recptr);
-       }
-
-       /* Now update sequence tuple to the intended final state */
-       seq->last_value = last;         /* last fetched number */
-       seq->is_called = true;
-       seq->log_cnt = log;                     /* how much is logged */
-
-       END_CRIT_SECTION();
-
-#ifdef PGXC
-       }
-       else
-       {
-               seq->log_cnt = log;
-       }
-#endif
+       seq->log_cnt = log;
        UnlockReleaseBuffer(buf);
 
        relation_close(seqrel, NoLock);
@@ -1141,9 +970,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
        Buffer          buf;
        HeapTupleData seqtuple;
        Form_pg_sequence seq;
-#ifdef PGXC
-       bool            is_temp;
-#endif
 
        /* open and AccessShareLock sequence */
        init_sequence(relid, &elm, &seqrel);
@@ -1158,9 +984,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
        if (!seqrel->rd_islocaltemp)
                PreventCommandIfReadOnly("setval()");
 
-#ifdef PGXC
-       is_temp = seqrel->rd_backend == MyBackendId;
-#endif
        /*
         * Forbid this during parallel operation because, to make it work, the
         * cooperating backends would need to share the backend-local cached
@@ -1187,9 +1010,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
                                                bufm, bufx)));
        }
 
-#ifdef PGXC
-       /* Allow to execute on datanodes */
-       if (!is_temp)
        {
                char *seqname = GetGlobalSeqName(seqrel, NULL, NULL);
 
@@ -1211,57 +1031,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
                elm->cached = elm->last;
        }
-       else
-       {
-#endif
-
-       /* Set the currval() state only if iscalled = true */
-       if (iscalled)
-       {
-               elm->last = next;               /* last returned number */
-               elm->last_valid = true;
-       }
-
-       /* In any case, forget any future cached numbers */
-       elm->cached = elm->last;
-
-       /* check the comment above nextval_internal()'s equivalent call. */
-       if (RelationNeedsWAL(seqrel))
-               GetTopTransactionId();
-
-       /* ready to change the on-disk (or really, in-buffer) tuple */
-       START_CRIT_SECTION();
-
-       seq->last_value = next;         /* last fetched number */
-       seq->is_called = iscalled;
-       seq->log_cnt = 0;
-
-       MarkBufferDirty(buf);
-
-       /* XLOG stuff */
-       if (RelationNeedsWAL(seqrel))
-       {
-               xl_seq_rec      xlrec;
-               XLogRecPtr      recptr;
-               Page            page = BufferGetPage(buf);
-
-               XLogBeginInsert();
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
-
-               xlrec.node = seqrel->rd_node;
-               XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
-               XLogRegisterData((char *) seqtuple.t_data, seqtuple.t_len);
-
-               recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
-
-               PageSetLSN(page, recptr);
-       }
-
-       END_CRIT_SECTION();
-
-#ifdef PGXC
-       }
-#endif
 
        UnlockReleaseBuffer(buf);
 
@@ -1774,9 +1543,10 @@ init_params(List *options, bool isInit,
 char *
 GetGlobalSeqName(Relation seqrel, const char *new_seqname, const char *new_schemaname)
 {
-       char *seqname, *dbname, *schemaname, *relname;
+       char *seqname, *dbname, *relname;
+       char namespace[NAMEDATALEN * 2];
        int charlen;
-
+       bool is_temp = seqrel->rd_backend == MyBackendId;
        /* Get all the necessary relation names */
        dbname = get_database_name(seqrel->rd_node.dbNode);
 
@@ -1785,13 +1555,34 @@ GetGlobalSeqName(Relation seqrel, const char *new_seqname, const char *new_schem
        else
                relname = RelationGetRelationName(seqrel);
 
-       if (new_schemaname)
-               schemaname = (char *) new_schemaname;
+       if (!is_temp)
+       {
+               /*
+                * For a permanent sequence, use schema qualified name. That can
+                * uniquely identify the sequences.
+                */
+               char *schema = get_namespace_name(RelationGetNamespace(seqrel));
+               sprintf(namespace, "%s", new_schemaname ? new_schemaname : schema);
+               pfree(schema);
+       }
        else
-               schemaname = get_namespace_name(RelationGetNamespace(seqrel));
+       {
+               /*
+                * For temporary sequences, we use originating coordinator name and
+                * originating coordinator PID to qualify the sequence name. If we are
+                * running on the local coordinator, we can readily fetch that
+                * information from PGXCNodeName and MyProcPid, but when running on
+                * remote datanode, we must consult MyCoordName and MyProcPid to get
+                * the correct information.
+                */
+               if (IS_PGXC_LOCAL_COORDINATOR)
+                       sprintf(namespace, "%s.%d", PGXCNodeName, MyProcPid);
+               else
+                       sprintf(namespace, "%s.%d", MyCoordName, MyCoordPid);
+       }
 
        /* Calculate the global name size including the dots and \0 */
-       charlen = strlen(dbname) + strlen(schemaname) + strlen(relname) + 3;
+       charlen = strlen(dbname) + strlen(namespace) + strlen(relname) + 3;
        seqname = (char *) palloc(charlen);
 
        /* Form a unique sequence name with schema and database name for GTM */
@@ -1799,13 +1590,11 @@ GetGlobalSeqName(Relation seqrel, const char *new_seqname, const char *new_schem
                         charlen,
                         "%s.%s.%s",
                         dbname,
-                        schemaname,
+                        namespace,
                         relname);
 
        if (dbname)
                pfree(dbname);
-       if (schemaname)
-               pfree(schemaname);
 
        return seqname;
 }
index 2fabb54ffae8a0d6fc06ee96972c2e29ffa7b0b7..de79a6a8ffcfa38378d4e5dc5d8eab2f82baf09f 100644 (file)
@@ -2715,8 +2715,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal)
        /* Operation with GTM can only be done with a Remote Coordinator */
        if (IS_PGXC_LOCAL_COORDINATOR &&
                (targetrelation->rd_rel->reltype == OBJECT_SEQUENCE ||
-                targetrelation->rd_rel->relkind == RELKIND_SEQUENCE) &&
-               !IsTempSequence(myrelid)) /* It is possible to rename a sequence with ALTER TABLE */
+                targetrelation->rd_rel->relkind == RELKIND_SEQUENCE))
        {
                char *seqname = GetGlobalSeqName(targetrelation, NULL, NULL);
                char *newseqname = GetGlobalSeqName(targetrelation, newrelname, NULL);
@@ -12228,8 +12227,7 @@ AlterTableNamespaceInternal(Relation rel, Oid oldNspOid, Oid nspOid,
 #ifdef PGXC
        /* Rename also sequence on GTM for a sequence */
        if (IS_PGXC_LOCAL_COORDINATOR &&
-               rel->rd_rel->relkind == RELKIND_SEQUENCE &&
-               !IsTempSequence(RelationGetRelid(rel)))
+               rel->rd_rel->relkind == RELKIND_SEQUENCE)
        {
                char *seqname = GetGlobalSeqName(rel, NULL, NULL);
                char *newseqname = GetGlobalSeqName(rel, NULL, get_namespace_name(nspOid));
@@ -12429,8 +12427,7 @@ AlterSeqNamespaces(Relation classRel, Relation rel,
 
 #ifdef PGXC
                /* Change also this sequence name on GTM */
-               if (IS_PGXC_LOCAL_COORDINATOR &&
-                       !IsTempSequence(RelationGetRelid(seqRel)))
+               if (IS_PGXC_LOCAL_COORDINATOR)
                {
                        char *seqname = GetGlobalSeqName(seqRel, NULL, NULL);
                        char *newseqname = GetGlobalSeqName(seqRel, NULL, get_namespace_name(newNspOid));
index f403c31bb6e1bcad54eaeabf8860c0b24c63d410..ff0e0004cc38c324f3367ad000f959ac0ecb3f3f 100644 (file)
@@ -518,3 +518,23 @@ SELECT * FROM information_schema.sequences WHERE sequence_name IN
 
 DROP USER seq_user;
 DROP SEQUENCE seq;
+create table test_seqtab (unique1 int, unique2 int);
+insert into test_seqtab select i, i from generate_series(1,1000) s(i);
+create temp sequence testseq;
+select unique1, unique2, nextval('testseq')
+  from test_seqtab order by unique2 limit 10;
+ unique1 | unique2 | nextval 
+---------+---------+---------
+       1 |       1 |       1
+       2 |       2 |       3
+       3 |       3 |       2
+       4 |       4 |       4
+       5 |       5 |       5
+       6 |       6 |       7
+       7 |       7 |       6
+       8 |       8 |       9
+       9 |       9 |      11
+      10 |      10 |       8
+(10 rows)
+
+drop table test_seqtab;
index 3182fc7d516a4dc02e4ec3d6a0825ece813b15ed..855a5f82a264277b804e19435d70e0bdac8fe917 100644 (file)
@@ -263,3 +263,11 @@ SELECT * FROM information_schema.sequences WHERE sequence_name IN
 
 DROP USER seq_user;
 DROP SEQUENCE seq;
+
+create table test_seqtab (unique1 int, unique2 int);
+insert into test_seqtab select i, i from generate_series(1,1000) s(i);
+
+create temp sequence testseq;
+select unique1, unique2, nextval('testseq')
+  from test_seqtab order by unique2 limit 10;
+drop table test_seqtab;