Do not add a newline ('\n') between rows while running a BINARY COPY protocol.
authorPavan Deolasee <pavan.deolasee@gmail.com>
Fri, 10 Mar 2017 09:07:20 +0000 (14:37 +0530)
committerPavan Deolasee <pavan.deolasee@gmail.com>
Fri, 10 Mar 2017 09:07:20 +0000 (14:37 +0530)
PostgreSQL's BINARY COPY protocol does not expect a newline character between
rows. But coordinator was adding this while running the protocol between
coordinator and the datanodes. While Postgres-XL had some provision to deal
with this on the datanode side, it seemed either insufficient or buggy as
evident during tests with pglogical. So get rid of that and make the protocol
same as vanilla PG.

src/backend/commands/copy.c
src/backend/pgxc/locator/redistrib.c
src/backend/pgxc/pool/execRemote.c
src/include/pgxc/execRemote.h

index 09bbcb2b92d2aa2a0a6c9d2c528d07351e1d4629..b90d3c4812605d677bf88c37621fb043ff16439d 100644 (file)
@@ -690,13 +690,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                                        break;
                                        }
                                }
-#ifdef PGXC
-                               /* A PGXC Datanode does not need to read the header data received from Coordinator */
-                               if (IS_PGXC_DATANODE &&
-                                       cstate->binary &&
-                                       cstate->fe_msgbuf->data[cstate->fe_msgbuf->len-1] == '\n')
-                                       cstate->fe_msgbuf->len--;
-#endif
                                avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
                                if (avail > maxread)
                                        avail = maxread;
@@ -2594,7 +2587,8 @@ CopyFrom(CopyState cstate)
                        if (DataNodeCopyIn(cstate->line_buf.data,
                                                           cstate->line_buf.len,
                                                           GET_NODES(rcstate->locator, value, isnull, NULL),
-                                          (PGXCNodeHandle**) getLocatorResults(rcstate->locator)))
+                                                          (PGXCNodeHandle**) getLocatorResults(rcstate->locator),
+                                                          cstate->binary))
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_CONNECTION_EXCEPTION),
                                                         errmsg("Copy failed on a data node")));
@@ -2716,7 +2710,8 @@ CopyFrom(CopyState cstate)
                if (DataNodeCopyIn(cstate->line_buf.data,
                                                   cstate->line_buf.len,
                                                   getLocatorNodeCount(rcstate->locator),
-                                          (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator)))
+                                                  (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator),
+                                                  cstate->binary))
                                ereport(ERROR,
                                                (errcode(ERRCODE_CONNECTION_EXCEPTION),
                                                 errmsg("Copy failed on a data node")));
index eecca4781e1810b16aacdf2db8a3ebec479049d3..6999dc7e2a68d8cba3c581c60b015c17a1379897 100644 (file)
@@ -573,7 +573,9 @@ distrib_copy_from(RedistribState *distribState, ExecNodes *exec_nodes)
 
                if (DataNodeCopyIn(data, len,
                                                   GET_NODES(copyState->locator, value, is_null, NULL),
-                                  (PGXCNodeHandle**) getLocatorResults(copyState->locator)))
+                                                  (PGXCNodeHandle**)
+                                                  getLocatorResults(copyState->locator),
+                                                  false))
                                ereport(ERROR,
                                                (errcode(ERRCODE_CONNECTION_EXCEPTION),
                                                 errmsg("Copy failed on a data node")));
index ba1c0e3f08569779959a2856face2113fcef8e23..fe2d8108a4fa18689b8ecc084be0643f1813edca 100644 (file)
@@ -2862,10 +2862,12 @@ DataNodeCopyBegin(RemoteCopyData *rcstate)
  * Send a data row to the specified nodes
  */
 int
-DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_connections)
+DataNodeCopyIn(char *data_row, int len,
+               int conn_count, PGXCNodeHandle** copy_connections,
+               bool binary)
 {
-       /* size + data row + \n */
-       int msgLen = 4 + len + 1;
+       /* size + data row + \n in CSV mode */
+       int msgLen = 4 + len + (binary ? 0 : 1);
        int nLen = htonl(msgLen);
        int i;
 
@@ -2928,7 +2930,8 @@ DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_co
                        handle->outEnd += 4;
                        memcpy(handle->outBuffer + handle->outEnd, data_row, len);
                        handle->outEnd += len;
-                       handle->outBuffer[handle->outEnd++] = '\n';
+                       if (!binary)
+                               handle->outBuffer[handle->outEnd++] = '\n';
 
                        handle->in_extended_query = false;
                }
@@ -3699,7 +3702,7 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count,
                                                                          PGXCNodeHandle** connections)
 {
        int             i;
-       int msgLen = 4 + len + 1;
+       int msgLen = 4 + len;
        int nLen = htonl(msgLen);
 
        for (i = 0; i < conn_count; i++)
@@ -3720,7 +3723,6 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count,
                        handle->outEnd += 4;
                        memcpy(handle->outBuffer + handle->outEnd, msg_buf, len);
                        handle->outEnd += len;
-                       handle->outBuffer[handle->outEnd++] = '\n';
                }
                else
                {
index eda1ac4e99b1662f5a0e0bb52b976e9db539c646..45ad90738ddd1c763cb76117304f703ad68cfdd4 100644 (file)
@@ -228,7 +228,8 @@ typedef void (*xact_callback) (bool isCommit, void *args);
 /* Copy command just involves Datanodes */
 extern void DataNodeCopyBegin(RemoteCopyData *rcstate);
 extern int DataNodeCopyIn(char *data_row, int len, int conn_count,
-                                                 PGXCNodeHandle** copy_connections);
+                                                 PGXCNodeHandle** copy_connections,
+                                                 bool binary);
 extern uint64 DataNodeCopyOut(PGXCNodeHandle** copy_connections,
                                                          int conn_count, FILE* copy_file);
 extern uint64 DataNodeCopyStore(PGXCNodeHandle** copy_connections,