You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
437 lines
11 KiB
C++
437 lines
11 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
 *   $Id: pcolscan.cpp 9655 2013-06-25 23:08:13Z xlou $
 *
 *
 ***********************************************************************/
|
|
|
|
#include <unistd.h>
|
|
#include <stdexcept>
|
|
#include <cstring>
|
|
#include <utility>
|
|
#include <sstream>
|
|
#include <cassert>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
using namespace boost;
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
#include "messageobj.h"
|
|
using namespace logging;
|
|
|
|
#include "logicoperator.h"
|
|
using namespace execplan;
|
|
|
|
#include "distributedenginecomm.h"
|
|
#include "primitivemsg.h"
|
|
#include "timestamp.h"
|
|
#include "unique32generator.h"
|
|
#include "jlf_common.h"
|
|
#include "primitivestep.h"
|
|
|
|
//#define DEBUG 1
|
|
//#define DEBUG2 1
|
|
|
|
namespace joblist
|
|
{
|
|
pColScanStep::pColScanStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t,
|
|
const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo)
|
|
: JobStep(jobInfo)
|
|
, fRm(jobInfo.rm)
|
|
, fMsgHeader()
|
|
, fFilterCount(0)
|
|
, fOid(o)
|
|
, fTableOid(t)
|
|
, fColType(ct)
|
|
, fBOP(BOP_OR)
|
|
, fNumBlksSkipped(0)
|
|
, fMsgBytesIn(0)
|
|
, fMsgBytesOut(0)
|
|
, fMsgsToPm(0)
|
|
{
|
|
if (fTableOid == 0) // cross engine support
|
|
return;
|
|
|
|
int err, i, mask;
|
|
|
|
finishedSending = false;
|
|
recvWaiting = 0;
|
|
recvExited = 0;
|
|
rDoNothing = false;
|
|
fIsDict = false;
|
|
|
|
// If this is a dictionary column, fudge the numbers...
|
|
if (fColType.colDataType == CalpontSystemCatalog::VARCHAR)
|
|
{
|
|
if (8 > fColType.colWidth && 4 <= fColType.colWidth)
|
|
fColType.colDataType = CalpontSystemCatalog::CHAR;
|
|
|
|
fColType.colWidth++;
|
|
}
|
|
|
|
// If this is a dictionary column, fudge the numbers...
|
|
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY) ||
|
|
(fColType.colDataType == CalpontSystemCatalog::BLOB) ||
|
|
(fColType.colDataType == CalpontSystemCatalog::TEXT))
|
|
{
|
|
fColType.colWidth = 8;
|
|
fIsDict = true;
|
|
}
|
|
// MCOL-641 WIP
|
|
else if (fColType.colWidth > 8 && fColType.colDataType != CalpontSystemCatalog::DECIMAL &&
|
|
fColType.colDataType != CalpontSystemCatalog::UDECIMAL)
|
|
{
|
|
fColType.colWidth = 8;
|
|
fIsDict = true;
|
|
// TODO: is this right?
|
|
fColType.colDataType = CalpontSystemCatalog::VARCHAR;
|
|
}
|
|
|
|
// Round colWidth up
|
|
if (fColType.colWidth == 3)
|
|
fColType.colWidth = 4;
|
|
else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
|
|
fColType.colWidth = 8;
|
|
|
|
err = dbrm.lookup(fOid, lbidRanges);
|
|
|
|
if (err)
|
|
throw runtime_error("pColScan: BRM LBID range lookup failure (1)");
|
|
|
|
err = dbrm.getExtents(fOid, extents);
|
|
|
|
if (err)
|
|
throw runtime_error("pColScan: BRM HWM lookup failure (4)");
|
|
|
|
sort(extents.begin(), extents.end(), BRM::ExtentSorter());
|
|
numExtents = extents.size();
|
|
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
|
|
|
|
if (fOid > 3000)
|
|
{
|
|
lbidList.reset(new LBIDList(fOid, 0));
|
|
}
|
|
|
|
/* calculate shortcuts for rid-based arithmetic */
|
|
for (i = 1, mask = 1; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
|
|
if (extentSize & mask)
|
|
{
|
|
divShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (extentSize & mask)
|
|
throw runtime_error("pColScan: Extent size must be a power of 2 in blocks");
|
|
|
|
ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
|
|
|
|
for (i = 1, mask = 1; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
|
|
if (ridsPerBlock & mask)
|
|
{
|
|
rpbShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (ridsPerBlock & mask)
|
|
throw runtime_error("pColScan: Block size and column width must be a power of 2");
|
|
}
|
|
|
|
/**
 * Plain record pairing a minimum and a maximum value with an LBID.
 *
 * The constructor parameters are deliberately not called MIN/MAX: those
 * spellings are defined as function-like macros by some system headers
 * (for example <sys/param.h>) and would break the declaration.
 */
struct CPInfo
{
  /**
   * @param minVal  value stored in @c min
   * @param maxVal  value stored in @c max
   * @param lbid    value stored in @c LBID
   */
  CPInfo(int64_t minVal, int64_t maxVal, uint64_t lbid) : min(minVal), max(maxVal), LBID(lbid)
  {
  }

  int64_t min;
  int64_t max;
  uint64_t LBID;
};
|
|
|
|
void pColScanStep::addFilter(int8_t COP, float value)
|
|
{
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << (uint8_t)0;
|
|
fFilterString << *((uint32_t*)&value);
|
|
fFilterCount++;
|
|
}
|
|
|
|
void pColScanStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
|
|
{
|
|
int8_t tmp8;
|
|
int16_t tmp16;
|
|
int32_t tmp32;
|
|
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << roundFlag;
|
|
|
|
// converts to a type of the appropriate width, then bitwise
|
|
// copies into the filter ByteStream
|
|
switch (fColType.colWidth)
|
|
{
|
|
case 1:
|
|
tmp8 = value;
|
|
fFilterString << *((uint8_t*)&tmp8);
|
|
break;
|
|
|
|
case 2:
|
|
tmp16 = value;
|
|
fFilterString << *((uint16_t*)&tmp16);
|
|
break;
|
|
|
|
case 4:
|
|
tmp32 = value;
|
|
fFilterString << *((uint32_t*)&tmp32);
|
|
break;
|
|
|
|
case 8: fFilterString << *((uint64_t*)&value); break;
|
|
|
|
default:
|
|
ostringstream o;
|
|
|
|
o << "pColScanStep: CalpontSystemCatalog says OID " << fOid << " has a width of " << fColType.colWidth;
|
|
throw runtime_error(o.str());
|
|
}
|
|
|
|
fFilterCount++;
|
|
}
|
|
|
|
/**
 * Produces a one-line, human readable description of this step.
 *
 * The text contains the session, transaction, version and step ids, the
 * table/column OIDs, the alias when one is set, the first output
 * association entry, the filter count and every input association entry
 * (each followed by ", ").
 *
 * @return the formatted description
 */
const string pColScanStep::toString() const
{
  ostringstream text;

  // identification
  text << "pColScanStep ses:" << fSessionId;
  text << " txn:" << fTxnId;
  text << " ver:" << fVerId;
  text << " st:" << fStepId;
  text << " tb/col:" << fTableOid << "/" << fOid;

  if (alias().length() != 0)
  {
    text << " alias:" << alias();
  }

  // output side, printed with the OID suppressed
  text << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
  text << " nf:" << fFilterCount;

  // input side
  text << " in:";

  unsigned idx = 0;

  while (idx < fInputJobStepAssociation.outSize())
  {
    text << fInputJobStepAssociation.outAt(idx) << ", ";
    ++idx;
  }

  return text.str();
}
|
|
|
|
uint64_t pColScanStep::getFBO(uint64_t lbid)
|
|
{
|
|
uint32_t i;
|
|
uint64_t lastLBID;
|
|
|
|
for (i = 0; i < numExtents; i++)
|
|
{
|
|
lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1;
|
|
|
|
if (lbid >= (uint64_t)extents[i].range.start && lbid <= lastLBID)
|
|
return (lbid - extents[i].range.start) + (i << divShift);
|
|
}
|
|
|
|
cerr << "pColScan: didn't find the FBO?\n";
|
|
throw logic_error("pColScan: didn't find the FBO?");
|
|
}
|
|
|
|
/**
 * Builds a column scan step from an existing pColStep.
 *
 * Filter data, OIDs, column type, boolean operator, dictionary flag,
 * cardinality and the ON-clause flag are taken from @p rhs; the message and
 * block counters start at zero.  Unless the table OID is 0 (cross engine
 * support) the LBID ranges and extents are then fetched from the BRM and the
 * extent/block arithmetic shortcuts are copied from @p rhs.
 *
 * @param rhs  the step to convert
 * @throws std::runtime_error if a BRM lookup fails
 */
pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resourceManager()), fMsgHeader()
{
  fFilterCount = rhs.filterCount();
  fFilterString = rhs.filterString();
  isFilterFeeder = rhs.getFeederFlag();
  fOid = rhs.oid();
  fTableOid = rhs.tableOid();
  fColType = rhs.colType();
  fBOP = rhs.BOP();
  fIsDict = rhs.isDictCol();
  fNumBlksSkipped = 0;
  fMsgBytesIn = 0;
  fMsgBytesOut = 0;
  fMsgsToPm = 0;
  fCardinality = rhs.cardinality();
  fFilters = rhs.fFilters;
  fOnClauseFilter = rhs.onClauseFilter();

  // Plain state flags are set before the cross-engine early return so that
  // they never hold indeterminate values, whichever path is taken.
  finishedSending = sendWaiting = rDoNothing = false;
  recvWaiting = 0;
  recvExited = 0;

  if (fTableOid == 0)  // cross engine support
    return;

  int err;

  err = dbrm.lookup(fOid, lbidRanges);

  if (err)
    throw runtime_error("pColScan: BRM LBID range lookup failure (1)");

  err = dbrm.getExtents(fOid, extents);

  if (err)
    throw runtime_error("pColScan: BRM HWM lookup failure (4)");

  sort(extents.begin(), extents.end(), BRM::ExtentSorter());
  numExtents = extents.size();
  // extent size expressed in blocks
  extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
  lbidList = rhs.lbidList;

  /* calculate some shortcuts for extent-based arithmetic */
  ridsPerBlock = rhs.ridsPerBlock;
  rpbShift = rhs.rpbShift;
  divShift = rhs.divShift;

  fTraceFlags = rhs.fTraceFlags;
}
|
|
|
|
bool pColScanStep::isEmptyVal(const uint8_t* val8) const
|
|
{
|
|
const int width = fColType.colWidth;
|
|
|
|
switch (fColType.colDataType)
|
|
{
|
|
case CalpontSystemCatalog::UTINYINT:
|
|
{
|
|
return (*val8 == joblist::UTINYINTEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::USMALLINT:
|
|
{
|
|
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
|
|
return (*val16 == joblist::USMALLINTEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::UMEDINT:
|
|
case CalpontSystemCatalog::UINT:
|
|
{
|
|
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
|
|
return (*val32 == joblist::UINTEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
|
|
return (*val64 == joblist::BIGINTEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::FLOAT:
|
|
case CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
|
|
return (*val32 == joblist::FLOATEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::DOUBLE:
|
|
case CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
|
|
return (*val64 == joblist::DOUBLEEMPTYROW);
|
|
}
|
|
|
|
case CalpontSystemCatalog::CHAR:
|
|
case CalpontSystemCatalog::VARCHAR:
|
|
case CalpontSystemCatalog::DATE:
|
|
case CalpontSystemCatalog::DATETIME:
|
|
case CalpontSystemCatalog::TIME:
|
|
case CalpontSystemCatalog::TIMESTAMP:
|
|
if (width == 1)
|
|
{
|
|
return (*val8 == joblist::CHAR1EMPTYROW);
|
|
}
|
|
else if (width == 2)
|
|
{
|
|
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
|
|
return (*val16 == joblist::CHAR2EMPTYROW);
|
|
}
|
|
else if (width <= 4)
|
|
{
|
|
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
|
|
return (*val32 == joblist::CHAR4EMPTYROW);
|
|
}
|
|
else if (width <= 8)
|
|
{
|
|
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
|
|
return (*val64 == joblist::CHAR8EMPTYROW);
|
|
}
|
|
|
|
default: break;
|
|
}
|
|
|
|
switch (width)
|
|
{
|
|
case 1:
|
|
{
|
|
return (*val8 == joblist::TINYINTEMPTYROW);
|
|
}
|
|
|
|
case 2:
|
|
{
|
|
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
|
|
return (*val16 == joblist::SMALLINTEMPTYROW);
|
|
}
|
|
|
|
case 4:
|
|
{
|
|
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
|
|
return (*val32 == joblist::INTEMPTYROW);
|
|
}
|
|
|
|
case 8:
|
|
{
|
|
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
|
|
return (*val64 == joblist::BIGINTEMPTYROW);
|
|
}
|
|
|
|
default:
|
|
MessageLog logger(LoggingID(28));
|
|
logging::Message::Args colWidth;
|
|
Message msg(33);
|
|
|
|
colWidth.add(width);
|
|
msg.format(colWidth);
|
|
logger.logErrorMessage(msg);
|
|
return false;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void pColScanStep::addFilter(const Filter* f)
|
|
{
|
|
if (NULL != f)
|
|
fFilters.push_back(f);
|
|
}
|
|
|
|
/**
 * Appends every filter in @p fs, in order, to the end of fFilters.
 *
 * @param fs  filters to append; may be empty
 */
void pColScanStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
{
  // an empty range would insert nothing
  if (fs.empty())
    return;

  fFilters.insert(fFilters.end(), fs.begin(), fs.end());
}
|
|
|
|
} // namespace joblist
|