1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(PrimProc): MCOL-5852 disk-based GROUP_CONCAT & JSON_ARRAYAGG

* move GROUP_CONCAT/JSON_ARRAYAGG storage to the RowGroup from
  the RowAggregation*
* internal data structures (de)serialization
* get rid of the specialized classes for processing JSON_ARRAYAGG
* move the memory accounting to disk-based aggregation classes
* allow aggregation generations to be used for queries with
  GROUP_CONCAT/JSON_ARRAYAGG
* Remove the thread id from the error message as it interferes with the mtr
This commit is contained in:
Aleksei Antipovskii
2025-02-19 12:32:51 +01:00
committed by Alexey Antipovsky
parent 87d47fd7ae
commit 4bea7e59a0
25 changed files with 1339 additions and 2056 deletions

View File

@ -431,7 +431,6 @@ ByteStream& ByteStream::operator>>(utils::NullString& s)
return *this;
}
ByteStream& ByteStream::operator>>(uint8_t*& bpr)
{
peek(bpr);

View File

@ -35,7 +35,6 @@
#include "mcs_basic_types.h"
#include "resourcemanager.h"
#include "groupconcat.h"
#include "jsonarrayagg.h"
#include "blocksize.h"
#include "errorcodes.h"
@ -537,6 +536,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs)
, fRm(rhs.fRm)
, fSessionMemLimit(rhs.fSessionMemLimit)
, fRollupFlag(rhs.fRollupFlag)
, fGroupConcat(rhs.fGroupConcat)
{
fGroupByCols.assign(rhs.fGroupByCols.begin(), rhs.fGroupByCols.end());
fFunctionCols.assign(rhs.fFunctionCols.begin(), rhs.fFunctionCols.end());
@ -661,14 +661,17 @@ void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColsIdx
//------------------------------------------------------------------------------
void RowAggregation::initialize(bool hasGroupConcat)
{
if (hasGroupConcat)
{
fRowGroupOut->setUseAggregateDataStore(true, fGroupConcat);
}
// Calculate the length of the hashmap key.
fAggMapKeyCount = fGroupByCols.size();
bool disk_agg = fRm ? fRm->getAllowDiskAggregation() : false;
bool allow_gen = true;
for (auto& fun : fFunctionCols)
{
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
fun->fAggFunction == ROWAGG_JSON_ARRAY)
if (fun->fAggFunction == ROWAGG_UDAF)
{
allow_gen = false;
break;
@ -757,8 +760,7 @@ void RowAggregation::aggReset()
bool allow_gen = true;
for (auto& fun : fFunctionCols)
{
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
fun->fAggFunction == ROWAGG_JSON_ARRAY)
if (fun->fAggFunction == ROWAGG_UDAF)
{
allow_gen = false;
break;
@ -1884,9 +1886,9 @@ void RowAggregation::mergeEntries(const Row& rowIn)
case ROWAGG_DUP_AVG:
case ROWAGG_DUP_STATS:
case ROWAGG_DUP_UDAF:
case ROWAGG_CONSTANT:
case ROWAGG_CONSTANT: break;
case ROWAGG_JSON_ARRAY:
case ROWAGG_GROUP_CONCAT: break;
case ROWAGG_GROUP_CONCAT: mergeGroupConcat(rowIn, colOut); break;
case ROWAGG_UDAF: doUDAF(rowIn, colOut, colOut, colOut + 1, i); break;
@ -2138,6 +2140,12 @@ void RowAggregation::mergeStatistics(const Row& rowIn, uint64_t colOut, uint64_t
colAux + 1);
}
//------------------------------------------------------------------------------
// Merge the GROUP_CONCAT/JSON_ARRAYAGG data carried by rowIn into the aggregate
// object that column colOut of the current output row (fRow) refers to.
// rowIn(in)  - row to merge from
// colOut(in) - column index used both to look up the aggregate object in fRow
//              and as the column argument passed to merge()
//------------------------------------------------------------------------------
void RowAggregation::mergeGroupConcat(const Row& rowIn, uint64_t colOut)
{
  fRow.getAggregateData(colOut)->merge(rowIn, colOut);
}
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
uint64_t& funcColsIdx, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
{
@ -2540,7 +2548,6 @@ RowAggregationUM::RowAggregationUM(const RowAggregationUM& rhs)
, fExpression(rhs.fExpression)
, fTotalMemUsage(rhs.fTotalMemUsage)
, fConstantAggregate(rhs.fConstantAggregate)
, fGroupConcat(rhs.fGroupConcat)
, fLastMemUsage(rhs.fLastMemUsage)
{
}
@ -2626,28 +2633,19 @@ void RowAggregationUM::attachGroupConcatAg()
{
if (fGroupConcat.size() > 0)
{
uint8_t* data = fRow.getData();
uint64_t i = 0, j = 0;
uint64_t gc_idx = 0;
for (; i < fFunctionColGc.size(); i++)
for (uint64_t i = 0; i < fFunctionColGc.size(); i++)
{
if (fFunctionColGc[i]->fAggFunction != ROWAGG_GROUP_CONCAT &&
fFunctionColGc[i]->fAggFunction != ROWAGG_JSON_ARRAY)
{
continue;
}
int64_t colOut = fFunctionColGc[i]->fOutputColumnIndex;
if (fFunctionColGc[i]->fAggFunction == ROWAGG_GROUP_CONCAT)
{
// save the object's address in the result row
SP_GroupConcatAg gcc(new joblist::GroupConcatAgUM(fGroupConcat[j++]));
fGroupConcatAg.push_back(gcc);
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
}
if (fFunctionColGc[i]->fAggFunction == ROWAGG_JSON_ARRAY)
{
// save the object's address in the result row
SP_GroupConcatAg gcc(new joblist::JsonArrayAggregatAgUM(fGroupConcat[j++]));
fGroupConcatAg.push_back(gcc);
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
}
joblist::SP_GroupConcatAg gcc(new joblist::GroupConcatAg(
fGroupConcat[gc_idx++], fFunctionColGc[i]->fAggFunction == ROWAGG_JSON_ARRAY));
fRow.setAggregateData(gcc, colOut);
}
}
}
@ -2706,14 +2704,9 @@ void RowAggregationUM::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1
}
case ROWAGG_GROUP_CONCAT:
{
doGroupConcat(rowIn, colIn, colOut);
break;
}
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
doGroupConcat(rowIn, colIn, colOut);
break;
}
@ -2756,15 +2749,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1
//------------------------------------------------------------------------------
void RowAggregationUM::doGroupConcat(const Row& rowIn, int64_t, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
gccAg->processRow(rowIn);
}
void RowAggregationUM::doJsonAgg(const Row& rowIn, int64_t, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
auto* gccAg = fRow.getAggregateData(o);
gccAg->processRow(rowIn);
}
@ -4158,30 +4143,17 @@ void RowAggregationUM::setGroupConcatString()
for (uint64_t i = 0; i < fRowGroupOut->getRowCount(); i++, fRow.nextRow())
{
for (uint64_t j = 0; j < fFunctionCols.size(); j++)
for (const auto& fcall : fFunctionCols)
{
uint8_t* data = fRow.getData();
if (fFunctionCols[j]->fAggFunction == ROWAGG_GROUP_CONCAT)
if (fcall->fAggFunction != ROWAGG_GROUP_CONCAT && fcall->fAggFunction != ROWAGG_JSON_ARRAY)
{
uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex);
uint8_t* gcString;
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)buff);
gcString = gccAg->getResult();
utils::ConstString str((char*)gcString, gcString ? strlen((const char*)gcString) : 0);
fRow.setStringField(str, fFunctionCols[j]->fOutputColumnIndex);
// gccAg->getResult(buff);
continue;
}
if (fFunctionCols[j]->fAggFunction == ROWAGG_JSON_ARRAY)
{
uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex);
uint8_t* gcString;
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)buff);
gcString = gccAg->getResult();
utils::ConstString str((char*)gcString, gcString ? strlen((char*)gcString) : 0);
fRow.setStringField(str, fFunctionCols[j]->fOutputColumnIndex);
}
auto* gccAg = fRow.getAggregateData(fcall->fOutputColumnIndex);
uint8_t* gcString = gccAg->getResult();
utils::ConstString str((char*)gcString, gcString ? strlen((const char*)gcString) : 0);
fRow.setStringField(str, fcall->fOutputColumnIndex);
}
}
}
@ -4306,14 +4278,9 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcs
}
case ROWAGG_GROUP_CONCAT:
{
doGroupConcat(rowIn, colIn, colOut);
break;
}
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
doGroupConcat(rowIn, colIn, colOut);
break;
}
@ -4537,15 +4504,7 @@ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t c
//------------------------------------------------------------------------------
void RowAggregationUMP2::doGroupConcat(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
gccAg->merge(rowIn, i);
}
void RowAggregationUMP2::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
auto* gccAg = fRow.getAggregateData(o);
gccAg->merge(rowIn, i);
}
@ -4803,14 +4762,9 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn, std::vector<mcsv1sdk:
}
case ROWAGG_GROUP_CONCAT:
{
doGroupConcat(rowIn, colIn, colOut);
break;
}
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
doGroupConcat(rowIn, colIn, colOut);
break;
}
@ -4943,17 +4897,10 @@ void RowAggregationSubDistinct::addRowGroup(const RowGroup* pRows,
//------------------------------------------------------------------------------
void RowAggregationSubDistinct::doGroupConcat(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
auto* gccAg = fRow.getAggregateData(o);
gccAg->merge(rowIn, i);
}
void RowAggregationSubDistinct::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
gccAg->merge(rowIn, i);
}
//------------------------------------------------------------------------------
// Constructor / destructor
//------------------------------------------------------------------------------
@ -5129,12 +5076,122 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(
fOrigFunctionCols = nullptr;
}
GroupConcatAg::GroupConcatAg(SP_GroupConcat& gcc) : fGroupConcat(gcc)
/** @brief Write this GroupConcat descriptor to a ByteStream.
 *
 * Every vector is written as a 64-bit element count followed by its elements;
 * bool values are written as a single uint8_t. The field order here must match
 * the order in which GroupConcat::deserialize() reads them.
 *
 * @param bs stream to append to
 */
