You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1782 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1782 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
   Copyright (C) 2019 MariaDB Corporation
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
//
 | 
						|
// $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
 | 
						|
// C++ Implementation: batchprimitiveprocessor
 | 
						|
//
 | 
						|
// Description:
 | 
						|
//
 | 
						|
//
 | 
						|
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
 | 
						|
//
 | 
						|
// Copyright: See COPYING file that comes with this distribution
 | 
						|
//
 | 
						|
//
 | 
						|
 | 
						|
#include <unistd.h>
 | 
						|
//#define NDEBUG
 | 
						|
#include <cassert>
 | 
						|
#include <stdexcept>
 | 
						|
#include <iostream>
 | 
						|
#include <set>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include <boost/uuid/uuid.hpp>
 | 
						|
#include <boost/uuid/uuid_generators.hpp>
 | 
						|
namespace bu = boost::uuids;
 | 
						|
 | 
						|
#include "calpontsystemcatalog.h"
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
#include "bpp-jl.h"
 | 
						|
#include "jlf_common.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
using namespace rowgroup;
 | 
						|
using namespace joiner;
 | 
						|
 | 
						|
//#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX
 | 
						|
 | 
						|
namespace joblist
 | 
						|
{
 | 
						|
/**
 * Constructs an empty job-list-side batch primitive processor.
 *
 * All counters start at zero, every optional feature flag is off, the boolean
 * operator defaults to BOP_AND, and the step UUID is set to the nil UUID.
 *
 * @param rm Resource manager; asked here for the joiner chunk size and kept in rm_.
 */
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(ResourceManager* rm)
 : ot(BPS_ELEMENT_TYPE)
 , needToSetLBID(true)  // the first element added fixes the LBID for every command
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)  // widths of wide columns are OR-ed in as steps are added
 , filterCount(0)
 , projectCount(0)
 , needRidsAtDelivery(false)
 , ridMap(0)
 , sendValues(false)
 , sendAbsRids(false)
 , _hasScan(false)
 , LBIDTrace(false)
 , tupleLength(0)
 , status(0)
 , valueColumn(0)
 , sendTupleJoinRowGroupData(false)
 , bop(BOP_AND)
 , forHJ(false)
 , threadCount(1)
 , fJoinerChunkSize(rm->getJlJoinerChunkSize())
 , hasSmallOuterJoin(false)
 , _priority(1)
 , rm_(rm)
{
  uuid = bu::nil_generator()();
  PMJoinerCount = 0;
}
 | 
						|
 | 
						|
