/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016-2020 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: batchprimitiveprocessor.cpp 2136 2013-07-24 21:04:30Z pleblanc $
//
// C++ Implementation: batchprimitiveprocessor
//
// Description:
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//

#include <stdexcept>
#include <unistd.h>
#include <cstring>
//#define NDEBUG
#include <cassert>
#include <string>
#include <sstream>
#include <set>
#include <stdlib.h>
using namespace std;

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
using namespace boost;

#include "bpp.h"
#include "primitiveserver.h"
#include "errorcodes.h"
#include "exceptclasses.h"
#include "pp_logger.h"
#include "funcexpwrapper.h"
#include "fixedallocator.h"
#include "blockcacheclient.h"
#include "MonitorProcMem.h"
#include "threadnaming.h"
#include "vlarray.h"
#include "widedecimalutils.h"

#define MAX64 0x7fffffffffffffffLL
#define MIN64 0x8000000000000000LL
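
// These extremes seed the Casual Partitioning (CP) min/max trackers
// (minVal starts at MAX64, maxVal at MIN64) so that the first value
// examined always replaces the seed.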

using namespace messageqcpp;
using namespace joiner;
using namespace std::tr1;
using namespace rowgroup;
using namespace funcexp;
using namespace logging;
using namespace utils;
using namespace joblist;

namespace primitiveprocessor
{
#ifdef PRIMPROC_STOPWATCH
#include "poormanprofiler.inc"
#endif

// these are config parms defined in primitiveserver.cpp, initialized by PrimProc main().
extern uint32_t blocksReadAhead;
extern uint32_t dictBufferSize;
extern uint32_t defaultBufferSize;
extern int fCacheCount;
extern uint32_t connectionsPerUM;
extern int noVB;

// copied from https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
uint nextPowOf2(uint x)
{
  x--;
  x |= x >> 1;
  x |= x >> 2;
  x |= x >> 4;
  x |= x >> 8;
  x |= x >> 16;
  x++;
  return x;
}
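
// e.g. nextPowOf2(1) == 1, nextPowOf2(5) == 8, nextPowOf2(64) == 64. Note
// that nextPowOf2(0) yields 0, so callers are expected to pass at least 1.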

BatchPrimitiveProcessor::BatchPrimitiveProcessor()
 : ot(BPS_ELEMENT_TYPE)
 , txnID(0)
 , sessionID(0)
 , stepID(0)
 , uniqueID(0)
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)
 , filterCount(0)
 , projectCount(0)
 , sendRidsAtDelivery(false)
 , ridMap(0)
 , gotAbsRids(false)
 , gotValues(false)
 , hasScan(false)
 , validCPData(false)
 , minVal(MAX64)
 , maxVal(MIN64)
 , cpDataFromDictScan(false)
 , lbidForCP(0)
 , hasWideColumnOut(false)
 , busyLoaderCount(0)
 , physIO(0)
 , cachedIO(0)
 , touchedBlocks(0)
 , LBIDTrace(false)
 , fBusy(false)
 , doJoin(false)
 , hasFilterStep(false)
 , filtOnString(false)
 , prefetchThreshold(0)
 , mJOINHasSkewedKeyColumn(false)
 , mSmallSideRGPtr(nullptr)
 , mSmallSideKeyColumnsPtr(nullptr)
 , hasDictStep(false)
 , sockIndex(0)
 , endOfJoinerRan(false)
 , processorThreads(0)
 , ptMask(0)
 , firstInstance(false)
 , valuesLBID(0)
{
  pp.setLogicalBlockMode(true);
  pp.setBlockPtr((int*)blockData);
  pthread_mutex_init(&objLock, NULL);
}

BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
                                                 boost::shared_ptr<BPPSendThread> bppst,
                                                 uint _processorThreads)
 : ot(BPS_ELEMENT_TYPE)
 , txnID(0)
 , sessionID(0)
 , stepID(0)
 , uniqueID(0)
 , count(1)
 , baseRid(0)
 , ridCount(0)
 , needStrValues(false)
 , wideColumnsWidths(0)
 , filterCount(0)
 , projectCount(0)
 , sendRidsAtDelivery(false)
 , ridMap(0)
 , gotAbsRids(false)
 , gotValues(false)
 , hasScan(false)
 , validCPData(false)
 , minVal(MAX64)
 , maxVal(MIN64)
 , cpDataFromDictScan(false)
 , lbidForCP(0)
 , hasWideColumnOut(false)
 , busyLoaderCount(0)
 , physIO(0)
 , cachedIO(0)
 , touchedBlocks(0)
 , LBIDTrace(false)
 , fBusy(false)
 , doJoin(false)
 , hasFilterStep(false)
 , filtOnString(false)
 , prefetchThreshold(prefetch)
 , mJOINHasSkewedKeyColumn(false)
 , mSmallSideRGPtr(nullptr)
 , mSmallSideKeyColumnsPtr(nullptr)
 , hasDictStep(false)
 , sockIndex(0)
 , endOfJoinerRan(false)
 , processorThreads(_processorThreads)
 // processorThreads(32),
 // ptMask(processorThreads - 1),
 , firstInstance(true)
 , valuesLBID(0)
{
  // promote processorThreads to the next power of 2. TODO: rename it to
  // bucketCount or similar.
  processorThreads = nextPowOf2(processorThreads);
  ptMask = processorThreads - 1;
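
  // Since processorThreads is now a power of two, (hash & ptMask) is
  // equivalent to (hash % processorThreads); e.g. with 32 buckets,
  // ptMask == 0x1f and any hash maps to buckets 0..31. This mask trick is
  // why nextPowOf2() is applied above.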

  pp.setLogicalBlockMode(true);
  pp.setBlockPtr((int*)blockData);
  sendThread = bppst;
  pthread_mutex_init(&objLock, NULL);
  initBPP(b);
}

#if 0
BatchPrimitiveProcessor::BatchPrimitiveProcessor(const BatchPrimitiveProcessor& bpp)
{
  throw logic_error("copy BPP deprecated");
}
#endif

BatchPrimitiveProcessor::~BatchPrimitiveProcessor()
{
  // FIXME: just do a sync fetch
  counterLock.lock();  // need to make sure the loader has exited

  while (busyLoaderCount > 0)
  {
    counterLock.unlock();
    usleep(100000);
    counterLock.lock();
  }

  counterLock.unlock();
  pthread_mutex_destroy(&objLock);
}

/**
 * InitBPP Parses the creation messages from BatchPrimitiveProcessor-JL::createBPP()
 * Refer to that fcn for message format info.
 */
void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
{
  uint32_t i;
  uint8_t tmp8;
  uint16_t tmp16;
  Command::CommandType type;

  bs.advance(sizeof(ISMPacketHeader));  // skip the header
  bs >> tmp8;
  ot = static_cast<BPSOutputType>(tmp8);
  bs >> txnID;
  bs >> sessionID;
  bs >> stepID;
  bs >> uniqueID;
  bs >> versionInfo;

  bs >> tmp16;
  needStrValues = tmp16 & NEED_STR_VALUES;
  gotAbsRids = tmp16 & GOT_ABS_RIDS;
  gotValues = tmp16 & GOT_VALUES;
  LBIDTrace = tmp16 & LBID_TRACE;
  sendRidsAtDelivery = tmp16 & SEND_RIDS_AT_DELIVERY;
  doJoin = tmp16 & HAS_JOINER;
  hasRowGroup = tmp16 & HAS_ROWGROUP;
  getTupleJoinRowGroupData = tmp16 & JOIN_ROWGROUP_DATA;
  bool hasWideColumnsIn = tmp16 & HAS_WIDE_COLUMNS;

  // This used to signify that there was input row data from previous jobsteps, and
  // it never quite worked right. No need to fix it or update it; all BPP's have started
  // with a scan for years. Took it out.
  assert(!hasRowGroup);

  if (hasWideColumnsIn)
    bs >> wideColumnsWidths;

  bs >> bop;
  bs >> forHJ;

  if (ot == ROW_GROUP)
  {
    bs >> outputRG;
    // outputRG.setUseStringTable(true);
    bs >> tmp8;

    if (tmp8)
    {
      fe1.reset(new FuncExpWrapper());
      bs >> *fe1;
      bs >> fe1Input;
    }

    bs >> tmp8;

    if (tmp8)
    {
      fe2.reset(new FuncExpWrapper());
      bs >> *fe2;
      bs >> fe2Output;
    }
  }

  if (doJoin)
  {
    pthread_mutex_lock(&objLock);

    if (ot == ROW_GROUP)
    {
      bs >> joinerCount;
      // cout << "joinerCount = " << joinerCount << endl;
      joinTypes.reset(new JoinType[joinerCount]);

      tJoiners.reset(new boost::shared_array<boost::shared_ptr<TJoiner> >[joinerCount]);
      for (uint j = 0; j < joinerCount; ++j)
        tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);

      tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
      for (uint j = 0; j < joinerCount; ++j)
        tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);

      addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[joinerCount]);
      for (uint j = 0; j < joinerCount; ++j)
        addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);

      smallSideDataLocks.reset(new boost::mutex[joinerCount]);
      tJoinerSizes.reset(new std::atomic<uint32_t>[joinerCount]);
      largeSideKeyColumns.reset(new uint32_t[joinerCount]);
      tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
      tlSmallSideKeyColumns.reset(new std::vector<uint32_t>);
      typelessJoin.reset(new bool[joinerCount]);
      tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]);

      storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
      for (uint j = 0; j < joinerCount; ++j)
        storedKeyAllocators[j].setUseLock(true);

      joinNullValues.reset(new uint64_t[joinerCount]);
      doMatchNulls.reset(new bool[joinerCount]);
      joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
      hasJoinFEFilters = false;
      hasSmallOuterJoin = false;
      bool smallSideRGRecvd = false;

      for (i = 0; i < joinerCount; i++)
      {
        doMatchNulls[i] = false;
        uint32_t tmp32;
        bs >> tmp32;
        tJoinerSizes[i] = tmp32;
        // bs >> tJoinerSizes[i];
        // cout << "joiner size = " << tJoinerSizes[i] << endl;
        bs >> joinTypes[i];
        bs >> tmp8;
        typelessJoin[i] = (bool)tmp8;

        if (joinTypes[i] & WITHFCNEXP)
        {
          hasJoinFEFilters = true;
          joinFEFilters[i].reset(new FuncExpWrapper());
          bs >> *joinFEFilters[i];
        }

        if (joinTypes[i] & SMALLOUTER)
          hasSmallOuterJoin = true;

        if (!typelessJoin[i])
        {
          bs >> joinNullValues[i];
          bs >> largeSideKeyColumns[i];
          // cout << "large side key is " << largeSideKeyColumns[i] << endl;
          for (uint j = 0; j < processorThreads; ++j)
            tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
        }
        else
        {
          deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
          bs >> tlSmallSideKeyLengths[i];
          bs >> tmp8;
          mJOINHasSkewedKeyColumn = (bool)tmp8;
          // Deser smallSideRG if key data types are different, e.g. INT vs wide-DECIMAL.
          if (mJOINHasSkewedKeyColumn && !smallSideRGRecvd)
          {
            smallSideRGs.emplace_back(rowgroup::RowGroup(bs));
            // The large-side key column count equals the small-side key column count.
            deserializeVector<uint32_t>(bs, *tlSmallSideKeyColumns);
            mSmallSideRGPtr = &smallSideRGs[0];
            mSmallSideKeyColumnsPtr = &(*tlSmallSideKeyColumns);
            smallSideRGRecvd = true;
          }

          for (uint j = 0; j < processorThreads; ++j)
          {
            auto tlHasher = TupleJoiner::TypelessDataHasher(&outputRG, &tlLargeSideKeyColumns[i],
                                                            mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
            auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i],
                                                                    mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
            tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator));
          }
        }
      }

      if (hasJoinFEFilters)
      {
        joinFERG.reset(new RowGroup());
        bs >> *joinFERG;
      }

      if (getTupleJoinRowGroupData)
      {
        deserializeVector(bs, smallSideRGs);
        // cout << "deserialized " << smallSideRGs.size() << " small-side rowgroups\n";
        idbassert(smallSideRGs.size() == joinerCount);
        smallSideRowLengths.reset(new uint32_t[joinerCount]);
        smallSideRowData.reset(new RGData[joinerCount]);
        smallNullRowData.reset(new RGData[joinerCount]);
        smallNullPointers.reset(new Row::Pointer[joinerCount]);
        ssrdPos.reset(new uint64_t[joinerCount]);

        for (i = 0; i < joinerCount; i++)
        {
          smallSideRowLengths[i] = smallSideRGs[i].getRowSize();
          smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]);
          // smallSideRowData[i].reset(new uint8_t[
          //     smallSideRGs[i].getEmptySize() +
          //     (uint64_t) smallSideRowLengths[i] * tJoinerSizes[i]]);
          smallSideRGs[i].setData(&smallSideRowData[i]);
          smallSideRGs[i].resetRowGroup(0);
          ssrdPos[i] = smallSideRGs[i].getEmptySize();

          if (joinTypes[i] & (LARGEOUTER | SEMI | ANTI))
          {
            Row smallRow;
            smallSideRGs[i].initRow(&smallRow);
            smallNullRowData[i] = RGData(smallSideRGs[i], 1);
            smallSideRGs[i].setData(&smallNullRowData[i]);
            smallSideRGs[i].getRow(0, &smallRow);
            smallRow.initToNull();
            smallNullPointers[i] = smallRow.getPointer();
            smallSideRGs[i].setData(&smallSideRowData[i]);
          }
        }

        bs >> largeSideRG;
        bs >> joinedRG;
        // cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n";
      }
    }

#ifdef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
  }

  bs >> filterCount;
  filterSteps.resize(filterCount);
  // cout << "deserializing " << filterCount << " filters\n";
  hasScan = false;
  hasPassThru = false;

  for (i = 0; i < filterCount; ++i)
  {
    // cout << "deserializing step " << i << endl;
    filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps));

    if (type == Command::COLUMN_COMMAND)
    {
      ColumnCommand* col = (ColumnCommand*)filterSteps[i].get();

      if (col->isScan())
        hasScan = true;

      if (bop == BOP_OR)
        col->setScan(true);
    }
    else if (type == Command::FILTER_COMMAND)
    {
      hasFilterStep = true;

      if (dynamic_cast<StrFilterCmd*>(filterSteps[i].get()) != NULL)
        filtOnString = true;
    }
    else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
      hasDictStep = true;
  }

  bs >> projectCount;
  // cout << "deserializing " << projectCount << " projected columns\n\n";
  projectSteps.resize(projectCount);

  for (i = 0; i < projectCount; ++i)
  {
    // cout << "deserializing step " << i << endl;
    projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps));

    if (type == Command::PASS_THRU)
      hasPassThru = true;
    else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
      hasDictStep = true;
  }

  if (ot == ROW_GROUP)
  {
    bs >> tmp8;

    if (tmp8 > 0)
    {
      bs >> fAggregateRG;
      fAggregator.reset(new RowAggregation);
      bs >> *(fAggregator.get());

      // If there's a UDAF involved, set up for PM processing
      for (uint64_t i = 0; i < fAggregator->getAggFunctions().size(); i++)
      {
        RowUDAFFunctionCol* rowUDAF =
            dynamic_cast<RowUDAFFunctionCol*>(fAggregator->getAggFunctions()[i].get());

        if (rowUDAF)
        {
          // On the PM, the aux column is not sent, but rather is output col + 1.
          rowUDAF->fAuxColumnIndex = rowUDAF->fOutputColumnIndex + 1;
          // Set the PM flag in case the UDAF cares.
          rowUDAF->fUDAFContext.setContextFlags(rowUDAF->fUDAFContext.getContextFlags() |
                                                mcsv1sdk::CONTEXT_IS_PM);
        }
      }
    }
  }

  initProcessor();
}

/**
 * resetBPP Parses the run messages from BatchPrimitiveProcessor-JL::runBPP()
 * Refer to that fcn for message format info.
 */
void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, const SP_UM_IOSOCK& s)
{
  uint32_t i;
  vector<uint64_t> preloads;

  pthread_mutex_lock(&objLock);

  writelock = w;
  sock = s;
  newConnection = true;

  // skip the header, sessionID, stepID, uniqueID, and priority
  bs.advance(sizeof(ISMPacketHeader) + 16);
  bs >> dbRoot;
  bs >> count;
  bs >> ridCount;

  if (gotAbsRids)
  {
    assert(0);
    memcpy(absRids.get(), bs.buf(), ridCount << 3);
    bs.advance(ridCount << 3);
    /* TODO: this loop isn't always necessary or sensible */
    ridMap = 0;
    baseRid = absRids[0] & 0xffffffffffffe000ULL;
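
    // relRids are 13-bit offsets within an 8192-row logical block (the mask
    // above clears the low 13 bits of the rid to form baseRid), so
    // relRids[i] >> 9 falls in 0..15 and ridMap is a 16-bit occupancy map
    // with one bit per 512-row sub-block.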

    for (uint32_t i = 0; i < ridCount; i++)
    {
      relRids[i] = absRids[i] - baseRid;
      ridMap |= 1 << (relRids[i] >> 9);
    }
  }
  else
  {
    bs >> ridMap;
    bs >> baseRid;
    memcpy(relRids, bs.buf(), ridCount << 1);
    bs.advance(ridCount << 1);
  }

  if (gotValues)
  {
    memcpy(values, bs.buf(), ridCount << 3);
    bs.advance(ridCount << 3);
  }

  for (i = 0; i < filterCount; ++i)
  {
    filterSteps[i]->resetCommand(bs);
  }

  for (i = 0; i < projectCount; ++i)
  {
    projectSteps[i]->resetCommand(bs);
  }

  idbassert(bs.length() == 0);

  /* init vars not part of the BS */
  currentBlockOffset = 0;
  memset(relLBID.get(), 0, sizeof(uint64_t) * (projectCount + 1));
  memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 1));

  buildVSSCache(count);
#ifdef __FreeBSD__
  pthread_mutex_unlock(&objLock);
#endif
}

// This version of addToJoiner() is multithreaded. Values are first
// hashed into thread-local vectors corresponding to the shared hash
// tables. Once the incoming values are organized locally, it grabs
// the lock for each shared table and inserts them there.
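// In outline, the pattern is (an illustrative sketch, not a separate code path):
//
//   VLArray<vector<Element>> tmpBuckets(processorThreads);
//   for (each incoming element e)
//     tmpBuckets[hash(e.key) & ptMask].push_back(e);   // lock-free, thread-local
//   while (some tmpBuckets[i] is non-empty)
//     if (addToJoinerLocks[joinerNum][i].try_lock())   // never block here
//       { drain tmpBuckets[i] into tJoiners[joinerNum][i]; unlock; }
//
// Using try_lock() lets concurrent senders make progress on whichever buckets
// happen to be free instead of queueing on a single mutex.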
void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
{
  /* to get wall-time of hash table construction
  idbassert(processorThreads != 0);
  if (firstCallTime.is_not_a_date_time())
    firstCallTime = boost::posix_time::microsec_clock::universal_time();
  */

  uint32_t count, i, joinerNum, tlIndex, startPos, bucket;
#pragma pack(push, 1)
  struct JoinerElements
  {
    uint64_t key;
    uint32_t value;
  } * arr;
#pragma pack(pop)
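
  // With pack(1), sizeof(JoinerElements) == 12: an 8-byte key immediately
  // followed by a 4-byte value, matching the wire layout the sender produced.
  // Without the pragma the struct would be padded to 16 bytes and the
  // bs.advance(count * sizeof(JoinerElements)) below would read too far.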

  /* skip the header */
  bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));

  bs >> count;
  bs >> startPos;

  if (ot == ROW_GROUP)
  {
    bs >> joinerNum;
    idbassert(joinerNum < joinerCount);
    arr = (JoinerElements*)bs.buf();

    std::atomic<uint32_t>& tJoinerSize = tJoinerSizes[joinerNum];

    // XXXPAT: enormous if stmts are evil. TODO: move each block into
    // properly-named functions for clarity.
    if (typelessJoin[joinerNum])
    {
      utils::VLArray<vector<pair<TypelessData, uint32_t> > > tmpBuckets(processorThreads);
      uint8_t nullFlag;
      PoolAllocator& storedKeyAllocator = storedKeyAllocators[joinerNum];
      // this first loop hashes incoming values into vectors that parallel the hash tables.
      uint nullCount = 0;
      for (i = 0; i < count; ++i)
      {
        bs >> nullFlag;
        if (nullFlag == 0)
        {
          TypelessData tlSmallSideKey(bs, storedKeyAllocator);
          if (mJOINHasSkewedKeyColumn)
            tlSmallSideKey.setSmallSideWithSkewedData();
          else
            tlSmallSideKey.setSmallSide();
          bs >> tlIndex;
          // The bucket number corresponds with the index used later when inserting
          // TL keys into the permanent JOIN hash map.
          auto ha = tlSmallSideKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum], mSmallSideKeyColumnsPtr,
                                        mSmallSideRGPtr);

          bucket = ha & ptMask;
          tmpBuckets[bucket].push_back(make_pair(tlSmallSideKey, tlIndex));
        }
        else
          ++nullCount;
      }
      tJoinerSize -= nullCount;

      bool done = false, didSomeWork;
      // uint loopCounter = 0, noWorkCounter = 0;
      // this loop moves the elements from each vector into its corresponding hash table.
      while (!done)
      {
        //++loopCounter;
        done = true;
        didSomeWork = false;
        for (i = 0; i < processorThreads; ++i)
        {
          if (!tmpBuckets[i].empty())
          {
            bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
            if (!gotIt)
            {
              done = false;  // didn't get it; don't block, try the next bucket
              continue;
            }
            for (auto& element : tmpBuckets[i])
              tlJoiners[joinerNum][i]->insert(element);
            addToJoinerLocks[joinerNum][i].unlock();
            tmpBuckets[i].clear();
            didSomeWork = true;
          }
        }
        // if this iteration did no useful work, everything we need is locked; wait briefly
        // and try again.
        if (!done && !didSomeWork)
        {
          ::usleep(500 * processorThreads);
          //++noWorkCounter;
        }
      }
      // cout << "TL join insert. Took " << loopCounter << " loops" << endl;
    }
    else
    {
      boost::shared_array<boost::shared_ptr<TJoiner> > tJoiner = tJoiners[joinerNum];
      uint64_t nullValue = joinNullValues[joinerNum];
      bool& l_doMatchNulls = doMatchNulls[joinerNum];
      joblist::JoinType joinType = joinTypes[joinerNum];
      utils::VLArray<vector<pair<uint64_t, uint32_t> > > tmpBuckets(processorThreads);

      if (joinType & MATCHNULLS)
      {
        // this first loop hashes incoming values into vectors that parallel the hash tables.
        for (i = 0; i < count; ++i)
        {
          /* A minor optimization: the matchnull logic should only be used when
           * the jointype specifies it and there's a null value in the small side */
          if (!l_doMatchNulls && arr[i].key == nullValue)
            l_doMatchNulls = true;
          bucket = bucketPicker((char*)&arr[i].key, 8, bpSeed) & ptMask;
          tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
        }

        bool done = false, didSomeWork;
        // uint loopCounter = 0, noWorkCounter = 0;
        // this loop moves the elements from each vector into its corresponding hash table.
        while (!done)
        {
          //++loopCounter;
          done = true;
          didSomeWork = false;
          for (i = 0; i < processorThreads; ++i)
          {
            if (!tmpBuckets[i].empty())
            {
              bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
              if (!gotIt)
              {
                done = false;  // didn't get it; don't block, try the next bucket
                continue;
              }
              for (auto& element : tmpBuckets[i])
                tJoiners[joinerNum][i]->insert(element);
              addToJoinerLocks[joinerNum][i].unlock();
              tmpBuckets[i].clear();
              didSomeWork = true;
            }
          }
          // if this iteration did no useful work, everything we need is locked; wait briefly
          // and try again.
          if (!done && !didSomeWork)
          {
            ::usleep(500 * processorThreads);
            //++noWorkCounter;
          }
        }

        // cout << "T numeric join insert. Took " << loopCounter << " loops" << endl;
      }
      else
      {
        // this first loop hashes incoming values into vectors that parallel the hash tables.
        for (i = 0; i < count; ++i)
        {
          bucket = bucketPicker((char*)&arr[i].key, 8, bpSeed) & ptMask;
          tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
        }

        bool done = false;
        bool didSomeWork;
        // uint loopCounter = 0, noWorkCounter = 0;
        // this loop moves the elements from each vector into its corresponding hash table.
        while (!done)
        {
          //++loopCounter;
          done = true;
          didSomeWork = false;
          for (i = 0; i < processorThreads; ++i)
          {
            if (!tmpBuckets[i].empty())
            {
              bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
              if (!gotIt)
              {
                done = false;  // didn't get it; don't block, try the next bucket
                continue;
              }
              for (auto& element : tmpBuckets[i])
                tJoiners[joinerNum][i]->insert(element);
              addToJoinerLocks[joinerNum][i].unlock();
              tmpBuckets[i].clear();
              didSomeWork = true;
            }
          }
          // if this iteration did no useful work, everything we need is locked; wait briefly
          // and try again.
          if (!done && !didSomeWork)
          {
            ::usleep(500 * processorThreads);
            //++noWorkCounter;
          }
        }
        // cout << "T numeric join insert 2. Took " << loopCounter << " loops," <<
        //   " unproductive iterations = " << noWorkCounter << endl;
      }
    }

    if (!typelessJoin[joinerNum])
      bs.advance(count * sizeof(JoinerElements));

    if (getTupleJoinRowGroupData)
    {
      RowGroup& smallSide = smallSideRGs[joinerNum];
      RGData offTheWire;

      // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
      // the extra copying.
      offTheWire.deserialize(bs);
      boost::mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
      smallSide.setData(&smallSideRowData[joinerNum]);
      smallSide.append(offTheWire, startPos);

      /* This prints the row data
      smallSideRGs[joinerNum].initRow(&r);
      for (i = 0; i < (tJoinerSizes[joinerNum] * smallSideRowLengths[joinerNum]); i += r.getSize())
      {
        r.setData(&smallSideRowData[joinerNum][i + smallSideRGs[joinerNum].getEmptySize()]);
        cout << "  got row: " << r.toString() << endl;
      }
      */
    }
  }

  idbassert(bs.length() == 0);
}

void BatchPrimitiveProcessor::doneSendingJoinerData()
{
  /* to get wall-time of hash table construction
  if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
  {
    boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
    Logger logger;
    ostringstream os;
    os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
    logger.logMessage(os.str());
    cout << os.str() << endl;
  }
  */
}

int BatchPrimitiveProcessor::endOfJoiner()
{
  /* Wait for all joiner elements to be added */
  uint32_t i;
  size_t currentSize;
  // it should be safe to run this without grabbing this lock
  // boost::mutex::scoped_lock scoped(addToJoinerLock);

  if (endOfJoinerRan)
    return 0;

  // minor hack / optimization. The instances not inserting the table data don't
  // need to check that the table is complete.
  if (!firstInstance)
  {
    endOfJoinerRan = true;
    pthread_mutex_unlock(&objLock);
    return 0;
  }

  for (i = 0; i < joinerCount; i++)
  {
    if (!typelessJoin[i])
    {
      currentSize = 0;
      for (uint j = 0; j < processorThreads; ++j)
        if (!tJoiners[i] || !tJoiners[i][j])
          return -1;
        else
          currentSize += tJoiners[i][j]->size();
      if (currentSize != tJoinerSizes[i])
        return -1;
      // if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
      //   return -1;
    }
    else
    {
      currentSize = 0;
      for (uint j = 0; j < processorThreads; ++j)
        if (!tlJoiners[i] || !tlJoiners[i][j])
          return -1;
        else
          currentSize += tlJoiners[i][j]->size();
      if (currentSize != tJoinerSizes[i])
        return -1;
      // if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
      //   return -1;
    }
  }

  endOfJoinerRan = true;

#ifndef __FreeBSD__
  pthread_mutex_unlock(&objLock);
#endif
  return 0;
}

void BatchPrimitiveProcessor::initProcessor()
{
  uint32_t i, j;

  if (gotAbsRids || needStrValues || hasRowGroup)
    absRids.reset(new uint64_t[LOGICAL_BLOCK_RIDS]);

  if (needStrValues)
    strValues.reset(new string[LOGICAL_BLOCK_RIDS]);

  outMsgSize = defaultBufferSize;
  outputMsg.reset(reinterpret_cast<uint8_t*>(aligned_alloc(utils::MAXCOLUMNWIDTH, outMsgSize)));

  if (ot == ROW_GROUP)
  {
    // calculate the projection -> outputRG mapping
    projectionMap.reset(new int[projectCount]);
    bool* reserved = (bool*)alloca(outputRG.getColumnCount() * sizeof(bool));

    for (i = 0; i < outputRG.getColumnCount(); i++)
      reserved[i] = false;

    for (i = 0; i < projectCount; i++)
    {
      for (j = 0; j < outputRG.getColumnCount(); j++)
        if (projectSteps[i]->getTupleKey() == outputRG.getKeys()[j] && !reserved[j])
        {
          projectionMap[i] = j;
          reserved[j] = true;
          break;
        }

      if (j == outputRG.getColumnCount())
        projectionMap[i] = -1;
    }

    if (doJoin)
    {
      outputRG.initRow(&oldRow);
      outputRG.initRow(&newRow);

      tSmallSideMatches.reset(new MatchedData[joinerCount]);
      keyColumnProj.reset(new bool[projectCount]);

      for (i = 0; i < projectCount; i++)
      {
        keyColumnProj[i] = false;

        for (j = 0; j < joinerCount; j++)
        {
          if (!typelessJoin[j])
          {
            if (projectionMap[i] == (int)largeSideKeyColumns[j])
            {
              keyColumnProj[i] = true;
              break;
            }
          }
          else
          {
            for (uint32_t k = 0; k < tlLargeSideKeyColumns[j].size(); k++)
            {
              if (projectionMap[i] == (int)tlLargeSideKeyColumns[j][k])
              {
                keyColumnProj[i] = true;
                break;
              }
            }
          }
        }
      }

      if (hasJoinFEFilters)
      {
        joinFERG->initRow(&joinFERow, true);
        joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
        joinFERow.setData(joinFERowData.get());
        joinFEMappings.reset(new shared_array<int>[joinerCount + 1]);

        for (i = 0; i < joinerCount; i++)
          joinFEMappings[i] = makeMapping(smallSideRGs[i], *joinFERG);

        joinFEMappings[joinerCount] = makeMapping(largeSideRG, *joinFERG);
      }
    }

    /*
      Calculate the FE1 -> projection mapping
      Calculate the projection step -> FE1 input mapping
    */
    if (fe1)
    {
      fe1ToProjection = makeMapping(fe1Input, outputRG);
      projectForFE1.reset(new int[projectCount]);
      bool* reserved = (bool*)alloca(fe1Input.getColumnCount() * sizeof(bool));

      for (i = 0; i < fe1Input.getColumnCount(); i++)
        reserved[i] = false;

      for (i = 0; i < projectCount; i++)
      {
        projectForFE1[i] = -1;

        for (j = 0; j < fe1Input.getColumnCount(); j++)
        {
          if (projectSteps[i]->getTupleKey() == fe1Input.getKeys()[j] && !reserved[j])
          {
            reserved[j] = true;
            projectForFE1[i] = j;
            break;
          }
        }
      }

      fe1Input.initRow(&fe1In);
      outputRG.initRow(&fe1Out);
    }

    if (fe2)
    {
      fe2Input = (doJoin ? &joinedRG : &outputRG);
      fe2Mapping = makeMapping(*fe2Input, fe2Output);
      fe2Input->initRow(&fe2In);
      fe2Output.initRow(&fe2Out);
    }

    if (getTupleJoinRowGroupData)
    {
      gjrgPlaceHolders.reset(new uint32_t[joinerCount]);
      outputRG.initRow(&largeRow);
      joinedRG.initRow(&joinedRow);
      joinedRG.initRow(&baseJRow, true);
      smallRows.reset(new Row[joinerCount]);

      for (i = 0; i < joinerCount; i++)
        smallSideRGs[i].initRow(&smallRows[i], true);

      baseJRowMem.reset(new uint8_t[baseJRow.getSize()]);
      baseJRow.setData(baseJRowMem.get());
      gjrgMappings.reset(new shared_array<int>[joinerCount + 1]);

      for (i = 0; i < joinerCount; i++)
        gjrgMappings[i] = makeMapping(smallSideRGs[i], joinedRG);

      gjrgMappings[joinerCount] = makeMapping(outputRG, joinedRG);
    }
  }

  // @bug 1051
  if (hasFilterStep)
  {
    for (uint64_t i = 0; i < 2; i++)
    {
      fFiltRidCount[i] = 0;
      fFiltCmdRids[i].reset(new uint16_t[LOGICAL_BLOCK_RIDS]);
      fFiltCmdValues[i].reset(new int64_t[LOGICAL_BLOCK_RIDS]);
      if (wideColumnsWidths & datatypes::MAXDECIMALWIDTH)
        fFiltCmdBinaryValues[i].reset(new int128_t[LOGICAL_BLOCK_RIDS]);

      if (filtOnString)
        fFiltStrValues[i].reset(new string[LOGICAL_BLOCK_RIDS]);
    }
  }

  /* init the Commands */
  if (filterCount > 0)
  {
    for (i = 0; i < (uint32_t)filterCount - 1; ++i)
    {
      // cout << "prepping filter " << i << endl;
      filterSteps[i]->setBatchPrimitiveProcessor(this);

      if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
        filterSteps[i]->prep(OT_BOTH, true);
      else if (filterSteps[i]->filterFeeder() != Command::NOT_FEEDER)
        filterSteps[i]->prep(OT_BOTH, false);
      else
        filterSteps[i]->prep(OT_RID, false);
    }

    // cout << "prepping filter " << i << endl;
    filterSteps[i]->setBatchPrimitiveProcessor(this);
    filterSteps[i]->prep(OT_BOTH, false);
  }

  for (i = 0; i < projectCount; ++i)
  {
    // cout << "prepping projection " << i << endl;
    projectSteps[i]->setBatchPrimitiveProcessor(this);

    if (noVB)
      projectSteps[i]->prep(OT_BOTH, false);
    else
      projectSteps[i]->prep(OT_DATAVALUE, false);

    if (0 < filterCount)
    {
      // if there is an rtscommand with a passThru, the passThru must make its own absRids
      // unless there is only one project step; then the last filter step can make absRids
      RTSCommand* rts = dynamic_cast<RTSCommand*>(projectSteps[i].get());

      if (rts && rts->isPassThru())
      {
        if (1 == projectCount)
          filterSteps[filterCount - 1]->setMakeAbsRids(true);
        else
          rts->setAbsNull();
      }
    }
  }

  if (fAggregator.get() != NULL)
  {
    // fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
    fAggRowGroupData.reinit(fAggregateRG);
    fAggregateRG.setData(&fAggRowGroupData);

    if (doJoin)
    {
      fAggregator->setInputOutput(fe2 ? fe2Output : joinedRG, &fAggregateRG);
      fAggregator->setJoinRowGroups(&smallSideRGs, &largeSideRG);
    }
    else
      fAggregator->setInputOutput(fe2 ? fe2Output : outputRG, &fAggregateRG);
  }

  if (LIKELY(!hasWideColumnOut))
  {
    minVal = MAX64;
    maxVal = MIN64;
  }
  else
  {
    max128Val = datatypes::Decimal::minInt128;
    min128Val = datatypes::Decimal::maxInt128;
  }

  // @bug 1269, initialize data used by execute() for async loading blocks
  // +1 for the scan filter step with no predicate, if any
  relLBID.reset(new uint64_t[projectCount + 1]);
  asyncLoaded.reset(new bool[projectCount + 1]);
}

/* This version does a join on projected rows */
// To prevent supersized result sets from near-cartesian joins on three or more tables,
// startRid (starting at 0) is used to begin the rid loop, and if we cut off processing
// early because of the size of the result set, we return the next rid to start with.
// If we finish all ridCount rids, we return 0.
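// The caller (execute(), below) drives that contract with a resume loop of
// the form:
//
//   uint32_t startRid = 0;
//   do {
//     startRid = executeTupleJoin(startRid);  // 0 means all rids are done
//     ... project non-key columns, serialize, send ...
//   } while (startRid > 0);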
uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
{
  uint32_t newRowCount = 0, i, j;
  vector<uint32_t> matches;
  uint64_t largeKey;
  uint64_t resultCount = 0;
  uint32_t newStartRid = startRid;
  outputRG.getRow(0, &oldRow);
  outputRG.getRow(0, &newRow);

  // cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl;
  // ridCount gets modified based on the number of Rids actually processed during this call.
  // origRidCount is the number of rids for this thread after filter, which is the total
  // number of rids to be processed across all calls to this function during this thread.
  for (i = startRid; i < origRidCount && !sendThread->aborted(); i++, oldRow.nextRow())
  {
    /* Decide whether this large-side row belongs in the output. The breaks
     * in the loop mean that it doesn't.
     *
     * In English the logic is:
     *   Reject the row if there's no match and it's not an anti or an outer join,
     *   or if there is a match and it's an anti join with no filter.
     *   If there's an antijoin with a filter, nothing can be eliminated at this stage.
     *   If there's an antijoin where the join op should match NULL values, and there
     *   are NULL values to match against, but there is no filter, all rows can be eliminated.
     */

    // cout << "large side row: " << oldRow.toString() << endl;
    for (j = 0; j < joinerCount; j++)
    {
      bool found;

      if (UNLIKELY(joinTypes[j] & ANTI))
      {
        if (joinTypes[j] & WITHFCNEXP)
          continue;
        else if (doMatchNulls[j])
          break;
      }

      if (LIKELY(!typelessJoin[j]))
      {
        // cout << "not typeless join\n";
        bool isNull;
        uint32_t colIndex = largeSideKeyColumns[j];

        if (oldRow.isUnsigned(colIndex))
          largeKey = oldRow.getUintField(colIndex);
        else
          largeKey = oldRow.getIntField(colIndex);
        uint bucket = bucketPicker((char*)&largeKey, 8, bpSeed) & ptMask;
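
        // The probe must use the same hash (bucketPicker with bpSeed) and the
        // same ptMask that addToJoiner() used on insert, so the large-side key
        // lands in the bucket that holds its small-side matches.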

        bool joinerIsEmpty = tJoiners[j][bucket]->empty() ? true : false;

        found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
        isNull = oldRow.isNullValue(colIndex);
        /* These conditions define when the row is NOT in the result set:
         *   - if the key is not in the small side, and the join isn't a large-outer or anti join
         *   - if the key is NULL, and the join isn't anti- or large-outer
         *   - if it's an anti-join and the key is either in the small side or it's NULL
         */
        if (((!found || isNull) && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
            ((joinTypes[j] & ANTI) && !joinerIsEmpty &&
             ((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull))))
        {
          // cout << " - not in the result set\n";
          break;
        }

        // else
        //   cout << " - in the result set\n";
      }
      else
      {
        // cout << " typeless join\n";
        // null values are not sent by the UM in the typeless case; null -> !found
        TypelessData tlLargeKey(&oldRow);
        uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j], mSmallSideKeyColumnsPtr,
                                          mSmallSideRGPtr ? &mSmallSideRGPtr->getColWidths() : nullptr) &
                      ptMask;
        found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();

        if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) || (joinTypes[j] & ANTI))
        {
          /* Separated the ANTI join logic for readability. */
          if (joinTypes[j] & ANTI)
          {
            if (found)
              break;
            else if (joinTypes[j] & MATCHNULLS)
            {
              bool hasNull = false;

              for (uint32_t z = 0; z < tlLargeSideKeyColumns[j].size(); z++)
                if (oldRow.isNullValue(tlLargeSideKeyColumns[j][z]))
                {
                  hasNull = true;
                  break;
                }

              if (hasNull)  // keys with nulls match everything
                break;
              else
                continue;  // non-null keys not in the small side are in the result
            }
            else  // signifies a not-exists query
              continue;
          }

          break;
        }
      }
    }

    if (j == joinerCount)
    {
      uint32_t matchCount;
      for (j = 0; j < joinerCount; j++)
      {
        /* The result is already known if...
         *   -- anti-join with no fcnexp
         *   -- semi-join with no fcnexp and not scalar
         *
         * The ANTI join case isn't just a shortcut. getJoinResults() will produce results
         * for a different case and generate the wrong result. Need to fix that, later.
         */
        if ((joinTypes[j] & (SEMI | ANTI)) && !(joinTypes[j] & WITHFCNEXP) && !(joinTypes[j] & SCALAR))
        {
          tSmallSideMatches[j][newRowCount].push_back(-1);
          continue;
        }

        getJoinResults(oldRow, j, tSmallSideMatches[j][newRowCount]);
        matchCount = tSmallSideMatches[j][newRowCount].size();

        if (joinTypes[j] & WITHFCNEXP)
        {
          vector<uint32_t> newMatches;
          applyMapping(joinFEMappings[joinerCount], oldRow, &joinFERow);

          for (uint32_t k = 0; k < matchCount; k++)
          {
            if (tSmallSideMatches[j][newRowCount][k] == (uint32_t)-1)
              smallRows[j].setPointer(smallNullPointers[j]);
            else
            {
              smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]);
              // uint64_t rowOffset = ((uint64_t) tSmallSideMatches[j][newRowCount][k]) *
              //     smallRows[j].getSize() + smallSideRGs[j].getEmptySize();
              // smallRows[j].setData(&smallSideRowData[j][rowOffset]);
            }

            applyMapping(joinFEMappings[j], smallRows[j], &joinFERow);

            if (joinFEFilters[j]->evaluate(&joinFERow))
            {
              /* The first match includes it in a SEMI join result and excludes it from an ANTI join
               * result. If it's SEMI & SCALAR however, it needs to continue.
               */
              newMatches.push_back(tSmallSideMatches[j][newRowCount][k]);

              if ((joinTypes[j] & ANTI) || ((joinTypes[j] & (SEMI | SCALAR)) == SEMI))
                break;
            }
          }

          tSmallSideMatches[j][newRowCount].swap(newMatches);
          matchCount = tSmallSideMatches[j][newRowCount].size();
        }

        if (matchCount == 0 && (joinTypes[j] & LARGEOUTER))
        {
          tSmallSideMatches[j][newRowCount].push_back(-1);
          matchCount = 1;
        }

        /* Scalar check */
        if ((joinTypes[j] & SCALAR) && matchCount > 1)
          throw scalar_exception();

        /* Reverse the result for anti-join */
        if (joinTypes[j] & ANTI)
        {
          if (matchCount == 0)
          {
            tSmallSideMatches[j][newRowCount].push_back(-1);
            matchCount = 1;
          }
          else
          {
            tSmallSideMatches[j][newRowCount].clear();
            matchCount = 0;
          }
        }

        /* For all join types, no matches here means it's not in the result */
        if (matchCount == 0)
          break;

        /* Pair non-scalar semi-joins with a NULL row */
        if ((joinTypes[j] & SEMI) && !(joinTypes[j] & SCALAR))
        {
          tSmallSideMatches[j][newRowCount].clear();
          tSmallSideMatches[j][newRowCount].push_back(-1);
          matchCount = 1;
        }

        resultCount += matchCount;
      }

      /* Finally, copy the row into the output */
      if (j == joinerCount)
      {
        // We need to update both the 8-byte (values) and 16-byte (wide128Values)
        // buffers, otherwise unrelated values will be observed in the JOIN-ed
        // output RGData.
        if (i != newRowCount)
        {
          values[newRowCount] = values[i];
          if (mJOINHasSkewedKeyColumn)
            wide128Values[newRowCount] = wide128Values[i];
          relRids[newRowCount] = relRids[i];
          copyRow(oldRow, &newRow);
          // cout << "joined row: " << newRow.toString() << endl;
        }

        newRowCount++;
        newRow.nextRow();
      }
      // else
      //   cout << "j != joinerCount\n";
    }
    // If we've accumulated more than maxResultCount (1048576, i.e. 2^20) results,
    // cut off processing. The caller will restart to continue where we left off.
    if (resultCount >= maxResultCount)
    {
      newStartRid += newRowCount;
      break;
    }
  }

  if (resultCount < maxResultCount)
    newStartRid = 0;

  ridCount = newRowCount;
  outputRG.setRowCount(ridCount);

  /* prints out the whole result set.
  if (ridCount != 0)
  {
    cout << "RG rowcount=" << outputRG.getRowCount() << " BPP ridcount=" << ridCount << endl;
    for (i = 0; i < joinerCount; i++)
    {
      for (j = 0; j < ridCount; j++)
      {
        cout << "joiner " << i << " has " << tSmallSideMatches[i][j].size() << " entries" << endl;
        cout << "row " << j << ":";
        for (uint32_t k = 0; k < tSmallSideMatches[i][j].size(); k++)
          cout << " " << tSmallSideMatches[i][j][k];
        cout << endl;
      }
      cout << endl;
    }
  }
  */
  return newStartRid;
}

#ifdef PRIMPROC_STOPWATCH
void BatchPrimitiveProcessor::execute(StopWatch* stopwatch)
#else
void BatchPrimitiveProcessor::execute()
#endif
{
  uint8_t sendCount = 0;
  // bool smoreRGs = false;
  // uint32_t sStartRid = 0;
  uint32_t i, j;

  try
  {
#ifdef PRIMPROC_STOPWATCH
    stopwatch->start("BatchPrimitiveProcessor::execute first part");
#endif

    // if only one scan step which has no predicate, async load all columns
    if (filterCount == 1 && hasScan)
    {
      ColumnCommand* col = dynamic_cast<ColumnCommand*>(filterSteps[0].get());

      if ((col != NULL) && (col->getFilterCount() == 0) && (col->getLBID() != 0))
      {
        // stored in last pos in relLBID[] and asyncLoaded[]
        uint64_t p = projectCount;
        asyncLoaded[p] = asyncLoaded[p] && (relLBID[p] % blocksReadAhead != 0);
        relLBID[p] += col->getWidth();

        if (!asyncLoaded[p] && col->willPrefetch())
        {
          loadBlockAsync(col->getLBID(), versionInfo, txnID, col->getCompType(), &cachedIO, &physIO,
                         LBIDTrace, sessionID, &counterLock, &busyLoaderCount, sendThread, &vssCache);
          asyncLoaded[p] = true;
        }

        asyncLoadProjectColumns();
      }
    }

#ifdef PRIMPROC_STOPWATCH
    stopwatch->stop("BatchPrimitiveProcessor::execute first part");
    stopwatch->start("BatchPrimitiveProcessor::execute second part");
#endif

    // filters use relrids and values for intermediate results.
    if (bop == BOP_AND)
    {
      for (j = 0; j < filterCount; ++j)
      {
#ifdef PRIMPROC_STOPWATCH
        stopwatch->start("- filterSteps[j]->execute()");
        filterSteps[j]->execute();
        stopwatch->stop("- filterSteps[j]->execute()");
#else
        filterSteps[j]->execute();
#endif
      }
    }
    else  // BOP_OR
    {
      /* XXXPAT: This is a hacky impl of OR logic. Each filter is configured to
         be a scan operation on init. This code runs each independently and
         unions their output ridlists using accumulator. At the end it turns
         accumulator into a final ridlist for subsequent steps.

         If there's a join or a passthru command in the projection list, the
         values array has to contain values from the last filter step. In that
         case, the last filter step isn't part of the "OR" filter processing.
         JLF has added it to prep those operations, not to be a filter.

         7/7/09 update: the multiple-table join required relocating the join op. It's
         no longer necessary to add the loader columncommand to the filter array.
      */

      bool accumulator[LOGICAL_BLOCK_RIDS];
      // uint32_t realFilterCount = ((forHJ || hasPassThru) ? filterCount - 1 : filterCount);
      uint32_t realFilterCount = filterCount;

      for (i = 0; i < LOGICAL_BLOCK_RIDS; i++)
        accumulator[i] = false;

      if (!hasScan)  // there are input rids
        for (i = 0; i < ridCount; i++)
          accumulator[relRids[i]] = true;

      ridCount = 0;

      for (i = 0; i < realFilterCount; ++i)
      {
        filterSteps[i]->execute();

        if (!filterSteps[i]->filterFeeder())
        {
          for (j = 0; j < ridCount; j++)
            accumulator[relRids[j]] = true;

          ridCount = 0;
        }
      }

      for (ridMap = 0, i = 0; i < LOGICAL_BLOCK_RIDS; ++i)
      {
        if (accumulator[i])
        {
          relRids[ridCount] = i;
          ridMap |= 1 << (relRids[ridCount] >> 9);
          ++ridCount;
        }
      }
    }

#ifdef PRIMPROC_STOPWATCH
    stopwatch->stop("BatchPrimitiveProcessor::execute second part");
    stopwatch->start("BatchPrimitiveProcessor::execute third part");
#endif

    if (projectCount > 0 || ot == ROW_GROUP)
    {
#ifdef PRIMPROC_STOPWATCH
      stopwatch->start("- writeProjectionPreamble");
      writeProjectionPreamble();
      stopwatch->stop("- writeProjectionPreamble");
#else
      writeProjectionPreamble();
#endif
    }

    // async load blocks for the project phase, if not already loaded
    if (ridCount > 0)
    {
#ifdef PRIMPROC_STOPWATCH
      stopwatch->start("- asyncLoadProjectColumns");
      asyncLoadProjectColumns();
      stopwatch->stop("- asyncLoadProjectColumns");
#else
      asyncLoadProjectColumns();
#endif
    }

#ifdef PRIMPROC_STOPWATCH
    stopwatch->stop("BatchPrimitiveProcessor::execute third part");
    stopwatch->start("BatchPrimitiveProcessor::execute fourth part");
#endif

    // projection commands read relrids and write output directly to a rowgroup
    // or the serialized bytestream
    if (ot != ROW_GROUP)
    {
      for (j = 0; j < projectCount; ++j)
      {
        projectSteps[j]->project();
      }
    }
    else
    {
      /* Function & Expression group 1 processing
         - project for FE1
         - execute FE1 row by row
         - if return value = true, map input row into the projection RG, adjust ridlist
      */
#ifdef PRIMPROC_STOPWATCH
      stopwatch->start("- if(ot != ROW_GROUP) else");
#endif
      outputRG.resetRowGroup(baseRid);

      if (fe1)
      {
        uint32_t newRidCount = 0;
        fe1Input.resetRowGroup(baseRid);
        fe1Input.setRowCount(ridCount);
        fe1Input.getRow(0, &fe1In);
        outputRG.getRow(0, &fe1Out);

        for (j = 0; j < projectCount; j++)
          if (projectForFE1[j] != -1)
            projectSteps[j]->projectIntoRowGroup(fe1Input, projectForFE1[j]);

        for (j = 0; j < ridCount; j++, fe1In.nextRow())
          if (fe1->evaluate(&fe1In))
          {
            applyMapping(fe1ToProjection, fe1In, &fe1Out);
            relRids[newRidCount] = relRids[j];
            values[newRidCount++] = values[j];
            fe1Out.nextRow();
          }

        ridCount = newRidCount;
      }

      outputRG.setRowCount(ridCount);

      if (sendRidsAtDelivery)
      {
        Row r;
        outputRG.initRow(&r);
        outputRG.getRow(0, &r);

        for (j = 0; j < ridCount; ++j)
        {
          r.setRid(relRids[j]);
          r.nextRow();
        }
      }

      /* 7/7/09 PL: I changed the projection alg to reduce block touches when there's
         a join. The key columns get projected first, the join is executed to further
         reduce the ridlist, then the rest of the columns get projected */

      if (!doJoin)
      {
        for (j = 0; j < projectCount; ++j)
        {
          // cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl;
          if (projectionMap[j] != -1)
          {
#ifdef PRIMPROC_STOPWATCH
            stopwatch->start("-- projectIntoRowGroup");
            projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
            stopwatch->stop("-- projectIntoRowGroup");
#else
            projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
          }
          // else
          //   cout << " no target found for OID " << projectSteps[j]->getOID() << endl;
        }
        if (fe2)
        {
          /* functionize this -> processFE2() */
          fe2Output.resetRowGroup(baseRid);
          fe2Output.getRow(0, &fe2Out);
          fe2Input->getRow(0, &fe2In);

          // cerr << "input row: " << fe2In.toString() << endl;
          for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow())
          {
            if (fe2->evaluate(&fe2In))
            {
              applyMapping(fe2Mapping, fe2In, &fe2Out);
              // cerr << "   passed. output row: " << fe2Out.toString() << endl;
              fe2Out.setRid(fe2In.getRelRid());
              fe2Output.incRowCount();
              fe2Out.nextRow();
            }
          }

          if (!fAggregator)
          {
            *serialized << (uint8_t)1;  // the "count this msg" var
            fe2Output.setDBRoot(dbRoot);
            fe2Output.serializeRGData(*serialized);
            //*serialized << fe2Output.getDataSize();
            // serialized->append(fe2Output.getData(), fe2Output.getDataSize());
          }
        }

        if (fAggregator)
        {
          *serialized << (uint8_t)1;  // the "count this msg" var

          RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
          // toAggregate.convertToInlineDataInPlace();

          if (fe2)
            fe2Output.setDBRoot(dbRoot);
          else
            outputRG.setDBRoot(dbRoot);

          fAggregator->addRowGroup(&toAggregate);

          if ((currentBlockOffset + 1) == count)  // @bug4507, 8k
          {
            fAggregator->loadResult(*serialized);  // @bug4507, 8k
          }  // @bug4507, 8k
          else if (utils::MonitorProcMem::isMemAvailable())  // @bug4507, 8k
          {
            fAggregator->loadEmptySet(*serialized);  // @bug4507, 8k
          }  // @bug4507, 8k
          else  // @bug4507, 8k
          {
            fAggregator->loadResult(*serialized);  // @bug4507, 8k
            fAggregator->aggReset();  // @bug4507, 8k
          }  // @bug4507, 8k
        }

        if (!fAggregator && !fe2)
        {
          *serialized << (uint8_t)1;  // the "count this msg" var
          outputRG.setDBRoot(dbRoot);
          // cerr << "serializing " << outputRG.toString() << endl;
          outputRG.serializeRGData(*serialized);

          //*serialized << outputRG.getDataSize();
          // serialized->append(outputRG.getData(), outputRG.getDataSize());
        }

#ifdef PRIMPROC_STOPWATCH
        stopwatch->stop("- if(ot != ROW_GROUP) else");
#endif
      }
      else  // doJoin
      {
        uint32_t startRid = 0;
        ByteStream preamble = *serialized;
        origRidCount = ridCount;  // ridCount can get modified by executeTupleJoin(). We need to
                                  // keep track of the original value.
        /* Project the key columns. If there's a filter in the join, project everything.
           Also need to project 'long' strings b/c executeTupleJoin may copy entire rows
           using copyRow(), which will try to interpret the uninit'd string ptr.
           Valgrind will legitimately complain about copying uninit'd values for the
           other types but that is technically safe. */
        for (j = 0; j < projectCount; j++)
        {
          if (keyColumnProj[j] ||
              (projectionMap[j] != -1 && (hasJoinFEFilters || oldRow.isLongString(projectionMap[j]))))
          {
#ifdef PRIMPROC_STOPWATCH
            stopwatch->start("-- projectIntoRowGroup");
            projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
            stopwatch->stop("-- projectIntoRowGroup");
#else
            projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
          }
        }

        do  // while (startRid > 0)
        {
#ifdef PRIMPROC_STOPWATCH
          stopwatch->start("-- executeTupleJoin()");
          startRid = executeTupleJoin(startRid);
          stopwatch->stop("-- executeTupleJoin()");
#else
          startRid = executeTupleJoin(startRid);
          // sStartRid = startRid;
#endif
          /* project the non-key columns */
          for (j = 0; j < projectCount; ++j)
          {
            if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
                !oldRow.isLongString(projectionMap[j]))
            {
#ifdef PRIMPROC_STOPWATCH
              stopwatch->start("-- projectIntoRowGroup");
              projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
              stopwatch->stop("-- projectIntoRowGroup");
#else
              projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
            }
          }
          /* The RowGroup is fully joined at this point.
           * Add additional RowGroup processing here.
           * TODO: Try to clean up all of the switching */

          if (fe2 || fAggregator)
          {
            bool moreRGs = true;
            initGJRG();

            while (moreRGs && !sendThread->aborted())
            {
              /*
               * generate 1 rowgroup (8192 rows max) of joined rows
               * if there's an FE2, run it
               *   - pack results into a new rowgroup
               *   - if there are < 8192 rows in the new RG, continue
               * if there's an agg, run it
               * send the result
               */
              resetGJRG();
              moreRGs = generateJoinedRowGroup(baseJRow);
              // smoreRGs = moreRGs;
              sendCount = (uint8_t)(!moreRGs && !startRid);
              // *serialized << (uint8_t)(!moreRGs && !startRid);  // the "count this msg" var
              *serialized << sendCount;
              if (fe2)
              {
                /* functionize this -> processFE2() */
                fe2Output.resetRowGroup(baseRid);
                fe2Output.setDBRoot(dbRoot);
                fe2Output.getRow(0, &fe2Out);
                fe2Input->getRow(0, &fe2In);

                for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow())
                {
                  if (fe2->evaluate(&fe2In))
                  {
                    applyMapping(fe2Mapping, fe2In, &fe2Out);
                    fe2Out.setRid(fe2In.getRelRid());
                    fe2Output.incRowCount();
                    fe2Out.nextRow();
                  }
                }
              }

              RowGroup& nextRG = (fe2 ? fe2Output : joinedRG);
              nextRG.setDBRoot(dbRoot);

              if (fAggregator)
              {
                fAggregator->addRowGroup(&nextRG);

                if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0)  // @bug4507, 8k
                {
                  fAggregator->loadResult(*serialized);  // @bug4507, 8k
                }  // @bug4507, 8k
                else if (utils::MonitorProcMem::isMemAvailable())  // @bug4507, 8k
                {
                  fAggregator->loadEmptySet(*serialized);  // @bug4507, 8k
                }  // @bug4507, 8k
                else  // @bug4507, 8k
                {
                  fAggregator->loadResult(*serialized);  // @bug4507, 8k
                  fAggregator->aggReset();  // @bug4507, 8k
                }  // @bug4507, 8k
              }
              else
              {
                // cerr << " * serializing " << nextRG.toString() << endl;
                nextRG.serializeRGData(*serialized);
              }

              /* send the msg & reinit the BS */
              if (moreRGs)
              {
                sendResponse();
                serialized.reset(new ByteStream());
                *serialized = preamble;
              }
            }

            if (hasSmallOuterJoin)
            {
              // Should we happen to finish sending data rows right on the boundary of when
              // moreRGs flips off, then we need to start a new buffer. I.e., it needs the
              // "count this message" byte pushed.
              if (serialized->length() == preamble.length())
                *serialized << (uint8_t)(startRid > 0 ? 0 : 1);  // the "count this msg" var

              *serialized << ridCount;

              for (i = 0; i < joinerCount; i++)
              {
                for (j = 0; j < ridCount; ++j)
                {
                  serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
                  tSmallSideMatches[i][j].clear();
                }
              }
            }
            else
            {
              // We have no more use for this allocation
              for (i = 0; i < joinerCount; i++)
                for (j = 0; j < ridCount; ++j)
                  tSmallSideMatches[i][j].clear();
            }
          }
          else
          {
            *serialized << (uint8_t)(startRid > 0 ? 0 : 1);  // the "count this msg" var
            outputRG.setDBRoot(dbRoot);
            // cerr << "serializing " << outputRG.toString() << endl;
            outputRG.serializeRGData(*serialized);

            //*serialized << outputRG.getDataSize();
            // serialized->append(outputRG.getData(), outputRG.getDataSize());
            for (i = 0; i < joinerCount; i++)
            {
              for (j = 0; j < ridCount; ++j)
              {
                serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
                tSmallSideMatches[i][j].clear();
              }
            }
          }
          if (startRid > 0)
          {
            sendResponse();
            serialized.reset(new ByteStream());
            *serialized = preamble;
          }
        } while (startRid > 0);
|
|
}
|
|
#ifdef PRIMPROC_STOPWATCH
|
|
stopwatch->stop("- if(ot != ROW_GROUP) else");
|
|
#endif
|
|
}
|
|
ridCount = origRidCount; // May not be needed, but just to be safe.
|
|
// std::cout << "end of send. startRid=" << sStartRid << " moreRG=" << smoreRGs << " sendCount=" <<
|
|
// sendCount << std::endl;
|
|
if (projectCount > 0 || ot == ROW_GROUP)
|
|
{
|
|
*serialized << cachedIO;
|
|
cachedIO = 0;
|
|
*serialized << physIO;
|
|
physIO = 0;
|
|
*serialized << touchedBlocks;
|
|
touchedBlocks = 0;
|
|
// cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
|
|
// " touchedBlocks=" << touchedBlocks << endl;
|
|
}
|
|
|
|
#ifdef PRIMPROC_STOPWATCH
|
|
stopwatch->stop("BatchPrimitiveProcessor::execute fourth part");
|
|
#endif
|
|
}
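  // Every failure mode below is reported back to the UM as an error ByteStream
  // built by writeErrorMsg(); NeedToRestartJob is the exception and is rethrown
  // so BPPSeeder can restart the job.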
  catch (logging::QueryDataExcept& qex)
  {
    writeErrorMsg(qex.what(), qex.errorCode());
  }
  catch (logging::DictionaryBufferOverflow& db)
  {
    writeErrorMsg(db.what(), db.errorCode());
  }
  catch (scalar_exception& se)
  {
    writeErrorMsg(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW), ERR_MORE_THAN_1_ROW, false);
  }
  catch (NeedToRestartJob& n)
  {
#if 0

    /* This block of code will flush the problematic OIDs from the
     * cache. It seems to have no effect on the problem, so it's commented
     * for now.
     *
     * This is currently thrown only on syscat queries. If we find the problem
     * in user tables also, we should avoid dropping entire OIDs if possible.
     *
     * In local testing there was no need for flushing, because DDL flushes
     * the syscat constantly. However, it can take a long time (>10 s) before
     * that happens. Doing it locally should make it much more likely only
     * one restart is necessary.
     */

    try
    {
      vector<uint32_t> oids;
      uint32_t oid;

      for (uint32_t i = 0; i < filterCount; i++)
      {
        oid = filterSteps[i]->getOID();

        if (oid > 0)
          oids.push_back(oid);
      }

      for (uint32_t i = 0; i < projectCount; i++)
      {
        oid = projectSteps[i]->getOID();

        if (oid > 0)
          oids.push_back(oid);
      }

#if 0
      Logger logger;
      ostringstream os;
      os << "dropping OIDs: ";

      for (int i = 0; i < oids.size(); i++)
        os << oids[i] << " ";

      logger.logMessage(os.str());
#endif

      for (int i = 0; i < fCacheCount; i++)
      {
        dbbc::blockCacheClient bc(*BRPp[i]);
        // bc.flushCache();
        bc.flushOIDs(&oids[0], oids.size());
      }
    }
    catch (...) { }  // doesn't matter if this fails, just avoid crashing

#endif

#ifndef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
    throw n;  // need to pass this through to BPPSeeder
  }
  catch (IDBExcept& iex)
  {
    writeErrorMsg(iex.what(), iex.errorCode(), true, false);
  }
  catch (const std::exception& ex)
  {
    writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr);
  }
  catch (...)
  {
    string msg("BatchPrimitiveProcessor caught an unknown exception");
    writeErrorMsg(msg, logging::batchPrimitiveProcessorErr);
  }
}
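
/* Builds an error response: an ISMPacketHeader whose Status field carries errCode,
 * the PrimitiveHeader identifying this session/step/uniqueID, and the error text;
 * optionally logs the message as well. */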
void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCode, bool logIt, bool critical)
{
  ISMPacketHeader ism;
  PrimitiveHeader ph;

  // we don't need every field of these headers. Init'ing them anyway
  // makes memory checkers happy.
  void* ismp = static_cast<void*>(&ism);
  void* php = static_cast<void*>(&ph);
  memset(ismp, 0, sizeof(ISMPacketHeader));
  memset(php, 0, sizeof(PrimitiveHeader));
  ph.SessionID = sessionID;
  ph.StepID = stepID;
  ph.UniqueID = uniqueID;
  ism.Status = errCode;

  serialized.reset(new ByteStream());
  serialized->append((uint8_t*)&ism, sizeof(ism));
  serialized->append((uint8_t*)&ph, sizeof(ph));
  *serialized << error;

  if (logIt)
  {
    Logger log;
    log.logMessage(error, critical);
  }
}
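
/* Writes the response headers plus, for scans, the casual-partitioning (CP)
 * add-ons, then the RID data for non-rowgroup output types. The rowgroup
 * payload itself is presumably appended by the caller. */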
void BatchPrimitiveProcessor::writeProjectionPreamble()
{
  ISMPacketHeader ism;
  PrimitiveHeader ph;

  // we don't need every field of these headers. Init'ing them anyway
  // makes memory checkers happy.
  void* ismp = static_cast<void*>(&ism);
  void* php = static_cast<void*>(&ph);
  memset(ismp, 0, sizeof(ISMPacketHeader));
  memset(php, 0, sizeof(PrimitiveHeader));
  ph.SessionID = sessionID;
  ph.StepID = stepID;
  ph.UniqueID = uniqueID;

  serialized.reset(new ByteStream());
  serialized->append((uint8_t*)&ism, sizeof(ism));
  serialized->append((uint8_t*)&ph, sizeof(ph));

  /* add-ons */
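  // For scan-driven jobs, prepend CP (casual partitioning) feedback: a validity
  // flag, the LBID, and the block min/max seen during the scan, sent as either a
  // 16-byte wide value or an 8-byte legacy value; presumably used upstream to
  // maintain extent min/max ranges.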
  if (hasScan)
  {
    if (validCPData)
    {
      *serialized << (uint8_t)1;
      *serialized << lbidForCP;
      *serialized << ((uint8_t)cpDataFromDictScan);
      if (UNLIKELY(hasWideColumnOut))
      {
        // PSA width
        *serialized << (uint8_t)wideColumnWidthOut;
        *serialized << min128Val;
        *serialized << max128Val;
      }
      else
      {
        *serialized << (uint8_t)utils::MAXLEGACYWIDTH;  // width of min/max value
        *serialized << (uint64_t)minVal;
        *serialized << (uint64_t)maxVal;
      }
    }
    else
    {
      *serialized << (uint8_t)0;
      *serialized << lbidForCP;
    }
  }

  // ridsOut += ridCount;
  /* results */

  if (ot != ROW_GROUP)
  {
    *serialized << ridCount;

    if (sendRidsAtDelivery)
    {
      *serialized << baseRid;
      serialized->append((uint8_t*)relRids, ridCount << 1);
    }
  }
}
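
/* Element-type output layout: baseRid, ridCount, then the 2-byte relative RIDs
 * (ridCount << 1 bytes) followed by the 8-byte values (ridCount << 3 bytes). */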
void BatchPrimitiveProcessor::serializeElementTypes()
{
  *serialized << baseRid;
  *serialized << ridCount;
  serialized->append((uint8_t*)relRids, ridCount << 1);
  serialized->append((uint8_t*)values, ridCount << 3);
}
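
/* String output layout: ridCount, the 8-byte absolute RIDs, then each string
 * value in turn. */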
void BatchPrimitiveProcessor::serializeStrings()
{
  *serialized << ridCount;
  serialized->append((uint8_t*)absRids.get(), ridCount << 3);

  for (uint32_t i = 0; i < ridCount; ++i)
    *serialized << strValues[i];
}
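
/* Hands the finished ByteStream to the shared send thread when flow control is
 * enabled; otherwise writes it directly to the socket under the write lock. */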
void BatchPrimitiveProcessor::sendResponse()
{
  if (sendThread->flowControlEnabled())
  {
    // newConnection should be set only for the first result of a batch job
    // it tells sendthread it should consider it for the connection array
    sendThread->sendResult(BPPSendThread::Msg_t(serialized, sock, writelock, sockIndex), newConnection);
    newConnection = false;
  }
  else
  {
    boost::mutex::scoped_lock lk(*writelock);
    sock->write(*serialized);
  }

  serialized.reset();
}

/* The output of a filter chain is either ELEMENT_TYPE or STRING_ELEMENT_TYPE */
void BatchPrimitiveProcessor::makeResponse()
{
  ISMPacketHeader ism;
  PrimitiveHeader ph;

  // we don't need every field of these headers. Init'ing them anyway
  // makes memory checkers happy.
  void* ismp = static_cast<void*>(&ism);
  void* php = static_cast<void*>(&ph);
  memset(ismp, 0, sizeof(ISMPacketHeader));
  memset(php, 0, sizeof(PrimitiveHeader));
  ph.SessionID = sessionID;
  ph.StepID = stepID;
  ph.UniqueID = uniqueID;

  serialized.reset(new ByteStream());
  serialized->append((uint8_t*)&ism, sizeof(ism));
  serialized->append((uint8_t*)&ph, sizeof(ph));

  /* add-ons */
  if (hasScan)
  {
    if (validCPData)
    {
      *serialized << (uint8_t)1;
      *serialized << lbidForCP;
      *serialized << ((uint8_t)cpDataFromDictScan);

      if (UNLIKELY(hasWideColumnOut))
      {
        // PSA width
        // Remove the assert for >16 byte DTs.
        assert(wideColumnWidthOut == datatypes::MAXDECIMALWIDTH);
        *serialized << (uint8_t)wideColumnWidthOut;
        *serialized << min128Val;
        *serialized << max128Val;
      }
      else
      {
        *serialized << (uint8_t)utils::MAXLEGACYWIDTH;  // width of min/max value
        *serialized << (uint64_t)minVal;
        *serialized << (uint64_t)maxVal;
      }
    }
    else
    {
      *serialized << (uint8_t)0;
      *serialized << lbidForCP;
    }
  }

  /* results */
  /* Take the rid and value arrays, munge into OutputType ot */
  switch (ot)
  {
    case BPS_ELEMENT_TYPE: serializeElementTypes(); break;

    case STRING_ELEMENT_TYPE: serializeStrings(); break;

    default:
    {
      ostringstream oss;
      oss << "BPP: makeResponse(): Bad output type: " << ot;
      throw logic_error(oss.str());
    }

      // throw logic_error("BPP: makeResponse(): Bad output type");
  }

  *serialized << cachedIO;
  cachedIO = 0;
  *serialized << physIO;
  physIO = 0;
  *serialized << touchedBlocks;
  touchedBlocks = 0;

  // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
  //   " touchedBlocks=" << touchedBlocks << endl;
}

int BatchPrimitiveProcessor::operator()()
{
  utils::setThreadName("PPBatchPrimProc");
#ifdef PRIMPROC_STOPWATCH
  const static std::string msg{"BatchPrimitiveProcessor::operator()"};
  logging::StopWatch* stopwatch = profiler.getTimer();
#endif

  if (currentBlockOffset == 0)
  {
#ifdef PRIMPROC_STOPWATCH
    stopwatch->start(msg);
#endif
    idbassert(count > 0);
  }

  if (fAggregator && currentBlockOffset == 0)  // @bug4507, 8k
    fAggregator->aggReset();                   // @bug4507, 8k
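
  // Each loop iteration processes one logical block. Between blocks, honor an
  // abort from the send thread, and if its queue has grown too large, reschedule
  // (return -1) rather than block a worker thread here.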
  for (; currentBlockOffset < count; currentBlockOffset++)
  {
    if (!(sessionID & 0x80000000))  // can't do this with syscat queries
    {
      if (sendThread->aborted())
        break;

      if (sendThread->sizeTooBig())
      {
        // The send buffer is full of messages yet to be sent, so this thread would block anyway.
        freeLargeBuffers();
        return -1;  // the reschedule error code
      }
    }

    allocLargeBuffers();
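
    // Seed the CP min/max accumulators with inverted extremes so the first value
    // examined always replaces them; wide (128-bit) columns use the Decimal
    // int128 limits, everything else the 64-bit limits.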
    if (LIKELY(!hasWideColumnOut))
    {
      minVal = MAX64;
      maxVal = MIN64;
    }
    else
    {
      max128Val = datatypes::Decimal::minInt128;
      min128Val = datatypes::Decimal::maxInt128;
    }

    validCPData = false;
    cpDataFromDictScan = false;
#ifdef PRIMPROC_STOPWATCH
    stopwatch->start("BPP() execute");
    execute(stopwatch);
    stopwatch->stop("BPP() execute");
#else
    execute();
#endif

    if (projectCount == 0 && ot != ROW_GROUP)
      makeResponse();

    try
    {
      sendResponse();
    }
    catch (std::exception& e)
    {
      break;  // If we make this throw, be sure to do the cleanup at the end
    }

    // Bug 4475: Control outgoing socket so that all messages from a
    // batch go out the same socket
    sockIndex = (sockIndex + 1) % connectionsPerUM;

    nextLBID();

    /* Base RIDs are now a combination of partition#, segment#, extent#, and block#. */
    uint32_t partNum;
    uint16_t segNum;
    uint8_t extentNum;
    uint16_t blockNum;
    rowgroup::getLocationFromRid(baseRid, &partNum, &segNum, &extentNum, &blockNum);
    /*
      cout << "baseRid=" << baseRid << " partNum=" << partNum << " segNum=" << segNum <<
        " extentNum=" << (int) extentNum << " blockNum=" << blockNum << endl;
    */
    blockNum++;
    baseRid = rowgroup::convertToRid(partNum, segNum, extentNum, blockNum);
    /*
      cout << "-- baseRid=" << baseRid << " partNum=" << partNum << " extentNum=" << (int) extentNum
        << " blockNum=" << blockNum << endl;
    */
  }
  vssCache.clear();
#ifndef __FreeBSD__
  pthread_mutex_unlock(&objLock);
#endif
  freeLargeBuffers();
#ifdef PRIMPROC_STOPWATCH
  stopwatch->stop(msg);
#endif
  // cout << "sent " << count << " responses" << endl;
  fBusy = false;
  return 0;
}
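
/* Lazily allocates the RGData buffers backing the output, fe1/fe2, and joined
 * rowgroups; each is created only on first use. */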
void BatchPrimitiveProcessor::allocLargeBuffers()
{
  if (ot == ROW_GROUP && !outRowGroupData)
  {
    // outputRG.setUseStringTable(true);
    outRowGroupData.reset(new RGData(outputRG));
    outputRG.setData(outRowGroupData.get());
  }

  if (fe1 && !fe1Data)
  {
    // fe1Input.setUseStringTable(true);
    fe1Data.reset(new RGData(fe1Input));
    // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
    fe1Input.setData(fe1Data.get());
  }

  if (fe2 && !fe2Data)
  {
    // fe2Output.setUseStringTable(true);
    fe2Data.reset(new RGData(fe2Output));
    fe2Output.setData(fe2Data.get());
  }

  if (getTupleJoinRowGroupData && !joinedRGMem)
  {
    // joinedRG.setUseStringTable(true);
    joinedRGMem.reset(new RGData(joinedRG));
    joinedRG.setData(joinedRGMem.get());
  }
}
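
/* Releases any of those buffers whose maximum size exceeds maxIdleBufferSize, so
 * an idle BPP doesn't hold onto large allocations. */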
void BatchPrimitiveProcessor::freeLargeBuffers()
{
  /* Get rid of large buffers */
  if (ot == ROW_GROUP && outputRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
    outRowGroupData.reset();

  if (fe1 && fe1Input.getMaxDataSizeWithStrings() > maxIdleBufferSize)
    fe1Data.reset();

  if (fe2 && fe2Output.getMaxDataSizeWithStrings() > maxIdleBufferSize)
    fe2Data.reset();

  if (getTupleJoinRowGroupData && joinedRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
    joinedRGMem.reset();
}

void BatchPrimitiveProcessor::nextLBID()
{
  uint32_t i;

  for (i = 0; i < filterCount; i++)
    filterSteps[i]->nextLBID();

  for (i = 0; i < projectCount; i++)
    projectSteps[i]->nextLBID();
}
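
/* Makes a copy of this BPP for another worker thread: scalar state is copied,
 * the joiner hash tables are shared, and the filter/project command objects are
 * duplicated via their own duplicate() methods. */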
SBPP BatchPrimitiveProcessor::duplicate()
{
  SBPP bpp;
  uint32_t i;

  // cout << "duplicating a bpp\n";

  bpp.reset(new BatchPrimitiveProcessor());
  bpp->ot = ot;
  bpp->versionInfo = versionInfo;
  bpp->txnID = txnID;
  bpp->sessionID = sessionID;
  bpp->stepID = stepID;
  bpp->uniqueID = uniqueID;
  bpp->needStrValues = needStrValues;
  bpp->wideColumnsWidths = wideColumnsWidths;
  bpp->gotAbsRids = gotAbsRids;
  bpp->gotValues = gotValues;
  bpp->LBIDTrace = LBIDTrace;
  bpp->hasScan = hasScan;
  bpp->hasFilterStep = hasFilterStep;
  bpp->filtOnString = filtOnString;
  bpp->hasRowGroup = hasRowGroup;
  bpp->getTupleJoinRowGroupData = getTupleJoinRowGroupData;
  bpp->bop = bop;
  bpp->hasPassThru = hasPassThru;
  bpp->forHJ = forHJ;
  bpp->processorThreads = processorThreads;  // is a power-of-2 at this point
  bpp->ptMask = processorThreads - 1;

  if (ot == ROW_GROUP)
  {
    bpp->outputRG = outputRG;

    if (fe1)
    {
      bpp->fe1.reset(new FuncExpWrapper(*fe1));
      bpp->fe1Input = fe1Input;
    }

    if (fe2)
    {
      bpp->fe2.reset(new FuncExpWrapper(*fe2));
      bpp->fe2Output = fe2Output;
    }
  }

  bpp->doJoin = doJoin;

  if (doJoin)
  {
    pthread_mutex_lock(&bpp->objLock);
    /* There are add'l join vars, but only these are necessary for processing
       a join */
    bpp->tJoinerSizes = tJoinerSizes;
    bpp->joinerCount = joinerCount;
    bpp->joinTypes = joinTypes;
    bpp->largeSideKeyColumns = largeSideKeyColumns;
    bpp->tJoiners = tJoiners;
    // bpp->_pools = _pools;
    bpp->typelessJoin = typelessJoin;
    bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
    bpp->tlSmallSideKeyColumns = tlSmallSideKeyColumns;
    bpp->tlJoiners = tlJoiners;
    bpp->tlSmallSideKeyLengths = tlSmallSideKeyLengths;
    bpp->storedKeyAllocators = storedKeyAllocators;
    bpp->joinNullValues = joinNullValues;
    bpp->doMatchNulls = doMatchNulls;
    bpp->hasJoinFEFilters = hasJoinFEFilters;
    bpp->hasSmallOuterJoin = hasSmallOuterJoin;
    bpp->mJOINHasSkewedKeyColumn = mJOINHasSkewedKeyColumn;
    bpp->mSmallSideRGPtr = mSmallSideRGPtr;
    bpp->mSmallSideKeyColumnsPtr = mSmallSideKeyColumnsPtr;
    if (!getTupleJoinRowGroupData && mJOINHasSkewedKeyColumn)
    {
      idbassert(!smallSideRGs.empty());
      bpp->smallSideRGs.push_back(smallSideRGs[0]);
    }

    if (hasJoinFEFilters)
    {
      bpp->joinFERG = joinFERG;
      bpp->joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);

      for (i = 0; i < joinerCount; i++)
        if (joinFEFilters[i])
          bpp->joinFEFilters[i].reset(new FuncExpWrapper(*joinFEFilters[i]));
    }

    if (getTupleJoinRowGroupData)
    {
      bpp->smallSideRGs = smallSideRGs;
      bpp->largeSideRG = largeSideRG;
      bpp->smallSideRowLengths = smallSideRowLengths;
      bpp->smallSideRowData = smallSideRowData;
      bpp->smallNullRowData = smallNullRowData;
      bpp->smallNullPointers = smallNullPointers;
      bpp->joinedRG = joinedRG;
    }

#ifdef __FreeBSD__
    pthread_mutex_unlock(&bpp->objLock);
#endif
  }

  bpp->filterCount = filterCount;
  bpp->filterSteps.resize(filterCount);

  for (i = 0; i < filterCount; ++i)
    bpp->filterSteps[i] = filterSteps[i]->duplicate();

  bpp->projectCount = projectCount;
  bpp->projectSteps.resize(projectCount);

  for (i = 0; i < projectCount; ++i)
    bpp->projectSteps[i] = projectSteps[i]->duplicate();

  if (fAggregator.get() != NULL)
  {
    bpp->fAggregateRG = fAggregateRG;
    bpp->fAggregator.reset(new RowAggregation(fAggregator->getGroupByCols(), fAggregator->getAggFunctions()));
    bpp->fAggregator->timeZone(fAggregator->timeZone());
  }

  bpp->sendRidsAtDelivery = sendRidsAtDelivery;
  bpp->prefetchThreshold = prefetchThreshold;

  bpp->sock = sock;
  bpp->writelock = writelock;
  bpp->hasDictStep = hasDictStep;
  bpp->sendThread = sendThread;
  bpp->newConnection = true;
  bpp->initProcessor();
  return bpp;
}

#if 0
bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const
{
  uint32_t i;

  if (ot != bpp.ot)
    return false;

  if (versionInfo != bpp.versionInfo)
    return false;

  if (txnID != bpp.txnID)
    return false;

  if (sessionID != bpp.sessionID)
    return false;

  if (stepID != bpp.stepID)
    return false;

  if (uniqueID != bpp.uniqueID)
    return false;

  if (gotValues != bpp.gotValues)
    return false;

  if (gotAbsRids != bpp.gotAbsRids)
    return false;

  if (needStrValues != bpp.needStrValues)
    return false;

  if (filterCount != bpp.filterCount)
    return false;

  if (projectCount != bpp.projectCount)
    return false;

  if (sendRidsAtDelivery != bpp.sendRidsAtDelivery)
    return false;

  if (hasScan != bpp.hasScan)
    return false;

  if (hasFilterStep != bpp.hasFilterStep)
    return false;

  if (filtOnString != bpp.filtOnString)
    return false;

  if (doJoin != bpp.doJoin)
    return false;

  if (doJoin)
    /* Join equality test is a bit out of date */
    if (joiner != bpp.joiner || joinerSize != bpp.joinerSize)
      return false;

  for (i = 0; i < filterCount; i++)
    if (*filterSteps[i] != *bpp.filterSteps[i])
      return false;

  return true;
}
#endif

void BatchPrimitiveProcessor::asyncLoadProjectColumns()
{
  // relLBID is the LBID related to the primMsg->LBID,
  // it is used to keep the read ahead boundary for asyncLoads
  // 1. scan driven case: load blocks in # to (# + blocksReadAhead - 1) range,
  //    where # is a multiple of ColScanReadAheadBlocks in Columnstore.xml
  // 2. non-scan driven case: load blocks in the logical block.
  //    Because there is 1 logical block per primMsg, asyncLoad happens only once per message.
  for (uint64_t i = 0; i < projectCount; ++i)
  {
    // only care about column commands
    ColumnCommand* col = dynamic_cast<ColumnCommand*>(projectSteps[i].get());

    if (col != NULL)
    {
      asyncLoaded[i] = asyncLoaded[i] && (relLBID[i] % blocksReadAhead != 0);
      relLBID[i] += col->getWidth();

      if (!asyncLoaded[i] && col->willPrefetch())
      {
        loadBlockAsync(col->getLBID(), versionInfo, txnID, col->getCompType(), &cachedIO, &physIO,
                       LBIDTrace, sessionID, &counterLock, &busyLoaderCount, sendThread, &vssCache);
        asyncLoaded[i] = true;
      }
    }
  }
}
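
/* Depth-first expansion of the per-row small-side match lists into joinedRG.
 * gjrgRowNumber / gjrgPlaceHolders record the resume position, so the expansion
 * can continue on the next call after the output rowgroup fills (8192 rows). */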
bool BatchPrimitiveProcessor::generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth)
{
  Row& smallRow = smallRows[depth];
  const bool lowestLvl = (depth == joinerCount - 1);

  while (gjrgRowNumber < ridCount &&
         gjrgPlaceHolders[depth] < tSmallSideMatches[depth][gjrgRowNumber].size() && !gjrgFull)
  {
    const vector<uint32_t>& results = tSmallSideMatches[depth][gjrgRowNumber];
    const uint32_t size = results.size();

    if (depth == 0)
    {
      outputRG.getRow(gjrgRowNumber, &largeRow);
      applyMapping(gjrgMappings[joinerCount], largeRow, &baseRow);
      baseRow.setRid(largeRow.getRelRid());
    }

    // cout << "rowNum = " << gjrgRowNumber << " at depth " << depth << " size is " << size << endl;
    for (uint32_t& i = gjrgPlaceHolders[depth]; i < size && !gjrgFull; i++)
    {
      if (results[i] != (uint32_t)-1)
      {
        smallSideRGs[depth].getRow(results[i], &smallRow);
        // rowOffset = ((uint64_t) results[i]) * smallRowSize;
        // smallRow.setData(&rowDataAtThisLvl.rowData[rowOffset] + emptySize);
      }
      else
        smallRow.setPointer(smallNullPointers[depth]);

      // cout << "small row: " << smallRow.toString() << endl;
      applyMapping(gjrgMappings[depth], smallRow, &baseRow);

      if (!lowestLvl)
        generateJoinedRowGroup(baseRow, depth + 1);
      else
      {
        copyRow(baseRow, &joinedRow);
        // memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
        // cerr << "joined row " << joinedRG.getRowCount() << ": " << joinedRow.toString() << endl;
        joinedRow.nextRow();
        joinedRG.incRowCount();

        if (joinedRG.getRowCount() == 8192)
        {
          i++;
          gjrgFull = true;
        }
      }

      if (gjrgFull)
        break;
    }

    if (depth == 0 && gjrgPlaceHolders[0] == tSmallSideMatches[0][gjrgRowNumber].size())
    {
      gjrgPlaceHolders[0] = 0;
      gjrgRowNumber++;
    }
  }

  // if (depth == 0)
  //   cout << "gjrg returning " << (uint32_t) gjrgFull << endl;
  if (!gjrgFull)
    gjrgPlaceHolders[depth] = 0;

  return gjrgFull;
}

void BatchPrimitiveProcessor::resetGJRG()
{
  gjrgFull = false;
  joinedRG.resetRowGroup(baseRid);
  joinedRG.getRow(0, &joinedRow);
  joinedRG.setDBRoot(dbRoot);
}

void BatchPrimitiveProcessor::initGJRG()
{
  for (uint32_t z = 0; z < joinerCount; z++)
    gjrgPlaceHolders[z] = 0;

  gjrgRowNumber = 0;
}
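
/* Probes the join hash tables for the matches of one large-side row. Typed joins
 * hash the single 8-byte key; typeless joins hash the composite key. In both
 * cases the hash selects one of processorThreads sub-tables (ptMask is
 * processorThreads - 1, a power of two). A NULL key on an ANTI join matches
 * every small-side row (bug 3524). */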
inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jIndex, vector<uint32_t>& v)
{
  uint bucket;

  if (!typelessJoin[jIndex])
  {
    if (r.isNullValue(largeSideKeyColumns[jIndex]))
    {
      /* Bug 3524. This matches everything. */
      if (joinTypes[jIndex] & ANTI)
      {
        TJoiner::iterator it;

        for (uint i = 0; i < processorThreads; ++i)
          for (it = tJoiners[jIndex][i]->begin(); it != tJoiners[jIndex][i]->end(); ++it)
            v.push_back(it->second);

        return;
      }
      else
        return;
    }

    uint64_t largeKey;
    uint32_t colIndex = largeSideKeyColumns[jIndex];

    if (r.isUnsigned(colIndex))
    {
      largeKey = r.getUintField(colIndex);
    }
    else
    {
      largeKey = r.getIntField(colIndex);
    }

    bucket = bucketPicker((char*)&largeKey, 8, bpSeed) & ptMask;
    pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex][bucket]->equal_range(largeKey);
    for (; range.first != range.second; ++range.first)
      v.push_back(range.first->second);

    if (doMatchNulls[jIndex])  // add the nulls to the match list
    {
      bucket = bucketPicker((char*)&joinNullValues[jIndex], 8, bpSeed) & ptMask;
      range = tJoiners[jIndex][bucket]->equal_range(joinNullValues[jIndex]);
      for (; range.first != range.second; ++range.first)
        v.push_back(range.first->second);
    }
  }
  else
  {
    /* Bug 3524. Large-side NULL + ANTI join matches everything. */
    if (joinTypes[jIndex] & ANTI)
    {
      bool hasNullValue = false;

      for (uint32_t i = 0; i < tlLargeSideKeyColumns[jIndex].size(); i++)
        if (r.isNullValue(tlLargeSideKeyColumns[jIndex][i]))
        {
          hasNullValue = true;
          break;
        }

      if (hasNullValue)
      {
        TLJoiner::iterator it;
        for (uint i = 0; i < processorThreads; ++i)
          for (it = tlJoiners[jIndex][i]->begin(); it != tlJoiners[jIndex][i]->end(); ++it)
            v.push_back(it->second);

        return;
      }
    }

    TypelessData largeKey(&r);
    bucket = r.hashTypeless(tlLargeSideKeyColumns[jIndex], mSmallSideKeyColumnsPtr,
                            mSmallSideRGPtr ? &mSmallSideRGPtr->getColWidths() : nullptr) &
             ptMask;
    pair<TLJoiner::iterator, TLJoiner::iterator> range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
    for (; range.first != range.second; ++range.first)
      v.push_back(range.first->second);
  }
}
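
/* Collects the LBIDs this job will touch from every filter and projection step,
 * then does one bulk VSS lookup against the BRM and caches the results by LBID,
 * so later block loads can consult the cache rather than the BRM. */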
void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
{
  vector<int64_t> lbidList;
  vector<BRM::VSSData> vssData;
  uint32_t i;
  int rc;

  for (i = 0; i < filterCount; i++)
    filterSteps[i]->getLBIDList(loopCount, &lbidList);

  for (i = 0; i < projectCount; i++)
    projectSteps[i]->getLBIDList(loopCount, &lbidList);

  rc = brm->bulkVSSLookup(lbidList, versionInfo, (int)txnID, &vssData);

  if (rc == 0)
    for (i = 0; i < vssData.size(); i++)
      vssCache.insert(make_pair(lbidList[i], vssData[i]));

  // cout << "buildVSSCache inserted " << vssCache.size() << " elements" << endl;
}

}  // namespace primitiveprocessor