void GroupConcat::serialize(messageqcpp::ByteStream& bs) const
{
  // fGroupCols: count, then (first, second) of each pair.
  const uint64_t groupColCount = fGroupCols.size();
  bs << groupColCount;
  for (const auto& groupCol : fGroupCols)
  {
    bs << groupCol.first;
    bs << groupCol.second;
  }

  // fOrderCols: count, then (first, second-as-uint8_t) of each pair.
  const uint64_t orderColCount = fOrderCols.size();
  bs << orderColCount;
  for (const auto& orderCol : fOrderCols)
  {
    bs << orderCol.first;
    bs << static_cast<uint8_t>(orderCol.second);
  }

  bs << fSeparator;

  // fConstCols: count, then (first, second) of each pair.
  const uint64_t constColCount = fConstCols.size();
  bs << constColCount;
  for (const auto& constCol : fConstCols)
  {
    bs << constCol.first;
    bs << constCol.second;
  }

  bs << static_cast<uint8_t>(fDistinct);
  bs << fSize;
  fRowGroup.serialize(bs);

  // fMapping is written as a raw byte blob of one int per column of fRowGroup.
  const uint64_t mappingBytes = fRowGroup.getColumnCount() * sizeof(int);
  bs << mappingBytes;
  bs.append(reinterpret_cast<uint8_t*>(fMapping.get()), mappingBytes);

  // fOrderCond: count, then (first, second-as-uint8_t) of each pair.
  const uint64_t orderCondCount = fOrderCond.size();
  bs << orderCondCount;
  for (const auto& orderCond : fOrderCond)
  {
    bs << orderCond.first;
    bs << static_cast<uint8_t>(orderCond.second);
  }

  bs << fTimeZone;
  bs << id;
}
GroupConcatAg::~GroupConcatAg()
/** @brief Restore this GroupConcat descriptor from a ByteStream.
 *
 * Reads the fields in the order GroupConcat::serialize() writes them:
 * fGroupCols, fOrderCols, fSeparator, fConstCols, fDistinct, fSize, fRowGroup,
 * the raw fMapping blob, fOrderCond, fTimeZone and id. All four vectors are
 * cleared first, so any previous content is discarded.
 *
 * @param bs stream to read from; the consumed bytes are removed from it
 */
