diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index 98014307e..d8e161301 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -521,6 +521,7 @@ N + 127.0.0.1 diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index 483207323..0f70acc4e 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -516,6 +516,7 @@ + 127.0.0.1 diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index b799efe46..3ccdaa2c9 100755 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -686,6 +686,8 @@ void RowAggregation::initialize() config::Config* config = config::Config::makeConfig(); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); + string compStr = config->getConfig("RowAggregation", "Compression"); + auto* compressor = compress::getCompressInterfaceByName(compStr); if (fKeyOnHeap) { @@ -696,7 +698,8 @@ void RowAggregation::initialize() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } else { @@ -706,7 +709,8 @@ void RowAggregation::initialize() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } // Initialize the work row. @@ -771,6 +775,8 @@ void RowAggregation::aggReset() config::Config* config = config::Config::makeConfig(); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); + string compStr = config->getConfig("RowAggregation", "Compression"); + auto* compressor = compress::getCompressInterfaceByName(compStr); if (fKeyOnHeap) { @@ -781,7 +787,8 @@ void RowAggregation::aggReset() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } else { @@ -791,7 +798,8 @@ void RowAggregation::aggReset() fRm, fSessionMemLimit, disk_agg, - allow_gen)); + allow_gen, + compressor)); } fRowGroupOut->getRow(0, &fRow); copyNullRow(fRow); @@ -1311,7 +1319,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: { - uint32_t width = fRowGroupIn.getColumnWidth(colIn); + uint32_t width = rowIn.getColumnWidth(colIn); isWideDataType = width == datatypes::MAXDECIMALWIDTH; if(LIKELY(isWideDataType)) { @@ -1320,7 +1328,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int } else if (width <= datatypes::MAXLEGACYWIDTH) { - uint32_t scale = fRowGroupIn.getScale()[colIn]; + uint32_t scale = rowIn.getScale(colIn); valIn = rowIn.getScaledSInt64FieldAsXFloat(colIn, scale); } else @@ -1787,12 +1795,11 @@ void RowAggregation::mergeEntries(const Row& rowIn) case ROWAGG_AVG: // count(column) for average is inserted after the sum, - // colOut+1 is the position of the count column. - doAvg(rowIn, colOut, colOut, colOut + 1, true); + doAvg(rowIn, colOut, colOut, fFunctionCols[i]->fAuxColumnIndex, true); break; case ROWAGG_STATS: - mergeStatistics(rowIn, colOut, colOut + 1); + mergeStatistics(rowIn, colOut, fFunctionCols[i]->fAuxColumnIndex); break; case ROWAGG_BIT_AND: @@ -4305,7 +4312,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn, // colOut(in) - column in the output row group stores the sum // colAux(in) - column in the output row group stores the count //------------------------------------------------------------------------------ -void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool) +void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool merge) { if (rowIn.isNullValue(colIn)) return; @@ -4340,7 +4347,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: { - uint32_t width = fRowGroupIn.getColumnWidth(colIn); + uint32_t width = rowIn.getColumnWidth(colIn); isWideDataType = width == datatypes::MAXDECIMALWIDTH; if(LIKELY(isWideDataType)) { @@ -4349,7 +4356,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, } else if (width <= datatypes::MAXLEGACYWIDTH) { - uint32_t scale = fRowGroupIn.getScale()[colIn]; + uint32_t scale = rowIn.getScale(colIn); valIn = rowIn.getScaledSInt64FieldAsXFloat(colIn, scale); } else @@ -4392,6 +4399,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, } uint64_t cnt = fRow.getUintField(colAux); + auto colAuxIn = merge ? colAux : (colIn + 1); if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType) { @@ -4400,13 +4408,13 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int128_t *valOutPtr = fRow.getBinaryField(colOut); int128_t sum = valIn + *valOutPtr; fRow.setBinaryField(&sum, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux); } else { int128_t sum = valIn; fRow.setBinaryField(&sum, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn), colAux); } } else if (isWideDataType) @@ -4417,12 +4425,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int128_t *valOutPtr = fRow.getBinaryField(colOut); int128_t sum = *valOutPtr + *dec; fRow.setBinaryField(&sum, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux); } else { fRow.setBinaryField(dec, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn), colAux); } } else @@ -4431,12 +4439,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, { long double valOut = fRow.getLongDoubleField(colOut); fRow.setLongDoubleField(valIn + valOut, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux); } else { fRow.setLongDoubleField(valIn, colOut); - fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); + fRow.setUintField(rowIn.getUintField(colAuxIn), colAux); } } } diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 27355082f..464dad6a4 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -2235,12 +2235,14 @@ inline uint64_t StringStore::getSize() const uint64_t ret = 0; MemChunk* mc; + ret += sizeof(MemChunk) * mem.size(); for (i = 0; i < mem.size(); i++) { mc = (MemChunk*) mem[i].get(); ret += mc->capacity; } + ret += sizeof(MemChunk) * longStrings.size(); for (i = 0; i < longStrings.size(); i++) { mc = (MemChunk*) longStrings[i].get(); diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 137ccaeea..09e9b5335 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -333,6 +333,7 @@ protected: return true; } + void releaseImpl(size_t amount) override { MemManager::releaseImpl(amount); fRm->returnMemory(amount, fSessLimit); @@ -342,7 +343,123 @@ private: joblist::ResourceManager* fRm = nullptr; boost::shared_ptr fSessLimit; const bool fWait; - const bool fStrict; + const bool fStrict; +}; + +class Dumper { +public: + Dumper(const compress::CompressInterface* comp, MemManager* mm) + : fCompressor(comp) + , fMM(mm->clone()) + {} + + int write(const std::string &fname, const char *buf, size_t sz) { + if (sz == 0) + return 0; + + int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (UNLIKELY(fd < 0)) + return errno; + + const char *tmpbuf; + if (fCompressor) { + auto len = fCompressor->maxCompressedSize(sz); + checkBuffer(len); + fCompressor->compress(buf, sz, fTmpBuf.data(), &len); + tmpbuf = fTmpBuf.data(); + sz = len; + } else { + tmpbuf = buf; + } + + auto to_write = sz; + int ret = 0; + while (to_write > 0) { + auto r = ::write(fd, tmpbuf + sz - to_write, to_write); + if (UNLIKELY(r < 0)) { + if (errno == EAGAIN) + continue; + + ret = errno; + close(fd); + return ret; + } + assert(size_t(r) <= to_write); + to_write -= r; + } + + close(fd); + return ret; + } + + int read(const std::string &fname, std::vector &buf) { + int fd = open(fname.c_str(), O_RDONLY); + if (UNLIKELY(fd < 0)) + return errno; + + struct stat st{}; + fstat(fd, &st); + size_t sz = st.st_size; + std::vector* tmpbuf; + if (fCompressor) { + tmpbuf = &fTmpBuf; + checkBuffer(sz); + } else { + tmpbuf = &buf; + buf.resize(sz); + } + + auto to_read = sz; + int ret = 0; + while (to_read > 0) { + auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read); + if (UNLIKELY(r < 0)) { + if (errno == EAGAIN) + continue; + + ret = errno; + close(fd); + return ret; + } + + assert(size_t(r) <= to_read); + to_read -= r; + } + + if (fCompressor) { + size_t len; + if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len)) { + ret = EPROTO; + close(fd); + return ret; + } + + buf.resize(len); + fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len); + } + + close(fd); + return ret; + } + + size_t size() const { + return fTmpBuf.size(); + } + +private: + void checkBuffer(size_t len) { + if (fTmpBuf.size() < len) { + size_t newtmpsz = (len + 8191) / 8192 * 8192; + std::vector tmpvec(newtmpsz); + fMM->acquire(newtmpsz - fTmpBuf.size()); + fTmpBuf.swap(tmpvec); + } + } + +private: + const compress::CompressInterface* fCompressor; + std::unique_ptr fMM; + std::vector fTmpBuf; }; /** @brief Storage for RGData with LRU-cache & memory management @@ -365,6 +482,7 @@ public: * right now? * @param strict true -> throw an exception if not enough memory * false -> deal with it later + * @param compressor pointer to CompressInterface impl or nullptr */ RowGroupStorage(const std::string& tmpDir, RowGroup* rowGroupOut, @@ -372,12 +490,14 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool wait = false, - bool strict = false) + bool strict = false, + compress::CompressInterface* compressor = nullptr) : fRowGroupOut(rowGroupOut) , fMaxRows(maxRows) , fRGDatas() , fUniqId(this) , fTmpDir(tmpDir) + , fCompressor(compressor) { if (rm) { @@ -396,6 +516,9 @@ public: fMM.reset(new MemManager()); fLRU = std::unique_ptr(new LRUIface()); } + + fDumper.reset(new Dumper(fCompressor, fMM.get())); + auto* curRG = new RGData(*fRowGroupOut, fMaxRows); fRowGroupOut->setData(curRG); fRowGroupOut->resetRowGroup(0); @@ -772,6 +895,8 @@ public: ret->fMM.reset(fMM->clone()); ret->fUniqId = fUniqId; ret->fGeneration = gen; + ret->fCompressor = fCompressor; + ret->fDumper.reset(new Dumper(fCompressor, fMM.get())); ret->loadFinalizedInfo(); return ret; } @@ -964,41 +1089,18 @@ private: void loadRG(uint64_t rgid, std::unique_ptr& rgdata, bool unlinkDump = false) { auto fname = makeRGFilename(rgid); - int fd = open(fname.c_str(), O_RDONLY); - if (UNLIKELY(fd < 0)) + + std::vector data; + int errNo; + if ((errNo = fDumper->read(fname, data)) != 0) { + unlink(fname.c_str()); throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - messageqcpp::ByteStream bs; - try - { - struct stat st - { - }; - fstat(fd, &st); - - bs.needAtLeast(st.st_size); - bs.restart(); - int errNo; - if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0) - { - close(fd); - unlink(fname.c_str()); - throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - bs.advanceInputPtr(st.st_size); - close(fd); - } - catch (...) - { - close(fd); - throw; - } + messageqcpp::ByteStream bs(reinterpret_cast(data.data()), data.size()); if (unlinkDump) unlink(fname.c_str()); @@ -1044,23 +1146,13 @@ private: fRowGroupOut->setData(rgdata); rgdata->serialize(bs, fRowGroupOut->getDataSize()); - int fd = open(makeRGFilename(rgid).c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (UNLIKELY(fd < 0)) - { - throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - int errNo; - if ((errNo = writeData(fd, (char*)bs.buf(), bs.length())) != 0) + if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0) { - close(fd); throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - close(fd); } #ifdef DISK_AGG_DEBUG @@ -1111,6 +1203,8 @@ private: uint16_t fGeneration{0}; std::vector fFinalizedRows; std::string fTmpDir; + compress::CompressInterface* fCompressor; + std::unique_ptr fDumper; }; /** @brief Internal data for the hashmap */ @@ -1142,15 +1236,19 @@ public: size_t size, joblist::ResourceManager* rm, boost::shared_ptr sessLimit, - bool enableDiskAgg) + bool enableDiskAgg, + compress::CompressInterface* compressor) : fUniqId(this) , fTmpDir(tmpDir) + , fCompressor(compressor) { if (rm) fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg)); else fMM.reset(new MemManager()); + fDumper.reset(new Dumper(fCompressor, fMM.get())); + if (size != 0) init(size); } @@ -1216,6 +1314,8 @@ public: cloned->init(size); cloned->fUniqId = fUniqId; cloned->fGeneration = gen; + cloned->fCompressor = fCompressor; + cloned->fDumper.reset(new Dumper(fCompressor, cloned->fMM.get())); if (loadDump) cloned->load(); return cloned; @@ -1240,27 +1340,15 @@ public: void dump() { - int fd = open(makeDumpName().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (fd < 0) - { - throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, - errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - int errNo; size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type); - if ((errNo = writeData(fd, (char*)fPosHashes.data(), sz)) != 0) + if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0) { - close(fd); throw logging::IDBExcept( logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - - close(fd); } private: @@ -1288,29 +1376,18 @@ private: void load() { - int fd = open(makeDumpName().c_str(), O_RDONLY); - if (fd < 0) - { - throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, - errorString(errno)), - logging::ERR_DISKAGG_FILEIO_ERROR); - } - - struct stat st; - fstat(fd, &st); - fPosHashes.resize(st.st_size / sizeof(decltype(fPosHashes)::value_type)); int errNo; - if ((errNo = readData(fd, (char*)fPosHashes.data(), st.st_size)) != 0) + std::vector data; + if ((errNo = fDumper->read(makeDumpName(), data)) != 0) { - close(fd); throw logging::IDBExcept( logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } - close(fd); + fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type)); + memcpy(fPosHashes.data(), data.data(), data.size()); } private: @@ -1319,6 +1396,8 @@ private: uint16_t fGeneration{0}; ///< current aggregation generation void* fUniqId; ///< uniq ID to make an uniq dump filename std::string fTmpDir; + compress::CompressInterface* fCompressor; + std::unique_ptr fDumper; }; /*--------------------------------------------------------------------------- @@ -1336,13 +1415,15 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, joblist::ResourceManager *rm, boost::shared_ptr sessLimit, bool enabledDiskAgg, - bool allowGenerations) + bool allowGenerations, + compress::CompressInterface* compressor) : fMaxRows(getMaxRows(enabledDiskAgg)) , fExtKeys(rowGroupOut != keysRowGroup) , fLastKeyCol(keyCount - 1) , fUniqId(this) , fAllowGenerations(allowGenerations) , fEnabledDiskAggregation(enabledDiskAgg) + , fCompressor(compressor) , fTmpDir(tmpDir) , fRowGroupOut(rowGroupOut) , fKeysRowGroup(keysRowGroup) @@ -1363,10 +1444,13 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, fMM.reset(new MemManager()); fNumOfInputRGPerThread = 1; } - fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg)); + fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, + !enabledDiskAgg, !enabledDiskAgg, fCompressor.get())); if (fExtKeys) { - fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg); + fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, + !enabledDiskAgg, !enabledDiskAgg, fCompressor.get())); + fKeysStorage = fRealKeysStorage.get(); } else { @@ -1375,20 +1459,12 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir, fKeysStorage->initRow(fKeyRow); fGens.emplace_back(new Data); fCurData = fGens.back().get(); - fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation)); + fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get())); } RowAggStorage::~RowAggStorage() { cleanupAll(); - - if (fExtKeys) - delete fKeysStorage; - for (auto& data : fGens) - { - if (data->fInfo != nullptr) - free(data->fInfo); - } } bool RowAggStorage::getTargetRow(const Row &row, Row &rowOut) @@ -1408,7 +1484,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut) fMM->getResourceManaged(), fMM->getSessionLimit(), !fEnabledDiskAggregation, - !fEnabledDiskAggregation)); + !fEnabledDiskAggregation, + fCompressor.get())); if (fExtKeys) { fKeysStorage = new RowGroupStorage(fTmpDir, @@ -1417,7 +1494,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut) fMM->getResourceManaged(), fMM->getSessionLimit(), !fEnabledDiskAggregation, - !fEnabledDiskAggregation); + !fEnabledDiskAggregation, + fCompressor.get()); } else { @@ -1605,11 +1683,6 @@ std::unique_ptr RowAggStorage::getNextRGData() void RowAggStorage::freeData() { - if (fExtKeys && fKeysStorage) - { - delete fKeysStorage; - fKeysStorage = nullptr; - } for (auto& data : fGens) { data->fHashes.reset(); @@ -1617,8 +1690,7 @@ void RowAggStorage::freeData() { const size_t memSz = calcSizeWithBuffer(data->fMask + 1); fMM->release(memSz); - free(data->fInfo); - data->fInfo = nullptr; + data->fInfo.reset(); } } fGens.clear(); @@ -1704,9 +1776,9 @@ bool RowAggStorage::tryIncreaseInfo() for (size_t i = 0; i < elems; i += 8) { uint64_t val; - memcpy(&val, fCurData->fInfo + i, sizeof(val)); + memcpy(&val, fCurData->fInfo.get() + i, sizeof(val)); val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL; - memcpy(fCurData->fInfo + i, &val, sizeof(val)); + memcpy(fCurData->fInfo.get() + i, &val, sizeof(val)); } fCurData->fInfo[elems] = 1; @@ -1717,38 +1789,23 @@ bool RowAggStorage::tryIncreaseInfo() void RowAggStorage::rehashPowerOfTwo(size_t elems) { const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1); - const uint8_t* const oldInfo = fCurData->fInfo; + auto oldInfo = std::move(fCurData->fInfo); auto oldHashes = std::move(fCurData->fHashes); fMM->release(calcBytes(oldSz)); - try - { - initData(elems, oldHashes.get()); - oldHashes->releaseMemory(); + initData(elems, oldHashes.get()); + oldHashes->releaseMemory(); - if (oldSz > 1) + if (oldSz > 1) + { + for (size_t i = 0; i < oldSz; ++i) { - for (size_t i = 0; i < oldSz; ++i) + if (UNLIKELY(oldInfo[i] != 0)) { - if (UNLIKELY(oldInfo[i] != 0)) - { - insertSwap(i, oldHashes.get()); - } + insertSwap(i, oldHashes.get()); } } } - catch (...) - { - if (oldInfo != nullptr && oldInfo != fCurData->fInfo) - { - free((void *)oldInfo); - } - throw; - } - if (oldInfo != nullptr && oldInfo != fCurData->fInfo) - { - free((void *)oldInfo); - } } void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes) @@ -1806,7 +1863,7 @@ void RowAggStorage::initData(size_t elems, const RowPosHashStorage* oldHashes) logging::ERR_AGGREGATION_TOO_BIG); } fCurData->fHashes = oldHashes->clone(elems, fGeneration); - fCurData->fInfo = reinterpret_cast(calloc(1, bytes)); + fCurData->fInfo.reset(new uint8_t[bytes]()); fCurData->fInfo[sizeWithBuffer] = 1; fCurData->fInfoInc = INIT_INFO_INC; fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT; @@ -1857,11 +1914,7 @@ void RowAggStorage::startNewGeneration() ++fGeneration; fMM->release(); // reinitialize internal structures - if (fCurData->fInfo) - { - free(fCurData->fInfo); - fCurData->fInfo = nullptr; - } + fCurData->fInfo.reset(); fCurData->fSize = 0; fCurData->fMask = 0; fCurData->fMaxSize = 0; @@ -1891,7 +1944,7 @@ void RowAggStorage::dumpInternalData() const bs << fCurData->fMaxSize; bs << fCurData->fInfoInc; bs << fCurData->fInfoHashShift; - bs.append(fCurData->fInfo, calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize))); + bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize))); int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); if (fd < 0) { @@ -1945,9 +1998,8 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) fStorage.reset(fStorage->clone(curGen - 1)); if (fExtKeys) { - auto* oks = fKeysStorage; - fKeysStorage = oks->clone(curGen - 1); - delete oks; + fRealKeysStorage.reset(fRealKeysStorage->clone(curGen - 1)); + fKeysStorage = fRealKeysStorage.get(); } else fKeysStorage = fStorage.get(); @@ -1959,9 +2011,10 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) size_t prevMaxSize; uint32_t prevInfoInc; uint32_t prevInfoHashShift; - uint8_t *prevInfo{nullptr}; + std::unique_ptr prevInfo; std::unique_ptr prevRowStorage; + std::unique_ptr prevRealKeyRowStorage; RowGroupStorage *prevKeyRowStorage{nullptr}; auto elems = calcSizeWithBuffer(fCurData->fMask + 1); @@ -1971,8 +2024,8 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) prevRowStorage.reset(fStorage->clone(prevGen)); if (fExtKeys) { - delete prevKeyRowStorage; - prevKeyRowStorage = fKeysStorage->clone(prevGen); + prevRealKeyRowStorage.reset(fKeysStorage->clone(prevGen)); + prevKeyRowStorage = prevRealKeyRowStorage.get(); } else prevKeyRowStorage = prevRowStorage.get(); @@ -2113,14 +2166,15 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) fCurData->fMaxSize = prevMaxSize; fCurData->fInfoInc = prevInfoInc; fCurData->fInfoHashShift = prevInfoHashShift; - if (fCurData->fInfo) - free(fCurData->fInfo); - fCurData->fInfo = prevInfo; + fCurData->fInfo = std::move(prevInfo); fCurData->fHashes = std::move(prevHashes); fStorage = std::move(prevRowStorage); - if (fExtKeys) - delete fKeysStorage; - fKeysStorage = prevKeyRowStorage; + if (fExtKeys) { + fRealKeysStorage = std::move(prevRealKeyRowStorage); + fKeysStorage = fRealKeysStorage.get(); + } + else + fKeysStorage = prevKeyRowStorage; } fStorage->dumpFinalizedInfo(); @@ -2131,9 +2185,8 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) fStorage.reset(fStorage->clone(fGeneration)); if (fExtKeys) { - auto* oks = fKeysStorage; - fKeysStorage = oks->clone(fGeneration); - delete oks; + fRealKeysStorage.reset(fRealKeysStorage->clone(fGeneration)); + fKeysStorage = fRealKeysStorage.get(); } else fKeysStorage = fStorage.get(); @@ -2146,7 +2199,13 @@ void RowAggStorage::loadGeneration(uint16_t gen) loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo); } -void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, size_t &maxSize, uint32_t &infoInc, uint32_t &infoHashShift, uint8_t *&info) +void RowAggStorage::loadGeneration(uint16_t gen, + size_t& size, + size_t& mask, + size_t& maxSize, + uint32_t& infoInc, + uint32_t& infoHashShift, + std::unique_ptr& info) { messageqcpp::ByteStream bs; int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY); @@ -2177,10 +2236,9 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, siz bs >> infoInc; bs >> infoHashShift; size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize)); - if (info) - free(info); - info = (uint8_t*)calloc(1, infoSz); - bs >> info; + info.reset(new uint8_t[infoSz]()); + uint8_t* tmp = info.get(); + bs >> tmp; } void RowAggStorage::cleanupAll() noexcept diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 7ac38a1d2..6e5235bcf 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -20,6 +20,7 @@ #include "resourcemanager.h" #include "rowgroup.h" +#include "idbcompress.h" #include #include @@ -51,7 +52,8 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, - bool allowGenerations = false); + bool allowGenerations = false, + compress::CompressInterface* compressor = nullptr); RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, @@ -59,10 +61,11 @@ public: joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, - bool allowGenerations = false) + bool allowGenerations = false, + compress::CompressInterface* compressor = nullptr) : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), - enabledDiskAgg, allowGenerations) + enabledDiskAgg, allowGenerations, compressor) {} ~RowAggStorage(); @@ -222,7 +225,7 @@ private: uint64_t data; while (true) { - memcpy(&data, fCurData->fInfo + idx, sizeof(data)); + memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data)); if (data == 0) { idx += sizeof(n); @@ -308,7 +311,13 @@ private: */ void loadGeneration(uint16_t gen); /** @brief Load previously dumped data into the tmp storage */ - void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, uint32_t& infoInc, uint32_t& infoHashShift, uint8_t*& info); + void loadGeneration(uint16_t gen, + size_t& size, + size_t& mask, + size_t& maxSize, + uint32_t& infoInc, + uint32_t& infoHashShift, + std::unique_ptr& info); /** @brief Remove temporary data files */ void cleanup(); @@ -330,7 +339,7 @@ private: struct Data { RowPosHashStoragePtr fHashes; - uint8_t *fInfo{nullptr}; + std::unique_ptr fInfo; size_t fSize{0}; size_t fMask{0}; size_t fMaxSize{0}; @@ -343,6 +352,7 @@ private: const bool fExtKeys; std::unique_ptr fStorage; + std::unique_ptr fRealKeysStorage; RowGroupStorage* fKeysStorage; uint32_t fLastKeyCol; @@ -356,6 +366,7 @@ private: bool fAggregated = true; bool fAllowGenerations; bool fEnabledDiskAggregation; + std::unique_ptr fCompressor; std::string fTmpDir; bool fInitialized{false}; rowgroup::RowGroup* fRowGroupOut;