1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-23 07:05:36 +03:00

2369 lines
62 KiB
C++

/* Copyright (C) 2021-2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <unistd.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include "rowgroup.h"
#include <resourcemanager.h>
#include <fcntl.h>
#include <cerrno>
#include "rowstorage.h"
#include "robin_hood.h"
namespace
{
/** @brief Write exactly sz bytes of buf to fd.
 *
 * Handles short writes by retrying until everything is written. Transient
 * failures (EINTR on signal delivery, EAGAIN on a non-blocking fd) are
 * retried as well; the original code dropped EINTR and returned a spurious
 * error when a signal arrived mid-write.
 *
 * @param fd  open file descriptor (writable)
 * @param buf source buffer, must hold at least sz bytes
 * @param sz  number of bytes to write
 * @returns 0 on success, the errno value of the failed write() otherwise
 */
int writeData(int fd, const char* buf, size_t sz)
{
  if (sz == 0)
    return 0;
  size_t written = 0;
  while (written < sz)
  {
    ssize_t r = write(fd, buf + written, sz - written);
    if (r < 0)
    {
      // EINTR and EAGAIN are transient; everything else is fatal.
      if (errno == EINTR || errno == EAGAIN)
        continue;
      return errno;
    }
    written += size_t(r);
  }
  return 0;
}
/** @brief Read exactly sz bytes from fd into buf.
 *
 * Handles short reads by retrying until the buffer is full. EINTR/EAGAIN
 * are retried. A zero-byte read() (EOF before sz bytes arrived — e.g. a
 * truncated dump file) is reported as EIO; the original code looped
 * forever in that case because `to_read` never decreased.
 *
 * @param fd  open file descriptor (readable)
 * @param buf destination buffer, must have room for sz bytes
 * @param sz  number of bytes to read
 * @returns 0 on success, an errno value (EIO on premature EOF) otherwise
 */
int readData(int fd, char* buf, size_t sz)
{
  if (sz == 0)
    return 0;
  size_t done = 0;
  while (done < sz)
  {
    ssize_t r = read(fd, buf + done, sz - done);
    if (r < 0)
    {
      if (errno == EINTR || errno == EAGAIN)
        continue;
      return errno;
    }
    if (r == 0)
      return EIO;  // premature EOF: file shorter than requested
    done += size_t(r);
  }
  return 0;
}
/** @brief Turn an errno value into a human-readable message (thread-safe). */
std::string errorString(int errNo)
{
  char msgBuf[1024];
  // GNU strerror_r may return a pointer to an internal static string
  // instead of filling msgBuf, so always use its return value.
  const char* msg = strerror_r(errNo, msgBuf, sizeof(msgBuf));
  return std::string(msg);
}
} // anonymous namespace
namespace rowgroup
{
/** @brief Compute a 64-bit hash of the first (lastCol + 1) columns of a row.
 *
 * Fixed-width columns are folded into an incremental Hasher64_r state.
 * Short string columns are normalized via strnxfrm (collation weights, with
 * trailing spaces trimmed) and folded the same way; only strings longer than
 * MaxConstStrSize fall back to the slower MariaDB hasher, whose result is
 * mixed in once at the end.
 *
 * @param r       row to hash (data must be set)
 * @param lastCol index of the last key column; out-of-range returns 0
 * @returns finalized 64-bit hash
 */
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol)
{
  uint64_t ret = 0;
  // Guard: an invalid key column count yields a constant hash.
  if (lastCol >= r.getColumnCount())
    return 0;
  datatypes::MariaDBHasher h;
  utils::Hasher64_r columnHasher;
  bool strHashUsed = false;
  for (uint32_t i = 0; i <= lastCol; ++i)
  {
    switch (r.getColType(i))
    {
      case execplan::CalpontSystemCatalog::CHAR:
      case execplan::CalpontSystemCatalog::VARCHAR:
      case execplan::CalpontSystemCatalog::BLOB:
      case execplan::CalpontSystemCatalog::TEXT:
      {
        auto cs = r.getCharset(i);
        auto strColValue = r.getConstString(i);
        auto strColValueLen = strColValue.length();
        if (strColValueLen > MaxConstStrSize)
        {
          // Long string: use the (slower) MariaDB hasher; mixed in after the loop.
          h.add(cs, strColValue);
          strHashUsed = true;
        }
        else
        {
          // This is relatively big stack allocation.
          // It is aligned for future vectorization of hash calculation.
          uchar buf[MaxConstStrBufSize] __attribute__((aligned(64)));
          // Pay attention to the last strxfrm argument value.
          // It is called flags and in many cases it has padding
          // enabled(MY_STRXFRM_PAD_WITH_SPACE bit). With padding enabled
          // strxfrm returns MaxConstStrBufSize bytes and not the actual
          // weights array length. Here I disable padding.
          auto charset = datatypes::Charset(cs);
          auto trimStrColValue = strColValue.rtrimSpaces();
          // The padding is disabled b/c we previously use rtrimSpaces().
          // strColValueLen is used here.
          size_t nActualWeights = charset.strnxfrm(buf, MaxConstStrBufSize, strColValueLen,
                                                  reinterpret_cast<const uchar*>(trimStrColValue.str()),
                                                  trimStrColValue.length(), 0);
          ret = columnHasher(reinterpret_cast<const void*>(buf), nActualWeights, ret);
        }
        break;
      }
      // Fixed-width types: hash the raw column bytes in place.
      default: ret = columnHasher(r.getData() + r.getOffset(i), r.getColumnWidth(i), ret); break;
    }
  }
  // The properties of the hash produced are worse if MDB hasher results are incorporated
  // so late but these results must be used very infrequently.
  if (strHashUsed)
  {
    uint64_t strhash = h.finalize();
    ret = columnHasher(&strhash, sizeof(strhash), ret);
  }
  return columnHasher.finalize(ret, lastCol << 2);
}
/** @brief No-op LRU interface used by RowGroupStorage & HashStorage.
 *
 * The base implementation intentionally does nothing and reports an empty
 * cache; it is installed when eviction tracking is unnecessary. The LRU
 * subclass below provides the real behavior.
 */
struct LRUIface
{
  using List = std::list<uint64_t>;

  virtual ~LRUIface() = default;

  /** @brief Put an ID into the cache or mark it as most recently used. */
  virtual void add(uint64_t)
  {
  }

  /** @brief Forget an ID. */
  virtual void remove(uint64_t)
  {
  }

  /** @brief Iterator at the most recently used ID. */
  virtual List::const_reverse_iterator begin() const
  {
    return {};
  }

  /** @brief Iterator past the least recently used ID. */
  virtual List::const_reverse_iterator end() const
  {
    return {};
  }

  /** @brief Iterator at the least recently used ID. */
  virtual List::const_iterator rbegin() const
  {
    return {};
  }

  /** @brief Iterator past the most recently used ID. */
  virtual List::const_iterator rend() const
  {
    return {};
  }

  /** @brief Drop every tracked ID. */
  virtual void clear()
  {
  }

  /** @brief Number of tracked IDs (always 0 for the no-op). */
  virtual std::size_t size() const
  {
    return 0;
  }

  virtual bool empty() const
  {
    return true;
  }

  /** @brief Polymorphic copy of the same concrete type. */
  virtual LRUIface* clone() const
  {
    return new LRUIface();
  }
};
/** @brief Real LRU tracker: a list keeps usage order (tail = most recent),
 * a hash map gives O(1) lookup of each ID's list node.
 */
struct LRU : public LRUIface
{
  ~LRU() override
  {
    fMap.clear();
    fList.clear();
  }
  /** @brief Mark rgid as most recently used (re-append at the list tail). */
  inline void add(uint64_t rgid) final
  {
    auto it = fMap.find(rgid);
    if (it != fMap.end())
    {
      // Already tracked: drop its old position before re-inserting.
      fList.erase(it->second);
    }
    fMap[rgid] = fList.insert(fList.end(), rgid);
  }
  /** @brief Stop tracking rgid (no-op if unknown). */
  inline void remove(uint64_t rgid) final
  {
    // NOTE(review): UNLIKELY on the found-branch looks inverted — removing
    // a tracked id is the common case here; confirm intent.
    auto it = fMap.find(rgid);
    if (UNLIKELY(it != fMap.end()))
    {
      fList.erase(it->second);
      fMap.erase(it);
    }
  }
  // begin()/end() iterate newest -> oldest; rbegin()/rend() oldest -> newest.
  inline List::const_reverse_iterator begin() const final
  {
    return fList.crbegin();
  }
  inline List::const_reverse_iterator end() const final
  {
    return fList.crend();
  }
  inline List::const_iterator rbegin() const final
  {
    return fList.cbegin();
  }
  inline List::const_iterator rend() const final
  {
    return fList.cend();
  }
  inline void clear() final
  {
    fMap.clear();
    fList.clear();
  }
  size_t size() const final
  {
    return fMap.size();
  }
  bool empty() const final
  {
    return fList.empty();
  }
  LRUIface* clone() const final
  {
    return new LRU();
  }
  robin_hood::unordered_flat_map<uint64_t, List::iterator> fMap;  // id -> list node
  List fList;                                                     // usage order
};
/** @brief Memory-accounting service wrapping ResourceManager.
 *
 * This base flavour only counts bytes and never refuses an acquire;
 * RMMemManager (below) adds real limits. The destructor returns whatever
 * is still accounted.
 */
class MemManager
{
 public:
  MemManager() = default;

  virtual ~MemManager()
  {
    release(fMemUsed);
  }

  /** @brief Account amount bytes; false only from strict managers. */
  bool acquire(std::size_t amount)
  {
    return acquireImpl(amount);
  }

  /** @brief Un-account amount bytes; amount == 0 releases everything. */
  void release(ssize_t amount = 0)
  {
    // Callers may legitimately ask to release more than was acquired:
    // e.g. a rowgroup is acquired at its no-strings maximum, grows with
    // string data, then is released at its actual (larger) size. Clamp
    // to what we actually hold.
    const bool releaseEverything = (amount == 0) || (amount > fMemUsed);
    if (releaseEverything)
      amount = fMemUsed;
    releaseImpl(amount);
  }