void GroupConcat::deserialize(messageqcpp::ByteStream& bs)
{
  fGroupCols.clear();
  fOrderCols.clear();
  fConstCols.clear();
  fOrderCond.clear();

  RGDataSizeType size;

  bs >> size;
  fGroupCols.reserve(size);
  for (RGDataSizeType i = 0; i < size; ++i)
  {
    uint32_t f, s;
    bs >> f;
    bs >> s;
    fGroupCols.emplace_back(f, s);
  }

  bs >> size;
  fOrderCols.reserve(size);
  for (RGDataSizeType i = 0; i < size; ++i)
  {
    uint32_t f;
    bs >> f;
    uint8_t s;
    bs >> s;
    // These entries belong to fOrderCols (the vector reserved above and the
    // second vector written by serialize()), not to fOrderCond.
    fOrderCols.emplace_back(f, static_cast<bool>(s));
  }

  bs >> fSeparator;

  bs >> size;
  fConstCols.reserve(size);
  for (RGDataSizeType i = 0; i < size; ++i)
  {
    utils::NullString f;
    bs >> f;
    uint32_t s;
    bs >> s;
    fConstCols.emplace_back(f, s);
  }

  // bool is transported as a single byte.
  uint8_t tmp8;
  bs >> tmp8;
  fDistinct = tmp8;

  bs >> fSize;
  fRowGroup.deserialize(bs);

  // The mapping is a raw blob of ints; size is its length in bytes.
  bs >> size;
  idbassert(size % sizeof(int) == 0);
  fMapping.reset(new int[size / sizeof(int)]);
  memcpy(fMapping.get(), bs.buf(), size);
  bs.advance(size);

  bs >> size;
  fOrderCond.reserve(size);
  for (RGDataSizeType i = 0; i < size; ++i)
  {
    int f;
    bs >> f;
    uint8_t s;
    bs >> s;
    fOrderCond.emplace_back(f, static_cast<bool>(s));
  }

  bs >> fTimeZone;
  bs >> id;
}
/** @brief Estimate the memory held by this GroupConcat descriptor.
 *
 * Sums the capacities of the column vectors and the separator, the empty size
 * of fRowGroup and one int per column for the mapping array.
 *
 * @return estimated size in bytes
 */
RGDataSizeType GroupConcat::getDataSize() const
{
  RGDataSizeType size = 0;
  size += fGroupCols.capacity() * 8;
  size += fOrderCols.capacity() * 8;
  size += fSeparator.capacity();
  size += fConstCols.capacity() * (4 + sizeof(utils::NullString));
  size += fRowGroup.getEmptySize();
  size += fRowGroup.getColumnCount() * sizeof(int);
  // fOrderCond was previously left out while fOrderCols was counted twice.
  size += fOrderCond.capacity() * 8;
  return size;
}
} // namespace rowgroup

View File

