You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
218 lines
4.7 KiB
C++
218 lines
4.7 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: tdriver-dec.cpp 9210 2013-01-21 14:10:42Z rdempsey $
|
|
#include <iostream>
|
|
#include <cassert>
|
|
#include <stdexcept>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
using namespace boost;
|
|
|
|
#include "primitivemsg.h"
|
|
|
|
#include "bytestream.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "distributedenginecomm.h"
|
|
using namespace joblist;
|
|
|
|
class TestDistributedEngineComm
|
|
{
|
|
public:
|
|
TestDistributedEngineComm(DistributedEngineComm* dec) : fDec(dec)
|
|
{
|
|
}
|
|
void addDataToOutput(const ByteStream& bs)
|
|
{
|
|
fDec->addDataToOutput(bs);
|
|
}
|
|
|
|
private:
|
|
DistributedEngineComm* fDec;
|
|
};
|
|
|
|
namespace
|
|
{
|
|
const ByteStream buildBs(Int16 sessionId, Int16 stepId)
|
|
{
|
|
uint32_t len = sizeof(ISMPacketHeader) + 2 * sizeof(PrimitiveHeader);
|
|
ByteStream::byte* bpr = new ByteStream::byte[len];
|
|
|
|
ISMPacketHeader* hdr = reinterpret_cast<ISMPacketHeader*>(bpr);
|
|
PrimitiveHeader* p = reinterpret_cast<PrimitiveHeader*>(hdr + 1);
|
|
|
|
p->SessionID = sessionId;
|
|
p->StepID = stepId;
|
|
|
|
ByteStream bs(bpr, len);
|
|
delete[] bpr;
|
|
return bs;
|
|
}
|
|
|
|
void readBs(const ByteStream& bs, Int16& sessionId, Int16& stepId)
|
|
{
|
|
const ISMPacketHeader* hdr = reinterpret_cast<const ISMPacketHeader*>(bs.buf());
|
|
const PrimitiveHeader* p = reinterpret_cast<const PrimitiveHeader*>(hdr + 1);
|
|
sessionId = p->SessionID;
|
|
stepId = p->StepID;
|
|
return;
|
|
}
|
|
|
|
class ThdFun1
|
|
{
|
|
public:
|
|
ThdFun1(DistributedEngineComm* dec, int sessionId, int stepId)
|
|
: fDec(dec), fSessionId(sessionId), fStepId(stepId)
|
|
{
|
|
}
|
|
void operator()()
|
|
{
|
|
ByteStream bs = fDec->read(fSessionId, fStepId);
|
|
idbassert(bs.length() == 0);
|
|
return;
|
|
}
|
|
|
|
private:
|
|
DistributedEngineComm* fDec;
|
|
int fSessionId;
|
|
int fStepId;
|
|
};
|
|
|
|
} // namespace
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
int leakCheck = 0;
|
|
|
|
if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0)
|
|
leakCheck = 1;
|
|
|
|
DistributedEngineComm* dec;
|
|
|
|
dec = DistributedEngineComm::instance("./config-dec.xml");
|
|
|
|
dec->addSession(12345);
|
|
dec->addStep(12345, 0);
|
|
dec->addStep(12345, 1);
|
|
dec->addStep(12345, 3);
|
|
dec->addStep(12345, 10);
|
|
|
|
TestDistributedEngineComm tdec(dec);
|
|
ByteStream bs;
|
|
|
|
tdec.addDataToOutput(buildBs(12345, 0));
|
|
tdec.addDataToOutput(buildBs(12345, 1));
|
|
tdec.addDataToOutput(buildBs(12345, 3));
|
|
tdec.addDataToOutput(buildBs(12345, 10));
|
|
|
|
Int16 sessionId, stepId;
|
|
bs = dec->read(12345, 10);
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == 10);
|
|
|
|
bs = dec->read(12345, 1);
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == 1);
|
|
|
|
bs = dec->read(12345, 0);
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == 0);
|
|
|
|
bs = dec->read(12345, 3);
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == 3);
|
|
|
|
unsigned i;
|
|
bs = buildBs(12345, 1);
|
|
// 1M seems a bit too much for a dev box
|
|
// 500K is about the max
|
|
const unsigned loopMax = 200000 / (leakCheck * 99 + 1);
|
|
|
|
for (i = 0; i < loopMax; i++)
|
|
{
|
|
tdec.addDataToOutput(bs);
|
|
}
|
|
|
|
for (i = 0; i < loopMax; i++)
|
|
{
|
|
bs = dec->read(12345, 1);
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == 1);
|
|
}
|
|
|
|
unsigned throws;
|
|
throws = 0;
|
|
|
|
for (i = 0; i < loopMax; i++)
|
|
{
|
|
bs = buildBs(12345, (i % 10));
|
|
|
|
// some of these shoud throw since there's only a few steps added
|
|
try
|
|
{
|
|
tdec.addDataToOutput(bs);
|
|
}
|
|
catch (runtime_error& re)
|
|
{
|
|
throws++;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
idbassert(throws > 0);
|
|
|
|
throws = 0;
|
|
|
|
for (i = 0; i < loopMax; i++)
|
|
{
|
|
// some of these shoud throw since there's only a few steps added
|
|
try
|
|
{
|
|
bs = dec->read(12345, (i % 10));
|
|
}
|
|
catch (runtime_error& re)
|
|
{
|
|
throws++;
|
|
continue;
|
|
}
|
|
|
|
readBs(bs, sessionId, stepId);
|
|
idbassert(sessionId == 12345);
|
|
idbassert(stepId == (i % 10));
|
|
}
|
|
|
|
idbassert(throws > 0);
|
|
|
|
ThdFun1 fun1(dec, 12345, 1);
|
|
thread thd1(fun1);
|
|
|
|
dec->removeSession(12345);
|
|
|
|
thd1.join();
|
|
|
|
// delete dec;
|
|
|
|
return 0;
|
|
}
|