You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-10 01:22:48 +03:00
After creating and populating tables with CHAR(5) case-insensitive columns, in a series of consecutive joins such as: select * from t1, t2 where t1.c1=t2.c1; select * from t1, t2 where t1.c1=t2.c2; select * from t1, t2 where t1.c2=t2.c1; select * from t1, t2 where t1.c2=t2.c2; only the first join worked reliably in a case-insensitive way. This change removes the remaining pieces of code that used order_swap() to compare short CHAR columns and uses Charset::strnncollsp() instead, which fixes the issue.
3421 lines
107 KiB
C++
3421 lines
107 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2019-2020 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: tuple-bps.cpp 9705 2013-07-17 20:06:07Z pleblanc $
|
|
|
|
|
|
#include <unistd.h>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
#include <algorithm>
|
|
#include <ctime>
|
|
#include <sys/time.h>
|
|
#include <deque>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
#include <boost/uuid/uuid_io.hpp>
|
|
using namespace boost;
|
|
|
|
#include "bpp-jl.h"
|
|
#include "distributedenginecomm.h"
|
|
#include "elementtype.h"
|
|
#include "jlf_common.h"
|
|
#include "primitivestep.h"
|
|
#include "unique32generator.h"
|
|
#include "rowestimator.h"
|
|
using namespace joblist;
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messagelog.h"
|
|
#include "messageobj.h"
|
|
#include "loggingid.h"
|
|
#include "errorcodes.h"
|
|
#include "errorids.h"
|
|
#include "exceptclasses.h"
|
|
using namespace logging;
|
|
|
|
#include "liboamcpp.h"
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "brm.h"
|
|
using namespace BRM;
|
|
|
|
#include "oamcache.h"
|
|
|
|
#include "rowgroup.h"
|
|
using namespace rowgroup;
|
|
|
|
#include "threadnaming.h"
|
|
|
|
#include "querytele.h"
|
|
using namespace querytele;
|
|
|
|
#include "pseudocolumn.h"
|
|
//#define DEBUG 1
|
|
|
|
extern boost::mutex fileLock_g;
|
|
|
|
namespace
{
// Shift amount used when converting LBIDs to groups of 1024 logical blocks.
const uint32_t LOGICAL_EXTENT_CONVERTER = 10;  // 10 + 13. 13 to convert to logical blocks,
// 10 to convert to groups of 1024 logical blocks

// Fallback for ExtentMap/ExtentsPerSegmentFile when the config file has no value.
const uint32_t DEFAULT_EXTENTS_PER_SEG_FILE = 2;

}
|
|
|
|
/** Debug macro */
|
|
#define THROTTLE_DEBUG 0
|
|
#if THROTTLE_DEBUG
|
|
#define THROTTLEDEBUG std::cout
|
|
#else
|
|
#define THROTTLEDEBUG if (false) std::cout
|
|
#endif
|
|
namespace joblist
|
|
{
|
|
|
|
struct TupleBPSPrimitive
|
|
{
|
|
TupleBPSPrimitive(TupleBPS* batchPrimitiveStep) :
|
|
fBatchPrimitiveStep(batchPrimitiveStep)
|
|
{}
|
|
TupleBPS* fBatchPrimitiveStep;
|
|
void operator()()
|
|
{
|
|
try
|
|
{
|
|
utils::setThreadName("BPSPrimitive");
|
|
fBatchPrimitiveStep->sendPrimitiveMessages();
|
|
}
|
|
catch (std::exception& re)
|
|
{
|
|
cerr << "TupleBPS: send thread threw an exception: " << re.what() <<
|
|
"\t" << this << endl;
|
|
catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
|
|
}
|
|
catch (...)
|
|
{
|
|
string msg("TupleBPS: send thread threw an unknown exception ");
|
|
catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
|
|
cerr << msg << this << endl;
|
|
}
|
|
}
|
|
};
|
|
|
|
struct TupleBPSAggregators
|
|
{
|
|
TupleBPSAggregators(TupleBPS* batchPrimitiveStep, uint64_t index) :
|
|
fBatchPrimitiveStepCols(batchPrimitiveStep), fThreadId(index)
|
|
{}
|
|
TupleBPS* fBatchPrimitiveStepCols;
|
|
uint64_t fThreadId;
|
|
|
|
void operator()()
|
|
{
|
|
try
|
|
{
|
|
utils::setThreadName("BPSAggregator");
|
|
fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId);
|
|
}
|
|
catch (std::exception& re)
|
|
{
|
|
cerr << fBatchPrimitiveStepCols->toString() << ": receive thread threw an exception: " << re.what() << endl;
|
|
catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
|
|
}
|
|
catch (...)
|
|
{
|
|
string msg("TupleBPS: recv thread threw an unknown exception ");
|
|
cerr << fBatchPrimitiveStepCols->toString() << msg << endl;
|
|
catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
|
|
}
|
|
}
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initialize configurable parameters
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Initialize configurable parameters
//------------------------------------------------------------------------------
/* Reads the resource-manager tuning parameters and the Columnstore.xml config
 * that govern message throttling between ExeMgr and PrimProc, and pre-sizes
 * the producer-thread container.  Called from every TupleBPS constructor
 * after fBPP has been created. */
void TupleBPS::initializeConfigParms()
{
    string strVal;

    //...Get the tuning parameters that throttle msgs sent to primproc
    //...fFilterRowReqLimit puts a cap on how many rids we will request from
    //... primproc, before pausing to let the consumer thread catch up.
    //... Without this limit, there is a chance that PrimProc could flood
    //... ExeMgr with thousands of messages that will consume massive
    //... amounts of memory for a 100 gigabyte database.
    //...fFilterRowReqThreshold is the level at which the number of outstanding
    //... rids must fall below, before the producer can send more rids.

    //These could go in constructor
    fRequestSize = fRm->getJlRequestSize();
    fMaxOutstandingRequests = fRm->getJlMaxOutstandingRequests();
    fProcessorThreadsPerScan = fRm->getJlProcessorThreadsPerScan();
    fNumThreads = 0;

    config::Config* cf = config::Config::makeConfig();
    string epsf = cf->getConfig("ExtentMap", "ExtentsPerSegmentFile");

    // Only override the default when the config file actually has a value.
    if ( epsf.length() != 0 )
        fExtentsPerSegFile = cf->uFromText(epsf);

    // A request size at or above the outstanding-request cap would defeat
    // throttling, so fall back to the minimum.
    if (fRequestSize >= fMaxOutstandingRequests)
        fRequestSize = 1;

    // The high bit of the session id marks an internal/system session;
    // those get a single receive thread.
    if ((fSessionId & 0x80000000) == 0)
        fMaxNumThreads = fRm->getJlNumScanReceiveThreads();
    else
        fMaxNumThreads = 1;

    // Reserve the max number of thread space. A bit of an optimization.
    fProducerThreads.clear();
    fProducerThreads.reserve(fMaxNumThreads);
}
|
|
|
|
/* Construct a TupleBPS from a pColStep: copy the step's filter/column state,
 * index its extents by starting LBID, zero all runtime counters, and set up
 * a fresh BatchPrimitiveProcessorJL bound to this session/step. */
TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
{
    // Carry over the source step's associations and identifying info.
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fSessionId = rhs.sessionId();
    fFilterCount = rhs.filterCount();
    fFilterString = rhs.filterString();
    isFilterFeeder = rhs.getFeederFlag();
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    extentSize = rhs.extentSize;

    // Index this column's extents by their starting LBID for quick lookup.
    scannedExtents = rhs.extents;
    extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
    tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];

    for (uint32_t z = 0; z < rhs.extents.size(); z++)
        ref[rhs.extents[z].range.start] = rhs.extents[z];

    lbidList = rhs.lbidList;
    rpbShift = rhs.rpbShift;
    divShift = rhs.divShift;
    modMask = rhs.modMask;
    numExtents = rhs.numExtents;
    // Zero all runtime counters and statistics.
    ridsRequested = 0;
    ridsReturned = 0;
    recvExited = 0;
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    fColWidth = fColType.colWidth;
    // Create the BPP before initializeConfigParms()/fBPP->set* calls below.
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    finishedSending = sendWaiting = false;
    fNumBlksSkipped = 0;
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    fSwallowRows = false;
    smallOuterJoiner = -1;

    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // No pseudo-column filters seen yet; setBPP() flips these as steps arrive.
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
                      hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
}
|
|
|
|
/* Construct a TupleBPS from a pColScanStep (full-column scan variant):
 * copy filter/column state and LBID ranges, zero runtime counters, set up
 * a fresh BatchPrimitiveProcessorJL, then compute per-DBRoot extent markers
 * via initExtentMarkers(). */
TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fFilterCount = rhs.filterCount();
    fFilterString = rhs.filterString();
    isFilterFeeder = rhs.getFeederFlag();
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    extentSize = rhs.extentSize;
    lbidRanges = rhs.lbidRanges;

    /* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */
    scannedExtents = rhs.extents;
    extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
    tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];

    for (uint32_t z = 0; z < rhs.extents.size(); z++)
        ref[rhs.extents[z].range.start] = rhs.extents[z];

    divShift = rhs.divShift;
    // Zero all runtime counters and statistics.
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    ridsReturned = 0;
    ridsRequested = 0;
    fNumBlksSkipped = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fSwallowRows = false;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());

    fColWidth = fColType.colWidth;
    lbidList = rhs.lbidList;

    finishedSending = sendWaiting = false;
    firstRead = true;
    fSwallowRows = false;
    recvExited = 0;
    // Create the BPP before initializeConfigParms()/fBPP->set* calls below.
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setStepID(fStepId);
    fBPP->setOutputType(ROW_GROUP);
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    smallOuterJoiner = -1;
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";

    // Builds scannedExtents/scanFlags/lastScannedLBID from extentsMap.
    initExtentMarkers();
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // No pseudo-column filters seen yet; setBPP() flips these as steps arrive.
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
                      hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
}
|
|
|
|
/* Construct a TupleBPS from a PassThruStep: no filters are carried over
 * (fFilterCount = 0); copy identifying/column info, zero runtime counters,
 * and set up a fresh BatchPrimitiveProcessorJL. */
TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fFilterCount = 0;
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    // Zero all runtime counters and statistics.
    ridsReturned = 0;
    ridsRequested = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvExited = 0;
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    recvWaiting = 0;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    fColWidth = fColType.colWidth;
    // Create the BPP before initializeConfigParms()/fBPP->set* calls below.
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    finishedSending = sendWaiting = false;
    fSwallowRows = false;
    fNumBlksSkipped = 0;
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    isFilterFeeder = false;
    smallOuterJoiner = -1;

    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // No pseudo-column filters seen yet; setBPP() flips these as steps arrive.
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
                      hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;

}
|
|
|
|
/* Construct a TupleBPS from a pDictionaryStep (dictionary/string column):
 * copy identifying info, zero runtime counters, and set up a fresh
 * BatchPrimitiveProcessorJL.  Note: no colType()/extent copy here — the
 * dictionary's column info is added later through setBPP()/setProjectBPP(). */
TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    // Zero all runtime counters and statistics.
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    ridsReturned = 0;
    ridsRequested = 0;
    fNumBlksSkipped = 0;
    fBlockTouched = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fSwallowRows = false;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    finishedSending = sendWaiting = false;
    recvExited = 0;
    // Create the BPP before initializeConfigParms()/fBPP->set* calls below.
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    isFilterFeeder = false;
    smallOuterJoiner = -1;
    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // No pseudo-column filters seen yet; setBPP() flips these as steps arrive.
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
                      hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;

}
|
|
|
|
/* Destructor: if this step was registered with a DistributedEngineComm,
 * unhook the event listener, tell the PMs to tear down the remote BPP
 * (when one was allocated), and remove the message queue.  Exceptions from
 * the network write are swallowed after logging — a destructor must not
 * throw. */
TupleBPS::~TupleBPS()
{
    if (fDec)
    {
        fDec->removeDECEventListener(this);

        if (BPPIsAllocated)
        {
            // Serialize a destroy-BPP command and send it to the PMs.
            ByteStream bs;
            fBPP->destroyBPP(bs);

            try
            {
                fDec->write(uniqueID, bs);
            }
            catch (const std::exception& e)
            {
                // log the exception
                cerr << "~TupleBPS caught: " << e.what() << endl;
                catchHandler(e.what(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
            }
            catch (...)
            {
                cerr << "~TupleBPS caught unknown exception" << endl;
                catchHandler("~TupleBPS caught unknown exception",
                             ERR_TUPLE_BPS, fErrorInfo, fSessionId);
            }
        }

        // Remove this step's queue regardless of whether the BPP teardown
        // message could be delivered.
        fDec->removeQueue(uniqueID);
    }

}
|
|
|
|
/* Fold another filter job step into this BPS.  Dispatches on the concrete
 * step type (PseudoColStep / pColStep / pColScanStep / pDictionaryStep /
 * FilterStep), registers it with the BatchPrimitiveProcessorJL, records the
 * step's extents, and widens fColWidth to the widest column seen so far.
 * For pseudo-column filters it also flags which pseudo column is filtered,
 * which the extent-elimination logic consults later. */
void TupleBPS::setBPP(JobStep* jobStep)
{
    fCardinality = jobStep->cardinality();

    pColStep* pcsp = dynamic_cast<pColStep*>(jobStep);

    int colWidth = 0;

    if (pcsp != 0)
    {
        // PseudoColStep derives from pColStep, so test for it first.
        PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep);

        if (pseudo)
        {
            fBPP->addFilterStep(*pseudo);

            if (pseudo->filterCount() > 0)
            {
                hasPCFilter = true;

                // Record which pseudo column carries the filter.
                switch (pseudo->pseudoColumnId())
                {
                    case PSEUDO_EXTENTRELATIVERID:
                        hasRIDFilter = true;
                        break;

                    case PSEUDO_DBROOT:
                        hasDBRootFilter = true;
                        break;

                    case PSEUDO_PM:
                        hasPMFilter = true;
                        break;

                    case PSEUDO_SEGMENT:
                        hasSegmentFilter = true;
                        break;

                    case PSEUDO_SEGMENTDIR:
                        hasSegmentDirFilter = true;
                        break;

                    case PSEUDO_EXTENTMIN:
                        hasMinFilter = true;
                        break;

                    case PSEUDO_EXTENTMAX:
                        hasMaxFilter = true;
                        break;

                    case PSEUDO_BLOCKID:
                        hasLBIDFilter = true;
                        break;

                    case PSEUDO_EXTENTID:
                        hasExtentIDFilter = true;
                        break;

                    case PSEUDO_PARTITION:
                        hasPartitionFilter = true;
                        break;
                }
            }
        }
        else
            fBPP->addFilterStep(*pcsp);

        // Index this column's extents by starting LBID.
        extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
        tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];

        for (uint32_t z = 0; z < pcsp->extents.size(); z++)
            ref[pcsp->extents[z].range.start] = pcsp->extents[z];

        colWidth = (pcsp->colType()).colWidth;
        isFilterFeeder = pcsp->getFeederFlag();

        // it17 does not allow combined AND/OR, this pcolstep is for hashjoin optimization.
        if (bop == BOP_OR && isFilterFeeder == false)
            fBPP->setForHJ(true);
    }
    else
    {
        pColScanStep* pcss = dynamic_cast<pColScanStep*>(jobStep);

        if (pcss != 0)
        {
            // Scan steps also need the per-DBRoot last-scanned-LBID markers.
            fBPP->addFilterStep(*pcss, lastScannedLBID);

            extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
            tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];

            for (uint32_t z = 0; z < pcss->extents.size(); z++)
                ref[pcss->extents[z].range.start] = pcss->extents[z];

            colWidth = (pcss->colType()).colWidth;
            isFilterFeeder = pcss->getFeederFlag();
        }
        else
        {
            pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(jobStep);

            if (pdsp != 0)
            {
                fBPP->addFilterStep(*pdsp);
                colWidth = (pdsp->colType()).colWidth;
            }
            else
            {
                FilterStep* pfsp = dynamic_cast<FilterStep*>(jobStep);

                if (pfsp)
                {
                    fBPP->addFilterStep(*pfsp);
                }
            }

        }
    }

    // Track the widest column so request sizing accounts for it.
    if (colWidth > fColWidth)
    {
        fColWidth = colWidth;
    }
}
|
|
|
|
/* Fold a projection step into this BPS.  When jobStep2 is non-NULL it is
 * the pDictionaryStep paired with jobStep1 (token column + dictionary);
 * otherwise jobStep1 projects on its own.  Registers the step(s) with the
 * BatchPrimitiveProcessorJL, records project OIDs and extents, requests RIDs
 * at delivery for non-ExeMgr consumers (@Bug 961), and widens fColWidth. */