@ -327,7 +327,7 @@ struct ConstantAggData
typedef boost::shared_ptr<RowAggGroupByCol> SP_ROWAGG_GRPBY_t;
typedef boost::shared_ptr<RowAggFunctionCol> SP_ROWAGG_FUNC_t;
struct GroupConcat
struct GroupConcat : public messageqcpp::Serializeable
{
// GROUP_CONCAT(DISTINCT col1, 'const', col2 ORDER BY col3 desc SEPARATOR 'sep')
std::vector<std::pair<uint32_t, uint32_t>> fGroupCols; // columns to concatenate, and position
@ -340,38 +340,26 @@ struct GroupConcat
RowGroup fRowGroup;
std::shared_ptr<int[]> fMapping;
std::vector<std::pair<int, bool>> fOrderCond; // position to order by [asc/desc]
joblist::ResourceManager* fRm; // resource manager
boost::shared_ptr<int64_t> fSessionMemLimit;
long fTimeZone;
uint32_t id;
GroupConcat() : fRm(nullptr)
GroupConcat() = default;
GroupConcat(joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sessLimit)
: fRm(rm)
, fSessionMemLimit(sessLimit)
{
}
void serialize(messageqcpp::ByteStream& bs) const override;
void deserialize(messageqcpp::ByteStream& bs) override;
RGDataSizeType getDataSize() const;
joblist::ResourceManager* fRm{nullptr};
boost::shared_ptr<int64_t> fSessionMemLimit;
};
typedef boost::shared_ptr<GroupConcat> SP_GroupConcat;
class GroupConcatAg
{
public:
explicit GroupConcatAg(SP_GroupConcat&);
virtual ~GroupConcatAg();
virtual void initialize() {};
virtual void processRow(const rowgroup::Row&) {};
virtual void merge(const rowgroup::Row&, uint64_t) {};
virtual uint8_t* getResult()
{
return nullptr;
}
protected:
rowgroup::SP_GroupConcat fGroupConcat;
};
typedef boost::shared_ptr<GroupConcatAg> SP_GroupConcatAg;
//------------------------------------------------------------------------------
/** @brief Class that aggregates RowGroups.
*/
@ -555,6 +543,8 @@ class RowAggregation : public messageqcpp::Serializeable
virtual void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false);
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
void mergeStatistics(const Row&, uint64_t colOut, uint64_t colAux);
void mergeGroupConcat(const Row& rowIn, uint64_t colOut);
virtual void doBitOp(const Row&, int64_t, int64_t, int);
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
@ -650,6 +640,8 @@ class RowAggregation : public messageqcpp::Serializeable
std::string fTmpDir =
config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
std::vector<SP_GroupConcat> fGroupConcat;
};
//------------------------------------------------------------------------------
@ -794,7 +786,6 @@ class RowAggregationUM : public RowAggregation
// @bug3362, group_concat
virtual void doGroupConcat(const Row&, int64_t, int64_t);
virtual void doJsonAgg(const Row&, int64_t, int64_t);
virtual void setGroupConcatString();
bool fHasAvg;
@ -814,8 +805,6 @@ class RowAggregationUM : public RowAggregation
std::vector<ConstantAggData> fConstantAggregate;
// @bug3362, group_concat
std::vector<SP_GroupConcat> fGroupConcat;
std::vector<SP_GroupConcatAg> fGroupConcatAg;
std::vector<SP_ROWAGG_FUNC_t> fFunctionColGc;
private:
@ -856,7 +845,6 @@ class RowAggregationUMP2 : public RowAggregationUM
void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override;
void doStatistics(const Row&, int64_t, int64_t, int64_t) override;
void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
void doBitOp(const Row&, int64_t, int64_t, int) override;
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
@ -964,7 +952,6 @@ class RowAggregationSubDistinct : public RowAggregationUM
protected:
// virtual methods from RowAggregationUM
void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
// for groupby columns and the aggregated distinct column
Row fDistRow;
boost::scoped_array<uint8_t> fDistRowData;

View File

@ -29,6 +29,8 @@
// #define NDEBUG
#include <sstream>
#include <iterator>
#include "rowaggregation.h"
using namespace std;
#include <numeric>
@ -43,6 +45,7 @@ using namespace execplan;
#include "rowgroup.h"
#include "dataconvert.h"
#include "columnwidth.h"
#include "groupconcat.h"
namespace rowgroup
{
@ -305,6 +308,72 @@ void UserDataStore::deserialize(ByteStream& bs)
return;
}
/** @brief Write the store to a ByteStream.
 *
 * Layout: descriptor count, each GroupConcat descriptor, aggregate count, then
 * for each aggregate its group-concat id followed by its own serialized data.
 *
 * @param bs stream to append to
 */
void AggregateDataStore::serialize(messageqcpp::ByteStream& bs) const
{
  const uint64_t descriptorCount = fGroupConcat.size();
  bs << descriptorCount;
  for (const auto& descriptor : fGroupConcat)
  {
    descriptor->serialize(bs);
  }

  const uint64_t aggregateCount = fData.size();
  bs << aggregateCount;
  for (const auto& aggregate : fData)
  {
    // The id is written first so deserialize() can pick the matching descriptor.
    bs << aggregate->getGroupConcatId();
    aggregate->serialize(bs);
  }
}
/** @brief Rebuild the store from a ByteStream written by serialize().
 *
 * Existing descriptors and aggregates are dropped. Each aggregate is created
 * on top of the descriptor selected by the id stored in front of its data.
 *
 * @param bs stream to read from
 */
void AggregateDataStore::deserialize(messageqcpp::ByteStream& bs)
{
  fGroupConcat.clear();
  fData.clear();

  uint64_t count;

  bs >> count;
  fGroupConcat.resize(count);
  for (auto& descriptor : fGroupConcat)
  {
    descriptor.reset(new GroupConcat());
    descriptor->deserialize(bs);
  }

  bs >> count;
  fData.resize(count);
  for (auto& aggregate : fData)
  {
    uint32_t gc_id;
    bs >> gc_id;
    // The id indexes fGroupConcat, so it must refer to a descriptor read above.
    idbassert(gc_id < fGroupConcat.size());
    aggregate.reset(new joblist::GroupConcatAg(fGroupConcat[gc_id]));
    aggregate->deserialize(bs);
  }
}
/** @brief Append an aggregate object to the store.
 *
 * @param data shared pointer to store (a copy of the pointer is kept)
 * @return position of the stored object, usable with getAggregateData()
 */
uint32_t AggregateDataStore::storeAggregateData(boost::shared_ptr<joblist::GroupConcatAg>& data)
{
  // The new element lands at the current end, i.e. at index size().
  const uint32_t pos = static_cast<uint32_t>(fData.size());
  fData.push_back(data);
  return pos;
}
/** @brief Fetch the aggregate object stored at a position.
 *
 * @param pos position previously returned by storeAggregateData();
 *            asserted to be within the stored range
 * @return shared pointer to the stored object
 */
boost::shared_ptr<joblist::GroupConcatAg> AggregateDataStore::getAggregateData(uint32_t pos) const
{
  idbassert(pos < fData.size());
  const auto& stored = fData[pos];
  return stored;
}
/** @brief Total size reported by everything held in the store.
 *
 * @return sum of getDataSize() over all descriptors and all aggregate objects
 */