  /** @return bytes currently accounted by this manager */
  ssize_t getUsed() const
  {
    return fMemUsed;
  }

  /** @return bytes still available (unlimited for the base flavour) */
  virtual int64_t getFree() const
  {
    return std::numeric_limits<int64_t>::max();
  }

  virtual int64_t getConfigured() const
  {
    return std::numeric_limits<int64_t>::max();
  }

  /** @return true when a failed acquire must be treated as an error */
  virtual bool isStrict() const
  {
    return false;
  }

  virtual MemManager* clone() const
  {
    return new MemManager();
  }

  /** @return underlying ResourceManager, nullptr for the base flavour */
  virtual joblist::ResourceManager* getResourceManaged()
  {
    return nullptr;
  }

  virtual boost::shared_ptr<int64_t> getSessionLimit()
  {
    return {};
  }

 protected:
  virtual bool acquireImpl(std::size_t amount)
  {
    fMemUsed += amount;
    return true;
  }

  virtual void releaseImpl(std::size_t amount)
  {
    fMemUsed -= amount;
  }

  ssize_t fMemUsed = 0;  // running total of accounted bytes
};
/** @brief MemManager backed by a real ResourceManager plus a per-session
 * limit; optionally waits for memory and/or fails hard when none is left.
 */
class RMMemManager : public MemManager
{
 public:
  RMMemManager(joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sl, bool wait = true,
               bool strict = true)
   : fRm(rm), fSessLimit(std::move(sl)), fWait(wait), fStrict(strict)
  {
  }

  ~RMMemManager() override
  {
    // Hand everything back before the base destructor runs its own release.
    release(fMemUsed);
    fMemUsed = 0;
  }

  int64_t getConfigured() const final
  {
    return fRm->getConfiguredUMMemLimit();
  }

  int64_t getFree() const final
  {
    // Both the global RM pool and the session limit cap what we may take.
    return std::min(fRm->availableMemory(), *fSessLimit);
  }

  bool isStrict() const final
  {
    return fStrict;
  }

  MemManager* clone() const final
  {
    return new RMMemManager(fRm, fSessLimit, fWait, fStrict);
  }

  joblist::ResourceManager* getResourceManaged() override
  {
    return fRm;
  }

  boost::shared_ptr<int64_t> getSessionLimit() override
  {
    return fSessLimit;
  }

 protected:
  bool acquireImpl(size_t amount) final
  {
    if (amount == 0)
      return true;
    if (!fRm->getMemory(amount, fSessLimit, fWait) && fStrict)
      return false;  // strict mode: refuse instead of over-committing
    // Non-strict mode keeps counting even when the RM grant failed.
    MemManager::acquireImpl(amount);
    return true;
  }

  void releaseImpl(size_t amount) override
  {
    if (amount == 0)
      return;
    MemManager::releaseImpl(amount);
    fRm->returnMemory(amount, fSessLimit);
  }

 private:
  joblist::ResourceManager* fRm = nullptr;  // global memory pool
  boost::shared_ptr<int64_t> fSessLimit;    // remaining session allowance
  const bool fWait;                         // block until memory is granted?
  const bool fStrict;                       // fail acquire on denial?
};
/** @brief Writes/reads (optionally compressed) byte buffers to/from disk.
 *
 * Keeps a reusable scratch buffer for compression whose size is accounted
 * against a clone of the supplied MemManager.
 */
class Dumper
{
 public:
  Dumper(const compress::CompressInterface* comp, MemManager* mm) : fCompressor(comp), fMM(mm->clone())
  {
  }

  /** @brief Write sz bytes of buf into file fname, compressing if enabled.
   * @returns 0 on success, an errno value otherwise
   */
  int write(const std::string& fname, const char* buf, size_t sz)
  {
    if (sz == 0)
      return 0;
    int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (UNLIKELY(fd < 0))
      return errno;
    const char* tmpbuf;
    if (fCompressor)
    {
      auto len = fCompressor->maxCompressedSize(sz);
      checkBuffer(len);
      fCompressor->compress(buf, sz, fTmpBuf.data(), &len);
      tmpbuf = fTmpBuf.data();
      sz = len;
    }
    else
    {
      tmpbuf = buf;
    }
    auto to_write = sz;
    while (to_write > 0)
    {
      auto r = ::write(fd, tmpbuf + sz - to_write, to_write);
      if (UNLIKELY(r < 0))
      {
        // Retry transient failures (signal interruption, non-blocking fd).
        if (errno == EINTR || errno == EAGAIN)
          continue;
        int ret = errno;
        close(fd);
        return ret;
      }
      assert(size_t(r) <= to_write);
      to_write -= r;
    }
    close(fd);
    return 0;
  }

  /** @brief Read the whole file fname into buf, uncompressing if enabled.
   * @returns 0 on success, an errno value (EPROTO for corrupt dumps)
   */
  int read(const std::string& fname, std::vector<char>& buf)
  {
    int fd = open(fname.c_str(), O_RDONLY);
    if (UNLIKELY(fd < 0))
      return errno;
    struct stat st
    {
    };
    // The fstat result was previously ignored; a failure would have made
    // us read a garbage size.
    if (UNLIKELY(fstat(fd, &st) != 0))
    {
      int ret = errno;
      close(fd);
      return ret;
    }
    size_t sz = st.st_size;
    // With compression, raw bytes go into the scratch buffer and are
    // decompressed into buf; otherwise read straight into buf.
    std::vector<char>* tmpbuf;
    if (fCompressor)
    {
      tmpbuf = &fTmpBuf;
      checkBuffer(sz);
    }
    else
    {
      tmpbuf = &buf;
      buf.resize(sz);
    }
    auto to_read = sz;
    while (to_read > 0)
    {
      auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read);
      if (UNLIKELY(r < 0))
      {
        if (errno == EINTR || errno == EAGAIN)
          continue;
        int ret = errno;
        close(fd);
        return ret;
      }
      if (UNLIKELY(r == 0))
      {
        // Premature EOF (file truncated underneath us). The previous code
        // looped forever here.
        close(fd);
        return EIO;
      }
      assert(size_t(r) <= to_read);
      to_read -= r;
    }
    if (fCompressor)
    {
      size_t len;
      if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len))
      {
        close(fd);
        return EPROTO;
      }
      buf.resize(len);
      fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len);
    }
    close(fd);
    return 0;
  }

  /** @return current scratch buffer size in bytes */
  size_t size() const
  {
    return fTmpBuf.size();
  }

 private:
  /** @brief Grow the scratch buffer to at least len bytes (8 KiB aligned),
   * accounting the growth with the MemManager. Never shrinks.
   */
  void checkBuffer(size_t len)
  {
    if (fTmpBuf.size() < len)
    {
      size_t newtmpsz = (len + 8191) / 8192 * 8192;
      std::vector<char> tmpvec(newtmpsz);
      fMM->acquire(newtmpsz - fTmpBuf.size());
      fTmpBuf.swap(tmpvec);
    }
  }

 private:
  const compress::CompressInterface* fCompressor;  // nullptr => no compression
  std::unique_ptr<MemManager> fMM;                 // accounts fTmpBuf memory
  std::vector<char> fTmpBuf;                       // compression scratch space
};
/** @brief Storage for RGData with LRU-cache & memory management
 */
class RowGroupStorage
{
 public:
  using RGDataStorage = std::vector<std::unique_ptr<RGData>>;

 public:
  /** @brief Default constructor
   *
   * @param tmpDir(in)       directory for tmp data
   * @param rowGroupOut(in,out) RowGroup metadata
   * @param maxRows(in)      number of rows per rowgroup
   * @param rm               ResourceManager to use or nullptr if we don't
   *                         need memory accounting
   * @param sessLimit        session memory limit
   * @param wait             shall we wait a bit if we haven't enough memory
   *                         right now?
   * @param strict           true  -> throw an exception if not enough memory
   *                         false -> deal with it later
   * @param compressor       pointer to CompressInterface impl or nullptr
   */
  RowGroupStorage(const std::string& tmpDir, RowGroup* rowGroupOut, size_t maxRows,
                  joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                  bool wait = false, bool strict = false, compress::CompressInterface* compressor = nullptr)
   : fRowGroupOut(rowGroupOut)
   , fMaxRows(maxRows)
   , fRGDatas()
   , fUniqId(this)
   , fTmpDir(tmpDir)
   , fCompressor(compressor)
  {
    if (rm)
    {
      fMM.reset(new RMMemManager(rm, sessLimit, wait, strict));
      if (!wait && !strict)
      {
        // Disk aggregation mode: track usage so we can evict to disk.
        fLRU = std::unique_ptr<LRUIface>(new LRU());
      }
      else
      {
        fLRU = std::unique_ptr<LRUIface>(new LRUIface());
      }
    }
    else
    {
      fMM.reset(new MemManager());
      fLRU = std::unique_ptr<LRUIface>(new LRUIface());
    }
    fDumper.reset(new Dumper(fCompressor, fMM.get()));
    // Always start with one live RGData.
    auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
    fRowGroupOut->setData(curRG);
    fRowGroupOut->resetRowGroup(0);
    fRGDatas.emplace_back(curRG);
    fMM->acquire(fRowGroupOut->getSizeWithStrings(fMaxRows));
  }

  ~RowGroupStorage() = default;

  /** @return approximate in-memory size of one full rowgroup */
  ssize_t getAproxRGSize() const
  {
    return fRowGroupOut->getSizeWithStrings(fMaxRows);
  }

  /** @brief Take away RGDatas from another RowGroupStorage
   *
   * If some of the RGDatas is not in the memory do not load them,
   * just rename dump file to match new RowGroupStorage pattern
   *
   * @param o RowGroupStorage to take from
   */
  void append(std::unique_ptr<RowGroupStorage> o)
  {
    return append(o.get());
  }

