1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/tupleaggregatestep.h
Denis Khalikov 467fe0b401 [MCOL-5109] Make a singleton from ServicePrimProc.
This patch makes a singleton from ServicePrimProc.
2022-06-07 13:27:45 +03:00

230 lines
6.6 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tupleaggregatestep.h 9732 2013-08-02 15:56:15Z pleblanc $
#pragma once
#include "jobstep.h"
#include "rowaggregation.h"
#include "threadnaming.h"
#include "../../primitives/primproc/primitiveserverthreadpools.h"
#include <boost/thread.hpp>
namespace joblist
{
// forward reference
struct JobInfo;

/** @brief class TupleAggregateStep
 *
 *  Job step that aggregates rows with a UM row aggregator (rowgroup::SP_ROWAGG_UM_t)
 *  and hands the aggregated rows out through the TupleDeliveryStep interface.
 *  Aggregation runs either single-threaded (doAggregate_singleThread /
 *  nextBand_singleThread) or spread over several worker threads and hash buckets
 *  (see the multi-threaded members at the bottom of the class).
 */
class TupleAggregateStep : public JobStep, public TupleDeliveryStep
{
 public:
  /** @brief TupleAggregateStep constructor
   *
   *  Takes the UM aggregator, two row-group layouts and the job context.
   *  NOTE(review): the two RowGroup arguments are presumably (input, output) — confirm in the .cpp.
   */
  TupleAggregateStep(const rowgroup::SP_ROWAGG_UM_t&, const rowgroup::RowGroup&, const rowgroup::RowGroup&,
                     const JobInfo&);

  /** @brief TupleAggregateStep destructor
   */
  ~TupleAggregateStep();

  /** @brief Starts the step.
   *  NOTE(review): presumably dispatches an Aggregator functor to the thread pool and keeps the
   *  handle in fRunner — confirm in the .cpp.
   */
  void run();

  /** @brief Waits for the work started by run() (presumably joins fRunner — confirm). */
  void join();

  /** @brief Human-readable description of this step. */
  const std::string toString() const;

  // Row-group layout accessors / delivery configuration.
  void setOutputRowGroup(const rowgroup::RowGroup&);
  const rowgroup::RowGroup& getOutputRowGroup() const;
  const rowgroup::RowGroup& getDeliveredRowGroup() const;
  void deliverStringTableRowGroup(bool b);
  bool deliverStringTableRowGroup() const;

  /** @brief Serializes the next band of aggregated rows into @p bs.
   *  @return NOTE(review): presumably the number of rows written — confirm in the .cpp.
   */
  uint32_t nextBand(messageqcpp::ByteStream& bs);

  /** @brief Single-threaded variant of nextBand(). */
  uint32_t nextBand_singleThread(messageqcpp::ByteStream& bs);

  // PM hash-join aggregation support (the PM hash join is chosen at runtime).
  bool setPmHJAggregation(JobStep* step);
  void savePmHJData(rowgroup::SP_ROWAGG_t&, rowgroup::SP_ROWAGG_t&, rowgroup::RowGroup&);

  /** @brief Returns the fUmOnly flag. */
  bool umOnly() const
  {
    return fUmOnly;
  }

  /** @brief Sets the fUmOnly flag. */
  void umOnly(bool b)
  {
    fUmOnly = b;
  }

  void configDeliveredRowGroup(const JobInfo&);

  /** @brief Builds the aggregation step(s) for a query plan; returns the resulting job step. */
  static SJSTEP prepAggregate(SJSTEP&, JobInfo&);

  /** @brief Sets up the state used by the multi-threaded aggregation path. */
  void initializeMultiThread();

 private:
  // Plan-preparation helpers used by prepAggregate(); each fills the row-group and
  // aggregator vectors for one aggregation strategy (1 vs. 2 phases, with/without DISTINCT).
  static void prep1PhaseDistinctAggregate(JobInfo&, std::vector<rowgroup::RowGroup>&,
                                          std::vector<rowgroup::SP_ROWAGG_t>&);
  static void prep1PhaseAggregate(JobInfo&, std::vector<rowgroup::RowGroup>&,
                                  std::vector<rowgroup::SP_ROWAGG_t>&);
  static void prep2PhasesAggregate(JobInfo&, std::vector<rowgroup::RowGroup>&,
                                   std::vector<rowgroup::SP_ROWAGG_t>&);
  static void prep2PhasesDistinctAggregate(JobInfo&, std::vector<rowgroup::RowGroup>&,
                                           std::vector<rowgroup::SP_ROWAGG_t>&);

  void prepExpressionOnAggregate(rowgroup::SP_ROWAGG_UM_t&, JobInfo&);
  // NOTE(review): "Constang" is a typo for "Constant"; kept because the .cpp defines this name.
  void addConstangAggregate(std::vector<rowgroup::ConstantAggData>&);

  // Aggregation drivers.
  void doAggregate();
  void doAggregate_singleThread();
  uint64_t doThreadedAggregate(messageqcpp::ByteStream& bs, RowGroupDL* dlp);
  void aggregateRowGroups();

  // Per-thread work, invoked by the functors below.
  void threadedAggregateRowGroups(uint32_t threadID);
  void threadedAggregateFinalize(uint32_t threadID);
  void doThreadedSecondPhaseAggregate(uint32_t threadID);