RGDataSizeType AggregateDataStore::getDataSize() const
{
  RGDataSizeType total = 0;

  for (const auto& descriptor : fGroupConcat)
    total += descriptor->getDataSize();

  for (const auto& aggregate : fData)
    total += aggregate->getDataSize();

  return total;
}
RGData::RGData(allocators::CountingAllocator<RGDataBufType>& _alloc) : RGData()
{
@ -316,29 +385,24 @@ RGData::RGData(const RowGroup& rg, uint32_t rowCount)
RGDataSizeType s = rg.getDataSize(rowCount);
rowData.reset(new uint8_t[s]);
if (rg.usesStringTable() && rowCount > 0) {
if (rg.usesStringTable() && rowCount > 0)
{
strings.reset(new StringStore());
strings->useOnlyLongStrings(rg.usesOnlyLongString());
}
if (rg.usesAggregateDataStore())
{
aggregateDataStore.reset(new AggregateDataStore(rg.getGroupConcats()));
}
userDataStore.reset();
columnCount = rg.getColumnCount();
rowSize = rg.getRowSize();
}
RGData::RGData(const RowGroup& rg)
RGData::RGData(const RowGroup& rg) : RGData(rg, rgCommonSize)
{
rowData.reset(new uint8_t[rg.getMaxDataSize()]);
if (rg.usesStringTable())
{
strings.reset(new StringStore());
strings->useOnlyLongStrings(rg.usesOnlyLongString());
}
userDataStore.reset();
columnCount = rg.getColumnCount();
rowSize = rg.getRowSize();
}
@ -371,21 +435,28 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
userDataStore.reset();
if (rg.usesStringTable())
if (rg.usesStringTable() || rg.usesOnlyLongString())
{
if (alloc)
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = alloc.value();
strings.reset(new StringStore(ssAlloc));
strings->useOnlyLongStrings(rg.usesOnlyLongString());
}
else
{
strings.reset(new StringStore());
}
}
else
strings.reset();
if (rg.usesAggregateDataStore())
{
aggregateDataStore.reset(new AggregateDataStore(rg.getGroupConcats()));
}
else
aggregateDataStore.reset();
columnCount = rg.getColumnCount();
rowSize = rg.getRowSize();
}
@ -419,6 +490,14 @@ void RGData::serialize(ByteStream& bs, RGDataSizeType amount) const
}
else
bs << (uint8_t)0;
if (aggregateDataStore)
{
bs << (uint8_t)1;
aggregateDataStore->serialize(bs);
}
else
bs << (uint8_t)0;
}
void RGData::deserialize(ByteStream& bs, RGDataSizeType defAmount)
@ -473,6 +552,15 @@ void RGData::deserialize(ByteStream& bs, RGDataSizeType defAmount)
}
else
userDataStore.reset();
bs >> tmp8;
if (tmp8)
{
aggregateDataStore.reset(new AggregateDataStore());
aggregateDataStore->deserialize(bs);
}
else
aggregateDataStore.reset();
}
return;
@ -1133,8 +1221,10 @@ RowGroup::RowGroup(const RowGroup& r)
, precision(r.precision)
, rgData(r.rgData)
, strings(r.strings)
, aggregateDataStore(r.aggregateDataStore)
, useStringTable(r.useStringTable)
, useOnlyLongStrings(r.useOnlyLongStrings)
, useAggregateDataStore(r.useAggregateDataStore)
, hasCollation(r.hasCollation)
, hasLongStringField(r.hasLongStringField)
, sTableThreshold(r.sTableThreshold)
@ -1166,8 +1256,10 @@ RowGroup& RowGroup::operator=(const RowGroup& r)
precision = r.precision;
rgData = r.rgData;
strings = r.strings;
aggregateDataStore = r.aggregateDataStore;
useStringTable = r.useStringTable;
useOnlyLongStrings = r.useOnlyLongStrings;
useAggregateDataStore = r.useAggregateDataStore;
hasCollation = r.hasCollation;
hasLongStringField = r.hasLongStringField;
sTableThreshold = r.sTableThreshold;
@ -1261,6 +1353,25 @@ void RowGroup::deserialize(ByteStream& bs)
charsets.insert(charsets.begin(), charsetNumbers.size(), nullptr);
}
/** @brief Turn the aggregate data store on or off for this RowGroup.
 *
 * @param b             true to enable, false to disable
 * @param group_concats descriptors to remember; asserted non-empty when
 *                      enabling
 *
 * When enabling, the descriptors are copied into fGroupConcats and, if an
 * RGData is attached, a fresh AggregateDataStore is installed on it. When
 * disabling a previously enabled store, the remembered descriptors are cleared.
 */