void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
{
    int colWidth = 0;

    if (jobStep2 != NULL)
    {
        // Paired projection: (pColStep|PassThruStep) + pDictionaryStep.
        pDictionaryStep* pdsp = 0;
        pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);

        if (pcsp != 0)
        {
            pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
            fBPP->addProjectStep(*pcsp, *pdsp);

            //@Bug 961
            if (!pcsp->isExeMgr())
                fBPP->setNeedRidsAtDelivery(true);

            colWidth = (pcsp->colType()).colWidth;
            projectOids.push_back(jobStep1->oid());
        }
        else
        {
            PassThruStep* psth = dynamic_cast<PassThruStep*>(jobStep1);

            if (psth != 0)
            {
                pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
                fBPP->addProjectStep(*psth, *pdsp);

                //@Bug 961
                if (!psth->isExeMgr())
                    fBPP->setNeedRidsAtDelivery(true);

                projectOids.push_back(jobStep1->oid());
                colWidth = (psth->colType()).colWidth;
            }
        }
    }
    else
    {
        // Single-step projection: PseudoColStep / pColStep / PassThruStep.
        pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);

        if (pcsp != 0)
        {
            PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep1);

            if (pseudo)
            {
                fBPP->addProjectStep(*pseudo);
            }
            else
                fBPP->addProjectStep(*pcsp);

            // Index this column's extents by starting LBID.
            extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
            tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];

            for (uint32_t z = 0; z < pcsp->extents.size(); z++)
                ref[pcsp->extents[z].range.start] = pcsp->extents[z];

            //@Bug 961
            if (!pcsp->isExeMgr())
                fBPP->setNeedRidsAtDelivery(true);

            colWidth = (pcsp->colType()).colWidth;
            projectOids.push_back(jobStep1->oid());
        }
        else
        {
            PassThruStep* passthru = dynamic_cast<PassThruStep*>(jobStep1);

            if (passthru != 0)
            {
                idbassert(!fBPP->getFilterSteps().empty());

                // A pass-through only works when the last filter step was on
                // the same column; otherwise convert it back to a real
                // pColStep/PseudoColStep projection.
                if (static_cast<CalpontSystemCatalog::OID>(fBPP->getFilterSteps().back()->getOID()) != passthru->oid())
                {
                    SJSTEP pts;

                    if (passthru->pseudoType() == 0)
                    {
                        pts.reset(new pColStep(*passthru));
                        pcsp = dynamic_cast<pColStep*>(pts.get());
                    }
                    else
                    {
                        pts.reset(new PseudoColStep(*passthru));
                        pcsp = dynamic_cast<PseudoColStep*>(pts.get());
                    }

                    fBPP->addProjectStep(*pcsp);

                    if (!passthru->isExeMgr())
                        fBPP->setNeedRidsAtDelivery(true);

                    colWidth = passthru->colType().colWidth;
                    projectOids.push_back(pts->oid());
                }
                else
                {
                    fBPP->addProjectStep(*passthru);

                    //@Bug 961
                    if (!passthru->isExeMgr())
                        fBPP->setNeedRidsAtDelivery(true);

                    colWidth = (passthru->colType()).colWidth;
                    projectOids.push_back(jobStep1->oid());
                }
            }
        }
    }

    // Track the widest column so request sizing accounts for it.
    if (colWidth > fColWidth)
    {
        fColWidth = colWidth;
    }
}
|
|
|
|
/* Evaluate casual-partitioning (min/max) metadata for each filter column
 * and clear scanFlags for extents that cannot contain matching rows.
 * Runs at most once per step (fCPEvaluated).  When estimateRowCounts is
 * set, also produces fEstimatedRows for hash-join large-side sizing. */
void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
{
    const vector<SCommand>& colCmdVec = fBPP->getFilterSteps();
    vector<ColumnCommandJL*> cpColVec;
    vector<SP_LBIDList> lbidListVec;
    ColumnCommandJL* colCmd = 0;

    // @bug 2123. We call this earlier in the process for the hash join estimation process now. Return if we've already done the work.
    if (fCPEvaluated)
    {
        return;
    }

    fCPEvaluated = true;

    if (colCmdVec.size() == 0)
        return;

    // Collect the filter columns whose data type supports CP elimination.
    for (uint32_t i = 0; i < colCmdVec.size(); i++)
    {
        colCmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());

        // Pseudo columns carry no CP min/max info; skip them.
        if (!colCmd || dynamic_cast<PseudoCCJL*>(colCmdVec[i].get()))
            continue;

        SP_LBIDList tmplbidList(new LBIDList(0));

        if (tmplbidList->CasualPartitionDataType(colCmd->getColType().colDataType, colCmd->getColType().colWidth))
        {
            lbidListVec.push_back(tmplbidList);
            cpColVec.push_back(colCmd);
        }

        // @Bug 3503. Use the total table size as the estimate for non CP columns.
        else if (fEstimatedRows == 0 && estimateRowCounts)
        {
            RowEstimator rowEstimator;
            fEstimatedRows = rowEstimator.estimateRowsForNonCPColumn(*colCmd);
        }
    }

    if (cpColVec.size() == 0)
        return;

    const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0);

    // AND the per-column CP predicates: any column that rules an extent out
    // rules it out for the whole step.
    for (uint32_t idx = 0; idx < numExtents; idx++)
    {
        scanFlags[idx] = true;

        for (uint32_t i = 0; i < cpColVec.size(); i++)
        {
            colCmd = cpColVec[i];
            const EMEntry& extent = colCmd->getExtents()[idx];

            /* If any column filter eliminates an extent, it doesn't get scanned */
            scanFlags[idx] = scanFlags[idx] &&
                             (ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID ||
                              lbidListVec[i]->CasualPartitionPredicate(
                                  extent.partition.cprange,
                                  &(colCmd->getFilterString()),
                                  colCmd->getFilterCount(),
                                  colCmd->getColType(),
                                  colCmd->getBOP())
                             );

            if (!scanFlags[idx])
            {
                break;
            }
        }
    }

    // @bug 2123. Use the casual partitioning information to estimate the number of rows that will be returned for use in estimating
    // the large side table for hashjoins.
    if (estimateRowCounts)
    {
        RowEstimator rowEstimator;
        fEstimatedRows = rowEstimator.estimateRows(cpColVec, scanFlags, dbrm, fOid);
    }
}
|
|
|
|
void TupleBPS::startPrimitiveThread()
|
|
{
|
|
pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this));
|
|
}
|
|
|
|
void TupleBPS::startAggregationThread()
|
|
{
|
|
// This block of code starts all threads up front
|
|
// fMaxNumThreads = 1;
|
|
// fNumThreads = fMaxNumThreads;
|
|
// for (uint32_t i = 0; i < fMaxNumThreads; i++)
|
|
// fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i)));
|
|
|
|
// This block of code starts one thread at a time
|
|
if (fNumThreads >= fMaxNumThreads)
|
|
return;
|
|
|
|
fNumThreads++;
|
|
fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1)));
|
|
}
|
|
|
|
void TupleBPS::serializeJoiner()
|
|
{
|
|
ByteStream bs;
|
|
bool more = true;
|
|
|
|
/* false from nextJoinerMsg means it's the last msg,
|
|
it's not exactly the exit condition*/
|
|
while (more)
|
|
{
|
|
{
|
|
// code block to release the lock immediatly
|
|
boost::mutex::scoped_lock lk(serializeJoinerMutex);
|
|
more = fBPP->nextTupleJoinerMsg(bs);
|
|
}
|
|
#ifdef JLF_DEBUG
|
|
cout << "serializing joiner into " << bs.length() << " bytes" << endl;
|
|
#endif
|
|
fDec->write(uniqueID, bs);
|
|
bs.restart();
|
|
}
|
|
|
|
}
|
|
|
|
void TupleBPS::serializeJoiner(uint32_t conn)
|
|
{
|
|
// We need this lock for TupleBPS::serializeJoiner()
|
|
boost::mutex::scoped_lock lk(serializeJoinerMutex);
|
|
|
|
ByteStream bs;
|
|
bool more = true;
|
|
|
|
/* false from nextJoinerMsg means it's the last msg,
|
|
it's not exactly the exit condition*/
|
|
while (more)
|
|
{
|
|
more = fBPP->nextTupleJoinerMsg(bs);
|
|
fDec->write(bs, conn);
|
|
bs.restart();
|
|
}
|
|
}
|
|
|
|
/* Refresh per-extent scan flags before a (re)scan.  For user tables
 * (OID >= 3000) this ANDs in the runtime CP flags and pre-fetches each
 * extent's min/max (narrow or 128-bit wide path) so later CP checks have
 * current values; system-catalog extents are always scanned. */
void TupleBPS::prepCasualPartitioning()
{
    uint32_t i;
    int64_t min, max, seq;
    int128_t bigMin, bigMax;
    // scanFlags/runtimeCPFlags are shared with the receive threads.
    boost::mutex::scoped_lock lk(cpMutex);

    for (i = 0; i < scannedExtents.size(); i++)
    {
        if (fOid >= 3000)
        {
            scanFlags[i] = scanFlags[i] && runtimeCPFlags[i];

            if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType,
                    fColType.colWidth))
            {
                // <= 8 bytes uses the 64-bit min/max path; 16 bytes the
                // int128 path.  Results are fetched for their side effects
                // on lbidList's cached state.
                if (fColType.colWidth <= 8)
                {
                    lbidList->GetMinMax(min, max, seq, (int64_t) scannedExtents[i].range.start,
                                        &scannedExtents, fColType.colDataType);
                }
                else if (fColType.colWidth == 16)
                {
                    lbidList->GetMinMax(bigMin, bigMax, seq, (int64_t) scannedExtents[i].range.start,
                                        &scannedExtents, fColType.colDataType);
                }
            }
        }
        else
            // System catalog OIDs (< 3000): always scan.
            scanFlags[i] = true;
    }
}
|
|
|
|
bool TupleBPS::goodExtentCount()
|
|
{
|
|
uint32_t eCount = extentsMap.begin()->second.size();
|
|
map<CalpontSystemCatalog::OID, tr1::unordered_map<int64_t, EMEntry> >
|
|
::iterator it;
|
|
|
|
for (it = extentsMap.begin(); it != extentsMap.end(); ++it)
|
|
if (it->second.size() != eCount)
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
/* Rebuild the scan bookkeeping from extentsMap[fOid]: sort the extents,
 * reset scanFlags/runtimeCPFlags, find the last available extent per DBRoot,
 * and compute lastScannedLBID for each DBRoot.  Also applies the @bug 5322
 * workaround for single-extent segment files with HWM 0. */
void TupleBPS::initExtentMarkers()
{
    numDBRoots = fRm->getDBRootCount();
    lastExtent.resize(numDBRoots);
    lastScannedLBID.resize(numDBRoots);

    tr1::unordered_map<int64_t, struct BRM::EMEntry>& ref = extentsMap[fOid];
    tr1::unordered_map<int64_t, struct BRM::EMEntry>::iterator it;

    // Map part# and seg# to an extent count per segment file.
    // Part# is 32 hi order bits of key, seg# is 32 lo order bits
    std::tr1::unordered_map<uint64_t, int> extentCountPerDbFile;

    scannedExtents.clear();

    for (it = ref.begin(); it != ref.end(); ++it)
    {
        scannedExtents.push_back(it->second);

        //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
        // Track how many extents are in each segment file
        if (fExtentsPerSegFile > 1)
        {
            EMEntry& e = it->second;
            uint64_t key = ((uint64_t)e.partitionNum << 32) + e.segmentNum;
            ++extentCountPerDbFile[key];
        }
    }

    // Put extents in canonical scan order before computing markers.
    sort(scannedExtents.begin(), scannedExtents.end(), ExtentSorter());

    numExtents = scannedExtents.size();
    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);

    // -1 means "no available extent on this DBRoot".
    for (uint32_t i = 0; i < numDBRoots; i++)
        lastExtent[i] = -1;

    for (uint32_t i = 0; i < scannedExtents.size(); i++)
    {
        uint32_t dbRoot = scannedExtents[i].dbRoot - 1;

        /* Kludge to account for gaps in the dbroot mapping. */
        if (scannedExtents[i].dbRoot > numDBRoots)
        {
            lastExtent.resize(scannedExtents[i].dbRoot);
            lastScannedLBID.resize(scannedExtents[i].dbRoot);

            for (uint32_t z = numDBRoots; z < scannedExtents[i].dbRoot; z++)
                lastExtent[z] = -1;

            numDBRoots = scannedExtents[i].dbRoot;
        }

        // Track the highest-indexed available extent per DBRoot.
        if ((scannedExtents[i].status == EXTENTAVAILABLE) && (lastExtent[dbRoot] < (int) i))
            lastExtent[dbRoot] = i;

        //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
        // Use special status (EXTENTSTATUSMAX+1) to denote a single extent
        // file with HWM 0; retrieve 1 block and not full extent.
        if ((fExtentsPerSegFile > 1) && (scannedExtents[i].HWM == 0))
        {
            uint64_t key = ((uint64_t)scannedExtents[i].partitionNum << 32) +
                           scannedExtents[i].segmentNum;

            if (extentCountPerDbFile[key] == 1)
                scannedExtents[i].status = EXTENTSTATUSMAX + 1;
        }
    }

    // if only 1 block is written in the last extent, HWM is 0 and didn't get counted.
    for (uint32_t i = 0; i < numDBRoots; i++)
    {
        if (lastExtent[i] != -1)
            lastScannedLBID[i] = scannedExtents[lastExtent[i]].range.start +
                                 (scannedExtents[lastExtent[i]].HWM - scannedExtents[lastExtent[i]].blockOffset);
        else
            lastScannedLBID[i] = -1;
    }
}
|
|
|
|
/* Re-read the extent maps for every ColumnCommand in the filter and
 * projection lists, rebuild this step's extentsMap from them, and refresh
 * the derived scan markers.  Used by run() to recover when columns report
 * inconsistent extent counts (goodExtentCount() == false). */
