1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #1842 from denis0x0D/MCOL-987_LZ

MCOL-987 LZ4 compression support.
This commit is contained in:
Roman Nozdrin
2021-07-07 13:13:18 +03:00
committed by GitHub
45 changed files with 1311 additions and 549 deletions

View File

@ -64,6 +64,7 @@ CompressedInetStreamSocket::CompressedInetStreamSocket()
{
config::Config* config = config::Config::makeConfig();
string val;
string compressionType;
try
{
@ -75,6 +76,19 @@ CompressedInetStreamSocket::CompressedInetStreamSocket()
useCompression = true;
else
useCompression = false;
try
{
compressionType =
config->getConfig("NetworkCompression", "NetworkCompression");
}
catch (...) { }
auto* compressInterface = compress::getCompressInterfaceByName(compressionType);
if (!compressInterface)
compressInterface = new compress::CompressInterfaceSnappy();
alg.reset(compressInterface);
}
Socket* CompressedInetStreamSocket::clone() const
@ -87,20 +101,25 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool*
{
SBS readBS, ret;
size_t uncompressedSize;
bool err;
readBS = InetStreamSocket::read(timeout, isTimeOut, stats);
if (readBS->length() == 0 || fMagicBuffer == BYTESTREAM_MAGIC)
return readBS;
err = alg.getUncompressedSize((char*) readBS->buf(), readBS->length(), &uncompressedSize);
// Read stored len, first 4 bytes.
uint32_t storedLen = *(uint32_t*) readBS->buf();
if (!err)
if (!storedLen)
return SBS(new ByteStream(0));
uncompressedSize = storedLen;
ret.reset(new ByteStream(uncompressedSize));
alg.uncompress((char*) readBS->buf(), readBS->length(), (char*) ret->getInputPtr());
alg->uncompress((char*) readBS->buf() + HEADER_SIZE,
readBS->length() - HEADER_SIZE, (char*) ret->getInputPtr(),
&uncompressedSize);
ret->advanceInputPtr(uncompressedSize);
return ret;
@ -108,15 +127,18 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool*
void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats)
{
size_t outLen = 0;
uint32_t len = msg.length();
size_t len = msg.length();
if (useCompression && (len > 512))
{
ByteStream smsg(alg.maxCompressedSize(len));
size_t outLen = alg->maxCompressedSize(len) + HEADER_SIZE;
ByteStream smsg(outLen);
alg.compress((char*) msg.buf(), len, (char*) smsg.getInputPtr(), &outLen);
smsg.advanceInputPtr(outLen);
alg->compress((char*) msg.buf(), len,
(char*) smsg.getInputPtr() + HEADER_SIZE, &outLen);
// Save original len.
*(uint32_t*) smsg.getInputPtr() = len;
smsg.advanceInputPtr(outLen + HEADER_SIZE);
if (outLen < len)
do_write(smsg, COMPRESSED_BYTESTREAM_MAGIC, stats);

View File

@ -54,8 +54,9 @@ public:
virtual const IOSocket accept(const struct timespec* timeout);
virtual void connect(const sockaddr* addr);
private:
compress::IDBCompressInterface alg;
std::shared_ptr<compress::CompressInterface> alg;
bool useCompression;
static const uint32_t HEADER_SIZE = 4;
};
} //namespace messageqcpp