/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: largehashjoin.cpp 9655 2013-06-25 23:08:13Z xlou $
#include <string>
#include <sstream>
#include <fstream>
#include <list>
#include <vector>
#include <typeinfo>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <ctime>
#include <sys/time.h>
#include <iomanip>
using namespace std;
#include <boost/thread/mutex.hpp>
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "jobstep.h"
#include "largehashjoin.h"
#include "elementtype.h"
using namespace joblist;
#include "mcsconfig.h"
boost::mutex fileLock_g;
namespace
{
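// Append per-bucket disk I/O statistics (bytes, timestamps, duration, throughput)
// for the given datalist to trace/umdiskio.log. Only bucket- and zone-based
// datalists are logged; any other datalist type is silently skipped.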
void logDiskIoInfo(uint64_t stepId, const AnyDataListSPtr& spdl)
{
boost::mutex::scoped_lock lk(fileLock_g);
string umDiskioLog = string(MCSLOGDIR) + "/trace/umdiskio.log";
string umDiskioBak = string(MCSLOGDIR) + "/trace/umdiskio.bak";
ofstream umDiskIoFile(umDiskioLog.c_str(), ios_base::app);
CalpontSystemCatalog::OID oid;
uint64_t maxBuckets = 0;
list<DiskIoInfo>* infoList = NULL;
string bkt("bkt");
BucketDL<ElementType>* bdl = spdl->bucketDL();
BucketDL<StringElementType>* sbdl = spdl->stringBucketDL();
ZDL<ElementType>* zdl = spdl->zonedDL();
ZDL<StringElementType>* szdl = spdl->stringZonedDL();
if (bdl != NULL)
{
maxBuckets = bdl->bucketCount();
oid = bdl->OID();
}
else if (zdl != NULL)
{
maxBuckets = zdl->bucketCount();
oid = zdl->OID();
bkt = "zdl";
}
else if (sbdl != NULL)
{
maxBuckets = sbdl->bucketCount();
oid = sbdl->OID();
}
else if (szdl != NULL)
{
maxBuckets = szdl->bucketCount();
oid = szdl->OID();
bkt = "zdl";
}
else
{
// not logged for now.
return;
}
for (uint64_t i = 0; i < maxBuckets; i++)
{
if (bdl)
infoList = &(bdl->diskIoInfoList(i));
else if (zdl)
infoList = &(zdl->diskIoInfoList(i));
else if (sbdl)
infoList = &(sbdl->diskIoInfoList(i));
else if (szdl)
infoList = &(szdl->diskIoInfoList(i));
for (list<DiskIoInfo>::iterator j = infoList->begin(); j != infoList->end(); j++)
{
boost::posix_time::time_duration td = j->fEnd - j->fStart;
int64_t usecs = td.total_microseconds();
umDiskIoFile << setfill('0') << "st" << setw(2) << stepId << "oid" << oid << bkt << setw(3) << i
<< (j->fWrite ? " writes " : " reads ") << setw(7) << setfill(' ') << j->fBytes
<< " bytes, at " << j->fStart << " duration " << usecs << " mcs @ "
<< ((usecs > 0) ? (j->fBytes / usecs) : 0) << "MB/s" << endl; // guard against zero-duration I/O
}
}
streampos curPos = umDiskIoFile.tellp();
umDiskIoFile.close();
// Rotate: move the current log to .bak once it grows past 0.5 GB, capping the total log at 1 GB.
if (curPos > 0x20000000)
{
string cmd = "/bin/mv " + umDiskioLog + " " + umDiskioBak;
(void)system(cmd.c_str());
}
}
} // namespace
namespace joblist
{
const uint64_t ZDL_VEC_SIZE = 4096;
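// Batch size for inserts into zoned datalists: elements are buffered into a
// vector and inserted ZDL_VEC_SIZE at a time rather than one element per call.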
//@bug 686. Run the hash join in a separate thread and return to the caller
// immediately, so the other job steps can start.
struct HJRunner
{
HJRunner(LargeHashJoin* p) : joiner(p)
{
}
LargeHashJoin* joiner;
void operator()()
{
try
{
joiner->doHashJoin();
}
catch (std::exception& e)
{
ostringstream errMsg;
errMsg << "HJRunner caught: " << e.what();
joiner->errorLogging(errMsg.str());
joiner->unblockDatalists(logging::largeHashJoinErr);
}
catch (...)
{
string msg("HJRunner caught something not an exception!");
joiner->errorLogging(msg);
joiner->unblockDatalists(logging::largeHashJoinErr);
}
}
};
struct StringHJRunner
{
StringHJRunner(StringHashJoinStep* p) : joiner(p)
{
}
StringHashJoinStep* joiner;
void operator()()
{
try
{
joiner->doStringHashJoin();
}
catch (std::exception& e)
{
ostringstream errMsg;
errMsg << "StringHJRunner caught: " << e.what();
joiner->errorLogging(errMsg.str());
joiner->unblockDatalists(logging::stringHashJoinStepErr);
}
catch (...)
{
string msg("StringHJRunner caught something not an exception!");
joiner->errorLogging(msg);
joiner->unblockDatalists(logging::stringHashJoinStepErr);
}
}
};
// Thread function used by HashJoin. Each thread processes a contiguous range of
// buckets: for every bucket it hashes the smaller of the two input sets, probes
// that hash table with the larger set, and writes matches (plus unmatched rows
// for outer joins, via the sendAllHashSet/sendAllSearchSet flags) to the result
// datalists.
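// A minimal sketch of the build/probe pattern used below (illustrative only;
// the real table type is HashJoin<e_t>::hash_t, and emit() stands in for the
// datalist inserts):
//
// std::multimap<uint64_t, uint64_t> ht; // join value -> RID
// for (const auto& e : smallerSide) // build
// ht.insert({e.second, e.first});
// for (const auto& e : largerSide) // probe
// {
// auto range = ht.equal_range(e.second);
// for (auto it = range.first; it != range.second; ++it)
// emit(it->second, e.second);
// }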
template <typename e_t>
void* HashJoinByBucket_thr(void* arg)
{
typename HashJoin<e_t>::thrParams_t* params = (typename HashJoin<e_t>::thrParams_t*)arg;
HashJoin<e_t>* hjPtr = params->hjptr;
const uint32_t thrIdx = params->thrIdx;
long set1Size = 0;
long set2Size = 0;
bool sendAllHashSet = false;
bool sendAllSearchSet = false;
try
{
for (uint32_t idx = 0, bucketIdx = params->startBucket; idx < params->numBuckets; idx++, bucketIdx++)
{
#ifdef DEBUG
cout << "\tJoinByBucket() thr " << dec << thrIdx << " bkt " << bucketIdx << "/"
<< hjPtr->Set1()->bucketCount() << "/" << params->numBuckets << endl;
#endif
JoinType joinType = hjPtr->getJoinType();
set1Size = hjPtr->Set1()->size(bucketIdx);
set2Size = hjPtr->Set2()->size(bucketIdx);
if (set1Size <= 0 && set2Size <= 0)
{
continue;
}
else
{
if (set1Size > set2Size)
{
hjPtr->setSearchSet(hjPtr->Set1()->getBDL(), thrIdx);
hjPtr->setHashSet(hjPtr->Set2()->getBDL(), thrIdx);
hjPtr->setSearchResult(hjPtr->Result1(), thrIdx);
hjPtr->setHashResult(hjPtr->Result2(), thrIdx);
sendAllHashSet = (joinType == RIGHTOUTER);
sendAllSearchSet = (joinType == LEFTOUTER);
}
else
{
hjPtr->setHashSet(hjPtr->Set1()->getBDL(), thrIdx);
hjPtr->setSearchSet(hjPtr->Set2()->getBDL(), thrIdx);
hjPtr->setHashResult(hjPtr->Result1(), thrIdx);
hjPtr->setSearchResult(hjPtr->Result2(), thrIdx);
sendAllHashSet = (joinType == LEFTOUTER);
sendAllSearchSet = (joinType == RIGHTOUTER);
} // if set1Size > set2Size ...
} // if set1Size <=0 . . .
params->timeset.setTimer(createHashStr);
hjPtr->createHash(hjPtr->HashSet(thrIdx), hjPtr->HashTable(thrIdx), bucketIdx, sendAllHashSet,
hjPtr->HashResult(thrIdx), params->dlTimes, params->die);
params->timeset.holdTimer(createHashStr);
#ifdef DEBUG
long hashSetTotal = 0;
long searchSetTotal = 0;
for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
hashSetTotal += hjPtr->HashSet(thrIdx)->size(j); // hash sets are BucketDLs
for (uint32_t j = 0; j < hjPtr->SearchSet(thrIdx)->bucketCount(); j++)
searchSetTotal += hjPtr->SearchSet(thrIdx)->size(j); // can be any datalist
cout << "\t\tJoinByBucket() thr " << dec << thrIdx << " bkt " << bucketIdx << " hashSize "
<< hashSetTotal << " searchSize " << searchSetTotal << endl;
#endif
bool more;
e_t e;
e_t e2;
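// InvalidRID marks a hash-table entry as already matched: when the same join
// value shows up again on the search side, the hash-side rows are not
// re-emitted into the output datalist.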
const uint64_t InvalidRID = static_cast<uint64_t>(-1);
int iter = hjPtr->SearchSet(thrIdx)->getIterator(bucketIdx);
ZonedDL* zdl1 = dynamic_cast<ZonedDL*>(hjPtr->SearchResult(thrIdx));
ZonedDL* zdl2 = dynamic_cast<ZonedDL*>(hjPtr->HashResult(thrIdx));
vector<e_t> vec1;
vector<e_t> vec2;
std::pair<typename HashJoin<e_t>::hashIter_t, typename HashJoin<e_t>::hashIter_t> hashItPair;
typename HashJoin<e_t>::hashIter_t hashIt;
typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
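// Probe phase: scan the (larger) search set and look up each element's join
// value in the hash table built from the smaller set.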
params->timeset.setTimer(hashJoinStr);
for (more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e); more && !(*params->die);
more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e))
{
// If sendAllSearchSet=true, keep all of the search set. This handles an outer
// join such as
// col1 = col2 (+)
// where col1 is the SearchSet and col2 is the HashSet: we include all of col1
// regardless of whether there is a matching col2.
if (sendAllSearchSet)
{
if (zdl1)
{
vec1.push_back(e);
if (vec1.size() >= ZDL_VEC_SIZE)
{
params->timeset.setTimer(insertResultsStr);
hjPtr->SearchResult(thrIdx)->insert(vec1);
vec1.clear();
params->timeset.holdTimer(insertResultsStr);
}
}
else
hjPtr->SearchResult(thrIdx)->insert(e);
}
hashIt = ht->find(e.second);
if (hashIt != ht->end())
{
#ifdef DEBUG
if (hjPtr->SearchResult(thrIdx)->OID() >= 3000)
cout << "JoinByBucket() SearchResult add " << bucketIdx << " [" << e.first << "][" << e.second
<< "]" << endl;
uint32_t a = 0;
e_t b = e_t();
#endif
// If sendAllSearchSet=false, we already added the search result
// before the if condition above.
if (!sendAllSearchSet)
{
if (zdl1)
{
vec1.push_back(e);
if (vec1.size() >= ZDL_VEC_SIZE)
{
params->timeset.setTimer(insertResultsStr);
hjPtr->SearchResult(thrIdx)->insert(vec1);
vec1.clear();
params->timeset.holdTimer(insertResultsStr);
}
}
else
hjPtr->SearchResult(thrIdx)->insert(e);
}
// If sendAllHashSet=false, add the hash results to the output datalist.
// If it is a left outer join, then we already added all of the right side rows
// in the bucket via the createHash call earlier in this function.
if (!sendAllHashSet)
{
// If the matching pair has its RID set to invalid, it has already been encountered,
// so there is no reason to add it to the output datalist or keep searching for more matches.
if (hashIt->second != InvalidRID)
{
hashItPair = ht->equal_range(e.second);
for (hashIt = hashItPair.first; hashIt != hashItPair.second; hashIt++)
{
e2.first = hashIt->second;
e2.second = e.second;
if (zdl2)
{
vec2.push_back(e2);
if (vec2.size() >= ZDL_VEC_SIZE)
{
params->timeset.setTimer(insertResultsStr);
hjPtr->HashResult(thrIdx)->insert(vec2);
vec2.clear();
params->timeset.holdTimer(insertResultsStr);
}
}
else
hjPtr->HashResult(thrIdx)->insert(e2);
#ifdef DEBUG
a++;
b = e2;
#endif
// Set the RID to the invalid RID now that it's been matched and added to the
// output datalist. This keeps us from duplicating it if it is matched again.
hashIt->second = InvalidRID;
}
#ifdef DEBUG
cout << "\t\tadded " << b << " " << a << " times" << endl << endl;
#endif
}
}
} // if hashIt != ht->end()
} // for ( hjPtr...
params->timeset.holdTimer(hashJoinStr);
params->timeset.setTimer(insertLastResultsStr);
if (vec1.size() != 0)
{
hjPtr->SearchResult(thrIdx)->insert(vec1);
vec1.clear();
}
if (vec2.size() != 0)
{
hjPtr->HashResult(thrIdx)->insert(vec2);
vec2.clear();
}
params->timeset.holdTimer(insertLastResultsStr);
ht->clear();
} // for (bucketIdx...
} // try
// We don't have to call JSA.endOfInput() for this exception, because
// the parent thread takes care of that in performThreadedJoin().
catch (const logging::LargeDataListExcept& ex)
{
ostringstream errMsg;
if (typeid(e_t) == typeid(StringElementType))
{
errMsg << "HashJoinByBucket_thr<String>: caught LDL error: " << ex.what();
hjPtr->status(logging::stringHashJoinStepLargeDataListFileErr);
}
else
{
errMsg << "HashJoinByBucket_thr: caught LDL error: " << ex.what();
hjPtr->status(logging::largeHashJoinLargeDataListFileErr);
}
cerr << errMsg.str() << endl;
catchHandler(errMsg.str(), hjPtr->sessionId());
}
catch (const exception& ex)
{
ostringstream errMsg;
if (typeid(e_t) == typeid(StringElementType))
{
errMsg << "HashJoinByBucket_thr<String>: caught: " << ex.what();
hjPtr->status(logging::stringHashJoinStepErr);
}
else
{
errMsg << "HashJoinByBucket_thr: caught: " << ex.what();
hjPtr->status(logging::largeHashJoinErr);
}
cerr << errMsg.str() << endl;
catchHandler(errMsg.str(), hjPtr->sessionId());
}
catch (...)
{
ostringstream errMsg;
if (typeid(e_t) == typeid(StringElementType))
{
errMsg << "HashJoinByBucket_thr<String>: caught unknown exception: ";
hjPtr->status(logging::stringHashJoinStepErr);
}
else
{
errMsg << "HashJoinByBucket_thr: caught unknown exception";
hjPtr->status(logging::largeHashJoinErr);
}
cerr << errMsg.str() << endl;
catchHandler(errMsg.str(), hjPtr->sessionId());
}
return NULL;
} // HashJoinByBucket_thr
LargeHashJoin::LargeHashJoin(JoinType joinType, uint32_t sessionId, uint32_t txnId, uint32_t statementId,
ResourceManager* rm)
: fSessionId(sessionId)
, fTxnId(txnId)
, fStepId(0)
, fStatementId(statementId)
, fTableOID1(0)
, fTableOID2(0)
, fJoinType(joinType)
, fRm(rm)
, fAlias1()
, fAlias2()
{
}
LargeHashJoin::~LargeHashJoin()
{
if (traceOn())
{
for (uint64_t i = 0; i < fInputJobStepAssociation.outSize(); i++)
logDiskIoInfo(fStepId, fInputJobStepAssociation.outAt(i));
for (uint64_t i = 0; i < fOutputJobStepAssociation.outSize(); i++)
logDiskIoInfo(fStepId, fOutputJobStepAssociation.outAt(i));
}
}
void LargeHashJoin::join()
{
runner->join();
}
void LargeHashJoin::run()
{
if (traceOn())
{
syslogStartStep(16, // exemgr subsystem
std::string("LargeHashJoin")); // step name
}
runner.reset(new boost::thread(HJRunner(this)));
}
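// Mark both output datalists with the given error status and signal
// end-of-input so any consumers blocked on them can proceed.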
void LargeHashJoin::unblockDatalists(uint16_t status)
{
fOutputJobStepAssociation.status(status);
fOutputJobStepAssociation.outAt(0)->dataList()->endOfInput();
fOutputJobStepAssociation.outAt(1)->dataList()->endOfInput();
}
void LargeHashJoin::errorLogging(const string& msg) const
{
ostringstream errMsg;
errMsg << "Step " << stepId() << "; " << msg;
cerr << errMsg.str() << endl;
catchHandler(errMsg.str(), sessionId());
}
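// Main body of the join. Both inputs must already be BucketDataLists that were
// hash-partitioned on the join key; they are wrapped in BDLWrappers and handed
// to HashJoin<ElementType>::performJoin, which joins the matching buckets in
// parallel. Timing stats are collected afterwards for trace logging.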
void LargeHashJoin::doHashJoin()
{
string val;
idbassert(fInputJobStepAssociation.outSize() >= 2);
idbassert(fOutputJobStepAssociation.outSize() >= 2);
BucketDataList* Ap = 0;
BucketDataList* Bp = 0;
BucketDataList* tAp = 0;
BucketDataList* tBp = 0;
DataList_t* inDL1 = 0;
DataList_t* inDL2 = 0;
inDL1 = fInputJobStepAssociation.outAt(0)->dataList();
inDL2 = fInputJobStepAssociation.outAt(1)->dataList();
idbassert(inDL1);
idbassert(inDL2);
HashJoin<ElementType>* hj = 0;
double createWorkTime = 0;
double hashWorkTime = 0;
double insertWorkTime = 0;
DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();
if (0 < fInputJobStepAssociation.status())
{
unblockDatalists(fInputJobStepAssociation.status());
}
else
{
string currentAction("preparing join");
try
{
// If we're given BucketDL's, use them
if (typeid(*inDL1) == typeid(BucketDataList))
{
if (typeid(*inDL2) != typeid(BucketDataList))
{
throw logic_error("LargeHashJoin::run: expected either 0 or 2 BucketDL's!");
}
Ap = dynamic_cast<BucketDataList*>(inDL1);
Bp = dynamic_cast<BucketDataList*>(inDL2);
}
else
{
throw logic_error("HashJoin will take only BucketDLs as inputs");
// Unreachable while the throw above stands; kept from the pre-BucketDL path.
int maxBuckets = fRm->getHjMaxBuckets();
joblist::ridtype_t maxElems = fRm->getHjMaxElems();
tAp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
tBp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
tAp->setHashMode(1);
tBp->setHashMode(1);
ElementType element;
int id;
id = inDL1->getIterator();
while (inDL1->next(id, &element))
{
tAp->insert(element);
}
tAp->endOfInput();
id = inDL2->getIterator();
while (inDL2->next(id, &element))
{
tBp->insert(element);
}
tBp->endOfInput();
Ap = tAp;
Bp = tBp;
}
unsigned numThreads = fRm->getHjNumThreads();
BDLWrapper<ElementType> setA(Ap);
BDLWrapper<ElementType> setB(Bp);
hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, fJoinType, &dlTimes,
fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
if (fTableOID2 >= 3000)
{
ostringstream logStr2;
logStr2 << "LargeHashJoin::run: ses:" << fSessionId << " st:" << fStepId
<< " input sizes: " << setA.size() << "/" << setB.size() << endl;
cout << logStr2.str();
}
currentAction = "performing join";
if (fTableOID2 >= 3000)
{
dlTimes.setFirstReadTime();
dlTimes.setEndOfInputTime(dlTimes.FirstReadTime());
}
hj->performJoin(numThreads);
} // try
catch (const logging::LargeDataListExcept& ex)
{
ostringstream errMsg;
errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught LDL error: " << ex.what();
errorLogging(errMsg.str());
unblockDatalists(logging::largeHashJoinLargeDataListFileErr);
}
catch (const exception& ex)
{
ostringstream errMsg;
errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught: " << ex.what();
errorLogging(errMsg.str());
unblockDatalists(logging::largeHashJoinErr);
}
catch (...)
{
ostringstream errMsg;
errMsg << __FILE__ << " doHashJoin: " << currentAction << ", caught unknown exception";
errorLogging(errMsg.str());
unblockDatalists(logging::largeHashJoinErr);
}
if (hj)
{
//..hashWorkTime is the time to perform the hash join, excluding the
// output insertion time. insertWorkTime is the sum of both insert times.
// The end result is that createWorkTime + hashWorkTime + insertWorkTime
// roughly equates to the total work time.
createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) - hj->getTimeSet()->totalTime(insertResultsStr);
insertWorkTime =
hj->getTimeSet()->totalTime(insertResultsStr) + hj->getTimeSet()->totalTime(insertLastResultsStr);
}
} // (fInputJobStepAssociation.status() == 0)
if (fTableOID2 >= 3000 && traceOn())
{
time_t finTime = time(0);
char finTimeString[50];
ctime_r(&finTime, finTimeString);
finTimeString[strlen(finTimeString) - 1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << finTimeString << "; 1st read "
<< dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << endl
<< "\tLargeHashJoin::run: output sizes: " << resultA->totalSize() << "/" << resultB->totalSize()
<< " run time: " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< fixed << setprecision(2) << "s\n\tTotal work times: create hash: " << createWorkTime
<< "s, hash join: " << hashWorkTime << "s, insert results: " << insertWorkTime << "s\n"
<< "\tJob completion status " << fInputJobStepAssociation.status() << endl;
logEnd(logStr.str().c_str());
syslogProcessingTimes(16, // exemgr subsystem
dlTimes.FirstReadTime(), // use join start time for first read time
dlTimes.EndOfInputTime(), // use join end time for last read time
dlTimes.FirstReadTime(), // use join start time for first write time
dlTimes.EndOfInputTime()); // use join end time for last write time
syslogEndStep(16, // exemgr subsystem
0, // no blocked datalist input to report
0); // no blocked datalist output to report
}
delete hj;
delete tAp;
delete tBp;
}
const string LargeHashJoin::toString() const
{
ostringstream oss;
CalpontSystemCatalog::OID oid1 = 0;
CalpontSystemCatalog::OID oid2 = 0;
DataList_t* dl1;
DataList_t* dl2;
size_t idlsz;
idlsz = fInputJobStepAssociation.outSize();
idbassert(idlsz == 2);
dl1 = fInputJobStepAssociation.outAt(0)->dataList();
if (dl1)
oid1 = dl1->OID();
dl2 = fInputJobStepAssociation.outAt(1)->dataList();
if (dl2)
oid2 = dl2->OID();
oss << "LargeHashJoin ses:" << fSessionId << " st:" << fStepId;
oss << omitOidInDL;
oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
oss << " " << fInputJobStepAssociation.outAt(0);
oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
oss << " " << fInputJobStepAssociation.outAt(1);
idlsz = fOutputJobStepAssociation.outSize();
idbassert(idlsz == 2);
dl1 = fOutputJobStepAssociation.outAt(0)->dataList();
if (dl1)
oid1 = dl1->OID();
dl2 = fOutputJobStepAssociation.outAt(1)->dataList();
if (dl2)
oid2 = dl2->OID();
oss << endl << " ";
oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
oss << " " << fOutputJobStepAssociation.outAt(0);
oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
oss << " " << fOutputJobStepAssociation.outAt(1) << endl;
return oss.str();
}
StringHashJoinStep::StringHashJoinStep(JoinType joinType, uint32_t sessionId, uint32_t txnId,
uint32_t statementId, ResourceManager* rm)
: LargeHashJoin(joinType, sessionId, txnId, statementId, rm)
{
}
StringHashJoinStep::~StringHashJoinStep()
{
}
void StringHashJoinStep::run()
{
if (traceOn())
{
syslogStartStep(16, // exemgr subsystem
std::string("StringHashJoinStep")); // step name
}
runner.reset(new boost::thread(StringHJRunner(this)));
}
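// String variant of doHashJoin. The join runs on StringElementTypes into two
// intermediate StringZonedDLs; afterwards each surviving row is converted to
// an ElementType (keeping only the RID), and rows carrying the "_CpNuLl_"
// null marker are dropped before insertion into the real output datalists.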
void StringHashJoinStep::doStringHashJoin()
{
string val;
idbassert(fInputJobStepAssociation.outSize() >= 2);
idbassert(fOutputJobStepAssociation.outSize() >= 2);
DataList<StringElementType>* inDL1 = fInputJobStepAssociation.outAt(0)->stringDataList();
DataList<StringElementType>* inDL2 = fInputJobStepAssociation.outAt(1)->stringDataList();
idbassert(inDL1);
idbassert(inDL2);
BucketDL<StringElementType>* Ap = 0;
BucketDL<StringElementType>* Bp = 0;
BucketDL<StringElementType>* tAp = 0;
BucketDL<StringElementType>* tBp = 0;
HashJoin<StringElementType>* hj = 0;
double createWorkTime = 0;
double hashWorkTime = 0;
double insertWorkTime = 0;
DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();
struct timeval start_time;
gettimeofday(&start_time, 0);
struct timeval end_time = start_time;
ZonedDL* bdl1 = 0;
ZonedDL* bdl2 = 0;
// The result from a hashjoin step is expected to be a BandedDataList, but
// HashJoin<StringElementType> returns a StringDataList. Also, nulls are
// reported as "_CpNuLl_" by pDictionaryStep. Create two string datalists
// for the intermediate results.
// @bug 721. use zdl.
StringZonedDL* dlA = new StringZonedDL(1, fRm);
dlA->setMultipleProducers(true);
StringZonedDL* dlB = new StringZonedDL(1, fRm);
dlB->setMultipleProducers(true);
if (0 < fInputJobStepAssociation.status())
{
unblockDatalists(fInputJobStepAssociation.status());
}
else
{
string currentAction("preparing join");
try
{
// If we're given BucketDL's, use them
if (typeid(*inDL1) == typeid(BucketDL<StringElementType>))
{
if (typeid(*inDL2) != typeid(BucketDL<StringElementType>))
{
throw logic_error("StringHashJoinStep::run: expected either 0 or 2 BucketDL's!");
}
Ap = dynamic_cast<BucketDL<StringElementType>*>(inDL1);
Bp = dynamic_cast<BucketDL<StringElementType>*>(inDL2);
}
else
{
int maxBuckets = fRm->getHjMaxBuckets();
joblist::ridtype_t maxElems = fRm->getHjMaxElems();
tAp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
tBp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
tAp->setHashMode(1);
tBp->setHashMode(1);
StringElementType element;
int id = inDL1->getIterator();
while (inDL1->next(id, &element))
{
tAp->insert(element);
}
tAp->endOfInput();
id = inDL2->getIterator();
while (inDL2->next(id, &element))
{
tBp->insert(element);
}
tBp->endOfInput();
Ap = tAp;
Bp = tBp;
}
unsigned numThreads = fRm->getHjNumThreads();
BDLWrapper<StringElementType> setA(Ap);
BDLWrapper<StringElementType> setB(Bp);
// Assign to the outer hj (a local declaration here would shadow it, leak the
// object, and defeat the "if (hj)" timing block and delete below).
hj = new HashJoin<StringElementType>(setA, setB, dlA, dlB, fJoinType, &dlTimes,
fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
if ((dlA == NULL) || (dlB == NULL) || (hj == NULL))
{
ostringstream oss;
oss << "StringHashJoinStep::run() null pointer from new -- ";
oss << "StringDataList A(0x" << hex << (ptrdiff_t)dlA << "), B(0x" << (ptrdiff_t)dlB
<< "), HashJoin hj(0x" << (ptrdiff_t)hj << ")";
throw(runtime_error(oss.str().c_str()));
}
// leave this in
if (fTableOID2 >= 3000)
{
ostringstream logStr2;
logStr2 << "StringHashJoinStep::run: ses:" << fSessionId << " st:" << fStepId
<< " input sizes: " << setA.size() << "/" << setB.size() << endl;
cout << logStr2.str();
}
currentAction = "performing join";
if (fTableOID2 >= 3000)
{
dlTimes.setFirstReadTime();
dlTimes.setEndOfInputTime(dlTimes.FirstReadTime());
}
hj->performJoin(numThreads);
currentAction = "after join";
// convert from StringElementType to ElementType by grabbing the rid
// take _CpNuLl_ out of the result
StringElementType se;
ElementType e;
int id = dlA->getIterator();
bdl1 = dynamic_cast<ZonedDL*>(resultA);
bdl2 = dynamic_cast<ZonedDL*>(resultB);
vector<ElementType> v;
v.reserve(ZDL_VEC_SIZE);
if (bdl1)
{
while (dlA->next(id, &se))
{
if (se.second != CPNULLSTRMARK)
{
e.first = se.first;
v.push_back(e);
if (v.size() >= ZDL_VEC_SIZE)
{
resultA->insert(v);
v.clear();
}
}
}
if (v.size() > 0)
resultA->insert(v);
resultA->endOfInput();
}
else
{
while (dlA->next(id, &se))
{
if (se.second != CPNULLSTRMARK)
{
e.first = se.first;
resultA->insert(e);
}
}
resultA->endOfInput();
}
id = dlB->getIterator();
if (bdl2)
{
v.clear();
while (dlB->next(id, &se))
{
if (se.second != CPNULLSTRMARK)
{
e.first = se.first;
v.push_back(e);
if (v.size() >= ZDL_VEC_SIZE)
{
resultB->insert(v);
v.clear();
}
}
}
if (v.size() > 0)
resultB->insert(v);
resultB->endOfInput();
}
else
{
while (dlB->next(id, &se))
{
if (se.second != CPNULLSTRMARK)
{
e.first = se.first;
resultB->insert(e);
}
}
resultB->endOfInput();
}
} // try
catch (const logging::LargeDataListExcept& ex)
{
ostringstream errMsg;
errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught LDL error: " << ex.what();
errorLogging(errMsg.str());
unblockDatalists(logging::stringHashJoinStepLargeDataListFileErr);
dlA->endOfInput();
dlB->endOfInput();
}
catch (const exception& ex)
{
ostringstream errMsg;
errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught: " << ex.what();
errorLogging(errMsg.str());
unblockDatalists(logging::stringHashJoinStepErr);
dlA->endOfInput();
dlB->endOfInput();
}
catch (...)
{
ostringstream errMsg;
errMsg << __FILE__ << " doStringHashJoin: " << currentAction << ", caught unknown exception";
errorLogging(errMsg.str());
unblockDatalists(logging::stringHashJoinStepErr);
dlA->endOfInput();
dlB->endOfInput();
}
gettimeofday(&end_time, 0);
if (fTableOID2 >= 3000)
dlTimes.setEndOfInputTime();
if (hj)
{
//..hashWorkTime is the time to perform the hash join, excluding the
// output insertion time. insertWorkTime is the sum of both insert times.
// The end result is that createWorkTime + hashWorkTime + insertWorkTime
// roughly equates to the total work time.
createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) - hj->getTimeSet()->totalTime(insertResultsStr);
insertWorkTime =
hj->getTimeSet()->totalTime(insertResultsStr) + hj->getTimeSet()->totalTime(insertLastResultsStr);
}
} // (fInputJobStepAssociation.status() == 0)
if (fTableOID2 >= 3000 && traceOn())
{
time_t finTime = time(0);
char finTimeString[50];
ctime_r(&finTime, finTimeString);
finTimeString[strlen(finTimeString) - 1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << finTimeString << "; 1st read "
<< dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << endl
<< "\tStringHashJoinStep::run: output sizes: " << dlA->totalSize() << "/" << dlB->totalSize()
<< " [";
if (bdl1 && bdl2)
logStr << bdl1->totalSize() << "/" << bdl2->totalSize();
logStr << "] run time: " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< fixed << setprecision(2) << "s\n\tTotal work times: create hash: " << createWorkTime
<< "s, hash join: " << hashWorkTime << "s, insert results: " << insertWorkTime << "s\n"
<< "\tJob completion status " << fInputJobStepAssociation.status() << endl;
logEnd(logStr.str().c_str());
syslogProcessingTimes(16, // exemgr subsystem
start_time, // use join start time for first read time
end_time, // use join end time for last read time
start_time, // use join start time for first write time
end_time); // use join end time for last write time
syslogEndStep(16, // exemgr subsystem
0, // no blocked datalist input to report
0); // no blocked datalist output to report
}
delete hj;
delete tAp;
delete tBp;
delete dlA;
delete dlB;
}
const string StringHashJoinStep::toString() const
{
ostringstream oss;
CalpontSystemCatalog::OID oid1 = 0;
CalpontSystemCatalog::OID oid2 = 0;
size_t idlsz = fInputJobStepAssociation.outSize();
idbassert(idlsz == 2);
DataList<StringElementType>* dl1 = fInputJobStepAssociation.outAt(0)->stringDataList();
if (dl1)
oid1 = dl1->OID();
DataList<StringElementType>* dl2 = fInputJobStepAssociation.outAt(1)->stringDataList();
if (dl2)
oid2 = dl2->OID();
oss << "StringHashJoinStep ses:" << fSessionId << " st:" << fStepId;
oss << omitOidInDL;
oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
oss << " " << fInputJobStepAssociation.outAt(0);
oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
oss << " " << fInputJobStepAssociation.outAt(1);
idlsz = fOutputJobStepAssociation.outSize();
idbassert(idlsz == 2);
DataList_t* dl3 = fOutputJobStepAssociation.outAt(0)->dataList();
if (dl3)
oid1 = dl3->OID();
DataList_t* dl4 = fOutputJobStepAssociation.outAt(1)->dataList();
if (dl4)
oid2 = dl4->OID();
oss << endl << " ";
oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
oss << " " << fOutputJobStepAssociation.outAt(0);
oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
oss << " " << fOutputJobStepAssociation.outAt(1);
return oss.str();
}
} // namespace joblist