1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/pdictionary.cpp
Roman Nozdrin 4fe9cd64a3 Revert "No boost condition (#2822)" (#2828)
This reverts commit f916e64927.
2023-04-22 15:49:50 +03:00

194 lines
4.4 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2016-2020 MariaDB
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: pdictionary.cpp 9655 2013-06-25 23:08:13Z xlou $
*
*
***********************************************************************/
#include <iostream>
#include <stdexcept>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
using namespace std;
#include "messagequeue.h"
using namespace messageqcpp;
#include "configcpp.h"
using namespace config;
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "brm.h"
using namespace BRM;
#include "distributedenginecomm.h"
#include "elementtype.h"
#include "unique32generator.h"
#include "jlf_common.h"
#include "primitivestep.h"
namespace joblist
{
// struct pDictionaryStepPrimitive
//{
// pDictionaryStepPrimitive(pDictionaryStep* pDictStep) : fPDictionaryStep(pDictStep)
// {}
//
// pDictionaryStep *fPDictionaryStep;
//
// void operator()()
// {
// try
// {
// fPDictionaryStep->sendPrimitiveMessages();
// } catch(runtime_error&)
// {
// }
// }
//
//};
//
// struct pDictStepAggregator
//{
// pDictStepAggregator(pDictionaryStep* pDictStep) : fPDictStep(pDictStep)
// {}
// pDictionaryStep *fPDictStep;
// void operator()()
// {
// try
// {
// fPDictStep->receivePrimitiveMessages();
// }
// catch(runtime_error&)
// {
// }
// }
//};
pDictionaryStep::pDictionaryStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t,
const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo)
: JobStep(jobInfo)
, fOid(o)
, fTableOid(t)
, fBOP(BOP_NONE)
, msgsSent(0)
, msgsRecvd(0)
, finishedSending(false)
, recvWaiting(false)
, ridCount(0)
, fColType(ct)
, pThread(0)
, cThread(0)
, fFilterCount(0)
, requestList(0)
, fInterval(jobInfo.flushInterval)
, fMsgBytesIn(0)
, fMsgBytesOut(0)
, fRm(jobInfo.rm)
, hasEqualityFilter(false)
{
}
void pDictionaryStep::addFilter(int8_t COP, const string& value)
{
fFilterString << (uint8_t)COP;
fFilterString << (uint16_t)value.size();
fFilterString.append((const uint8_t*)value.c_str(), value.size());
fFilterCount++;
if (fFilterCount == 1 && (COP == COMPARE_EQ || COP == COMPARE_NE))
{
hasEqualityFilter = true;
tmpCOP = COP;
}
if (hasEqualityFilter)
{
if (COP != tmpCOP)
{
hasEqualityFilter = false;
eqFilter.clear();
}
else
eqFilter.push_back(value);
}
}
const string pDictionaryStep::toString() const
{
ostringstream oss;
oss << "pDictionaryStep ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId
<< " tb/col:" << fTableOid << "/" << fOid;
oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
oss << " in:";
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
{
oss << fInputJobStepAssociation.outAt(i) << ", ";
}
#ifdef FIFO_SINK
if (fOid < 3000)
oss << " (sink)";
#endif
return oss.str();
}
void pDictionaryStep::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
{
ByteStream bs(filter); // need to preserve the input BS
uint8_t* buf;
uint8_t COP;
uint16_t size;
string value;
while (bs.length() > 0)
{
bs >> COP;
bs >> size;
buf = bs.buf();
value = string((char*)buf, size);
addFilter(COP, value);
bs.advance(size);
}
}
void pDictionaryStep::addFilter(const Filter* f)
{
if (NULL != f)
fFilters.push_back(f);
}
void pDictionaryStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
{
fFilters.insert(fFilters.end(), fs.begin(), fs.end());
}
} // namespace joblist