1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-04 04:42:30 +03:00
Files
2022-01-21 16:43:49 +00:00

2358 lines
58 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: pingproc.cpp 2101 2013-01-21 14:12:52Z rdempsey $
#include <iostream>
#include <iomanip>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <sstream>
#include <vector>
using namespace std;
#include <boost/thread.hpp>
using namespace boost;
#include "bytestream.h"
using namespace messageqcpp;
#include "distributedenginecomm.h"
#include "primitivemsg.h"
#include "jobstep.h"
#include "batchprimitiveprocessor-jl.h"
using namespace joblist;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "brm.h"
using namespace BRM;
// Global vars
bool debug;
bool thdFcnFailure;
//
// TODO: Why is this namespace here?
namespace
{
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff)
{
if (tv2.tv_nsec < tv1.tv_nsec)
{
diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1;
diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec + 1000000000;
}
else
{
diff.tv_sec = tv2.tv_sec - tv1.tv_sec;
diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec;
}
} // timespec_sub
//
//
class OidOperation
{
public:
enum OpType_t
{
SCAN = 0,
BLOCK = 1,
LOOPBACK = 2,
NONE = 3,
BATCHSCAN = 4,
BATCHSTEP = 5,
BATCHFILT = 6
};
OidOperation(const OID_t oid, const OpType_t opType, const uint32_t sessionId = 0);
~OidOperation(){};
void addFilter(const int8_t COP, const int64_t value);
const OID_t OID() const
{
return fOid;
}
const OpType_t OpType() const
{
return fOpType;
}
const CalpontSystemCatalog::ColType& ColumnType()
{
return fColType;
}
const uint32_t ColumnWidth() const
{
return fColType.colWidth;
}
const uint32_t FilterCount() const
{
return fFilterCount;
}
const ByteStream& FilterString()
{
return fFilterList;
}
const uint32_t SessionId() const
{
return fSessionId;
}
const uint32_t DataType() const
{
return fColType.colDataType;
}
const uint32_t BOP() const
{
return fBOP;
}
void BOP(const uint32_t bop)
{
if (FilterCount() >= 2)
fBOP = bop;
}
const uint32_t COP1() const
{
return fCOP1;
}
void COP1(const uint32_t cop)
{
fCOP1 = cop;
}
const uint32_t COP2() const
{
return fCOP2;
}
void COP2(const uint32_t cop)
{
fCOP2 = cop;
}
bool isIntegralDataType();
void setLbidTraceOn();
void setPMProfileOn();
bool LbidTrace()
{
return fLbidTrace;
}
bool PMProfile()
{
return fPMProfile;
}
void deSerializeFilter(int8_t& COP, int64_t& value);
// private:
OidOperation(){};
OID_t fOid;
ByteStream fFilterList;
uint32_t fFilterCount;
OpType_t fOpType;
CalpontSystemCatalog::ColType fColType;
uint32_t fSessionId;
uint32_t fBOP;
uint32_t fCOP1;
uint32_t fCOP2;
bool fLbidTrace;
bool fPMProfile;
}; // class OidOperation
//
//
OidOperation::OidOperation(const OID_t oid, const OpType_t opType, const uint32_t sessionId)
: fOid(oid)
, fFilterList()
, fFilterCount(0)
, fOpType(opType)
, fSessionId(sessionId)
, fBOP(BOP_NONE)
, fCOP1(COMPARE_NIL)
, fCOP2(COMPARE_NIL)
, fLbidTrace(false)
, fPMProfile(false)
{
boost::shared_ptr<CalpontSystemCatalog> cat =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid());
fColType = cat->colType(oid);
fFilterList.reset();
}
void OidOperation::setLbidTraceOn()
{
fLbidTrace = true;
}
void OidOperation::setPMProfileOn()
{
fPMProfile = true;
}
void OidOperation::addFilter(const int8_t COP, const int64_t value)
{
if (fFilterCount == 2)
return;
fFilterList << (uint8_t)COP;
// converts to a type of the appropriate width, then bitwise
// copies into the filter ByteStream
switch (ColumnWidth())
{
case 1:
int8_t tmp8;
tmp8 = value;
fFilterList << *((uint8_t*)&tmp8);
break;
case 2:
int16_t tmp16;
tmp16 = value;
fFilterList << *((uint16_t*)&tmp16);
break;
case 4:
int32_t tmp32;
tmp32 = value;
fFilterList << *((uint32_t*)&tmp32);
break;
case 8: fFilterList << *((uint64_t*)&value); break;
default:
ostringstream o;
o << "addFilter: colType says OID "
<< " has a width of " << ColumnWidth();
throw runtime_error(o.str());
}
fFilterCount++;
}
void OidOperation::deSerializeFilter(int8_t& COP, int64_t& value)
{
if (fFilterCount == 0)
{
COP = COMPARE_NIL;
return;
}
fFilterList >> *(uint8_t*)&COP;
switch (ColumnWidth())
{
case 1:
int8_t tmp8;
tmp8 = value;
fFilterList >> *((uint8_t*)&tmp8);
value = tmp8;
break;
case 2:
int16_t tmp16;
fFilterList >> *((uint16_t*)&tmp16);
value = tmp16;
break;
case 4:
int32_t tmp32;
fFilterList >> *((uint32_t*)&tmp32);
value = tmp32;
break;
case 8: fFilterList >> *((uint64_t*)&value); break;
default:
ostringstream o;
o << "deSerializeFilter: colType says OID "
<< " has a width of " << ColumnWidth();
throw runtime_error(o.str());
}
fFilterCount--;
}
typedef vector<OidOperation*> OperationList;
// Only process these column types
//
bool OidOperation::isIntegralDataType()
{
if (DataType() == CalpontSystemCatalog::BIT || DataType() == CalpontSystemCatalog::TINYINT ||
DataType() == CalpontSystemCatalog::SMALLINT || DataType() == CalpontSystemCatalog::MEDINT ||
DataType() == CalpontSystemCatalog::INT || DataType() == CalpontSystemCatalog::DATE ||
DataType() == CalpontSystemCatalog::BIGINT || DataType() == CalpontSystemCatalog::DATETIME ||
DataType() == CalpontSystemCatalog::TIMESTAMP || DataType() == CalpontSystemCatalog::TIME ||
DataType() == CalpontSystemCatalog::UTINYINT || DataType() == CalpontSystemCatalog::USMALLINT ||
DataType() == CalpontSystemCatalog::UMEDINT || DataType() == CalpontSystemCatalog::UINT ||
DataType() == CalpontSystemCatalog::UBIGINT)
return true;
if (DataType() == CalpontSystemCatalog::CHAR && 1 == fColType.colWidth)
return true;
return false;
}
const ByteStream formatLoopBackMsg(const uint32_t sessionId, uint32_t uniqueId)
{
ByteStream primMsg;
ISMPacketHeader ism;
memset(&ism, 0, sizeof(ism));
ism.Command = COL_LOOPBACK;
ism.Size = sizeof(ism) + sizeof(ColLoopback);
ism.Type = 2;
primMsg.load((const uint8_t*)&ism, sizeof(ism));
struct ColLoopback lb;
memset(&lb, 0, sizeof(lb));
lb.Hdr.SessionID = sessionId;
lb.Hdr.StatementID = 0;
lb.Hdr.TransactionID = sessionId;
lb.Hdr.VerID = 0;
lb.Hdr.StepID = sessionId;
lb.Hdr.UniqueID = uniqueId;
primMsg.append((const uint8_t*)&lb, sizeof(lb));
return primMsg;
} // formatLoopBackMsg
const ByteStream formatDictionaryMsg(const uint64_t lbid, ByteStream& ridList, const uint16_t ridCount,
OidOperation& oidOp)
{
ByteStream primMsg;
DictSignatureRequestHeader hdr;
ISMPacketHeader ism;
ism.Flags = 0; // planFlagsToPrimFlags(fTraceFlags);
ism.Command = DICT_SIGNATURE;
ism.Size = sizeof(DictSignatureRequestHeader) + ridList.length();
ism.Type = 2;
hdr.Hdr.SessionID = oidOp.SessionId();
hdr.Hdr.StatementID = 0;
hdr.Hdr.TransactionID = oidOp.SessionId();
hdr.Hdr.VerID = 0;
hdr.Hdr.StepID = 0;
hdr.LBID = lbid;
hdr.PBID = 0;
idbassert(ridCount <= 8000);
hdr.NVALS = ridCount;
primMsg.load((const uint8_t*)&ism, sizeof(ism));
primMsg.append((const uint8_t*)&hdr, sizeof(DictSignatureRequestHeader));
primMsg += ridList;
return primMsg;
}
const ByteStream formatColStepMsg(const uint64_t lbid, ByteStream& ridList, const uint16_t ridCount,
OidOperation& oidOp, uint32_t uniqueId)
{
ByteStream primMsg;
NewColRequestHeader hdr;
memset(&hdr, 0, sizeof(hdr));
hdr.ism.Reserve = 0;
hdr.ism.Flags = 0;
if (oidOp.LbidTrace() == true)
hdr.ism.Flags |= PF_LBID_TRACE;
if (oidOp.PMProfile() == true)
hdr.ism.Flags |= PF_PM_PROF;
hdr.ism.Command = COL_BY_SCAN;
hdr.ism.Size = sizeof(NewColRequestHeader) + oidOp.FilterString().length() + ridList.length();
hdr.ism.Type = 2;
hdr.hdr.SessionID = oidOp.SessionId();
hdr.hdr.StatementID = 0;
hdr.hdr.TransactionID = oidOp.SessionId();
hdr.hdr.VerID = 0;
hdr.hdr.StepID = oidOp.SessionId();
hdr.hdr.UniqueID = uniqueId;
hdr.LBID = lbid;
idbassert(hdr.LBID > 0);
hdr.PBID = 0;
hdr.DataSize = oidOp.ColumnWidth();
hdr.DataType = oidOp.DataType();
hdr.OutputType = OT_BOTH;
hdr.BOP = BOP_NONE;
// hdr.InputFlags = 0;
hdr.NOPS = oidOp.FilterCount();
hdr.NVALS = ridCount;
hdr.sort = 0;
primMsg.load((const uint8_t*)&hdr, sizeof(NewColRequestHeader));
if (oidOp.FilterCount() > 0)
primMsg += oidOp.FilterString();
if (ridCount > 0)
primMsg += ridList;
return primMsg;
} // formatColStepMsg
const ByteStream formatDictionaryScanMsg(const uint64_t lbid, const uint16_t count, OidOperation& oidOp)
{
ByteStream primMsg;
DictTokenByScanRequestHeader hdr;
hdr.ism.Reserve = 0;
hdr.ism.Flags = 0;
hdr.ism.Command = DICT_TOKEN_BY_SCAN_COMPARE;
hdr.ism.Size = sizeof(DictTokenByScanRequestHeader) + oidOp.FilterString().length();
hdr.ism.Type = 2;
hdr.Hdr.SessionID = oidOp.SessionId();
hdr.Hdr.StatementID = 0;
hdr.Hdr.TransactionID = 0;
hdr.Hdr.VerID = 0;
hdr.Hdr.StepID = oidOp.SessionId();
hdr.LBID = lbid;
idbassert(hdr.LBID >= 0);
hdr.PBID = 0;
hdr.OutputType = OT_TOKEN;
hdr.BOP = oidOp.BOP();
hdr.COP1 = oidOp.COP1();
hdr.COP2 = oidOp.COP2();
hdr.NVALS = oidOp.FilterCount();
hdr.Count = count;
hdr.charsetNumber = oidOp.ColumnType().charsetNumber;
idbassert(hdr.Count > 0);
primMsg.load((const uint8_t*)&hdr.ism, sizeof(ISMPacketHeader));
primMsg.append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
primMsg += oidOp.FilterString();
return primMsg;
}
const ByteStream formatColScanMsg(const uint64_t lbid, const uint16_t count, OidOperation& oidOp,
uint32_t uniqueId)
{
ByteStream primMsg;
ISMPacketHeader ism;
ColByScanRangeRequestHeader fMsgHeader;
memset(&fMsgHeader, 0, sizeof(fMsgHeader));
memset(&ism, 0, sizeof(ism));
ism.Reserve = 0;
ism.Flags = 0;
if (oidOp.LbidTrace() == true)
ism.Flags |= PF_LBID_TRACE;
if (oidOp.PMProfile() == true)
ism.Flags |= PF_PM_PROF;
ism.Command = COL_BY_SCAN_RANGE;
ism.Size = sizeof(fMsgHeader) + sizeof(ism) + oidOp.FilterString().length();
ism.Type = 2;
primMsg.load((const uint8_t*)&ism, sizeof(ism));
fMsgHeader.LBID = lbid;
idbassert(fMsgHeader.LBID >= 0);
fMsgHeader.PBID = 0;
fMsgHeader.DataSize = oidOp.ColumnWidth();
fMsgHeader.DataType = oidOp.DataType();
fMsgHeader.OutputType = OT_BOTH;
fMsgHeader.BOP = oidOp.BOP();
fMsgHeader.NOPS = oidOp.FilterCount();
fMsgHeader.NVALS = 0;
fMsgHeader.Count = count; //(hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
idbassert(fMsgHeader.Count > 0);
fMsgHeader.Hdr.SessionID = oidOp.SessionId();
fMsgHeader.Hdr.StatementID = 0;
fMsgHeader.Hdr.TransactionID = oidOp.SessionId();
fMsgHeader.Hdr.VerID = 0;
fMsgHeader.Hdr.StepID = oidOp.SessionId();
fMsgHeader.Hdr.UniqueID = uniqueId;
primMsg.append((const uint8_t*)&fMsgHeader, sizeof(fMsgHeader));
if (oidOp.FilterCount() > 0)
primMsg += oidOp.FilterString();
return primMsg;
} // formatColScanMsg
//
void doBatchOp_scan(OidOperation& OidOp);
void doBatchOp_step(OidOperation& OidOp);
void doBatchOp_filt(OidOperation& OidOp);
void doBatchQueryOp(OperationList& OidOps);
void doColScan(OidOperation& OidOp);
void doColStep(OidOperation& OidOp);
void doDictScan(OidOperation& OidOp);
void doDictStep(OidOperation& OidOp);
// receive the responses from PrimProc
//
struct ThdFcn
{
void operator()()
{
uint64_t totalBytes = 0;
try
{
ByteStream ibs;
if (debug)
cout << "Waiting on " << fNumMsgs << " messages." << endl;
for (uint32_t k = 0; k < fNumMsgs; k++)
{
// cout << "reading msg #" << k << "...\n";
ibs = fDec->read(uniqueID);
// cout << "got msg #" << k << endl;
if (debug)
if (k % 10240 == 0)
cout << "ThdFcn: read " << fSessionid << " " << k << "/" << fNumMsgs << " " << ibs.length() << "/"
<< totalBytes << endl;
if (ibs.length() == 0)
break;
totalBytes += ibs.length();
}
}
catch (exception& e)
{
cerr << "read exception: " << e.what() << endl;
thdFcnFailure = true;
}
if (debug)
cout << totalBytes << " bytes read in " << fNumMsgs << " messages" << endl;
} // void operator()
uint32_t fSessionid;
uint32_t uniqueID;
DistributedEngineComm* fDec;
unsigned fNumMsgs;
}; // struct ThdFcn
struct QryThdFcn
{
void operator()()
{
uint64_t totalBytes = 0;
int64_t min;
int64_t max;
uint64_t lbid;
uint32_t cachedIO;
uint32_t physIO;
uint32_t touchedBlocks;
bool validCPData;
try
{
ByteStream ibs;
ByteStream obs;
if (debug)
cout << "Waiting on " << fNumMsgs << " messages." << endl;
for (uint32_t k = 0; k < fNumMsgs; k++)
{
// cout << "reading msg #" << k << "...\n";
ibs = fDec->read(uniqueID);
// cout << "got msg #" << k << endl;
if (debug)
if (k % 10240 == 0)
cout << "QryThdFcn: read " << fSessionid << " " << k << "/" << fNumMsgs << " " << ibs.length()
<< "/" << totalBytes << " rows: " << fRows << endl;
if (ibs.length() == 0)
break;
totalBytes += ibs.length();
fRows +=
fBpp.getTableBand(ibs, &obs, &validCPData, &lbid, &min, &max, &cachedIO, &physIO, &touchedBlocks);
fBlockTouched += touchedBlocks;
}
}
catch (exception& e)
{
cerr << "read exception: " << e.what() << endl;
thdFcnFailure = true;
}
if (debug)
cout << totalBytes << " bytes read in " << fNumMsgs << " messages for " << fRows << " rows and "
<< fBlockTouched << " blocks\n";
} // void operator()
QryThdFcn(BatchPrimitiveProcessorJL& bpp, uint64_t& rows, uint32_t& blk)
: fBpp(bpp), fNumMsgs(0), fRows(rows), fBlockTouched(blk)
{
}
BatchPrimitiveProcessorJL& fBpp;
uint32_t fSessionid;
DistributedEngineComm* fDec;
unsigned fNumMsgs;
uint64_t& fRows;
uint32_t& fBlockTouched;
uint32_t uniqueID;
}; // struct ThdFcn
struct BatchScanThr
{
BatchScanThr(OidOperation& oidOp) : fOidOp(oidOp)
{
}
void operator()()
{
doBatchOp_scan(fOidOp);
} // void operator()
OidOperation& fOidOp;
}; // struct BatchScanThr
struct BatchStepThr
{
BatchStepThr(OidOperation& oidOp) : fOidOp(oidOp)
{
}
void operator()()
{
doBatchOp_step(fOidOp);
} // void operator()
OidOperation& fOidOp;
}; // struct BatchStepThr
struct BatchFiltThr
{
BatchFiltThr(OidOperation& oidOp) : fOidOp(oidOp)
{
}
void operator()()
{
doBatchOp_filt(fOidOp);
} // void operator()
OidOperation& fOidOp;
}; // struct BatchFiltThr
struct BatchQueryThr
{
BatchQueryThr(OperationList& oidOps) : fOidOps(oidOps)
{
}
void operator()()
{
doBatchQueryOp(fOidOps);
} // void operator()
OperationList& fOidOps;
}; // struct
struct ColStepThr
{
ColStepThr(OidOperation& oidOp) : fOidOp(oidOp)
{
}
void operator()()
{
doColStep(fOidOp);
} // void operator()
OidOperation& fOidOp;
}; // struct ColStepThr
struct ColScanThr
{
ColScanThr(OidOperation& oidOp) : fOidOp(oidOp)
{
}
void operator()()
{
doColScan(fOidOp);
} // void operator()
OidOperation& fOidOp;
}; // struct ColStepThr
struct DictSigThr
{
}; // DictSigThr
struct DictScanThr
{
}; // DictScanThr
// doColScan
void doDictionaryScan(OidOperation& OidOp)
{
}
// doColScan
void doColScan(OidOperation& OidOp)
{
if (debug)
cout << "beginning doColScan\n";
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
struct timespec ts1;
struct timespec ts2;
struct timespec diff;
uint32_t uniqueID;
uint32_t sessionid = getpid();
uint32_t totalBlks = 0;
// dec->addSession(sessionid);
// dec->addStep(sessionid, sessionid);
DBRM dbrm;
uniqueID = dbrm.getUnique32();
dec->addQueue(uniqueID);
int err = dbrm.lookup(OidOp.OID(), lbidRanges);
if (err)
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
err = dbrm.getHWM(OidOp.OID(), hwm);
if (err)
throw runtime_error("doAColScan: BRM HWM lookup failure (3)");
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
ThdFcn f1;
f1.fSessionid = sessionid;
f1.uniqueID = uniqueID;
f1.fDec = dec;
f1.fNumMsgs = 0;
thdFcnFailure = false;
uint32_t rangeSize = 0;
ByteStream obs;
// calculate the expected number of messages
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
totalBlks += rangeSize;
} // for
f1.fNumMsgs = (totalBlks / OidOp.ColumnWidth());
if (0 < totalBlks % OidOp.ColumnWidth())
++f1.fNumMsgs;
idbassert(f1.fNumMsgs);
if (debug)
cout << "Scanning OID " << OidOp.OID() << " " << f1.fNumMsgs << " msgs" << endl;
thread t1(f1);
clock_gettime(CLOCK_REALTIME, &ts1);
// send the primitive requests
int rCount = 0;
totalBlks = 0;
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
try
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "doAColScan dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
obs = formatColScanMsg(lbid, rangeSize, OidOp, uniqueID);
if (obs.length() > 0)
{
dec->write(obs);
rCount++;
if (debug)
cout << "colScan: " << rCount << "/" << lbidRanges.size() << " sending " << obs.length()
<< " bytes "
<< " lbid " << lbid << " sz " << rangeSize << endl;
}
}
catch (exception& e)
{
cerr << "catch " << e.what() << endl;
}
totalBlks += rangeSize;
} // for (lbidRanges ...
t1.join();
clock_gettime(CLOCK_REALTIME, &ts2);
// dec->removeSession(sessionid);
dec->removeQueue(uniqueID);
timespec_sub(ts1, ts2, diff);
cout << "ColScan stats OID: " << OidOp.OID() << "\tFilter: " << (int)OidOp.FilterCount()
<< "\tBlocks: " << (int)totalBlks << "\tElapse: " << diff.tv_sec + (diff.tv_nsec / 1000000000.0)
<< "s";
float rate = 0;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << " Blks/sec : " << rate << endl;
if (thdFcnFailure)
cout << "There was a failure in the read thread." << endl;
cout << endl;
} // doAScan
//
void usage()
{
cerr << "PingProc operation [filter] [operation] [filter] [reporting]" << endl
<< "PingProc -c -s <oid> -gt 0 -t <oid> -d " << endl
<< "\t---- operation flags ----" << endl
<< "\t--scan -s <scan the oid>" << endl
<< "\t--block -t <colstep the oid>" << endl
<< "\t--BatchPrimitiveScan -B <batch scan the oid>" << endl
<< "\t--BatchPrimitiveStep -Z <batch step the oid>" << endl
<< "\t--concurrent -c perform each operation in its own thread" << endl
<< "\t--lbid-trace -lb set lbid trace flag in request" << endl
<< "\t---- filter flags ----" << endl
<< "\t--equal -eq <int> equivalency test of values in a block>" << endl
<< "\t--greater-than -gt <int> - greater than (>) test of values in a block" << endl
<< "\t--greater-than-equal -ge <int> greater than or equal to (>=) test of values in a block" << endl
<< "\t--less-than -lt <int> less than (<) test of values in a block" << endl
<< "\t--less-than-equal -le <int> less than or equal to (<=) test of values in a block" << endl
<< "\t--not-equal -ne <int> not equal to (!=) test of values in a block" << endl
<< "\t--bop <1 or 0> binary operator when 2 comparison filters are present" << endl
<< "\t---- reporting flags ----" << endl
<< "\t--debug -d turn debug output on>" << endl
<< "\t--list <oid> -list <oid> -l <oid> print out all oids and their ranges" << endl
<< "\t--loopback <count=100000> -p <count> send count loopback requests" << endl
<< "\t--query -q run batch query for all the oids (enter with -B or -Z)" << endl;
} // usage()
const int64_t getInt(string s)
{
if (s.length() <= 0)
return -1;
// if (atoll(s.data()) < 0)
// return -1;
return atoll(s.data());
} // getInt
// dictionary
void doDictionarySig(OidOperation& OidOp)
{
} // doDictionarySig
// col step
void doColStep(OidOperation& OidOp)
{
struct timespec ts1;
struct timespec ts2;
struct timespec diff;
DBRM dbrm;
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
uint32_t totalBlks = 0;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
ThdFcn f1;
// dec->addSession(OidOp.SessionId());
// dec->addStep(OidOp.SessionId(), OidOp.SessionId());
uint32_t uniqueID = dbrm.getUnique32();
dec->addQueue(uniqueID);
f1.fSessionid = OidOp.SessionId();
f1.uniqueID = uniqueID;
f1.fDec = dec;
f1.fNumMsgs = 0;
thdFcnFailure = false;
ByteStream ridlist;
uint16_t ridCount = 0; // BLOCK_SIZE/OidOp.ColumnWidth();
for (uint16_t i = 0; i < ridCount; i++)
ridlist << i;
int err = dbrm.lookup(OidOp.OID(), lbidRanges);
if (err)
throw runtime_error("doAColStep: BRM LBID range lookup failure (1)");
err = dbrm.getHWM(OidOp.OID(), hwm);
if (err)
throw runtime_error("doAColStep: BRM HWM lookup failure (3)");
uint32_t rangeSize = 0;
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "pColStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
break; // continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
totalBlks += rangeSize;
} // for
uint32_t colwidth = OidOp.ColumnWidth();
f1.fNumMsgs = totalBlks / (colwidth);
if (0 < totalBlks % colwidth)
++f1.fNumMsgs;
idbassert(f1.fNumMsgs);
thread t1(f1);
ByteStream obs;
totalBlks = 0;
clock_gettime(CLOCK_REALTIME, &ts1);
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
if (debug)
cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
break; // continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
for (unsigned i = 0; i < rangeSize; i++)
{
obs += formatColStepMsg(lbid + i, ridlist, ridCount, OidOp, uniqueID);
if (0 == (i + 1) % colwidth)
{
dec->write(obs);
if (debug && i + 1 == rangeSize)
cout << "colStep: " << i << "/" << rangeSize << " " << obs.length() << " lbid " << lbid + i << endl;
obs.restart();
}
}
totalBlks += rangeSize;
} // for
if (obs.length())
{
dec->write(obs);
if (debug)
cout << "colStep: last"
<< "/" << rangeSize << " " << obs.length() << endl;
}
obs.reset();
t1.join(); //@bug 849 moved join here and changed output to be like pColScan.
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
// t1.join();
cout << "ColStep stats OID: " << OidOp.OID() << "\tFilter: " << (int)OidOp.FilterCount()
<< "\tBlocks: " << (int)totalBlks << "\tElapse: " << diff.tv_sec + (diff.tv_nsec / 1000000000.0)
<< "s";
float rate = 0;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tBlks/sec " << rate << endl;
if (thdFcnFailure)
cerr << "There was a failure in the read thread." << endl;
cout << endl;
} // doColStep
void doBatchOp_scan(OidOperation& OidOp)
{
struct timespec ts1, ts2, diff;
JobStepAssociation injs, outjs;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
ThdFcn f1;
boost::shared_ptr<CalpontSystemCatalog> sysCat =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid());
pColScanStep scan(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId,
OidOp.fSessionId, OidOp.fSessionId, rm);
int32_t filters = OidOp.FilterCount();
while (OidOp.FilterCount() > 0)
{
int8_t cop;
int64_t value;
OidOp.deSerializeFilter(cop, value);
scan.addFilter(cop, value);
}
BatchPrimitiveProcessorJL bpp;
ByteStream bs;
DBRM dbrm;
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
uint32_t uniqueID = dbrm.getUnique32();
bpp.setUniqueID(uniqueID);
bpp.setSessionID(OidOp.SessionId());
bpp.setStepID(OidOp.SessionId());
bpp.addFilterStep(scan);
cout << "session number = " << OidOp.SessionId() << endl;
// dec->addSession(OidOp.SessionId());
// dec->addStep(OidOp.SessionId(), OidOp.SessionId());
dec->addQueue(uniqueID);
f1.fSessionid = OidOp.SessionId();
f1.uniqueID = uniqueID;
f1.fDec = dec;
thdFcnFailure = false;
int err = dbrm.lookup(OidOp.OID(), lbidRanges);
if (err)
{
cerr << "doAColScan: BRM LBID range lookup failure (1)\n";
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
}
err = dbrm.getHWM(OidOp.OID(), hwm);
if (err)
{
cerr << "doAColScan: BRM HWM lookup failure (3)" << endl;
throw runtime_error("doAColScan: BRM HWM lookup failure (3)");
}
f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0);
thread t1(f1);
bpp.createBPP(bs);
dec->write(bs);
bs.restart();
uint32_t rangeSize = 0, totalBlks = 0;
clock_gettime(CLOCK_REALTIME, &ts1);
// cout << "BPP scaning\n";
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
bpp.setLBID(lbid);
bpp.setCount(rangeSize / OidOp.fColType.colWidth + (rangeSize % OidOp.fColType.colWidth ? 1 : 0));
bpp.runBPP(bs);
dec->write(bs);
// cout << "sending the BPP\n";
bpp.reset();
bs.restart();
totalBlks += rangeSize;
}
t1.join();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
float rate = 0;
cout << "ColStep stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s"
<< "\tFilters: " << filters << "\tBlocks : " << (int)totalBlks;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tBlks/sec " << rate << endl;
if (thdFcnFailure)
cerr << "There was a failure in the read thread." << endl;
bpp.destroyBPP(bs);
dec->write(bs);
cout << endl;
}
void doBatchOp_filt(OidOperation& OidOp)
{
struct timespec ts1, ts2, diff;
JobStepAssociation injs, outjs;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
ThdFcn f1;
boost::shared_ptr<CalpontSystemCatalog> sysCat = CalpontSystemCatalog::makeCalpontSystemCatalog(getpid());
ByteStream bs;
DBRM dbrm;
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
uint32_t uniqueID;
BatchPrimitiveProcessorJL bpp;
uniqueID = dbrm.getUnique32();
bpp.setUniqueID(uniqueID);
bpp.setSessionID(OidOp.SessionId());
bpp.setStepID(OidOp.SessionId());
pColScanStep scan(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId,
OidOp.fSessionId, OidOp.fSessionId, rm);
while (OidOp.FilterCount() > 0)
{
int8_t cop;
int64_t value;
OidOp.deSerializeFilter(cop, value);
scan.addFilter(cop, value);
}
bpp.addFilterStep(scan);
pColStep step(injs, outjs, dec, sysCat, OidOp.fOid + 1, OidOp.fOid + 1, OidOp.fSessionId, 0,
OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, rm);
while (OidOp.FilterCount() > 0)
{
int8_t cop;
int64_t value;
OidOp.deSerializeFilter(cop, value);
step.addFilter(cop, value);
}
bpp.addFilterStep(step);
execplan::CalpontSystemCatalog::ColType colType;
FilterStep filt(OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, colType);
filt.setBOP(OidOp.BOP());
bpp.addFilterStep(filt);
cout << "session number = " << OidOp.SessionId() << endl;
// dec->addSession(OidOp.SessionId());
// dec->addStep(OidOp.SessionId(), OidOp.SessionId());
dec->addQueue(uniqueID);
f1.fSessionid = OidOp.SessionId();
f1.uniqueID = uniqueID;
f1.fDec = dec;
thdFcnFailure = false;
int err = dbrm.lookup(OidOp.OID(), lbidRanges);
if (err)
{
cerr << "doBatchOp_filt: BRM LBID range lookup failure (1)\n";
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
}
err = dbrm.getHWM(OidOp.OID(), hwm);
if (err)
{
cerr << "doBatchOp_filt: BRM HWM lookup failure (2)" << endl;
throw runtime_error("doBatchOp_filt: BRM HWM lookup failure (2)");
}
f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0);
thread t1(f1);
bpp.createBPP(bs);
dec->write(bs);
bs.restart();
uint32_t rangeSize = 0, totalBlks = 0;
clock_gettime(CLOCK_REALTIME, &ts1);
// cout << "BPP scaning\n";
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "doBatchOp_filt: dbrm.lookup failed for lbid (3)" << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
bpp.setLBID(lbid);
bpp.setCount(rangeSize / OidOp.fColType.colWidth + (rangeSize % OidOp.fColType.colWidth ? 1 : 0));
bpp.runBPP(bs);
dec->write(bs);
// cout << "sending the BPP\n";
bpp.reset();
bs.restart();
totalBlks += rangeSize;
}
t1.join();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
float rate = 0;
cout << "doBatchOp_filt stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0))
<< "s"
<< "\tBlocks : " << (int)totalBlks;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tBlks/sec " << rate << endl;
if (thdFcnFailure)
cerr << "There was a failure in the read thread." << endl;
bpp.destroyBPP(bs);
dec->write(bs);
cout << endl;
}
void doBatchQueryOp(OperationList& OidOps)
{
struct timespec ts1, ts2, diff;
JobStepAssociation injs, outjs;
BatchPrimitiveProcessorJL bpp;
uint64_t rows = 0;
uint32_t blockTouched = 0;
DBRM dbrm;
QryThdFcn f1(bpp, rows, blockTouched);
OperationList::iterator filterOp = OidOps.begin();
uint32_t sessionId = (*filterOp)->SessionId();
uint32_t uniqueID = dbrm.getUnique32();
bpp.setUniqueID(uniqueID);
bpp.setSessionID(sessionId);
bpp.setStepID(sessionId);
cout << "session number = " << sessionId << endl;
f1.fSessionid = sessionId;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
// dec->addSession(sessionId);
// dec->addStep(sessionId, sessionId);
dec->addQueue(uniqueID);
f1.fDec = dec;
f1.uniqueID = uniqueID;
// boost::shared_ptr<CalpontSystemCatalog> sysCat =
//execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid());
// first column is made into the first scan filter step including filters
OID_t scanOid = (*filterOp)->fOid;
uint32_t scanWidth = (*filterOp)->ColumnWidth();
uint32_t maxWidth = scanWidth;
uint32_t pid = getpid();
pColScanStep scan(injs, outjs, dec, execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(pid), scanOid,
scanOid, sessionId, 0, sessionId, sessionId, sessionId, rm);
uint32_t filterCount = (*filterOp)->FilterCount();
while ((*filterOp)->FilterCount() > 0)
{
int8_t cop;
int64_t value;
(*filterOp)->deSerializeFilter(cop, value);
scan.addFilter(cop, value);
}
bpp.addFilterStep(scan);
// Any other columns that are batch scans are added as filter steps, the rest as project steps.
// The last filter step is added as a passthru step into the project list.
OperationList::iterator listend = OidOps.end();
for (OperationList::iterator op = OidOps.begin() + 1; op != listend; ++op)
{
pColStep step(injs, outjs, dec, execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(pid),
(*op)->fOid, (*op)->fOid, sessionId, 0, sessionId, sessionId, sessionId, rm);
if ((*op)->OpType() == OidOperation::BATCHSCAN)
{
filterCount += (*op)->FilterCount();
while ((*op)->FilterCount() > 0)
{
int8_t cop;
int64_t value;
(*op)->deSerializeFilter(cop, value);
step.addFilter(cop, value);
}
filterOp = op;
bpp.addFilterStep(step);
}
else
{
bpp.addProjectStep(step);
}
if ((*op)->ColumnWidth() > maxWidth)
maxWidth = (*op)->ColumnWidth();
}
PassThruStep pass(injs, outjs, dec, (*filterOp)->ColumnType(), (*filterOp)->fOid, (*filterOp)->fOid,
sessionId, 0, sessionId, sessionId, sessionId, false, rm);
bpp.addProjectStep(pass);
ByteStream bs;
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
thdFcnFailure = false;
int err = dbrm.lookup(scanOid, lbidRanges);
if (err)
{
cerr << "doQueryScan: BRM LBID range lookup failure (1)\n";
throw runtime_error("doQueryScan: BRM LBID range lookup failure (1)");
}
err = dbrm.getHWM(scanOid, hwm);
if (err)
{
cerr << "doQueryScan: BRM HWM lookup failure (3)" << endl;
throw runtime_error("doQueryScan: BRM HWM lookup failure (3)");
}
f1.fNumMsgs = hwm / scanWidth + (hwm % scanWidth ? 1 : 0);
thread t1(f1);
uint32_t cnt = dbrm.getExtentSize() / maxWidth;
bpp.createBPP(bs);
dec->write(bs);
bs.restart();
uint32_t rangeSize = 0, totalBlks = 0;
clock_gettime(CLOCK_REALTIME, &ts1);
// cout << "BPP scaning\n";
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
cerr << "doBatchQuery dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
uint32_t totallbid = rangeSize / scanWidth + (0 < rangeSize % scanWidth ? 1 : 0);
while (0 < totallbid)
{
if (totallbid < cnt)
cnt = totallbid;
bpp.setLBID(lbid);
bpp.setCount(cnt);
bpp.runBPP(bs);
dec->write(bs);
// cout << "sending the BPP with range cnt " << cnt << " lbid " << lbid << "\n";
bpp.reset();
bs.restart();
lbid += cnt * scanWidth;
totallbid -= cnt;
}
for (OperationList::iterator op = OidOps.begin(); op != OidOps.end(); ++op)
{
totalBlks += (uint32_t)(rangeSize * (double)((double)(*op)->ColumnWidth() / scanWidth));
}
}
t1.join();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
float rate = 0;
cout << "QueryScan stats - " << bpp.toString()
<< "\tElapsed: " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s"
<< "\tFilters: " << filterCount << "\tBlocks : " << (int)totalBlks;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tBlks/sec " << rate << endl;
cout << "\tTouched Blocks: " << blockTouched;
rate = blockTouched / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tTouched Blks/sec " << rate << endl;
cout << "\tRows: " << rows;
rate = rows / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\t\tRows/sec " << fixed << setprecision(2) << rate << endl;
if (thdFcnFailure)
cerr << "There was a failure in the read thread." << endl;
bpp.destroyBPP(bs);
dec->write(bs);
cout << endl;
}
void doBatchOp_step(OidOperation& OidOp)
{
struct timespec ts1, ts2, diff;
DBRM dbrm;
BRM::LBIDRange_v lbidRanges;
HWM_t hwm = 0;
LBIDRange_v::iterator it;
OID_t tmp;
uint32_t fbo;
uint32_t totalBlks = 0;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
ThdFcn f1;
JobStepAssociation injs, outjs;
boost::shared_ptr<CalpontSystemCatalog> sysCat =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid());
pColStep step(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId,
OidOp.fSessionId, OidOp.fSessionId, rm);
int32_t filters = OidOp.FilterCount();
while (OidOp.FilterCount() > 0)
{
int8_t cop;
int64_t value;
OidOp.deSerializeFilter(cop, value);
step.addFilter(cop, value);
}
BatchPrimitiveProcessorJL bpp;
ElementType et;
ByteStream obs;
uint32_t uniqueID = dbrm.getUnique32();
bpp.setUniqueID(uniqueID);
bpp.setSessionID(OidOp.SessionId());
bpp.setStepID(OidOp.SessionId());
bpp.addFilterStep(step);
// dec->addSession(OidOp.SessionId());
// dec->addStep(OidOp.SessionId(), OidOp.SessionId());
dec->addQueue(uniqueID);
f1.fSessionid = OidOp.SessionId();
f1.uniqueID = uniqueID;
f1.fDec = dec;
f1.fNumMsgs = 0;
thdFcnFailure = false;
ByteStream ridlist;
uint16_t ridCount = 0; // BLOCK_SIZE/OidOp.ColumnWidth();
for (uint16_t i = 0; i < ridCount; i++)
ridlist << i;
int err = dbrm.lookup(OidOp.OID(), lbidRanges);
if (err)
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
err = dbrm.getHWM(OidOp.OID(), hwm);
if (err)
throw runtime_error("doAColScan: BRM HWM lookup failure (3)");
uint32_t rangeSize = 0;
f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0);
thread t1(f1);
totalBlks = 0;
bpp.createBPP(obs);
dec->write(obs);
obs.restart();
clock_gettime(CLOCK_REALTIME, &ts1);
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
{
BRM::LBID_t lbid = (*it).start;
if (dbrm.lookup(lbid, 0, false, tmp, fbo))
{
if (debug)
cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl;
abort();
}
if (hwm < fbo)
continue;
rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1);
for (unsigned i = 0; i < rangeSize; i++)
{
/* insert all rids for this LBID */
for (uint32_t j = 0; j < BLOCK_SIZE / OidOp.fColType.colWidth; ++j)
{
et.first = ((fbo + i) * BLOCK_SIZE / OidOp.fColType.colWidth) + j;
et.second = j;
bpp.addElementType(et);
}
/* If on a logical block boundary, send the primitive */
if (i % OidOp.fColType.colWidth == (unsigned)OidOp.fColType.colWidth - 1)
{
// cout << "serializing at extent offset " << i << endl;
bpp.runBPP(obs);
dec->write(obs);
bpp.reset();
obs.restart();
}
if (debug && i + 1 == rangeSize)
cout << "colStep: " << i + 1 << "/" << rangeSize << " " << obs.length() << " lbid " << lbid + i
<< endl;
}
if (rangeSize % OidOp.fColType.colWidth)
{
// cout << "serializing last msg\n";
bpp.runBPP(obs);
dec->write(obs);
bpp.reset();
obs.restart();
}
totalBlks += rangeSize;
} // for
t1.join();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
float rate = 0;
cout << "ColStep stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s"
<< "\tFilters: " << filters << "\tBlocks : " << (int)totalBlks;
rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\tBlks/sec " << rate << endl;
if (thdFcnFailure)
cerr << "There was a failure in the read thread." << endl;
bpp.destroyBPP(obs);
dec->write(obs);
cout << endl;
}
//
//
void doListOp(const OID_t o = 0)
{
DBRM dbrm;
BRM::LBIDRange_v lbidRanges;
LBIDRange_v::iterator it;
OID_t oid = 3000;
HWM_t hwm = 0;
if (o != 0)
{
int err = dbrm.lookup(o, lbidRanges);
if (err)
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
err = dbrm.getHWM(o, hwm);
if (err)
throw runtime_error("doAColScan: BRM HWM lookup failure (3)");
cout << "Object ID: " << o << " HWM: " << hwm << endl;
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
cout << "\tStart: " << (*it).start << " sz: " << (*it).size << endl;
}
else
{
for (; oid < 100000; oid++)
{
int err = dbrm.lookup(oid, lbidRanges);
if (lbidRanges.size() == 0)
continue;
if (err)
throw runtime_error("doAColScan: BRM LBID range lookup failure (1)");
err = dbrm.getHWM(oid, hwm);
if (err)
throw runtime_error("doAColScan: BRM HWM lookup failure (3)");
cout << "Object ID: " << oid << " HWM: " << hwm << endl;
for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
cout << "\tStart: " << (*it).start << " sz: " << (*it).size << endl;
hwm = 0;
lbidRanges.clear();
} // for (; oid...
} // else
} // doListOp
//
// do LoopBackOp
void doLoopBack(const uint64_t loopcount)
{
ByteStream lbMsg;
struct timespec ts1;
struct timespec ts2;
struct timespec diff;
uint32_t sessionid = getpid();
DBRM dbrm;
ResourceManager* rm = ResourceManager::instance();
DistributedEngineComm* dec = DistributedEngineComm::instance(rm);
ThdFcn f1;
// dec->addSession(sessionid);
// dec->addStep(sessionid, sessionid);
uint32_t uniqueID = dbrm.getUnique32();
dec->addQueue(uniqueID);
f1.fSessionid = sessionid;
f1.uniqueID = uniqueID;
f1.fDec = dec;
f1.fNumMsgs = loopcount;
thdFcnFailure = false;
thread t1(f1);
lbMsg = formatLoopBackMsg(sessionid, uniqueID);
cout << "Sending " << loopcount << " LOOPBACK requests" << endl;
clock_gettime(CLOCK_REALTIME, &ts1);
for (uint64_t i = 0; i < loopcount; i++)
{
lbMsg = formatLoopBackMsg(sessionid, uniqueID);
dec->write(lbMsg);
}
clock_gettime(CLOCK_REALTIME, &ts2);
cout << loopcount << " LOOPBACK msgs sent" << endl;
t1.join();
timespec_sub(ts1, ts2, diff);
cout << "\ttotal runtime: " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << endl;
float rate = 0;
rate = loopcount / (diff.tv_sec + (diff.tv_nsec / 1000000000.0));
cout << "\t" << rate << " rqsts/s" << endl;
dec->removeQueue(uniqueID);
} // doLoopBack
} // namespace
/*---------------------------------------------------------------------------
//Command line parameter definition
//
// -o <oid1> -o <oid2> -o . . . <oidN>
// -s <scan the oid>
// -t <colstep the oid>
// -d <turn debug output on>
// -eq -equal <int> < == test of values in a block>
// -gt -greater-than <int> < greater than (>) test of values in a block>
// -ge -greater-than-equal <int> < greater than or equal to (>=) test of values in a block>
// -lt -less-than <int> < less than (<) test of values in a block>
// -le -less-than-equal <int> < less than or equal to (<=) test of values in a block>
// -ne -not-equal <int> < not equal to (!=) test of values in a block>
// -bop <1 or 0> <1 AND 0 OR binary operator when 2 comparison filters are present>
// --list -list -l <optional oid> <print out all oids or the specified oid and their ranges>
// --loopback -loopback <count> send <count> loopback requests
// --concurrent run all jobs currently if set to true. defaults to false
// --lbid-trace -lb turn on lbid tracing
// --pm-profile -pmp turn on pm profiling
-----------------------------------------------------------------------------*/
int main(int argc, char** argv)
{
int64_t eq_val = 0;
int64_t gt_val = 0;
int64_t ge_val = 0;
int64_t lt_val = 0;
int64_t le_val = 0;
int64_t ne_val = 0;
int64_t bop_val = 0;
int64_t loopback_count = 100000;
int64_t scanOid = 0;
int64_t stepOid = 0;
bool list = false;
// OID_t listOid=0;
string oidString;
vector<OID_t> oidv;
int ch = 0;
bool concurrent_flag = false;
bool lbidtrace_flag = false;
bool pmprofile_flag = false;
bool query_flag = false;
enum CLA_ENUM
{
OID = (int)0,
SCANOP = (int)1,
BLOCKOP = (int)2,
DEBUG = (int)3,
EQFILTER = (int)4,
GTFILTER = (int)5,
GEFILTER = (int)6,
LTFILTER = (int)7,
LEFILTER = (int)8,
NEFILTER = (int)9,
BOP = (int)10,
LISTOP = (int)11,
LOOPBACKOP = (int)12,
CONCURRENT = (int)13,
LBIDTRACE = (int)14,
PMPROFILE = (int)15,
INVALIDOP = (int)16,
BATCHSCANOP = (int)17,
BATCHSTEPOP = (int)18,
BATCHFILTOP = (int)19,
QUERYOP = (int)20
};
/**
// longopt struct
struct option {
const char *name;
int has_arg;
int *flag;
int val;
};
**/
static struct option long_options[] = {
// {const char *name, int has_arg, int *flag, int val},
{"scan", required_argument, NULL, SCANOP},
{"block", required_argument, NULL, BLOCKOP},
{"debug", no_argument, NULL, DEBUG},
{"equal", required_argument, NULL, EQFILTER},
{"eq", required_argument, NULL, EQFILTER},
{"greater-than", required_argument, NULL, GTFILTER},
{"gt", required_argument, NULL, GTFILTER},
{"greater-than-equal", required_argument, NULL, GEFILTER},
{"ge", required_argument, NULL, GEFILTER},
{"less-than", required_argument, NULL, LTFILTER},
{"lt", required_argument, NULL, LTFILTER},
{"less-than-equal", required_argument, NULL, LEFILTER},
{"le", required_argument, NULL, LEFILTER},
{"not-equal", required_argument, NULL, NEFILTER},
{"ne", required_argument, NULL, NEFILTER},
{"bop", optional_argument, NULL, BOP},
{"list", no_argument, NULL, LISTOP},
{"loopback", optional_argument, NULL, LOOPBACKOP},
{"concurrent", no_argument, NULL, CONCURRENT},
{"lbid-trace", no_argument, NULL, LBIDTRACE},
{"lb", no_argument, NULL, LBIDTRACE},
{"pm-prof", no_argument, NULL, PMPROFILE},
{"pm-profile", no_argument, NULL, PMPROFILE},
{"pmp", no_argument, NULL, PMPROFILE},
{"batchscan", required_argument, NULL, BATCHSCANOP},
{"batchstep", required_argument, NULL, BATCHSTEPOP},
{"batchfilt", required_argument, NULL, BATCHFILTOP},
{"queryop", no_argument, NULL, QUERYOP},
{0, 0, 0, 0}};
OidOperation* currOp = NULL;
OperationList OpList;
if (argc <= 1)
{
usage();
}
// process command line arguments
while ((ch = getopt_long_only(argc, argv, "B:Z:F:ds:t:lcqp:", long_options, NULL)) != -1)
{
pid_t pidId = getpid();
switch (ch)
{
case SCANOP:
case 's':
if (optarg)
scanOid = getInt(optarg);
// cout << "OPT=" << ch << " ARG " << scanOid << endl;
currOp = NULL;
if (scanOid > 0)
currOp = new OidOperation(scanOid, OidOperation::SCAN, pidId);
else
{
cout << "PingProc: scan missing or invalid OID parameter value" << endl;
break;
}
if (currOp && currOp->isIntegralDataType())
OpList.push_back(currOp);
else
{
cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl;
delete currOp;
currOp = NULL;
}
break;
case BATCHSCANOP:
case 'B':
if (optarg)
scanOid = getInt(optarg);
else
cout << "no optarg\n";
cout << "OPT=" << ch << " ARG " << scanOid << endl;
currOp = NULL;
if (scanOid > 0)
currOp = new OidOperation(scanOid, OidOperation::BATCHSCAN, pidId);
else
{
cout << "PingProc: batch scan missing or invalid OID parameter value" << endl;
break;
}
if (currOp && currOp->isIntegralDataType())
OpList.push_back(currOp);
else
{
cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl;
delete currOp;
currOp = NULL;
}
break;
case BATCHSTEPOP:
case 'Z':
if (optarg)
scanOid = getInt(optarg);
else
cout << "no optarg\n";
cout << "OPT=" << ch << " ARG " << scanOid << endl;
currOp = NULL;
if (scanOid > 0)
currOp = new OidOperation(scanOid, OidOperation::BATCHSTEP, pidId);
else
{
cout << "PingProc: batch step missing or invalid OID parameter value" << endl;
break;
}
if (currOp && currOp->isIntegralDataType())
OpList.push_back(currOp);
else
{
cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl;
delete currOp;
currOp = NULL;
}
break;
case BATCHFILTOP:
case 'F':
if (optarg)
scanOid = getInt(optarg);
else
cout << "no optarg\n";
cout << "OPT=" << ch << " ARG " << scanOid << endl;
currOp = NULL;
if (scanOid > 0)
currOp = new OidOperation(scanOid, OidOperation::BATCHFILT, pidId);
else
{
cout << "PingProc: batch filter missing or invalid OID parameter value" << endl;
break;
}
if (currOp && currOp->isIntegralDataType())
OpList.push_back(currOp);
else
{
cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl;
delete currOp;
currOp = NULL;
}
break;
case BLOCKOP:
case 't':
if (optarg)
stepOid = getInt(optarg);
// cout << "OPT=" << ch << " ARG " << stepOid << endl;
currOp = NULL;
if (stepOid > 0)
currOp = new OidOperation(stepOid, OidOperation::BLOCK, pidId);
else
{
cout << "PingProc: step missing or invalid OID parameter value" << endl;
break;
}
if (currOp && currOp->isIntegralDataType())
OpList.push_back(currOp);
else
{
cout << "PingProc cannot process this ColumnType-oid: " << stepOid << endl;
delete currOp;
currOp = NULL;
}
break;
case DEBUG:
case 'd':
// cout << "OPT=" << ch << endl;
debug = true;
break;
case EQFILTER:
if (optarg)
eq_val = getInt(optarg);
else
eq_val = 0;
cout << "OPT=" << ch << " ARG=" << eq_val << endl;
if (currOp)
currOp->addFilter(COMPARE_EQ, eq_val);
else
; // TODO: error Processing
break;
case GTFILTER:
if (optarg)
gt_val = getInt(optarg);
else
gt_val = 0;
// cout << "OPT=" << ch << " ARG=" << gt_val << endl;
if (currOp)
currOp->addFilter(COMPARE_GT, gt_val);
break;
case GEFILTER:
if (optarg)
ge_val = getInt(optarg);
else
ge_val = 0;
// cout << "OPT=" << ch << " ARG=" << ge_val << endl;
if (currOp)
currOp->addFilter(COMPARE_GE, ge_val);
break;
case LTFILTER:
if (optarg)
lt_val = getInt(optarg);
else
lt_val = 0;
// cout << "OPT=" << ch << " ARG=" << lt_val << endl;
if (currOp)
currOp->addFilter(COMPARE_LT, lt_val);
break;
case LEFILTER:
if (optarg)
le_val = getInt(optarg);
else
le_val = 0;
// cout << "OPT=" << ch << " ARG=" << le_val << endl;
if (currOp)
currOp->addFilter(COMPARE_LE, le_val);
break;
case NEFILTER:
case 'n':
if (optarg)
ne_val = getInt(optarg);
else
ne_val = 0;
// cout << "OPT=" << ch << " ARG=" << ne_val << endl;
if (currOp)
currOp->addFilter(COMPARE_NE, ne_val);
break;
case BOP:
case 'b':
if (optarg)
bop_val = getInt(optarg);
else
bop_val = 1; // assume AND
// cout << "OPT=" << ch << " ARG=" << bop_val << endl;
if (currOp)
currOp->BOP(bop_val ? BOP_AND : BOP_OR);
break;
case LISTOP:
case 'l':
/**
if (optarg)
listOid=getInt(optarg);
else
listOid=0;
cout << "OPT=" << ch << " LISTOP " << listOid << endl;
**/
list = true;
break;
case LOOPBACKOP:
case 'p':
if (optarg)
loopback_count = getInt(optarg);
// cout << "OPT=" << ch << " LOOPBACKOP " << loopback_count << endl;
currOp = NULL;
currOp = new OidOperation(0, OidOperation::LOOPBACK, pidId);
OpList.push_back(currOp);
break;
case 'c':
case CONCURRENT:
concurrent_flag = true;
// cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl;
break;
case LBIDTRACE:
lbidtrace_flag = true;
// cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl;
if (currOp)
currOp->setLbidTraceOn();
break;
case PMPROFILE:
pmprofile_flag = true;
// cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl;
if (currOp)
currOp->setPMProfileOn();
break;
case 'q':
case QUERYOP:
query_flag = true;
// cout << "OPT=" << ch << " QUERY FLAG " << query_flag << endl;
break;
case '?':
default: cout << "optarg " << optarg << endl; usage();
}
if (list == true)
break;
} // while
// if list is requested, print the listing and exit
//
vector<struct BatchScanThr*> BatchScanThreads;
vector<struct BatchStepThr*> BatchStepThreads;
vector<struct BatchFiltThr*> BatchFiltThreads;
vector<struct ColScanThr*> ColScanThreads;
vector<struct ColStepThr*> ColStepThreads;
vector<struct DictScanThr*> DictScanThreads;
vector<struct DictSigThr*> DictSigThreads;
vector<thread*> thrArray;
if (query_flag)
{
cout << "starting batch query thread\n";
struct BatchQueryThr* qt = new struct BatchQueryThr(OpList);
thread* t1 = new thread(*qt);
if (concurrent_flag)
thrArray.push_back(t1);
else
t1->join();
}
else if (list)
{
doListOp();
}
else
{
for (uint32_t i = 0; i < OpList.size(); i++)
{
if (OpList[i]->OpType() == OidOperation::LOOPBACK)
{
doLoopBack(loopback_count);
}
else if (OpList[i]->OpType() == OidOperation::SCAN)
{
struct ColScanThr* cst = new struct ColScanThr(*OpList[i]);
ColScanThreads.push_back(cst);
thread* t1 = new thread(*cst);
if (concurrent_flag)
thrArray.push_back(t1);
else
{
t1->join();
delete t1;
}
}
else if (OpList[i]->OpType() == OidOperation::BLOCK)
{
struct ColStepThr* cst = new struct ColStepThr(*OpList[i]);
ColStepThreads.push_back(cst);
thread* t1 = new thread(*cst);
if (concurrent_flag)
thrArray.push_back(t1);
else
{
t1->join();
delete t1;
}
}
else if (OpList[i]->OpType() == OidOperation::BATCHSCAN)
{
cout << "starting batch scan thread\n";
struct BatchScanThr* cst = new struct BatchScanThr(*OpList[i]);
BatchScanThreads.push_back(cst);
thread* t1 = new thread(*cst);
if (concurrent_flag)
thrArray.push_back(t1);
else
{
t1->join();
delete t1;
}
}
else if (OpList[i]->OpType() == OidOperation::BATCHSTEP)
{
cout << "starting batch step thread\n";
struct BatchStepThr* cst = new struct BatchStepThr(*OpList[i]);
BatchStepThreads.push_back(cst);
thread* t1 = new thread(*cst);
if (concurrent_flag)
thrArray.push_back(t1);
else
{
t1->join();
delete t1;
}
}
else if (OpList[i]->OpType() == OidOperation::BATCHFILT)
{
cout << "starting batch filt thread\n";
struct BatchFiltThr* cst = new struct BatchFiltThr(*OpList[i]);
BatchFiltThreads.push_back(cst);
thread* t1 = new thread(*cst);
if (concurrent_flag)
thrArray.push_back(t1);
else
{
t1->join();
delete t1;
}
i += 2;
}
} // for
} // else
// join threads to main
for (uint32_t i = 0; i < thrArray.size(); i++)
thrArray[i]->join();
// clean up
for (uint32_t i = 0; i < OpList.size(); i++)
delete OpList[i];
OpList.clear();
for (uint32_t i = 0; i < BatchScanThreads.size(); i++)
delete BatchScanThreads[i];
BatchScanThreads.clear();
for (uint32_t i = 0; i < BatchStepThreads.size(); i++)
delete BatchStepThreads[i];
BatchStepThreads.clear();
for (uint32_t i = 0; i < BatchFiltThreads.size(); i++)
delete BatchFiltThreads[i];
BatchFiltThreads.clear();
for (uint32_t i = 0; i < ColScanThreads.size(); i++)
delete ColScanThreads[i];
ColScanThreads.clear();
for (uint32_t i = 0; i < ColStepThreads.size(); i++)
delete ColStepThreads[i];
ColStepThreads.clear();
for (uint32_t i = 0; i < DictScanThreads.size(); i++)
delete DictScanThreads[i];
DictScanThreads.clear();
for (uint32_t i = 0; i < DictSigThreads.size(); i++)
delete DictSigThreads[i];
DictSigThreads.clear();
for (uint32_t i = 0; i < thrArray.size(); i++)
delete thrArray[i];
thrArray.clear();
} // main()