/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: batchprimitiveprocessor-jl.h 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Interface: batchprimitiveprocessor
//
// Description:
//   Declares joblist::BatchPrimitiveProcessorJL, the object a JobStep configures
//   (filter steps, project steps, joiners, aggregation, function/expression groups)
//   and then uses to write and read messageqcpp::ByteStream messages.
//
// Author: Patrick LeBlanc, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//

/** @file */

#pragma once

// NOTE(review): the targets of the four includes below are not present in this copy of
// the header. Presumably they are the boost/std headers that provide scoped_array,
// shared_ptr, uuid and the standard containers used further down -- confirm against
// the canonical file.
#include
#include
#include
#include

#include "mcs_basic_types.h"
#include "primitivemsg.h"
#include "serializeable.h"
#include "primitivestep.h"
#include "brm.h"
#include "command-jl.h"
#include "resourcemanager.h"
//#include "tableband.h"

namespace joblist
{
// 23 = 10 + 13:
//   13 to convert to logical blocks,
//   10 to convert to groups of 1024 logical blocks.
// NOTE(review): presumably used as a shift count (2^13 and 2^10) -- confirm at the use sites.
const uint32_t LOGICAL_BLOCKS_CONVERTER = 23;

// # bytes used for CP boolean and the lbid;
// used by BatchPrimitiveProcessorJL::countThisMsg().
// NOTE(review): presumably 1 byte for the flag + 8 bytes for a 64-bit LBID -- confirm.
const uint8_t CP_FLAG_AND_LBID = 9;

// forward reference
struct JobInfo;

// Joblist-side description of a batch primitive processor (BPP).
//
// Usage order suggested by the section comments below: construct with a ResourceManager,
// call the set*/add*Step initializers, serialize with createBPP(), feed runtime input
// (addElementType / setLBID / setCount), emit it with runBPP(), then reset() and repeat.
// Responses are decoded with the get*/deserialize* members.
//
// NOTE(review): several declarations below appear without template arguments
// (e.g. `std::vector* out`, `boost::scoped_array absRids`, `boost::shared_ptr > smallSide`).
// They are reproduced exactly as found; confirm the element types against the canonical
// header before relying on this copy.
class BatchPrimitiveProcessorJL
{
 public:
  /* Constructor used by the JobStep */
  explicit BatchPrimitiveProcessorJL(const ResourceManager* rm);
  ~BatchPrimitiveProcessorJL();

  /* Interface used by the JobStep */

  /* Some accessors */

  // Returns the _hasScan flag.
  // NOTE(review): not const-qualified, unlike getStatus(); cannot be called on a const object.
  inline bool hasScan()
  {
    return _hasScan;
  }

  /* For initializing the object */

  // Plain setters: each stores its argument in the member of the same name.
  inline void setSessionID(uint32_t num)
  {
    sessionID = num;
  }
  inline void setStepID(uint32_t num)
  {
    stepID = num;
  }
  inline void setUniqueID(uint32_t id)
  {
    uniqueID = id;
  }
  // Stores the BRM query context in versionInfo.
  inline void setQueryContext(const BRM::QueryContext& qc)
  {
    versionInfo = qc;
  }
  inline void setTxnID(uint32_t num)
  {
    txnID = num;
  }
  // Stores the output type. TUPLE and ROW_GROUP additionally force
  // needRidsAtDelivery to true; other types leave the flag untouched
  // (it is never cleared here -- use setNeedRidsAtDelivery(false) for that).
  inline void setOutputType(BPSOutputType o)
  {
    ot = o;

    if (ot == TUPLE || ot == ROW_GROUP)
      needRidsAtDelivery = true;
  }
  inline void setNeedRidsAtDelivery(bool b)
  {
    needRidsAtDelivery = b;
  }
  // Enables LBIDTrace iff the TRACE_LBIDS bit is set in flags; all other bits are ignored.
  inline void setTraceFlags(uint32_t flags)
  {
    LBIDTrace = ((flags & execplan::CalpontSelectExecutionPlan::TRACE_LBIDS) != 0);
  }
  // Returns ridCount (a uint16_t member) widened to uint32_t.
  // NOTE(review): not const-qualified.
  inline uint32_t getRidCount()
  {
    return ridCount;
  }
  inline void setThreadCount(uint32_t tc)
  {
    threadCount = tc;
  }