void RowGroup::setUseAggregateDataStore(bool b, std::span<boost::shared_ptr<GroupConcat>> group_concats)
{
  idbassert(!b || !group_concats.empty());

  if (b)
  {
    fGroupConcats.assign(group_concats.begin(), group_concats.end());

    if (rgData)
    {
      rgData->aggregateDataStore.reset(new AggregateDataStore(fGroupConcats));
      aggregateDataStore = rgData->aggregateDataStore.get();
    }
  }
  else if (useAggregateDataStore)
  {
    fGroupConcats.clear();
  }

  useAggregateDataStore = b;
}
void RowGroup::serializeRGData(ByteStream& bs) const
{
rgData->serialize(bs, getDataSize());

View File

@ -27,6 +27,7 @@
#pragma once
#include <span>
#include <vector>
#include <string>
#include <stdexcept>
@ -60,6 +61,10 @@
#include "execinfo.h"
// Workaround for my_global.h #define of isnan(X) causing a std::std namespace
namespace joblist
{
class GroupConcatAg;
}
namespace rowgroup
{
@ -172,8 +177,14 @@ class StringStore
{
return fUseStoreStringMutex;
}
void useOnlyLongStrings(bool b) { fUseOnlyLongStrings = b; }
bool useOnlyLongStrings() const { return fUseOnlyLongStrings; }
void useOnlyLongStrings(bool b)
{
fUseOnlyLongStrings = b;
}
bool useOnlyLongStrings() const
{
return fUseOnlyLongStrings;
}
// This is an overlay b/c the underlying data needs to be any size,
// and alloc'd in one chunk. data can't be a separate dynamic chunk.
@ -256,6 +267,36 @@ class UserDataStore
boost::mutex fMutex;
};
struct GroupConcat;
/** @brief Container for group_concat & json_arrayagg aggregate objects.
 *
 * Holds shared pointers to GroupConcat descriptors (fGroupConcat) and to
 * joblist::GroupConcatAg objects (fData). Aggregate objects are addressed by
 * their position in fData. The store can be neither copied nor moved.
 */
class AggregateDataStore
{
public:
AggregateDataStore() = default;
// Create a store that keeps a copy of the given descriptor pointers.
explicit AggregateDataStore(const std::vector<boost::shared_ptr<GroupConcat>>& groupConcat)
: fGroupConcat(groupConcat)
{
}
~AggregateDataStore() = default;
AggregateDataStore(const AggregateDataStore&) = delete;
AggregateDataStore(AggregateDataStore&&) = delete;
AggregateDataStore& operator=(const AggregateDataStore&) = delete;
AggregateDataStore& operator=(AggregateDataStore&&) = delete;
// Write / read the descriptors and the aggregate objects to / from a stream.
void serialize(messageqcpp::ByteStream&) const;
void deserialize(messageqcpp::ByteStream&);
// Store an aggregate object; the return value is the position to pass to
// getAggregateData().
uint32_t storeAggregateData(boost::shared_ptr<joblist::GroupConcatAg>& data);
boost::shared_ptr<joblist::GroupConcatAg> getAggregateData(uint32_t pos) const;
// Combined getDataSize() of the descriptors and the aggregate objects.
RGDataSizeType getDataSize() const;
private:
friend class RGData;
std::vector<boost::shared_ptr<GroupConcat>> fGroupConcat;
std::vector<boost::shared_ptr<joblist::GroupConcatAg>> fData;
};
class RowGroup;
class Row;
@ -331,6 +372,7 @@ class RGData
boost::shared_ptr<RGDataBufType> rowData;
boost::shared_ptr<StringStore> strings;
std::shared_ptr<UserDataStore> userDataStore;
std::shared_ptr<AggregateDataStore> aggregateDataStore;
std::optional<allocators::CountingAllocator<RGDataBufType>> alloc = {};
// Need sig to support backward compat. RGData can deserialize both forms.
@ -356,9 +398,14 @@ class Row
inline Pointer(uint8_t* d, StringStore* s, UserDataStore* u) : data(d), strings(s), userDataStore(u)
{
}
inline Pointer(uint8_t* d, StringStore* s, UserDataStore* u, AggregateDataStore* a)
: data(d), strings(s), userDataStore(u), aggregateDataStore(a)
{
}
uint8_t* data = nullptr;
StringStore* strings = nullptr;
UserDataStore* userDataStore = nullptr;
AggregateDataStore* aggregateDataStore = nullptr;
};
Row() = default;
@ -526,6 +573,8 @@ class Row
inline boost::shared_ptr<mcsv1sdk::UserData> getUserData(uint32_t colIndex) const;
inline void setUserData(mcsv1sdk::mcsv1Context& context, boost::shared_ptr<mcsv1sdk::UserData> userData,
uint32_t len, uint32_t colIndex);
inline void setAggregateData(boost::shared_ptr<joblist::GroupConcatAg> data, uint32_t colIndex);
inline joblist::GroupConcatAg* getAggregateData(uint32_t colIndex) const;
uint64_t getNullValue(uint32_t colIndex) const;
bool isNullValue(uint32_t colIndex) const;
@ -638,14 +687,15 @@ class Row
bool hasLongStringField = false;
uint32_t sTableThreshold = 20;
std::shared_ptr<bool[]> forceInline;
UserDataStore* userDataStore = nullptr; // For UDAF
UserDataStore* userDataStore = nullptr; // For UDAF
AggregateDataStore* aggregateDataStore = nullptr; // group_concat & json_arrayagg
friend class RowGroup;
};
inline Row::Pointer Row::getPointer() const
{
return Pointer(data, strings, userDataStore);
return Pointer(data, strings, userDataStore, aggregateDataStore);
}
inline uint8_t* Row::getData() const
{
@ -665,6 +715,7 @@ inline void Row::setPointer(const Pointer& p)
}
userDataStore = p.userDataStore;
aggregateDataStore = p.aggregateDataStore;
}
inline void Row::setData(const Pointer& p)
@ -1258,7 +1309,7 @@ inline void Row::setUintField(uint64_t val, uint32_t colIndex)
template <int len>
inline void Row::setIntField(int64_t val, uint32_t colIndex)
{
// idbassert(getColumnWidth(colIndex) == len);
// idbassert(getColumnWidth(colIndex) == len);
switch (len)
{
case 1: *((int8_t*)&data[offsets[colIndex]]) = val; break;
@ -1362,6 +1413,28 @@ inline void Row::setUserData(mcsv1sdk::mcsv1Context& context, boost::shared_ptr<
*((uint32_t*)&data[offsets[colIndex] + 4]) = len;
}
/** @brief Attach an aggregate object to a column of this row.
 *
 * The object is stored in the row's AggregateDataStore and the position
 * returned by the store is written as a uint32_t into the column's slot.
 *
 * @param agData   aggregate object to store
 * @param colIndex column that will refer to the stored object
 * @throws std::logic_error if the row has no AggregateDataStore
 */
inline void Row::setAggregateData(boost::shared_ptr<joblist::GroupConcatAg> agData, uint32_t colIndex)
{
  if (!aggregateDataStore)
  {
    // Name this function (not getAggregateData) so the failing call is identifiable.
    throw std::logic_error("Row::setAggregateData: no aggregateDataStore");
  }

  uint32_t pos = aggregateDataStore->storeAggregateData(agData);
  *((uint32_t*)&data[offsets[colIndex]]) = pos;
}
/** @brief Look up the aggregate object a column of this row refers to.
 *
 * Reads the uint32_t position stored in the column's slot and returns the
 * object held at that position in the row's AggregateDataStore.
 *
 * @param colIndex column holding the position
 * @return raw pointer to the aggregate object
 * @throws std::logic_error if the row has no AggregateDataStore
 */
inline joblist::GroupConcatAg* Row::getAggregateData(uint32_t colIndex) const
{
  if (aggregateDataStore == nullptr)
    throw std::logic_error("Row::getAggregateData: no aggregateDataStore");

  const uint32_t slot = *reinterpret_cast<const uint32_t*>(data + offsets[colIndex]);
  return aggregateDataStore->getAggregateData(slot).get();
}
inline void Row::copyField(uint32_t destIndex, uint32_t srcIndex) const
{
uint32_t n = offsets[destIndex + 1] - offsets[destIndex];
@ -1559,8 +1632,19 @@ class RowGroup : public messageqcpp::Serializeable
inline bool usesStringTable() const;
inline void setUseStringTable(bool);
void setUseOnlyLongString(bool b) { useOnlyLongStrings = b; }
bool usesOnlyLongString() const { return useOnlyLongStrings ; }
void setUseOnlyLongString(bool b)
{
useOnlyLongStrings = b;
}
bool usesOnlyLongString() const
{
return useOnlyLongStrings;
}
void setUseAggregateDataStore(bool b, std::span<boost::shared_ptr<GroupConcat>> group_concats = {});
// Returns the flag last set through setUseAggregateDataStore().
bool usesAggregateDataStore() const
{
return useAggregateDataStore;
}
bool hasLongString() const
{
@ -1606,6 +1690,11 @@ class RowGroup : public messageqcpp::Serializeable
const CHARSET_INFO* getCharset(uint32_t col);
// Returns, by const reference, the GroupConcat descriptors kept in fGroupConcats.
const auto& getGroupConcats() const
{
return fGroupConcats;
}
private:
uint32_t columnCount = 0;
uint8_t* data = nullptr;
@ -1632,19 +1721,22 @@ class RowGroup : public messageqcpp::Serializeable
// string table impl
RGData* rgData = nullptr;
StringStore* strings = nullptr; // note, strings and data belong to rgData
AggregateDataStore* aggregateDataStore = nullptr;
bool useStringTable = true;
bool useOnlyLongStrings = false;
bool useAggregateDataStore = true;
bool useAggregateDataStore = false;
bool hasCollation = false;
bool hasLongStringField = false;
uint32_t sTableThreshold = 20;
std::shared_ptr<bool[]> forceInline;
static const uint64_t headerSize = 18;
static const uint64_t rowCountOffset = 0;
static const uint64_t baseRidOffset = 4;
static const uint64_t statusOffset = 12;
static const uint64_t dbRootOffset = 14;
std::vector<boost::shared_ptr<GroupConcat>> fGroupConcats;
static constexpr uint64_t headerSize = 18;
static constexpr uint64_t rowCountOffset = 0;
static constexpr uint64_t baseRidOffset = 4;
static constexpr uint64_t statusOffset = 12;
static constexpr uint64_t dbRootOffset = 14;
};
inline uint64_t convertToRid(const uint32_t& partNum, const uint16_t& segNum, const uint8_t& extentNum,
@ -1700,12 +1792,14 @@ inline void RowGroup::getRow(uint32_t rowNum, Row* r) const
r->data = &(data[headerSize + (rowNum * r->getSize())]);
r->strings = strings;
r->userDataStore = rgData->userDataStore.get();
r->aggregateDataStore = rgData->aggregateDataStore.get();
}
/** @brief Point this RowGroup at the buffers of an RGData.
 *
 * Caches raw pointers to the RGData's row buffer, string store and aggregate
 * data store, and remembers the RGData itself.
 *
 * @param rgd RGData to attach; dereferenced unconditionally
 */
inline void RowGroup::setData(RGData* rgd)
{
  rgData = rgd;
  data = rgd->rowData.get();
  strings = rgd->strings.get();
  aggregateDataStore = rgd->aggregateDataStore.get();
}
@ -1792,10 +1886,16 @@ inline uint32_t RowGroup::getRowSizeWithStrings() const
inline RGDataSizeType RowGroup::getSizeWithStrings(uint64_t n) const
{
if (strings == nullptr)
return getDataSize(n);
else
return getDataSize(n) + strings->getSize();
RGDataSizeType ret = getDataSize(n);
if (strings)
{
ret += strings->getSize();
}
if (aggregateDataStore)
{
ret += aggregateDataStore->getDataSize();
}
return ret;
}
inline uint64_t RowGroup::getSizeWithStrings() const
@ -2216,7 +2316,18 @@ inline void RGData::getRow(uint32_t num, Row* row)
idbassert(columnCount == row->getColumnCount() && rowSize == incomingRowSize);
row->setData(
Row::Pointer(&rowData[RowGroup::getHeaderSize() + (num * incomingRowSize)], strings.get(), userDataStore.get()));
Row::Pointer(&rowData[RowGroup::getHeaderSize() + (num * incomingRowSize)], strings.get(),
userDataStore.get(), aggregateDataStore.get()));
}
/** @brief Combine a group id and a row id into a flat index.
 *
 * @param gid     group id
 * @param rid     row id within the group
 * @param maxRows number of rows per group
 * @return gid * maxRows + rid
 */
inline uint64_t rowGidRidToIdx(uint64_t gid, uint32_t rid, uint32_t maxRows)
{
  const uint64_t groupBase = gid * maxRows;
  return groupBase + rid;
}
/** @brief Split a flat index into a (group id, row id) pair.
 *
 * @param idx     flat index
 * @param maxRows number of rows per group; used as a divisor
 * @return {idx / maxRows, idx % maxRows}
 */
inline std::pair<uint64_t, uint64_t> rowIdxToGidRid(uint64_t idx, uint32_t maxRows)
{
  std::pair<uint64_t, uint64_t> location;
  location.first = idx / maxRows;
  location.second = idx % maxRows;
  return location;
}
} // namespace rowgroup

