diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index bef473b1c..ce2679738 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -59,6 +59,8 @@ void ByteStream::doCopy(const ByteStream& rhs) memcpy(fBuf + ISSOverhead, rhs.fCurOutPtr, rlen); fCurInPtr = fBuf + ISSOverhead + rlen; fCurOutPtr = fBuf + ISSOverhead; + // Copy `longStrings` as well. + longStrings = rhs.longStrings; } ByteStream::ByteStream(const ByteStream& rhs) : @@ -87,6 +89,8 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs) delete [] fBuf; fBuf = fCurInPtr = fCurOutPtr = 0; fMaxLen = 0; + // Clear `longStrings`. + longStrings.clear(); } } @@ -152,6 +156,18 @@ void ByteStream::growBuf(uint32_t toSize) } } +std::vector>& ByteStream::getLongStrings() { return longStrings; } + +const std::vector>& ByteStream::getLongStrings() const +{ + return longStrings; +} + +void ByteStream::setLongStrings(const std::vector>& other) +{ + longStrings = other; +} + ByteStream& ByteStream::operator<<(const int8_t b) { if (fBuf == 0 || (fCurInPtr - fBuf + 1U > fMaxLen + ISSOverhead)) diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index 877d6f457..ca6b03103 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -445,7 +446,13 @@ public: EXPORT static const uint32_t BlockSize = 4096; /** size of the space we want in front of the data */ - EXPORT static const uint32_t ISSOverhead = 2 * sizeof(uint32_t); //space for the BS magic & length + EXPORT static const uint32_t ISSOverhead = + 3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings. + + // Methods to get and set `long strings`. + EXPORT std::vector>& getLongStrings(); + EXPORT const std::vector>& getLongStrings() const; + EXPORT void setLongStrings(const std::vector>& other); friend class ::ByteStreamTestSuite; @@ -469,6 +476,8 @@ private: uint8_t* fCurInPtr; //the point in fBuf where data is inserted next uint8_t* fCurOutPtr; //the point in fBuf where data is extracted from next uint32_t fMaxLen; //how big fBuf is currently + // Stores `long strings`. + std::vector> longStrings; }; template diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 0e8dd03f3..ab0fd5b06 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -84,6 +84,8 @@ using namespace std; #include using boost::scoped_array; +#include + #define INETSTREAMSOCKET_DLLEXPORT #include "inetstreamsocket.h" #undef INETSTREAMSOCKET_DLLEXPORT @@ -94,6 +96,7 @@ using boost::scoped_array; #include "logger.h" #include "loggingid.h" #include "idbcompress.h" +#include "rowgroup.h" // some static functions namespace @@ -471,43 +474,19 @@ retry: return true; } -const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const +bool InetStreamSocket::readFixedSizeData(struct pollfd* pfd, uint8_t* buffer, + const size_t numberOfBytes, + const struct ::timespec* timeout, bool* isTimeOut, + Stats* stats, int64_t msecs) const { - long msecs = -1; - - struct pollfd pfd[1]; - pfd[0].fd = fSocketParms.sd(); - pfd[0].events = POLLIN; - - if (timeout != 0) - msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000; - - // we need to read the 4-byte message length first. - uint32_t msglen; - uint8_t* msglenp = reinterpret_cast(&msglen); - size_t mlread = 0; - - if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF + size_t bytesRead = 0; + while (bytesRead < numberOfBytes) { - // MCOL-480 The connector calls with timeout in a loop so that - // it can check a killed flag. This means that for a long running query, - // the following fills the warning log. -// if (isTimeOut && *isTimeOut) -// { -// logIoError("InetStreamSocket::read: timeout during readToMagic", 0); -// } - return SBS(new ByteStream(0)); - } - - //FIXME: This seems like a lot of work to read 4 bytes... - while (mlread < sizeof(msglen)) - { - ssize_t t; + ssize_t currentBytesRead; + int err; if (timeout != NULL) { - int err; - pfd[0].revents = 0; err = poll(pfd, 1, msecs); @@ -525,152 +504,131 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO *isTimeOut = true; logIoError("InetStreamSocket::read: timeout during first poll", 0); - return SBS(new ByteStream(0)); + return false; } } #ifdef _MSC_VER - t = ::recv(fSocketParms.sd(), (char*)(msglenp + mlread), sizeof(msglen) - mlread, 0); + currentBytesRead = + ::recv(fSocketParms.sd(), (char*) (buffer + bytesRead), + std::min(numberOfBytes - bytesRead, reinterpret_cast(MaxSendPacketSize)); + readAmoumt, 0); #else - t = ::read(fSocketParms.sd(), msglenp + mlread, sizeof(msglen) - mlread); + currentBytesRead = ::read(fSocketParms.sd(), buffer + bytesRead, numberOfBytes - bytesRead); #endif - if (t == 0) + if (currentBytesRead == 0) { if (timeout == NULL) { logIoError("InetStreamSocket::read: timeout during first read", 0); - return SBS(new ByteStream(0)); // don't return an incomplete message + return false; } else throw SocketClosed("InetStreamSocket::read: Remote is closed"); } - if (t < 0) + if (currentBytesRead < 0) { - int e = errno; + err = errno; - if (e == EINTR) - { + if (err == EINTR) continue; - } - if (e == KERR_ERESTARTSYS) + if (err == KERR_ERESTARTSYS) { - logIoError("InetStreamSocket::read: I/O error2", e); + logIoError("InetStreamSocket::read: I/O error2", err); continue; } ostringstream oss; - oss << "InetStreamSocket::read: I/O error2: " << - strerror(e); + oss << "InetStreamSocket::read: I/O error2: " << strerror(err); throw runtime_error(oss.str()); } - mlread += t; + bytesRead += currentBytesRead; } if (stats) - stats->dataRecvd(sizeof(msglen)); + stats->dataRecvd(bytesRead); - SBS res(new ByteStream(msglen)); - uint8_t* bufp = res->getInputPtr(); + return true; +} - size_t nread = 0; +const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut, + Stats* stats) const +{ + int64_t msecs = -1; - //Finally read the actual message... - while (nread < msglen) + struct pollfd pfd[1]; + pfd[0].fd = fSocketParms.sd(); + pfd[0].events = POLLIN; + + if (timeout != 0) + msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000; + + if (readToMagic(msecs, isTimeOut, stats) == false) // indicates a timeout or EOF { - ssize_t t; - - if (timeout != NULL) - { - int err; - - pfd[0].revents = 0; - err = poll(pfd, 1, msecs); - - if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL)) - { - ostringstream oss; - oss << "InetStreamSocket::read: I/O error3: " << - strerror(errno); - throw runtime_error(oss.str()); - } - - if (err == 0) // timeout - { - if (isTimeOut) - { - logIoError("InetStreamSocket::read: timeout during second poll", 0); - *isTimeOut = true; - } - - if (stats) - stats->dataRecvd(nread); - - return SBS(new ByteStream(0)); - } - } - -#ifdef _MSC_VER - int readAmount = std::min((int)msglen - (int)nread, MaxSendPacketSize); - t = ::recv(fSocketParms.sd(), (char*)(bufp + nread), readAmount, 0); -#else - t = ::read(fSocketParms.sd(), bufp + nread, msglen - nread); -#endif - - if (t == 0) - { - if (stats) - stats->dataRecvd(nread); - - if (timeout == NULL) - return SBS(new ByteStream(0)); // don't return an incomplete message - else - { - logIoError("InetStreamSocket::read: timeout during second read", 0); - throw SocketClosed("InetStreamSocket::read: Remote is closed"); - } - } - - if (t < 0) - { - ostringstream oss; -#ifdef _MSC_VER - int e = WSAGetLastError(); - oss << "InetStreamSocket::read: I/O error4: WSA: " << e; -#else - int e = errno; - - if (e == EINTR) - { - continue; - } - - if (e == KERR_ERESTARTSYS) - { - logIoError("InetStreamSocket::read: I/O error4", e); - continue; - } - - oss << "InetStreamSocket::read: I/O error4: " << - strerror(e); -#endif - - if (stats) - stats->dataRecvd(nread); - - throw runtime_error(oss.str()); - } - - nread += t; + // MCOL-480 The connector calls with timeout in a loop so that + // it can check a killed flag. This means that for a long running query, + // the following fills the warning log. + // if (isTimeOut && *isTimeOut) + // { + // logIoError("InetStreamSocket::read: timeout during readToMagic", 0); + // } + return SBS(new ByteStream(0)); } - if (stats) - stats->dataRecvd(msglen); + // we need to read the 4-byte message length first. + uint32_t msglen; + if (!readFixedSizeData(pfd, reinterpret_cast(&msglen), sizeof(msglen), timeout, + isTimeOut, stats, msecs)) + return SBS(new ByteStream(0)); + // Read the number of the `long strings`. + uint32_t longStringSize; + if (!readFixedSizeData(pfd, reinterpret_cast(&longStringSize), sizeof(longStringSize), + timeout, isTimeOut, stats, msecs)) + return SBS(new ByteStream(0)); + + // Read the actual data of the `ByteStream`. + SBS res(new ByteStream(msglen)); + if (!readFixedSizeData(pfd, res->getInputPtr(), msglen, timeout, isTimeOut, stats, msecs)) + return SBS(new ByteStream(0)); res->advanceInputPtr(msglen); + + std::vector> longStrings; + longStrings.reserve(longStringSize); + + for (uint32_t i = 0; i < longStringSize; ++i) + { + // Read `MemChunk`. + rowgroup::StringStore::MemChunk memChunk; + if (!readFixedSizeData(pfd, reinterpret_cast(&memChunk), + sizeof(rowgroup::StringStore::MemChunk), timeout, isTimeOut, stats, + msecs)) + return SBS(new ByteStream(0)); + + // Allocate new memory for the `long string`. + boost::shared_array longString( + new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]); + + uint8_t* longStringData = longString.get(); + // Initialize memchunk with `current size` and `capacity`. + auto* memChunkPointer = reinterpret_cast(longStringData); + memChunkPointer->currentSize = memChunk.currentSize; + memChunkPointer->capacity = memChunk.capacity; + + // Read the `long string`. + if (!readFixedSizeData(pfd, memChunkPointer->data, memChunkPointer->currentSize, timeout, + isTimeOut, stats, msecs)) + return SBS(new ByteStream(0)); + + longStrings.push_back(longString); + } + + res->setLongStrings(longStrings); + return res; } @@ -692,16 +650,32 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat if (msglen == 0) return; + const auto& longStrings = msg.getLongStrings(); /* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there - are at least 8 bytes before that for the magic & length fields */ + are at least 12 bytes before that for the magic & length fields */ realBuf = (uint32_t*)msg.buf(); - realBuf -= 2; + realBuf -= 3; realBuf[0] = magic; realBuf[1] = msglen; + realBuf[2] = longStrings.size(); try { - written(fSocketParms.sd(), (const uint8_t*)realBuf, msglen + sizeof(msglen) + sizeof(magic)); + auto bytesToWrite = sizeof(msglen) + sizeof(magic) + sizeof(uint32_t) + msglen; + written(fSocketParms.sd(), (const uint8_t*) realBuf, bytesToWrite); + + for (const auto& longString : longStrings) + { + const rowgroup::StringStore::MemChunk* memChunk = + reinterpret_cast(longString.get()); + const auto writeSize = memChunk->currentSize + sizeof(rowgroup::StringStore::MemChunk); + written(fSocketParms.sd(), (const uint8_t*) longString.get(), writeSize); + // For stats. + bytesToWrite += writeSize; + } + + if (stats) + stats->dataSent(bytesToWrite); } catch (std::exception& ex) { @@ -709,9 +683,6 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat errorMsg += " -- write from " + toString(); throw runtime_error(errorMsg); } - - if (stats) - stats->dataSent(msglen + sizeof(msglen) + sizeof(magic)); } void InetStreamSocket::write(const ByteStream& msg, Stats* stats) diff --git a/utils/messageqcpp/inetstreamsocket.h b/utils/messageqcpp/inetstreamsocket.h index 0851c7720..81538177e 100644 --- a/utils/messageqcpp/inetstreamsocket.h +++ b/utils/messageqcpp/inetstreamsocket.h @@ -237,6 +237,9 @@ protected: void do_write(const ByteStream& msg, uint32_t magic, Stats* stats = NULL) const; ssize_t written(int fd, const uint8_t* ptr, size_t nbytes) const; + bool readFixedSizeData(struct pollfd* pfd, uint8_t* buffer, const size_t numberOfBytes, + const struct ::timespec* timeout, bool* isTimeOut, Stats* stats, + int64_t msec) const; SocketParms fSocketParms; /// The socket parms size_t fBlocksize; diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 60688a327..fb613bc22 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -173,14 +173,7 @@ void StringStore::serialize(ByteStream& bs) const bs.append(mc->data, mc->currentSize); } - bs << (uint64_t) longStrings.size(); - - for (i = 0; i < longStrings.size(); i++) - { - mc = (MemChunk*) longStrings[i].get(); - bs << (uint64_t) mc->currentSize; - bs.append(mc->data, mc->currentSize); - } + bs.setLongStrings(longStrings); } void StringStore::deserialize(ByteStream& bs) @@ -211,20 +204,7 @@ void StringStore::deserialize(ByteStream& bs) bs.advance(size); } - bs >> count; - longStrings.resize(count); - - for (i = 0; i < count; i++) - { - bs >> size; - buf = bs.buf(); - longStrings[i].reset(new uint8_t[size + sizeof(MemChunk)]); - mc = (MemChunk*) longStrings[i].get(); - mc->capacity = mc->currentSize = size; - memcpy(mc->data, buf, size); - bs.advance(size); - } - + longStrings = bs.getLongStrings(); return; } diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index e9ee5e87b..27355082f 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -175,13 +175,6 @@ public: return fUseStoreStringMutex; } -private: - std::string empty_str; - - StringStore(const StringStore&); - StringStore& operator=(const StringStore&); - static const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2 - // This is an overlay b/c the underlying data needs to be any size, // and alloc'd in one chunk. data can't be a separate dynamic chunk. struct MemChunk @@ -191,7 +184,14 @@ private: uint8_t data[]; }; - std::vector > mem; +private: + std::string empty_str; + + StringStore(const StringStore&); + StringStore& operator=(const StringStore&); + static constexpr const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2 + + std::vector> mem; // To store strings > 64KB (BLOB/TEXT) std::vector > longStrings;