1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
mariadb-columnstore-engine/dbcon/joblist/batchprimitiveprocessor-jl.cpp
drrtuy 8ae5a3da40
Fix/mcol 5787 rgdata buffer max size dev (#3325)
* fix(rowgroup): RGData now uses uint64_t counter for the fixed sizes columns data buf.
	The buffer can utilize > 4GB RAM that is necessary for PM side join.
	RGData ctor uses uint32_t allocating data buffer.
 	This fact causes implicit heap overflow.

* feat(bytestream,serdes): BS buffer size type is uint64_t
	This is necessary to handle 64-bit RGData, which comes as
	a separate patch. The pair of patches would allow to
	have PM joins when SmallSide size > 4GB.

* feat(bytestream,serdes): Distribute BS buf size data type change to avoid implicit data type narrowing

* feat(rowgroup): this returns bits lost during cherry-pick. The bits lost caused the first RGData::serialize to crash a process
2024-11-09 19:44:02 +00:00

1780 lines
46 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Implementation: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <unistd.h>
//#define NDEBUG
#include <cassert>
#include <stdexcept>
#include <iostream>
#include <set>
using namespace std;
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
namespace bu = boost::uuids;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "bpp-jl.h"
#include "jlf_common.h"
using namespace messageqcpp;
using namespace rowgroup;
using namespace joiner;
#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX
namespace joblist
{
// Builds an empty job-list side BPP: no filter/projection steps, no rids queued,
// ElementType output. The joiner chunk size is read from the resource manager.
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
 : ot(BPS_ELEMENT_TYPE)
 , needToSetLBID(true)
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)
 , filterCount(0)
 , projectCount(0)
 , needRidsAtDelivery(false)
 , ridMap(0)
 , sendValues(false)
 , sendAbsRids(false)
 , _hasScan(false)
 , LBIDTrace(false)
 , tupleLength(0)
 , status(0)
 , valueColumn(0)
 , sendTupleJoinRowGroupData(false)
 , bop(BOP_AND)
 , forHJ(false)
 , threadCount(1)
 , fJoinerChunkSize(rm->getJlJoinerChunkSize())
 , hasSmallOuterJoin(false)
 , _priority(1)
{
  // The step uuid starts out nil; it is what every added command is stamped with.
  uuid = bu::nil_generator()();
  // No PM-side joiners until useJoiners() counts them.
  PMJoinerCount = 0;
}
// The destructor has no explicit work to do.
BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() = default;
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
vector<BRM::LBID_t> lastScannedLBID,
bool hasAuxCol,
const std::vector<BRM::EMEntry>& extentsAux,
execplan::CalpontSystemCatalog::OID oidAux)
{
SCommand cc;
tableOID = scan.tableOid();
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(scan.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
_hasScan = true;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == scan.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
{
SCommand cc;
tableOID = pcs.tableOid();
cc.reset(new PseudoCCJL(pcs));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(pcs.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
idbassert(sessionID == pcs.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
{
SCommand cc;
tableOID = step.tableOid();
cc.reset(new ColumnCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
{
SCommand cc;
tableOID = step.tableOid();
if (filterCount == 0)
{
sendAbsRids = true;
sendValues = true;
absRids.reset(new uint64_t[8192]);
}
cc.reset(new DictStepJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
#if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
if (filterSteps.size() > 0)
{
size_t stepsIndex = filterSteps.size() - 1;
SCommand prevCC = filterSteps[stepsIndex];
ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
if (pcc && ccc)
{
filterSteps[stepsIndex].reset(
new ColumnCommandJL(*pcc, *ccc)); // column command will use same filters.
}
}
#endif
filterSteps.push_back(cc);
filterCount++;
needStrValues = true;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
{
SCommand cc;
tableOID = step.tableOid();
cc.reset(new FilterCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
{
SCommand cc;
cc.reset(new PseudoCCJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
{
SCommand cc;
cc.reset(new ColumnCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
{
SCommand cc;
cc.reset(new PassThruCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
if (filterCount == 0)
sendValues = true;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
{
SCommand cc;
cc.reset(new RTSCommandJL(col, dict));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(dict.tupleId());
cc->setQueryUuid(col.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
needStrValues = true;
idbassert(sessionID == col.sessionId());
idbassert(sessionID == dict.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
{
SCommand cc;
cc.reset(new RTSCommandJL(p, dict));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(dict.tupleId());
cc->setQueryUuid(p.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
needStrValues = true;
if (filterCount == 0)
{
sendValues = true;
sendAbsRids = true;
absRids.reset(new uint64_t[8192]);
}
idbassert(sessionID == p.sessionId());
idbassert(sessionID == dict.sessionId());
}
#if 0
// Compiled out by the surrounding #if 0.
// Copies the delivery step's empty table band and table OID, then fills
// tablePositions so that each table column position holds the index of the
// projection step with the matching OID (-1 when no projection step matched).
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
{
idbassert(sessionID == ds.sessionId());
uint32_t i;
templateTB = ds.fEmptyTableBand;
tableOID = ds.fTableOID;
tableColumnCount = templateTB.getColumnCount();
idbassert(tableColumnCount > 0);
tablePositions.reset(new int[tableColumnCount]);
// Start with every column unmapped.
for (i = 0; i < tableColumnCount; i++)
tablePositions[i] = -1;
idbassert(projectCount <= projectSteps.size());
/* In theory, tablePositions maps a column's table position to a projection step
-1 means it's a null column */
CalpontSystemCatalog::OID oid;
int idx;
for (i = 0; i < projectCount; i++)
{
oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());
if (oid > 0)
{
idx = templateTB.find(oid);
if (idx >= 0)
{
tablePositions[idx] = i;
}
else
{
// The projected OID is not part of the table band; only reported on stdout.
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
" in tableband at pjstep idx " << i << endl;
}
}
else
{
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
" doesn't have a valid OID" << endl;
}
}
}
#endif
// Queues one (rid, value) pair for the next run message.
//
// The first element added after a reset() also fixes the logical block: every
// filter and projection command is given the rid/dbroot, and baseRid becomes the
// rid with its low 13 bits cleared (8192 rows per logical block).
//
// At most 8192 elements may be queued per batch. The capacity check runs BEFORE
// anything is written so that an overfull batch is rejected instead of writing
// past the end of the rid/value buffers.
void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
{
  uint32_t i;

  // rowCounter++;
  idbassert(ridCount < 8192);

  if (needToSetLBID)
  {
    needToSetLBID = false;

    for (i = 0; i < filterCount; ++i)
      filterSteps[i]->setLBID(et.first, dbroot);

    for (i = 0; i < projectCount; ++i)
      projectSteps[i]->setLBID(et.first, dbroot);

    baseRid = et.first & 0xffffffffffffe000ULL;
  }

  // TODO: get rid of magics
  if (sendAbsRids)
    absRids[ridCount] = et.first;
  else
  {
    relRids[ridCount] = et.first & 0x1fff;    // 8192 rows per logical block
    ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
  }

  if (sendValues)
  {
    // cout << "adding value " << et.second << endl;
    values[ridCount] = et.second;
  }

  ridCount++;
}
#if 0
// Compiled out by the surrounding #if 0.
// Points inputRG at the data of 'rg' and, if the LBID has not been set yet,
// uses the rid of row 0 to set the LBID on every filter/projection command
// (with a hard-coded dbroot of 1) and to derive baseRid.
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
{
uint32_t i;
rowgroup::Row r;
inputRG.setData(rg.getData());
inputRG.initRow(&r);
inputRG.getRow(0, &r);
if (needToSetLBID)
{
needToSetLBID = false;
for (i = 0; i < filterCount; ++i)
filterSteps[i]->setLBID(r.getRid(), 1);
for (i = 0; i < projectCount; ++i)
projectSteps[i]->setLBID(r.getRid(), 1);
// Clear the low 13 bits of the rid to get the base rid.
baseRid = r.getRid() & 0xffffffffffffe000ULL;
}
}
#endif
// StringElementType overload: forwards the rid to the ElementType overload, using
// the rid for both fields. Throws std::logic_error when there are no filter steps.
void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
{
  if (filterCount != 0)
  {
    addElementType(ElementType(et.first, et.first), dbroot);
    return;
  }

  throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");
}
/**
* When the output type is ElementType, the messages have the following format:
*
* ISMPacketHeader (ignored)
* PrimitiveHeader (ignored)
* --
* If the BPP starts with a scan
* casual partitioning data from the scanned column
* Base Rid for the returned logical block
* rid count
* (rid count)x 16-bit rids
* (rid count)x 64-bit values
* If there's a join performed by this BPP
* pre-Join rid count (for logging purposes only)
* small-side new match count
* (match count)x ElementTypes
* cached IO count
* physical IO count
* block touches
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
// Decodes an ElementType response: optional casual-partitioning data, the base rid,
// then l_count 16-bit relative rids followed by l_count 64-bit values, and finally
// the three IO counters. 'out' is resized to hold exactly l_count elements.
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
                                                uint64_t* lbid, int64_t* min, int64_t* max,
                                                uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks) const
{
  /* skip the header */
  idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID follows the flag whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t bound;
      in >> bound;
      *min = (int64_t)bound;
      in >> bound;
      *max = (int64_t)bound;
    }
  }

  uint64_t l_baseRid;
  uint16_t l_count;
  in >> l_baseRid;
  in >> l_count;
  // rowsProcessed += l_count;
  idbassert(l_count <= 8192);
  out->resize(l_count);

  // The rid and value arrays are read in place from the stream buffer:
  // 2 bytes per rid, then 8 bytes per value.
  uint8_t* payload = (uint8_t*)in.buf();
  uint16_t* relativeRids = (uint16_t*)payload;
  uint64_t* columnValues = (uint64_t*)(payload + (l_count << 1));
  idbassert(in.length() > (uint32_t)((l_count << 1) + (l_count << 3)));
  in.advance((l_count << 1) + (l_count << 3));

  for (uint32_t row = 0; row < l_count; ++row)
  {
    ElementType& element = (*out)[row];
    element.first = relativeRids[row] + l_baseRid;
    element.second = columnValues[row];
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;

  // The whole message must have been consumed.
  idbassert(in.length() == 0);
}
/**
* When the output type is StringElementType the messages have the following format:
*
* ISMPacketHeader (ignored)
* PrimitiveHeader (ignored)
* ---
* If the BPP starts with a scan
* Casual partitioning data from the column scanned
* Rid count
* (rid count)x 64-bit absolute rids
* (rid count)x serialized strings
* cached IO count
* physical IO count
* blocks touched
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
// Decodes a StringElementType response: optional casual-partitioning data, a row
// count, that many 64-bit absolute rids, that many serialized strings, and finally
// the three IO counters. 'out' is resized to the row count.
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
                                                      bool* validCPData, uint64_t* lbid, int64_t* min,
                                                      int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                      uint32_t* touchedBlocks) const
{
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID follows the flag whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t bound;
      in >> bound;
      *min = (int64_t)bound;
      in >> bound;
      *max = (int64_t)bound;
    }
  }

  uint16_t rowCount;
  in >> rowCount;

  // The rid array is read in place from the stream buffer (8 bytes per rid);
  // the strings that follow are pulled out one at a time.
  uint64_t* absoluteRids = (uint64_t*)in.buf();
  out->resize(rowCount);
  in.advance(rowCount << 3);

  for (uint32_t row = 0; row < rowCount; ++row)
  {
    StringElementType& element = (*out)[row];
    element.first = absoluteRids[row];
    in >> element.second;
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;

  // The whole message must have been consumed.
  idbassert(in.length() == 0);
}
/**
* When the output type is Tuples, the input message format is the same
* as when the output type is TableBands
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
// Decodes a tuple-format response into 'out'.
//
// Message layout after the two headers: optional casual-partitioning data (when the
// BPP starts with a scan), a 16-bit row count, optionally a base rid plus one 16-bit
// relative rid per row (when needRidsAtDelivery is set), then one length-prefixed
// data section per projection step, and finally the three IO counters.
//
// Each output tuple gets a freshly new[]-allocated buffer of tupleLength bytes in
// which the projected columns are packed back to back using colWidths; the caller
// receives those raw buffers through (*out)[i].second.
//
// Throws std::logic_error for a fixed-width column whose width is not 1, 2, 4 or 8.
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
                                          bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                                          uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
{
  uint32_t i, j, pos, len;
  uint16_t l_rowCount;
  uint64_t l_baseRid;
  uint16_t* l_relRids;
  // NOTE(review): only filled when needRidsAtDelivery is set, yet read for every
  // row when the tuples are built below -- confirm callers ignore .first otherwise.
  uint64_t absRids[8192];
  // const uint8_t* columnData[projectCount];
  const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
  memset(columnData, 0, projectCount * sizeof(uint8_t*));
  const uint8_t* buf;
  // uint32_t colLengths[projectCount];
  uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
  uint64_t tmp64;
  uint8_t tmp8;

  // cout << "getTuples msg is " << in.length() << " bytes\n";
  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_rowCount;
  // absRids is a fixed 8192-entry stack array indexed by row; reject anything larger
  // before it is touched (same bound getElementTypes() enforces).
  idbassert(l_rowCount <= 8192);

  // cout << "read " << l_rowCount << " rows\n";
  if (needRidsAtDelivery)
  {
    in >> l_baseRid;
    l_relRids = (uint16_t*)in.buf();

    for (i = 0; i < l_rowCount; i++)
      absRids[i] = l_relRids[i] + l_baseRid;

    in.advance(l_rowCount << 1);
  }

  /* Set up pointers to the column data */
  pos = 0;
  buf = in.buf();

  for (i = 0; i < projectCount; i++)
  {
    // Each column section is a 32-bit byte length followed by that many bytes.
    colLengths[i] = *((uint32_t*)&buf[pos]);
    pos += 4;
    columnData[i] = &buf[pos];
    // cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
    pos += colLengths[i];
    idbassert(pos < in.length());
  }

  in.advance(pos);

  out->resize(l_rowCount);

  for (i = 0; i < l_rowCount; i++)
  {
    (*out)[i].first = absRids[i];
    (*out)[i].second = new char[tupleLength];

    // 'pos' is now the write offset inside the current tuple buffer.
    for (j = 0, pos = 0; j < projectCount; j++)
    {
      idbassert(pos + colWidths[j] <= tupleLength);

      if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
      {
        // Strings arrive as a 32-bit length plus bytes and are zero-padded out to
        // the column width.
        len = *((uint32_t*)columnData[j]);
        columnData[j] += 4;
        memcpy(&(*out)[i].second[pos], columnData[j], len);
        pos += len;
        columnData[j] += len;
        // insert padding...
        memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
        pos += colWidths[j] - len;
      }
      else
      {
        switch (colWidths[j])
        {
          case 8:
            *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
            columnData[j] += 8;
            pos += 8;
            break;

          case 4:
            *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
            columnData[j] += 4;
            pos += 4;
            break;

          case 2:
            *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
            columnData[j] += 2;
            // Advance by the column width (2 bytes), like every other case, so the
            // following columns land at their proper offsets.
            pos += 2;
            break;

          case 1:
            (*out)[i].second[pos] = (char)*columnData[j];
            columnData[j]++;
            pos++;
            break;

          // TODO MCOL-641
          case 16:
          // fallthrough
          default:
            cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
            throw logic_error("BPP::getTuples(): bad column width");
        }
      }
    }
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // The whole message must have been consumed.
  idbassert(in.length() == 0);
}
// Peeks at a response without consuming it and returns its "count this" flag byte,
// which sits right after the headers and, for scan-driven BPPs, the CP section.
// Returns true early for a scan-driven error response (header Status > 0).
// Throws std::runtime_error when the message is too short to contain the flag.
bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
{
  const uint8_t* data = in.buf();
  ISMPacketHeader* hdr = (ISMPacketHeader*)(data);
  uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);  // skip the headers

  if (_hasScan && in.length() > offset)
  {
    // This is a legitimate error message sent by PrimProc
    // so we need to return to allow upper layer to throw an error
    // if needed.
    if (hdr->Status > 0)
      return true;

    if (data[offset] == 0)
    {
      // skip only the "valid CP data" & LBID bytes
      offset += CP_FLAG_AND_LBID;
    }
    else
    {
      // skip the CP data with wide min/max values (16/32 bytes each). we also skip
      // cpFromDictScan flag.
      offset += (data[offset + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 + 1;
    }
  }

  if (in.length() > offset)
    return (data[offset] != 0);

  // Throw b/c PP throws and sends here error msg.
  // See BatchPrimitiveProcessor::writeErrorMsg() for details.
  if (hdr->Status > 0)
    throw std::runtime_error(" an exception originally thrown by PrimProc: ");

  throw std::runtime_error(
" an exception because there is not enough \
data in the Primitive message from PrimProc.");
}
// Reads a 32-bit count followed by that many serialized RGData objects from 'in',
// appending each one to 'out'.
void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
{
  uint32_t resultCount;
  *in >> resultCount;

  RGData current;

  for (uint32_t n = 0; n < resultCount; ++n)
  {
    current.deserialize(*in, true);
    out->push_back(current);
  }
}
// Decodes a ROW_GROUP response from PrimProc.
//
// 'out' is cleared and then filled with the received RGData (several when the
// PM-side aggregator is active). Casual-partitioning data (validCPData, lbid,
// fromDictScan, hasWideColumn, min, max) is only written for scan-driven BPPs,
// and min/max only when the CP data is flagged valid. countThis receives the
// "count this" flag; the IO counters are taken from the message when that flag is
// set and zeroed otherwise. PM join results, when present, are deserialized into
// the per-thread arrays owned by the TupleJoiners.
//
// An empty ByteStream yields one empty RGData and zeroed IO counters.
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
uint64_t* lbid, bool* fromDictScan, int128_t* min,
int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
bool* hasWideColumn,
const execplan::CalpontSystemCatalog::ColType& colType) const
{
uint64_t tmp64;
int128_t tmp128;
uint8_t tmp8;
RGData rgData;
uint32_t rowCount;
// One RowGroup per thread; the array is allocated in createBPP().
RowGroup& org = primprocRG[threadID];
out->clear();
if (in.length() == 0)
{
// done, return an empty RG
rgData = RGData(org, 0);
// NOTE(review): org is handed the address of the local rgData -- confirm it is
// not dereferenced through org after this function returns.
org.setData(&rgData);
org.resetRowGroup(0);
out->push_back(rgData);
*cachedIO = 0;
*physIO = 0;
*touchedBlocks = 0;
return;
}
/* skip the header */
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
if (_hasScan)
{
in >> tmp8;
*validCPData = (tmp8 != 0);
if (*validCPData)
{
in >> *lbid;
in >> tmp8;
*fromDictScan = tmp8 != 0;
// The next byte is compared against MAXLEGACYWIDTH to decide whether
// min/max are sent as 128-bit or 64-bit values.
in >> tmp8;
*hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
if (UNLIKELY(*hasWideColumn))
{
idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
if (LIKELY(colType.isWideDecimalType()))
{
in >> tmp128;
*min = tmp128;
in >> tmp128;
*max = tmp128;
}
else
{
// Wide min/max is only handled for wide decimals; anything else is
// reported on stdout and then asserted on.
std::ostringstream oss;
oss << __func__ << " WARNING!!! Not implemented for the data type ";
oss << colType.colDataType << std::endl;
std::cout << oss.str();
idbassert(false);
}
}
else
{
// NOTE(review): tmp64 is unsigned, so these widenings do not sign-extend --
// confirm callers narrow back to 64 bits before treating min/max as signed.
in >> tmp64;
*min = static_cast<int128_t>(tmp64);
in >> tmp64;
*max = static_cast<int128_t>(tmp64);
}
}
else
in >> *lbid;
}
in >> tmp8;
*countThis = (tmp8 != 0);
/* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
* For later... */
if (aggregatorPM)
{
deserializeAggregateResult(&in, out);
// for (uint32_t z = 0; z < out->size(); z++) {
// org.setData(&(*out)[z]);
// cout << "BPPJL: " << org.toString() << endl;
//}
}
else
{
rgData.deserialize(in, true);
out->push_back(rgData);
org.setData(&rgData);
rowCount = org.getRowCount();
// aggregatorPM is always false in this branch, so the last term reduces to fe2.
bool pmSendsMatchesAnyway =
(hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));
if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
{
std::shared_ptr<vector<uint32_t>[]> joinResults;
uint32_t i, j;
if (pmSendsMatchesAnyway)
{
// In this case the number of join result rows is sent explicitly and
// replaces the row count taken from the row group.
uint16_t joinRowCount;
in >> joinRowCount;
rowCount = joinRowCount;
}
for (j = 0; j < PMJoinerCount; j++)
{
/* Reuse the result space if possible */
joinResults = tJoiners[j]->getPMJoinArrays(threadID);
if (joinResults.get() == NULL)
{
// First use for this joiner/thread: allocate 8192 result vectors.
auto v = new vector<uint32_t>[8192];
joinResults.reset(v);
tJoiners[j]->setPMJoinResults(joinResults, threadID);
}
for (i = 0; i < rowCount; i++)
deserializeInlineVector<uint32_t>(in, joinResults[i]);
if (tJoiners[j]->smallOuterJoin())
tJoiners[j]->markMatches(threadID, rowCount);
}
}
}
if (*countThis)
{
// cout << "grabbing io stats\n";
in >> *cachedIO;
in >> *physIO;
in >> *touchedBlocks;
}
else
{
*cachedIO = 0;
*physIO = 0;
*touchedBlocks = 0;
}
// The whole message must have been consumed.
idbassert(in.length() == 0);
}
// Returns an RGData built for a copy of the projection row group, reset via
// resetRowGroup(0), with 'error' stored as the row group status.
RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  rowgroup::RowGroup errorRG(projectionRG);
  RGData ret(errorRG, 0);

  errorRG.setData(&ret);
  // errorRG.convertToInlineDataInPlace();
  errorRG.resetRowGroup(0);
  errorRG.setStatus(error);

  return ret;
}
// Clears the queued rids so a new batch can be collected; the next element added
// will set the LBID again.
void BatchPrimitiveProcessorJL::reset()
{
  needToSetLBID = true;
  ridMap = 0;
  ridCount = 0;
}
// Derives the base rid for logical block 'l' inside 'scannedExtent', records the
// extent's dbroot, and pushes both to every filter and projection command.
void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
{
  dbRoot = scannedExtent.dbRoot;

  // the extent #
  const auto extentNum = scannedExtent.blockOffset / (scannedExtent.range.size * 1024);
  // the logical block #
  const auto blockNum = (l - scannedExtent.range.start) / scannedExtent.range.size;

  baseRid =
      rowgroup::convertToRid(scannedExtent.partitionNum, scannedExtent.segmentNum, extentNum, blockNum);

  for (uint32_t f = 0; f < filterCount; ++f)
    filterSteps[f]->setLBID(baseRid, dbRoot);

  for (uint32_t p = 0; p < projectCount; ++p)
    projectSteps[p]->setLBID(baseRid, dbRoot);
}
// Produces a multi-line description: how the BPP is driven, followed by the
// string form of every filter step and every projection step.
string BatchPrimitiveProcessorJL::toString() const
{
  ostringstream desc;

  desc << "BatchPrimitiveProcessorJL:" << '\n';

  if (_hasScan)
    desc << " -- scan driven" << '\n';
  else
  {
    if (sendValues)
      desc << " -- serializing values" << '\n';

    desc << (sendAbsRids ? " -- serializing absolute rids" : " -- serializing relative rids") << '\n';
  }

  desc << " " << (int)filterCount << " filter steps:\n";

  for (uint32_t f = 0; f < filterCount; ++f)
    desc << " " << filterSteps[f]->toString() << '\n';

  desc << " " << (int)projectCount << " projection steps:\n";

  for (uint32_t p = 0; p < projectCount; ++p)
    desc << " " << projectSteps[p]->toString() << '\n';

  return desc.str();
}
/**
* The creation messages have the following format:
*
* ISMPacketHeader (necessary for DEC and ReadThread classes)
* ---
* output type (ElementType, StringElementType, etc)
* version #
* transaction #
* session # (unnecessary except for debugging)
* step ID ( same )
* unique ID (uniquely identifies the DEC queue on the UM)
* 8-bit flags
* if there's a join...
* # of elements in the Joiner object
* a flag whether or not to match every element (for inner joins)
* filter step count
* (filter count)x serialized Commands
* projection step count
* (projection count)x serialized Commands
*/
// Serializes the BATCH_PRIMITIVE_CREATE message into 'bs' (replacing its contents
// via load()): header, output type, ids, version info, flags, optional row group /
// function-expression / joiner sections, then every filter and projection command,
// and for ROW_GROUP output the optional PM aggregator. As a side effect, for
// ROW_GROUP output it (re)allocates primprocRG with one RowGroup per thread,
// matching the row group PrimProc will send back.
void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
ISMPacketHeader ism;
uint32_t i;
uint16_t flags = 0;
// Only the Command field of the header is set here.
ism.Command = BATCH_PRIMITIVE_CREATE;
bs.load((uint8_t*)&ism, sizeof(ism));
bs << (uint8_t)ot;
bs << (messageqcpp::ByteStream::quadbyte)txnID;
bs << (messageqcpp::ByteStream::quadbyte)sessionID;
bs << (messageqcpp::ByteStream::quadbyte)stepID;
bs << uniqueID;
bs << versionInfo;
// Fold the boolean options into the 16-bit flags word.
if (needStrValues)
flags |= NEED_STR_VALUES;
if (sendAbsRids)
flags |= GOT_ABS_RIDS;
if (sendValues)
flags |= GOT_VALUES;
if (LBIDTrace)
flags |= LBID_TRACE;
if (needRidsAtDelivery)
flags |= SEND_RIDS_AT_DELIVERY;
if (tJoiners.size() > 0)
flags |= HAS_JOINER;
if (sendTupleJoinRowGroupData)
flags |= JOIN_ROWGROUP_DATA;
if (wideColumnsWidths)
flags |= HAS_WIDE_COLUMNS;
bs << flags;
// The widths are only present in the message when HAS_WIDE_COLUMNS is set.
if (wideColumnsWidths)
bs << wideColumnsWidths;
bs << bop;
bs << (uint8_t)(forHJ ? 1 : 0);
if (ot == ROW_GROUP)
{
bs << projectionRG;
// cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;
/* F&E serialization */
// Each optional section is preceded by a one-byte presence marker.
if (fe1)
{
bs << (uint8_t)1;
bs << *fe1;
bs << fe1Input;
}
else
bs << (uint8_t)0;
if (fe2)
{
bs << (uint8_t)1;
bs << *fe2;
bs << fe2Output;
}
else
bs << (uint8_t)0;
}
/* if HAS_JOINER, send the init params */
if (flags & HAS_JOINER)
{
bs << (uint32_t)maxPmJoinResultCount;
if (ot == ROW_GROUP)
{
idbassert(tJoiners.size() > 0);
bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;
bool atLeastOneFE = false;
#ifdef JLF_DEBUG
cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif
// The small-side row group is sent at most once, with the first joiner that
// has a skewed key column.
bool smallSideRGSent = false;
for (i = 0; i < PMJoinerCount; i++)
{
bs << (uint32_t)tJoiners[i]->size();
bs << tJoiners[i]->getJoinType();
// bs << (uint64_t) tJoiners[i]->smallNullValue();
bs << (uint8_t)tJoiners[i]->isTypelessJoin();
if (tJoiners[i]->hasFEFilter())
{
atLeastOneFE = true;
#ifdef JLF_DEBUG
cout << "serializing join FE object\n";
#endif
bs << *tJoiners[i]->getFcnExpFilter();
}
if (!tJoiners[i]->isTypelessJoin())
{
bs << (uint64_t)tJoiners[i]->smallNullValue();
bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
// cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
}
else
{
serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
bs << (uint32_t)tJoiners[i]->getKeyLength();
// MCOL-4173 Notify PP if smallSide and largeSide have different column widths
// and send smallSide RG to PP.
bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
bs << (uint8_t)joinHasSkewedKeyColumn;
if (!smallSideRGSent && joinHasSkewedKeyColumn)
{
idbassert(!smallSideRGs.empty());
bs << smallSideRGs[0];
serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
smallSideRGSent = true;
}
}
}
// The row group for join function-expression filters is sent once, after all
// joiners, and only if at least one joiner has such a filter.
if (atLeastOneFE)
bs << joinFERG;
if (sendTupleJoinRowGroupData)
{
#ifdef JLF_DEBUG
cout << "sending smallside data\n";
#endif
serializeVector<RowGroup>(bs, smallSideRGs);
bs << largeSideRG;
bs << joinedRG; // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
// cout << "joined RG: " << joinedRG.toString() << endl;
}
}
}
bs << filterCount;
for (i = 0; i < filterCount; ++i)
{
// cout << "serializing step " << i << endl;
filterSteps[i]->createCommand(bs);
}
bs << projectCount;
for (i = 0; i < projectCount; ++i)
projectSteps[i]->createCommand(bs);
// aggregate step only when output is row group
if (ot == ROW_GROUP)
{
if (aggregatorPM.get() != NULL)
{
bs << (uint8_t)1;
bs << aggregateRGPM;
bs << *(aggregatorPM.get());
}
else
{
bs << (uint8_t)0;
}
}
// decide which rowgroup is received by PrimProc
if (ot == ROW_GROUP)
{
// One RowGroup per thread; getRowGroupData() indexes this array by threadID.
primprocRG.reset(new RowGroup[threadCount]);
for (uint32_t i = 0; i < threadCount; i++)
if (aggregatorPM)
primprocRG[i] = aggregateRGPM;
else if (fe2)
primprocRG[i] = fe2Output;
// This shouldn't be necessary. As of 2-17-14, PrimProc
// will only send joined results if fe2 || aggregatorPM,
// so it will never send back data for joinedRG.
// else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
// primprocRG[i] = joinedRG;
else
primprocRG[i] = projectionRG;
}
}
/**
* The BPP Run messages have the following format:
*
* ISMPacketHeader
* -- Interleave field is used for DEC interleaving data
* -- Size field is used for the relative weight to process the message
* -----
* Session ID
* Step ID
* Unique ID
* Sequence #
* Iteration count
* Rid count
* If absolute rids are sent
* (rid count)x 64-bit absolute rids
* else
* RID range bitmap
* base RID
* (rid count)x 16-bit relative rids
* If values are sent
* (rid count)x 64-bit values
* (filter count)x run msgs for filter Commands
* (projection count)x run msgs for projection Commands
*/
// The deser counterpart function is BPP::resetBPP
// Rebuilds 'bs' as a BATCH_PRIMITIVE_RUN message: header, routing/scheduling fields,
// the queued rids (absolute, or bitmap + base rid + relative), optional values, and
// finally the per-command run payloads of every filter and projection step.
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
  bs.restart();

  /* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
     save bandwidth. They're completely unused otherwise. We need to throw out all unused
     fields of every defined header. */
  ISMPacketHeader ism;
  memset((void*)&ism, 0, sizeof(ism));
  ism.Command = BATCH_PRIMITIVE_RUN;
  // TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
  // defined for each PM.
  ism.Interleave = pmNum;
  /* ... and the Size field is used for the "weight" of processing a BPP
     where 1 is one Command on one logical block. */
  ism.Size = count * (filterCount + projectCount);
  bs.append((uint8_t*)&ism, sizeof(ism));

  /* The next 4 vars are for BPPSeeder; BPP itself skips them */
  bs << sessionID;
  bs << stepID;
  bs << uniqueID;
  bs << _priority;

  // The weight is used by PrimProc thread pool algo
  uint32_t weight = calculateBPPWeight();
  bs << weight;

  bs << dbRoot;
  bs << count;

  uint8_t sentByEM = 0;
  if (isExeMgrDEC)
    sentByEM = 1;
  bs << sentByEM;

  // A scan-driven BPP carries no rids; any other BPP must carry at least one.
  if (!_hasScan)
  {
    idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
  }
  else
  {
    idbassert(ridCount == 0);
  }

  bs << ridCount;

  if (!sendAbsRids)
  {
    bs << ridMap;
    bs << baseRid;
    bs.append((uint8_t*)relRids, ridCount << 1);  // 2 bytes per relative rid
  }
  else
    bs.append((uint8_t*)absRids.get(), ridCount << 3);  // 8 bytes per absolute rid

  if (sendValues)
    bs.append((uint8_t*)values, ridCount << 3);  // 8 bytes per value

  for (uint32_t f = 0; f < filterCount; ++f)
    filterSteps[f]->runCommand(bs);

  for (uint32_t p = 0; p < projectCount; ++p)
    projectSteps[p]->runCommand(bs);
}
// Rebuilds 'bs' as a run message whose header carries the current status code,
// followed by the session/step/unique ids, the count and the rid count.
void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  ISMPacketHeader errHdr;
  memset((void*)&errHdr, 0, sizeof(errHdr));
  errHdr.Command = BATCH_PRIMITIVE_RUN;
  errHdr.Status = status;
  errHdr.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  bs.restart();
  bs.append((uint8_t*)&errHdr, sizeof(errHdr));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}
// Appends a BATCH_PRIMITIVE_DESTROY message to 'bs': a zeroed header carrying only
// the command, followed by the session, step and unique ids.
void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  ISMPacketHeader destroyHdr;
  memset((void*)&destroyHdr, 0, sizeof(destroyHdr));
  destroyHdr.Command = BATCH_PRIMITIVE_DESTROY;
  bs.append((uint8_t*)&destroyHdr, sizeof(destroyHdr));

  /* XXXPAT: We can get rid of sessionID and stepID;
     it's there for easier debugging only */
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;
}
// Registers the tuple joiners this BPP will ship to the PMs and resets the
// small-side send cursor (pos / joinerNum / posByJoinerNum).
//
// For every joiner that reports inPM() this records its small-side key columns
// and small-side row group, the typeless key length (typeless joins only), and
// raises sendTupleJoinRowGroupData / hasSmallOuterJoin when the joiner has an
// FE filter / is a small outer join.  The large-side row group is taken from
// the first joiner.
//
// @param j  joiners to use; may be empty, in which case largeSideRG is left untouched.
//
// NOTE(review): smallSideKeys / smallSideRGs are appended to, not cleared, so
// calling this more than once would accumulate entries -- confirm callers only
// invoke it once per BPP.
void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
{
  pos = 0;
  joinerNum = 0;
  tJoiners = j;

  PMJoinerCount = 0;
  // Value-initialize: only the slots of typeless PM joiners are assigned below,
  // every other slot must not be left holding an indeterminate value.
  tlKeyLens.reset(new uint32_t[tJoiners.size()]());

  for (uint32_t i = 0; i < tJoiners.size(); i++)
  {
    if (tJoiners[i]->inPM())
    {
      PMJoinerCount++;
      smallSideKeys.push_back(tJoiners[i]->getSmallKeyColumns());
      smallSideRGs.push_back(tJoiners[i]->getSmallRG());

      if (tJoiners[i]->isTypelessJoin())
        tlKeyLens[i] = tJoiners[i]->getKeyLength();

      if (tJoiners[i]->hasFEFilter())
        sendTupleJoinRowGroupData = true;

      if (tJoiners[i]->smallOuterJoin())
        hasSmallOuterJoin = true;
    }
  }

  // Guard against an empty joiner list: indexing tJoiners[0] would be undefined behavior.
  if (!tJoiners.empty())
    largeSideRG = tJoiners[0]->getLargeRG();

  if (aggregatorPM || fe2)
  {
    sendTupleJoinRowGroupData = true;
#ifdef JLF_DEBUG
    cout << "will send small side row data\n";
#endif
  }

  // One send cursor per PM joiner, all starting at row 0.
  posByJoinerNum.reset(new uint32_t[PMJoinerCount]());
}
// Helper that interleaves small-side data across joiners.
//
// Starting from the joiner after the current one, walks the PM joiners
// round-robin looking for one whose send cursor has not yet reached the size of
// its small side.  On success joinerNum and pos are set to that joiner and its
// cursor and true is returned; if every PM joiner is exhausted, returns false
// (joinerNum ends up back where it started, pos is left unchanged).
bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
{
  for (uint tried = 0; tried < PMJoinerCount; ++tried)
  {
    joinerNum = (joinerNum + 1) % PMJoinerCount;

    const bool hasMoreRows = posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size();

    if (hasMoreRows)
    {
      pos = posByJoinerNum[joinerNum];
      return true;
    }
  }

  return false;
}
/* Builds the next small-side (joiner) message for the PMs into `bs`.
 *
 * Each call writes exactly one message:
 *   - BATCH_PRIMITIVE_ADD_JOINER carrying up to fJoinerChunkSize worth of
 *     small-side rows for the joiner selected by pickNextJoinerNum(), or
 *   - BATCH_PRIMITIVE_END_JOINER once every PM joiner has been sent completely.
 *
 * ADD_JOINER layout after the header/session/step/unique ids:
 *   toSend, pos, joinerNum, then per row either
 *     typeless join : a null flag byte, and if not null the serialized typeless
 *                     key followed by the row index;
 *     otherwise     : a packed {int64 key, uint32 row index} element;
 *   and finally, if sendTupleJoinRowGroupData is set, the rows themselves as a
 *   serialized RGData.
 *
 * @param bs  stream the message is written to; the header is placed with bs.load().
 * @return true if an ADD_JOINER message was written (more calls are needed),
 *         false if the END_JOINER message was written.
 *
 * This algorithm relies on the joiners being sorted by size atm.
 * XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc
 */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
  uint32_t size = 0, toSend, i, j;
  ISMPacketHeader ism;
  Row r;
  vector<Row::Pointer>* tSmallSide;
  joiner::TypelessData tlData;
  uint32_t smallKeyCol;
  uint32_t largeKeyCol;
  uint64_t smallkey;
  bool isNull;
  bool bSignedUnsigned;

  // Zero the header before EITHER message kind is built.  Previously the
  // END_JOINER path serialized the header without clearing it first.
  memset((void*)&ism, 0, sizeof(ism));

  bool moreMsgs = pickNextJoinerNum();

  if (!moreMsgs)
  {
    /* last message */
    ism.Command = BATCH_PRIMITIVE_END_JOINER;
    bs.load((uint8_t*)&ism, sizeof(ism));
    bs << (messageqcpp::ByteStream::quadbyte)sessionID;
    bs << (messageqcpp::ByteStream::quadbyte)stepID;
    bs << uniqueID;
    return false;
  }

  tSmallSide = tJoiners[joinerNum]->getSmallSide();
  size = tSmallSide->size();

  ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;

  smallSideRGs[joinerNum].initRow(&r);

  unsigned metasize = 12;  // 12 = sizeof(struct JoinerElements)

  if (tJoiners[joinerNum]->isTypelessJoin())
    metasize = 0;

  unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);

  // A single row larger than the chunk size would yield 0 rows per message:
  // pos would never advance and this function would keep returning true with
  // empty messages.  Always make progress by sending at least one row.
  if (rowunits == 0)
    rowunits = 1;

  toSend = std::min<unsigned int>(size - pos, rowunits);
  bs << toSend;
  bs << pos;
  bs << joinerNum;

  if (tJoiners[joinerNum]->isTypelessJoin())
  {
    utils::FixedAllocator fa(tlKeyLens[joinerNum], true);

    // Same answer for every row of this joiner, so ask once.
    bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();

    for (i = pos; i < pos + toSend; i++)
    {
      r.setPointer((*tSmallSide)[i]);
      isNull = false;

      for (j = 0; j < smallSideKeys[joinerNum].size(); j++)
      {
        isNull |= r.isNullValue(smallSideKeys[joinerNum][j]);

        if (UNLIKELY(bSignedUnsigned))
        {
          // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
          // then it should not compare. Send null to PM to prevent compare
          smallKeyCol = smallSideKeys[joinerNum][j];
          largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j];

          if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
          {
            if (r.isUnsigned(smallKeyCol))
              smallkey = r.getUintField(smallKeyCol);
            else
              smallkey = r.getIntField(smallKeyCol);

            if (smallkey & 0x8000000000000000ULL)
            {
              isNull = true;
              break;
            }
          }
        }
      }

      if (!isNull)
      {
        tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
                                 tJoiners[joinerNum]->getLargeKeyColumns());

        // A zero-length key is sent as a null row.
        if (tlData.len == 0)
        {
          isNull = true;
        }
      }

      bs << (uint8_t)isNull;

      if (!isNull)
      {
        tlData.serialize(bs);
        bs << i;
      }
    }
  }
  else
  {
#pragma pack(push, 1)
    struct JoinerElements
    {
      int64_t key;
      uint32_t value;
    } * arr;
#pragma pack(pop)

    // Reserve room in the stream and fill the elements in place.
    bs.needAtLeast(toSend * sizeof(JoinerElements));
    arr = (JoinerElements*)bs.getInputPtr();

    smallKeyCol = smallSideKeys[joinerNum][0];
    bSignedUnsigned =
        r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]);

    for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
    {
      r.setPointer((*tSmallSide)[i]);

      if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
      {
        // Small side is a long double. Since CS can't store larger than DOUBLE,
        // we need to convert to whatever type large side is -- double or int64
        long double smallkeyld = r.getLongDoubleField(smallKeyCol);

        switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0]))
        {
          case CalpontSystemCatalog::DOUBLE:
          case CalpontSystemCatalog::UDOUBLE:
          case CalpontSystemCatalog::FLOAT:
          case CalpontSystemCatalog::UFLOAT:
          {
            if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              // Ship the double's bit pattern as the key.  memcpy instead of a
              // pointer cast avoids a strict-aliasing violation.
              static_assert(sizeof(double) == sizeof(int64_t), "double must be 64 bits wide");
              const double d = (double)smallkeyld;
              int64_t bits;
              memcpy(&bits, &d, sizeof(bits));
              smallkey = bits;
            }

            break;
          }

          default:
          {
            if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              smallkey = (int64_t)smallkeyld;
            }

            break;
          }
        }
      }
      else if (r.isUnsigned(smallKeyCol))
        smallkey = r.getUintField(smallKeyCol);
      else
        smallkey = r.getIntField(smallKeyCol);

      // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
      // against the large side should fall. UBIGINTEMPTYROW is not a valid value, so nothing will match.
      if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
        smallkey = joblist::UBIGINTEMPTYROW;

      arr[j].key = (int64_t)smallkey;
      arr[j].value = i;
    }

    bs.advanceInputPtr(toSend * sizeof(JoinerElements));
  }

  if (sendTupleJoinRowGroupData)
  {
    // Copy the rows of this chunk into a temporary RGData and serialize that.
    RowGroup& smallSide = smallSideRGs[joinerNum];
    RGData tmpData(smallSide, toSend);
    Row tmpRow;

    smallSide.setData(&tmpData);
    smallSide.initRow(&tmpRow);
    smallSide.getRow(0, &tmpRow);

    for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
    {
      r.setPointer((*tSmallSide)[i]);
      copyRow(r, &tmpRow);
    }

    smallSide.setRowCount(toSend);
    tmpData.serialize(bs, smallSide.getDataSize());
  }

  // Remember how far this joiner has been sent for the next round-robin visit.
  pos += toSend;
  posByJoinerNum[joinerNum] = pos;
  return true;
}
// Stores `rg` as the projection row group and switches the output type to ROW_GROUP.
void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
  this->projectionRG = rg;
  this->ot = ROW_GROUP;
}
// Stores a copy of `rg` as the joined row group.
void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
{
  this->joinedRG = rg;
}
// Stores `rg` as the input row group.  Both sendAbsRids and sendValues are
// switched off as part of this call.
void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
{
  this->inputRG = rg;
  this->sendValues = false;
  this->sendAbsRids = false;
}
// Installs the PM-side aggregator and its row group.  If any tuple joiners are
// already registered, small-side row data is flagged to be sent along with them.
void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
                                                 const rowgroup::RowGroup& argpm)
{
  aggregateRGPM = argpm;
  aggregatorPM = aggpm;

  if (!tJoiners.empty())
    sendTupleJoinRowGroupData = true;
}
/* OR hacks */
void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
{
bop = op;
if (op == BOP_OR && filterCount > 1)
{
for (int i = 1; i < filterCount; ++i)
{
ColumnCommandJL* colcmd = dynamic_cast<ColumnCommandJL*>(filterSteps[i].get());
if (colcmd != NULL)
colcmd->scan(false);
}
}
}
void BatchPrimitiveProcessorJL::setForHJ(bool b)
{
forHJ = b;
}
// Installs function-expression group 1 together with the row group it takes as input.
void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& input)
{
  this->fe1Input = input;
  this->fe1 = fe;
}
// Installs function-expression group 2 together with the row group it produces.
// If joiners are registered and at least one of them runs on the PM, small-side
// row data is flagged to be sent along with them.
void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& output)
{
  fe2Output = output;
  fe2 = fe;

  const bool haveJoiners = !tJoiners.empty();

  if (haveJoiners && PMJoinerCount > 0)
    sendTupleJoinRowGroupData = true;
}
// Returns a compact description of the columns this BPP touches: the distinct
// column names of all filter and project steps, in set order, comma separated
// and wrapped in parentheses, e.g. "(a,b,c)".
//
// With no filter or project steps the result is "()".  Previously the first set
// element was dereferenced unconditionally, which is undefined behavior on an
// empty set.
const string BatchPrimitiveProcessorJL::toMiniString() const
{
  set<string> colset;

  for (int i = 0; i < filterCount; i++)
  {
    const string col = filterSteps[i]->getColName();

    if (dynamic_cast<FilterCommandJL*>(filterSteps[i].get()) == NULL)
    {
      colset.insert(col);
      continue;
    }

    // FilterCommandJL has two referenced columns, reported as one name joined
    // by ','; record each side separately.
    const size_t sep = col.find(',');
    colset.insert(col.substr(0, sep));

    if (sep != string::npos)
      colset.insert(col.substr(sep + 1));
  }

  for (int i = 0; i < projectCount; i++)
    colset.insert(projectSteps[i]->getColName());

  ostringstream oss;
  oss << '(';

  for (set<string>::const_iterator iter = colset.begin(); iter != colset.end(); ++iter)
  {
    // Separator goes before every element except the first.
    if (iter != colset.begin())
      oss << ',';

    oss << *iter;
  }

  oss << ')';
  return oss.str();
}
void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
{
joinFERG = rg;
}
// Writes a BATCH_PRIMITIVE_ABORT message into `*bs`: a zeroed ISM header with
// only Command set, loaded into the stream, followed by uniqueID.
// `bs` is dereferenced without a null check.
void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
{
  ISMPacketHeader header;
  memset(static_cast<void*>(&header), 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_ABORT;

  ByteStream& out = *bs;
  out.load(reinterpret_cast<uint8_t*>(&header), sizeof(header));
  out << uniqueID;
}
// Applies setUseStringTable(b) to exactly one row group, chosen by priority:
// the PM aggregate row group if an aggregator is set, else the FE2 output row
// group if fe2 is set, else the projection row group.
void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
{
  if (aggregatorPM)
  {
    aggregateRGPM.setUseStringTable(b);
    return;
  }

  if (fe2)
  {
    fe2Output.setUseStringTable(b);
    return;
  }

  projectionRG.setUseStringTable(b);
}
} // namespace joblist