You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
334 lines
6.9 KiB
Plaintext
334 lines
6.9 KiB
Plaintext
/******************************************************************************
|
|
* $Id: bandeddl.h.vector 3842 2008-03-26 15:02:51Z rdempsey $
|
|
*
|
|
* Copyright (c) 2006 Calpont Corporation
|
|
* All rights reserved.
|
|
*****************************************************************************/
|
|
|
|
/** @file
|
|
* class XXX interface
|
|
*/
|
|
|
|
#include <set>
|
|
#include "largedatalist.h"
|
|
#include "bucketdl.h"
|
|
|
|
#include <time.h>
|
|
|
|
#ifndef _BANDEDDL_HPP_
|
|
#define _BANDEDDL_HPP_
|
|
|
|
namespace joblist {
|
|
|
|
/** @brief class BandedDL
|
|
*
|
|
*/
|
|
template<typename element_t>
|
|
class BandedDL : public LargeDataList<std::vector<element_t>, element_t>
|
|
{
|
|
typedef LargeDataList<std::vector<element_t>, element_t> base;
|
|
|
|
public:
|
|
BandedDL(uint numConsumers);
|
|
BandedDL(BucketDL<element_t> &, uint numConsumers);
|
|
virtual ~BandedDL();
|
|
|
|
int saveBand();
|
|
void loadBand(uint);
|
|
int bandCount();
|
|
|
|
/// loads the first band, next() will return the first element
|
|
void restart();
|
|
|
|
virtual void insert(const element_t &);
|
|
virtual uint getIterator();
|
|
virtual bool next(uint it, element_t *e);
|
|
virtual void endOfInput();
|
|
using DataListImpl<std::vector<element_t>, element_t>::shrink;
|
|
uint64_t totalSize();
|
|
bool next(uint it, element_t *e, bool *endOfBand);
|
|
// bool get(const element_t &key, element_t *out);
|
|
|
|
protected:
|
|
|
|
private:
|
|
explicit BandedDL() { };
|
|
explicit BandedDL(const BandedDL &) { };
|
|
BandedDL & operator=(const BandedDL &) { };
|
|
|
|
// vars to support the WSDL-like next() fcn
|
|
pthread_cond_t nextSetLoaded;
|
|
uint waitingConsumers;
|
|
};
|
|
|
|
template<typename element_t>
|
|
BandedDL<element_t>::BandedDL(uint nc) : base(nc)
|
|
{
|
|
pthread_cond_init(&nextSetLoaded, NULL);
|
|
waitingConsumers = 0;
|
|
}
|
|
|
|
|
|
template<typename element_t>
|
|
BandedDL<element_t>::BandedDL(BucketDL<element_t> &b, uint nc) : base(nc)
|
|
{
|
|
uint i, it;
|
|
element_t e;
|
|
bool more;
|
|
|
|
pthread_cond_init(&nextSetLoaded, NULL);
|
|
waitingConsumers = 0;
|
|
|
|
#ifdef PROFILE
|
|
struct timespec ts1, ts2;
|
|
clock_gettime(CLOCK_REALTIME, &ts1);
|
|
#endif
|
|
|
|
for (i = 0; i < b.bucketCount(); i++) {
|
|
it = b.getIterator(i);
|
|
more = b.next(i, it, &e);
|
|
while (more) {
|
|
insert(e);
|
|
more = b.next(i, it, &e);
|
|
}
|
|
saveBand();
|
|
}
|
|
endOfInput();
|
|
|
|
#ifdef PROFILE
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
/* What should we do with this profile info? */
|
|
#endif
|
|
|
|
}
|
|
|
|
template<typename element_t>
|
|
BandedDL<element_t>::~BandedDL()
|
|
{
|
|
pthread_cond_destroy(&nextSetLoaded);
|
|
}
|
|
|
|
template<typename element_t>
|
|
int BandedDL<element_t>::saveBand()
|
|
{
|
|
int ret;
|
|
|
|
if (base::multipleProducers)
|
|
base::lock();
|
|
sort(base::c->begin(), base::c->end());
|
|
if (typeid(element_t) == typeid(ElementType) ||
|
|
typeid(element_t) == typeid(DoubleElementType))
|
|
ret = base::save_contiguous();
|
|
else
|
|
ret = base::save();
|
|
base::registerNewSet();
|
|
if (base::multipleProducers)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
|
|
template<typename element_t>
|
|
void BandedDL<element_t>::loadBand(uint band)
|
|
{
|
|
|
|
base::lock();
|
|
if (typeid(element_t) == typeid(ElementType) ||
|
|
typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(band);
|
|
else
|
|
base::load(band);
|
|
if (waitingConsumers > 0)
|
|
pthread_cond_broadcast(&nextSetLoaded);
|
|
base::unlock();
|
|
}
|
|
|
|
template<typename element_t>
|
|
int BandedDL<element_t>::bandCount()
|
|
{
|
|
int ret;
|
|
|
|
base::lock();
|
|
ret = base::setCount();
|
|
base::unlock();
|
|
return ret;
|
|
}
|
|
|
|
template<typename element_t>
|
|
uint BandedDL<element_t>::getIterator()
|
|
{
|
|
uint ret;
|
|
|
|
base::lock();
|
|
ret = base::getIterator();
|
|
base::unlock();
|
|
return ret;
|
|
}
|
|
|
|
template<typename element_t>
|
|
void BandedDL<element_t>::endOfInput()
|
|
{
|
|
base::lock();
|
|
sort(base::c->begin(), base::c->end());
|
|
base::endOfInput();
|
|
if (typeid(element_t) == typeid(ElementType) ||
|
|
typeid(element_t) == typeid(DoubleElementType)) {
|
|
base::save_contiguous();
|
|
base::load_contiguous(0);
|
|
}
|
|
else {
|
|
base::save();
|
|
base::load(0);
|
|
}
|
|
base::unlock();
|
|
}
|
|
|
|
template<typename element_t>
|
|
bool BandedDL<element_t>::next(uint it, element_t *e)
|
|
{
|
|
|
|
/* Note: this is the code for WSDL::next(). The more I think about it,
|
|
the more I think they're the same thing. Not entirely sure yet though. */
|
|
|
|
bool ret, locked = false;
|
|
uint nextSet;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0) {
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
ret = base::next(it, e);
|
|
|
|
/* XXXPAT: insignificant race condition here. Technically, there's no
|
|
guarantee the caller will be wakened when the next set is loaded. It could
|
|
get skipped. It won't happen realistically, but it exists... */
|
|
|
|
// signifies the caller is at the end of the loaded set,
|
|
// but there are more sets
|
|
if (ret == false && (base::loadedSet < base::setCount - 1)) {
|
|
|
|
nextSet = base::loadedSet + 1;
|
|
waitingConsumers++;
|
|
if (waitingConsumers < base::numConsumers)
|
|
while (nextSet != base::loadedSet) {
|
|
// std::cout << "waiting on nextSetLoaded" << std::endl;
|
|
pthread_cond_wait(&nextSetLoaded, &(this->mutex));
|
|
}
|
|
else {
|
|
// std::cout << "loading set " << nextSet << std::endl;
|
|
if (typeid(element_t) == typeid(ElementType) ||
|
|
typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(nextSet);
|
|
else
|
|
base::load(nextSet);
|
|
pthread_cond_broadcast(&nextSetLoaded);
|
|
}
|
|
waitingConsumers--;
|
|
ret = base::next(it, e);
|
|
}
|
|
|
|
if (ret == false && ++base::consumersFinished == base::numConsumers)
|
|
base::shrink();
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
template<typename element_t>
|
|
void BandedDL<element_t>::insert(const element_t &e)
|
|
{
|
|
if (base::multipleProducers)
|
|
base::lock();
|
|
base::insert(e);
|
|
if (base::multipleProducers)
|
|
base::unlock();
|
|
}
|
|
|
|
/*
|
|
template<typename element_t>
|
|
bool BandedDL<element_t>::get(const element_t &key, element_t *out)
|
|
{
|
|
typename std::set<element_t>::iterator it;
|
|
bool ret, locked = false;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0) {
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
it = base::c->find(key);
|
|
if (it != base::c->end()) {
|
|
*out = *it;
|
|
ret = true;
|
|
}
|
|
else
|
|
ret = false;
|
|
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
*/
|
|
|
|
template<typename element_t>
|
|
void BandedDL<element_t>::restart()
|
|
{
|
|
base::lock();
|
|
// base::waitForConsumePhase();
|
|
if (typeid(element_t) == typeid(ElementType) ||
|
|
typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(0);
|
|
else
|
|
base::load(0);
|
|
base::unlock();
|
|
}
|
|
|
|
template<typename element_t>
|
|
bool BandedDL<element_t>::next(uint it, element_t *e, bool *endOfBand)
|
|
{
|
|
bool ret, locked = false;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0) {
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
base::waitForConsumePhase();
|
|
ret = base::next(it, e);
|
|
if (ret) {
|
|
if (locked)
|
|
base::unlock();
|
|
*endOfBand = false;
|
|
return ret;
|
|
}
|
|
else {
|
|
*endOfBand = true;
|
|
ret = base::loadedSet < (base::setCount() - 1);
|
|
if (locked)
|
|
base::unlock();
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
template<typename element_t>
|
|
uint64_t BandedDL<element_t>::totalSize()
|
|
{
|
|
//std::cout << "BandedDL: c.size() = " << base::c.size() << std::endl; return base::c.size();
|
|
uint64_t ret;
|
|
|
|
base::lock();
|
|
ret = base::totalSize();
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
|
|
} // namespace
|
|
|
|
#endif
|
|
|