1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(bytestream,serdes): BS buffer size type is uint64_t

This necessary to handle 64bit RGData, that comes as
	a separate patch. The pair of patches would allow to
	have PM joins when SmallSide size > 4GB.
This commit is contained in:
drrtuy
2024-08-26 20:29:12 +00:00
committed by Leonid Fedorov
parent dc03621e9d
commit a947f7341c
4 changed files with 1005 additions and 65 deletions

View File

@ -45,6 +45,7 @@ class ByteStreamTestSuite;
namespace messageqcpp
{
typedef boost::shared_ptr<ByteStream> SBS;
using BSSizeType = uint64_t;
/**
* @brief A class to marshall bytes as a stream
@ -76,11 +77,11 @@ class ByteStream : public Serializeable
/**
* default ctor
*/
EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best
EXPORT explicit ByteStream(BSSizeType initSize = 8192); // multiples of pagesize are best
/**
* ctor with a uint8_t array and len initializer
*/
inline ByteStream(const uint8_t* bp, const uint32_t len);
inline ByteStream(const uint8_t* bp, const BSSizeType len);
/**
* copy ctor
*/
@ -337,12 +338,12 @@ class ByteStream : public Serializeable
/**
* load the stream from an array. Clears out any previous data.
*/
EXPORT void load(const uint8_t* bp, uint32_t len);
EXPORT void load(const uint8_t* bp, BSSizeType len);
/**
* append bytes to the end of the stream.
*/
EXPORT void append(const uint8_t* bp, uint32_t len);
EXPORT void append(const uint8_t* bp, BSSizeType len);
/**
* equality check on buffer contents.
@ -378,19 +379,19 @@ class ByteStream : public Serializeable
* advance the output ptr without having to extract bytes
* @warning be careful advancing near 4GB!
*/
inline void advance(uint32_t amt);
inline void advance(BSSizeType amt);
/**
* returns the length of the queue (in bytes)
* @warning do not attempt to make a ByteStream bigger than 4GB!
*/
inline uint32_t length() const;
inline BSSizeType length() const;
inline bool empty() const;
/**
* returns the length of the queue, including header overhead (in bytes)
*/
inline uint32_t lengthWithHdrOverhead() const;
inline BSSizeType lengthWithHdrOverhead() const;
/**
* clears the stream. Releases any current stream and sets all pointers to 0. The state of the object
@ -422,7 +423,7 @@ class ByteStream : public Serializeable
/**
* Get the allocated size of the buffer.
*/
inline uint32_t getBufferSize() const;
inline BSSizeType getBufferSize() const;
/**
* Serializeable interface
@ -437,10 +438,10 @@ class ByteStream : public Serializeable
/**
* memory allocation chunk size
*/
EXPORT static const uint32_t BlockSize = 4096;
EXPORT static const BSSizeType BlockSize = 4096;
/** size of the space we want in front of the data */
EXPORT static const uint32_t ISSOverhead =
EXPORT static const BSSizeType ISSOverhead =
3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings.
// Methods to get and set `long strings`.
@ -458,7 +459,7 @@ class ByteStream : public Serializeable
/**
* adds another BlockSize bytes to the internal buffer
*/
void growBuf(uint32_t toSize = 0);
void growBuf(BSSizeType toSize = 0);
/**
* handles member copying from one ByteStream to another
*/
@ -476,9 +477,8 @@ class ByteStream : public Serializeable
uint8_t* fBuf; /// the start of the allocated buffer
uint8_t* fCurInPtr; // the point in fBuf where data is inserted next
uint8_t* fCurOutPtr; // the point in fBuf where data is extracted from next
uint32_t fMaxLen; // how big fBuf is currently
// Stores `long strings`.
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
BSSizeType fMaxLen; // how big fBuf is currently
std::vector<std::shared_ptr<uint8_t[]>> longStrings; // Stores `long strings`.
};
template <int W, typename T = void>
@ -527,7 +527,7 @@ static const uint8_t BS_BLOB = 9;
static const uint8_t BS_SERIALIZABLE = 10;
static const uint8_t BS_UUID = 11;
inline ByteStream::ByteStream(const uint8_t* bp, const uint32_t len) : fBuf(0), fMaxLen(0)
inline ByteStream::ByteStream(const uint8_t* bp, const BSSizeType len) : fBuf(0), fMaxLen(0)
{
load(bp, len);
}
@ -544,15 +544,15 @@ inline uint8_t* ByteStream::buf()
{
return fCurOutPtr;
}
inline uint32_t ByteStream::length() const
inline BSSizeType ByteStream::length() const
{
return (uint32_t)(fCurInPtr - fCurOutPtr);
return static_cast<BSSizeType>(fCurInPtr - fCurOutPtr);
}
inline bool ByteStream::empty() const
{
return (length() == 0);
}
inline uint32_t ByteStream::lengthWithHdrOverhead() const
inline BSSizeType ByteStream::lengthWithHdrOverhead() const
{
return (length() + ISSOverhead);
}
@ -570,7 +570,7 @@ inline void ByteStream::rewind()
{
fCurOutPtr = fBuf + ISSOverhead;
}
inline void ByteStream::advance(uint32_t adv)
inline void ByteStream::advance(BSSizeType adv)
{
// fCurOutPtr is always >= fBuf, so fCurOutPtr - fBuf is >= 0, and this difference is always <= 32 bits
// there is an edge condition not detected here: if fCurOutPtr - fBuf is nearly 4GB and you try to
@ -619,7 +619,7 @@ inline ByteStream& ByteStream::operator=(const SBS& rhs)
return *this;
}
inline uint32_t ByteStream::getBufferSize() const
inline BSSizeType ByteStream::getBufferSize() const
{
return fMaxLen;
}
@ -738,12 +738,6 @@ void deserializeSet(ByteStream& bs, std::set<T>& s)
s.insert(tmp);
}
}
/*
template<>
struct ByteStream::_ByteStreamType<1, ByteStream::byte>>
{
typedef ByteStream::byte type;
}*/
} // namespace messageqcpp