break;
}
}
-#ifdef PGXC
- /* A PGXC Datanode does not need to read the header data received from Coordinator */
- if (IS_PGXC_DATANODE &&
- cstate->binary &&
- cstate->fe_msgbuf->data[cstate->fe_msgbuf->len-1] == '\n')
- cstate->fe_msgbuf->len--;
-#endif
avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
if (avail > maxread)
avail = maxread;
if (DataNodeCopyIn(cstate->line_buf.data,
cstate->line_buf.len,
GET_NODES(rcstate->locator, value, isnull, NULL),
- (PGXCNodeHandle**) getLocatorResults(rcstate->locator)))
+ (PGXCNodeHandle**) getLocatorResults(rcstate->locator),
+ cstate->binary))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
if (DataNodeCopyIn(cstate->line_buf.data,
cstate->line_buf.len,
getLocatorNodeCount(rcstate->locator),
- (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator)))
+ (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator),
+ cstate->binary))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
if (DataNodeCopyIn(data, len,
GET_NODES(copyState->locator, value, is_null, NULL),
- (PGXCNodeHandle**) getLocatorResults(copyState->locator)))
+ (PGXCNodeHandle**)
+ getLocatorResults(copyState->locator),
+ false))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
* Send a data row to the specified nodes
*/
int
-DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_connections)
+DataNodeCopyIn(char *data_row, int len,
+ int conn_count, PGXCNodeHandle** copy_connections,
+ bool binary)
{
- /* size + data row + \n */
- int msgLen = 4 + len + 1;
+ /* size + data row + \n in CSV mode */
+ int msgLen = 4 + len + (binary ? 0 : 1);
int nLen = htonl(msgLen);
int i;
handle->outEnd += 4;
memcpy(handle->outBuffer + handle->outEnd, data_row, len);
handle->outEnd += len;
- handle->outBuffer[handle->outEnd++] = '\n';
+ if (!binary)
+ handle->outBuffer[handle->outEnd++] = '\n';
handle->in_extended_query = false;
}
PGXCNodeHandle** connections)
{
int i;
- int msgLen = 4 + len + 1;
+ int msgLen = 4 + len;
int nLen = htonl(msgLen);
for (i = 0; i < conn_count; i++)
handle->outEnd += 4;
memcpy(handle->outBuffer + handle->outEnd, msg_buf, len);
handle->outEnd += len;
- handle->outBuffer[handle->outEnd++] = '\n';
}
else
{
/* Copy command just involves Datanodes */
extern void DataNodeCopyBegin(RemoteCopyData *rcstate);
extern int DataNodeCopyIn(char *data_row, int len, int conn_count,
- PGXCNodeHandle** copy_connections);
+ PGXCNodeHandle** copy_connections,
+ bool binary);
extern uint64 DataNodeCopyOut(PGXCNodeHandle** copy_connections,
int conn_count, FILE* copy_file);
extern uint64 DataNodeCopyStore(PGXCNodeHandle** copy_connections,