You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
194 lines
4.4 KiB
C++
194 lines
4.4 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (c) 2016-2020 MariaDB
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: pdictionary.cpp 9655 2013-06-25 23:08:13Z xlou $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
|
|
#include <iostream>
|
|
#include <stdexcept>
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
using namespace std;
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messagelog.h"
|
|
#include "messageobj.h"
|
|
#include "loggingid.h"
|
|
using namespace logging;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "brm.h"
|
|
using namespace BRM;
|
|
|
|
#include "distributedenginecomm.h"
|
|
#include "elementtype.h"
|
|
#include "unique32generator.h"
|
|
#include "jlf_common.h"
|
|
#include "primitivestep.h"
|
|
|
|
namespace joblist
|
|
{
|
|
// struct pDictionaryStepPrimitive
|
|
//{
|
|
// pDictionaryStepPrimitive(pDictionaryStep* pDictStep) : fPDictionaryStep(pDictStep)
|
|
// {}
|
|
//
|
|
// pDictionaryStep *fPDictionaryStep;
|
|
//
|
|
// void operator()()
|
|
// {
|
|
// try
|
|
// {
|
|
// fPDictionaryStep->sendPrimitiveMessages();
|
|
// } catch(runtime_error&)
|
|
// {
|
|
// }
|
|
// }
|
|
//
|
|
//};
|
|
//
|
|
// struct pDictStepAggregator
|
|
//{
|
|
// pDictStepAggregator(pDictionaryStep* pDictStep) : fPDictStep(pDictStep)
|
|
// {}
|
|
// pDictionaryStep *fPDictStep;
|
|
// void operator()()
|
|
// {
|
|
// try
|
|
// {
|
|
// fPDictStep->receivePrimitiveMessages();
|
|
// }
|
|
// catch(runtime_error&)
|
|
// {
|
|
// }
|
|
// }
|
|
//};
|
|
|
|
pDictionaryStep::pDictionaryStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t,
|
|
const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo)
|
|
: JobStep(jobInfo)
|
|
, fOid(o)
|
|
, fTableOid(t)
|
|
, fBOP(BOP_NONE)
|
|
, msgsSent(0)
|
|
, msgsRecvd(0)
|
|
, finishedSending(false)
|
|
, recvWaiting(false)
|
|
, ridCount(0)
|
|
, fColType(ct)
|
|
, pThread(0)
|
|
, cThread(0)
|
|
, fFilterCount(0)
|
|
, requestList(0)
|
|
, fInterval(jobInfo.flushInterval)
|
|
, fMsgBytesIn(0)
|
|
, fMsgBytesOut(0)
|
|
, fRm(jobInfo.rm)
|
|
, hasEqualityFilter(false)
|
|
{
|
|
}
|
|
|
|
void pDictionaryStep::addFilter(int8_t COP, const string& value)
|
|
{
|
|
fFilterString << (uint8_t)COP;
|
|
fFilterString << (uint16_t)value.size();
|
|
fFilterString.append((const uint8_t*)value.c_str(), value.size());
|
|
fFilterCount++;
|
|
|
|
if (fFilterCount == 1 && (COP == COMPARE_EQ || COP == COMPARE_NE))
|
|
{
|
|
hasEqualityFilter = true;
|
|
tmpCOP = COP;
|
|
}
|
|
|
|
if (hasEqualityFilter)
|
|
{
|
|
if (COP != tmpCOP)
|
|
{
|
|
hasEqualityFilter = false;
|
|
eqFilter.clear();
|
|
}
|
|
else
|
|
eqFilter.push_back(value);
|
|
}
|
|
}
|
|
|
|
const string pDictionaryStep::toString() const
|
|
{
|
|
ostringstream oss;
|
|
|
|
oss << "pDictionaryStep ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId
|
|
<< " tb/col:" << fTableOid << "/" << fOid;
|
|
oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
|
|
oss << " in:";
|
|
|
|
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
|
|
{
|
|
oss << fInputJobStepAssociation.outAt(i) << ", ";
|
|
}
|
|
|
|
#ifdef FIFO_SINK
|
|
|
|
if (fOid < 3000)
|
|
oss << " (sink)";
|
|
|
|
#endif
|
|
return oss.str();
|
|
}
|
|
|
|
void pDictionaryStep::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
|
|
{
|
|
ByteStream bs(filter); // need to preserve the input BS
|
|
uint8_t* buf;
|
|
uint8_t COP;
|
|
uint16_t size;
|
|
string value;
|
|
|
|
while (bs.length() > 0)
|
|
{
|
|
bs >> COP;
|
|
bs >> size;
|
|
buf = bs.buf();
|
|
value = string((char*)buf, size);
|
|
addFilter(COP, value);
|
|
bs.advance(size);
|
|
}
|
|
}
|
|
|
|
void pDictionaryStep::addFilter(const Filter* f)
|
|
{
|
|
if (NULL != f)
|
|
fFilters.push_back(f);
|
|
}
|
|
|
|
void pDictionaryStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
|
|
{
|
|
fFilters.insert(fFilters.end(), fs.begin(), fs.end());
|
|
}
|
|
|
|
} // namespace joblist
|