  void append(RowGroupStorage* o)
  {
    std::unique_ptr<RGData> rgd;
    std::string ofname;
    while (o->getNextRGData(rgd, ofname))
    {
      fRGDatas.push_back(std::move(rgd));
      uint64_t rgid = fRGDatas.size() - 1;
      if (fRGDatas[rgid])
      {
        fRowGroupOut->setData(fRGDatas[rgid].get());
        int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
        if (!fMM->acquire(memSz))
        {
          throw logging::IDBExcept(
              logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
              logging::ERR_AGGREGATION_TOO_BIG);
        }
        // Dump straight back to disk if memory is getting tight.
        if (fMM->getFree() < memSz * 2)
        {
          saveRG(rgid);
          fRGDatas[rgid].reset();
        }
        else
          fLRU->add(rgid);
      }
      else
      {
        // Not in memory: just adopt the dump file under our own name.
        auto r = rename(ofname.c_str(), makeRGFilename(rgid).c_str());
        if (UNLIKELY(r < 0))
        {
          throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
                                       logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
                                   logging::ERR_DISKAGG_FILEIO_ERROR);
        }
      }
      rgd.reset();
      ofname.clear();
    }
  }

  /** @brief Returns next RGData, load it from disk if necessary.
   *
   * @returns pointer to the next RGData or empty pointer if there is nothing
   */
  std::unique_ptr<RGData> getNextRGData()
  {
    while (!fRGDatas.empty())
    {
      uint64_t rgid = fRGDatas.size() - 1;
      if (!fRGDatas[rgid])
        loadRG(rgid, fRGDatas[rgid], true);
      unlink(makeRGFilename(rgid).c_str());
      auto rgdata = std::move(fRGDatas[rgid]);
      fRGDatas.pop_back();
      fRowGroupOut->setData(rgdata.get());
      int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
      fMM->release(memSz);
      fLRU->remove(rgid);
      // Skip empty rowgroups entirely.
      if (fRowGroupOut->getRowCount() == 0)
        continue;
      return rgdata;
    }
    return {};
  }

  void initRow(Row& row) const
  {
    fRowGroupOut->initRow(&row);
  }

  /** @brief Get the row at the specified position, loading corresponding RGData if needed.
   *
   * @param idx(in)  index (from 0) of the row
   * @param row(out) resulting row
   */
  void getRow(uint64_t idx, Row& row)
  {
    uint64_t rgid = idx / fMaxRows;
    uint64_t rid = idx % fMaxRows;
    if (UNLIKELY(!fRGDatas[rgid]))
    {
      loadRG(rgid);
    }
    fRGDatas[rgid]->getRow(rid, &row);
    fLRU->add(rgid);
  }

  /** @brief Return a row and an index at the first free position.
   *
   * @param idx(out) index of the row
   * @param row(out) the row itself
   */
  void putRow(uint64_t& idx, Row& row)
  {
    bool need_new = false;
    if (UNLIKELY(fRGDatas.empty()))
    {
      need_new = true;
    }
    else if (UNLIKELY(!fRGDatas[fCurRgid]))
    {
      need_new = true;
    }
    else
    {
      fRowGroupOut->setData(fRGDatas[fCurRgid].get());
      if (UNLIKELY(fRowGroupOut->getRowCount() >= fMaxRows))
        need_new = true;
    }

    if (UNLIKELY(need_new))
    {
      // Try to reuse a not-yet-full in-memory rowgroup before allocating.
      for (auto rgid : *fLRU)
      {
        if (LIKELY(static_cast<bool>(fRGDatas[rgid])))
        {
          fRowGroupOut->setData(fRGDatas[rgid].get());
          if (fRowGroupOut->getRowCount() < fMaxRows)
          {
            fCurRgid = rgid;
            need_new = false;
            break;
          }
        }
      }
    }

    if (UNLIKELY(need_new))
    {
      auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
      if (!fMM->acquire(memSz))
      {
        throw logging::IDBExcept(
            logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
            logging::ERR_AGGREGATION_TOO_BIG);
      }
      auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
      fRowGroupOut->setData(curRG);
      fRowGroupOut->resetRowGroup(0);
      fRGDatas.emplace_back(curRG);
      fCurRgid = fRGDatas.size() - 1;
    }

    fLRU->add(fCurRgid);
    idx = fCurRgid * fMaxRows + fRowGroupOut->getRowCount();
    fRowGroupOut->getRow(fRowGroupOut->getRowCount(), &row);
    fRowGroupOut->incRowCount();
  }

  /** @brief Create a row at the specified position.
   *
   * Used only for key rows in case of external keys. Indexes of data row and
   * corresponding key row are always the same.
   *
   * @param idx(in)  index to create row
   * @param row(out) row itself
   */
  void putKeyRow(uint64_t idx, Row& row)
  {
    uint64_t rgid = idx / fMaxRows;

    // Grow the storage until the target rowgroup exists.
    while (rgid >= fRGDatas.size())
    {
      int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
      if (!fMM->acquire(memSz))
      {
        throw logging::IDBExcept(
            logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
            logging::ERR_AGGREGATION_TOO_BIG);
      }
      auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
      fRowGroupOut->setData(curRG);
      fRowGroupOut->resetRowGroup(0);
      fRGDatas.emplace_back(curRG);
      fCurRgid = fRGDatas.size() - 1;
      fLRU->add(fCurRgid);
    }

    if (UNLIKELY(!fRGDatas[rgid]))
    {
      loadRG(rgid);
    }
    else
    {
      fRowGroupOut->setData(fRGDatas[rgid].get());
    }

    fLRU->add(rgid);
    assert(idx % fMaxRows == fRowGroupOut->getRowCount());
    fRowGroupOut->getRow(fRowGroupOut->getRowCount(), &row);
    fRowGroupOut->incRowCount();
  }

  /** @brief Dump the oldest RGData to disk, freeing memory
   *
   * @returns true if any RGData was dumped
   */
  bool dump()
  {
    // Always leave at least 2 RG as this is the minimum size of the hashmap
    constexpr size_t MIN_INMEMORY = 2;
    if (fLRU->size() <= MIN_INMEMORY)
    {
      return false;
    }
    size_t moved = 0;
    auto it = fLRU->rbegin();
    while (LIKELY(it != fLRU->rend()))
    {
      if (fLRU->size() <= MIN_INMEMORY)
        return false;

      uint64_t rgid = *it;
      if (UNLIKELY(!fRGDatas[rgid]))
      {
        // Stale entry (already on disk): advance before removal so the
        // iterator stays valid.
        ++it;
        fLRU->remove(rgid);
        continue;
      }

      fRowGroupOut->setData(fRGDatas[rgid].get());
      // Keep a couple of not-yet-full rowgroups in memory: they are still
      // accepting rows and would be reloaded immediately.
      if (moved <= MIN_INMEMORY && fRowGroupOut->getRowCount() < fMaxRows)
      {
        ++it;
        ++moved;
        fLRU->add(rgid);
        continue;
      }
      saveRG(rgid);
      fLRU->remove(rgid);
      fRGDatas[rgid].reset();
      return true;
    }
    return false;
  }

  /** @brief Dump all data, clear state and start over */
  void startNewGeneration()
  {
    dumpAll();
    fLRU->clear();
    fMM->release();
    fRGDatas.clear();
    // we need at least one RGData so create it right now
    auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
    fRowGroupOut->setData(curRG);
    fRowGroupOut->resetRowGroup(0);
    fRGDatas.emplace_back(curRG);
    auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
    if (!fMM->acquire(memSz))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
    fCurRgid = 0;
    ++fGeneration;
  }

  /** @brief Save "finalized" bitmap to disk for future use */
  void dumpFinalizedInfo() const
  {
    auto fname = makeFinalizedFilename();
    int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (UNLIKELY(fd < 0))
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    uint64_t sz = fRGDatas.size();
    uint64_t finsz = fFinalizedRows.size();
    int errNo;
    // BUGFIX: the 2nd and 3rd conditions used to read
    //   (errNo = writeData(...) != 0)
    // so errNo got the boolean comparison result (0/1) instead of the
    // errno value, producing a wrong error message below.
    if ((errNo = writeData(fd, (const char*)&sz, sizeof(sz))) != 0 ||
        (errNo = writeData(fd, (const char*)&finsz, sizeof(finsz))) != 0 ||
        (errNo = writeData(fd, (const char*)fFinalizedRows.data(), finsz * sizeof(uint64_t))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    close(fd);
  }

  /** @brief Load "finalized" bitmap */
  void loadFinalizedInfo()
  {
    auto fname = makeFinalizedFilename();
    int fd = open(fname.c_str(), O_RDONLY);
    if (fd < 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    uint64_t sz;
    uint64_t finsz;
    int errNo;
    // BUGFIX: same operator-precedence problem as in dumpFinalizedInfo();
    // errNo used to receive the != comparison result, not the errno value.
    if ((errNo = readData(fd, (char*)&sz, sizeof(sz))) != 0 ||
        (errNo = readData(fd, (char*)&finsz, sizeof(finsz))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    fRGDatas.resize(sz);
    fFinalizedRows.resize(finsz);
    if ((errNo = readData(fd, (char*)fFinalizedRows.data(), finsz * sizeof(uint64_t))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    close(fd);
  }

  /** @brief Save all RGData to disk */
  void dumpAll(bool dumpFin = true) const
  {
#ifdef DISK_AGG_DEBUG
    dumpMeta();
#endif
    for (uint64_t i = 0; i < fRGDatas.size(); ++i)
    {
      if (fRGDatas[i])
        saveRG(i, fRGDatas[i].get());
      else
      {
        // Not in memory: the dump file MUST already exist; a missing file
        // means lost aggregation data, which is unrecoverable.
        auto fname = makeRGFilename(i);
        if (access(fname.c_str(), F_OK) != 0)
          ::abort();
      }
    }
    if (dumpFin)
      dumpFinalizedInfo();
  }

  /** @brief Create new RowGroupStorage with the save LRU, MemManager & uniq ID */
  RowGroupStorage* clone(uint16_t gen) const
  {
    auto* ret = new RowGroupStorage(fTmpDir, fRowGroupOut, fMaxRows);
    ret->fRGDatas.clear();
    ret->fLRU.reset(fLRU->clone());
    ret->fMM.reset(fMM->clone());
    ret->fUniqId = fUniqId;
    ret->fGeneration = gen;
    ret->fCompressor = fCompressor;
    // NOTE(review): the Dumper is built over this->fMM, not ret->fMM;
    // harmless because Dumper clones the manager, but confirm intent.
    ret->fDumper.reset(new Dumper(fCompressor, fMM.get()));
    ret->loadFinalizedInfo();
    return ret;
  }

  /** @brief Mark row at specified index as finalized so it should be skipped
   */
  void markFinalized(uint64_t idx)
  {
    uint64_t gid = idx / 64;
    uint64_t rid = idx % 64;
    // NOTE(review): LIKELY on the grow path looks inverted once the bitmap
    // covers all rows — confirm intent.
    if (LIKELY(fFinalizedRows.size() <= gid))
      fFinalizedRows.resize(gid + 1, 0ULL);

    fFinalizedRows[gid] |= 1ULL << rid;
  }

  /** @brief Check if row at specified index was finalized earlier */
  bool isFinalized(uint64_t idx) const
  {
    uint64_t gid = idx / 64;
    uint64_t rid = idx % 64;
    if (LIKELY(fFinalizedRows.size() <= gid))
      return false;

    return fFinalizedRows[gid] & (1ULL << rid);
  }

  /** @brief Collect the tmp-file name prefixes this storage may create. */
  void getTmpFilePrefixes(std::vector<std::string>& prefixes) const
  {
    char buf[PATH_MAX];
    snprintf(buf, sizeof(buf), "Agg-p%u-t%p-rg", getpid(), fUniqId);
    prefixes.emplace_back(buf);
    snprintf(buf, sizeof(buf), "AggFin-p%u-t%p-g", getpid(), fUniqId);
    prefixes.emplace_back(buf);
  }

 private:
  /** @brief Get next available RGData and fill filename of the dump if it's
   *         not in the memory.
   *
   * It skips finalized rows, shifting data within rowgroups
   *
   * @param rgdata(out) RGData to return
   * @param fname(out)  Filename of the dump if it's not in the memory
   * @returns true if there is available RGData
   */
  bool getNextRGData(std::unique_ptr<RGData>& rgdata, std::string& fname)
  {
    if (UNLIKELY(fRGDatas.empty()))
    {
      fMM->release();
      return false;
    }
    while (!fRGDatas.empty())
    {
      uint64_t rgid = fRGDatas.size() - 1;
      rgdata = std::move(fRGDatas[rgid]);
      fRGDatas.pop_back();

      // Bitmap word range [fgid, tgid) covering this rowgroup's rows.
      uint64_t fgid = rgid * fMaxRows / 64;
      uint64_t tgid = fgid + fMaxRows / 64;
      if (fFinalizedRows.size() > fgid)
      {
        if (tgid >= fFinalizedRows.size())
          fFinalizedRows.resize(tgid + 1, 0ULL);
        if (!rgdata)
        {
          // Load from disk only if at least one row is NOT finalized.
          for (auto i = fgid; i < tgid; ++i)
          {
            if (fFinalizedRows[i] != ~0ULL)
            {
              loadRG(rgid, rgdata, true);
              break;
            }
          }
        }
        if (!rgdata)
        {
          // Fully finalized: drop the dump and move on.
          unlink(makeRGFilename(rgid).c_str());
          continue;
        }

        // Compact: shift runs of non-finalized rows (mask bits set) down
        // over finalized ones.
        uint64_t pos = 0;
        uint64_t opos = 0;
        fRowGroupOut->setData(rgdata.get());
        for (auto i = fgid; i < tgid; ++i)
        {
          if ((i - fgid) * 64 >= fRowGroupOut->getRowCount())
            break;
          uint64_t mask = ~fFinalizedRows[i];
          if ((i - fgid + 1) * 64 > fRowGroupOut->getRowCount())
          {
            // Last (partial) word: ignore bits past the row count.
            mask &= (~0ULL) >> ((i - fgid + 1) * 64 - fRowGroupOut->getRowCount());
          }
          opos = (i - fgid) * 64;
          if (mask == ~0ULL)
          {
            // All 64 rows survive: move the whole word's worth at once.
            if (LIKELY(pos != opos))
              moveRows(rgdata.get(), pos, opos, 64);
            pos += 64;
            continue;
          }
          if (mask == 0)
            continue;
          while (mask != 0)
          {
            // [b, e) is the next run of consecutive surviving rows.
            size_t b = __builtin_ffsll(mask);
            size_t e = __builtin_ffsll(~(mask >> b)) + b;
            if (UNLIKELY(e >= 64))
              mask = 0;
            else
              mask >>= e;
            if (LIKELY(pos != opos + b - 1))
              moveRows(rgdata.get(), pos, opos + b - 1, e - b);
            pos += e - b;
            opos += e;
          }
          --opos;
        }
        if (pos == 0)
        {
          // Nothing survived the compaction.
          fLRU->remove(rgid);
          unlink(makeRGFilename(rgid).c_str());
          continue;
        }
        fRowGroupOut->setData(rgdata.get());
        fRowGroupOut->setRowCount(pos);
      }

      if (rgdata)
      {
        fRowGroupOut->setData(rgdata.get());
        int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
        fMM->release(memSz);
        unlink(makeRGFilename(rgid).c_str());
      }
      else
      {
        // Still on disk: hand the caller the filename instead.
        fname = makeRGFilename(rgid);
      }
      fLRU->remove(rgid);
      return true;
    }
    return false;
  }

  /** @brief Compact rowgroup with finalized rows
   *
   * Move raw data from the next non-finalized row to replace finalized rows
   * It is safe because pointers to long string also stored inside data
   *
   * @param rgdata(in,out) RGData to work with
   * @param to(in)      row num inside rgdata of the first finalized row
   * @param from(in)    row num of the first actual row
   * @param numRows(in) how many rows should be moved
   */
  void moveRows(RGData* rgdata, uint64_t to, uint64_t from, size_t numRows)
  {
    const size_t rowsz = fRowGroupOut->getRowSize();
    const size_t hdrsz = RowGroup::getHeaderSize();
    uint8_t* data = rgdata->rowData.get() + hdrsz;
    memmove(data + to * rowsz, data + from * rowsz, numRows * rowsz);
  }

  /** @brief Load RGData from disk dump.
   *
   * @param rgid(in) RGData ID
   */
  void loadRG(uint64_t rgid)
  {
    if (UNLIKELY(static_cast<bool>(fRGDatas[rgid])))
    {
      fRowGroupOut->setData(fRGDatas[rgid].get());
      return;
    }
    loadRG(rgid, fRGDatas[rgid]);
  }

  void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false)
  {
    auto fname = makeRGFilename(rgid);
    std::vector<char> data;
    int errNo;
    if ((errNo = fDumper->read(fname, data)) != 0)
    {
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    messageqcpp::ByteStream bs(reinterpret_cast<uint8_t*>(data.data()), data.size());
    if (unlinkDump)
      unlink(fname.c_str());
    rgdata.reset(new RGData());
    rgdata->deserialize(bs, fRowGroupOut->getDataSize(fMaxRows));
    fRowGroupOut->setData(rgdata.get());
    auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
    if (!fMM->acquire(memSz))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
  }

  /** @brief Dump RGData to disk.
   *
   * @param rgid(in) RGData ID
   */
  void saveRG(uint64_t rgid)
  {
    auto rgdata = std::move(fRGDatas[rgid]);
    if (!rgdata)
      return;
    fLRU->remove(rgid);
    fRowGroupOut->setData(rgdata.get());
    fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));
    saveRG(rgid, rgdata.get());
  }

  /** @brief Dump RGData to disk.
   *
   * @param rgid(in)   RGData ID
   * @param rgdata(in) pointer to RGData itself
   */
  void saveRG(uint64_t rgid, RGData* rgdata) const
  {
    messageqcpp::ByteStream bs;
    fRowGroupOut->setData(rgdata);
    rgdata->serialize(bs, fRowGroupOut->getDataSize());
    int errNo;
    if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
  }

#ifdef DISK_AGG_DEBUG
  void dumpMeta() const
  {
    messageqcpp::ByteStream bs;
    fRowGroupOut->serialize(bs);
    char buf[1024];
    // BUGFIX: used to reference nonexistent member `fUniqPtr`, which broke
    // the DISK_AGG_DEBUG build.
    snprintf(buf, sizeof(buf), "/tmp/kemm/META-p%u-t%p", getpid(), fUniqId);
    int fd = open(buf, O_WRONLY | O_TRUNC | O_CREAT, 0644);
    assert(fd >= 0);
    auto r = write(fd, bs.buf(), bs.length());
    assert(r == bs.length());
    close(fd);
  }
#endif

  /** @brief Create dump filename.
   */
  std::string makeRGFilename(uint64_t rgid) const
  {
    char buf[PATH_MAX];
    snprintf(buf, sizeof(buf), "%s/Agg-p%u-t%p-rg%lu-g%u", fTmpDir.c_str(), getpid(), fUniqId, rgid,
             fGeneration);
    return buf;
  }

  /** @brief Create the "finalized" bitmap dump filename. */
  std::string makeFinalizedFilename() const
  {
    char fname[PATH_MAX];
    snprintf(fname, sizeof(fname), "%s/AggFin-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId, fGeneration);
    return fname;
  }

 private:
  friend class RowAggStorage;
  RowGroup* fRowGroupOut{nullptr};     // shared metadata/cursor for all RGDatas
  const size_t fMaxRows;               // rows per rowgroup
  std::unique_ptr<MemManager> fMM;     // memory accounting
  std::unique_ptr<LRUIface> fLRU;      // eviction order of in-memory RGDatas
  RGDataStorage fRGDatas;              // nullptr entry == dumped to disk
  const void* fUniqId;                 // uniq ID for dump filenames
  uint64_t fCurRgid{0};                // rowgroup currently accepting rows
  uint16_t fGeneration{0};             // aggregation generation
  std::vector<uint64_t> fFinalizedRows;  // bitmap of finalized row indexes
  std::string fTmpDir;
  compress::CompressInterface* fCompressor;
  std::unique_ptr<Dumper> fDumper;
};
/** @brief Internal data for the hashmap.
 *
 * Plain POD: instances are memcpy'd and dumped to disk verbatim, so the
 * layout (two packed uint64_t) must stay stable.
 */
struct RowPosHash
{
  uint64_t hash;  ///< Row hash
  uint64_t idx;   ///< index in the RowGroupStorage
};
/***
 * @ brief Storage for row positions and hashes memory management
 * and the ability to save on disk
 */
class RowPosHashStorage
{
 public:
  /***
   * @brief Default constructor
   *
   * The internal data is stored as a plain vector of row pos & hash
   *
   * @param tmpDir(in)     directory for tmp data
   * @param size(in)       maximum size of storage (NB: without the padding)
   * @param rm             ResourceManager to use
   * @param sessLimit      session memory limit
   * @param enableDiskAgg  is disk aggregation enabled?
   */
  RowPosHashStorage(const std::string& tmpDir, size_t size, joblist::ResourceManager* rm,
                    boost::shared_ptr<int64_t> sessLimit, bool enableDiskAgg,
                    compress::CompressInterface* compressor)
   : fUniqId(this), fTmpDir(tmpDir), fCompressor(compressor)
  {
    // With disk aggregation enabled the manager neither waits nor throws
    // on denial; without it the manager is strict.
    if (rm)
      fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg));
    else
      fMM.reset(new MemManager());
    fDumper.reset(new Dumper(fCompressor, fMM.get()));
    if (size != 0)
      init(size);
  }

  /***
   * @brief Get the row position and hash at the idx
   *
   * @param idx(in) index (from 0) of the row
   */
  RowPosHash& get(uint64_t idx)
  {
    return fPosHashes[idx];
  }

  /***
   * @brief Store the row position and hash at the idx
   *
   * @param idx(in)     index of the row
   * @param rowData(in) position and hash of the row
   */
  void set(uint64_t idx, const RowPosHash& pos)
  {
    memcpy(&fPosHashes[idx], &pos, sizeof(pos));
  }

  /*** @return Size of data */
  ssize_t memUsage() const
  {
    return fMM->getUsed();
  }

  /*** @brief Unregister used memory */
  void releaseMemory()
  {
    fMM->release();
  }

  /***
   * @brief Move row positions & hashes inside [insIdx, startIdx] up by 1
   *
   * @param startIdx(in) last index to move
   * @param insIdx(in)   first index to move
   */
  void shiftUp(uint64_t startIdx, uint64_t insIdx)
  {
    memmove(&fPosHashes[insIdx + 1], &fPosHashes[insIdx],
            (startIdx - insIdx) * sizeof(decltype(fPosHashes)::value_type));
  }

  /***
   * @brief Create new storage with the same MemManager & uniq ID
   * @param size(in) maximum size of storage without padding
   * @param gen(in)  new generation
   */
  RowPosHashStoragePtr clone(size_t size, uint16_t gen, bool loadDump = false) const
  {
    RowPosHashStoragePtr cloned;
    cloned.reset(new RowPosHashStorage());
    cloned->fMM.reset(fMM->clone());
    cloned->fTmpDir = fTmpDir;
    cloned->init(size);
    // Keep the original's uniq ID so the clone finds the same dump files.
    cloned->fUniqId = fUniqId;
    cloned->fGeneration = gen;
    cloned->fCompressor = fCompressor;
    cloned->fDumper.reset(new Dumper(fCompressor, cloned->fMM.get()));
    if (loadDump)
      cloned->load();
    return cloned;
  }

  /*** @brief Remove dump file */
  void cleanup() const
  {
    unlink(makeDumpName().c_str());
  }

  /***
   * @brief Dump all data, clear state, and start over with the new generation
   */
  void startNewGeneration()
  {
    dump();
    ++fGeneration;
    fPosHashes.clear();
    fMM->release();
  }

  /*** @brief Write the whole pos/hash vector to the generation's dump file. */
  void dump()
  {
    int errNo;
    size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
    if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
  }

 private:
  // Only clone() may build an empty instance.
  RowPosHashStorage() = default;

  /*** @brief Acquire memory for (size + 0xFF padding) buckets and allocate them. */
  void init(size_t size)
  {
    auto bkts = size + 0xFFUL;
    if (!fMM->acquire(bkts * sizeof(decltype(fPosHashes)::value_type)))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
    fPosHashes.resize(bkts);
  }

  /*** @brief Per-process, per-instance, per-generation dump filename. */
  std::string makeDumpName() const
  {
    char fname[PATH_MAX];
    snprintf(fname, sizeof(fname), "%s/Agg-PosHash-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId,
             fGeneration);
    return fname;
  }

  /*** @brief Read the pos/hash vector back from the generation's dump file. */
  void load()
  {
    int errNo;
    std::vector<char> data;
    if ((errNo = fDumper->read(makeDumpName(), data)) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type));
    memcpy(fPosHashes.data(), data.data(), data.size());
  }

 private:
  std::unique_ptr<MemManager> fMM;
  std::vector<RowPosHash> fPosHashes;
  uint16_t fGeneration{0};  ///< current aggregation generation
  void* fUniqId;            ///< uniq ID to make an uniq dump filename
  std::string fTmpDir;
  compress::CompressInterface* fCompressor;
  std::unique_ptr<Dumper> fDumper;
};
/*---------------------------------------------------------------------------
 * Based on the Robin Hood hashmap implementation by Martin Ankerl:
 * https://github.com/martinus/robin-hood-hashing
 *
 * But store actual row data within RowGroupStorage, that stores a vector
 * of RGData with possibility to dump unused to disk. Also store some service
 * information (index and hash) in the vector of portions of the same size in
 * elements, that can be dumped on disk either.
 ----------------------------------------------------------------------------*/
RowAggStorage::RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup,
                             uint32_t keyCount, joblist::ResourceManager* rm,
                             boost::shared_ptr<int64_t> sessLimit, bool enabledDiskAgg, bool allowGenerations,
                             compress::CompressInterface* compressor)
 : fMaxRows(getMaxRows(enabledDiskAgg))
 , fExtKeys(rowGroupOut != keysRowGroup)
 , fLastKeyCol(keyCount - 1)
 , fUniqId(this)
 , fAllowGenerations(allowGenerations)
 , fEnabledDiskAggregation(enabledDiskAgg)
 , fCompressor(compressor)
 , fTmpDir(tmpDir)
 , fRowGroupOut(rowGroupOut)
 , fKeysRowGroup(keysRowGroup)
{
  // Per-process & per-instance tmp subdirectory for all dump files.
  char suffix[PATH_MAX];
  snprintf(suffix, sizeof(suffix), "/p%u-t%p/", getpid(), this);
  fTmpDir.append(suffix);
  if (enabledDiskAgg)
    boost::filesystem::create_directories(fTmpDir);
  if (rm)
  {
    // With disk aggregation: don't wait for memory and don't fail hard.
    fMM.reset(new RMMemManager(rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg));
    fNumOfInputRGPerThread = std::max<uint32_t>(1, rm->aggNumRowGroups());
  }
  else
  {
    fMM.reset(new MemManager());
    fNumOfInputRGPerThread = 1;
  }
  // Placeholder storages with maxRows == 1; the real ones are created
  // lazily on the first getTargetRow() call.
  fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg,
                                     fCompressor.get()));
  if (fExtKeys)
  {
    // Keys live in their own rowgroup, separate from the output rows.
    fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg,
                                               !enabledDiskAgg, fCompressor.get()));
    fKeysStorage = fRealKeysStorage.get();
  }
  else
  {
    fKeysStorage = fStorage.get();
  }
  fKeysStorage->initRow(fKeyRow);
  fGens.emplace_back(new Data);
  fCurData = fGens.back().get();
  fCurData->fHashes.reset(
      new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get()));
}
RowAggStorage::~RowAggStorage()
{
  // remove the per-instance temporary directory with all of its dumps
  cleanupAll();
}
bool RowAggStorage::getTargetRow(const Row& row, Row& rowOut)
{
uint64_t hash = hashRow(row, fLastKeyCol);
return getTargetRow(row, hash, rowOut);
}
/** @brief Find or create the aggregation target row for the given input row.
 *
 * Looks the row's key up in the hash table (robin-hood probing). On a hit,
 * rowOut is positioned at the existing target row; on a miss a new entry is
 * inserted and a fresh target row is allocated.
 *
 * @param row    (in)  input row
 * @param hash   (in)  hash of the row's key columns
 * @param rowOut (out) positioned at the target row
 * @returns true if a new row was created, false if an existing row was found
 */
bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut)
{
  if (UNLIKELY(!fInitialized))
  {
    // lazy initialization: replace the placeholder storages created in the
    // constructor with ones sized for real use
    fInitialized = true;
    fStorage.reset(new RowGroupStorage(fTmpDir, fRowGroupOut, fMaxRows, fMM->getResourceManaged(),
                                       fMM->getSessionLimit(), !fEnabledDiskAggregation,
                                       !fEnabledDiskAggregation, fCompressor.get()));
    if (fExtKeys)
    {
      fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, fKeysRowGroup, fMaxRows, fMM->getResourceManaged(),
                                                 fMM->getSessionLimit(), !fEnabledDiskAggregation,
                                                 !fEnabledDiskAggregation, fCompressor.get()));
      fKeysStorage = fRealKeysStorage.get();
    }
    else
    {
      fKeysStorage = fStorage.get();
    }
    fKeysStorage->initRow(fKeyRow);
    reserve(fMaxRows);
  }
  else if (UNLIKELY(fCurData->fSize >= fCurData->fMaxSize))
  {
    increaseSize();
  }
  auto [info, idx] = rowHashToIdx(hash);
  nextWhileLess(info, idx);

  // probe the current generation
  while (info == fCurData->fInfo[idx])
  {
    auto& pos = fCurData->fHashes->get(idx);
    if (pos.hash == hash)
    {
      auto& keyRow = fExtKeys ? fKeyRow : rowOut;
      fKeysStorage->getRow(pos.idx, keyRow);
      if (row.equals(keyRow, fLastKeyCol))
      {
        if (!fExtKeys)
          return false;
        fStorage->getRow(pos.idx, rowOut);
        return false;
      }
    }
    next(info, idx);
  }

  if (!fEnabledDiskAggregation && fGeneration != 0)
  {
    // there are several generations here, so let's try to find a suitable row in them
    uint16_t gen = fGeneration - 1;
    do
    {
      auto* genData = fGens[gen].get();
      auto [ginfo, gidx] = rowHashToIdx(hash, genData->fMask, genData->hashMultiplier_, genData->fInfoInc,
                                        genData->fInfoHashShift);
      nextWhileLess(ginfo, gidx, genData);
      while (ginfo == genData->fInfo[gidx])
      {
        // BUGFIX: fetch this generation's slot (gidx), not the current
        // generation's slot (idx) — the loop advances gidx, so reading idx
        // compared the wrong (and never-changing) entry
        auto& pos = genData->fHashes->get(gidx);
        if (pos.hash == hash)
        {
          auto& keyRow = fExtKeys ? fKeyRow : rowOut;
          fKeysStorage->getRow(pos.idx, keyRow);
          if (row.equals(keyRow, fLastKeyCol))
          {
            if (!fExtKeys)
              return false;
            fStorage->getRow(pos.idx, rowOut);
            return false;
          }
        }
        next(ginfo, gidx, genData);
      }
    } while (gen-- != 0);
  }

  // key not found: insert a new entry at the position computed above
  const auto ins_idx = idx;
  const auto ins_info = info;
  if (UNLIKELY(ins_info + fCurData->fInfoInc > 0xFF))
  {
    // info byte would overflow => force the table to grow on the next insert
    fCurData->fMaxSize = 0;
  }

  // find the first empty slot and shift the tail up to make room
  while (fCurData->fInfo[idx] != 0)
  {
    next(info, idx);
  }
  if (idx != ins_idx)
  {
    shiftUp(idx, ins_idx);
  }
  RowPosHash pos;
  pos.hash = hash;
  // allocate the new target row (and its key row, if keys are stored apart)
  fStorage->putRow(pos.idx, rowOut);
  if (fExtKeys)
  {
    fKeysStorage->putKeyRow(pos.idx, fKeyRow);
    copyRow(row, &fKeyRow);
  }
  fCurData->fHashes->set(ins_idx, pos);
  fCurData->fInfo[ins_idx] = static_cast<uint8_t>(ins_info);
  ++fCurData->fSize;
  return true;
}
/** @brief Dump rowgroups to disk until enough memory is free, and decide
 *  whether a new aggregation generation must be started.
 *
 * No-op unless disk aggregation is enabled.
 * @throws logging::IDBExcept(ERR_DISKAGG_TOO_BIG) if generations are not
 *         allowed and the very first check already shows negative free memory.
 */
