1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/pcolscan.cpp
Leonid Fedorov 8f93fc3623 MCOL-5493: First portion of UBSan fixes (#2842)
Multiple UB fixes
2023-06-02 17:02:09 +03:00

437 lines
11 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: pcolscan.cpp 9655 2013-06-25 23:08:13Z xlou $
*
*
***********************************************************************/
#include <unistd.h>
#include <stdexcept>
#include <cstring>
#include <utility>
#include <sstream>
#include <cassert>
using namespace std;
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
using namespace boost;
#include "messagequeue.h"
using namespace messageqcpp;
#include "configcpp.h"
using namespace config;
#include "messageobj.h"
using namespace logging;
#include "logicoperator.h"
using namespace execplan;
#include "distributedenginecomm.h"
#include "primitivemsg.h"
#include "timestamp.h"
#include "unique32generator.h"
#include "jlf_common.h"
#include "primitivestep.h"
//#define DEBUG 1
//#define DEBUG2 1
namespace joblist
{
pColScanStep::pColScanStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t,
const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo)
: JobStep(jobInfo)
, fRm(jobInfo.rm)
, fMsgHeader()
, fFilterCount(0)
, fOid(o)
, fTableOid(t)
, fColType(ct)
, fBOP(BOP_OR)
, fNumBlksSkipped(0)
, fMsgBytesIn(0)
, fMsgBytesOut(0)
, fMsgsToPm(0)
{
if (fTableOid == 0) // cross engine support
return;
int err, i, mask;
finishedSending = false;
recvWaiting = 0;
recvExited = 0;
rDoNothing = false;
fIsDict = false;
// If this is a dictionary column, fudge the numbers...
if (fColType.colDataType == CalpontSystemCatalog::VARCHAR)
{
if (8 > fColType.colWidth && 4 <= fColType.colWidth)
fColType.colDataType = CalpontSystemCatalog::CHAR;
fColType.colWidth++;
}
// If this is a dictionary column, fudge the numbers...
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY) ||
(fColType.colDataType == CalpontSystemCatalog::BLOB) ||
(fColType.colDataType == CalpontSystemCatalog::TEXT))
{
fColType.colWidth = 8;
fIsDict = true;
}
// MCOL-641 WIP
else if (fColType.colWidth > 8 && fColType.colDataType != CalpontSystemCatalog::DECIMAL &&
fColType.colDataType != CalpontSystemCatalog::UDECIMAL)
{
fColType.colWidth = 8;
fIsDict = true;
// TODO: is this right?
fColType.colDataType = CalpontSystemCatalog::VARCHAR;
}
// Round colWidth up
if (fColType.colWidth == 3)
fColType.colWidth = 4;
else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
fColType.colWidth = 8;
err = dbrm.lookup(fOid, lbidRanges);
if (err)
throw runtime_error("pColScan: BRM LBID range lookup failure (1)");
err = dbrm.getExtents(fOid, extents);
if (err)
throw runtime_error("pColScan: BRM HWM lookup failure (4)");
sort(extents.begin(), extents.end(), BRM::ExtentSorter());
numExtents = extents.size();
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
if (fOid > 3000)
{
lbidList.reset(new LBIDList(fOid, 0));
}
/* calculate shortcuts for rid-based arithmetic */
for (i = 1, mask = 1; i <= 32; i++)
{
mask <<= 1;
if (extentSize & mask)
{
divShift = i;
break;
}
}
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
if (extentSize & mask)
throw runtime_error("pColScan: Extent size must be a power of 2 in blocks");
ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
for (i = 1, mask = 1; i <= 32; i++)
{
mask <<= 1;
if (ridsPerBlock & mask)
{
rpbShift = i;
break;
}
}
for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
if (ridsPerBlock & mask)
throw runtime_error("pColScan: Block size and column width must be a power of 2");
}
struct CPInfo
{
CPInfo(int64_t MIN, int64_t MAX, uint64_t l) : min(MIN), max(MAX), LBID(l){};
int64_t min;
int64_t max;
uint64_t LBID;
};
void pColScanStep::addFilter(int8_t COP, float value)
{
fFilterString << (uint8_t)COP;
fFilterString << (uint8_t)0;
fFilterString << *((uint32_t*)&value);
fFilterCount++;
}
void pColScanStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
{
int8_t tmp8;
int16_t tmp16;
int32_t tmp32;
fFilterString << (uint8_t)COP;
fFilterString << roundFlag;
// converts to a type of the appropriate width, then bitwise
// copies into the filter ByteStream
switch (fColType.colWidth)
{
case 1:
tmp8 = value;
fFilterString << *((uint8_t*)&tmp8);
break;
case 2:
tmp16 = value;
fFilterString << *((uint16_t*)&tmp16);
break;
case 4:
tmp32 = value;
fFilterString << *((uint32_t*)&tmp32);
break;
case 8: fFilterString << *((uint64_t*)&value); break;
default:
ostringstream o;
o << "pColScanStep: CalpontSystemCatalog says OID " << fOid << " has a width of " << fColType.colWidth;
throw runtime_error(o.str());
}
fFilterCount++;
}
const string pColScanStep::toString() const
{
ostringstream oss;
oss << "pColScanStep ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId
<< " tb/col:" << fTableOid << "/" << fOid;
if (alias().length())
oss << " alias:" << alias();
oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
oss << " nf:" << fFilterCount;
oss << " in:";
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
{
oss << fInputJobStepAssociation.outAt(i) << ", ";
}
return oss.str();
}
uint64_t pColScanStep::getFBO(uint64_t lbid)
{
uint32_t i;
uint64_t lastLBID;
for (i = 0; i < numExtents; i++)
{
lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1;
if (lbid >= (uint64_t)extents[i].range.start && lbid <= lastLBID)
return (lbid - extents[i].range.start) + (i << divShift);
}
cerr << "pColScan: didn't find the FBO?\n";
throw logic_error("pColScan: didn't find the FBO?");
}
pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resourceManager()), fMsgHeader()
{
fFilterCount = rhs.filterCount();
fFilterString = rhs.filterString();
isFilterFeeder = rhs.getFeederFlag();
fOid = rhs.oid();
fTableOid = rhs.tableOid();
fColType = rhs.colType();
fBOP = rhs.BOP();
fIsDict = rhs.isDictCol();
fNumBlksSkipped = 0;
fMsgBytesIn = 0;
fMsgBytesOut = 0;
fMsgsToPm = 0;
fCardinality = rhs.cardinality();
fFilters = rhs.fFilters;
fOnClauseFilter = rhs.onClauseFilter();
if (fTableOid == 0) // cross engine support
return;
int err;
err = dbrm.lookup(fOid, lbidRanges);
if (err)
throw runtime_error("pColScan: BRM LBID range lookup failure (1)");
err = dbrm.getExtents(fOid, extents);
if (err)
throw runtime_error("pColScan: BRM HWM lookup failure (4)");
sort(extents.begin(), extents.end(), BRM::ExtentSorter());
numExtents = extents.size();
extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
lbidList = rhs.lbidList;
finishedSending = sendWaiting = rDoNothing = false;
recvWaiting = 0;
recvExited = 0;
/* calculate some shortcuts for extent-based arithmetic */
ridsPerBlock = rhs.ridsPerBlock;
rpbShift = rhs.rpbShift;
divShift = rhs.divShift;
fTraceFlags = rhs.fTraceFlags;
}
bool pColScanStep::isEmptyVal(const uint8_t* val8) const
{
const int width = fColType.colWidth;
switch (fColType.colDataType)
{
case CalpontSystemCatalog::UTINYINT:
{
return (*val8 == joblist::UTINYINTEMPTYROW);
}
case CalpontSystemCatalog::USMALLINT:
{
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
return (*val16 == joblist::USMALLINTEMPTYROW);
}
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
{
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
return (*val32 == joblist::UINTEMPTYROW);
}
case CalpontSystemCatalog::UBIGINT:
{
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
return (*val64 == joblist::BIGINTEMPTYROW);
}
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
return (*val32 == joblist::FLOATEMPTYROW);
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
return (*val64 == joblist::DOUBLEEMPTYROW);
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIME:
case CalpontSystemCatalog::TIMESTAMP:
if (width == 1)
{
return (*val8 == joblist::CHAR1EMPTYROW);
}
else if (width == 2)
{
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
return (*val16 == joblist::CHAR2EMPTYROW);
}
else if (width <= 4)
{
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
return (*val32 == joblist::CHAR4EMPTYROW);
}
else if (width <= 8)
{
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
return (*val64 == joblist::CHAR8EMPTYROW);
}
default: break;
}
switch (width)
{
case 1:
{
return (*val8 == joblist::TINYINTEMPTYROW);
}
case 2:
{
const uint16_t* val16 = reinterpret_cast<const uint16_t*>(val8);
return (*val16 == joblist::SMALLINTEMPTYROW);
}
case 4:
{
const uint32_t* val32 = reinterpret_cast<const uint32_t*>(val8);
return (*val32 == joblist::INTEMPTYROW);
}
case 8:
{
const uint64_t* val64 = reinterpret_cast<const uint64_t*>(val8);
return (*val64 == joblist::BIGINTEMPTYROW);
}
default:
MessageLog logger(LoggingID(28));
logging::Message::Args colWidth;
Message msg(33);
colWidth.add(width);
msg.format(colWidth);
logger.logErrorMessage(msg);
return false;
}
return false;
}
void pColScanStep::addFilter(const Filter* f)
{
if (NULL != f)
fFilters.push_back(f);
}
void pColScanStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
{
fFilters.insert(fFilters.end(), fs.begin(), fs.end());
}
} // namespace joblist