You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
384 lines
8.6 KiB
C++
384 lines
8.6 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/******************************************************************************
|
|
* $Id: bandeddl.h 9655 2013-06-25 23:08:13Z xlou $
|
|
*
|
|
*****************************************************************************/
|
|
|
|
/** @file
|
|
* class XXX interface
|
|
*/
|
|
|
|
#include <set>
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
|
|
#include "largedatalist.h"
|
|
//#include "bucketdl.h"
|
|
|
|
#include <time.h>
|
|
|
|
#pragma once
|
|
|
|
namespace joblist
|
|
{
|
|
/** @brief class BandedDL
|
|
*
|
|
*/
|
|
template <typename element_t>
|
|
class BandedDL : public LargeDataList<std::vector<element_t>, element_t>
|
|
{
|
|
typedef LargeDataList<std::vector<element_t>, element_t> base;
|
|
|
|
public:
|
|
BandedDL(uint32_t numConsumers, const ResourceManager& rm);
|
|
// BandedDL(BucketDL<element_t> &, uint32_t numConsumers, const ResourceManager& rm);
|
|
virtual ~BandedDL();
|
|
|
|
int64_t saveBand();
|
|
void loadBand(uint64_t);
|
|
int64_t bandCount();
|
|
|
|
/// loads the first band, next() will return the first element
|
|
void restart();
|
|
|
|
void insert(const element_t&);
|
|
void insert(const std::vector<element_t>&);
|
|
uint64_t getIterator();
|
|
bool next(uint64_t it, element_t* e);
|
|
void endOfInput();
|
|
using DataListImpl<std::vector<element_t>, element_t>::shrink;
|
|
uint64_t totalSize();
|
|
bool next(uint64_t it, element_t* e, bool* endOfBand);
|
|
|
|
protected:
|
|
private:
|
|
explicit BandedDL(){};
|
|
explicit BandedDL(const BandedDL&){};
|
|
BandedDL& operator=(const BandedDL&){};
|
|
|
|
// vars to support the WSDL-like next() fcn
|
|
boost::condition nextSetLoaded;
|
|
uint64_t waitingConsumers;
|
|
};
|
|
|
|
template <typename element_t>
|
|
BandedDL<element_t>::BandedDL(uint32_t nc, const ResourceManager& rm)
|
|
: base(nc, sizeof(uint64_t), sizeof(uint64_t), rm)
|
|
{
|
|
// pthread_cond_init(&nextSetLoaded, NULL);
|
|
waitingConsumers = 0;
|
|
}
|
|
|
|
#if 0
|
|
template<typename element_t>
|
|
BandedDL<element_t>::BandedDL(BucketDL<element_t>& b, uint32_t nc, const ResourceManager& rm) : base(nc, sizeof(uint64_t), sizeof(uint64_t), rm)
|
|
{
|
|
uint64_t i, it;
|
|
element_t e;
|
|
bool more;
|
|
|
|
//pthread_cond_init(&nextSetLoaded, NULL);
|
|
waitingConsumers = 0;
|
|
|
|
for (i = 0; i < b.bucketCount(); i++)
|
|
{
|
|
it = b.getIterator(i);
|
|
more = b.next(i, it, &e);
|
|
|
|
while (more)
|
|
{
|
|
insert(e);
|
|
more = b.next(i, it, &e);
|
|
}
|
|
|
|
saveBand();
|
|
}
|
|
|
|
endOfInput();
|
|
}
|
|
#endif
|
|
|
|
template <typename element_t>
|
|
BandedDL<element_t>::~BandedDL()
|
|
{
|
|
// pthread_cond_destroy(&nextSetLoaded);
|
|
}
|
|
|
|
template <typename element_t>
|
|
int64_t BandedDL<element_t>::saveBand()
|
|
{
|
|
int64_t ret;
|
|
|
|
if (base::multipleProducers)
|
|
base::lock();
|
|
|
|
sort(base::c->begin(), base::c->end());
|
|
|
|
if (typeid(element_t) == typeid(ElementType) || typeid(element_t) == typeid(DoubleElementType))
|
|
ret = base::save_contiguous();
|
|
else
|
|
ret = base::save();
|
|
|
|
base::registerNewSet();
|
|
|
|
if (base::multipleProducers)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
|
|
template <typename element_t>
|
|
void BandedDL<element_t>::loadBand(uint64_t band)
|
|
{
|
|
base::lock();
|
|
|
|
if (typeid(element_t) == typeid(ElementType) || typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(band);
|
|
else
|
|
base::load(band);
|
|
|
|
if (waitingConsumers > 0)
|
|
nextSetLoaded.notify_all(); // pthread_cond_broadcast(&nextSetLoaded);
|
|
|
|
base::unlock();
|
|
}
|
|
|
|
template <typename element_t>
|
|
int64_t BandedDL<element_t>::bandCount()
|
|
{
|
|
int64_t ret;
|
|
|
|
base::lock();
|
|
ret = base::setCount();
|
|
base::unlock();
|
|
return ret;
|
|
}
|
|
|
|
template <typename element_t>
|
|
uint64_t BandedDL<element_t>::getIterator()
|
|
{
|
|
uint64_t ret;
|
|
|
|
base::lock();
|
|
ret = base::getIterator();
|
|
base::unlock();
|
|
return ret;
|
|
}
|
|
|
|
template <typename element_t>
|
|
void BandedDL<element_t>::endOfInput()
|
|
{
|
|
base::lock();
|
|
sort(base::c->begin(), base::c->end());
|
|
base::endOfInput();
|
|
|
|
if (base::setCount > 1)
|
|
{
|
|
if (typeid(element_t) == typeid(ElementType) || typeid(element_t) == typeid(DoubleElementType))
|
|
{
|
|
base::save_contiguous();
|
|
base::load_contiguous(0);
|
|
}
|
|
else
|
|
{
|
|
base::save();
|
|
base::load(0);
|
|
}
|
|
}
|
|
else
|
|
base::resetIterators();
|
|
|
|
base::unlock();
|
|
}
|
|
|
|
template <typename element_t>
|
|
bool BandedDL<element_t>::next(uint64_t it, element_t* e)
|
|
{
|
|
/* Note: this is the code for WSDL::next(). The more I think about it,
|
|
the more I think they're the same thing. Not entirely sure yet though. */
|
|
|
|
bool ret, locked = false;
|
|
uint64_t nextSet;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0)
|
|
{
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
ret = base::next(it, e);
|
|
|
|
/* XXXPAT: insignificant race condition here. Technically, there's no
|
|
guarantee the caller will be wakened when the next set is loaded. It could
|
|
get skipped. It won't happen realistically, but it exists... */
|
|
|
|
// signifies the caller is at the end of the loaded set,
|
|
// but there are more sets
|
|
if (ret == false && (base::loadedSet < base::setCount - 1))
|
|
{
|
|
nextSet = base::loadedSet + 1;
|
|
waitingConsumers++;
|
|
|
|
if (waitingConsumers < base::numConsumers)
|
|
while (nextSet != base::loadedSet)
|
|
{
|
|
// std::cout << "waiting on nextSetLoaded" << std::endl;
|
|
nextSetLoaded.wait(this->mutex); // pthread_cond_wait(&nextSetLoaded, &(this->mutex));
|
|
}
|
|
else
|
|
{
|
|
// std::cout << "loading set " << nextSet << std::endl;
|
|
if (typeid(element_t) == typeid(ElementType) || typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(nextSet);
|
|
else
|
|
base::load(nextSet);
|
|
|
|
nextSetLoaded.notify_all(); // pthread_cond_broadcast(&nextSetLoaded);
|
|
}
|
|
|
|
waitingConsumers--;
|
|
ret = base::next(it, e);
|
|
}
|
|
|
|
if (ret == false && ++base::consumersFinished == base::numConsumers)
|
|
base::shrink();
|
|
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
|
|
template <typename element_t>
|
|
void BandedDL<element_t>::insert(const element_t& e)
|
|
{
|
|
if (base::multipleProducers)
|
|
base::lock();
|
|
|
|
base::insert(e);
|
|
|
|
if (base::multipleProducers)
|
|
base::unlock();
|
|
}
|
|
|
|
template <typename element_t>
|
|
void BandedDL<element_t>::insert(const std::vector<element_t>& e)
|
|
{
|
|
throw std::logic_error("BandedDL::insert(vector) isn't implemented yet");
|
|
}
|
|
|
|
/*
|
|
template<typename element_t>
|
|
bool BandedDL<element_t>::get(const element_t &key, element_t *out)
|
|
{
|
|
typename std::set<element_t>::iterator it;
|
|
bool ret, locked = false;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0) {
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
it = base::c->find(key);
|
|
if (it != base::c->end()) {
|
|
*out = *it;
|
|
ret = true;
|
|
}
|
|
else
|
|
ret = false;
|
|
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
*/
|
|
|
|
template <typename element_t>
|
|
void BandedDL<element_t>::restart()
|
|
{
|
|
base::lock();
|
|
// base::waitForConsumePhase();
|
|
|
|
// hack! has it been shrunk already?
|
|
if (base::c == NULL)
|
|
base::c = new std::vector<element_t>();
|
|
|
|
if (base::setCount > 1)
|
|
{
|
|
if (typeid(element_t) == typeid(ElementType) || typeid(element_t) == typeid(DoubleElementType))
|
|
base::load_contiguous(0);
|
|
else
|
|
base::load(0);
|
|
}
|
|
else
|
|
base::resetIterators();
|
|
|
|
base::unlock();
|
|
}
|
|
|
|
template <typename element_t>
|
|
bool BandedDL<element_t>::next(uint64_t it, element_t* e, bool* endOfBand)
|
|
{
|
|
bool ret, locked = false;
|
|
|
|
if (base::numConsumers > 1 || base::phase == 0)
|
|
{
|
|
locked = true;
|
|
base::lock();
|
|
}
|
|
|
|
base::waitForConsumePhase();
|
|
ret = base::next(it, e);
|
|
|
|
if (ret)
|
|
{
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
*endOfBand = false;
|
|
return ret;
|
|
}
|
|
else
|
|
{
|
|
*endOfBand = true;
|
|
ret = base::loadedSet < (base::setCount() - 1);
|
|
|
|
if (locked)
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
template <typename element_t>
|
|
uint64_t BandedDL<element_t>::totalSize()
|
|
{
|
|
// std::cout << "BandedDL: c.size() = " << base::c.size() << std::endl; return base::c.size();
|
|
uint64_t ret;
|
|
|
|
base::lock();
|
|
ret = base::totalSize();
|
|
base::unlock();
|
|
|
|
return ret;
|
|
}
|
|
|
|
} // namespace joblist
|