1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-05 16:15:50 +03:00

Change posixtask::read to return int instead of bool and handle when data read is short of expected message length.

This commit is contained in:
benthompson15
2019-09-16 16:26:42 -05:00
parent 99399d289f
commit 021009fe99
10 changed files with 27 additions and 23 deletions

View File

@@ -35,7 +35,7 @@ AppendTask::~AppendTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \
@@ -46,7 +46,7 @@ AppendTask::~AppendTask()
bool AppendTask::run() bool AppendTask::run()
{ {
SMLogging* logger = SMLogging::get(); SMLogging* logger = SMLogging::get();
bool success; int success;
uint8_t cmdbuf[1024] = {0}; uint8_t cmdbuf[1024] = {0};
ssize_t err; ssize_t err;
@@ -77,14 +77,16 @@ bool AppendTask::run()
uint toRead = min(cmd->count - readCount, bufsize); uint toRead = min(cmd->count - readCount, bufsize);
success = read(&databuf[0], toRead); success = read(&databuf[0], toRead);
check_error("AppendTask read data", false); check_error("AppendTask read data", false);
readCount += toRead; if (success==0)
break;
readCount += success;
uint writePos = 0; uint writePos = 0;
while (writeCount < readCount) while (writeCount < readCount)
{ {
try try
{ {
err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos); err = ioc->append(cmd->filename, &databuf[writePos], success - writePos);
} }
catch (exception &e) catch (exception &e)
{ {
@@ -112,8 +114,7 @@ bool AppendTask::run()
} }
else else
resp->returnCode = writeCount; resp->returnCode = writeCount;
success = write(*resp, payloadLen); return write(*resp, payloadLen);
return success;
} }
} }

View File

@@ -35,7 +35,7 @@ CopyTask::~CopyTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -37,7 +37,7 @@ ListDirectoryTask::~ListDirectoryTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -75,18 +75,20 @@ uint PosixTask::getLength()
} }
// todo, need this to return an int instead of a bool b/c it modifies the length of the read // todo, need this to return an int instead of a bool b/c it modifies the length of the read
bool PosixTask::read(uint8_t *buf, uint length) int PosixTask::read(uint8_t *buf, uint length)
{ {
if (length > remainingLengthForCaller) if (length > remainingLengthForCaller)
length = remainingLengthForCaller; length = remainingLengthForCaller;
if (length == 0) if (length == 0)
return false; return 0;
uint count = 0; uint count = 0;
int err; int err;
// copy data from the local buffer first. // copy data from the local buffer first.
uint dataInBuffer = bufferLen - bufferPos; uint dataInBuffer = bufferLen - bufferPos;
if (length <= dataInBuffer) if (length <= dataInBuffer)
{ {
memcpy(buf, &localBuffer[bufferPos], length); memcpy(buf, &localBuffer[bufferPos], length);
@@ -109,7 +111,7 @@ bool PosixTask::read(uint8_t *buf, uint length)
{ {
err = ::recv(sock, &buf[count], length - count, 0); err = ::recv(sock, &buf[count], length - count, 0);
if (err < 0) if (err < 0)
return false; return err;
count += err; count += err;
remainingLengthInStream -= err; remainingLengthInStream -= err;
@@ -119,7 +121,7 @@ bool PosixTask::read(uint8_t *buf, uint length)
/* The caller's request has been satisfied here. If there is remaining data in the stream /* The caller's request has been satisfied here. If there is remaining data in the stream
get what's available. */ get what's available. */
primeBuffer(); primeBuffer();
return true; return count;
} }
void PosixTask::primeBuffer() void PosixTask::primeBuffer()

View File

@@ -41,7 +41,7 @@ class PosixTask
void primeBuffer(); void primeBuffer();
protected: protected:
bool read(uint8_t *buf, uint length); int read(uint8_t *buf, uint length);
bool write(const std::vector<uint8_t> &buf); bool write(const std::vector<uint8_t> &buf);
bool write(sm_response &resp, uint payloadLength); bool write(sm_response &resp, uint payloadLength);
bool write(const uint8_t *buf, uint length); bool write(const uint8_t *buf, uint length);

View File

@@ -35,7 +35,7 @@ ReadTask::~ReadTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -39,7 +39,7 @@ StatTask::~StatTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -35,7 +35,7 @@ TruncateTask::~TruncateTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -35,7 +35,7 @@ UnlinkTask::~UnlinkTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@@ -36,7 +36,7 @@ WriteTask::~WriteTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \
@@ -47,7 +47,7 @@ WriteTask::~WriteTask()
bool WriteTask::run() bool WriteTask::run()
{ {
SMLogging* logger = SMLogging::get(); SMLogging* logger = SMLogging::get();
bool success; int success;
uint8_t cmdbuf[1024] = {0}; uint8_t cmdbuf[1024] = {0};
success = read(cmdbuf, sizeof(write_cmd)); success = read(cmdbuf, sizeof(write_cmd));
@@ -77,14 +77,16 @@ bool WriteTask::run()
uint toRead = min(cmd->count - readCount, bufsize); uint toRead = min(cmd->count - readCount, bufsize);
success = read(&databuf[0], toRead); success = read(&databuf[0], toRead);
check_error("WriteTask read data", false); check_error("WriteTask read data", false);
readCount += toRead; if (success==0)
break;
readCount += success;
uint writePos = 0; uint writePos = 0;
ssize_t err; ssize_t err;
while (writeCount < readCount) while (writeCount < readCount)
{ {
try try
{ {
err = ioc->write(cmd->filename, &databuf[writePos], cmd->offset + writeCount, toRead - writePos); err = ioc->write(cmd->filename, &databuf[writePos], cmd->offset + writeCount, success - writePos);
} }
catch (exception &e) catch (exception &e)
{ {
@@ -113,8 +115,7 @@ bool WriteTask::run()
} }
else else
resp->returnCode = writeCount; resp->returnCode = writeCount;
success = write(*resp, payloadLen); return write(*resp, payloadLen);
return success;
} }
} }