1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/bandeddl.h.set
2016-01-06 14:08:59 -06:00

305 lines
6.0 KiB
Plaintext

/******************************************************************************
* $Id: bandeddl.h.set 3842 2008-03-26 15:02:51Z rdempsey $
*
* Copyright (c) 2006 Calpont Corporation
* All rights reserved.
*****************************************************************************/
/** @file
* class BandedDL interface
*/
#include <set>
#include "largedatalist.h"
#include "bucketdl.h"
#include <time.h>
#ifndef _BANDEDDL_HPP_
#define _BANDEDDL_HPP_
namespace joblist {
/** @brief class BandedDL
 *
 * A LargeDataList whose container type is std::set<element_t>.  The sets
 * managed by the base class are exposed here as "bands": saveBand() saves
 * the current set and registers a new one, loadBand() loads a given band,
 * and the next() overloads iterate over the loaded band.
 *
 * NOTE(review): the exact save/load/iterator semantics are implemented in
 * LargeDataList, which is not visible in this file -- confirm there.
 */
template<typename element_t>
class BandedDL : public LargeDataList<std::set<element_t>, element_t>
{
    typedef LargeDataList<std::set<element_t>, element_t> base;

public:
    /// Constructs an empty list; numConsumers is forwarded to the base class.
    BandedDL(uint numConsumers);
    /// Constructs a list from a BucketDL, creating one band per bucket.
    BandedDL(BucketDL<element_t> &, uint numConsumers);
    virtual ~BandedDL();

    /// saves the current band and registers a new one; returns base::save()'s result
    int saveBand();
    /// loads the given band and wakes any consumers waiting inside next()
    void loadBand(uint);
    /// returns base::setCount()
    int bandCount();

    /// loads the first band, next() will return the first element
    void restart();

    virtual void insert(const element_t &);
    virtual uint getIterator();
    virtual bool next(uint it, element_t *e);
    virtual void endOfInput();
    using DataListImpl<std::set<element_t>, element_t>::shrink;
    uint64_t totalSize();

    /// like next(it, e) but never loads another band; reports end-of-band via endOfBand
    bool next(uint it, element_t *e, bool *endOfBand);
    /// looks key up in the container; on a hit copies the stored element to *out
    bool get(const element_t &key, element_t *out);

private:
    // Default construction, copying and assignment are not supported.
    // They are declared private and deliberately left undefined: the previous
    // empty inline bodies left nextSetLoaded uninitialised (while the
    // destructor still destroys it) and operator= fell off the end of a
    // non-void function, which is undefined behaviour if ever called.
    BandedDL();
    BandedDL(const BandedDL &);
    BandedDL & operator=(const BandedDL &);

