1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/batchprimitiveprocessor-jl.cpp
Sergey Zefirov 69b8e1c779 feat(extent-elimination)!: re-enable extent-elimination for dictionary columns scanning
This is "productization" of an old code that would enable extent
elimination for dictionary columns.

This concrete patch enables it, fixes performance degradation (main
problem with old code) and also fixes incorrect behavior of cpimport.
2023-11-17 17:14:35 +03:00

1797 lines
46 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: batchprimitiveprocessor-jl.cpp 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Implementation: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <unistd.h>
//#define NDEBUG
#include <cassert>
#include <stdexcept>
#include <iostream>
#include <set>
using namespace std;
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
namespace bu = boost::uuids;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "bpp-jl.h"
#include "jlf_common.h"
using namespace messageqcpp;
using namespace rowgroup;
using namespace joiner;
#define XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX
namespace joblist
{
// Builds a job-list-side batch primitive processor in its default state:
// every boolean flag is cleared except needToSetLBID, `count`, `threadCount` and
// `_priority` start at 1, the boolean operator is BOP_AND and the output type is
// BPS_ELEMENT_TYPE.
// @param rm  resource manager; only getJlJoinerChunkSize() is read from it here. The pointer
//            is dereferenced unconditionally, so it must not be null.
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
: ot(BPS_ELEMENT_TYPE)
, needToSetLBID(true)
, count(1)
, baseRid(0)
, ridCount(0)
, needStrValues(false)
, wideColumnsWidths(0)
, filterCount(0)
, projectCount(0)
, needRidsAtDelivery(false)
, ridMap(0)
, sendValues(false)
, sendAbsRids(false)
, _hasScan(false)
, LBIDTrace(false)
, tupleLength(0)
, status(0)
, sendRowGroups(false)
, valueColumn(0)
, sendTupleJoinRowGroupData(false)
, bop(BOP_AND)
, forHJ(false)
, threadCount(1)
, fJoinerChunkSize(rm->getJlJoinerChunkSize())
, hasSmallOuterJoin(false)
, _priority(1)
{
// No joiners are attached until useJoiners() is called.
PMJoinerCount = 0;
// Start with the nil UUID as this step's identifier.
uuid = bu::nil_generator()();
}
// No explicit cleanup is performed by the destructor.
BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() = default;
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
vector<BRM::LBID_t> lastScannedLBID,
bool hasAuxCol,
const std::vector<BRM::EMEntry>& extentsAux,
execplan::CalpontSystemCatalog::OID oidAux)
{
SCommand cc;
tableOID = scan.tableOid();
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(scan.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
_hasScan = true;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == scan.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
{
SCommand cc;
tableOID = pcs.tableOid();
cc.reset(new PseudoCCJL(pcs));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(pcs.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
idbassert(sessionID == pcs.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
{
SCommand cc;
tableOID = step.tableOid();
cc.reset(new ColumnCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step)
{
SCommand cc;
tableOID = step.tableOid();
if (filterCount == 0 && !sendRowGroups)
{
sendAbsRids = true;
sendValues = true;
absRids.reset(new uint64_t[8192]);
}
cc.reset(new DictStepJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
#if defined(XXX_BATCHPRIMPROC_TOKENS_RANGES_XXX)
if (filterSteps.size() > 0)
{
size_t stepsIndex = filterSteps.size() - 1;
SCommand prevCC = filterSteps[stepsIndex];
ColumnCommandJL* pcc = dynamic_cast<ColumnCommandJL*>(prevCC.get());
DictStepJL* ccc = dynamic_cast<DictStepJL*>(cc.get());
if (pcc && ccc)
{
filterSteps[stepsIndex].reset(
new ColumnCommandJL(*pcc, *ccc)); // column command will use same filters.
}
}
#endif
filterSteps.push_back(cc);
filterCount++;
needStrValues = true;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addFilterStep(const FilterStep& step)
{
SCommand cc;
tableOID = step.tableOid();
cc.reset(new FilterCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
{
SCommand cc;
cc.reset(new PseudoCCJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
{
SCommand cc;
cc.reset(new ColumnCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
{
SCommand cc;
cc.reset(new PassThruCommandJL(step));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(step.tupleId());
cc->setQueryUuid(step.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
if (utils::isWide(cc->getWidth()))
wideColumnsWidths |= cc->getWidth();
if (filterCount == 0 && !sendRowGroups)
sendValues = true;
idbassert(sessionID == step.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& col, const pDictionaryStep& dict)
{
SCommand cc;
cc.reset(new RTSCommandJL(col, dict));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(dict.tupleId());
cc->setQueryUuid(col.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
needStrValues = true;
idbassert(sessionID == col.sessionId());
idbassert(sessionID == dict.sessionId());
}
void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDictionaryStep& dict)
{
SCommand cc;
cc.reset(new RTSCommandJL(p, dict));
cc->setBatchPrimitiveProcessor(this);
cc->setTupleKey(dict.tupleId());
cc->setQueryUuid(p.queryUuid());
cc->setStepUuid(uuid);
projectSteps.push_back(cc);
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
needStrValues = true;
if (filterCount == 0 && !sendRowGroups)
{
sendValues = true;
sendAbsRids = true;
absRids.reset(new uint64_t[8192]);
}
idbassert(sessionID == p.sessionId());
idbassert(sessionID == dict.sessionId());
}
// NOTE: everything up to the matching #endif is compiled out (#if 0) and kept only for reference.
// The disabled code mapped each table column position to the projection step producing it,
// by looking up each projection step's OID in the delivery step's template table band.
#if 0
void BatchPrimitiveProcessorJL::addDeliveryStep(const DeliveryStep& ds)
{
idbassert(sessionID == ds.sessionId());
uint32_t i;
templateTB = ds.fEmptyTableBand;
tableOID = ds.fTableOID;
tableColumnCount = templateTB.getColumnCount();
idbassert(tableColumnCount > 0);
tablePositions.reset(new int[tableColumnCount]);
// Every position starts out unmapped.
for (i = 0; i < tableColumnCount; i++)
tablePositions[i] = -1;
idbassert(projectCount <= projectSteps.size());
/* In theory, tablePositions maps a column's table position to a projection step
-1 means it's a null column */
CalpontSystemCatalog::OID oid;
int idx;
for (i = 0; i < projectCount; i++)
{
oid = static_cast<CalpontSystemCatalog::OID>(projectSteps[i]->getOID());
if (oid > 0)
{
idx = templateTB.find(oid);
if (idx >= 0)
{
tablePositions[idx] = i;
}
else
{
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): didn't find OID " << oid <<
" in tableband at pjstep idx " << i << endl;
}
}
else
{
cout << "BatchPrimitiveProcessorJL::addDeliveryStep(): pjstep idx " << i <<
" doesn't have a valid OID" << endl;
}
}
}
#endif
// Queues one (rid, value) pair for the next run message.
//
// The first element added after construction or reset() also fixes the logical block:
// every filter and projection step is given the rid and dbroot via setLBID(), and baseRid is
// set to the rid with its low 13 bits cleared.
//
// Depending on the configured mode the rid is stored either as an absolute 64-bit rid or as
// a 13-bit offset within the logical block; in the latter case the matching bit of ridMap
// (one bit per 512-rid range) is raised. The value is stored only when sendValues is set.
//
// @param et      element whose `first` is the rid and `second` the value
// @param dbroot  dbroot forwarded to the steps' setLBID()
//
// At most 8192 elements may be queued between resets. The limit is checked BEFORE the
// buffers are written, so an overfull batch is reported without writing past their end.
void BatchPrimitiveProcessorJL::addElementType(const ElementType& et, uint32_t dbroot)
{
  uint32_t i;

  // rowCounter++;

  if (needToSetLBID)
  {
    needToSetLBID = false;

    for (i = 0; i < filterCount; ++i)
      filterSteps[i]->setLBID(et.first, dbroot);

    for (i = 0; i < projectCount; ++i)
      projectSteps[i]->setLBID(et.first, dbroot);

    baseRid = et.first & 0xffffffffffffe000ULL;
  }

  // Same condition as the former post-increment check (ridCount <= 8192), but evaluated
  // before slot `ridCount` is written.
  idbassert(ridCount < 8192);

  // TODO: get rid of magics
  if (sendAbsRids)
    absRids[ridCount] = et.first;
  else
  {
    relRids[ridCount] = et.first & 0x1fff;    // 8192 rows per logical block
    ridMap |= 1 << (relRids[ridCount] >> 9);  // LSB -> 0-511, MSB -> 7680-8191
  }

  if (sendValues)
  {
    // cout << "adding value " << et.second << endl;
    values[ridCount] = et.second;
  }

  ridCount++;
}
// NOTE: everything up to the matching #endif is compiled out (#if 0) and kept only for reference.
// The disabled code pointed inputRG at the given row group's data and, on the first call after
// a reset, initialised the steps' LBID and baseRid from the rid of row 0.
#if 0
void BatchPrimitiveProcessorJL::setRowGroupData(const rowgroup::RowGroup& rg)
{
uint32_t i;
rowgroup::Row r;
inputRG.setData(rg.getData());
inputRG.initRow(&r);
inputRG.getRow(0, &r);
if (needToSetLBID)
{
needToSetLBID = false;
for (i = 0; i < filterCount; ++i)
filterSteps[i]->setLBID(r.getRid(), 1);
for (i = 0; i < projectCount; ++i)
projectSteps[i]->setLBID(r.getRid(), 1);
// Clear the low 13 bits of the rid to get the base rid.
baseRid = r.getRid() & 0xffffffffffffe000ULL;
}
}
#endif
void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint32_t dbroot)
{
if (filterCount == 0)
throw logic_error("BPPJL::addElementType(StringElementType): doesn't work without filter steps yet");
addElementType(ElementType(et.first, et.first), dbroot);
}
/**
 * Parses a response message whose output type is ElementType.
 *
 * Layout read by this function:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * --
 * If the BPP starts with a scan
 *     casual partitioning data from the scanned column:
 *     8-bit "valid" flag, LBID, and (only when valid) 64-bit min and max
 * Base Rid for the returned logical block
 * rid count (16-bit, at most 8192)
 * (rid count)x 16-bit rids
 * (rid count)x 64-bit values
 * cached IO count
 * physical IO count
 * block touches
 *
 * The format description historically also listed a join section (pre-join rid count,
 * small-side new match count, (match count)x ElementTypes); this function does not read
 * one and asserts that the message is fully consumed after the three IO counters.
 *
 * min/max/validCPData/lbid are written only when the BPP starts with a scan.
 */
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in, vector<ElementType>* out, bool* validCPData,
                                                uint64_t* lbid, int64_t* min, int64_t* max,
                                                uint32_t* cachedIO, uint32_t* physIO,
                                                uint32_t* touchedBlocks) const
{
  /* skip the header */
  const size_t headerBytes = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);
  idbassert(in.length() > headerBytes);
  in.advance(headerBytes);

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID follows the flag whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t raw;
      in >> raw;
      *min = static_cast<int64_t>(raw);
      in >> raw;
      *max = static_cast<int64_t>(raw);
    }
  }

  uint64_t blockBaseRid;
  uint16_t ridTotal;
  in >> blockBaseRid;
  in >> ridTotal;
  // rowsProcessed += ridTotal;
  idbassert(ridTotal <= 8192);
  out->resize(ridTotal);

  // The rid array is immediately followed by the value array; read both in place.
  const uint8_t* payload = in.buf();
  const uint16_t* relRidArr = reinterpret_cast<const uint16_t*>(payload);
  const uint64_t* valueArr = reinterpret_cast<const uint64_t*>(payload + (ridTotal << 1));
  const uint32_t payloadBytes = (ridTotal << 1) + (ridTotal << 3);

  idbassert(in.length() > payloadBytes);
  in.advance(payloadBytes);

  for (uint32_t idx = 0; idx < ridTotal; ++idx)
  {
    ElementType& element = (*out)[idx];
    element.first = relRidArr[idx] + blockBaseRid;
    // if (tableOID >= 3000)
    //     idbassert(element.first > 1023);
    element.second = valueArr[idx];
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // cout << "ET: got physIO=" << (int) *physIO << " cachedIO=" <<
  //  (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}
/**
 * Parses a response message whose output type is StringElementType.
 *
 * Layout read by this function:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * ---
 * If the BPP starts with a scan
 *     Casual partitioning data from the column scanned:
 *     8-bit "valid" flag, LBID, and (only when valid) 64-bit min and max
 * Rid count (16-bit)
 * (rid count)x 64-bit absolute rids
 * (rid count)x serialized strings
 * cached IO count
 * physical IO count
 * blocks touched
 *
 * min/max/validCPData/lbid are written only when the BPP starts with a scan.
 */
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in, vector<StringElementType>* out,
                                                      bool* validCPData, uint64_t* lbid, int64_t* min,
                                                      int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                                                      uint32_t* touchedBlocks) const
{
  // cout << "get String ETs uniqueID\n";

  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    uint8_t cpFlag;
    in >> cpFlag;
    *validCPData = (cpFlag != 0);

    // The LBID follows the flag whether or not the CP data is valid.
    in >> *lbid;

    if (*validCPData)
    {
      uint64_t raw;
      in >> raw;
      *min = static_cast<int64_t>(raw);
      in >> raw;
      *max = static_cast<int64_t>(raw);
    }
  }

  uint16_t stringTotal;
  in >> stringTotal;
  // cout << "parsing " << stringTotal << " strings\n";

  // The absolute rids are read in place; the serialized strings start right after them.
  const uint64_t* ridArr = reinterpret_cast<const uint64_t*>(in.buf());
  out->resize(stringTotal);
  in.advance(stringTotal << 3);

  for (uint32_t idx = 0; idx < stringTotal; ++idx)
  {
    StringElementType& element = (*out)[idx];
    element.first = ridArr[idx];
    in >> element.second;
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  // cout << "SET: got physIO=" << (int) *physIO << " cachedIO=" <<
  //  (int) *cachedIO << " touchedBlocks=" << (int) *touchedBlocks << endl;
  idbassert(in.length() == 0);
}
/**
 * Parses a response message whose output type is Tuples. The input message format is the
 * same as when the output type is TableBands.
 *
 * Layout read by this function:
 *
 * ISMPacketHeader (ignored)
 * PrimitiveHeader (ignored)
 * If the BPP starts with a scan
 *     8-bit "valid CP data" flag, LBID, and (only when valid) 64-bit min and max
 * row count (16-bit)
 * If rids are needed at delivery
 *     base rid, then (row count)x 16-bit relative rids
 * (projection count)x { 32-bit byte length, column data }
 * cached IO count, physical IO count, blocks touched
 *
 * Each output tuple gets a freshly allocated buffer of tupleLength bytes (`new char[]`) that
 * this function does not free. Fixed-width columns (1/2/4/8 bytes) are copied verbatim;
 * RID_TO_STRING columns are length-prefixed in the message and zero-padded to the column
 * width in the tuple.
 *
 * When rids are not sent with the message, every tuple's rid is set to 0.
 *
 * @throws logic_error for a fixed-width column whose width is not 1, 2, 4 or 8.
 */
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out,
                                          bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                                          uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
{
  uint32_t i, j, pos, len;
  uint16_t l_rowCount;
  uint64_t l_baseRid;
  uint16_t* l_relRids;
  uint64_t absRids[8192];
  // const uint8_t* columnData[projectCount];
  const uint8_t** columnData = (const uint8_t**)alloca(projectCount * sizeof(uint8_t*));
  memset(columnData, 0, projectCount * sizeof(uint8_t*));
  const uint8_t* buf;
  // uint32_t colLengths[projectCount];
  uint32_t* colLengths = (uint32_t*)alloca(projectCount * sizeof(uint32_t));
  uint64_t tmp64;
  uint8_t tmp8;

  // cout << "getTuples msg is " << in.length() << " bytes\n";

  /* skip the header */
  in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));

  if (_hasScan)
  {
    in >> tmp8;
    *validCPData = (tmp8 != 0);

    if (*validCPData)
    {
      in >> *lbid;
      in >> tmp64;
      *min = (int64_t)tmp64;
      in >> tmp64;
      *max = (int64_t)tmp64;
    }
    else
      in >> *lbid;
  }

  in >> l_rowCount;
  // cout << "read " << l_rowCount << " rows\n";

  if (needRidsAtDelivery)
  {
    // absRids holds one logical block's worth of rids; refuse to overrun it.
    idbassert(l_rowCount <= 8192);

    in >> l_baseRid;
    l_relRids = (uint16_t*)in.buf();

    for (i = 0; i < l_rowCount; i++)
      absRids[i] = l_relRids[i] + l_baseRid;

    in.advance(l_rowCount << 1);
  }

  /* Set up pointers to the column data */
  pos = 0;
  buf = in.buf();

  for (i = 0; i < projectCount; i++)
  {
    colLengths[i] = *((uint32_t*)&buf[pos]);
    pos += 4;
    columnData[i] = &buf[pos];
    // cout << "column " << i << " is " << colLengths[i] << " long at " << pos << endl;
    pos += colLengths[i];
    idbassert(pos < in.length());
  }

  in.advance(pos);

  out->resize(l_rowCount);

  for (i = 0; i < l_rowCount; i++)
  {
    // absRids is only populated when the message carried rids; never read it otherwise.
    (*out)[i].first = needRidsAtDelivery ? absRids[i] : 0;
    (*out)[i].second = new char[tupleLength];

    for (j = 0, pos = 0; j < projectCount; j++)
    {
      idbassert(pos + colWidths[j] <= tupleLength);

      if (projectSteps[j]->getCommandType() == CommandJL::RID_TO_STRING)
      {
        len = *((uint32_t*)columnData[j]);
        columnData[j] += 4;
        memcpy(&(*out)[i].second[pos], columnData[j], len);
        pos += len;
        columnData[j] += len;
        // insert padding...
        memset(&(*out)[i].second[pos], 0, colWidths[j] - len);
        pos += colWidths[j] - len;
      }
      else
      {
        switch (colWidths[j])
        {
          case 8:
            *((uint64_t*)&(*out)[i].second[pos]) = *((uint64_t*)columnData[j]);
            columnData[j] += 8;
            pos += 8;
            break;

          case 4:
            *((uint32_t*)&(*out)[i].second[pos]) = *((uint32_t*)columnData[j]);
            columnData[j] += 4;
            pos += 4;
            break;

          case 2:
            *((uint16_t*)&(*out)[i].second[pos]) = *((uint16_t*)columnData[j]);
            columnData[j] += 2;
            // Advance by the column width (2), keeping the following columns at the
            // offsets implied by colWidths/tupleLength.
            pos += 2;
            break;

          case 1:
            (*out)[i].second[pos] = (char)*columnData[j];
            columnData[j]++;
            pos++;
            break;

          // TODO MCOL-641
          case 16:

          // fallthrough
          default:
            cout << "BPP::getTuples(): bad column width of " << colWidths[j] << endl;
            throw logic_error("BPP::getTuples(): bad column width");
        }
      }
    }
  }

  in >> *cachedIO;
  in >> *physIO;
  in >> *touchedBlocks;
  idbassert(in.length() == 0);
}
// Peeks at a response message (without consuming it) and reports whether it should be counted.
// Returns true immediately for a scan-driven BPP when the packet header carries a non-zero
// Status and the message is longer than the two headers; otherwise returns whether the byte
// located after the headers and (for scans) the casual-partitioning section is non-zero.
// @throws std::runtime_error when the message ends at or before that byte.
bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
{
const uint8_t* data = in.buf();
uint32_t offset = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader); // skip the headers
// The packet header sits at the very start of the buffer.
ISMPacketHeader* hdr = (ISMPacketHeader*)(data);
if (_hasScan && in.length() > offset)
{
// This is a legitimate error message sent by PrimProc
// so we need to return to allow upper layer to throw an error
// if needed.
if (hdr->Status > 0)
{
return true;
}
// data[offset] is the "valid CP data" flag. When set, the byte at
// offset + CP_FLAG_AND_LBID + 1 is used as the size of one min/max value, so two of them
// are skipped together with the flag, the LBID and two more single bytes.
// NOTE(review): this lookup is not bounds-checked against in.length() -- confirm that
// PrimProc always sends the complete CP section when the flag is set.
if (data[offset] != 0)
offset += (data[offset + CP_FLAG_AND_LBID + 1] * 2) + CP_FLAG_AND_LBID + 1 +
1; // skip the CP data with wide min/max values (16/32 bytes each). we also skip
// cpFromDictScan flag.
else
offset += CP_FLAG_AND_LBID; // skip only the "valid CP data" & LBID bytes
}
// Throw b/c PP throws and sends here error msg.
// See BatchPrimitiveProcessor::writeErrorMsg() for details.
// The inversion of the assert used here previously.
if (in.length() <= offset)
{
if (hdr->Status > 0)
{
throw std::runtime_error(" an exception originally thrown by PrimProc: ");
}
throw std::runtime_error(
" an exception because there is not enough \
data in the Primitive message from PrimProc.");
}
// The byte at the computed offset is the flag this function reports.
return (data[offset] != 0);
}
// Reads a 32-bit count followed by that many serialized RGData objects from `in` and appends
// each of them to `out`.
void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in, vector<RGData>* out) const
{
  uint32_t remaining;
  *in >> remaining;

  // One RGData object is reused for every entry; each push_back stores a copy.
  RGData current;

  while (remaining > 0)
  {
    current.deserialize(*in, true);
    out->push_back(current);
    --remaining;
  }
}
// Parses a response message whose output type is row groups.
//
// An empty message yields a single empty row group and zeroed IO counters; the other output
// parameters are left untouched in that case.
//
// Otherwise the two headers are skipped and the following is read, in order:
//   - for scan-driven BPPs: 8-bit "valid CP data" flag and the LBID; when valid, additionally a
//     "from dictionary scan" flag, a width byte, and min/max (128-bit when the width byte
//     exceeds utils::MAXLEGACYWIDTH, 64-bit otherwise)
//   - 8-bit "count this message" flag
//   - either the aggregate result (count + row groups) or one row group, optionally followed
//     by per-joiner match vectors
//   - cached IO, physical IO and touched-block counters, only when the count flag is set
//
// @param threadID  selects the per-thread RowGroup in primprocRG and the per-thread join arrays
// @param colType   only consulted when the message carries wide min/max values
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out, bool* validCPData,
uint64_t* lbid, bool* fromDictScan, int128_t* min,
int128_t* max, uint32_t* cachedIO, uint32_t* physIO,
uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
bool* hasWideColumn,
const execplan::CalpontSystemCatalog::ColType& colType) const
{
uint64_t tmp64;
int128_t tmp128;
uint8_t tmp8;
RGData rgData;
uint32_t rowCount;
RowGroup& org = primprocRG[threadID];
out->clear();
if (in.length() == 0)
{
// done, return an empty RG
rgData = RGData(org, 0);
org.setData(&rgData);
org.resetRowGroup(0);
out->push_back(rgData);
*cachedIO = 0;
*physIO = 0;
*touchedBlocks = 0;
return;
}
/* skip the header */
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
if (_hasScan)
{
in >> tmp8;
*validCPData = (tmp8 != 0);
if (*validCPData)
{
in >> *lbid;
in >> tmp8;
*fromDictScan = tmp8 != 0;
// The next byte is compared against MAXLEGACYWIDTH to decide how wide min/max are.
in >> tmp8;
*hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
if (UNLIKELY(*hasWideColumn))
{
idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
// Wide min/max are only implemented for wide decimals; anything else trips the assert below.
if (LIKELY(colType.isWideDecimalType()))
{
in >> tmp128;
*min = tmp128;
in >> tmp128;
*max = tmp128;
}
else
{
std::ostringstream oss;
oss << __func__ << " WARNING!!! Not implemented for the data type ";
oss << colType.colDataType << std::endl;
std::cout << oss.str();
idbassert(false);
}
}
else
{
// NOTE(review): tmp64 is unsigned, so these casts zero-extend into the 128-bit outputs;
// confirm callers narrow min/max back to 64 bits before treating them as signed values.
in >> tmp64;
*min = static_cast<int128_t>(tmp64);
in >> tmp64;
*max = static_cast<int128_t>(tmp64);
}
}
else
in >> *lbid;
}
in >> tmp8;
*countThis = (tmp8 != 0);
/* Would be cleaner to make the PM BPP's send a msg count to unify the msg formats.
* For later... */
if (aggregatorPM)
{
deserializeAggregateResult(&in, out);
// for (uint32_t z = 0; z < out->size(); z++) {
// org.setData(&(*out)[z]);
// cout << "BPPJL: " << org.toString() << endl;
//}
}
else
{
rgData.deserialize(in, true);
out->push_back(rgData);
org.setData(&rgData);
rowCount = org.getRowCount();
// aggregatorPM is known to be unset in this branch, so this effectively depends on fe2.
bool pmSendsMatchesAnyway =
(hasSmallOuterJoin && *countThis && PMJoinerCount > 0 && (fe2 || aggregatorPM));
if (!pmSendsFinalResult() || pmSendsMatchesAnyway)
{
std::shared_ptr<vector<uint32_t>[]> joinResults;
uint32_t i, j;
if (pmSendsMatchesAnyway)
{
// In this case the message carries its own row count for the match vectors.
uint16_t joinRowCount;
in >> joinRowCount;
rowCount = joinRowCount;
}
for (j = 0; j < PMJoinerCount; j++)
{
/* Reuse the result space if possible */
joinResults = tJoiners[j]->getPMJoinArrays(threadID);
if (joinResults.get() == NULL)
{
// First use for this joiner/thread: allocate 8192 match vectors and hand them to the joiner.
auto v = new vector<uint32_t>[8192];
joinResults.reset(v);
tJoiners[j]->setPMJoinResults(joinResults, threadID);
}
// One serialized vector of matches per row.
for (i = 0; i < rowCount; i++)
deserializeInlineVector<uint32_t>(in, joinResults[i]);
if (tJoiners[j]->smallOuterJoin())
tJoiners[j]->markMatches(threadID, rowCount);
}
}
}
if (*countThis)
{
// cout << "grabbing io stats\n";
in >> *cachedIO;
in >> *physIO;
in >> *touchedBlocks;
}
else
{
*cachedIO = 0;
*physIO = 0;
*touchedBlocks = 0;
}
// The whole message must have been consumed.
idbassert(in.length() == 0);
}
// Produces the data of an empty row group, shaped like projectionRG, whose status is set
// to `error`.
RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
{
  rowgroup::RowGroup errorRG(projectionRG);
  RGData payload;

  payload = RGData(errorRG, 0);
  errorRG.setData(&payload);
  // errorRG.convertToInlineDataInPlace();
  errorRG.resetRowGroup(0);
  errorRG.setStatus(error);

  return payload;
}
// Starts a new batch: the next added element will set the LBID again, and the queued rid
// count and rid-range bitmap are cleared.
void BatchPrimitiveProcessorJL::reset()
{
  needToSetLBID = true;
  ridMap = 0;
  ridCount = 0;
}
// Points this BPP at logical block `l` of the scanned extent.
// Records the extent's dbroot, derives baseRid from the extent's partition, segment, extent
// number and the logical block number of `l` within the extent, then passes baseRid and the
// dbroot to every filter and projection step.
void BatchPrimitiveProcessorJL::setLBID(uint64_t l, const BRM::EMEntry& scannedExtent)
{
  dbRoot = scannedExtent.dbRoot;

  const auto extentNum = scannedExtent.blockOffset / (scannedExtent.range.size * 1024);  // the extent #
  const auto blockNum = (l - scannedExtent.range.start) / scannedExtent.range.size;  // the logical block #

  baseRid = rowgroup::convertToRid(scannedExtent.partitionNum, scannedExtent.segmentNum, extentNum, blockNum);

  /*
  cout << "got baserid=" << baseRid << " from partnum=" << scannedExtent.partitionNum
       << " segnum=" << scannedExtent.segmentNum << " extentnum=" << extentNum
       << " blocknum=" << blockNum << endl;
  */

  for (uint32_t f = 0; f < filterCount; ++f)
    filterSteps[f]->setLBID(baseRid, dbRoot);

  for (uint32_t p = 0; p < projectCount; ++p)
    projectSteps[p]->setLBID(baseRid, dbRoot);
}
// Renders a multi-line, human-readable summary: whether the BPP is scan driven (or which
// rid/value serialization it uses otherwise), followed by the filter and projection steps.
string BatchPrimitiveProcessorJL::toString() const
{
  ostringstream text;

  text << "BatchPrimitiveProcessorJL:" << '\n';

  if (_hasScan)
  {
    text << "   -- scan driven" << '\n';
  }
  else
  {
    if (sendValues)
      text << "   -- serializing values" << '\n';

    text << (sendAbsRids ? "   -- serializing absolute rids" : "   -- serializing relative rids") << '\n';
  }

  text << "   " << static_cast<int>(filterCount) << " filter steps:\n";

  for (uint32_t f = 0; f < filterCount; ++f)
    text << "      " << filterSteps[f]->toString() << '\n';

  text << "   " << static_cast<int>(projectCount) << " projection steps:\n";

  for (uint32_t p = 0; p < projectCount; ++p)
    text << "      " << projectSteps[p]->toString() << '\n';

  return text.str();
}
/**
* Serializes the message that creates the matching BPP on the PrimProc side.
*
* The creation messages have the following format (in the order written below):
*
* ISMPacketHeader (necessary for DEC and ReadThread classes), Command = BATCH_PRIMITIVE_CREATE
* ---
* output type (ElementType, StringElementType, etc), 8 bits
* transaction #
* session # (unnecessary except for debugging)
* step ID ( same )
* unique ID (uniquely identifies the DEC queue on the UM)
* version info
* 16-bit flags
* wide column widths, only if any wide column is involved
* boolean operator (bop)
* 8-bit "for hash join" flag
* if row groups are the input: value column and the input row group
* if the output type is ROW_GROUP: projection row group, then fe1 and fe2, each preceded by
*     an 8-bit presence flag
* if there's a join...
*     max PM join result count
*     and, for ROW_GROUP output, the per-joiner parameters and row groups
* filter step count
* (filter count)x serialized Commands
* projection step count
* (projection count)x serialized Commands
* if the output type is ROW_GROUP: 8-bit presence flag and the PM aggregator
*
* As a side effect, for ROW_GROUP output, primprocRG is (re)allocated with one RowGroup per
* thread describing the row group PrimProc will send back.
*/
void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
{
ISMPacketHeader ism;
uint32_t i;
uint16_t flags = 0;
ism.Command = BATCH_PRIMITIVE_CREATE;
// load() is used for the header, the rest is streamed after it.
bs.load((uint8_t*)&ism, sizeof(ism));
bs << (uint8_t)ot;
bs << (messageqcpp::ByteStream::quadbyte)txnID;
bs << (messageqcpp::ByteStream::quadbyte)sessionID;
bs << (messageqcpp::ByteStream::quadbyte)stepID;
bs << uniqueID;
bs << versionInfo;
// Translate the configuration booleans into the flag word.
if (needStrValues)
flags |= NEED_STR_VALUES;
if (sendAbsRids)
flags |= GOT_ABS_RIDS;
if (sendValues)
flags |= GOT_VALUES;
if (LBIDTrace)
flags |= LBID_TRACE;
if (needRidsAtDelivery)
flags |= SEND_RIDS_AT_DELIVERY;
if (tJoiners.size() > 0)
flags |= HAS_JOINER;
if (sendRowGroups)
flags |= HAS_ROWGROUP;
if (sendTupleJoinRowGroupData)
flags |= JOIN_ROWGROUP_DATA;
if (wideColumnsWidths)
flags |= HAS_WIDE_COLUMNS;
bs << flags;
// The widths follow only when HAS_WIDE_COLUMNS was set above.
if (wideColumnsWidths)
bs << wideColumnsWidths;
bs << bop;
bs << (uint8_t)(forHJ ? 1 : 0);
if (sendRowGroups)
{
bs << valueColumn;
bs << inputRG;
}
if (ot == ROW_GROUP)
{
bs << projectionRG;
// cout << "BPPJL: projectionRG is:\n" << projectionRG.toString() << endl;
/* F&E serialization */
// Each of fe1/fe2 is preceded by a one-byte presence flag.
if (fe1)
{
bs << (uint8_t)1;
bs << *fe1;
bs << fe1Input;
}
else
bs << (uint8_t)0;
if (fe2)
{
bs << (uint8_t)1;
bs << *fe2;
bs << fe2Output;
}
else
bs << (uint8_t)0;
}
/* if HAS_JOINER, send the init params */
if (flags & HAS_JOINER)
{
bs << (uint32_t)maxPmJoinResultCount;
if (ot == ROW_GROUP)
{
idbassert(tJoiners.size() > 0);
bs << (messageqcpp::ByteStream::quadbyte)PMJoinerCount;
bool atLeastOneFE = false;
#ifdef JLF_DEBUG
cout << "PMJoinerCount = " << PMJoinerCount << endl;
#endif
// The small-side row group is sent at most once, with the first skewed-key joiner.
bool smallSideRGSent = false;
for (i = 0; i < PMJoinerCount; i++)
{
bs << (uint32_t)tJoiners[i]->size();
bs << tJoiners[i]->getJoinType();
// bs << (uint64_t) tJoiners[i]->smallNullValue();
bs << (uint8_t)tJoiners[i]->isTypelessJoin();
if (tJoiners[i]->hasFEFilter())
{
atLeastOneFE = true;
#ifdef JLF_DEBUG
cout << "serializing join FE object\n";
#endif
bs << *tJoiners[i]->getFcnExpFilter();
}
if (!tJoiners[i]->isTypelessJoin())
{
// Typed join: null value of the small side and the single large-side key column.
bs << (uint64_t)tJoiners[i]->smallNullValue();
bs << (messageqcpp::ByteStream::quadbyte)tJoiners[i]->getLargeKeyColumn();
// cout << "large key column is " << (uint32_t) tJoiners[i]->getLargeKeyColumn() << endl;
}
else
{
// Typeless join: all large-side key columns and the key length.
serializeVector<uint32_t>(bs, tJoiners[i]->getLargeKeyColumns());
bs << (uint32_t)tJoiners[i]->getKeyLength();
// MCOL-4173 Notify PP if smallSide and largeSide have different column widths
// and send smallSide RG to PP.
bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
bs << (uint8_t)joinHasSkewedKeyColumn;
if (!smallSideRGSent && joinHasSkewedKeyColumn)
{
idbassert(!smallSideRGs.empty());
bs << smallSideRGs[0];
serializeVector<uint32_t>(bs, tJoiners[i]->getSmallKeyColumns());
smallSideRGSent = true;
}
}
}
// The row group used by the join filters is sent once, after all joiners.
if (atLeastOneFE)
bs << joinFERG;
if (sendTupleJoinRowGroupData)
{
#ifdef JLF_DEBUG
cout << "sending smallside data\n";
#endif
serializeVector<RowGroup>(bs, smallSideRGs);
bs << largeSideRG;
bs << joinedRG; // TODO: I think we can omit joinedRG if (!(fe2 || aggregatorPM))
// cout << "joined RG: " << joinedRG.toString() << endl;
}
}
}
bs << filterCount;
for (i = 0; i < filterCount; ++i)
{
// cout << "serializing step " << i << endl;
filterSteps[i]->createCommand(bs);
}
bs << projectCount;
for (i = 0; i < projectCount; ++i)
projectSteps[i]->createCommand(bs);
// aggregate step only when output is row group
if (ot == ROW_GROUP)
{
if (aggregatorPM.get() != NULL)
{
bs << (uint8_t)1;
bs << aggregateRGPM;
bs << *(aggregatorPM.get());
}
else
{
bs << (uint8_t)0;
}
}
// decide which rowgroup is received by PrimProc
// NOTE(review): primprocRG is written from this const method, so it is presumably declared
// mutable -- confirm against the class declaration.
if (ot == ROW_GROUP)
{
primprocRG.reset(new RowGroup[threadCount]);
for (uint32_t i = 0; i < threadCount; i++)
if (aggregatorPM)
primprocRG[i] = aggregateRGPM;
else if (fe2)
primprocRG[i] = fe2Output;
// This shouldn't be necessary. As of 2-17-14, PrimProc
// will only send joined results if fe2 || aggregatorPM,
// so it will never send back data for joinedRG.
// else if ((flags & HAS_JOINER) && sendTupleJoinRowGroupData)
// primprocRG[i] = joinedRG;
else
primprocRG[i] = projectionRG;
}
}
/**
* Serializes one "run" message into `bs` (which is restarted first).
*
* The BPP Run messages have the following format (in the order written below):
*
* ISMPacketHeader
* -- Command is BATCH_PRIMITIVE_RUN
* -- Interleave field is used for DEC interleaving data (set to pmNum)
* -- Size field is used for the relative weight to process the message
* -----
* Session ID
* Step ID
* Unique ID
* Priority
* Weight (from calculateBPPWeight())
* DBRoot
* Iteration count
* 8-bit "sent by ExeMgr DEC" flag
* If row groups are sent
*    row group data size, then the row group data
* else
*    Rid count
*    If absolute rids are sent
*       (rid count)x 64-bit absolute rids
*    else
*       RID range bitmap
*       base RID
*       (rid count)x 16-bit relative rids
*    If values are sent
*       (rid count)x 64-bit values
* (filter count)x run msgs for filter Commands
* (projection count)x run msgs for projection Commands
*/
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
ISMPacketHeader ism;
uint32_t i;
/* XXXPAT: BPPJL currently reuses the ism Size fields for other things to
save bandwidth. They're completely unused otherwise. We need to throw out all unused
fields of every defined header. */
bs.restart();
memset((void*)&ism, 0, sizeof(ism));
ism.Command = BATCH_PRIMITIVE_RUN;
// TODO: this causes the code using this ism to send on only one socket, even if more than one socket is
// defined for each PM.
ism.Interleave = pmNum;
// ism.Interleave = pmNum - 1;
/* ... and the Size field is used for the "weight" of processing a BPP
where 1 is one Command on one logical block. */
ism.Size = count * (filterCount + projectCount);
bs.append((uint8_t*)&ism, sizeof(ism));
/* The next 4 vars are for BPPSeeder; BPP itself skips them */
bs << sessionID;
bs << stepID;
bs << uniqueID;
bs << _priority;
// The weight is used by PrimProc thread pool algo
uint32_t weight = calculateBPPWeight();
bs << weight;
bs << dbRoot;
bs << count;
uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0;
bs << sentByEM;
// Sanity checks on the input mode: a scan sends no rids, a rid-driven run needs at least
// one queued rid, and a row-group-driven run needs a non-empty input row group.
if (_hasScan)
idbassert(ridCount == 0);
else if (!sendRowGroups)
idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids));
else
idbassert(inputRG.getRowCount() > 0);
if (sendRowGroups)
{
uint32_t rgSize = inputRG.getDataSize();
bs << rgSize;
bs.append(inputRG.getData(), rgSize);
}
else
{
bs << ridCount;
// ridCount << 3 is the byte size of ridCount 64-bit entries, ridCount << 1 that of
// ridCount 16-bit entries.
if (sendAbsRids)
bs.append((uint8_t*)absRids.get(), ridCount << 3);
else
{
bs << ridMap;
bs << baseRid;
bs.append((uint8_t*)relRids, ridCount << 1);
}
if (sendValues)
bs.append((uint8_t*)values, ridCount << 3);
}
// Each step appends its own run message, filters first.
for (i = 0; i < filterCount; i++)
filterSteps[i]->runCommand(bs);
for (i = 0; i < projectCount; i++)
projectSteps[i]->runCommand(bs);
}
// Rebuilds `bs` as a run message that carries this BPP's error status: a zeroed packet header
// with Command BATCH_PRIMITIVE_RUN, Status set to `status` and Size set to the combined size
// of the two headers, followed by session ID, step ID, unique ID, count and rid count.
void BatchPrimitiveProcessorJL::runErrorBPP(ByteStream& bs)
{
  using quadbyte = messageqcpp::ByteStream::quadbyte;

  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_RUN;
  header.Status = status;
  header.Size = sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader);

  bs.restart();
  bs.append(reinterpret_cast<uint8_t*>(&header), sizeof(header));

  bs << static_cast<quadbyte>(sessionID);
  bs << static_cast<quadbyte>(stepID);
  bs << uniqueID;
  bs << count;
  bs << ridCount;
}
// Appends a destroy message to `bs` (the stream is not restarted first): a zeroed packet
// header with Command BATCH_PRIMITIVE_DESTROY, followed by session ID, step ID and unique ID.
void BatchPrimitiveProcessorJL::destroyBPP(ByteStream& bs) const
{
  using quadbyte = messageqcpp::ByteStream::quadbyte;

  // if (!(sessionID & 0x80000000))
  //     cout << "step ID " << stepID << " added " << rowCounter << " rows, processed "
  //          << rowsProcessed << " rows" << endl;

  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_DESTROY;
  bs.append(reinterpret_cast<uint8_t*>(&header), sizeof(header));

  /* XXXPAT: We can get rid of sessionID and stepID;
     it's there for easier debugging only */
  bs << static_cast<quadbyte>(sessionID);
  bs << static_cast<quadbyte>(stepID);
  bs << uniqueID;
}
// Attaches the tuple joiners to this BPP and resets the small-side send cursor.
// For every joiner that runs in the PM: counts it, records its small-side key columns and
// row group, stores the key length of typeless joins, and picks up whether tuple-join
// row group data must be sent (join filter present) and whether a small-side outer join exists.
// The large-side row group is taken from the first joiner.
// NOTE(review): `j` must contain at least one joiner, since element 0 is dereferenced
// unconditionally. smallSideKeys/smallSideRGs are appended to without being cleared here --
// confirm this function is called only once per BPP.
void BatchPrimitiveProcessorJL::useJoiners(const vector<std::shared_ptr<joiner::TupleJoiner> >& j)
{
  pos = 0;
  joinerNum = 0;
  tJoiners = j;
  PMJoinerCount = 0;

  const size_t joinerTotal = tJoiners.size();
  tlKeyLens.reset(new uint32_t[joinerTotal]);

  for (uint32_t idx = 0; idx < joinerTotal; ++idx)
  {
    const auto& joiner = tJoiners[idx];

    if (!joiner->inPM())
      continue;

    ++PMJoinerCount;

    smallSideKeys.push_back(joiner->getSmallKeyColumns());
    smallSideRGs.push_back(joiner->getSmallRG());

    if (joiner->isTypelessJoin())
      tlKeyLens[idx] = joiner->getKeyLength();

    if (joiner->hasFEFilter())
      sendTupleJoinRowGroupData = true;

    if (joiner->smallOuterJoin())
      hasSmallOuterJoin = true;
  }

  largeSideRG = tJoiners[0]->getLargeRG();

  if (aggregatorPM || fe2)
  {
    sendTupleJoinRowGroupData = true;
#ifdef JLF_DEBUG
    cout << "will send small side row data\n";
#endif
  }

  // One zero-initialised send position per PM joiner.
  posByJoinerNum.reset(new uint32_t[PMJoinerCount]());
}
// Helper that interleaves small-side data by joiner number.
// Starting after the current joinerNum, advances round-robin through the PM joiners
// and stops at the first one whose recorded send position differs from its
// small-side size. On success joinerNum and pos are set and true is returned;
// if every PM joiner has been fully sent (or there are none), returns false and
// leaves pos unchanged.
bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
{
  for (uint32_t attempt = 0; attempt < PMJoinerCount; ++attempt)
  {
    joinerNum = (joinerNum + 1) % PMJoinerCount;

    const bool hasUnsentRows = posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size();

    if (hasUnsentRows)
    {
      pos = posByJoinerNum[joinerNum];
      return true;
    }
  }

  return false;
}
/* This algorithm relies on the joiners being sorted by size atm */
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
//
// Builds the next small-side joiner message into bs.
//
// pickNextJoinerNum() chooses the joiner (joinerNum) and the starting row (pos).
// When it finds nothing left to send, a BATCH_PRIMITIVE_END_JOINER message
// (header, sessionID, stepID, uniqueID) is loaded into bs and false is returned.
// Otherwise a BATCH_PRIMITIVE_ADD_JOINER message is loaded, the per-joiner send
// position is advanced by the number of rows written, and true is returned.
//
// ADD_JOINER payload, in write order:
//   header, sessionID, stepID, uniqueID, toSend, pos, joinerNum
//   typeless join : per row, a one-byte null flag; when not null, the serialized
//                   typeless key followed by the row index
//   otherwise     : toSend packed { int64 key, uint32 row index } entries
//   finally, when sendTupleJoinRowGroupData is set, the rows copied into an RGData
//   and serialized.
//
// At most fJoinerChunkSize / (row size + per-row metadata size) rows are sent per call.
// NOTE(review): if that quotient is ever 0 no rows are sent and the position does not
// advance -- confirm fJoinerChunkSize always exceeds the small-side row size.
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
  uint32_t size = 0, toSend, i, j;
  ISMPacketHeader ism;
  Row r;
  vector<Row::Pointer>* tSmallSide;
  joiner::TypelessData tlData;
  uint32_t smallKeyCol;
  uint32_t largeKeyCol;
  uint64_t smallkey;
  bool isNull;
  bool bSignedUnsigned;

  // Zero the header before either message is built, so the END_JOINER message does not
  // carry unset header bytes (every other message builder in this file clears it first).
  memset((void*)&ism, 0, sizeof(ism));

  bool moreMsgs = pickNextJoinerNum();

  if (!moreMsgs)
  {
    /* last message */
    // cout << "sending last joiner msg\n";
    ism.Command = BATCH_PRIMITIVE_END_JOINER;
    bs.load((uint8_t*)&ism, sizeof(ism));
    bs << (messageqcpp::ByteStream::quadbyte)sessionID;
    bs << (messageqcpp::ByteStream::quadbyte)stepID;
    bs << uniqueID;
    return false;
  }

  tSmallSide = tJoiners[joinerNum]->getSmallSide();
  size = tSmallSide->size();

  ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
  bs.load((uint8_t*)&ism, sizeof(ism));
  bs << (messageqcpp::ByteStream::quadbyte)sessionID;
  bs << (messageqcpp::ByteStream::quadbyte)stepID;
  bs << uniqueID;

  smallSideRGs[joinerNum].initRow(&r);

  // Per-row overhead counted against the chunk size; typeless joins count none.
  unsigned metasize = 12;  // 12 = sizeof(struct JoinerElements)

  if (tJoiners[joinerNum]->isTypelessJoin())
    metasize = 0;

  unsigned rowunits = fJoinerChunkSize / (r.getSize() + metasize);
  toSend = std::min<unsigned int>(size - pos, rowunits);
  bs << toSend;
  bs << pos;
  bs << joinerNum;

  if (tJoiners[joinerNum]->isTypelessJoin())
  {
    utils::FixedAllocator fa(tlKeyLens[joinerNum], true);

    for (i = pos; i < pos + toSend; i++)
    {
      r.setPointer((*tSmallSide)[i]);
      isNull = false;
      bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();

      for (j = 0; j < smallSideKeys[joinerNum].size(); j++)
      {
        isNull |= r.isNullValue(smallSideKeys[joinerNum][j]);

        if (UNLIKELY(bSignedUnsigned))
        {
          // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either side,
          // then it should not compare. Send null to PM to prevent compare
          smallKeyCol = smallSideKeys[joinerNum][j];
          largeKeyCol = tJoiners[joinerNum]->getLargeKeyColumns()[j];

          if (r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(largeKeyCol))
          {
            if (r.isUnsigned(smallKeyCol))
              smallkey = r.getUintField(smallKeyCol);
            else
              smallkey = r.getIntField(smallKeyCol);

            if (smallkey & 0x8000000000000000ULL)
            {
              isNull = true;
              break;
            }
          }
        }
      }

      if (!isNull)
      {
        tlData = makeTypelessKey(r, smallSideKeys[joinerNum], tlKeyLens[joinerNum], &fa, largeSideRG,
                                 tJoiners[joinerNum]->getLargeKeyColumns());

        // A zero-length key is sent as null.
        if (tlData.len == 0)
        {
          isNull = true;
        }
      }

      bs << (uint8_t)isNull;

      if (!isNull)
      {
        tlData.serialize(bs);
        bs << i;
      }
    }
  }
  else
  {
#pragma pack(push, 1)
    struct JoinerElements
    {
      int64_t key;
      uint32_t value;
    } * arr;
#pragma pack(pop)

    // Write the entries straight into the stream's buffer, then advance past them.
    bs.needAtLeast(toSend * sizeof(JoinerElements));
    arr = (JoinerElements*)bs.getInputPtr();

    smallKeyCol = smallSideKeys[joinerNum][0];
    bSignedUnsigned =
        r.isUnsigned(smallKeyCol) != largeSideRG.isUnsigned(tJoiners[joinerNum]->getLargeKeyColumns()[0]);

    for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
    {
      r.setPointer((*tSmallSide)[i]);

      if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
      {
        // Small side is a long double. Since CS can't store larger than DOUBLE,
        // we need to convert to whatever type large side is -- double or int64
        long double smallkeyld = r.getLongDoubleField(smallKeyCol);

        switch (largeSideRG.getColType(tJoiners[joinerNum]->getLargeKeyColumns()[0]))
        {
          case CalpontSystemCatalog::DOUBLE:
          case CalpontSystemCatalog::UDOUBLE:
          case CalpontSystemCatalog::FLOAT:
          case CalpontSystemCatalog::UFLOAT:
          {
            if (smallkeyld > MAX_DOUBLE || smallkeyld < MIN_DOUBLE)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              // Send the double's bit pattern as the 64-bit key. memcpy avoids reading
              // the double through an incompatible pointer type.
              static_assert(sizeof(double) == sizeof(smallkey), "double must be 64 bits wide");
              double d = (double)smallkeyld;
              memcpy(&smallkey, &d, sizeof(d));
            }

            break;
          }

          default:
          {
            if (r.isUnsigned(smallKeyCol) && smallkeyld > MAX_UBIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else if (smallkeyld > MAX_BIGINT || smallkeyld < MIN_BIGINT)
            {
              smallkey = joblist::UBIGINTEMPTYROW;
            }
            else
            {
              smallkey = (int64_t)smallkeyld;
            }

            break;
          }
        }
      }
      else if (r.isUnsigned(smallKeyCol))
        smallkey = r.getUintField(smallKeyCol);
      else
        smallkey = r.getIntField(smallKeyCol);

      // If this is a compare signed vs unsigned and the sign bit is on for this value, then all compares
      // against the large side should fail. UBIGINTEMPTYROW is not a valid value, so nothing will match.
      if (bSignedUnsigned && (smallkey & 0x8000000000000000ULL))
        smallkey = joblist::UBIGINTEMPTYROW;

      arr[j].key = (int64_t)smallkey;
      arr[j].value = i;
      // cout << "sending " << arr[j].key << ", " << arr[j].value << endl;
    }

    bs.advanceInputPtr(toSend * sizeof(JoinerElements));
  }

  if (sendTupleJoinRowGroupData)
  {
    // Copy the rows being sent into a temporary RGData and serialize it after the keys.
    RowGroup& smallSide = smallSideRGs[joinerNum];
    RGData tmpData(smallSide, toSend);
    Row tmpRow;

    smallSide.setData(&tmpData);
    smallSide.initRow(&tmpRow);
    smallSide.getRow(0, &tmpRow);

    for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
    {
      r.setPointer((*tSmallSide)[i]);
      copyRow(r, &tmpRow);
    }

    smallSide.setRowCount(toSend);
    tmpData.serialize(bs, smallSide.getDataSize());
  }

  // Remember how far this joiner has been sent for the next round-robin pass.
  pos += toSend;
  posByJoinerNum[joinerNum] = pos;

  return true;
}
// Switches the output type to ROW_GROUP and stores a copy of rg as the projection row group.
void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
  this->ot = ROW_GROUP;
  this->projectionRG = rg;
}
// Stores a copy of rg as the joined row group.
void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg)
{
  this->joinedRG = rg;
}
// Configures this BPP to take row groups as input: turns on sendRowGroups, turns off
// sendAbsRids and sendValues, and stores a copy of rg as the input row group.
void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg)
{
  // The three input-mode flags are independent of each other.
  sendValues = false;
  sendAbsRids = false;
  sendRowGroups = true;

  inputRG = rg;
}
// Stores the PM-side aggregator and its row group. If joiners have already been
// registered, small-side row data is flagged to be sent along with the join keys.
void BatchPrimitiveProcessorJL::addAggregateStep(const rowgroup::SP_ROWAGG_PM_t& aggpm,
                                                 const rowgroup::RowGroup& argpm)
{
  aggregatorPM = aggpm;
  aggregateRGPM = argpm;

  const bool haveJoiners = !tJoiners.empty();

  if (haveJoiners)
  {
    sendTupleJoinRowGroupData = true;
  }
}
/* OR hacks */
void BatchPrimitiveProcessorJL::setBOP(uint32_t op)
{
bop = op;
if (op == BOP_OR && filterCount > 1)
{
for (int i = 1; i < filterCount; ++i)
{
ColumnCommandJL* colcmd = dynamic_cast<ColumnCommandJL*>(filterSteps[i].get());
if (colcmd != NULL)
colcmd->scan(false);
}
}
}
void BatchPrimitiveProcessorJL::setForHJ(bool b)
{
forHJ = b;
}
// Stores the first function-expression group (fe1) together with a copy of its input row group.
void BatchPrimitiveProcessorJL::setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& input)
{
  this->fe1 = fe;
  this->fe1Input = input;
}
// Stores the second function-expression group (fe2) together with a copy of its output
// row group. If joiners are registered and at least one of them is a PM joiner,
// small-side row data is flagged to be sent along with the join keys.
void BatchPrimitiveProcessorJL::setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper> fe,
                                            const RowGroup& output)
{
  fe2 = fe;
  fe2Output = output;

  const bool havePMJoiners = !tJoiners.empty() && PMJoinerCount > 0;

  if (havePMJoiners)
  {
    sendTupleJoinRowGroupData = true;
  }
}
// Returns a compact "(col1,col2,...)" listing of the distinct column names referenced
// by the filter and project steps, in std::set<string> order. Returns "()" when no
// step contributes a column name.
const string BatchPrimitiveProcessorJL::toMiniString() const
{
  int i;
  set<string> colset;
  string col;

  for (i = 0; i < filterCount; i++)
  {
    col = filterSteps[i]->getColName();

    // FilterCommandJL has two referenced columns, needs special handling.
    FilterCommandJL* filterCmd = dynamic_cast<FilterCommandJL*>(filterSteps[i].get());

    if (filterCmd == NULL)
    {
      colset.insert(col);
    }
    else
    {
      // is a FilterCommandJL: its name is split at the first ','. Without a comma the
      // whole name is inserted once (substr(0, npos) yields the full string).
      size_t sep = col.find(',');
      colset.insert(col.substr(0, sep));

      if (sep != string::npos)
        colset.insert(col.substr(++sep));
    }
  }

  for (i = 0; i < projectCount; i++)
  {
    col = projectSteps[i]->getColName();
    colset.insert(col);
  }

  ostringstream oss;
  oss << '(';

  // Iterate instead of dereferencing begin() unconditionally: with no filter or
  // project steps the set is empty and begin() == end().
  for (set<string>::const_iterator iter = colset.begin(); iter != colset.end(); ++iter)
  {
    if (iter != colset.begin())
      oss << ',';

    oss << *iter;
  }

  oss << ')';
  return oss.str();
}
void BatchPrimitiveProcessorJL::setJoinFERG(const RowGroup& rg)
{
joinFERG = rg;
}
// Loads a BATCH_PRIMITIVE_ABORT message into *bs: a zeroed ISM header with the abort
// command, followed by uniqueID. bs must not be null.
void BatchPrimitiveProcessorJL::abortProcessing(ByteStream* bs)
{
  ISMPacketHeader header;
  memset((void*)&header, 0, sizeof(header));
  header.Command = BATCH_PRIMITIVE_ABORT;

  ByteStream& out = *bs;
  out.load((uint8_t*)&header, sizeof(header));
  out << uniqueID;
}
// Applies the string-table setting b to the row group of the last configured stage:
// the PM aggregation row group if an aggregator is set, otherwise the FE2 output row
// group if fe2 is set, otherwise the projection row group.
void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
{
  if (aggregatorPM)
  {
    aggregateRGPM.setUseStringTable(b);
    return;
  }

  if (fe2)
  {
    fe2Output.setUseStringTable(b);
    return;
  }

  projectionRG.setUseStringTable(b);
}
} // namespace joblist