void TupleBPS::reloadExtentLists()
{
    /*
     * Iterate over each ColumnCommand instance
     *
     * 1) reload its extent array
     * 2) update TupleBPS's extent array
     * 3) update vars dependent on the extent layout (lastExtent, scanFlags, etc)
     */

    uint32_t i, j;
    ColumnCommandJL* cc;
    vector<SCommand>& filters = fBPP->getFilterSteps();
    vector<SCommand>& projections = fBPP->getProjectSteps();
    uint32_t oid;

    /* To reduce the race, make all CC's get new extents as close together
     * as possible, then rebuild the local copies.
     */

    for (i = 0; i < filters.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());

        if (cc != NULL)
            cc->reloadExtents();
    }

    for (i = 0; i < projections.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());

        if (cc != NULL)
            cc->reloadExtents();
    }

    // Rebuild extentsMap from the freshly reloaded extent arrays.
    extentsMap.clear();

    for (i = 0; i < filters.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());

        if (cc == NULL)
            continue;

        const vector<EMEntry>& extents = cc->getExtents();
        oid = cc->getOID();

        extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
        tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];

        for (j = 0; j < extents.size(); j++)
            mref[extents[j].range.start] = extents[j];
    }

    for (i = 0; i < projections.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());

        if (cc == NULL)
            continue;

        const vector<EMEntry>& extents = cc->getExtents();
        oid = cc->getOID();

        extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
        tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];

        for (j = 0; j < extents.size(); j++)
            mref[extents[j].range.start] = extents[j];
    }

    // Recompute lastExtent, lastScannedLBID, scanFlags, etc.
    initExtentMarkers();
}
|
|
|
|
void TupleBPS::run()
{
    // Start the step: wait for a consistent extent layout, allocate the
    // Batch Primitive Processor (BPP) on the PMs, and launch the sender and
    // consumer threads.  Idempotent: only the first call does the work.
    uint32_t i;
    boost::mutex::scoped_lock lk(jlLock);
    uint32_t retryCounter = 0;
    const uint32_t retryMax = 1000; // 50s max; we've seen a 15s window so 50s should be 'safe'
    const uint32_t waitInterval = 50000; // in us

    if (fRunExecuted)
        return;

    fRunExecuted = true;

    // make sure each numeric column has the same # of extents! See bugs 4564 and 3607
    try
    {
        // Poll until the extent counts agree across columns, reloading the
        // extent lists between attempts (up to retryMax * waitInterval).
        while (!goodExtentCount() && retryCounter++ < retryMax)
        {
            usleep(waitInterval);
            reloadExtentLists();
        }
    }
    catch (std::exception& e)
    {
        // Report the failure and close the output datalist so downstream
        // consumers don't block forever.
        ostringstream os;
        os << "TupleBPS: Could not get a consistent extent count for each column. Got '"
           << e.what() << "'\n";
        catchHandler(os.str(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
        fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
        return;
    }

    // Retries exhausted without reaching a consistent count: give up.
    if (retryCounter == retryMax)
    {
        catchHandler("TupleBPS: Could not get a consistent extent count for each column.",
                     ERR_TUPLE_BPS, fErrorInfo, fSessionId);
        fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
        return;
    }

    if (traceOn())
    {
        syslogStartStep(16, // exemgr subsystem
                        std::string("TupleBPS")); // step name
    }

    ByteStream bs;

    // In delivery mode, results are staged in a private datalist that
    // nextBand() drains one rowgroup at a time.
    if (fDelivery)
    {
        deliveryDL.reset(new RowGroupDL(1, 5));
        deliveryIt = deliveryDL->getIterator();
    }

    fBPP->setThreadCount(fMaxNumThreads);

    if (doJoin)
        for (i = 0; i < smallSideCount; i++)
            tjoiners[i]->setThreadCount(fMaxNumThreads);

    // F&E group 1 always runs on the PM; group 2 runs on the PM only when
    // runFEonPM is set, otherwise it's evaluated UM-side by the consumers.
    if (fe1)
        fBPP->setFEGroup1(fe1, fe1Input);

    if (fe2 && runFEonPM)
        fBPP->setFEGroup2(fe2, fe2Output);

    if (fe2)
    {
        primRowGroup.initRow(&fe2InRow);
        fe2Output.initRow(&fe2OutRow);
    }

    try
    {
        // Create the BPP on the PMs, then spin up the local threads.
        fDec->addDECEventListener(this);
        fBPP->priority(priority());
        fBPP->createBPP(bs);
        fDec->write(uniqueID, bs);
        BPPIsAllocated = true;

        // PM-side joins need the small-side data shipped to the PMs first.
        if (doJoin && tjoiners[0]->inPM())
            serializeJoiner();

        prepCasualPartitioning();
        startPrimitiveThread();      // sender thread
        fProducerThreads.clear();
        fProducerThreads.reserve(fMaxNumThreads);
        startAggregationThread();    // first consumer; more start on demand
    }
    catch (...)
    {
        handleException(std::current_exception(),
                        logging::ERR_TUPLE_BPS,
                        logging::ERR_ALWAYS_CRITICAL,
                        "TupleBPS::run()");
        fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
    }
}
|
|
|
|
void TupleBPS::join()
{
    // Wait for this step's worker threads to finish and tear down the BPP
    // on the PMs.  Idempotent: only the first call does the work.
    boost::mutex::scoped_lock lk(jlLock);

    if (joinRan)
        return;

    joinRan = true;

    if (fRunExecuted)
    {
        if (msgsRecvd < msgsSent)
        {
            // wake up the sending thread, it should drain the input dl and exit
            boost::unique_lock<boost::mutex> tplLock(tplMutex);
            condvarWakeupProducer.notify_all();
            tplLock.unlock();
        }

        // Join the sender thread first, then all consumer threads.
        if (pThread)
            jobstepThreadPool.join(pThread);

        jobstepThreadPool.join(fProducerThreads);

        if (BPPIsAllocated)
        {
            // Tell the PMs to destroy this query's BPP and stop listening
            // for its responses.
            ByteStream bs;
            fDec->removeDECEventListener(this);
            fBPP->destroyBPP(bs);

            try
            {
                fDec->write(uniqueID, bs);
            }
            catch (...)
            {
                // Teardown failure; log it, but the query is already done.
                handleException(std::current_exception(),
                                logging::ERR_TUPLE_BPS,
                                logging::ERR_ALWAYS_CRITICAL,
                                "TupleBPS::join()");
            }

            BPPIsAllocated = false;
            fDec->removeQueue(uniqueID);
            tjoiners.clear();
        }
    }
}
|
|
|
|
void TupleBPS::sendError(uint16_t status)
{
    // Broadcast an error to the PMs so outstanding work for this query is
    // abandoned, then release any threads blocked on the local condvars.
    ByteStream msgBpp;
    fBPP->setCount(1);
    fBPP->setStatus(status);
    fBPP->runErrorBPP(msgBpp);

    try
    {
        fDec->write(uniqueID, msgBpp);
    }
    catch (...)
    {
        // this fcn is only called in exception handlers
        // let the first error take precedence
    }

    fBPP->reset();
    // Mark the send side finished and wake both the consumers (condvar) and
    // a possibly flow-control-blocked sender (condvarWakeupProducer).
    finishedSending = true;
    condvar.notify_all();
    condvarWakeupProducer.notify_all();
}
|
|
|
|
uint32_t TupleBPS::nextBand(ByteStream& bs)
{
    // Delivery-mode entry point: serialize the next non-empty rowgroup from
    // deliveryDL into 'bs'.  Returns the number of rows serialized; 0 means
    // end of data.  The final (empty) rowgroup is still serialized because
    // it carries the step's status/error code to the caller.
    bool more = true;
    RowGroup& realOutputRG = (fe2 ? fe2Output : primRowGroup);
    RGData rgData;
    uint32_t rowCount = 0;

    bs.restart();

    while (rowCount == 0 && more)
    {
        more = deliveryDL->next(deliveryIt, &rgData);

        // At EOF, emit a rowgroup stamped with the current status() so the
        // consumer learns about any error.
        if (!more)
            rgData = fBPP->getErrorRowGroupData(status());

        realOutputRG.setData(&rgData);
        rowCount = realOutputRG.getRowCount();

        // Skip empty intermediate rowgroups; always serialize the last one.
        if ((more && rowCount > 0) || !more)
            realOutputRG.serializeRGData(bs);
    }

    return rowCount;
}
|
|
|
|
/* The current interleaving rotates over PMs, clustering jobs for a single PM
|
|
* by dbroot to keep block accesses adjacent & make best use of prefetching &
|
|
* cache.
|
|
*/
|
|
void TupleBPS::interleaveJobs(vector<Job>* jobs) const
{
    // Reorder 'jobs' so that consecutive jobs target different PMs
    // (round-robin over connection numbers) while each PM's own jobs keep
    // their dbroot-clustered order for prefetching/cache locality.
    vector<Job> newJobs;
    uint32_t i;
    uint32_t pmCount = 0;
    scoped_array<deque<Job> > bins;

    // the input is grouped by dbroot

    /* Need to get the 'real' PM count */
    for (i = 0; i < jobs->size(); i++)
        if (pmCount < (*jobs)[i].connectionNum + 1)
            pmCount = (*jobs)[i].connectionNum + 1;

    // With a single PM there's nothing to interleave; the input order is
    // exactly what a one-bin interleave would produce.
    // (Bug fix: this early-out used to run before pmCount was computed, so
    // it compared against the initializer 0 and could never trigger.)
    if (pmCount == 1)
        return;

    bins.reset(new deque<Job>[pmCount]);

    // group by connection number
    for (i = 0; i < jobs->size(); i++)
        bins[(*jobs)[i].connectionNum].push_back((*jobs)[i]);

    // interleave by connection num
    bool noWorkDone;

    while (newJobs.size() < jobs->size())
    {
        noWorkDone = true;

        for (i = 0; i < pmCount; i++)
        {
            if (!bins[i].empty())
            {
                newJobs.push_back(bins[i].front());
                bins[i].pop_front();
                noWorkDone = false;
            }
        }

        // Every pass must move at least one job or we'd loop forever.
        idbassert(!noWorkDone);
    }

#if 0
    /* Work in progress */
    // up to this point, only the first connection to a PM is used.
    // the last step is to round-robin on the connections of each PM.
    // ex: on a 3 PM system where connections per PM is 2,
    // connections 0 and 3 are for PM 1, 1 and 4 are PM2, 2 and 5 are PM3.
    uint32_t* jobCounters = (uint32_t*) alloca(pmCount * sizeof(uint32_t));
    memset(jobCounters, 0, pmCount * sizeof(uint32_t));

    for (i = 0; i < newJobs.size(); i++)
    {
        uint32_t& conn = newJobs[i].connectionNum; // for readability's sake
        conn = conn + (pmCount * jobCounters[conn]);
        jobCounters[conn]++;
    }

#endif

    jobs->swap(newJobs);
//  cout << "-------------\n";
//  for (i = 0; i < jobs->size(); i++)
//      cout << "job " << i+1 << ": dbroot " << (*jobs)[i].dbroot << ", PM "
//           << (*jobs)[i].connectionNum + 1 << endl;
}
|
|
|
|
void TupleBPS::sendJobs(const vector<Job>& jobs)
{
    // Ship each premade job message to its PM connection, tracking the
    // number of expected responses and applying flow control so the PMs
    // don't get too far ahead of the UM-side consumers.
    uint32_t i;
    boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);

    for (i = 0; i < jobs.size() && !cancelled(); i++)
    {
        // Write outside the lock; only the bookkeeping needs tplMutex.
        fDec->write(uniqueID, *(jobs[i].msg));
        tplLock.lock();
        msgsSent += jobs[i].expectedResponses;

        // Consumers blocked waiting for work can proceed now.
        if (recvWaiting)
            condvar.notify_all();

        // Flow control: block while too many responses are outstanding;
        // the receive side notifies condvarWakeupProducer when it catches up
        // (or on shutdown via fDie).
        while ((msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)
                && !fDie)
        {
            sendWaiting = true;
            condvarWakeupProducer.wait(tplLock);
            sendWaiting = false;
        }

        tplLock.unlock();
    }
}
|
|
|
|
template<typename T>
|
|
bool TupleBPS::compareSingleValue(uint8_t COP, T val1, T val2) const
|
|
{
|
|
switch (COP)
|
|
{
|
|
case COMPARE_LT:
|
|
case COMPARE_NGE:
|
|
return (val1 < val2);
|
|
|
|
case COMPARE_LE:
|
|
case COMPARE_NGT:
|
|
return (val1 <= val2);
|
|
|
|
case COMPARE_GT:
|
|
case COMPARE_NLE:
|
|
return (val1 > val2);
|
|
|
|
case COMPARE_GE:
|
|
case COMPARE_NLT:
|
|
return (val1 >= val2);
|
|
|
|
case COMPARE_EQ:
|
|
return (val1 == val2);
|
|
|
|
case COMPARE_NE:
|
|
return (val1 != val2);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/* (range COP val) comparisons */
|
|
bool TupleBPS::compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const
{
    // Decide whether any value in the closed range [min, max] can satisfy
    // "value COP val" -- used to prune extents by their min/max metadata.
    // Unknown COPs evaluate to false.
    if (COP == COMPARE_LT || COP == COMPARE_NGE)
        return min < val;

    if (COP == COMPARE_LE || COP == COMPARE_NGT)
        return min <= val;

    if (COP == COMPARE_GT || COP == COMPARE_NLE)
        return max > val;

    if (COP == COMPARE_GE || COP == COMPARE_NLT)
        return max >= val;

    if (COP == COMPARE_EQ)   // an 'in' comparison
        return val >= min && val <= max;

    if (COP == COMPARE_NE)   // 'not in'
        return val < min || val > max;

    return false;
}
|
|
|
|
bool TupleBPS::processSingleFilterString_ranged(int8_t BOP, int8_t colWidth, int64_t min, int64_t max, const uint8_t* filterString,
|
|
uint32_t filterCount) const
|
|
{
|
|
uint j;
|
|
bool ret = true;
|
|
|
|
for (j = 0; j < filterCount; j++)
|
|
{
|
|
int8_t COP;
|
|
int64_t val2;
|
|
bool thisPredicate;
|
|
COP = *filterString++;
|
|
filterString++; // skip the round var, don't think that applies here
|
|
|
|
switch (colWidth)
|
|
{
|
|
case 1:
|
|
val2 = *((int8_t*) filterString);
|
|
filterString++;
|
|
break;
|
|
|
|
case 2:
|
|
val2 = *((int16_t*) filterString);
|
|
filterString += 2;
|
|
break;
|
|
|
|
case 4:
|
|
val2 = *((int32_t*) filterString);
|
|
filterString += 4;
|
|
break;
|
|
|
|
case 8:
|
|
val2 = *((int64_t*) filterString);
|
|
filterString += 8;
|
|
break;
|
|
|
|
default:
|
|
throw logic_error("invalid column width");
|
|
}
|
|
|
|
thisPredicate = compareRange(COP, min, max, val2);
|
|
|
|
if (j == 0)
|
|
ret = thisPredicate;
|
|
|
|
if (BOP == BOP_OR && thisPredicate)
|
|
return true;
|
|
else if (BOP == BOP_AND && !thisPredicate)
|
|
return false;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
template<typename T>
bool TupleBPS::processSingleFilterString(int8_t BOP, int8_t colWidth, T val, const uint8_t* filterString,
        uint32_t filterCount) const
{
    // Evaluate a serialized filter string against a single value 'val'.
    // Each entry is: 1-byte COP, 1-byte 'round', colWidth-byte operand.
    // Entries combine with BOP (BOP_AND / BOP_OR) with short-circuiting.
    uint j;
    bool ret = true;

    for (j = 0; j < filterCount; j++)
    {
        int8_t COP;
        int64_t val2;
        int128_t bigVal2;
        bool thisPredicate;
        COP = *filterString++;
        filterString++; // skip the round var, don't think that applies here

        // Deserialize the operand at the column's width and advance.
        switch (colWidth)
        {
            case 1:
                val2 = *((int8_t*) filterString);
                filterString++;
                break;

            case 2:
                val2 = *((int16_t*) filterString);
                filterString += 2;
                break;

            case 4:
                val2 = *((int32_t*) filterString);
                filterString += 4;
                break;

            case 8:
                val2 = *((int64_t*) filterString);
                filterString += 8;
                break;

            case 16:
                bigVal2 = *((int128_t*) filterString);
                filterString += 16;
                break;

            default:
                throw logic_error("invalid column width");
        }

        // Assumption is that colWidth > 0
        // Narrow columns compare as int64; wide decimals (16 bytes) as int128.
        if (static_cast<uint8_t>(colWidth) < datatypes::MAXDECIMALWIDTH)
            thisPredicate = compareSingleValue(COP, (int64_t) val, val2);
        else
            thisPredicate = compareSingleValue(COP, (int128_t) val, bigVal2);

        // The first predicate is the default result when no short-circuit fires.
        if (j == 0)
            ret = thisPredicate;

        if (BOP == BOP_OR && thisPredicate)
            return true;
        else if (BOP == BOP_AND && !thisPredicate)
            return false;
    }

    return ret;
}
|
|
|
|
template<typename T>
bool TupleBPS::processOneFilterType(int8_t colWidth, T value, uint32_t type) const
{
    // Apply every pseudocolumn filter of the given 'type' (a PSEUDO_* code)
    // to 'value', combining per-filter results with this step's 'bop'.
    // Returns true when no filter of that type exists (vacuous pass).
    const vector<SCommand>& filters = fBPP->getFilterSteps();
    uint i;
    bool ret = true;
    bool firstPseudo = true;

    for (i = 0; i < filters.size(); i++)
    {
        PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());

        // Only pseudocolumn commands of the requested function apply.
        if (!pseudo || pseudo->getFunction() != type)
            continue;

        int8_t BOP = pseudo->getBOP(); // I think this is the same as TupleBPS's bop var...?

        /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
        const uint8_t* filterString = pseudo->getFilterString().buf();
        uint32_t filterCount = pseudo->getFilterCount();
        bool thisPredicate = processSingleFilterString(BOP, colWidth, value, filterString, filterCount);

        // The first matching filter's result is the default when no
        // short-circuit fires below.
        if (firstPseudo)
        {
            firstPseudo = false;
            ret = thisPredicate;
        }

        // Short-circuit across filters using the step-level bop.
        if (bop == BOP_OR && thisPredicate)
            return true;
        else if (bop == BOP_AND && !thisPredicate)
            return false;
    }

    return ret;
}
|
|
|
|
bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const
{
    // Evaluate PSEUDO_BLOCKID filters against one extent's LBID range.
    // Returns true if [firstLBID, lastLBID] can satisfy the filters, i.e.
    // the extent must be scanned.
    const vector<SCommand>& filters = fBPP->getFilterSteps();
    uint i;
    bool ret = true;
    bool firstPseudo = true;
    LBID_t firstLBID = emEntry.range.start;
    // range.size is in units of 1024 LBIDs (see the same math in makeJobs).
    LBID_t lastLBID = firstLBID + (emEntry.range.size * 1024) - 1;

    for (i = 0; i < filters.size(); i++)
    {
        PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());

        if (!pseudo || pseudo->getFunction() != PSEUDO_BLOCKID)
            continue;

        int8_t BOP = pseudo->getBOP(); // I think this is the same as TupleBPS's bop var...?

        /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
        const uint8_t* filterString = pseudo->getFilterString().buf();
        uint32_t filterCount = pseudo->getFilterCount();
        // Range comparison at 8-byte width against the extent's LBID span.
        bool thisPredicate = processSingleFilterString_ranged(BOP, 8,
                             firstLBID, lastLBID, filterString, filterCount);

        // The first matching filter's result is the default when no
        // short-circuit fires below.
        if (firstPseudo)
        {
            firstPseudo = false;
            ret = thisPredicate;
        }

        // Short-circuit across filters using the step-level bop.
        if (bop == BOP_OR && thisPredicate)
            return true;
        else if (bop == BOP_AND && !thisPredicate)
            return false;
    }

    return ret;
}
|
|
|
|
bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<map<int, int> > dbRootPMMap) const
{
    // Decide whether the extent at 'extentIndex' survives the query's
    // pseudocolumn filters (PM, segment, dbroot, segmentdir, extent id,
    // extent min/max, block id).  Returns true when the extent must be
    // scanned; a quick true when no pseudocolumn filter exists at all.
    if (!hasPCFilter)
        return true;

    const EMEntry& emEntry = scannedExtents[extentIndex];

    if (bop == BOP_AND)
    {
        /* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */
        // AND semantics: every filter type that is present must pass.
        // Min/max filters are only evaluated when the extent's casual-
        // partitioning range is valid; otherwise they pass (true) so the
        // extent is still read.
        // NOTE(review): the narrow-type min/max calls pass emEntry.range.size
        // as the column width argument -- looks odd; verify against
        // processSingleFilterString's width handling.
        return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
               && (!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
               && (!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
               && (!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
               && (!hasExtentIDFilter || processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
               && (!hasMaxFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                     (!fColType.isWideDecimalType() ?
                                      processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hiVal, PSEUDO_EXTENTMAX) :
                                      processOneFilterType(fColType.colWidth, emEntry.partition.cprange.bigHiVal, PSEUDO_EXTENTMAX)) : true))
               && (!hasMinFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                     (!fColType.isWideDecimalType() ?
                                      processOneFilterType(emEntry.range.size, emEntry.partition.cprange.loVal, PSEUDO_EXTENTMIN) :
                                      processOneFilterType(fColType.colWidth, emEntry.partition.cprange.bigLoVal, PSEUDO_EXTENTMIN)) : true))
               && (!hasLBIDFilter || processLBIDFilter(emEntry))
               ;
    }
    else
    {
        // OR semantics: any present filter type that passes keeps the
        // extent.  An invalid CP range contributes false here (no info).
        return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
               || (hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
               || (hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
               || (hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
               || (hasExtentIDFilter && processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
               || (hasMaxFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                    (!fColType.isWideDecimalType() ?
                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hiVal, PSEUDO_EXTENTMAX) :
                                     processOneFilterType(fColType.colWidth, emEntry.partition.cprange.bigHiVal, PSEUDO_EXTENTMAX)) : false))
               || (hasMinFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                    (!fColType.isWideDecimalType() ?
                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.loVal, PSEUDO_EXTENTMIN) :
                                     processOneFilterType(fColType.colWidth, emEntry.partition.cprange.bigLoVal, PSEUDO_EXTENTMIN)) : false))
               || (hasLBIDFilter && processLBIDFilter(emEntry))
               ;
    }
}
|
|
|
|
|
|
void TupleBPS::makeJobs(vector<Job>* jobs)
{
    // Build one or more Jobs (premade primitive messages) for every extent
    // that survives bounds checks, casual partitioning, pseudocolumn
    // filters, and locality constraints.  Output is appended to *jobs,
    // grouped by extent (and therefore by dbroot).
    boost::shared_ptr<ByteStream> bs;
    uint32_t i;
    uint32_t lbidsToScan;
    uint32_t blocksToScan;
    uint32_t blocksPerJob;
    LBID_t startingLBID;
    oam::OamCache* oamCache = oam::OamCache::makeOamCache();
    boost::shared_ptr<map<int, int> > dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
    boost::shared_ptr<map<int, int> > dbRootPMMap = oamCache->getDBRootToPMMap();
    int localPMId = oamCache->getLocalPMId();

    idbassert(ffirstStepType == SCAN);

    // Casual-partitioning elimination only applies to user tables (OID >=
    // 3000) with AND-combined filters.
    if (fOid >= 3000 && bop == BOP_AND)
        storeCasualPartitionInfo(false);

    totalMsgs = 0;

    for (i = 0; i < scannedExtents.size(); i++)
    {
        // the # of LBIDs to scan in this extent, if it will be scanned.
        //@bug 5322: status EXTENTSTATUSMAX+1 means single block extent.
        if ((scannedExtents[i].HWM == 0) &&
                ((int) i < lastExtent[scannedExtents[i].dbRoot - 1]) &&
                (scannedExtents[i].status <= EXTENTSTATUSMAX))
            lbidsToScan = scannedExtents[i].range.size * 1024;
        else
            lbidsToScan = scannedExtents[i].HWM - scannedExtents[i].blockOffset + 1;

        // skip this extent if CP data rules it out or the scan has already passed
        // the last extent for that DBRoot (import may be adding extents that shouldn't
        // be read yet). Also skip if there's a pseudocolumn with a filter that would
        // eliminate this extent

        bool inBounds = ((int)i <= lastExtent[scannedExtents[i].dbRoot - 1]);

        if (!inBounds)
        {
            continue;
        }

        // Eliminated by casual partitioning: count the skipped blocks.
        if (!scanFlags[i])
        {
            fNumBlksSkipped += lbidsToScan;
            continue;
        }

        // Eliminated by a pseudocolumn filter: likewise.
        if (!processPseudoColFilters(i, dbRootPMMap))
        {
            fNumBlksSkipped += lbidsToScan;
            continue;
        }

        //if (!scanFlags[i] || !inBounds)
        //    continue;

        /* Figure out many blocks have data in this extent
         * Calc how many jobs to issue,
         * Get the dbroot,
         * construct the job msgs
         */

        // Bug5741 If we are local only and this doesn't belongs to us, skip it
        if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
        {
            // A local query requires running on a PM; a UM has no local PM id.
            if (localPMId == 0)
            {
                throw IDBExcept(ERR_LOCAL_QUERY_UM);
            }

            if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId)
                continue;
        }

        // a necessary DB root is offline
        if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
        {
            // MCOL-259 force a reload of the xml. This usualy fixes it.
            Logger log;
            log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap");
            oamCache->forceReload();
            dbRootConnectionMap = oamCache->getDBRootToConnectionMap();

            if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
            {
                log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap");
                throw IDBExcept(ERR_DATA_OFFLINE);
            }
        }

        // the # of logical blocks in this extent (round up to whole blocks)
        if (lbidsToScan % fColType.colWidth)
            blocksToScan = lbidsToScan / fColType.colWidth + 1;
        else
            blocksToScan = lbidsToScan / fColType.colWidth;

        totalMsgs += blocksToScan;

        // how many logical blocks to process with a single job (& single thread on the PM)
#if defined(_MSC_VER) && BOOST_VERSION < 105200
        blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16UL);
#else
        blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16U);
#endif

        startingLBID = scannedExtents[i].range.start;

        // Carve the extent into jobs of at most blocksPerJob logical blocks.
        while (blocksToScan > 0)
        {
            uint32_t blocksThisJob = min(blocksToScan, blocksPerJob);

            fBPP->setLBID(startingLBID, scannedExtents[i]);
            fBPP->setCount(blocksThisJob);
            bs.reset(new ByteStream());
            fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]);
            jobs->push_back(Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot],
                                blocksThisJob, bs));
            blocksToScan -= blocksThisJob;
            startingLBID += fColType.colWidth * blocksThisJob;
            fBPP->reset();
        }
    }

}
|
|
|
|
void TupleBPS::sendPrimitiveMessages()
{
    // Producer-thread entry point: build one job per chunk of each scanned
    // extent, interleave them across PMs, and send them with flow control.
    // On any failure, push an error BPP to the PMs and abort the step.
    vector<Job> jobs;

    idbassert(ffirstStepType == SCAN);

    if (cancelled())
        goto abort;

    try
    {
        makeJobs(&jobs);
        interleaveJobs(&jobs);
        sendJobs(jobs);
    }
    catch (...)
    {
        sendError(logging::ERR_TUPLE_BPS);
        handleException(std::current_exception(),
                        logging::ERR_TUPLE_BPS,
                        logging::ERR_ALWAYS_CRITICAL,
                        "st: " + std::to_string(fStepId) +
                        " TupleBPS::sendPrimitiveMessages()");
        abort_nolock();
    }

abort:
    // Always mark the send side finished and wake any waiting consumers,
    // even on the cancelled/error paths.
    boost::unique_lock<boost::mutex> tplLock(tplMutex);
    finishedSending = true;
    condvar.notify_all();
    tplLock.unlock();
}
|
|
|
|
/* Casual-partitioning info collected from one extent's scan responses;
 * queued by the receive threads and later applied to the extent map. */