    // vars to support the WSDL-like next() fcn
    pthread_cond_t nextSetLoaded;   // broadcast whenever a band has been loaded
    uint waitingConsumers;          // consumers that hit the end of the band in next()
};
/** @brief Construct an empty BandedDL.
 *
 * @param nc consumer count, passed unchanged to the LargeDataList base.
 */
template<typename element_t>
BandedDL<element_t>::BandedDL(uint nc)
    : base(nc), waitingConsumers(0)
{
    // NULL selects the default condition-variable attributes.
    pthread_cond_init(&nextSetLoaded, NULL);
}
/** @brief Construct a BandedDL from the contents of a BucketDL.
 *
 * Every bucket of @p b is drained through insert() and then closed with
 * saveBand(), so each bucket ends up as one band.  endOfInput() is called
 * once all buckets have been consumed.
 *
 * insert() and endOfInput() are virtual; because they are called from this
 * constructor they resolve to the BandedDL implementations.
 *
 * @param b  source buckets; read through getIterator()/next()
 * @param nc consumer count, passed unchanged to the LargeDataList base
 */
template<typename element_t>
BandedDL<element_t>::BandedDL(BucketDL<element_t> &b, uint nc)
    : base(nc), waitingConsumers(0)
{
    element_t e;

    pthread_cond_init(&nextSetLoaded, NULL);

#ifdef PROFILE
    struct timespec ts1, ts2;
    clock_gettime(CLOCK_REALTIME, &ts1);
#endif

    for (uint bucket = 0; bucket < b.bucketCount(); ++bucket) {
        uint it = b.getIterator(bucket);

        // Copy the whole bucket into the current band ...
        while (b.next(bucket, it, &e))
            insert(e);

        // ... then close the band before moving on to the next bucket.
        saveBand();
    }

    endOfInput();

#ifdef PROFILE
    clock_gettime(CLOCK_REALTIME, &ts2);
    /* What should we do with this profile info? */
#endif
}
/** @brief Destructor; releases the condition variable created by the constructors. */
template<typename element_t>
BandedDL<element_t>::~BandedDL()
{
    pthread_cond_destroy(&this->nextSetLoaded);
}
/** @brief Save the current band and start a new one.
 *
 * Under the list lock, calls base::save() followed by
 * base::registerNewSet().
 *
 * @return whatever base::save() returned
 */
template<typename element_t>
int BandedDL<element_t>::saveBand()
{
    base::lock();
    int saveResult = base::save();
    base::registerNewSet();
    base::unlock();

    return saveResult;
}
/** @brief Load a specific band.
 *
 * Calls base::load(band) under the list lock and, if any consumers are
 * registered as waiting in next(), broadcasts nextSetLoaded to wake them.
 *
 * @param band index handed to base::load()
 */
template<typename element_t>
void BandedDL<element_t>::loadBand(uint band)
{
    base::lock();

    base::load(band);

    // Only signal when somebody is actually blocked on the condition.
    if (waitingConsumers != 0) {
        pthread_cond_broadcast(&nextSetLoaded);
    }

    base::unlock();
}
/** @brief Number of bands.
 *
 * @return base::setCount(), read while holding the list lock
 */
template<typename element_t>
int BandedDL<element_t>::bandCount()
{
    base::lock();
    int count = base::setCount();
    base::unlock();

    return count;
}
/** @brief Obtain a consumer iterator id.
 *
 * Thin wrapper that calls base::getIterator() while holding the list lock.
 *
 * @return the id returned by base::getIterator()
 */
template<typename element_t>
uint BandedDL<element_t>::getIterator()
{
    base::lock();
    uint iteratorId = base::getIterator();
    base::unlock();

    return iteratorId;
}
/** @brief Signal that no more elements will be inserted.
 *
 * Under the list lock: forwards to base::endOfInput(), saves the current
 * set with base::save() (its return value is ignored), and then loads
 * set 0 so that iteration starts at the first band.
 */
template<typename element_t>
void BandedDL<element_t>::endOfInput()
{
    base::lock();

    base::endOfInput();   // explicitly the base version, not a virtual re-dispatch
    base::save();
    base::load(0);

    base::unlock();
}
/** @brief Fetch the next element for a consumer, moving on to the next band when needed.
 *
 * The list lock is taken when there is more than one consumer or while
 * base::phase is 0.  If base::next() reports no element but further bands
 * exist after the loaded one, the caller is counted in waitingConsumers:
 * every consumer except the last to arrive waits on nextSetLoaded; the last
 * one loads the next band and broadcasts.  Afterwards base::next() is
 * retried once.  When a consumer finally gets no element and it is the last
 * consumer to finish, shrink() is called.
 *
 * @param it consumer iterator id (as returned by getIterator())
 * @param e  receives the element when the call returns true
 * @return true if an element was stored in *e, false otherwise
 */
template<typename element_t>
bool BandedDL<element_t>::next(uint it, element_t *e)
{
    /* Note: this is the code for WSDL::next().  The more I think about it,
       the more I think they're the same thing.  Not entirely sure yet though. */
    bool ret, locked = false;
    uint nextSet;

    if (base::numConsumers > 1 || base::phase == 0) {
        locked = true;
        base::lock();
    }

    ret = base::next(it, e);

    /* XXXPAT: insignificant race condition here.  Technically, there's no
       guarantee the caller will be wakened when the next set is loaded.  It could
       get skipped.  It won't happen realistically, but it exists... */

    // signifies the caller is at the end of the loaded set,
    // but there are more sets.
    // setCount is invoked as a function, matching its use in bandCount()
    // and in next(it, e, endOfBand).
    if (ret == false && (base::loadedSet < base::setCount() - 1)) {
        nextSet = base::loadedSet + 1;
        waitingConsumers++;

        if (waitingConsumers < base::numConsumers) {
            // Not the last consumer to arrive: wait for the next band.  This
            // branch is only reachable with numConsumers > 1, in which case
            // the lock was taken above.  The loop guards against wakeups that
            // happen before the expected band is loaded.
            while (nextSet != base::loadedSet) {
                pthread_cond_wait(&nextSetLoaded, &(this->mutex));
            }
        }
        else {
            // Last consumer to reach the end of the band: load the next one
            // and release everybody who is waiting.
            base::load(nextSet);
            pthread_cond_broadcast(&nextSetLoaded);
        }

        waitingConsumers--;
        ret = base::next(it, e);
    }

    if (ret == false && ++base::consumersFinished == base::numConsumers)
        base::shrink();

    if (locked)
        base::unlock();

    return ret;
}
/** @brief Insert one element.
 *
 * Forwards to base::insert(); the call is bracketed by lock()/unlock()
 * when base::multipleProducers is set.
 *
 * @param e element to insert
 */
template<typename element_t>
void BandedDL<element_t>::insert(const element_t &e)
{
    if (base::multipleProducers) {
        base::lock();
    }

    base::insert(e);

    if (base::multipleProducers) {
        base::unlock();
    }
}
/** @brief Look up an element by key.
 *
 * Searches the std::set pointed to by base::c.  The list lock is taken
 * when there is more than one consumer or while base::phase is 0.
 *
 * @param key value to search for
 * @param out receives a copy of the stored element on a hit; untouched on a miss
 * @return true if the key was found, false otherwise
 */
template<typename element_t>
bool BandedDL<element_t>::get(const element_t &key, element_t *out)
{
    const bool locked = (base::numConsumers > 1 || base::phase == 0);

    if (locked)
        base::lock();

    typename std::set<element_t>::iterator pos = base::c->find(key);
    const bool found = (pos != base::c->end());

    if (found)
        *out = *pos;

    if (locked)
        base::unlock();

    return found;
}
/** @brief Go back to the first band.
 *
 * Loads set 0 under the list lock.  The call to
 * base::waitForConsumePhase() that used to precede the load remains
 * disabled.
 */
template<typename element_t>
void BandedDL<element_t>::restart()
{
    base::lock();
    {
        // base::waitForConsumePhase();
        base::load(0);
    }
    base::unlock();
}
/** @brief Fetch the next element of the loaded band without switching bands.
 *
 * Unlike next(it, e) this overload never loads another band itself; it
 * only reports that the end of the loaded band was reached.  The list lock
 * is taken when there is more than one consumer or while base::phase is 0,
 * and base::waitForConsumePhase() is called before reading.
 *
 * @param it        consumer iterator id
 * @param e         receives the element when one is available
 * @param endOfBand set to false when an element was returned, true otherwise
 * @return true if an element was stored in *e; otherwise whether
 *         base::loadedSet is still below base::setCount() - 1
 */
template<typename element_t>
bool BandedDL<element_t>::next(uint it, element_t *e, bool *endOfBand)
{
    const bool locked = (base::numConsumers > 1 || base::phase == 0);

    if (locked)
        base::lock();

    base::waitForConsumePhase();

    if (!base::next(it, e)) {
        // End of the loaded band: tell the caller, and report whether
        // additional bands exist beyond it.
        *endOfBand = true;
        const bool moreBands = base::loadedSet < (base::setCount() - 1);

        if (locked)
            base::unlock();

        return moreBands;
    }

    if (locked)
        base::unlock();

    *endOfBand = false;
    return true;
}
/** @brief Total size of the list.
 *
 * @return base::totalSize(), read while holding the list lock
 */
template<typename element_t>
uint64_t BandedDL<element_t>::totalSize()
{
    base::lock();
    uint64_t size = base::totalSize();
    base::unlock();

    return size;
}
} // namespace
#endif