  // Filter-step registration, one overload per step type.
  // NOTE(review): lastScannedLBID is taken by value (a copy per call) while extentsAux is
  // taken by const reference; whether the copy is intentional is not visible from here.
  void addFilterStep(const pColScanStep&, std::vector lastScannedLBID, bool hasAuxCol,
                     const std::vector& extentsAux, execplan::CalpontSystemCatalog::OID oidAux);
  void addFilterStep(const PseudoColStep&);
  void addFilterStep(const pColStep&);
  void addFilterStep(const pDictionaryStep&);
  void addFilterStep(const FilterStep&);

  // Projection-step registration; the two-argument overloads pair a column or
  // pass-through step with a dictionary step.
  void addProjectStep(const PseudoColStep&);
  void addProjectStep(const pColStep&);
  void addProjectStep(const PassThruStep&);
  void addProjectStep(const pColStep&, const pDictionaryStep&);
  void addProjectStep(const PassThruStep&, const pDictionaryStep&);

  // Both take the ByteStream to operate on and are const, i.e. they do not modify this
  // object's non-mutable state.
  void createBPP(messageqcpp::ByteStream&) const;
  void destroyBPP(messageqcpp::ByteStream&) const;

  /* Call this one last */
  // void addDeliveryStep(const DeliveryStep &);

  /* At runtime, feed input here */
  void addElementType(const ElementType&, uint32_t dbroot);
  void addElementType(const StringElementType&, uint32_t dbroot);
  // void setRowGroupData(const rowgroup::RowGroup &);

  void runBPP(messageqcpp::ByteStream&, uint32_t pmNum, bool isExeMgrDEC);
  void abortProcessing(messageqcpp::ByteStream*);

  /* After serializing a BPP object, reset it and it's ready for more input */
  void reset();

  /* The JobStep calls these to initialize a BPP that starts with a column scan */
  void setLBID(uint64_t lbid, const BRM::EMEntry& scannedExtent);
  // Sets the loop count (see the `count` member); a zero count trips idbassert.
  inline void setCount(uint16_t c)
  {
    idbassert(c > 0);
    count = c;
  }

