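// Repository mirror metadata captured together with this file (not part of the original header):
//   page counters: 1, 0
//   mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
//   Files: mariadb-columnstore-engine/dbcon/joblist/tuplehashjoin.h
//   2023-07-11 14:05:14 +03:00 - 657 lines - 17 KiB - C++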

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tuplehashjoin.h 9655 2013-06-25 23:08:13Z xlou $
#pragma once
#include "jobstep.h"
#include "calpontsystemcatalog.h"
#include "hasher.h"
#include "tuplejoiner.h"
#include "threadnaming.h"
#include <boost/shared_ptr.hpp>
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <utility>
#include "resourcemanager.h"
#include "exceptclasses.h"
namespace joblist
{
class BatchPrimitive;
class TupleBPS;
struct FunctionJoinInfo;
class DiskJoinStep;

/** @brief Job step that joins a large-side RowGroup stream with one or more
 *  small-side RowGroup streams and delivers the joined RowGroups
 *  (it is both a JobStep and a TupleDeliveryStep).
 *
 *  Accessors suffixed with "1" / "2" describe the two tables of the join pair.
 *  The step's own identity, as seen through the JobStep interface, is side 2:
 *  tableOid() and tupleId() return the side-2 values, and alias2(), view2()
 *  and schema2() also update the inherited fAlias / fView / fSchema fields.
 */
class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
{
 public:
  /**
   * @param jobInfo job-wide context for this step (constructor is defined out of line).
   */
  TupleHashJoinStep(const JobInfo& jobInfo);
  virtual ~TupleHashJoinStep();

  void setLargeSideBPS(BatchPrimitive*);
  void setLargeSideStepsOut(const std::vector<SJSTEP>& largeSideSteps);
  void setSmallSideStepsOut(const std::vector<SJSTEP>& smallSideSteps);

  /* mandatory JobStep interface */
  void run();
  void join();
  const std::string toString() const;

  /* These tableOID accessors can go away soon */
  /// Side-2 table OID; identical to tableOid2().
  execplan::CalpontSystemCatalog::OID tableOid() const
  {
    return fTableOID2;
  }
  execplan::CalpontSystemCatalog::OID tableOid1() const
  {
    return fTableOID1;
  }
  execplan::CalpontSystemCatalog::OID tableOid2() const
  {
    return fTableOID2;
  }
  /// Sets the side-1 table OID. A system-catalog OID limits the step to one
  /// worker thread (numCores = 1).
  void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1)
  {
    // OIDs below this value belong to the system catalog.
    constexpr execplan::CalpontSystemCatalog::OID kSysCatOIDLimit = 3000;

    fTableOID1 = tableOid1;
    if (fTableOID1 < kSysCatOIDLimit)
      numCores = 1;  // syscat query, no need for more than 1 thread
  }
  void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2)
  {
    fTableOID2 = tableOid2;
  }

  std::string alias1() const
  {
    return fAlias1;
  }
  void alias1(const std::string& alias)
  {
    fAlias1 = alias;
  }
  std::string alias2() const
  {
    return fAlias2;
  }
  /// Also updates the inherited JobStep alias (side 2 is the step's identity).
  void alias2(const std::string& alias)
  {
    fAlias = fAlias2 = alias;
  }

  std::string view1() const
  {
    return fView1;
  }
  void view1(const std::string& vw)
  {
    fView1 = vw;
  }
  std::string view2() const
  {
    return fView2;
  }
  /// Also updates the inherited JobStep view name.
  void view2(const std::string& vw)
  {
    fView = fView2 = vw;
  }

  std::string schema1() const
  {
    return fSchema1;
  }
  void schema1(const std::string& s)
  {
    fSchema1 = s;
  }
  std::string schema2() const
  {
    return fSchema2;
  }
  /// Also updates the inherited JobStep schema name.
  void schema2(const std::string& s)
  {
    fSchema = fSchema2 = s;
  }

  int32_t sequence1() const
  {
    return fSequence1;
  }
  void sequence1(int32_t seq)
  {
    fSequence1 = seq;
  }
  int32_t sequence2() const
  {
    return fSequence2;
  }
  void sequence2(int32_t seq)
  {
    fSequence2 = seq;
  }

  /// Non-owning; the column is owned by the execution plan.
  const execplan::ReturnedColumn* column1() const
  {
    return fColumn1;
  }
  void column1(const execplan::ReturnedColumn* pos)
  {
    fColumn1 = pos;
  }
  /// Non-owning; the column is owned by the execution plan.
  const execplan::ReturnedColumn* column2() const
  {
    return fColumn2;
  }
  void column2(const execplan::ReturnedColumn* pos)
  {
    fColumn2 = pos;
  }

  int correlatedSide() const
  {
    return fCorrelatedSide;
  }
  void correlatedSide(int c)
  {
    fCorrelatedSide = c;
  }

  using JobStep::tupleId;
  /// Side-2 tuple id; identical to tupleId2().
  uint64_t tupleId() const
  {
    return fTupleId2;
  }
  uint64_t tupleId1() const
  {
    return fTupleId1;
  }
  uint64_t tupleId2() const
  {
    return fTupleId2;
  }
  void tupleId1(uint64_t id)
  {
    fTupleId1 = id;
  }
  void tupleId2(uint64_t id)
  {
    fTupleId2 = id;
  }

  void addSmallSideRG(const std::vector<rowgroup::RowGroup>& rgs, const std::vector<std::string>& tableNames);
  void addJoinKeyIndex(const std::vector<JoinType>& jt, const std::vector<bool>& typeless,
                       const std::vector<std::vector<uint32_t> >& smallkeys,
                       const std::vector<std::vector<uint32_t> >& largekeys);
  void configSmallSideRG(const std::vector<rowgroup::RowGroup>& rgs,
                         const std::vector<std::string>& tableNames);
  void configLargeSideRG(const rowgroup::RowGroup& rg);
  void configJoinKeyIndex(const std::vector<JoinType>& jt, const std::vector<bool>& typeless,
                          const std::vector<std::vector<uint32_t> >& smallkeys,
                          const std::vector<std::vector<uint32_t> >& largekeys);
  void setOutputRowGroup(const rowgroup::RowGroup& rg);
  uint32_t nextBand(messageqcpp::ByteStream& bs);

  const rowgroup::RowGroup& getOutputRowGroup() const
  {
    return outputRG;
  }
  /// First small-side RowGroup. Precondition: at least one small side has been
  /// configured (the access is not bounds-checked).
  const rowgroup::RowGroup& getSmallRowGroup() const
  {
    return smallRGs[0];
  }
  const std::vector<rowgroup::RowGroup>& getSmallRowGroups() const
  {
    return smallRGs;
  }
  const rowgroup::RowGroup& getLargeRowGroup() const
  {
    return largeRG;
  }
  /// First key column of the first small side. Precondition: keys configured
  /// (the access is not bounds-checked).
  uint32_t getSmallKey() const
  {
    return smallSideKeys[0][0];
  }
  const std::vector<std::vector<uint32_t> >& getSmallKeys() const
  {
    return smallSideKeys;
  }
  const std::vector<std::vector<uint32_t> >& getLargeKeys() const
  {
    return largeSideKeys;
  }

  /* Some compat fcns to get rid of later */
  void oid1(execplan::CalpontSystemCatalog::OID oid)
  {
    fOid1 = oid;
  }
  execplan::CalpontSystemCatalog::OID oid1() const
  {
    return fOid1;
  }
  void oid2(execplan::CalpontSystemCatalog::OID oid)
  {
    fOid2 = oid;
  }
  execplan::CalpontSystemCatalog::OID oid2() const
  {
    return fOid2;
  }
  void dictOid1(execplan::CalpontSystemCatalog::OID oid)
  {
    fDictOid1 = oid;
  }
  execplan::CalpontSystemCatalog::OID dictOid1() const
  {
    return fDictOid1;
  }
  void dictOid2(execplan::CalpontSystemCatalog::OID oid)
  {
    fDictOid2 = oid;
  }
  execplan::CalpontSystemCatalog::OID dictOid2() const
  {
    return fDictOid2;
  }

  /* The replacements. I don't think there's a need for setters or vars.
     OIDs are already in the rowgroups. */
  // s - s-th table pair; k - k-th key in a compound join (0 for a non-compound join)
  execplan::CalpontSystemCatalog::OID smallSideKeyOID(uint32_t s, uint32_t k) const;
  execplan::CalpontSystemCatalog::OID largeSideKeyOID(uint32_t s, uint32_t k) const;

  void deliveryStep(const SJSTEP& ds)
  {
    fDeliveryStep = ds;
  }

  /* Iteration 18 mods */
  void setLargeSideDLIndex(uint32_t i)
  {
    largeSideIndex = i;
  }

  /* obsolete, need to update JLF */
  void setJoinType(JoinType jt)
  {
    joinType = jt;
  }
  JoinType getJoinType() const
  {
    return joinType;
  }

  /* Functions & Expressions interface */
  /* Cross-table Functions & Expressions in where clause */
  void addFcnExpGroup2(const boost::shared_ptr<execplan::ParseTree>& fe);
  /// True when a cross-table expression wrapper (fe2) is present
  /// (presumably installed by addFcnExpGroup2()).
  bool hasFcnExpGroup2() const
  {
    return fe2.get() != nullptr;
  }
  /* Functions & Expressions in select and groupby clause */
  void setFcnExpGroup3(const std::vector<execplan::SRCP>& fe);
  void setFE23Output(const rowgroup::RowGroup& rg);

  /* result rowgroup */
  const rowgroup::RowGroup& getDeliveredRowGroup() const;
  void deliverStringTableRowGroup(bool b);
  bool deliverStringTableRowGroup() const;

  // joinId
  void joinId(int64_t id)
  {
    fJoinId = id;
  }
  int64_t joinId() const
  {
    return fJoinId;
  }

  /* semi-join support */
  void addJoinFilter(boost::shared_ptr<execplan::ParseTree>, uint32_t index);
  /// True when at least one join-filter expression wrapper is stored.
  bool hasJoinFilter() const
  {
    return !fe.empty();
  }
  bool hasJoinFilter(uint32_t index) const;
  boost::shared_ptr<funcexp::FuncExpWrapper> getJoinFilter(uint32_t index) const;
  void setJoinFilterInputRG(const rowgroup::RowGroup& rg);

  virtual bool stringTableFriendly()
  {
    return true;
  }

  uint32_t tokenJoin() const
  {
    return fTokenJoin;
  }
  void tokenJoin(uint32_t k)
  {
    fTokenJoin = k;
  }

  //@bug3683 function join
  boost::shared_ptr<FunctionJoinInfo>& funcJoinInfo()
  {
    return fFunctionJoinInfo;
  }
  void funcJoinInfo(const boost::shared_ptr<FunctionJoinInfo>& fji)
  {
    fFunctionJoinInfo = fji;
  }

  void abort();

  /// Gives back to the ResourceManager all memory recorded in
  /// fMemSizeForOutputRG and resets the tally to 0.
  void returnMemory()
  {
    if (fMemSizeForOutputRG > 0)
    {
      resourceManager->returnMemory(fMemSizeForOutputRG);
      fMemSizeForOutputRG = 0;
    }
  }
  /// Requests memSize bytes from the ResourceManager; on success the amount is
  /// added to fMemSizeForOutputRG so returnMemory() can release it later.
  /// @return whether the ResourceManager granted the request.
  bool getMemory(uint64_t memSize)
  {
    bool gotMem = resourceManager->getMemory(memSize);
    if (gotMem)
      fMemSizeForOutputRG += memSize;
    return gotMem;
  }

 private:
  // Declared but intentionally not public: no default construction or copying.
  TupleHashJoinStep();
  TupleHashJoinStep(const TupleHashJoinStep&);
  TupleHashJoinStep& operator=(const TupleHashJoinStep&);

  void errorLogging(const std::string& msg, int err) const;
  void startAdjoiningSteps();
  void formatMiniStats(uint32_t index);

  RowGroupDL *largeDL, *outputDL;
  std::vector<RowGroupDL*> smallDLs;
  std::vector<uint32_t> smallIts;
  uint32_t largeIt;
  JoinType joinType;  // deprecated
  std::vector<JoinType> joinTypes;
  execplan::CalpontSystemCatalog::OID fTableOID1;
  execplan::CalpontSystemCatalog::OID fTableOID2;
  execplan::CalpontSystemCatalog::OID fOid1;
  execplan::CalpontSystemCatalog::OID fOid2;
  // v-table string join
  execplan::CalpontSystemCatalog::OID fDictOid1;
  execplan::CalpontSystemCatalog::OID fDictOid2;
  std::string fAlias1;
  std::string fAlias2;
  std::string fView1;
  std::string fView2;
  std::string fSchema1;
  std::string fSchema2;
  int32_t fSequence1;
  int32_t fSequence2;
  // @bug3398, add tuple id to steps
  uint64_t fTupleId1;
  uint64_t fTupleId2;
  // @bug3524
  // For a NOT IN subquery, where the correlated join in the subquery is treated as an
  // additional comparison. These simple columns are used to convert the join to an expression.
  // DON'T delete them; they are owned by the exec plan.
  const execplan::ReturnedColumn* fColumn1;
  const execplan::ReturnedColumn* fColumn2;
  int fCorrelatedSide;
  std::vector<bool> typelessJoin;  // the size of the vector is # of small side
  std::vector<std::vector<uint32_t> > largeSideKeys;
  std::vector<std::vector<uint32_t> > smallSideKeys;
  ResourceManager* resourceManager;
  uint64_t fMemSizeForOutputRG;  // bytes obtained via getMemory() and not yet returned

  // Orders joiners by TupleJoiner::operator< applied to the pointed-to objects.
  struct JoinerSorter
  {
    inline bool operator()(const std::shared_ptr<joiner::TupleJoiner>& j1,
                           const std::shared_ptr<joiner::TupleJoiner>& j2) const
    {
      return *j1 < *j2;
    }
  };
  std::vector<std::shared_ptr<joiner::TupleJoiner> > joiners;
  boost::scoped_array<std::vector<rowgroup::RGData> > rgData;
  TupleBPS* largeBPS;
  rowgroup::RowGroup largeRG, outputRG;
  std::vector<rowgroup::RowGroup> smallRGs;
  ssize_t pmMemLimit;

  void hjRunner();
  void smallRunnerFcn(uint32_t index, uint threadID, uint64_t* threads);

  // Thread-pool functor: names the thread "HJSBigSide" and runs hjRunner().
  struct HJRunner
  {
    explicit HJRunner(TupleHashJoinStep* hj) : HJ(hj)
    {
    }
    void operator()()
    {
      utils::setThreadName("HJSBigSide");
      HJ->hjRunner();
    }
    TupleHashJoinStep* HJ;
  };
  // Thread-pool functor: runs startSmallRunners(index).
  struct SmallRunner
  {
    SmallRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i)
    {
    }
    void operator()()
    {
      HJ->startSmallRunners(index);
    }
    TupleHashJoinStep* HJ;
    uint32_t index;
  };
  int64_t mainRunner;  // thread handle from thread pool

  // Used to notify TupleAggregateStep about PM hash join.
  // Ideally, hashjoin and delivery would communicate only through RowGroupDL
  // and would not need to know about each other. Because PM/UM hashjoin is
  // selected dynamically and PM aggregation is supported, the delivery step
  // needs to know whether it will receive raw or partially aggregated rows.
  SJSTEP fDeliveryStep;

  // temporary hack to make sure JobList only calls run, join once
  boost::mutex jlLock;
  bool runRan, joinRan;

  /* Iteration 18 mods */
  uint32_t largeSideIndex;
  bool joinIsTooBig;

  /* Functions & Expressions support */
  boost::shared_ptr<funcexp::FuncExpWrapper> fe2;
  std::vector<uint32_t> fe2TableDeps;
  rowgroup::RowGroup fe2Output;
  bool runFE2onPM;

  // Support Mixed Join Type
  int64_t fJoinId;

  /* Semi-join support */
  std::vector<int> feIndexes;
  std::vector<boost::shared_ptr<funcexp::FuncExpWrapper> > fe;
  rowgroup::RowGroup joinFilterRG;

  /* Casual Partitioning forwarding */
  void forwardCPData();
  uint32_t uniqueLimit;

  /* UM Join support. Most of this code is ported from the UM join code in tuple-bps.cpp.
   * They should be kept in sync as much as possible. */
  // Thread-pool functor: runs joinRunnerFcn(index).
  struct JoinRunner
  {
    JoinRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i)
    {
    }
    void operator()()
    {
      HJ->joinRunnerFcn(index);
    }
    TupleHashJoinStep* HJ;
    uint32_t index;
  };
  void joinRunnerFcn(uint32_t index);
  void startJoinThreads();
  void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer> >& joinerOutput,
                             rowgroup::Row& baseRow,
                             const std::shared_ptr<std::shared_ptr<int[]>[] >& mappings,
                             const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
                             std::vector<rowgroup::RGData>& outputData,
                             const std::shared_ptr<rowgroup::Row[]>& smallRows, rowgroup::Row& joinedRow,
                             RowGroupDL* outputDL);
  void grabSomeWork(std::vector<rowgroup::RGData>* work);
  void sendResult(const std::vector<rowgroup::RGData>& res);
  void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow,
                  rowgroup::Row& outRow, std::vector<rowgroup::RGData>* rgData,
                  funcexp::FuncExpWrapper* local_fe);
  void joinOneRG(uint32_t threadID, std::vector<rowgroup::RGData>& out, rowgroup::RowGroup& inputRG,
                 rowgroup::RowGroup& joinOutput, rowgroup::Row& largeSideRow, rowgroup::Row& joinFERow,
                 rowgroup::Row& joinedRow, rowgroup::Row& baseRow,
                 std::vector<std::vector<rowgroup::Row::Pointer> >& joinMatches,
                 std::shared_ptr<rowgroup::Row[]>& smallRowTemplates, RowGroupDL* outputDL,
                 std::vector<std::shared_ptr<joiner::TupleJoiner> >* joiners = NULL,
                 std::shared_ptr<std::shared_ptr<int[]>[] >* rgMappings = NULL,
                 std::shared_ptr<std::shared_ptr<int[]>[] >* feMappings = NULL,
                 boost::scoped_array<boost::scoped_array<uint8_t>>* smallNullMem = NULL);
  void finishSmallOuterJoin();
  void makeDupList(const rowgroup::RowGroup& rg);
  void processDupList(uint32_t threadID, rowgroup::RowGroup& ingrp, std::vector<rowgroup::RGData>* rowData);

  std::vector<uint64_t> joinRunners;  // thread handles from thread pool
  boost::mutex inputDLLock, outputDLLock;
  std::shared_ptr<std::shared_ptr<int[]>[]> columnMappings, fergMappings;
  std::shared_ptr<int[]> fe2Mapping;
  uint32_t joinThreadCount;
  boost::scoped_array<boost::scoped_array<uint8_t> > smallNullMemory;
  uint64_t outputIt;
  bool moreInput;
  std::vector<std::pair<uint32_t, uint32_t> > dupList;
  boost::scoped_array<rowgroup::Row> dupRows;
  std::vector<std::string> smallTableNames;
  bool isExeMgr;
  uint32_t lastSmallOuterJoiner;

  //@bug5958 & 6117, stores the table key used to identify a token join
  uint32_t fTokenJoin;

  // moved from base class JobStep
  boost::mutex* fStatsMutexPtr;

  //@bug3683 function join
  boost::shared_ptr<FunctionJoinInfo> fFunctionJoinInfo;
  std::set<uint32_t> fFunctionJoinKeys;  // for skipping CP forward

  /* Disk-based join support */
  std::vector<std::shared_ptr<DiskJoinStep>> djs;
  boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos;
  void djsReaderFcn(int index);
  uint64_t djsReader;  // thread handle from thread pool
  // Thread-pool functor: names the thread "DJSReader" and runs djsReaderFcn(index).
  struct DJSReader
  {
    DJSReader(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i)
    {
    }
    void operator()()
    {
      utils::setThreadName("DJSReader");
      HJ->djsReaderFcn(index);
    }
    TupleHashJoinStep* HJ;
    uint32_t index;
  };
  uint64_t djsRelay;  // thread handle from thread pool
  void djsRelayFcn();
  // Thread-pool functor: runs djsRelayFcn().
  struct DJSRelay
  {
    explicit DJSRelay(TupleHashJoinStep* hj) : HJ(hj)
    {
    }
    void operator()()
    {
      HJ->djsRelayFcn();
    }
    TupleHashJoinStep* HJ;
  };
  boost::shared_ptr<int64_t> djsSmallUsage;
  int64_t djsSmallLimit;
  int64_t djsLargeLimit;
  uint64_t djsPartitionSize;
  uint32_t djsMaxPartitionTreeDepth;
  bool djsForceRun;
  bool isDML;
  bool allowDJS;

  // hacky mechanism to prevent nextBand from starting before the final
  // THJS configuration is settled. Debatable whether to use a bool and poll instead;
  // once the config is settled it stays settled, technically no need to
  // keep grabbing locks after that.
  boost::mutex deliverMutex;
  bool ownsOutputDL;

  void segregateJoiners();
  std::vector<std::shared_ptr<joiner::TupleJoiner> > tbpsJoiners;
  std::vector<std::shared_ptr<joiner::TupleJoiner> > djsJoiners;
  std::vector<int> djsJoinerMap;
  boost::scoped_array<ssize_t> memUsedByEachJoin;
  boost::mutex djsLock;
  boost::shared_ptr<int64_t> sessionMemLimit;

  /* Threaded UM join support */
  int numCores;  // forced to 1 by tableOid1() for system-catalog tables
  boost::mutex dlMutex, memTrackMutex, saneErrMsg;
  boost::condition memTrackDone;
  std::atomic<bool> rgdLock;
  bool stopMemTracking;
  void trackMem(uint index);
  void startSmallRunners(uint index);

  friend class DiskJoinStep;
};

}  // namespace joblist