1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

[MCOL-4829] More accurate memory counting

This commit is contained in:
Alexey Antipovsky
2021-09-02 19:16:42 +03:00
parent bf1640be65
commit 2328f4ef2a
4 changed files with 201 additions and 187 deletions

View File

@@ -1,6 +1,6 @@
/*
Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2019-2020 MariaDB Corporation
Copyright (c) 2019-2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@@ -1319,7 +1319,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
uint32_t width = fRowGroupIn.getColumnWidth(colIn);
uint32_t width = rowIn.getColumnWidth(colIn);
isWideDataType = width == datatypes::MAXDECIMALWIDTH;
if(LIKELY(isWideDataType))
{
@@ -1328,7 +1328,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int
}
else if (width <= datatypes::MAXLEGACYWIDTH)
{
uint32_t scale = fRowGroupIn.getScale()[colIn];
uint32_t scale = rowIn.getScale(colIn);
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
}
else
@@ -1795,12 +1795,11 @@ void RowAggregation::mergeEntries(const Row& rowIn)
case ROWAGG_AVG:
// count(column) for average is inserted after the sum,
// colOut+1 is the position of the count column.
doAvg(rowIn, colOut, colOut, colOut + 1, true);
doAvg(rowIn, colOut, colOut, fFunctionCols[i]->fAuxColumnIndex, true);
break;
case ROWAGG_STATS:
mergeStatistics(rowIn, colOut, colOut + 1);
mergeStatistics(rowIn, colOut, fFunctionCols[i]->fAuxColumnIndex);
break;
case ROWAGG_BIT_AND:
@@ -4313,7 +4312,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn,
// colOut(in) - column in the output row group stores the sum
// colAux(in) - column in the output row group stores the count
//------------------------------------------------------------------------------
void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool)
void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool merge)
{
if (rowIn.isNullValue(colIn))
return;
@@ -4348,7 +4347,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
uint32_t width = fRowGroupIn.getColumnWidth(colIn);
uint32_t width = rowIn.getColumnWidth(colIn);
isWideDataType = width == datatypes::MAXDECIMALWIDTH;
if(LIKELY(isWideDataType))
{
@@ -4357,7 +4356,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
}
else if (width <= datatypes::MAXLEGACYWIDTH)
{
uint32_t scale = fRowGroupIn.getScale()[colIn];
uint32_t scale = rowIn.getScale(colIn);
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
}
else
@@ -4400,6 +4399,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
}
uint64_t cnt = fRow.getUintField(colAux);
auto colAuxIn = merge ? colAux : (colIn + 1);
if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType)
{
@@ -4408,13 +4408,13 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut);
int128_t sum = valIn + *valOutPtr;
fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
}
else
{
int128_t sum = valIn;
fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
}
}
else if (isWideDataType)
@@ -4425,12 +4425,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut);
int128_t sum = *valOutPtr + *dec;
fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
}
else
{
fRow.setBinaryField(dec, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
}
}
else
@@ -4439,12 +4439,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
{
long double valOut = fRow.getLongDoubleField(colOut);
fRow.setLongDoubleField(valIn + valOut, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
}
else
{
fRow.setLongDoubleField(valIn, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux);
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
}
}
}

View File

@@ -2235,12 +2235,14 @@ inline uint64_t StringStore::getSize() const
uint64_t ret = 0;
MemChunk* mc;
ret += sizeof(MemChunk) * mem.size();
for (i = 0; i < mem.size(); i++)
{
mc = (MemChunk*) mem[i].get();
ret += mc->capacity;
}
ret += sizeof(MemChunk) * longStrings.size();
for (i = 0; i < longStrings.size(); i++)
{
mc = (MemChunk*) longStrings[i].get();

View File

@@ -74,101 +74,6 @@ int readData(int fd, char* buf, size_t sz)
return 0;
}
int writeFile(const std::string& fname, const char* buf, size_t sz,
const compress::CompressInterface* compressor)
{
if (sz == 0)
return 0;
int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
return errno;
const char* tmpbuf;
std::vector<char> tmpvec;
if (compressor)
{
auto len = compressor->maxCompressedSize(sz);
tmpvec.resize(len);
compressor->compress(buf, sz, tmpvec.data(), &len);
tmpbuf = tmpvec.data();
sz = len;
}
else
{
tmpbuf = buf;
}
auto to_write = sz;
while (to_write > 0)
{
auto r = write(fd, tmpbuf + sz - to_write, to_write);
if (UNLIKELY(r < 0))
{
if (errno == EAGAIN)
continue;
close(fd);
return errno;
}
assert(size_t(r) <= to_write);
to_write -= r;
}
close(fd);
return 0;
}
int readFile(const std::string& fname, std::vector<char>& buf,
const compress::CompressInterface* compressor)
{
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0))
return errno;
struct stat st{};
fstat(fd, &st);
size_t sz = st.st_size;
std::vector<char> tmpbuf;
tmpbuf.resize(sz);
auto to_read = sz;
while (to_read > 0)
{
auto r = read(fd, tmpbuf.data() + sz - to_read, to_read);
if (UNLIKELY(r < 0))
{
if (errno == EAGAIN)
continue;
close(fd);
return errno;
}
assert(size_t(r) <= to_read);
to_read -= r;
}
close(fd);
if (compressor)
{
size_t len;
if (!compressor->getUncompressedSize(tmpbuf.data(), sz, &len))
{
return EPROTO;
}
buf.resize(len);
compressor->uncompress(tmpbuf.data(), sz, buf.data(), &len);
}
else
{
tmpbuf.swap(buf);
}
return 0;
}
std::string errorString(int errNo)
{
char tmp[1024];
@@ -428,6 +333,7 @@ protected:
return true;
}
void releaseImpl(size_t amount) override {
MemManager::releaseImpl(amount);
fRm->returnMemory(amount, fSessLimit);
@@ -440,6 +346,122 @@ private:
const bool fStrict;
};
class Dumper {
public:
Dumper(const compress::CompressInterface* comp, MemManager* mm)
: fCompressor(comp)
, fMM(mm->clone())
{}
int write(const std::string &fname, const char *buf, size_t sz) {
if (sz == 0)
return 0;
int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
return errno;
const char *tmpbuf;
if (fCompressor) {
auto len = fCompressor->maxCompressedSize(sz);
checkBuffer(len);
fCompressor->compress(buf, sz, fTmpBuf.data(), &len);
tmpbuf = fTmpBuf.data();
sz = len;
} else {
tmpbuf = buf;
}
auto to_write = sz;
int ret = 0;
while (to_write > 0) {
auto r = ::write(fd, tmpbuf + sz - to_write, to_write);
if (UNLIKELY(r < 0)) {
if (errno == EAGAIN)
continue;
ret = errno;
close(fd);
return ret;
}
assert(size_t(r) <= to_write);
to_write -= r;
}
close(fd);
return ret;
}
int read(const std::string &fname, std::vector<char> &buf) {
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0))
return errno;
struct stat st{};
fstat(fd, &st);
size_t sz = st.st_size;
std::vector<char>* tmpbuf;
if (fCompressor) {
tmpbuf = &fTmpBuf;
checkBuffer(sz);
} else {
tmpbuf = &buf;
buf.resize(sz);
}
auto to_read = sz;
int ret = 0;
while (to_read > 0) {
auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read);
if (UNLIKELY(r < 0)) {
if (errno == EAGAIN)
continue;
ret = errno;
close(fd);
return ret;
}
assert(size_t(r) <= to_read);
to_read -= r;
}
if (fCompressor) {
size_t len;
if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len)) {
ret = EPROTO;
close(fd);
return ret;
}
buf.resize(len);
fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len);
}
close(fd);
return ret;
}
size_t size() const {
return fTmpBuf.size();
}
private:
void checkBuffer(size_t len) {
if (fTmpBuf.size() < len) {
size_t newtmpsz = (len + 8191) / 8192 * 8192;
std::vector<char> tmpvec(newtmpsz);
fMM->acquire(newtmpsz - fTmpBuf.size());
fTmpBuf.swap(tmpvec);
}
}
private:
const compress::CompressInterface* fCompressor;
std::unique_ptr<MemManager> fMM;
std::vector<char> fTmpBuf;
};
/** @brief Storage for RGData with LRU-cache & memory management
*/
class RowGroupStorage
@@ -495,6 +517,8 @@ public:
fLRU = std::unique_ptr<LRUIface>(new LRUIface());
}
fDumper.reset(new Dumper(fCompressor, fMM.get()));
auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
fRowGroupOut->setData(curRG);
fRowGroupOut->resetRowGroup(0);
@@ -872,6 +896,7 @@ public:
ret->fUniqId = fUniqId;
ret->fGeneration = gen;
ret->fCompressor = fCompressor;
ret->fDumper.reset(new Dumper(fCompressor, fMM.get()));
ret->loadFinalizedInfo();
return ret;
}
@@ -1067,7 +1092,7 @@ private:
std::vector<char> data;
int errNo;
if ((errNo = readFile(fname, data, fCompressor)) != 0)
if ((errNo = fDumper->read(fname, data)) != 0)
{
unlink(fname.c_str());
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
@@ -1122,7 +1147,7 @@ private:
rgdata->serialize(bs, fRowGroupOut->getDataSize());
int errNo;
if ((errNo = writeFile(makeRGFilename(rgid), (char*)bs.buf(), bs.length(), fCompressor)) != 0)
if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0)
{
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
@@ -1179,6 +1204,7 @@ private:
std::vector<uint64_t> fFinalizedRows;
std::string fTmpDir;
compress::CompressInterface* fCompressor;
std::unique_ptr<Dumper> fDumper;
};
/** @brief Internal data for the hashmap */
@@ -1221,6 +1247,8 @@ public:
else
fMM.reset(new MemManager());
fDumper.reset(new Dumper(fCompressor, fMM.get()));
if (size != 0)
init(size);
}
@@ -1287,6 +1315,7 @@ public:
cloned->fUniqId = fUniqId;
cloned->fGeneration = gen;
cloned->fCompressor = fCompressor;
cloned->fDumper.reset(new Dumper(fCompressor, cloned->fMM.get()));
if (loadDump)
cloned->load();
return cloned;
@@ -1313,7 +1342,7 @@ public:
{
int errNo;
size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
if ((errNo = writeFile(makeDumpName(), (char*)fPosHashes.data(), sz, fCompressor)) != 0)
if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
@@ -1349,7 +1378,7 @@ private:
{
int errNo;
std::vector<char> data;
if ((errNo = readFile(makeDumpName(), data, fCompressor)) != 0)
if ((errNo = fDumper->read(makeDumpName(), data)) != 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
@@ -1368,6 +1397,7 @@ private:
void* fUniqId; ///< uniq ID to make an uniq dump filename
std::string fTmpDir;
compress::CompressInterface* fCompressor;
std::unique_ptr<Dumper> fDumper;
};
/*---------------------------------------------------------------------------
@@ -1414,10 +1444,13 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fMM.reset(new MemManager());
fNumOfInputRGPerThread = 1;
}
fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit,
!enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
if (fExtKeys)
{
fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get());
fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit,
!enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
fKeysStorage = fRealKeysStorage.get();
}
else
{
@@ -1432,14 +1465,6 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
RowAggStorage::~RowAggStorage()
{
cleanupAll();
if (fExtKeys)
delete fKeysStorage;
for (auto& data : fGens)
{
if (data->fInfo != nullptr)
free(data->fInfo);
}
}
bool RowAggStorage::getTargetRow(const Row &row, Row &rowOut)
@@ -1658,11 +1683,6 @@ std::unique_ptr<RGData> RowAggStorage::getNextRGData()
void RowAggStorage::freeData()
{
if (fExtKeys && fKeysStorage)
{
delete fKeysStorage;
fKeysStorage = nullptr;
}
for (auto& data : fGens)
{
data->fHashes.reset();
@@ -1670,8 +1690,7 @@ void RowAggStorage::freeData()
{
const size_t memSz = calcSizeWithBuffer(data->fMask + 1);
fMM->release(memSz);
free(data->fInfo);
data->fInfo = nullptr;
data->fInfo.reset();
}
}
fGens.clear();
@@ -1757,9 +1776,9 @@ bool RowAggStorage::tryIncreaseInfo()
for (size_t i = 0; i < elems; i += 8)
{
uint64_t val;
memcpy(&val, fCurData->fInfo + i, sizeof(val));
memcpy(&val, fCurData->fInfo.get() + i, sizeof(val));
val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL;
memcpy(fCurData->fInfo + i, &val, sizeof(val));
memcpy(fCurData->fInfo.get() + i, &val, sizeof(val));
}
fCurData->fInfo[elems] = 1;
@@ -1770,12 +1789,10 @@ bool RowAggStorage::tryIncreaseInfo()
void RowAggStorage::rehashPowerOfTwo(size_t elems)
{
const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1);
const uint8_t* const oldInfo = fCurData->fInfo;
auto oldInfo = std::move(fCurData->fInfo);
auto oldHashes = std::move(fCurData->fHashes);
fMM->release(calcBytes(oldSz));
try
{
initData(elems, oldHashes.get());
oldHashes->releaseMemory();
@@ -1789,19 +1806,6 @@ void RowAggStorage::rehashPowerOfTwo(size_t elems)
}
}
}
}
catch (...)
{
if (oldInfo != nullptr && oldInfo != fCurData->fInfo)
{
free((void *)oldInfo);
}
throw;
}
if (oldInfo != nullptr && oldInfo != fCurData->fInfo)
{
free((void *)oldInfo);
}
}
void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes)
@@ -1859,7 +1863,7 @@ void RowAggStorage::initData(size_t elems, const RowPosHashStorage* oldHashes)
logging::ERR_AGGREGATION_TOO_BIG);
}
fCurData->fHashes = oldHashes->clone(elems, fGeneration);
fCurData->fInfo = reinterpret_cast<uint8_t*>(calloc(1, bytes));
fCurData->fInfo.reset(new uint8_t[bytes]());
fCurData->fInfo[sizeWithBuffer] = 1;
fCurData->fInfoInc = INIT_INFO_INC;
fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
@@ -1910,11 +1914,7 @@ void RowAggStorage::startNewGeneration()
++fGeneration;
fMM->release();
// reinitialize internal structures
if (fCurData->fInfo)
{
free(fCurData->fInfo);
fCurData->fInfo = nullptr;
}
fCurData->fInfo.reset();
fCurData->fSize = 0;
fCurData->fMask = 0;
fCurData->fMaxSize = 0;
@@ -1944,7 +1944,7 @@ void RowAggStorage::dumpInternalData() const
bs << fCurData->fMaxSize;
bs << fCurData->fInfoInc;
bs << fCurData->fInfoHashShift;
bs.append(fCurData->fInfo, calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0)
{
@@ -1998,9 +1998,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fStorage.reset(fStorage->clone(curGen - 1));
if (fExtKeys)
{
auto* oks = fKeysStorage;
fKeysStorage = oks->clone(curGen - 1);
delete oks;
fRealKeysStorage.reset(fRealKeysStorage->clone(curGen - 1));
fKeysStorage = fRealKeysStorage.get();
}
else
fKeysStorage = fStorage.get();
@@ -2012,9 +2011,10 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
size_t prevMaxSize;
uint32_t prevInfoInc;
uint32_t prevInfoHashShift;
uint8_t *prevInfo{nullptr};
std::unique_ptr<uint8_t[]> prevInfo;
std::unique_ptr<RowGroupStorage> prevRowStorage;
std::unique_ptr<RowGroupStorage> prevRealKeyRowStorage;
RowGroupStorage *prevKeyRowStorage{nullptr};
auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
@@ -2024,8 +2024,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
prevRowStorage.reset(fStorage->clone(prevGen));
if (fExtKeys)
{
delete prevKeyRowStorage;
prevKeyRowStorage = fKeysStorage->clone(prevGen);
prevRealKeyRowStorage.reset(fKeysStorage->clone(prevGen));
prevKeyRowStorage = prevRealKeyRowStorage.get();
}
else
prevKeyRowStorage = prevRowStorage.get();
@@ -2166,13 +2166,14 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fCurData->fMaxSize = prevMaxSize;
fCurData->fInfoInc = prevInfoInc;
fCurData->fInfoHashShift = prevInfoHashShift;
if (fCurData->fInfo)
free(fCurData->fInfo);
fCurData->fInfo = prevInfo;
fCurData->fInfo = std::move(prevInfo);
fCurData->fHashes = std::move(prevHashes);
fStorage = std::move(prevRowStorage);
if (fExtKeys)
delete fKeysStorage;
if (fExtKeys) {
fRealKeysStorage = std::move(prevRealKeyRowStorage);
fKeysStorage = fRealKeysStorage.get();
}
else
fKeysStorage = prevKeyRowStorage;
}
@@ -2184,9 +2185,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fStorage.reset(fStorage->clone(fGeneration));
if (fExtKeys)
{
auto* oks = fKeysStorage;
fKeysStorage = oks->clone(fGeneration);
delete oks;
fRealKeysStorage.reset(fRealKeysStorage->clone(fGeneration));
fKeysStorage = fRealKeysStorage.get();
}
else
fKeysStorage = fStorage.get();
@@ -2199,7 +2199,13 @@ void RowAggStorage::loadGeneration(uint16_t gen)
loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo);
}
void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, size_t &maxSize, uint32_t &infoInc, uint32_t &infoHashShift, uint8_t *&info)
void RowAggStorage::loadGeneration(uint16_t gen,
size_t& size,
size_t& mask,
size_t& maxSize,
uint32_t& infoInc,
uint32_t& infoHashShift,
std::unique_ptr<uint8_t[]>& info)
{
messageqcpp::ByteStream bs;
int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY);
@@ -2230,10 +2236,9 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, siz
bs >> infoInc;
bs >> infoHashShift;
size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize));
if (info)
free(info);
info = (uint8_t*)calloc(1, infoSz);
bs >> info;
info.reset(new uint8_t[infoSz]());
uint8_t* tmp = info.get();
bs >> tmp;
}
void RowAggStorage::cleanupAll() noexcept

View File

@@ -225,7 +225,7 @@ private:
uint64_t data;
while (true)
{
memcpy(&data, fCurData->fInfo + idx, sizeof(data));
memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data));
if (data == 0)
{
idx += sizeof(n);
@@ -311,7 +311,13 @@ private:
*/
void loadGeneration(uint16_t gen);
/** @brief Load previously dumped data into the tmp storage */
void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, uint32_t& infoInc, uint32_t& infoHashShift, uint8_t*& info);
void loadGeneration(uint16_t gen,
size_t& size,
size_t& mask,
size_t& maxSize,
uint32_t& infoInc,
uint32_t& infoHashShift,
std::unique_ptr<uint8_t[]>& info);
/** @brief Remove temporary data files */
void cleanup();
@@ -333,7 +339,7 @@ private:
struct Data
{
RowPosHashStoragePtr fHashes;
uint8_t *fInfo{nullptr};
std::unique_ptr<uint8_t[]> fInfo;
size_t fSize{0};
size_t fMask{0};
size_t fMaxSize{0};
@@ -346,6 +352,7 @@ private:
const bool fExtKeys;
std::unique_ptr<RowGroupStorage> fStorage;
std::unique_ptr<RowGroupStorage> fRealKeysStorage;
RowGroupStorage* fKeysStorage;
uint32_t fLastKeyCol;