1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #2065 from mariadb-AlexeyAntipovsky/MCOL-4829-dev

[MCOL-4829] Compression for the temp disk-based aggregation files
This commit is contained in:
Roman Nozdrin
2021-09-13 23:13:53 +03:00
committed by GitHub
6 changed files with 251 additions and 170 deletions

View File

@ -521,6 +521,7 @@
<!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 --> <!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 -->
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
<AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation>
<!-- <Compression>SNAPPY</Compression> --> <!-- Disabled by default -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>

View File

@ -516,6 +516,7 @@
<!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 --> <!-- <RowAggrBuckets>32</RowAggrBuckets> --> <!-- Default value is number of cores * 4 -->
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
<!-- <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> --> <!-- Default value is N --> <!-- <AllowDiskBasedAggregation>N</AllowDiskBasedAggregation> --> <!-- Default value is N -->
<!-- <Compression>SNAPPY</Compression> --> <!-- Disabled by default -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>

View File

@ -686,6 +686,8 @@ void RowAggregation::initialize()
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
string compStr = config->getConfig("RowAggregation", "Compression");
auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap) if (fKeyOnHeap)
{ {
@ -696,7 +698,8 @@ void RowAggregation::initialize()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
else else
{ {
@ -706,7 +709,8 @@ void RowAggregation::initialize()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
// Initialize the work row. // Initialize the work row.
@ -771,6 +775,8 @@ void RowAggregation::aggReset()
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates); string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
string compStr = config->getConfig("RowAggregation", "Compression");
auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap) if (fKeyOnHeap)
{ {
@ -781,7 +787,8 @@ void RowAggregation::aggReset()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
else else
{ {
@ -791,7 +798,8 @@ void RowAggregation::aggReset()
fRm, fRm,
fSessionMemLimit, fSessionMemLimit,
disk_agg, disk_agg,
allow_gen)); allow_gen,
compressor));
} }
fRowGroupOut->getRow(0, &fRow); fRowGroupOut->getRow(0, &fRow);
copyNullRow(fRow); copyNullRow(fRow);
@ -1311,7 +1319,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
uint32_t width = fRowGroupIn.getColumnWidth(colIn); uint32_t width = rowIn.getColumnWidth(colIn);
isWideDataType = width == datatypes::MAXDECIMALWIDTH; isWideDataType = width == datatypes::MAXDECIMALWIDTH;
if(LIKELY(isWideDataType)) if(LIKELY(isWideDataType))
{ {
@ -1320,7 +1328,7 @@ void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int
} }
else if (width <= datatypes::MAXLEGACYWIDTH) else if (width <= datatypes::MAXLEGACYWIDTH)
{ {
uint32_t scale = fRowGroupIn.getScale()[colIn]; uint32_t scale = rowIn.getScale(colIn);
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale); valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
} }
else else
@ -1787,12 +1795,11 @@ void RowAggregation::mergeEntries(const Row& rowIn)
case ROWAGG_AVG: case ROWAGG_AVG:
// count(column) for average is inserted after the sum, // count(column) for average is inserted after the sum,
// colOut+1 is the position of the count column. doAvg(rowIn, colOut, colOut, fFunctionCols[i]->fAuxColumnIndex, true);
doAvg(rowIn, colOut, colOut, colOut + 1, true);
break; break;
case ROWAGG_STATS: case ROWAGG_STATS:
mergeStatistics(rowIn, colOut, colOut + 1); mergeStatistics(rowIn, colOut, fFunctionCols[i]->fAuxColumnIndex);
break; break;
case ROWAGG_BIT_AND: case ROWAGG_BIT_AND:
@ -4305,7 +4312,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn,
// colOut(in) - column in the output row group stores the sum // colOut(in) - column in the output row group stores the sum
// colAux(in) - column in the output row group stores the count // colAux(in) - column in the output row group stores the count
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool) void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool merge)
{ {
if (rowIn.isNullValue(colIn)) if (rowIn.isNullValue(colIn))
return; return;
@ -4340,7 +4347,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
uint32_t width = fRowGroupIn.getColumnWidth(colIn); uint32_t width = rowIn.getColumnWidth(colIn);
isWideDataType = width == datatypes::MAXDECIMALWIDTH; isWideDataType = width == datatypes::MAXDECIMALWIDTH;
if(LIKELY(isWideDataType)) if(LIKELY(isWideDataType))
{ {
@ -4349,7 +4356,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
} }
else if (width <= datatypes::MAXLEGACYWIDTH) else if (width <= datatypes::MAXLEGACYWIDTH)
{ {
uint32_t scale = fRowGroupIn.getScale()[colIn]; uint32_t scale = rowIn.getScale(colIn);
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale); valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
} }
else else
@ -4392,6 +4399,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
} }
uint64_t cnt = fRow.getUintField(colAux); uint64_t cnt = fRow.getUintField(colAux);
auto colAuxIn = merge ? colAux : (colIn + 1);
if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType) if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType)
{ {
@ -4400,13 +4408,13 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut); int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut);
int128_t sum = valIn + *valOutPtr; int128_t sum = valIn + *valOutPtr;
fRow.setBinaryField(&sum, colOut); fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
} }
else else
{ {
int128_t sum = valIn; int128_t sum = valIn;
fRow.setBinaryField(&sum, colOut); fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
} }
} }
else if (isWideDataType) else if (isWideDataType)
@ -4417,12 +4425,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut); int128_t *valOutPtr = fRow.getBinaryField<int128_t>(colOut);
int128_t sum = *valOutPtr + *dec; int128_t sum = *valOutPtr + *dec;
fRow.setBinaryField(&sum, colOut); fRow.setBinaryField(&sum, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
} }
else else
{ {
fRow.setBinaryField(dec, colOut); fRow.setBinaryField(dec, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
} }
} }
else else
@ -4431,12 +4439,12 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
{ {
long double valOut = fRow.getLongDoubleField(colOut); long double valOut = fRow.getLongDoubleField(colOut);
fRow.setLongDoubleField(valIn + valOut, colOut); fRow.setLongDoubleField(valIn + valOut, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1) + cnt, colAux); fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
} }
else else
{ {
fRow.setLongDoubleField(valIn, colOut); fRow.setLongDoubleField(valIn, colOut);
fRow.setUintField(rowIn.getUintField(colIn + 1), colAux); fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
} }
} }
} }

