diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml
index 98014307e..d8e161301 100644
--- a/oam/etc/Columnstore.xml
+++ b/oam/etc/Columnstore.xml
@@ -521,6 +521,7 @@
N
+
127.0.0.1
diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver
index 483207323..0f70acc4e 100644
--- a/oam/etc/Columnstore.xml.singleserver
+++ b/oam/etc/Columnstore.xml.singleserver
@@ -516,6 +516,7 @@
+
127.0.0.1
diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp
index b799efe46..1eda43bdf 100755
--- a/utils/rowgroup/rowaggregation.cpp
+++ b/utils/rowgroup/rowaggregation.cpp
@@ -686,6 +686,8 @@ void RowAggregation::initialize()
config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
+ string compStr = config->getConfig("RowAggregation", "Compression");
+ auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap)
{
@@ -696,7 +698,8 @@ void RowAggregation::initialize()
fRm,
fSessionMemLimit,
disk_agg,
- allow_gen));
+ allow_gen,
+ compressor));
}
else
{
@@ -706,7 +709,8 @@ void RowAggregation::initialize()
fRm,
fSessionMemLimit,
disk_agg,
- allow_gen));
+ allow_gen,
+ compressor));
}
// Initialize the work row.
@@ -771,6 +775,8 @@ void RowAggregation::aggReset()
config::Config* config = config::Config::makeConfig();
string tmpDir = config->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
+ string compStr = config->getConfig("RowAggregation", "Compression");
+ auto* compressor = compress::getCompressInterfaceByName(compStr);
if (fKeyOnHeap)
{
@@ -781,7 +787,8 @@ void RowAggregation::aggReset()
fRm,
fSessionMemLimit,
disk_agg,
- allow_gen));
+ allow_gen,
+ compressor));
}
else
{
@@ -791,7 +798,8 @@ void RowAggregation::aggReset()
fRm,
fSessionMemLimit,
disk_agg,
- allow_gen));
+ allow_gen,
+ compressor));
}
fRowGroupOut->getRow(0, &fRow);
copyNullRow(fRow);
diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp
index 137ccaeea..4112be0d7 100644
--- a/utils/rowgroup/rowstorage.cpp
+++ b/utils/rowgroup/rowstorage.cpp
@@ -74,6 +74,101 @@ int readData(int fd, char* buf, size_t sz)
return 0;
}
+/** @brief Write a buffer to a file, optionally compressing it first.
+ *
+ * The file is created if it does not exist and truncated if it does
+ * (mode 0644). When @p sz is zero the function returns immediately and
+ * no file is created.
+ *
+ * @param fname      path of the file to (over)write
+ * @param buf        data to store
+ * @param sz         number of bytes available at @p buf
+ * @param compressor compressor applied to the whole buffer before it is
+ *                   written, or nullptr to write the bytes unchanged
+ * @return 0 on success, otherwise the errno value of the failed call
+ */
+int writeFile(const std::string& fname, const char* buf, size_t sz,
+              const compress::CompressInterface* compressor)
+{
+  if (sz == 0)
+    return 0;
+
+  int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
+  if (UNLIKELY(fd < 0))
+    return errno;
+
+  const char* tmpbuf;
+  std::vector<char> tmpvec;
+  if (compressor)
+  {
+    // Reserve the worst case; compress() reports the real size through len.
+    auto len = compressor->maxCompressedSize(sz);
+    tmpvec.resize(len);
+    // NOTE(review): the result of compress() is not inspected here -
+    // confirm its failure contract and propagate an error if it has one.
+    compressor->compress(buf, sz, tmpvec.data(), &len);
+    tmpbuf = tmpvec.data();
+    sz = len;
+  }
+  else
+  {
+    tmpbuf = buf;
+  }
+
+  auto to_write = sz;
+  while (to_write > 0)
+  {
+    // write() may accept fewer bytes than requested; resume where it stopped.
+    auto r = write(fd, tmpbuf + sz - to_write, to_write);
+    if (UNLIKELY(r < 0))
+    {
+      // Both conditions are transient: retry the same chunk.
+      if (errno == EAGAIN || errno == EINTR)
+        continue;
+
+      // close() may overwrite errno, so capture the write error first.
+      const int savedErrno = errno;
+      close(fd);
+      return savedErrno;
+    }
+    assert(size_t(r) <= to_write);
+    to_write -= r;
+  }
+
+  close(fd);
+  return 0;
+}
+
+/** @brief Read a whole file into a buffer, optionally decompressing it.
+ *
+ * @param fname      path of the file to read
+ * @param buf        receives the file content (decompressed when
+ *                   @p compressor is given); previous content is discarded
+ * @param compressor compressor used to decode the file, or nullptr when
+ *                   the file holds the raw bytes
+ * @return 0 on success; the errno value of a failed system call; EIO when
+ *         the file ends before the size reported by fstat() was read;
+ *         EPROTO when the uncompressed size cannot be determined
+ */
+int readFile(const std::string& fname, std::vector<char>& buf,
+             const compress::CompressInterface* compressor)
+{
+  int fd = open(fname.c_str(), O_RDONLY);
+  if (UNLIKELY(fd < 0))
+    return errno;
+
+  struct stat st{};
+  if (UNLIKELY(fstat(fd, &st) != 0))
+  {
+    // Without a valid size the file would silently be treated as empty.
+    const int savedErrno = errno;
+    close(fd);
+    return savedErrno;
+  }
+  size_t sz = st.st_size;
+
+  std::vector<char> tmpbuf;
+  tmpbuf.resize(sz);
+
+  auto to_read = sz;
+  while (to_read > 0)
+  {
+    // read() may deliver fewer bytes than requested; resume where it stopped.
+    auto r = read(fd, tmpbuf.data() + sz - to_read, to_read);
+    if (UNLIKELY(r < 0))
+    {
+      // Both conditions are transient: retry the same chunk.
+      if (errno == EAGAIN || errno == EINTR)
+        continue;
+
+      // close() may overwrite errno, so capture the read error first.
+      const int savedErrno = errno;
+      close(fd);
+      return savedErrno;
+    }
+    if (UNLIKELY(r == 0))
+    {
+      // End of file before st_size bytes arrived (the file shrank after
+      // fstat()); looping again would never make progress.
+      close(fd);
+      return EIO;
+    }
+
+    assert(size_t(r) <= to_read);
+    to_read -= r;
+  }
+  close(fd);
+
+  if (compressor)
+  {
+    size_t len;
+    if (!compressor->getUncompressedSize(tmpbuf.data(), sz, &len))
+    {
+      return EPROTO;
+    }
+    buf.resize(len);
+    // NOTE(review): the result of uncompress() is not inspected here -
+    // confirm its failure contract and propagate an error if it has one.
+    compressor->uncompress(tmpbuf.data(), sz, buf.data(), &len);
+  }
+  else
+  {
+    // No decoding needed: hand the raw bytes over without copying.
+    tmpbuf.swap(buf);
+  }
+
+  return 0;
+}
+
std::string errorString(int errNo)
{
char tmp[1024];
@@ -365,6 +460,7 @@ public:
* right now?
* @param strict true -> throw an exception if not enough memory
* false -> deal with it later
+ * @param compressor pointer to CompressInterface impl used for row group dump files, or nullptr to keep the dumps uncompressed
*/
RowGroupStorage(const std::string& tmpDir,
RowGroup* rowGroupOut,
@@ -372,12 +468,14 @@ public:
joblist::ResourceManager* rm = nullptr,
boost::shared_ptr sessLimit = {},
bool wait = false,
- bool strict = false)
+ bool strict = false,
+ compress::CompressInterface* compressor = nullptr)
: fRowGroupOut(rowGroupOut)
, fMaxRows(maxRows)
, fRGDatas()
, fUniqId(this)
, fTmpDir(tmpDir)
+ , fCompressor(compressor)
{
if (rm)
{
@@ -396,6 +494,7 @@ public:
fMM.reset(new MemManager());
fLRU = std::unique_ptr(new LRUIface());
}
+
auto* curRG = new RGData(*fRowGroupOut, fMaxRows);
fRowGroupOut->setData(curRG);
fRowGroupOut->resetRowGroup(0);
@@ -772,6 +871,7 @@ public:
ret->fMM.reset(fMM->clone());
ret->fUniqId = fUniqId;
ret->fGeneration = gen;
+ ret->fCompressor = fCompressor;
ret->loadFinalizedInfo();
return ret;
}
@@ -964,41 +1064,18 @@ private:
void loadRG(uint64_t rgid, std::unique_ptr& rgdata, bool unlinkDump = false)
{
auto fname = makeRGFilename(rgid);
- int fd = open(fname.c_str(), O_RDONLY);
- if (UNLIKELY(fd < 0))
+
+ std::vector data;
+ int errNo;
+ if ((errNo = readFile(fname, data, fCompressor)) != 0)
{
+ unlink(fname.c_str());
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
- logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
+ logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
- messageqcpp::ByteStream bs;
- try
- {
- struct stat st
- {
- };
- fstat(fd, &st);
-
- bs.needAtLeast(st.st_size);
- bs.restart();
- int errNo;
- if ((errNo = readData(fd, (char*)bs.getInputPtr(), st.st_size)) != 0)
- {
- close(fd);
- unlink(fname.c_str());
- throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
- logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
- logging::ERR_DISKAGG_FILEIO_ERROR);
- }
- bs.advanceInputPtr(st.st_size);
- close(fd);
- }
- catch (...)
- {
- close(fd);
- throw;
- }
+ messageqcpp::ByteStream bs(reinterpret_cast(data.data()), data.size());
if (unlinkDump)
unlink(fname.c_str());
@@ -1044,23 +1121,13 @@ private:
fRowGroupOut->setData(rgdata);
rgdata->serialize(bs, fRowGroupOut->getDataSize());
- int fd = open(makeRGFilename(rgid).c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
- if (UNLIKELY(fd < 0))
- {
- throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
- logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
- logging::ERR_DISKAGG_FILEIO_ERROR);
- }
-
int errNo;
- if ((errNo = writeData(fd, (char*)bs.buf(), bs.length())) != 0)
+ if ((errNo = writeFile(makeRGFilename(rgid), (char*)bs.buf(), bs.length(), fCompressor)) != 0)
{
- close(fd);
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(
- logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
+ logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
- close(fd);
}
#ifdef DISK_AGG_DEBUG
@@ -1111,6 +1178,7 @@ private:
uint16_t fGeneration{0};
std::vector fFinalizedRows;
std::string fTmpDir;
+ compress::CompressInterface* fCompressor;
};
/** @brief Internal data for the hashmap */
@@ -1142,9 +1210,11 @@ public:
size_t size,
joblist::ResourceManager* rm,
boost::shared_ptr sessLimit,
- bool enableDiskAgg)
+ bool enableDiskAgg,
+ compress::CompressInterface* compressor)
: fUniqId(this)
, fTmpDir(tmpDir)
+ , fCompressor(compressor)
{
if (rm)
fMM.reset(new RMMemManager(rm, sessLimit, !enableDiskAgg, !enableDiskAgg));
@@ -1216,6 +1286,7 @@ public:
cloned->init(size);
cloned->fUniqId = fUniqId;
cloned->fGeneration = gen;
+ cloned->fCompressor = fCompressor;
if (loadDump)
cloned->load();
return cloned;
@@ -1240,27 +1311,15 @@ public:
void dump()
{
- int fd = open(makeDumpName().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
- if (fd < 0)
- {
- throw logging::IDBExcept(
- logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
- errorString(errno)),
- logging::ERR_DISKAGG_FILEIO_ERROR);
- }
-
int errNo;
size_t sz = fPosHashes.size() * sizeof(decltype(fPosHashes)::value_type);
- if ((errNo = writeData(fd, (char*)fPosHashes.data(), sz)) != 0)
+ if ((errNo = writeFile(makeDumpName(), (char*)fPosHashes.data(), sz, fCompressor)) != 0)
{
- close(fd);
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
-
- close(fd);
}
private:
@@ -1288,29 +1347,18 @@ private:
void load()
{
- int fd = open(makeDumpName().c_str(), O_RDONLY);
- if (fd < 0)
- {
- throw logging::IDBExcept(
- logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
- errorString(errno)),
- logging::ERR_DISKAGG_FILEIO_ERROR);
- }
-
- struct stat st;
- fstat(fd, &st);
- fPosHashes.resize(st.st_size / sizeof(decltype(fPosHashes)::value_type));
int errNo;
- if ((errNo = readData(fd, (char*)fPosHashes.data(), st.st_size)) != 0)
+ std::vector data;
+ if ((errNo = readFile(makeDumpName(), data, fCompressor)) != 0)
{
- close(fd);
throw logging::IDBExcept(
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR,
errorString(errNo)),
logging::ERR_DISKAGG_FILEIO_ERROR);
}
- close(fd);
+ fPosHashes.resize(data.size() / sizeof(decltype(fPosHashes)::value_type));
+ memcpy(fPosHashes.data(), data.data(), data.size());
}
private:
@@ -1319,6 +1367,7 @@ private:
uint16_t fGeneration{0}; ///< current aggregation generation
void* fUniqId; ///< uniq ID to make an uniq dump filename
std::string fTmpDir;
+ compress::CompressInterface* fCompressor;
};
/*---------------------------------------------------------------------------
@@ -1336,13 +1385,15 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
joblist::ResourceManager *rm,
boost::shared_ptr sessLimit,
bool enabledDiskAgg,
- bool allowGenerations)
+ bool allowGenerations,
+ compress::CompressInterface* compressor)
: fMaxRows(getMaxRows(enabledDiskAgg))
, fExtKeys(rowGroupOut != keysRowGroup)
, fLastKeyCol(keyCount - 1)
, fUniqId(this)
, fAllowGenerations(allowGenerations)
, fEnabledDiskAggregation(enabledDiskAgg)
+ , fCompressor(compressor)
, fTmpDir(tmpDir)
, fRowGroupOut(rowGroupOut)
, fKeysRowGroup(keysRowGroup)
@@ -1363,10 +1414,10 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fMM.reset(new MemManager());
fNumOfInputRGPerThread = 1;
}
- fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg));
+ fStorage.reset(new RowGroupStorage(fTmpDir, rowGroupOut, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get()));
if (fExtKeys)
{
- fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg);
+ fKeysStorage = new RowGroupStorage(fTmpDir, keysRowGroup, 1, rm, sessLimit, !enabledDiskAgg, !enabledDiskAgg, fCompressor.get());
}
else
{
@@ -1375,7 +1426,7 @@ RowAggStorage::RowAggStorage(const std::string& tmpDir,
fKeysStorage->initRow(fKeyRow);
fGens.emplace_back(new Data);
fCurData = fGens.back().get();
- fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation));
+ fCurData->fHashes.reset(new RowPosHashStorage(fTmpDir, 0, rm, sessLimit, fEnabledDiskAggregation, fCompressor.get()));
}
RowAggStorage::~RowAggStorage()
@@ -1408,7 +1459,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(),
fMM->getSessionLimit(),
!fEnabledDiskAggregation,
- !fEnabledDiskAggregation));
+ !fEnabledDiskAggregation,
+ fCompressor.get()));
if (fExtKeys)
{
fKeysStorage = new RowGroupStorage(fTmpDir,
@@ -1417,7 +1469,8 @@ bool RowAggStorage::getTargetRow(const Row &row, uint64_t hash, Row &rowOut)
fMM->getResourceManaged(),
fMM->getSessionLimit(),
!fEnabledDiskAggregation,
- !fEnabledDiskAggregation);
+ !fEnabledDiskAggregation,
+ fCompressor.get());
}
else
{
diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h
index 7ac38a1d2..3bb8f1c22 100644
--- a/utils/rowgroup/rowstorage.h
+++ b/utils/rowgroup/rowstorage.h
@@ -20,6 +20,7 @@
#include "resourcemanager.h"
#include "rowgroup.h"
+#include "idbcompress.h"
#include
#include
@@ -51,7 +52,8 @@ public:
joblist::ResourceManager* rm = nullptr,
boost::shared_ptr sessLimit = {},
bool enabledDiskAgg = false,
- bool allowGenerations = false);
+ bool allowGenerations = false,
+ compress::CompressInterface* compressor = nullptr);
RowAggStorage(const std::string& tmpDir,
RowGroup* rowGroupOut,
@@ -59,10 +61,11 @@ public:
joblist::ResourceManager* rm = nullptr,
boost::shared_ptr sessLimit = {},
bool enabledDiskAgg = false,
- bool allowGenerations = false)
+ bool allowGenerations = false,
+ compress::CompressInterface* compressor = nullptr)
: RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount,
rm, std::move(sessLimit),
- enabledDiskAgg, allowGenerations)
+ enabledDiskAgg, allowGenerations, compressor)
{}
~RowAggStorage();
@@ -356,6 +359,7 @@ private:
bool fAggregated = true;
bool fAllowGenerations;
bool fEnabledDiskAggregation;
+ std::unique_ptr fCompressor;
std::string fTmpDir;
bool fInitialized{false};
rowgroup::RowGroup* fRowGroupOut;