/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /****************************************************************************** * $Id: fifo.h 9655 2013-06-25 23:08:13Z xlou $ * *****************************************************************************/ /** @file * class XXX interface */ #pragma once #include #include #include #include #include #include "elementtype.h" #include "datalistimpl.h" namespace joblist { /** @brief class FIFO * */ /* This derives from DataListImpl > but it manages its own data */ template class FIFO : public DataListImpl, element_t> { private: typedef DataListImpl, element_t> base; public: enum ElementMode { RID_ONLY, RID_VALUE }; FIFO(uint32_t numConsumers, uint32_t maxElements); virtual ~FIFO(); /* DataList interface */ inline void insert(const element_t& e); inline void insert(const std::vector& v); inline bool next(uint64_t it, element_t* e); uint64_t getIterator(); void endOfInput(); void setMultipleProducers(bool b); /* Use this insert() to detect when insertion fills up buffer. */ /* When this happens, call waitTillReadyForInserts() before resuming*/ /* with more inserts. */ inline void insert(const element_t& e, bool& bufferFullBlocked, bool& consumptionStarted); inline void waitTillReadyForInserts(); inline bool isOutputBlocked() const; void OID(execplan::CalpontSystemCatalog::OID oid) { base::OID(oid); } execplan::CalpontSystemCatalog::OID OID() const { return base::OID(); } inline void dropToken(){}; inline void dropToken(uint32_t){}; // Counters that reflect how many many times this FIFO blocked on reads/writes uint64_t blockedWriteCount() const; uint64_t blockedReadCount() const; // @bug 653 set number of consumers when it is empty. void setNumConsumers(uint32_t nc); void inOrder(bool order) { fInOrder = order; } bool inOrder() const { return fInOrder; } // Default behavior tracks number of element inserts. If application // is inserting complex elements (ex: RowWrappers), then application // should call totalSize() mutator to record accurate element count. // This is not a blocking call. If totalSize() accessor is called // prior to completion, the "current" total size will be returned. void totalSize(const uint64_t totSize) { fTotSize = totSize; } uint64_t totalSize() { return fTotSize; } void maxElements(uint64_t max); uint64_t maxElements() { return fMaxElements; } // FIFO only uses elementmode to control how the elements are saved // to temp disk, should the application choose to page to disk. If // a FIFO is converted to another datalist, this enum should be copied // over to the datalist, as a ZDL for example can use the element mode // for other reasons. void setElementMode(uint32_t mode) { fElementMode = mode; } uint32_t getElementMode() const { return fElementMode; } // Total number of files and filespace used for temp files void setTotalFileCounts(uint64_t numFiles, uint64_t numBytes); void totalFileCounts(uint64_t& numFiles, uint64_t& numBytes) const; // returns true if there might be more data to read, // false if there is no more data. Similar to next(), but // does not return data. bool more(uint64_t id); protected: private: boost::condition finishedConsuming, moreData; element_t* pBuffer; element_t* cBuffer; uint64_t ppos; uint64_t* cpos; uint64_t cDone; uint64_t fMaxElements; uint64_t cWaiting; uint64_t fTotSize; bool fInOrder; uint64_t fConsumerFinishedCount; volatile bool fConsumptionStarted; uint32_t fElementMode; uint64_t fNumFiles; uint64_t fNumBytes; // Counters that reflect how many many times this FIFO blocked // on reads and writes due to the FIFO being empty or full. uint64_t blockedInsertWriteCount; uint64_t blockedNextReadCount; FIFO& operator=(const FIFO&); FIFO(const FIFO&); FIFO(); void signalPs(); bool swapBuffers(bool waitIfBlocked = true); bool waitForSwap(uint64_t id); }; // #define FIFO_DEBUG // toggles consumer behavior st it only has one critical section. // Need to bench both ways before changing it. // #define ONE_CS template FIFO::FIFO(uint32_t con, uint32_t max) : DataListImpl, element_t>(con) { fMaxElements = max; pBuffer = 0; cBuffer = 0; cpos = new uint64_t[con]; ppos = 0; cWaiting = 0; fTotSize = 0; fInOrder = false; fConsumerFinishedCount = 0; fConsumptionStarted = false; fElementMode = RID_ONLY; fNumFiles = 0; fNumBytes = 0; for (uint64_t i = 0; i < con; ++i) cpos[i] = fMaxElements; cDone = con; blockedInsertWriteCount = blockedNextReadCount = 0; } template FIFO::FIFO() { throw std::logic_error("don't use FIFO()"); } template FIFO::FIFO(const FIFO& f) { throw std::logic_error("don't use FIFO(FIFO &)"); } template FIFO& FIFO::operator=(const FIFO& f) { throw std::logic_error("don't use FIFO:: ="); } template FIFO::~FIFO() { if (pBuffer) delete[] pBuffer; if (cBuffer) delete[] cBuffer; delete[] cpos; } // if waitIfBlocked is false, the return value indicates if the swap blocked template bool FIFO::swapBuffers(bool waitIfBlocked) { element_t* tmp; boost::mutex::scoped_lock scoped(base::mutex); if (cDone < base::numConsumers) { blockedInsertWriteCount++; if (!waitIfBlocked) return true; while (cDone < base::numConsumers) finishedConsuming.wait(scoped); } tmp = pBuffer; pBuffer = cBuffer; cBuffer = tmp; cDone = 0; ppos = 0; memset(cpos, 0, sizeof(*cpos) * base::numConsumers); if (cWaiting) { moreData.notify_all(); cWaiting = 0; } return false; } template inline void FIFO::insert(const element_t& e) { if (!pBuffer) { pBuffer = new element_t[fMaxElements]; cBuffer = new element_t[fMaxElements]; } pBuffer[ppos++] = e; fTotSize++; if (ppos == fMaxElements) swapBuffers(); } // version of insert that will return rather than block if insert would block template inline void FIFO::insert(const element_t& e, bool& bufferFullBlocked, bool& consumptionStarted) { if (!pBuffer) { pBuffer = new element_t[fMaxElements]; cBuffer = new element_t[fMaxElements]; } pBuffer[ppos++] = e; fTotSize++; bufferFullBlocked = false; consumptionStarted = fConsumptionStarted; if (ppos == fMaxElements) bufferFullBlocked = swapBuffers(false); } template inline void FIFO::waitTillReadyForInserts() { if (ppos == fMaxElements) swapBuffers(); } template inline bool FIFO::isOutputBlocked() const { if (ppos == fMaxElements) return true; else return false; } template inline void FIFO::insert(const std::vector& e) { typename std::vector::const_iterator it = e.begin(); typename std::vector::const_iterator end = e.end(); while (it != end) { insert(*it); ++it; } } template bool FIFO::waitForSwap(uint64_t id) { boost::mutex::scoped_lock scoped(base::mutex); #ifdef ONE_CS if (cpos[id] == fMaxElements) if (++cDone == base::numConsumers) finishedConsuming.notify_all() #endif while (cpos[id] == fMaxElements && !base::noMoreInput) { ++cWaiting; blockedNextReadCount++; moreData.wait(scoped); } if (cpos[id] == fMaxElements) { // Before we free the lock, let's check to see if all our consumers // are finished, in which case we can delete our data buffers. if (++fConsumerFinishedCount == base::numConsumers) { delete[] pBuffer; delete[] cBuffer; pBuffer = 0; cBuffer = 0; } return false; } return true; } template bool FIFO::more(uint64_t id) { boost::mutex::scoped_lock scoped(base::mutex); return !(cpos[id] == fMaxElements && base::noMoreInput); } template void FIFO::signalPs() { boost::mutex::scoped_lock scoped(base::mutex); if (++cDone == base::numConsumers) finishedConsuming.notify_all(); } template inline bool FIFO::next(uint64_t id, element_t* out) { base::mutex.lock(); fConsumptionStarted = true; if (cpos[id] >= fMaxElements) { base::mutex.unlock(); if (!waitForSwap(id)) return false; base::mutex.lock(); } *out = cBuffer[cpos[id]++]; #ifndef ONE_CS if (cpos[id] == fMaxElements) { base::mutex.unlock(); signalPs(); return true; } #endif base::mutex.unlock(); return true; } template void FIFO::endOfInput() { element_t* tmp; boost::mutex::scoped_lock scoped(base::mutex); if (ppos != 0) { while (cDone < base::numConsumers) finishedConsuming.wait(scoped); fMaxElements = ppos; tmp = pBuffer; pBuffer = cBuffer; cBuffer = tmp; cDone = 0; memset(cpos, 0, sizeof(*cpos) * base::numConsumers); } base::endOfInput(); if (cWaiting) moreData.notify_all(); } template uint64_t FIFO::getIterator() { uint64_t ret; boost::mutex::scoped_lock scoped(base::mutex); ret = base::getIterator(); return ret; } template uint64_t FIFO::blockedWriteCount() const { return blockedInsertWriteCount; } template uint64_t FIFO::blockedReadCount() const { return blockedNextReadCount; } template void FIFO::setMultipleProducers(bool b) { if (b) throw std::logic_error("FIFO: setMultipleProducers() doesn't work yet"); } //@bug 653 template void FIFO::setNumConsumers(uint32_t nc) { delete[] cpos; base::setNumConsumers(nc); cpos = new uint64_t[nc]; for (uint64_t i = 0; i < nc; ++i) cpos[i] = fMaxElements; cDone = nc; } //@bug 864 template void FIFO::maxElements(uint64_t max) { if (fMaxElements != max) { fMaxElements = max; if (pBuffer) delete[] pBuffer; if (cBuffer) delete[] cBuffer; pBuffer = 0; cBuffer = 0; for (uint64_t i = 0; i < base::numConsumers; ++i) cpos[i] = fMaxElements; } } // // Sets/Returns the number of temp files and the space taken up by those files // (in bytes) by this FIFO collection, if the application code chose to cache // to disk. // template void FIFO::setTotalFileCounts(uint64_t numFiles, uint64_t numBytes) { fNumFiles = numFiles; fNumBytes = numBytes; } template void FIFO::totalFileCounts(uint64_t& numFiles, uint64_t& numBytes) const { numFiles = fNumFiles; numBytes = fNumBytes; } } // namespace joblist