mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
* fix(rowgroup): RGData now uses uint64_t counter for the fixed sizes columns data buf. The buffer can utilize > 4GB RAM that is necessary for PM side join. RGData ctor uses uint32_t allocating data buffer. This fact causes implicit heap overflow. * feat(bytestream,serdes): BS buffer size type is uint64_t This necessary to handle 64bit RGData, that comes as a separate patch. The pair of patches would allow to have PM joins when SmallSide size > 4GB. * feat(bytestream,serdes): Distribute BS buf size data type change to avoid implicit data type narrowing * feat(rowgroup): this returns bits lost during cherry-pick. The bits lost caused the first RGData::serialize to crash a process
1780 lines
46 KiB
C++
1780 lines
46 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2019 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
//
|
|
// $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
|
|
// C++ Implementation: batchprimitiveprocessor
|
|
//
|
|
// Description:
|
|
//
|
|
//
|
|
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
|
|
//
|
|
// Copyright: See COPYING file that comes with this distribution
|
|
//
|
|
//
|
|
|
|
#include <unistd.h>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
#include <stdexcept>
|
|
#include <iostream>
|
|
#include <set>
|
|
using namespace std;
|
|
|
|
#include <boost/uuid/uuid.hpp>
|
|
#include <boost/uuid/uuid_generators.hpp>
|
|
namespace bu = boost::uuids;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "bpp-jl.h"
|
|
#include "jlf_common.h"
|
|
using namespace messageqcpp;
|
|
using namespace rowgroup;
|
|
using namespace joiner;
|
|
|
|
#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX
|
|
|
|
namespace joblist
|
|
{
|
|
/*
 * Build a JL-side batch primitive processor in its initial state:
 * element-type output, no filter or projection steps, one thread, and AND as
 * the filter operator. The only value taken from the ResourceManager is the
 * joiner chunk size, so rm must be non-null.
 */
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
 : ot(BPS_ELEMENT_TYPE)
 , needToSetLBID(true)
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)
 , filterCount(0)
 , projectCount(0)
 , needRidsAtDelivery(false)
 , ridMap(0)
 , sendValues(false)
 , sendAbsRids(false)
 , _hasScan(false)
 , LBIDTrace(false)
 , tupleLength(0)
 , status(0)
 , valueColumn(0)
 , sendTupleJoinRowGroupData(false)
 , bop(BOP_AND)
 , forHJ(false)
 , threadCount(1)
 , fJoinerChunkSize(rm->getJlJoinerChunkSize())
 , hasSmallOuterJoin(false)
 , _priority(1)
{
  // The step uuid starts out as the nil uuid.
  const bu::nil_generator makeNilUuid;
  uuid = makeNilUuid();

  PMJoinerCount = 0;
}
|
|
|
|
// Nothing to release by hand; members clean up after themselves.
BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() = default;
|
|
|
|
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
|
|
vector<BRM::LBID_t> lastScannedLBID,
|
|
bool hasAuxCol,
|
|
const std::vector<BRM::EMEntry>& extentsAux,
|
|
execplan::CalpontSystemCatalog::OID oidAux)
|
|
{
|
|
SCommand cc;
|
|
|
|
tableOID = scan.tableOid();
|
|
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setQueryUuid(scan.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
filterSteps.push_back(cc);
|
|
filterCount++;
|
|
_hasScan = true;
|
|
if (utils::isWide(cc->getWidth()))
|
|
wideColumnsWidths |= cc->getWidth();
|
|
idbassert(sessionID == scan.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
|
|
{
|
|
SCommand cc;
|
|
|
|
tableOID = pcs.tableOid();
|
|
cc.reset(new PseudoCCJL(pcs));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setQueryUuid(pcs.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
filterSteps.push_back(cc);
|
|
filterCount++;
|
|
idbassert(sessionID == pcs.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
tableOID = step.tableOid();
|
|
cc.reset(new ColumnCommandJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
filterSteps.push_back(cc);
|
|
filterCount++;
|
|
if (utils::isWide(cc->getWidth()))
|
|
wideColumnsWidths |= cc->getWidth();
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
tableOID = step.tableOid();
|
|
|
|
if (filterCount == 0)
|
|
{
|
|
sendAbsRids = true;
|
|
sendValues = true;
|
|
absRids.reset(new uint64_t[8192]);
|
|
}
|
|
|
|
cc.reset(new DictStepJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
|
|
#if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
|
|
if (filterSteps.size() > 0)
|
|
{
|
|
size_t stepsIndex = filterSteps.size() - 1;
|
|
SCommand prevCC = filterSteps[stepsIndex];
|
|
ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
|
|
DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
|
|
if (pcc && ccc)
|
|
{
|
|
filterSteps[stepsIndex].reset(
|
|
new ColumnCommandJL(*pcc, *ccc)); // column command will use same filters.
|
|
}
|
|
}
|
|
#endif
|
|
filterSteps.push_back(cc);
|
|
filterCount++;
|
|
needStrValues = true;
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
tableOID = step.tableOid();
|
|
cc.reset(new FilterCommandJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
filterSteps.push_back(cc);
|
|
filterCount++;
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
cc.reset(new PseudoCCJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setTupleKey(step.tupleId());
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
projectSteps.push_back(cc);
|
|
colWidths.push_back(cc->getWidth());
|
|
tupleLength += cc->getWidth();
|
|
projectCount++;
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
cc.reset(new ColumnCommandJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setTupleKey(step.tupleId());
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
projectSteps.push_back(cc);
|
|
colWidths.push_back(cc->getWidth());
|
|
tupleLength += cc->getWidth();
|
|
projectCount++;
|
|
if (utils::isWide(cc->getWidth()))
|
|
wideColumnsWidths |= cc->getWidth();
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
|
|
{
|
|
SCommand cc;
|
|
|
|
cc.reset(new PassThruCommandJL(step));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setTupleKey(step.tupleId());
|
|
cc->setQueryUuid(step.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
projectSteps.push_back(cc);
|
|
colWidths.push_back(cc->getWidth());
|
|
tupleLength += cc->getWidth();
|
|
projectCount++;
|
|
|
|
if (utils::isWide(cc->getWidth()))
|
|
wideColumnsWidths |= cc->getWidth();
|
|
|
|
if (filterCount == 0)
|
|
sendValues = true;
|
|
|
|
idbassert(sessionID == step.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
|
|
{
|
|
SCommand cc;
|
|
|
|
cc.reset(new RTSCommandJL(col, dict));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setTupleKey(dict.tupleId());
|
|
cc->setQueryUuid(col.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
projectSteps.push_back(cc);
|
|
colWidths.push_back(cc->getWidth());
|
|
tupleLength += cc->getWidth();
|
|
projectCount++;
|
|
needStrValues = true;
|
|
idbassert(sessionID == col.sessionId());
|
|
idbassert(sessionID == dict.sessionId());
|
|
}
|
|
|
|
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
|
|
{
|
|
SCommand cc;
|
|
|
|
cc.reset(new RTSCommandJL(p, dict));
|
|
cc->setBatchPrimitiveProcessor(this);
|
|
cc->setTupleKey(dict.tupleId());
|
|
cc->setQueryUuid(p.queryUuid());
|
|
cc->setStepUuid(uuid);
|
|
projectSteps.push_back(cc);
|
|
colWidths.push_back(cc->getWidth());
|
|
tupleLength += cc->getWidth();
|
|
projectCount++;
|
|
needStrValues = true;
|
|
|
|
if (filterCount == 0)
|
|
{
|
|
sendValues = true;
|
|
sendAbsRids = true;
|
|
absRids.reset(new uint64_t[8192]);
|
|
}
|
|
|
|
idbassert(sessionID == p.sessionId());
|
|
idbassert(sessionID == dict.sessionId());
|
|
}
|
|
|
|
#if 0
|
|
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
|
|
{
|
|
idbassert(sessionID == ds.sessionId());
|
|
|
|
uint32_t i;
|
|
|
|
templateTB = ds.fEmptyTableBand;
|
|
tableOID = ds.fTableOID;
|
|
|
|
tableColumnCount = templateTB.getColumnCount();
|
|
idbassert(tableColumnCount > 0);
|
|
tablePositions.reset(new int[tableColumnCount]);
|
|
|
|
for (i = 0; i < tableColumnCount; i++)
|
|
tablePositions[i] = -1;
|
|
|
|
idbassert(projectCount <= projectSteps.size());
|
|
|
|
/* In theory, tablePositions maps a column's table position to a projection step
|
|
-1 means it's a null column */
|
|
CalpontSystemCatalog::OID oid;
|
|
int idx;
|
|
|
|
for (i = 0; i < projectCount; i++)
|
|
{
|
|
oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());
|
|
|
|
if (oid > 0)
|
|
{
|
|
idx = templateTB.find(oid);
|
|
|
|
if (idx >= 0)
|
|
{
|
|
tablePositions[idx] = i;
|
|
}
|
|
else
|
|
{
|
|
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
|
|
" in tableband at pjstep idx " << i << endl;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
|
|
" doesn't have a valid OID" << endl;
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
|
|
void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
|
|
{
|
|
uint32_t i;
|
|
// rowCounter++;
|
|
|
|
if (needToSetLBID)
|
|
{
|
|
needToSetLBID = false;
|
|
|
|
for (i = 0; i < filterCount; ++i)
|
|
filterSteps[i]->setLBID(et.first, dbroot);
|
|
|
|
for (i = 0; i < projectCount; ++i)
|
|
projectSteps[i]->setLBID(et.first, dbroot);
|
|
|
|
baseRid = et.first & 0xffffffffffffe000ULL;
|
|
}
|
|
|
|
// TODO: get rid of magics
|
|
if (sendAbsRids)
|
|
absRids[ridCount] = et.first;
|
|
else
|
|
{
|
|
relRids[ridCount] = et.first & 0x1fff; // 8192 rows per logical block
|
|
ridMap |= 1 << (relRids[ridCount] >> 9); // LSB -> 0-511, MSB -> 7680-8191
|
|
}
|
|
|
|
if (sendValues)
|
|
{
|
|
// cout << "adding value " << et.second << endl;
|
|
values[ridCount] = et.second;
|
|
}
|
|
|
|
ridCount++;
|
|
idbassert(ridCount <= 8192);
|
|
}
|
|
|
|
#if 0
|
|
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
|
|
{
|
|
uint32_t i;
|
|
rowgroup::Row r;
|
|
|
|
inputRG.setData(rg.getData());
|
|
inputRG.initRow(&r);
|
|
inputRG.getRow(0, &r);
|
|
|
|
if (needToSetLBID)
|
|
{
|
|
needToSetLBID = false;
|
|
|
|
for (i = 0; i < filterCount; ++i)
|
|
filterSteps[i]->setLBID(r.getRid(), 1);
|
|
|
|
for (i = 0; i < projectCount; ++i)
|
|
projectSteps[i]->setLBID(r.getRid(), 1);
|
|
|
|
baseRid = r.getRid() & 0xffffffffffffe000ULL;
|
|
}
|
|
}
|
|
|
|
#endif
|
|
|
|
void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
|
|
{
|
|
if (filterCount == 0)
|
|
throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");
|
|
|
|
addElementType(ElementType(et.first, et.first), dbroot);
|
|
}
|
|
|
|
/**
|
|
* When the output type is ElementType, the messages have the following format:
|
|
*
|
|
* ISMPacketHeader (ignored)
|
|
* PrimitiveHeader (ignored)
|
|
* --
|
|
* If the BPP starts with a scan
|
|
* casual partitioning data from the scanned column
|
|
* Base Rid for the returned logical block
|
|
* rid count
|
|
* (rid count)x 16-bit rids
|
|
* (rid count)x 64-bit values
|
|
* If there's a join performed by this BPP
|
|
* pre-Join rid count (for logging purposes only)
|
|
* small-side new match count
|
|
* (match count)x ElementTypes
|
|
* cached IO count
|
|
* physical IO count
|
|
* block touches
|
|
*/
|
|
|
|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
|
|
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
|
|
uint64_t* lbid, int64_t* min, int64_t* max,
|
|
uint32_t* cachedIO, uint32_t* physIO,
|
|
uint32_t* touchedBlocks) const
|
|
{
|
|
uint32_t i;
|
|
uint16_t l_count;
|
|
uint64_t l_baseRid;
|
|
uint16_t* rids;
|
|
uint64_t* vals;
|
|
uint8_t* buf;
|
|
uint64_t tmp64;
|
|
uint8_t tmp8;
|
|
|
|
/* skip the header */
|
|
idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
|
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
|
|
|
if (_hasScan)
|
|
{
|
|
in >> tmp8;
|
|
*validCPData = (tmp8 != 0);
|
|
|
|
if (*validCPData)
|
|
{
|
|
in >> *lbid;
|
|
|
|
in >> tmp64;
|
|
*min = (int64_t)tmp64;
|
|
in >> tmp64;
|
|
*max = (int64_t)tmp64;
|
|
}
|
|
else
|
|
in >> *lbid;
|
|
}
|
|
|
|
in >> l_baseRid;
|
|
in >> l_count;
|
|
// rowsProcessed += l_count;
|
|
idbassert(l_count <= 8192);
|
|
out->resize(l_count);
|
|
|
|
buf = (uint8_t*)in.buf();
|
|
|
|
rids = (uint16_t*)buf;
|
|
vals = (uint64_t*)(buf + (l_count << 1));
|
|
idbassert(in.length() > (uint32_t)((l_count << 1) + (l_count << 3)));
|
|
in.advance((l_count << 1) + (l_count << 3));
|
|
|
|
for (i = 0; i < l_count; ++i)
|
|
{
|
|
(*out)[i].first = rids[i] + l_baseRid;
|
|
// if (tableOID >= 3000)
|
|
// idbassert((*out)[i].first > 1023);
|
|
(*out)[i].second = vals[i];
|
|
}
|
|
|
|
in >> *cachedIO;
|
|
in >> *physIO;
|
|
in >> *touchedBlocks;
|
|
// cout << "ET: got physIO=" << (int) *physIO << " cachedIO=" <<
|
|
// (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
|
|
idbassert(in.length() == 0);
|
|
}
|
|
|
|
/**
|
|
* When the output type is StringElementType the messages have the following format:
|
|
*
|
|
* ISMPacketHeader (ignored)
|
|
* PrimitiveHeader (ignored)
|
|
* ---
|
|
* If the BPP starts with a scan
|
|
* Casual partitioning data from the column scanned
|
|
* Rid count
|
|
* (rid count)x 64-bit absolute rids
|
|
* (rid count)x serialized strings
|
|
* cached IO count
|
|
* physical IO count
|
|
* blocks touched
|
|
*/
|
|
|
|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
|
|
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
|
|
bool* validCPData, uint64_t* lbid, int64_t* min,
|
|
int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
|
|
uint32_t* touchedBlocks) const
|
|
{
|
|
uint32_t i;
|
|
uint16_t l_count;
|
|
uint64_t* l_absRids;
|
|
uint64_t tmp64;
|
|
uint8_t tmp8;
|
|
|
|
// cout << "get String ETs uniqueID\n";
|
|
/* skip the header */
|
|
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
|
|
|
if (_hasScan)
|
|
{
|
|
in >> tmp8;
|
|
*validCPData = (tmp8 != 0);
|
|
|
|
if (*validCPData)
|
|
{
|
|
in >> *lbid;
|
|
in >> tmp64;
|
|
*min = (int64_t)tmp64;
|
|
in >> tmp64;
|
|
*max = (int64_t)tmp64;
|
|
}
|
|
else
|
|
in >> *lbid;
|
|
}
|
|
|
|
in >> l_count;
|
|
// cout << "parsing " << l_count << " strings\n";
|
|
l_absRids = (uint64_t*)in.buf();
|
|
out->resize(l_count);
|
|
in.advance(l_count << 3);
|
|
|
|
for (i = 0; i < l_count; ++i)
|
|
{
|
|
(*out)[i].first = l_absRids[i];
|
|
in >> (*out)[i].second;
|
|
}
|
|
|
|
in >> *cachedIO;
|
|
in >> *physIO;
|
|
in >> *touchedBlocks;
|
|
// cout << "SET: got physIO=" << (int) *physIO << " cachedIO=" <<
|
|
// (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
|
|
idbassert(in.length() == 0);
|
|
}
|
|
|
|
/**
|
|
* When the output type is Tuples, the input message format is the same
|
|
* as when the output type is TableBands
|
|
*/
|
|
|
|
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
|
|
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
|
|
bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
|
|
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
|
|
{
|
|
uint32_t i, j, pos, len;
|
|
uint16_t l_rowCount;
|
|
uint64_t l_baseRid;
|
|
uint16_t* l_relRids;
|
|
uint64_t absRids[8192];
|
|
// const uint8_t* columnData[projectCount];
|
|
const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
|
|
memset(columnData, 0, projectCount * sizeof(uint8_t*));
|
|
const uint8_t* buf;
|
|
// uint32_t colLengths[projectCount];
|
|
uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
|
|
uint64_t tmp64;
|
|
uint8_t tmp8;
|
|
|
|
// cout << "getTuples msg is " << in.length() << " bytes\n";
|
|
/* skip the header */
|
|
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
|
|
|
if (_hasScan)
|
|
{
|
|
in >> tmp8;
|
|
*validCPData = (tmp8 != 0);
|
|
|
|
if (*validCPData)
|
|
{
|
|
in >> *lbid;
|
|
in >> tmp64;
|
|
*min = (int64_t)tmp64;
|
|
in >> tmp64;
|
|
*max = (int64_t)tmp64;
|
|
}
|
|
else
|
|
in >> *lbid;
|
|
}
|
|
|
|
in >> l_rowCount;
|
|
|
|
// cout << "read " << l_rowCount << " rows\n";
|
|
|
|
if (needRidsAtDelivery)
|
|
{
|
|
in >> l_baseRid;
|
|
l_relRids = (uint16_t*)in.buf();
|
|
|
|
for (i = 0; i < l_rowCount; i++)
|
|
absRids[i] = l_relRids[i] + l_baseRid;
|
|
|
|
in.advance(l_rowCount << 1);
|
|
}
|
|
|
|
/* Set up pointers to the column data */
|
|
pos = 0;
|
|
buf = in.buf();
|
|
|
|
for (i = 0; i < projectCount; i++)
|
|
{
|
|
colLengths[i] = *((uint32_t*)&buf[pos]);
|
|
pos += 4;
|
|
columnData[i] = &buf[pos];
|
|
// cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
|
|
pos += colLengths[i];
|
|
idbassert(pos < in.length());
|
|
}
|
|
|
|
in.advance(pos);
|
|
|
|
out->resize(l_rowCount);
|
|
|
|
for (i = 0; i < l_rowCount; i++)
|
|
{
|
|
(*out)[i].first = absRids[i];
|
|
(*out)[i].second = new char[tupleLength];
|
|
|
|
for (j = 0, pos = 0; j < projectCount; j++)
|
|
{
|
|
idbassert(pos + colWidths[j] <= tupleLength);
|
|
|
|
if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
|
|
{
|
|
len = *((uint32_t*)columnData[j]);
|
|
columnData[j] += 4;
|
|
memcpy(&(*out)[i].second[pos], columnData[j], len);
|
|
pos += len;
|
|
columnData[j] += len;
|
|
|
|
// insert padding...
|
|
memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
|
|
pos += colWidths[j] - len;
|
|
}
|
|
else
|
|
{
|
|
switch (colWidths[j])
|
|
{
|
|
case 8:
|
|
*((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
|
|
columnData[j] += 8;
|
|
pos += 8;
|
|
break;
|
|
|
|
case 4:
|
|
*((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
|
|
columnData[j] += 4;
|
|
pos += 4;
|
|
break;
|
|
|
|
case 2:
|
|
*((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
|
|
columnData[j] += 2;
|
|
pos += 4;
|
|
break;
|
|
|
|
case 1:
|
|
(*out)[i].second[pos] = (char)*columnData[j];
|
|
columnData[j]++;
|
|
pos++;
|
|
break;
|
|
|
|
// TODO MCOL-641
|
|
case 16:
|
|
// fallthrough
|
|
default:
|
|
cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
|
|
throw logic_error("BPP::getTuples(): bad column width");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
in >> *cachedIO;
|
|
in >> *physIO;
|
|
in >> *touchedBlocks;
|
|
idbassert(in.length() == 0);
|
|
}
|
|
|
|
/*
 * Peek at a BPP response, without consuming it, and return its "count this
 * message" flag. That flag is the byte after the ISM/Primitive headers, or,
 * for scan-driven BPPs, the byte after the casual-partitioning block.
 *
 * For scan-driven BPPs whose header Status is non-zero (an error message
 * from PrimProc), returns true so the upper layer can raise the error.
 *
 * Throws std::runtime_error if the message is too short to contain the flag.
 */
bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
{
  const uint8_t* data = in.buf();
  const auto msgLength = in.length();
  uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);  // skip the headers

  const ISMPacketHeader* hdr = reinterpret_cast<const ISMPacketHeader*>(data);
  // Only look at hdr->Status when the buffer really contains a full header
  // (an empty or truncated message must not be dereferenced).
  const bool pmReportedError = msgLength >= sizeof(ISMPacketHeader) && hdr->Status > 0;

  if (_hasScan && msgLength > offset)
  {
    // This is a legitimate error message sent by PrimProc
    // so we need to return to allow upper layer to throw an error
    // if needed.
    if (pmReportedError)
    {
      return true;
    }

    if (data[offset] != 0)
    {
      // Valid CP data: "valid CP data" flag & LBID, the cpFromDictScan flag,
      // a width byte, then min and max of that width each (16/32 bytes for
      // wide values).
      const uint32_t widthPos = offset + CP_FLAG_AND_LBID + 1;

      if (msgLength <= widthPos)
        offset = widthPos;  // truncated message: the length check below reports it
      else
        offset += (data[widthPos] * 2) + CP_FLAG_AND_LBID + 1 + 1;
    }
    else
      offset += CP_FLAG_AND_LBID;  // skip only the "valid CP data" & LBID bytes
  }

  // Throw b/c PP throws and sends here error msg.
  // See BatchPrimitiveProcessor::writeErrorMsg() for details.
  // The inversion of the assert used here previously.
  if (msgLength <= offset)
  {
    if (pmReportedError)
    {
      throw std::runtime_error(" an exception originally thrown by PrimProc: ");
    }

    throw std::runtime_error(
        " an exception because there is not enough \
data in the Primitive message from PrimProc.");
  }

  return (data[offset] != 0);
}
|
|
|
|
/*
 * Read a 32-bit RGData count from *in, then deserialize that many RGData
 * objects and append each to *out. Existing entries in *out are kept.
 */
void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
{
  RGData chunk;
  uint32_t remaining;

  *in >> remaining;

  while (remaining-- > 0)
  {
    chunk.deserialize(*in, true);
    out->push_back(chunk);
  }
}
|
|
|
|
/*
 * Parse a row-group BPP response into *out (which is cleared first).
 *
 * An empty message produces one empty RGData built from primprocRG[threadID],
 * with all IO counters set to 0.
 *
 * Otherwise, after skipping the ISM/Primitive headers, the fields are read in
 * this order:
 *   - If _hasScan: an 8-bit "valid CP data" flag and the LBID.
 *     If the CP data is valid, also the fromDictScan byte, a width byte, and
 *     min/max. Min/max are 128-bit when width > MAXLEGACYWIDTH and 64-bit
 *     otherwise.
 *   - The 8-bit countThis flag.
 *   - Either the aggregated RGData list (aggregatorPM), or one RGData plus,
 *     when required, the per-joiner PM join match lists.
 *   - The cached IO / physical IO / touched-block counters, present only when
 *     countThis is set (otherwise they are reported as 0).
 *
 * *hasWideColumn and *fromDictScan are written only when the CP data is valid.
 */
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
                                                uint64_t* lbid, bool* fromDictScan, int128_t* min,
                                                int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
                                                bool* hasWideColumn,
                                                const execplan::CalpontSystemCatalog::ColType& colType) const
{
  uint64_t tmp64;
  int128_t tmp128;
  uint8_t tmp8;
  RGData rgData;
  uint32_t rowCount;
  RowGroup& org = primprocRG[threadID];  // per-thread RowGroup, set up in createBPP()

  out->clear();

  if (in.length() == 0)
  {
    // done, return an empty RG
    rgData = RGData(org, 0);
    org.setData(&rgData);
    org.resetRowGroup(0);
    out->push_back(rgData);
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
    return;
  }

  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp8;
      *fromDictScan = tmp8 != 0;
      // Width byte: anything wider than the legacy 8-byte width carries 128-bit min/max.
      in >> tmp8;
      *hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);

      if (UNLIKELY(*hasWideColumn))
      {
        idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);

        if (LIKELY(colType.isWideDecimalType()))
        {
          in >> tmp128;
          *min = tmp128;
          in >> tmp128;
          *max = tmp128;
        }
        else
        {
          // NOTE(review): this path does not consume the min/max bytes. If
          // idbassert does not abort/throw in some build, the rest of the parse
          // will be misaligned -- confirm idbassert's behavior.
          std::ostringstream oss;
          oss << __func__ << " WARNING!!! Not implemented for the data type ";
          oss << colType.colDataType << std::endl;
          std::cout << oss.str();
          idbassert(false);
        }
      }
      else
      {
        in >> tmp64;
        *min = static_cast<int128_t>(tmp64);
        in >> tmp64;
        *max = static_cast<int128_t>(tmp64);
      }
    }
    else
      in >> *lbid;
  }

  in >> tmp8;
  *countThis = (tmp8 != 0);

  /* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
   * For later... */
  if (aggregatorPM)
  {
    deserializeAggregateResult(&in, out);
    // for (uint32_t z = 0; z < out->size(); z++) {
    //   org.setData(&(*out)[z]);
    //   cout << "BPPJL: " << org.toString() << endl;
    //}
  }
  else
  {
    rgData.deserialize(in, true);
    out->push_back(rgData);
    org.setData(&rgData);

    rowCount = org.getRowCount();

    // Inside this branch aggregatorPM is null, so the last term reduces to fe2.
    bool pmSendsMatchesAnyway =
        (hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));

    if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
    {
      std::shared_ptr<vector<uint32_t>[]> joinResults;
      uint32_t i, j;

      // In this mode the PM sends its own count of rows that have match lists.
      if (pmSendsMatchesAnyway)
      {
        uint16_t joinRowCount;
        in >> joinRowCount;
        rowCount = joinRowCount;
      }

      for (j = 0; j < PMJoinerCount; j++)
      {
        /* Reuse the result space if possible */
        joinResults = tJoiners[j]->getPMJoinArrays(threadID);

        if (joinResults.get() == NULL)
        {
          // NOTE(review): fixed 8192 slots; rowCount is not checked against it here.
          auto v = new vector<uint32_t>[8192];
          joinResults.reset(v);
          tJoiners[j]->setPMJoinResults(joinResults, threadID);
        }

        // One inline-serialized vector<uint32_t> of matches per row.
        for (i = 0; i < rowCount; i++)
          deserializeInlineVector<uint32_t>(in, joinResults[i]);

        if (tJoiners[j]->smallOuterJoin())
          tJoiners[j]->markMatches(threadID, rowCount);
      }
    }
  }

  if (*countThis)
  {
    // cout << "grabbing io stats\n";
    in >> *cachedIO;
    in >> *physIO;
    in >> *touchedBlocks;
  }
  else
  {
    *cachedIO = 0;
    *physIO = 0;
    *touchedBlocks = 0;
  }

  idbassert(in.length() == 0);
}
|
|
|
|
/*
 * Build an empty RGData shaped like projectionRG. Its row group is reset to
 * zero rows and its status is set to `error`.
 */
RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  rowgroup::RowGroup shape(projectionRG);
  RGData errorData(shape, 0);

  shape.setData(&errorData);
  shape.resetRowGroup(0);
  shape.setStatus(error);

  return errorData;
}
|
|
|
|
void BatchPrimitiveProcessorJL::reset()
|
|
{
|
|
ridCount = 0;
|
|
ridMap = 0;
|
|
needToSetLBID = true;
|
|
}
|
|
|
|
/*
 * Point the BPP at logical block `l` of `scannedExtent`.
 *
 * Records the extent's dbRoot and derives baseRid via rowgroup::convertToRid()
 * from the partition, segment, extent number and logical block number. That
 * baseRid is then passed to setLBID() on every filter and projection command.
 */
void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
{
  dbRoot = scannedExtent.dbRoot;

  const auto extentNumber = scannedExtent.blockOffset / (scannedExtent.range.size * 1024);  // the extent #
  const auto blockNumber = (l - scannedExtent.range.start) / scannedExtent.range.size;     // the logical block #

  baseRid = rowgroup::convertToRid(scannedExtent.partitionNum, scannedExtent.segmentNum, extentNumber,
                                   blockNumber);

  for (uint32_t idx = 0; idx < filterCount; ++idx)
    filterSteps[idx]->setLBID(baseRid, dbRoot);

  for (uint32_t idx = 0; idx < projectCount; ++idx)
    projectSteps[idx]->setLBID(baseRid, dbRoot);
}
|
|
|
|
/*
 * Multi-line debug description. It reports either the rid/value sending mode
 * or "scan driven", followed by each filter step and each projection step.
 */
string BatchPrimitiveProcessorJL::toString() const
{
  ostringstream ret;

  ret << "BatchPrimitiveProcessorJL:" << endl;

  if (_hasScan)
  {
    ret << " -- scan driven" << endl;
  }
  else
  {
    if (sendValues)
      ret << " -- serializing values" << endl;

    ret << (sendAbsRids ? " -- serializing absolute rids" : " -- serializing relative rids") << endl;
  }

  ret << " " << (int)filterCount << " filter steps:\n";

  for (uint32_t idx = 0; idx < filterCount; ++idx)
    ret << " " << filterSteps[idx]->toString() << endl;

  ret << " " << (int)projectCount << " projection steps:\n";

  for (uint32_t idx = 0; idx < projectCount; ++idx)
    ret << " " << projectSteps[idx]->toString() << endl;

  return ret.str();
}
|
|
|
|
/**
 * The creation messages have the following format (in the order written below):
 *
 * ISMPacketHeader (Command = BATCH_PRIMITIVE_CREATE; necessary for DEC and ReadThread classes)
 * ---
 * output type (8-bit: ElementType, StringElementType, etc)
 * transaction # (32-bit)
 * session # (32-bit; unnecessary except for debugging)
 * step ID (32-bit; same)
 * unique ID (uniquely identifies the DEC queue on the UM)
 * version info
 * 16-bit flags
 * if HAS_WIDE_COLUMNS: wideColumnsWidths
 * bop (filter operator)
 * 8-bit forHJ flag
 * if the output type is ROW_GROUP
 *     projection RowGroup
 *     8-bit fe1 flag [, fe1 expression, fe1 input RowGroup]
 *     8-bit fe2 flag [, fe2 expression, fe2 output RowGroup]
 * if there's a join (HAS_JOINER)...
 *     32-bit max PM join result count
 *     if ROW_GROUP: PM joiner count, then per joiner: # of elements in the
 *     Joiner object, join type, typeless flag, optional FE filter, key
 *     description; then the join FE RowGroup (if any joiner has an FE filter)
 *     and, if JOIN_ROWGROUP_DATA, the small-side, large-side and joined RowGroups
 * filter step count
 * (filter count)x serialized Commands
 * projection step count
 * (projection count)x serialized Commands
 * if ROW_GROUP: 8-bit aggregator flag [, aggregate RowGroup, aggregator]
 *
 * Side effect: for ROW_GROUP output, primprocRG is re-allocated with one
 * RowGroup per thread, describing what PrimProc will send back.
 */

void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
  ISMPacketHeader ism;
  uint32_t i;
  uint16_t flags = 0;

  ism.Command = BATCH_PRIMITIVE_CREATE;

  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (uint8_t)ot;
  bs << (messageqcpp::ByteStream::quadbyte)txnID;
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << versionInfo;

  // Build the feature flags; later sections are written only if their flag is set.
  if (needStrValues)
    flags |= NEED_STR_VALUES;

  if (sendAbsRids)
    flags |= GOT_ABS_RIDS;

  if (sendValues)
    flags |= GOT_VALUES;

  if (LBIDTrace)
    flags |= LBID_TRACE;

  if (needRidsAtDelivery)
    flags |= SEND_RIDS_AT_DELIVERY;

  if (tJoiners.size() > 0)
    flags |= HAS_JOINER;

  if (sendTupleJoinRowGroupData)
    flags |= JOIN_ROWGROUP_DATA;

  if (wideColumnsWidths)
    flags |= HAS_WIDE_COLUMNS;

  bs << flags;

  if (wideColumnsWidths)
    bs << wideColumnsWidths;

  bs << bop;
  bs << (uint8_t)(forHJ ? 1 : 0);

  if (ot == ROW_GROUP)
  {
    bs << projectionRG;
    // cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;

    /* F&E serialization */
    if (fe1)
    {
      bs << (uint8_t)1;
      bs << *fe1;
      bs << fe1Input;
    }
    else
      bs << (uint8_t)0;

    if (fe2)
    {
      bs << (uint8_t)1;
      bs << *fe2;
      bs << fe2Output;
    }
    else
      bs << (uint8_t)0;
  }

  /* if HAS_JOINER, send the init params */
  if (flags & HAS_JOINER)
  {
    bs << (uint32_t)maxPmJoinResultCount;

    if (ot == ROW_GROUP)
    {
      idbassert(tJoiners.size() > 0);
      bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;

      bool atLeastOneFE = false;
#ifdef JLF_DEBUG
      cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif

      // The small-side RowGroup is sent at most once, for the first joiner with a skewed key column.
      bool smallSideRGSent = false;
      for (i = 0; i < PMJoinerCount; i++)
      {
        bs << (uint32_t)tJoiners[i]->size();
        bs << tJoiners[i]->getJoinType();

        // bs << (uint64_t) tJoiners[i]->smallNullValue();

        bs << (uint8_t)tJoiners[i]->isTypelessJoin();

        if (tJoiners[i]->hasFEFilter())
        {
          atLeastOneFE = true;
#ifdef JLF_DEBUG
          cout << "serializing join FE object\n";
#endif
          bs << *tJoiners[i]->getFcnExpFilter();
        }

        if (!tJoiners[i]->isTypelessJoin())
        {
          // Single-key join: small-side null value and the large-side key column.
          bs << (uint64_t)tJoiners[i]->smallNullValue();
          bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
          // cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
        }
        else
        {
          // Typeless (multi-key) join: list of large-side key columns and the key length.
          serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
          bs << (uint32_t)tJoiners[i]->getKeyLength();
          // MCOL-4173 Notify PP if smallSide and largeSide have different column widths
          // and send smallSide RG to PP.
          bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
          bs << (uint8_t)joinHasSkewedKeyColumn;
          if (!smallSideRGSent && joinHasSkewedKeyColumn)
          {
            idbassert(!smallSideRGs.empty());
            bs << smallSideRGs[0];
            serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
            smallSideRGSent = true;
          }
        }
      }

      if (atLeastOneFE)
        bs << joinFERG;

      if (sendTupleJoinRowGroupData)
      {
#ifdef JLF_DEBUG
        cout << "sending smallside data\n";
#endif
        serializeVector<RowGroup>(bs, smallSideRGs);
        bs << largeSideRG;
        bs << joinedRG;  // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
        // cout << "joined RG: " << joinedRG.toString() << endl;
      }
    }
  }

  bs << filterCount;

  for (i = 0; i < filterCount; ++i)
  {
    // cout << "serializing step " << i << endl;
    filterSteps[i]->createCommand(bs);
  }

  bs << projectCount;

  for (i = 0; i < projectCount; ++i)
    projectSteps[i]->createCommand(bs);

  // aggregate step only when output is row group
  if (ot == ROW_GROUP)
  {
    if (aggregatorPM.get() != NULL)
    {
      bs << (uint8_t)1;
      bs << aggregateRGPM;
      bs << *(aggregatorPM.get());
    }
    else
    {
      bs << (uint8_t)0;
    }
  }

  // decide which rowgroup is received by PrimProc
  // Priority: PM aggregation output, then fe2 output, else the projection RowGroup.
  if (ot == ROW_GROUP)
  {
    primprocRG.reset(new RowGroup[threadCount]);

    for (uint32_t i = 0; i < threadCount; i++)
      if (aggregatorPM)
        primprocRG[i] = aggregateRGPM;
      else if (fe2)
        primprocRG[i] = fe2Output;
      // This shouldn't be necessary. As of 2-17-14, PrimProc
      // will only send joined results if fe2 || aggregatorPM,
      // so it will never send back data for joinedRG.
      // else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
      //   primprocRG[i] = joinedRG;
      else
        primprocRG[i] = projectionRG;
  }
}
|
|
|
|
/**
 * The BPP Run messages (built by runBPP()) have the following format:
 *
 * ISMPacketHeader
 *   -- Interleave field carries the PM number (used for DEC interleaving)
 *   -- Size field carries the relative weight of processing the message
 * -----
 * Session ID
 * Step ID
 * Unique ID
 * Priority
 * Weight (from calculateBPPWeight(); used by the PrimProc thread pool)
 * DBRoot
 * Count
 * Sent-by-ExeMgr flag (1 byte)
 * Rid count
 * If absolute rids are sent
 *   (rid count)x 64-bit absolute rids
 * else
 *   RID range bitmap
 *   base RID
 *   (rid count)x 16-bit relative rids
 * If values are sent
 *   (rid count)x 64-bit values
 * (filter count)x run msgs for filter Commands
 * (projection count)x run msgs for projection Commands
 */
|
|
|
|
// The deser counterpart function is BPP::resetBPP
|
|
/**
 * Serializes a BATCH_PRIMITIVE_RUN message for this BPP into @p bs.
 * The stream is restarted first, so any previous contents are discarded.
 * Field order is the wire format and must not change.
 *
 * @param bs          destination stream
 * @param pmNum       written into the ISM header's Interleave field
 * @param isExeMgrDEC serialized as a one-byte "sent by ExeMgr" flag (1/0)
 */
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
  bs.restart();

  /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
     save bandwidth. They're completely unused otherwise. We need to throw out all unused
     fields of every defined header. */
  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_RUN;

  // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
  // defined for each PM.
  header.Interleave = pmNum;

  // Size doubles as the processing "weight", where 1 is one Command on one logical block.
  header.Size = count * (filterCount + projectCount);

  bs.append((uint8_t*)&header, sizeof(header));

  // These four are consumed by BPPSeeder; BPP itself skips them.
  bs << sessionID;
  bs << stepID;
  bs << uniqueID;
  bs << _priority;

  // The weight is used by PrimProc thread pool algo
  uint32_t weight = calculateBPPWeight();
  bs << weight;

  bs << dbRoot;
  bs << count;

  uint8_t sentByEM = isExeMgrDEC ? 1 : 0;
  bs << sentByEM;

  // A scan carries no RIDs; any other run must carry RIDs in one of the two encodings.
  if (!_hasScan)
  {
    idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
  }
  else
  {
    idbassert(ridCount == 0);
  }

  bs << ridCount;

  if (!sendAbsRids)
  {
    // Relative encoding: bitmap + base, then 16-bit offsets.
    bs << ridMap;
    bs << baseRid;
    bs.append((uint8_t*)relRids, ridCount << 1);
  }
  else
  {
    // Absolute encoding: 64-bit RIDs.
    bs.append((uint8_t*)absRids.get(), ridCount << 3);
  }

  if (sendValues)
    bs.append((uint8_t*)values, ridCount << 3);

  for (uint32_t idx = 0; idx < filterCount; idx++)
    filterSteps[idx]->runCommand(bs);

  for (uint32_t idx = 0; idx < projectCount; idx++)
    projectSteps[idx]->runCommand(bs);
}
|
|
|
|
/**
 * Serializes an error-status BATCH_PRIMITIVE_RUN message into @p bs,
 * discarding the stream's previous contents (bs.restart()).
 * The header's Status field carries this BPP's current status.
 */
void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  bs.restart();

  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_RUN;
  header.Status = status;
  header.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);
  bs.append((uint8_t*)&header, sizeof(header));

  // Identification followed by the counts, in wire order.
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}
|
|
|
|
/**
 * Appends a BATCH_PRIMITIVE_DESTROY message to @p bs.
 * Unlike runBPP(), the stream is not restarted; the message is appended.
 */
void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_DESTROY;
  bs.append((uint8_t*)&header, sizeof(header));

  /* XXXPAT: We can get rid of sessionID and stepID;
     it's there for easier debugging only */
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
}
|
|
|
|
void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
|
|
{
|
|
pos = 0;
|
|
joinerNum = 0;
|
|
tJoiners = j;
|
|
|
|
PMJoinerCount = 0;
|
|
tlKeyLens.reset(new uint32_t[tJoiners.size()]);
|
|
|
|
for (uint32_t i = 0; i < tJoiners.size(); i++)
|
|
{
|
|
if (tJoiners[i]->inPM())
|
|
{
|
|
PMJoinerCount++;
|
|
smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
|
|
smallSideRGs.push_back(tJoiners[i]->getSmallRG());
|
|
|
|
if (tJoiners[i]->isTypelessJoin())
|
|
tlKeyLens[i] = tJoiners[i]->getKeyLength();
|
|
|
|
if (tJoiners[i]->hasFEFilter())
|
|
sendTupleJoinRowGroupData = true;
|
|
|
|
if (tJoiners[i]->smallOuterJoin())
|
|
hasSmallOuterJoin = true;
|
|
}
|
|
}
|
|
|
|
largeSideRG = tJoiners[0]->getLargeRG();
|
|
|
|
if (aggregatorPM || fe2)
|
|
{
|
|
sendTupleJoinRowGroupData = true;
|
|
#ifdef JLF_DEBUG
|
|
cout << "will send small side row data\n";
|
|
#endif
|
|
}
|
|
posByJoinerNum.reset(new uint32_t[PMJoinerCount]);
|
|
memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t));
|
|
}
|
|
|
|
// helper fcn to interleave small side data by joinernum
|
|
bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
|
|
{
|
|
uint i;
|
|
// find the next joiner that still has more data to send. Set joinerNum & pos.
|
|
for (i = 0; i < PMJoinerCount; i++)
|
|
{
|
|
joinerNum = (joinerNum + 1) % PMJoinerCount;
|
|
if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
|
|
break;
|
|
}
|
|
if (i == PMJoinerCount)
|
|
return false;
|
|
pos = posByJoinerNum[joinerNum];
|
|
return true;
|
|
}
|
|
|
|
/* This algorithm relies on the joiners being sorted by size atm */
|
|
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
|
|
/**
 * Builds the next tuple-joiner message into @p bs. The message is started
 * with bs.load(), so it does not append to earlier contents.
 *
 * Small-side rows of the PM joiners are sent in chunks sized from
 * fJoinerChunkSize. pickNextJoinerNum() rotates between joiners that still
 * have unsent rows, and posByJoinerNum[] records each joiner's progress.
 *
 * The BATCH_PRIMITIVE_ADD_JOINER payload is toSend, pos and joinerNum, followed by either:
 *  - for a typeless join, one uint8_t null flag per row; when the flag is 0 it is
 *    followed by the serialized TypelessData key and the row index;
 *  - for a typed join, toSend packed JoinerElements {int64 key, uint32 row index}.
 * If sendTupleJoinRowGroupData is set, an RGData holding the same rows follows.
 *
 * @return true if an ADD_JOINER message was built; false when every joiner
 *         is exhausted and a BATCH_PRIMITIVE_END_JOINER message was built instead.
 */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
  // On-the-wire layout of one typed (non-typeless) join key entry.
#pragma pack(push, 1)
  struct JoinerElements
  {
    int64_t key;
    uint32_t value;
  };
#pragma pack(pop)

  ISMPacketHeader ism;
  // Zero the header before either message kind is built. The END_JOINER path used to
  // serialize this header without initializing it.
  memset((void*)&ism, 0, sizeof(ism));

  if (!pickNextJoinerNum())
  {
    /* last message */
    ism.Command = BATCH_PRIMITIVE_END_JOINER;
    bs.load((uint8_t*)&ism, sizeof(ism));
    bs << (messageqcpp::ByteStream::quadbyte)sessionID;
    bs << (messageqcpp::ByteStream::quadbyte)stepID;
    bs << uniqueID;
    return false;
  }

  vector<Row::Pointer>* tSmallSide = tJoiners[joinerNum]->getSmallSide();
  uint32_t size = tSmallSide->size();

  ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;

  Row r;
  smallSideRGs[joinerNum].initRow(&r);

  const bool isTypeless = tJoiners[joinerNum]->isTypelessJoin();

  // Typed joins also ship one JoinerElements entry per row; count it in the per-row budget.
  unsigned metasize = isTypeless ? 0 : sizeof(JoinerElements);
  unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);

  // A row wider than fJoinerChunkSize would give 0 rows per message. pos would then never
  // advance and the caller would keep requesting messages forever, so send at least one row.
  if (rowunits == 0)
    rowunits = 1;

  uint32_t toSend = std::min<unsigned int>(size - pos, rowunits);
  bs << toSend;
  bs << pos;
  bs << joinerNum;

  if (isTypeless)
  {
    utils::FixedAllocator fa(tlKeyLens[joinerNum], true);
    joiner::TypelessData tlData;

    // Both of these are per-joiner, so fetch them once rather than once per row or key.
    const bool bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();
    auto&& largeKeyCols = tJoiners[joinerNum]->getLargeKeyColumns();
    auto& smallKeyCols = smallSideKeys[joinerNum];

    for (uint32_t i = pos; i < pos + toSend; i++)
    {
      r.setPointer((*tSmallSide)[i]);
      bool isNull = false;

      for (uint32_t j = 0; j < smallKeyCols.size(); j++)
      {
        isNull |= r.isNullValue(smallKeyCols[j]);

        if (UNLIKELY(bSignedUnsigned))
        {
          // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
          // then it should not compare. Send null to PM to prevent compare
          uint32_t smallKeyCol = smallKeyCols[j];
          uint32_t largeKeyCol = largeKeyCols[j];

          if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
          {
            uint64_t smallkey;

            if (r.isUnsigned(smallKeyCol))
              smallkey = r.getUintField(smallKeyCol);
            else
              smallkey = r.getIntField(smallKeyCol);

            if (smallkey & 0x8000000000000000ULL)
            {
              isNull = true;
              break;
            }
          }
        }
      }

      if (!isNull)
      {
        tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
                                 largeKeyCols);

        // An empty key cannot match anything; send it as null.
        if (tlData.len == 0)
          isNull = true;
      }

      bs << (uint8_t)isNull;

      if (!isNull)
      {
        tlData.serialize(bs);
        bs << i;
      }
    }
  }
  else
  {
    // Reserve the whole array and write it in place, then advance past it.
    bs.needAtLeast(toSend * sizeof(JoinerElements));
    JoinerElements* arr = (JoinerElements*)bs.getInputPtr();

    auto&& largeKeyCols = tJoiners[joinerNum]->getLargeKeyColumns();
    const uint32_t smallKeyCol = smallSideKeys[joinerNum][0];
    const bool bSignedUnsigned = r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCols[0]);

    for (uint32_t i = pos, j = 0; i < pos + toSend; ++i, ++j)
    {
      r.setPointer((*tSmallSide)[i]);
      uint64_t smallkey;

      if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
      {
        // Small side is a long double. Since CS can't store larger than DOUBLE,
        // we need to convert to whatever type large side is -- double or int64
        long double smallkeyld = r.getLongDoubleField(smallKeyCol);

        switch (largeSideRG.getColType(largeKeyCols[0]))
        {
          case CalpontSystemCatalog::DOUBLE:
          case CalpontSystemCatalog::UDOUBLE:
          case CalpontSystemCatalog::FLOAT:
          case CalpontSystemCatalog::UFLOAT:
          {
            if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              // Send the double's bit pattern. memcpy avoids the strict-aliasing
              // violation of *(int64_t*)&d.
              double d = (double)smallkeyld;
              int64_t bits;
              static_assert(sizeof(bits) == sizeof(d), "double must be 64 bits");
              memcpy(&bits, &d, sizeof(bits));
              smallkey = bits;
            }

            break;
          }

          default:
          {
            if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
              smallkey = joblist::UBIGINTEMPTYROW;
            else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
              smallkey = joblist::UBIGINTEMPTYROW;
            else
              smallkey = (int64_t)smallkeyld;

            break;
          }
        }
      }
      else if (r.isUnsigned(smallKeyCol))
        smallkey = r.getUintField(smallKeyCol);
      else
        smallkey = r.getIntField(smallKeyCol);

      // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
      // against the large side should fail. UBIGINTEMPTYROW is not a valid value, so nothing will match.
      if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
        smallkey = joblist::UBIGINTEMPTYROW;

      arr[j].key = (int64_t)smallkey;
      arr[j].value = i;
    }

    bs.advanceInputPtr(toSend * sizeof(JoinerElements));
  }

  if (sendTupleJoinRowGroupData)
  {
    // Copy the same rows into a temporary RGData and serialize it after the keys.
    RowGroup& smallSide = smallSideRGs[joinerNum];
    RGData tmpData(smallSide, toSend);
    Row tmpRow;

    smallSide.setData(&tmpData);
    smallSide.initRow(&tmpRow);
    smallSide.getRow(0, &tmpRow);

    for (uint32_t i = pos; i < pos + toSend; i++, tmpRow.nextRow())
    {
      r.setPointer((*tSmallSide)[i]);
      copyRow(r, &tmpRow);
    }

    smallSide.setRowCount(toSend);
    tmpData.serialize(bs, smallSide.getDataSize());
  }

  pos += toSend;
  posByJoinerNum[joinerNum] = pos;
  return true;
}
|
|
|
|
/** Stores @p rg as the projection RowGroup and switches the output type to ROW_GROUP. */
void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
  projectionRG = rg;
  ot = ROW_GROUP;
}
|
|
|
|
/** Stores @p rg as the joined RowGroup (serialized with the small-side join data). */
void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
{
  joinedRG = rg;
}
|
|
|
|
/**
 * Stores @p rg as the input RowGroup. It also clears the sendAbsRids and
 * sendValues flags, so runBPP() will use the relative-RID encoding and send no values.
 */
void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
{
  inputRG = rg;
  sendValues = false;
  sendAbsRids = false;
}
|
|
|
|
/**
 * Installs the PM-side aggregator and its output RowGroup. If any joiners
 * are present, small-side row data must then be sent to the PM as well.
 */
void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
                                                 const rowgroup::RowGroup& argpm)
{
  aggregatorPM = aggpm;
  aggregateRGPM = argpm;

  if (!tJoiners.empty())
    sendTupleJoinRowGroupData = true;
}
|
|
|
|
/* OR hacks */
|
|
/**
 * Records the boolean operator that combines the filter steps. For BOP_OR with
 * more than one filter, scan(false) is applied to every ColumnCommandJL filter
 * step after the first.
 */
void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
{
  bop = op;

  if (op != BOP_OR || filterCount <= 1)
    return;

  for (int step = 1; step < filterCount; ++step)
  {
    ColumnCommandJL* columnCmd = dynamic_cast<ColumnCommandJL*>(filterSteps[step].get());

    if (columnCmd)
      columnCmd->scan(false);
  }
}
|
|
|
|
void BatchPrimitiveProcessorJL::setForHJ(bool b)
|
|
{
|
|
forHJ = b;
|
|
}
|
|
|
|
/** Installs the group-1 function-expression wrapper together with its input RowGroup. */
void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& input)
{
  fe1Input = input;
  fe1 = fe;
}
|
|
|
|
/**
 * Installs the group-2 function-expression wrapper together with its output RowGroup.
 * If there are PM joiners, small-side row data must then be sent to the PM as well.
 */
void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& output)
{
  fe2Output = output;
  fe2 = fe;

  const bool havePMJoiners = !tJoiners.empty() && PMJoinerCount > 0;

  if (havePMJoiners)
    sendTupleJoinRowGroupData = true;
}
|
|
|
|
/**
 * Returns the distinct column names referenced by the filter and projection
 * steps as "(a,b,c)". The names are ordered by std::set<string>.
 * If no steps reference any column, the result is "()". Previously this case
 * dereferenced colset.end(), which is undefined behavior.
 */
const string BatchPrimitiveProcessorJL::toMiniString() const
{
  set<string> colset;

  for (int i = 0; i < filterCount; i++)
  {
    const string col = filterSteps[i]->getColName();
    // FilterCommandJL has two referenced columns, needs special handling.
    FilterCommandJL* filterCmd = dynamic_cast<FilterCommandJL*>(filterSteps[i].get());

    if (filterCmd == NULL)
    {
      colset.insert(col);
    }
    else
    {
      // A FilterCommandJL reports its two columns as "left,right".
      size_t sep = col.find(',');
      colset.insert(col.substr(0, sep));

      if (sep != string::npos)
        colset.insert(col.substr(sep + 1));
    }
  }

  for (int i = 0; i < projectCount; i++)
    colset.insert(projectSteps[i]->getColName());

  ostringstream oss;
  oss << '(';

  for (set<string>::const_iterator iter = colset.begin(); iter != colset.end(); ++iter)
  {
    if (iter != colset.begin())
      oss << ',';

    oss << *iter;
  }

  oss << ')';
  return oss.str();
}
|
|
|
|
void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
|
|
{
|
|
joinFERG = rg;
|
|
}
|
|
|
|
/**
 * Builds a BATCH_PRIMITIVE_ABORT message in @p bs: a zeroed ISM header
 * (written with bs->load()) followed by this BPP's uniqueID.
 */
void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
{
  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_ABORT;

  ByteStream& out = *bs;
  out.load((uint8_t*)&header, sizeof(header));
  out << uniqueID;
}
|
|
|
|
void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
|
|
{
|
|
if (aggregatorPM)
|
|
aggregateRGPM.setUseStringTable(b);
|
|
else if (fe2)
|
|
fe2Output.setUseStringTable(b);
|
|
else
|
|
projectionRG.setUseStringTable(b);
|
|
}
|
|
|
|
} // namespace joblist
|