You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1782 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1782 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|    Copyright (C) 2019 MariaDB Corporation
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //
 | |
| // $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
 | |
| // C++ Implementation: batchprimitiveprocessor
 | |
| //
 | |
| // Description:
 | |
| //
 | |
| //
 | |
| // Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
 | |
| //
 | |
| // Copyright: See COPYING file that comes with this distribution
 | |
| //
 | |
| //
 | |
| 
 | |
| #include <unistd.h>
 | |
| //#define NDEBUG
 | |
| #include <cassert>
 | |
| #include <stdexcept>
 | |
| #include <iostream>
 | |
| #include <set>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/uuid/uuid.hpp>
 | |
| #include <boost/uuid/uuid_generators.hpp>
 | |
| namespace bu = boost::uuids;
 | |
| 
 | |
| #include "calpontsystemcatalog.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "bpp-jl.h"
 | |
| #include "jlf_common.h"
 | |
| using namespace messageqcpp;
 | |
| using namespace rowgroup;
 | |
| using namespace joiner;
 | |
| 
 | |
| //#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(ResourceManager* rm)
 | |
|  : ot(BPS_ELEMENT_TYPE)
 | |
|  , needToSetLBID(true)
 | |
|  , count(1)
 | |
|  , baseRid(0)
 | |
|  , ridCount(0)
 | |
|  , needStrValues(false)
 | |
|  , wideColumnsWidths(0)
 | |
|  , filterCount(0)
 | |
|  , projectCount(0)
 | |
|  , needRidsAtDelivery(false)
 | |
|  , ridMap(0)
 | |
|  , sendValues(false)
 | |
|  , sendAbsRids(false)
 | |
|  , _hasScan(false)
 | |
|  , LBIDTrace(false)
 | |
|  , tupleLength(0)
 | |
|  , status(0)
 | |
|  , valueColumn(0)
 | |
|  , sendTupleJoinRowGroupData(false)
 | |
|  , bop(BOP_AND)
 | |
|  , forHJ(false)
 | |
|  , threadCount(1)
 | |
|  , fJoinerChunkSize(rm->getJlJoinerChunkSize())
 | |
|  , hasSmallOuterJoin(false)
 | |
|  , _priority(1)
 | |
|  , rm_(rm)
 | |
| {
 | |
|   PMJoinerCount = 0;
 | |
|   uuid = bu::nil_generator()();
 | |
| }
 | |
| 
 | |
| BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL()
 | |
| {
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
 | |
|                                               vector<BRM::LBID_t> lastScannedLBID,
 | |
|                                               bool hasAuxCol,
 | |
|                                               const std::vector<BRM::EMEntry>& extentsAux,
 | |
|                                               execplan::CalpontSystemCatalog::OID oidAux)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   tableOID = scan.tableOid();
 | |
