1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

[MCOL-4829] Compression for the temp disk-based aggregation files

This commit is contained in:
Alexey Antipovsky
2021-08-03 13:24:00 +03:00
parent 7d7c982d9e
commit 7fea3c988e
5 changed files with 152 additions and 85 deletions

View File

@ -521,6 +521,7 @@
<!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 --> <!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 -->
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
<AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation>
<!-- <Compression>SNAPPY</Compression> --> <!-- Disabled by default -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>

View File

@ -516,6 +516,7 @@
<!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 --> <!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 -->
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
<!-- <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> --> <!-- Default value is N --> <!-- <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> --> <!-- Default value is N -->
<!-- <Compression>SNAPPY</Compression> --> <!-- Disabled by default -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>

View File

@ -686,6 +686,8 @@ void RowAggregation::initialize()
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
string compStr = config->getConfig("RowAggregation", "Compression");
auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap) if (fKeyOnHeap)
{ {
@ -696,7 +698,8 @@ void RowAggregation::initialize()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
else else
{ {
@ -706,7 +709,8 @@ void RowAggregation::initialize()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
// Initialize the work row. // Initialize the work row.
@ -771,6 +775,8 @@ void RowAggregation::aggReset()
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
string compStr = config->getConfig("RowAggregation", "Compression");
auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap) if (fKeyOnHeap)
{ {
@ -781,7 +787,8 @@ void RowAggregation::aggReset()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
else else
{ {
@ -791,7 +798,8 @@ void RowAggregation::aggReset()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
fRowGroupOut->getRow(0, &fRow); fRowGroupOut->getRow(0, &fRow);
copyNullRow(fRow); copyNullRow(fRow);

View File

@ -74,6 +74,101 @@ int readData(int fd, char* buf, size_t sz)
return 0; return 0;
} }
int writeFile(const std::string& fname, const char* buf, size_t sz,
const compress::CompressInterface* compressor)
{
if (sz == 0)
return 0;
int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
return errno;
const char* tmpbuf;
std::vector<char> tmpvec;
if (compressor)
{
auto len = compressor->maxCompressedSize(sz);
tmpvec.resize(len);
compressor->compress(buf, sz, tmpvec.data(), &len);
tmpbuf = tmpvec.data();
sz = len;
}
else
{
tmpbuf = buf;
}
auto to_write = sz;
while (to_write > 0)
{
auto r = write(fd, tmpbuf + sz - to_write, to_write);
if (UNLIKELY(r < 0))
{
if (errno == EAGAIN)
continue;
close(fd);
return errno;
}
assert(size_t(r) <= to_write);
to_write -= r;
}
close(fd);
return 0;
}
int readFile(const std::string& fname, std::vector<char>& buf,
const compress::CompressInterface* compressor)
{
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0))
return errno;
struct stat st{};
fstat(fd, &st);
size_t sz = st.st_size;
std::vector<char> tmpbuf;
tmpbuf.resize(sz);
auto to_read = sz;
while (to_read > 0)
{
auto r = read(fd, tmpbuf.data() + sz - to_read, to_read);
if (UNLIKELY(r < 0))
{
if (errno == EAGAIN)
continue;
close(fd);
return errno;
}
assert(size_t(r) <= to_read);
to_read -= r;
}
close(fd);
if (compressor)
{
size_t len;
if (!compressor->getUncompressedSize(tmpbuf.data(), sz, &len))
{
return EPROTO;
}
buf.resize(len);
compressor->uncompress(tmpbuf.data(), sz, buf.data(), &len);
}
else
{
tmpbuf.swap(buf);
}
return 0;
}
std::string errorString(int errNo) std::string errorString(int errNo)
{ {
char tmp[1024]; char tmp[1024];
@ -365,6 +460,7 @@ public:
* right now? * right now?
* @param strict true -> throw an exception if not enough memory * @param strict true -> throw an exception if not enough memory
* false -> deal with it later * false -> deal with it later
* @param compressor pointer to CompressInterface impl or nullptr
*/ */
RowGroupStorage(const std::string& tmpDir, RowGroupStorage(const std::string& tmpDir,
RowGroup* rowGroupOut, RowGroup* rowGroupOut,
@ -372,12 +468,14 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool wait = false, bool wait = false,
bool strict = false) bool strict = false,
compress::CompressInterface* compressor = nullptr)
: fRowGroupOut(rowGroupOut) : fRowGroupOut(rowGroupOut)
, fMaxRows(maxRows) , fMaxRows(maxRows)
, fRGDatas() , fRGDatas()
, fUniqId(this) , fUniqId(this)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fCompressor(compressor)
{ {
if (rm) if (rm)
{ {
@ -396,6 +494,7 @@ public:
fMM.reset(new MemManager()); fMM.reset(new MemManager());
fLRU = std::unique_ptr<LRUIface>(new LRUIface()); fLRU = std::unique_ptr<LRUIface>(new LRUIface());
} }
auto* curRG = new RGData(*fRowGroupOut, fMaxRows); auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
fRowGroupOut->setData(curRG); fRowGroupOut->setData(curRG);
fRowGroupOut->resetRowGroup(0); fRowGroupOut->resetRowGroup(0);
@ -772,6 +871,7 @@ public:
ret->fMM.reset(fMM->clone()); ret->fMM.reset(fMM->clone());
ret->fUniqId = fUniqId; ret->fUniqId = fUniqId;
ret->fGeneration = gen; ret->fGeneration = gen;
ret->fCompressor = fCompressor;
ret->loadFinalizedInfo(); ret->loadFinalizedInfo();
return ret; return ret;
} }
@ -964,41 +1064,18 @@ private:
void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false) void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false)
{ {
auto fname = makeRGFilename(rgid); auto fname = makeRGFilename(rgid);
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0))
{
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
messageqcpp::ByteStream bs;
try std::vector<char> data;
{
struct stat st
{
};
fstat(fd, &st);
bs.needAtLeast(st.st_size);
bs.restart();
int errNo; int errNo;
if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0) if ((errNo = readFile(fname, data, fCompressor)) != 0)
{ {
close(fd);
unlink(fname.c_str()); unlink(fname.c_str());
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
bs.advanceInputPtr(st.st_size);
close(fd); messageqcpp::ByteStream bs(reinterpret_cast<uint8_t*>(data.data()), data.size());
}
catch (...)
{
close(fd);
throw;
}
if (unlinkDump) if (unlinkDump)
unlink(fname.c_str()); unlink(fname.c_str());
@ -1044,23 +1121,13 @@ private:
fRowGroupOut->setData(rgdata); fRowGroupOut->setData(rgdata);
rgdata->serialize(bs, fRowGroupOut->getDataSize()); rgdata->serialize(bs, fRowGroupOut->getDataSize());
int fd = open(makeRGFilename(rgid).c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
{
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
int errNo; int errNo;
if ((errNo = writeData(fd, (char*)bs.buf(), bs.length())) != 0) if ((errNo = writeFile(makeRGFilename(rgid), (char*)bs.buf(), bs.length(), fCompressor)) != 0)
{ {
close(fd);
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd);
} }
#ifdef DISK_AGG_DEBUG #ifdef DISK_AGG_DEBUG
@ -1111,6 +1178,7 @@ private:
uint16_t fGeneration{0}; uint16_t fGeneration{0};
std::vector<uint64_t> fFinalizedRows; std::vector<uint64_t> fFinalizedRows;
std::string fTmpDir; std::string fTmpDir;
compress::CompressInterface* fCompressor;
}; };
/** @brief Internal data for the hashmap */ /** @brief Internal data for the hashmap */
@ -1142,9 +1210,11 @@ public:
size_t size, size_t size,
joblist::ResourceManager* rm, joblist::ResourceManager* rm,
boost::shared_ptr<int64_t> sessLimit, boost::shared_ptr<int64_t> sessLimit,
bool enableDiskAgg) bool enableDiskAgg,
compress::CompressInterface* compressor)
: fUniqId(this) : fUniqId(this)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fCompressor(compressor)
{ {
if (rm) if (rm)
fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg)); fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg));
@ -1216,6 +1286,7 @@ public:
cloned->init(size); cloned->init(size);
cloned->fUniqId = fUniqId; cloned->fUniqId = fUniqId;
cloned->fGeneration = gen; cloned->fGeneration = gen;
cloned->fCompressor = fCompressor;
if (loadDump) if (loadDump)
cloned->load(); cloned->load();
return cloned; return cloned;
@ -1240,27 +1311,15 @@ public:
void dump() void dump()
{ {
int fd = open(makeDumpName().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
int errNo; int errNo;
size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type); size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
if ((errNo = writeData(fd, (char*)fPosHashes.data(), sz)) != 0) if ((errNo = writeFile(makeDumpName(), (char*)fPosHashes.data(), sz, fCompressor)) != 0)
{ {
close(fd);
throw logging::IDBExcept( throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)), errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd);
} }
private: private:
@ -1288,29 +1347,18 @@ private:
void load() void load()
{ {
int fd = open(makeDumpName().c_str(), O_RDONLY);
if (fd < 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
struct stat st;
fstat(fd, &st);
fPosHashes.resize(st.st_size / sizeof(decltype(fPosHashes)::value_type));
int errNo; int errNo;
if ((errNo = readData(fd, (char*)fPosHashes.data(), st.st_size)) != 0) std::vector<char> data;
if ((errNo = readFile(makeDumpName(), data, fCompressor)) != 0)
{ {
close(fd);
throw logging::IDBExcept( throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)), errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd); fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type));
memcpy(fPosHashes.data(), data.data(), data.size());
} }
private: private:
@ -1319,6 +1367,7 @@ private:
uint16_t fGeneration{0}; ///< current aggregation generation uint16_t fGeneration{0}; ///< current aggregation generation
void* fUniqId; ///< uniq ID to make an uniq dump filename void* fUniqId; ///< uniq ID to make an uniq dump filename
std::string fTmpDir; std::string fTmpDir;
compress::CompressInterface* fCompressor;
}; };
/*--------------------------------------------------------------------------- /*---------------------------------------------------------------------------
@ -1336,13 +1385,15 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
joblist::ResourceManager *rm, joblist::ResourceManager *rm,
boost::shared_ptr<int64_t> sessLimit, boost::shared_ptr<int64_t> sessLimit,
bool enabledDiskAgg, bool enabledDiskAgg,
bool allowGenerations) bool allowGenerations,
compress::CompressInterface* compressor)
: fMaxRows(getMaxRows(enabledDiskAgg)) : fMaxRows(getMaxRows(enabledDiskAgg))
, fExtKeys(rowGroupOut != keysRowGroup) , fExtKeys(rowGroupOut != keysRowGroup)
, fLastKeyCol(keyCount - 1) , fLastKeyCol(keyCount - 1)
, fUniqId(this) , fUniqId(this)
, fAllowGenerations(allowGenerations) , fAllowGenerations(allowGenerations)
, fEnabledDiskAggregation(enabledDiskAgg) , fEnabledDiskAggregation(enabledDiskAgg)
, fCompressor(compressor)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fRowGroupOut(rowGroupOut) , fRowGroupOut(rowGroupOut)
, fKeysRowGroup(keysRowGroup) , fKeysRowGroup(keysRowGroup)
@ -1363,10 +1414,10 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fMM.reset(new MemManager()); fMM.reset(new MemManager());
fNumOfInputRGPerThread = 1; fNumOfInputRGPerThread = 1;
} }
fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg)); fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
if (fExtKeys) if (fExtKeys)
{ {
fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg); fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get());
} }
else else
{ {
@ -1375,7 +1426,7 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fKeysStorage->initRow(fKeyRow); fKeysStorage->initRow(fKeyRow);
fGens.emplace_back(new Data); fGens.emplace_back(new Data);
fCurData = fGens.back().get(); fCurData = fGens.back().get();
fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation)); fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get()));
} }
RowAggStorage::~RowAggStorage() RowAggStorage::~RowAggStorage()
@ -1408,7 +1459,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(), fMM->getResourceManaged(),
fMM->getSessionLimit(), fMM->getSessionLimit(),
!fEnabledDiskAggregation, !fEnabledDiskAggregation,
!fEnabledDiskAggregation)); !fEnabledDiskAggregation,
fCompressor.get()));
if (fExtKeys) if (fExtKeys)
{ {
fKeysStorage = new RowGroupStorage(fTmpDir, fKeysStorage = new RowGroupStorage(fTmpDir,
@ -1417,7 +1469,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(), fMM->getResourceManaged(),
fMM->getSessionLimit(), fMM->getSessionLimit(),
!fEnabledDiskAggregation, !fEnabledDiskAggregation,
!fEnabledDiskAggregation); !fEnabledDiskAggregation,
fCompressor.get());
} }
else else
{ {

View File

@ -20,6 +20,7 @@
#include "resourcemanager.h" #include "resourcemanager.h"
#include "rowgroup.h" #include "rowgroup.h"
#include "idbcompress.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
@ -51,7 +52,8 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool enabledDiskAgg = false, bool enabledDiskAgg = false,
bool allowGenerations = false); bool allowGenerations = false,
compress::CompressInterface* compressor = nullptr);
RowAggStorage(const std::string& tmpDir, RowAggStorage(const std::string& tmpDir,
RowGroup* rowGroupOut, RowGroup* rowGroupOut,
@ -59,10 +61,11 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool enabledDiskAgg = false, bool enabledDiskAgg = false,
bool allowGenerations = false) bool allowGenerations = false,
compress::CompressInterface* compressor = nullptr)
: RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount,
rm, std::move(sessLimit), rm, std::move(sessLimit),
enabledDiskAgg, allowGenerations) enabledDiskAgg, allowGenerations, compressor)
{} {}
~RowAggStorage(); ~RowAggStorage();
@ -356,6 +359,7 @@ private:
bool fAggregated = true; bool fAggregated = true;
bool fAllowGenerations; bool fAllowGenerations;
bool fEnabledDiskAggregation; bool fEnabledDiskAggregation;
std::unique_ptr<compress::CompressInterface> fCompressor;
std::string fTmpDir; std::string fTmpDir;
bool fInitialized{false}; bool fInitialized{false};
rowgroup::RowGroup* fRowGroupOut; rowgroup::RowGroup* fRowGroupOut;