You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
feat(bytestream): serialize long strings in the common way
This commit is contained in:
committed by
Alexey Antipovsky
parent
4bea7e59a0
commit
21ebd1ac20
@ -58,8 +58,6 @@ void ByteStream::doCopy(const ByteStream& rhs)
|
||||
memcpy(fBuf + ISSOverhead, rhs.fCurOutPtr, rlen);
|
||||
fCurInPtr = fBuf + ISSOverhead + rlen;
|
||||
fCurOutPtr = fBuf + ISSOverhead;
|
||||
// Copy `longStrings` as well.
|
||||
longStrings = rhs.longStrings;
|
||||
}
|
||||
|
||||
ByteStream::ByteStream(const ByteStream& rhs) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0)
|
||||
@ -86,8 +84,6 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs)
|
||||
deallocate(fBuf);
|
||||
fBuf = fCurInPtr = fCurOutPtr = 0;
|
||||
fMaxLen = 0;
|
||||
// Clear `longStrings`.
|
||||
longStrings.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@ -180,21 +176,6 @@ void ByteStream::growBuf(BSSizeType toSize)
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings()
|
||||
{
|
||||
return longStrings;
|
||||
}
|
||||
|
||||
const std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings() const
|
||||
{
|
||||
return longStrings;
|
||||
}
|
||||
|
||||
/// Replaces the `long strings` attached to this stream.
/// @param other vector whose elements are copy-assigned into this ByteStream;
///              whatever was stored before is discarded.
void ByteStream::setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other)
{
  // Plain copy-assignment (not assign()) so passing this stream's own vector stays safe.
  this->longStrings = other;
}
|
||||
|
||||
ByteStream& ByteStream::operator<<(const int8_t b)
|
||||
{
|
||||
if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(b) > fMaxLen + ISSOverhead))
|
||||
@ -601,7 +582,6 @@ void ByteStream::swap(ByteStream& rhs)
|
||||
std::swap(fCurInPtr, rhs.fCurInPtr);
|
||||
std::swap(fCurOutPtr, rhs.fCurOutPtr);
|
||||
std::swap(fMaxLen, rhs.fMaxLen);
|
||||
std::swap(longStrings, rhs.longStrings);
|
||||
std::swap(allocator, rhs.allocator);
|
||||
}
|
||||
|
||||
@ -625,27 +605,6 @@ bool ByteStream::operator==(const ByteStream& b) const
|
||||
if (memcmp(fCurOutPtr, b.fCurOutPtr, length()) != 0)
|
||||
return false;
|
||||
|
||||
// Check the `longString` sizes.
|
||||
if (longStrings.size() != b.longStrings.size())
|
||||
return false;
|
||||
|
||||
// For each `longString`.
|
||||
for (uint32_t i = 0, e = b.longStrings.size(); i < e; ++i)
|
||||
{
|
||||
const auto* leftMemChunk = reinterpret_cast<MemChunk*>(longStrings[i].get());
|
||||
const auto* rightMemChunk = reinterpret_cast<MemChunk*>(b.longStrings[i].get());
|
||||
if (leftMemChunk == nullptr || rightMemChunk == nullptr)
|
||||
return false;
|
||||
|
||||
const uint32_t leftSize = leftMemChunk->currentSize;
|
||||
const uint32_t rightSize = rightMemChunk->currentSize;
|
||||
if (leftSize != rightSize)
|
||||
return false;
|
||||
|
||||
if (memcmp(leftMemChunk->data, rightMemChunk->data, leftSize) != 0)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -448,11 +448,6 @@ class ByteStream : public Serializeable
|
||||
EXPORT static const BSSizeType ISSOverhead =
|
||||
3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings.
|
||||
|
||||
// Methods to get and set `long strings`.
|
||||
EXPORT std::vector<rowgroup::StringStoreBufSPType>& getLongStrings();
|
||||
EXPORT const std::vector<rowgroup::StringStoreBufSPType>& getLongStrings() const;
|
||||
EXPORT void setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other);
|
||||
|
||||
friend class ::ByteStreamTestSuite;
|
||||
|
||||
protected:
|
||||
@ -473,20 +468,10 @@ class ByteStream : public Serializeable
|
||||
BSBufType* allocate(const size_t size);
|
||||
void deallocate(BSBufType* ptr);
|
||||
|
||||
// Put struct `MemChunk` declaration here, to avoid circular dependency.
|
||||
// View over a `long string` buffer: two 32-bit size fields followed by the payload bytes.
// ByteStream::operator== reinterprets each `longStrings` element as a MemChunk and compares
// `currentSize` bytes of `data`.
// NOTE(review): presumably this must stay layout-compatible with
// `rowgroup::StringStore::MemChunk` — confirm against that declaration.
struct MemChunk
|
||||
{
|
||||
// Number of valid bytes in `data`.
uint32_t currentSize;
|
||||
// NOTE(review): presumably the allocated size of `data` in bytes — not read in the visible code; confirm.
uint32_t capacity;
|
||||
// Trailing payload with no declared length (flexible array member); the bytes come from the
// buffer this struct is overlaid on.
uint8_t data[];
|
||||
};
|
||||
|
||||
BSBufType* fBuf; /// the start of the allocated buffer
|
||||
BSBufType* fCurInPtr; // the point in fBuf where data is inserted next
|
||||
BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next
|
||||
BSSizeType fMaxLen; // how big fBuf is currently
|
||||
// Stores `long strings`.
|
||||
std::vector<rowgroup::StringStoreBufSPType> longStrings;
|
||||
std::optional<allocators::CountingAllocator<BSBufType>> allocator = {};
|
||||
};
|
||||
|
||||
|
@ -496,11 +496,12 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
|
||||
msecs))
|
||||
return SBS(new ByteStream(0U));
|
||||
|
||||
// Read the number of the `long strings`.
|
||||
// Read the number of the `long strings` that are deprecated, so it should be 0
|
||||
uint32_t longStringSize;
|
||||
if (!readFixedSizeData(pfd, reinterpret_cast<uint8_t*>(&longStringSize), sizeof(longStringSize), timeout,
|
||||
isTimeOut, stats, msecs))
|
||||
return SBS(new ByteStream(0U));
|
||||
idbassert(longStringSize == 0);
|
||||
|
||||
// Read the actual data of the `ByteStream`.
|
||||
SBS res(new ByteStream(msglen));
|
||||
@ -508,50 +509,6 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
|
||||
return SBS(new ByteStream(0U));
|
||||
res->advanceInputPtr(msglen);
|
||||
|
||||
std::vector<rowgroup::StringStoreBufSPType> longStrings;
|
||||
try
|
||||
{
|
||||
for (uint32_t i = 0; i < longStringSize; ++i)
|
||||
{
|
||||
// Read `MemChunk`.
|
||||
rowgroup::StringStore::MemChunk memChunk;
|
||||
if (!readFixedSizeData(pfd, reinterpret_cast<uint8_t*>(&memChunk),
|
||||
sizeof(rowgroup::StringStore::MemChunk), timeout, isTimeOut, stats, msecs))
|
||||
return SBS(new ByteStream(0U));
|
||||
|
||||
// Allocate new memory for the `long string`.
|
||||
// TODO account this allocation also despite the fact BS allocations are insignificant
|
||||
// compared with structs used by SQL operators.
|
||||
rowgroup::StringStoreBufSPType longString(
|
||||
new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);
|
||||
|
||||
uint8_t* longStringData = longString.get();
|
||||
// Initialize memchunk with `current size` and `capacity`.
|
||||
auto* memChunkPointer = reinterpret_cast<rowgroup::StringStore::MemChunk*>(longStringData);
|
||||
memChunkPointer->currentSize = memChunk.currentSize;
|
||||
memChunkPointer->capacity = memChunk.capacity;
|
||||
|
||||
// Read the `long string`.
|
||||
if (!readFixedSizeData(pfd, memChunkPointer->data, memChunkPointer->currentSize, timeout, isTimeOut,
|
||||
stats, msecs))
|
||||
return SBS(new ByteStream(0U));
|
||||
|
||||
longStrings.push_back(longString);
|
||||
}
|
||||
}
|
||||
catch (std::bad_alloc& exception)
|
||||
{
|
||||
logIoError("InetStreamSocket::read: error during read for 'long strings' - 'bad_alloc'", 0);
|
||||
return SBS(new ByteStream(0U));
|
||||
}
|
||||
catch (std::exception& exception)
|
||||
{
|
||||
std::string errorMsg = "InetStreamSocket::read: error during read for 'long strings' ";
|
||||
errorMsg += exception.what();
|
||||
throw runtime_error(errorMsg);
|
||||
}
|
||||
|
||||
res->setLongStrings(longStrings);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -577,30 +534,19 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat
|
||||
if (msglen == 0)
|
||||
return;
|
||||
|
||||
const auto& longStrings = msg.getLongStrings();
|
||||
/* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there
|
||||
are at least 12 bytes before that for the magic & length fields */
|
||||
realBuf = (uint32_t*)msg.buf();
|
||||
realBuf -= 3;
|
||||
realBuf[0] = magic;
|
||||
realBuf[1] = msglen;
|
||||
realBuf[2] = longStrings.size();
|
||||
realBuf[2] = 0;
|
||||
|
||||
try
|
||||
{
|
||||
auto bytesToWrite = sizeof(msglen) + sizeof(magic) + sizeof(uint32_t) + msglen;
|
||||
written(fSocketParms.sd(), (const uint8_t*)realBuf, bytesToWrite);
|
||||
|
||||
for (const auto& longString : longStrings)
|
||||
{
|
||||
const rowgroup::StringStore::MemChunk* memChunk =
|
||||
reinterpret_cast<rowgroup::StringStore::MemChunk*>(longString.get());
|
||||
const auto writeSize = memChunk->currentSize + sizeof(rowgroup::StringStore::MemChunk);
|
||||
written(fSocketParms.sd(), (const uint8_t*)longString.get(), writeSize);
|
||||
// For stats.
|
||||
bytesToWrite += writeSize;
|
||||
}
|
||||
|
||||
if (stats)
|
||||
stats->dataSent(bytesToWrite);
|
||||
}
|
||||
|
Reference in New Issue
Block a user