  /* Turn a ByteStream into ElementTypes or StringElementTypes */
  // All pointer parameters after `in` are outputs by signature; what each receives is
  // defined in the implementation file, not here.
  void getElementTypes(messageqcpp::ByteStream& in, std::vector* out, bool* validCPData, uint64_t* lbid,
                       int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                       uint32_t* touchedBlocks) const;
  void getStringElementTypes(messageqcpp::ByteStream& in, std::vector* out, bool* validCPData,
                             uint64_t* lbid, int64_t* min, int64_t* max, uint32_t* cachedIO,
                             uint32_t* physIO, uint32_t* touchedBlocks) const;
  /* (returns the row count) */
  // uint32_t getTableBand(messageqcpp::ByteStream &in, messageqcpp::ByteStream *out,
  //                       bool *validCPData, uint64_t *lbid, int64_t *min, int64_t *max,
  //                       uint32_t *cachedIO, uint32_t *physIO, uint32_t *touchedBlocks) const;
  void getTuples(messageqcpp::ByteStream& in, std::vector* out, bool* validCPData, uint64_t* lbid,
                 int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                 uint32_t* touchedBlocks) const;
  void deserializeAggregateResults(messageqcpp::ByteStream* in, std::vector* out) const;
  // Row-group variant of the decoders above. Differs from them in using 128-bit min/max
  // and in the extra fromDictScan / countThis / hasBinaryColumn outputs plus the threadID
  // and colType inputs.
  void getRowGroupData(messageqcpp::ByteStream& in, std::vector* out, bool* validCPData, uint64_t* lbid,
                       bool* fromDictScan, int128_t* min, int128_t* max, uint32_t* cachedIO,
                       uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
                       bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const;
  void deserializeAggregateResult(messageqcpp::ByteStream* in, std::vector* out) const;
  bool countThisMsg(messageqcpp::ByteStream& in) const;

  // Status word accessors (stored in the `status` member).
  void setStatus(uint16_t s)
  {
    status = s;
  }
  uint16_t getStatus() const
  {
    return status;
  }
  void runErrorBPP(messageqcpp::ByteStream&);
  rowgroup::RGData getErrorRowGroupData(uint16_t error) const;

  // @bug 1098
  // Both return a mutable reference to the internal step list, so callers can modify it.
  std::vector& getFilterSteps()
  {
    return filterSteps;
  }
  std::vector& getProjectSteps()
  {
    return projectSteps;
  }

  std::string toString() const;

  /* RowGroup additions */
  void setProjectionRowGroup(const rowgroup::RowGroup& rg);
  void setInputRowGroup(const rowgroup::RowGroup& rg);

  /* Aggregation */
  void addAggregateStep(const rowgroup::SP_ROWAGG_PM_t&, const rowgroup::RowGroup&);
  void setJoinedRowGroup(const rowgroup::RowGroup& rg);

  /* Tuple hashjoin */
  void useJoiners(const std::vector >&);
  bool nextTupleJoinerMsg(messageqcpp::ByteStream&);
  // void setSmallSideKeyColumn(uint32_t col);

  /* OR hacks */
  void setBOP(uint32_t op);  // BOP_AND or BOP_OR, default is BOP_AND
  void setForHJ(bool b);     // default is false

  /* self-join */
  // Stores the pointer only; no copy is made and nothing here takes ownership.
  void jobInfo(const JobInfo* jobInfo)
  {
    fJobInfo = jobInfo;
  }

  /* Function & Expression support */
  void setFEGroup1(boost::shared_ptr, const rowgroup::RowGroup& input);
  void setFEGroup2(boost::shared_ptr, const rowgroup::RowGroup& output);
  void setJoinFERG(const rowgroup::RowGroup& rg);

  /* This fcn determines whether or not the containing TBPS obj will process results from a join
     or put the RG data right in the output datalist. */
  // True when there are no tJoiners at all, or when PMJoinerCount is non-zero and either
  // fe2 or aggregatorPM is set.
  bool pmSendsFinalResult() const
  {
    // isn't aware of UM-only joins.  Function name is a bit misleading.
    return (tJoiners.size() == 0 || (PMJoinerCount > 0 && (fe2 || aggregatorPM)));
  }
  // NOTE(review): returns `const std::string` by value; the top-level const on a by-value
  // return prevents callers from moving out of the result.
  const std::string toMiniString() const;

  // Priority setter/getter pair sharing one name.
  // NOTE(review): the `;` after the setter body is a redundant empty declaration, and the
  // getter is not const-qualified.
  void priority(uint32_t p)
  {
    _priority = p;
  };
  uint32_t priority()
  {
    return _priority;
  }

  void deliverStringTableRowGroup(bool b);
  void setUuid(const boost::uuids::uuid& u)
  {
    uuid = u;
  }
  // Overrides the in-class default of maxPmJoinResultCount (1048576).
  void setMaxPmJoinResultCount(uint32_t count)
  {
    maxPmJoinResultCount = count;
  }

 private:
  // Per-operation weights consumed by calculateBPPWeight().
  // NOTE(review): these are non-static const data members, so every object carries its
  // own copy and the implicit copy-assignment operator is unavailable; `static constexpr`
  // would express "class-wide constant" more directly.
  const size_t perColumnProjectWeight_ = 10;
  const size_t perColumnFilteringWeight_ = 10;
  const size_t fe1Weight_ = 10;
  const size_t fe2Weight_ = 10;
  const size_t joinWeight_ = 500;
  const size_t aggregationWeight_ = 500;
  // This is simple SQL operations-based model leveraged by
  // FairThreadPool run by PP facility.
  // Every operation mentioned in this calculation spends
  // some CPU so the morsel uses this op weights more.
  //
  // weight = projectCount * 10 + filterCount * 10 + tJoiners.size() * 500
  //          + 500 if aggregatorPM is set + 10 if fe1 is set + 10 if fe2 is set.
  // The arithmetic is done in size_t and narrowed into the uint32_t accumulator.
  uint32_t calculateBPPWeight() const
  {
    uint32_t weight = perColumnProjectWeight_ * projectCount;
    weight += filterCount * perColumnFilteringWeight_;
    weight += tJoiners.size() * joinWeight_;
    weight += (aggregatorPM) ? aggregationWeight_ : 0;
    weight += (fe1) ? fe1Weight_ : 0;
    weight += (fe2) ? fe2Weight_ : 0;
    return weight;
  }
  BPSOutputType ot;  // set by setOutputType()

  bool needToSetLBID;

  BRM::QueryContext versionInfo;  // set by setQueryContext()
  uint32_t txnID;
  uint32_t sessionID;
  uint32_t stepID;
  uint32_t uniqueID;

  // # of times to loop over the command arrays
  // ...  This is 1, except when the first command is a scan, in which case
  // this single BPP object produces count responses.
  uint16_t count;

  /* XXXPAT: tradeoff here.  Memory wasted by statically allocating all of these
     arrays on the UM (most aren't used) vs more dynamic allocation on the PM */

  uint64_t baseRid;  // first abs RID of the logical block
  uint16_t relRids[LOGICAL_BLOCK_RIDS];
  boost::scoped_array absRids;
  // TODO MCOL-641 Do we need uint128_t buffers here?
  // When would sendValues=true, in which case values[]
  // is sent to primproc?
  uint64_t values[LOGICAL_BLOCK_RIDS];
  uint16_t ridCount;  // exposed through getRidCount()
  bool needStrValues;
  uint16_t wideColumnsWidths;

  std::vector filterSteps;   // exposed through getFilterSteps()
  std::vector projectSteps;  // exposed through getProjectSteps()
  //@bug 1136
  uint16_t filterCount;   // multiplied by perColumnFilteringWeight_ in calculateBPPWeight()
  uint16_t projectCount;  // multiplied by perColumnProjectWeight_ in calculateBPPWeight()
  bool needRidsAtDelivery;
  uint16_t ridMap;

  //	TableBand templateTB;
  uint32_t tableOID;
  boost::scoped_array tablePositions;
  uint32_t tableColumnCount;
  bool sendValues;
  bool sendAbsRids;
  bool _hasScan;   // returned by hasScan()
  bool LBIDTrace;  // set by setTraceFlags()

  /* for tuple return type */
  std::vector colWidths;
  uint32_t tupleLength;
  // 	uint32_t rowCounter;		// for debugging
  // 	uint32_t rowsProcessed;
  uint16_t status;  // see setStatus() / getStatus()

  /* for Joiner serialization */
  bool pickNextJoinerNum();
  uint32_t pos, joinerNum;
  boost::shared_ptr > smallSide;
  boost::scoped_array posByJoinerNum;

  /* for RowGroup return type */
  rowgroup::RowGroup inputRG, projectionRG;
  uint32_t valueColumn;

  /* for PM Aggregation */
  rowgroup::RowGroup joinedRG;
  rowgroup::SP_ROWAGG_PM_t aggregatorPM;  // tested for non-null in pmSendsFinalResult()
  rowgroup::RowGroup aggregateRGPM;

  /* UM portion of the PM join alg */
  std::vector > tJoiners;
  std::vector smallSideRGs;
  rowgroup::RowGroup largeSideRG;
  std::vector > smallSideKeys;
  boost::scoped_array tlKeyLens;
  bool sendTupleJoinRowGroupData;
  uint32_t PMJoinerCount;

  /* OR hack */
  uint8_t bop;  // BOP_AND or BOP_OR
  bool forHJ;   // indicate if feeding a hashjoin, doJoin does not cover smallside

  /* Self-join */
  const JobInfo* fJobInfo;  // non-owning; assigned by jobInfo()

  /* Functions & Expressions support */
  boost::shared_ptr fe1, fe2;
  rowgroup::RowGroup fe1Input, fe2Output;
  rowgroup::RowGroup joinFERG;

  // `mutable`, so the const members of this class are able to modify it.
  mutable boost::scoped_array primprocRG;  // the format of the data received from PrimProc
  uint32_t threadCount;                    // set by setThreadCount()

  unsigned fJoinerChunkSize;
  uint32_t dbRoot;
  bool hasSmallOuterJoin;
  // The only data member besides the weights with an in-class default: 1048576 = 1024 * 1024.
  uint32_t maxPmJoinResultCount = 1048576;

  // NOTE(review): no in-class default; presumably initialized by the constructor -- confirm.
  uint32_t _priority;

  boost::uuids::uuid uuid;  // set by setUuid()

  // These command classes are granted access to the private members above.
  friend class CommandJL;
  friend class ColumnCommandJL;
  friend class PassThruCommandJL;
};

}  // namespace joblist