View File

@ -2235,12 +2235,14 @@ inline uint64_t StringStore::getSize() const
uint64_t ret = 0; uint64_t ret = 0;
MemChunk* mc; MemChunk* mc;
ret += sizeof(MemChunk) * mem.size();
for (i = 0; i < mem.size(); i++) for (i = 0; i < mem.size(); i++)
{ {
mc = (MemChunk*) mem[i].get(); mc = (MemChunk*) mem[i].get();
ret += mc->capacity; ret += mc->capacity;
} }
ret += sizeof(MemChunk) * longStrings.size();
for (i = 0; i < longStrings.size(); i++) for (i = 0; i < longStrings.size(); i++)
{ {
mc = (MemChunk*) longStrings[i].get(); mc = (MemChunk*) longStrings[i].get();

View File

@ -333,6 +333,7 @@ protected:
return true; return true;
} }
void releaseImpl(size_t amount) override { void releaseImpl(size_t amount) override {
MemManager::releaseImpl(amount); MemManager::releaseImpl(amount);
fRm->returnMemory(amount, fSessLimit); fRm->returnMemory(amount, fSessLimit);
@ -342,7 +343,123 @@ private:
joblist::ResourceManager* fRm = nullptr; joblist::ResourceManager* fRm = nullptr;
boost::shared_ptr<int64_t> fSessLimit; boost::shared_ptr<int64_t> fSessLimit;
const bool fWait; const bool fWait;
const bool fStrict; const bool fStrict;
};
class Dumper {
public:
Dumper(const compress::CompressInterface* comp, MemManager* mm)
: fCompressor(comp)
, fMM(mm->clone())
{}
int write(const std::string &fname, const char *buf, size_t sz) {
if (sz == 0)
return 0;
int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
return errno;
const char *tmpbuf;
if (fCompressor) {
auto len = fCompressor->maxCompressedSize(sz);
checkBuffer(len);
fCompressor->compress(buf, sz, fTmpBuf.data(), &len);
tmpbuf = fTmpBuf.data();
sz = len;
} else {
tmpbuf = buf;
}
auto to_write = sz;
int ret = 0;
while (to_write > 0) {
auto r = ::write(fd, tmpbuf + sz - to_write, to_write);
if (UNLIKELY(r < 0)) {
if (errno == EAGAIN)
continue;
ret = errno;
close(fd);
return ret;
}
assert(size_t(r) <= to_write);
to_write -= r;
}
close(fd);
return ret;
}
int read(const std::string &fname, std::vector<char> &buf) {
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0))
return errno;
struct stat st{};
fstat(fd, &st);
size_t sz = st.st_size;
std::vector<char>* tmpbuf;
if (fCompressor) {
tmpbuf = &fTmpBuf;
checkBuffer(sz);
} else {
tmpbuf = &buf;
buf.resize(sz);
}
auto to_read = sz;
int ret = 0;
while (to_read > 0) {
auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read);
if (UNLIKELY(r < 0)) {
if (errno == EAGAIN)
continue;
ret = errno;
close(fd);
return ret;
}
assert(size_t(r) <= to_read);
to_read -= r;
}
if (fCompressor) {
size_t len;
if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len)) {
ret = EPROTO;
close(fd);
return ret;
}
buf.resize(len);
fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len);
}
close(fd);
return ret;
}
size_t size() const {
return fTmpBuf.size();
}
private:
void checkBuffer(size_t len) {
if (fTmpBuf.size() < len) {
size_t newtmpsz = (len + 8191) / 8192 * 8192;
std::vector<char> tmpvec(newtmpsz);
fMM->acquire(newtmpsz - fTmpBuf.size());
fTmpBuf.swap(tmpvec);
}
}
private:
const compress::CompressInterface* fCompressor;
std::unique_ptr<MemManager> fMM;
std::vector<char> fTmpBuf;
}; };
/** @brief Storage for RGData with LRU-cache & memory management /** @brief Storage for RGData with LRU-cache & memory management
@ -365,6 +482,7 @@ public:
* right now? * right now?
* @param strict true -> throw an exception if not enough memory * @param strict true -> throw an exception if not enough memory
* false -> deal with it later * false -> deal with it later
* @param compressor pointer to CompressInterface impl or nullptr
*/ */
RowGroupStorage(const std::string& tmpDir, RowGroupStorage(const std::string& tmpDir,
RowGroup* rowGroupOut, RowGroup* rowGroupOut,
@ -372,12 +490,14 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool wait = false, bool wait = false,
bool strict = false) bool strict = false,
compress::CompressInterface* compressor = nullptr)
: fRowGroupOut(rowGroupOut) : fRowGroupOut(rowGroupOut)
, fMaxRows(maxRows) , fMaxRows(maxRows)
, fRGDatas() , fRGDatas()
, fUniqId(this) , fUniqId(this)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fCompressor(compressor)
{ {
if (rm) if (rm)
{ {
@ -396,6 +516,9 @@ public:
fMM.reset(new MemManager()); fMM.reset(new MemManager());
fLRU = std::unique_ptr<LRUIface>(new LRUIface()); fLRU = std::unique_ptr<LRUIface>(new LRUIface());
} }
fDumper.reset(new Dumper(fCompressor, fMM.get()));
auto* curRG = new RGData(*fRowGroupOut, fMaxRows); auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
fRowGroupOut->setData(curRG); fRowGroupOut->setData(curRG);
fRowGroupOut->resetRowGroup(0); fRowGroupOut->resetRowGroup(0);
@ -772,6 +895,8 @@ public:
ret->fMM.reset(fMM->clone()); ret->fMM.reset(fMM->clone());
ret->fUniqId = fUniqId; ret->fUniqId = fUniqId;
ret->fGeneration = gen; ret->fGeneration = gen;
ret->fCompressor = fCompressor;
ret->fDumper.reset(new Dumper(fCompressor, fMM.get()));
ret->loadFinalizedInfo(); ret->loadFinalizedInfo();
return ret; return ret;
} }
@ -964,41 +1089,18 @@ private:
void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false) void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false)
{ {
auto fname = makeRGFilename(rgid); auto fname = makeRGFilename(rgid);
int fd = open(fname.c_str(), O_RDONLY);
if (UNLIKELY(fd < 0)) std::vector<char> data;
int errNo;
if ((errNo = fDumper->read(fname, data)) != 0)
{ {
unlink(fname.c_str());
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
messageqcpp::ByteStream bs;
try messageqcpp::ByteStream bs(reinterpret_cast<uint8_t*>(data.data()), data.size());
{
struct stat st
{
};
fstat(fd, &st);
bs.needAtLeast(st.st_size);
bs.restart();
int errNo;
if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0)
{
close(fd);
unlink(fname.c_str());
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
bs.advanceInputPtr(st.st_size);
close(fd);
}
catch (...)
{
close(fd);
throw;
}
if (unlinkDump) if (unlinkDump)
unlink(fname.c_str()); unlink(fname.c_str());
@ -1044,23 +1146,13 @@ private:
fRowGroupOut->setData(rgdata); fRowGroupOut->setData(rgdata);
rgdata->serialize(bs, fRowGroupOut->getDataSize()); rgdata->serialize(bs, fRowGroupOut->getDataSize());
int fd = open(makeRGFilename(rgid).c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (UNLIKELY(fd < 0))
{
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
int errNo; int errNo;
if ((errNo = writeData(fd, (char*)bs.buf(), bs.length())) != 0) if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0)
{ {
close(fd);
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd);
} }
#ifdef DISK_AGG_DEBUG #ifdef DISK_AGG_DEBUG
@ -1111,6 +1203,8 @@ private:
uint16_t fGeneration{0}; uint16_t fGeneration{0};
std::vector<uint64_t> fFinalizedRows; std::vector<uint64_t> fFinalizedRows;
std::string fTmpDir; std::string fTmpDir;
compress::CompressInterface* fCompressor;
std::unique_ptr<Dumper> fDumper;
}; };
/** @brief Internal data for the hashmap */ /** @brief Internal data for the hashmap */
@ -1142,15 +1236,19 @@ public:
size_t size, size_t size,
joblist::ResourceManager* rm, joblist::ResourceManager* rm,
boost::shared_ptr<int64_t> sessLimit, boost::shared_ptr<int64_t> sessLimit,
bool enableDiskAgg) bool enableDiskAgg,
compress::CompressInterface* compressor)
: fUniqId(this) : fUniqId(this)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fCompressor(compressor)
{ {
if (rm) if (rm)
fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg)); fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg));
else else
fMM.reset(new MemManager()); fMM.reset(new MemManager());
fDumper.reset(new Dumper(fCompressor, fMM.get()));
if (size != 0) if (size != 0)
init(size); init(size);
} }
@ -1216,6 +1314,8 @@ public:
cloned->init(size); cloned->init(size);
cloned->fUniqId = fUniqId; cloned->fUniqId = fUniqId;
cloned->fGeneration = gen; cloned->fGeneration = gen;
cloned->fCompressor = fCompressor;
cloned->fDumper.reset(new Dumper(fCompressor, cloned->fMM.get()));
if (loadDump) if (loadDump)
cloned->load(); cloned->load();
return cloned; return cloned;
@ -1240,27 +1340,15 @@ public:
void dump() void dump()
{ {
int fd = open(makeDumpName().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
int errNo; int errNo;
size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type); size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
if ((errNo = writeData(fd, (char*)fPosHashes.data(), sz)) != 0) if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0)
{ {
close(fd);
throw logging::IDBExcept( throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)), errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd);
} }
private: private:
@ -1288,29 +1376,18 @@ private:
void load() void load()
{ {
int fd = open(makeDumpName().c_str(), O_RDONLY);
if (fd < 0)
{
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errno)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
struct stat st;
fstat(fd, &st);
fPosHashes.resize(st.st_size / sizeof(decltype(fPosHashes)::value_type));
int errNo; int errNo;
if ((errNo = readData(fd, (char*)fPosHashes.data(), st.st_size)) != 0) std::vector<char> data;
if ((errNo = fDumper->read(makeDumpName(), data)) != 0)
{ {
close(fd);
throw logging::IDBExcept( throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)), errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR); logging::ERR_DISKAGG_FILEIO_ERROR);
} }
close(fd); fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type));
memcpy(fPosHashes.data(), data.data(), data.size());
} }
private: private:
@ -1319,6 +1396,8 @@ private:
uint16_t fGeneration{0}; ///< current aggregation generation uint16_t fGeneration{0}; ///< current aggregation generation
void* fUniqId; ///< uniq ID to make an uniq dump filename void* fUniqId; ///< uniq ID to make an uniq dump filename
std::string fTmpDir; std::string fTmpDir;
compress::CompressInterface* fCompressor;
std::unique_ptr<Dumper> fDumper;
}; };
/*--------------------------------------------------------------------------- /*---------------------------------------------------------------------------
@ -1336,13 +1415,15 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
joblist::ResourceManager *rm, joblist::ResourceManager *rm,
boost::shared_ptr<int64_t> sessLimit, boost::shared_ptr<int64_t> sessLimit,
bool enabledDiskAgg, bool enabledDiskAgg,
bool allowGenerations) bool allowGenerations,
compress::CompressInterface* compressor)
: fMaxRows(getMaxRows(enabledDiskAgg)) : fMaxRows(getMaxRows(enabledDiskAgg))
, fExtKeys(rowGroupOut != keysRowGroup) , fExtKeys(rowGroupOut != keysRowGroup)
, fLastKeyCol(keyCount - 1) , fLastKeyCol(keyCount - 1)
, fUniqId(this) , fUniqId(this)
, fAllowGenerations(allowGenerations) , fAllowGenerations(allowGenerations)
, fEnabledDiskAggregation(enabledDiskAgg) , fEnabledDiskAggregation(enabledDiskAgg)
, fCompressor(compressor)
, fTmpDir(tmpDir) , fTmpDir(tmpDir)
, fRowGroupOut(rowGroupOut) , fRowGroupOut(rowGroupOut)
, fKeysRowGroup(keysRowGroup) , fKeysRowGroup(keysRowGroup)
@ -1363,10 +1444,13 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fMM.reset(new MemManager()); fMM.reset(new MemManager());
fNumOfInputRGPerThread = 1; fNumOfInputRGPerThread = 1;
} }
fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg)); fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit,
!enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
if (fExtKeys) if (fExtKeys)
{ {
fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg); fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit,
!enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
fKeysStorage = fRealKeysStorage.get();
} }
else else
{ {
@ -1375,20 +1459,12 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fKeysStorage->initRow(fKeyRow); fKeysStorage->initRow(fKeyRow);
fGens.emplace_back(new Data); fGens.emplace_back(new Data);
fCurData = fGens.back().get(); fCurData = fGens.back().get();
fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation)); fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get()));
} }
RowAggStorage::~RowAggStorage() RowAggStorage::~RowAggStorage()
{ {
cleanupAll(); cleanupAll();
if (fExtKeys)
delete fKeysStorage;
for (auto& data : fGens)
{
if (data->fInfo != nullptr)
free(data->fInfo);
}
} }
bool RowAggStorage::getTargetRow(const Row &row, Row &rowOut) bool RowAggStorage::getTargetRow(const Row &row, Row &rowOut)
@ -1408,7 +1484,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(), fMM->getResourceManaged(),
fMM->getSessionLimit(), fMM->getSessionLimit(),
!fEnabledDiskAggregation, !fEnabledDiskAggregation,
!fEnabledDiskAggregation)); !fEnabledDiskAggregation,
fCompressor.get()));
if (fExtKeys) if (fExtKeys)
{ {
fKeysStorage = new RowGroupStorage(fTmpDir, fKeysStorage = new RowGroupStorage(fTmpDir,
@ -1417,7 +1494,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(), fMM->getResourceManaged(),
fMM->getSessionLimit(), fMM->getSessionLimit(),
!fEnabledDiskAggregation, !fEnabledDiskAggregation,
!fEnabledDiskAggregation); !fEnabledDiskAggregation,
fCompressor.get());
} }
else else
{ {
@ -1605,11 +1683,6 @@ std::unique_ptr<RGData> RowAggStorage::getNextRGData()
void RowAggStorage::freeData() void RowAggStorage::freeData()
{ {
if (fExtKeys && fKeysStorage)
{
delete fKeysStorage;
fKeysStorage = nullptr;
}
for (auto& data : fGens) for (auto& data : fGens)
{ {
data->fHashes.reset(); data->fHashes.reset();
@ -1617,8 +1690,7 @@ void RowAggStorage::freeData()
{ {
const size_t memSz = calcSizeWithBuffer(data->fMask + 1); const size_t memSz = calcSizeWithBuffer(data->fMask + 1);
fMM->release(memSz); fMM->release(memSz);
free(data->fInfo); data->fInfo.reset();
data->fInfo = nullptr;
} }
} }
fGens.clear(); fGens.clear();
@ -1704,9 +1776,9 @@ bool RowAggStorage::tryIncreaseInfo()
for (size_t i = 0; i < elems; i += 8) for (size_t i = 0; i < elems; i += 8)
{ {
uint64_t val; uint64_t val;
memcpy(&val, fCurData->fInfo + i, sizeof(val)); memcpy(&val, fCurData->fInfo.get() + i, sizeof(val));
val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL; val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL;
memcpy(fCurData->fInfo + i, &val, sizeof(val)); memcpy(fCurData->fInfo.get() + i, &val, sizeof(val));
} }
fCurData->fInfo[elems] = 1; fCurData->fInfo[elems] = 1;
@ -1717,38 +1789,23 @@ bool RowAggStorage::tryIncreaseInfo()
void RowAggStorage::rehashPowerOfTwo(size_t elems) void RowAggStorage::rehashPowerOfTwo(size_t elems)
{ {
const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1); const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1);
const uint8_t* const oldInfo = fCurData->fInfo; auto oldInfo = std::move(fCurData->fInfo);
auto oldHashes = std::move(fCurData->fHashes); auto oldHashes = std::move(fCurData->fHashes);
fMM->release(calcBytes(oldSz)); fMM->release(calcBytes(oldSz));
try initData(elems, oldHashes.get());
{ oldHashes->releaseMemory();
initData(elems, oldHashes.get());
oldHashes->releaseMemory();
if (oldSz > 1) if (oldSz > 1)
{
for (size_t i = 0; i < oldSz; ++i)
{ {
for (size_t i = 0; i < oldSz; ++i) if (UNLIKELY(oldInfo[i] != 0))
{ {
if (UNLIKELY(oldInfo[i] != 0)) insertSwap(i, oldHashes.get());
{
insertSwap(i, oldHashes.get());
}
} }
} }
} }
catch (...)
{
if (oldInfo != nullptr && oldInfo != fCurData->fInfo)
{
free((void *)oldInfo);
}
throw;
}
if (oldInfo != nullptr && oldInfo != fCurData->fInfo)
{
free((void *)oldInfo);
}
} }
void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes) void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes)
@ -1806,7 +1863,7 @@ void RowAggStorage::initData(size_t elems, const RowPosHashStorage* oldHashes)
logging::ERR_AGGREGATION_TOO_BIG); logging::ERR_AGGREGATION_TOO_BIG);
} }
fCurData->fHashes = oldHashes->clone(elems, fGeneration); fCurData->fHashes = oldHashes->clone(elems, fGeneration);
fCurData->fInfo = reinterpret_cast<uint8_t*>(calloc(1, bytes)); fCurData->fInfo.reset(new uint8_t[bytes]());
fCurData->fInfo[sizeWithBuffer] = 1; fCurData->fInfo[sizeWithBuffer] = 1;
fCurData->fInfoInc = INIT_INFO_INC; fCurData->fInfoInc = INIT_INFO_INC;
fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT; fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
@ -1857,11 +1914,7 @@ void RowAggStorage::startNewGeneration()
++fGeneration; ++fGeneration;
fMM->release(); fMM->release();
// reinitialize internal structures // reinitialize internal structures
if (fCurData->fInfo) fCurData->fInfo.reset();
{
free(fCurData->fInfo);
fCurData->fInfo = nullptr;
}
fCurData->fSize = 0; fCurData->fSize = 0;
fCurData->fMask = 0; fCurData->fMask = 0;
fCurData->fMaxSize = 0; fCurData->fMaxSize = 0;
@ -1891,7 +1944,7 @@ void RowAggStorage::dumpInternalData() const
bs << fCurData->fMaxSize; bs << fCurData->fMaxSize;
bs << fCurData->fInfoInc; bs << fCurData->fInfoInc;
bs << fCurData->fInfoHashShift; bs << fCurData->fInfoHashShift;
bs.append(fCurData->fInfo, calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize))); bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) if (fd < 0)
{ {
@ -1945,9 +1998,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fStorage.reset(fStorage->clone(curGen - 1)); fStorage.reset(fStorage->clone(curGen - 1));
if (fExtKeys) if (fExtKeys)
{ {
auto* oks = fKeysStorage; fRealKeysStorage.reset(fRealKeysStorage->clone(curGen - 1));
fKeysStorage = oks->clone(curGen - 1); fKeysStorage = fRealKeysStorage.get();
delete oks;
} }
else else
fKeysStorage = fStorage.get(); fKeysStorage = fStorage.get();
@ -1959,9 +2011,10 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
size_t prevMaxSize; size_t prevMaxSize;
uint32_t prevInfoInc; uint32_t prevInfoInc;
uint32_t prevInfoHashShift; uint32_t prevInfoHashShift;
uint8_t *prevInfo{nullptr}; std::unique_ptr<uint8_t[]> prevInfo;
std::unique_ptr<RowGroupStorage> prevRowStorage; std::unique_ptr<RowGroupStorage> prevRowStorage;
std::unique_ptr<RowGroupStorage> prevRealKeyRowStorage;
RowGroupStorage *prevKeyRowStorage{nullptr}; RowGroupStorage *prevKeyRowStorage{nullptr};
auto elems = calcSizeWithBuffer(fCurData->fMask + 1); auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
@ -1971,8 +2024,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
prevRowStorage.reset(fStorage->clone(prevGen)); prevRowStorage.reset(fStorage->clone(prevGen));
if (fExtKeys) if (fExtKeys)
{ {
delete prevKeyRowStorage; prevRealKeyRowStorage.reset(fKeysStorage->clone(prevGen));
prevKeyRowStorage = fKeysStorage->clone(prevGen); prevKeyRowStorage = prevRealKeyRowStorage.get();
} }
else else
prevKeyRowStorage = prevRowStorage.get(); prevKeyRowStorage = prevRowStorage.get();
@ -2113,14 +2166,15 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fCurData->fMaxSize = prevMaxSize; fCurData->fMaxSize = prevMaxSize;
fCurData->fInfoInc = prevInfoInc; fCurData->fInfoInc = prevInfoInc;
fCurData->fInfoHashShift = prevInfoHashShift; fCurData->fInfoHashShift = prevInfoHashShift;
if (fCurData->fInfo) fCurData->fInfo = std::move(prevInfo);
free(fCurData->fInfo);
fCurData->fInfo = prevInfo;
fCurData->fHashes = std::move(prevHashes); fCurData->fHashes = std::move(prevHashes);
fStorage = std::move(prevRowStorage); fStorage = std::move(prevRowStorage);
if (fExtKeys) if (fExtKeys) {
delete fKeysStorage; fRealKeysStorage = std::move(prevRealKeyRowStorage);
fKeysStorage = prevKeyRowStorage; fKeysStorage = fRealKeysStorage.get();
}
else
fKeysStorage = prevKeyRowStorage;
} }
fStorage->dumpFinalizedInfo(); fStorage->dumpFinalizedInfo();
@ -2131,9 +2185,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fStorage.reset(fStorage->clone(fGeneration)); fStorage.reset(fStorage->clone(fGeneration));
if (fExtKeys) if (fExtKeys)
{ {
auto* oks = fKeysStorage; fRealKeysStorage.reset(fRealKeysStorage->clone(fGeneration));
fKeysStorage = oks->clone(fGeneration); fKeysStorage = fRealKeysStorage.get();
delete oks;
} }
else else
fKeysStorage = fStorage.get(); fKeysStorage = fStorage.get();
@ -2146,7 +2199,13 @@ void RowAggStorage::loadGeneration(uint16_t gen)
loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo); loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo);
} }
void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, size_t &maxSize, uint32_t &infoInc, uint32_t &infoHashShift, uint8_t *&info) void RowAggStorage::loadGeneration(uint16_t gen,
size_t& size,
size_t& mask,
size_t& maxSize,
uint32_t& infoInc,
uint32_t& infoHashShift,
std::unique_ptr<uint8_t[]>& info)
{ {
messageqcpp::ByteStream bs; messageqcpp::ByteStream bs;
int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY); int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY);
@ -2177,10 +2236,9 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t &size, size_t &mask, siz
bs >> infoInc; bs >> infoInc;
bs >> infoHashShift; bs >> infoHashShift;
size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize)); size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize));
if (info) info.reset(new uint8_t[infoSz]());
free(info); uint8_t* tmp = info.get();
info = (uint8_t*)calloc(1, infoSz); bs >> tmp;
bs >> info;
} }
void RowAggStorage::cleanupAll() noexcept void RowAggStorage::cleanupAll() noexcept

View File

@ -20,6 +20,7 @@
#include "resourcemanager.h" #include "resourcemanager.h"
#include "rowgroup.h" #include "rowgroup.h"
#include "idbcompress.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
@ -51,7 +52,8 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool enabledDiskAgg = false, bool enabledDiskAgg = false,
bool allowGenerations = false); bool allowGenerations = false,
compress::CompressInterface* compressor = nullptr);
RowAggStorage(const std::string& tmpDir, RowAggStorage(const std::string& tmpDir,
RowGroup* rowGroupOut, RowGroup* rowGroupOut,
@ -59,10 +61,11 @@ public:
joblist::ResourceManager* rm = nullptr, joblist::ResourceManager* rm = nullptr,
boost::shared_ptr<int64_t> sessLimit = {}, boost::shared_ptr<int64_t> sessLimit = {},
bool enabledDiskAgg = false, bool enabledDiskAgg = false,
bool allowGenerations = false) bool allowGenerations = false,
compress::CompressInterface* compressor = nullptr)
: RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount,
rm, std::move(sessLimit), rm, std::move(sessLimit),
enabledDiskAgg, allowGenerations) enabledDiskAgg, allowGenerations, compressor)
{} {}
~RowAggStorage(); ~RowAggStorage();
@ -222,7 +225,7 @@ private:
uint64_t data; uint64_t data;
while (true) while (true)
{ {
memcpy(&data, fCurData->fInfo + idx, sizeof(data)); memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data));
if (data == 0) if (data == 0)
{ {
idx += sizeof(n); idx += sizeof(n);
@ -308,7 +311,13 @@ private:
*/ */
void loadGeneration(uint16_t gen); void loadGeneration(uint16_t gen);
/** @brief Load previously dumped data into the tmp storage */ /** @brief Load previously dumped data into the tmp storage */
void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, uint32_t& infoInc, uint32_t& infoHashShift, uint8_t*& info); void loadGeneration(uint16_t gen,
size_t& size,
size_t& mask,
size_t& maxSize,
uint32_t& infoInc,
uint32_t& infoHashShift,
std::unique_ptr<uint8_t[]>& info);
/** @brief Remove temporary data files */ /** @brief Remove temporary data files */
void cleanup(); void cleanup();
@ -330,7 +339,7 @@ private:
struct Data struct Data
{ {
RowPosHashStoragePtr fHashes; RowPosHashStoragePtr fHashes;
uint8_t *fInfo{nullptr}; std::unique_ptr<uint8_t[]> fInfo;
size_t fSize{0}; size_t fSize{0};
size_t fMask{0}; size_t fMask{0};
size_t fMaxSize{0}; size_t fMaxSize{0};
@ -343,6 +352,7 @@ private:
const bool fExtKeys; const bool fExtKeys;
std::unique_ptr<RowGroupStorage> fStorage; std::unique_ptr<RowGroupStorage> fStorage;
std::unique_ptr<RowGroupStorage> fRealKeysStorage;
RowGroupStorage* fKeysStorage; RowGroupStorage* fKeysStorage;
uint32_t fLastKeyCol; uint32_t fLastKeyCol;
@ -356,6 +366,7 @@ private:
bool fAggregated = true; bool fAggregated = true;
bool fAllowGenerations; bool fAllowGenerations;
bool fEnabledDiskAggregation; bool fEnabledDiskAggregation;
std::unique_ptr<compress::CompressInterface> fCompressor;
std::string fTmpDir; std::string fTmpDir;
bool fInitialized{false}; bool fInitialized{false};
rowgroup::RowGroup* fRowGroupOut; rowgroup::RowGroup* fRowGroupOut;