View File

@ -584,6 +584,7 @@ class RowGroupStorage
, fUniqId(this)
, fTmpDir(tmpDir)
, fCompressor(compressor)
, fUseDisk(!strict)
{
if (rm)
{
@ -698,7 +699,7 @@ class RowGroupStorage
logging::ERR_AGGREGATION_TOO_BIG);
}
if (fMM->getFree() < memSz * 2)
if (fUseDisk && fMM->getFree() < memSz * 2)
{
saveRG(rgid);
fRGDatas[rgid].reset();
@ -880,8 +881,7 @@ class RowGroupStorage
*/
void getRow(uint64_t idx, Row& row)
{
uint64_t rgid = idx / fMaxRows;
uint64_t rid = idx % fMaxRows;
auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);
if (UNLIKELY(!fRGDatas[rgid]))
{
loadRG(rgid);
@ -947,7 +947,7 @@ class RowGroupStorage
}
fLRU->add(fCurRgid);
idx = fCurRgid * fMaxRows + fRowGroupOut->getRowCount();
idx = rowGidRidToIdx(fCurRgid, fRowGroupOut->getRowCount(), fMaxRows);
fRowGroupOut->getRow(fRowGroupOut->getRowCount(), &row);
fRowGroupOut->incRowCount();
}
@ -962,7 +962,7 @@ class RowGroupStorage
*/
void putKeyRow(uint64_t idx, Row& row)
{
uint64_t rgid = idx / fMaxRows;
auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);
while (rgid >= fRGDatas.size())
{
@ -1157,6 +1157,7 @@ class RowGroupStorage
ret->fGeneration = gen;
ret->fCompressor = fCompressor;
ret->fDumper.reset(new Dumper(fCompressor, fMM.get()));
ret->fUseDisk = fUseDisk;
ret->loadFinalizedInfo();
return ret;
}
@ -1165,8 +1166,7 @@ class RowGroupStorage
*/
void markFinalized(uint64_t idx)
{
uint64_t gid = idx / 64;
uint64_t rid = idx % 64;
auto [gid, rid] = rowIdxToGidRid(idx, 64);
if (LIKELY(fFinalizedRows.size() <= gid))
fFinalizedRows.resize(gid + 1, 0ULL);
@ -1176,8 +1176,7 @@ class RowGroupStorage
/** @brief Check if row at specified index was finalized earlier */
bool isFinalized(uint64_t idx) const
{
uint64_t gid = idx / 64;
uint64_t rid = idx % 64;
auto [gid, rid] = rowIdxToGidRid(idx, 64);
if (LIKELY(fFinalizedRows.size() <= gid))
return false;
@ -1324,6 +1323,7 @@ class RowGroupStorage
unlink(fname.c_str());
rgdata.reset(new RGData());
rgdata->deserialize(bs, fRowGroupOut->getDataSize(fMaxRows));
assert(bs.length() == 0);
fRowGroupOut->setData(rgdata.get());
auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
@ -1379,12 +1379,12 @@ class RowGroupStorage
fRowGroupOut->serialize(bs);
char buf[1024];
snprintf(buf, sizeof(buf), "/tmp/kemm/META-p%u-t%p", getpid(), fUniqPtr);
snprintf(buf, sizeof(buf), "%s/META-p%u-t%p", fTmpDir.c_str(), getpid(), fUniqId);
int fd = open(buf, O_WRONLY | O_TRUNC | O_CREAT, 0644);
assert(fd >= 0);
auto r = write(fd, bs.buf(), bs.length());
assert(r == bs.length());
assert(size_t(r) == bs.length());
close(fd);
}
#endif
@ -1421,6 +1421,7 @@ class RowGroupStorage
std::string fTmpDir;
compress::CompressInterface* fCompressor;
std::unique_ptr<Dumper> fDumper;
bool fUseDisk;
};
/** @brief Internal data for the hashmap */

View File

@ -526,11 +526,11 @@ int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
bool CompareRule::less(Row::Pointer r1, Row::Pointer r2)
bool CompareRule::less(Row::Pointer r1, Row::Pointer r2) const
{
for (auto& compare : fCompares)
for (auto* compare : fCompares)
{
int c = ((*compare)(fIdbCompare, r1, r2));
int c = (*compare)(fIdbCompare, r1, r2);
if (c < 0)
return true;

View File

@ -316,7 +316,7 @@ class CompareRule
{
}
bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2);
bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2) const;
void compileRules(const std::vector<IdbSortSpec>&, const rowgroup::RowGroup&);
void revertRules();