/* Copyright (C) 2021-2022 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

#include <unistd.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include <cstdint>
#include "branchpred.h"
#include "rowgroup.h"
#include <resourcemanager.h>
#include <fcntl.h>
#include "rowstorage.h"
#include "robin_hood.h"

//#define DISK_AGG_DEBUG

namespace
{
int writeData(int fd, const char* buf, size_t sz)
{
  if (sz == 0)
    return 0;

  auto to_write = sz;
  while (to_write > 0)
  {
    auto r = write(fd, buf + sz - to_write, to_write);
    if (UNLIKELY(r < 0))
    {
      if (errno == EAGAIN)
        continue;

      return errno;
    }
    assert(size_t(r) <= to_write);
    to_write -= r;
  }

  return 0;
}

int readData(int fd, char* buf, size_t sz)
{
  if (sz == 0)
    return 0;

  auto to_read = sz;
  while (to_read > 0)
  {
    auto r = read(fd, buf + sz - to_read, to_read);
    if (UNLIKELY(r < 0))
    {
      if (errno == EAGAIN)
        continue;

      return errno;
    }

    assert(size_t(r) <= to_read);
    to_read -= r;
  }

  return 0;
}

std::string errorString(int errNo)
{
  char tmp[1024];
  auto* buf = strerror_r(errNo, tmp, sizeof(tmp));
  return {buf};
}

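// NB: __builtin_ffsll() is 1-based: it returns one plus the index of the least
// significant set bit and 0 when the mask is empty. shiftRowsInRowGroup() below
// relies on this convention when it computes run boundaries inside a mask word.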
size_t findFirstSetBit(const uint64_t mask)
{
  return __builtin_ffsll(mask);
}
}  // anonymous namespace

namespace rowgroup
{
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol)
{
  uint64_t ret = 0;
  if (lastCol >= r.getColumnCount())
    return 0;

  datatypes::MariaDBHasher h;
  utils::Hasher64_r columnHasher;

  bool strHashUsed = false;

  for (uint32_t i = 0; i <= lastCol; ++i)
  {
    switch (r.getColType(i))
    {
      case execplan::CalpontSystemCatalog::CHAR:
      case execplan::CalpontSystemCatalog::VARCHAR:
      case execplan::CalpontSystemCatalog::BLOB:
      case execplan::CalpontSystemCatalog::TEXT:
      {
        auto cs = r.getCharset(i);
        auto strColValue = r.getConstString(i);
        auto strColValueLen = strColValue.length();
        if (strColValueLen > MaxConstStrSize)
        {
          h.add(cs, strColValue);
          strHashUsed = true;
        }
        else
        {
          // This is a relatively big stack allocation.
          // It is aligned for future vectorization of the hash calculation.
          uchar buf[MaxConstStrBufSize] __attribute__((aligned(64)));
          // Pay attention to the last strxfrm argument value.
          // It is called flags and in many cases it has padding
          // enabled (MY_STRXFRM_PAD_WITH_SPACE bit). With padding enabled
          // strxfrm returns MaxConstStrBufSize bytes and not the actual
          // weights array length. Here I disable padding.
          auto charset = datatypes::Charset(cs);
          auto trimStrColValue = strColValue.rtrimSpaces();
          // The padding is disabled b/c we previously used rtrimSpaces().
          // strColValueLen is used here.
          size_t nActualWeights = charset.strnxfrm(buf, MaxConstStrBufSize, strColValueLen,
                                                   reinterpret_cast<const uchar*>(trimStrColValue.str()),
                                                   trimStrColValue.length(), 0);
          ret = columnHasher(reinterpret_cast<const void*>(buf), nActualWeights, ret);
        }
        break;
      }
      default: ret = columnHasher(r.getData() + r.getOffset(i), r.getColumnWidth(i), ret); break;
    }
  }

  // The properties of the hash produced are worse if MDB hasher results are incorporated
  // so late, but these results must be used very infrequently.
  if (strHashUsed)
  {
    uint64_t strhash = h.finalize();
    ret = columnHasher(&strhash, sizeof(strhash), ret);
  }
  return columnHasher.finalize(ret, lastCol << 2);
}
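// Note on the hashing strategy above: fixed-width columns (and short strings,
// via their collation weights) are folded incrementally into Hasher64_r, while
// strings longer than MaxConstStrSize are accumulated in the MariaDB hasher and
// mixed in once at the end. This keeps the common path cheap at the cost of
// slightly worse mixing on the rare long-string path.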

/** @brief NoOP interface to LRU-cache used by RowGroupStorage & HashStorage
 */
struct LRUIface
{
  using List = std::list<uint64_t>;

  virtual ~LRUIface() = default;
  /** @brief Put an ID to cache or set it as last used */
  virtual void add(uint64_t)
  {
  }
  /** @brief Remove an ID from cache */
  virtual void remove(uint64_t)
  {
  }
  /** @brief Get iterator of the most recently used ID */
  virtual List::const_reverse_iterator begin() const
  {
    return List::const_reverse_iterator();
  }
  /** @brief Get iterator after the latest ID */
  virtual List::const_reverse_iterator end() const
  {
    return List::const_reverse_iterator();
  }
  /** @brief Get iterator of the latest ID */
  virtual List::const_iterator rbegin() const
  {
    return {};
  }
  /** @brief Get iterator after the most recently used ID */
  virtual List::const_iterator rend() const
  {
    return {};
  }

  virtual void clear()
  {
  }
  virtual std::size_t size() const
  {
    return 0;
  }
  virtual bool empty() const
  {
    return true;
  }
  virtual LRUIface* clone() const
  {
    return new LRUIface();
  }
};
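// NB: the iterator naming is intentionally inverted relative to the STL:
// begin()/end() walk from the most recently used ID (a reverse iteration over
// the underlying list, where add() appends at the tail), while rbegin()/rend()
// walk from the least recently used one. RowGroupStorage::dump() uses
// rbegin() so that the coldest entries are evicted first.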

struct LRU : public LRUIface
{
  ~LRU() override
  {
    fMap.clear();
    fList.clear();
  }
  inline void add(uint64_t rgid) final
  {
    auto it = fMap.find(rgid);
    if (it != fMap.end())
    {
      fList.erase(it->second);
    }
    fMap[rgid] = fList.insert(fList.end(), rgid);
  }

  inline void remove(uint64_t rgid) final
  {
    auto it = fMap.find(rgid);
    if (UNLIKELY(it != fMap.end()))
    {
      fList.erase(it->second);
      fMap.erase(it);
    }
  }

  inline List::const_reverse_iterator begin() const final
  {
    return fList.crbegin();
  }
  inline List::const_reverse_iterator end() const final
  {
    return fList.crend();
  }
  inline List::const_iterator rbegin() const final
  {
    return fList.cbegin();
  }
  inline List::const_iterator rend() const final
  {
    return fList.cend();
  }
  inline void clear() final
  {
    fMap.clear();
    fList.clear();
  }

  size_t size() const final
  {
    return fMap.size();
  }
  bool empty() const final
  {
    return fList.empty();
  }

  LRUIface* clone() const final
  {
    return new LRU();
  }

  robin_hood::unordered_flat_map<uint64_t, List::iterator> fMap;
  List fList;
};

/** @brief Some service wrapping around ResourceManager (or NoOP) */
class MemManager
{
 public:
  MemManager()
  {
  }
  virtual ~MemManager()
  {
    release(fMemUsed);
  }

  bool acquire(ssize_t amount)
  {
    if (UNLIKELY(-amount > fMemUsed))
      amount = -fMemUsed;
    return acquireImpl(amount);
  }
  void release(ssize_t amount = 0)
  {
    // In some cases it tries to release more memory than was acquired, e.g.:
    // create a new rowgroup, acquire the maximum size (w/o strings), add some
    // rows with strings and finally release the actual size of the RG with strings.
    if (amount == 0 || amount > fMemUsed)
      amount = fMemUsed;
    releaseImpl(amount);
  }

  ssize_t getUsed() const
  {
    return fMemUsed;
  }
  virtual int64_t getFree() const
  {
    return std::numeric_limits<int64_t>::max();
  }

  virtual int64_t getConfigured() const
  {
    return std::numeric_limits<int64_t>::max();
  }

  virtual bool isStrict() const
  {
    return false;
  }

  virtual MemManager* clone() const
  {
    return new MemManager();
  }

  virtual joblist::ResourceManager* getResourceManaged()
  {
    return nullptr;
  }
  virtual boost::shared_ptr<int64_t> getSessionLimit()
  {
    return {};
  }

 protected:
  virtual bool acquireImpl(ssize_t amount)
  {
    fMemUsed += amount;
    return true;
  }
  virtual void releaseImpl(ssize_t amount)
  {
    fMemUsed -= amount;
  }
  ssize_t fMemUsed{0};
};
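// Accounting contract (illustration only): acquire() may be called with a
// negative delta and clamps it so the running total never goes below zero;
// release() with no argument returns everything still tracked. E.g.:
//
//   MemManager mm;             // no-op accountant; RMMemManager enforces limits
//   mm.acquire(1024);          // fMemUsed == 1024
//   mm.acquire(-2048);         // clamped: fMemUsed == 0
//   mm.release();              // releases whatever is still accounted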

class RMMemManager : public MemManager
{
 public:
  RMMemManager(joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sl, bool wait = true,
               bool strict = true)
   : fRm(rm), fSessLimit(std::move(sl)), fWait(wait), fStrict(strict)
  {
  }

  ~RMMemManager() override
  {
    release(fMemUsed);
    fMemUsed = 0;
  }

  int64_t getConfigured() const final
  {
    return fRm->getConfiguredUMMemLimit();
  }

  int64_t getFree() const final
  {
    return std::min(fRm->availableMemory(), *fSessLimit);
  }

  bool isStrict() const final
  {
    return fStrict;
  }

  MemManager* clone() const final
  {
    return new RMMemManager(fRm, fSessLimit, fWait, fStrict);
  }

  joblist::ResourceManager* getResourceManaged() override
  {
    return fRm;
  }
  boost::shared_ptr<int64_t> getSessionLimit() override
  {
    return fSessLimit;
  }

 protected:
  bool acquireImpl(ssize_t amount) final
  {
    if (amount)
    {
      if (!fRm->getMemory(amount, fSessLimit, fWait) && fStrict)
      {
        return false;
      }
      MemManager::acquireImpl(amount);
    }
    return true;
  }

  void releaseImpl(ssize_t amount) override
  {
    if (amount)
    {
      MemManager::releaseImpl(amount);
      fRm->returnMemory(amount, fSessLimit);
    }
  }

 private:
  joblist::ResourceManager* fRm = nullptr;
  boost::shared_ptr<int64_t> fSessLimit;
  const bool fWait;
  const bool fStrict;
};

class Dumper
{
 public:
  Dumper(const compress::CompressInterface* comp, MemManager* mm) : fCompressor(comp), fMM(mm->clone())
  {
  }

  int write(const std::string& fname, const char* buf, size_t sz)
  {
    if (sz == 0)
      return 0;

    int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (UNLIKELY(fd < 0))
      return errno;

    const char* tmpbuf;
    if (fCompressor)
    {
      auto len = fCompressor->maxCompressedSize(sz);
      checkBuffer(len);
      fCompressor->compress(buf, sz, fTmpBuf.data(), &len);
      tmpbuf = fTmpBuf.data();
      sz = len;
    }
    else
    {
      tmpbuf = buf;
    }

    auto to_write = sz;
    int ret = 0;
    while (to_write > 0)
    {
      auto r = ::write(fd, tmpbuf + sz - to_write, to_write);
      if (UNLIKELY(r < 0))
      {
        if (errno == EAGAIN)
          continue;

        ret = errno;
        close(fd);
        return ret;
      }
      assert(size_t(r) <= to_write);
      to_write -= r;
    }

    close(fd);
    return ret;
  }

  int read(const std::string& fname, std::vector<char>& buf)
  {
    int fd = open(fname.c_str(), O_RDONLY);
    if (UNLIKELY(fd < 0))
      return errno;

    struct stat st{};
    fstat(fd, &st);
    size_t sz = st.st_size;
    std::vector<char>* tmpbuf;
    if (fCompressor)
    {
      tmpbuf = &fTmpBuf;
      checkBuffer(sz);
    }
    else
    {
      tmpbuf = &buf;
      buf.resize(sz);
    }

    auto to_read = sz;
    int ret = 0;
    while (to_read > 0)
    {
      auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read);
      if (UNLIKELY(r < 0))
      {
        if (errno == EAGAIN)
          continue;

        ret = errno;
        close(fd);
        return ret;
      }

      assert(size_t(r) <= to_read);
      to_read -= r;
    }

    if (fCompressor)
    {
      size_t len;
      if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len))
      {
        ret = EPROTO;
        close(fd);
        return ret;
      }

      buf.resize(len);
      fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len);
    }

    close(fd);
    return ret;
  }

  size_t size() const
  {
    return fTmpBuf.size();
  }

 private:
  void checkBuffer(size_t len)
  {
    if (fTmpBuf.size() < len)
    {
      size_t newtmpsz = (len + 8191) / 8192 * 8192;
      std::vector<char> tmpvec(newtmpsz);
      fMM->acquire(newtmpsz - fTmpBuf.size());
      fTmpBuf.swap(tmpvec);
    }
  }

 private:
  const compress::CompressInterface* fCompressor;
  std::unique_ptr<MemManager> fMM;
  std::vector<char> fTmpBuf;
};
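// Round-trip sketch (illustration only; 'compressor' and 'mm' stand for any
// CompressInterface implementation and MemManager the caller already owns):
//
//   Dumper d(compressor, mm);
//   if (int e = d.write("/tmp/rg.dump", buf, sz))      // compresses if a
//     throw std::runtime_error(errorString(e));        // compressor is set
//   std::vector<char> back;
//   if (int e = d.read("/tmp/rg.dump", back))          // transparently
//     throw std::runtime_error(errorString(e));        // decompresses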

/** @brief Storage for RGData with LRU-cache & memory management
 */
class RowGroupStorage
{
 public:
  using RGDataStorage = std::vector<RGDataUnPtr>;

 public:
  /** @brief Default constructor
   *
   * @param tmpDir(in)      directory for tmp data
   * @param rowGroupOut(in,out) RowGroup metadata
   * @param maxRows(in)     number of rows per rowgroup
   * @param rm              ResourceManager to use or nullptr if we don't
   *                        need memory accounting
   * @param sessLimit       session memory limit
   * @param wait            shall we wait a bit if we don't have enough memory
   *                        right now?
   * @param strict          true -> throw an exception if not enough memory
   *                        false -> deal with it later
   * @param compressor      pointer to CompressInterface impl or nullptr
   */
  RowGroupStorage(const std::string& tmpDir, RowGroup* rowGroupOut, size_t maxRows,
                  joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                  bool wait = false, bool strict = false, compress::CompressInterface* compressor = nullptr)
   : fRowGroupOut(rowGroupOut)
   , fMaxRows(maxRows)
   , fRGDatas()
   , fUniqId(this)
   , fTmpDir(tmpDir)
   , fCompressor(compressor)
   , fUseDisk(!strict)
  {
    if (rm)
    {
      fMM.reset(new RMMemManager(rm, sessLimit, wait, strict));
      if (!wait && !strict)
      {
        fLRU = std::unique_ptr<LRUIface>(new LRU());
      }
      else
      {
        fLRU = std::unique_ptr<LRUIface>(new LRUIface());
      }
    }
    else
    {
      fMM.reset(new MemManager());
      fLRU = std::unique_ptr<LRUIface>(new LRUIface());
    }

    fDumper.reset(new Dumper(fCompressor, fMM.get()));

    auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
    fRowGroupOut->setData(curRG);
    fRowGroupOut->resetRowGroup(0);
    fRGDatas.emplace_back(curRG);
    fMM->acquire(fRowGroupOut->getSizeWithStrings(fMaxRows));
  }

  ~RowGroupStorage() = default;

  ssize_t getAproxRGSize() const
  {
    return fRowGroupOut->getSizeWithStrings(fMaxRows);
  }

  // This shifts data within the RGData such that it compacts the non-finalized rows
  PosOpos shiftRowsInRowGroup(RGDataUnPtr& rgdata, uint64_t fgid, uint64_t tgid)
  {
    uint64_t pos = 0;
    uint64_t opos = 0;

    fRowGroupOut->setData(rgdata.get());
    for (auto i = fgid; i < tgid; ++i)
    {
      if ((i - fgid) * HashMaskElements >= fRowGroupOut->getRowCount())
        break;
      uint64_t mask = ~fFinalizedRows[i];
      if ((i - fgid + 1) * HashMaskElements > fRowGroupOut->getRowCount())
      {
        mask &= (~0ULL) >> ((i - fgid + 1) * HashMaskElements - fRowGroupOut->getRowCount());
      }
      opos = (i - fgid) * HashMaskElements;

      if (mask == ~0ULL)
      {
        if (LIKELY(pos != opos))
          moveRows(rgdata.get(), pos, opos, HashMaskElements);
        pos += HashMaskElements;
        continue;
      }

      if (mask == 0)
        continue;

      while (mask != 0)
      {
        // Find the bounds of the next run of non-finalized rows in this mask word.
        size_t b = findFirstSetBit(mask);
        size_t e = findFirstSetBit(~(mask >> b)) + b;
        if (UNLIKELY(e >= HashMaskElements))
          mask = 0;
        else
          mask >>= e;
        if (LIKELY(pos != opos + b - 1))
          moveRows(rgdata.get(), pos, opos + b - 1, e - b);
        pos += e - b;
        opos += e;
      }
      --opos;
    }
    return {pos, opos};
  }
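  // In shiftRowsInRowGroup() above each word of fFinalizedRows covers
  // HashMaskElements consecutive rows; a set bit marks a finalized (already
  // emitted) row. The inverted mask therefore marks survivors, and the loop
  // memmove()s each run of surviving rows down to the write cursor `pos`,
  // leaving the first `pos` rows of the RGData densely packed.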

  /** @brief Take away RGDatas from another RowGroupStorage
   *
   * If some of the RGDatas are not in memory, do not load them,
   * just rename the dump file to match the new RowGroupStorage pattern.
   *
   * @param o RowGroupStorage to take from
   */
  void append(std::unique_ptr<RowGroupStorage> o)
  {
    return append(o.get());
  }
  void append(RowGroupStorage* o)
  {
    RGDataUnPtr rgd;
    std::string ofname;
    while (o->getNextRGData(rgd, ofname))
    {
      fRGDatas.push_back(std::move(rgd));
      uint64_t rgid = fRGDatas.size() - 1;
      if (fRGDatas[rgid])
      {
        fRowGroupOut->setData(fRGDatas[rgid].get());
        // An implicit s2u type cast.
        int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
        if (!fMM->acquire(memSz))
        {
          throw logging::IDBExcept(
              logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
              logging::ERR_AGGREGATION_TOO_BIG);
        }

        if (fUseDisk && fMM->getFree() < memSz * 2)
        {
          saveRG(rgid);
          fRGDatas[rgid].reset();
        }
        else
          fLRU->add(rgid);
      }
      else
      {
        auto r = rename(ofname.c_str(), makeRGFilename(rgid).c_str());
        if (UNLIKELY(r < 0))
        {
          throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
                                       logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
                                   logging::ERR_DISKAGG_FILEIO_ERROR);
        }
      }
      rgd.reset();
      ofname.clear();
    }
  }

  /** @brief Get the last RGData from fRGDatas, remove it from the vector and return its id.
   *
   * @param rgdata The RGData to be retrieved
   */
  uint64_t getLastRGData(RGDataUnPtr& rgdata)
  {
    assert(!fRGDatas.empty());
    uint64_t rgid = fRGDatas.size() - 1;
    rgdata = std::move(fRGDatas[rgid]);
    fRGDatas.pop_back();
    return rgid;
  }

  static FgidTgid calculateGids(const uint64_t rgid, const uint64_t fMaxRows)
  {
    // Calculate the first and last uint64_t entries in the fFinalizedRows bitmap
    // which contain information about rows in this RGData.
    uint64_t fgid = rgid * fMaxRows / HashMaskElements;
    uint64_t tgid = fgid + fMaxRows / HashMaskElements;
    return {fgid, tgid};
  }
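  // Worked example: markFinalized()/isFinalized() below use 64 rows per bitmap
  // word, so e.g. with fMaxRows == 8192 each RGData owns 8192 / 64 == 128 words
  // and rgid 3 maps to [fgid, tgid) == [384, 512).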

  /** @brief Used to output aggregation results from memory and disk in the current generation in the
   * form of RGData. Returns the next RGData, loading it from disk if necessary. Skips finalized rows as
   * they would contain duplicate results, compacts actual rows to the start of the RGData and adapts the
   * number of rows transmitted in the RGData.
   * @returns true if rgdata was filled with the next RGData, false if there are no more RGDatas in this
   * generation.
   */
  bool getNextOutputRGData(RGDataUnPtr& rgdata)
  {
    if (UNLIKELY(fRGDatas.empty()))
    {
      fMM->release();
      return false;
    }

    while (!fRGDatas.empty())
    {
      auto rgid = getLastRGData(rgdata);
      auto [fgid, tgid] = calculateGids(rgid, fMaxRows);

      if (fFinalizedRows.size() <= fgid)
      {
        // There are no finalized rows in this RGData. We can just return it.
        // Load from disk if necessary and unlink the dump file.
        if (!rgdata)
        {
          loadRG(rgid, rgdata, true);
        }
        return true;
      }

      if (tgid >= fFinalizedRows.size())
        fFinalizedRows.resize(tgid + 1, 0ULL);

      // Check if there are rows to process
      bool hasReturnRows = false;
      for (auto i = fgid; i < tgid; ++i)
      {
        if (fFinalizedRows[i] != ~0ULL)
        {
          // Not all rows are finalized, we have to return at least parts of this RGData
          hasReturnRows = true;
          break;
        }
      }

      if (rgdata)
      {
        // RGData is currently in memory
        if (!hasReturnRows)
        {
          // All rows are finalized, don't return this RGData
          continue;
        }
      }
      else
      {
        if (hasReturnRows)
        {
          // Load RGData from disk, unlink the dump file and continue processing
          loadRG(rgid, rgdata, true);
        }
        else
        {
          // All rows are finalized. Unlink the dump file and continue the search for a return RGData
          unlink(makeRGFilename(rgid).c_str());
          continue;
        }
      }

      auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid);

      // Nothing got shifted at all -> all rows must be finalized. If all rows are finalized, remove
      // the RGData and its file and don't give it out.
      if (pos == 0)
      {
        fLRU->remove(rgid);
        unlink(makeRGFilename(rgid).c_str());
        continue;
      }

      // Set the RGData row count to the number of non-finalized rows which have been compacted at the
      // front of the RGData.
      fRowGroupOut->setData(rgdata.get());
      fRowGroupOut->setRowCount(pos);
      int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);

      // Release the memory used by the current rgdata from this MemoryManager
      // so the freed memory can be used by other threads.
      fMM->release(memSz);
      unlink(makeRGFilename(rgid).c_str());

      fLRU->remove(rgid);
      return true;
    }
    return false;
  }

  /** @brief Returns the next RGData, loading it from disk if necessary.
   *
   * @returns pointer to the next RGData or an empty pointer if there is nothing
   */
  RGDataUnPtr getNextRGData()
  {
    while (!fRGDatas.empty())
    {
      uint64_t rgid = fRGDatas.size() - 1;
      if (!fRGDatas[rgid])
        loadRG(rgid, fRGDatas[rgid], true);
      unlink(makeRGFilename(rgid).c_str());

      auto rgdata = std::move(fRGDatas[rgid]);
      fRGDatas.pop_back();

      fRowGroupOut->setData(rgdata.get());
      fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));
      fLRU->remove(rgid);
      if (fRowGroupOut->getRowCount() == 0)
        continue;
      return rgdata;
    }

    return {};
  }

  void initRow(Row& row) const
  {
    fRowGroupOut->initRow(&row);
  }

  /** @brief Get the row at the specified position, loading the corresponding RGData if needed.
   *
   * @param idx(in) index (from 0) of the row
   * @param row(out) resulting row
   */
  void getRow(uint64_t idx, Row& row)
  {
    auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);
    if (UNLIKELY(!fRGDatas[rgid]))
    {
      loadRG(rgid);
    }
    fRGDatas[rgid]->getRow(rid, &row);
    fLRU->add(rgid);
  }

  /** @brief Get the row at the specified position, skipping nonexistent rows
   *
   * @param idx(in, out) index (from 0) of the row, set to the actual index
   * @param row(out) resulting row
   */
  bool getRowForFinalization(uint64_t& idx, Row& row)
  {
    auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);
    while (rgid < fRGDatas.size())
    {
      if (UNLIKELY(!fRGDatas[rgid]))
      {
        loadRG(rgid);
      }
      fRowGroupOut->setData(fRGDatas[rgid].get());
      if (UNLIKELY(rid >= fRowGroupOut->getRowCount()))
      {
        ++rgid;
        rid = 0;
        continue;
      }
      fRGDatas[rgid]->getRow(rid, &row);
      idx = rowGidRidToIdx(rgid, rid, fMaxRows);
      return true;
    }
    return false;
  }

  size_t getRowsPerRG() const
  {
    return fMaxRows;
  }

  void dropRGData(uint64_t rgid)
  {
    if (UNLIKELY(!fRGDatas[rgid]))
    {
      return;
    }
    fRowGroupOut->setData(fRGDatas[rgid].get());
    fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));
    fRGDatas[rgid].reset();
  }

  /** @brief Return a row and an index at the first free position.
   *
   * @param idx(out) index of the row
   * @param row(out) the row itself
   */
  void putRow(uint64_t& idx, Row& row)
  {
    bool need_new = false;
    if (UNLIKELY(fRGDatas.empty()))
    {
      need_new = true;
    }
    else if (UNLIKELY(!fRGDatas[fCurRgid]))
    {
      need_new = true;
    }
    else
    {
      fRowGroupOut->setData(fRGDatas[fCurRgid].get());
      if (UNLIKELY(fRowGroupOut->getRowCount() >= fMaxRows))
        need_new = true;
    }

    if (UNLIKELY(need_new))
    {
      for (auto rgid : *fLRU)
      {
        if (LIKELY(static_cast<bool>(fRGDatas[rgid])))
        {
          fRowGroupOut->setData(fRGDatas[rgid].get());
          if (fRowGroupOut->getRowCount() < fMaxRows)
          {
            fCurRgid = rgid;
            need_new = false;
            break;
          }
        }
      }
    }

    if (UNLIKELY(need_new))
    {
      auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
      if (!fMM->acquire(memSz))
      {
        throw logging::IDBExcept(
            logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
            logging::ERR_AGGREGATION_TOO_BIG);
      }
      auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
      fRowGroupOut->setData(curRG);
      fRowGroupOut->resetRowGroup(0);
      fRGDatas.emplace_back(curRG);
      fCurRgid = fRGDatas.size() - 1;
    }

    fLRU->add(fCurRgid);
    idx = rowGidRidToIdx(fCurRgid, fRowGroupOut->getRowCount(), fMaxRows);
    fRowGroupOut->getRow(fRowGroupOut->getRowCount(), &row);
    fRowGroupOut->incRowCount();
  }

  /** @brief Create a row at the specified position.
   *
   * Used only for key rows in case of external keys. Indexes of the data row and
   * the corresponding key row are always the same.
   *
   * @param idx(in) index to create row
   * @param row(out) row itself
   */
  void putKeyRow(uint64_t idx, Row& row)
  {
    auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);

    while (rgid >= fRGDatas.size())
    {
      auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
      if (!fMM->acquire(memSz))
      {
        throw logging::IDBExcept(
            logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
            logging::ERR_AGGREGATION_TOO_BIG);
      }
      auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
      fRowGroupOut->setData(curRG);
      fRowGroupOut->resetRowGroup(0);
      fRGDatas.emplace_back(curRG);
      fCurRgid = fRGDatas.size() - 1;
      fLRU->add(fCurRgid);
    }

    if (UNLIKELY(!fRGDatas[rgid]))
    {
      loadRG(rgid);
    }
    else
    {
      fRowGroupOut->setData(fRGDatas[rgid].get());
    }

    fLRU->add(rgid);

    assert(idx % fMaxRows == fRowGroupOut->getRowCount());
    fRowGroupOut->getRow(fRowGroupOut->getRowCount(), &row);
    fRowGroupOut->incRowCount();
  }

  /** @brief Dump the oldest RGData to disk, freeing memory
   *
   * @returns true if any RGData was dumped
   */
  bool dump()
  {
    // Always leave at least 2 RGs in memory as this is the minimum size of the hashmap
    constexpr size_t MIN_INMEMORY = 2;
    if (fLRU->size() <= MIN_INMEMORY)
    {
      return false;
    }
    size_t moved = 0;
    auto it = fLRU->rbegin();
    while (LIKELY(it != fLRU->rend()))
    {
      if (fLRU->size() <= MIN_INMEMORY)
        return false;
      uint64_t rgid = *it;
      if (UNLIKELY(!fRGDatas[rgid]))
      {
        ++it;
        fLRU->remove(rgid);
        continue;
      }
      fRowGroupOut->setData(fRGDatas[rgid].get());
      if (moved <= MIN_INMEMORY && fRowGroupOut->getRowCount() < fMaxRows)
      {
        ++it;
        ++moved;
        fLRU->add(rgid);
        continue;
      }
      saveRG(rgid);
      fLRU->remove(rgid);
      fRGDatas[rgid].reset();
      return true;
    }
    return false;
  }

  /** @brief Dump all data, clear state and start over */
  void startNewGeneration()
  {
    dumpAll();
    fLRU->clear();
    fMM->release();
    fRGDatas.clear();

    // we need at least one RGData so create it right now
    auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
    fRowGroupOut->setData(curRG);
    fRowGroupOut->resetRowGroup(0);
    fRGDatas.emplace_back(curRG);
    auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
    if (!fMM->acquire(memSz))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
    fCurRgid = 0;
    ++fGeneration;
  }

  /** @brief Save the "finalized" bitmap to disk for future use */
  void dumpFinalizedInfo() const
  {
    auto fname = makeFinalizedFilename();
    int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (UNLIKELY(fd < 0))
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    uint64_t sz = fRGDatas.size();
    uint64_t finsz = fFinalizedRows.size();

    // NB: each assignment must be parenthesized before the != comparison so that
    // errNo receives the writeData() return code, not the comparison result.
    int errNo;
    if ((errNo = writeData(fd, (const char*)&sz, sizeof(sz))) != 0 ||
        (errNo = writeData(fd, (const char*)&finsz, sizeof(finsz))) != 0 ||
        (errNo = writeData(fd, (const char*)fFinalizedRows.data(), finsz * sizeof(uint64_t))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    close(fd);
  }
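  // On-disk layout of the finalized-info file (as written above):
  //   [uint64_t sz]       -- number of RGDatas in this generation
  //   [uint64_t finsz]    -- number of bitmap words that follow
  //   [finsz x uint64_t]  -- the fFinalizedRows bitmap itself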

  /** @brief Load the "finalized" bitmap */
  void loadFinalizedInfo()
  {
    auto fname = makeFinalizedFilename();
    int fd = open(fname.c_str(), O_RDONLY);
    if (fd < 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    uint64_t sz;
    uint64_t finsz;
    int errNo;
    if ((errNo = readData(fd, (char*)&sz, sizeof(sz))) != 0 ||
        (errNo = readData(fd, (char*)&finsz, sizeof(finsz))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    fRGDatas.resize(sz);
    fFinalizedRows.resize(finsz);
    if ((errNo = readData(fd, (char*)fFinalizedRows.data(), finsz * sizeof(uint64_t))) != 0)
    {
      close(fd);
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
    close(fd);
  }

  /** @brief Save all RGData to disk */
  void dumpAll(bool dumpFin = true) const
  {
#ifdef DISK_AGG_DEBUG
    dumpMeta();
#endif
    for (uint64_t i = 0; i < fRGDatas.size(); ++i)
    {
      if (fRGDatas[i])
        saveRG(i, fRGDatas[i].get());
      else
      {
        auto fname = makeRGFilename(i);
        if (access(fname.c_str(), F_OK) != 0)
          ::abort();
      }
    }
    if (dumpFin)
      dumpFinalizedInfo();
  }

  /** @brief Create a new RowGroupStorage with the same LRU, MemManager & uniq ID */
  RowGroupStorage* clone(uint16_t gen) const
  {
    auto* ret = new RowGroupStorage(fTmpDir, fRowGroupOut, fMaxRows);
    ret->fRGDatas.clear();
    ret->fLRU.reset(fLRU->clone());
    ret->fMM.reset(fMM->clone());
    ret->fUniqId = fUniqId;
    ret->fGeneration = gen;
    ret->fCompressor = fCompressor;
    ret->fDumper.reset(new Dumper(fCompressor, fMM.get()));
    ret->fUseDisk = fUseDisk;
    ret->loadFinalizedInfo();
    return ret;
  }

  /** @brief Mark the row at the specified index as finalized so it should be skipped
   */
  void markFinalized(uint64_t idx)
  {
    auto [gid, rid] = rowIdxToGidRid(idx, 64);
    if (LIKELY(fFinalizedRows.size() <= gid))
      fFinalizedRows.resize(gid + 1, 0ULL);

    fFinalizedRows[gid] |= 1ULL << rid;
  }

  /** @brief Check if the row at the specified index was finalized earlier */
  bool isFinalized(uint64_t idx) const
  {
    auto [gid, rid] = rowIdxToGidRid(idx, 64);
    if (LIKELY(fFinalizedRows.size() <= gid))
      return false;

    return fFinalizedRows[gid] & (1ULL << rid);
  }
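  // Bitmap layout: row index idx maps to word idx / 64 and bit idx % 64, so
  // each uint64_t in fFinalizedRows tracks 64 consecutive row indexes.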

  void getTmpFilePrefixes(std::vector<std::string>& prefixes) const
  {
    char buf[PATH_MAX];
    snprintf(buf, sizeof(buf), "Agg-p%u-t%p-rg", getpid(), fUniqId);
    prefixes.emplace_back(buf);

    snprintf(buf, sizeof(buf), "AggFin-p%u-t%p-g", getpid(), fUniqId);
    prefixes.emplace_back(buf);
  }

 private:
  /** @brief Get the next available RGData and fill in the filename of the dump
   * if it's not in memory.
   *
   * It skips finalized rows, shifting data within rowgroups.
   *
   * @param rgdata(out) RGData to return
   * @param fname(out)  filename of the dump if it's not in memory
   * @returns true if there is an available RGData
   */
  bool getNextRGData(RGDataUnPtr& rgdata, std::string& fname)
  {
    if (UNLIKELY(fRGDatas.empty()))
    {
      fMM->release();
      return false;
    }
    while (!fRGDatas.empty())
    {
      auto rgid = getLastRGData(rgdata);
      auto [fgid, tgid] = calculateGids(rgid, fMaxRows);

      if (fFinalizedRows.size() > fgid)
      {
        if (tgid >= fFinalizedRows.size())
          fFinalizedRows.resize(tgid + 1, 0ULL);

        if (!rgdata)
        {
          for (auto i = fgid; i < tgid; ++i)
          {
            if (fFinalizedRows[i] != ~0ULL)
            {
              loadRG(rgid, rgdata, true);
              break;
            }
          }
        }

        if (!rgdata)
        {
          unlink(makeRGFilename(rgid).c_str());
          continue;
        }

        auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid);

        if (pos == 0)
        {
          fLRU->remove(rgid);
          unlink(makeRGFilename(rgid).c_str());
          continue;
        }

        fRowGroupOut->setData(rgdata.get());
        fRowGroupOut->setRowCount(pos);
      }

      // Release the memory used by the current rgdata.
      if (rgdata)
      {
        fRowGroupOut->setData(rgdata.get());
        int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
        fMM->release(memSz);
        unlink(makeRGFilename(rgid).c_str());
      }
      else
      {
        fname = makeRGFilename(rgid);
      }
      // Drop the id from the LRU so the freed memory can be reused by other threads.
      fLRU->remove(rgid);
      return true;
    }
    return false;
  }

  /** @brief Compact a rowgroup with finalized rows
   *
   * Move raw data from the next non-finalized row to replace finalized rows.
   * It is safe because pointers to long strings are also stored inside the data.
   *
   * @param rgdata(in,out) RGData to work with
   * @param to(in)      row num inside rgdata of the first finalized row
   * @param from(in)    row num of the first actual row
   * @param numRows(in) how many rows should be moved
   */
  void moveRows(RGData* rgdata, uint64_t to, uint64_t from, size_t numRows)
  {
    const size_t rowsz = fRowGroupOut->getRowSize();
    const size_t hdrsz = RowGroup::getHeaderSize();
    uint8_t* data = rgdata->rowData.get() + hdrsz;
    memmove(data + to * rowsz, data + from * rowsz, numRows * rowsz);
  }

  /** @brief Load RGData from a disk dump.
   *
   * @param rgid(in) RGData ID
   */
  void loadRG(uint64_t rgid)
  {
    if (UNLIKELY(static_cast<bool>(fRGDatas[rgid])))
    {
      fRowGroupOut->setData(fRGDatas[rgid].get());
      return;
    }

    loadRG(rgid, fRGDatas[rgid]);
  }

  void loadRG(uint64_t rgid, RGDataUnPtr& rgdata, bool unlinkDump = false)
  {
    auto fname = makeRGFilename(rgid);

    std::vector<char> data;
    int errNo;
    if ((errNo = fDumper->read(fname, data)) != 0)
    {
      unlink(fname.c_str());
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }

    messageqcpp::ByteStream bs(reinterpret_cast<uint8_t*>(data.data()), data.size());

    if (unlinkDump)
      unlink(fname.c_str());
    rgdata.reset(new RGData());
    rgdata->deserialize(bs, fRowGroupOut->getDataSize(fMaxRows));
    assert(bs.length() == 0);

    fRowGroupOut->setData(rgdata.get());
    auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);

    if (!fMM->acquire(memSz))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
  }

  /** @brief Dump RGData to disk.
   *
   * @param rgid(in) RGData ID
   */
  void saveRG(uint64_t rgid)
  {
    auto rgdata = std::move(fRGDatas[rgid]);
    if (!rgdata)
      return;

    fLRU->remove(rgid);
    fRowGroupOut->setData(rgdata.get());
    fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));

    saveRG(rgid, rgdata.get());
  }

  /** @brief Dump RGData to disk.
   *
   * @param rgid(in)   RGData ID
   * @param rgdata(in) pointer to RGData itself
   */
  void saveRG(uint64_t rgid, RGData* rgdata) const
  {
    messageqcpp::ByteStream bs;
    fRowGroupOut->setData(rgdata);
    rgdata->serialize(bs, fRowGroupOut->getDataSize());

    int errNo;
    if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
  }

#ifdef DISK_AGG_DEBUG
  void dumpMeta() const
  {
    messageqcpp::ByteStream bs;
    fRowGroupOut->serialize(bs);

    char buf[1024];
    snprintf(buf, sizeof(buf), "%s/META-p%u-t%p", fTmpDir.c_str(), getpid(), fUniqId);
    int fd = open(buf, O_WRONLY | O_TRUNC | O_CREAT, 0644);
    assert(fd >= 0);

    auto r = write(fd, bs.buf(), bs.length());
    assert(size_t(r) == bs.length());
    close(fd);
  }
#endif

  /** @brief Create the dump filename.
   */
  std::string makeRGFilename(uint64_t rgid) const
  {
    char buf[PATH_MAX];
    snprintf(buf, sizeof(buf), "%s/Agg-p%u-t%p-rg%lu-g%u", fTmpDir.c_str(), getpid(), fUniqId, rgid,
             fGeneration);
    return buf;
  }

  std::string makeFinalizedFilename() const
  {
    char fname[PATH_MAX];
    snprintf(fname, sizeof(fname), "%s/AggFin-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId, fGeneration);
    return fname;
  }

 private:
  friend class RowAggStorage;
  RowGroup* fRowGroupOut{nullptr};
  const size_t fMaxRows;
  std::unique_ptr<MemManager> fMM;
  std::unique_ptr<LRUIface> fLRU;
  RGDataStorage fRGDatas;
  const void* fUniqId;

  uint64_t fCurRgid{0};
  uint16_t fGeneration{0};
  std::vector<uint64_t> fFinalizedRows;
  std::string fTmpDir;
  compress::CompressInterface* fCompressor;
  std::unique_ptr<Dumper> fDumper;
  bool fUseDisk;
};

/** @brief Internal data for the hashmap */
struct RowPosHash
{
  uint64_t hash;  ///< Row hash
  uint64_t idx;   ///< index in the RowGroupStorage
};

/***
 * @brief Storage for row positions and hashes, with memory management
 * and the ability to be saved to disk
 */
class RowPosHashStorage
{
 public:
  /***
   * @brief Default constructor
   *
   * The internal data is stored as a plain vector of row pos & hash
   *
   * @param tmpDir(in) directory for tmp data
   * @param size(in)   maximum size of storage (NB: without the padding)
   * @param rm         ResourceManager to use
   * @param sessLimit  session memory limit
   * @param enableDiskAgg is disk aggregation enabled?
   */
  RowPosHashStorage(const std::string& tmpDir, size_t size, joblist::ResourceManager* rm,
                    boost::shared_ptr<int64_t> sessLimit, bool enableDiskAgg,
                    compress::CompressInterface* compressor)
   : fUniqId(this), fTmpDir(tmpDir), fCompressor(compressor)
  {
    if (rm)
      fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg));
    else
      fMM.reset(new MemManager());

    fDumper.reset(new Dumper(fCompressor, fMM.get()));

    if (size != 0)
      init(size);
  }

  /***
   * @brief Get the row position and hash at the idx
   *
   * @param idx(in) index (from 0) of the row
   */
  RowPosHash& get(uint64_t idx)
  {
    return fPosHashes[idx];
  }

  /***
   * @brief Store the row position and hash at the idx
   *
   * @param idx(in) index of the row
   * @param pos(in) position and hash of the row
   */
  void set(uint64_t idx, const RowPosHash& pos)
  {
    memcpy(&fPosHashes[idx], &pos, sizeof(pos));
  }

  /*** @return Size of data */
  ssize_t memUsage() const
  {
    return fMM->getUsed();
  }

  /*** @brief Unregister used memory */
  void releaseMemory()
  {
    fMM->release();
  }

  /***
   * @brief Move row positions & hashes inside [insIdx, startIdx] up by 1
   *
   * @param startIdx(in) last index to move
   * @param insIdx(in)   first index to move
   */
  void shiftUp(uint64_t startIdx, uint64_t insIdx)
  {
    memmove(&fPosHashes[insIdx + 1], &fPosHashes[insIdx],
            (startIdx - insIdx) * sizeof(decltype(fPosHashes)::value_type));
  }

  /***
   * @brief Create a new storage with the same MemManager & uniq ID
   * @param size(in) maximum size of storage without padding
   * @param gen(in)  new generation
   */
  RowPosHashStoragePtr clone(size_t size, uint16_t gen, bool loadDump = false) const
  {
    RowPosHashStoragePtr cloned;

    cloned.reset(new RowPosHashStorage());
    cloned->fMM.reset(fMM->clone());
    cloned->fTmpDir = fTmpDir;
    cloned->init(size);
    cloned->fUniqId = fUniqId;
    cloned->fGeneration = gen;
    cloned->fCompressor = fCompressor;
    cloned->fDumper.reset(new Dumper(fCompressor, cloned->fMM.get()));
    if (loadDump)
      cloned->load();
    return cloned;
  }

  /*** @brief Remove the dump file */
  void cleanup() const
  {
    unlink(makeDumpName().c_str());
  }

  /***
   * @brief Dump all data, clear state, and start over with the new generation
   */
  void startNewGeneration()
  {
    dump();
    ++fGeneration;
    fPosHashes.clear();
    fMM->release();
  }

  void dump()
  {
    int errNo;
    size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
    if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }
  }

 private:
  RowPosHashStorage() = default;

  void init(size_t size)
  {
    auto bkts = size + 0xFFUL;
    if (!fMM->acquire(bkts * sizeof(decltype(fPosHashes)::value_type)))
    {
      throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                               logging::ERR_AGGREGATION_TOO_BIG);
    }
    fPosHashes.resize(bkts);
  }

  std::string makeDumpName() const
  {
    char fname[PATH_MAX];
    snprintf(fname, sizeof(fname), "%s/Agg-PosHash-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId,
             fGeneration);
    return fname;
  }

  void load()
  {
    int errNo;
    std::vector<char> data;
    if ((errNo = fDumper->read(makeDumpName(), data)) != 0)
    {
      throw logging::IDBExcept(
          logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
          logging::ERR_DISKAGG_FILEIO_ERROR);
    }

    fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type));
    memcpy(fPosHashes.data(), data.data(), data.size());
  }

 private:
  std::unique_ptr<MemManager> fMM;
  std::vector<RowPosHash> fPosHashes;
  uint16_t fGeneration{0};  ///< current aggregation generation
  void* fUniqId;            ///< unique ID to make a unique dump filename
  std::string fTmpDir;
  compress::CompressInterface* fCompressor;
  std::unique_ptr<Dumper> fDumper;
};

/*---------------------------------------------------------------------------
 * Based on the Robin Hood hashmap implementation by Martin Ankerl:
 * https://github.com/martinus/robin-hood-hashing
 *
 * But the actual row data is stored within RowGroupStorage, which keeps a
 * vector of RGData with the possibility of dumping unused ones to disk. Some
 * service information (index and hash) is kept in a vector of equally sized
 * portions that can also be dumped to disk.
 ----------------------------------------------------------------------------*/
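// Probing sketch (how the fInfo bytes are used below): fInfo[i] == 0 marks an
// empty bucket; otherwise it encodes the probe distance of the element in
// bucket i, scaled by fInfoInc. A lookup starts at the bucket derived from the
// hash and walks forward while the stored info is smaller (nextWhileLess);
// buckets with an equal info value are candidates whose hashes, and then keys,
// are compared. Inserts displace richer elements via shiftUp() so probe
// distances stay sorted: the classic Robin Hood invariant.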

RowAggStorage::RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup,
                             uint32_t keyCount, joblist::ResourceManager* rm,
                             boost::shared_ptr<int64_t> sessLimit, bool enabledDiskAgg, bool allowGenerations,
                             compress::CompressInterface* compressor)
 : fMaxRows(getMaxRows(enabledDiskAgg))
 , fExtKeys(rowGroupOut != keysRowGroup)
 , fLastKeyCol(keyCount - 1)
 , fUniqId(this)
 , fAllowGenerations(allowGenerations)
 , fEnabledDiskAggregation(enabledDiskAgg)
 , fCompressor(compressor)
 , fTmpDir(tmpDir)
 , fRowGroupOut(rowGroupOut)
 , fKeysRowGroup(keysRowGroup)
 , fRandom(reinterpret_cast<std::uintptr_t>(this))
{
  char suffix[PATH_MAX];
  snprintf(suffix, sizeof(suffix), "/p%u-t%p/", getpid(), this);
  fTmpDir.append(suffix);
  if (enabledDiskAgg)
    boost::filesystem::create_directories(fTmpDir);

  if (rm)
  {
    fMM.reset(new RMMemManager(rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg));
    fNumOfInputRGPerThread = std::max<uint32_t>(1, rm->aggNumRowGroups());
  }
  else
  {
    fMM.reset(new MemManager());
    fNumOfInputRGPerThread = 1;
  }
  fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg,
                                     fCompressor.get()));
  if (fExtKeys)
  {
    fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg,
                                               !enabledDiskAgg, fCompressor.get()));
    fKeysStorage = fRealKeysStorage.get();
  }
  else
  {
    fKeysStorage = fStorage.get();
  }
  fKeysStorage->initRow(fKeyRow);
  fGens.emplace_back(new Data);
  fCurData = fGens.back().get();
  fCurData->fHashes.reset(
      new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get()));
}

RowAggStorage::~RowAggStorage()
{
  cleanupAll();
}

bool RowAggStorage::getTargetRow(const Row& row, Row& rowOut)
{
  uint64_t hash = hashRow(row, fLastKeyCol);
  return getTargetRow(row, hash, rowOut);
}
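// Return convention for both getTargetRow() overloads: false means an existing
// group row with the same key was found and rowOut points at it; true means a
// fresh row was allocated in fStorage for a previously unseen key.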

bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut)
{
  if (UNLIKELY(!fInitialized))
  {
    fInitialized = true;
    fStorage.reset(new RowGroupStorage(fTmpDir, fRowGroupOut, fMaxRows, fMM->getResourceManaged(),
                                       fMM->getSessionLimit(), !fEnabledDiskAggregation,
                                       !fEnabledDiskAggregation, fCompressor.get()));
    if (fExtKeys)
    {
      fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, fKeysRowGroup, fMaxRows, fMM->getResourceManaged(),
                                                 fMM->getSessionLimit(), !fEnabledDiskAggregation,
                                                 !fEnabledDiskAggregation, fCompressor.get()));
      fKeysStorage = fRealKeysStorage.get();
    }
    else
    {
      fKeysStorage = fStorage.get();
    }
    fKeysStorage->initRow(fKeyRow);
    reserve(fMaxRows);
  }
  else if (UNLIKELY(fCurData->fSize >= fCurData->fMaxSize))
  {
    increaseSize();
  }
  auto [info, idx] = rowHashToIdx(hash);

  nextWhileLess(info, idx);

  while (info == fCurData->fInfo[idx])
  {
    auto& pos = fCurData->fHashes->get(idx);
    if (pos.hash == hash)
    {
      auto& keyRow = fExtKeys ? fKeyRow : rowOut;
      fKeysStorage->getRow(pos.idx, keyRow);
      if (row.equals(keyRow, fLastKeyCol))
      {
        if (!fExtKeys)
          return false;

        fStorage->getRow(pos.idx, rowOut);
        return false;
      }
    }

    next(info, idx);
  }

  if (!fEnabledDiskAggregation && fGeneration != 0)
  {
    // There are several generations here, so let's try to find a suitable row in them
    uint16_t gen = fGeneration - 1;
    do
    {
      auto* genData = fGens[gen].get();
      auto [ginfo, gidx] = rowHashToIdx(hash, genData->fMask, genData->hashMultiplier_, genData->fInfoInc,
                                        genData->fInfoHashShift);
      nextWhileLess(ginfo, gidx, genData);

      while (ginfo == genData->fInfo[gidx])
      {
        auto& pos = genData->fHashes->get(gidx);
        if (pos.hash == hash)
        {
          auto& keyRow = fExtKeys ? fKeyRow : rowOut;

          fKeysStorage->getRow(pos.idx, keyRow);
          if (row.equals(keyRow, fLastKeyCol))
          {
            if (!fExtKeys)
              return false;

            fStorage->getRow(pos.idx, rowOut);
            return false;
          }
        }
        next(ginfo, gidx, genData);
      }
    } while (gen-- != 0);
  }

  const auto ins_idx = idx;
  const auto ins_info = info;
  if (UNLIKELY(ins_info + fCurData->fInfoInc > 0xFF))
  {
    fCurData->fMaxSize = 0;
  }

  while (fCurData->fInfo[idx] != 0)
  {
    next(info, idx);
  }

  if (idx != ins_idx)
  {
    shiftUp(idx, ins_idx);
  }
  RowPosHash pos;
  pos.hash = hash;
  fStorage->putRow(pos.idx, rowOut);
  if (fExtKeys)
  {
    fKeysStorage->putKeyRow(pos.idx, fKeyRow);
    copyRow(row, &fKeyRow);
  }
  fCurData->fHashes->set(ins_idx, pos);
  fCurData->fInfo[ins_idx] = static_cast<uint8_t>(ins_info);
  ++fCurData->fSize;
  return true;
}

void RowAggStorage::dump()
{
  if (!fEnabledDiskAggregation)
    return;

  constexpr int64_t freeMemLimit = 50LL * 1024LL * 1024LL;

  const int64_t leaveFree =
      std::max(RGDataSizeType(freeMemLimit),
               fNumOfInputRGPerThread * fRowGroupOut->getSizeWithStrings(fMaxRows) * getBucketSize());
  uint64_t freeAttempts{0};
  int64_t freeMem = 0;
  while (true)
  {
    ++freeAttempts;
    freeMem = fMM->getFree();
    if (freeMem > leaveFree)
      break;

    bool success = fStorage->dump();
    if (fExtKeys)
      success |= fKeysStorage->dump();

    if (!success)
      break;
  }

  const int64_t totalMem = fMM->getConfigured();
  // If generations are allowed and fewer than half of the rowgroups are
  // in memory, then we start a new generation
  if (fAllowGenerations && fStorage->fLRU->size() < fStorage->fRGDatas.size() / 2 &&
      fStorage->fRGDatas.size() > 10)
  {
    startNewGeneration();
  }
  else if (fAllowGenerations && freeMem < totalMem / 10 * 3 && nextRandDistib() < 30)
  {
    startNewGeneration();
  }
  else if (fAllowGenerations && fMM->getFree() < freeMemLimit)
  {
    startNewGeneration();
  }
  else if (!fAllowGenerations && freeMem < 0 && freeAttempts == 1)
  {
    // safety guard so aggregation couldn't eat all available memory
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_TOO_BIG),
                             logging::ERR_DISKAGG_TOO_BIG);
  }
}

void RowAggStorage::append(RowAggStorage& other)
{
  // We no longer need either the key rows storage or any internal data,
  // neither in this RowAggStorage nor in the other
  cleanup();
  freeData();
  if (other.fGeneration == 0 || !fEnabledDiskAggregation)
  {
    // even if there are several generations in the other RowAggStorage,
    // in case of in-memory-only aggregation they all share the same RowStorage
    other.cleanup();
    other.freeData();
    fStorage->append(std::move(other.fStorage));
    return;
  }

  // if the other RowAggStorage has several generations, sequentially load
  // and append them all;
  // the only data needed is the aggregated RowStorage itself
  auto gen = other.fGeneration;
  while (true)
  {
    fStorage->append(other.fStorage.get());
    other.cleanup();
    if (gen == 0)
      break;
    --gen;
    other.fGeneration = gen;
    other.fStorage.reset(other.fStorage->clone(gen));
  }
}

RGDataUnPtr RowAggStorage::getNextRGData()
{
  if (!fStorage)
  {
    return {};
  }
  cleanup();
  freeData();
  return fStorage->getNextRGData();
}

bool RowAggStorage::getNextOutputRGData(RGDataUnPtr& rgdata)
{
  if (!fStorage)
  {
    return false;
  }

  cleanup();
  freeData();

  // fGeneration is an unsigned int, we need a signed int for a comparison >= 0
  int32_t gen = fGeneration;
  while (gen >= 0)
  {
    bool moreInGeneration = fStorage->getNextOutputRGData(rgdata);

    if (moreInGeneration)
    {
      fRowGroupOut->setData(rgdata.get());
      return true;
    }

    // all generations have been emptied
    if (fGeneration == 0)
    {
      break;
    }

    // The current generation has no more RGDatas to return:
    // load the earlier generation and continue with returning its RGDatas
    gen--;
    fGeneration--;
    fStorage.reset(fStorage->clone(fGeneration));
  }
  return false;
}

void RowAggStorage::freeData()
{
  for (auto& data : fGens)
  {
    data->fHashes.reset();
    if (data->fInfo)
    {
      const size_t memSz = calcSizeWithBuffer(data->fMask + 1);
      fMM->release(memSz);
      data->fInfo.reset();
    }
  }
  fGens.clear();
  fCurData = nullptr;
}

void RowAggStorage::shiftUp(size_t startIdx, size_t insIdx)
{
  auto idx = startIdx;
  while (idx != insIdx)
  {
    fCurData->fInfo[idx] = static_cast<uint8_t>(fCurData->fInfo[idx - 1] + fCurData->fInfoInc);
    if (UNLIKELY(fCurData->fInfo[idx] + fCurData->fInfoInc > 0xFF))
    {
      fCurData->fMaxSize = 0;
    }
    --idx;
  }
  fCurData->fHashes->shiftUp(startIdx, insIdx);
}

void RowAggStorage::increaseSize()
{
  if (fCurData->fMask == 0)
  {
    initData(INIT_SIZE, fCurData->fHashes.get());
  }

  const auto maxSize = calcMaxSize(fCurData->fMask + 1);
  if (fCurData->fSize < maxSize && tryIncreaseInfo())
    return;

  constexpr size_t maxMaskMultiplierWoRehashing = 1U << (INIT_INFO_INC - 1);
  // We don't check for overflow here b/c it is impractical for fSize to be so
  // large that the multiplication overflows.
  if (fCurData->fSize * maxMaskMultiplierWoRehashing < calcMaxSize(fCurData->fMask + 1))
  {
    // something strange happens...
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_OVERFLOW2),
                             logging::ERR_DISKAGG_OVERFLOW2);
  }

  auto freeMem = fMM->getFree();
  if (fEnabledDiskAggregation ||
      freeMem > (fMM->getUsed() + fCurData->fHashes->memUsage() + fStorage->getAproxRGSize()) * 2)
  {
    if (fCurData->fSize * 2 < maxSize)
    {
      // We have to resize, even though there would still be plenty of space left!
      // Try to rehash instead. Delete freed memory so we don't steadily increase
      // mem in case we have to rehash a few times.
      // Adding an *even* number, so that the multiplier will always stay odd. This is necessary
      // so that the hash stays a mixing function (and thus doesn't have any information loss).
      fCurData->hashMultiplier_ += 0xc4ceb9fe1a85ec54;
      rehashPowerOfTwo(fCurData->fMask + 1);
    }
    else
    {
      rehashPowerOfTwo((fCurData->fMask + 1) * 2);
    }
  }
  else if (fGeneration < MAX_INMEMORY_GENS - 1)
  {
    startNewGeneration();
  }
  else
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                             logging::ERR_AGGREGATION_TOO_BIG);
  }
}

bool RowAggStorage::tryIncreaseInfo()
{
  if (fCurData->fInfoInc <= 2)
    return false;

  fCurData->fInfoInc = static_cast<uint8_t>(fCurData->fInfoInc >> 1U);
  ++fCurData->fInfoHashShift;
  const auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
  for (size_t i = 0; i < elems; i += 8)
  {
    uint64_t val;
    memcpy(&val, fCurData->fInfo.get() + i, sizeof(val));
    val = (val >> 1U) & 0x7f7f7f7f7f7f7f7fULL;
    memcpy(fCurData->fInfo.get() + i, &val, sizeof(val));
  }

  fCurData->fInfo[elems] = 1;
  fCurData->fMaxSize = calcMaxSize(fCurData->fMask + 1);
  return true;
}

void RowAggStorage::rehashPowerOfTwo(size_t elems)
{
  const size_t oldSz = calcSizeWithBuffer(fCurData->fMask + 1);
  auto oldInfo = std::move(fCurData->fInfo);
  auto oldHashes = std::move(fCurData->fHashes);
  fMM->release(calcBytes(oldSz));

  initData(elems, oldHashes.get());
  oldHashes->releaseMemory();

  if (oldSz > 1)
  {
    for (size_t i = 0; i < oldSz; ++i)
    {
      if (UNLIKELY(oldInfo[i] != 0))
      {
        insertSwap(i, oldHashes.get());
      }
    }
  }
}

void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes)
{
  if (fCurData->fMaxSize == 0 && !tryIncreaseInfo())
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_OVERFLOW1),
                             logging::ERR_DISKAGG_OVERFLOW1);
  }

  auto pos = oldHashes->get(oldIdx);
  auto [info, idx] = rowHashToIdx(pos.hash);

  while (info <= fCurData->fInfo[idx])
  {
    ++idx;
    info += fCurData->fInfoInc;
  }

  // don't need to compare rows here - they differ by definition
  const auto ins_idx = idx;
  const auto ins_info = static_cast<uint8_t>(info);
  if (UNLIKELY(ins_info + fCurData->fInfoInc > 0xFF))
    fCurData->fMaxSize = 0;

  while (fCurData->fInfo[idx] != 0)
  {
    next(info, idx);
  }

  if (idx != ins_idx)
    shiftUp(idx, ins_idx);

  fCurData->fHashes->set(ins_idx, pos);
  fCurData->fInfo[ins_idx] = ins_info;
  ++fCurData->fSize;
}
|
|
|
|
void RowAggStorage::initData(size_t elems, const RowPosHashStorage* oldHashes)
{
  fCurData->fSize = 0;
  fCurData->fMask = elems - 1;
  fCurData->fMaxSize = calcMaxSize(elems);

  const auto sizeWithBuffer = calcSizeWithBuffer(elems, fCurData->fMaxSize);
  const auto bytes = calcBytes(sizeWithBuffer);

  if (!fMM->acquire(bytes))
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG),
                             logging::ERR_AGGREGATION_TOO_BIG);
  }
  fCurData->fHashes = oldHashes->clone(elems, fGeneration);
  fCurData->fInfo.reset(new uint8_t[bytes]());
  fCurData->fInfo[sizeWithBuffer] = 1;
  fCurData->fInfoInc = INIT_INFO_INC;
  fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
}

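// A note on the layout initData() builds (sizes are illustrative; the exact numbers come from
// calcMaxSize() and calcSizeWithBuffer()): the info[] array holds one byte per bucket plus an
// overflow buffer for entries displaced past the nominal end, and the final byte is set to 1
// as a sentinel so probe loops can terminate without an explicit bounds check.
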
void RowAggStorage::reserve(size_t c)
{
  auto const minElementsAllowed = (std::max)(c, fCurData->fSize);
  auto newSize = INIT_SIZE;
  while (calcMaxSize(newSize) < minElementsAllowed && newSize != 0)
  {
    newSize *= 2;
  }
  if (UNLIKELY(newSize == 0))
  {
    throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_ERROR),
                             logging::ERR_DISKAGG_ERROR);
  }

  // only actually do anything when the new size is bigger than the old one. This prevents
  // continuously allocating for each reserve() call.
  if (newSize > fCurData->fMask + 1)
  {
    rehashPowerOfTwo(newSize);
  }
}

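// A worked example with invented numbers: assuming INIT_SIZE == 1024 and a calcMaxSize() of
// roughly 80% of the bucket count, reserve(10000) doubles newSize through 2048, 4096 and 8192
// and settles on 16384, the first power of two whose calcMaxSize() covers 10000 elements.
// Power-of-two sizes keep the bucket index computable with a cheap mask (fMask) rather than a
// modulo.
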
void RowAggStorage::startNewGeneration()
{
  if (!fEnabledDiskAggregation)
  {
    ++fGeneration;
    fGens.emplace_back(new Data);
    auto* newData = fGens.back().get();
    newData->fHashes = fCurData->fHashes->clone(0, fGeneration);
    fCurData = newData;
    reserve(fMaxRows);
    return;
  }

  if (fCurData->fSize == 0)
    return;

  // save all the data and free the storages' memory
  dumpInternalData();
  fCurData->fHashes->startNewGeneration();
  fStorage->startNewGeneration();
  if (fExtKeys)
    fKeysStorage->startNewGeneration();

  ++fGeneration;
  fMM->release();
  // reinitialize internal structures
  fCurData->fInfo.reset();
  fCurData->fSize = 0;
  fCurData->fMask = 0;
  fCurData->fMaxSize = 0;
  fCurData->fInfoInc = INIT_INFO_INC;
  fCurData->fInfoHashShift = INIT_INFO_HASH_SHIFT;
  reserve(fMaxRows);
  fAggregated = false;
}

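// A "generation" is a self-contained snapshot of the aggregation state: the hash table header
// and info[] bytes are written by dumpInternalData(), while the row data and row-position
// hashes are spilled by their own storages. finalize() later merges duplicate groups across
// generations.
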
std::string RowAggStorage::makeDumpFilename(int32_t gen) const
{
  char fname[PATH_MAX];
  uint16_t rgen = gen < 0 ? fGeneration : gen;
  snprintf(fname, sizeof(fname), "%s/AggMap-p%u-t%p-g%u", fTmpDir.c_str(), getpid(), fUniqId, rgen);
  return fname;
}

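// For illustration (all values hypothetical): with fTmpDir == "/var/lib/columnstore/tmp",
// pid 12345, fUniqId == 0x7f1200cafe00 and generation 2, the dump file is named
// "/var/lib/columnstore/tmp/AggMap-p12345-t0x7f1200cafe00-g2". Embedding the pid and the
// per-instance id keeps concurrent aggregations from clobbering each other's files.
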
void RowAggStorage::dumpInternalData() const
{
  if (!fCurData->fInfo)
    return;

  messageqcpp::ByteStream bs;
  bs << fCurData->fSize;
  bs << fCurData->fMask;
  bs << fCurData->fMaxSize;
  bs << fCurData->hashMultiplier_;
  bs << fCurData->fInfoInc;
  bs << fCurData->fInfoHashShift;
  bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
  int fd = open(makeDumpFilename().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (fd < 0)
  {
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }

  int errNo;
  if ((errNo = writeData(fd, (const char*)bs.buf(), bs.length())) != 0)
  {
    close(fd);
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  close(fd);
}

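// On-disk header layout, in write order (read back in the same order by loadGeneration()):
//   fSize, fMask, fMaxSize, hashMultiplier_, fInfoInc, fInfoHashShift, raw info[] bytes.
// The rows themselves are not in this file; they are dumped by the row storages.

// finalize() walks the generations from newest to oldest. For each generation it streams the
// rows of every earlier generation sequentially out of their RGData containers, probes the
// current in-memory table for an equal group-by key, merges matches into the current row via
// mergeFunc(), and marks merged source rows as finalized so later passes skip them.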
void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
{
  if (fAggregated || fGeneration == 0 || !fEnabledDiskAggregation)
  {
    cleanup();
    return;
  }

  Row tmpKeyRow;
  fKeysStorage->initRow(tmpKeyRow);
  Row tmpRow;
  fStorage->initRow(tmpRow);
  dumpInternalData();
  fCurData->fHashes->dump();
  fStorage->dumpAll();
  if (fExtKeys)
    fKeysStorage->dumpAll();

  uint16_t curGen = fGeneration + 1;
  while (--curGen > 0)
  {
    bool genUpdated = false;
    if (fCurData->fSize == 0)
    {
      fStorage->dumpFinalizedInfo();
      if (fExtKeys)
        fKeysStorage->dumpFinalizedInfo();
      cleanup(curGen);
      loadGeneration(curGen - 1);
      auto oh = std::move(fCurData->fHashes);
      fCurData->fHashes = oh->clone(0, curGen - 1, true);
      fStorage.reset(fStorage->clone(curGen - 1));
      if (fExtKeys)
      {
        fRealKeysStorage.reset(fRealKeysStorage->clone(curGen - 1));
        fKeysStorage = fRealKeysStorage.get();
      }
      else
        fKeysStorage = fStorage.get();
      continue;
    }

    std::unique_ptr<RowGroupStorage> prevRowStorage;
    std::unique_ptr<RowGroupStorage> prevRealKeyRowStorage;
    RowGroupStorage* prevKeyRowStorage{nullptr};

    for (uint16_t prevGen = 0; prevGen < curGen; ++prevGen)
    {
      prevRowStorage.reset(fStorage->clone(prevGen));
      if (fExtKeys)
      {
        prevRealKeyRowStorage.reset(fKeysStorage->clone(prevGen));
        prevKeyRowStorage = prevRealKeyRowStorage.get();
      }
      else
        prevKeyRowStorage = prevRowStorage.get();

      if (fExtKeys)
      {
        fKeysRowGroup->initRow(&tmpKeyRow);
      }
      auto& prevRow = fExtKeys ? tmpKeyRow : tmpRow;

      // iterate over the rows of the previous generation
      for (uint64_t rowidx = 0; prevKeyRowStorage->getRowForFinalization(rowidx, prevRow); ++rowidx)
      {
        // rowidx decomposes into (rowgroup id, row id within that rowgroup); rid == 0 on a
        // non-first rowgroup means we just crossed an RGData boundary
        auto [rgid, rid] = rowIdxToGidRid(rowidx, prevKeyRowStorage->getRowsPerRG());
        if (rgid != 0 && rid == 0)
        {
          // starting the next RGData: the previous one has no changes left to dump, so its
          // memory can be freed
          prevKeyRowStorage->dropRGData(rgid - 1);
          if (fExtKeys)
          {
            prevRowStorage->dropRGData(rgid - 1);
          }
        }

        if (prevKeyRowStorage->isFinalized(rowidx))
        {
          continue;
        }

        // TODO: store hashes in the RowGroupStorage?
        uint64_t hash = hashRow(prevRow, fLastKeyCol);
        auto [info, idx] = rowHashToIdx(hash);
        nextWhileLess(info, idx);
        constexpr uint64_t NOTFOUND_IDX = -1;
        uint64_t curidx = NOTFOUND_IDX;
        while (info == fCurData->fInfo[idx])
        {
          auto& pos = fCurData->fHashes->get(idx);
          if (pos.hash == hash)
          {
            auto& keyRow = fExtKeys ? fKeyRow : rowOut;
            fKeysStorage->getRow(pos.idx, keyRow);
            if (prevRow.equals(keyRow, fLastKeyCol))
            {
              // Hallelujah!
              curidx = pos.idx;
              break;
            }
          }

          next(info, idx);
        }

        if (curidx == NOTFOUND_IDX)
        {
          // nothing was found, go to the next row
          continue;
        }

        if (UNLIKELY(fKeysStorage->isFinalized(curidx)))
        {
          continue;
        }

        // here it is! So:
        // 1. get the actual row data
        // 2. merge it into the current generation
        // 3. mark the generation as updated
        // 4. mark the previous generation's row as finalized
        if (fExtKeys)
        {
          prevRowStorage->getRow(rowidx, tmpRow);
          fStorage->getRow(curidx, rowOut);
        }
        mergeFunc(tmpRow);
        genUpdated = true;
        prevKeyRowStorage->markFinalized(rowidx);
        if (fExtKeys)
        {
          prevRowStorage->markFinalized(rowidx);
        }
      }

      prevRowStorage->dumpFinalizedInfo();
      if (fExtKeys)
      {
        prevKeyRowStorage->dumpFinalizedInfo();
      }
    }

    fStorage->dumpFinalizedInfo();
    if (fExtKeys)
      fKeysStorage->dumpFinalizedInfo();
    if (genUpdated)
    {
      // we have to save the current generation's data to disk because we updated some rows
      // TODO: to reduce disk IO we could dump only the affected rowgroups
      fStorage->dumpAll(false);
      if (fExtKeys)
        fKeysStorage->dumpAll(false);
    }

    // load the previous generation (reusing the RowGroupStorages)
    loadGeneration(curGen - 1);
    auto oh = std::move(fCurData->fHashes);
    fCurData->fHashes = oh->clone(0, curGen - 1, true);

    fStorage = std::move(prevRowStorage);
    if (fExtKeys)
    {
      fRealKeysStorage = std::move(prevRealKeyRowStorage);
      fKeysStorage = fRealKeysStorage.get();
    }
    else
      fKeysStorage = prevKeyRowStorage;
  }

  fStorage->dumpFinalizedInfo();
  if (fExtKeys)
    fKeysStorage->dumpFinalizedInfo();
  auto oh = std::move(fCurData->fHashes);
  fCurData->fHashes = oh->clone(fCurData->fMask + 1, fGeneration, true);
  fStorage.reset(fStorage->clone(fGeneration));
  if (fExtKeys)
  {
    fRealKeysStorage.reset(fRealKeysStorage->clone(fGeneration));
    fKeysStorage = fRealKeysStorage.get();
  }
  else
    fKeysStorage = fStorage.get();

  fAggregated = true;
}

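// The read-side counterpart of dumpInternalData(): both overloads restore the header fields
// in exactly the order they were serialized and then reload the raw info[] bytes.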
void RowAggStorage::loadGeneration(uint16_t gen)
{
  loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->hashMultiplier_,
                 fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo);
}

void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize,
                                   size_t& hashMultiplier, uint32_t& infoInc, uint32_t& infoHashShift,
                                   std::unique_ptr<uint8_t[]>& info)
{
  messageqcpp::ByteStream bs;
  int fd = open(makeDumpFilename(gen).c_str(), O_RDONLY);
  if (fd < 0)
  {
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  struct stat st{};
  fstat(fd, &st);
  bs.needAtLeast(st.st_size);
  bs.restart();
  int errNo;
  if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0)
  {
    close(fd);
    throw logging::IDBExcept(
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
        logging::ERR_DISKAGG_FILEIO_ERROR);
  }
  close(fd);
  bs.advanceInputPtr(st.st_size);

  bs >> size;
  bs >> mask;
  bs >> maxSize;
  bs >> hashMultiplier;
  bs >> infoInc;
  bs >> infoHashShift;
  size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize));
  info.reset(new uint8_t[infoSz]());
  uint8_t* tmp = info.get();
  bs >> tmp;
}

void RowAggStorage::cleanupAll() noexcept
{
  try
  {
    boost::filesystem::remove_all(fTmpDir);
  }
  catch (...)
  {
  }
}

void RowAggStorage::cleanup()
{
  cleanup(fGeneration);
}

void RowAggStorage::cleanup(uint16_t gen)
{
  if (!fInitialized)
    return;
  unlink(makeDumpFilename(gen).c_str());
}

size_t RowAggStorage::getBucketSize()
{
  return 1 /* info byte */ + sizeof(RowPosHash);
}

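// calcNumberOfBuckets() fits the bucket count to the memory budget. A rough worked example
// with invented numbers: availMem = 1 GiB, 8 threads, 128 requested buckets, and per-thread /
// per-bucket minimums of 16 MiB and 4 MiB respectively. The thread reservation costs
// 8 * 16 MiB = 128 MiB; the remaining 896 MiB spread over 128 buckets gives 7 MiB each, above
// the 4 MiB minimum, so all 128 buckets are kept. Had the remainder been too small, the count
// would shrink to fit the budget, but never below 1.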
uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
                             uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize,
                             bool enabledDiskAggr)
{
  if (availMem < 0)
  {
    // Most likely, nothing can be processed, but we will still try
    return 1;
  }
  uint32_t ret = numOfBuckets;

  ssize_t minNeededMemPerThread = groupsPerThread * inRowSize * RowAggStorage::getMaxRows(false);
  auto rowGroupSize = RowAggStorage::getMaxRows(enabledDiskAggr);
  ssize_t minNeededMemPerBucket =
      outRowSize * rowGroupSize * 2 +
      RowAggStorage::calcSizeWithBuffer(rowGroupSize, rowGroupSize) * RowAggStorage::getBucketSize();
  if ((availMem - minNeededMemPerThread * numOfThreads) / numOfBuckets < minNeededMemPerBucket)
  {
    ret = 0;
    if (availMem > minNeededMemPerThread * numOfThreads)
    {
      ret = (availMem - minNeededMemPerThread * numOfThreads) / minNeededMemPerBucket;
    }

    if (ret < numOfThreads)
    {
      ret = availMem / (minNeededMemPerBucket + minNeededMemPerThread);
    }
  }

  return ret == 0 ? 1 : ret;
}

}  // namespace rowgroup