You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
calpontsys.syscolumn syscat table to be latin1. This change is done in one of the ctors of pColStep which is initiated while building the job list from the execution plan.
556 lines
13 KiB
C++
556 lines
13 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: pcolstep.cpp 9655 2013-06-25 23:08:13Z xlou $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
#include <algorithm>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
using namespace std;
|
|
|
|
#include "distributedenginecomm.h"
|
|
#include "elementtype.h"
|
|
#include "unique32generator.h"
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messagelog.h"
|
|
#include "messageobj.h"
|
|
#include "loggingid.h"
|
|
using namespace logging;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
#include "logicoperator.h"
|
|
using namespace execplan;
|
|
|
|
#include "brm.h"
|
|
using namespace BRM;
|
|
|
|
#include "idbcompress.h"
|
|
#include "jlf_common.h"
|
|
#include "primitivestep.h"
|
|
|
|
// #define DEBUG 1
|
|
|
|
namespace joblist
|
|
{
|
|
pColStep::pColStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t,
|
|
const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo)
|
|
: JobStep(jobInfo)
|
|
, fRm(jobInfo.rm)
|
|
, sysCat(jobInfo.csc)
|
|
, fOid(o)
|
|
, fTableOid(t)
|
|
, fColType(ct)
|
|
, fFilterCount(0)
|
|
, fBOP(BOP_NONE)
|
|
, ridList(0)
|
|
, msgsSent(0)
|
|
, msgsRecvd(0)
|
|
, finishedSending(false)
|
|
, recvWaiting(false)
|
|
, fIsDict(false)
|
|
, isEM(jobInfo.isExeMgr)
|
|
, ridCount(0)
|
|
, fSwallowRows(false)
|
|
, isFilterFeeder(false)
|
|
, fNumBlksSkipped(0)
|
|
, fMsgBytesIn(0)
|
|
, fMsgBytesOut(0)
|
|
{
|
|
if (fTableOid == 0) // cross engine support
|
|
return;
|
|
|
|
int err, i;
|
|
uint32_t mask;
|
|
|
|
// MCOL-5572 Force the charset on the autoincrement column
|
|
// of calpontsys.syscolumn syscat table to be latin1.
|
|
if (fOid == execplan::OID_SYSCOLUMN_AUTOINC)
|
|
{
|
|
const CHARSET_INFO* cs = &my_charset_latin1;
|
|
fColType.charsetNumber = cs->number;
|
|
}
|
|
|
|
if (fOid < 1000)
|
|
throw runtime_error("pColStep: invalid column");
|
|
|
|
if (!compress::CompressInterface::isCompressionAvail(fColType.compressionType))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Unsupported compression type " << fColType.compressionType;
|
|
oss << " for " << sysCat->colName(fOid);
|
|
#ifdef SKIP_IDB_COMPRESSION
|
|
oss << ". It looks you're running Community binaries on an Enterprise database.";
|
|
#endif
|
|
throw runtime_error(oss.str());
|
|
}
|
|
|
|
realWidth = fColType.colWidth;
|
|
|
|
if (fColType.colDataType == CalpontSystemCatalog::VARCHAR)
|
|
{
|
|
if (8 > fColType.colWidth && 4 <= fColType.colWidth)
|
|
fColType.colDataType = CalpontSystemCatalog::CHAR;
|
|
|
|
fColType.colWidth++;
|
|
}
|
|
|
|
// If this is a dictionary column, fudge the numbers...
|
|
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY) ||
|
|
(fColType.colDataType == CalpontSystemCatalog::BLOB) ||
|
|
(fColType.colDataType == CalpontSystemCatalog::TEXT))
|
|
{
|
|
fColType.colWidth = 8;
|
|
fIsDict = true;
|
|
}
|
|
// WIP MCOL-641
|
|
else if (fColType.colWidth > 8 && fColType.colDataType != CalpontSystemCatalog::DECIMAL &&
|
|
fColType.colDataType != CalpontSystemCatalog::UDECIMAL)
|
|
{
|
|
fColType.colWidth = 8;
|
|
fIsDict = true;
|
|
// TODO: is this right?
|
|
fColType.colDataType = CalpontSystemCatalog::VARCHAR;
|
|
}
|
|
|
|
// Round colWidth up
|
|
if (fColType.colWidth == 3)
|
|
fColType.colWidth = 4;
|
|
else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
|
|
fColType.colWidth = 8;
|
|
|
|
idbassert(fColType.colWidth > 0);
|
|
ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
|
|
|
|
/* calculate some shortcuts for extent and block based arithmetic */
|
|
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
|
|
|
|
for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
modMask = (modMask << 1) | 1;
|
|
|
|
if (extentSize & mask)
|
|
{
|
|
divShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (extentSize & mask)
|
|
throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
|
|
|
|
/* calculate shortcuts for rid-based arithmetic */
|
|
for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
rpbMask = (rpbMask << 1) | 1;
|
|
|
|
if (ridsPerBlock & mask)
|
|
{
|
|
rpbShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (ridsPerBlock & mask)
|
|
throw runtime_error("pColStep: Block size and column width must be a power of 2");
|
|
|
|
for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
|
|
{
|
|
if (mask == BLOCK_SIZE)
|
|
{
|
|
blockSizeShift = i;
|
|
break;
|
|
}
|
|
|
|
mask <<= 1;
|
|
}
|
|
|
|
if (i == 32)
|
|
throw runtime_error("pColStep: Block size must be a power of 2");
|
|
|
|
err = dbrm.getExtents(o, extents);
|
|
|
|
if (err)
|
|
{
|
|
ostringstream os;
|
|
os << "pColStep: BRM lookup error. Could not get extents for OID " << o;
|
|
throw runtime_error(os.str());
|
|
}
|
|
|
|
if (fOid > 3000)
|
|
{
|
|
lbidList.reset(new LBIDList(fOid, 0));
|
|
}
|
|
|
|
sort(extents.begin(), extents.end(), ExtentSorter());
|
|
numExtents = extents.size();
|
|
// uniqueID = UniqueNumberGenerator::instance()->getUnique32();
|
|
// if (fDec)
|
|
// fDec->addQueue(uniqueID);
|
|
// initializeConfigParms ( );
|
|
}
|
|
|
|
pColStep::pColStep(const pColScanStep& rhs)
|
|
: JobStep(rhs)
|
|
, fRm(rhs.resourceManager())
|
|
, fOid(rhs.oid())
|
|
, fTableOid(rhs.tableOid())
|
|
, fColType(rhs.colType())
|
|
, fFilterCount(rhs.filterCount())
|
|
, fBOP(rhs.BOP())
|
|
, ridList(0)
|
|
, fFilterString(rhs.filterString())
|
|
, msgsSent(0)
|
|
, msgsRecvd(0)
|
|
, finishedSending(false)
|
|
, recvWaiting(false)
|
|
, fIsDict(rhs.isDictCol())
|
|
, ridCount(0)
|
|
, fSwallowRows(false)
|
|
, fNumBlksSkipped(0)
|
|
, fMsgBytesIn(0)
|
|
, fMsgBytesOut(0)
|
|
, fFilters(rhs.getFilters())
|
|
{
|
|
int err, i;
|
|
uint32_t mask;
|
|
|
|
if (fTableOid == 0) // cross engine support
|
|
return;
|
|
|
|
if (fOid < 1000)
|
|
throw runtime_error("pColStep: invalid column");
|
|
|
|
ridsPerBlock = rhs.getRidsPerBlock();
|
|
/* calculate some shortcuts for extent and block based arithmetic */
|
|
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
|
|
|
|
for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
modMask = (modMask << 1) | 1;
|
|
|
|
if (extentSize & mask)
|
|
{
|
|
divShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (extentSize & mask)
|
|
throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
|
|
|
|
/* calculate shortcuts for rid-based arithmetic */
|
|
for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
rpbMask = (rpbMask << 1) | 1;
|
|
|
|
if (ridsPerBlock & mask)
|
|
{
|
|
rpbShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (ridsPerBlock & mask)
|
|
throw runtime_error("pColStep: Block size and column width must be a power of 2");
|
|
|
|
for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
|
|
{
|
|
if (mask == BLOCK_SIZE)
|
|
{
|
|
blockSizeShift = i;
|
|
break;
|
|
}
|
|
|
|
mask <<= 1;
|
|
}
|
|
|
|
if (i == 32)
|
|
throw runtime_error("pColStep: Block size must be a power of 2");
|
|
|
|
err = dbrm.getExtents(fOid, extents);
|
|
|
|
if (err)
|
|
{
|
|
ostringstream os;
|
|
os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
|
|
throw runtime_error(os.str());
|
|
}
|
|
|
|
lbidList = rhs.getlbidList();
|
|
|
|
sort(extents.begin(), extents.end(), ExtentSorter());
|
|
numExtents = extents.size();
|
|
|
|
fOnClauseFilter = rhs.onClauseFilter();
|
|
|
|
// uniqueID = UniqueNumberGenerator::instance()->getUnique32();
|
|
// if (fDec)
|
|
// fDec->addQueue(uniqueID);
|
|
// initializeConfigParms ( );
|
|
}
|
|
|
|
pColStep::pColStep(const PassThruStep& rhs)
|
|
: JobStep(rhs)
|
|
, fRm(rhs.resourceManager())
|
|
, fOid(rhs.oid())
|
|
, fTableOid(rhs.tableOid())
|
|
, fColType(rhs.colType())
|
|
, fFilterCount(0)
|
|
, fBOP(BOP_NONE)
|
|
, ridList(0)
|
|
, msgsSent(0)
|
|
, msgsRecvd(0)
|
|
, finishedSending(false)
|
|
, recvWaiting(false)
|
|
, fIsDict(rhs.isDictCol())
|
|
, ridCount(0)
|
|
, fSwallowRows(false)
|
|
, fNumBlksSkipped(0)
|
|
, fMsgBytesIn(0)
|
|
, fMsgBytesOut(0)
|
|
{
|
|
int err, i;
|
|
uint32_t mask;
|
|
|
|
if (fTableOid == 0) // cross engine support
|
|
return;
|
|
|
|
if (fOid < 1000)
|
|
throw runtime_error("pColStep: invalid column");
|
|
|
|
ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
|
|
/* calculate some shortcuts for extent and block based arithmetic */
|
|
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
|
|
|
|
for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
modMask = (modMask << 1) | 1;
|
|
|
|
if (extentSize & mask)
|
|
{
|
|
divShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (extentSize & mask)
|
|
throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
|
|
|
|
/* calculate shortcuts for rid-based arithmetic */
|
|
for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
|
|
{
|
|
mask <<= 1;
|
|
rpbMask = (rpbMask << 1) | 1;
|
|
|
|
if (ridsPerBlock & mask)
|
|
{
|
|
rpbShift = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
|
|
if (ridsPerBlock & mask)
|
|
throw runtime_error("pColStep: Block size and column width must be a power of 2");
|
|
|
|
for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
|
|
{
|
|
if (mask == BLOCK_SIZE)
|
|
{
|
|
blockSizeShift = i;
|
|
break;
|
|
}
|
|
|
|
mask <<= 1;
|
|
}
|
|
|
|
if (i == 32)
|
|
throw runtime_error("pColStep: Block size must be a power of 2");
|
|
|
|
err = dbrm.getExtents(fOid, extents);
|
|
|
|
if (err)
|
|
{
|
|
ostringstream os;
|
|
os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
|
|
throw runtime_error(os.str());
|
|
}
|
|
|
|
sort(extents.begin(), extents.end(), ExtentSorter());
|
|
numExtents = extents.size();
|
|
}
|
|
|
|
void pColStep::addFilter(int8_t COP, float value)
|
|
{
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << (uint8_t)0;
|
|
fFilterString << *((uint32_t*)&value);
|
|
fFilterCount++;
|
|
}
|
|
|
|
void pColStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
|
|
{
|
|
int8_t tmp8;
|
|
int16_t tmp16;
|
|
int32_t tmp32;
|
|
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << roundFlag;
|
|
|
|
// converts to a type of the appropriate width, then bitwise
|
|
// copies into the filter ByteStream
|
|
switch (fColType.colWidth)
|
|
{
|
|
case 1:
|
|
tmp8 = value;
|
|
fFilterString << *((uint8_t*)&tmp8);
|
|
break;
|
|
|
|
case 2:
|
|
tmp16 = value;
|
|
fFilterString << *((uint16_t*)&tmp16);
|
|
break;
|
|
|
|
case 4:
|
|
tmp32 = value;
|
|
fFilterString << *((uint32_t*)&tmp32);
|
|
break;
|
|
|
|
case 8: fFilterString << *((uint64_t*)&value); break;
|
|
|
|
default:
|
|
ostringstream o;
|
|
|
|
o << "pColStep: CalpontSystemCatalog says OID " << fOid << " has a width of " << fColType.colWidth;
|
|
throw runtime_error(o.str());
|
|
}
|
|
|
|
fFilterCount++;
|
|
}
|
|
|
|
void pColStep::addFilter(int8_t COP, const int128_t& value, uint8_t roundFlag)
|
|
{
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << roundFlag;
|
|
|
|
// bitwise copies into the filter ByteStream
|
|
fFilterString << *reinterpret_cast<const uint128_t*>(&value);
|
|
|
|
fFilterCount++;
|
|
}
|
|
|
|
const string pColStep::toString() const
|
|
{
|
|
ostringstream oss;
|
|
oss << "pColStep ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId
|
|
<< " tb/col:" << fTableOid << "/" << fOid;
|
|
|
|
if (alias().length())
|
|
oss << " alias:" << alias();
|
|
|
|
if (view().length())
|
|
oss << " view:" << view();
|
|
|
|
if (fOutputJobStepAssociation.outSize() > 0)
|
|
oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
|
|
else
|
|
oss << " (no output yet)";
|
|
|
|
oss << " nf:" << fFilterCount;
|
|
oss << " in:";
|
|
|
|
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
|
|
{
|
|
oss << fInputJobStepAssociation.outAt(i) << ", ";
|
|
}
|
|
|
|
if (fSwallowRows)
|
|
oss << " (sink)";
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
/* This exists to avoid a DBRM lookup for every rid. */
|
|
inline uint64_t pColStep::getLBID(uint64_t rid, bool& scan)
|
|
{
|
|
uint32_t extentIndex, extentOffset;
|
|
uint64_t fbo;
|
|
fbo = rid >> rpbShift;
|
|
extentIndex = fbo >> divShift;
|
|
extentOffset = fbo & modMask;
|
|
scan = (scanFlags[extentIndex] != 0);
|
|
return extents[extentIndex].range.start + extentOffset;
|
|
}
|
|
|
|
inline uint64_t pColStep::getFBO(uint64_t lbid)
|
|
{
|
|
uint32_t i;
|
|
uint64_t lastLBID;
|
|
|
|
for (i = 0; i < numExtents; i++)
|
|
{
|
|
lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1;
|
|
|
|
if (lbid >= (uint64_t)extents[i].range.start && lbid <= lastLBID)
|
|
return (lbid - extents[i].range.start) + (i << divShift);
|
|
}
|
|
|
|
cerr << "pColStep: didn't find the FBO?\n";
|
|
throw logic_error("pColStep: didn't find the FBO?");
|
|
}
|
|
|
|
void pColStep::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
|
|
{
|
|
fFilterString += filter;
|
|
fFilterCount += count;
|
|
}
|
|
|
|
void pColStep::addFilter(const Filter* f)
|
|
{
|
|
if (NULL != f)
|
|
fFilters.push_back(f);
|
|
}
|
|
|
|
void pColStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
|
|
{
|
|
fFilters.insert(fFilters.end(), fs.begin(), fs.end());
|
|
}
|
|
|
|
} // namespace joblist
|