|   cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setQueryUuid(scan.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   filterSteps.push_back(cc);
 | |
|   filterCount++;
 | |
|   _hasScan = true;
 | |
|   if (utils::isWide(cc->getWidth()))
 | |
|     wideColumnsWidths |= cc->getWidth();
 | |
|   idbassert(sessionID == scan.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   tableOID = pcs.tableOid();
 | |
|   cc.reset(new PseudoCCJL(pcs));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setQueryUuid(pcs.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   filterSteps.push_back(cc);
 | |
|   filterCount++;
 | |
|   idbassert(sessionID == pcs.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   tableOID = step.tableOid();
 | |
|   cc.reset(new ColumnCommandJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   filterSteps.push_back(cc);
 | |
|   filterCount++;
 | |
|   if (utils::isWide(cc->getWidth()))
 | |
|     wideColumnsWidths |= cc->getWidth();
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   tableOID = step.tableOid();
 | |
| 
 | |
|   if (filterCount == 0)
 | |
|   {
 | |
|     sendAbsRids = true;
 | |
|     sendValues = true;
 | |
|     absRids.reset(new uint64_t[8192]);
 | |
|   }
 | |
| 
 | |
|   cc.reset(new DictStepJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
| 
 | |
| #if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
 | |
|   if (filterSteps.size() > 0)
 | |
|   {
 | |
|     size_t stepsIndex = filterSteps.size() - 1;
 | |
|     SCommand prevCC = filterSteps[stepsIndex];
 | |
|     ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
 | |
|     DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
 | |
|     if (pcc && ccc)
 | |
|     {
 | |
|       filterSteps[stepsIndex].reset(
 | |
|           new ColumnCommandJL(*pcc, *ccc));  // column command will use same filters.
 | |
|     }
 | |
|   }
 | |
| #endif
 | |
|   filterSteps.push_back(cc);
 | |
|   filterCount++;
 | |
|   needStrValues = true;
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   tableOID = step.tableOid();
 | |
|   cc.reset(new FilterCommandJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   filterSteps.push_back(cc);
 | |
|   filterCount++;
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   cc.reset(new PseudoCCJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setTupleKey(step.tupleId());
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   projectSteps.push_back(cc);
 | |
|   colWidths.push_back(cc->getWidth());
 | |
|   tupleLength += cc->getWidth();
 | |
|   projectCount++;
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   cc.reset(new ColumnCommandJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setTupleKey(step.tupleId());
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   projectSteps.push_back(cc);
 | |
|   colWidths.push_back(cc->getWidth());
 | |
|   tupleLength += cc->getWidth();
 | |
|   projectCount++;
 | |
|   if (utils::isWide(cc->getWidth()))
 | |
|     wideColumnsWidths |= cc->getWidth();
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   cc.reset(new PassThruCommandJL(step));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setTupleKey(step.tupleId());
 | |
|   cc->setQueryUuid(step.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   projectSteps.push_back(cc);
 | |
|   colWidths.push_back(cc->getWidth());
 | |
|   tupleLength += cc->getWidth();
 | |
|   projectCount++;
 | |
| 
 | |
|   if (utils::isWide(cc->getWidth()))
 | |
|     wideColumnsWidths |= cc->getWidth();
 | |
| 
 | |
|   if (filterCount == 0)
 | |
|     sendValues = true;
 | |
| 
 | |
|   idbassert(sessionID == step.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   cc.reset(new RTSCommandJL(col, dict));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setTupleKey(dict.tupleId());
 | |
|   cc->setQueryUuid(col.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   projectSteps.push_back(cc);
 | |
|   colWidths.push_back(cc->getWidth());
 | |
|   tupleLength += cc->getWidth();
 | |
|   projectCount++;
 | |
|   needStrValues = true;
 | |
|   idbassert(sessionID == col.sessionId());
 | |
|   idbassert(sessionID == dict.sessionId());
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
 | |
| {
 | |
|   SCommand cc;
 | |
| 
 | |
|   cc.reset(new RTSCommandJL(p, dict));
 | |
|   cc->setBatchPrimitiveProcessor(this);
 | |
|   cc->setTupleKey(dict.tupleId());
 | |
|   cc->setQueryUuid(p.queryUuid());
 | |
|   cc->setStepUuid(uuid);
 | |
|   projectSteps.push_back(cc);
 | |
|   colWidths.push_back(cc->getWidth());
 | |
|   tupleLength += cc->getWidth();
 | |
|   projectCount++;
 | |
|   needStrValues = true;
 | |
| 
 | |
|   if (filterCount == 0)
 | |
|   {
 | |
|     sendValues = true;
 | |
|     sendAbsRids = true;
 | |
|     absRids.reset(new uint64_t[8192]);
 | |
|   }
 | |
| 
 | |
|   idbassert(sessionID == p.sessionId());
 | |
|   idbassert(sessionID == dict.sessionId());
 | |
| }
 | |
| 
 | |
| #if 0
 | |
| void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
 | |
| {
 | |
|     idbassert(sessionID == ds.sessionId());
 | |
| 
 | |
|     uint32_t i;
 | |
| 
 | |
|     templateTB = ds.fEmptyTableBand;
 | |
|     tableOID = ds.fTableOID;
 | |
| 
 | |
|     tableColumnCount = templateTB.getColumnCount();
 | |
|     idbassert(tableColumnCount > 0);
 | |
|     tablePositions.reset(new int[tableColumnCount]);
 | |
| 
 | |
|     for (i = 0; i < tableColumnCount; i++)
 | |
|         tablePositions[i] = -1;
 | |
| 
 | |
|     idbassert(projectCount <= projectSteps.size());
 | |
| 
 | |
|     /* In theory, tablePositions maps a column's table position to a projection step
 | |
|      -1 means it's a null column */
 | |
|     CalpontSystemCatalog::OID oid;
 | |
|     int idx;
 | |
| 
 | |
|     for (i = 0; i < projectCount; i++)
 | |
|     {
 | |
|         oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());
 | |
| 
 | |
|         if (oid > 0)
 | |
|         {
 | |
|             idx = templateTB.find(oid);
 | |
| 
 | |
|             if (idx >= 0)
 | |
|             {
 | |
|                 tablePositions[idx] = i;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
 | |
|                      " in tableband at pjstep idx " << i << endl;
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
 | |
|                  " doesn't have a valid OID" << endl;
 | |
|         }
 | |
|     }
 | |
| }
 | |
| #endif
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
 | |
| {
 | |
|   uint32_t i;
 | |
|   // 	rowCounter++;
 | |
| 
 | |
|   if (needToSetLBID)
 | |
|   {
 | |
|     needToSetLBID = false;
 | |
| 
 | |
|     for (i = 0; i < filterCount; ++i)
 | |
|       filterSteps[i]->setLBID(et.first, dbroot);
 | |
| 
 | |
|     for (i = 0; i < projectCount; ++i)
 | |
|       projectSteps[i]->setLBID(et.first, dbroot);
 | |
| 
 | |
|     baseRid = et.first & 0xffffffffffffe000ULL;
 | |
|   }
 | |
| 
 | |
|   // TODO: get rid of magics
 | |
|   if (sendAbsRids)
 | |
|     absRids[ridCount] = et.first;
 | |
|   else
 | |
|   {
 | |
|     relRids[ridCount] = et.first & 0x1fff;    // 8192 rows per logical block
 | |
|     ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
 | |
|   }
 | |
| 
 | |
|   if (sendValues)
 | |
|   {
 | |
|     // 		cout << "adding value " << et.second << endl;
 | |
|     values[ridCount] = et.second;
 | |
|   }
 | |
| 
 | |
|   ridCount++;
 | |
|   idbassert(ridCount <= 8192);
 | |
| }
 | |
| 
 | |
| #if 0
 | |
| void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|     uint32_t i;
 | |
|     rowgroup::Row r;
 | |
| 
 | |
|     inputRG.setData(rg.getData());
 | |
|     inputRG.initRow(&r);
 | |
|     inputRG.getRow(0, &r);
 | |
| 
 | |
|     if (needToSetLBID)
 | |
|     {
 | |
|         needToSetLBID = false;
 | |
| 
 | |
|         for (i = 0; i < filterCount; ++i)
 | |
|             filterSteps[i]->setLBID(r.getRid(), 1);
 | |
| 
 | |
|         for (i = 0; i < projectCount; ++i)
 | |
|             projectSteps[i]->setLBID(r.getRid(), 1);
 | |
| 
 | |
|         baseRid = r.getRid() & 0xffffffffffffe000ULL;
 | |
|     }
 | |
| }
 | |
| 
 | |
| #endif
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
 | |
| {
 | |
|   if (filterCount == 0)
 | |
|     throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");
 | |
| 
 | |
|   addElementType(ElementType(et.first, et.first), dbroot);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * When the output type is ElementType, the messages have the following format:
 | |
|  *
 | |
|  * ISMPacketHeader (ignored)
 | |
|  * PrimitiveHeader (ignored)
 | |
|  * --
 | |
|  * If the BPP starts with a scan
 | |
|  *    casual partitioning data from the scanned column
 | |
|  * Base Rid for the returned logical block
 | |
|  * rid count
 | |
|  * (rid count)x 16-bit rids
 | |
|  * (rid count)x 64-bit values
 | |
|  * If there's a join performed by this BPP
 | |
|  *    pre-Join rid count   (for logging purposes only)
 | |
|  *    small-side new match count
 | |
|  *    (match count)x ElementTypes
 | |
|  * cached IO count
 | |
|  * physical IO count
 | |
|  * block touches
 | |
|  */
 | |
| 
 | |
| // TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | |
| void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
 | |
|                                                 uint64_t* lbid, int64_t* min, int64_t* max,
 | |
|                                                 uint32_t* cachedIO, uint32_t* physIO,
 | |
|                                                 uint32_t* touchedBlocks) const
 | |
| {
 | |
|   uint32_t i;
 | |
|   uint16_t l_count;
 | |
|   uint64_t l_baseRid;
 | |
|   uint16_t* rids;
 | |
|   uint64_t* vals;
 | |
|   uint8_t* buf;
 | |
|   uint64_t tmp64;
 | |
|   uint8_t tmp8;
 | |
| 
 | |
|   /* skip the header */
 | |
|   idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | |
|   in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | |
| 
 | |
|   if (_hasScan)
 | |
|   {
 | |
|     in >> tmp8;
 | |
|     *validCPData = (tmp8 != 0);
 | |
| 
 | |
|     if (*validCPData)
 | |
|     {
 | |
|       in >> *lbid;
 | |
| 
 | |
|       in >> tmp64;
 | |
|       *min = (int64_t)tmp64;
 | |
|       in >> tmp64;
 | |
|       *max = (int64_t)tmp64;
 | |
|     }
 | |
|     else
 | |
|       in >> *lbid;
 | |
|   }
 | |
| 
 | |
|   in >> l_baseRid;
 | |
|   in >> l_count;
 | |
|   // 	rowsProcessed += l_count;
 | |
|   idbassert(l_count <= 8192);
 | |
|   out->resize(l_count);
 | |
| 
 | |
|   buf = (uint8_t*)in.buf();
 | |
| 
 | |
|   rids = (uint16_t*)buf;
 | |
|   vals = (uint64_t*)(buf + (l_count << 1));
 | |
|   idbassert(in.length() > (uint32_t)((l_count << 1) + (l_count << 3)));
 | |
|   in.advance((l_count << 1) + (l_count << 3));
 | |
| 
 | |
|   for (i = 0; i < l_count; ++i)
 | |
|   {
 | |
|     (*out)[i].first = rids[i] + l_baseRid;
 | |
|     // 		if (tableOID >= 3000)
 | |
|     // 			idbassert((*out)[i].first > 1023);
 | |
|     (*out)[i].second = vals[i];
 | |
|   }
 | |
| 
 | |
|   in >> *cachedIO;
 | |
|   in >> *physIO;
 | |
|   in >> *touchedBlocks;
 | |
|   // 	cout << "ET: got physIO=" << (int) *physIO << " cachedIO=" <<
 | |
|   // 		(int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
 | |
|   idbassert(in.length() == 0);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * When the output type is StringElementType the messages have the following format:
 | |
|  *
 | |
|  * ISMPacketHeader  (ignored)
 | |
|  * PrimitiveHeader  (ignored)
 | |
|  * ---
 | |
|  * If the BPP starts with a scan
 | |
|  *     Casual partitioning data from the column scanned
 | |
|  * Rid count
 | |
|  * (rid count)x 64-bit absolute rids
 | |
|  * (rid count)x serialized strings
 | |
|  * cached IO count
 | |
|  * physical IO count
 | |
|  * blocks touched
 | |
|  */
 | |
| 
 | |
| // TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | |
| void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
 | |
|                                                       bool* validCPData, uint64_t* lbid, int64_t* min,
 | |
|                                                       int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
 | |
|                                                       uint32_t* touchedBlocks) const
 | |
| {
 | |
|   uint32_t i;
 | |
|   uint16_t l_count;
 | |
|   uint64_t* l_absRids;
 | |
|   uint64_t tmp64;
 | |
|   uint8_t tmp8;
 | |
| 
 | |
|   // 	cout << "get String ETs uniqueID\n";
 | |
|   /* skip the header */
 | |
|   in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | |
| 
 | |
|   if (_hasScan)
 | |
|   {
 | |
|     in >> tmp8;
 | |
|     *validCPData = (tmp8 != 0);
 | |
| 
 | |
|     if (*validCPData)
 | |
|     {
 | |
|       in >> *lbid;
 | |
|       in >> tmp64;
 | |
|       *min = (int64_t)tmp64;
 | |
|       in >> tmp64;
 | |
|       *max = (int64_t)tmp64;
 | |
|     }
 | |
|     else
 | |
|       in >> *lbid;
 | |
|   }
 | |
| 
 | |
|   in >> l_count;
 | |
|   // 	cout << "parsing " << l_count << " strings\n";
 | |
|   l_absRids = (uint64_t*)in.buf();
 | |
|   out->resize(l_count);
 | |
|   in.advance(l_count << 3);
 | |
| 
 | |
|   for (i = 0; i < l_count; ++i)
 | |
|   {
 | |
|     (*out)[i].first = l_absRids[i];
 | |
|     in >> (*out)[i].second;
 | |
|   }
 | |
| 
 | |
|   in >> *cachedIO;
 | |
|   in >> *physIO;
 | |
|   in >> *touchedBlocks;
 | |
|   // 	cout << "SET: got physIO=" << (int) *physIO << " cachedIO=" <<
 | |
|   // 		(int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
 | |
|   idbassert(in.length() == 0);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * When the output type is Tuples, the input message format is the same
 | |
|  * as when the output type is TableBands
 | |
|  */
 | |
| 
 | |
| // TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | |
| void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
 | |
|                                           bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
 | |
|                                           uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
 | |
| {
 | |
|   uint32_t i, j, pos, len;
 | |
|   uint16_t l_rowCount;
 | |
|   uint64_t l_baseRid;
 | |
|   uint16_t* l_relRids;
 | |
|   uint64_t absRids[8192];
 | |
|   // const uint8_t* columnData[projectCount];
 | |
|   const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
 | |
|   memset(columnData, 0, projectCount * sizeof(uint8_t*));
 | |
|   const uint8_t* buf;
 | |
|   // uint32_t colLengths[projectCount];
 | |
|   uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
 | |
|   uint64_t tmp64;
 | |
|   uint8_t tmp8;
 | |
| 
 | |
|   // 	cout << "getTuples msg is " << in.length() << " bytes\n";
 | |
|   /* skip the header */
 | |
|   in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | |
| 
 | |
|   if (_hasScan)
 | |
|   {
 | |
|     in >> tmp8;
 | |
|     *validCPData = (tmp8 != 0);
 | |
| 
 | |
|     if (*validCPData)
 | |
|     {
 | |
|       in >> *lbid;
 | |
|       in >> tmp64;
 | |
|       *min = (int64_t)tmp64;
 | |
|       in >> tmp64;
 | |
|       *max = (int64_t)tmp64;
 | |
|     }
 | |
|     else
 | |
|       in >> *lbid;
 | |
|   }
 | |
| 
 | |
|   in >> l_rowCount;
 | |
| 
 | |
|   // 	cout << "read " << l_rowCount << " rows\n";
 | |
| 
 | |
|   if (needRidsAtDelivery)
 | |
|   {
 | |
|     in >> l_baseRid;
 | |
|     l_relRids = (uint16_t*)in.buf();
 | |
| 
 | |
|     for (i = 0; i < l_rowCount; i++)
 | |
|       absRids[i] = l_relRids[i] + l_baseRid;
 | |
| 
 | |
|     in.advance(l_rowCount << 1);
 | |
|   }
 | |
| 
 | |
|   /* Set up pointers to the column data */
 | |
|   pos = 0;
 | |
|   buf = in.buf();
 | |
| 
 | |
|   for (i = 0; i < projectCount; i++)
 | |
|   {
 | |
|     colLengths[i] = *((uint32_t*)&buf[pos]);
 | |
|     pos += 4;
 | |
|     columnData[i] = &buf[pos];
 | |
|     // 		cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
 | |
|     pos += colLengths[i];
 | |
|     idbassert(pos < in.length());
 | |
|   }
 | |
| 
 | |
|   in.advance(pos);
 | |
| 
 | |
|   out->resize(l_rowCount);
 | |
| 
 | |
|   for (i = 0; i < l_rowCount; i++)
 | |
|   {
 | |
|     (*out)[i].first = absRids[i];
 | |
|     (*out)[i].second = new char[tupleLength];
 | |
| 
 | |
|     for (j = 0, pos = 0; j < projectCount; j++)
 | |
|     {
 | |
|       idbassert(pos + colWidths[j] <= tupleLength);
 | |
| 
 | |
|       if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
 | |
|       {
 | |
|         len = *((uint32_t*)columnData[j]);
 | |
|         columnData[j] += 4;
 | |
|         memcpy(&(*out)[i].second[pos], columnData[j], len);
 | |
|         pos += len;
 | |
|         columnData[j] += len;
 | |
| 
 | |
|         // insert padding...
 | |
|         memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
 | |
|         pos += colWidths[j] - len;
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         switch (colWidths[j])
 | |
|         {
 | |
|           case 8:
 | |
|             *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
 | |
|             columnData[j] += 8;
 | |
|             pos += 8;
 | |
|             break;
 | |
| 
 | |
|           case 4:
 | |
|             *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
 | |
|             columnData[j] += 4;
 | |
|             pos += 4;
 | |
|             break;
 | |
| 
 | |
|           case 2:
 | |
|             *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
 | |
|             columnData[j] += 2;
 | |
|             pos += 4;
 | |
|             break;
 | |
| 
 | |
|           case 1:
 | |
|             (*out)[i].second[pos] = (char)*columnData[j];
 | |
|             columnData[j]++;
 | |
|             pos++;
 | |
|             break;
 | |
| 
 | |
|           // TODO MCOL-641
 | |
|           case 16:
 | |
|             // fallthrough
 | |
|           default:
 | |
|             cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
 | |
|             throw logic_error("BPP::getTuples(): bad column width");
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   in >> *cachedIO;
 | |
|   in >> *physIO;
 | |
|   in >> *touchedBlocks;
 | |
|   idbassert(in.length() == 0);
 | |
| }
 | |
| 
 | |
| bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
 | |
| {
 | |
|   const uint8_t* data = in.buf();
 | |
|   uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);  // skip the headers
 | |
| 
 | |
|   ISMPacketHeader* hdr = (ISMPacketHeader*)(data);
 | |
| 
 | |
|   if (_hasScan && in.length() > offset)
 | |
|   {
 | |
|     // This is a legitimate error message sent by PrimProc
 | |
|     // so we need to return to allow upper layer to throw an error
 | |
|     // if needed.
 | |
|     if (hdr->Status > 0)
 | |
|     {
 | |
|       return true;
 | |
|     }
 | |
| 
 | |
|     if (data[offset] != 0)
 | |
|       offset += (data[offset + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 +
 | |
|                 1;  // skip the CP data with wide min/max values (16/32 bytes each). we also skip
 | |
|                     // cpFromDictScan flag.
 | |
|     else
 | |
|       offset += CP_FLAG_AND_LBID;  // skip only the "valid CP data" & LBID bytes
 | |
|   }
 | |
| 
 | |
|   // Throw b/c PP throws and sends here error msg.
 | |
|   // See BatchPrimitiveProcessor::writeErrorMsg() for details.
 | |
|   // The inversion of the assert used here previously.
 | |
|   if (in.length() <= offset)
 | |
|   {
 | |
|     if (hdr->Status > 0)
 | |
|     {
 | |
|       throw std::runtime_error(" an exception originally thrown by PrimProc: ");
 | |
|     }
 | |
|     throw std::runtime_error(
 | |
|         " an exception because there is not enough \
 | |
| data in the Primitive message from PrimProc.");
 | |
|   }
 | |
| 
 | |
|   return (data[offset] != 0);
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
 | |
| {
 | |
|   RGData rgData;
 | |
|   uint32_t count, i;
 | |
| 
 | |
|   *in >> count;
 | |
| 
 | |
|   for (i = 0; i < count; i++)
 | |
|   {
 | |
|     rgData.deserialize(*in, true);
 | |
|     out->push_back(rgData);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
 | |
|                                                 uint64_t* lbid, bool* fromDictScan, int128_t* min,
 | |
|                                                 int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
 | |
|                                                 uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
 | |
|                                                 bool* hasWideColumn,
 | |
|                                                 const execplan::CalpontSystemCatalog::ColType& colType) const
 | |
| {
 | |
|   uint64_t tmp64;
 | |
|   int128_t tmp128;
 | |
|   uint8_t tmp8;
 | |
|   RGData rgData;
 | |
|   uint32_t rowCount;
 | |
|   RowGroup& org = primprocRG[threadID];
 | |
| 
 | |
|   out->clear();
 | |
| 
 | |
|   if (in.length() == 0)
 | |
|   {
 | |
|     // done, return an empty RG
 | |
|     rgData = RGData(org, 0U);
 | |
|     org.setData(&rgData);
 | |
|     org.resetRowGroup(0);
 | |
|     out->push_back(rgData);
 | |
|     *cachedIO = 0;
 | |
|     *physIO = 0;
 | |
|     *touchedBlocks = 0;
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   /* skip the header */
 | |
|   in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | |
| 
 | |
|   if (_hasScan)
 | |
|   {
 | |
|     in >> tmp8;
 | |
|     *validCPData = (tmp8 != 0);
 | |
| 
 | |
|     if (*validCPData)
 | |
|     {
 | |
|       in >> *lbid;
 | |
|       in >> tmp8;
 | |
|       *fromDictScan = tmp8 != 0;
 | |
|       in >> tmp8;
 | |
|       *hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
 | |
|       if (UNLIKELY(*hasWideColumn))
 | |
|       {
 | |
|         idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
 | |
|         if (LIKELY(colType.isWideDecimalType()))
 | |
|         {
 | |
|           in >> tmp128;
 | |
|           *min = tmp128;
 | |
|           in >> tmp128;
 | |
|           *max = tmp128;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|           std::ostringstream oss;
 | |
|           oss << __func__ << " WARNING!!! Not implemented for the data type ";
 | |
|           oss << colType.colDataType << std::endl;
 | |
|           std::cout << oss.str();
 | |
|           idbassert(false);
 | |
|         }
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         in >> tmp64;
 | |
|         *min = static_cast<int128_t>(tmp64);
 | |
|         in >> tmp64;
 | |
|         *max = static_cast<int128_t>(tmp64);
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|       in >> *lbid;
 | |
|   }
 | |
| 
 | |
|   in >> tmp8;
 | |
|   *countThis = (tmp8 != 0);
 | |
| 
 | |
|   /* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
 | |
|    * For later... */
 | |
|   if (aggregatorPM)
 | |
|   {
 | |
|     deserializeAggregateResult(&in, out);
 | |
|     // for (uint32_t z = 0; z < out->size(); z++) {
 | |
|     //	org.setData(&(*out)[z]);
 | |
|     //	cout << "BPPJL: " << org.toString() << endl;
 | |
|     //}
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     rgData.deserialize(in, true);
 | |
|     out->push_back(rgData);
 | |
|     org.setData(&rgData);
 | |
| 
 | |
|     rowCount = org.getRowCount();
 | |
| 
 | |
|     bool pmSendsMatchesAnyway =
 | |
|         (hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));
 | |
| 
 | |
|     if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
 | |
|     {
 | |
|       std::shared_ptr<vector<uint32_t>[]> joinResults;
 | |
|       uint32_t i, j;
 | |
| 
 | |
|       if (pmSendsMatchesAnyway)
 | |
|       {
 | |
|         uint16_t joinRowCount;
 | |
|         in >> joinRowCount;
 | |
|         rowCount = joinRowCount;
 | |
|       }
 | |
| 
 | |
|       for (j = 0; j < PMJoinerCount; j++)
 | |
|       {
 | |
|         /* Reuse the result space if possible */
 | |
|         joinResults = tJoiners[j]->getPMJoinArrays(threadID);
 | |
| 
 | |
|         if (joinResults.get() == NULL)
 | |
|         {
 | |
|           auto v = new vector<uint32_t>[8192];
 | |
|           joinResults.reset(v);
 | |
|           tJoiners[j]->setPMJoinResults(joinResults, threadID);
 | |
|         }
 | |
| 
 | |
|         for (i = 0; i < rowCount; i++)
 | |
|           deserializeInlineVector<uint32_t>(in, joinResults[i]);
 | |
| 
 | |
|         if (tJoiners[j]->smallOuterJoin())
 | |
|           tJoiners[j]->markMatches(threadID, rowCount);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (*countThis)
 | |
|   {
 | |
|     // 		cout << "grabbing io stats\n";
 | |
|     in >> *cachedIO;
 | |
|     in >> *physIO;
 | |
|     in >> *touchedBlocks;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     *cachedIO = 0;
 | |
|     *physIO = 0;
 | |
|     *touchedBlocks = 0;
 | |
|   }
 | |
| 
 | |
|   idbassert(in.length() == 0);
 | |
| }
 | |
| 
 | |
| RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
 | |
| {
 | |
|   RGData ret;
 | |
|   rowgroup::RowGroup rg(projectionRG);
 | |
| 
 | |
|   ret = RGData(rg, 0U);
 | |
|   rg.setData(&ret);
 | |
|   // rg.convertToInlineDataInPlace();
 | |
|   rg.resetRowGroup(0);
 | |
|   rg.setStatus(error);
 | |
|   return ret;
 | |
|   // return ret->rowData;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::reset()
 | |
| {
 | |
|   ridCount = 0;
 | |
|   ridMap = 0;
 | |
|   needToSetLBID = true;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
 | |
| {
 | |
|   uint32_t i;
 | |
| 
 | |
|   dbRoot = scannedExtent.dbRoot;
 | |
|   baseRid = rowgroup::convertToRid(
 | |
|       scannedExtent.partitionNum, scannedExtent.segmentNum,
 | |
|       scannedExtent.blockOffset / (scannedExtent.range.size * 1024),  // the extent #
 | |
|       (l - scannedExtent.range.start) / scannedExtent.range.size);    // the logical block #
 | |
| 
 | |
|   /*
 | |
|   cout << "got baserid=" << baseRid << " from partnum=" << scannedExtent.partitionNum
 | |
|               << " segnum=" << scannedExtent.segmentNum << " extentnum=" <<
 | |
|               scannedExtent.blockOffset/(scannedExtent.range.size * 1024) <<
 | |
|               " blocknum=" << (l - scannedExtent.range.start)/scannedExtent.range.size << endl;
 | |
|   */
 | |
| 
 | |
|   for (i = 0; i < filterCount; ++i)
 | |
|     filterSteps[i]->setLBID(baseRid, dbRoot);
 | |
| 
 | |
|   for (i = 0; i < projectCount; ++i)
 | |
|     projectSteps[i]->setLBID(baseRid, dbRoot);
 | |
| }
 | |
| 
 | |
| string BatchPrimitiveProcessorJL::toString() const
 | |
| {
 | |
|   ostringstream ret;
 | |
|   uint32_t i;
 | |
| 
 | |
|   ret << "BatchPrimitiveProcessorJL:" << endl;
 | |
| 
 | |
|   if (!_hasScan)
 | |
|   {
 | |
|     if (sendValues)
 | |
|       ret << "   -- serializing values" << endl;
 | |
| 
 | |
|     if (sendAbsRids)
 | |
|       ret << "   -- serializing absolute rids" << endl;
 | |
|     else
 | |
|       ret << "   -- serializing relative rids" << endl;
 | |
|   }
 | |
|   else
 | |
|     ret << "   -- scan driven" << endl;
 | |
| 
 | |
|   ret << "   " << (int)filterCount << " filter steps:\n";
 | |
| 
 | |
|   for (i = 0; i < filterCount; i++)
 | |
|     ret << "      " << filterSteps[i]->toString() << endl;
 | |
| 
 | |
|   ret << "   " << (int)projectCount << " projection steps:\n";
 | |
| 
 | |
|   for (i = 0; i < projectCount; i++)
 | |
|     ret << "      " << projectSteps[i]->toString() << endl;
 | |
| 
 | |
|   return ret.str();
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The creation messages have the following format:
 | |
|  *
 | |
|  * ISMPacketHeader (necessary for DEC and ReadThread classes)
 | |
|  * ---
 | |
|  * output type (ElementType, StringElementType, etc)
 | |
|  * version #
 | |
|  * transaction #
 | |
|  * session #  (unnecessary except for debugging)
 | |
|  * step ID    ( same )
 | |
|  * unique ID  (uniquely identifies the DEC queue on the UM)
 | |
|  * 8-bit flags
 | |
|  * if there's a join...
 | |
|  *      # of elements in the Joiner object
 | |
|  *       a flag whether or not to match every element (for inner joins)
 | |
|  * filter step count
 | |
|  * (filter count)x serialized Commands
 | |
|  * projection step count
 | |
|  * (projection count)x serialized Commands
 | |
|  */
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
 | |
| {
 | |
|   ISMPacketHeader ism;
 | |
|   uint32_t i;
 | |
|   uint16_t flags = 0;
 | |
| 
 | |
|   ism.Command = BATCH_PRIMITIVE_CREATE;
 | |
| 
 | |
|   bs.load((uint8_t*)&ism, sizeof(ism));
 | |
|   bs << (uint8_t)ot;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)txnID;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|   bs << uniqueID;
 | |
|   bs << versionInfo;
 | |
| 
 | |
|   if (needStrValues)
 | |
|     flags |= NEED_STR_VALUES;
 | |
| 
 | |
|   if (sendAbsRids)
 | |
|     flags |= GOT_ABS_RIDS;
 | |
| 
 | |
|   if (sendValues)
 | |
|     flags |= GOT_VALUES;
 | |
| 
 | |
|   if (LBIDTrace)
 | |
|     flags |= LBID_TRACE;
 | |
| 
 | |
|   if (needRidsAtDelivery)
 | |
|     flags |= SEND_RIDS_AT_DELIVERY;
 | |
| 
 | |
|   if (tJoiners.size() > 0)
 | |
|     flags |= HAS_JOINER;
 | |
| 
 | |
|   if (sendTupleJoinRowGroupData)
 | |
|     flags |= JOIN_ROWGROUP_DATA;
 | |
| 
 | |
|   if (wideColumnsWidths)
 | |
|     flags |= HAS_WIDE_COLUMNS;
 | |
| 
 | |
|   bs << flags;
 | |
| 
 | |
|   if (wideColumnsWidths)
 | |
|     bs << wideColumnsWidths;
 | |
| 
 | |
|   bs << bop;
 | |
|   bs << (uint8_t)(forHJ ? 1 : 0);
 | |
| 
 | |
|   if (ot == ROW_GROUP)
 | |
|   {
 | |
|     bs << projectionRG;
 | |
|     // 		cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;
 | |
| 
 | |
|     /* F&E serialization */
 | |
|     if (fe1)
 | |
|     {
 | |
|       bs << (uint8_t)1;
 | |
|       bs << *fe1;
 | |
|       bs << fe1Input;
 | |
|     }
 | |
|     else
 | |
|       bs << (uint8_t)0;
 | |
| 
 | |
|     if (fe2)
 | |
|     {
 | |
|       bs << (uint8_t)1;
 | |
|       bs << *fe2;
 | |
|       bs << fe2Output;
 | |
|     }
 | |
|     else
 | |
|       bs << (uint8_t)0;
 | |
|   }
 | |
| 
 | |
|   /* if HAS_JOINER, send the init params */
 | |
|   if (flags & HAS_JOINER)
 | |
|   {
 | |
|     bs << (uint32_t)maxPmJoinResultCount;
 | |
|     if (ot == ROW_GROUP)
 | |
|     {
 | |
|       idbassert(tJoiners.size() > 0);
 | |
|       bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;
 | |
| 
 | |
|       bool atLeastOneFE = false;
 | |
| #ifdef JLF_DEBUG
 | |
|       cout << "PMJoinerCount = " << PMJoinerCount << endl;
 | |
| #endif
 | |
| 
 | |
|       bool smallSideRGSent = false;
 | |
|       for (i = 0; i < PMJoinerCount; i++)
 | |
|       {
 | |
|         bs << (uint32_t)tJoiners[i]->size();
 | |
|         bs << tJoiners[i]->getJoinType();
 | |
| 
 | |
|         // bs << (uint64_t) tJoiners[i]->smallNullValue();
 | |
| 
 | |
|         bs << (uint8_t)tJoiners[i]->isTypelessJoin();
 | |
| 
 | |
|         if (tJoiners[i]->hasFEFilter())
 | |
|         {
 | |
|           atLeastOneFE = true;
 | |
| #ifdef JLF_DEBUG
 | |
|           cout << "serializing join FE object\n";
 | |
| #endif
 | |
|           bs << *tJoiners[i]->getFcnExpFilter();
 | |
|         }
 | |
| 
 | |
|         if (!tJoiners[i]->isTypelessJoin())
 | |
|         {
 | |
|           bs << (uint64_t)tJoiners[i]->smallNullValue();
 | |
|           bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
 | |
|           // cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|           serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
 | |
|           bs << (uint32_t)tJoiners[i]->getKeyLength();
 | |
|           // MCOL-4173 Notify PP if smallSide and largeSide have different column widths
 | |
|           // and send smallSide RG to PP.
 | |
|           bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
 | |
|           bs << (uint8_t)joinHasSkewedKeyColumn;
 | |
|           if (!smallSideRGSent && joinHasSkewedKeyColumn)
 | |
|           {
 | |
|             idbassert(!smallSideRGs.empty());
 | |
|             bs << smallSideRGs[0];
 | |
|             serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
 | |
|             smallSideRGSent = true;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if (atLeastOneFE)
 | |
|         bs << joinFERG;
 | |
| 
 | |
|       if (sendTupleJoinRowGroupData)
 | |
|       {
 | |
| #ifdef JLF_DEBUG
 | |
|         cout << "sending smallside data\n";
 | |
| #endif
 | |
|         serializeVector<RowGroup>(bs, smallSideRGs);
 | |
|         bs << largeSideRG;
 | |
|         bs << joinedRG;  // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
 | |
|                          // 				cout << "joined RG: " << joinedRG.toString() << endl;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   bs << filterCount;
 | |
| 
 | |
|   for (i = 0; i < filterCount; ++i)
 | |
|   {
 | |
|     // 		cout << "serializing step " << i << endl;
 | |
|     filterSteps[i]->createCommand(bs);
 | |
|   }
 | |
| 
 | |
|   bs << projectCount;
 | |
| 
 | |
|   for (i = 0; i < projectCount; ++i)
 | |
|     projectSteps[i]->createCommand(bs);
 | |
| 
 | |
|   // aggregate step only when output is row group
 | |
|   if (ot == ROW_GROUP)
 | |
|   {
 | |
|     if (aggregatorPM.get() != NULL)
 | |
|     {
 | |
|       bs << (uint8_t)1;
 | |
|       bs << aggregateRGPM;
 | |
|       bs << *(aggregatorPM.get());
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       bs << (uint8_t)0;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // decide which rowgroup is received by PrimProc
 | |
|   if (ot == ROW_GROUP)
 | |
|   {
 | |
|     primprocRG.reset(new RowGroup[threadCount]);
 | |
| 
 | |
|     for (uint32_t i = 0; i < threadCount; i++)
 | |
|       if (aggregatorPM)
 | |
|         primprocRG[i] = aggregateRGPM;
 | |
|       else if (fe2)
 | |
|         primprocRG[i] = fe2Output;
 | |
|       // This shouldn't be necessary. As of 2-17-14, PrimProc
 | |
|       // will only send joined results if fe2 || aggregatorPM,
 | |
|       // so it will never send back data for joinedRG.
 | |
|       // else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
 | |
|       //	primprocRG[i] = joinedRG;
 | |
|       else
 | |
|         primprocRG[i] = projectionRG;
 | |
|   }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The BPP Run messages have the following format:
 | |
|  *
 | |
|  * ISMPacketHeader
 | |
|  *    -- Interleave field is used for DEC interleaving data
 | |
|  *    -- Size field is used for the relative weight to process the message
 | |
|  * -----
 | |
|  * Session ID
 | |
|  * Step ID
 | |
|  * Unique ID
 | |
|  * Sequence #
 | |
|  * Iteration count
 | |
|  * Rid count
 | |
|  * If absolute rids are sent
 | |
|  *    (rid count)x 64-bit absolute rids
 | |
|  * else
 | |
|  *    RID range bitmap
 | |
|  *    base RID
 | |
|  *    (rid count)x 16-bit relative rids
 | |
|  * If values are sent
 | |
|  *    (rid count)x 64-bit values
 | |
|  * (filter count)x run msgs for filter Commands
 | |
|  * (projection count)x run msgs for projection Commands
 | |
|  */
 | |
| 
 | |
| // The deser counterpart function is BPP::resetBPP
 | |
| void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
 | |
| {
 | |
|   ISMPacketHeader ism;
 | |
|   uint32_t i;
 | |
| 
 | |
|   /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
 | |
|   save bandwidth.  They're completely unused otherwise.  We need to throw out all unused
 | |
|   fields of every defined header. */
 | |
| 
 | |
|   bs.restart();
 | |
| 
 | |
|   memset((void*)&ism, 0, sizeof(ism));
 | |
|   ism.Command = BATCH_PRIMITIVE_RUN;
 | |
| 
 | |
|   // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
 | |
|   // defined for each PM.
 | |
|   ism.Interleave = pmNum;
 | |
|   //	ism.Interleave = pmNum - 1;
 | |
| 
 | |
|   /* ... and the Size field is used for the "weight" of processing a BPP
 | |
|     where 1 is one Command on one logical block. */
 | |
|   ism.Size = count * (filterCount + projectCount);
 | |
| 
 | |
|   bs.append((uint8_t*)&ism, sizeof(ism));
 | |
| 
 | |
|   /* The next 4 vars are for BPPSeeder; BPP itself skips them */
 | |
|   bs << sessionID;
 | |
|   bs << stepID;
 | |
|   bs << uniqueID;
 | |
|   bs << _priority;
 | |
| 
 | |
|   // The weight is used by PrimProc thread pool algo
 | |
|   uint32_t weight = calculateBPPWeight();
 | |
|   bs << weight;
 | |
| 
 | |
|   bs << dbRoot;
 | |
|   bs << count;
 | |
|   uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0;
 | |
|   bs << sentByEM;
 | |
| 
 | |
|   if (_hasScan)
 | |
|   {
 | |
|     idbassert(ridCount == 0);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
 | |
|   }
 | |
| 
 | |
|   bs << ridCount;
 | |
| 
 | |
|   if (sendAbsRids)
 | |
|     bs.append((uint8_t*)absRids.get(), ridCount << 3);
 | |
|   else
 | |
|   {
 | |
|     bs << ridMap;
 | |
|     bs << baseRid;
 | |
|     bs.append((uint8_t*)relRids, ridCount << 1);
 | |
|   }
 | |
| 
 | |
|   if (sendValues)
 | |
|     bs.append((uint8_t*)values, ridCount << 3);
 | |
| 
 | |
|   for (i = 0; i < filterCount; i++)
 | |
|     filterSteps[i]->runCommand(bs);
 | |
| 
 | |
|   for (i = 0; i < projectCount; i++)
 | |
|     projectSteps[i]->runCommand(bs);
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
 | |
| {
 | |
|   ISMPacketHeader ism;
 | |
|   bs.restart();
 | |
| 
 | |
|   memset((void*)&ism, 0, sizeof(ism));
 | |
|   ism.Command = BATCH_PRIMITIVE_RUN;
 | |
|   ism.Status = status;
 | |
|   ism.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);
 | |
| 
 | |
|   bs.append((uint8_t*)&ism, sizeof(ism));
 | |
| 
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|   bs << uniqueID;
 | |
|   bs << count;
 | |
|   bs << ridCount;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
 | |
| {
 | |
|   ISMPacketHeader ism;
 | |
| 
 | |
|   // 	if (!(sessionID & 0x80000000))
 | |
|   // 		cout << "step ID " << stepID << " added " << rowCounter << " rows, processed "
 | |
|   // 			<< rowsProcessed << " rows" << endl;
 | |
| 
 | |
|   memset((void*)&ism, 0, sizeof(ism));
 | |
|   ism.Command = BATCH_PRIMITIVE_DESTROY;
 | |
| 
 | |
|   bs.append((uint8_t*)&ism, sizeof(ism));
 | |
|   /* XXXPAT: We can get rid of sessionID and stepID;
 | |
|   it's there for easier debugging only */
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|   bs << uniqueID;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
 | |
| {
 | |
|   pos = 0;
 | |
|   joinerNum = 0;
 | |
|   tJoiners = j;
 | |
| 
 | |
|   PMJoinerCount = 0;
 | |
|   tlKeyLens.reset(new uint32_t[tJoiners.size()]);
 | |
| 
 | |
|   for (uint32_t i = 0; i < tJoiners.size(); i++)
 | |
|   {
 | |
|     if (tJoiners[i]->inPM())
 | |
|     {
 | |
|       PMJoinerCount++;
 | |
|       smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
 | |
|       smallSideRGs.push_back(tJoiners[i]->getSmallRG());
 | |
| 
 | |
|       if (tJoiners[i]->isTypelessJoin())
 | |
|         tlKeyLens[i] = tJoiners[i]->getKeyLength();
 | |
| 
 | |
|       if (tJoiners[i]->hasFEFilter())
 | |
|         sendTupleJoinRowGroupData = true;
 | |
| 
 | |
|       if (tJoiners[i]->smallOuterJoin())
 | |
|         hasSmallOuterJoin = true;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   largeSideRG = tJoiners[0]->getLargeRG();
 | |
| 
 | |
|   if (aggregatorPM || fe2)
 | |
|   {
 | |
|     sendTupleJoinRowGroupData = true;
 | |
| #ifdef JLF_DEBUG
 | |
|     cout << "will send small side row data\n";
 | |
| #endif
 | |
|   }
 | |
|   posByJoinerNum.reset(new uint32_t[PMJoinerCount]);
 | |
|   memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t));
 | |
| }
 | |
| 
 | |
| // helper fcn to interleave small side data by joinernum
 | |
| bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
 | |
| {
 | |
|   uint i;
 | |
|   // find the next joiner that still has more data to send.  Set joinerNum & pos.
 | |
|   for (i = 0; i < PMJoinerCount; i++)
 | |
|   {
 | |
|     joinerNum = (joinerNum + 1) % PMJoinerCount;
 | |
|     if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide().size())
 | |
|       break;
 | |
|   }
 | |
|   if (i == PMJoinerCount)
 | |
|     return false;
 | |
|   pos = posByJoinerNum[joinerNum];
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| /* This algorithm relies on the joiners being sorted by size atm */
 | |
| /* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
 | |
| bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
 | |
| {
 | |
|   uint32_t toSend, i, j;
 | |
|   ISMPacketHeader ism;
 | |
|   Row r;
 | |
|   joiner::TypelessData tlData;
 | |
|   uint32_t smallKeyCol;
 | |
|   uint32_t largeKeyCol;
 | |
|   uint64_t smallkey;
 | |
|   bool isNull;
 | |
|   bool bSignedUnsigned;
 | |
| 
 | |
|   bool moreMsgs = pickNextJoinerNum();
 | |
| 
 | |
|   if (!moreMsgs)
 | |
|   {
 | |
|     /* last message */
 | |
|     // 		cout << "sending last joiner msg\n";
 | |
|     ism.Command = BATCH_PRIMITIVE_END_JOINER;
 | |
|     bs.load((uint8_t*)&ism, sizeof(ism));
 | |
|     bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|     bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|     bs << uniqueID;
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   memset((void*)&ism, 0, sizeof(ism));
 | |
|   auto& tSmallSide = tJoiners[joinerNum]->getSmallSide();
 | |
|   auto size = tSmallSide.size();
 | |
| 
 | |
| #if 0
 | |
|     if (joinerNum == PMJoinerCount - 1 && pos == size)
 | |
|     {
 | |
|         /* last message */
 | |
| // 		cout << "sending last joiner msg\n";
 | |
|         ism.Command = BATCH_PRIMITIVE_END_JOINER;
 | |
|         bs.load((uint8_t*) &ism, sizeof(ism));
 | |
|         bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|         bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|         bs << uniqueID;
 | |
|         pos = 0;
 | |
|         return false;
 | |
|     }
 | |
|     else if (pos == size)
 | |
|     {
 | |
|         joinerNum++;
 | |
|         tSmallSide = tJoiners[joinerNum]->getSmallSide();
 | |
|         size = tSmallSide->size();
 | |
|         pos = 0;
 | |
|     }
 | |
| #endif
 | |
| 
 | |
|   ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
 | |
|   bs.load((uint8_t*)&ism, sizeof(ism));
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)sessionID;
 | |
|   bs << (messageqcpp::ByteStream::quadbyte)stepID;
 | |
|   bs << uniqueID;
 | |
| 
 | |
|   smallSideRGs[joinerNum].initRow(&r);
 | |
| 
 | |
|   unsigned metasize = 12;  // 12 = sizeof(struct JoinerElements)
 | |
| 
 | |
|   if (tJoiners[joinerNum]->isTypelessJoin())
 | |
|     metasize = 0;
 | |
| 
 | |
|   unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);
 | |
|   toSend = std::min<unsigned int>(size - pos, rowunits);
 | |
|   bs << toSend;
 | |
|   bs << pos;
 | |
|   bs << joinerNum;
 | |
| 
 | |
|   if (tJoiners[joinerNum]->isTypelessJoin())
 | |
|   {
 | |
|     // TODO: change RM ptr to ref b/c its scope and lifetime lasts till the end of the program.
 | |
|     auto alloc = rm_->getAllocator<utils::FixedAllocatorBufType>();
 | |
|     utils::FixedAllocator fa(alloc, tlKeyLens[joinerNum], true);
 | |
| 
 | |
|     for (i = pos; i < pos + toSend; i++)
 | |
|     {
 | |
|       r.setPointer(tSmallSide[i]);
 | |
|       isNull = false;
 | |
|       bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();
 | |
| 
 | |
|       for (j = 0; j < smallSideKeys[joinerNum].size(); j++)
 | |
|       {
 | |
|         isNull |= r.isNullValue(smallSideKeys[joinerNum][j]);
 | |
| 
 | |
|         if (UNLIKELY(bSignedUnsigned))
 | |
|         {
 | |
|           // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
 | |
|           // then it should not compare. Send null to PM to prevent compare
 | |
|           smallKeyCol = smallSideKeys[joinerNum][j];
 | |
|           largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j];
 | |
| 
 | |
|           if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
 | |
|           {
 | |
|             if (r.isUnsigned(smallKeyCol))
 | |
|               smallkey = r.getUintField(smallKeyCol);
 | |
|             else
 | |
|               smallkey = r.getIntField(smallKeyCol);
 | |
| 
 | |
|             if (smallkey & 0x8000000000000000ULL)
 | |
|             {
 | |
|               isNull = true;
 | |
|               break;
 | |
|             }
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if (!isNull)
 | |
|       {
 | |
|         tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
 | |
|                                  tJoiners[joinerNum]->getLargeKeyColumns());
 | |
|         if (tlData.len == 0)
 | |
|         {
 | |
|           isNull = true;
 | |
|         }
 | |
|       }
 | |
|       bs << (uint8_t)isNull;
 | |
|       if (!isNull)
 | |
|       {
 | |
|         tlData.serialize(bs);
 | |
|         bs << i;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   else
 | |
|   {
 | |
| #pragma pack(push, 1)
 | |
|     struct JoinerElements
 | |
|     {
 | |
|       int64_t key;
 | |
|       uint32_t value;
 | |
|     } * arr;
 | |
| #pragma pack(pop)
 | |
|     bs.needAtLeast(toSend * sizeof(JoinerElements));
 | |
|     arr = (JoinerElements*)bs.getInputPtr();
 | |
| 
 | |
|     smallKeyCol = smallSideKeys[joinerNum][0];
 | |
|     bSignedUnsigned =
 | |
|         r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]);
 | |
|     j = 0;
 | |
| 
 | |
|     for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
 | |
|     {
 | |
|       r.setPointer(tSmallSide[i]);
 | |
| 
 | |
|       if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
 | |
|       {
 | |
|         // Small side is a long double. Since CS can't store larger than DOUBLE,
 | |
|         // we need to convert to whatever type large side is -- double or int64
 | |
|         long double smallkeyld = r.getLongDoubleField(smallKeyCol);
 | |
|         switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0]))
 | |
|         {
 | |
|           case CalpontSystemCatalog::DOUBLE:
 | |
|           case CalpontSystemCatalog::UDOUBLE:
 | |
|           case CalpontSystemCatalog::FLOAT:
 | |
|           case CalpontSystemCatalog::UFLOAT:
 | |
|           {
 | |
|             if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
 | |
|             {
 | |
|               smallkey = joblist::UBIGINTEMPTYROW;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|               double d = (double)smallkeyld;
 | |
|               smallkey = *(int64_t*)&d;
 | |
|             }
 | |
|             break;
 | |
|           }
 | |
|           default:
 | |
|           {
 | |
|             if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
 | |
|             {
 | |
|               smallkey = joblist::UBIGINTEMPTYROW;
 | |
|             }
 | |
|             else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
 | |
|             {
 | |
|               smallkey = joblist::UBIGINTEMPTYROW;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|               smallkey = (int64_t)smallkeyld;
 | |
|             }
 | |
|             break;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|       else if (r.isUnsigned(smallKeyCol))
 | |
|         smallkey = r.getUintField(smallKeyCol);
 | |
|       else
 | |
|         smallkey = r.getIntField(smallKeyCol);
 | |
| 
 | |
|       // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
 | |
|       // against the large side should fall. UBIGINTEMPTYROW is not a valid value, so nothing will match.
 | |
|       if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
 | |
|         smallkey = joblist::UBIGINTEMPTYROW;
 | |
| 
 | |
|       arr[j].key = (int64_t)smallkey;
 | |
|       arr[j].value = i;
 | |
|       // 			cout << "sending " << arr[j].key << ", " << arr[j].value << endl;
 | |
|     }
 | |
| 
 | |
|     bs.advanceInputPtr(toSend * sizeof(JoinerElements));
 | |
|   }
 | |
| 
 | |
|   if (sendTupleJoinRowGroupData)
 | |
|   {
 | |
|     RowGroup& smallSide = smallSideRGs[joinerNum];
 | |
|     RGData tmpData(smallSide, toSend);
 | |
|     Row tmpRow;
 | |
| 
 | |
|     smallSide.setData(&tmpData);
 | |
|     smallSide.initRow(&tmpRow);
 | |
|     smallSide.getRow(0, &tmpRow);
 | |
| 
 | |
|     for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
 | |
|     {
 | |
|       r.setPointer(tSmallSide[i]);
 | |
|       copyRow(r, &tmpRow);
 | |
|     }
 | |
| 
 | |
|     smallSide.setRowCount(toSend);
 | |
|     tmpData.serialize(bs, smallSide.getDataSize());
 | |
|   }
 | |
| 
 | |
|   pos += toSend;
 | |
|   posByJoinerNum[joinerNum] = pos;
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|   ot = ROW_GROUP;
 | |
|   projectionRG = rg;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|   joinedRG = rg;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|   sendAbsRids = false;
 | |
|   sendValues = false;
 | |
|   inputRG = rg;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
 | |
|                                                  const rowgroup::RowGroup& argpm)
 | |
| {
 | |
|   aggregatorPM = aggpm;
 | |
|   aggregateRGPM = argpm;
 | |
| 
 | |
|   if (tJoiners.size() > 0)
 | |
|     sendTupleJoinRowGroupData = true;
 | |
| }
 | |
| 
 | |
| /* OR hacks */
 | |
| void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
 | |
| {
 | |
|   bop = op;
 | |
| 
 | |
|   if (op == BOP_OR && filterCount > 1)
 | |
|   {
 | |
|     for (int i = 1; i < filterCount; ++i)
 | |
|     {
 | |
|       ColumnCommandJL* colcmd = dynamic_cast<ColumnCommandJL*>(filterSteps[i].get());
 | |
| 
 | |
|       if (colcmd != NULL)
 | |
|         colcmd->scan(false);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setForHJ(bool b)
 | |
| {
 | |
|   forHJ = b;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
 | |
|                                             const RowGroup& input)
 | |
| {
 | |
|   fe1 = fe;
 | |
|   fe1Input = input;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
 | |
|                                             const RowGroup& output)
 | |
| {
 | |
|   fe2 = fe;
 | |
|   fe2Output = output;
 | |
| 
 | |
|   if (tJoiners.size() > 0 && PMJoinerCount > 0)
 | |
|     sendTupleJoinRowGroupData = true;
 | |
| }
 | |
| 
 | |
| const string BatchPrimitiveProcessorJL::toMiniString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   int i;
 | |
|   set<string> colset;
 | |
|   string col;
 | |
| 
 | |
|   for (i = 0; i < filterCount; i++)
 | |
|   {
 | |
|     col = filterSteps[i]->getColName();
 | |
|     // FilterCommandJL has two referenced columns, needs special handling.
 | |
|     FilterCommandJL* filterCmd = dynamic_cast<FilterCommandJL*>(filterSteps[i].get());
 | |
| 
 | |
|     if (filterCmd == NULL)
 | |
|     {
 | |
|       colset.insert(col);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       // is a FilterCommandJL
 | |
|       size_t sep = col.find(',');
 | |
|       colset.insert(col.substr(0, sep));
 | |
| 
 | |
|       if (sep != string::npos)
 | |
|         colset.insert(col.substr(++sep));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   for (i = 0; i < projectCount; i++)
 | |
|   {
 | |
|     col = projectSteps[i]->getColName();
 | |
|     colset.insert(col);
 | |
|   }
 | |
| 
 | |
|   set<string>::iterator iter = colset.begin();
 | |
|   oss << '(' << *iter++;
 | |
| 
 | |
|   while (iter != colset.end())
 | |
|     oss << ',' << *iter++;
 | |
| 
 | |
|   oss << ')';
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
 | |
| {
 | |
|   joinFERG = rg;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
 | |
| {
 | |
|   ISMPacketHeader ism;
 | |
| 
 | |
|   memset((void*)&ism, 0, sizeof(ism));
 | |
|   ism.Command = BATCH_PRIMITIVE_ABORT;
 | |
| 
 | |
|   bs->load((uint8_t*)&ism, sizeof(ism));
 | |
|   *bs << uniqueID;
 | |
| }
 | |
| 
 | |
| void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
 | |
| {
 | |
|   if (aggregatorPM)
 | |
|     aggregateRGPM.setUseStringTable(b);
 | |
|   else if (fe2)
 | |
|     fe2Output.setUseStringTable(b);
 | |
|   else
 | |
|     projectionRG.setUseStringTable(b);
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |