/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: largehashjoin.cpp 9655 2013-06-25 23:08:13Z xlou $

#include <string>
#include <sstream>
#include <fstream>
#include <iostream>
#include <list>
#include <vector>
#include <typeinfo>
#include <cassert>
#include <cstring>
#include <stdexcept>
#include <ctime>
#include <sys/time.h>
#include <iomanip>
using namespace std;

#include <boost/thread/mutex.hpp>

#include "calpontsystemcatalog.h"
using namespace execplan;

#include "jobstep.h"
#include "largehashjoin.h"
#include "elementtype.h"
using namespace joblist;

#include "mcsconfig.h"

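// Serializes appends to the shared umdiskio trace log.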
boost::mutex fileLock_g;

namespace
{
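// Appends per-bucket disk I/O statistics (bytes, start time, duration,
// throughput) for the given datalist to trace/umdiskio.log under MCSLOGDIR.
// Only BucketDL- and ZDL-backed datalists are logged.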
void logDiskIoInfo(uint64_t stepId, const AnyDataListSPtr& spdl)
{
  boost::mutex::scoped_lock lk(fileLock_g);
  string umDiskioLog = string(MCSLOGDIR) + "/trace/umdiskio.log";
  string umDiskioBak = string(MCSLOGDIR) + "/trace/umdiskio.bak";

  ofstream umDiskIoFile(umDiskioLog.c_str(), ios_base::app);

  CalpontSystemCatalog::OID oid;
  uint64_t maxBuckets = 0;
  list<DiskIoInfo>* infoList = NULL;
  string bkt("bkt");
  BucketDL<ElementType>* bdl = spdl->bucketDL();
  BucketDL<StringElementType>* sbdl = spdl->stringBucketDL();
  ZDL<ElementType>* zdl = spdl->zonedDL();
  ZDL<StringElementType>* szdl = spdl->stringZonedDL();

  if (bdl != NULL)
  {
    maxBuckets = bdl->bucketCount();
    oid = bdl->OID();
  }
  else if (zdl != NULL)
  {
    maxBuckets = zdl->bucketCount();
    oid = zdl->OID();
    bkt = "zdl";
  }
  else if (sbdl != NULL)
  {
    maxBuckets = sbdl->bucketCount();
    oid = sbdl->OID();
  }
  else if (szdl != NULL)
  {
    maxBuckets = szdl->bucketCount();
    oid = szdl->OID();
    bkt = "zdl";
  }
  else
  {
    // not logged for now.
    return;
  }

  for (uint64_t i = 0; i < maxBuckets; i++)
  {
    if (bdl)
      infoList = &(bdl->diskIoInfoList(i));
    else if (zdl)
      infoList = &(zdl->diskIoInfoList(i));
    else if (sbdl)
      infoList = &(sbdl->diskIoInfoList(i));
    else if (szdl)
      infoList = &(szdl->diskIoInfoList(i));

    for (list<DiskIoInfo>::iterator j = infoList->begin(); j != infoList->end(); j++)
    {
      boost::posix_time::time_duration td = j->fEnd - j->fStart;
      int64_t us = td.total_microseconds();

      // Guard against sub-microsecond durations to avoid dividing by zero.
      umDiskIoFile << setfill('0') << "st" << setw(2) << stepId << "oid" << oid << bkt << setw(3) << i
                   << (j->fWrite ? " writes " : " reads ") << setw(7) << setfill(' ') << j->fBytes
                   << " bytes, at " << j->fStart << " duration " << us << " mcs @ "
                   << (us > 0 ? static_cast<int64_t>(j->fBytes) / us : 0) << "MB/s" << endl;
    }
  }

  streampos curPos = umDiskIoFile.tellp();
  umDiskIoFile.close();

  // Move the current file to .bak when its size exceeds .5 GB, so the total log is capped at 1 GB.
  if (curPos > 0x20000000)
  {
    string cmd = "/bin/mv " + umDiskioLog + " " + umDiskioBak;
    (void)system(cmd.c_str());
  }
}

} // namespace

namespace joblist
{
const uint64_t ZDL_VEC_SIZE = 4096;

//@bug 686. Run the hash join in a separate thread and return to the caller
// immediately, so the other job steps can start.
struct HJRunner
{
  HJRunner(LargeHashJoin* p) : joiner(p)
  {
  }
  LargeHashJoin* joiner;
  void operator()()
  {
    try
    {
      joiner->doHashJoin();
    }
    catch (std::exception& e)
    {
      ostringstream errMsg;
      errMsg << "HJRunner caught: " << e.what();
      joiner->errorLogging(errMsg.str());
      joiner->unblockDatalists(logging::largeHashJoinErr);
    }
    catch (...)
    {
      string msg("HJRunner caught something not an exception!");
      joiner->errorLogging(msg);
      joiner->unblockDatalists(logging::largeHashJoinErr);
    }
  }
};
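
// Same as HJRunner, but drives StringHashJoinStep::doStringHashJoin().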
struct StringHJRunner
{
  StringHJRunner(StringHashJoinStep* p) : joiner(p)
  {
  }
  StringHashJoinStep* joiner;
  void operator()()
  {
    try
    {
      joiner->doStringHashJoin();
    }
    catch (std::exception& e)
    {
      ostringstream errMsg;
      errMsg << "StringHJRunner caught: " << e.what();
      joiner->errorLogging(errMsg.str());
      joiner->unblockDatalists(logging::stringHashJoinStepErr);
    }
    catch (...)
    {
      string msg("StringHJRunner caught something not an exception!");
      joiner->errorLogging(msg);
      joiner->unblockDatalists(logging::stringHashJoinStepErr);
    }
  }
};

// Thread function used by HashJoin.
template <typename e_t>
void* HashJoinByBucket_thr(void* arg)
{
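  // Each worker owns the bucket range [startBucket, startBucket + numBuckets).
  // Per bucket, the smaller input side is hashed and the larger side probes it.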
  typename HashJoin<e_t>::thrParams_t* params = (typename HashJoin<e_t>::thrParams_t*)arg;
  HashJoin<e_t>* hjPtr = params->hjptr;
  const uint32_t thrIdx = params->thrIdx;
  long set1Size = 0;
  long set2Size = 0;
  bool sendAllHashSet = false;
  bool sendAllSearchSet = false;

  try
  {
    for (uint32_t idx = 0, bucketIdx = params->startBucket; idx < params->numBuckets; idx++, bucketIdx++)
    {
#ifdef DEBUG
      cout << "\tJoinByBucket() thr " << dec << thrIdx << " bkt " << bucketIdx << "/"
           << hjPtr->Set1()->bucketCount() << "/" << params->numBuckets << endl;
#endif

      JoinType joinType = hjPtr->getJoinType();

      set1Size = hjPtr->Set1()->size(bucketIdx);
      set2Size = hjPtr->Set2()->size(bucketIdx);

      if (set1Size <= 0 && set2Size <= 0)
      {
        continue;
      }
      else
      {
        if (set1Size > set2Size)
        {
          hjPtr->setSearchSet(hjPtr->Set1()->getBDL(), thrIdx);
          hjPtr->setHashSet(hjPtr->Set2()->getBDL(), thrIdx);
          hjPtr->setSearchResult(hjPtr->Result1(), thrIdx);
          hjPtr->setHashResult(hjPtr->Result2(), thrIdx);
          sendAllHashSet = (joinType == RIGHTOUTER);
          sendAllSearchSet = (joinType == LEFTOUTER);
        }
        else
        {
          hjPtr->setHashSet(hjPtr->Set1()->getBDL(), thrIdx);
          hjPtr->setSearchSet(hjPtr->Set2()->getBDL(), thrIdx);
          hjPtr->setHashResult(hjPtr->Result1(), thrIdx);
          hjPtr->setSearchResult(hjPtr->Result2(), thrIdx);
          sendAllHashSet = (joinType == LEFTOUTER);
          sendAllSearchSet = (joinType == RIGHTOUTER);
        } // if set1Size > set2Size ...
      } // if set1Size <= 0 ...
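
      // For outer joins the sendAll* flags mark which side must emit its rows
      // even without a match; createHash() below honors sendAllHashSet while
      // building the table, and the probe loop honors sendAllSearchSet.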
      params->timeset.setTimer(createHashStr);
      hjPtr->createHash(hjPtr->HashSet(thrIdx), hjPtr->HashTable(thrIdx), bucketIdx, sendAllHashSet,
                        hjPtr->HashResult(thrIdx), params->dlTimes, params->die);
      params->timeset.holdTimer(createHashStr);

#ifdef DEBUG
      long hashSetTotal = 0;
      long searchSetTotal = 0;

      for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
        hashSetTotal += hjPtr->HashSet(thrIdx)->size(j); // are bucketDL

      for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
        searchSetTotal += hjPtr->SearchSet(thrIdx)->size(j); // can be any datalist

      cout << "\t\tJoinByBucket() thr " << dec << thrIdx << " bkt " << bucketIdx << " hashSize "
           << hashSetTotal << " searchSize " << searchSetTotal << endl;
#endif

      bool more;
      e_t e;
      e_t e2;
      const uint64_t InvalidRID = static_cast<uint64_t>(-1);
      int iter = hjPtr->SearchSet(thrIdx)->getIterator(bucketIdx);

      ZonedDL* zdl1 = dynamic_cast<ZonedDL*>(hjPtr->SearchResult(thrIdx));
      ZonedDL* zdl2 = dynamic_cast<ZonedDL*>(hjPtr->HashResult(thrIdx));
      vector<e_t> vec1;
      vector<e_t> vec2;

      std::pair<typename HashJoin<e_t>::hashIter_t, typename HashJoin<e_t>::hashIter_t> hashItPair;
      typename HashJoin<e_t>::hashIter_t hashIt;
      typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
      params->timeset.setTimer(hashJoinStr);
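
      // Probe phase: stream this bucket's search-side elements and look each
      // hash key up in the table. Output is batched through vec1/vec2 when
      // the result datalist is a ZDL, to amortize per-insert overhead.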
      for (more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e); more && !(*params->die);
           more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e))
      {
        // If sendAllSearchSet=true, keep all of the search set. If this is
        // a right outer, we are dealing with a join such as
        //   col1 = col2 (+)
        // where col1 is the SearchSet and col2 is the HashSet. We want to include
        // all of col1 in this case regardless of whether there is a matching col2.
        if (sendAllSearchSet)
        {
          if (zdl1)
          {
            vec1.push_back(e);

            if (vec1.size() >= ZDL_VEC_SIZE)
            {
              params->timeset.setTimer(insertResultsStr);
              hjPtr->SearchResult(thrIdx)->insert(vec1);
              vec1.clear();
              params->timeset.holdTimer(insertResultsStr);
            }
          }
          else
            hjPtr->SearchResult(thrIdx)->insert(e);
        }

        hashIt = ht->find(e.second);

        if (hashIt != ht->end())
        {
#ifdef DEBUG

          if (hjPtr->SearchResult(thrIdx)->OID() >= 3000)
            cout << "JoinByBucket() SearchResult add " << bucketIdx << " [" << e.first << "][" << e.second
                 << "]" << endl;

          uint32_t a = 0;
          e_t b = e_t();
#endif

          // If sendAllSearchSet=true, we already added the search result
          // before the if condition above.
          if (!sendAllSearchSet)
          {
            if (zdl1)
            {
              vec1.push_back(e);

              if (vec1.size() >= ZDL_VEC_SIZE)
              {
                params->timeset.setTimer(insertResultsStr);
                hjPtr->SearchResult(thrIdx)->insert(vec1);
                vec1.clear();
                params->timeset.holdTimer(insertResultsStr);
              }
            }
            else
              hjPtr->SearchResult(thrIdx)->insert(e);
          }

          // If sendAllHashSet=false, add the hash results to the output datalist.
          // If it is a left outer join then we already added all of the right side rows
          // in the bucket in the createHash call earlier in this function.
          if (!sendAllHashSet)
          {
            // If the matching pair has its RID set to invalid, it's already been encountered,
            // so there is no reason to add it to the output datalist or keep searching for
            // more matching values.
            if (hashIt->second != InvalidRID)
            {
              hashItPair = ht->equal_range(e.second);

              for (hashIt = hashItPair.first; hashIt != hashItPair.second; hashIt++)
              {
                e2.first = hashIt->second;
                e2.second = e.second;

                if (zdl2)
                {
                  vec2.push_back(e2);

                  if (vec2.size() >= ZDL_VEC_SIZE)
                  {
                    params->timeset.setTimer(insertResultsStr);
                    hjPtr->HashResult(thrIdx)->insert(vec2);
                    vec2.clear();
                    params->timeset.holdTimer(insertResultsStr);
                  }
                }
                else
                  hjPtr->HashResult(thrIdx)->insert(e2);

#ifdef DEBUG
                a++;
                b = e2;
#endif

                // Set the RID to the invalid rid now that it's been matched and added to the
                // output datalist. This will keep us from duplicating it if it is matched again.
                hashIt->second = InvalidRID;
              }

#ifdef DEBUG
              cout << "\t\tadded " << b << " " << a << " times" << endl << endl;
#endif
            }
          }

        } // if hashIt != ht->end()

      } // for ( hjPtr...

      params->timeset.holdTimer(hashJoinStr);

      params->timeset.setTimer(insertLastResultsStr);

      if (vec1.size() != 0)
      {
        hjPtr->SearchResult(thrIdx)->insert(vec1);
        vec1.clear();
      }

      if (vec2.size() != 0)
      {
        hjPtr->HashResult(thrIdx)->insert(vec2);
        vec2.clear();
      }

      params->timeset.holdTimer(insertLastResultsStr);

      // typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
      ht->clear();

    } // for (bucketIdx...
  } // try
  // We don't have to call JSA.endOfInput() for this exception, because
  // the parent thread takes care of that in performThreadedJoin().
  catch (const logging::LargeDataListExcept& ex)
  {
    ostringstream errMsg;

    if (typeid(e_t) == typeid(StringElementType))
    {
      errMsg << "HashJoinByBucket_thr<String>: caught LDL error: " << ex.what();
      hjPtr->status(logging::stringHashJoinStepLargeDataListFileErr);
    }
    else
    {
      errMsg << "HashJoinByBucket_thr: caught LDL error: " << ex.what();
      hjPtr->status(logging::largeHashJoinLargeDataListFileErr);
    }

    cerr << errMsg.str() << endl;
    catchHandler(errMsg.str(), hjPtr->sessionId());
  }
  catch (const exception& ex)
  {
    ostringstream errMsg;

    if (typeid(e_t) == typeid(StringElementType))
    {
      errMsg << "HashJoinByBucket_thr<String>: caught: " << ex.what();
      hjPtr->status(logging::stringHashJoinStepErr);
    }
    else
    {
      errMsg << "HashJoinByBucket_thr: caught: " << ex.what();
      hjPtr->status(logging::largeHashJoinErr);
    }

    cerr << errMsg.str() << endl;
    catchHandler(errMsg.str(), hjPtr->sessionId());
  }
  catch (...)
  {
    ostringstream errMsg;

    if (typeid(e_t) == typeid(StringElementType))
    {
      errMsg << "HashJoinByBucket_thr<String>: caught unknown exception";
      hjPtr->status(logging::stringHashJoinStepErr);
    }
    else
    {
      errMsg << "HashJoinByBucket_thr: caught unknown exception";
      hjPtr->status(logging::largeHashJoinErr);
    }

    cerr << errMsg.str() << endl;
    catchHandler(errMsg.str(), hjPtr->sessionId());
  }

  return NULL;
} // HashJoinByBucket_thr

LargeHashJoin::LargeHashJoin(JoinType joinType, uint32_t sessionId, uint32_t txnId, uint32_t statementId,
                             ResourceManager* rm)
 : fSessionId(sessionId)
 , fTxnId(txnId)
 , fStepId(0)
 , fStatementId(statementId)
 , fTableOID1(0)
 , fTableOID2(0)
 , fJoinType(joinType)
 , fRm(rm)
 , fAlias1()
 , fAlias2()
{
  // fConfig = config::Config::makeConfig();
  // fJoinType = joinType;
}

LargeHashJoin::~LargeHashJoin()
{
  if (traceOn())
  {
    for (uint64_t i = 0; i < fInputJobStepAssociation.outSize(); i++)
      logDiskIoInfo(fStepId, fInputJobStepAssociation.outAt(i));

    for (uint64_t i = 0; i < fOutputJobStepAssociation.outSize(); i++)
      logDiskIoInfo(fStepId, fOutputJobStepAssociation.outAt(i));
  }
}

void LargeHashJoin::join()
{
  runner->join();
}

void LargeHashJoin::run()
{
  if (traceOn())
  {
    syslogStartStep(16,                            // exemgr subsystem
                    std::string("LargeHashJoin")); // step name
  }

  runner.reset(new boost::thread(HJRunner(this)));
}

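// Marks both output datalists as done so consumers blocked on them wake up
// and see the error status instead of waiting indefinitely.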
void LargeHashJoin::unblockDatalists(uint16_t status)
{
  fOutputJobStepAssociation.status(status);
  fOutputJobStepAssociation.outAt(0)->dataList()->endOfInput();
  fOutputJobStepAssociation.outAt(1)->dataList()->endOfInput();
}

void LargeHashJoin::errorLogging(const string& msg) const
{
  ostringstream errMsg;
  errMsg << "Step " << stepId() << "; " << msg;
  cerr << errMsg.str() << endl;
  catchHandler(errMsg.str(), sessionId());
}

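// The join proper, run on the HJRunner thread: wraps the two BucketDL inputs,
// runs HashJoin<ElementType> with the configured number of threads, and logs
// work times when tracing is enabled.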
void LargeHashJoin::doHashJoin()
{
  string val;

  idbassert(fInputJobStepAssociation.outSize() >= 2);
  idbassert(fOutputJobStepAssociation.outSize() >= 2);
  BucketDataList* Ap = 0;
  BucketDataList* Bp = 0;
  BucketDataList* tAp = 0;
  BucketDataList* tBp = 0;
  DataList_t* inDL1 = 0;
  DataList_t* inDL2 = 0;
  inDL1 = fInputJobStepAssociation.outAt(0)->dataList();
  inDL2 = fInputJobStepAssociation.outAt(1)->dataList();
  idbassert(inDL1);
  idbassert(inDL2);

  HashJoin<ElementType>* hj = 0;
  double createWorkTime = 0;
  double hashWorkTime = 0;
  double insertWorkTime = 0;
  DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
  DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();

  if (0 < fInputJobStepAssociation.status())
  {
    unblockDatalists(fInputJobStepAssociation.status());
  }
  else
  {
    string currentAction("preparing join");

    try
    {
      // If we're given BucketDL's, use them
      if (typeid(*inDL1) == typeid(BucketDataList))
      {
        if (typeid(*inDL2) != typeid(BucketDataList))
        {
          throw logic_error("LargeHashJoin::run: expected either 0 or 2 BucketDL's!");
        }

        Ap = dynamic_cast<BucketDataList*>(inDL1);
        Bp = dynamic_cast<BucketDataList*>(inDL2);
      }
      else
      {
        throw logic_error("HashJoin will take only BucketDLs as inputs");

        // NOTE: everything below in this branch is unreachable because of the
        // throw above; it is kept for reference.
        int maxBuckets = fRm->getHjMaxBuckets();
        joblist::ridtype_t maxElems = fRm->getHjMaxElems();

        tAp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
        tBp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
        tAp->setHashMode(1);
        tBp->setHashMode(1);

        ElementType element;
        int id;

        id = inDL1->getIterator();

        while (inDL1->next(id, &element))
        {
          tAp->insert(element);
        }

        tAp->endOfInput();

        id = inDL2->getIterator();

        while (inDL2->next(id, &element))
        {
          tBp->insert(element);
        }

        tBp->endOfInput();

        Ap = tAp;
        Bp = tBp;
      }

      unsigned numThreads = fRm->getHjNumThreads();

      BDLWrapper<ElementType> setA(Ap);
      BDLWrapper<ElementType> setB(Bp);

      hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, fJoinType, &dlTimes,
                                     fOutputJobStepAssociation.statusPtr(), sessionId(), &die);

      if (fTableOID2 >= 3000)
      {
        ostringstream logStr2;
        logStr2 << "LargeHashJoin::run: ses:" << fSessionId << " st:" << fStepId
                << " input sizes: " << setA.size() << "/" << setB.size() << endl;
        cout << logStr2.str();
      }

      currentAction = "performing join";

      if (fTableOID2 >= 3000)
      {
        dlTimes.setFirstReadTime();
        dlTimes.setEndOfInputTime(dlTimes.FirstReadTime());
      }

      hj->performJoin(numThreads);

    } // try
    catch (const logging::LargeDataListExcept& ex)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught LDL error: " << ex.what();
      errorLogging(errMsg.str());
      unblockDatalists(logging::largeHashJoinLargeDataListFileErr);
    }
    catch (const exception& ex)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught: " << ex.what();
      errorLogging(errMsg.str());
      unblockDatalists(logging::largeHashJoinErr);
    }
    catch (...)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught unknown exception";
      errorLogging(errMsg.str());
      unblockDatalists(logging::largeHashJoinErr);
    }

    if (hj)
    {
      //..hashWorkTime is the time to perform the hash join excluding the
      //  output insertion time. insertWorkTime is the total of both insert
      //  times. The end result is that createWorkTime + hashWorkTime +
      //  insertWorkTime roughly equates to the total work time.
      createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
      hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) - hj->getTimeSet()->totalTime(insertResultsStr);
      insertWorkTime =
          hj->getTimeSet()->totalTime(insertResultsStr) + hj->getTimeSet()->totalTime(insertLastResultsStr);
    }

  } // (fInputJobStepAssociation.status() == 0)

  if (fTableOID2 >= 3000 && traceOn())
  {
    time_t finTime = time(0);
    char finTimeString[50];
    ctime_r(&finTime, finTimeString);
    finTimeString[strlen(finTimeString) - 1] = '\0';

    ostringstream logStr;
    logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << finTimeString << "; 1st read "
           << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << endl
           << "\tLargeHashJoin::run: output sizes: " << resultA->totalSize() << "/" << resultB->totalSize()
           << " run time: " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
           << fixed << setprecision(2) << "s\n\tTotal work times: create hash: " << createWorkTime
           << "s, hash join: " << hashWorkTime << "s, insert results: " << insertWorkTime << "s\n"
           << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
    logEnd(logStr.str().c_str());

    syslogProcessingTimes(16,                        // exemgr subsystem
                          dlTimes.FirstReadTime(),   // use join start time for first read time
                          dlTimes.EndOfInputTime(),  // use join end time for last read time
                          dlTimes.FirstReadTime(),   // use join start time for first write time
                          dlTimes.EndOfInputTime()); // use join end time for last write time
    syslogEndStep(16, // exemgr subsystem
                  0,  // no blocked datalist input to report
                  0); // no blocked datalist output to report
  }

  delete hj;
  delete tAp;
  delete tBp;
}

const string LargeHashJoin::toString() const
{
  ostringstream oss;
  CalpontSystemCatalog::OID oid1 = 0;
  CalpontSystemCatalog::OID oid2 = 0;
  DataList_t* dl1;
  DataList_t* dl2;
  size_t idlsz;

  idlsz = fInputJobStepAssociation.outSize();
  idbassert(idlsz == 2);
  dl1 = fInputJobStepAssociation.outAt(0)->dataList();

  if (dl1)
    oid1 = dl1->OID();

  dl2 = fInputJobStepAssociation.outAt(1)->dataList();

  if (dl2)
    oid2 = dl2->OID();

  oss << "LargeHashJoin ses:" << fSessionId << " st:" << fStepId;
  oss << omitOidInDL;
  oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
  oss << " " << fInputJobStepAssociation.outAt(0);
  oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
  oss << " " << fInputJobStepAssociation.outAt(1);

  idlsz = fOutputJobStepAssociation.outSize();
  idbassert(idlsz == 2);
  dl1 = fOutputJobStepAssociation.outAt(0)->dataList();

  if (dl1)
    oid1 = dl1->OID();

  dl2 = fOutputJobStepAssociation.outAt(1)->dataList();

  if (dl2)
    oid2 = dl2->OID();

  oss << endl << " ";
  oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
  oss << " " << fOutputJobStepAssociation.outAt(0);
  oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
  oss << " " << fOutputJobStepAssociation.outAt(1) << endl;

  return oss.str();
}

StringHashJoinStep::StringHashJoinStep(JoinType joinType, uint32_t sessionId, uint32_t txnId,
                                       uint32_t statementId, ResourceManager* rm)
 : LargeHashJoin(joinType, sessionId, txnId, statementId, rm)
{
}

StringHashJoinStep::~StringHashJoinStep()
{
}

void StringHashJoinStep::run()
{
  if (traceOn())
  {
    syslogStartStep(16,                                 // exemgr subsystem
                    std::string("StringHashJoinStep")); // step name
  }

  runner.reset(new boost::thread(StringHJRunner(this)));
}

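// String flavor of doHashJoin: joins on StringElementType into intermediate
// StringZonedDLs, then converts the results back to rid-only ElementTypes,
// filtering out the CPNULLSTRMARK entries that represent nulls.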
void StringHashJoinStep::doStringHashJoin()
{
  string val;

  idbassert(fInputJobStepAssociation.outSize() >= 2);
  idbassert(fOutputJobStepAssociation.outSize() >= 2);
  DataList<StringElementType>* inDL1 = fInputJobStepAssociation.outAt(0)->stringDataList();
  DataList<StringElementType>* inDL2 = fInputJobStepAssociation.outAt(1)->stringDataList();
  idbassert(inDL1);
  idbassert(inDL2);

  BucketDL<StringElementType>* Ap = 0;
  BucketDL<StringElementType>* Bp = 0;
  BucketDL<StringElementType>* tAp = 0;
  BucketDL<StringElementType>* tBp = 0;

  HashJoin<StringElementType>* hj = 0;
  double createWorkTime = 0;
  double hashWorkTime = 0;
  double insertWorkTime = 0;
  DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
  DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();
  struct timeval start_time;
  gettimeofday(&start_time, 0);
  struct timeval end_time = start_time;
  ZonedDL* bdl1 = 0;
  ZonedDL* bdl2 = 0;

  // The result from this hash join step is expected to be a BandedDataList,
  // but HashJoin<StringElementType> returns a StringDataList; also, nulls are
  // reported as "_CpNuLl_" by pDictionStep. Create two StringDataLists for
  // the intermediate result instead.
  // @bug 721. use zdl.
  StringZonedDL* dlA = new StringZonedDL(1, fRm);
  dlA->setMultipleProducers(true);
  StringZonedDL* dlB = new StringZonedDL(1, fRm);
  dlB->setMultipleProducers(true);

  if (0 < fInputJobStepAssociation.status())
  {
    unblockDatalists(fInputJobStepAssociation.status());
  }
  else
  {
    string currentAction("preparing join");

    try
    {
      // If we're given BucketDL's, use them
      if (typeid(*inDL1) == typeid(BucketDL<StringElementType>))
      {
        if (typeid(*inDL2) != typeid(BucketDL<StringElementType>))
        {
          throw logic_error("StringHashJoinStep::run: expected either 0 or 2 BucketDL's!");
        }

        Ap = dynamic_cast<BucketDL<StringElementType>*>(inDL1);
        Bp = dynamic_cast<BucketDL<StringElementType>*>(inDL2);
      }
      else
      {
        int maxBuckets = fRm->getHjMaxBuckets();
        joblist::ridtype_t maxElems = fRm->getHjMaxElems();

        // int maxBuckets = 4;
        // joblist::ridtype_t maxElems = 1024*8;
        // val = fConfig->getConfig("HashJoin", "MaxBuckets"); // same as HashJoin
        // if (val.size() > 0)
        //   maxBuckets = static_cast<int>(config::Config::fromText(val));
        // if (maxBuckets <= 0)
        //   maxBuckets = 4;
        // val = fConfig->getConfig("HashJoin", "MaxElems"); // same as HashJoin
        // if (val.size() > 0)
        //   maxElems = config::Config::uFromText(val);
        // if (maxElems <= 0)
        //   maxElems = 1024*8;

        tAp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
        tBp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
        tAp->setHashMode(1);
        tBp->setHashMode(1);

        StringElementType element;
        int id = inDL1->getIterator();

        while (inDL1->next(id, &element))
        {
          tAp->insert(element);
        }

        tAp->endOfInput();

        id = inDL2->getIterator();

        while (inDL2->next(id, &element))
        {
          tBp->insert(element);
        }

        tBp->endOfInput();

        Ap = tAp;
        Bp = tBp;
      }

      unsigned numThreads = fRm->getHjNumThreads();
      // unsigned numThreads = 0;
      // val = fConfig->getConfig("HashJoin", "NumThreads");
      // if (val.size() > 0)
      //   numThreads = static_cast<unsigned>(config::Config::uFromText(val));
      // if (numThreads <= 0)
      //   numThreads = 4;

      BDLWrapper<StringElementType> setA(Ap);
      BDLWrapper<StringElementType> setB(Bp);

      // Assign to the outer hj (do not shadow it), so the timing stats below
      // and the final delete see the real object.
      hj = new HashJoin<StringElementType>(setA, setB, dlA, dlB, fJoinType, &dlTimes,
                                           fOutputJobStepAssociation.statusPtr(), sessionId(), &die);

      if ((dlA == NULL) || (dlB == NULL) || (hj == NULL))
      {
        ostringstream oss;
        oss << "StringHashJoinStep::run() null pointer from new -- ";
        oss << "StringDataList A(0x" << hex << (ptrdiff_t)dlA << "), B(0x" << (ptrdiff_t)dlB
            << "), HashJoin hj(0x" << (ptrdiff_t)hj << ")";
        throw(runtime_error(oss.str().c_str()));
      }

      // leave this in
      if (fTableOID2 >= 3000)
      {
        ostringstream logStr2;
        logStr2 << "StringHashJoinStep::run: ses:" << fSessionId << " st:" << fStepId
                << " input sizes: " << setA.size() << "/" << setB.size() << endl;
        cout << logStr2.str();
      }

      currentAction = "performing join";

      if (fTableOID2 >= 3000)
      {
        dlTimes.setFirstReadTime();
        dlTimes.setEndOfInputTime(dlTimes.FirstReadTime());
      }

      hj->performJoin(numThreads);

      currentAction = "after join";

      // Convert from StringElementType to ElementType by grabbing the rid,
      // and take _CpNuLl_ out of the result.
      StringElementType se;
      ElementType e;
      int id = dlA->getIterator();

      bdl1 = dynamic_cast<ZonedDL*>(resultA);
      bdl2 = dynamic_cast<ZonedDL*>(resultB);
      vector<ElementType> v;
      v.reserve(ZDL_VEC_SIZE);

      if (bdl1)
      {
        while (dlA->next(id, &se))
        {
          if (se.second != CPNULLSTRMARK)
          {
            e.first = se.first;
            v.push_back(e);

            if (v.size() >= ZDL_VEC_SIZE)
            {
              resultA->insert(v);
              v.clear();
            }
          }
        }

        if (v.size() > 0)
          resultA->insert(v);

        resultA->endOfInput();
      }
      else
      {
        while (dlA->next(id, &se))
        {
          if (se.second != CPNULLSTRMARK)
          {
            e.first = se.first;
            resultA->insert(e);
          }
        }

        resultA->endOfInput();
      }

      id = dlB->getIterator();

      if (bdl2)
      {
        v.clear();

        while (dlB->next(id, &se))
        {
          if (se.second != CPNULLSTRMARK)
          {
            e.first = se.first;
            v.push_back(e);

            if (v.size() >= ZDL_VEC_SIZE)
            {
              resultB->insert(v);
              v.clear();
            }
          }
        }

        if (v.size() > 0)
          resultB->insert(v);

        resultB->endOfInput();
      }
      else
      {
        while (dlB->next(id, &se))
        {
          if (se.second != CPNULLSTRMARK)
          {
            e.first = se.first;
            resultB->insert(e);
          }
        }

        resultB->endOfInput();
      }
    } // try
    catch (const logging::LargeDataListExcept& ex)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught LDL error: " << ex.what();
      errorLogging(errMsg.str());
      unblockDatalists(logging::stringHashJoinStepLargeDataListFileErr);
      dlA->endOfInput();
      dlB->endOfInput();
    }
    catch (const exception& ex)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught: " << ex.what();
      errorLogging(errMsg.str());
      unblockDatalists(logging::stringHashJoinStepErr);
      dlA->endOfInput();
      dlB->endOfInput();
    }
    catch (...)
    {
      ostringstream errMsg;
      errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught unknown exception";
      errorLogging(errMsg.str());
      unblockDatalists(logging::stringHashJoinStepErr);
      dlA->endOfInput();
      dlB->endOfInput();
    }

    gettimeofday(&end_time, 0);

    if (fTableOID2 >= 3000)
      dlTimes.setEndOfInputTime();

    if (hj)
    {
      //..hashWorkTime is the time to perform the hash join excluding the
      //  output insertion time. insertWorkTime is the total of both insert
      //  times. The end result is that createWorkTime + hashWorkTime +
      //  insertWorkTime roughly equates to the total work time.
      createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
      hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) - hj->getTimeSet()->totalTime(insertResultsStr);
      insertWorkTime =
          hj->getTimeSet()->totalTime(insertResultsStr) + hj->getTimeSet()->totalTime(insertLastResultsStr);
    }

  } // (fInputJobStepAssociation.status() == 0)

  if (fTableOID2 >= 3000 && traceOn())
  {
    time_t finTime = time(0);
    char finTimeString[50];
    ctime_r(&finTime, finTimeString);
    finTimeString[strlen(finTimeString) - 1] = '\0';

    ostringstream logStr;
    logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << finTimeString << "; 1st read "
           << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << endl
           << "\tStringHashJoinStep::run: output sizes: " << dlA->totalSize() << "/" << dlB->totalSize()
           << " [";

    if (bdl1 && bdl2)
      logStr << bdl1->totalSize() << "/" << bdl2->totalSize();

    logStr << "] run time: " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
           << fixed << setprecision(2) << "s\n\tTotal work times: create hash: " << createWorkTime
           << "s, hash join: " << hashWorkTime << "s, insert results: " << insertWorkTime << "s\n"
           << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
    logEnd(logStr.str().c_str());

    syslogProcessingTimes(16,         // exemgr subsystem
                          start_time, // use join start time for first read time
                          end_time,   // use join end time for last read time
                          start_time, // use join start time for first write time
                          end_time);  // use join end time for last write time
    syslogEndStep(16, // exemgr subsystem
                  0,  // no blocked datalist input to report
                  0); // no blocked datalist output to report
  }

  delete hj;
  delete tAp;
  delete tBp;
  delete dlA;
  delete dlB;
}

const string StringHashJoinStep::toString() const
{
  ostringstream oss;
  CalpontSystemCatalog::OID oid1 = 0;
  CalpontSystemCatalog::OID oid2 = 0;

  size_t idlsz = fInputJobStepAssociation.outSize();
  idbassert(idlsz == 2);
  DataList<StringElementType>* dl1 = fInputJobStepAssociation.outAt(0)->stringDataList();

  if (dl1)
    oid1 = dl1->OID();

  DataList<StringElementType>* dl2 = fInputJobStepAssociation.outAt(1)->stringDataList();

  if (dl2)
    oid2 = dl2->OID();

  oss << "StringHashJoinStep ses:" << fSessionId << " st:" << fStepId;
  oss << omitOidInDL;
  oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
  oss << " " << fInputJobStepAssociation.outAt(0);
  oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
  oss << " " << fInputJobStepAssociation.outAt(1);

  idlsz = fOutputJobStepAssociation.outSize();
  idbassert(idlsz == 2);
  DataList_t* dl3 = fOutputJobStepAssociation.outAt(0)->dataList();

  if (dl3)
    oid1 = dl3->OID();

  DataList_t* dl4 = fOutputJobStepAssociation.outAt(1)->dataList();

  if (dl4)
    oid2 = dl4->OID();

  oss << endl << " ";
  oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
  oss << " " << fOutputJobStepAssociation.outAt(0);
  oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
  oss << " " << fOutputJobStepAssociation.outAt(1);

  return oss.str();
}

} // namespace joblist