/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: cacheutils.cpp 4051 2013-08-09 22:38:47Z wweeks $ #include #include "cacheutils.h" #include #include #include #include //#define NDEBUG #include using namespace std; #include #include #include using namespace boost; #include "configcpp.h" using namespace config; #include "bytestream.h" #include "messagequeue.h" using namespace messageqcpp; #include "primitivemsg.h" #include "brmtypes.h" using namespace BRM; #include "atomicops.h" namespace { // Only one of the cacheutils fcns can run at a time boost::mutex CacheOpsMutex; // This global is updated only w/ atomic ops volatile uint32_t MultiReturnCode; int32_t extractRespCode(const ByteStream& bs) { if (bs.length() < (sizeof(ISMPacketHeader) + sizeof(int32_t))) return 1; const uint8_t* bytePtr = bs.buf(); const ISMPacketHeader* hdrp = reinterpret_cast(bytePtr); if (hdrp->Command != CACHE_OP_RESULTS) return 1; const int32_t* resp = reinterpret_cast(bytePtr + sizeof(ISMPacketHeader)); return *resp; } class CacheOpThread { public: CacheOpThread(const string& svr, const ByteStream& outBs) : fServerName(svr), fOutBs(outBs) { } ~CacheOpThread() { } void operator()() { struct timespec ts = {10, 0}; int32_t rc = 0; scoped_ptr cl(new MessageQueueClient(fServerName)); try { cl->write(fOutBs); rc = extractRespCode(cl->read(&ts)); } catch (...) { rc = 1; } if (rc != 0) atomicops::atomicCAS(&MultiReturnCode, 0, 1); } private: // CacheOpThread(const CacheOpThread& rhs); // CacheOpThread& operator=(const CacheOpThread& rhs); string fServerName; ByteStream fOutBs; }; int sendToAll(const ByteStream& outBs) { // Not thread-safe: external synchronization is needed! // added code here to flush any running primprocs that may be active // TODO: we really only need to flush each unique PrimProc, but we can't tell from the // config file which those are, so use the same logic as joblist::DistributedEngineComm Config* cf = Config::makeConfig(); const string section = "PrimitiveServers"; int cnt = static_cast(Config::fromText(cf->getConfig(section, "Count"))); if (cnt <= 0) cnt = 1; thread_group tg; int rc = 0; MultiReturnCode = 0; for (int i = 0; i < cnt; i++) { ostringstream oss; oss << "PMS" << (i + 1); tg.create_thread(CacheOpThread(oss.str(), outBs)); } tg.join_all(); if (MultiReturnCode != 0) rc = -1; return rc; } } // namespace namespace cacheutils { /** * */ int flushPrimProcCache() { boost::mutex::scoped_lock lk(CacheOpsMutex); try { const int msgsize = sizeof(ISMPacketHeader); uint8_t msgbuf[msgsize]; memset(msgbuf, 0, sizeof(ISMPacketHeader)); ISMPacketHeader* hdrp = reinterpret_cast(&msgbuf[0]); hdrp->Command = CACHE_FLUSH; ByteStream bs(msgbuf, msgsize); int rc = sendToAll(bs); return rc; } catch (...) { } return -1; } /** * */ int flushPrimProcBlocks(const BRM::BlockList_t& list) { if (list.empty()) return 0; boost::mutex::scoped_lock lk(CacheOpsMutex); #if defined(__LP64__) || defined(_WIN64) if (list.size() > numeric_limits::max()) return -1; #endif try { const size_t msgsize = sizeof(ISMPacketHeader) + sizeof(uint32_t) + sizeof(LbidAtVer) * list.size(); scoped_array msgbuf(new uint8_t[msgsize]); memset(msgbuf.get(), 0, sizeof(ISMPacketHeader)); ISMPacketHeader* hdrp = reinterpret_cast(msgbuf.get()); hdrp->Command = CACHE_CLEAN_VSS; uint32_t* cntp = reinterpret_cast(msgbuf.get() + sizeof(ISMPacketHeader)); *cntp = static_cast(list.size()); LbidAtVer* itemp = reinterpret_cast(msgbuf.get() + sizeof(ISMPacketHeader) + sizeof(uint32_t)); BlockList_t::const_iterator iter = list.begin(); BlockList_t::const_iterator end = list.end(); while (iter != end) { itemp->LBID = static_cast(iter->first); itemp->Ver = static_cast(iter->second); ++itemp; ++iter; } ByteStream bs(msgbuf.get(), msgsize); int rc = sendToAll(bs); return rc; } catch (...) { } return -1; } int flushPrimProcAllverBlocks(const vector& list) { if (list.empty()) return 0; ByteStream bs(sizeof(ISMPacketHeader) + sizeof(uint32_t) + (sizeof(LBID_t) * list.size())); ISMPacketHeader* hdr; int rc; hdr = (ISMPacketHeader*)bs.getInputPtr(); hdr->Command = FLUSH_ALL_VERSION; bs.advanceInputPtr(sizeof(ISMPacketHeader)); bs << (uint32_t)list.size(); bs.append((uint8_t*)&list[0], sizeof(LBID_t) * list.size()); try { boost::mutex::scoped_lock lk(CacheOpsMutex); rc = sendToAll(bs); return rc; } catch (...) { } return -1; } int flushOIDsFromCache(const vector& oids) { /* Message format: * ISMPacketHeader * uint32_t - OID count * uint32_t * - OID array */ boost::mutex::scoped_lock lk(CacheOpsMutex, boost::defer_lock_t()); ByteStream bs; ISMPacketHeader ism; uint32_t i; void* ismp = static_cast(&ism); memset(ismp, 0, sizeof(ISMPacketHeader)); ism.Command = CACHE_FLUSH_BY_OID; bs.load((uint8_t*)&ism, sizeof(ISMPacketHeader)); bs << (uint32_t)oids.size(); for (i = 0; i < oids.size(); i++) bs << (uint32_t)oids[i]; lk.lock(); return sendToAll(bs); } int flushPartition(const std::vector& oids, set& partitionNums) { /* Message format: * ISMPacketHeader * uint32_t - partition count * LogicalPartition * - partitionNum * uint32_t - OID count * uint32_t * - OID array */ boost::mutex::scoped_lock lk(CacheOpsMutex, boost::defer_lock_t()); ByteStream bs; ISMPacketHeader ism; void* ismp = static_cast(&ism); memset(ismp, 0, sizeof(ISMPacketHeader)); ism.Command = CACHE_FLUSH_PARTITION; bs.load((uint8_t*)&ism, sizeof(ISMPacketHeader)); serializeSet(bs, partitionNums); serializeInlineVector(bs, oids); lk.lock(); return sendToAll(bs); } int dropPrimProcFdCache() { const int msgsize = sizeof(ISMPacketHeader); uint8_t msgbuf[msgsize]; memset(msgbuf, 0, sizeof(ISMPacketHeader)); ISMPacketHeader* hdrp = reinterpret_cast(&msgbuf[0]); hdrp->Command = CACHE_DROP_FDS; ByteStream bs(msgbuf, msgsize); try { boost::mutex::scoped_lock lk(CacheOpsMutex); int rc = sendToAll(bs); return rc; } catch (...) { } return -1; } int purgePrimProcFdCache(const std::vector files, const int pmId) { const int msgsize = sizeof(ISMPacketHeader); uint8_t msgbuf[msgsize]; memset(msgbuf, 0, sizeof(ISMPacketHeader)); ISMPacketHeader* hdrp = reinterpret_cast(&msgbuf[0]); hdrp->Command = CACHE_PURGE_FDS; ByteStream bs(msgbuf, msgsize); serializeInlineVector(bs, files); int32_t rc = 0; try { struct timespec ts = {10, 0}; ostringstream oss; oss << "PMS" << pmId; scoped_ptr cl(new MessageQueueClient(oss.str())); cl->write(bs); rc = extractRespCode(cl->read(&ts)); } catch (...) { rc = -1; } return rc; } } // namespace cacheutils