1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Phase 1 of cleaning up the messaging code.

This commit is contained in:
Patrick LeBlanc
2019-02-11 18:20:11 -06:00
parent d1297f31a2
commit a9ce25c4f6
3 changed files with 16 additions and 18 deletions

View File

@ -94,6 +94,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
storagemanager::sm_msg_header hdr; storagemanager::sm_msg_header hdr;
hdr.type = storagemanager::SM_MSG_START; hdr.type = storagemanager::SM_MSG_START;
hdr.payloadLen = length; hdr.payloadLen = length;
hdr.flags = 0;
//cout << "SP sending msg on sock " << sock << " with length = " << length << endl; //cout << "SP sending msg on sock " << sock << " with length = " << length << endl;
err = ::write(sock, &hdr, sizeof(hdr)); err = ::write(sock, &hdr, sizeof(hdr));
sm_check_error; sm_check_error;
@ -109,9 +110,9 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
out->restart(); out->restart();
uint8_t *outbuf; uint8_t *outbuf;
uint8_t window[8192]; uint8_t window[8192];
length = 0;
uint remainingBytes = 0; uint remainingBytes = 0;
uint i; uint i;
storagemanager::sm_msg_header *resp = NULL;
while (1) while (1)
{ {
@ -130,26 +131,24 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
// scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer
// for the next iteration to handle. // for the next iteration to handle.
if (endOfData < 8) if (endOfData < storagemanager::SM_HEADER_LEN)
{ {
remainingBytes = endOfData; remainingBytes = endOfData;
continue; continue;
} }
for (i = 0; i <= endOfData - 8; i++)
for (i = 0; i <= endOfData - storagemanager::SM_HEADER_LEN; i++)
{ {
if (*((uint *) &window[i]) == storagemanager::SM_MSG_START) if (*((uint *) &window[i]) == storagemanager::SM_MSG_START)
{ {
length = *((uint *) &window[i+4]); resp = (storagemanager::sm_msg_header *) &window[i];
break; break;
} }
} }
assert(i == 0); // in testing there shouldn't be any garbage in the stream if (resp == NULL) // didn't find the header yet
if (length == 0) // didn't find the header yet
{ {
// i == endOfData - 7 here
remainingBytes = endOfData - i; remainingBytes = endOfData - i;
memmove(window, &window[i], remainingBytes); memmove(window, &window[i], remainingBytes);
} }
@ -157,14 +156,14 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
{ {
// i == first byte of the header here // i == first byte of the header here
// copy the payload fragment we got into the output bytestream // copy the payload fragment we got into the output bytestream
uint startOfPayload = i + 8; // for clarity uint startOfPayload = i + storagemanager::SM_HEADER_LEN; // for clarity
out->needAtLeast(length); out->needAtLeast(resp->payloadLen);
outbuf = out->getInputPtr(); outbuf = out->getInputPtr();
memcpy(outbuf, &window[startOfPayload], endOfData - startOfPayload); if (resp->payloadLen < endOfData - startOfPayload)
if (length < endOfData - startOfPayload) cout << "SocketPool: warning! Probably got a bad length field! payload length = " << resp->payloadLen <<
cout << "SocketPool: warning! Probably got a bad length field! payload length = " << length <<
" endOfData = " << endOfData << " startOfPayload = " << startOfPayload << endl; " endOfData = " << endOfData << " startOfPayload = " << startOfPayload << endl;
remainingBytes = length - (endOfData - startOfPayload); // remainingBytes is now the # of bytes left to read memcpy(outbuf, &window[startOfPayload], endOfData - startOfPayload);
remainingBytes = resp->payloadLen - (endOfData - startOfPayload); // remainingBytes is now the # of bytes left to read
out->advanceInputPtr(endOfData - startOfPayload); out->advanceInputPtr(endOfData - startOfPayload);
break; // done looking for the header, can fill the output buffer directly now. break; // done looking for the header, can fill the output buffer directly now.
} }
@ -173,7 +172,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
// read the rest of the payload directly into the output bytestream // read the rest of the payload directly into the output bytestream
while (remainingBytes > 0) while (remainingBytes > 0)
{ {
err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes); err = ::read(sock, &outbuf[resp->payloadLen - remainingBytes], remainingBytes);
sm_check_error; sm_check_error;
remainingBytes -= err; remainingBytes -= err;
out->advanceInputPtr(err); out->advanceInputPtr(err);

View File

@ -38,7 +38,7 @@ class SocketPool : public boost::noncopyable
// 0 = success, -1 = failure. Should this throw instead? // 0 = success, -1 = failure. Should this throw instead?
int send_recv(messageqcpp::ByteStream &to_send, messageqcpp::ByteStream *to_recv); int send_recv(messageqcpp::ByteStream &to_send, messageqcpp::ByteStream *to_recv);
private: private:
int getSocket(); int getSocket();
void returnSocket(const int sock); void returnSocket(const int sock);
@ -50,7 +50,6 @@ class SocketPool : public boost::noncopyable
boost::condition_variable socketAvailable; boost::condition_variable socketAvailable;
uint maxSockets; uint maxSockets;
static const uint defaultSockets = 20; static const uint defaultSockets = 20;
}; };
} }