/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: tdriver-dec.cpp 9210 2013-01-21 14:10:42Z rdempsey $ #include #include #include using namespace std; #include using namespace boost; #include "primitivemsg.h" #include "bytestream.h" using namespace messageqcpp; #include "distributedenginecomm.h" using namespace joblist; class TestDistributedEngineComm { public: TestDistributedEngineComm(DistributedEngineComm* dec) : fDec(dec) { } void addDataToOutput(const ByteStream& bs) { fDec->addDataToOutput(bs); } private: DistributedEngineComm* fDec; }; namespace { const ByteStream buildBs(Int16 sessionId, Int16 stepId) { uint32_t len = sizeof(ISMPacketHeader) + 2 * sizeof(PrimitiveHeader); ByteStream::byte* bpr = new ByteStream::byte[len]; ISMPacketHeader* hdr = reinterpret_cast(bpr); PrimitiveHeader* p = reinterpret_cast(hdr + 1); p->SessionID = sessionId; p->StepID = stepId; ByteStream bs(bpr, len); delete[] bpr; return bs; } void readBs(const ByteStream& bs, Int16& sessionId, Int16& stepId) { const ISMPacketHeader* hdr = reinterpret_cast(bs.buf()); const PrimitiveHeader* p = reinterpret_cast(hdr + 1); sessionId = p->SessionID; stepId = p->StepID; return; } class ThdFun1 { public: ThdFun1(DistributedEngineComm* dec, int sessionId, int stepId) : fDec(dec), fSessionId(sessionId), fStepId(stepId) { } void operator()() { ByteStream bs = fDec->read(fSessionId, fStepId); idbassert(bs.length() == 0); return; } private: DistributedEngineComm* fDec; int fSessionId; int fStepId; }; } // namespace int main(int argc, char** argv) { int leakCheck = 0; if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0) leakCheck = 1; DistributedEngineComm* dec; dec = DistributedEngineComm::instance("./config-dec.xml"); dec->addSession(12345); dec->addStep(12345, 0); dec->addStep(12345, 1); dec->addStep(12345, 3); dec->addStep(12345, 10); TestDistributedEngineComm tdec(dec); ByteStream bs; tdec.addDataToOutput(buildBs(12345, 0)); tdec.addDataToOutput(buildBs(12345, 1)); tdec.addDataToOutput(buildBs(12345, 3)); tdec.addDataToOutput(buildBs(12345, 10)); Int16 sessionId, stepId; bs = dec->read(12345, 10); readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == 10); bs = dec->read(12345, 1); readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == 1); bs = dec->read(12345, 0); readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == 0); bs = dec->read(12345, 3); readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == 3); unsigned i; bs = buildBs(12345, 1); // 1M seems a bit too much for a dev box // 500K is about the max const unsigned loopMax = 200000 / (leakCheck * 99 + 1); for (i = 0; i < loopMax; i++) { tdec.addDataToOutput(bs); } for (i = 0; i < loopMax; i++) { bs = dec->read(12345, 1); readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == 1); } unsigned throws; throws = 0; for (i = 0; i < loopMax; i++) { bs = buildBs(12345, (i % 10)); // some of these shoud throw since there's only a few steps added try { tdec.addDataToOutput(bs); } catch (runtime_error& re) { throws++; continue; } } idbassert(throws > 0); throws = 0; for (i = 0; i < loopMax; i++) { // some of these shoud throw since there's only a few steps added try { bs = dec->read(12345, (i % 10)); } catch (runtime_error& re) { throws++; continue; } readBs(bs, sessionId, stepId); idbassert(sessionId == 12345); idbassert(stepId == (i % 10)); } idbassert(throws > 0); ThdFun1 fun1(dec, 12345, 1); thread thd1(fun1); dec->removeSession(12345); thd1.join(); // delete dec; return 0; }