1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
mariadb-columnstore-engine/dbcon/joblist/batchprimitiveprocessor-jl.h
drrtuy 8ae5a3da40
Fix/mcol 5787 rgdata buffer max size dev (#3325)
* fix(rowgroup): RGData now uses a uint64_t counter for the fixed-size columns' data buffer.
	The buffer can use more than 4GB of RAM, which is necessary for PM-side joins.
	The RGData ctor previously used uint32_t when allocating the data buffer,
	which caused an implicit heap overflow.

* feat(bytestream,serdes): BS buffer size type is uint64_t
	This is necessary to handle 64-bit RGData, which comes as
	a separate patch. Together, the two patches allow
	PM joins when the SmallSide size exceeds 4GB.

* feat(bytestream,serdes): Propagate the BS buffer size type change to avoid implicit narrowing conversions

* feat(rowgroup): restore bits lost during a cherry-pick. Their absence caused the first RGData::serialize call to crash the process
2024-11-09 19:44:02 +00:00

393 lines
12 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: batchprimitiveprocessor-jl.h 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Interface: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
/** @file */
#pragma once
#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/uuid/uuid.hpp>
#include "mcs_basic_types.h"
#include "primitivemsg.h"
#include "serializeable.h"
#include "primitivestep.h"
#include "brm.h"
#include "command-jl.h"
#include "resourcemanager.h"
//#include "tableband.h"
namespace joblist
{
// Combined conversion amount: 23 == 13 + 10. The 13 part converts to logical
// blocks; the extra 10 converts to groups of 1024 logical blocks
// (presumably both are bit-shift amounts, since 2^10 == 1024 -- confirm at use sites).
constexpr uint32_t LOGICAL_BLOCKS_CONVERTER = 23;

// Number of bytes taken by the CP boolean followed by the LBID in a message.
// Consumed by BatchPrimitiveProcessorJL::countThisMsg().
constexpr uint8_t CP_FLAG_AND_LBID = 9;

// Forward declaration: JobInfo is only referenced through pointers in this header.
struct JobInfo;
/** @brief Job-list (UM) side of a batch primitive processor (BPP).
 *
 * A JobStep configures an instance through the setters and add*Step() calls,
 * serializes it into ByteStreams (createBPP / runBPP / destroyBPP) and decodes
 * the replies with the get*() / deserialize*() members. Only the small inline
 * helpers below are defined in this header; everything else lives in the .cpp.
 */
class BatchPrimitiveProcessorJL
{
 public:
  /* Constructor used by the JobStep */
  explicit BatchPrimitiveProcessorJL(const ResourceManager* rm);
  ~BatchPrimitiveProcessorJL();

  /* Interface used by the JobStep */

  /* Some accessors */
  bool hasScan() const
  {
    return _hasScan;
  }

  /* For initializing the object */
  void setSessionID(uint32_t num)
  {
    sessionID = num;
  }
  void setStepID(uint32_t num)
  {
    stepID = num;
  }
  void setUniqueID(uint32_t id)
  {
    uniqueID = id;
  }
  void setQueryContext(const BRM::QueryContext& qc)
  {
    versionInfo = qc;
  }
  void setTxnID(uint32_t num)
  {
    txnID = num;
  }
  // Selecting TUPLE or ROW_GROUP output forces needRidsAtDelivery on; any other
  // output type leaves the flag as it was (use setNeedRidsAtDelivery() to clear it).
  void setOutputType(BPSOutputType o)
  {
    ot = o;
    if (ot == TUPLE || ot == ROW_GROUP)
      needRidsAtDelivery = true;
  }
  void setNeedRidsAtDelivery(bool b)
  {
    needRidsAtDelivery = b;
  }
  // Only the TRACE_LBIDS bit of the execution-plan trace flags is consulted.
  void setTraceFlags(uint32_t flags)
  {
    LBIDTrace = ((flags & execplan::CalpontSelectExecutionPlan::TRACE_LBIDS) != 0);
  }
  uint32_t getRidCount() const
  {
    return ridCount;
  }
  void setThreadCount(uint32_t tc)
  {
    threadCount = tc;
  }
  void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID,
                     bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux,
                     execplan::CalpontSystemCatalog::OID oidAux);
  void addFilterStep(const PseudoColStep&);
  void addFilterStep(const pColStep&);
  void addFilterStep(const pDictionaryStep&);
  void addFilterStep(const FilterStep&);
  void addProjectStep(const PseudoColStep&);
  void addProjectStep(const pColStep&);
  void addProjectStep(const PassThruStep&);
  void addProjectStep(const pColStep&, const pDictionaryStep&);
  void addProjectStep(const PassThruStep&, const pDictionaryStep&);
  void createBPP(messageqcpp::ByteStream&) const;
  void destroyBPP(messageqcpp::ByteStream&) const;

  /* Call this one last */
  // void addDeliveryStep(const DeliveryStep &);

  /* At runtime, feed input here */
  void addElementType(const ElementType&, uint32_t dbroot);
  void addElementType(const StringElementType&, uint32_t dbroot);
  // void setRowGroupData(const rowgroup::RowGroup &);
  void runBPP(messageqcpp::ByteStream&, uint32_t pmNum, bool isExeMgrDEC);
  void abortProcessing(messageqcpp::ByteStream*);

  /* After serializing a BPP object, reset it so it is ready for more input */
  void reset();

  /* The JobStep calls these to initialize a BPP that starts with a column scan */
  void setLBID(uint64_t lbid, const BRM::EMEntry& scannedExtent);
  // Number of loops over the command arrays; must be non-zero (asserted).
  void setCount(uint16_t c)
  {
    idbassert(c > 0);
    count = c;
  }

  /* Turn a ByteStream into ElementTypes or StringElementTypes */
  void getElementTypes(messageqcpp::ByteStream& in, std::vector<ElementType>* out, bool* validCPData,
                       uint64_t* lbid, int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                       uint32_t* touchedBlocks) const;
  void getStringElementTypes(messageqcpp::ByteStream& in, std::vector<StringElementType>* out,
                             bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                             uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const;
  /* (returns the row count) */
  // uint32_t getTableBand(messageqcpp::ByteStream &in, messageqcpp::ByteStream *out,
  //      bool *validCPData, uint64_t *lbid, int64_t *min, int64_t *max,
  //      uint32_t *cachedIO, uint32_t *physIO, uint32_t *touchedBlocks) const;
  void getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out, bool* validCPData, uint64_t* lbid,
                 int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                 uint32_t* touchedBlocks) const;
  void deserializeAggregateResults(messageqcpp::ByteStream* in, std::vector<rowgroup::RGData>* out) const;
  void getRowGroupData(messageqcpp::ByteStream& in, std::vector<rowgroup::RGData>* out, bool* validCPData,
                       uint64_t* lbid, bool* fromDictScan, int128_t* min, int128_t* max, uint32_t* cachedIO,
                       uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
                       bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const;
  void deserializeAggregateResult(messageqcpp::ByteStream* in, std::vector<rowgroup::RGData>* out) const;
  bool countThisMsg(messageqcpp::ByteStream& in) const;
  void setStatus(uint16_t s)
  {
    status = s;
  }
  uint16_t getStatus() const
  {
    return status;
  }
  void runErrorBPP(messageqcpp::ByteStream&);
  rowgroup::RGData getErrorRowGroupData(uint16_t error) const;

  // @bug 1098
  std::vector<SCommand>& getFilterSteps()
  {
    return filterSteps;
  }
  std::vector<SCommand>& getProjectSteps()
  {
    return projectSteps;
  }
  std::string toString() const;

  /* RowGroup additions */
  void setProjectionRowGroup(const rowgroup::RowGroup& rg);
  void setInputRowGroup(const rowgroup::RowGroup& rg);

  /* Aggregation */
  void addAggregateStep(const rowgroup::SP_ROWAGG_PM_t&, const rowgroup::RowGroup&);
  void setJoinedRowGroup(const rowgroup::RowGroup& rg);

  /* Tuple hashjoin */
  void useJoiners(const std::vector<std::shared_ptr<joiner::TupleJoiner> >&);
  bool nextTupleJoinerMsg(messageqcpp::ByteStream&);
  // void setSmallSideKeyColumn(uint32_t col);

  /* OR hacks */
  void setBOP(uint32_t op);  // BOP_AND or BOP_OR, default is BOP_AND
  void setForHJ(bool b);     // default is false

  /* self-join */
  void jobInfo(const JobInfo* jobInfo)
  {
    fJobInfo = jobInfo;
  }

  /* Function & Expression support */
  void setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper>, const rowgroup::RowGroup& input);
  void setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper>, const rowgroup::RowGroup& output);
  void setJoinFERG(const rowgroup::RowGroup& rg);

  /* This fcn determines whether or not the containing TBPS obj will process results
     from a join or put the RG data right in the output datalist. */
  bool pmSendsFinalResult() const
  {
    // isn't aware of UM-only joins.  Function name is a bit misleading.
    return (tJoiners.size() == 0 || (PMJoinerCount > 0 && (fe2 || aggregatorPM)));
  }
  const std::string toMiniString() const;

  void priority(uint32_t p)
  {
    _priority = p;
  }
  uint32_t priority() const
  {
    return _priority;
  }
  void deliverStringTableRowGroup(bool b);
  void setUuid(const boost::uuids::uuid& u)
  {
    uuid = u;
  }
  // Parameter deliberately not named `count`: that would shadow the `count` member.
  void setMaxPmJoinResultCount(uint32_t maxCount)
  {
    maxPmJoinResultCount = maxCount;
  }

 private:
  // Per-operation weights for calculateBPPWeight(). These are class-wide
  // compile-time constants, so they are static rather than copied into every instance.
  static constexpr size_t perColumnProjectWeight_ = 10;
  static constexpr size_t perColumnFilteringWeight_ = 10;
  static constexpr size_t fe1Weight_ = 10;
  static constexpr size_t fe2Weight_ = 10;
  static constexpr size_t joinWeight_ = 500;
  static constexpr size_t aggregationWeight_ = 500;

  // This is a simple SQL-operations-based cost model used by the FairThreadPool
  // run by the PP facility. Every operation counted here spends some CPU, so a
  // morsel that performs more of them gets a larger weight.
  uint32_t calculateBPPWeight() const
  {
    size_t weight = perColumnProjectWeight_ * projectCount;
    weight += perColumnFilteringWeight_ * filterCount;
    weight += joinWeight_ * tJoiners.size();
    if (aggregatorPM)
      weight += aggregationWeight_;
    if (fe1)
      weight += fe1Weight_;
    if (fe2)
      weight += fe2Weight_;
    // Accumulate in size_t and narrow once, explicitly, to the uint32_t result
    // (same value modulo 2^32 as narrowing after every step).
    return static_cast<uint32_t>(weight);
  }

  BPSOutputType ot;

  bool needToSetLBID;

  BRM::QueryContext versionInfo;
  uint32_t txnID;
  uint32_t sessionID;
  uint32_t stepID;
  uint32_t uniqueID;

  // # of times to loop over the command arrays
  // ... This is 1, except when the first command is a scan, in which case
  // this single BPP object produces count responses.
  uint16_t count;

  /* XXXPAT: tradeoff here.  Memory wasted by statically allocating all of these
      arrays on the UM (most aren't used) vs more dynamic allocation
      on the PM */

  uint64_t baseRid;  // first abs RID of the logical block

  uint16_t relRids[LOGICAL_BLOCK_RIDS];
  boost::scoped_array<uint64_t> absRids;
  // TODO MCOL-641 Do we need uint128_t buffers here?
  // When would sendValues=true, in which case values[]
  // is sent to primproc?
  uint64_t values[LOGICAL_BLOCK_RIDS];
  uint16_t ridCount;
  bool needStrValues;
  uint16_t wideColumnsWidths;

  std::vector<SCommand> filterSteps;
  std::vector<SCommand> projectSteps;
  //@bug 1136
  uint16_t filterCount;
  uint16_t projectCount;
  bool needRidsAtDelivery;
  uint16_t ridMap;

  // TableBand templateTB;
  uint32_t tableOID;
  boost::scoped_array<int> tablePositions;
  uint32_t tableColumnCount;
  bool sendValues;
  bool sendAbsRids;
  bool _hasScan;
  bool LBIDTrace;

  /* for tuple return type */
  std::vector<uint16_t> colWidths;
  uint32_t tupleLength;
  // uint32_t rowCounter;   // for debugging
  // uint32_t rowsProcessed;
  uint16_t status;

  /* for Joiner serialization */
  bool pickNextJoinerNum();
  uint32_t pos, joinerNum;
  boost::shared_ptr<std::vector<ElementType> > smallSide;
  boost::scoped_array<uint32_t> posByJoinerNum;

  /* for RowGroup return type */
  rowgroup::RowGroup inputRG, projectionRG;
  uint32_t valueColumn;

  /* for PM Aggregation */
  rowgroup::RowGroup joinedRG;
  rowgroup::SP_ROWAGG_PM_t aggregatorPM;
  rowgroup::RowGroup aggregateRGPM;

  /* UM portion of the PM join alg */
  std::vector<std::shared_ptr<joiner::TupleJoiner> > tJoiners;
  std::vector<rowgroup::RowGroup> smallSideRGs;
  rowgroup::RowGroup largeSideRG;
  std::vector<std::vector<uint32_t> > smallSideKeys;
  boost::scoped_array<uint32_t> tlKeyLens;
  bool sendTupleJoinRowGroupData;
  uint32_t PMJoinerCount;

  /* OR hack */
  uint8_t bop;  // BOP_AND or BOP_OR
  bool forHJ;   // indicate if feeding a hashjoin, doJoin does not cover smallside

  /* Self-join */
  const JobInfo* fJobInfo;

  /* Functions & Expressions support */
  boost::shared_ptr<funcexp::FuncExpWrapper> fe1, fe2;
  rowgroup::RowGroup fe1Input, fe2Output;
  rowgroup::RowGroup joinFERG;

  mutable boost::scoped_array<rowgroup::RowGroup>
      primprocRG;  // the format of the data received from PrimProc
  uint32_t threadCount;
  unsigned fJoinerChunkSize;
  uint32_t dbRoot;
  bool hasSmallOuterJoin;
  uint32_t maxPmJoinResultCount = 1048576;

  uint32_t _priority;

  boost::uuids::uuid uuid;

  friend class CommandJL;
  friend class ColumnCommandJL;
  friend class PassThruCommandJL;
};
} // namespace joblist