From a9ce25c4f68adf60ecdeb709a5839d7cbcc43d82 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 11 Feb 2019 18:20:11 -0600 Subject: [PATCH] Phase 1 of cleaning up the messaging code. --- utils/cloudio/SocketPool.cpp | 29 ++++++++++++++--------------- utils/cloudio/SocketPool.h | 3 +-- utils/cloudio/storage-manager | 2 +- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index fd72a5e85..349350bb6 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -94,6 +94,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * storagemanager::sm_msg_header hdr; hdr.type = storagemanager::SM_MSG_START; hdr.payloadLen = length; + hdr.flags = 0; //cout << "SP sending msg on sock " << sock << " with length = " << length << endl; err = ::write(sock, &hdr, sizeof(hdr)); sm_check_error; @@ -109,9 +110,9 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * out->restart(); uint8_t *outbuf; uint8_t window[8192]; - length = 0; uint remainingBytes = 0; uint i; + storagemanager::sm_msg_header *resp = NULL; while (1) { @@ -130,26 +131,24 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer // for the next iteration to handle. - if (endOfData < 8) + if (endOfData < storagemanager::SM_HEADER_LEN) { remainingBytes = endOfData; continue; } - for (i = 0; i <= endOfData - 8; i++) + + for (i = 0; i <= endOfData - storagemanager::SM_HEADER_LEN; i++) { if (*((uint *) &window[i]) == storagemanager::SM_MSG_START) { - length = *((uint *) &window[i+4]); + resp = (storagemanager::sm_msg_header *) &window[i]; break; } } - assert(i == 0); // in testing there shouldn't be any garbage in the stream - - if (length == 0) // didn't find the header yet + if (resp == NULL) // didn't find the header yet { - // i == endOfData - 7 here remainingBytes = endOfData - i; memmove(window, &window[i], remainingBytes); } @@ -157,14 +156,14 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * { // i == first byte of the header here // copy the payload fragment we got into the output bytestream - uint startOfPayload = i + 8; // for clarity - out->needAtLeast(length); + uint startOfPayload = i + storagemanager::SM_HEADER_LEN; // for clarity + out->needAtLeast(resp->payloadLen); outbuf = out->getInputPtr(); - memcpy(outbuf, &window[startOfPayload], endOfData - startOfPayload); - if (length < endOfData - startOfPayload) - cout << "SocketPool: warning! Probably got a bad length field! payload length = " << length << + if (resp->payloadLen < endOfData - startOfPayload) + cout << "SocketPool: warning! Probably got a bad length field! payload length = " << resp->payloadLen << " endOfData = " << endOfData << " startOfPayload = " << startOfPayload << endl; - remainingBytes = length - (endOfData - startOfPayload); // remainingBytes is now the # of bytes left to read + memcpy(outbuf, &window[startOfPayload], endOfData - startOfPayload); + remainingBytes = resp->payloadLen - (endOfData - startOfPayload); // remainingBytes is now the # of bytes left to read out->advanceInputPtr(endOfData - startOfPayload); break; // done looking for the header, can fill the output buffer directly now. } @@ -173,7 +172,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * // read the rest of the payload directly into the output bytestream while (remainingBytes > 0) { - err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes); + err = ::read(sock, &outbuf[resp->payloadLen - remainingBytes], remainingBytes); sm_check_error; remainingBytes -= err; out->advanceInputPtr(err); diff --git a/utils/cloudio/SocketPool.h b/utils/cloudio/SocketPool.h index cc7dbaa92..fcaf51d00 100644 --- a/utils/cloudio/SocketPool.h +++ b/utils/cloudio/SocketPool.h @@ -38,7 +38,7 @@ class SocketPool : public boost::noncopyable // 0 = success, -1 = failure. Should this throw instead? int send_recv(messageqcpp::ByteStream &to_send, messageqcpp::ByteStream *to_recv); - + private: int getSocket(); void returnSocket(const int sock); @@ -50,7 +50,6 @@ class SocketPool : public boost::noncopyable boost::condition_variable socketAvailable; uint maxSockets; static const uint defaultSockets = 20; - }; } diff --git a/utils/cloudio/storage-manager b/utils/cloudio/storage-manager index 5e1d67f51..b78cac755 160000 --- a/utils/cloudio/storage-manager +++ b/utils/cloudio/storage-manager @@ -1 +1 @@ -Subproject commit 5e1d67f51ffd537ba0d20e781cc1614f7890d513 +Subproject commit b78cac755bcebb0b67285bf2e2ecf9257b8f2cc8