void RowAggStorage::dump()
{
  if (!fEnabledDiskAggregation)
    return;

  // FIX: was `constexpr const int` initialized from a 64-bit unsigned
  // expression; use int64_t to match the signed 64-bit memory arithmetic below
  constexpr int64_t freeMemLimit = 50LL * 1024LL * 1024LL;

  // keep enough memory free for the per-thread input rowgroups
  const int64_t leaveFree = fNumOfInputRGPerThread * fRowGroupOut->getRowSize() * getBucketSize();
  uint64_t freeAttempts{0};
  int64_t freeMem = 0;
  while (true)
  {
    ++freeAttempts;
    freeMem = fMM->getFree();
    if (freeMem > leaveFree)
      break;

    // dump LRU rowgroups; stop when nothing more can be dumped
    bool success = fStorage->dump();
    if (fExtKeys)
      success |= fKeysStorage->dump();

    if (!success)
      break;
  }

  const int64_t totalMem = fMM->getConfigured();
  // If the generations are allowed and there are less than half of
  // rowgroups in memory, then we start a new generation
  if (fAllowGenerations && fStorage->fLRU->size() < fStorage->fRGDatas.size() / 2 &&
      fStorage->fRGDatas.size() > 10)
  {
    startNewGeneration();
  }
  else if (fAllowGenerations && freeMem < totalMem / 10 * 3 && nextRandDistib() < 30)
  {
    // less than 30% free: start a new generation with 30% probability
    startNewGeneration();
  }
  else if (fAllowGenerations && fMM->getFree() < freeMemLimit)
  {
    startNewGeneration();
  }
  else if (!fAllowGenerations && freeMem < 0 && freeAttempts == 1)
  {
    // safety guard so aggregation couldn't eat all available memory
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_TOO_BIG),
                             logging::ERR_DISKAGG_TOO_BIG);
  }
}
/** @brief Append another RowAggStorage's aggregated rows to this one.
 *
 * Used to combine per-thread results. Only the aggregated RowStorage data is
 * kept; the key storages and hashmap internals of both sides are discarded.
 */
