From 7fea3c988ec8e93c018977c729c93c50db143c7b Mon Sep 17 00:00:00 2001 From: Alexey Antipovsky Date: Tue, 3 Aug 2021 13:24:00 +0300 Subject: [PATCH] [MCOL-4829] Compression for the temp disk-based aggregation files --- oam/etc/Columnstore.xml | 1 + oam/etc/Columnstore.xml.singleserver | 1 + utils/rowgroup/rowaggregation.cpp | 16 +- utils/rowgroup/rowstorage.cpp | 209 +++++++++++++++++---------- utils/rowgroup/rowstorage.h | 10 +- 5 files changed, 152 insertions(+), 85 deletions(-) diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index 98014307e..d8e161301 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -521,6 +521,7 @@ N + 127.0.0.1 diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index 483207323..0f70acc4e 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -516,6 +516,7 @@ + 127.0.0.1 diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index b799efe46..1eda43bdf 100755 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -686,6 +686,8 @@ void RowAggregation::initialize() config::Config* config = config::Config::makeConfig(); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); + string compStr = config->getConfig("RowAggregation", "Compression"); + auto* compressor = compress::getCompressInterfaceByName(compStr); if (fKeyOnHeap) { @@ -696,7 +698,8 @@ void RowAggregation::initialize() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } else { @@ -706,7 +709,8 @@ void RowAggregation::initialize() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } // Initialize the work row. @@ -771,6 +775,8 @@ void RowAggregation::aggReset() config::Config* config = config::Config::makeConfig(); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); + string compStr = config->getConfig("RowAggregation", "Compression"); + auto* compressor = compress::getCompressInterfaceByName(compStr); if (fKeyOnHeap) { @@ -781,7 +787,8 @@ void RowAggregation::aggReset() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } else { @@ -791,7 +798,8 @@ void RowAggregation::aggReset() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } fRowGroupOut->getRow(0, &fRow); copyNullRow(fRow); diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 137ccaeea..4112be0d7 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -74,6 +74,101 @@ int readData(int fd, char* buf, size_t sz) return 0; } +int writeFile(const std::string& fname, const char* buf, size_t sz, + const compress::CompressInterface* compressor) +{ + if (sz == 0) + return 0; + + int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (UNLIKELY(fd < 0)) + return errno; + + const char* tmpbuf; + std::vector tmpvec; + if (compressor) + { + auto len = compressor->maxCompressedSize(sz); + tmpvec.resize(len); + compressor->compress(buf, sz, tmpvec.data(), &len); + tmpbuf = tmpvec.data(); + sz = len; + } + else + { + tmpbuf = buf; + } + + auto to_write = sz; + while (to_write > 0) + { + auto r = write(fd, tmpbuf + sz - to_write, to_write); + if (UNLIKELY(r < 0)) + { + if (errno == EAGAIN) + continue; + + close(fd); + return errno; + } + assert(size_t(r) <= to_write); + to_write -= r; + } + + close(fd); + return 0; +} + +int readFile(const std::string& fname, std::vector& buf, + const compress::CompressInterface* compressor) +{ + int fd = open(fname.c_str(), O_RDONLY); + if (UNLIKELY(fd < 0)) + return errno; + + struct stat st{}; + fstat(fd, &st); + size_t sz = st.st_size; + + std::vector tmpbuf; + tmpbuf.resize(sz); + + auto to_read = sz; + while (to_read > 0) + { + auto r = read(fd, tmpbuf.data() + sz - to_read, to_read); + if (UNLIKELY(r < 0)) + { + if (errno == EAGAIN) + continue; + + close(fd); + return errno; + } + + assert(size_t(r) <= to_read); + to_read -= r; + } + close(fd); + + if (compressor) + { + size_t len; + if (!compressor->getUncompressedSize(tmpbuf.data(), sz, &len)) + { + return EPROTO; + } + buf.resize(len); + compressor->uncompress(tmpbuf.data(), sz, buf.data(), &len); + } + else + { + tmpbuf.swap(buf); + } + + return 0; +} + std::string errorString(int errNo) { char tmp[1024]; @@ -365,6 +460,7 @@ public: * right now? * @param strict true -> throw an exception if not enough memory * false -> deal with it later + * @param compressor pointer to CompressInterface impl or nullptr */ RowGroupStorage(const std::string& tmpDir, RowGroup* rowGroupOut, @@ -372,12 +468,14 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool wait = false, - bool strict = false) + bool strict = false, + compress::CompressInterface* compressor = nullptr) : fRowGroupOut(rowGroupOut) , fMaxRows(maxRows) , fRGDatas() , fUniqId(this) , fTmpDir(tmpDir) + , fCompressor(compressor) { if (rm) { @@ -396,6 +494,7 @@ public: fMM.reset(new MemManager()); fLRU = std::unique_ptr(new LRUIface()); } + auto* curRG = new RGData(*fRowGroupOut, fMaxRows); fRowGroupOut->setData(curRG); fRowGroupOut->resetRowGroup(0); @@ -772,6 +871,7 @@ public: ret->fMM.reset(fMM->clone()); ret->fUniqId = fUniqId; ret->fGeneration = gen; + ret->fCompressor = fCompressor; ret->loadFinalizedInfo(); return ret; } @@ -964,41 +1064,18 @@ private: void loadRG(uint64_t rgid, std::unique_ptr& rgdata, bool unlinkDump = false) { auto fname = makeRGFilename(rgid); - int fd = open(fname.c_str(), O_RDONLY); - if (UNLIKELY(fd < 0)) + + std::vector data; + int errNo; + if ((errNo = readFile(fname, data, fCompressor)) != 0) { + unlink(fname.c_str()); throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - messageqcpp::ByteStream bs; - try - { - struct stat st - { - }; - fstat(fd, &st); - - bs.needAtLeast(st.st_size); - bs.restart(); - int errNo; - if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0) - { - close(fd); - unlink(fname.c_str()); - throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - bs.advanceInputPtr(st.st_size); - close(fd); - } - catch (...) - { - close(fd); - throw; - } + messageqcpp::ByteStream bs(reinterpret_cast(data.data()), data.size()); if (unlinkDump) unlink(fname.c_str()); @@ -1044,23 +1121,13 @@ private: fRowGroupOut->setData(rgdata); rgdata->serialize(bs, fRowGroupOut->getDataSize()); - int fd = open(makeRGFilename(rgid).c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (UNLIKELY(fd < 0)) - { - throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - int errNo; - if ((errNo = writeData(fd, (char*)bs.buf(), bs.length())) != 0) + if ((errNo = writeFile(makeRGFilename(rgid), (char*)bs.buf(), bs.length(), fCompressor)) != 0) { - close(fd); throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - close(fd); } #ifdef DISK_AGG_DEBUG @@ -1111,6 +1178,7 @@ private: uint16_t fGeneration{0}; std::vector fFinalizedRows; std::string fTmpDir; + compress::CompressInterface* fCompressor; }; /** @brief Internal data for the hashmap */ @@ -1142,9 +1210,11 @@ public: size_t size, joblist::ResourceManager* rm, boost::shared_ptr sessLimit, - bool enableDiskAgg) + bool enableDiskAgg, + compress::CompressInterface* compressor) : fUniqId(this) , fTmpDir(tmpDir) + , fCompressor(compressor) { if (rm) fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg)); @@ -1216,6 +1286,7 @@ public: cloned->init(size); cloned->fUniqId = fUniqId; cloned->fGeneration = gen; + cloned->fCompressor = fCompressor; if (loadDump) cloned->load(); return cloned; @@ -1240,27 +1311,15 @@ public: void dump() { - int fd = open(makeDumpName().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (fd < 0) - { - throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, - errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - int errNo; size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type); - if ((errNo = writeData(fd, (char*)fPosHashes.data(), sz)) != 0) + if ((errNo = writeFile(makeDumpName(), (char*)fPosHashes.data(), sz, fCompressor)) != 0) { - close(fd); throw logging::IDBExcept( logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - - close(fd); } private: @@ -1288,29 +1347,18 @@ private: void load() { - int fd = open(makeDumpName().c_str(), O_RDONLY); - if (fd < 0) - { - throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, - errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - - struct stat st; - fstat(fd, &st); - fPosHashes.resize(st.st_size / sizeof(decltype(fPosHashes)::value_type)); int errNo; - if ((errNo = readData(fd, (char*)fPosHashes.data(), st.st_size)) != 0) + std::vector data; + if ((errNo = readFile(makeDumpName(), data, fCompressor)) != 0) { - close(fd); throw logging::IDBExcept( logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - close(fd); + fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type)); + memcpy(fPosHashes.data(), data.data(), data.size()); } private: @@ -1319,6 +1367,7 @@ private: uint16_t fGeneration{0}; ///< current aggregation generation void* fUniqId; ///< uniq ID to make an uniq dump filename std::string fTmpDir; + compress::CompressInterface* fCompressor; }; /*--------------------------------------------------------------------------- @@ -1336,13 +1385,15 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, joblist::ResourceManager *rm, boost::shared_ptr sessLimit, bool enabledDiskAgg, - bool allowGenerations) + bool allowGenerations, + compress::CompressInterface* compressor) : fMaxRows(getMaxRows(enabledDiskAgg)) , fExtKeys(rowGroupOut != keysRowGroup) , fLastKeyCol(keyCount - 1) , fUniqId(this) , fAllowGenerations(allowGenerations) , fEnabledDiskAggregation(enabledDiskAgg) + , fCompressor(compressor) , fTmpDir(tmpDir) , fRowGroupOut(rowGroupOut) , fKeysRowGroup(keysRowGroup) @@ -1363,10 +1414,10 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, fMM.reset(new MemManager()); fNumOfInputRGPerThread = 1; } - fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg)); + fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get())); if (fExtKeys) { - fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg); + fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get()); } else { @@ -1375,7 +1426,7 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, fKeysStorage->initRow(fKeyRow); fGens.emplace_back(new Data); fCurData = fGens.back().get(); - fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation)); + fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get())); } RowAggStorage::~RowAggStorage() @@ -1408,7 +1459,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut) fMM->getResourceManaged(), fMM->getSessionLimit(), !fEnabledDiskAggregation, - !fEnabledDiskAggregation)); + !fEnabledDiskAggregation, + fCompressor.get())); if (fExtKeys) { fKeysStorage = new RowGroupStorage(fTmpDir, @@ -1417,7 +1469,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut) fMM->getResourceManaged(), fMM->getSessionLimit(), !fEnabledDiskAggregation, - !fEnabledDiskAggregation); + !fEnabledDiskAggregation, + fCompressor.get()); } else { diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 7ac38a1d2..3bb8f1c22 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -20,6 +20,7 @@ #include "resourcemanager.h" #include "rowgroup.h" +#include "idbcompress.h" #include #include @@ -51,7 +52,8 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, - bool allowGenerations = false); + bool allowGenerations = false, + compress::CompressInterface* compressor = nullptr); RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, @@ -59,10 +61,11 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, - bool allowGenerations = false) + bool allowGenerations = false, + compress::CompressInterface* compressor = nullptr) : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), - enabledDiskAgg, allowGenerations) + enabledDiskAgg, allowGenerations, compressor) {} ~RowAggStorage(); @@ -356,6 +359,7 @@ private: bool fAggregated = true; bool fAllowGenerations; bool fEnabledDiskAggregation; + std::unique_ptr fCompressor; std::string fTmpDir; bool fInitialized{false}; rowgroup::RowGroup* fRowGroupOut;