diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index b340fbeaa..c8d36b379 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -95,7 +95,7 @@ void fix_column_length(SchemaObject* elem, const CHARSET_INFO* def_cs) { %} %expect 17 -%pure-parser +%define api.pure %lex-param {void * scanner} %parse-param {struct ddlpackage::pass_to_bison * x} diff --git a/dbcon/dmlpackage/dml.y b/dbcon/dmlpackage/dml.y index 73dcf26ea..3bc62ecb8 100644 --- a/dbcon/dmlpackage/dml.y +++ b/dbcon/dmlpackage/dml.y @@ -90,7 +90,7 @@ char* copy_string(const char *str); } %} -%pure-parser +%define api.pure %lex-param {void * scanner} %parse-param {void * scanner} %debug diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index 1bbff02b4..b025005b8 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -36,7 +36,6 @@ set(joblist_LIB_SRCS pcolstep.cpp pdictionary.cpp pdictionaryscan.cpp - primitivemsg.cpp pseudocc-jl.cpp resourcedistributor.cpp resourcemanager.cpp diff --git a/dbcon/joblist/bpp-jl.h b/dbcon/joblist/bpp-jl.h index fc64fae2f..4da0d5e1c 100644 --- a/dbcon/joblist/bpp-jl.h +++ b/dbcon/joblist/bpp-jl.h @@ -28,7 +28,6 @@ // // /** @file */ -// #include "primitivemsg.h" #include "bytestream.h" #include "messagequeue.h" #include "serializeable.h" diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a960e7374..67d4d7747 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -199,7 +199,7 @@ void DistributedEngineComm::reset() } DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) - : fRm(rm), fLBIDShift(fRm->getPsLBID_Shift()), pmCount(0), fIsExeMgr(isExeMgr) +: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr) { Setup(); } @@ -250,10 +250,6 @@ void DistributedEngineComm::Setup() if (newPmCount == 0) writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs", LOG_TYPE_CRITICAL); - // This needs to make sense when compared to the extent size - // fLBIDShift = static_cast(config::Config::uFromText(fConfig->getConfig(section, - // "LBID_Shift"))); - auto* config = fRm->getConfig(); std::vector pmsAddressesAndPorts; for (size_t i = 1; i <= newPmCount; ++i) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 51ae885fe..0ec286289 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -269,7 +269,6 @@ class DistributedEngineComm boost::mutex fMlock; // sessionMessages mutex std::vector > fWlock; // PrimProc socket write mutexes bool fBusy; - unsigned fLBIDShift; volatile uint32_t pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition boost::mutex fSetupMutex; diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index d85c40a4f..61006e9e8 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -185,14 +185,8 @@ struct JobInfo , maxElems(rm->getHjMaxElems()) , flushInterval(rm->getJLFlushInterval()) , fifoSize(rm->getJlFifoSize()) - , fifoSizeLargeSideHj(rm->getHjFifoSizeLargeSide()) - , scanLbidReqLimit(rm->getJlScanLbidReqLimit()) - , scanLbidReqThreshold(rm->getJlScanLbidReqThreshold()) - , tempSaveSize(rm->getScTempSaveSize()) , logger(new Logger()) , traceFlags(0) - , tupleDLMaxSize(rm->getTwMaxSize()) - , tupleMaxBuckets(rm->getTwMaxBuckets()) , projectingTableOID(0) , isExeMgr(false) , trace(false) @@ -226,18 +220,8 @@ struct JobInfo JobStepVectorStack stack; uint32_t flushInterval; uint32_t fifoSize; - uint32_t fifoSizeLargeSideHj; - //...joblist does not use scanLbidReqLimit and SdanLbidReqThreshold. - //...They are actually used by pcolscan and pdictionaryscan, but - //...we have joblist get and report the values here since they - //...are global to the job. - uint32_t scanLbidReqLimit; - uint32_t scanLbidReqThreshold; - uint32_t tempSaveSize; SPJL logger; uint32_t traceFlags; - uint64_t tupleDLMaxSize; - uint32_t tupleMaxBuckets; SErrorInfo errorInfo; execplan::CalpontSystemCatalog::OID* projectingTableOID; // DeliveryWSDLs get a reference to this bool isExeMgr; diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 60985cc8f..26809d394 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -2097,9 +2097,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMg oss << endl; oss << endl << "job parms: " << endl; oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems - << ", flushInterval = " << jobInfo.flushInterval << ", fifoSize = " << jobInfo.fifoSize - << ", ScanLimit/Threshold = " << jobInfo.scanLbidReqLimit << "/" << jobInfo.scanLbidReqThreshold - << endl; + << ", flushInterval = " << jobInfo.flushInterval << ", fifoSize = " << jobInfo.fifoSize << endl; oss << "UUID: " << jobInfo.uuid << endl; oss << endl << "job filter steps: " << endl; ostream_iterator oIter(oss, "\n"); diff --git a/dbcon/joblist/pcolscan.cpp b/dbcon/joblist/pcolscan.cpp index 6f5d51448..2a3e64fba 100644 --- a/dbcon/joblist/pcolscan.cpp +++ b/dbcon/joblist/pcolscan.cpp @@ -53,64 +53,6 @@ using namespace execplan; //#define DEBUG 1 //#define DEBUG2 1 -namespace -{ -//// const uint32_t defaultScanLbidReqLimit = 10000; -//// const uint32_t defaultScanLbidReqThreshold = 5000; -// -// struct pColScanStepPrimitive -//{ -// pColScanStepPrimitive(pColScanStep* pColScanStep) : fPColScanStep(pColScanStep) -// {} -// pColScanStep *fPColScanStep; -// void operator()() -// { -// try -// { -// fPColScanStep->sendPrimitiveMessages(); -// } -// catch(std::exception& re) -// { -// string msg = re.what(); -// cerr << "pColScanStep: send thread threw an exception: " << msg << endl; -// -// //Whoa! is this really what we want to do? It's not clear that any good can be had by -// //sticking around, but this seems drastic... -// if (msg.find("there are no primitive processors") != string::npos) -// { -// SPJL logger = fPColScanStep->logger(); -// logger->logMessage(LOG_TYPE_CRITICAL, LogNoPrimProcs, Message::Args(), -//LoggingID(5)); exit(1); -// } -// } -// } -//}; -// -// struct pColScanStepAggregater -//{ -// pColScanStepAggregater(pColScanStep* pColScanStep, uint64_t index) : -// fPColScanStepCol(pColScanStep), fThreadId(index) -// {} -// pColScanStep *fPColScanStepCol; -// uint64_t fThreadId; -// -// void operator()() -// { -// try -// { -// fPColScanStepCol->receivePrimitiveMessages(fThreadId); -// } -// catch(std::exception& re) -// { -// cerr << fPColScanStepCol->toString() << ": receive thread threw an exception: " << -//re.what() << endl; -// } -// } -//}; -// - -} // namespace - namespace joblist { pColScanStep::pColScanStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t, @@ -118,20 +60,11 @@ pColScanStep::pColScanStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OI : JobStep(jobInfo) , fRm(jobInfo.rm) , fMsgHeader() - , fNumThreads(fRm->getJlNumScanReceiveThreads()) , fFilterCount(0) , fOid(o) , fTableOid(t) , fColType(ct) , fBOP(BOP_OR) - , sentCount(0) - , recvCount(0) - , fScanLbidReqLimit(fRm->getJlScanLbidReqLimit()) - , fScanLbidReqThreshold(fRm->getJlScanLbidReqThreshold()) - , fStopSending(false) - , fSingleThread(false) - , fPhysicalIO(0) - , fCacheIO(0) , fNumBlksSkipped(0) , fMsgBytesIn(0) , fMsgBytesOut(0) @@ -234,332 +167,6 @@ pColScanStep::pColScanStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OI throw runtime_error("pColScan: Block size and column width must be a power of 2"); } -pColScanStep::~pColScanStep() -{ - // pthread_mutex_destroy(&mutex); - // pthread_mutex_destroy(&dlMutex); - // pthread_mutex_destroy(&cpMutex); - // pthread_cond_destroy(&condvar); - // pthread_cond_destroy(&condvarWakeupProducer); - // delete lbidList; - // delete [] fProducerThread; - // if (fDec) - // fDec->removeQueue(uniqueID); // in case it gets aborted -} - -//------------------------------------------------------------------------------ -// Initialize configurable parameters -//------------------------------------------------------------------------------ -void pColScanStep::initializeConfigParms() -{ - // const string section ( "JobList" ); - // const string sendLimitName ( "ScanLbidReqLimit" ); - // const string sendThresholdName ( "ScanLbidReqThreshold" ); - // const string numReadThreadsName ( "NumScanReceiveThreads" ); - // Config* cf = Config::makeConfig(); - - // string strVal; - - //...Get the tuning parameters that throttle msgs sent to primproc - //...fScanLbidReqLimit puts a cap on how many LBID's we will request from - //... primproc, before pausing to let the consumer thread catch up. - //... Without this limit, there is a chance that PrimProc could flood - //... ExeMgr with thousands of messages that will consume massive - //... amounts of memory for a 100 gigabyte database. - //...fScanLbidReqThreshold is the level at which the number of outstanding - //... LBID reqs must fall below, before the producer can send more LBIDs. - // strVal = cf->getConfig(section, sendLimitName); - // if (strVal.size() > 0) - // fScanLbidReqLimit = static_cast(Config::uFromText(strVal)); - // - // strVal = cf->getConfig(section, sendThresholdName); - // if (strVal.size() > 0) - // fScanLbidReqThreshold = static_cast(Config::uFromText(strVal)); - // - // fNumThreads = 8; - // strVal = cf->getConfig(section, numReadThreadsName); - // if (strVal.size() > 0) - // fNumThreads = static_cast(Config::uFromText(strVal)); - - // fProducerThread = new SPTHD[fNumThreads]; -} - -void pColScanStep::startPrimitiveThread() -{ - // fConsumerThread.reset(new boost::thread(pColScanStepPrimitive(this))); -} - -void pColScanStep::startAggregationThread() -{ - // for (uint32_t i = 0; i < fNumThreads; i++) - // fProducerThread[i].reset(new boost::thread(pColScanStepAggregater(this, i))); -} - -void pColScanStep::run() -{ - // if (traceOn()) - // { - // syslogStartStep(16, // exemgr subsystem - // std::string("pColScanStep")); // step name - // } - // - // //"consume" input datalist. In this case, there is no IDL, we just send one primitive - // startPrimitiveThread(); - // //produce output datalist - // //Don't start this yet...see below - // //startAggregationThread(); -} - -void pColScanStep::join() -{ - // fConsumerThread->join(); - // if ( !fSingleThread ) { - // for (uint32_t i = 0; i < fNumThreads; i++) - // fProducerThread[i]->join(); - // } -} - -void pColScanStep::sendPrimitiveMessages() -{ - // //The presence of an input DL means we (probably) have a pDictionaryScan step feeding this scan step - // // a list of tokens to get the rids for. Convert the input tokens to a filter string. - // if (fInputJobStepAssociation.outSize() > 0) - // { - // addFilters(); - // if (fTableOid >= 3000) - // cout << toString() << endl; - // //If we got no input rids (as opposed to no input DL at all) then there were no matching rows from - // // the previous step, so this step should not return any rows either. This would be the case, for - // // instance, if P_NAME LIKE '%xxxx%' produced no signature matches. - // if (fFilterCount == 0) { - // rDoNothing=true; - // startAggregationThread(); - // return; - // } - // } - // - // startAggregationThread(); - // - // /* for all the blocks that need to be sent - // * build out a message for each block with the primitive message header filled - // * out and all the NOPS and BOP structures as well. - // * Then serialize it to a BytStream and then send it on its way - // */ - // - // LBIDRange_v::iterator it; - // uint64_t fbo; - // - // ISMPacketHeader ism; - // ism.Flags = planFlagsToPrimFlags(fTraceFlags); - // ism.Command=COL_BY_SCAN_RANGE; - // ism.Size = sizeof(ISMPacketHeader) + sizeof(ColByScanRangeRequestHeader) + fFilterString.length(); - // ism.Type=2; - // //bool firstWrite = true; - // - // //...Counter used to track the number of LBIDs we are requesting from - // //...primproc in the current set of msgs, till we reach fScanLbidReqLimit - // uint32_t runningLbidCount = 0; - // bool exitLoop = false; - // const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0); - // - // for (it = lbidRanges.begin(); it != lbidRanges.end(); it++) - // { - // BRM::LBID_t lbid = (*it).start; - // - // fbo = getFBO(lbid); - // if (hwm < fbo) - // continue; - // - // if (fOid >= 3000 && lbidList->CasualPartitionDataType(fColType.colDataType, fColType.colWidth) ) - // { - // int64_t Min=0; - // int64_t Max=0; - // int64_t SeqNum=0; - // bool MinMaxValid=true; - // bool cpPredicate=true; - // - // // can we consolidate these crit sections? - // cpMutex.lock(); //pthread_mutex_lock(&cpMutex); - // MinMaxValid = lbidList->GetMinMax(Min, Max, SeqNum, lbid, 0); - // - // if (MinMaxValid) - // { - // cpPredicate=lbidList->CasualPartitionPredicate(Min, - // Max, - // &fFilterString, - // fFilterCount, - // fColType, - // fBOP) || - //ignoreCP; - // } - // cpMutex.unlock(); //pthread_mutex_unlock(&cpMutex); - // - // if (cpPredicate==false){ //don't scan this extent - //#ifdef DEBUG - // cout << "Scan Skip " << lbid << endl; - //#endif - // //...Track the number of LBIDs we skip due to Casual Partioning. - // //...We use the same equation we use to initialize remainingLbids - // //...in the code that follows down below this. - // fNumBlksSkipped += ( (hwm > (fbo + it->size - 1)) ? - // (it->size) : (hwm - fbo + 1) ); - // continue; - // } - //#ifdef DEBUG - // else - // cout << "Scan " << lbid << endl; - //#endif - // - // } - // - // LBID_t msgLbidStart = it->start; - // uint32_t remainingLbids = - // ( (hwm > (fbo + it->size - 1)) ? (it->size) : (hwm - fbo + 1) ); - // uint32_t msgLbidCount = 0; - // - // while ( remainingLbids > 0 ) - // { - // //...Break up this range of LBIDs when we reach the msg size - // //...limit for one request (fScanLbidReqLimit) - // if ( (runningLbidCount + remainingLbids) >= fScanLbidReqLimit ) - // { - // msgLbidCount = fScanLbidReqLimit - runningLbidCount; - // sendAPrimitiveMessage(ism, msgLbidStart, msgLbidCount ); - // //...Wait for the consuming thread to catch up if our - // //...backlog of work is >= the allowable threshold limit - // - // mutex.lock(); //pthread_mutex_lock(&mutex); - // sentCount += msgLbidCount; - // if (recvWaiting) - // condvar.notify_all(); //pthread_cond_broadcast(&condvar); //signal consumer to - //resume - // - // while ( ((sentCount - recvCount) >= fScanLbidReqThreshold) - // && !fStopSending ) - // { - // sendWaiting = true; - //#ifdef DEBUG2 - // if (fOid >= 3000) - // cout << "pColScanStep producer WAITING: " << - // "st:" << fStepId << - // "; sentCount-" << sentCount << - // "; recvCount-" << recvCount << - // "; threshold-" << fScanLbidReqThreshold << endl; - //#endif - // condvarWakeupProducer.wait(mutex); //pthread_cond_wait ( &condvarWakeupProducer, &mutex - //); #ifdef DEBUG2 if (fOid >= 3000) cout << "pColScanStep producer RESUMING: " << "st:" << fStepId << endl; - //#endif - // sendWaiting = false; - // } - // - // //...Set flag to quit if consumer thread tells us to - // if (fStopSending) - // exitLoop = true; - // - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // runningLbidCount = 0; - // } - // else - // { - // msgLbidCount = remainingLbids; - // - // sendAPrimitiveMessage(ism, msgLbidStart, msgLbidCount ); - // mutex.lock(); //pthread_mutex_lock(&mutex); - // sentCount += msgLbidCount; - // if (recvWaiting) - // condvar.notify_all(); //pthread_cond_broadcast(&condvar); //signal consumer to - //resume - // - // //...Set flag to quit if consumer thread tells us to - // if (fStopSending) - // exitLoop = true; - // - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // runningLbidCount += msgLbidCount; - // } - // - // //...If consuming thread has quit, then we should do the same. - // //...This can happen if consuming thread receives empty ByteStream - // if (exitLoop) - // break; - // - // remainingLbids -= msgLbidCount; - // msgLbidStart += msgLbidCount; - // } - // - // if (exitLoop) - // break; - // - // } // end of loop through LBID ranges to be requested from primproc - // - // mutex.lock(); //pthread_mutex_lock(&mutex); - // finishedSending = true; - // if (recvWaiting) - // condvar.notify_all(); //pthread_cond_broadcast(&condvar); - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - //// cerr << "send side exiting" << endl; - // - //#ifdef DEBUG2 - // if (fOid >= 3000) - // { - // time_t t = time(0); - // char timeString[50]; - // ctime_r(&t, timeString); - // timeString[strlen(timeString)-1 ] = '\0'; - // cout << "pColScanStep Finished sending primitives for: " << - // fOid << " at " << timeString << endl; - // } - //#endif - // -} - -//------------------------------------------------------------------------------ -// Construct and send a single primitive message to primproc -//------------------------------------------------------------------------------ -void pColScanStep::sendAPrimitiveMessage(ISMPacketHeader& ism, BRM::LBID_t msgLbidStart, - uint32_t msgLbidCount) -{ - // ByteStream bs; - // - // bs.load(reinterpret_cast(&ism), sizeof(ism)); - // - // fMsgHeader.LBID = msgLbidStart; - // fMsgHeader.DataSize = fColType.colWidth; - // fMsgHeader.DataType = fColType.colDataType; - // fMsgHeader.CompType = fColType.compressionType; - // if (fFilterCount > 0) - // fMsgHeader.OutputType = 3; // pairs - // else - // fMsgHeader.OutputType = OT_DATAVALUE; - // fMsgHeader.BOP = fBOP; - // fMsgHeader.NOPS = fFilterCount; - // fMsgHeader.NVALS = 0; - // fMsgHeader.Count = msgLbidCount; - // fMsgHeader.Hdr.SessionID = fSessionId; - // //fMsgHeader.Hdr.StatementID = 0; - // fMsgHeader.Hdr.TransactionID = fTxnId; - // fMsgHeader.Hdr.VerID = fVerId; - // fMsgHeader.Hdr.StepID = fStepId; - // fMsgHeader.Hdr.UniqueID = uniqueID; - // - // bs.append(reinterpret_cast(&fMsgHeader), - // sizeof(fMsgHeader)); - // bs += fFilterString; - // - //#ifdef DEBUG2 - // if (fOid >= 3000) - // cout << "pColScanStep producer st: " << fStepId << - // ": sending req for lbid start " << msgLbidStart << - // "; lbid count " << msgLbidCount << endl; - //#endif - // - // fMsgBytesOut += bs.lengthWithHdrOverhead(); - // fDec->write(bs); - // fMsgsToPm++; -} - struct CPInfo { CPInfo(int64_t MIN, int64_t MAX, uint64_t l) : min(MIN), max(MAX), LBID(l){}; @@ -568,327 +175,6 @@ struct CPInfo uint64_t LBID; }; -void pColScanStep::receivePrimitiveMessages(uint64_t tid) -{ - // AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); - // DataList_t* dlp = dl->dataList(); - // FifoDataList *fifo = dl->fifoDL(); - // BucketDL *bucket = dynamic_cast *>(dlp); - // ZDL *zdl = dynamic_cast *>(dlp); - // int64_t l_ridsReturned = 0; - // uint64_t l_physicalIO = 0, l_cachedIO = 0; - // uint64_t fbo; - // uint64_t ridBase; - // vector v; - // UintRowGroup rw; - // vector > bsv; - // uint32_t i, k, size, bsLength; - // bool lastThread = false; - // vector cpv; - // - // if (bucket || zdl) - // dlp->setMultipleProducers(true); - // - // mutex.lock(); //pthread_mutex_lock(&mutex); - // - // // count the LBIDs - // for (; !rDoNothing; ) { - // - // // sync with the send side - // while (!finishedSending && sentCount == recvCount) { - // recvWaiting++; - // condvar.wait(mutex); //pthread_cond_wait(&condvar, &mutex); - // recvWaiting--; - // } - // if (sentCount == recvCount && finishedSending) { - //// cout << "done recving" << endl; - // break; - // } - // - // fDec->read_some(uniqueID, fNumThreads, bsv); - // for (unsigned int jj=0; jjlengthWithHdrOverhead(); - // } - // - // - // size = bsv.size(); - // - // if (size == 0) { - // /* XXXPAT: Need to give other threads a chance to update recvCount. - // As of 2/25/08, each BS contains multiple responses. With the current - // protocol, the exact # isn't known until they're processed. Threads - // waiting for more input will have to busy wait on either recvCount - // being updated or input. It's only an issue at the tail end of the - // responses, and probably doesn't matter then either. */ - // - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // usleep(1000); // 1ms Good? - // mutex.lock(); //pthread_mutex_lock(&mutex); - // continue; - // } - // - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) - // dlTimes.setFirstReadTime(); - // if (fOid>=3000) dlTimes.setLastReadTime(); - // - //// cerr << "got a response of " << size << " msgs\n"; - // - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // uint32_t msgCount = 0; - // for (i = 0; i < size; i++) { - // const ByteStream::byte* bsp = bsv[i]->buf(); - // - // bsLength = bsv[i]->length(); - // k = 0; - // while (k < bsLength) { - // ++msgCount; - //// cout << "got msg " << msgCount << " k = " << k << endl; - // k += sizeof(ISMPacketHeader); - // const ColResultHeader* crh = reinterpret_cast(&bsp[k]); - // // get the ColumnResultHeader out of the bytestream - // k += sizeof(ColResultHeader); - // - // l_cachedIO += crh->CacheIO; - // l_physicalIO += crh->PhysicalIO; - // fbo = getFBO(crh->LBID); - // ridBase = fbo << rpbShift; - // - // for(int j = 0; j < crh->NVALS; j++) - // { - // uint64_t dv; - // uint64_t rid; - // - // if (crh->OutputType == OT_DATAVALUE) { - // if (isEmptyVal(&bsp[k])) { - // k += fColType.colWidth; - // continue; - // } - // rid = j + ridBase; - // } - // else { - // rid = *((const uint16_t *) &bsp[k]) + ridBase; - // k += sizeof(uint16_t); - // } - // - // switch (fColType.colWidth) { - // case 8: dv = *((const uint64_t *) &bsp[k]); k += 8; break; - // case 4: dv = *((const uint32_t *) &bsp[k]); k += 4; break; - // case 2: dv = *((const uint16_t *) &bsp[k]); k += 2; break; - // case 1: dv = *((const uint8_t *) &bsp[k]); ++k; break; - // default: - // throw runtime_error("pColStep: invalid column - //width!"); - // } - // - // v.push_back(ElementType(rid, dv)); - // ++l_ridsReturned; - //#ifdef DEBUG - //// if (fOid >=3000) - //// cout << " -- inserting <" << rid << ", " << dv << ">" << - ///endl; - //#endif - // // per row operations... - // } // for - // - // // per block operations... - // - //#ifdef DEBUG - // cout << "recvPrimMsgs Oid " << fOid - // << " valid " << crh->ValidMinMax - // << " LBID " << crh->LBID - // << " Mn/Mx " << crh->Min << "/" << crh->Max - // << " nvals " << crh->NVALS - // << "/" << l_ridsReturned << endl; - //#endif - // if (fOid >= 3000 && crh->ValidMinMax) - // cpv.push_back(CPInfo(crh->Min, crh->Max, crh->LBID)); - // } - // // per ByteStream operations... - // } - // // per read operations.... - // - // if (bucket) { - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // - // bucket->insert(v); - // } - // else if (zdl) { - // zdl->insert(v); - // } - // else { - // size = v.size(); - // if (size>0) - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // - // dlMutex.lock(); //pthread_mutex_lock(&dlMutex); - // for (i = 0; i < size; ++i) { - // rw.et[rw.count++] = v[i]; - // if (rw.count == rw.ElementsPerGroup) - // { - // fifo->insert(rw); - // rw.count = 0; - // } - // } - // - // if (rw.count > 0) - // { - // fifo->insert(rw); - // rw.count = 0; - // } - // - // dlMutex.unlock(); //pthread_mutex_unlock(&dlMutex); - // } - // v.clear(); - // - // size = cpv.size(); - // if (size > 0) { - // cpMutex.lock(); //pthread_mutex_lock(&cpMutex); - // for (i = 0; i < size; i++) { - // CPInfo *cpi = &(cpv[i]); - // lbidList->UpdateMinMax(cpi->min, cpi->max, cpi->LBID, fColType.colDataType); - // } - // cpMutex.unlock(); //pthread_mutex_unlock(&cpMutex); - // cpv.clear(); - // } - // - // mutex.lock(); //pthread_mutex_lock(&mutex); - // recvCount += msgCount; - // //...If producer is waiting, and we have gone below our threshold value, - // //...then we signal the producer to request more data from primproc - // if ( (sendWaiting) && ( (sentCount - recvCount) < fScanLbidReqThreshold ) ) - // { - //#ifdef DEBUG2 - // if (fOid >= 3000) - // cout << "pColScanStep consumer signaling producer for more data: "<< - // "st:" << fStepId << - // "; sentCount-" << sentCount << - // "; recvCount-" << recvCount << - // "; threshold-" << fScanLbidReqThreshold << endl; - //#endif - // condvarWakeupProducer.notify_one(); //pthread_cond_signal(&condvarWakeupProducer); - // } - // } // end of loop to read LBID responses from primproc - // - // fPhysicalIO += l_physicalIO; - // fCacheIO += l_cachedIO; - // ridsReturned += l_ridsReturned; - //// cerr << "out of the main loop " << recvExited << endl; - // if (++recvExited == fNumThreads) { - // //...Casual partitioning could cause us to do no processing. In that - // //...case these time stamps did not get set. So we set them here. - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // dlTimes.setLastReadTime(); - // dlTimes.setFirstInsertTime(); - // } - // if (fOid>=3000) dlTimes.setEndOfInputTime(); - // - // //@bug 699: Reset StepMsgQueue - // fDec->removeQueue(uniqueID); - // - // if (fifo) - // fifo->endOfInput(); - // else - // dlp->endOfInput(); - // lastThread = true; - // } - // - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // if (fTableOid >= 3000 && lastThread) - // { - // //...Construct timestamp using ctime_r() instead of ctime() not - // //...necessarily due to re-entrancy, but because we want to strip - // //...the newline ('\n') off the end of the formatted string. - // time_t t = time(0); - // char timeString[50]; - // ctime_r(&t, timeString); - // timeString[strlen(timeString)-1 ] = '\0'; - // - // FifoDataList* pFifo = 0; - // uint64_t totalBlockedReadCount = 0; - // uint64_t totalBlockedWriteCount = 0; - // - // //...Sum up the blocked FIFO reads for all input associations - // size_t inDlCnt = fInputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedReadCount += pFifo->blockedReadCount(); - // } - // } - // - // //...Sum up the blocked FIFO writes for all output associations - // size_t outDlCnt = fOutputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedWriteCount += pFifo->blockedWriteCount(); - // } - // } - // - // //...Roundoff inbound msg byte count to nearest KB for display; - // //...no need to do so for outbound, because it should be small. - // uint64_t msgBytesInKB = fMsgBytesIn >> 10; - // if (fMsgBytesIn & 512) - // msgBytesInKB++; - // // @bug 807 - // if (fifo) - // fifo->totalSize(ridsReturned); - // - // if (traceOn()) - // { - // //...Print job step completion information - // ostringstream logStr; - // logStr << "ses:" << fSessionId << - // " st: " << fStepId << " finished at " << - // timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" << - // fCacheIO << "; MsgsSent-" << fMsgsToPm << "; MsgsRcvd-" << recvCount << - // "; BlockedFifoIn/Out-" << totalBlockedReadCount << - // "/" << totalBlockedWriteCount << - // "; output size-" << ridsReturned << endl << - // "\tPartitionBlocksEliminated-" << fNumBlksSkipped << - // "; MsgBytesIn-" << msgBytesInKB << "KB" << - // "; MsgBytesOut-" << fMsgBytesOut << "B" << endl << - // "\t1st read " << dlTimes.FirstReadTimeString() << - // "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << - // JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(),dlTimes.FirstReadTime()) << - // "s" << endl; - // - // logEnd(logStr.str().c_str()); - // - // syslogReadBlockCounts(16, // exemgr subsystem - // fPhysicalIO, // # blocks read from disk - // fCacheIO, // # blocks read from cache - // fNumBlksSkipped); // # casual partition block hits - // syslogProcessingTimes(16, // exemgr subsystem - // dlTimes.FirstReadTime(), // first datalist read - // dlTimes.LastReadTime(), // last datalist read - // dlTimes.FirstInsertTime(), // first datalist write - // dlTimes.EndOfInputTime()); // last (endOfInput) datalist write - // syslogEndStep(16, // exemgr subsystem - // totalBlockedReadCount, // blocked datalist input - // totalBlockedWriteCount, // blocked datalist output - // fMsgBytesIn, // incoming msg byte count - // fMsgBytesOut); // outgoing msg byte count - // } - // - // } - // - // if (fOid >=3000 && lastThread) - // lbidList->UpdateAllPartitionInfo(); - // - //// cerr << "recv thread exiting" << endl; -} - void pColScanStep::addFilter(int8_t COP, float value) { fFilterString << (uint8_t)COP; @@ -937,22 +223,6 @@ void pColScanStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag) fFilterCount++; } -void pColScanStep::setBOP(int8_t B) -{ - fBOP = B; -} - -void pColScanStep::setSingleThread(bool b) -{ - fSingleThread = b; - fNumThreads = 1; -} - -void pColScanStep::setOutputType(int8_t OutputType) -{ - fOutputType = OutputType; -} - const string pColScanStep::toString() const { ostringstream oss; @@ -983,10 +253,7 @@ uint64_t pColScanStep::getFBO(uint64_t lbid) { lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1; - // lastLBID = extents[i].range.start + (extents[i].range.size * 1024) - 1; - // cerr << "start: " << extents[i].range.start << " end:" << lastLBID <= (uint64_t)extents[i].range.start && lbid <= lastLBID) - // return (lbid - extents[i].range.start) + (extentSize * i); return (lbid - extents[i].range.start) + (i << divShift); } @@ -996,7 +263,6 @@ uint64_t pColScanStep::getFBO(uint64_t lbid) pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resourceManager()), fMsgHeader() { - fNumThreads = fRm->getJlNumScanReceiveThreads(); fFilterCount = rhs.filterCount(); fFilterString = rhs.filterString(); isFilterFeeder = rhs.getFeederFlag(); @@ -1005,14 +271,6 @@ pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resource fColType = rhs.colType(); fBOP = rhs.BOP(); fIsDict = rhs.isDictCol(); - sentCount = 0; - recvCount = 0; - fScanLbidReqLimit = fRm->getJlScanLbidReqLimit(); - fScanLbidReqThreshold = fRm->getJlScanLbidReqThreshold(); - fStopSending = false; - fSingleThread = false; - fPhysicalIO = 0; - fCacheIO = 0; fNumBlksSkipped = 0; fMsgBytesIn = 0; fMsgBytesOut = 0; @@ -1040,11 +298,6 @@ pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resource numExtents = extents.size(); extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE; lbidList = rhs.lbidList; - // pthread_mutex_init(&mutex, NULL); - // pthread_mutex_init(&dlMutex, NULL); - // pthread_mutex_init(&cpMutex, NULL); - // pthread_cond_init(&condvar, NULL); - // pthread_cond_init(&condvarWakeupProducer, NULL); finishedSending = sendWaiting = rDoNothing = false; recvWaiting = 0; recvExited = 0; @@ -1054,12 +307,7 @@ pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resource rpbShift = rhs.rpbShift; divShift = rhs.divShift; - // initializeConfigParms ( ); fTraceFlags = rhs.fTraceFlags; - // uniqueID = UniqueNumberGenerator::instance()->getUnique32(); - // if (fDec) - // fDec->addQueue(uniqueID); - // fProducerThread = new SPTHD[fNumThreads]; } void pColScanStep::addFilters() diff --git a/dbcon/joblist/pcolstep.cpp b/dbcon/joblist/pcolstep.cpp index 125491ebd..3a314a7d6 100644 --- a/dbcon/joblist/pcolstep.cpp +++ b/dbcon/joblist/pcolstep.cpp @@ -58,47 +58,6 @@ using namespace BRM; namespace joblist { -#if 0 -//const uint32_t defaultProjectBlockReqLimit = 32768; -//const uint32_t defaultProjectBlockReqThreshold = 16384; -struct pColStepPrimitive -{ - pColStepPrimitive(pColStep* pColStep) : fPColStep(pColStep) - {} - pColStep* fPColStep; - void operator()() - { - try - { - fPColStep->sendPrimitiveMessages(); - } - catch (exception& re) - { - cerr << "pColStep: send thread threw an exception: " << re.what() << - "\t" << this << endl; - } - } -}; - -struct pColStepAggregator -{ - pColStepAggregator(pColStep* pColStep) : fPColStepCol(pColStep) - {} - pColStep* fPColStepCol; - void operator()() - { - try - { - fPColStepCol->receivePrimitiveMessages(); - } - catch (exception& re) - { - cerr << fPColStepCol->toString() << ": recv thread threw an exception: " << re.what() << endl; - } - } -}; -#endif - pColStep::pColStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t, const CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo) : JobStep(jobInfo) @@ -117,14 +76,8 @@ pColStep::pColStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t, , fIsDict(false) , isEM(jobInfo.isExeMgr) , ridCount(0) - , fFlushInterval(jobInfo.flushInterval) , fSwallowRows(false) - , fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()) - , fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()) - , fStopSending(false) , isFilterFeeder(false) - , fPhysicalIO(0) - , fCacheIO(0) , fNumBlksSkipped(0) , fMsgBytesIn(0) , fMsgBytesOut(0) @@ -135,11 +88,6 @@ pColStep::pColStep(CalpontSystemCatalog::OID o, CalpontSystemCatalog::OID t, int err, i; uint32_t mask; - if (fFlushInterval == 0 || !isEM) - fOutputType = OT_BOTH; - else - fOutputType = OT_TOKEN; - if (fOid < 1000) throw runtime_error("pColStep: invalid column"); @@ -279,15 +227,7 @@ pColStep::pColStep(const pColScanStep& rhs) , recvWaiting(false) , fIsDict(rhs.isDictCol()) , ridCount(0) - , - // Per Cindy, it's save to put fFlushInterval to be 0 - fFlushInterval(0) , fSwallowRows(false) - , fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()) - , fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()) - , fStopSending(false) - , fPhysicalIO(0) - , fCacheIO(0) , fNumBlksSkipped(0) , fMsgBytesIn(0) , fMsgBytesOut(0) @@ -390,15 +330,7 @@ pColStep::pColStep(const PassThruStep& rhs) , recvWaiting(false) , fIsDict(rhs.isDictCol()) , ridCount(0) - , - // Per Cindy, it's save to put fFlushInterval to be 0 - fFlushInterval(0) , fSwallowRows(false) - , fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()) - , fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()) - , fStopSending(false) - , fPhysicalIO(0) - , fCacheIO(0) , fNumBlksSkipped(0) , fMsgBytesIn(0) , fMsgBytesOut(0) @@ -474,107 +406,6 @@ pColStep::pColStep(const PassThruStep& rhs) sort(extents.begin(), extents.end(), ExtentSorter()); numExtents = extents.size(); - // uniqueID = UniqueNumberGenerator::instance()->getUnique32(); - // if (fDec) - // fDec->addQueue(uniqueID); - // initializeConfigParms ( ); -} - -pColStep::~pColStep() -{ - // join? - // delete lbidList; - // if (fDec) - // fDec->removeQueue(uniqueID); -} - -//------------------------------------------------------------------------------ -// Initialize configurable parameters -//------------------------------------------------------------------------------ -void pColStep::initializeConfigParms() -{ - // const string section ( "JobList" ); - // const string sendLimitName ( "ProjectBlockReqLimit" ); - // const string sendThresholdName ( "ProjectBlockReqThreshold" ); - // Config* cf = Config::makeConfig(); - // - // string strVal; - // uint64_t numVal; - - //...Get the tuning parameters that throttle msgs sent to primproc - //...fFilterRowReqLimit puts a cap on how many rids we will request from - //... primproc, before pausing to let the consumer thread catch up. - //... Without this limit, there is a chance that PrimProc could flood - //... ExeMgr with thousands of messages that will consume massive - //... amounts of memory for a 100 gigabyte database. - //...fFilterRowReqThreshhold is the level at which the number of outstanding - //... rids must fall below, before the producer can send more rids. - - // strVal = cf->getConfig(section, sendLimitName); - // if (strVal.size() > 0) - // { - // errno = 0; - // numVal = Config::uFromText(strVal); - // if ( errno == 0 ) - // fProjectBlockReqLimit = (uint32_t)numVal; - // } - // - // strVal = cf->getConfig(section, sendThresholdName); - // if (strVal.size() > 0) - // { - // errno = 0; - // numVal = Config::uFromText(strVal); - // if ( errno == 0 ) - // fProjectBlockReqThreshold = (uint32_t)numVal; - // } -} - -void pColStep::startPrimitiveThread() -{ - // pThread.reset(new boost::thread(pColStepPrimitive(this))); -} - -void pColStep::startAggregationThread() -{ - // cThread.reset(new boost::thread(pColStepAggregator(this))); -} - -void pColStep::run() -{ - // if (traceOn()) - // { - // syslogStartStep(16, // exemgr subsystem - // std::string("pColStep")); // step name - // } - // - // size_t sz = fInputJobStepAssociation.outSize(); - // idbassert(sz > 0); - // const AnyDataListSPtr& dl = fInputJobStepAssociation.outAt(0); - // DataList_t* dlp = dl->dataList(); - // DataList* strDlp = dl->stringDataList(); - // if ( dlp ) - // setRidList(dlp); - // else - // { - // setStrRidList( strDlp ); - // } - // //Sort can be set through the jobstep or the input JSA if fFlushinterval is 0 - // fToSort = (fFlushInterval) ? 0 : (!fToSort) ? fInputJobStepAssociation.toSort() : fToSort; - // fToSort = 0; - // //pthread_mutex_init(&mutex, NULL); - // //pthread_cond_init(&condvar, NULL); - // //pthread_cond_init(&flushed, NULL); - // startPrimitiveThread(); - // startAggregationThread(); -} - -void pColStep::join() -{ - // pThread->join(); - // cThread->join(); - // //pthread_mutex_destroy(&mutex); - // //pthread_cond_destroy(&condvar); - // //pthread_cond_destroy(&flushed); } void pColStep::addFilter(int8_t COP, float value) @@ -636,732 +467,6 @@ void pColStep::addFilter(int8_t COP, const int128_t& value, uint8_t roundFlag) fFilterCount++; } -void pColStep::setRidList(DataList* dl) -{ - ridList = dl; -} - -void pColStep::setStrRidList(DataList* strDl) -{ - strRidList = strDl; -} - -void pColStep::setBOP(int8_t b) -{ - fBOP = b; -} - -void pColStep::setOutputType(int8_t OutputType) -{ - fOutputType = OutputType; -} - -void pColStep::setSwallowRows(const bool swallowRows) -{ - fSwallowRows = swallowRows; -} - -void pColStep::sendPrimitiveMessages() -{ - // int it = -1; - // int msgRidCount = 0; - // int ridListIdx = 0; - // bool more = false; - // uint64_t absoluteRID = 0; - // int64_t msgLBID = -1; - // int64_t nextLBID = -1; - // int64_t msgLargeBlock = -1; - // int64_t nextLargeBlock = -1; - // uint16_t blockRelativeRID; - // uint32_t msgCount = 0; - // uint32_t sentBlockCount = 0; - // int msgsSkip=0; - // bool scan=false; - // bool scanThisBlock=false; - // ElementType e; - // UintRowGroup rw; - // StringElementType strE; - // StringRowGroup strRw; - // - // ByteStream msgRidList; - // ByteStream primMsg(MAX_BUFFER_SIZE); //the MAX_BUFFER_SIZE as of 8/20 - // - // NewColRequestHeader hdr; - // - // AnyDataListSPtr dl; - // FifoDataList *fifo = NULL; - // StringFifoDataList* strFifo = NULL; - // - // const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0); - // - // //The presence of more than 1 input DL means we (probably) have a pDictionaryScan step feeding this - //step - // // a list of tokens to get the rids for. Convert the input tokens to a filter string. We also have a - //rid - // // list as the second input dl - // if (fInputJobStepAssociation.outSize() > 1) - // { - // addFilters(); - // if (fTableOid >= 3000) - // cout << toString() << endl; - // //If we got no input rids (as opposed to no input DL at all) then there were no matching rows - //from - // // the previous step, so this step should not return any rows either. This would be the case, - //for - // // instance, if P_NAME LIKE '%xxxx%' produced no signature matches. - // if (fFilterCount == 0) - // { - // goto done; - // } - // } - // - // // determine which ranges/extents to eliminate from this step - // - //#ifdef DEBUG - // if (fOid>=3000) - // cout << "oid " << fOid << endl; - //#endif - // - // scanFlags.resize(numExtents); - // - // for (uint32_t idx=0; idx CasualPartitionPredicate( - // extents[idx].partition.cprange.loVal, - // extents[idx].partition.cprange.hiVal, - // &fFilterString, - // fFilterCount, - // fColType, - // fBOP) || ignoreCP; - // scanFlags[idx]=flag; - //#ifdef DEBUG - // if (fOid >= 3000 && flushInterval == 0) - // cout << (flag ? " will scan " : " will not scan ") - // << "extent with range " << extents[idx].partition.cprange.loVal - // << "-" << extents[idx].partition.cprange.hiVal << endl; - //#endif - // - // } - // - //// if (fOid>=3000) - //// cout << " " << scanFlags[idx]; - // } - //// if (scanFlags.size()>0) - //// cout << endl; - // - // // If there was more than 1 input DL, the first is a list of filters and the second is a list of rids, - // // otherwise the first is the list of rids. - // if (fInputJobStepAssociation.outSize() > 1) - // ridListIdx = 1; - // else - // ridListIdx = 0; - // - // dl = fInputJobStepAssociation.outAt(ridListIdx); - // ridList = dl->dataList(); - // if ( ridList ) - // { - // fifo = dl->fifoDL(); - // - // if (fifo) - // it = fifo->getIterator(); - // else - // it = ridList->getIterator(); - // } - // else - // { - // strRidList = dl->stringDataList(); - // strFifo = dl->stringDL(); - // - // if (strFifo) - // it = strFifo->getIterator(); - // else - // it = strRidList->getIterator(); - // } - // - // if (ridList) - // { - // if (fifo) - // { - // more = fifo->next(it, &rw); - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // } - // absoluteRID = rw.et[0].first; - // } - // else - // { - // more = ridList->next(it, &e); - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // } - // absoluteRID = e.first; - // rw.count = 1; - // } - // } - // else - // { - // if (strFifo) - // { - // more = strFifo->next(it, &strRw); - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // } - // absoluteRID = strRw.et[0].first; - // } - // else - // { - // more = strRidList->next(it, &strE); - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // } - // absoluteRID = strE.first; - // strRw.count = 1; - // } - // } - // - // if (more) - // msgLBID = getLBID(absoluteRID, scan); - // scanThisBlock = scan; - // msgLargeBlock = absoluteRID >> blockSizeShift; - // - // while (more || msgRidCount > 0) { - // uint64_t rwCount; - // if ( ridList) - // rwCount = rw.count; - // else - // rwCount = strRw.count; - // - // for (uint64_t i = 0; ((i < rwCount) || (!more && msgRidCount > 0)); ) - // { - // if ( ridList) - // { - // if (fifo) - // absoluteRID = rw.et[i].first; - // else - // absoluteRID = e.first; - // } - // else - // { - // if (strFifo) - // absoluteRID = strRw.et[i].first; - // else - // absoluteRID = strE.first; - // } - // - // if (more) { - // nextLBID = getLBID(absoluteRID, scan); - // nextLargeBlock = absoluteRID >> blockSizeShift; - // } - // - // //XXXPAT: need to prove N & S here - // if (nextLBID == msgLBID && more) { - //// blockRelativeRID = absoluteRID % ridsPerBlock; - // blockRelativeRID = absoluteRID & rpbMask; - // msgRidList << blockRelativeRID; - // msgRidCount++; - // ++i; - // } - // else { - // //Bug 831: move building msg after the check of scanThisBlock - // if (scanThisBlock==true) - // { - // hdr.ism.Interleave=0; - // hdr.ism.Flags=planFlagsToPrimFlags(fTraceFlags); - // hdr.ism.Command=COL_BY_SCAN; - // hdr.ism.Size=sizeof(NewColRequestHeader) + fFilterString.length() + - // msgRidList.length(); - // hdr.ism.Type=2; - // - // hdr.hdr.SessionID = fSessionId; - // //hdr.hdr.StatementID = 0; - // hdr.hdr.TransactionID = fTxnId; - // hdr.hdr.VerID = fVerId; - // hdr.hdr.StepID = fStepId; - // hdr.hdr.UniqueID = uniqueID; - // - // hdr.LBID = msgLBID; - //// idbassert(hdr.LBID >= 0); - // hdr.DataSize = fColType.colWidth; - // hdr.DataType = fColType.colDataType; - // hdr.CompType = fColType.compressionType; - // hdr.OutputType = fOutputType; - // hdr.BOP = fBOP; - // hdr.NOPS = fFilterCount; - // hdr.NVALS = msgRidCount; - // hdr.sort = fToSort; - // - // primMsg.append((const uint8_t *) &hdr, sizeof(NewColRequestHeader)); - // primMsg += fFilterString; - // primMsg += msgRidList; - // ridCount += msgRidCount; - // ++sentBlockCount; - // - //#ifdef DEBUG - // if (flushInterval == 0 && fOid >= 3000) - // cout << "sending a prim msg for LBID " << msgLBID << endl; - //#endif - // ++msgCount; - //// cout << "made a primitive\n"; - // if (msgLargeBlock != nextLargeBlock || !more) { - //// cout << "writing " << msgCount << " primitives\n"; - // fMsgBytesOut += primMsg.lengthWithHdrOverhead(); - // fDec->write(primMsg); - // msgsSent += msgCount; - // msgCount = 0; - // primMsg.restart(); - // msgLargeBlock = nextLargeBlock; - // - // // @bug 769 - Added "&& !fSwallowRows" condition below to fix problem - //with - // // caltraceon(16) not working for tpch01 and some other queries. If a - //query - // // ever held off requesting more blocks, it would lock and never finish. - // //Bug 815 - // if (( sentBlockCount >= fProjectBlockReqLimit) && !fSwallowRows - //&& - // (( msgsSent - msgsRecvd) > fProjectBlockReqThreshold)) - // { - // mutex.lock(); //pthread_mutex_lock(&mutex); - // fStopSending = true; - // - // // @bug 836. Wake up the receiver if he's sleeping. - // if (recvWaiting) - // condvar.notify_one(); - ////pthread_cond_signal(&condvar); flushed.wait(mutex); //pthread_cond_wait(&flushed, &mutex); fStopSending - //= false; mutex.unlock(); //pthread_mutex_unlock(&mutex); sentBlockCount = 0; - // } - // } - // } - // else - // { - // msgsSkip++; - // } - // msgLBID = nextLBID; - // msgRidList.restart(); - // msgRidCount = 0; - // - // mutex.lock(); //pthread_mutex_lock(&mutex); - // - // if (scanThisBlock) { - // if (recvWaiting) - // condvar.notify_one(); //pthread_cond_signal(&condvar); - // #ifdef DEBUG - //// cout << "msgsSent++ = " << msgsSent << endl; - // #endif - // } - // scanThisBlock = scan; - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // // break the for loop - // if (!more) - // break; - // } - // } // for rw.count - // - // if (more) - // { - // if ( ridList ) - // { - // if (fifo) - // { - // rw.count = 0; - // more = fifo->next(it, &rw); - // } - // else - // { - // rw.count = 1; - // more = ridList->next(it, &e); - // } - // } - // else - // { - // if (strFifo) - // { - // strRw.count = 0; - // more = strFifo->next(it, &strRw); - // } - // else - // { - // strRw.count = 1; - // more = strRidList->next(it, &strE); - // } - // } - // } - // } - // - // if (fOid>=3000) dlTimes.setLastReadTime(); - // - // done: - // mutex.lock(); //pthread_mutex_lock(&mutex); - // finishedSending = true; - // if (recvWaiting) - // condvar.notify_one(); //pthread_cond_signal(&condvar); - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - //#ifdef DEBUG - // if (fOid >=3000) - // cout << "pColStep msgSent " - // << msgsSent << "/" << msgsSkip - // << " rids " << ridCount - // << " oid " << fOid << " " << msgLBID << endl; - //#endif - // //...Track the number of LBIDs we skip due to Casual Partioning. - // fNumBlksSkipped += msgsSkip; -} - -void pColStep::receivePrimitiveMessages() -{ - // int64_t ridResults = 0; - // AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); - // DataList_t* dlp = dl->dataList(); - // uint64_t fbo; - // FifoDataList *fifo = dl->fifoDL(); - // UintRowGroup rw; - // uint64_t ridBase; - // boost::shared_ptr bs; - // uint32_t i = 0, length; - // - // while (1) { - // // sync with the send side - // mutex.lock(); //pthread_mutex_lock(&mutex); - // while (!finishedSending && msgsSent == msgsRecvd) { - // recvWaiting = true; - // #ifdef DEBUG - // cout << "c sleeping" << endl; - // #endif - // // @bug 836. Wake up the sender if he's sleeping. - // if (fStopSending) - // flushed.notify_one(); //pthread_cond_signal(&flushed); - // condvar.wait(mutex); //pthread_cond_wait(&condvar, &mutex); - // #ifdef DEBUG - // cout << "c waking" << endl; - // #endif - // recvWaiting = false; - // } - // if (msgsSent == msgsRecvd) { - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // break; - // } - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // // do the recv - // fDec->read(uniqueID, bs); - // fMsgBytesIn += bs->lengthWithHdrOverhead(); - // - // // no more messages, and buffered messages should be already processed by now. - // if (bs->length() == 0) break; - // - // #ifdef DEBUG - // cout << "msgsRecvd++ = " << msgsRecvd << ". RidResults = " << ridResults << endl; - // cout << "Got a ColResultHeader!: " << bs.length() << " bytes" << endl; - // #endif - // - // const ByteStream::byte* bsp = bs->buf(); - // - // // get the ISMPacketHeader out of the bytestream - // //const ISMPacketHeader* ism = reinterpret_cast(bsp); - // - // // get the ColumnResultHeader out of the bytestream - // const ColResultHeader* crh = reinterpret_cast - // (&bsp[sizeof(ISMPacketHeader)]); - // - // bool firstRead = true; - // length = bs->length(); - // - // i = 0; - // uint32_t msgCount = 0; - // while (i < length) { - // ++msgCount; - // - // i += sizeof(ISMPacketHeader); - // crh = reinterpret_cast(&bsp[i]); - // // double check the sequence number is increased by one each time - // i += sizeof(ColResultHeader); - // - // fCacheIO += crh->CacheIO; - // fPhysicalIO += crh->PhysicalIO; - // - // // From this point on the rest of the bytestream is the data that comes back from the - //primitive server - // // This needs to be fed to a datalist that is retrieved from the outputassociation - //object. - // - // fbo = getFBO(crh->LBID); - // ridBase = fbo << rpbShift; - // - // #ifdef DEBUG - //// cout << " NVALS = " << crh->NVALS << " fbo = " << fbo << " lbid = " << crh->LBID << - ///endl; - // #endif - // - // //Check output type - // if ( fOutputType == OT_RID ) - // { - // ridResults += crh->NVALS; - // } - // - // /* XXXPAT: This clause is executed when ExeMgr calls the new nextBand(BS) fcn. - // - // TODO: both classes have to agree - // on which nextBand() variant will be called. pColStep - // currently has to infer that from flushInterval and the - // Table OID. It would be better to have a more explicit form - // of agreement. - // - // The goal of the nextBand(BS) fcn is to avoid iterating over - // every row except at unserialization. This clause copies - // the raw results from the PrimProc response directly into - // the memory used for the ElementType array. DeliveryStep - // will also treat the ElementType array as raw memory and - // serialize that. TableColumn now parses the packed data - // instead of whole ElementTypes. - // */ - // else if (fOutputType == OT_TOKEN && fFlushInterval > 0 && !fIsDict) { - // - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // ridResults += crh->NVALS; - // - // /* memcpy the bytestream into the output set */ - // uint32_t toCopy, bsPos = 0; - // uint8_t *pos; - // while (bsPos < crh->NVALS) { - // toCopy = (crh->NVALS - bsPos > rw.ElementsPerGroup - rw.count ? - // rw.ElementsPerGroup - rw.count : crh->NVALS - bsPos); - // pos = ((uint8_t *) &rw.et[0]) + (rw.count * fColType.colWidth); - // memcpy(pos, &bsp[i], toCopy * fColType.colWidth); - // bsPos += toCopy; - // i += toCopy * fColType.colWidth; - // rw.count += toCopy; - // if (rw.count == rw.ElementsPerGroup) { - // if (!fSwallowRows) - // fifo->insert(rw); - // rw.count = 0; - // } - // } - // } - // else if ( fOutputType == OT_TOKEN) - // { - // uint64_t dv; - // uint64_t rid; - // - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // ridResults += crh->NVALS; - // for(int j = 0; j < crh->NVALS; ++j) - // { - // // XXXPAT: Only use this when the RID doesn't matter or when - // // the response contains every row. - // - // rid = j + ridBase; - // switch (fColType.colWidth) { - // case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; break; - // case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break; - // case 2: dv = *((const uint16_t *) &bsp[i]); i += 2; break; - // case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break; - // default: - // throw runtime_error("pColStep: invalid column - //width!"); - // } - // - // // @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is - //on. - // // This options swallows rows in the project steps. - // if (!fSwallowRows) - // { - // if (fifo) - // { - // rw.et[rw.count].first = rid; - // rw.et[rw.count++].second = dv; - // if (rw.count == rw.ElementsPerGroup) - // { - // fifo->insert(rw); - // rw.count = 0; - // } - // } - // else - // { - // dlp->insert(ElementType(rid, dv)); - // } - // #ifdef DEBUG - // //cout << " -- inserting <" << rid << ", " << dv << "> " << *prid << - //endl; #endif - // } - // } - // } - // else if ( fOutputType == OT_BOTH ) - // { - // ridResults += crh->NVALS; - // for(int j = 0; j < crh->NVALS; ++j) - // { - // uint64_t dv; - // uint64_t rid; - // - // rid = *((const uint16_t *) &bsp[i]) + ridBase; - // i += sizeof(uint16_t); - // switch (fColType.colWidth) { - // case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; - //break; case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break; case 2: dv = *((const uint16_t *) - //&bsp[i]); i += 2; break; case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break; default: throw - //runtime_error("pColStep: invalid column width!"); - // } - // - // // @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is - //on. - // // This options swallows rows in the project steps. - // if (!fSwallowRows) { - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // if(fifo) - // { - //// rw.et[rw.count++] = ElementType(rid, dv); - // rw.et[rw.count].first = rid; - // rw.et[rw.count++].second = dv; - // if (rw.count == rw.ElementsPerGroup) - // { - // fifo->insert(rw); - // rw.count = 0; - // } - // } - // else - // { - // dlp->insert(ElementType(rid, dv)); - // } - // #ifdef DEBUG - // //cout << " -- inserting <" << rid << ", " << dv << "> " << *prid << - //endl; #endif - // } - // } - // } - // } // unpacking the BS - // - // //Bug 815: Check whether we have enough to process - // //++lockCount; - // mutex.lock(); //pthread_mutex_lock(&mutex); - // if ( fStopSending && ((msgsSent - msgsRecvd ) <= fProjectBlockReqThreshold) ) - // { - // flushed.notify_one(); //pthread_cond_signal(&flushed); - // } - // mutex.unlock(); //pthread_mutex_unlock(&mutex); - // - // firstRead = false; - // msgsRecvd += msgCount; - // } // read loop - // // done reading - // - // if (fifo && rw.count > 0) - // fifo->insert(rw); - // - // //...Casual partitioning could cause us to do no processing. In that - // //...case these time stamps did not get set. So we set them here. - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) { - // dlTimes.setFirstReadTime(); - // dlTimes.setLastReadTime(); - // dlTimes.setFirstInsertTime(); - // } - // if (fOid>=3000) dlTimes.setEndOfInputTime(); - // - // //@bug 699: Reset StepMsgQueue - // fDec->removeQueue(uniqueID); - // - // if (fifo) - // fifo->endOfInput(); - // else - // dlp->endOfInput(); - // - // if (fTableOid >= 3000) - // { - // //...Construct timestamp using ctime_r() instead of ctime() not - // //...necessarily due to re-entrancy, but because we want to strip - // //...the newline ('\n') off the end of the formatted string. - // time_t t = time(0); - // char timeString[50]; - // ctime_r(&t, timeString); - // timeString[strlen(timeString)-1 ] = '\0'; - // - // FifoDataList* pFifo = 0; - // uint64_t totalBlockedReadCount = 0; - // uint64_t totalBlockedWriteCount = 0; - // - // //...Sum up the blocked FIFO reads for all input associations - // size_t inDlCnt = fInputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedReadCount += pFifo->blockedReadCount(); - // } - // } - // - // //...Sum up the blocked FIFO writes for all output associations - // size_t outDlCnt = fOutputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedWriteCount += pFifo->blockedWriteCount(); - // } - // } - // - // //...Roundoff msg byte counts to nearest KB for display - // uint64_t msgBytesInKB = fMsgBytesIn >> 10; - // uint64_t msgBytesOutKB = fMsgBytesOut >> 10; - // if (fMsgBytesIn & 512) - // msgBytesInKB++; - // if (fMsgBytesOut & 512) - // msgBytesOutKB++; - // - // // @bug 828 - // if (fifo) - // fifo->totalSize(ridResults); - // - // if (traceOn()) - // { - // //...Print job step completion information - // ostringstream logStr; - // logStr << "ses:" << fSessionId << - // " st: " << fStepId << " finished at " << - // timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" << - // fCacheIO << "; MsgsRvcd-" << msgsRecvd << - // "; BlockedFifoIn/Out-" << totalBlockedReadCount << - // "/" << totalBlockedWriteCount << - // "; output size-" << ridResults << endl << - // "\tPartitionBlocksEliminated-" << fNumBlksSkipped << - // "; MsgBytesIn-" << msgBytesInKB << "KB" << - // "; MsgBytesOut-" << msgBytesOutKB << "KB" << endl << - // "\t1st read " << dlTimes.FirstReadTimeString() << - // "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << - // JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(),dlTimes.FirstReadTime()) << - // "s" << endl; - // - // logEnd(logStr.str().c_str()); - // - // syslogReadBlockCounts(16, // exemgr sybsystem - // fPhysicalIO, // # blocks read from disk - // fCacheIO, // # blocks read from cache - // fNumBlksSkipped); // # casual partition block hits - // syslogProcessingTimes(16, // exemgr subsystem - // dlTimes.FirstReadTime(), // first datalist read - // dlTimes.LastReadTime(), // last datalist read - // dlTimes.FirstInsertTime(), // first datalist write - // dlTimes.EndOfInputTime()); // last (endOfInput) datalist write - // syslogEndStep(16, // exemgr subsystem - // totalBlockedReadCount, // blocked datalist input - // totalBlockedWriteCount, // blocked datalist output - // fMsgBytesIn, // incoming msg byte count - // fMsgBytesOut); // outgoing msg byte count - // } - // } -} - const string pColStep::toString() const { ostringstream oss; diff --git a/dbcon/joblist/pdictionary.cpp b/dbcon/joblist/pdictionary.cpp index 71a941713..7372a15ed 100644 --- a/dbcon/joblist/pdictionary.cpp +++ b/dbcon/joblist/pdictionary.cpp @@ -105,64 +105,11 @@ pDictionaryStep::pDictionaryStep(CalpontSystemCatalog::OID o, CalpontSystemCatal , fFilterCount(0) , requestList(0) , fInterval(jobInfo.flushInterval) - , fPhysicalIO(0) - , fCacheIO(0) , fMsgBytesIn(0) , fMsgBytesOut(0) , fRm(jobInfo.rm) , hasEqualityFilter(false) { - // uniqueID = UniqueNumberGenerator::instance()->getUnique32(); - - // fColType.compressionType = fColType.ddn.compressionType = ct; -} - -pDictionaryStep::~pDictionaryStep() -{ - // if (fDec) - // fDec->removeQueue(uniqueID); -} - -void pDictionaryStep::startPrimitiveThread() -{ - // pThread.reset(new boost::thread(pDictionaryStepPrimitive(this))); -} - -void pDictionaryStep::startAggregationThread() -{ - // cThread.reset(new boost::thread(pDictStepAggregator(this))); -} - -void pDictionaryStep::run() -{ - // if (traceOn()) - // { - // syslogStartStep(16, // exemgr subsystem - // std::string("pDictionaryStep")); // step name - // } - // - // const AnyDataListSPtr& dl = fInputJobStepAssociation.outAt(0); - // DataList_t* dlp = dl->dataList(); - // setInputList(dlp); - // - // startPrimitiveThread(); - // startAggregationThread(); -} - -void pDictionaryStep::join() -{ - // pThread->join(); - // cThread->join(); -} - -void pDictionaryStep::setInputList(DataList_t* dl) -{ - requestList = dl; -} - -void pDictionaryStep::setBOP(int8_t b) -{ - fBOP = b; } void pDictionaryStep::addFilter(int8_t COP, const string& value) @@ -190,322 +137,6 @@ void pDictionaryStep::addFilter(int8_t COP, const string& value) } } -void pDictionaryStep::sendPrimitiveMessages() -{ - // int it = -1; - // int msgRidCount = 0; - // bool more; - // int64_t sigToken, msgLBID, nextLBID = -1; - // uint16_t sigOrd; - // ByteStream msgRidList, primMsg(65536); //the MAX_BUFFER_SIZE as of 8/20 - // DictSignatureRequestHeader hdr; - // ISMPacketHeader ism; - // OldGetSigParams pt; - // FifoDataList* fifo = fInputJobStepAssociation.outAt(0)->fifoDL(); - // UintRowGroup rw; - // - ///* XXXPAT: Does this primitive need to care about the HWM as a sanity check, given - // that a ridlist is supplied? */ - // - // if (fifo == 0) - // throw logic_error("Use p_colscanrange instead here"); - // - // try{ - // it = fifo->getIterator(); - // }catch(exception& ex) { - // cerr << "pDictionaryStep::sendPrimitiveMessages: caught exception: " << ex.what() << endl; - // }catch(...) { - // cerr << "pDictionaryStep::sendPrimitiveMessages: caught exception" << endl; - // } - // - // more = fifo->next(it, &rw); - // - // sigToken = rw.et[0].second; - // msgLBID = sigToken >> 10; - // while (more || msgRidCount > 0) { - // for (uint64_t i = 0; ((i < rw.count) || (!more && msgRidCount > 0)); ) - // { - // if (more) - // { - // ridCount++; - // sigToken = rw.et[i].second; - // nextLBID = sigToken >> 10; - //#ifdef DEBUG - // cout << "sigToken = " << sigToken << " lbid = " << nextLBID << endl; - //#endif - // } - // - // // @bug 472 - // if (nextLBID == msgLBID && more && msgRidCount < 8000) { //XXXPAT: need to prove N & S - //here sigOrd = sigToken & 0x3ff; pt.rid = (nextLBID >= 0 ? rw.et[i].first : 0x8000000000000000LL | - //rw.et[i].first); pt.offsetIndex = sigOrd; msgRidList.append(reinterpret_cast(&pt), - //sizeof(pt)); msgRidCount++; - // ++i; - //#ifdef DEBUG - // cout << "added signature ordinal " << sigOrd << endl; - //#endif - // } - // else { - //#ifdef DEBUG - // cout << "sending a prim msg" << endl; - //#endif - // - // // send the primitive, start constructing the next msg - // ism.Interleave=0; - // ism.Flags=planFlagsToPrimFlags(fTraceFlags); - // ism.Command=DICT_SIGNATURE; - // ism.Size=sizeof(DictSignatureRequestHeader) + msgRidList.length(); - // ism.Type=2; - // - // hdr.Hdr.SessionID = fSessionId; - // //hdr.Hdr.StatementID = 0; - // hdr.Hdr.TransactionID = fTxnId; - // hdr.Hdr.VerID = fVerId; - // hdr.Hdr.StepID = fStepId; - // hdr.Hdr.UniqueID = uniqueID; - // - // hdr.LBID = msgLBID; - // idbassert(msgRidCount <= 8000); - // hdr.NVALS = msgRidCount; - // hdr.CompType = fColType.ddn.compressionType; - // - // primMsg.load((const uint8_t *) &ism, sizeof(ism)); - // primMsg.append((const uint8_t *) &hdr, sizeof(DictSignatureRequestHeader)); - // primMsg += msgRidList; - // fMsgBytesOut += primMsg.lengthWithHdrOverhead(); - // fDec->write(primMsg); - // - // msgLBID = nextLBID; - // primMsg.restart(); - // msgRidList.restart(); - // msgRidCount = 0; - // - // mutex.lock(); - // msgsSent++; - // if (recvWaiting) - // condvar.notify_one(); - //#ifdef DEBUG - // cout << "msgsSent++ = " << msgsSent << endl; - //#endif - // mutex.unlock(); - // - // if (!more) - // break; - // } - // } // rw.count - // - // if (more) - // { - // rw.count = 0; - // more = fifo->next(it, &rw); - // } - // } - // - // mutex.lock(); - // finishedSending = true; - // if (recvWaiting) - // condvar.notify_one(); - // mutex.unlock(); -} - -void pDictionaryStep::receivePrimitiveMessages() -{ - // int64_t ridResults = 0; - // AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); - // StrDataList* dlp = dl->stringDataList(); - // StringFifoDataList *fifo = fOutputJobStepAssociation.outAt(0)->stringDL(); - // StringRowGroup rw; - // - // while (1) { - // - // // sync with the send side - // mutex.lock(); - // - // while (!finishedSending && msgsSent==msgsRecvd) { - // recvWaiting = true; - // condvar.wait(mutex); - // if (msgsSent == msgsRecvd) { - // mutex.unlock(); - // break; - // } - // recvWaiting = false; - // } - // - // if (finishedSending != 0 && msgsRecvd >= msgsSent) { - // goto junk; - // } - // mutex.unlock(); - // - // // do the recv - // - // ByteStream bs = fDec->read(uniqueID); - // fMsgBytesIn += bs.lengthWithHdrOverhead(); - // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) - // dlTimes.setFirstReadTime(); - // if (fOid>=3000) dlTimes.setLastReadTime(); - // - // msgsRecvd++; - // if (bs.length() == 0) - // break; - // - // const ByteStream::byte* bsp = bs.buf(); - // - // // get the ResultHeader out of the bytestream - // const DictOutput* drh = reinterpret_cast(bsp); - // - // bsp += sizeof(DictOutput); - // - // fCacheIO += drh->CacheIO; - // fPhysicalIO += drh->PhysicalIO; - // - // // From this point on the rest of the bytestream is the data that comes back from the primitive - //server - // // This needs to be fed to a datalist that is retrieved from the outputassociation object. - // - // char d[8192]; - //// memset(d, 0, 8192); - // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0) - // dlTimes.setFirstInsertTime(); - // for(int j = 0; j < drh->NVALS; j++) - // { - // const uint64_t* ridp = (const uint64_t*)bsp; - // bsp += sizeof(*ridp); - // uint64_t rid = *ridp; - // const uint16_t* lenp = (const uint16_t*)bsp; - // bsp += sizeof(*lenp); - // uint16_t len = *lenp; - // memcpy(d, bsp, len); - // bsp += len; - // d[len] = 0; - // if (rid == 0xFFFFFFFFFFFFFFFFULL) - // { - // strcpy(d, CPNULLSTRMARK.c_str()); - // } - //#ifdef FIFO_SINK - // if (fOid < 3000) - //#endif - // if (fifo) - // { - // rw.et[rw.count++] = StringElementType(rid, d); - // if (rw.count == rw.ElementsPerGroup) - // { - // fifo->insert(rw); - // rw.count = 0; - // } - // } - // else - // { - // dlp->insert(StringElementType(rid, d)); - // } - // - //#ifdef DEBUG - // cout << " -- inserting <" << rid << ", " << d << ">" << endl; - //#endif - // ridResults++; - // - // } - // } - // - // junk: - // - // if (fifo && rw.count > 0) - // fifo->insert(rw); - // - // //@bug 699: Reset StepMsgQueue - // fDec->removeQueue(uniqueID); - // - // if (fOid>=3000) dlTimes.setEndOfInputTime(); - // dlp->endOfInput(); - // - // if (fTableOid >= 3000) - // { - // //...Construct timestamp using ctime_r() instead of ctime() not - // //...necessarily due to re-entrancy, but because we want to strip - // //...the newline ('\n') off the end of the formatted string. - // time_t t = time(0); - // char timeString[50]; - // ctime_r(&t, timeString); - // timeString[strlen(timeString)-1 ] = '\0'; - // - // FifoDataList* pFifo = 0; - // uint64_t totalBlockedReadCount = 0; - // uint64_t totalBlockedWriteCount = 0; - // - // //...Sum up the blocked FIFO reads for all input associations - // size_t inDlCnt = fInputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedReadCount += pFifo->blockedReadCount(); - // } - // } - // - // //...Sum up the blocked FIFO writes for all output associations - // size_t outDlCnt = fOutputJobStepAssociation.outSize(); - // for (size_t iDataList=0; iDataListfifoDL(); - // if (pFifo) - // { - // totalBlockedWriteCount += pFifo->blockedWriteCount(); - // } - // } - // - // - // - // //...Roundoff msg byte counts to nearest KB for display - // uint64_t msgBytesInKB = fMsgBytesIn >> 10; - // uint64_t msgBytesOutKB = fMsgBytesOut >> 10; - // if (fMsgBytesIn & 512) - // msgBytesInKB++; - // if (fMsgBytesOut & 512) - // msgBytesOutKB++; - // - // // @bug 807 - // if (fifo) - // fifo->totalSize(ridResults); - // - // if (traceOn()) - // { - // //...Print job step completion information - // ostringstream logStr; - // logStr << "ses:" << fSessionId << " st: " << fStepId << - // " finished at " << - // timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" << - // fCacheIO << "; MsgsRcvd-" << msgsRecvd << - // "; BlockedFifoIn/Out-" << totalBlockedReadCount << - // "/" << totalBlockedWriteCount << - // "; output size-" << ridResults << endl << - // "\tMsgBytesIn-" << msgBytesInKB << "KB" << - // "; MsgBytesOut-" << msgBytesOutKB << "KB" << endl << - // "\t1st read " << dlTimes.FirstReadTimeString() << - // "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << - // JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(),dlTimes.FirstReadTime()) << - // "s" << endl; - // - // logEnd(logStr.str().c_str()); - // - // syslogReadBlockCounts(16, // exemgr subsystem - // fPhysicalIO, // # blocks read from disk - // fCacheIO, // # blocks read from cache - // 0); // # casual partition block hits - // syslogProcessingTimes(16, // exemgr subsystem - // dlTimes.FirstReadTime(), // first datalist read - // dlTimes.LastReadTime(), // last datalist read - // dlTimes.FirstInsertTime(), // first datalist write - // dlTimes.EndOfInputTime()); // last (endOfInput) datalist write - // syslogEndStep(16, // exemgr subsystem - // totalBlockedReadCount, // blocked datalist input - // totalBlockedWriteCount, // blocked datalist output - // fMsgBytesIn, // incoming msg byte count - // fMsgBytesOut); // outgoing msg byte count - // } - // } - // -} - const string pDictionaryStep::toString() const { ostringstream oss; @@ -546,9 +177,6 @@ void pDictionaryStep::appendFilter(const messageqcpp::ByteStream& filter, unsign addFilter(COP, value); bs.advance(size); } - - // fFilterString += filter; - // fFilterCount += count; } void pDictionaryStep::addFilter(const Filter* f) diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index 94f70ba2e..164ac157c 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -138,10 +138,8 @@ pDictionaryScan::pDictionaryScan(CalpontSystemCatalog::OID o, CalpontSystemCatal , fColType(ct) , pThread(0) , cThread(0) - , fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()) , fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()) , fStopSending(false) - , fSingleThread(false) , fPhysicalIO(0) , fCacheIO(0) , fMsgBytesIn(0) @@ -915,4 +913,19 @@ void pDictionaryScan::abort() fDec->shutdownQueue(uniqueID); } +// Unfortuneately we have 32 bits in the execplan flags, but only 16 that can be sent to +// PrimProc, so we have to convert them (throwing some away). +uint16_t pDictionaryScan::planFlagsToPrimFlags(uint32_t planFlags) +{ + uint16_t flags = 0; + + if (planFlags & CalpontSelectExecutionPlan::TRACE_LBIDS) + flags |= PF_LBID_TRACE; + + if (planFlags & CalpontSelectExecutionPlan::PM_PROFILE) + flags |= PF_PM_PROF; + + return flags; +} + } // namespace joblist diff --git a/dbcon/joblist/primitivemsg.cpp b/dbcon/joblist/primitivemsg.cpp deleted file mode 100644 index 47e7b8e2f..000000000 --- a/dbcon/joblist/primitivemsg.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -/* - * $Id: primitivemsg.cpp 9210 2013-01-21 14:10:42Z rdempsey $ - */ - -#include -using namespace std; - -#include "calpontselectexecutionplan.h" -using namespace execplan; - -#include "primitivemsg.h" -#include "primitivestep.h" -using namespace joblist; - -namespace joblist -{ -void PrimitiveMsg::send() -{ - throw logic_error("somehow ended up in PrimitiveMsg::send()!"); -} - -void PrimitiveMsg::buildPrimitiveMessage(ISMPACKETCOMMAND, void*, void*) -{ - throw logic_error("somehow ended up in PrimitiveMsg::buildPrimitiveMessage()!"); -} - -void PrimitiveMsg::receive() -{ - throw logic_error("somehow ended up in PrimitiveMsg::receive()!"); -} - -void PrimitiveMsg::sendPrimitiveMessages() -{ - throw logic_error("somehow ended up in PrimitiveMsg::sendPrimitiveMessages()!"); -} - -void PrimitiveMsg::receivePrimitiveMessages() -{ - throw logic_error("somehow ended up in PrimitiveMsg::receivePrimitiveMessages()!"); -} - -// Unfortuneately we have 32 bits in the execplan flags, but only 16 that can be sent to -// PrimProc, so we have to convert them (throwing some away). -uint16_t PrimitiveMsg::planFlagsToPrimFlags(uint32_t planFlags) -{ - uint16_t flags = 0; - - if (planFlags & CalpontSelectExecutionPlan::TRACE_LBIDS) - flags |= PF_LBID_TRACE; - - if (planFlags & CalpontSelectExecutionPlan::PM_PROFILE) - flags |= PF_PM_PROF; - - return flags; -} - -} // namespace joblist diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index e68837c6c..ec78b893f 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -93,39 +93,9 @@ enum PrimitiveStepType AGGRFILTERSTEP }; -/** @brief class PrimitiveMsg - * - */ -class PrimitiveMsg -{ - public: - /** @brief virtual void Send method - */ - virtual void send(); - /** @brief virtual void Receive method - */ - virtual void receive(); - /** @brief virtual void BuildPrimitiveMessage method - */ - virtual void buildPrimitiveMessage(ISMPACKETCOMMAND cmd, void* filterValues, void* ridArray); - virtual void sendPrimitiveMessages(); - virtual void receivePrimitiveMessages(); - - PrimitiveMsg() - { - } - - virtual ~PrimitiveMsg() - { - } - - uint16_t planFlagsToPrimFlags(uint32_t planFlags); - - private: -}; class pColScanStep; -class pColStep : public JobStep, public PrimitiveMsg +class pColStep : public JobStep { typedef std::pair element_t; @@ -141,48 +111,30 @@ class pColStep : public JobStep, public PrimitiveMsg pColStep(const PassThruStep& rhs); - virtual ~pColStep(); + virtual ~pColStep(){}; /** @brief Starts processing. Set at least the RID list before calling. * * Starts processing. Set at least the RID list before calling this. */ - virtual void run(); + virtual void run(){}; /** @brief Sync's the caller with the end of execution. * * Does nothing. Returns when this instance is finished. */ - virtual void join(); + virtual void join(){}; virtual const std::string toString() const; virtual bool isDictCol() const { return fIsDict; - }; + } bool isExeMgr() const { return isEM; } - /** @brief Set config parameters for this JobStep. - * - * Set the config parameters this JobStep. - */ - void initializeConfigParms(); - - /** @brief The main loop for the send-side thread - * - * The main loop for the primitive-issuing thread. Don't call it directly. - */ - void sendPrimitiveMessages(); - - /** @brief The main loop for the recv-side thread - * - * The main loop for the receive-side thread. Don't call it directly. - */ - void receivePrimitiveMessages(); - /** @brief Add a filter. Use this interface when the column stores anything but 4-byte floats. * * Add a filter. Use this interface when the column stores anything but 4-byte floats. @@ -197,35 +149,38 @@ class pColStep : public JobStep, public PrimitiveMsg * this class from pColScan. Use pColScan if the every RID should be considered; it's * faster at that. */ - void setRidList(DataList* rids); - + void setRidList(DataList* rids) + { + ridList = rids; + } /** @brief Sets the String DataList to get RID values from. * * Sets the string DataList to get RID values from. Filtering by RID distinguishes * this class from pColScan. Use pColScan if the every RID should be considered; it's * faster at that. */ - void setStrRidList(DataList* strDl); - + void setStrRidList(DataList* strDl) + { + strRidList = strDl; + } /** @brief Set the binary operator for the filter predicate (BOP_AND or BOP_OR). * * Set the binary operator for the filter predicate (BOP_AND or BOP_OR). */ - void setBOP(int8_t BOP); - - /** @brief Set the output type. - * - * Set the output type (1 = RID, 2 = Token, 3 = Both). - */ - void setOutputType(int8_t OutputType); + void setBOP(int8_t BOP) + { + fBOP = BOP; + } /** @brief Set the swallowRows flag. * * * If true, no rows will be inserted to the output datalists. */ - void setSwallowRows(const bool swallowRows); - + void setSwallowRows(const bool swallowRows) + { + fSwallowRows = swallowRows; + } /** @brief Get the swallowRows flag. * * @@ -263,10 +218,6 @@ class pColStep : public JobStep, public PrimitiveMsg return fColType; } void appendFilter(const messageqcpp::ByteStream& filter, unsigned count); - uint32_t flushInterval() const - { - return fFlushInterval; - } bool getFeederFlag() const { return isFilterFeeder; @@ -276,14 +227,6 @@ class pColStep : public JobStep, public PrimitiveMsg { isFilterFeeder = filterFeeder; } - virtual uint64_t phyIOCount() const - { - return fPhysicalIO; - } - virtual uint64_t cacheIOCount() const - { - return fCacheIO; - } virtual uint64_t msgsRcvdCount() const { return msgsRecvd; @@ -329,14 +272,6 @@ class pColStep : public JobStep, public PrimitiveMsg */ explicit pColStep(); - /** @brief StartPrimitiveThread - * Utility function to start worker thread that sends primitive messages - */ - void startPrimitiveThread(); - /** @brief StartAggregationThread - * Utility function to start worker thread that receives result aggregation from primitive servers - */ - void startAggregationThread(); uint64_t getLBID(uint64_t rid, bool& scan); uint64_t getFBO(uint64_t lbid); @@ -347,7 +282,6 @@ class pColStep : public JobStep, public PrimitiveMsg execplan::CalpontSystemCatalog::ColType fColType; uint32_t fFilterCount; int8_t fBOP; - int8_t fOutputType; uint16_t realWidth; DataList_t* ridList; StrDataList* strRidList; @@ -359,29 +293,18 @@ class pColStep : public JobStep, public PrimitiveMsg bool finishedSending, recvWaiting, fIsDict; bool isEM; int64_t ridCount; - uint32_t fFlushInterval; // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // Running with this one will swallow rows at projection. bool fSwallowRows; - uint32_t fProjectBlockReqLimit; // max number of rids to send in a scan - // request to primproc - uint32_t fProjectBlockReqThreshold; // min level of rids backlog before - // consumer will tell producer to send - // more rids scan requests to primproc - volatile bool fStopSending; bool isFilterFeeder; - uint64_t fPhysicalIO; // total physical I/O count - uint64_t fCacheIO; // total cache I/O count uint64_t fNumBlksSkipped; // total number of block scans skipped due to CP uint64_t fMsgBytesIn; // total byte count for incoming messages uint64_t fMsgBytesOut; // total byte count for outcoming messages BRM::DBRM dbrm; - // boost::shared_ptr cThread; //consumer thread - // boost::shared_ptr pThread; //producer thread boost::mutex mutex; boost::condition condvar; boost::condition flushed; @@ -413,7 +336,7 @@ class pColStep : public JobStep, public PrimitiveMsg * c) send messages to the primitive server as quickly as possible */ -class pColScanStep : public JobStep, public PrimitiveMsg +class pColScanStep : public JobStep { public: /** @brief pColScanStep constructor @@ -422,38 +345,25 @@ class pColScanStep : public JobStep, public PrimitiveMsg const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo); pColScanStep(const pColStep& rhs); - ~pColScanStep(); + ~pColScanStep(){} /** @brief Starts processing. * * Starts processing. */ - virtual void run(); + virtual void run(){} /** @brief Sync's the caller with the end of execution. * * Does nothing. Returns when this instance is finished. */ - virtual void join(); + virtual void join(){} virtual bool isDictCol() const { return fIsDict; }; - /** @brief The main loop for the send-side thread - * - * The main loop for the primitive-issuing thread. Don't call it directly. - */ - void sendPrimitiveMessages(); - - /** @brief The main loop for the recv-side thread - * - * The main loop for the receive-side thread. Don't call it directly. - */ - using PrimitiveMsg::receivePrimitiveMessages; - void receivePrimitiveMessages(uint64_t i = 0); - /** @brief Add a filter when the column is a 4-byte float type * * Add a filter when the column is a 4-byte float type @@ -472,7 +382,11 @@ class pColScanStep : public JobStep, public PrimitiveMsg * Set the binary operator for the filter predicates (BOP_AND or BOP_OR). * It is initialized to OR. */ - void setBOP(int8_t BOP); // AND or OR + void setBOP(int8_t BOP) // AND or OR + { + fBOP = BOP; + } + int8_t BOP() const { return fBOP; @@ -496,17 +410,6 @@ class pColScanStep : public JobStep, public PrimitiveMsg return fFilterString; } - void setSingleThread(bool b); - bool getSingleThread() - { - return fSingleThread; - } - - /** @brief Set the output type. - * - * Set the output type (1 = RID, 2 = Token, 3 = Both).pColScan - */ - void setOutputType(int8_t OutputType); uint32_t filterCount() const { return fFilterCount; @@ -532,18 +435,6 @@ class pColScanStep : public JobStep, public PrimitiveMsg return fRm; } - virtual uint64_t phyIOCount() const - { - return fPhysicalIO; - } - virtual uint64_t cacheIOCount() const - { - return fCacheIO; - } - virtual uint64_t msgsRcvdCount() const - { - return recvCount; - } virtual uint64_t msgBytesIn() const { return fMsgBytesIn; @@ -599,18 +490,12 @@ class pColScanStep : public JobStep, public PrimitiveMsg // pColScanStep& operator=(const pColScanStep& rhs); typedef boost::shared_ptr SPTHD; - void startPrimitiveThread(); - void startAggregationThread(); - void initializeConfigParms(); - void sendAPrimitiveMessage(ISMPacketHeader& ism, BRM::LBID_t msgLbidStart, uint32_t msgLbidCount); uint64_t getFBO(uint64_t lbid); bool isEmptyVal(const uint8_t* val8) const; ResourceManager* fRm; ColByScanRangeRequestHeader fMsgHeader; SPTHD fConsumerThread; - /// number of threads on the receive side - uint32_t fNumThreads; SPTHD* fProducerThread; messageqcpp::ByteStream fFilterString; @@ -619,16 +504,10 @@ class pColScanStep : public JobStep, public PrimitiveMsg execplan::CalpontSystemCatalog::OID fTableOid; execplan::CalpontSystemCatalog::ColType fColType; int8_t fBOP; - int8_t fOutputType; - uint32_t sentCount; - uint32_t recvCount; BRM::LBIDRange_v lbidRanges; BRM::DBRM dbrm; SP_LBIDList lbidList; - boost::mutex mutex; - boost::mutex dlMutex; - boost::mutex cpMutex; boost::condition condvar; boost::condition condvarWakeupProducer; bool finishedSending, sendWaiting, rDoNothing, fIsDict; @@ -638,17 +517,7 @@ class pColScanStep : public JobStep, public PrimitiveMsg uint32_t extentSize, divShift, ridsPerBlock, rpbShift, numExtents; // config::Config *fConfig; - uint32_t fScanLbidReqLimit; // max number of LBIDs to send in a scan - // request to primproc - uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before - // consumer will tell producer to send - // more LBID scan requests to primproc - - bool fStopSending; - bool fSingleThread; bool isFilterFeeder; - uint64_t fPhysicalIO; // total physical I/O count - uint64_t fCacheIO; // total cache I/O count uint64_t fNumBlksSkipped; // total number of block scans skipped due to CP uint64_t fMsgBytesIn; // total byte count for incoming messages uint64_t fMsgBytesOut; // total byte count for outcoming messages @@ -667,35 +536,10 @@ class pColScanStep : public JobStep, public PrimitiveMsg friend class TupleBPS; }; -#if 0 -class pIdxStep : public JobStep -{ -public: - /** @brief pIdxStep constructor - * @param in the inputAssociation pointer - * @param out the outputAssociation pointer - * @param ec the DistributedEngineComm pointer - */ - pIdxStep(JobStepAssociation* in, JobStepAssociation* out, DistributedEngineComm* ec); - /** @brief virtual void Run method - */ - virtual void run(); -private: - pIdxStep(); - void startPrimitveThread(); - void startAggregationThread(); - -protected: - DistributedEngineComm* fDec; - JobStepAssociation* fInputJobStepAssociation; - JobStepAssociation* fOutputJobStepAssociation; -}; -#endif - /** @brief class pDictionaryStep * */ -class pDictionaryStep : public JobStep, public PrimitiveMsg +class pDictionaryStep : public JobStep { public: /** @brief pDictionaryStep constructor @@ -704,17 +548,22 @@ class pDictionaryStep : public JobStep, public PrimitiveMsg pDictionaryStep(execplan::CalpontSystemCatalog::OID oid, execplan::CalpontSystemCatalog::OID tabelOid, const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo); - virtual ~pDictionaryStep(); + virtual ~pDictionaryStep(){} /** @brief virtual void Run method */ - virtual void run(); - virtual void join(); + virtual void run(){} + virtual void join(){} // void setOutList(StringDataList* rids); - void setInputList(DataList_t* rids); - void setBOP(int8_t b); - void sendPrimitiveMessages(); - void receivePrimitiveMessages(); + void setInputList(DataList_t* rids) + { + requestList = rids; + } + + void setBOP(int8_t b) + { + fBOP = b; + } virtual const std::string toString() const; @@ -735,14 +584,6 @@ class pDictionaryStep : public JobStep, public PrimitiveMsg { return fTableOid; } - virtual uint64_t phyIOCount() const - { - return fPhysicalIO; - } - virtual uint64_t cacheIOCount() const - { - return fCacheIO; - } virtual uint64_t msgsRcvdCount() const { return msgsRecvd; @@ -780,8 +621,6 @@ class pDictionaryStep : public JobStep, public PrimitiveMsg private: pDictionaryStep(); - void startPrimitiveThread(); - void startAggregationThread(); boost::shared_ptr sysCat; execplan::CalpontSystemCatalog::OID fOid; @@ -804,8 +643,6 @@ class pDictionaryStep : public JobStep, public PrimitiveMsg boost::mutex mutex; boost::condition condvar; uint32_t fInterval; - uint64_t fPhysicalIO; // total physical I/O count - uint64_t fCacheIO; // total cache I/O count uint64_t fMsgBytesIn; // total byte count for incoming messages uint64_t fMsgBytesOut; // total byte count for outcoming messages uint32_t uniqueID; @@ -828,7 +665,7 @@ class pDictionaryStep : public JobStep, public PrimitiveMsg /** @brief class pDictionaryScan * */ -class pDictionaryScan : public JobStep, public PrimitiveMsg +class pDictionaryScan : public JobStep { public: /** @brief pDictionaryScan constructor @@ -953,6 +790,7 @@ class pDictionaryScan : public JobStep, public PrimitiveMsg private: pDictionaryScan(); + uint16_t planFlagsToPrimFlags(uint32_t planFlags); void startPrimitiveThread(); void startAggregationThread(); void initializeConfigParms(); @@ -990,12 +828,10 @@ class pDictionaryScan : public JobStep, public PrimitiveMsg uint64_t extentSize; uint64_t divShift; uint64_t numExtents; - uint32_t fScanLbidReqLimit; // max number of LBIDs to send in a scan // request to primproc uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before // consumer will tell producer to send bool fStopSending; - bool fSingleThread; uint64_t fPhysicalIO; // total physical I/O count uint64_t fCacheIO; // total cache I/O count uint64_t fMsgBytesIn; // total byte count for incoming messages @@ -1018,7 +854,7 @@ class pDictionaryScan : public JobStep, public PrimitiveMsg void destroyEqualityFilter(); }; -class BatchPrimitive : public JobStep, public PrimitiveMsg, public DECEventListener +class BatchPrimitive : public JobStep, public DECEventListener { public: BatchPrimitive(const JobInfo& jobInfo) : JobStep(jobInfo) @@ -1697,7 +1533,7 @@ class FilterStep : public JobStep /** @brief class PassThruStep * */ -class PassThruStep : public JobStep, public PrimitiveMsg +class PassThruStep : public JobStep { typedef std::pair element_t; diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index be03066c8..05db9abbb 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -64,7 +64,6 @@ ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) , fHjNumThreads(defaultNumThreads) , fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan) , fJlNumScanReceiveThreads(defaultScanReceiveThreads) - , fTwNumThreads(defaultNumThreads) , fJlMaxOutstandingRequests(defaultMaxOutstandingRequests) , fHJUmMaxMemorySmallSideDistributor( fHashJoinStr, "UmMaxMemorySmallSide", @@ -101,7 +100,6 @@ ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) { fHjNumThreads = fNumCores; fJlNumScanReceiveThreads = fNumCores; - fTwNumThreads = fNumCores; } // possibly override any calculated values @@ -139,11 +137,6 @@ ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) fDECConnectionsPerQuery = (fDECConnectionsPerQuery) ? fDECConnectionsPerQuery : getPsConnectionsPerPrimProc(); - temp = getIntVal(fTupleWSDLStr, "NumThreads", -1); - - if (temp > 0) - fTwNumThreads = temp; - pmJoinMemLimit = getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide); // Need to use different limits if this instance isn't running on the UM, @@ -317,106 +310,6 @@ void ResourceManager::logResourceChangeMessage(logging::LOG_TYPE logType, uint32 log.logMessage(logType, mid, args, logging::LoggingID(5, sessionID)); } -void ResourceManager::emServerThreads() -{ -} -void ResourceManager::emServerQueueSize() -{ -} -void ResourceManager::emSecondsBetweenMemChecks() -{ -} -void ResourceManager::emMaxPct() -{ -} -void ResourceManager::emPriority() -{ -} -void ResourceManager::emExecQueueSize() -{ -} - -void ResourceManager::hjNumThreads() -{ -} -void ResourceManager::hjMaxBuckets() -{ -} -void ResourceManager::hjMaxElems() -{ -} -void ResourceManager::hjFifoSizeLargeSide() -{ -} -void ResourceManager::hjPmMaxMemorySmallSide() -{ -} - -void ResourceManager::jlFlushInterval() -{ -} -void ResourceManager::jlFifoSize() -{ -} -void ResourceManager::jlScanLbidReqLimit() -{ -} -void ResourceManager::jlScanLbidReqThreshold() -{ -} -void ResourceManager::jlProjectBlockReqLimit() -{ -} -void ResourceManager::jlProjectBlockReqThreshold() -{ -} -void ResourceManager::jlNumScanReceiveThreads() -{ -} - -void ResourceManager::psCount() -{ -} -void ResourceManager::psConnectionsPerPrimProc() -{ -} -void ResourceManager::psLBID_Shift() -{ -} - -void ResourceManager::scTempDiskPath() -{ -} -void ResourceManager::scTempSaveSize() -{ -} -void ResourceManager::scWorkingDir() -{ -} - -void ResourceManager::twMaxSize() -{ -} -void ResourceManager::twInitialCapacity() -{ -} -void ResourceManager::twMaxBuckets() -{ -} -void ResourceManager::twNumThreads() -{ -} -void ResourceManager::zdl_MaxElementsInMem() -{ -} -void ResourceManager::zdl_MaxElementsPerBucket() -{ -} - -void ResourceManager::hbrPredicate() -{ -} - bool ResourceManager::getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const { static const std::string hostUserUnassignedValue("unassigned"); diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 76b08760b..d7f9cfef0 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -52,31 +52,18 @@ const uint32_t defaultNumThreads = 8; // joblistfactory const uint32_t defaultFlushInterval = 8 * 1024; const uint32_t defaultFifoSize = 10; -const uint32_t defaultHJFifoSizeLargeSide = 128; const uint64_t defaultHJMaxElems = 512 * 1024; // hashjoin uses 8192 const int defaultHJMaxBuckets = 32; // hashjoin uses 4 const uint64_t defaultHJPmMaxMemorySmallSide = 1 * 1024 * 1024 * 1024ULL; const uint64_t defaultHJUmMaxMemorySmallSide = 4 * 1024 * 1024 * 1024ULL; -const uint32_t defaultTempSaveSize = defaultHJMaxElems; const uint64_t defaultTotalUmMemory = 8 * 1024 * 1024 * 1024ULL; -const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL; - -const uint32_t defaultTupleDLMaxSize = 64 * 1024; - const uint32_t defaultJLThreadPoolSize = 100; // pcolscan.cpp -const uint32_t defaultScanLbidReqLimit = 10000; const uint32_t defaultScanLbidReqThreshold = 5000; const uint32_t defaultLogicalBlocksPerScan = 1024; // added for bug 1264. -const uint32_t defaultScanBlockThreshhold = 10000; // in jobstep.h - const uint32_t defaultScanReceiveThreads = 8; -// pcolstep.cpp -const uint32_t defaultProjectBlockReqLimit = 32 * 1024; -const uint32_t defaultProjectBlockReqThreshold = 16 * 1024; // 256 in jobstep.h - // BatchPrimitiveStep const uint32_t defaultRequestSize = 1; const uint32_t defaultMaxOutstandingRequests = 20; @@ -85,15 +72,6 @@ const uint32_t defaultJoinerChunkSize = 16 * 1024 * 1024; // bucketreuse const std::string defaultTempDiskPath = "/tmp"; -const std::string defaultWorkingDir = "."; //"/tmp"; - -// largedatalist -const uint32_t defaultLDLMaxElements = 32 * 1024 * 1024; - -// zdl -const uint64_t defaultMaxElementsInMem = 32 * 1024 * 1024; -const uint64_t defaultNumBuckets = 128; -const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024; const int defaultEMServerThreads = 50; const int defaultEMSecondsBetweenMemChecks = 1; @@ -101,11 +79,8 @@ const int defaultEMMaxPct = 95; const int defaultEMPriority = 21; // @Bug 3385 const int defaultEMExecQueueSize = 20; -const uint64_t defaultInitialCapacity = 1024 * 1024; -const int defaultTWMaxBuckets = 256; const int defaultPSCount = 0; const int defaultConnectionsPerPrimProc = 1; -const uint32_t defaultLBID_Shift = 13; const uint64_t defaultExtentRows = 8 * 1024 * 1024; // DMLProc @@ -120,13 +95,8 @@ const uint64_t defaultRowsPerBatch = 10000; /* HJ CP feedback, see bug #1465 */ const uint32_t defaultHjCPUniqueLimit = 100; -// Order By and Limit -const uint64_t defaultOrderByLimitMaxMemory = 1 * 1024 * 1024 * 1024ULL; - const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB -const uint8_t defaultUseCpimport = 1; - const bool defaultAllowDiskAggregation = false; /** @brief ResourceManager @@ -172,7 +142,7 @@ class ResourceManager { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); } - EXPORT int getEmPriority() const; + EXPORT int getEmPriority() const; // FOr Windows only int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); @@ -200,10 +170,6 @@ class ResourceManager { return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems); } - uint32_t getHjFifoSizeLargeSide() const - { - return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide); - } uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); @@ -221,10 +187,6 @@ class ResourceManager { return getUintVal(fJobListStr, "FifoSize", defaultFifoSize); } - uint32_t getJlScanLbidReqLimit() const - { - return getUintVal(fJobListStr, "ScanLbidReqLimit", defaultScanLbidReqLimit); - } uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr, "ScanLbidReqThreshold", defaultScanLbidReqThreshold); @@ -250,14 +212,6 @@ class ResourceManager { return getUintVal(fJobListStr, "LogicalBlocksPerScan", defaultLogicalBlocksPerScan); } - uint32_t getJlProjectBlockReqLimit() const - { - return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit); - } - uint32_t getJlProjectBlockReqThreshold() const - { - return getUintVal(fJobListStr, "ProjectBlockReqThreshold", defaultProjectBlockReqThreshold); - } uint32_t getJlNumScanReceiveThreads() const { return fJlNumScanReceiveThreads; @@ -290,49 +244,15 @@ class ResourceManager { return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc); } - uint32_t getPsLBID_Shift() const - { - return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift); - } - std::string getScTempDiskPath() const { return startup::StartUp::tmpDir(); } - uint64_t getScTempSaveSize() const - { - return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize); - } std::string getScWorkingDir() const { return startup::StartUp::tmpDir(); } - uint32_t getTwMaxSize() const - { - return getUintVal(fTupleWSDLStr, "MaxSize", defaultTupleDLMaxSize); - } - uint64_t getTwInitialCapacity() const - { - return getUintVal(fTupleWSDLStr, "InitialCapacity", defaultInitialCapacity); - } - int getTwMaxBuckets() const - { - return getUintVal(fTupleWSDLStr, "MaxBuckets", defaultTWMaxBuckets); - } - uint8_t getTwNumThreads() const - { - return fTwNumThreads; - } // getUintVal(fTupleWSDLStr, "NumThreads", defaultNumThreads ); } - uint64_t getZdl_MaxElementsInMem() const - { - return getUintVal(fZDLStr, "ZDL_MaxElementsInMem", defaultMaxElementsInMem); - } - uint64_t getZdl_MaxElementsPerBucket() const - { - return getUintVal(fZDLStr, "ZDL_MaxElementsPerBucket", defaultMaxElementsPerBuckert); - } - uint64_t getExtentRows() const { return getUintVal(fExtentMapStr, "ExtentRows", defaultExtentRows); @@ -347,13 +267,6 @@ class ResourceManager return getUintVal(fPrimitiveServersStr, "Count", 1); } - std::vector getHbrPredicate() const - { - std::vector columns; - fConfig->getConfig(fHashBucketReuseStr, "Predicate", columns); - return columns; - } - uint64_t getDMLMaxDeleteRows() const { return getUintVal(fDMLProcStr, "MaxDeleteRows", defaultDMLMaxDeleteRows); @@ -364,17 +277,6 @@ class ResourceManager return getUintVal(fBatchInsertStr, "RowsPerBatch", defaultRowsPerBatch); } - uint8_t getUseCpimport() const - { - int val = getIntVal(fBatchInsertStr, "UseCpimport", defaultUseCpimport); - return val; - } - - uint64_t getOrderByLimitMaxMemory() const - { - return getUintVal(fOrderByLimitStr, "MaxMemory", defaultOrderByLimitMaxMemory); - } - uint64_t getDECThrottleThreshold() const { return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold); @@ -455,32 +357,6 @@ class ResourceManager fHJUmMaxMemorySmallSideDistributor.returnResource(mem); } - EXPORT void jlFlushInterval(); - EXPORT void jlFifoSize(); - EXPORT void jlScanLbidReqLimit(); - EXPORT void jlScanLbidReqThreshold(); - EXPORT void jlProjectBlockReqLimit(); - EXPORT void jlProjectBlockReqThreshold(); - EXPORT void jlNumScanReceiveThreads(); - - EXPORT void psCount(); - EXPORT void psConnectionsPerPrimProc(); - EXPORT void psLBID_Shift(); - - EXPORT void scTempDiskPath(); - EXPORT void scTempSaveSize(); - EXPORT void scWorkingDir(); - - EXPORT void twMaxSize(); - EXPORT void twInitialCapacity(); - EXPORT void twMaxBuckets(); - EXPORT void twNumThreads(); - - EXPORT void zdl_MaxElementsInMem(); - EXPORT void zdl_MaxElementsPerBucket(); - - EXPORT void hbrPredicate(); - void setTraceFlags(uint32_t flags) { fTraceFlags = flags; @@ -577,12 +453,9 @@ class ResourceManager std::string fExeMgrStr; inline static const std::string fHashJoinStr = "HashJoin"; - inline static const std::string fHashBucketReuseStr = "HashBucketReuse"; inline static const std::string fJobListStr = "JobList"; inline static const std::string fPrimitiveServersStr = "PrimitiveServers"; /*static const*/ std::string fSystemConfigStr; - inline static const std::string fTupleWSDLStr = "TupleWSDL"; - inline static const std::string fZDLStr = "ZDL"; inline static const std::string fExtentMapStr = "ExtentMap"; /*static const*/ std::string fDMLProcStr; /*static const*/ std::string fBatchInsertStr; @@ -596,7 +469,6 @@ class ResourceManager unsigned fHjNumThreads; uint32_t fJlProcessorThreadsPerScan; uint32_t fJlNumScanReceiveThreads; - uint8_t fTwNumThreads; uint32_t fJlMaxOutstandingRequests; /* old HJ support */ diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index d8e161301..56393792e 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -9,61 +9,6 @@ 8601 unassigned - - 0.0.0.0 - 8602 - - - 127.0.0.1 - 8603 - - - 127.0.0.1 - 8606 - - - 127.0.0.1 - 8604 - - - 0.0.0.0 - 8605 - - - - - 127.0.0.1 - 8800 - - - 0.0.0.0 - 8800 - - - 0.0.0.0 - 8800 - - - 127.0.0.1 - 8800 - - - 0.0.0.0 - 8622 - - - 0.0.0.0 - 8622 - - - 127.0.0.1 - 8622 - 127.0.0.1 8630 @@ -85,7 +30,6 @@ 128 10K 0 - 13 512 512 @@ -96,206 +40,40 @@ y - - - + + + 127.0.0.1 8620 - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - columnstore-1 pm1 - unassigned pm1 - 1 - 3 - 12 // 2.5 minutes - 1 - /var/lib/columnstore/data1 - /var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves - /var/lib/columnstore/data1/systemFiles/dbrm/tablelocks + /var/lib/columnstore/data1 + /var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves + /var/lib/columnstore/data1/systemFiles/dbrm/tablelocks 15 100000 - 90 - 80 - 70 - 10 - 0.0.0.0 - 128M 10 - 10 - 120 - restartSystem - n 95 OFF /rdwrscratch - /columnstore_tmp_files /tmp/columnstore_tmp_files - dm - Director Module - SIMPLEX - 0 - 0.0.0.0 - unassigned - ENABLED - 0 - 0 - 0 - 0 - 90 - 80 - 70 - 90 - 0 - 0 - 90 - 80 - 70 - / - unassigned - unassigned um User Module - SIMPLEX 0 0.0.0.0 unassigned @@ -318,7 +96,6 @@ unassigned pm Performance Module - SIMPLEX 1 127.0.0.1 localhost @@ -340,30 +117,24 @@ 1 1 - - 0 - unassigned - 0.0.0.0 - ENABLED - 1000 - /var/lib/columnstore/data1/systemFiles/dbrm/SMTxnID + /var/lib/columnstore/data1/systemFiles/dbrm/SMTxnID + One version buffer file will be put on each DB root. --> 1GB - /var/lib/columnstore/data1/systemFiles/dbrm/oidbitmap + /var/lib/columnstore/data1/systemFiles/dbrm/oidbitmap 3000 - /var/log/mariadb/columnstore/data/bulk - /var/lib/columnstore/data1/systemFiles/bulkRollback + /var/log/mariadb/columnstore/data/bulk + /var/lib/columnstore/data1/systemFiles/bulkRollback 98 1 @@ -378,55 +149,10 @@ 8700 pm1 - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - + 1 @@ -439,40 +165,13 @@ 50 - n - y - y 2 y n internal internal - /etc/rsyslog.d/49-columnstore.conf - unassigned - autoassign - unassigned - unassigned - unassigned - gp2 - unassigned - unassigned - unassigned - gp2 - unassigned - y - y - 0 - unassigned - unassigned - n - 0 - unassigned - n - 3306 - /dev/xvd - /var/lock/subsys /etc/profile.d/columnstoreAlias.sh - + - 64 1G 25% 100 N Y - Snappy + Snappy 16K @@ -511,11 +209,6 @@ 100 - - 1M - 1M - 512 - @@ -540,17 +233,17 @@ Y - Snappy + Snappy 127.0.0.1 0 - - 30 - N - - - - + + 30 + N + + + + diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver deleted file mode 100644 index 0f70acc4e..000000000 --- a/oam/etc/Columnstore.xml.singleserver +++ /dev/null @@ -1,547 +0,0 @@ - - - - - 127.0.0.1 - 8601 - pm1 - - - 0.0.0.0 - 8602 - - - 127.0.0.1 - 8603 - - - 127.0.0.1 - 8606 - - - 127.0.0.1 - 8604 - - - 0.0.0.0 - 8605 - - - 127.0.0.1 - 8800 - - - 0.0.0.0 - 8800 - - - 0.0.0.0 - 8800 - - - 127.0.0.1 - 8800 - - - 0.0.0.0 - 8622 - - - 0.0.0.0 - 8622 - - - 127.0.0.1 - 8622 - - - 127.0.0.1 - 8630 - - - 127.0.0.1 - 8612 - - - 127.0.0.1 - 8614 - - - 10000 - - - 1 - 8 - 128 - - 10K - 0 - 13 - 512 - 512 - - 1 - 0 - n - - - - 95 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - 127.0.0.1 - 8620 - - - C - columnstore-1 - pm1 - unassigned - - - 1 - /var/lib/columnstore/data1 - /var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves - /var/lib/columnstore/data1/systemFiles/dbrm/tablelocks - 20 - 100000 - 90 - 80 - 70 - /tmp - /tmp - 10 - 0.0.0.0 - 128M - 10 - /tmp/columnstore_tmp_files - 10 - 3 - 10 - 120 - restartSystem - n - 95 - OFF - - /tmp/rdwrscratch - - /tmp/columnstore_tmp_files - - - dm - Director Module - SIMPLEX - 0 - 0.0.0.0 - unassigned - ENABLED - 0 - 0 - 0 - 0 - 90 - 80 - 70 - 90 - 0 - 0 - 90 - 80 - 70 - / - unassigned - unassigned - um - User Module - SIMPLEX - 0 - 0.0.0.0 - unassigned - ENABLED - 0 - 0 - 0 - 0 - 90 - 80 - 70 - 90 - 0 - 0 - 90 - 80 - 70 - / - unassigned - unassigned - pm - Performance Module - SIMPLEX - 1 - 127.0.0.1 - localhost - ENABLED - 0 - 0 - 0 - 0 - 90 - 80 - 70 - 90 - 0 - 0 - 90 - 80 - 70 - / - 1 - 1 - - - 0 - unassigned - 0.0.0.0 - ENABLED - - - 1000 - /var/lib/columnstore/data1/systemFiles/dbrm/CalpontShm - /var/lib/columnstore/data1/systemFiles/dbrm/SMTxnID - - - /var/lib/columnstore/data1/systemFiles/dbrm/CalpontSessionMonitorShm - 10 - - - - 1GB - - - - - /var/lib/columnstore/data1/systemFiles/dbrm/oidbitmap - - 3000 - - - /var/lib/columnstore/data/bulk - /var/lib/columnstore/data/bulk/rollback - 98 - 1 - - - 1 - 127.0.0.1 - 8616 - - - - 127.0.0.1 - 8700 - pm1 - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - 0.0.0.0 - 8700 - unassigned - - - - - 50 - - 1 - 0 - 0 - 65536 - 2K - 200 - 0 - - - n - y - y - 2 - n - n - internal - internal - unassigned - unassigned - unassigned - rpm - unassigned - unassigned - us-east-1 - unassigned - unassigned - unassigned - unassigned - unassigned - unassigned - unassigned - unassigned - n - n - 0 - unassigned - unassigned - n - unassigned - unassigned - 3306 - /var/lock/subsys - - - - 4 - 0x0 - - - 128 - 128K - 64 - 1G - 25% - 10% - 100 - N - Y - - - 16K - 16 - 1 - - - - - 100 - - - 1M - 1M - 512 - - - - - - - - - - 127.0.0.1 - 3306 - root - - - - - - - N - - - N - - - Y - - - 127.0.0.1 - 0 - - - 30 - N - - diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 015f09d83..471535848 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -167,7 +167,6 @@ void Oam::getSystemConfig(const std::string& moduletype, ModuleTypeConfig& modul const string Section = "SystemModuleConfig"; const string MODULE_TYPE = "ModuleType"; const string MODULE_DESC = "ModuleDesc"; - const string MODULE_RUN_TYPE = "RunType"; const string MODULE_COUNT = "ModuleCount"; const string MODULE_DISABLE_STATE = "ModuleDisableState"; const string MODULE_CPU_CRITICAL = "ModuleCPUCriticalThreshold"; @@ -198,7 +197,6 @@ void Oam::getSystemConfig(const std::string& moduletype, ModuleTypeConfig& modul string ModuleCount = MODULE_COUNT + itoa(moduleTypeID); string ModuleType = MODULE_TYPE + itoa(moduleTypeID); string ModuleDesc = MODULE_DESC + itoa(moduleTypeID); - string ModuleRunType = MODULE_RUN_TYPE + itoa(moduleTypeID); string ModuleCPUCriticalThreshold = MODULE_CPU_CRITICAL + itoa(moduleTypeID); string ModuleCPUMajorThreshold = MODULE_CPU_MAJOR + itoa(moduleTypeID); string ModuleCPUMinorThreshold = MODULE_CPU_MINOR + itoa(moduleTypeID); @@ -216,7 +214,6 @@ void Oam::getSystemConfig(const std::string& moduletype, ModuleTypeConfig& modul moduletypeconfig.ModuleCount = strtol(sysConfig->getConfig(Section, ModuleCount).c_str(), 0, 0); moduletypeconfig.ModuleType = sysConfig->getConfig(Section, ModuleType); moduletypeconfig.ModuleDesc = sysConfig->getConfig(Section, ModuleDesc); - moduletypeconfig.RunType = sysConfig->getConfig(Section, ModuleRunType); moduletypeconfig.ModuleCPUCriticalThreshold = strtol(sysConfig->getConfig(Section, ModuleCPUCriticalThreshold).c_str(), 0, 0); moduletypeconfig.ModuleCPUMajorThreshold = diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index c2df5bf24..b4fbd207b 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -115,7 +115,6 @@ const std::string UnassignedName = "unassigned"; const std::string configSections[] = {"SystemConfig", "SystemModuleConfig", "SystemModuleConfig", - "SystemExtDeviceConfig", "SessionManager", "VersionBuffer", "OIDManager", @@ -235,7 +234,6 @@ struct ModuleTypeConfig_s { std::string ModuleType; //!< Module Type std::string ModuleDesc; //!< Module Description - std::string RunType; //!< Run Type uint16_t ModuleCount; //!< Module Equipage Count uint16_t ModuleCPUCriticalThreshold; //!< CPU Critical Threahold % uint16_t ModuleCPUMajorThreshold; //!< CPU Major Threahold % diff --git a/oamapps/columnstoreSupport/columnstoreSupport.cpp b/oamapps/columnstoreSupport/columnstoreSupport.cpp index 0f8abad74..d7bf28a9b 100644 --- a/oamapps/columnstoreSupport/columnstoreSupport.cpp +++ b/oamapps/columnstoreSupport/columnstoreSupport.cpp @@ -62,7 +62,6 @@ string rootPassword = ""; string debug_flag = "0"; string mysqlpw = " "; string tmpDir; -string ProfileFile; int runningThreads = 0; pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; @@ -121,9 +120,8 @@ void* childReportThread(threadInfo_t* st) cout << "Get " + reportType + " report data for " + remoteModuleName + " " << endl; - string cmd = "remote_command.sh " + remoteModuleIP + " " + rootPassword + " '. " + ProfileFile + ";" + - reportType + "Report.sh " + remoteModuleName + "' " + debug_flag + " - forcetty"; - + string cmd = "remote_command.sh " + remoteModuleIP + " " + rootPassword + ";" + reportType + "Report.sh " + + remoteModuleName + "' " + debug_flag + " - forcetty"; int rtnCode = system(cmd.c_str()); if (WEXITSTATUS(rtnCode) != 0) @@ -333,7 +331,7 @@ int main(int argc, char* argv[]) } // get Local Module Name and Server Install Indicator - string singleServerInstall; + string singleServerInstall = "n"; oamModuleInfo_t st; @@ -348,15 +346,6 @@ int main(int argc, char* argv[]) exit(-1); } - try - { - oam.getSystemConfig("SingleServerInstall", singleServerInstall); - } - catch (...) - { - singleServerInstall = "y"; - } - if (argc == 1) { argv[1] = &helpArg[0]; @@ -592,17 +581,6 @@ int main(int argc, char* argv[]) exit(-1); } - // Get Profile file - try - { - ProfileFile = sysConfig->getConfig(InstallSection, "ProfileFile"); - } - catch (...) - { - cout << "ERROR: Problem getting ProfileFile" << endl; - exit(-1); - } - string ModuleSection = "SystemModuleConfig"; for (unsigned int i = 0; i < sysModuleTypeConfig.moduletypeconfig.size(); i++) diff --git a/oamapps/columnstoreSupport/mcsSupportUtil.cpp b/oamapps/columnstoreSupport/mcsSupportUtil.cpp index 4c3e7106e..586a342e5 100644 --- a/oamapps/columnstoreSupport/mcsSupportUtil.cpp +++ b/oamapps/columnstoreSupport/mcsSupportUtil.cpp @@ -160,7 +160,6 @@ void getModuleTypeConfig(FILE* pOutputFile) fprintf(pOutputFile, "ModuleType '%s' Configuration information\n", moduletype.c_str()); fprintf(pOutputFile, "ModuleDesc = %s\n", systemmoduletypeconfig.moduletypeconfig[i].ModuleDesc.c_str()); - fprintf(pOutputFile, "RunType = %s\n", systemmoduletypeconfig.moduletypeconfig[i].RunType.c_str()); fprintf(pOutputFile, "ModuleCount = %i\n", moduleCount); if (moduleCount > 0) @@ -379,23 +378,16 @@ void getStorageConfig(FILE* pOutputFile) string volumeName = oam::UnassignedName; string deviceNameID = "PMVolumeDeviceName" + oam.itoa(*pt); string deviceName = oam::UnassignedName; - string amazonDeviceNameID = "PMVolumeAmazonDeviceName" + oam.itoa(*pt); - string amazondeviceName = oam::UnassignedName; try { oam.getSystemConfig(volumeNameID, volumeName); oam.getSystemConfig(deviceNameID, deviceName); - oam.getSystemConfig(amazonDeviceNameID, amazondeviceName); } catch (...) { continue; } - - fprintf(pOutputFile, - "Amazon EC2 Volume Name/Device Name/Amazon Device Name for DBRoot%u: %s, %s, %s", *pt, - volumeName.c_str(), deviceName.c_str(), amazondeviceName.c_str()); } } catch (exception& e) @@ -412,22 +404,16 @@ void getStorageConfig(FILE* pOutputFile) string volumeName = oam::UnassignedName; string deviceNameID = "PMVolumeDeviceName" + oam.itoa(*pt1); string deviceName = oam::UnassignedName; - string amazonDeviceNameID = "PMVolumeAmazonDeviceName" + oam.itoa(*pt1); - string amazondeviceName = oam::UnassignedName; try { oam.getSystemConfig( volumeNameID, volumeName); oam.getSystemConfig( deviceNameID, deviceName); - oam.getSystemConfig( amazonDeviceNameID, amazondeviceName); } catch (...) { continue; } - - fprintf(pOutputFile,"Amazon EC2 Volume Name/Device Name/Amazon Device Name for DBRoot%u: %s, %s, - %s",*pt1,volumeName.c_str(),deviceName.c_str(),amazondeviceName.c_str()); }*/ } diff --git a/primitives/primproc/bpp.h b/primitives/primproc/bpp.h index 2190129bb..8d9161695 100644 --- a/primitives/primproc/bpp.h +++ b/primitives/primproc/bpp.h @@ -27,7 +27,6 @@ // Copyright: See COPYING file that comes with this distribution // // -#include "primitivemsg.h" #include "bytestream.h" #include "messagequeue.h" #include "serializeable.h" diff --git a/primitives/primproc/dictstep.cpp b/primitives/primproc/dictstep.cpp index 7688a9f9f..69fa86ccc 100644 --- a/primitives/primproc/dictstep.cpp +++ b/primitives/primproc/dictstep.cpp @@ -117,7 +117,6 @@ void DictStep::prep(int8_t outputType, bool makeAbsRids) primMsg->ism.Interleave = 0; primMsg->ism.Flags = 0; - // primMsg->ism.Flags = PrimitiveMsg::planFlagsToPrimFlags(traceFlags); primMsg->ism.Command = DICT_SIGNATURE; primMsg->ism.Size = bufferSize; primMsg->ism.Type = 2; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 260a3ca96..4d761899a 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -601,7 +601,6 @@ int ServicePrimProc::Child() if (temp >= 0) maxPct = temp; - // @bug4507, configurable pm aggregation AggregationMemoryCheck // We could use this same mechanism for other growing buffers. int aggPct = 95; temp = toInt(cf->getConfig("SystemConfig", "MemoryCheckPercent")); diff --git a/primitives/primproc/umsocketselector.cpp b/primitives/primproc/umsocketselector.cpp index 7b58fa73a..5cdd5f383 100644 --- a/primitives/primproc/umsocketselector.cpp +++ b/primitives/primproc/umsocketselector.cpp @@ -119,7 +119,6 @@ void UmSocketSelector::loadUMModuleInfo() std::cout << "ModuleConfig for type: " << UM_MODTYPE << std::endl; std::cout << "ModuleDesc = " << moduleTypeConfig.ModuleDesc << std::endl; std::cout << "ModuleCount = " << moduleCount << std::endl; - std::cout << "RunType = " << moduleTypeConfig.RunType << std::endl; #endif if (moduleCount > 0) diff --git a/utils/configcpp/tdriver2.cpp b/utils/configcpp/tdriver2.cpp index e652ac045..f6d00cc49 100644 --- a/utils/configcpp/tdriver2.cpp +++ b/utils/configcpp/tdriver2.cpp @@ -65,7 +65,6 @@ class WOConfigFileTest : public CppUnit::TestFixture void test1() { WriteOnceConfig woc(cf); - CPPUNIT_ASSERT(woc.owns("PrimitiveServers", "LBID_Shift")); CPPUNIT_ASSERT(woc.owns("SystemConfig", "DBRootCount")); CPPUNIT_ASSERT(woc.owns("SystemConfig", "DBRMRoot")); @@ -73,9 +72,6 @@ class WOConfigFileTest : public CppUnit::TestFixture int vali; - vali = Config::fromText(woc.getConfig("PrimitiveServers", "LBID_Shift")); - CPPUNIT_ASSERT(vali == 13); - woc.setConfig("SystemConfig", "DBRootCount", "10"); vali = Config::fromText(woc.getConfig("SystemConfig", "DBRootCount")); CPPUNIT_ASSERT(vali == 10); diff --git a/utils/winport/fixup.cpp b/utils/winport/fixup.cpp index dfba53a50..2e909d564 100644 --- a/utils/winport/fixup.cpp +++ b/utils/winport/fixup.cpp @@ -258,22 +258,6 @@ int fixupCalpontXML() cout << "Fixing " << section << "." << parm << " = " << val << endl; } - // Fixup WES - section = "pm1_WriteEngineServer"; - parm = "IPAddr"; - val = cf->getConfig(section, parm); - - if (val.empty()) - { - val = "127.0.0.1"; - cf->setConfig(section, parm, val); - cout << "Adding " << section << "." << parm << " = " << val << endl; - parm = "Port"; - val = "8630"; - cf->setConfig(section, parm, val); - cout << "Adding " << section << "." << parm << " = " << val << endl; - } - // Fixup TableLockSaveFile section = "SystemConfig"; parm = "TableLockSaveFile";