/****************************************************************************** * $Id: bandeddl.h.set 3842 2008-03-26 15:02:51Z rdempsey $ * * Copyright (c) 2006 Calpont Corporation * All rights reserved. *****************************************************************************/ /** @file * class XXX interface */ #include #include "largedatalist.h" #include "bucketdl.h" #include #ifndef _BANDEDDL_HPP_ #define _BANDEDDL_HPP_ namespace joblist { /** @brief class BandedDL * */ template class BandedDL : public LargeDataList, element_t> { typedef LargeDataList, element_t> base; public: BandedDL(uint numConsumers); BandedDL(BucketDL &, uint numConsumers); virtual ~BandedDL(); int saveBand(); void loadBand(uint); int bandCount(); /// loads the first band, next() will return the first element void restart(); virtual void insert(const element_t &); virtual uint getIterator(); virtual bool next(uint it, element_t *e); virtual void endOfInput(); using DataListImpl, element_t>::shrink; uint64_t totalSize(); bool next(uint it, element_t *e, bool *endOfBand); bool get(const element_t &key, element_t *out); protected: private: explicit BandedDL() { }; explicit BandedDL(const BandedDL &) { }; BandedDL & operator=(const BandedDL &) { }; // vars to support the WSDL-like next() fcn pthread_cond_t nextSetLoaded; uint waitingConsumers; }; template BandedDL::BandedDL(uint nc) : base(nc) { pthread_cond_init(&nextSetLoaded, NULL); waitingConsumers = 0; } template BandedDL::BandedDL(BucketDL &b, uint nc) : base(nc) { uint i, it; element_t e; bool more; pthread_cond_init(&nextSetLoaded, NULL); waitingConsumers = 0; #ifdef PROFILE struct timespec ts1, ts2; clock_gettime(CLOCK_REALTIME, &ts1); #endif for (i = 0; i < b.bucketCount(); i++) { it = b.getIterator(i); more = b.next(i, it, &e); while (more) { insert(e); more = b.next(i, it, &e); } saveBand(); } endOfInput(); #ifdef PROFILE clock_gettime(CLOCK_REALTIME, &ts2); /* What should we do with this profile info? */ #endif } template BandedDL::~BandedDL() { pthread_cond_destroy(&nextSetLoaded); } template int BandedDL::saveBand() { int ret; base::lock(); ret = base::save(); base::registerNewSet(); base::unlock(); return ret; } template void BandedDL::loadBand(uint band) { base::lock(); base::load(band); if (waitingConsumers > 0) pthread_cond_broadcast(&nextSetLoaded); base::unlock(); } template int BandedDL::bandCount() { int ret; base::lock(); ret = base::setCount(); base::unlock(); return ret; } template uint BandedDL::getIterator() { uint ret; base::lock(); ret = base::getIterator(); base::unlock(); return ret; } template void BandedDL::endOfInput() { base::lock(); base::endOfInput(); base::save(); base::load(0); base::unlock(); } template bool BandedDL::next(uint it, element_t *e) { /* Note: this is the code for WSDL::next(). The more I think about it, the more I think they're the same thing. Not entirely sure yet though. */ bool ret, locked = false; uint nextSet; if (base::numConsumers > 1 || base::phase == 0) { locked = true; base::lock(); } ret = base::next(it, e); /* XXXPAT: insignificant race condition here. Technically, there's no guarantee the caller will be wakened when the next set is loaded. It could get skipped. It won't happen realistically, but it exists... */ // signifies the caller is at the end of the loaded set, // but there are more sets if (ret == false && (base::loadedSet < base::setCount - 1)) { nextSet = base::loadedSet + 1; waitingConsumers++; if (waitingConsumers < base::numConsumers) while (nextSet != base::loadedSet) { // std::cout << "waiting on nextSetLoaded" << std::endl; pthread_cond_wait(&nextSetLoaded, &(this->mutex)); } else { // std::cout << "loading set " << nextSet << std::endl; base::load(nextSet); pthread_cond_broadcast(&nextSetLoaded); } waitingConsumers--; ret = base::next(it, e); } if (ret == false && ++base::consumersFinished == base::numConsumers) base::shrink(); if (locked) base::unlock(); return ret; } template void BandedDL::insert(const element_t &e) { if (base::multipleProducers) base::lock(); base::insert(e); if (base::multipleProducers) base::unlock(); } template bool BandedDL::get(const element_t &key, element_t *out) { typename std::set::iterator it; bool ret, locked = false; if (base::numConsumers > 1 || base::phase == 0) { locked = true; base::lock(); } it = base::c->find(key); if (it != base::c->end()) { *out = *it; ret = true; } else ret = false; if (locked) base::unlock(); return ret; } template void BandedDL::restart() { base::lock(); // base::waitForConsumePhase(); base::load(0); base::unlock(); } template bool BandedDL::next(uint it, element_t *e, bool *endOfBand) { bool ret, locked = false; if (base::numConsumers > 1 || base::phase == 0) { locked = true; base::lock(); } base::waitForConsumePhase(); ret = base::next(it, e); if (ret) { if (locked) base::unlock(); *endOfBand = false; return ret; } else { *endOfBand = true; ret = base::loadedSet < (base::setCount() - 1); if (locked) base::unlock(); return ret; } } template uint64_t BandedDL::totalSize() { //std::cout << "BandedDL: c.size() = " << base::c.size() << std::endl; return base::c.size(); uint64_t ret; base::lock(); ret = base::totalSize(); base::unlock(); return ret; } } // namespace #endif