/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

#pragma once

// $Id: largehashjoin.h 9655 2013-06-25 23:08:13Z xlou $
//
// C++ Implementation: hashjoin
//
// Author: Jason Rodriguez <jrodriguez@calpont.com>
//
// Description:
//

/** @file */

#include <sstream>
#include <vector>
#include <list>
#include <tr1/unordered_map>

#include <boost/thread.hpp>
#include <boost/scoped_array.hpp>

#include <sys/time.h>
#include <cassert>

#include "elementtype.h"
#include "bdlwrapper.h"
#include "joblisttypes.h"
#include "hasher.h"
#include "timestamp.h"
#include "timeset.h"
#include "jl_logger.h"

#ifdef PROFILE
extern void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff);
#endif

namespace joblist
{
const std::string createHashStr("create hash");
const std::string hashJoinStr("hash join");
const std::string insertResultsStr("insert results");
const std::string insertLastResultsStr("insert last results");

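// Thread entry point used by HashJoin::performThreadedJoin(): the void* argument is
// a HashJoin<element_t>::thrParams_t* (see the thrParams_t struct below), carrying
// the owning HashJoin, the contiguous bucket range to process, and per-thread
// timing/abort state.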
template <typename element_t>
void* HashJoinByBucket_thr(void* arg);

/** @brief class HjHasher
 *
 * Hash functor for join keys: hashes the raw bytes of an element's key
 * (element_t::second_type) using utils::Hasher.
 */
template <typename element_t>
class HjHasher
{
 private:
  utils::Hasher hasher;

 public:
  uint32_t operator()(const typename element_t::second_type& v) const
  {
    return hasher((const char*)&v, sizeof(typename element_t::second_type));
  }
};

// template specialization for string
template <>
class HjHasher<StringElementType>
{
 private:
  utils::Hasher hasher;

 public:
  uint32_t operator()(const std::string& v) const
  {
    return hasher(v.c_str(), (uint32_t)v.length());
  }
};

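// Usage sketch (not a definitive example; it assumes the caller has already
// bucketized both inputs into BucketDL-backed BDLWrappers with identical bucket
// counts, and that `setA`, `setB`, `outA`, `outB`, `joinType`, `status`, `sessionId`,
// and `numThreads` are supplied by the surrounding job step):
//
//   volatile bool die = false;
//   JSTimeStamp dlTimes;
//   HashJoin<ElementType> hj(setA, setB, outA, outB, joinType, &dlTimes,
//                            status, sessionId, &die);
//   hj.performJoin(numThreads);  // per-bucket hash/probe join across numThreads threads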
/** @brief class HashJoin
 *
 * Bucketized hash join over two BDLWrapper input sets. Joined elements are
 * written to the two result datalists; the work can be split across threads,
 * one contiguous range of buckets per thread.
 */
template <typename element_t>
class HashJoin
{
 public:
  typedef std::list<element_t> hashList_t;
  typedef typename std::list<element_t>::iterator hashListIter_t;

  // @Bug 867 - Changed the unordered_map to an unordered_multimap and changed the value to be RIDs rather
  // than a list of ElementType to reduce memory utilization and to increase the performance of loading the
  // map. typedef std::tr1::unordered_map<typename element_t::second_type, std::list<element_t>,
  // HjHasher<element_t> > hash_t;
  typedef
      typename std::tr1::unordered_multimap<typename element_t::second_type, typename element_t::first_type>
          hash_t;
  typedef typename std::tr1::unordered_multimap<typename element_t::second_type,
                                                typename element_t::first_type>::iterator hashIter_t;
  typedef typename std::tr1::unordered_multimap<typename element_t::second_type,
                                                typename element_t::first_type>::value_type hashPair_t;

  // allow each thread to have its own pointers
  struct control_struct
  {
    hash_t hashTbl;
    BucketDL<element_t>* searchSet;
    BucketDL<element_t>* hashSet;
    DataList<element_t>* searchResult;
    DataList<element_t>* hashResult;
  };

  boost::scoped_array<struct control_struct> controls;  // TODO: needs to be >= HJ threads from
                                                        // Columnstore.xml

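  // Per-thread work order passed to HashJoinByBucket_thr(): the owning HashJoin,
  // the contiguous bucket range [startBucket, startBucket + numBuckets), the
  // thread's index (presumably its slot in `controls`), and timing/abort state.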
  typedef struct thrParams_struct
  {
    HashJoin<element_t>* hjptr;
    uint32_t startBucket;
    uint32_t numBuckets;
    uint32_t thrIdx;
    TimeSet timeset;
    JSTimeStamp dlTimes;
    volatile bool* die;
  } thrParams_t;

  HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
           joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2, JoinType joinType,
           JSTimeStamp* dlTimes, const SErrorInfo& status, uint32_t sessionId, volatile bool* die);

  HashJoin();
  HashJoin(const HashJoin& hj);
  JoinType getJoinType()
  {
    return fJoinType;
  }
  virtual ~HashJoin();
  virtual int performJoin(const uint32_t thrCount = 1);
  virtual int performThreadedJoin(const uint32_t numThreads);

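  // Accessors for the two join inputs and the two joined outputs.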
  joblist::BDLWrapper<element_t>* Set1()
  {
    return &fSet1;
  }
  joblist::BDLWrapper<element_t>* Set2()
  {
    return &fSet2;
  }
  joblist::DataList<element_t>* Result1()
  {
    return fResult1;
  }
  joblist::DataList<element_t>* Result2()
  {
    return fResult2;
  }

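  // Per-thread getters/setters: each one reads or writes the thread's slot in `controls`.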
  void setSearchSet(BucketDL<element_t>* bdl, const uint32_t idx)
  {
    controls[idx].searchSet = bdl;
  }
  void setHashSet(BucketDL<element_t>* bdl, const uint32_t idx)
  {
    controls[idx].hashSet = bdl;
  }

  BucketDL<element_t>* SearchSet(const uint32_t idx)
  {
    return controls[idx].searchSet;
  }
  BucketDL<element_t>* HashSet(const uint32_t idx)
  {
    return controls[idx].hashSet;
  }

  void setSearchResult(DataList<element_t>* bdl, const uint32_t idx)
  {
    controls[idx].searchResult = bdl;
  }
  void setHashResult(DataList<element_t>* bdl, const uint32_t idx)
  {
    controls[idx].hashResult = bdl;
  }

  joblist::DataList<element_t>* SearchResult(const uint32_t idx)
  {
    return controls[idx].searchResult;
  }
  joblist::DataList<element_t>* HashResult(const uint32_t idx)
  {
    return controls[idx].hashResult;
  }

  hash_t* HashTable(const uint64_t i)
  {
    return &(controls[i].hashTbl);
  }

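  // Build the hash table for one bucket: inserts (key, RID) pairs from bdlptr's
  // bucket `idx` into destHashTbl. When populateResult is true (this side faces an
  // outer join), every element is also copied straight to `result`.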
  void createHash(BucketDL<element_t>* bdlptr, hash_t* destHashTbl, const uint32_t idx,
                  bool populateResult,                   // true if bdlptr is opposite an outer join
                  joblist::DataList<element_t>* result,  // populated if populateResult true
                  JSTimeStamp& thrDlTimes, volatile bool* die);
  void init();
  TimeSet* getTimeSet()
  {
    return &fTimeSet;
  }
  uint16_t status() const
  {
    return fStatus->errCode;
  }
  void status(uint16_t s)
  {
    fStatus->errCode = s;
  }
  uint32_t sessionId()
  {
    return fSessionId;
  }

 private:
  // input sets
  joblist::BDLWrapper<element_t> fSet1;
  joblist::BDLWrapper<element_t> fSet2;

  // result sets
  joblist::DataList<element_t>* fResult1;
  joblist::DataList<element_t>* fResult2;

  // convenience pointers
  BucketDL<element_t>* fSearchSet;
  BucketDL<element_t>* fHashSet;
  joblist::DataList<element_t>* fSearchResult;
  joblist::DataList<element_t>* fHashResult;

  JoinType fJoinType;
  JSTimeStamp* dlTimes;
  TimeSet fTimeSet;
  SErrorInfo fStatus;
  uint32_t fSessionId;
  volatile bool* die;
};

template <typename element_t>
HashJoin<element_t>::HashJoin()
{
}

template <typename element_t>
HashJoin<element_t>::HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
                              joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2,
                              JoinType joinType, JSTimeStamp* dlt, const SErrorInfo& status,
                              uint32_t sessionId, volatile bool* d)
 : fTimeSet(), fStatus(status), fSessionId(sessionId)
{
  fSet1 = set1;
  fSet2 = set2;
  fResult1 = result1;
  fResult2 = result2;
  fSearchResult = NULL;
  fHashResult = NULL;
  fJoinType = joinType;
  die = d;
  init();
  dlTimes = dlt;
}

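// Allocate and clear the 32 per-thread control slots; 32 matches the thread cap
// enforced in performThreadedJoin(), so every possible thread index has a slot.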
template <typename element_t>
void HashJoin<element_t>::init()
{
  controls.reset(new control_struct[32]);

  for (int idx = 0; idx < 32; idx++)
  {
    HashTable(idx)->clear();
    setSearchSet(NULL, idx);
    setHashSet(NULL, idx);
    setSearchResult(NULL, idx);
    setHashResult(NULL, idx);
  }
}

template <typename element_t>
HashJoin<element_t>::~HashJoin()
{
  controls.reset();
}

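// Split Set1()'s buckets into contiguous, roughly equal ranges, clamp the thread
// count to a valid value (1..32, and no more than the bucket count), run
// HashJoinByBucket_thr on each range in its own boost::thread, then join the
// threads and merge their timing data. Returns the number of threads started.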
template <typename element_t>
int HashJoin<element_t>::performThreadedJoin(const uint32_t numThreads)
{
  // boost::thread thrArr[numThreads];
  boost::scoped_array<boost::thread> thrArr(new boost::thread[numThreads]);
  // typename HashJoin<element_t>::thrParams_t params[numThreads];
  boost::scoped_array<typename HashJoin<element_t>::thrParams_t> params(
      new typename HashJoin<element_t>::thrParams_t[numThreads]);
  uint32_t maxThreads = numThreads;
  int realCnt = 0;
  uint32_t bucketsPerThr = 0;
  uint32_t totalBuckets = 0;

  // TODO: maybe this should throw an exception
  if (maxThreads == 0 || maxThreads > 32)
  {
    maxThreads = 1;
#ifdef DEBUG
    cerr << "HashJoin: invalid requested thread value n=" << numThreads << endl;
#endif
  }

  // both sets should have equal bucket counts, so check Set1()'s buckets
  if (Set1()->bucketCount() < maxThreads)
  {
    maxThreads = Set1()->bucketCount();
  }

  bucketsPerThr = (uint32_t)(Set1()->bucketCount() / maxThreads);
  uint32_t idx = 0;

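  // Hand each thread a contiguous bucket range; the final range is trimmed so the
  // total never exceeds Set1()->bucketCount().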
  for (idx = 0; idx < maxThreads && totalBuckets < Set1()->bucketCount(); idx++)
  {
    params[idx].hjptr = this;
    params[idx].startBucket = totalBuckets;
    params[idx].numBuckets = bucketsPerThr;
    params[idx].thrIdx = idx;
    params[idx].die = die;
    totalBuckets += bucketsPerThr;

    if ((totalBuckets + bucketsPerThr) > Set1()->bucketCount())
      bucketsPerThr = Set1()->bucketCount() - totalBuckets;

#ifdef DEBUG
    cout << "thr i " << idx << " [" << params[idx].startBucket << ", " << params[idx].numBuckets << "] "
         << totalBuckets << " " << (int)bucketsPerThr << endl;
#endif
  }  // for(

  // add the remaining buckets to the last thread
  // instead of adding a thread
  if (totalBuckets < Set1()->bucketCount())
  {
    idx--;
    params[idx].numBuckets += bucketsPerThr;
  }

#ifdef DEBUG
  cout << "thr i " << idx << " [" << params[idx].startBucket << ", " << params[idx].numBuckets << "] "
       << totalBuckets << " " << (int)bucketsPerThr << "-" << endl;
#endif

  try
  {
    for (idx = 0; idx < maxThreads; idx++)
    {
      int ret = 0;
      // ret = pthread_create(&thrArr[idx], NULL, HashJoinByBucket_thr<element_t>, &params[idx]);
      boost::thread t(HashJoinByBucket_thr<element_t>, &params[idx]);
      thrArr[idx].swap(t);

      if (ret != 0)
        throw logic_error("HashJoin: pthread_create failure");
      else
      {
        realCnt++;
      }

#ifdef DEBUG
      cout << "Started thr " << idx << endl;
#endif
    }

    idbassert((unsigned)realCnt == maxThreads);
  }  // try
  catch (std::exception& e)
  {
    std::ostringstream errMsg;

    if (typeid(element_t) == typeid(StringElementType))
    {
      errMsg << "performThreadedJoin<String>: caught: " << e.what();
      *fStatus = logging::stringHashJoinStepErr;
    }
    else
    {
      errMsg << "performThreadedJoin: caught: " << e.what();
      *fStatus = logging::largeHashJoinErr;
    }

    std::cerr << errMsg.str() << std::endl;
    catchHandler(errMsg.str(), sessionId());
  }
  catch (...)
  {
    std::ostringstream errMsg;

    if (typeid(element_t) == typeid(StringElementType))
    {
      errMsg << "performThreadedJoin<String>: unknown exception";
      *fStatus = logging::stringHashJoinStepErr;
    }
    else
    {
      errMsg << "performThreadedJoin: caught: unknown exception";
      *fStatus = logging::largeHashJoinErr;
    }

    std::cerr << errMsg.str() << std::endl;
    catchHandler(errMsg.str(), sessionId());
  }

  for (int idx = 0; idx < realCnt; idx++)
  {
    thrArr[idx].join();  // pthread_join(thrArr[idx], NULL);
#ifdef DEBUG
    cout << "HJ " << hex << this << dec << ": Joining thr " << idx << endl;
#endif
  }

  Result1()->endOfInput();
  Result2()->endOfInput();

  if (realCnt > 0)
    dlTimes->setFirstReadTime(params[0].dlTimes.FirstReadTime());

  dlTimes->setEndOfInputTime();

  for (int i = 0; i < realCnt; i++)
  {
    fTimeSet += params[i].timeset;

    // Select earliest read time as overall firstReadTime
    if (params[i].dlTimes.FirstReadTime().tv_sec < dlTimes->FirstReadTime().tv_sec)
      dlTimes->setFirstReadTime(params[i].dlTimes.FirstReadTime());
  }

  controls.reset();

  return realCnt;
}

// defaults to 1 thread
template <typename element_t>
int HashJoin<element_t>::performJoin(const uint32_t thrCount)
{
  return performThreadedJoin(thrCount);
}

// create a hash table from the elements in srcBucketDL's bucket bucketNum
//
template <typename element_t>
void HashJoin<element_t>::createHash(BucketDL<element_t>* srcBucketDL, hash_t* destHashTbl,
                                     const uint32_t bucketNum, bool populateResult,
                                     joblist::DataList<element_t>* result, JSTimeStamp& thrDlTimes,
                                     volatile bool* die)
{
  bool more;
  element_t e;
  element_t temp;
#ifdef DEBUG
  int idx = 0;
#endif
#ifdef PROFILE
  timespec ts1, ts2, diff;
  clock_gettime(CLOCK_REALTIME, &ts1);
#endif
  uint32_t bucketIter = srcBucketDL->getIterator((int)bucketNum);
  // @bug 828. catch hashjoin starting time
  more = srcBucketDL->next(bucketNum, bucketIter, &e);

  if (thrDlTimes.FirstReadTime().tv_sec == 0)
  {
    thrDlTimes.setFirstReadTime();
  }

  for (; more && !(*die); more = srcBucketDL->next(bucketNum, bucketIter, &e))
  {
#ifdef DEBUG
    cout << "createHash() bkt " << bucketNum << " idx " << idx << " find(" << e.second << ")" << endl;
#endif

    // If the bucket dl is the other side of an outer join, we want to go ahead and populate the output
    // data list with every row. For example, if the where clause is:
    // where colA (+) = colB
    // and the passed bucket datalist contains colB, we will go ahead and populate the output datalist here
    // because all of colB should be returned regardless of whether there is a matching colA.
    if (populateResult)
    {
      result->insert(e);
    }

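    // Key the multimap on the join value (e.second) and store only the RID
    // (e.first); duplicate keys are retained, per the @Bug 867 note above.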
    try
    {
      // std::list<element_t> tmp(1,e);
      destHashTbl->insert(
          std::pair<typename element_t::second_type, typename element_t::first_type>(e.second, e.first));
    }
    catch (std::exception& exc)
    {
      std::ostringstream errMsg;
      errMsg << "Exception in createHash() " << exc.what();
      std::cerr << errMsg.str() << endl;
      catchHandler(errMsg.str(), sessionId());
      throw;  // rethrow
    }
    catch (...)
    {
      std::string errMsg("Unknown exception in createHash()");
      std::cerr << errMsg << endl;
      catchHandler(errMsg, sessionId());
      throw;  // rethrow
    }
  }  // for (more...

#ifdef DEBUG
  cout << "createHash() bkt " << bucketNum << " complete" << endl;
#endif

#ifdef PROFILE
  clock_gettime(CLOCK_REALTIME, &ts2);
  timespec_sub(ts1, ts2, diff);
  std::cout << "Time to create hash for bucket pair " << bucketNum << ": " << diff.tv_sec << "s "
            << diff.tv_nsec << "ns" << std::endl;
#endif
}  // createHash

}  // namespace joblist