struct _CPInfo
{
    // Narrow (<= 8-byte) column variant.
    _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val) { };
    // Wide-decimal (16-byte) column variant.
    _CPInfo(int128_t BIGMIN, int128_t BIGMAX, uint64_t l, bool val) : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val) { };
    // Narrow and wide min/max share storage; which member is meaningful
    // depends on the column type being scanned (caller's responsibility).
    union
    {
        int128_t bigMin;
        int64_t min;
    };
    union
    {
        int128_t bigMax;
        int64_t max;
    };
    uint64_t LBID;   // first LBID of the extent this info describes
    bool valid;      // false when min/max could not be determined
};
|
|
|
|
void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
|
|
{
|
|
AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
|
|
RowGroupDL* dlp = (fDelivery ? deliveryDL.get() : dl->rowGroupDL());
|
|
|
|
uint32_t size = 0;
|
|
vector<boost::shared_ptr<ByteStream> > bsv;
|
|
|
|
RGData rgData;
|
|
vector<RGData> rgDatav;
|
|
vector<RGData> fromPrimProc;
|
|
|
|
bool validCPData;
|
|
bool hasBinaryColumn;
|
|
int128_t min;
|
|
int128_t max;
|
|
uint64_t lbid;
|
|
vector<_CPInfo> cpv;
|
|
uint32_t cachedIO;
|
|
uint32_t physIO;
|
|
uint32_t touchedBlocks;
|
|
uint32_t cachedIO_Thread = 0;
|
|
uint32_t physIO_Thread = 0;
|
|
uint32_t touchedBlocks_Thread = 0;
|
|
int64_t ridsReturned_Thread = 0;
|
|
bool lastThread = false;
|
|
uint32_t i, j, k;
|
|
RowGroup local_primRG = primRowGroup;
|
|
RowGroup local_outputRG = outputRowGroup;
|
|
bool didEOF = false;
|
|
bool unused;
|
|
|
|
/* Join vars */
|
|
vector<vector<Row::Pointer> > joinerOutput; // clean usage
|
|
Row largeSideRow, joinedBaseRow, largeNull, joinFERow; // LSR clean
|
|
scoped_array<Row> smallSideRows, smallNulls;
|
|
scoped_array<uint8_t> joinedBaseRowData;
|
|
scoped_array<uint8_t> joinFERowData;
|
|
shared_array<int> largeMapping;
|
|
vector<shared_array<int> > smallMappings;
|
|
vector<shared_array<int> > fergMappings;
|
|
RGData joinedData;
|
|
scoped_array<uint8_t> largeNullMemory;
|
|
scoped_array<shared_array<uint8_t> > smallNullMemory;
|
|
uint32_t matchCount;
|
|
|
|
/* Thread-scoped F&E 2 var */
|
|
Row postJoinRow; // postJoinRow is also used for joins
|
|
RowGroup local_fe2Output;
|
|
RGData local_fe2Data;
|
|
Row local_fe2OutRow;
|
|
funcexp::FuncExpWrapper local_fe2;
|
|
StepTeleStats sts;
|
|
sts.query_uuid = fQueryUuid;
|
|
sts.step_uuid = fStepUuid;
|
|
boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);
|
|
|
|
try
|
|
{
|
|
if (doJoin || fe2)
|
|
{
|
|
local_outputRG.initRow(&postJoinRow);
|
|
}
|
|
|
|
if (fe2)
|
|
{
|
|
local_fe2Output = fe2Output;
|
|
local_fe2Output.initRow(&local_fe2OutRow);
|
|
local_fe2Data.reinit(fe2Output);
|
|
local_fe2Output.setData(&local_fe2Data);
|
|
// local_fe2OutRow = fe2OutRow;
|
|
local_fe2 = *fe2;
|
|
}
|
|
|
|
if (doJoin)
|
|
{
|
|
joinerOutput.resize(smallSideCount);
|
|
smallSideRows.reset(new Row[smallSideCount]);
|
|
smallNulls.reset(new Row[smallSideCount]);
|
|
smallMappings.resize(smallSideCount);
|
|
fergMappings.resize(smallSideCount + 1);
|
|
smallNullMemory.reset(new shared_array<uint8_t>[smallSideCount]);
|
|
local_primRG.initRow(&largeSideRow);
|
|
local_outputRG.initRow(&joinedBaseRow, true);
|
|
joinedBaseRowData.reset(new uint8_t[joinedBaseRow.getSize()]);
|
|
joinedBaseRow.setData(joinedBaseRowData.get());
|
|
joinedBaseRow.initToNull();
|
|
largeMapping = makeMapping(local_primRG, local_outputRG);
|
|
|
|
bool hasJoinFE = false;
|
|
|
|
for (i = 0; i < smallSideCount; i++)
|
|
{
|
|
joinerMatchesRGs[i].initRow(&smallSideRows[i]);
|
|
smallMappings[i] = makeMapping(joinerMatchesRGs[i], local_outputRG);
|
|
|
|
// if (tjoiners[i]->semiJoin() || tjoiners[i]->antiJoin()) {
|
|
if (tjoiners[i]->hasFEFilter())
|
|
{
|
|
fergMappings[i] = makeMapping(joinerMatchesRGs[i], joinFERG);
|
|
hasJoinFE = true;
|
|
}
|
|
|
|
// }
|
|
}
|
|
|
|
if (hasJoinFE)
|
|
{
|
|
joinFERG.initRow(&joinFERow, true);
|
|
joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
|
|
memset(joinFERowData.get(), 0, joinFERow.getSize());
|
|
joinFERow.setData(joinFERowData.get());
|
|
fergMappings[smallSideCount] = makeMapping(local_primRG, joinFERG);
|
|
}
|
|
|
|
for (i = 0; i < smallSideCount; i++)
|
|
{
|
|
joinerMatchesRGs[i].initRow(&smallNulls[i], true);
|
|
smallNullMemory[i].reset(new uint8_t[smallNulls[i].getSize()]);
|
|
smallNulls[i].setData(smallNullMemory[i].get());
|
|
smallNulls[i].initToNull();
|
|
}
|
|
|
|
local_primRG.initRow(&largeNull, true);
|
|
largeNullMemory.reset(new uint8_t[largeNull.getSize()]);
|
|
largeNull.setData(largeNullMemory.get());
|
|
largeNull.initToNull();
|
|
|
|
#if 0
|
|
|
|
if (threadID == 0)
|
|
{
|
|
/* Some rowgroup debugging stuff. */
|
|
uint8_t* tmp8;
|
|
tmp8 = local_primRG.getData();
|
|
local_primRG.setData(NULL);
|
|
cout << "large-side RG: " << local_primRG.toString() << endl;
|
|
local_primRG.setData(tmp8);
|
|
|
|
for (i = 0; i < smallSideCount; i++)
|
|
{
|
|
tmp8 = joinerMatchesRGs[i].getData();
|
|
joinerMatchesRGs[i].setData(NULL);
|
|
cout << "small-side[" << i << "] RG: " << joinerMatchesRGs[i].toString() << endl;
|
|
}
|
|
|
|
tmp8 = local_outputRG.getData();
|
|
local_outputRG.setData(NULL);
|
|
cout << "output RG: " << local_outputRG.toString() << endl;
|
|
local_outputRG.setData(tmp8);
|
|
|
|
cout << "large mapping:\n";
|
|
|
|
for (i = 0; i < local_primRG.getColumnCount(); i++)
|
|
cout << largeMapping[i] << " ";
|
|
|
|
cout << endl;
|
|
|
|
for (uint32_t z = 0; z < smallSideCount; z++)
|
|
{
|
|
cout << "small mapping[" << z << "] :\n";
|
|
|
|
for (i = 0; i < joinerMatchesRGs[z].getColumnCount(); i++)
|
|
cout << smallMappings[z][i] << " ";
|
|
|
|
cout << endl;
|
|
}
|
|
}
|
|
|
|
#endif
|
|
}
|
|
|
|
tplLock.lock();
|
|
|
|
while (1)
|
|
{
|
|
// sync with the send side
|
|
while (!finishedSending && msgsSent == msgsRecvd)
|
|
{
|
|
recvWaiting++;
|
|
condvar.wait(tplLock);
|
|
recvWaiting--;
|
|
}
|
|
|
|
if (msgsSent == msgsRecvd && finishedSending)
|
|
break;
|
|
|
|
bool flowControlOn;
|
|
fDec->read_some(uniqueID, fNumThreads, bsv, &flowControlOn);
|
|
size = bsv.size();
|
|
|
|
// @bug 4562
|
|
if (traceOn() && fOid >= 3000 && dlTimes.FirstReadTime().tv_sec == 0)
|
|
dlTimes.setFirstReadTime();
|
|
|
|
if (fOid >= 3000 && threadID == 0 && sts.msg_type == StepTeleStats::ST_INVALID && size > 0)
|
|
{
|
|
sts.msg_type = StepTeleStats::ST_START;
|
|
sts.total_units_of_work = totalMsgs;
|
|
postStepStartTele(sts);
|
|
}
|
|
|
|
/* This is a simple ramp-up of the TBPS msg processing threads.
|
|
One thread is created by run(), and add'l threads are created
|
|
as needed. Right now, "as needed" means that flow control
|
|
is on, which means that the UM is not keeping up by definition,
|
|
or size > 5. We found that using flow control alone was not aggressive
|
|
enough when the messages were small. The 'size' parameter checks
|
|
the number of msgs waiting in the DEC buffers. Since each
|
|
message can be processed independently of the others, they can all
|
|
be processed in different threads. In benchmarking we found that
|
|
there was no end-to-end performance difference between using 1
|
|
and 20 msgs as the threshold. Erring on the side of aggressiveness,
|
|
we chose '5'.
|
|
'5' still preserves the original goal of not starting MAX threads
|
|
for small queries or when the UM can keep up with the PMs with
|
|
fewer threads. Tweak as necessary. */
|
|
|
|
if ((size > 5 || flowControlOn) && fNumThreads < fMaxNumThreads)
|
|
startAggregationThread();
|
|
|
|
for (uint32_t z = 0; z < size; z++)
|
|
{
|
|
if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z])))
|
|
++msgsRecvd;
|
|
}
|
|
|
|
//@Bug 1424,1298
|
|
|
|
if (sendWaiting && ((msgsSent - msgsRecvd) <=
|
|
(fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)))
|
|
{
|
|
condvarWakeupProducer.notify_one();
|
|
THROTTLEDEBUG << "receiveMultiPrimitiveMessages wakes up sending side .. " << " msgsSent: " << msgsSent << " msgsRecvd = " << msgsRecvd << endl;
|
|
}
|
|
|
|
/* If there's an error and the joblist is being aborted, don't
|
|
sit around forever waiting for responses. */
|
|
if (cancelled())
|
|
{
|
|
if (sendWaiting)
|
|
condvarWakeupProducer.notify_one();
|
|
|
|
break;
|
|
}
|
|
|
|
if (size == 0)
|
|
{
|
|
tplLock.unlock();
|
|
usleep(2000 * fNumThreads);
|
|
tplLock.lock();
|
|
continue;
|
|
}
|
|
|
|
tplLock.unlock();
|
|
|
|
for (i = 0; i < size && !cancelled(); i++)
|
|
{
|
|
ByteStream* bs = bsv[i].get();
|
|
|
|
// @bug 488. when PrimProc node is down. error out
|
|
//An error condition. We are not going to do anymore.
|
|
ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf());
|
|
|
|
if (bs->length() == 0 || hdr->Status > 0)
|
|
{
|
|
/* PM errors mean this should abort right away instead of draining the PM backlog */
|
|
tplLock.lock();
|
|
|
|
if (bs->length() == 0)
|
|
{
|
|
errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN));
|
|
status(ERR_PRIMPROC_DOWN);
|
|
}
|
|
else
|
|
{
|
|
string errMsg;
|
|
|
|
bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
|
*bs >> errMsg;
|
|
status(hdr->Status);
|
|
errorMessage(errMsg);
|
|
}
|
|
|
|
abort_nolock();
|
|
goto out;
|
|
}
|
|
|
|
fromPrimProc.clear();
|
|
fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max,
|
|
&cachedIO, &physIO, &touchedBlocks, &unused, threadID, &hasBinaryColumn, fColType);
|
|
|
|
/* Another layer of messiness. Need to refactor this fcn. */
|
|
while (!fromPrimProc.empty() && !cancelled())
|
|
{
|
|
rgData = fromPrimProc.back();
|
|
fromPrimProc.pop_back();
|
|
|
|
local_primRG.setData(&rgData);
|
|
ridsReturned_Thread += local_primRG.getRowCount(); // TODO need the pre-join count even on PM joins... later
|
|
|
|
/* TupleHashJoinStep::joinOneRG() is a port of the main join loop here. Any
|
|
* changes made here should also be made there and vice versa. */
|
|
if (hasUMJoin || !fBPP->pmSendsFinalResult())
|
|
{
|
|
joinedData = RGData(local_outputRG);
|
|
local_outputRG.setData(&joinedData);
|
|
local_outputRG.resetRowGroup(local_primRG.getBaseRid());
|
|
local_outputRG.setDBRoot(local_primRG.getDBRoot());
|
|
local_primRG.getRow(0, &largeSideRow);
|
|
|
|
for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
|
|
{
|
|
matchCount = 0;
|
|
|
|
for (j = 0; j < smallSideCount; j++)
|
|
{
|
|
tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]);
|
|
#ifdef JLF_DEBUG
|
|
// Debugging code to print the matches
|
|
Row r;
|
|
joinerMatchesRGs[j].initRow(&r);
|
|
cout << joinerOutput[j].size() << " matches: \n";
|
|
for (uint32_t z = 0; z < joinerOutput[j].size(); z++) {
|
|
r.setPointer(joinerOutput[j][z]);
|
|
cout << " " << r.toString() << endl;
|
|
}
|
|
#endif
|
|
matchCount = joinerOutput[j].size();
|
|
|
|
if (tjoiners[j]->inUM())
|
|
{
|
|
/* Count the # of rows that pass the join filter */
|
|
if (tjoiners[j]->hasFEFilter() && matchCount > 0)
|
|
{
|
|
vector<Row::Pointer> newJoinerOutput;
|
|
applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow);
|
|
|
|
for (uint32_t z = 0; z < joinerOutput[j].size(); z++)
|
|
{
|
|
smallSideRows[j].setPointer(joinerOutput[j][z]);
|
|
applyMapping(fergMappings[j], smallSideRows[j], &joinFERow);
|
|
|
|
if (!tjoiners[j]->evaluateFilter(joinFERow, threadID))
|
|
matchCount--;
|
|
else
|
|
{
|
|
/* The first match includes it in a SEMI join result and excludes it from an ANTI join
|
|
* result. If it's SEMI & SCALAR however, it needs to continue.
|
|
*/
|
|
newJoinerOutput.push_back(joinerOutput[j][z]);
|
|
|
|
if (tjoiners[j]->antiJoin() || (tjoiners[j]->semiJoin() && !tjoiners[j]->scalar()))
|
|
break;
|
|
}
|
|
}
|
|
|
|
// the filter eliminated all matches, need to join with the NULL row
|
|
if (matchCount == 0 && tjoiners[j]->largeOuterJoin())
|
|
{
|
|
newJoinerOutput.push_back(Row::Pointer(smallNullMemory[j].get()));
|
|
matchCount = 1;
|
|
}
|
|
|
|
joinerOutput[j].swap(newJoinerOutput);
|
|
}
|
|
|
|
// XXXPAT: This has gone through enough revisions it would benefit
|
|
// from refactoring
|
|
|
|
/* If anti-join, reverse the result */
|
|
if (tjoiners[j]->antiJoin())
|
|
{
|
|
matchCount = (matchCount ? 0 : 1);
|
|
}
|
|
|
|
if (matchCount == 0)
|
|
{
|
|
joinerOutput[j].clear();
|
|
break;
|
|
}
|
|
else if (!tjoiners[j]->scalar() &&
|
|
(tjoiners[j]->antiJoin() || tjoiners[j]->semiJoin()))
|
|
{
|
|
joinerOutput[j].clear();
|
|
joinerOutput[j].push_back(Row::Pointer(smallNullMemory[j].get()));
|
|
matchCount = 1;
|
|
}
|
|
}
|
|
|
|
if (matchCount == 0 && tjoiners[j]->innerJoin())
|
|
break;
|
|
|
|
/* Scalar check */
|
|
if (tjoiners[j]->scalar() && matchCount > 1)
|
|
{
|
|
errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW));
|
|
status(ERR_MORE_THAN_1_ROW);
|
|
abort();
|
|
}
|
|
|
|
if (tjoiners[j]->smallOuterJoin())
|
|
tjoiners[j]->markMatches(threadID, joinerOutput[j]);
|
|
|
|
}
|
|
|
|
if (matchCount > 0)
|
|
{
|
|
applyMapping(largeMapping, largeSideRow, &joinedBaseRow);
|
|
joinedBaseRow.setRid(largeSideRow.getRelRid());
|
|
generateJoinResultSet(joinerOutput, joinedBaseRow, smallMappings,
|
|
0, local_outputRG, joinedData, &rgDatav, smallSideRows, postJoinRow);
|
|
|
|
/* Bug 3510: Don't let the join results buffer get out of control. Need
|
|
to refactor this. All post-join processing needs to go here AND below
|
|
for now. */
|
|
if (rgDatav.size() * local_outputRG.getMaxDataSize() > 50000000)
|
|
{
|
|
RowGroup out(local_outputRG);
|
|
|
|
if (fe2 && !runFEonPM)
|
|
{
|
|
processFE2(out, local_fe2Output, postJoinRow,
|
|
local_fe2OutRow, &rgDatav, &local_fe2);
|
|
rgDataVecToDl(rgDatav, local_fe2Output, dlp);
|
|
}
|
|
else
|
|
rgDataVecToDl(rgDatav, out, dlp);
|
|
}
|
|
}
|
|
} // end of the for-loop in the join code
|
|
|
|
if (local_outputRG.getRowCount() > 0)
|
|
{
|
|
rgDatav.push_back(joinedData);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
rgDatav.push_back(rgData);
|
|
}
|
|
|
|
/* Execute UM F & E group 2 on rgDatav */
|
|
if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled())
|
|
{
|
|
processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
|
|
rgDataVecToDl(rgDatav, local_fe2Output, dlp);
|
|
}
|
|
|
|
cachedIO_Thread += cachedIO;
|
|
physIO_Thread += physIO;
|
|
touchedBlocks_Thread += touchedBlocks;
|
|
|
|
if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND)
|
|
{
|
|
if (fColType.colWidth <= 8)
|
|
{
|
|
cpv.push_back(_CPInfo((int64_t) min, (int64_t) max, lbid, validCPData));
|
|
}
|
|
else if (fColType.colWidth == 16)
|
|
{
|
|
cpv.push_back(_CPInfo(min, max, lbid, validCPData));
|
|
}
|
|
}
|
|
} // end of the per-rowgroup processing loop
|
|
|
|
// insert the resulting rowgroup data from a single bytestream into dlp
|
|
if (rgDatav.size() > 0)
|
|
{
|
|
if (fe2 && runFEonPM)
|
|
rgDataVecToDl(rgDatav, local_fe2Output, dlp);
|
|
else
|
|
rgDataVecToDl(rgDatav, local_outputRG, dlp);
|
|
}
|
|
} // end of the per-bytestream loop
|
|
|
|
// @bug 4562
|
|
if (traceOn() && fOid >= 3000)
|
|
dlTimes.setFirstInsertTime();
|
|
|
|
//update casual partition
|
|
size = cpv.size();
|
|
|
|
if (size > 0 && !cancelled())
|
|
{
|
|
cpMutex.lock();
|
|
|
|
for (i = 0; i < size; i++)
|
|
{
|
|
if (fColType.colWidth > 8)
|
|
{
|
|
lbidList->UpdateMinMax(cpv[i].bigMin, cpv[i].bigMax, cpv[i].LBID, fColType,
|
|
cpv[i].valid);
|
|
}
|
|
else
|
|
{
|
|
lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType,
|
|
cpv[i].valid);
|
|
}
|
|
}
|
|
|
|
cpMutex.unlock();
|
|
}
|
|
|
|
cpv.clear();
|
|
|
|
tplLock.lock();
|
|
|
|
if (fOid >= 3000)
|
|
{
|
|
uint64_t progress = msgsRecvd * 100 / totalMsgs;
|
|
bool postProgress = (progress > fProgress);
|
|
|
|
if (postProgress)
|
|
{
|
|
fProgress = progress;
|
|
|
|
sts.msg_type = StepTeleStats::ST_PROGRESS;
|
|
sts.total_units_of_work = totalMsgs;
|
|
sts.units_of_work_completed = msgsRecvd;
|
|
postStepProgressTele(sts);
|
|
}
|
|
}
|
|
|
|
} // done reading
|
|
|
|
}//try
|
|
catch (...)
|
|
{
|
|
handleException(std::current_exception(),
|
|
logging::ERR_TUPLE_BPS,
|
|
logging::ERR_ALWAYS_CRITICAL,
|
|
"st: " + std::to_string(fStepId) +
|
|
" TupleBPS::receiveMultiPrimitiveMessages()");
|
|
abort_nolock();
|
|
}
|
|
|
|
out:
|
|
|
|
if (++recvExited == fNumThreads)
|
|
{
|
|
|
|
if (doJoin && smallOuterJoiner != -1 && !cancelled())
|
|
{
|
|
tplLock.unlock();
|
|
/* If this was a left outer join, this needs to put the unmatched
|
|
rows from the joiner into the output
|
|
XXXPAT: This might be a problem if later steps depend
|
|
on sensible rids and/or sensible ordering */
|
|
vector<Row::Pointer> unmatched;
|
|
#ifdef JLF_DEBUG
|
|
cout << "finishing small-outer join output\n";
|
|
#endif
|
|
i = smallOuterJoiner;
|
|
tjoiners[i]->getUnmarkedRows(&unmatched);
|
|
joinedData = RGData(local_outputRG);
|
|
local_outputRG.setData(&joinedData);
|
|
local_outputRG.resetRowGroup(-1);
|
|
local_outputRG.getRow(0, &joinedBaseRow);
|
|
|
|
for (j = 0; j < unmatched.size(); j++)
|
|
{
|
|
smallSideRows[i].setPointer(unmatched[j]);
|
|
|
|
for (k = 0; k < smallSideCount; k++)
|
|
{
|
|
if (i == k)
|
|
applyMapping(smallMappings[i], smallSideRows[i], &joinedBaseRow);
|
|
else
|
|
applyMapping(smallMappings[k], smallNulls[k], &joinedBaseRow);
|
|
}
|
|
|
|
applyMapping(largeMapping, largeNull, &joinedBaseRow);
|
|
joinedBaseRow.setRid(0);
|
|
joinedBaseRow.nextRow();
|
|
local_outputRG.incRowCount();
|
|
|
|
if (local_outputRG.getRowCount() == 8192)
|
|
{
|
|
if (fe2)
|
|
{
|
|
rgDatav.push_back(joinedData);
|
|
processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
|
|
|
|
if (rgDatav.size() > 0)
|
|
rgDataToDl(rgDatav[0], local_fe2Output, dlp);
|
|
|
|
rgDatav.clear();
|
|
}
|
|
else
|
|
rgDataToDl(joinedData, local_outputRG, dlp);
|
|
|
|
joinedData = RGData(local_outputRG);
|
|
local_outputRG.setData(&joinedData);
|
|
local_outputRG.resetRowGroup(-1);
|
|
local_outputRG.getRow(0, &joinedBaseRow);
|
|
}
|
|
}
|
|
|
|
if (local_outputRG.getRowCount() > 0)
|
|
{
|
|
if (fe2)
|
|
{
|
|
rgDatav.push_back(joinedData);
|
|
processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
|
|
|
|
if (rgDatav.size() > 0)
|
|
rgDataToDl(rgDatav[0], local_fe2Output, dlp);
|
|
|
|
rgDatav.clear();
|
|
}
|
|
else
|
|
rgDataToDl(joinedData, local_outputRG, dlp);
|
|
}
|
|
|
|
tplLock.lock();
|
|
}
|
|
|
|
if (traceOn() && fOid >= 3000)
|
|
{
|
|
//...Casual partitioning could cause us to do no processing. In that
|
|
//...case these time stamps did not get set. So we set them here.
|
|
if (dlTimes.FirstReadTime().tv_sec == 0)
|
|
{
|
|
dlTimes.setFirstReadTime();
|
|
dlTimes.setLastReadTime();
|
|
dlTimes.setFirstInsertTime();
|
|
}
|
|
|
|
dlTimes.setEndOfInputTime();
|
|
}
|
|
|
|
ByteStream bs;
|
|
|
|
try
|
|
{
|
|
if (BPPIsAllocated)
|
|
{
|
|
fDec->removeDECEventListener(this);
|
|
fBPP->destroyBPP(bs);
|
|
fDec->write(uniqueID, bs);
|
|
BPPIsAllocated = false;
|
|
}
|
|
}
|
|
// catch and do nothing. Let it continue with the clean up and profiling
|
|
catch (const std::exception& e)
|
|
{
|
|
cerr << "tuple-bps caught: " << e.what() << endl;
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "tuple-bps caught unknown exception" << endl;
|
|
}
|
|
|
|
Stats stats = fDec->getNetworkStats(uniqueID);
|
|
fMsgBytesIn = stats.dataRecvd();
|
|
fMsgBytesOut = stats.dataSent();
|
|
fDec->removeQueue(uniqueID);
|
|
tjoiners.clear();
|
|
|
|
lastThread = true;
|
|
}
|
|
|
|
//@Bug 1099
|
|
ridsReturned += ridsReturned_Thread;
|
|
fPhysicalIO += physIO_Thread;
|
|
fCacheIO += cachedIO_Thread;
|
|
fBlockTouched += touchedBlocks_Thread;
|
|
tplLock.unlock();
|
|
|
|
if (fTableOid >= 3000 && lastThread)
|
|
{
|
|
struct timeval tvbuf;
|
|
gettimeofday(&tvbuf, 0);
|
|
FIFO<boost::shared_array<uint8_t> >* pFifo = 0;
|
|
uint64_t totalBlockedReadCount = 0;
|
|
uint64_t totalBlockedWriteCount = 0;
|
|
|
|
//...Sum up the blocked FIFO reads for all input associations
|
|
size_t inDlCnt = fInputJobStepAssociation.outSize();
|
|
|
|
for (size_t iDataList = 0; iDataList < inDlCnt; iDataList++)
|
|
{
|
|
pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(
|
|
fInputJobStepAssociation.outAt(iDataList)->rowGroupDL());
|
|
|
|
if (pFifo)
|
|
{
|
|
totalBlockedReadCount += pFifo->blockedReadCount();
|
|
}
|
|
}
|
|
|
|
//...Sum up the blocked FIFO writes for all output associations
|
|
size_t outDlCnt = fOutputJobStepAssociation.outSize();
|
|
|
|
for (size_t iDataList = 0; iDataList < outDlCnt; iDataList++)
|
|
{
|
|
pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(dlp);
|
|
|
|
if (pFifo)
|
|
{
|
|
totalBlockedWriteCount += pFifo->blockedWriteCount();
|
|
}
|
|
}
|
|
|
|
//...Roundoff msg byte counts to nearest KB for display
|
|
uint64_t msgBytesInKB = fMsgBytesIn >> 10;
|
|
uint64_t msgBytesOutKB = fMsgBytesOut >> 10;
|
|
|
|
if (fMsgBytesIn & 512)
|
|
msgBytesInKB++;
|
|
|
|
if (fMsgBytesOut & 512)
|
|
msgBytesOutKB++;
|
|
|
|
if (traceOn())
|
|
{
|
|
// @bug 828
|
|
ostringstream logStr;
|
|
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " <<
|
|
JSTimeStamp::format(tvbuf) << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" <<
|
|
fCacheIO << "; MsgsSent-" << msgsSent << "; MsgsRvcd-" << msgsRecvd <<
|
|
"; BlocksTouched-" << fBlockTouched <<
|
|
"; BlockedFifoIn/Out-" << totalBlockedReadCount <<
|
|
"/" << totalBlockedWriteCount <<
|
|
"; output size-" << ridsReturned << endl <<
|
|
"\tPartitionBlocksEliminated-" << fNumBlksSkipped <<
|
|
"; MsgBytesIn-" << msgBytesInKB << "KB" <<
|
|
"; MsgBytesOut-" << msgBytesOutKB << "KB" <<
|
|
"; TotalMsgs-" << totalMsgs << endl <<
|
|
"\t1st read " << dlTimes.FirstReadTimeString() <<
|
|
"; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
|
|
JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) <<
|
|
"s\n\tUUID " << uuids::to_string(fStepUuid) <<
|
|
"\n\tQuery UUID " << uuids::to_string(queryUuid()) <<
|
|
"\n\tJob completion status " << status() << endl;
|
|
logEnd(logStr.str().c_str());
|
|
|
|
syslogReadBlockCounts(16, // exemgr sybsystem
|
|
fPhysicalIO, // # blocks read from disk
|
|
fCacheIO, // # blocks read from cache
|
|
fNumBlksSkipped); // # casual partition block hits
|
|
syslogProcessingTimes(16, // exemgr subsystem
|
|
dlTimes.FirstReadTime(), // first datalist read
|
|
dlTimes.LastReadTime(), // last datalist read
|
|
dlTimes.FirstInsertTime(), // first datalist write
|
|
dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
|
|
syslogEndStep(16, // exemgr subsystem
|
|
totalBlockedReadCount, // blocked datalist input
|
|
totalBlockedWriteCount, // blocked datalist output
|
|
fMsgBytesIn, // incoming msg byte count
|
|
fMsgBytesOut); // outgoing msg byte count
|
|
fExtendedInfo += toString() + logStr.str();
|
|
formatMiniStats();
|
|
}
|
|
|
|
if (lastThread && fOid >= 3000)
|
|
{
|
|
sts.msg_type = StepTeleStats::ST_SUMMARY;
|
|
sts.phy_io = fPhysicalIO;
|
|
sts.cache_io = fCacheIO;
|
|
sts.msg_rcv_cnt = sts.total_units_of_work = sts.units_of_work_completed = msgsRecvd;
|
|
sts.cp_blocks_skipped = fNumBlksSkipped;
|
|
sts.msg_bytes_in = fMsgBytesIn;
|
|
sts.msg_bytes_out = fMsgBytesOut;
|
|
sts.rows = ridsReturned;
|
|
postStepSummaryTele(sts);
|
|
}
|
|
|
|
if (ffirstStepType == SCAN && bop == BOP_AND && !cancelled())
|
|
{
|
|
cpMutex.lock();
|
|
lbidList->UpdateAllPartitionInfo(fColType);
|
|
cpMutex.unlock();
|
|
}
|
|
}
|
|
|
|
// Bug 3136, let mini stats to be formatted if traceOn.
|
|
if (lastThread && !didEOF)
|
|
dlp->endOfInput();
|
|
}
|
|
|
|
// Builds the human-readable trace description of this step: session/txn ids,
// table/column OIDs, aliases, delivery and abort state, output and input
// datalist associations, and the UUIDs used to correlate trace output.
const string TupleBPS::toString() const
{
    ostringstream oss;
    oss << "TupleBPS ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId <<
        " tb/col:" << fTableOid << "/" << fOid;

    if (alias().length()) oss << " alias:" << alias();

    if (view().length()) oss << " view:" << view();

#if 0

    // @bug 1282, don't have output datalist for delivery
    if (!fDelivery)
        oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;

#else

    if (fDelivery)
        oss << " is del ";
    else
        oss << " not del ";

    if (bop == BOP_OR)
        oss << " BOP_OR ";

    // While aborting, show how far the send/receive counters got.
    if (fDie)
        oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " ";

    if (fOutputJobStepAssociation.outSize() > 0)
    {
        oss << fOutputJobStepAssociation.outAt(0);

        if (fOutputJobStepAssociation.outSize() > 1)
            oss << " (too many outputs?)";
    }
    else
    {
        oss << " (no outputs?)";
    }

#endif
    oss << " nf:" << fFilterCount;
    oss << " in:";

    for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
    {
        oss << fInputJobStepAssociation.outAt(i);
    }

    oss << endl << " UUID: " << uuids::to_string(fStepUuid) << endl;
    oss << " Query UUID: " << uuids::to_string(queryUuid()) << endl;
    oss << " " << fBPP->toString() << endl;
    return oss.str();
}
|
|
|
|
/* This exists to avoid a DBRM lookup for every rid. */
|
|
inline bool TupleBPS::scanit(uint64_t rid)
|
|
{
|
|
uint64_t fbo;
|
|
uint32_t extentIndex;
|
|
|
|
if (fOid < 3000)
|
|
return true;
|
|
|
|
fbo = rid >> rpbShift;
|
|
extentIndex = fbo >> divShift;
|
|
return scanFlags[extentIndex] && runtimeCPFlags[extentIndex];
|
|
}
|
|
|
|
uint64_t TupleBPS::getFBO(uint64_t lbid)
|
|
{
|
|
uint32_t i;
|
|
uint64_t lastLBID;
|
|
|
|
for (i = 0; i < numExtents; i++)
|
|
{
|
|
lastLBID = scannedExtents[i].range.start + (scannedExtents[i].range.size << 10) - 1;
|
|
|
|
if (lbid >= (uint64_t) scannedExtents[i].range.start && lbid <= lastLBID)
|
|
return (lbid - scannedExtents[i].range.start) + (i << divShift);
|
|
}
|
|
|
|
throw logic_error("TupleBPS: didn't find the FBO?");
|
|
}
|
|
|
|
// Single-joiner convenience wrapper: delegates to useJoiners() with a
// one-element vector.
void TupleBPS::useJoiner(boost::shared_ptr<joiner::TupleJoiner> tj)
{
    vector<boost::shared_ptr<joiner::TupleJoiner> > single(1, tj);
    useJoiners(single);
}
|
|
|
|
void TupleBPS::useJoiners(const vector<boost::shared_ptr<joiner::TupleJoiner> >& joiners)
|
|
{
|
|
uint32_t i;
|
|
|
|
tjoiners = joiners;
|
|
doJoin = (joiners.size() != 0);
|
|
|
|
joinerMatchesRGs.clear();
|
|
smallSideCount = tjoiners.size();
|
|
hasPMJoin = false;
|
|
hasUMJoin = false;
|
|
|
|
for (i = 0; i < smallSideCount; i++)
|
|
{
|
|
joinerMatchesRGs.push_back(tjoiners[i]->getSmallRG());
|
|
|
|
if (tjoiners[i]->inPM())
|
|
hasPMJoin = true;
|
|
else
|
|
hasUMJoin = true;
|
|
|
|
if (tjoiners[i]->getJoinType() & SMALLOUTER)
|
|
smallOuterJoiner = i;
|
|
}
|
|
|
|
if (hasPMJoin)
|
|
fBPP->useJoiners(tjoiners);
|
|
}
|
|
|
|
void TupleBPS::newPMOnline(uint32_t connectionNumber)
|
|
{
|
|
ByteStream bs;
|
|
|
|
fBPP->createBPP(bs);
|
|
|
|
try
|
|
{
|
|
fDec->write(bs, connectionNumber);
|
|
|
|
if (hasPMJoin)
|
|
serializeJoiner(connectionNumber);
|
|
}
|
|
catch (IDBExcept& e)
|
|
{
|
|
abort();
|
|
catchHandler(e.what(), e.errorCode(), fErrorInfo, fSessionId);
|
|
}
|
|
}
|
|
|
|
// Sets the layout of rows fed into this step and forwards it to the batch
// primitive processor.
void TupleBPS::setInputRowGroup(const rowgroup::RowGroup& rg)
{
    inputRowGroup = rg;
    fBPP->setInputRowGroup(rg);
}
|
|
|
|
// Sets both the final output layout and the primitive (pre-join) layout,
// forwards the projection layout to the BPP, rescans for duplicate output
// columns, and refreshes the F&E-group-2 column mapping if that step exists.
void TupleBPS::setOutputRowGroup(const rowgroup::RowGroup& rg)
{
    outputRowGroup = rg;
    primRowGroup = rg;
    fBPP->setProjectionRowGroup(rg);
    checkDupOutputColumns(rg);

    // fe2Mapping maps outputRowGroup -> fe2Output, so it must be rebuilt
    // whenever outputRowGroup changes.
    if (fe2)
        fe2Mapping = makeMapping(outputRowGroup, fe2Output);
}
|
|
|
|
// Sets the post-join output layout (primRowGroup keeps the pre-join layout),
// forwards it to the BPP, and refreshes duplicate-column / F&E-2 mappings.
void TupleBPS::setJoinedResultRG(const rowgroup::RowGroup& rg)
{
    outputRowGroup = rg;
    checkDupOutputColumns(rg);
    fBPP->setJoinedRowGroup(rg);

    if (fe2)
        fe2Mapping = makeMapping(outputRowGroup, fe2Output);
}
|
|
|
|
/* probably worthwhile to make some of these class vars */
// Recursively expands the cross-product of the per-small-side match lists
// ('joinerOutput') against the large-side base row.  Depths 0..n-2 apply one
// small-side match onto 'baseRow' and recurse; the deepest level writes the
// completed rows into 'outputRG', flushing each full (8192-row) RGData into
// 'outputData' and starting a new one in place.
void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput,
                                     Row& baseRow, const vector<shared_array<int> >& mappings, const uint32_t depth,
                                     RowGroup& outputRG, RGData& rgData, vector<RGData>* outputData, const scoped_array<Row>& smallRows,
                                     Row& joinedRow)
{
    uint32_t i;
    Row& smallRow = smallRows[depth];

    if (depth < smallSideCount - 1)
    {
        // Intermediate small side: merge each of its matches into baseRow and
        // recurse to combine with the remaining small sides.
        for (i = 0; i < joinerOutput[depth].size(); i++)
        {
            smallRow.setPointer(joinerOutput[depth][i]);
            applyMapping(mappings[depth], smallRow, &baseRow);
            generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
                                  outputRG, rgData, outputData, smallRows, joinedRow);
        }
    }
    else
    {
        // Deepest small side: emit one output row per match.
        outputRG.getRow(outputRG.getRowCount(), &joinedRow);

        for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(),
                outputRG.incRowCount())
        {
            smallRow.setPointer(joinerOutput[depth][i]);

            // The current RGData is full: hand it off and start a fresh one,
            // preserving the base rid and dbroot of the group in progress.
            if (UNLIKELY(outputRG.getRowCount() == 8192))
            {
                uint32_t dbRoot = outputRG.getDBRoot();
                uint64_t baseRid = outputRG.getBaseRid();
                outputData->push_back(rgData);
                rgData = RGData(outputRG);
                outputRG.setData(&rgData);
                outputRG.resetRowGroup(baseRid);
                outputRG.setDBRoot(dbRoot);
                outputRG.getRow(0, &joinedRow);
            }

            applyMapping(mappings[depth], smallRow, &baseRow);
            copyRow(baseRow, &joinedRow);
        }
    }
}
|
|
|
|
// Accessor for the (possibly post-join) output rowgroup layout.
const rowgroup::RowGroup& TupleBPS::getOutputRowGroup() const
{
    return outputRowGroup;
}
|
|
|
|
// Attaches a PM-side aggregation step.  Ignored when the aggregate rowgroup
// has no columns.  Aggregated results don't carry per-row rids, so rid
// delivery is switched off on the BPP.
void TupleBPS::setAggregateStep(const rowgroup::SP_ROWAGG_PM_t& agg, const rowgroup::RowGroup& rg)
{
    if (rg.getColumnCount() > 0)
    {
        fAggRowGroupPm = rg;
        fAggregatorPm = agg;

        fBPP->addAggregateStep(agg, rg);
        fBPP->setNeedRidsAtDelivery(false);
    }
}
|
|
|
|
// Sets the boolean operator (BOP_AND / BOP_OR) combining this step's filters
// and forwards it to the BPP.
void TupleBPS::setBOP(uint8_t op)
{
    bop = op;
    fBPP->setBOP(bop);
}
|
|
|
|
// Forwards the job-level context to the batch primitive processor.
void TupleBPS::setJobInfo(const JobInfo* jobInfo)
{
    fBPP->jobInfo(jobInfo);
}
|
|
|
|
// Returns the row-count estimate for this scan.  The estimate
// (fEstimatedRows) is presumably computed by storeCasualPartitionInfo()
// while it populates the scanFlags array -- TODO confirm in its definition.
uint64_t TupleBPS::getEstimatedRowCount()
{
    // Call function that populates the scanFlags array based on the extents that qualify based on casual partitioning.
    storeCasualPartitionInfo(true);
    // TODO: Strip out the cout below after a few days of testing.
#ifdef JLF_DEBUG
    cout << "OID-" << fOid << " EstimatedRowCount-" << fEstimatedRows << endl;
#endif
    return fEstimatedRows;
}
|
|
|
|
// bug 1965: detect columns selected more than once.  For each repeated
// column key, record a (dest, src) index pair so dupOutputColumns() can copy
// the materialized value into the duplicate slot.
void TupleBPS::checkDupOutputColumns(const rowgroup::RowGroup& rg)
{
    map<uint32_t, uint32_t> firstSeen; // unique col key -> first index in rg
    dupColumns.clear();
    const vector<uint32_t>& keys = rg.getKeys();

    for (uint32_t idx = 0; idx < keys.size(); idx++)
    {
        pair<map<uint32_t, uint32_t>::iterator, bool> ins =
            firstSeen.insert(make_pair(keys[idx], idx));

        // Key already present -> this column duplicates an earlier one.
        if (!ins.second)
            dupColumns.push_back(make_pair(idx, ins.first->second)); // dest/src
    }
}
|
|
|
|
// Convenience overload: binds 'data' to 'rg' and then fills in the
// duplicate-column values in place.
void TupleBPS::dupOutputColumns(RGData& data, RowGroup& rg)
{
    rg.setData(&data);
    dupOutputColumns(rg);
}
|
|
|
|
// Copies each duplicated column's value from its source field into its
// destination field, for every row in the rowgroup (pairs were computed by
// checkDupOutputColumns()).
void TupleBPS::dupOutputColumns(RowGroup& rg)
{
    Row row;
    rg.initRow(&row);
    rg.getRow(0, &row);

    const uint64_t rowCount = rg.getRowCount();
    const uint64_t dupCount = dupColumns.size();

    for (uint64_t r = 0; r < rowCount; r++, row.nextRow())
    {
        for (uint64_t d = 0; d < dupCount; d++)
            row.copyField(dupColumns[d].first, dupColumns[d].second);
    }
}
|
|
|
|
// Sets the step id used in trace/log output, both locally and on the BPP.
void TupleBPS::stepId(uint16_t stepId)
{
    fStepId = stepId;
    fBPP->setStepID(stepId);
}
|
|
|
|
void TupleBPS::addFcnJoinExp(const vector<execplan::SRCP>& fe)
|
|
{
|
|
if (!fe1)
|
|
fe1.reset(new funcexp::FuncExpWrapper());
|
|
|
|
for (uint32_t i = 0; i < fe.size(); i++)
|
|
fe1->addReturnedColumn(fe[i]);
|
|
}
|
|
|
|
// Adds a filter parse tree to the (lazily created) group-1 function &
// expression wrapper.
void TupleBPS::addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe)
{
    if (!fe1)
        fe1.reset(new funcexp::FuncExpWrapper());

    fe1->addFilter(fe);
}
|
|
|
|
// Records the rowgroup layout that feeds the group-1 F&E step.
void TupleBPS::setFE1Input(const RowGroup& feInput)
{
    fe1Input = feInput;
}
|
|
|
|
// Installs the group-2 (post-join) F&E step: its wrapper, its output layout,
// and where it runs.  When it can run on the PM, the BPP is told so the PM
// applies it before sending results back.
void TupleBPS::setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe,
                               const rowgroup::RowGroup& rg, bool runFE2onPM)
{
    fe2 = fe;
    fe2Output = rg;
    checkDupOutputColumns(rg);
    // Mapping must be built after fe2Output is set.
    fe2Mapping = makeMapping(outputRowGroup, fe2Output);
    runFEonPM = runFE2onPM;

    if (runFEonPM)
        fBPP->setFEGroup2(fe2, fe2Output);
}
|
|
|
|
void TupleBPS::setFcnExpGroup3(const vector<execplan::SRCP>& fe)
|
|
{
|
|
if (!fe2)
|
|
fe2.reset(new funcexp::FuncExpWrapper());
|
|
|
|
for (uint32_t i = 0; i < fe.size(); i++)
|
|
fe2->addReturnedColumn(fe[i]);
|
|
|
|
// if this is called, there's no join, so it can always run on the PM
|
|
runFEonPM = true;
|
|
fBPP->setFEGroup2(fe2, fe2Output);
|
|
}
|
|
|
|
// Sets the output layout of the group-2/3 F&E step and rebuilds the mapping
// from the step's output rowgroup into it.  If the F&E step runs on the PM,
// the BPP gets the updated layout too.
void TupleBPS::setFE23Output(const rowgroup::RowGroup& feOutput)
{
    fe2Output = feOutput;
    checkDupOutputColumns(feOutput);
    fe2Mapping = makeMapping(outputRowGroup, fe2Output);

    if (fe2 && runFEonPM)
        fBPP->setFEGroup2(fe2, fe2Output);
}
|
|
|
|
// Applies the group-2 F&E filter to a single rowgroup: rows that pass are
// mapped into 'output', which inherits the input's base rid and dbroot.
void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow,
                                Row& outRow, funcexp::FuncExpWrapper* local_fe)
{
    output.resetRowGroup(input.getBaseRid());
    output.setDBRoot(input.getDBRoot());
    output.getRow(0, &outRow);
    input.getRow(0, &inRow);

    const uint32_t rowCount = input.getRowCount();

    for (uint32_t idx = 0; idx < rowCount; idx++, inRow.nextRow())
    {
        // Skip rows the filter rejects.
        if (!local_fe->evaluate(&inRow))
            continue;

        applyMapping(fe2Mapping, inRow, &outRow);
        outRow.setRid(inRow.getRelRid());
        output.incRowCount();
        outRow.nextRow();
    }
}
|
|
|
|
// Applies the group-2 F&E filter to a whole vector of rowgroup data.
// Qualifying rows are packed into fresh RGData buffers; a buffer is flushed
// into 'results' when it fills (8192 rows) or when the source dbroot/base
// rid changes, so each output RGData stays homogeneous.  On return, *rgData
// holds the filtered buffers.
void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow,
                          vector<RGData>* rgData, funcexp::FuncExpWrapper* local_fe)
{
    vector<RGData> results;
    RGData result;
    uint32_t i, j;
    bool ret;

    result = RGData(output);
    output.setData(&result);
    output.resetRowGroup(-1);
    output.getRow(0, &outRow);

    for (i = 0; i < rgData->size(); i++)
    {
        input.setData(&(*rgData)[i]);

        // An empty output buffer adopts the current input's identity.
        if (output.getRowCount() == 0)
        {
            output.resetRowGroup(input.getBaseRid());
            output.setDBRoot(input.getDBRoot());
        }

        input.getRow(0, &inRow);

        for (j = 0; j < input.getRowCount(); j++, inRow.nextRow())
        {
            ret = local_fe->evaluate(&inRow);

            if (ret)
            {
                applyMapping(fe2Mapping, inRow, &outRow);
                outRow.setRid(inRow.getRelRid());
                output.incRowCount();
                outRow.nextRow();

                // Flush on full buffer or source-identity change.
                if (output.getRowCount() == 8192 ||
                        output.getDBRoot() != input.getDBRoot() ||
                        output.getBaseRid() != input.getBaseRid()
                   )
                {
                    results.push_back(result);
                    result = RGData(output);
                    output.setData(&result);
                    output.resetRowGroup(input.getBaseRid());
                    output.setDBRoot(input.getDBRoot());
                    output.getRow(0, &outRow);
                }
            }
        }
    }

    // Keep the final, partially filled buffer if it has any rows.
    if (output.getRowCount() > 0)
    {
        results.push_back(result);
    }

    rgData->swap(results);
}
|
|
|
|
// The layout delivered downstream: the F&E-2 output layout when that step
// exists, otherwise the plain output layout.
const rowgroup::RowGroup& TupleBPS::getDeliveredRowGroup() const
{
    return fe2 ? fe2Output : outputRowGroup;
}
|
|
|
|
void TupleBPS::deliverStringTableRowGroup(bool b)
|
|
{
|
|
if (fe2)
|
|
fe2Output.setUseStringTable(b);
|
|
else if (doJoin)
|
|
outputRowGroup.setUseStringTable(b);
|
|
else
|
|
{
|
|
outputRowGroup.setUseStringTable(b);
|
|
primRowGroup.setUseStringTable(b);
|
|
}
|
|
|
|
fBPP->deliverStringTableRowGroup(b);
|
|
}
|
|
|
|
bool TupleBPS::deliverStringTableRowGroup() const
|
|
{
|
|
if (fe2)
|
|
return fe2Output.usesStringTable();
|
|
|
|
return outputRowGroup.usesStringTable();
|
|
}
|
|
|
|
void TupleBPS::formatMiniStats()
|
|
{
|
|
|
|
ostringstream oss;
|
|
oss << "BPS "
|
|
<< "PM "
|
|
<< alias() << " "
|
|
<< fTableOid << " "
|
|
<< fBPP->toMiniString() << " "
|
|
<< fPhysicalIO << " "
|
|
<< fCacheIO << " "
|
|
<< fNumBlksSkipped << " "
|
|
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
|
|
<< ridsReturned << " ";
|
|
|
|
fMiniInfo += oss.str();
|
|
}
|
|
|
|
// Inserts one RGData into the output datalist, first materializing any
// duplicate output columns (bug 1965).
void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp)
{
    // bug 1965, populate duplicate columns if any.
    if (dupColumns.size() > 0)
        dupOutputColumns(rgData, rg);

    dlp->insert(rgData);
}
|
|
|
|
|
|
// Bulk-inserts a vector of RGData into the output datalist under dlMutex so
// inserts from the receiver threads don't interleave.  The vector is always
// cleared on return (even when cancelled and nothing was inserted).
void TupleBPS::rgDataVecToDl(vector<RGData>& rgDatav, RowGroup& rg, RowGroupDL* dlp)
{
    uint64_t size = rgDatav.size();

    if (size > 0 && !cancelled())
    {
        // RAII lock instead of the old manual lock()/unlock() pair, which
        // left dlMutex locked forever if rgDataToDl() -> dlp->insert() threw.
        boost::mutex::scoped_lock scoped(dlMutex);

        for (uint64_t i = 0; i < size; i++)
        {
            rgDataToDl(rgDatav[i], rg, dlp);
        }
    }

    rgDatav.clear();
}
|
|
|
|
// Sets the rowgroup layout used to evaluate join F&E filters, locally and
// on the BPP.
void TupleBPS::setJoinFERG(const RowGroup& rg)
{
    joinFERG = rg;
    fBPP->setJoinFERG(rg);
}
|
|
|
|
// Runtime casual-partitioning elimination driven by small-side join values:
// for each extent of the column 'OID', AND the runtimeCPFlags entry with
// whether the extent's min/max overlaps the given values ('vals' is either a
// [low, high] range when isRange, or a set of point values).  Values arrive
// as int128_t; they are narrowed to int64_t unless the small side is a wide
// decimal.  No-op for trace-disabled CP or system-catalog columns.
void TupleBPS::addCPPredicates(uint32_t OID, const vector<int128_t>& vals, bool isRange,
                               bool isSmallSideWideDecimal)
{

    if (fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP || fOid < 3000)
        return;

    uint32_t i, j, k;
    int64_t min, max, seq;
    int128_t bigMin, bigMax;
    bool isValid, intersection;
    vector<SCommand> colCmdVec = fBPP->getFilterSteps();
    ColumnCommandJL* cmd;

    // Search both filter and projection steps for the column.
    for (i = 0; i < fBPP->getProjectSteps().size(); i++)
        colCmdVec.push_back(fBPP->getProjectSteps()[i]);

    LBIDList ll(OID, 0);

    /* Find the columncommand with that OID.
     * Check that the column type is one handled by CP.
     * For each extent in that OID,
     *   grab the min & max,
     *   OR together all of the intersection tests,
     *   AND it with the current CP flag.
     */

    for (i = 0; i < colCmdVec.size(); i++)
    {
        cmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());

        if (cmd != NULL && cmd->getOID() == OID)
        {
            const execplan::CalpontSystemCatalog::ColType& colType = cmd->getColType();

            // CP only works for certain data types and non-dictionary columns.
            if (!ll.CasualPartitionDataType(colType.colDataType, colType.colWidth)
                    || cmd->isDict())
                return;

            // @bug 2989, use correct extents
            tr1::unordered_map<int64_t, struct BRM::EMEntry>* extentsPtr = NULL;
            vector<struct BRM::EMEntry> extents; // in case the extents of OID is not in Map

            // TODO: store the sorted vectors from the pcolscans/steps as a minor optimization
            dbrm.getExtents(OID, extents);
            sort(extents.begin(), extents.end(), ExtentSorter());

            if (extentsMap.find(OID) != extentsMap.end())
            {
                extentsPtr = &extentsMap[OID];
            }
            // NOTE(review): getExtents() was already called just above, so
            // this second call looks redundant; kept as-is in case the
            // return-code check is load-bearing -- confirm before removing.
            else if (dbrm.getExtents(OID, extents) == 0)
            {
                extentsMap[OID] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
                tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[OID];

                for (uint32_t z = 0; z < extents.size(); z++)
                    mref[extents[z].range.start] = extents[z];

                extentsPtr = &mref;
            }

            if (colType.colWidth <= 8)
            {
                // Narrow (<= 64-bit) column path.
                for (j = 0; j < extents.size(); j++)
                {
                    isValid = ll.GetMinMax(&min, &max, &seq, extents[j].range.start, *extentsPtr,
                                           colType.colDataType);

                    if (isValid)
                    {
                        if (isRange)
                        {
                            if (!isSmallSideWideDecimal)
                            {
                                runtimeCPFlags[j] = ll.checkRangeOverlap(min, max, (int64_t) vals[0], (int64_t) vals[1],
                                                    colType) && runtimeCPFlags[j];
                            }
                            else
                            {
                                runtimeCPFlags[j] = ll.checkRangeOverlap((int128_t) min, (int128_t) max, vals[0], vals[1],
                                                    colType) && runtimeCPFlags[j];
                            }
                        }
                        else
                        {
                            // Point values: extent survives if ANY value hits it.
                            intersection = false;

                            for (k = 0; k < vals.size(); k++)
                            {
                                if (!isSmallSideWideDecimal)
                                {
                                    intersection = intersection ||
                                                   ll.checkSingleValue(min, max, (int64_t) vals[k], colType);
                                }
                                else
                                {
                                    intersection = intersection ||
                                                   ll.checkSingleValue((int128_t) min, (int128_t) max, vals[k], colType);
                                }
                            }

                            runtimeCPFlags[j] = intersection && runtimeCPFlags[j];
                        }
                    }
                }
            }
            else
            {
                // Wide (128-bit) column path.
                for (j = 0; j < extents.size(); j++)
                {
                    isValid = ll.GetMinMax(&bigMin, &bigMax, &seq, extents[j].range.start, *extentsPtr,
                                           colType.colDataType);

                    if (isValid)
                    {
                        if (isRange)
                            runtimeCPFlags[j] = ll.checkRangeOverlap(bigMin, bigMax, vals[0], vals[1],
                                                colType) && runtimeCPFlags[j];
                        else
                        {
                            intersection = false;

                            for (k = 0; k < vals.size(); k++)
                                intersection = intersection ||
                                               ll.checkSingleValue(bigMin, bigMax, vals[k], colType);

                            runtimeCPFlags[j] = intersection && runtimeCPFlags[j];
                        }
                    }
                }
            }

            break;
        }
    }
}
|
|
|
|
// Swaps the DistributedEngineComm this step talks to: detaches this step's
// message queue from the old DEC (if any) before registering with the new one.
void TupleBPS::dec(DistributedEngineComm* dec)
{
    if (fDec)
        fDec->removeQueue(uniqueID);

    fDec = dec;

    if (fDec)
        fDec->addQueue(uniqueID, true);
}
|
|
|
|
// Aborts the step without taking the step mutex (callers must hold it or be
// on an error path where locking is unsafe).  Idempotent via the fDie check.
// Tells the PMs to stop, tears down the step's DEC queue, and wakes any
// threads blocked on the step's condition variables.
void TupleBPS::abort_nolock()
{
    if (fDie)
        return;

    JobStep::abort();

    if (fDec && BPPIsAllocated)
    {
        ByteStream bs;
        fBPP->abortProcessing(&bs);

        try
        {
            fDec->write(uniqueID, bs);
        }
        catch (...)
        {
            // this throws only if there are no PMs left. If there are none,
            // that is the cause of the abort and that will be reported to the
            // front-end already. Nothing to do here.
        }

        BPPIsAllocated = false;
        fDec->shutdownQueue(uniqueID);
    }

    // Wake both producer and consumer threads so they observe the abort.
    condvarWakeupProducer.notify_all();
    condvar.notify_all();
}
|
|
|
|
void TupleBPS::abort()
|
|
{
|
|
boost::mutex::scoped_lock scoped(boost::mutex);
|
|
abort_nolock();
|
|
}
|
|
|
|
// Explicit instantiations of the CP filter-evaluation helpers for the two
// value widths used by column filters: 64-bit integers and 128-bit
// (wide-decimal) integers.
template
bool TupleBPS::processOneFilterType<int64_t>(int8_t colWidth, int64_t value, uint32_t type) const;
template
bool TupleBPS::processOneFilterType<int128_t>(int8_t colWidth, int128_t value, uint32_t type) const;

template
bool TupleBPS::processSingleFilterString<int64_t>(int8_t BOP, int8_t colWidth, int64_t val, const uint8_t* filterString,
        uint32_t filterCount) const;
template
bool TupleBPS::processSingleFilterString<int128_t>(int8_t BOP, int8_t colWidth, int128_t val, const uint8_t* filterString,
        uint32_t filterCount) const;

template
bool TupleBPS::compareSingleValue<int64_t>(uint8_t COP, int64_t val1, int64_t val2) const;
template
bool TupleBPS::compareSingleValue<int128_t>(uint8_t COP, int128_t val1, int128_t val2) const;
|
|
|
|
} //namespace
|
|
// vim:ts=4 sw=4:
|