/** Destructor: performs no explicit work; members clean themselves up. */
BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() = default;
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
 | 
						|
                                              vector<BRM::LBID_t> lastScannedLBID,
 | 
						|
                                              bool hasAuxCol,
 | 
						|
                                              const std::vector<BRM::EMEntry>& extentsAux,
 | 
						|
                                              execplan::CalpontSystemCatalog::OID oidAux)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  tableOID = scan.tableOid();
 | 
						|
  cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setQueryUuid(scan.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  filterSteps.push_back(cc);
 | 
						|
  filterCount++;
 | 
						|
  _hasScan = true;
 | 
						|
  if (utils::isWide(cc->getWidth()))
 | 
						|
    wideColumnsWidths |= cc->getWidth();
 | 
						|
  idbassert(sessionID == scan.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  tableOID = pcs.tableOid();
 | 
						|
  cc.reset(new PseudoCCJL(pcs));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setQueryUuid(pcs.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  filterSteps.push_back(cc);
 | 
						|
  filterCount++;
 | 
						|
  idbassert(sessionID == pcs.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  tableOID = step.tableOid();
 | 
						|
  cc.reset(new ColumnCommandJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  filterSteps.push_back(cc);
 | 
						|
  filterCount++;
 | 
						|
  if (utils::isWide(cc->getWidth()))
 | 
						|
    wideColumnsWidths |= cc->getWidth();
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  tableOID = step.tableOid();
 | 
						|
 | 
						|
  if (filterCount == 0)
 | 
						|
  {
 | 
						|
    sendAbsRids = true;
 | 
						|
    sendValues = true;
 | 
						|
    absRids.reset(new uint64_t[8192]);
 | 
						|
  }
 | 
						|
 | 
						|
  cc.reset(new DictStepJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
 | 
						|
#if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
 | 
						|
  if (filterSteps.size() > 0)
 | 
						|
  {
 | 
						|
    size_t stepsIndex = filterSteps.size() - 1;
 | 
						|
    SCommand prevCC = filterSteps[stepsIndex];
 | 
						|
    ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
 | 
						|
    DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
 | 
						|
    if (pcc && ccc)
 | 
						|
    {
 | 
						|
      filterSteps[stepsIndex].reset(
 | 
						|
          new ColumnCommandJL(*pcc, *ccc));  // column command will use same filters.
 | 
						|
    }
 | 
						|
  }
 | 
						|
#endif
 | 
						|
  filterSteps.push_back(cc);
 | 
						|
  filterCount++;
 | 
						|
  needStrValues = true;
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  tableOID = step.tableOid();
 | 
						|
  cc.reset(new FilterCommandJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  filterSteps.push_back(cc);
 | 
						|
  filterCount++;
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  cc.reset(new PseudoCCJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setTupleKey(step.tupleId());
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  projectSteps.push_back(cc);
 | 
						|
  colWidths.push_back(cc->getWidth());
 | 
						|
  tupleLength += cc->getWidth();
 | 
						|
  projectCount++;
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  cc.reset(new ColumnCommandJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setTupleKey(step.tupleId());
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  projectSteps.push_back(cc);
 | 
						|
  colWidths.push_back(cc->getWidth());
 | 
						|
  tupleLength += cc->getWidth();
 | 
						|
  projectCount++;
 | 
						|
  if (utils::isWide(cc->getWidth()))
 | 
						|
    wideColumnsWidths |= cc->getWidth();
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  cc.reset(new PassThruCommandJL(step));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setTupleKey(step.tupleId());
 | 
						|
  cc->setQueryUuid(step.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  projectSteps.push_back(cc);
 | 
						|
  colWidths.push_back(cc->getWidth());
 | 
						|
  tupleLength += cc->getWidth();
 | 
						|
  projectCount++;
 | 
						|
 | 
						|
  if (utils::isWide(cc->getWidth()))
 | 
						|
    wideColumnsWidths |= cc->getWidth();
 | 
						|
 | 
						|
  if (filterCount == 0)
 | 
						|
    sendValues = true;
 | 
						|
 | 
						|
  idbassert(sessionID == step.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  cc.reset(new RTSCommandJL(col, dict));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setTupleKey(dict.tupleId());
 | 
						|
  cc->setQueryUuid(col.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  projectSteps.push_back(cc);
 | 
						|
  colWidths.push_back(cc->getWidth());
 | 
						|
  tupleLength += cc->getWidth();
 | 
						|
  projectCount++;
 | 
						|
  needStrValues = true;
 | 
						|
  idbassert(sessionID == col.sessionId());
 | 
						|
  idbassert(sessionID == dict.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
 | 
						|
{
 | 
						|
  SCommand cc;
 | 
						|
 | 
						|
  cc.reset(new RTSCommandJL(p, dict));
 | 
						|
  cc->setBatchPrimitiveProcessor(this);
 | 
						|
  cc->setTupleKey(dict.tupleId());
 | 
						|
  cc->setQueryUuid(p.queryUuid());
 | 
						|
  cc->setStepUuid(uuid);
 | 
						|
  projectSteps.push_back(cc);
 | 
						|
  colWidths.push_back(cc->getWidth());
 | 
						|
  tupleLength += cc->getWidth();
 | 
						|
  projectCount++;
 | 
						|
  needStrValues = true;
 | 
						|
 | 
						|
  if (filterCount == 0)
 | 
						|
  {
 | 
						|
    sendValues = true;
 | 
						|
    sendAbsRids = true;
 | 
						|
    absRids.reset(new uint64_t[8192]);
 | 
						|
  }
 | 
						|
 | 
						|
  idbassert(sessionID == p.sessionId());
 | 
						|
  idbassert(sessionID == dict.sessionId());
 | 
						|
}
 | 
						|
 | 
						|
#if 0
 | 
						|
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
 | 
						|
{
 | 
						|
    idbassert(sessionID == ds.sessionId());
 | 
						|
 | 
						|
    uint32_t i;
 | 
						|
 | 
						|
    templateTB = ds.fEmptyTableBand;
 | 
						|
    tableOID = ds.fTableOID;
 | 
						|
 | 
						|
    tableColumnCount = templateTB.getColumnCount();
 | 
						|
    idbassert(tableColumnCount > 0);
 | 
						|
    tablePositions.reset(new int[tableColumnCount]);
 | 
						|
 | 
						|
    for (i = 0; i < tableColumnCount; i++)
 | 
						|
        tablePositions[i] = -1;
 | 
						|
 | 
						|
    idbassert(projectCount <= projectSteps.size());
 | 
						|
 | 
						|
    /* In theory, tablePositions maps a column's table position to a projection step
 | 
						|
     -1 means it's a null column */
 | 
						|
    CalpontSystemCatalog::OID oid;
 | 
						|
    int idx;
 | 
						|
 | 
						|
    for (i = 0; i < projectCount; i++)
 | 
						|
    {
 | 
						|
        oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());
 | 
						|
 | 
						|
        if (oid > 0)
 | 
						|
        {
 | 
						|
            idx = templateTB.find(oid);
 | 
						|
 | 
						|
            if (idx >= 0)
 | 
						|
            {
 | 
						|
                tablePositions[idx] = i;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
 | 
						|
                     " in tableband at pjstep idx " << i << endl;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
 | 
						|
                 " doesn't have a valid OID" << endl;
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
#endif
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
 | 
						|
{
 | 
						|
  uint32_t i;
 | 
						|
  // 	rowCounter++;
 | 
						|
 | 
						|
  if (needToSetLBID)
 | 
						|
  {
 | 
						|
    needToSetLBID = false;
 | 
						|
 | 
						|
    for (i = 0; i < filterCount; ++i)
 | 
						|
      filterSteps[i]->setLBID(et.first, dbroot);
 | 
						|
 | 
						|
    for (i = 0; i < projectCount; ++i)
 | 
						|
      projectSteps[i]->setLBID(et.first, dbroot);
 | 
						|
 | 
						|
    baseRid = et.first & 0xffffffffffffe000ULL;
 | 
						|
  }
 | 
						|
 | 
						|
  // TODO: get rid of magics
 | 
						|
  if (sendAbsRids)
 | 
						|
    absRids[ridCount] = et.first;
 | 
						|
  else
 | 
						|
  {
 | 
						|
    relRids[ridCount] = et.first & 0x1fff;    // 8192 rows per logical block
 | 
						|
    ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
 | 
						|
  }
 | 
						|
 | 
						|
  if (sendValues)
 | 
						|
  {
 | 
						|
    // 		cout << "adding value " << et.second << endl;
 | 
						|
    values[ridCount] = et.second;
 | 
						|
  }
 | 
						|
 | 
						|
  ridCount++;
 | 
						|
  idbassert(ridCount <= 8192);
 | 
						|
}
 | 
						|
 | 
						|
#if 0
 | 
						|
// NOTE: this function sits inside an `#if 0` region and is not compiled.
// It points inputRG at the given row group's data and, on first use, sets the
// LBID of every command from the RID of row 0 (with dbroot 1) and derives baseRid.
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
{
    rowgroup::Row firstRow;

    inputRG.setData(rg.getData());
    inputRG.initRow(&firstRow);
    inputRG.getRow(0, &firstRow);

    if (!needToSetLBID)
        return;

    needToSetLBID = false;

    for (uint32_t f = 0; f < filterCount; ++f)
        filterSteps[f]->setLBID(firstRow.getRid(), 1);

    for (uint32_t p = 0; p < projectCount; ++p)
        projectSteps[p]->setLBID(firstRow.getRid(), 1);

    baseRid = firstRow.getRid() & 0xffffffffffffe000ULL;
}
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
 | 
						|
{
 | 
						|
  if (filterCount == 0)
 | 
						|
    throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");
 | 
						|
 | 
						|
  addElementType(ElementType(et.first, et.first), dbroot);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * When the output type is ElementType, the messages have the following format:
 | 
						|
 *
 | 
						|
 * ISMPacketHeader (ignored)
 | 
						|
 * PrimitiveHeader (ignored)
 | 
						|
 * --
 | 
						|
 * If the BPP starts with a scan
 | 
						|
 *    casual partitioning data from the scanned column
 | 
						|
 * Base Rid for the returned logical block
 | 
						|
 * rid count
 | 
						|
 * (rid count)x 16-bit rids
 | 
						|
 * (rid count)x 64-bit values
 | 
						|
 * If there's a join performed by this BPP
 | 
						|
 *    pre-Join rid count   (for logging purposes only)
 | 
						|
 *    small-side new match count
 | 
						|
 *    (match count)x ElementTypes
 | 
						|
 * cached IO count
 | 
						|
 * physical IO count
 | 
						|
 * block touches
 | 
						|
 */
 | 
						|
 | 
						|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | 
						|
/**
 * Decodes an ElementType response message.
 *
 * The two headers are skipped. If this BPP starts with a scan, the
 * casual-partitioning section is read next (validity flag, LBID and, when
 * valid, min and max). Then come the base RID, the RID count, `count` 16-bit
 * relative RIDs and `count` 64-bit values, followed by the three I/O counters.
 *
 * @param in            Message to decode; fully consumed on return (asserted).
 * @param out           Resized to the RID count and filled with (baseRid + relRid, value).
 * @param validCPData   Written only when the BPP has a scan.
 * @param lbid          Written only when the BPP has a scan.
 * @param min           Written only when the CP data is flagged valid.
 * @param max           Written only when the CP data is flagged valid.
 * @param cachedIO      Receives the cached I/O count.
 * @param physIO        Receives the physical I/O count.
 * @param touchedBlocks Receives the block-touch count.
 */
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
                                                uint64_t* lbid, int64_t* min, int64_t* max,
                                                uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks) const
{
  /* skip the header */
  const size_t headerBytes = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);
  idbassert(in.length() > headerBytes);
  in.advance(headerBytes);

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID is present whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t raw;
      in >> raw;
      *min = (int64_t)raw;
      in >> raw;
      *max = (int64_t)raw;
    }
  }

  uint64_t blockBaseRid;
  uint16_t ridTotal;
  in >> blockBaseRid;
  in >> ridTotal;
  // 	rowsProcessed += ridTotal;
  idbassert(ridTotal <= 8192);
  out->resize(ridTotal);

  // The relative RIDs come first, immediately followed by the values.
  const uint8_t* payload = in.buf();
  const uint16_t* relRidArr = (const uint16_t*)payload;
  const uint64_t* valueArr = (const uint64_t*)(payload + (ridTotal << 1));

  idbassert(in.length() > (uint32_t)((ridTotal << 1) + (ridTotal << 3)));
  in.advance((ridTotal << 1) + (ridTotal << 3));

  for (uint32_t k = 0; k < ridTotal; ++k)
  {
    ElementType& dst = (*out)[k];
    dst.first = relRidArr[k] + blockBaseRid;
    dst.second = valueArr[k];
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  idbassert(in.length() == 0);
}
 | 
						|
 | 
						|
/**
 | 
						|
 * When the output type is StringElementType the messages have the following format:
 | 
						|
 *
 | 
						|
 * ISMPacketHeader  (ignored)
 | 
						|
 * PrimitiveHeader  (ignored)
 | 
						|
 * ---
 | 
						|
 * If the BPP starts with a scan
 | 
						|
 *     Casual partitioning data from the column scanned
 | 
						|
 * Rid count
 | 
						|
 * (rid count)x 64-bit absolute rids
 | 
						|
 * (rid count)x serialized strings
 | 
						|
 * cached IO count
 | 
						|
 * physical IO count
 | 
						|
 * blocks touched
 | 
						|
 */
 | 
						|
 | 
						|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | 
						|
/**
 * Decodes a StringElementType response message.
 *
 * The two headers are skipped. If this BPP starts with a scan, the
 * casual-partitioning section is read next (validity flag, LBID and, when
 * valid, min and max). Then come the RID count, `count` 64-bit absolute RIDs
 * and `count` serialized strings, followed by the three I/O counters.
 *
 * @param in            Message to decode; fully consumed on return (asserted).
 * @param out           Resized to the RID count and filled with (absolute RID, string).
 * @param validCPData   Written only when the BPP has a scan.
 * @param lbid          Written only when the BPP has a scan.
 * @param min           Written only when the CP data is flagged valid.
 * @param max           Written only when the CP data is flagged valid.
 * @param cachedIO      Receives the cached I/O count.
 * @param physIO        Receives the physical I/O count.
 * @param touchedBlocks Receives the block-touch count.
 */
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
                                                      bool* validCPData, uint64_t* lbid, int64_t* min,
                                                      int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                      uint32_t* touchedBlocks) const
{
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID is present whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t raw;
      in >> raw;
      *min = (int64_t)raw;
      in >> raw;
      *max = (int64_t)raw;
    }
  }

  uint16_t stringTotal;
  in >> stringTotal;

  // The absolute RIDs are read in place from the stream buffer; the strings follow them.
  const uint64_t* ridArr = (const uint64_t*)in.buf();
  out->resize(stringTotal);
  in.advance(stringTotal << 3);

  for (uint32_t k = 0; k < stringTotal; ++k)
  {
    StringElementType& dst = (*out)[k];
    dst.first = ridArr[k];
    in >> dst.second;
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  idbassert(in.length() == 0);
}
 | 
						|
 | 
						|
/**
 | 
						|
 * When the output type is Tuples, the input message format is the same
 | 
						|
 * as when the output type is TableBands
 | 
						|
 */
 | 
						|
 | 
						|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
 | 
						|
/**
 * Decodes a tuple response message into fixed-layout tuples.
 *
 * The two headers are skipped. If this BPP starts with a scan, the
 * casual-partitioning section is read next (validity flag, LBID and, when
 * valid, min and max). Then comes the row count; when needRidsAtDelivery is
 * set, a base RID and `rowCount` 16-bit relative RIDs follow. After that each
 * projected column appears as a 32-bit byte length followed by its data, and
 * finally the three I/O counters.
 *
 * Every output tuple is a freshly allocated char[tupleLength] in which column
 * j occupies colWidths[j] bytes. RID_TO_STRING columns are length-prefixed in
 * the message and zero-padded up to their column width.
 *
 * @param in            Message to decode; fully consumed on return (asserted).
 * @param out           Resized to the row count. `.second` is allocated with new[]
 *                      and is not released by this function. `.first` is the
 *                      absolute RID, or 0 when RIDs are not delivered.
 * @param validCPData   Written only when the BPP has a scan.
 * @param lbid          Written only when the BPP has a scan.
 * @param min           Written only when the CP data is flagged valid.
 * @param max           Written only when the CP data is flagged valid.
 * @param cachedIO      Receives the cached I/O count.
 * @param physIO        Receives the physical I/O count.
 * @param touchedBlocks Receives the block-touch count.
 * @throws std::logic_error for a non-string column whose width is not 1, 2, 4 or 8.
 */
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
                                          bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                                          uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
{
  uint32_t i, j, pos, len;
  uint16_t l_rowCount;
  uint64_t l_baseRid;
  uint16_t* l_relRids;
  uint64_t absRids[8192];
  // const uint8_t* columnData[projectCount];
  const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
  memset(columnData, 0, projectCount * sizeof(uint8_t*));
  const uint8_t* buf;
  // uint32_t colLengths[projectCount];
  uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
  uint64_t tmp64;
  uint8_t tmp8;

  // 	cout << "getTuples msg is " << in.length() << " bytes\n";
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_rowCount;

  // absRids holds 8192 entries; reject larger counts before indexing it.
  idbassert(l_rowCount <= 8192);

  // 	cout << "read " << l_rowCount << " rows\n";

  if (needRidsAtDelivery)
  {
    in >> l_baseRid;
    l_relRids = (uint16_t*)in.buf();

    for (i = 0; i < l_rowCount; i++)
      absRids[i] = l_relRids[i] + l_baseRid;

    in.advance(l_rowCount << 1);
  }

  /* Set up pointers to the column data */
  pos = 0;
  buf = in.buf();

  for (i = 0; i < projectCount; i++)
  {
    colLengths[i] = *((uint32_t*)&buf[pos]);
    pos += 4;
    columnData[i] = &buf[pos];
    // 		cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
    pos += colLengths[i];
    idbassert(pos < in.length());
  }

  in.advance(pos);

  out->resize(l_rowCount);

  for (i = 0; i < l_rowCount; i++)
  {
    // absRids is only populated when RIDs are delivered; never read it otherwise.
    (*out)[i].first = needRidsAtDelivery ? absRids[i] : 0;
    (*out)[i].second = new char[tupleLength];

    for (j = 0, pos = 0; j < projectCount; j++)
    {
      idbassert(pos + colWidths[j] <= tupleLength);

      if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
      {
        // Strings are length-prefixed in the message.
        len = *((uint32_t*)columnData[j]);
        columnData[j] += 4;
        memcpy(&(*out)[i].second[pos], columnData[j], len);
        pos += len;
        columnData[j] += len;

        // insert padding...
        memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
        pos += colWidths[j] - len;
      }
      else
      {
        switch (colWidths[j])
        {
          case 8:
            *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
            columnData[j] += 8;
            pos += 8;
            break;

          case 4:
            *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
            columnData[j] += 4;
            pos += 4;
            break;

          case 2:
            *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
            columnData[j] += 2;
            // A 2-byte column occupies exactly 2 bytes of the tuple, like every
            // other case advances by its own width.
            pos += 2;
            break;

          case 1:
            (*out)[i].second[pos] = (char)*columnData[j];
            columnData[j]++;
            pos++;
            break;

          // TODO MCOL-641
          case 16:
            // fallthrough
          default:
            cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
            throw logic_error("BPP::getTuples(): bad column width");
        }
      }
    }
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  idbassert(in.length() == 0);
}
 | 
						|
 | 
						|
/**
 * Inspects a response message, without consuming it, and reports the value of
 * the flag byte that follows the headers and any casual-partitioning section.
 *
 * @param in Message to inspect; only buf() and length() are used.
 * @return true if the flag byte is non-zero. Also returns true early when the
 *         BPP has a scan, the message extends past the headers and the packet
 *         header carries a non-zero Status (a legitimate error message sent by
 *         PrimProc that the upper layer should get a chance to handle).
 * @throws std::runtime_error if the message ends at or before the expected
 *         position of the flag byte.
 */
bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
{
  const uint8_t* const msg = in.buf();
  const ISMPacketHeader* const ismHdr = (const ISMPacketHeader*)(msg);

  // Start just past the two headers.
  uint32_t cursor = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  if (_hasScan && in.length() > cursor)
  {
    if (ismHdr->Status > 0)
      return true;

    if (msg[cursor] == 0)
    {
      // skip only the "valid CP data" & LBID bytes
      cursor += CP_FLAG_AND_LBID;
    }
    else
    {
      // skip the CP data with wide min/max values (16/32 bytes each). we also skip
      // cpFromDictScan flag.
      cursor += (msg[cursor + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 + 1;
    }
  }

  if (in.length() > cursor)
    return msg[cursor] != 0;

  // Throw b/c PP throws and sends here error msg.
  // See BatchPrimitiveProcessor::writeErrorMsg() for details.
  if (ismHdr->Status > 0)
    throw std::runtime_error(" an exception originally thrown by PrimProc: ");

  throw std::runtime_error(
      " an exception because there is not enough data in the Primitive message from PrimProc.");
}
 | 
						|
 | 
						|
/**
 * Reads a 32-bit count from the stream, then deserializes that many RGData
 * objects and appends each one to `out`.
 *
 * @param in  Stream positioned at the count.
 * @param out Vector the deserialized RGData objects are appended to; existing
 *            entries are kept.
 */
void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
{
  RGData scratch;
  uint32_t remaining;

  *in >> remaining;

  // One RGData object is reused for every deserialize call; a copy is pushed each time.
  while (remaining > 0)
  {
    scratch.deserialize(*in, true);
    out->push_back(scratch);
    --remaining;
  }
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
 | 
						|
                                                uint64_t* lbid, bool* fromDictScan, int128_t* min,
 | 
						|
                                                int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
 | 
						|
                                                uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
 | 
						|
                                                bool* hasWideColumn,
 | 
						|
                                                const execplan::CalpontSystemCatalog::ColType& colType) const
 | 
						|
{
 | 
						|
  uint64_t tmp64;
 | 
						|
  int128_t tmp128;
 | 
						|
  uint8_t tmp8;
 | 
						|
  RGData rgData;
 | 
						|
  uint32_t rowCount;
 | 
						|
  RowGroup& org = primprocRG[threadID];
 | 
						|
 | 
						|
  out->clear();
 | 
						|
 | 
						|
  if (in.length() == 0)
 | 
						|
  {
 | 
						|
    // done, return an empty RG
 | 
						|
    rgData = RGData(org, 0U);
 | 
						|
    org.setData(&rgData);
 | 
						|
    org.resetRowGroup(0);
 | 
						|
    out->push_back(rgData);
 | 
						|
    *cachedIO = 0;
 | 
						|
    *physIO = 0;
 | 
						|
    *touchedBlocks = 0;
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  /* skip the header */
 | 
						|
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
 | 
						|
 | 
						|
  if (_hasScan)
 | 
						|
  {
 | 
						|
    in >> tmp8;
 | 
						|
    *validCPData = (tmp8 != 0);
 | 
						|
 | 
						|
    if (*validCPData)
 | 
						|
    {
 | 
						|
      in >> *lbid;
 | 
						|
      in >> tmp8;
 | 
						|
      *fromDictScan = tmp8 != 0;
 | 
						|
      in >> tmp8;
 | 
						|
      *hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
 | 
						|
      if (UNLIKELY(*hasWideColumn))
 | 
						|
      {
 | 
						|
        idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
 | 
						|
        if (LIKELY(colType.isWideDecimalType()))
 | 
						|
        {
 | 
						|
          in >> tmp128;
 | 
						|
          *min = tmp128;
 | 
						|
          in >> tmp128;
 | 
						|
          *max = tmp128;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          std::ostringstream oss;
 | 
						|
          oss << __func__ << " WARNING!!! Not implemented for the data type ";
 | 
						|
          oss << colType.colDataType << std::endl;
 | 
						|
          std::cout << oss.str();
 | 
						|
          idbassert(false);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        in >> tmp64;
 | 
						|
        *min = static_cast<int128_t>(tmp64);
 | 
						|
        in >> tmp64;
 | 
						|
        *max = static_cast<int128_t>(tmp64);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else
 | 
						|
      in >> *lbid;
 | 
						|
  }
 | 
						|
 | 
						|
  in >> tmp8;
 | 
						|
  *countThis = (tmp8 != 0);
 | 
						|
 | 
						|
  /* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
 | 
						|
   * For later... */
 | 
						|
  if (aggregatorPM)
 | 
						|
  {
 | 
						|
    deserializeAggregateResult(&in, out);
 | 
						|
    // for (uint32_t z = 0; z < out->size(); z++) {
 | 
						|
    //	org.setData(&(*out)[z]);
 | 
						|
    //	cout << "BPPJL: " << org.toString() << endl;
 | 
						|
    //}
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    rgData.deserialize(in, true);
 | 
						|
    out->push_back(rgData);
 | 
						|
    org.setData(&rgData);
 | 
						|
 | 
						|
    rowCount = org.getRowCount();
 | 
						|
 | 
						|
    bool pmSendsMatchesAnyway =
 | 
						|
        (hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));
 | 
						|
 | 
						|
    if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
 | 
						|
    {
 | 
						|
      std::shared_ptr<vector<uint32_t>[]> joinResults;
 | 
						|
      uint32_t i, j;
 | 
						|
 | 
						|
      if (pmSendsMatchesAnyway)
 | 
						|
      {
 | 
						|
        uint16_t joinRowCount;
 | 
						|
        in >> joinRowCount;
 | 
						|
        rowCount = joinRowCount;
 | 
						|
      }
 | 
						|
 | 
						|
      for (j = 0; j < PMJoinerCount; j++)
 | 
						|
      {
 | 
						|
        /* Reuse the result space if possible */
 | 
						|
        joinResults = tJoiners[j]->getPMJoinArrays(threadID);
 | 
						|
 | 
						|
        if (joinResults.get() == NULL)
 | 
						|
        {
 | 
						|
          auto v = new vector<uint32_t>[8192];
 | 
						|
          joinResults.reset(v);
 | 
						|
          tJoiners[j]->setPMJoinResults(joinResults, threadID);
 | 
						|
        }
 | 
						|
 | 
						|
        for (i = 0; i < rowCount; i++)
 | 
						|
          deserializeInlineVector<uint32_t>(in, joinResults[i]);
 | 
						|
 | 
						|
        if (tJoiners[j]->smallOuterJoin())
 | 
						|
          tJoiners[j]->markMatches(threadID, rowCount);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (*countThis)
 | 
						|
  {
 | 
						|
    // 		cout << "grabbing io stats\n";
 | 
						|
    in >> *cachedIO;
 | 
						|
    in >> *physIO;
 | 
						|
    in >> *touchedBlocks;
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    *cachedIO = 0;
 | 
						|
    *physIO = 0;
 | 
						|
    *touchedBlocks = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  idbassert(in.length() == 0);
 | 
						|
}
 | 
						|
 | 
						|
/** Builds an empty row group, laid out like projectionRG, whose status field
 *  carries the given error code.
 *
 *  @param error  status value stored in the row group
 *  @return the RGData backing that row group
 */
RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  rowgroup::RowGroup errorRG(projectionRG);
  RGData errorData(errorRG, 0U);

  // Attach the buffer, reset the row group, then stamp the status on it.
  errorRG.setData(&errorData);
  errorRG.resetRowGroup(0);
  errorRG.setStatus(error);

  return errorData;
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::reset()
 | 
						|
{
 | 
						|
  ridCount = 0;
 | 
						|
  ridMap = 0;
 | 
						|
  needToSetLBID = true;
 | 
						|
}
 | 
						|
 | 
						|
/** Records the DBRoot of the scanned extent, derives the base RID for LBID
 *  `l` within that extent, and forwards both to every filter and projection
 *  step.
 *
 *  @param l              the LBID being processed
 *  @param scannedExtent  extent map entry containing `l`
 */
void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
{
  dbRoot = scannedExtent.dbRoot;

  // the extent #
  const auto extentNum = scannedExtent.blockOffset / (scannedExtent.range.size * 1024);
  // the logical block #
  const auto blockNum = (l - scannedExtent.range.start) / scannedExtent.range.size;

  baseRid =
      rowgroup::convertToRid(scannedExtent.partitionNum, scannedExtent.segmentNum, extentNum, blockNum);

  for (uint32_t step = 0; step < filterCount; ++step)
    filterSteps[step]->setLBID(baseRid, dbRoot);

  for (uint32_t step = 0; step < projectCount; ++step)
    projectSteps[step]->setLBID(baseRid, dbRoot);
}
 | 
						|
 | 
						|
/** Produces a multi-line, human-readable description of this BPP: how it is
 *  driven (scan or RID/value input) followed by the textual form of every
 *  filter and projection step.
 */
string BatchPrimitiveProcessorJL::toString() const
{
  ostringstream text;

  text << "BatchPrimitiveProcessorJL:" << endl;

  if (_hasScan)
  {
    text << "   -- scan driven" << endl;
  }
  else
  {
    if (sendValues)
      text << "   -- serializing values" << endl;

    text << (sendAbsRids ? "   -- serializing absolute rids" : "   -- serializing relative rids") << endl;
  }

  // The counts are cast to int so they print as numbers.
  text << "   " << (int)filterCount << " filter steps:\n";

  for (uint32_t step = 0; step < filterCount; step++)
    text << "      " << filterSteps[step]->toString() << endl;

  text << "   " << (int)projectCount << " projection steps:\n";

  for (uint32_t step = 0; step < projectCount; step++)
    text << "      " << projectSteps[step]->toString() << endl;

  return text.str();
}
 | 
						|
 | 
						|
/**
 | 
						|
 * The creation messages have the following format:
 | 
						|
 *
 | 
						|
 * ISMPacketHeader (necessary for DEC and ReadThread classes)
 | 
						|
 * ---
 | 
						|
 * output type (ElementType, StringElementType, etc)
 | 
						|
 * version #
 | 
						|
 * transaction #
 | 
						|
 * session #  (unnecessary except for debugging)
 | 
						|
 * step ID    ( same )
 | 
						|
 * unique ID  (uniquely identifies the DEC queue on the UM)
 | 
						|
 * 16-bit flags
 | 
						|
 * if there's a join...
 | 
						|
 *      # of elements in the Joiner object
 | 
						|
 *       a flag whether or not to match every element (for inner joins)
 | 
						|
 * filter step count
 | 
						|
 * (filter count)x serialized Commands
 | 
						|
 * projection step count
 | 
						|
 * (projection count)x serialized Commands
 | 
						|
 */
 | 
						|
 | 
						|
/** Serializes the BATCH_PRIMITIVE_CREATE message for this BPP into `bs` and
 *  (re)builds the per-thread primprocRG array used to decode the replies.
 *
 *  Emitted order: ISMPacketHeader, output type, txn ID, session ID, step ID,
 *  unique ID, version info, 16-bit flags, optional wide-column widths, BOP,
 *  the forHJ byte, then (for ROW_GROUP output) the projection row group and
 *  the two optional function-expression groups, the joiner parameters when
 *  there are joiners, the filter and projection commands, and finally the
 *  optional PM aggregator.
 *
 *  @param bs  stream that is loaded with the creation message
 */
void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;
  uint32_t i;
  uint16_t flags = 0;

  // Zero the whole header first, as runBPP()/destroyBPP() do, so that no
  // indeterminate bytes of the struct end up in the outgoing message.
  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_CREATE;

  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (uint8_t)ot;
  bs << (messageqcpp::ByteStream::quadbyte)txnID;
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << versionInfo;

  if (needStrValues)
    flags |= NEED_STR_VALUES;

  if (sendAbsRids)
    flags |= GOT_ABS_RIDS;

  if (sendValues)
    flags |= GOT_VALUES;

  if (LBIDTrace)
    flags |= LBID_TRACE;

  if (needRidsAtDelivery)
    flags |= SEND_RIDS_AT_DELIVERY;

  if (tJoiners.size() > 0)
    flags |= HAS_JOINER;

  if (sendTupleJoinRowGroupData)
    flags |= JOIN_ROWGROUP_DATA;

  if (wideColumnsWidths)
    flags |= HAS_WIDE_COLUMNS;

  bs << flags;

  // The widths are only present on the wire when HAS_WIDE_COLUMNS is set.
  if (wideColumnsWidths)
    bs << wideColumnsWidths;

  bs << bop;
  bs << (uint8_t)(forHJ ? 1 : 0);

  if (ot == ROW_GROUP)
  {
    bs << projectionRG;

    /* F&E serialization: a presence byte, then the expression and its row group */
    if (fe1)
    {
      bs << (uint8_t)1;
      bs << *fe1;
      bs << fe1Input;
    }
    else
      bs << (uint8_t)0;

    if (fe2)
    {
      bs << (uint8_t)1;
      bs << *fe2;
      bs << fe2Output;
    }
    else
      bs << (uint8_t)0;
  }

  /* if HAS_JOINER, send the init params */
  if (flags & HAS_JOINER)
  {
    bs << (uint32_t)maxPmJoinResultCount;

    if (ot == ROW_GROUP)
    {
      idbassert(tJoiners.size() > 0);
      bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;

      bool atLeastOneFE = false;
#ifdef JLF_DEBUG
      cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif

      bool smallSideRGSent = false;

      for (i = 0; i < PMJoinerCount; i++)
      {
        bs << (uint32_t)tJoiners[i]->size();
        bs << tJoiners[i]->getJoinType();

        bs << (uint8_t)tJoiners[i]->isTypelessJoin();

        if (tJoiners[i]->hasFEFilter())
        {
          atLeastOneFE = true;
#ifdef JLF_DEBUG
          cout << "serializing join FE object\n";
#endif
          bs << *tJoiners[i]->getFcnExpFilter();
        }

        if (!tJoiners[i]->isTypelessJoin())
        {
          bs << (uint64_t)tJoiners[i]->smallNullValue();
          bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
        }
        else
        {
          serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
          bs << (uint32_t)tJoiners[i]->getKeyLength();
          // MCOL-4173 Notify PP if smallSide and largeSide have different column widths
          // and send smallSide RG to PP.
          bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
          bs << (uint8_t)joinHasSkewedKeyColumn;

          // The small-side row group is sent at most once per message.
          if (!smallSideRGSent && joinHasSkewedKeyColumn)
          {
            idbassert(!smallSideRGs.empty());
            bs << smallSideRGs[0];
            serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
            smallSideRGSent = true;
          }
        }
      }

      if (atLeastOneFE)
        bs << joinFERG;

      if (sendTupleJoinRowGroupData)
      {
#ifdef JLF_DEBUG
        cout << "sending smallside data\n";
#endif
        serializeVector<RowGroup>(bs, smallSideRGs);
        bs << largeSideRG;
        bs << joinedRG;  // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
      }
    }
  }

  bs << filterCount;

  for (i = 0; i < filterCount; ++i)
    filterSteps[i]->createCommand(bs);

  bs << projectCount;

  for (i = 0; i < projectCount; ++i)
    projectSteps[i]->createCommand(bs);

  // aggregate step only when output is row group
  if (ot == ROW_GROUP)
  {
    if (aggregatorPM.get() != NULL)
    {
      bs << (uint8_t)1;
      bs << aggregateRGPM;
      bs << *(aggregatorPM.get());
    }
    else
    {
      bs << (uint8_t)0;
    }
  }

  // decide which rowgroup is received by PrimProc
  if (ot == ROW_GROUP)
  {
    primprocRG.reset(new RowGroup[threadCount]);

    for (i = 0; i < threadCount; i++)
    {
      if (aggregatorPM)
        primprocRG[i] = aggregateRGPM;
      else if (fe2)
        primprocRG[i] = fe2Output;
      // This shouldn't be necessary. As of 2-17-14, PrimProc
      // will only send joined results if fe2 || aggregatorPM,
      // so it will never send back data for joinedRG.
      else
        primprocRG[i] = projectionRG;
    }
  }
}
 | 
						|
 | 
						|
/**
 | 
						|
 * The BPP Run messages have the following format:
 | 
						|
 *
 | 
						|
 * ISMPacketHeader
 | 
						|
 *    -- Interleave field is used for DEC interleaving data
 | 
						|
 *    -- Size field is used for the relative weight to process the message
 | 
						|
 * -----
 | 
						|
 * Session ID
 | 
						|
 * Step ID
 | 
						|
 * Unique ID
 | 
						|
 * Sequence #
 | 
						|
 * Iteration count
 | 
						|
 * Rid count
 | 
						|
 * If absolute rids are sent
 | 
						|
 *    (rid count)x 64-bit absolute rids
 | 
						|
 * else
 | 
						|
 *    RID range bitmap
 | 
						|
 *    base RID
 | 
						|
 *    (rid count)x 16-bit relative rids
 | 
						|
 * If values are sent
 | 
						|
 *    (rid count)x 64-bit values
 | 
						|
 * (filter count)x run msgs for filter Commands
 | 
						|
 * (projection count)x run msgs for projection Commands
 | 
						|
 */
 | 
						|
 | 
						|
// The deser counterpart function is BPP::resetBPP
 | 
						|
/** Serializes a BATCH_PRIMITIVE_RUN message into `bs`, replacing whatever the
 *  stream held before.
 *
 *  Emitted order: ISMPacketHeader, session ID, step ID, unique ID, priority,
 *  weight, DBRoot, count, the "sent by ExeMgr" byte, RID count, the RIDs
 *  (absolute, or RID map + base RID + relative RIDs), the values when they
 *  are sent, then the run messages of every filter and projection command.
 *
 *  @param bs           stream that receives the message
 *  @param pmNum        stored in the header's Interleave field
 *  @param isExeMgrDEC  serialized as the "sent by ExeMgr" byte (1 or 0)
 */
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
  /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
  save bandwidth.  They're completely unused otherwise.  We need to throw out all unused
  fields of every defined header. */
  ISMPacketHeader hdr;

  bs.restart();

  memset((void*)&hdr, 0, sizeof(hdr));
  hdr.Command = BATCH_PRIMITIVE_RUN;

  // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
  // defined for each PM.
  hdr.Interleave = pmNum;

  /* ... and the Size field is used for the "weight" of processing a BPP
    where 1 is one Command on one logical block. */
  hdr.Size = count * (filterCount + projectCount);

  bs.append((uint8_t*)&hdr, sizeof(hdr));

  /* The next 4 vars are for BPPSeeder; BPP itself skips them */
  bs << sessionID;
  bs << stepID;
  bs << uniqueID;
  bs << _priority;

  // The weight is used by PrimProc thread pool algo
  uint32_t bppWeight = calculateBPPWeight();
  bs << bppWeight;

  bs << dbRoot;
  bs << count;

  uint8_t fromExeMgr = (isExeMgrDEC) ? 1 : 0;
  bs << fromExeMgr;

  // A scan-driven BPP carries no RIDs; otherwise there must be RIDs plus
  // either a RID map or absolute RIDs.
  if (!_hasScan)
  {
    idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
  }
  else
  {
    idbassert(ridCount == 0);
  }

  bs << ridCount;

  if (!sendAbsRids)
  {
    bs << ridMap;
    bs << baseRid;
    bs.append((uint8_t*)relRids, ridCount << 1);  // 2 bytes per relative rid
  }
  else
  {
    bs.append((uint8_t*)absRids.get(), ridCount << 3);  // 8 bytes per absolute rid
  }

  if (sendValues)
    bs.append((uint8_t*)values, ridCount << 3);  // 8 bytes per value

  for (uint32_t step = 0; step < filterCount; step++)
    filterSteps[step]->runCommand(bs);

  for (uint32_t step = 0; step < projectCount; step++)
    projectSteps[step]->runCommand(bs);
}
 | 
						|
 | 
						|
/** Replaces the contents of `bs` with a BATCH_PRIMITIVE_RUN message whose
 *  header carries the current `status`, followed by the session ID, step ID,
 *  unique ID, count and RID count.
 *
 *  @param bs  stream that receives the message
 */
void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  ISMPacketHeader hdr;

  memset((void*)&hdr, 0, sizeof(hdr));
  hdr.Command = BATCH_PRIMITIVE_RUN;
  hdr.Status = status;
  hdr.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  bs.restart();
  bs.append((uint8_t*)&hdr, sizeof(hdr));

  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}
 | 
						|
 | 
						|
/** Appends a BATCH_PRIMITIVE_DESTROY message to `bs`: a zeroed header with
 *  the command set, followed by the session ID, step ID and unique ID.
 *
 *  @param bs  stream the message is appended to
 */
void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  ISMPacketHeader hdr;

  memset((void*)&hdr, 0, sizeof(hdr));
  hdr.Command = BATCH_PRIMITIVE_DESTROY;
  bs.append((uint8_t*)&hdr, sizeof(hdr));

  /* XXXPAT: We can get rid of sessionID and stepID;
  it's there for easier debugging only */
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
 | 
						|
{
 | 
						|
  pos = 0;
 | 
						|
  joinerNum = 0;
 | 
						|
  tJoiners = j;
 | 
						|
 | 
						|
  PMJoinerCount = 0;
 | 
						|
  tlKeyLens.reset(new uint32_t[tJoiners.size()]);
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < tJoiners.size(); i++)
 | 
						|
  {
 | 
						|
    if (tJoiners[i]->inPM())
 | 
						|
    {
 | 
						|
      PMJoinerCount++;
 | 
						|
      smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
 | 
						|
      smallSideRGs.push_back(tJoiners[i]->getSmallRG());
 | 
						|
 | 
						|
      if (tJoiners[i]->isTypelessJoin())
 | 
						|
        tlKeyLens[i] = tJoiners[i]->getKeyLength();
 | 
						|
 | 
						|
      if (tJoiners[i]->hasFEFilter())
 | 
						|
        sendTupleJoinRowGroupData = true;
 | 
						|
 | 
						|
      if (tJoiners[i]->smallOuterJoin())
 | 
						|
        hasSmallOuterJoin = true;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  largeSideRG = tJoiners[0]->getLargeRG();
 | 
						|
 | 
						|
  if (aggregatorPM || fe2)
 | 
						|
  {
 | 
						|
    sendTupleJoinRowGroupData = true;
 | 
						|
#ifdef JLF_DEBUG
 | 
						|
    cout << "will send small side row data\n";
 | 
						|
#endif
 | 
						|
  }
 | 
						|
  posByJoinerNum.reset(new uint32_t[PMJoinerCount]);
 | 
						|
  memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t));
 | 
						|
}
 | 
						|
 | 
						|
// helper fcn to interleave small side data by joinernum
 | 
						|
bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
 | 
						|
{
 | 
						|
  uint i;
 | 
						|
  // find the next joiner that still has more data to send.  Set joinerNum & pos.
 | 
						|
  for (i = 0; i < PMJoinerCount; i++)
 | 
						|
  {
 | 
						|
    joinerNum = (joinerNum + 1) % PMJoinerCount;
 | 
						|
    if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide().size())
 | 
						|
      break;
 | 
						|
  }
 | 
						|
  if (i == PMJoinerCount)
 | 
						|
    return false;
 | 
						|
  pos = posByJoinerNum[joinerNum];
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
/* This algorithm relies on the joiners being sorted by size atm */
 | 
						|
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
 | 
						|
/** Builds the next small-side joiner message in `bs`.
 *
 *  While some PM joiner still has unsent small-side rows this produces a
 *  BATCH_PRIMITIVE_ADD_JOINER message holding the next chunk of that joiner's
 *  keys (typeless keys, or packed {key, row index} pairs) and, when
 *  sendTupleJoinRowGroupData is set, the corresponding rows. Once everything
 *  has been sent it produces the BATCH_PRIMITIVE_END_JOINER message instead.
 *
 *  @param bs  stream loaded with the message
 *  @return true after an ADD_JOINER message (call again), false after the
 *          END_JOINER message
 */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
  uint32_t toSend, i, j;
  ISMPacketHeader ism;
  Row r;
  joiner::TypelessData tlData;
  uint32_t smallKeyCol;
  uint32_t largeKeyCol;
  uint64_t smallkey;
  bool isNull;
  bool bSignedUnsigned;

  // Zero the header up front so that both the END_JOINER and the ADD_JOINER
  // message are built from a fully initialized struct.
  memset((void*)&ism, 0, sizeof(ism));

  if (!pickNextJoinerNum())
  {
    /* last message */
    ism.Command = BATCH_PRIMITIVE_END_JOINER;
    bs.load((uint8_t*)&ism, sizeof(ism));
    bs << (messageqcpp::ByteStream::quadbyte)sessionID;
    bs << (messageqcpp::ByteStream::quadbyte)stepID;
    bs << uniqueID;
    return false;
  }

  auto& tSmallSide = tJoiners[joinerNum]->getSmallSide();
  auto size = tSmallSide.size();

  ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;

  smallSideRGs[joinerNum].initRow(&r);

  unsigned metasize = 12;  // 12 = sizeof(struct JoinerElements)

  if (tJoiners[joinerNum]->isTypelessJoin())
    metasize = 0;

  unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);

  // Always send at least one row: with rowunits == 0 the position would never
  // advance and this function would keep returning true without progress.
  if (rowunits == 0)
    rowunits = 1;

  toSend = std::min<unsigned int>(size - pos, rowunits);
  bs << toSend;
  bs << pos;
  bs << joinerNum;

  if (tJoiners[joinerNum]->isTypelessJoin())
  {
    // TODO: change RM ptr to ref b/c its scope and lifetime lasts till the end of the program.
    auto alloc = rm_->getAllocator<utils::FixedAllocatorBufType>();
    utils::FixedAllocator fa(alloc, tlKeyLens[joinerNum], true);

    for (i = pos; i < pos + toSend; i++)
    {
      r.setPointer(tSmallSide[i]);
      isNull = false;
      bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();

      for (j = 0; j < smallSideKeys[joinerNum].size(); j++)
      {
        isNull |= r.isNullValue(smallSideKeys[joinerNum][j]);

        if (UNLIKELY(bSignedUnsigned))
        {
          // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
          // then it should not compare. Send null to PM to prevent compare
          smallKeyCol = smallSideKeys[joinerNum][j];
          largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j];

          if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
          {
            if (r.isUnsigned(smallKeyCol))
              smallkey = r.getUintField(smallKeyCol);
            else
              smallkey = r.getIntField(smallKeyCol);

            if (smallkey & 0x8000000000000000ULL)
            {
              isNull = true;
              break;
            }
          }
        }
      }

      if (!isNull)
      {
        tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
                                 tJoiners[joinerNum]->getLargeKeyColumns());

        // A zero-length key is sent as a null entry.
        if (tlData.len == 0)
        {
          isNull = true;
        }
      }

      // Per row: a null flag, then (for non-null rows) the key and row index.
      bs << (uint8_t)isNull;

      if (!isNull)
      {
        tlData.serialize(bs);
        bs << i;
      }
    }
  }
  else
  {
#pragma pack(push, 1)
    struct JoinerElements
    {
      int64_t key;
      uint32_t value;
    } * arr;
#pragma pack(pop)

    // The pairs are written straight into the stream's buffer; the input
    // pointer is advanced past them once they are all filled in.
    bs.needAtLeast(toSend * sizeof(JoinerElements));
    arr = (JoinerElements*)bs.getInputPtr();

    smallKeyCol = smallSideKeys[joinerNum][0];
    bSignedUnsigned =
        r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]);

    for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
    {
      r.setPointer(tSmallSide[i]);

      if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
      {
        // Small side is a long double. Since CS can't store larger than DOUBLE,
        // we need to convert to whatever type large side is -- double or int64
        long double smallkeyld = r.getLongDoubleField(smallKeyCol);

        switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0]))
        {
          case CalpontSystemCatalog::DOUBLE:
          case CalpontSystemCatalog::UDOUBLE:
          case CalpontSystemCatalog::FLOAT:
          case CalpontSystemCatalog::UFLOAT:
          {
            if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              // Send the bit pattern of the double. memcpy is used instead of
              // a pointer cast, which would violate the strict aliasing rule.
              double d = (double)smallkeyld;
              static_assert(sizeof(d) == sizeof(smallkey), "double must be 64 bits wide");
              memcpy(&smallkey, &d, sizeof(smallkey));
            }

            break;
          }

          default:
          {
            if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              smallkey = (int64_t)smallkeyld;
            }

            break;
          }
        }
      }
      else if (r.isUnsigned(smallKeyCol))
        smallkey = r.getUintField(smallKeyCol);
      else
        smallkey = r.getIntField(smallKeyCol);

      // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
      // against the large side should fall. UBIGINTEMPTYROW is not a valid value, so nothing will match.
      if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
        smallkey = joblist::UBIGINTEMPTYROW;

      arr[j].key = (int64_t)smallkey;
      arr[j].value = i;
    }

    bs.advanceInputPtr(toSend * sizeof(JoinerElements));
  }

  if (sendTupleJoinRowGroupData)
  {
    // Copy the rows of this chunk into a temporary row group and append it.
    RowGroup& smallSide = smallSideRGs[joinerNum];
    RGData tmpData(smallSide, toSend);
    Row tmpRow;

    smallSide.setData(&tmpData);
    smallSide.initRow(&tmpRow);
    smallSide.getRow(0, &tmpRow);

    for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
    {
      r.setPointer(tSmallSide[i]);
      copyRow(r, &tmpRow);
    }

    smallSide.setRowCount(toSend);
    tmpData.serialize(bs, smallSide.getDataSize());
  }

  // Remember how far this joiner has been sent for the next call.
  pos += toSend;
  posByJoinerNum[joinerNum] = pos;
  return true;
}
 | 
						|
 | 
						|
// Remember `rg` as the row group used for projection and switch the `ot`
// member to ROW_GROUP (presumably the output-type selector; confirm against
// the class declaration).
void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
  this->ot = ROW_GROUP;
  this->projectionRG = rg;
}
 | 
						|
 | 
						|
// Store a copy of `rg` in the joinedRG member.
void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
{
  this->joinedRG = rg;
}
 | 
						|
 | 
						|
// Store a copy of `rg` in the inputRG member. Setting an input row group also
// clears the sendAbsRids and sendValues flags.
void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
{
  this->sendAbsRids = false;
  this->sendValues = false;
  this->inputRG = rg;
}
 | 
						|
 | 
						|
// Register the aggregator `aggpm` and its row group `argpm`.
// If any tuple joiners are already registered, sendTupleJoinRowGroupData is
// switched on as well.
void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
                                                 const rowgroup::RowGroup& argpm)
{
  this->aggregatorPM = aggpm;
  this->aggregateRGPM = argpm;

  const bool haveJoiners = (tJoiners.size() != 0);

  if (haveJoiners)
  {
    sendTupleJoinRowGroupData = true;
  }
}
 | 
						|
 | 
						|
/* OR hacks */
 | 
						|
void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
 | 
						|
{
 | 
						|
  bop = op;
 | 
						|
 | 
						|
  if (op == BOP_OR && filterCount > 1)
 | 
						|
  {
 | 
						|
    for (int i = 1; i < filterCount; ++i)
 | 
						|
    {
 | 
						|
      ColumnCommandJL* colcmd = dynamic_cast<ColumnCommandJL*>(filterSteps[i].get());
 | 
						|
 | 
						|
      if (colcmd != NULL)
 | 
						|
        colcmd->scan(false);
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::setForHJ(bool b)
 | 
						|
{
 | 
						|
  forHJ = b;
 | 
						|
}
 | 
						|
 | 
						|
// Store the function-expression wrapper `fe` in fe1 and the row group `input`
// in fe1Input.
void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& input)
{
  this->fe1 = fe;
  this->fe1Input = input;
}
 | 
						|
 | 
						|
// Store the function-expression wrapper `fe` in fe2 and the row group `output`
// in fe2Output.
// If tuple joiners are registered and PMJoinerCount is positive,
// sendTupleJoinRowGroupData is switched on as well.
void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& output)
{
  this->fe2 = fe;
  this->fe2Output = output;

  const bool joinOnPM = (tJoiners.size() != 0) && (PMJoinerCount > 0);

  if (joinOnPM)
  {
    sendTupleJoinRowGroupData = true;
  }
}
 | 
						|
 | 
						|
const string BatchPrimitiveProcessorJL::toMiniString() const
 | 
						|
{
 | 
						|
  ostringstream oss;
 | 
						|
  int i;
 | 
						|
  set<string> colset;
 | 
						|
  string col;
 | 
						|
 | 
						|
  for (i = 0; i < filterCount; i++)
 | 
						|
  {
 | 
						|
    col = filterSteps[i]->getColName();
 | 
						|
    // FilterCommandJL has two referenced columns, needs special handling.
 | 
						|
    FilterCommandJL* filterCmd = dynamic_cast<FilterCommandJL*>(filterSteps[i].get());
 | 
						|
 | 
						|
    if (filterCmd == NULL)
 | 
						|
    {
 | 
						|
      colset.insert(col);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      // is a FilterCommandJL
 | 
						|
      size_t sep = col.find(',');
 | 
						|
      colset.insert(col.substr(0, sep));
 | 
						|
 | 
						|
      if (sep != string::npos)
 | 
						|
        colset.insert(col.substr(++sep));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  for (i = 0; i < projectCount; i++)
 | 
						|
  {
 | 
						|
    col = projectSteps[i]->getColName();
 | 
						|
    colset.insert(col);
 | 
						|
  }
 | 
						|
 | 
						|
  set<string>::iterator iter = colset.begin();
 | 
						|
  oss << '(' << *iter++;
 | 
						|
 | 
						|
  while (iter != colset.end())
 | 
						|
    oss << ',' << *iter++;
 | 
						|
 | 
						|
  oss << ')';
 | 
						|
 | 
						|
  return oss.str();
 | 
						|
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
 | 
						|
{
 | 
						|
  joinFERG = rg;
 | 
						|
}
 | 
						|
 | 
						|
// Fill `bs` with an abort message for this processor: an ISMPacketHeader that
// is all zeroes except for Command == BATCH_PRIMITIVE_ABORT, followed by this
// object's uniqueID.
// NOTE(review): load() presumably replaces any existing content of `bs`;
// confirm against ByteStream.
void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
{
  ISMPacketHeader header;

  // Zero the whole header first so unused fields are deterministic.
  memset(&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_ABORT;

  ByteStream& out = *bs;
  out.load(reinterpret_cast<uint8_t*>(&header), sizeof(header));
  out << uniqueID;
}
 | 
						|
 | 
						|
void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
 | 
						|
{
 | 
						|
  if (aggregatorPM)
 | 
						|
    aggregateRGPM.setUseStringTable(b);
 | 
						|
  else if (fe2)
 | 
						|
    fe2Output.setUseStringTable(b);
 | 
						|
  else
 | 
						|
    projectionRG.setUseStringTable(b);
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace joblist
 |