void RowAggStorage::append(RowAggStorage& other)
{
  // the key-rows storage and the hashmap internals are not needed anymore,
  // neither in this RowAggStorage nor in the other one
  cleanup();
  freeData();
  if (other.fGeneration == 0 || !fEnabledDiskAggregation)
  {
    // even if there is several generations in the other RowAggStorage
    // in case of in-memory-only aggregation they all share the same RowStorage
    other.cleanup();
    other.freeData();
    fStorage->append(std::move(other.fStorage));
    return;
  }

  // if the other RowAggStorage has several generations, sequentially load and
  // append them all, from the newest (gen N) down to the oldest (gen 0);
  // the only needed data is the aggregated RowStorage itself
  auto gen = other.fGeneration;
  while (true)
  {
    fStorage->append(other.fStorage.get());
    other.cleanup();
    if (gen == 0)
      break;
    --gen;
    other.fGeneration = gen;
    // re-open the other storage pointing at the previous generation's dumps
    other.fStorage.reset(other.fStorage->clone(gen));
  }
}
/// Pop the next result RGData from the underlying row storage, dropping the
/// hashmap internals first (they are no longer needed once draining starts).
std::unique_ptr<RGData> RowAggStorage::getNextRGData()
{
  if (fStorage)
  {
    cleanup();
    freeData();
    return fStorage->getNextRGData();
  }
  return {};
}
/// Release the pos/hash storage and the info array of every generation,
/// returning the accounted bytes to the memory manager.
void RowAggStorage::freeData()
{
  for (auto& gen : fGens)
  {
    gen->fHashes.reset();
    if (!gen->fInfo)
      continue;
    const size_t accounted = calcSizeWithBuffer(gen->fMask + 1);
    fMM->release(accounted);
    gen->fInfo.reset();
  }
  fGens.clear();
  fCurData = nullptr;
}
/// Shift the occupied slots in [insIdx, startIdx) one position up (towards
/// startIdx) to open a hole at insIdx, bumping each entry's info distance.
void RowAggStorage::shiftUp(size_t startIdx, size_t insIdx)
{
  for (size_t cur = startIdx; cur != insIdx; --cur)
  {
    fCurData->fInfo[cur] = static_cast<uint8_t>(fCurData->fInfo[cur - 1] + fCurData->fInfoInc);
    // info byte about to overflow => force a table grow on the next insert
    if (UNLIKELY(fCurData->fInfo[cur] + fCurData->fInfoInc > 0xFF))
      fCurData->fMaxSize = 0;
  }
  fCurData->fHashes->shiftUp(startIdx, insIdx);
}
/** @brief Grow (or rehash) the hash table when the load limit is reached.
 *
 * Tries, in order: first allocation, widening the info window, in-place
 * rehash with a new hash multiplier, doubling the table; falls back to a new
 * in-memory generation when memory is tight, or throws if nothing is possible.
 */
void RowAggStorage::increaseSize()
{
  // nothing allocated yet? start with the initial table size
  if (fCurData->fMask == 0)
  {
    initData(INIT_SIZE, fCurData->fHashes.get());
  }

  const auto maxSize = calcMaxSize(fCurData->fMask + 1);
  // cheap path: widen the probe-info window instead of reallocating
  if (fCurData->fSize < maxSize && tryIncreaseInfo())
    return;

  constexpr size_t maxMaskMultiplierWoRehashing = 1U << (INIT_INFO_INC - 1);
  // We don't check for the overflow here b/c it is impractical to have fSize
  // so big that the multiplication overflows.
  if (fCurData->fSize * maxMaskMultiplierWoRehashing < calcMaxSize(fCurData->fMask + 1))
  {
    // something strange happens...
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_OVERFLOW2),
                             logging::ERR_DISKAGG_OVERFLOW2);
  }

  auto freeMem = fMM->getFree();
  // grow in memory only when disk aggregation is enabled or there is plenty
  // of free memory to hold both the old and the new table during the rehash
  if (fEnabledDiskAggregation ||
      freeMem > (fMM->getUsed() + fCurData->fHashes->memUsage() + fStorage->getAproxRGSize()) * 2)
  {
    if (fCurData->fSize * 2 < maxSize)
    {
      // we have to resize, even though there would still be plenty of space left!
      // Try to rehash instead. Delete freed memory so we don't steadily increase mem in case
      // we have to rehash a few times
      // adding an *even* number, so that the multiplier will always stay odd. This is necessary
      // so that the hash stays a mixing function (and thus doesn't have any information loss).
      fCurData->hashMultiplier_ += 0xc4ceb9fe1a85ec54;
      rehashPowerOfTwo(fCurData->fMask + 1);
    }
    else
    {
      rehashPowerOfTwo((fCurData->fMask + 1) * 2);
    }
  }
  else if (fGeneration < MAX_INMEMORY_GENS - 1)
  {
    // not enough memory to grow: open a new in-memory generation instead
    startNewGeneration();
  }
  else
  {
    // no memory left and no generations left => give up
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                             logging::ERR_AGGREGATION_TOO_BIG);
  }
}
/** @brief Halve the info increment (widening the probe window) instead of
 *  growing the table.
 * @returns false if the increment is already at its minimum (<= 2).
 */
bool RowAggStorage::tryIncreaseInfo()
{
  if (fCurData->fInfoInc <= 2)
    return false;

  fCurData->fInfoInc = static_cast<uint8_t>(fCurData->fInfoInc >> 1U);
  ++fCurData->fInfoHashShift;
  const auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
  // halve every stored info byte, eight bytes at a time (SWAR: the mask
  // clears the bit that shifted in from the neighbouring byte)
  for (size_t i = 0; i < elems; i += 8)
  {
    uint64_t val;
    memcpy(&val, fCurData->fInfo.get() + i, sizeof(val));
    val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL;
    memcpy(fCurData->fInfo.get() + i, &val, sizeof(val));
  }

  // restore the end sentinel so probing always terminates
  fCurData->fInfo[elems] = 1;
  fCurData->fMaxSize = calcMaxSize(fCurData->fMask + 1);
  return true;
}
/** @brief Re-create the table with `elems` slots (a power of two) and
 *  re-insert every occupied entry from the old table.
 */
void RowAggStorage::rehashPowerOfTwo(size_t elems)
{
  const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1);
  auto oldInfo = std::move(fCurData->fInfo);
  auto oldHashes = std::move(fCurData->fHashes);
  fMM->release(calcBytes(oldSz));

  initData(elems, oldHashes.get());
  oldHashes->releaseMemory();

  if (oldSz > 1)
  {
    // re-insert every non-empty slot of the old table
    for (size_t i = 0; i < oldSz; ++i)
    {
      if (UNLIKELY(oldInfo[i] != 0))
      {
        insertSwap(i, oldHashes.get());
      }
    }
  }
}
/** @brief Move one entry from the old table into the freshly allocated one
 *  (robin-hood insertion used during rehash; keys are unique by construction).
 */
void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes)
{
  if (fCurData->fMaxSize == 0 && !tryIncreaseInfo())
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_OVERFLOW1),
                             logging::ERR_DISKAGG_OVERFLOW1);
  }

  auto pos = oldHashes->get(oldIdx);
  auto [info, idx] = rowHashToIdx(pos.hash);
  // skip over entries that are placed at least as close to their home slot
  while (info <= fCurData->fInfo[idx])
  {
    ++idx;
    info += fCurData->fInfoInc;
  }

  // don't need to compare rows here - they differ by definition
  const auto ins_idx = idx;
  const auto ins_info = static_cast<uint8_t>(info);
  // info byte would overflow => force a table grow on the next insert
  if (UNLIKELY(ins_info + fCurData->fInfoInc > 0xFF))
    fCurData->fMaxSize = 0;

  // find the first empty slot and shift the tail up to make room
  while (fCurData->fInfo[idx] != 0)
  {
    next(info, idx);
  }

  if (idx != ins_idx)
    shiftUp(idx, ins_idx);
  fCurData->fHashes->set(ins_idx, pos);
  fCurData->fInfo[ins_idx] = ins_info;
  ++fCurData->fSize;
}
/** @brief Allocate and zero the info array and pos/hash storage for a table
 *  of `elems` slots, accounting the memory with the memory manager.
 * @throws logging::IDBExcept(ERR_AGGREGATION_TOO_BIG) if the memory can't be acquired.
 */
void RowAggStorage::initData(size_t elems, const RowPosHashStorage* oldHashes)
{
  fCurData->fSize = 0;
  fCurData->fMask = elems - 1;
  fCurData->fMaxSize = calcMaxSize(elems);

  const auto sizeWithBuffer = calcSizeWithBuffer(elems, fCurData->fMaxSize);
  const auto bytes = calcBytes(sizeWithBuffer);

  if (!fMM->acquire(bytes))
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                             logging::ERR_AGGREGATION_TOO_BIG);
  }

  fCurData->fHashes = oldHashes->clone(elems, fGeneration);
  fCurData->fInfo.reset(new uint8_t[bytes]());  // value-initialized => all zero
  fCurData->fInfo[sizeWithBuffer] = 1;          // end sentinel so probing terminates
  fCurData->fInfoInc = INIT_INFO_INC;
  fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
}
/// Grow the table (if needed) so it can hold at least `c` elements without
/// exceeding the load-factor limit. Never shrinks.
void RowAggStorage::reserve(size_t c)
{
  const auto wanted = (std::max)(c, fCurData->fSize);

  // find the smallest power-of-two table size whose load limit covers `wanted`
  auto tableSize = INIT_SIZE;
  while (calcMaxSize(tableSize) < wanted && tableSize != 0)
    tableSize *= 2;

  // wrapped around to zero => the request is impossibly large
  if (UNLIKELY(tableSize == 0))
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_ERROR),
                             logging::ERR_DISKAGG_ERROR);
  }

  // grow only; repeated reserve() calls must not keep reallocating
  if (tableSize > fCurData->fMask + 1)
    rehashPowerOfTwo(tableSize);
}
/** @brief Close the current aggregation generation and open a fresh one.
 *
 * In-memory mode keeps the old Data alive (all generations share the same row
 * storages); disk mode dumps everything and reinitializes the hashmap.
 */
