From 21ebd1ac209924d00be95fa75f40989ca74cc688 Mon Sep 17 00:00:00 2001 From: Aleksei Antipovskii Date: Wed, 5 Mar 2025 13:56:37 +0100 Subject: [PATCH] feat(bytestream): serialize long strings in the common way --- utils/messageqcpp/bytestream.cpp | 41 ------------------ utils/messageqcpp/bytestream.h | 15 ------- utils/messageqcpp/inetstreamsocket.cpp | 60 ++------------------------ utils/rowgroup/rowgroup.cpp | 23 ++++++++-- 4 files changed, 22 insertions(+), 117 deletions(-) diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index 2546cde09..37243c567 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -58,8 +58,6 @@ void ByteStream::doCopy(const ByteStream& rhs) memcpy(fBuf + ISSOverhead, rhs.fCurOutPtr, rlen); fCurInPtr = fBuf + ISSOverhead + rlen; fCurOutPtr = fBuf + ISSOverhead; - // Copy `longStrings` as well. - longStrings = rhs.longStrings; } ByteStream::ByteStream(const ByteStream& rhs) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0) @@ -86,8 +84,6 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs) deallocate(fBuf); fBuf = fCurInPtr = fCurOutPtr = 0; fMaxLen = 0; - // Clear `longStrings`. - longStrings.clear(); } } @@ -180,21 +176,6 @@ void ByteStream::growBuf(BSSizeType toSize) } } -std::vector& ByteStream::getLongStrings() -{ - return longStrings; -} - -const std::vector& ByteStream::getLongStrings() const -{ - return longStrings; -} - -void ByteStream::setLongStrings(const std::vector& other) -{ - longStrings = other; -} - ByteStream& ByteStream::operator<<(const int8_t b) { if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(b) > fMaxLen + ISSOverhead)) @@ -601,7 +582,6 @@ void ByteStream::swap(ByteStream& rhs) std::swap(fCurInPtr, rhs.fCurInPtr); std::swap(fCurOutPtr, rhs.fCurOutPtr); std::swap(fMaxLen, rhs.fMaxLen); - std::swap(longStrings, rhs.longStrings); std::swap(allocator, rhs.allocator); } @@ -625,27 +605,6 @@ bool ByteStream::operator==(const ByteStream& b) const if (memcmp(fCurOutPtr, b.fCurOutPtr, length()) != 0) return false; - // Check the `longString` sizes. - if (longStrings.size() != b.longStrings.size()) - return false; - - // For each `longString`. - for (uint32_t i = 0, e = b.longStrings.size(); i < e; ++i) - { - const auto* leftMemChunk = reinterpret_cast(longStrings[i].get()); - const auto* rightMemChunk = reinterpret_cast(b.longStrings[i].get()); - if (leftMemChunk == nullptr || rightMemChunk == nullptr) - return false; - - const uint32_t leftSize = leftMemChunk->currentSize; - const uint32_t rightSize = rightMemChunk->currentSize; - if (leftSize != rightSize) - return false; - - if (memcmp(leftMemChunk->data, rightMemChunk->data, leftSize) != 0) - return false; - } - return true; } diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index d3db76988..ccc359d8e 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -448,11 +448,6 @@ class ByteStream : public Serializeable EXPORT static const BSSizeType ISSOverhead = 3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings. - // Methods to get and set `long strings`. - EXPORT std::vector& getLongStrings(); - EXPORT const std::vector& getLongStrings() const; - EXPORT void setLongStrings(const std::vector& other); - friend class ::ByteStreamTestSuite; protected: @@ -473,20 +468,10 @@ class ByteStream : public Serializeable BSBufType* allocate(const size_t size); void deallocate(BSBufType* ptr); - // Put struct `MemChunk` declaration here, to avoid circular dependency. - struct MemChunk - { - uint32_t currentSize; - uint32_t capacity; - uint8_t data[]; - }; - BSBufType* fBuf; /// the start of the allocated buffer BSBufType* fCurInPtr; // the point in fBuf where data is inserted next BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next BSSizeType fMaxLen; // how big fBuf is currently - // Stores `long strings`. - std::vector longStrings; std::optional> allocator = {}; }; diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 08bfaefd2..5bdae0c1b 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -496,11 +496,12 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO msecs)) return SBS(new ByteStream(0U)); - // Read the number of the `long strings`. + // Read the number of the `long strings` that are deprecated, so it should be 0 uint32_t longStringSize; if (!readFixedSizeData(pfd, reinterpret_cast(&longStringSize), sizeof(longStringSize), timeout, isTimeOut, stats, msecs)) return SBS(new ByteStream(0U)); + idbassert(longStringSize == 0); // Read the actual data of the `ByteStream`. SBS res(new ByteStream(msglen)); @@ -508,50 +509,6 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO return SBS(new ByteStream(0U)); res->advanceInputPtr(msglen); - std::vector longStrings; - try - { - for (uint32_t i = 0; i < longStringSize; ++i) - { - // Read `MemChunk`. - rowgroup::StringStore::MemChunk memChunk; - if (!readFixedSizeData(pfd, reinterpret_cast(&memChunk), - sizeof(rowgroup::StringStore::MemChunk), timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0U)); - - // Allocate new memory for the `long string`. - // TODO account this allocation also despite the fact BS allocations are insignificant - // compared with structs used by SQL operators. - rowgroup::StringStoreBufSPType longString( - new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]); - - uint8_t* longStringData = longString.get(); - // Initialize memchunk with `current size` and `capacity`. - auto* memChunkPointer = reinterpret_cast(longStringData); - memChunkPointer->currentSize = memChunk.currentSize; - memChunkPointer->capacity = memChunk.capacity; - - // Read the `long string`. - if (!readFixedSizeData(pfd, memChunkPointer->data, memChunkPointer->currentSize, timeout, isTimeOut, - stats, msecs)) - return SBS(new ByteStream(0U)); - - longStrings.push_back(longString); - } - } - catch (std::bad_alloc& exception) - { - logIoError("InetStreamSocket::read: error during read for 'long strings' - 'bad_alloc'", 0); - return SBS(new ByteStream(0U)); - } - catch (std::exception& exception) - { - std::string errorMsg = "InetStreamSocket::read: error during read for 'long strings' "; - errorMsg += exception.what(); - throw runtime_error(errorMsg); - } - - res->setLongStrings(longStrings); return res; } @@ -577,30 +534,19 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat if (msglen == 0) return; - const auto& longStrings = msg.getLongStrings(); /* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there are at least 12 bytes before that for the magic & length fields */ realBuf = (uint32_t*)msg.buf(); realBuf -= 3; realBuf[0] = magic; realBuf[1] = msglen; - realBuf[2] = longStrings.size(); + realBuf[2] = 0; try { auto bytesToWrite = sizeof(msglen) + sizeof(magic) + sizeof(uint32_t) + msglen; written(fSocketParms.sd(), (const uint8_t*)realBuf, bytesToWrite); - for (const auto& longString : longStrings) - { - const rowgroup::StringStore::MemChunk* memChunk = - reinterpret_cast(longString.get()); - const auto writeSize = memChunk->currentSize + sizeof(rowgroup::StringStore::MemChunk); - written(fSocketParms.sd(), (const uint8_t*)longString.get(), writeSize); - // For stats. - bytesToWrite += writeSize; - } - if (stats) stats->dataSent(bytesToWrite); } diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 936fdd59d..dc5e5790b 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -157,7 +157,14 @@ void StringStore::serialize(ByteStream& bs) const bs.append(mc->data, mc->currentSize); } - bs.setLongStrings(longStrings); + bs << (uint8_t)useOnlyLongStrings(); + RGDataSizeType sz = longStrings.size(); + bs << sz; + for (const auto& ls : longStrings) + { + mc = reinterpret_cast(ls.get()); + bs.append(ls.get(), mc->currentSize + sizeof(*mc)); + } } void StringStore::deserialize(ByteStream& bs) @@ -194,9 +201,17 @@ void StringStore::deserialize(ByteStream& bs) memcpy(mc->data, buf, size); bs.advance(size); } - - longStrings = bs.getLongStrings(); - return; + bs >> tmp8; + useOnlyLongStrings(tmp8); + bs >> size; + longStrings.resize(size); + for (i = 0; i < size; i++) + { + mc = reinterpret_cast(bs.buf()); + longStrings[i].reset(new uint8_t[mc->currentSize + sizeof(*mc)]); + memcpy(longStrings[i].get(), bs.buf(), mc->currentSize + sizeof(*mc)); + bs.advance(mc->currentSize + sizeof(*mc)); + } } void StringStore::clear()