/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: pingproc.cpp 2101 2013-01-21 14:12:52Z rdempsey $ #include #include #include #include #include #include #include using namespace std; #include using namespace boost; #include "bytestream.h" using namespace messageqcpp; #include "distributedenginecomm.h" #include "primitivemsg.h" #include "jobstep.h" #include "batchprimitiveprocessor-jl.h" using namespace joblist; #include "calpontsystemcatalog.h" using namespace execplan; #include "brm.h" using namespace BRM; // Global vars bool debug; bool thdFcnFailure; // // TODO: Why is this namespace here? namespace { void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff) { if (tv2.tv_nsec < tv1.tv_nsec) { diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1; diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec + 1000000000; } else { diff.tv_sec = tv2.tv_sec - tv1.tv_sec; diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec; } } // timespec_sub // // class OidOperation { public: enum OpType_t { SCAN = 0, BLOCK = 1, LOOPBACK = 2, NONE = 3, BATCHSCAN = 4, BATCHSTEP = 5, BATCHFILT = 6 }; OidOperation(const OID_t oid, const OpType_t opType, const uint32_t sessionId = 0); ~OidOperation(){}; void addFilter(const int8_t COP, const int64_t value); const OID_t OID() const { return fOid; } const OpType_t OpType() const { return fOpType; } const CalpontSystemCatalog::ColType& ColumnType() { return fColType; } const uint32_t ColumnWidth() const { return fColType.colWidth; } const uint32_t FilterCount() const { return fFilterCount; } const ByteStream& FilterString() { return fFilterList; } const uint32_t SessionId() const { return fSessionId; } const uint32_t DataType() const { return fColType.colDataType; } const uint32_t BOP() const { return fBOP; } void BOP(const uint32_t bop) { if (FilterCount() >= 2) fBOP = bop; } const uint32_t COP1() const { return fCOP1; } void COP1(const uint32_t cop) { fCOP1 = cop; } const uint32_t COP2() const { return fCOP2; } void COP2(const uint32_t cop) { fCOP2 = cop; } bool isIntegralDataType(); void setLbidTraceOn(); void setPMProfileOn(); bool LbidTrace() { return fLbidTrace; } bool PMProfile() { return fPMProfile; } void deSerializeFilter(int8_t& COP, int64_t& value); // private: OidOperation(){}; OID_t fOid; ByteStream fFilterList; uint32_t fFilterCount; OpType_t fOpType; CalpontSystemCatalog::ColType fColType; uint32_t fSessionId; uint32_t fBOP; uint32_t fCOP1; uint32_t fCOP2; bool fLbidTrace; bool fPMProfile; }; // class OidOperation // // OidOperation::OidOperation(const OID_t oid, const OpType_t opType, const uint32_t sessionId) : fOid(oid) , fFilterList() , fFilterCount(0) , fOpType(opType) , fSessionId(sessionId) , fBOP(BOP_NONE) , fCOP1(COMPARE_NIL) , fCOP2(COMPARE_NIL) , fLbidTrace(false) , fPMProfile(false) { boost::shared_ptr cat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); fColType = cat->colType(oid); fFilterList.reset(); } void OidOperation::setLbidTraceOn() { fLbidTrace = true; } void OidOperation::setPMProfileOn() { fPMProfile = true; } void OidOperation::addFilter(const int8_t COP, const int64_t value) { if (fFilterCount == 2) return; fFilterList << (uint8_t)COP; // converts to a type of the appropriate width, then bitwise // copies into the filter ByteStream switch (ColumnWidth()) { case 1: int8_t tmp8; tmp8 = value; fFilterList << *((uint8_t*)&tmp8); break; case 2: int16_t tmp16; tmp16 = value; fFilterList << *((uint16_t*)&tmp16); break; case 4: int32_t tmp32; tmp32 = value; fFilterList << *((uint32_t*)&tmp32); break; case 8: fFilterList << *((uint64_t*)&value); break; default: ostringstream o; o << "addFilter: colType says OID " << " has a width of " << ColumnWidth(); throw runtime_error(o.str()); } fFilterCount++; } void OidOperation::deSerializeFilter(int8_t& COP, int64_t& value) { if (fFilterCount == 0) { COP = COMPARE_NIL; return; } fFilterList >> *(uint8_t*)&COP; switch (ColumnWidth()) { case 1: int8_t tmp8; tmp8 = value; fFilterList >> *((uint8_t*)&tmp8); value = tmp8; break; case 2: int16_t tmp16; fFilterList >> *((uint16_t*)&tmp16); value = tmp16; break; case 4: int32_t tmp32; fFilterList >> *((uint32_t*)&tmp32); value = tmp32; break; case 8: fFilterList >> *((uint64_t*)&value); break; default: ostringstream o; o << "deSerializeFilter: colType says OID " << " has a width of " << ColumnWidth(); throw runtime_error(o.str()); } fFilterCount--; } typedef vector OperationList; // Only process these column types // bool OidOperation::isIntegralDataType() { if (DataType() == CalpontSystemCatalog::BIT || DataType() == CalpontSystemCatalog::TINYINT || DataType() == CalpontSystemCatalog::SMALLINT || DataType() == CalpontSystemCatalog::MEDINT || DataType() == CalpontSystemCatalog::INT || DataType() == CalpontSystemCatalog::DATE || DataType() == CalpontSystemCatalog::BIGINT || DataType() == CalpontSystemCatalog::DATETIME || DataType() == CalpontSystemCatalog::TIMESTAMP || DataType() == CalpontSystemCatalog::TIME || DataType() == CalpontSystemCatalog::UTINYINT || DataType() == CalpontSystemCatalog::USMALLINT || DataType() == CalpontSystemCatalog::UMEDINT || DataType() == CalpontSystemCatalog::UINT || DataType() == CalpontSystemCatalog::UBIGINT) return true; if (DataType() == CalpontSystemCatalog::CHAR && 1 == fColType.colWidth) return true; return false; } const ByteStream formatLoopBackMsg(const uint32_t sessionId, uint32_t uniqueId) { ByteStream primMsg; ISMPacketHeader ism; memset(&ism, 0, sizeof(ism)); ism.Command = COL_LOOPBACK; ism.Size = sizeof(ism) + sizeof(ColLoopback); ism.Type = 2; primMsg.load((const uint8_t*)&ism, sizeof(ism)); struct ColLoopback lb; memset(&lb, 0, sizeof(lb)); lb.Hdr.SessionID = sessionId; lb.Hdr.StatementID = 0; lb.Hdr.TransactionID = sessionId; lb.Hdr.VerID = 0; lb.Hdr.StepID = sessionId; lb.Hdr.UniqueID = uniqueId; primMsg.append((const uint8_t*)&lb, sizeof(lb)); return primMsg; } // formatLoopBackMsg const ByteStream formatDictionaryMsg(const uint64_t lbid, ByteStream& ridList, const uint16_t ridCount, OidOperation& oidOp) { ByteStream primMsg; DictSignatureRequestHeader hdr; ISMPacketHeader ism; ism.Flags = 0; // planFlagsToPrimFlags(fTraceFlags); ism.Command = DICT_SIGNATURE; ism.Size = sizeof(DictSignatureRequestHeader) + ridList.length(); ism.Type = 2; hdr.Hdr.SessionID = oidOp.SessionId(); hdr.Hdr.StatementID = 0; hdr.Hdr.TransactionID = oidOp.SessionId(); hdr.Hdr.VerID = 0; hdr.Hdr.StepID = 0; hdr.LBID = lbid; hdr.PBID = 0; idbassert(ridCount <= 8000); hdr.NVALS = ridCount; primMsg.load((const uint8_t*)&ism, sizeof(ism)); primMsg.append((const uint8_t*)&hdr, sizeof(DictSignatureRequestHeader)); primMsg += ridList; return primMsg; } const ByteStream formatColStepMsg(const uint64_t lbid, ByteStream& ridList, const uint16_t ridCount, OidOperation& oidOp, uint32_t uniqueId) { ByteStream primMsg; NewColRequestHeader hdr; memset(&hdr, 0, sizeof(hdr)); hdr.ism.Reserve = 0; hdr.ism.Flags = 0; if (oidOp.LbidTrace() == true) hdr.ism.Flags |= PF_LBID_TRACE; if (oidOp.PMProfile() == true) hdr.ism.Flags |= PF_PM_PROF; hdr.ism.Command = COL_BY_SCAN; hdr.ism.Size = sizeof(NewColRequestHeader) + oidOp.FilterString().length() + ridList.length(); hdr.ism.Type = 2; hdr.hdr.SessionID = oidOp.SessionId(); hdr.hdr.StatementID = 0; hdr.hdr.TransactionID = oidOp.SessionId(); hdr.hdr.VerID = 0; hdr.hdr.StepID = oidOp.SessionId(); hdr.hdr.UniqueID = uniqueId; hdr.LBID = lbid; idbassert(hdr.LBID > 0); hdr.PBID = 0; hdr.DataSize = oidOp.ColumnWidth(); hdr.DataType = oidOp.DataType(); hdr.OutputType = OT_BOTH; hdr.BOP = BOP_NONE; // hdr.InputFlags = 0; hdr.NOPS = oidOp.FilterCount(); hdr.NVALS = ridCount; hdr.sort = 0; primMsg.load((const uint8_t*)&hdr, sizeof(NewColRequestHeader)); if (oidOp.FilterCount() > 0) primMsg += oidOp.FilterString(); if (ridCount > 0) primMsg += ridList; return primMsg; } // formatColStepMsg const ByteStream formatDictionaryScanMsg(const uint64_t lbid, const uint16_t count, OidOperation& oidOp) { ByteStream primMsg; DictTokenByScanRequestHeader hdr; hdr.ism.Reserve = 0; hdr.ism.Flags = 0; hdr.ism.Command = DICT_TOKEN_BY_SCAN_COMPARE; hdr.ism.Size = sizeof(DictTokenByScanRequestHeader) + oidOp.FilterString().length(); hdr.ism.Type = 2; hdr.Hdr.SessionID = oidOp.SessionId(); hdr.Hdr.StatementID = 0; hdr.Hdr.TransactionID = 0; hdr.Hdr.VerID = 0; hdr.Hdr.StepID = oidOp.SessionId(); hdr.LBID = lbid; idbassert(hdr.LBID >= 0); hdr.PBID = 0; hdr.OutputType = OT_TOKEN; hdr.BOP = oidOp.BOP(); hdr.COP1 = oidOp.COP1(); hdr.COP2 = oidOp.COP2(); hdr.NVALS = oidOp.FilterCount(); hdr.Count = count; hdr.charsetNumber = oidOp.ColumnType().charsetNumber; idbassert(hdr.Count > 0); primMsg.load((const uint8_t*)&hdr.ism, sizeof(ISMPacketHeader)); primMsg.append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader)); primMsg += oidOp.FilterString(); return primMsg; } const ByteStream formatColScanMsg(const uint64_t lbid, const uint16_t count, OidOperation& oidOp, uint32_t uniqueId) { ByteStream primMsg; ISMPacketHeader ism; ColByScanRangeRequestHeader fMsgHeader; memset(&fMsgHeader, 0, sizeof(fMsgHeader)); memset(&ism, 0, sizeof(ism)); ism.Reserve = 0; ism.Flags = 0; if (oidOp.LbidTrace() == true) ism.Flags |= PF_LBID_TRACE; if (oidOp.PMProfile() == true) ism.Flags |= PF_PM_PROF; ism.Command = COL_BY_SCAN_RANGE; ism.Size = sizeof(fMsgHeader) + sizeof(ism) + oidOp.FilterString().length(); ism.Type = 2; primMsg.load((const uint8_t*)&ism, sizeof(ism)); fMsgHeader.LBID = lbid; idbassert(fMsgHeader.LBID >= 0); fMsgHeader.PBID = 0; fMsgHeader.DataSize = oidOp.ColumnWidth(); fMsgHeader.DataType = oidOp.DataType(); fMsgHeader.OutputType = OT_BOTH; fMsgHeader.BOP = oidOp.BOP(); fMsgHeader.NOPS = oidOp.FilterCount(); fMsgHeader.NVALS = 0; fMsgHeader.Count = count; //(hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); idbassert(fMsgHeader.Count > 0); fMsgHeader.Hdr.SessionID = oidOp.SessionId(); fMsgHeader.Hdr.StatementID = 0; fMsgHeader.Hdr.TransactionID = oidOp.SessionId(); fMsgHeader.Hdr.VerID = 0; fMsgHeader.Hdr.StepID = oidOp.SessionId(); fMsgHeader.Hdr.UniqueID = uniqueId; primMsg.append((const uint8_t*)&fMsgHeader, sizeof(fMsgHeader)); if (oidOp.FilterCount() > 0) primMsg += oidOp.FilterString(); return primMsg; } // formatColScanMsg // void doBatchOp_scan(OidOperation& OidOp); void doBatchOp_step(OidOperation& OidOp); void doBatchOp_filt(OidOperation& OidOp); void doBatchQueryOp(OperationList& OidOps); void doColScan(OidOperation& OidOp); void doColStep(OidOperation& OidOp); void doDictScan(OidOperation& OidOp); void doDictStep(OidOperation& OidOp); // receive the responses from PrimProc // struct ThdFcn { void operator()() { uint64_t totalBytes = 0; try { ByteStream ibs; if (debug) cout << "Waiting on " << fNumMsgs << " messages." << endl; for (uint32_t k = 0; k < fNumMsgs; k++) { // cout << "reading msg #" << k << "...\n"; ibs = fDec->read(uniqueID); // cout << "got msg #" << k << endl; if (debug) if (k % 10240 == 0) cout << "ThdFcn: read " << fSessionid << " " << k << "/" << fNumMsgs << " " << ibs.length() << "/" << totalBytes << endl; if (ibs.length() == 0) break; totalBytes += ibs.length(); } } catch (exception& e) { cerr << "read exception: " << e.what() << endl; thdFcnFailure = true; } if (debug) cout << totalBytes << " bytes read in " << fNumMsgs << " messages" << endl; } // void operator() uint32_t fSessionid; uint32_t uniqueID; DistributedEngineComm* fDec; unsigned fNumMsgs; }; // struct ThdFcn struct QryThdFcn { void operator()() { uint64_t totalBytes = 0; int64_t min; int64_t max; uint64_t lbid; uint32_t cachedIO; uint32_t physIO; uint32_t touchedBlocks; bool validCPData; try { ByteStream ibs; ByteStream obs; if (debug) cout << "Waiting on " << fNumMsgs << " messages." << endl; for (uint32_t k = 0; k < fNumMsgs; k++) { // cout << "reading msg #" << k << "...\n"; ibs = fDec->read(uniqueID); // cout << "got msg #" << k << endl; if (debug) if (k % 10240 == 0) cout << "QryThdFcn: read " << fSessionid << " " << k << "/" << fNumMsgs << " " << ibs.length() << "/" << totalBytes << " rows: " << fRows << endl; if (ibs.length() == 0) break; totalBytes += ibs.length(); fRows += fBpp.getTableBand(ibs, &obs, &validCPData, &lbid, &min, &max, &cachedIO, &physIO, &touchedBlocks); fBlockTouched += touchedBlocks; } } catch (exception& e) { cerr << "read exception: " << e.what() << endl; thdFcnFailure = true; } if (debug) cout << totalBytes << " bytes read in " << fNumMsgs << " messages for " << fRows << " rows and " << fBlockTouched << " blocks\n"; } // void operator() QryThdFcn(BatchPrimitiveProcessorJL& bpp, uint64_t& rows, uint32_t& blk) : fBpp(bpp), fNumMsgs(0), fRows(rows), fBlockTouched(blk) { } BatchPrimitiveProcessorJL& fBpp; uint32_t fSessionid; DistributedEngineComm* fDec; unsigned fNumMsgs; uint64_t& fRows; uint32_t& fBlockTouched; uint32_t uniqueID; }; // struct ThdFcn struct BatchScanThr { BatchScanThr(OidOperation& oidOp) : fOidOp(oidOp) { } void operator()() { doBatchOp_scan(fOidOp); } // void operator() OidOperation& fOidOp; }; // struct BatchScanThr struct BatchStepThr { BatchStepThr(OidOperation& oidOp) : fOidOp(oidOp) { } void operator()() { doBatchOp_step(fOidOp); } // void operator() OidOperation& fOidOp; }; // struct BatchStepThr struct BatchFiltThr { BatchFiltThr(OidOperation& oidOp) : fOidOp(oidOp) { } void operator()() { doBatchOp_filt(fOidOp); } // void operator() OidOperation& fOidOp; }; // struct BatchFiltThr struct BatchQueryThr { BatchQueryThr(OperationList& oidOps) : fOidOps(oidOps) { } void operator()() { doBatchQueryOp(fOidOps); } // void operator() OperationList& fOidOps; }; // struct struct ColStepThr { ColStepThr(OidOperation& oidOp) : fOidOp(oidOp) { } void operator()() { doColStep(fOidOp); } // void operator() OidOperation& fOidOp; }; // struct ColStepThr struct ColScanThr { ColScanThr(OidOperation& oidOp) : fOidOp(oidOp) { } void operator()() { doColScan(fOidOp); } // void operator() OidOperation& fOidOp; }; // struct ColStepThr struct DictSigThr { }; // DictSigThr struct DictScanThr { }; // DictScanThr // doColScan void doDictionaryScan(OidOperation& OidOp) { } // doColScan void doColScan(OidOperation& OidOp) { if (debug) cout << "beginning doColScan\n"; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); struct timespec ts1; struct timespec ts2; struct timespec diff; uint32_t uniqueID; uint32_t sessionid = getpid(); uint32_t totalBlks = 0; // dec->addSession(sessionid); // dec->addStep(sessionid, sessionid); DBRM dbrm; uniqueID = dbrm.getUnique32(); dec->addQueue(uniqueID); int err = dbrm.lookup(OidOp.OID(), lbidRanges); if (err) throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); err = dbrm.getHWM(OidOp.OID(), hwm); if (err) throw runtime_error("doAColScan: BRM HWM lookup failure (3)"); LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; ThdFcn f1; f1.fSessionid = sessionid; f1.uniqueID = uniqueID; f1.fDec = dec; f1.fNumMsgs = 0; thdFcnFailure = false; uint32_t rangeSize = 0; ByteStream obs; // calculate the expected number of messages for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); totalBlks += rangeSize; } // for f1.fNumMsgs = (totalBlks / OidOp.ColumnWidth()); if (0 < totalBlks % OidOp.ColumnWidth()) ++f1.fNumMsgs; idbassert(f1.fNumMsgs); if (debug) cout << "Scanning OID " << OidOp.OID() << " " << f1.fNumMsgs << " msgs" << endl; thread t1(f1); clock_gettime(CLOCK_REALTIME, &ts1); // send the primitive requests int rCount = 0; totalBlks = 0; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { try { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "doAColScan dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); obs = formatColScanMsg(lbid, rangeSize, OidOp, uniqueID); if (obs.length() > 0) { dec->write(obs); rCount++; if (debug) cout << "colScan: " << rCount << "/" << lbidRanges.size() << " sending " << obs.length() << " bytes " << " lbid " << lbid << " sz " << rangeSize << endl; } } catch (exception& e) { cerr << "catch " << e.what() << endl; } totalBlks += rangeSize; } // for (lbidRanges ... t1.join(); clock_gettime(CLOCK_REALTIME, &ts2); // dec->removeSession(sessionid); dec->removeQueue(uniqueID); timespec_sub(ts1, ts2, diff); cout << "ColScan stats OID: " << OidOp.OID() << "\tFilter: " << (int)OidOp.FilterCount() << "\tBlocks: " << (int)totalBlks << "\tElapse: " << diff.tv_sec + (diff.tv_nsec / 1000000000.0) << "s"; float rate = 0; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << " Blks/sec : " << rate << endl; if (thdFcnFailure) cout << "There was a failure in the read thread." << endl; cout << endl; } // doAScan // void usage() { cerr << "PingProc operation [filter] [operation] [filter] [reporting]" << endl << "PingProc -c -s -gt 0 -t -d " << endl << "\t---- operation flags ----" << endl << "\t--scan -s " << endl << "\t--block -t " << endl << "\t--BatchPrimitiveScan -B " << endl << "\t--BatchPrimitiveStep -Z " << endl << "\t--concurrent -c perform each operation in its own thread" << endl << "\t--lbid-trace -lb set lbid trace flag in request" << endl << "\t---- filter flags ----" << endl << "\t--equal -eq equivalency test of values in a block>" << endl << "\t--greater-than -gt - greater than (>) test of values in a block" << endl << "\t--greater-than-equal -ge greater than or equal to (>=) test of values in a block" << endl << "\t--less-than -lt less than (<) test of values in a block" << endl << "\t--less-than-equal -le less than or equal to (<=) test of values in a block" << endl << "\t--not-equal -ne not equal to (!=) test of values in a block" << endl << "\t--bop <1 or 0> binary operator when 2 comparison filters are present" << endl << "\t---- reporting flags ----" << endl << "\t--debug -d turn debug output on>" << endl << "\t--list -list -l print out all oids and their ranges" << endl << "\t--loopback -p send count loopback requests" << endl << "\t--query -q run batch query for all the oids (enter with -B or -Z)" << endl; } // usage() const int64_t getInt(string s) { if (s.length() <= 0) return -1; // if (atoll(s.data()) < 0) // return -1; return atoll(s.data()); } // getInt // dictionary void doDictionarySig(OidOperation& OidOp) { } // doDictionarySig // col step void doColStep(OidOperation& OidOp) { struct timespec ts1; struct timespec ts2; struct timespec diff; DBRM dbrm; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; uint32_t totalBlks = 0; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; // dec->addSession(OidOp.SessionId()); // dec->addStep(OidOp.SessionId(), OidOp.SessionId()); uint32_t uniqueID = dbrm.getUnique32(); dec->addQueue(uniqueID); f1.fSessionid = OidOp.SessionId(); f1.uniqueID = uniqueID; f1.fDec = dec; f1.fNumMsgs = 0; thdFcnFailure = false; ByteStream ridlist; uint16_t ridCount = 0; // BLOCK_SIZE/OidOp.ColumnWidth(); for (uint16_t i = 0; i < ridCount; i++) ridlist << i; int err = dbrm.lookup(OidOp.OID(), lbidRanges); if (err) throw runtime_error("doAColStep: BRM LBID range lookup failure (1)"); err = dbrm.getHWM(OidOp.OID(), hwm); if (err) throw runtime_error("doAColStep: BRM HWM lookup failure (3)"); uint32_t rangeSize = 0; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "pColStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) break; // continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); totalBlks += rangeSize; } // for uint32_t colwidth = OidOp.ColumnWidth(); f1.fNumMsgs = totalBlks / (colwidth); if (0 < totalBlks % colwidth) ++f1.fNumMsgs; idbassert(f1.fNumMsgs); thread t1(f1); ByteStream obs; totalBlks = 0; clock_gettime(CLOCK_REALTIME, &ts1); for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { if (debug) cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) break; // continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); for (unsigned i = 0; i < rangeSize; i++) { obs += formatColStepMsg(lbid + i, ridlist, ridCount, OidOp, uniqueID); if (0 == (i + 1) % colwidth) { dec->write(obs); if (debug && i + 1 == rangeSize) cout << "colStep: " << i << "/" << rangeSize << " " << obs.length() << " lbid " << lbid + i << endl; obs.restart(); } } totalBlks += rangeSize; } // for if (obs.length()) { dec->write(obs); if (debug) cout << "colStep: last" << "/" << rangeSize << " " << obs.length() << endl; } obs.reset(); t1.join(); //@bug 849 moved join here and changed output to be like pColScan. clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); // t1.join(); cout << "ColStep stats OID: " << OidOp.OID() << "\tFilter: " << (int)OidOp.FilterCount() << "\tBlocks: " << (int)totalBlks << "\tElapse: " << diff.tv_sec + (diff.tv_nsec / 1000000000.0) << "s"; float rate = 0; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tBlks/sec " << rate << endl; if (thdFcnFailure) cerr << "There was a failure in the read thread." << endl; cout << endl; } // doColStep void doBatchOp_scan(OidOperation& OidOp) { struct timespec ts1, ts2, diff; JobStepAssociation injs, outjs; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; boost::shared_ptr sysCat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); pColScanStep scan(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, rm); int32_t filters = OidOp.FilterCount(); while (OidOp.FilterCount() > 0) { int8_t cop; int64_t value; OidOp.deSerializeFilter(cop, value); scan.addFilter(cop, value); } BatchPrimitiveProcessorJL bpp; ByteStream bs; DBRM dbrm; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; uint32_t uniqueID = dbrm.getUnique32(); bpp.setUniqueID(uniqueID); bpp.setSessionID(OidOp.SessionId()); bpp.setStepID(OidOp.SessionId()); bpp.addFilterStep(scan); cout << "session number = " << OidOp.SessionId() << endl; // dec->addSession(OidOp.SessionId()); // dec->addStep(OidOp.SessionId(), OidOp.SessionId()); dec->addQueue(uniqueID); f1.fSessionid = OidOp.SessionId(); f1.uniqueID = uniqueID; f1.fDec = dec; thdFcnFailure = false; int err = dbrm.lookup(OidOp.OID(), lbidRanges); if (err) { cerr << "doAColScan: BRM LBID range lookup failure (1)\n"; throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); } err = dbrm.getHWM(OidOp.OID(), hwm); if (err) { cerr << "doAColScan: BRM HWM lookup failure (3)" << endl; throw runtime_error("doAColScan: BRM HWM lookup failure (3)"); } f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0); thread t1(f1); bpp.createBPP(bs); dec->write(bs); bs.restart(); uint32_t rangeSize = 0, totalBlks = 0; clock_gettime(CLOCK_REALTIME, &ts1); // cout << "BPP scaning\n"; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); bpp.setLBID(lbid); bpp.setCount(rangeSize / OidOp.fColType.colWidth + (rangeSize % OidOp.fColType.colWidth ? 1 : 0)); bpp.runBPP(bs); dec->write(bs); // cout << "sending the BPP\n"; bpp.reset(); bs.restart(); totalBlks += rangeSize; } t1.join(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); float rate = 0; cout << "ColStep stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << "\tFilters: " << filters << "\tBlocks : " << (int)totalBlks; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tBlks/sec " << rate << endl; if (thdFcnFailure) cerr << "There was a failure in the read thread." << endl; bpp.destroyBPP(bs); dec->write(bs); cout << endl; } void doBatchOp_filt(OidOperation& OidOp) { struct timespec ts1, ts2, diff; JobStepAssociation injs, outjs; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; boost::shared_ptr sysCat = CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); ByteStream bs; DBRM dbrm; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; uint32_t uniqueID; BatchPrimitiveProcessorJL bpp; uniqueID = dbrm.getUnique32(); bpp.setUniqueID(uniqueID); bpp.setSessionID(OidOp.SessionId()); bpp.setStepID(OidOp.SessionId()); pColScanStep scan(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, rm); while (OidOp.FilterCount() > 0) { int8_t cop; int64_t value; OidOp.deSerializeFilter(cop, value); scan.addFilter(cop, value); } bpp.addFilterStep(scan); pColStep step(injs, outjs, dec, sysCat, OidOp.fOid + 1, OidOp.fOid + 1, OidOp.fSessionId, 0, OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, rm); while (OidOp.FilterCount() > 0) { int8_t cop; int64_t value; OidOp.deSerializeFilter(cop, value); step.addFilter(cop, value); } bpp.addFilterStep(step); execplan::CalpontSystemCatalog::ColType colType; FilterStep filt(OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, colType); filt.setBOP(OidOp.BOP()); bpp.addFilterStep(filt); cout << "session number = " << OidOp.SessionId() << endl; // dec->addSession(OidOp.SessionId()); // dec->addStep(OidOp.SessionId(), OidOp.SessionId()); dec->addQueue(uniqueID); f1.fSessionid = OidOp.SessionId(); f1.uniqueID = uniqueID; f1.fDec = dec; thdFcnFailure = false; int err = dbrm.lookup(OidOp.OID(), lbidRanges); if (err) { cerr << "doBatchOp_filt: BRM LBID range lookup failure (1)\n"; throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); } err = dbrm.getHWM(OidOp.OID(), hwm); if (err) { cerr << "doBatchOp_filt: BRM HWM lookup failure (2)" << endl; throw runtime_error("doBatchOp_filt: BRM HWM lookup failure (2)"); } f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0); thread t1(f1); bpp.createBPP(bs); dec->write(bs); bs.restart(); uint32_t rangeSize = 0, totalBlks = 0; clock_gettime(CLOCK_REALTIME, &ts1); // cout << "BPP scaning\n"; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "doBatchOp_filt: dbrm.lookup failed for lbid (3)" << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); bpp.setLBID(lbid); bpp.setCount(rangeSize / OidOp.fColType.colWidth + (rangeSize % OidOp.fColType.colWidth ? 1 : 0)); bpp.runBPP(bs); dec->write(bs); // cout << "sending the BPP\n"; bpp.reset(); bs.restart(); totalBlks += rangeSize; } t1.join(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); float rate = 0; cout << "doBatchOp_filt stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << "\tBlocks : " << (int)totalBlks; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tBlks/sec " << rate << endl; if (thdFcnFailure) cerr << "There was a failure in the read thread." << endl; bpp.destroyBPP(bs); dec->write(bs); cout << endl; } void doBatchQueryOp(OperationList& OidOps) { struct timespec ts1, ts2, diff; JobStepAssociation injs, outjs; BatchPrimitiveProcessorJL bpp; uint64_t rows = 0; uint32_t blockTouched = 0; DBRM dbrm; QryThdFcn f1(bpp, rows, blockTouched); OperationList::iterator filterOp = OidOps.begin(); uint32_t sessionId = (*filterOp)->SessionId(); uint32_t uniqueID = dbrm.getUnique32(); bpp.setUniqueID(uniqueID); bpp.setSessionID(sessionId); bpp.setStepID(sessionId); cout << "session number = " << sessionId << endl; f1.fSessionid = sessionId; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); // dec->addSession(sessionId); // dec->addStep(sessionId, sessionId); dec->addQueue(uniqueID); f1.fDec = dec; f1.uniqueID = uniqueID; // boost::shared_ptr sysCat = //execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); // first column is made into the first scan filter step including filters OID_t scanOid = (*filterOp)->fOid; uint32_t scanWidth = (*filterOp)->ColumnWidth(); uint32_t maxWidth = scanWidth; uint32_t pid = getpid(); pColScanStep scan(injs, outjs, dec, execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(pid), scanOid, scanOid, sessionId, 0, sessionId, sessionId, sessionId, rm); uint32_t filterCount = (*filterOp)->FilterCount(); while ((*filterOp)->FilterCount() > 0) { int8_t cop; int64_t value; (*filterOp)->deSerializeFilter(cop, value); scan.addFilter(cop, value); } bpp.addFilterStep(scan); // Any other columns that are batch scans are added as filter steps, the rest as project steps. // The last filter step is added as a passthru step into the project list. OperationList::iterator listend = OidOps.end(); for (OperationList::iterator op = OidOps.begin() + 1; op != listend; ++op) { pColStep step(injs, outjs, dec, execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(pid), (*op)->fOid, (*op)->fOid, sessionId, 0, sessionId, sessionId, sessionId, rm); if ((*op)->OpType() == OidOperation::BATCHSCAN) { filterCount += (*op)->FilterCount(); while ((*op)->FilterCount() > 0) { int8_t cop; int64_t value; (*op)->deSerializeFilter(cop, value); step.addFilter(cop, value); } filterOp = op; bpp.addFilterStep(step); } else { bpp.addProjectStep(step); } if ((*op)->ColumnWidth() > maxWidth) maxWidth = (*op)->ColumnWidth(); } PassThruStep pass(injs, outjs, dec, (*filterOp)->ColumnType(), (*filterOp)->fOid, (*filterOp)->fOid, sessionId, 0, sessionId, sessionId, sessionId, false, rm); bpp.addProjectStep(pass); ByteStream bs; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; thdFcnFailure = false; int err = dbrm.lookup(scanOid, lbidRanges); if (err) { cerr << "doQueryScan: BRM LBID range lookup failure (1)\n"; throw runtime_error("doQueryScan: BRM LBID range lookup failure (1)"); } err = dbrm.getHWM(scanOid, hwm); if (err) { cerr << "doQueryScan: BRM HWM lookup failure (3)" << endl; throw runtime_error("doQueryScan: BRM HWM lookup failure (3)"); } f1.fNumMsgs = hwm / scanWidth + (hwm % scanWidth ? 1 : 0); thread t1(f1); uint32_t cnt = dbrm.getExtentSize() / maxWidth; bpp.createBPP(bs); dec->write(bs); bs.restart(); uint32_t rangeSize = 0, totalBlks = 0; clock_gettime(CLOCK_REALTIME, &ts1); // cout << "BPP scaning\n"; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { cerr << "doBatchQuery dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); uint32_t totallbid = rangeSize / scanWidth + (0 < rangeSize % scanWidth ? 1 : 0); while (0 < totallbid) { if (totallbid < cnt) cnt = totallbid; bpp.setLBID(lbid); bpp.setCount(cnt); bpp.runBPP(bs); dec->write(bs); // cout << "sending the BPP with range cnt " << cnt << " lbid " << lbid << "\n"; bpp.reset(); bs.restart(); lbid += cnt * scanWidth; totallbid -= cnt; } for (OperationList::iterator op = OidOps.begin(); op != OidOps.end(); ++op) { totalBlks += (uint32_t)(rangeSize * (double)((double)(*op)->ColumnWidth() / scanWidth)); } } t1.join(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); float rate = 0; cout << "QueryScan stats - " << bpp.toString() << "\tElapsed: " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << "\tFilters: " << filterCount << "\tBlocks : " << (int)totalBlks; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tBlks/sec " << rate << endl; cout << "\tTouched Blocks: " << blockTouched; rate = blockTouched / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tTouched Blks/sec " << rate << endl; cout << "\tRows: " << rows; rate = rows / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\t\tRows/sec " << fixed << setprecision(2) << rate << endl; if (thdFcnFailure) cerr << "There was a failure in the read thread." << endl; bpp.destroyBPP(bs); dec->write(bs); cout << endl; } void doBatchOp_step(OidOperation& OidOp) { struct timespec ts1, ts2, diff; DBRM dbrm; BRM::LBIDRange_v lbidRanges; HWM_t hwm = 0; LBIDRange_v::iterator it; OID_t tmp; uint32_t fbo; uint32_t totalBlks = 0; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; JobStepAssociation injs, outjs; boost::shared_ptr sysCat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); pColStep step(injs, outjs, dec, sysCat, OidOp.fOid, OidOp.fOid, OidOp.fSessionId, 0, OidOp.fSessionId, OidOp.fSessionId, OidOp.fSessionId, rm); int32_t filters = OidOp.FilterCount(); while (OidOp.FilterCount() > 0) { int8_t cop; int64_t value; OidOp.deSerializeFilter(cop, value); step.addFilter(cop, value); } BatchPrimitiveProcessorJL bpp; ElementType et; ByteStream obs; uint32_t uniqueID = dbrm.getUnique32(); bpp.setUniqueID(uniqueID); bpp.setSessionID(OidOp.SessionId()); bpp.setStepID(OidOp.SessionId()); bpp.addFilterStep(step); // dec->addSession(OidOp.SessionId()); // dec->addStep(OidOp.SessionId(), OidOp.SessionId()); dec->addQueue(uniqueID); f1.fSessionid = OidOp.SessionId(); f1.uniqueID = uniqueID; f1.fDec = dec; f1.fNumMsgs = 0; thdFcnFailure = false; ByteStream ridlist; uint16_t ridCount = 0; // BLOCK_SIZE/OidOp.ColumnWidth(); for (uint16_t i = 0; i < ridCount; i++) ridlist << i; int err = dbrm.lookup(OidOp.OID(), lbidRanges); if (err) throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); err = dbrm.getHWM(OidOp.OID(), hwm); if (err) throw runtime_error("doAColScan: BRM HWM lookup failure (3)"); uint32_t rangeSize = 0; f1.fNumMsgs = hwm / OidOp.fColType.colWidth + (hwm % OidOp.fColType.colWidth ? 1 : 0); thread t1(f1); totalBlks = 0; bpp.createBPP(obs); dec->write(obs); obs.restart(); clock_gettime(CLOCK_REALTIME, &ts1); for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) { BRM::LBID_t lbid = (*it).start; if (dbrm.lookup(lbid, 0, false, tmp, fbo)) { if (debug) cerr << "pColScanStep::sendPrimitiveMessages: dbrm.lookup failed for lbid " << lbid << endl; abort(); } if (hwm < fbo) continue; rangeSize = (hwm > (fbo + (*it).size - 1) ? (*it).size : hwm - fbo + 1); for (unsigned i = 0; i < rangeSize; i++) { /* insert all rids for this LBID */ for (uint32_t j = 0; j < BLOCK_SIZE / OidOp.fColType.colWidth; ++j) { et.first = ((fbo + i) * BLOCK_SIZE / OidOp.fColType.colWidth) + j; et.second = j; bpp.addElementType(et); } /* If on a logical block boundary, send the primitive */ if (i % OidOp.fColType.colWidth == (unsigned)OidOp.fColType.colWidth - 1) { // cout << "serializing at extent offset " << i << endl; bpp.runBPP(obs); dec->write(obs); bpp.reset(); obs.restart(); } if (debug && i + 1 == rangeSize) cout << "colStep: " << i + 1 << "/" << rangeSize << " " << obs.length() << " lbid " << lbid + i << endl; } if (rangeSize % OidOp.fColType.colWidth) { // cout << "serializing last msg\n"; bpp.runBPP(obs); dec->write(obs); bpp.reset(); obs.restart(); } totalBlks += rangeSize; } // for t1.join(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); float rate = 0; cout << "ColStep stats OID: " << OidOp.OID() << " " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << "\tFilters: " << filters << "\tBlocks : " << (int)totalBlks; rate = totalBlks / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\tBlks/sec " << rate << endl; if (thdFcnFailure) cerr << "There was a failure in the read thread." << endl; bpp.destroyBPP(obs); dec->write(obs); cout << endl; } // // void doListOp(const OID_t o = 0) { DBRM dbrm; BRM::LBIDRange_v lbidRanges; LBIDRange_v::iterator it; OID_t oid = 3000; HWM_t hwm = 0; if (o != 0) { int err = dbrm.lookup(o, lbidRanges); if (err) throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); err = dbrm.getHWM(o, hwm); if (err) throw runtime_error("doAColScan: BRM HWM lookup failure (3)"); cout << "Object ID: " << o << " HWM: " << hwm << endl; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) cout << "\tStart: " << (*it).start << " sz: " << (*it).size << endl; } else { for (; oid < 100000; oid++) { int err = dbrm.lookup(oid, lbidRanges); if (lbidRanges.size() == 0) continue; if (err) throw runtime_error("doAColScan: BRM LBID range lookup failure (1)"); err = dbrm.getHWM(oid, hwm); if (err) throw runtime_error("doAColScan: BRM HWM lookup failure (3)"); cout << "Object ID: " << oid << " HWM: " << hwm << endl; for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) cout << "\tStart: " << (*it).start << " sz: " << (*it).size << endl; hwm = 0; lbidRanges.clear(); } // for (; oid... } // else } // doListOp // // do LoopBackOp void doLoopBack(const uint64_t loopcount) { ByteStream lbMsg; struct timespec ts1; struct timespec ts2; struct timespec diff; uint32_t sessionid = getpid(); DBRM dbrm; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; // dec->addSession(sessionid); // dec->addStep(sessionid, sessionid); uint32_t uniqueID = dbrm.getUnique32(); dec->addQueue(uniqueID); f1.fSessionid = sessionid; f1.uniqueID = uniqueID; f1.fDec = dec; f1.fNumMsgs = loopcount; thdFcnFailure = false; thread t1(f1); lbMsg = formatLoopBackMsg(sessionid, uniqueID); cout << "Sending " << loopcount << " LOOPBACK requests" << endl; clock_gettime(CLOCK_REALTIME, &ts1); for (uint64_t i = 0; i < loopcount; i++) { lbMsg = formatLoopBackMsg(sessionid, uniqueID); dec->write(lbMsg); } clock_gettime(CLOCK_REALTIME, &ts2); cout << loopcount << " LOOPBACK msgs sent" << endl; t1.join(); timespec_sub(ts1, ts2, diff); cout << "\ttotal runtime: " << (diff.tv_sec + (diff.tv_nsec / 1000000000.0)) << "s" << endl; float rate = 0; rate = loopcount / (diff.tv_sec + (diff.tv_nsec / 1000000000.0)); cout << "\t" << rate << " rqsts/s" << endl; dec->removeQueue(uniqueID); } // doLoopBack } // namespace /*--------------------------------------------------------------------------- //Command line parameter definition // // -o -o -o . . . // -s // -t // -d // -eq -equal < == test of values in a block> // -gt -greater-than < greater than (>) test of values in a block> // -ge -greater-than-equal < greater than or equal to (>=) test of values in a block> // -lt -less-than < less than (<) test of values in a block> // -le -less-than-equal < less than or equal to (<=) test of values in a block> // -ne -not-equal < not equal to (!=) test of values in a block> // -bop <1 or 0> <1 AND 0 OR binary operator when 2 comparison filters are present> // --list -list -l // --loopback -loopback send loopback requests // --concurrent run all jobs currently if set to true. defaults to false // --lbid-trace -lb turn on lbid tracing // --pm-profile -pmp turn on pm profiling -----------------------------------------------------------------------------*/ int main(int argc, char** argv) { int64_t eq_val = 0; int64_t gt_val = 0; int64_t ge_val = 0; int64_t lt_val = 0; int64_t le_val = 0; int64_t ne_val = 0; int64_t bop_val = 0; int64_t loopback_count = 100000; int64_t scanOid = 0; int64_t stepOid = 0; bool list = false; // OID_t listOid=0; string oidString; vector oidv; int ch = 0; bool concurrent_flag = false; bool lbidtrace_flag = false; bool pmprofile_flag = false; bool query_flag = false; enum CLA_ENUM { OID = (int)0, SCANOP = (int)1, BLOCKOP = (int)2, DEBUG = (int)3, EQFILTER = (int)4, GTFILTER = (int)5, GEFILTER = (int)6, LTFILTER = (int)7, LEFILTER = (int)8, NEFILTER = (int)9, BOP = (int)10, LISTOP = (int)11, LOOPBACKOP = (int)12, CONCURRENT = (int)13, LBIDTRACE = (int)14, PMPROFILE = (int)15, INVALIDOP = (int)16, BATCHSCANOP = (int)17, BATCHSTEPOP = (int)18, BATCHFILTOP = (int)19, QUERYOP = (int)20 }; /** // longopt struct struct option { const char *name; int has_arg; int *flag; int val; }; **/ static struct option long_options[] = { // {const char *name, int has_arg, int *flag, int val}, {"scan", required_argument, NULL, SCANOP}, {"block", required_argument, NULL, BLOCKOP}, {"debug", no_argument, NULL, DEBUG}, {"equal", required_argument, NULL, EQFILTER}, {"eq", required_argument, NULL, EQFILTER}, {"greater-than", required_argument, NULL, GTFILTER}, {"gt", required_argument, NULL, GTFILTER}, {"greater-than-equal", required_argument, NULL, GEFILTER}, {"ge", required_argument, NULL, GEFILTER}, {"less-than", required_argument, NULL, LTFILTER}, {"lt", required_argument, NULL, LTFILTER}, {"less-than-equal", required_argument, NULL, LEFILTER}, {"le", required_argument, NULL, LEFILTER}, {"not-equal", required_argument, NULL, NEFILTER}, {"ne", required_argument, NULL, NEFILTER}, {"bop", optional_argument, NULL, BOP}, {"list", no_argument, NULL, LISTOP}, {"loopback", optional_argument, NULL, LOOPBACKOP}, {"concurrent", no_argument, NULL, CONCURRENT}, {"lbid-trace", no_argument, NULL, LBIDTRACE}, {"lb", no_argument, NULL, LBIDTRACE}, {"pm-prof", no_argument, NULL, PMPROFILE}, {"pm-profile", no_argument, NULL, PMPROFILE}, {"pmp", no_argument, NULL, PMPROFILE}, {"batchscan", required_argument, NULL, BATCHSCANOP}, {"batchstep", required_argument, NULL, BATCHSTEPOP}, {"batchfilt", required_argument, NULL, BATCHFILTOP}, {"queryop", no_argument, NULL, QUERYOP}, {0, 0, 0, 0}}; OidOperation* currOp = NULL; OperationList OpList; if (argc <= 1) { usage(); } // process command line arguments while ((ch = getopt_long_only(argc, argv, "B:Z:F:ds:t:lcqp:", long_options, NULL)) != -1) { pid_t pidId = getpid(); switch (ch) { case SCANOP: case 's': if (optarg) scanOid = getInt(optarg); // cout << "OPT=" << ch << " ARG " << scanOid << endl; currOp = NULL; if (scanOid > 0) currOp = new OidOperation(scanOid, OidOperation::SCAN, pidId); else { cout << "PingProc: scan missing or invalid OID parameter value" << endl; break; } if (currOp && currOp->isIntegralDataType()) OpList.push_back(currOp); else { cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl; delete currOp; currOp = NULL; } break; case BATCHSCANOP: case 'B': if (optarg) scanOid = getInt(optarg); else cout << "no optarg\n"; cout << "OPT=" << ch << " ARG " << scanOid << endl; currOp = NULL; if (scanOid > 0) currOp = new OidOperation(scanOid, OidOperation::BATCHSCAN, pidId); else { cout << "PingProc: batch scan missing or invalid OID parameter value" << endl; break; } if (currOp && currOp->isIntegralDataType()) OpList.push_back(currOp); else { cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl; delete currOp; currOp = NULL; } break; case BATCHSTEPOP: case 'Z': if (optarg) scanOid = getInt(optarg); else cout << "no optarg\n"; cout << "OPT=" << ch << " ARG " << scanOid << endl; currOp = NULL; if (scanOid > 0) currOp = new OidOperation(scanOid, OidOperation::BATCHSTEP, pidId); else { cout << "PingProc: batch step missing or invalid OID parameter value" << endl; break; } if (currOp && currOp->isIntegralDataType()) OpList.push_back(currOp); else { cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl; delete currOp; currOp = NULL; } break; case BATCHFILTOP: case 'F': if (optarg) scanOid = getInt(optarg); else cout << "no optarg\n"; cout << "OPT=" << ch << " ARG " << scanOid << endl; currOp = NULL; if (scanOid > 0) currOp = new OidOperation(scanOid, OidOperation::BATCHFILT, pidId); else { cout << "PingProc: batch filter missing or invalid OID parameter value" << endl; break; } if (currOp && currOp->isIntegralDataType()) OpList.push_back(currOp); else { cout << "PingProc cannot process this ColumnType-oid: " << scanOid << endl; delete currOp; currOp = NULL; } break; case BLOCKOP: case 't': if (optarg) stepOid = getInt(optarg); // cout << "OPT=" << ch << " ARG " << stepOid << endl; currOp = NULL; if (stepOid > 0) currOp = new OidOperation(stepOid, OidOperation::BLOCK, pidId); else { cout << "PingProc: step missing or invalid OID parameter value" << endl; break; } if (currOp && currOp->isIntegralDataType()) OpList.push_back(currOp); else { cout << "PingProc cannot process this ColumnType-oid: " << stepOid << endl; delete currOp; currOp = NULL; } break; case DEBUG: case 'd': // cout << "OPT=" << ch << endl; debug = true; break; case EQFILTER: if (optarg) eq_val = getInt(optarg); else eq_val = 0; cout << "OPT=" << ch << " ARG=" << eq_val << endl; if (currOp) currOp->addFilter(COMPARE_EQ, eq_val); else ; // TODO: error Processing break; case GTFILTER: if (optarg) gt_val = getInt(optarg); else gt_val = 0; // cout << "OPT=" << ch << " ARG=" << gt_val << endl; if (currOp) currOp->addFilter(COMPARE_GT, gt_val); break; case GEFILTER: if (optarg) ge_val = getInt(optarg); else ge_val = 0; // cout << "OPT=" << ch << " ARG=" << ge_val << endl; if (currOp) currOp->addFilter(COMPARE_GE, ge_val); break; case LTFILTER: if (optarg) lt_val = getInt(optarg); else lt_val = 0; // cout << "OPT=" << ch << " ARG=" << lt_val << endl; if (currOp) currOp->addFilter(COMPARE_LT, lt_val); break; case LEFILTER: if (optarg) le_val = getInt(optarg); else le_val = 0; // cout << "OPT=" << ch << " ARG=" << le_val << endl; if (currOp) currOp->addFilter(COMPARE_LE, le_val); break; case NEFILTER: case 'n': if (optarg) ne_val = getInt(optarg); else ne_val = 0; // cout << "OPT=" << ch << " ARG=" << ne_val << endl; if (currOp) currOp->addFilter(COMPARE_NE, ne_val); break; case BOP: case 'b': if (optarg) bop_val = getInt(optarg); else bop_val = 1; // assume AND // cout << "OPT=" << ch << " ARG=" << bop_val << endl; if (currOp) currOp->BOP(bop_val ? BOP_AND : BOP_OR); break; case LISTOP: case 'l': /** if (optarg) listOid=getInt(optarg); else listOid=0; cout << "OPT=" << ch << " LISTOP " << listOid << endl; **/ list = true; break; case LOOPBACKOP: case 'p': if (optarg) loopback_count = getInt(optarg); // cout << "OPT=" << ch << " LOOPBACKOP " << loopback_count << endl; currOp = NULL; currOp = new OidOperation(0, OidOperation::LOOPBACK, pidId); OpList.push_back(currOp); break; case 'c': case CONCURRENT: concurrent_flag = true; // cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl; break; case LBIDTRACE: lbidtrace_flag = true; // cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl; if (currOp) currOp->setLbidTraceOn(); break; case PMPROFILE: pmprofile_flag = true; // cout << "OPT=" << ch << " CONCURRENT " << concurrent_flag << endl; if (currOp) currOp->setPMProfileOn(); break; case 'q': case QUERYOP: query_flag = true; // cout << "OPT=" << ch << " QUERY FLAG " << query_flag << endl; break; case '?': default: cout << "optarg " << optarg << endl; usage(); } if (list == true) break; } // while // if list is requested, print the listing and exit // vector BatchScanThreads; vector BatchStepThreads; vector BatchFiltThreads; vector ColScanThreads; vector ColStepThreads; vector DictScanThreads; vector DictSigThreads; vector thrArray; if (query_flag) { cout << "starting batch query thread\n"; struct BatchQueryThr* qt = new struct BatchQueryThr(OpList); thread* t1 = new thread(*qt); if (concurrent_flag) thrArray.push_back(t1); else t1->join(); } else if (list) { doListOp(); } else { for (uint32_t i = 0; i < OpList.size(); i++) { if (OpList[i]->OpType() == OidOperation::LOOPBACK) { doLoopBack(loopback_count); } else if (OpList[i]->OpType() == OidOperation::SCAN) { struct ColScanThr* cst = new struct ColScanThr(*OpList[i]); ColScanThreads.push_back(cst); thread* t1 = new thread(*cst); if (concurrent_flag) thrArray.push_back(t1); else { t1->join(); delete t1; } } else if (OpList[i]->OpType() == OidOperation::BLOCK) { struct ColStepThr* cst = new struct ColStepThr(*OpList[i]); ColStepThreads.push_back(cst); thread* t1 = new thread(*cst); if (concurrent_flag) thrArray.push_back(t1); else { t1->join(); delete t1; } } else if (OpList[i]->OpType() == OidOperation::BATCHSCAN) { cout << "starting batch scan thread\n"; struct BatchScanThr* cst = new struct BatchScanThr(*OpList[i]); BatchScanThreads.push_back(cst); thread* t1 = new thread(*cst); if (concurrent_flag) thrArray.push_back(t1); else { t1->join(); delete t1; } } else if (OpList[i]->OpType() == OidOperation::BATCHSTEP) { cout << "starting batch step thread\n"; struct BatchStepThr* cst = new struct BatchStepThr(*OpList[i]); BatchStepThreads.push_back(cst); thread* t1 = new thread(*cst); if (concurrent_flag) thrArray.push_back(t1); else { t1->join(); delete t1; } } else if (OpList[i]->OpType() == OidOperation::BATCHFILT) { cout << "starting batch filt thread\n"; struct BatchFiltThr* cst = new struct BatchFiltThr(*OpList[i]); BatchFiltThreads.push_back(cst); thread* t1 = new thread(*cst); if (concurrent_flag) thrArray.push_back(t1); else { t1->join(); delete t1; } i += 2; } } // for } // else // join threads to main for (uint32_t i = 0; i < thrArray.size(); i++) thrArray[i]->join(); // clean up for (uint32_t i = 0; i < OpList.size(); i++) delete OpList[i]; OpList.clear(); for (uint32_t i = 0; i < BatchScanThreads.size(); i++) delete BatchScanThreads[i]; BatchScanThreads.clear(); for (uint32_t i = 0; i < BatchStepThreads.size(); i++) delete BatchStepThreads[i]; BatchStepThreads.clear(); for (uint32_t i = 0; i < BatchFiltThreads.size(); i++) delete BatchFiltThreads[i]; BatchFiltThreads.clear(); for (uint32_t i = 0; i < ColScanThreads.size(); i++) delete ColScanThreads[i]; ColScanThreads.clear(); for (uint32_t i = 0; i < ColStepThreads.size(); i++) delete ColStepThreads[i]; ColStepThreads.clear(); for (uint32_t i = 0; i < DictScanThreads.size(); i++) delete DictScanThreads[i]; DictScanThreads.clear(); for (uint32_t i = 0; i < DictSigThreads.size(); i++) delete DictSigThreads[i]; DictSigThreads.clear(); for (uint32_t i = 0; i < thrArray.size(); i++) delete thrArray[i]; thrArray.clear(); } // main()