void RowAggStorage::startNewGeneration()
{
  if (!fEnabledDiskAggregation)
  {
    // in-memory mode: keep the old generation's Data for later lookups
    ++fGeneration;
    fGens.emplace_back(new Data);
    auto* newData = fGens.back().get();
    newData->fHashes = fCurData->fHashes->clone(0, fGeneration);
    fCurData = newData;
    reserve(fMaxRows);
    return;
  }

  // an empty generation would be pointless
  if (fCurData->fSize == 0)
    return;

  // save all data and free storages' memory
  dumpInternalData();
  fCurData->fHashes->startNewGeneration();
  fStorage->startNewGeneration();
  if (fExtKeys)
    fKeysStorage->startNewGeneration();

  ++fGeneration;
  fMM->release();
  // reinitialize internal structures
  fCurData->fInfo.reset();
  fCurData->fSize = 0;
  fCurData->fMask = 0;
  fCurData->fMaxSize = 0;
  fCurData->fInfoInc = INIT_INFO_INC;
  fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
  reserve(fMaxRows);
  fAggregated = false;
}
/// Build the dump-file path for the hashmap service data of generation `gen`
/// (a negative gen means "the current generation").
std::string RowAggStorage::makeDumpFilename(int32_t gen) const
{
  const uint16_t rgen = gen < 0 ? fGeneration : gen;
  char path[PATH_MAX];
  snprintf(path, sizeof(path), "%s/AggMap-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId, rgen);
  return std::string(path);
}
/** @brief Serialize the current generation's hashmap service data (sizes,
 *  hash multiplier, info array) into the AggMap dump file.
 * @throws logging::IDBExcept(ERR_DISKAGG_FILEIO_ERROR) on open/write failure.
 */
void RowAggStorage::dumpInternalData() const
{
  if (!fCurData->fInfo)
    return;

  messageqcpp::ByteStream bs;
  // NB: field order must match RowAggStorage::loadGeneration()
  bs << fCurData->fSize;
  bs << fCurData->fMask;
  bs << fCurData->fMaxSize;
  bs << fCurData->hashMultiplier_;
  bs << fCurData->fInfoInc;
  bs << fCurData->fInfoHashShift;
  bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
  int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (fd < 0)
  {
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  int errNo;
  if ((errNo = writeData(fd, (const char*)bs.buf(), bs.length())) != 0)
  {
    close(fd);
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }

  close(fd);
}
/** @brief Merge rows whose groups are split across generations so each group
 *  key is aggregated exactly once.
 *
 * Walks generations from the newest down to the oldest; for each row of the
 * "current" generation it looks up matching rows in all older generations,
 * merges them via mergeFunc, and marks the older rows as finalized so they
 * are skipped later. Only meaningful for multi-generation disk aggregation.
 *
 * @param mergeFunc merges the row it is given into the row currently
 *                  positioned in rowOut
 * @param rowOut    scratch row of the output rowgroup
 */
void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
{
  // single generation or in-memory aggregation: nothing to merge
  if (fAggregated || fGeneration == 0 || !fEnabledDiskAggregation)
  {
    cleanup();
    return;
  }

  Row tmpKeyRow;
  fKeysStorage->initRow(tmpKeyRow);
  Row tmpRow;
  fStorage->initRow(tmpRow);

  // persist the current generation so it can be reloaded like the others
  dumpInternalData();
  fCurData->fHashes->dump();
  fStorage->dumpAll();
  if (fExtKeys)
    fKeysStorage->dumpAll();

  uint16_t curGen = fGeneration + 1;
  while (--curGen > 0)
  {
    bool genUpdated = false;
    if (fCurData->fSize == 0)
    {
      // empty generation: just switch to the previous one and continue
      fStorage->dumpFinalizedInfo();
      if (fExtKeys)
        fKeysStorage->dumpFinalizedInfo();
      cleanup(curGen);
      loadGeneration(curGen - 1);
      auto oh = std::move(fCurData->fHashes);
      fCurData->fHashes = oh->clone(0, curGen - 1, true);
      fStorage.reset(fStorage->clone(curGen - 1));
      if (fExtKeys)
      {
        fRealKeysStorage.reset(fRealKeysStorage->clone(curGen - 1));
        fKeysStorage = fRealKeysStorage.get();
      }
      else
        fKeysStorage = fStorage.get();
      continue;
    }

    // hashmap state of the older generation currently being merged in
    std::unique_ptr<RowPosHashStorage> prevHashes;
    size_t prevSize;
    size_t prevMask;
    size_t prevMaxSize;
    size_t prevHashMultiplier;
    uint32_t prevInfoInc;
    uint32_t prevInfoHashShift;
    std::unique_ptr<uint8_t[]> prevInfo;
    std::unique_ptr<RowGroupStorage> prevRowStorage;
    std::unique_ptr<RowGroupStorage> prevRealKeyRowStorage;
    RowGroupStorage* prevKeyRowStorage{nullptr};
    auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
    for (uint16_t prevGen = 0; prevGen < curGen; ++prevGen)
    {
      prevRowStorage.reset(fStorage->clone(prevGen));
      if (fExtKeys)
      {
        prevRealKeyRowStorage.reset(fKeysStorage->clone(prevGen));
        prevKeyRowStorage = prevRealKeyRowStorage.get();
      }
      else
        prevKeyRowStorage = prevRowStorage.get();
      loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevHashMultiplier, prevInfoInc,
                     prevInfoHashShift, prevInfo);
      prevHashes = fCurData->fHashes->clone(prevMask + 1, prevGen, true);

      // iterate over current generation rows
      uint64_t idx{};
      uint32_t info{};
      for (;; next(info, idx))
      {
        nextExisting(info, idx);
        if (idx >= elems)
        {
          // done finalizing generation
          break;
        }

        const auto& pos = fCurData->fHashes->get(idx);
        if (fKeysStorage->isFinalized(pos.idx))
        {
          // this row was already merged into newer generation, skip it
          continue;
        }

        // now try to find row in the previous generation
        auto [pinfo, pidx] =
            rowHashToIdx(pos.hash, prevMask, prevHashMultiplier, prevInfoInc, prevInfoHashShift);
        while (pinfo < prevInfo[pidx])
        {
          ++pidx;
          pinfo += prevInfoInc;
        }

        if (prevInfo[pidx] != pinfo)
        {
          // there is no such row
          continue;
        }

        auto ppos = prevHashes->get(pidx);
        while (prevInfo[pidx] == pinfo && pos.hash != ppos.hash)
        {
          // hashes are not equal => collision, try all matched rows
          ++pidx;
          pinfo += prevInfoInc;
          ppos = prevHashes->get(pidx);
        }

        if (prevInfo[pidx] != pinfo || pos.hash != ppos.hash)
        {
          // no matches
          continue;
        }

        bool found = false;
        auto& keyRow = fExtKeys ? fKeyRow : rowOut;
        fKeysStorage->getRow(pos.idx, keyRow);
        while (!found && prevInfo[pidx] == pinfo)
        {
          // try to find exact match in case of hash collision
          if (UNLIKELY(pos.hash != ppos.hash))
          {
            // hashes are not equal => no such row
            ++pidx;
            pinfo += prevInfoInc;
            ppos = prevHashes->get(pidx);
            continue;
          }

          prevKeyRowStorage->getRow(ppos.idx, fExtKeys ? tmpKeyRow : tmpRow);
          if (!keyRow.equals(fExtKeys ? tmpKeyRow : tmpRow, fLastKeyCol))
          {
            ++pidx;
            pinfo += prevInfoInc;
            ppos = prevHashes->get(pidx);
            continue;
          }
          found = true;
        }
        if (!found)
        {
          // nothing was found, go to the next row
          continue;
        }

        if (UNLIKELY(prevKeyRowStorage->isFinalized(ppos.idx)))
        {
          // just to be sure, it can NEVER happen
          continue;
        }

        // here it is! So:
        // 1. Get actual row datas
        // 2. Merge it into the current generation
        // 3. Mark generations as updated
        // 4. Mark the prev generation row as finalized
        if (fExtKeys)
        {
          prevRowStorage->getRow(ppos.idx, tmpRow);
          fStorage->getRow(pos.idx, rowOut);
        }
        mergeFunc(tmpRow);
        genUpdated = true;
        prevKeyRowStorage->markFinalized(ppos.idx);
        if (fExtKeys)
        {
          prevRowStorage->markFinalized(ppos.idx);
        }
      }
      prevRowStorage->dumpFinalizedInfo();
      if (fExtKeys)
      {
        prevKeyRowStorage->dumpFinalizedInfo();
      }
    }
    fStorage->dumpFinalizedInfo();
    if (fExtKeys)
      fKeysStorage->dumpFinalizedInfo();
    if (genUpdated)
    {
      // we have to save current generation data to disk 'cause we update some rows
      // TODO: to reduce disk IO we can dump only affected rowgroups
      fStorage->dumpAll(false);
      if (fExtKeys)
        fKeysStorage->dumpAll(false);
    }

    // swap current generation N with the prev generation N-1
    // NOTE(review): hashMultiplier_ is not restored here — presumably fine
    // because subsequent lookups pass prevHashMultiplier explicitly; verify.
    fCurData->fSize = prevSize;
    fCurData->fMask = prevMask;
    fCurData->fMaxSize = prevMaxSize;
    fCurData->fInfoInc = prevInfoInc;
    fCurData->fInfoHashShift = prevInfoHashShift;
    fCurData->fInfo = std::move(prevInfo);
    fCurData->fHashes = std::move(prevHashes);
    fStorage = std::move(prevRowStorage);
    if (fExtKeys)
    {
      fRealKeysStorage = std::move(prevRealKeyRowStorage);
      fKeysStorage = fRealKeysStorage.get();
    }
    else
      fKeysStorage = prevKeyRowStorage;
  }

  fStorage->dumpFinalizedInfo();
  if (fExtKeys)
    fKeysStorage->dumpFinalizedInfo();

  // reload the oldest generation's data as the final result
  auto oh = std::move(fCurData->fHashes);
  fCurData->fHashes = oh->clone(fCurData->fMask + 1, fGeneration, true);
  fStorage.reset(fStorage->clone(fGeneration));
  if (fExtKeys)
  {
    fRealKeysStorage.reset(fRealKeysStorage->clone(fGeneration));
    fKeysStorage = fRealKeysStorage.get();
  }
  else
    fKeysStorage = fStorage.get();
  fAggregated = true;
}
/// Load generation `gen`'s dumped service data into the current Data.
void RowAggStorage::loadGeneration(uint16_t gen)
{
  loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->hashMultiplier_,
                 fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo);
}
/** @brief Load the dumped hashmap service data of generation `gen` into the
 *  supplied output parameters (counterpart of dumpInternalData()).
 * @throws logging::IDBExcept(ERR_DISKAGG_FILEIO_ERROR) on open/read failure.
 */
void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize,
                                   size_t& hashMultiplier, uint32_t& infoInc, uint32_t& infoHashShift,
                                   std::unique_ptr<uint8_t[]>& info)
{
  messageqcpp::ByteStream bs;
  int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY);
  if (fd < 0)
  {
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  struct stat st
  {
  };
  // NOTE(review): fstat() result is unchecked; on failure st.st_size stays 0.
  // Presumably acceptable since we created the file ourselves — verify.
  fstat(fd, &st);
  bs.needAtLeast(st.st_size);
  bs.restart();
  int errNo;
  if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0)
  {
    close(fd);
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  close(fd);
  bs.advanceInputPtr(st.st_size);

  // field order must match dumpInternalData()
  bs >> size;
  bs >> mask;
  bs >> maxSize;
  bs >> hashMultiplier;
  bs >> infoInc;
  bs >> infoHashShift;
  size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize));
  info.reset(new uint8_t[infoSz]());
  uint8_t* tmp = info.get();
  bs >> tmp;
}
/// Remove the whole per-instance temporary directory (all dumps of all
/// generations). Must never throw: it is called from the destructor.
void RowAggStorage::cleanupAll() noexcept
{
  try
  {
    boost::filesystem::remove_all(fTmpDir);
  }
  catch (...)
  {
    // best-effort cleanup: swallow filesystem errors
  }
}
/// Remove the AggMap dump file of the current generation.
void RowAggStorage::cleanup()
{
  cleanup(fGeneration);
}
/// Remove the AggMap dump file of generation `gen`.
void RowAggStorage::cleanup(uint16_t gen)
{
  // nothing could have been dumped before the lazy initialization ran
  if (fInitialized)
    unlink(makeDumpFilename(gen).c_str());
}
/// Size of one hash-table bucket: the info byte plus the (hash, row position)
/// payload.
size_t RowAggStorage::getBucketSize()
{
  constexpr size_t infoByteSize = 1;
  return infoByteSize + sizeof(RowPosHash);
}
/// Estimate how many aggregation buckets fit into the available memory,
/// reserving per-thread space for input rowgroups. Always returns at least 1.
uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
                             uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize,
                             bool enabledDiskAggr)
{
  // no memory at all: most likely nothing can be processed, but still try
  if (availMem < 0)
    return 1;

  // memory each worker thread needs for its queued input rowgroups
  const ssize_t perThread = groupsPerThread * inRowSize * RowAggStorage::getMaxRows(false);
  const auto rowsPerGroup = RowAggStorage::getMaxRows(enabledDiskAggr);
  // memory one bucket needs: two output rowgroups plus the hash table
  const ssize_t perBucket =
      outRowSize * rowsPerGroup * 2 +
      RowAggStorage::calcSizeWithBuffer(rowsPerGroup, rowsPerGroup) * RowAggStorage::getBucketSize();

  uint32_t buckets = numOfBuckets;
  if ((availMem - perThread * numOfThreads) / numOfBuckets < perBucket)
  {
    buckets = 0;
    if (availMem > perThread * numOfThreads)
      buckets = (availMem - perThread * numOfThreads) / perBucket;
    if (buckets < numOfThreads)
      buckets = availMem / (perBucket + perThread);
  }
  return buckets == 0 ? 1 : buckets;
}
} // namespace rowgroup