  bool nextDeliveredRowGroup();
  void pruneAuxColumns();
  void formatMiniStats();
  void printCalTrace();

  boost::shared_ptr<execplan::CalpontSystemCatalog> fCatalog;
  uint64_t fRowsReturned;
  bool fDoneAggregate;
  bool fEndOfResult;

  rowgroup::SP_ROWAGG_UM_t fAggregator;
  rowgroup::RowGroup fRowGroupOut;
  rowgroup::RowGroup fRowGroupDelivered;
  rowgroup::RGData fRowGroupData;

  // data from RowGroupDL
  rowgroup::RowGroup fRowGroupIn;

  // for PM HashJoin
  // PM hashjoin is selected at runtime, prepare for it anyway.
  rowgroup::SP_ROWAGG_UM_t fAggregatorUM;
  rowgroup::SP_ROWAGG_PM_t fAggregatorPM;
  rowgroup::RowGroup fRowGroupPMHJ;

  /** Functor used as the body of the run thread (first added for union):
   *  names the thread "TASAggr" and calls doAggregate().
   */
  class Aggregator
  {
   public:
    // explicit: a bare TupleAggregateStep* must not silently convert into a runnable functor.
    explicit Aggregator(TupleAggregateStep* step) : fStep(step)
    {
    }

    void operator()()
    {
      utils::setThreadName("TASAggr");
      fStep->doAggregate();
    }

    TupleAggregateStep* fStep;  // non-owning; the step must stay alive while the functor runs
  };

  /** Worker functor: names the thread "TASThrAggr<threadID>" and calls
   *  threadedAggregateRowGroups(threadID).
   */
  class ThreadedAggregator
  {
   public:
    ThreadedAggregator(TupleAggregateStep* step, uint32_t threadID) : fStep(step), fThreadID(threadID)
    {
    }

    void operator()()
    {
      std::string t{"TASThrAggr"};
      t.append(std::to_string(fThreadID));
      utils::setThreadName(t.c_str());
      fStep->threadedAggregateRowGroups(fThreadID);
    }

    TupleAggregateStep* fStep;  // non-owning
    uint32_t fThreadID;
  };

  /** Worker functor: names the thread "TASThrFin<threadID>" and calls
   *  threadedAggregateFinalize(threadID).
   */
  class ThreadedAggregateFinalizer
  {
   public:
    ThreadedAggregateFinalizer(TupleAggregateStep* step, uint32_t threadID) : fStep(step), fThreadID(threadID)
    {
    }

    void operator()()
    {
      std::string t{"TASThrFin"};
      t.append(std::to_string(fThreadID));
      utils::setThreadName(t.c_str());
      fStep->threadedAggregateFinalize(fThreadID);
    }

    TupleAggregateStep* fStep;  // non-owning
    uint32_t fThreadID;
  };

  /** Worker functor for the second aggregation phase: names the thread "TASThr2ndPAggr"
   *  and calls doThreadedSecondPhaseAggregate() for the bucketCount consecutive
   *  indices fThreadID, fThreadID + 1, ..., fThreadID + bucketCount - 1.
   */
  class ThreadedSecondPhaseAggregator
  {
   public:
    ThreadedSecondPhaseAggregator(TupleAggregateStep* step, uint32_t threadID, uint32_t bucketsPerThread)
     : fStep(step), fThreadID(threadID), bucketCount(bucketsPerThread)
    {
    }

    void operator()()
    {
      // The name carries no ID: "TASThr2ndPAggr" is already 14 characters and
      // Linux thread names are limited to 15.
      utils::setThreadName("TASThr2ndPAggr");
      for (uint32_t i = 0; i < bucketCount; i++)
        fStep->doThreadedSecondPhaseAggregate(fThreadID + i);
    }

    TupleAggregateStep* fStep;  // non-owning
    uint32_t fThreadID;         // despite the name: index of the FIRST bucket handled by this functor
    uint32_t bucketCount;       // number of consecutive buckets handled
  };

  uint64_t fRunner;  // thread pool handle
  bool fUmOnly;
  ResourceManager* fRm;  // non-owning as far as this header shows

  // multi-threaded
  uint32_t fNumOfThreads;
  uint32_t fNumOfBuckets;
  uint32_t fNumOfRowGroups;
  uint32_t fBucketNum;

  boost::mutex fMutex;
  // NOTE(review): vector of raw mutex pointers; allocation/deletion presumably lives in the .cpp.
  // Consider std::vector<std::unique_ptr<boost::mutex>> so cleanup cannot be missed.
  std::vector<boost::mutex*> fAgg_mutex;
  std::vector<rowgroup::RGData> fRowGroupDatas;
  std::vector<rowgroup::SP_ROWAGG_UM_t> fAggregators;
  std::vector<rowgroup::RowGroup> fRowGroupIns;
  std::vector<rowgroup::RowGroup> fRowGroupOuts;
  std::vector<std::vector<rowgroup::RGData> > fRowGroupsDeliveredData;
  bool fIsMultiThread;
  int fInputIter;  // iterator
  boost::scoped_array<uint64_t> fMemUsage;
  boost::shared_ptr<int64_t> fSessionMemLimit;

  PrimitiveServerThreadPools fPrimitiveServerThreadPools;
};

}  // namespace joblist