diff --git a/dbcon/ddlpackageproc/ddlindexpopulator.cpp b/dbcon/ddlpackageproc/ddlindexpopulator.cpp index dd0a05196..e5062e04c 100644 --- a/dbcon/ddlpackageproc/ddlindexpopulator.cpp +++ b/dbcon/ddlpackageproc/ddlindexpopulator.cpp @@ -65,7 +65,7 @@ namespace ddlpackageprocessor { CalpontSelectExecutionPlan csep; makeCsep(csep); - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); if (! fEC) { fEC = DistributedEngineComm::instance(rm); diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index 635927c7b..2d7ba3bc6 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -795,7 +795,7 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, ByteStream bs; uint32_t status; - ResourceManager rm(true); + ResourceManager *rm = ResourceManager::instance(true); DistributedEngineComm* fEc = DistributedEngineComm::instance(rm); SJLP jl = JobListFactory::makeJobList(&csep, rm, true); //@bug 2221. Work around to prevent DMLProc crash. 
@@ -5349,8 +5349,8 @@ void CalpontSystemCatalog::buildSysColinfomap() aCol.precision = 10; aCol.compressionType = 0; - ResourceManager rm; - if( rm.useHdfs() ) + ResourceManager *rm = ResourceManager::instance(); + if( rm->useHdfs() ) aCol.compressionType = 2; DictOID notDict; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index d95950252..ce47ff0bd 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -52,7 +52,7 @@ using namespace joiner; namespace joblist { -BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager& rm) : +BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) : ot(BPS_ELEMENT_TYPE), needToSetLBID(true), count(1), @@ -75,7 +75,7 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager& rm) bop(BOP_AND), forHJ(false), threadCount(1), - fJoinerChunkSize(rm.getJlJoinerChunkSize()), + fJoinerChunkSize(rm->getJlJoinerChunkSize()), hasSmallOuterJoin(false), _priority(1) { diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 99d628745..bb3880e0a 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -57,7 +57,7 @@ class BatchPrimitiveProcessorJL { public: /* Constructor used by the JobStep */ - explicit BatchPrimitiveProcessorJL(const ResourceManager& rm); + explicit BatchPrimitiveProcessorJL(const ResourceManager* rm); ~BatchPrimitiveProcessorJL(); /* Interface used by the JobStep */ diff --git a/dbcon/joblist/bucketdl.h b/dbcon/joblist/bucketdl.h index 3a391f71a..894921389 100644 --- a/dbcon/joblist/bucketdl.h +++ b/dbcon/joblist/bucketdl.h @@ -61,7 +61,7 @@ class BucketDL : public DataList * @param hash The function object that calculates which bucket an elements goes into on insertion. 
*/ - BucketDL(uint32_t numBuckets, uint32_t numConsumers, uint32_t maxElementsPerBucket, ResourceManager& rm, + BucketDL(uint32_t numBuckets, uint32_t numConsumers, uint32_t maxElementsPerBucket, ResourceManager* rm, boost::function hash = utils::Hasher()); virtual ~BucketDL(); @@ -146,7 +146,7 @@ class BucketDL : public DataList explicit BucketDL(const BucketDL &); BucketDL& operator=(const BucketDL &); - ResourceManager& fRm; + ResourceManager* fRm; WSDL **buckets; WSDL **rbuckets; TWSVec fTBuckets; @@ -169,7 +169,7 @@ class BucketDL : public DataList }; template -BucketDL::BucketDL(uint32_t nb, uint32_t nc, uint32_t me, ResourceManager& rm, +BucketDL::BucketDL(uint32_t nb, uint32_t nc, uint32_t me, ResourceManager* rm, boost::function hash) : base(), fRm(rm), buckets(0), rbuckets(0), fTraceOn(false), fHashLen(0), fElementLen(0), bucketDoneCount(0), fReuseControl(NULL) diff --git a/dbcon/joblist/columncommand-jl.cpp b/dbcon/joblist/columncommand-jl.cpp index 1af4474fe..a7a5adc22 100644 --- a/dbcon/joblist/columncommand-jl.cpp +++ b/dbcon/joblist/columncommand-jl.cpp @@ -103,8 +103,8 @@ ColumnCommandJL::ColumnCommandJL(const pColStep &step) OID = step.fOid; colName = step.fName; fIsDict = step.fIsDict; - ResourceManager rm; - numDBRoots = rm.getDBRootCount(); + ResourceManager *rm = ResourceManager::instance(); + numDBRoots = rm->getDBRootCount(); // grab the last LBID for this column. It's a _minor_ optimization for the block loader. 
//dbrm.getLastLocalHWM((BRM::OID_t)OID, dbroot, partNum, segNum, lastHWM); diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 98098fe62..e310d3d14 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -417,7 +417,7 @@ int64_t CrossEngineStep::convertValueNum( void CrossEngineStep::getMysqldInfo(const JobInfo& jobInfo) { - if (jobInfo.rm.getMysqldInfo(fHost, fUser, fPasswd, fPort) == false) + if (jobInfo.rm->getMysqldInfo(fHost, fUser, fPasswd, fPort) == false) throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG); } diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp index dab56b82a..856440ddf 100644 --- a/dbcon/joblist/diskjoinstep.cpp +++ b/dbcon/joblist/diskjoinstep.cpp @@ -109,7 +109,7 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bo if (largeLimit == 0) largeLimit = numeric_limits::max(); - uint64_t totalUMMemory = thjs->resourceManager.getConfiguredUMMemLimit(); + uint64_t totalUMMemory = thjs->resourceManager->getConfiguredUMMemLimit(); jp.reset(new JoinPartition(largeRG, smallRG, smallKeyCols, largeKeyCols, typeless, (joinType & ANTI) && (joinType & MATCHNULLS), (bool) fe, totalUMMemory, partitionSize)); diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a059623fc..1243cdcbe 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -171,7 +171,7 @@ namespace joblist DistributedEngineComm* DistributedEngineComm::fInstance = 0; /*static*/ - DistributedEngineComm* DistributedEngineComm::instance(ResourceManager& rm, bool isExeMgr) + DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr) { if (fInstance == 0) fInstance = new DistributedEngineComm(rm, isExeMgr); @@ -186,9 +186,9 @@ namespace joblist fInstance = 0; } - 
DistributedEngineComm::DistributedEngineComm(ResourceManager& rm, bool isExeMgr) : + DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) : fRm(rm), - fLBIDShift(fRm.getPsLBID_Shift()), + fLBIDShift(fRm->getPsLBID_Shift()), pmCount(0), fIsExeMgr(isExeMgr) { @@ -219,10 +219,10 @@ void DistributedEngineComm::Setup() newClients.clear(); newLocks.clear(); - throttleThreshold = fRm.getDECThrottleThreshold(); - uint32_t newPmCount = fRm.getPsCount(); - int cpp = (fIsExeMgr ? fRm.getPsConnectionsPerPrimProc() : 1); - tbpsThreadCount = fRm.getJlNumScanReceiveThreads(); + throttleThreshold = fRm->getDECThrottleThreshold(); + uint32_t newPmCount = fRm->getPsCount(); + int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1); + tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); unsigned numConnections = newPmCount * cpp; oam::Oam oam; ModuleTypeConfig moduletypeconfig; @@ -246,7 +246,7 @@ void DistributedEngineComm::Setup() string fServer (oss.str()); boost::shared_ptr - cl(new MessageQueueClient(fServer, fRm.getConfig())); + cl(new MessageQueueClient(fServer, fRm->getConfig())); boost::shared_ptr nl(new boost::mutex()); try { if (cl->connect()) { diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 0c29cd1a9..1737fa24f 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -89,7 +89,7 @@ public: */ EXPORT virtual ~DistributedEngineComm(); - EXPORT static DistributedEngineComm* instance(ResourceManager& rm, bool isExeMgr=false); + EXPORT static DistributedEngineComm* instance(ResourceManager* rm, bool isExeMgr=false); /** @brief delete the static instance * This has the effect of causing the connection to be rebuilt @@ -217,7 +217,7 @@ private: //The mapping of session ids to StepMsgQueueLists typedef std::map > MessageQueueMap; - explicit DistributedEngineComm(ResourceManager& rm, bool isExeMgr); + explicit DistributedEngineComm(ResourceManager* rm, bool 
isExeMgr); void StartClientListener(boost::shared_ptr cl, uint32_t connIndex); @@ -234,7 +234,7 @@ private: uint32_t senderID = std::numeric_limits::max(), bool doInterleaving=false); static DistributedEngineComm* fInstance; - ResourceManager& fRm; + ResourceManager* fRm; ClientList fPmConnections; // all the pm servers ReaderList fPmReader; // all the reader threads for the pm servers diff --git a/dbcon/joblist/groupconcat.cpp b/dbcon/joblist/groupconcat.cpp index 2d93d0f96..50812dfc7 100644 --- a/dbcon/joblist/groupconcat.cpp +++ b/dbcon/joblist/groupconcat.cpp @@ -83,7 +83,7 @@ void GroupConcatInfo::prepGroupConcat(JobInfo& jobInfo) groupConcat->fSeparator = gcc->separator(); groupConcat->fDistinct = gcc->distinct(); groupConcat->fSize = gcc->resultType().colWidth; - groupConcat->fRm = &(jobInfo.rm); + groupConcat->fRm = jobInfo.rm; groupConcat->fSessionMemLimit = jobInfo.umMemLimit; int key = -1; diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index ce0f3d153..288a7527b 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -165,23 +165,23 @@ struct TupleKeyInfo //------------------------------------------------------------------------------ struct JobInfo { - JobInfo(ResourceManager& r) : + JobInfo(ResourceManager* r) : rm(r), sessionId(0), txnId(0), statementId(0), - maxBuckets(rm.getHjMaxBuckets()), - maxElems(rm.getHjMaxElems()), - flushInterval(rm.getJLFlushInterval()), - fifoSize(rm.getJlFifoSize()), - fifoSizeLargeSideHj(rm.getHjFifoSizeLargeSide()), - scanLbidReqLimit(rm.getJlScanLbidReqLimit()), - scanLbidReqThreshold(rm.getJlScanLbidReqThreshold()), - tempSaveSize(rm.getScTempSaveSize()), + maxBuckets(rm->getHjMaxBuckets()), + maxElems(rm->getHjMaxElems()), + flushInterval(rm->getJLFlushInterval()), + fifoSize(rm->getJlFifoSize()), + fifoSizeLargeSideHj(rm->getHjFifoSizeLargeSide()), + scanLbidReqLimit(rm->getJlScanLbidReqLimit()), + scanLbidReqThreshold(rm->getJlScanLbidReqThreshold()), + 
tempSaveSize(rm->getScTempSaveSize()), logger(new Logger()), traceFlags(0), - tupleDLMaxSize(rm.getTwMaxSize()), - tupleMaxBuckets(rm.getTwMaxBuckets()), + tupleDLMaxSize(rm->getTwMaxSize()), + tupleMaxBuckets(rm->getTwMaxBuckets()), projectingTableOID(0), isExeMgr(false), trace(false), @@ -202,7 +202,7 @@ struct JobInfo wfqLimitStart(0), wfqLimitCount(-1) { } - ResourceManager& rm; + ResourceManager* rm; uint32_t sessionId; uint32_t txnId; BRM::QueryContext verId; diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 99079710a..da1bacba2 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -1589,7 +1589,7 @@ namespace SJLP makeJobList_( CalpontExecutionPlan* cplan, - ResourceManager& rm, + ResourceManager* rm, bool isExeMgr, unsigned& errCode, string& emsg) { @@ -1609,7 +1609,7 @@ SJLP makeJobList_( jl->setPMsConfigured(pmsConfigured); jl->priority(csep->priority()); jl->errorInfo(errorInfo); - rm.setTraceFlags(csep->traceFlags()); + rm->setTraceFlags(csep->traceFlags()); //Stuff a util struct with some stuff we always need JobInfo jobInfo(rm); @@ -1792,7 +1792,7 @@ namespace joblist /* static */ SJLP JobListFactory::makeJobList( CalpontExecutionPlan* cplan, - ResourceManager& rm, + ResourceManager* rm, bool tryTuple, bool isExeMgr) { diff --git a/dbcon/joblist/joblistfactory.h b/dbcon/joblist/joblistfactory.h index 537a6b03a..21486ed7e 100644 --- a/dbcon/joblist/joblistfactory.h +++ b/dbcon/joblist/joblistfactory.h @@ -65,7 +65,7 @@ public: */ EXPORT static SJLP makeJobList( execplan::CalpontExecutionPlan* cplan, - ResourceManager& rm, + ResourceManager* rm, bool tryTuple=false, bool isExeMgr = false); diff --git a/dbcon/joblist/largedatalist.h b/dbcon/joblist/largedatalist.h index cfb793ad2..d2a8e0dc7 100644 --- a/dbcon/joblist/largedatalist.h +++ b/dbcon/joblist/largedatalist.h @@ -108,7 +108,7 @@ class LargeDataList : public DataListImpl LargeDataList(uint32_t numConsumers, uint32_t 
elementSaveSize1, uint32_t elementSaveSize2, - const ResourceManager& rm); + const ResourceManager* rm); virtual ~LargeDataList(); virtual void endOfInput(); @@ -191,7 +191,7 @@ }; template -LargeDataList::LargeDataList(uint32_t nc, uint32_t elementSaveSize1st, uint32_t elementSaveSize2nd, const ResourceManager& rm): +LargeDataList::LargeDataList(uint32_t nc, uint32_t elementSaveSize1st, uint32_t elementSaveSize2nd, const ResourceManager* rm): base(nc), path(rm->getScTempDiskPath()), fTraceOn(false), fReUse(false), fSaveForReuse(false), fRestoreInfo(NULL) { // config::Config *config = config::Config::makeConfig(); diff --git a/dbcon/joblist/largehashjoin.cpp b/dbcon/joblist/largehashjoin.cpp index 4394aa30d..ce37c2d62 100644 --- a/dbcon/joblist/largehashjoin.cpp +++ b/dbcon/joblist/largehashjoin.cpp @@ -442,7 +442,7 @@ LargeHashJoin::LargeHashJoin(JoinType joinType, uint32_t sessionId, uint32_t txnId, uint32_t statementId, - ResourceManager& rm ): + ResourceManager* rm ): fSessionId(sessionId), fTxnId(txnId), fStepId(0), fStatementId(statementId), fTableOID1(0), fTableOID2(0), fJoinType(joinType), fRm(rm), fAlias1(), fAlias2() @@ -717,7 +717,7 @@ StringHashJoinStep::StringHashJoinStep(JoinType joinType, uint32_t sessionId, uint32_t txnId, uint32_t statementId, - ResourceManager& rm): + ResourceManager* rm): LargeHashJoin(joinType, sessionId, txnId, statementId, rm) { } diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index 1b85c4b36..2199c7c90 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -57,7 +57,7 @@ LimitedOrderBy::~LimitedOrderBy() void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo) { - fRm = &jobInfo.rm; + fRm = jobInfo.rm; fSessionMemLimit = jobInfo.umMemLimit; fErrorCode = ERR_LIMIT_TOO_BIG; diff --git a/dbcon/joblist/pcolscan.cpp b/dbcon/joblist/pcolscan.cpp index c28e40a66..7a5927b75 100644 --- 
a/dbcon/joblist/pcolscan.cpp +++ b/dbcon/joblist/pcolscan.cpp @@ -123,7 +123,7 @@ pColScanStep::pColScanStep( const JobInfo& jobInfo) : JobStep(jobInfo), fRm(jobInfo.rm), - fNumThreads(fRm.getJlNumScanReceiveThreads()), + fNumThreads(fRm->getJlNumScanReceiveThreads()), fFilterCount(0), fOid(o), fTableOid(t), @@ -131,8 +131,8 @@ pColScanStep::pColScanStep( fBOP(BOP_OR), sentCount(0), recvCount(0), - fScanLbidReqLimit(fRm.getJlScanLbidReqLimit()), - fScanLbidReqThreshold(fRm.getJlScanLbidReqThreshold()), + fScanLbidReqLimit(fRm->getJlScanLbidReqLimit()), + fScanLbidReqThreshold(fRm->getJlScanLbidReqThreshold()), fStopSending(false), fSingleThread(false), fPhysicalIO(0), @@ -198,7 +198,7 @@ pColScanStep::pColScanStep( throw runtime_error("pColScan: BRM HWM lookup failure (4)"); sort(extents.begin(), extents.end(), BRM::ExtentSorter()); numExtents = extents.size(); - extentSize = (fRm.getExtentRows()*fColType.colWidth)/BLOCK_SIZE; + extentSize = (fRm->getExtentRows()*fColType.colWidth)/BLOCK_SIZE; if (fOid>3000) { lbidList.reset(new LBIDList(fOid, 0)); @@ -982,7 +982,7 @@ pColScanStep::pColScanStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.resourceManager()) { - fNumThreads = fRm.getJlNumScanReceiveThreads(); + fNumThreads = fRm->getJlNumScanReceiveThreads(); fFilterCount = rhs.filterCount(); fFilterString = rhs.filterString(); isFilterFeeder = rhs.getFeederFlag(); @@ -993,8 +993,8 @@ pColScanStep::pColScanStep(const pColStep& rhs) : fIsDict = rhs.isDictCol(); sentCount = 0; recvCount = 0; - fScanLbidReqLimit = fRm.getJlScanLbidReqLimit(); - fScanLbidReqThreshold = fRm.getJlScanLbidReqThreshold(); + fScanLbidReqLimit = fRm->getJlScanLbidReqLimit(); + fScanLbidReqThreshold = fRm->getJlScanLbidReqThreshold(); fStopSending = false; fSingleThread = false; fPhysicalIO = 0; @@ -1022,7 +1022,7 @@ pColScanStep::pColScanStep(const pColStep& rhs) : throw runtime_error("pColScan: BRM HWM lookup failure (4)"); sort(extents.begin(), extents.end(), BRM::ExtentSorter()); 
numExtents = extents.size(); - extentSize = (fRm.getExtentRows()*fColType.colWidth)/BLOCK_SIZE; + extentSize = (fRm->getExtentRows()*fColType.colWidth)/BLOCK_SIZE; lbidList=rhs.lbidList; //pthread_mutex_init(&mutex, NULL); //pthread_mutex_init(&dlMutex, NULL); diff --git a/dbcon/joblist/pcolstep.cpp b/dbcon/joblist/pcolstep.cpp index 31cae38be..93680cd33 100644 --- a/dbcon/joblist/pcolstep.cpp +++ b/dbcon/joblist/pcolstep.cpp @@ -122,8 +122,8 @@ pColStep::pColStep( ridCount(0), fFlushInterval(jobInfo.flushInterval), fSwallowRows(false), - fProjectBlockReqLimit(fRm.getJlProjectBlockReqLimit()), - fProjectBlockReqThreshold(fRm.getJlProjectBlockReqThreshold()), + fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()), + fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()), fStopSending(false), isFilterFeeder(false), fPhysicalIO(0), @@ -191,7 +191,7 @@ pColStep::pColStep( ridsPerBlock = BLOCK_SIZE/fColType.colWidth; /* calculate some shortcuts for extent and block based arithmetic */ - extentSize = (fRm.getExtentRows()*fColType.colWidth)/BLOCK_SIZE; + extentSize = (fRm->getExtentRows()*fColType.colWidth)/BLOCK_SIZE; for (i = 1, mask = 1, modMask = 0; i <= 32; i++) { mask <<= 1; modMask = (modMask << 1) | 1; @@ -265,8 +265,8 @@ pColStep::pColStep(const pColScanStep& rhs) : // Per Cindy, it's save to put fFlushInterval to be 0 fFlushInterval(0), fSwallowRows(false), - fProjectBlockReqLimit(fRm.getJlProjectBlockReqLimit()), - fProjectBlockReqThreshold(fRm.getJlProjectBlockReqThreshold()), + fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()), + fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()), fStopSending(false), fPhysicalIO(0), fCacheIO(0), @@ -285,7 +285,7 @@ pColStep::pColStep(const pColScanStep& rhs) : ridsPerBlock = rhs.getRidsPerBlock(); /* calculate some shortcuts for extent and block based arithmetic */ - extentSize = (fRm.getExtentRows()*fColType.colWidth)/BLOCK_SIZE; + extentSize = 
(fRm->getExtentRows()*fColType.colWidth)/BLOCK_SIZE; for (i = 1, mask = 1, modMask = 0; i <= 32; i++) { mask <<= 1; modMask = (modMask << 1) | 1; @@ -360,8 +360,8 @@ pColStep::pColStep(const PassThruStep& rhs) : // Per Cindy, it's save to put fFlushInterval to be 0 fFlushInterval(0), fSwallowRows(false), - fProjectBlockReqLimit(fRm.getJlProjectBlockReqLimit()), - fProjectBlockReqThreshold(fRm.getJlProjectBlockReqThreshold()), + fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()), + fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()), fStopSending(false), fPhysicalIO(0), fCacheIO(0), @@ -380,7 +380,7 @@ pColStep::pColStep(const PassThruStep& rhs) : ridsPerBlock = BLOCK_SIZE/fColType.colWidth; /* calculate some shortcuts for extent and block based arithmetic */ - extentSize = (fRm.getExtentRows()*fColType.colWidth)/BLOCK_SIZE; + extentSize = (fRm->getExtentRows()*fColType.colWidth)/BLOCK_SIZE; for (i = 1, mask = 1, modMask = 0; i <= 32; i++) { mask <<= 1; modMask = (modMask << 1) | 1; diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index 34c958ecf..f2e09eac2 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -135,8 +135,8 @@ pDictionaryScan::pDictionaryScan( ridCount(0), ridList(0), colType(ct), - fScanLbidReqLimit(jobInfo.rm.getJlScanLbidReqLimit()), - fScanLbidReqThreshold(jobInfo.rm.getJlScanLbidReqThreshold()), + fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()), + fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()), fStopSending(false), fSingleThread(false), fPhysicalIO(0), @@ -169,7 +169,7 @@ pDictionaryScan::pDictionaryScan( } sort(extents.begin(), extents.end(), ExtentSorter()); numExtents = extents.size(); - extentSize = (fRm.getExtentRows()*8)/BLOCK_SIZE; + extentSize = (fRm->getExtentRows()*8)/BLOCK_SIZE; uint64_t i = 1, mask = 1; for (; i <= 32; i++) @@ -209,7 +209,7 @@ pDictionaryScan::~pDictionaryScan() 
//------------------------------------------------------------------------------ void pDictionaryScan::initializeConfigParms() { - fLogicalBlocksPerScan = fRm.getJlLogicalBlocksPerScan(); + fLogicalBlocksPerScan = fRm->getJlLogicalBlocksPerScan(); } void pDictionaryScan::startPrimitiveThread() diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index be4e5cee0..c1594b73a 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -253,7 +253,7 @@ public: //...to define abstract method in base class, but if start adding to other //...classes, then should consider adding pure virtual method to JobStep. uint64_t blksSkipped () const { return fNumBlksSkipped; } - ResourceManager& resourceManager() const { return fRm; } + ResourceManager* resourceManager() const { return fRm; } SP_LBIDList getlbidList() const { return lbidList;} @@ -281,7 +281,7 @@ private: uint64_t getLBID(uint64_t rid, bool& scan); uint64_t getFBO(uint64_t lbid); - ResourceManager& fRm; + ResourceManager* fRm; boost::shared_ptr sysCat; execplan::CalpontSystemCatalog::OID fOid; execplan::CalpontSystemCatalog::OID fTableOid; @@ -440,7 +440,7 @@ public: virtual execplan::CalpontSystemCatalog::OID tableOid() const { return fTableOid; } const execplan::CalpontSystemCatalog::ColType& colType() const { return fColType; } - ResourceManager& resourceManager() const { return fRm; } + ResourceManager* resourceManager() const { return fRm; } virtual uint64_t phyIOCount () const { return fPhysicalIO; } virtual uint64_t cacheIOCount () const { return fCacheIO; } @@ -483,7 +483,7 @@ private: uint64_t getFBO(uint64_t lbid); bool isEmptyVal(const uint8_t *val8) const; - ResourceManager& fRm; + ResourceManager* fRm; ColByScanRangeRequestHeader fMsgHeader; SPTHD fConsumerThread; /// number of threads on the receive side @@ -652,7 +652,7 @@ private: uint64_t fMsgBytesIn; // total byte count for incoming messages uint64_t fMsgBytesOut; // total byte count for outcoming messages 
uint32_t uniqueID; - ResourceManager& fRm; + ResourceManager* fRm; //@bug 3128 change ParseTree* to vector std::vector fFilters; @@ -797,7 +797,7 @@ private: uint32_t fMsgsToPm; // total number of messages sent to PMs uint32_t fMsgsExpect; // total blocks to scan uint32_t uniqueID; - ResourceManager& fRm; + ResourceManager* fRm; BPSOutputType fOutType; rowgroup::RowGroup fOutputRowGroup; uint64_t fRidResults; @@ -1108,7 +1108,7 @@ private: std::vector scanFlags; // use to keep track of which extents to eliminate from this step bool BPPIsAllocated; uint32_t uniqueID; - ResourceManager& fRm; + ResourceManager* fRm; /* HashJoin support */ @@ -1307,7 +1307,7 @@ public: bool isDictCol() const { return isDictColumn; } bool isExeMgr() const { return isEM; } const execplan::CalpontSystemCatalog::ColType& colType() const { return fColType; } - ResourceManager& resourceManager() const { return fRm; } + ResourceManager* resourceManager() const { return fRm; } void pseudoType(uint32_t p) { fPseudoType = p; } uint32_t pseudoType() const { return fPseudoType; } @@ -1338,7 +1338,7 @@ private: // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // Running with this one will swallow rows at projection. 
bool fSwallowRows; - ResourceManager& fRm; + ResourceManager* fRm; friend class PassThruCommandJL; friend class RTSCommandJL; friend class BatchPrimitiveStep; diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index 946920276..74889df14 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -55,6 +55,17 @@ namespace joblist { //const string ResourceManager::fBatchInsertStr("BatchInsert"); const string ResourceManager::fOrderByLimitStr("OrderByLimit"); + ResourceManager* ResourceManager::fInstance = NULL; + static mutex mx; + + ResourceManager* ResourceManager::instance(bool runningInExeMgr) + { + mutex::scoped_lock lk(mx); + if (!fInstance) + fInstance = new ResourceManager(runningInExeMgr); + return fInstance; + } + ResourceManager::ResourceManager(bool runningInExeMgr) : fExeMgrStr("ExeMgr1"), fSystemConfigStr("SystemConfig"), diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index d65eef877..8d31ba1c3 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -136,6 +136,7 @@ namespace joblist * */ EXPORT ResourceManager(bool runningInExeMgr=false); + EXPORT static ResourceManager* instance(bool runningInExeMgr=false); // ResourceManager(const config::Config *cf); // ResourceManager(const std::string& config); //passed by ExeMgr and DistributedEngineComm to MessageQueueServer or -Client @@ -364,7 +365,7 @@ namespace joblist /*static const*/ std::string fBatchInsertStr; static const std::string fOrderByLimitStr; config::Config* fConfig; - + static ResourceManager* fInstance; uint32_t fTraceFlags; unsigned fNumCores; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 30cca8e2e..166842595 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -163,9 +163,9 @@ void TupleBPS::initializeConfigParms() //... rids must fall below, before the producer can send more rids. 
//These could go in constructor - fRequestSize = fRm.getJlRequestSize(); - fMaxOutstandingRequests = fRm.getJlMaxOutstandingRequests(); - fProcessorThreadsPerScan = fRm.getJlProcessorThreadsPerScan(); + fRequestSize = fRm->getJlRequestSize(); + fMaxOutstandingRequests = fRm->getJlMaxOutstandingRequests(); + fProcessorThreadsPerScan = fRm->getJlProcessorThreadsPerScan(); fNumThreads = 0; config::Config* cf = config::Config::makeConfig(); @@ -176,7 +176,7 @@ void TupleBPS::initializeConfigParms() if (fRequestSize >= fMaxOutstandingRequests) fRequestSize = 1; if ((fSessionId & 0x80000000) == 0) - fMaxNumThreads = fRm.getJlNumScanReceiveThreads(); + fMaxNumThreads = fRm->getJlNumScanReceiveThreads(); else fMaxNumThreads = 1; @@ -896,7 +896,7 @@ bool TupleBPS::goodExtentCount() void TupleBPS::initExtentMarkers() { - numDBRoots = fRm.getDBRootCount(); + numDBRoots = fRm->getDBRootCount(); lastExtent.resize(numDBRoots); lastScannedLBID.resize(numDBRoots); diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 595cb174d..5c4bdba0b 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -197,9 +197,9 @@ TupleAggregateStep::TupleAggregateStep( fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0); // initialize multi-thread variables - fNumOfThreads = fRm.aggNumThreads(); - fNumOfBuckets = fRm.aggNumBuckets(); - fNumOfRowGroups = fRm.aggNumRowGroups(); + fNumOfThreads = fRm->aggNumThreads(); + fNumOfBuckets = fRm->aggNumBuckets(); + fNumOfRowGroups = fRm->aggNumRowGroups(); fMemUsage.reset(new uint64_t[fNumOfThreads]); memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t)); @@ -211,7 +211,7 @@ TupleAggregateStep::TupleAggregateStep( TupleAggregateStep::~TupleAggregateStep() { for (uint32_t i = 0; i < fNumOfThreads; i++) - fRm.returnMemory(fMemUsage[i], fSessionMemLimit); + fRm->returnMemory(fMemUsage[i], fSessionMemLimit); for (uint32_t i = 0; i < fAgg_mutex.size(); i++) delete 
fAgg_mutex[i]; } @@ -1311,7 +1311,7 @@ void TupleAggregateStep::prep1PhaseAggregate( posAgg.push_back(posAgg[i] + widthAgg[i]); RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, scaleAgg, precisionAgg, jobInfo.stringTableThreshold); - SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit)); rowgroups.push_back(aggRG); aggregators.push_back(rowAgg); @@ -2111,14 +2111,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( posAgg.push_back(posAgg[i] + widthAgg[i]); RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, scaleAgg, precisionAgg, jobInfo.stringTableThreshold); - SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit)); posAggDist.push_back(2); // rid for (uint64_t i = 0; i < oidsAggDist.size(); i++) posAggDist.push_back(posAggDist[i] + widthAggDist[i]); RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold); - SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVec2, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit)); // mapping the group_concat columns, if any. 
if (jobInfo.groupConcatInfo.groupConcat().size() > 0) @@ -2133,7 +2133,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (jobInfo.distinctColVec.size() > 1) { RowAggregationMultiDistinct* multiDistinctAggregator = - new RowAggregationMultiDistinct(groupByNoDist, functionVec2, &jobInfo.rm, jobInfo.umMemLimit); + new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit); rowAggDist.reset(multiDistinctAggregator); rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat()); @@ -2244,7 +2244,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // construct sub-aggregator SP_ROWAGG_UM_t subAgg( - new RowAggregationSubDistinct(groupBySub, functionSub1, &jobInfo.rm, jobInfo.umMemLimit)); + new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); // add to rowAggDist @@ -2298,7 +2298,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // construct sub-aggregator SP_ROWAGG_UM_t subAgg( - new RowAggregationUM(groupBySubNoDist, functionSub1, &jobInfo.rm, jobInfo.umMemLimit)); + new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); // add to rowAggDist @@ -2926,7 +2926,7 @@ void TupleAggregateStep::prep2PhasesAggregate( posAggUm.push_back(posAggUm[i] + widthAggUm[i]); RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold); - SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit)); rowgroups.push_back(aggRgUm); aggregators.push_back(rowAggUm); @@ -3716,21 +3716,21 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( posAggUm.push_back(posAggUm[i] + widthAggUm[i]); RowGroup 
aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold); - SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit)); posAggDist.push_back(2); // rid for (uint64_t i = 0; i < oidsAggDist.size(); i++) posAggDist.push_back(posAggDist[i] + widthAggDist[i]); RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold); - SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit)); // if distinct key word applied to more than one aggregate column, reset rowAggDist vector subRgVec; if (jobInfo.distinctColVec.size() > 1) { RowAggregationMultiDistinct* multiDistinctAggregator = - new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit); + new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit); rowAggDist.reset(multiDistinctAggregator); // construct and add sub-aggregators to rowAggDist @@ -3840,7 +3840,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( } // construct sub-aggregator - SP_ROWAGG_UM_t subAgg(new RowAggregationSubDistinct(groupBySub, functionSub1, &jobInfo.rm, jobInfo.umMemLimit)); + SP_ROWAGG_UM_t subAgg(new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2); @@ -3892,7 +3892,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // construct sub-aggregator SP_ROWAGG_UM_t subAgg( - new RowAggregationUMP2(groupBySubNoDist, functionSub1, &jobInfo.rm, 
jobInfo.umMemLimit)); + new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2); @@ -4217,7 +4217,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) fRowGroupIns[threadID].setData(&rgData); fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); - if (!fRm.getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) + if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) { rgDatas.clear(); // to short-cut the rest of processing abort(); @@ -4335,7 +4335,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) usleep(1000); // avoid using all CPU during busy wait } rgDatas.clear(); - fRm.returnMemory(fMemUsage[threadID], fSessionMemLimit); + fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit); fMemUsage[threadID] = 0; if (cancelled()) diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index 79ae2a5a8..c586b4646 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -168,7 +168,7 @@ private: boost::scoped_ptr fRunner; bool fUmOnly; - ResourceManager& fRm; + ResourceManager *fRm; // multi-threaded uint32_t fNumOfThreads; diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 9726b5fd4..ef8bb388b 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -91,12 +91,12 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) : should stay the same for other element sizes. 
*/ - pmMemLimit = resourceManager.getHjPmMaxMemorySmallSide(fSessionId); - uniqueLimit = resourceManager.getHjCPUniqueLimit(); + pmMemLimit = resourceManager->getHjPmMaxMemorySmallSide(fSessionId); + uniqueLimit = resourceManager->getHjCPUniqueLimit(); fExtendedInfo = "THJS: "; joinType = INIT; - joinThreadCount = resourceManager.getJlNumScanReceiveThreads(); + joinThreadCount = resourceManager->getJlNumScanReceiveThreads(); largeBPS = NULL; moreInput = true; fQtc.stepParms().stepType = StepTeleStats::T_HJS; @@ -128,7 +128,7 @@ TupleHashJoinStep::~TupleHashJoinStep() if (ownsOutputDL) delete outputDL; if (totalUMMemoryUsage != 0) - resourceManager.returnMemory(totalUMMemoryUsage, sessionMemLimit); + resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); //cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl; } @@ -245,7 +245,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index) joiner->setInUM(); } - resourceManager.getMemory(joiner->getMemUsage(), sessionMemLimit, false); + resourceManager->getMemory(joiner->getMemUsage(), sessionMemLimit, false); (void)atomicops::atomicAdd(&totalUMMemoryUsage, joiner->getMemUsage()); memUsedByEachJoin[index] += joiner->getMemUsage(); @@ -280,7 +280,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index) memUseAfter = joiner->getMemUsage() + rgDataSize; } - gotMem = resourceManager.getMemory(memUseAfter - memUseBefore, sessionMemLimit, false); + gotMem = resourceManager->getMemory(memUseAfter - memUseBefore, sessionMemLimit, false); atomicops::atomicAdd(&totalUMMemoryUsage, memUseAfter - memUseBefore); memUsedByEachJoin[index] += memUseAfter - memUseBefore; /* This is kind of kludgy and overlaps with segreateJoiners() atm. 
@@ -599,7 +599,7 @@ void TupleHashJoinStep::hjRunner() try { for (i = 0; !cancelled() && i < smallSideCount; i++) { vector empty; - resourceManager.returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit); + resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit); atomicops::atomicSub(&totalUMMemoryUsage, memUsedByEachJoin[djsJoinerMap[i]]); djs[i].loadExistingData(rgData[djsJoinerMap[i]]); rgData[djsJoinerMap[i]].swap(empty); @@ -686,7 +686,7 @@ void TupleHashJoinStep::hjRunner() joiners.clear(); tbpsJoiners.clear(); rgData.reset(); - resourceManager.returnMemory(totalUMMemoryUsage, sessionMemLimit); + resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); totalUMMemoryUsage = 0; } } @@ -836,7 +836,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream &bs) more = dl->next(it, &oneRG); joiners.clear(); rgData.reset(); - resourceManager.returnMemory(totalUMMemoryUsage, sessionMemLimit); + resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); totalUMMemoryUsage = 0; return 0; } @@ -852,7 +852,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream &bs) if (status() != 0) cout << " -- returning error status " << deliveredRG->getStatus() << endl; deliveredRG->serializeRGData(bs); - resourceManager.returnMemory(totalUMMemoryUsage, sessionMemLimit); + resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); totalUMMemoryUsage = 0; return 0; } diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index f340d58e4..d56ad8441 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -244,7 +244,7 @@ private: std::vector > largeSideKeys; std::vector > smallSideKeys; - ResourceManager& resourceManager; + ResourceManager* resourceManager; volatile uint64_t totalUMMemoryUsage; struct JoinerSorter { diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 33bf0aa17..370f09d6e 100644 --- a/dbcon/joblist/tupleunion.cpp 
+++ b/dbcon/joblist/tupleunion.cpp @@ -103,7 +103,7 @@ TupleUnion::TupleUnion(CalpontSystemCatalog::OID tableOID, const JobInfo& jobInf TupleUnion::~TupleUnion() { - rm.returnMemory(memUsage, sessionMemLimit); + rm->returnMemory(memUsage, sessionMemLimit); if (!runRan && output) output->endOfInput(); } @@ -227,7 +227,7 @@ void TupleUnion::readInput(uint32_t which) memDiff += (memUsageAfter - memUsageBefore); memUsage += memDiff; } - if (!rm.getMemory(memDiff, sessionMemLimit)) { + if (!rm->getMemory(memDiff, sessionMemLimit)) { fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_UNION_TOO_BIG); if (status() == 0) // preserve existing error code { @@ -789,7 +789,7 @@ void TupleUnion::join() runners.clear(); uniquer->clear(); rowMemory.clear(); - rm.returnMemory(memUsage, sessionMemLimit); + rm->returnMemory(memUsage, sessionMemLimit); memUsage = 0; } diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 60859e3ad..3f4440d11 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -142,7 +142,7 @@ private: uint32_t rowLength; rowgroup::Row row, row2; std::vector distinctFlags; - ResourceManager& rm; + ResourceManager* rm; utils::STLPoolAllocator allocator; boost::scoped_array normalizedData; diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index 4d56b6cc8..ec37573d8 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -156,7 +156,7 @@ WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : fRm(jobInfo.rm), fSessionMemLimit(jobInfo.umMemLimit) { - fTotalThreads = fRm.windowFunctionThreads(); + fTotalThreads = fRm->windowFunctionThreads(); fExtendedInfo = "WFS: "; fQtc.stepParms().stepType = StepTeleStats::T_WFS; } @@ -165,7 +165,7 @@ WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : WindowFunctionStep::~WindowFunctionStep() { if (fMemUsage > 0) - fRm.returnMemory(fMemUsage, fSessionMemLimit); + 
fRm->returnMemory(fMemUsage, fSessionMemLimit); } @@ -782,7 +782,7 @@ void WindowFunctionStep::execute() { fInRowGroupData.push_back(rgData); uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition); - if (fRm.getMemory(memAdd, fSessionMemLimit) == false) + if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); fMemUsage += memAdd; @@ -917,7 +917,7 @@ void WindowFunctionStep::doFunction() while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled()) { uint64_t memAdd = fRows.size() * sizeof(RowPosition); - if (fRm.getMemory(memAdd, fSessionMemLimit) == false) + if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); fMemUsage += memAdd; fFunctions[i]->setCallback(this, i); diff --git a/dbcon/joblist/windowfunctionstep.h b/dbcon/joblist/windowfunctionstep.h index a4c105362..027b6eae3 100644 --- a/dbcon/joblist/windowfunctionstep.h +++ b/dbcon/joblist/windowfunctionstep.h @@ -208,7 +208,7 @@ private: // for resource management uint64_t fMemUsage; - ResourceManager& fRm; + ResourceManager* fRm; boost::shared_ptr fSessionMemLimit; friend class windowfunction::WindowFunction; diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index 149a09b3a..99b228fd2 100755 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -96,8 +96,8 @@ namespace { typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_; -ResourceManager rm; -bool useHdfs = rm.useHdfs(); +ResourceManager *rm = ResourceManager::instance(); +bool useHdfs = rm->useHdfs(); #include "ha_autoi.cpp" diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index 6f115689e..e92a76e09 100755 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -77,9 +77,9 @@ using namespace joblist; namespace { -ResourceManager rm; -uint64_t fBatchInsertGroupRows = rm.getRowsPerBatch(); -bool useHdfs = rm.useHdfs(); 
+ResourceManager *rm = ResourceManager::instance(); +uint64_t fBatchInsertGroupRows = rm->getRowsPerBatch(); +bool useHdfs = rm->useHdfs(); //convenience fcn inline uint32_t tid2sid(const uint32_t tid) diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index f86a8a73c..ecfc3e83d 100755 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -169,8 +169,8 @@ const unsigned NONSUPPORTED_ERR_THRESH = 2000; //TODO: make this session-safe (put in connMap?) vector rmParms; -ResourceManager rm; -bool useHdfs = rm.useHdfs(); +ResourceManager *rm = ResourceManager::instance(); +bool useHdfs = rm->useHdfs(); //convenience fcn inline uint32_t tid2sid(const uint32_t tid) { @@ -1859,8 +1859,8 @@ const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, algorithm::to_lower(pstr); if (pstr == PmSmallSideMaxMemory) { - joblist::ResourceManager rm; - if (rm.getHjTotalUmMaxMemorySmallSide() >= value) + joblist::ResourceManager *rm = joblist::ResourceManager::instance(); + if (rm->getHjTotalUmMaxMemorySmallSide() >= value) { rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value)); @@ -1869,7 +1869,7 @@ const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, } else { - msg = invalidParmSizeMessage(rm.getHjTotalUmMaxMemorySmallSide(), mlen); + msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen); includeInput = false; } } diff --git a/dbcon/mysql/is_columnstore_columns.cpp b/dbcon/mysql/is_columnstore_columns.cpp index ea9e80aa3..21c9e748e 100644 --- a/dbcon/mysql/is_columnstore_columns.cpp +++ b/dbcon/mysql/is_columnstore_columns.cpp @@ -65,10 +65,12 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) const std::vector< std::pair > catalog_tables = systemCatalogPtr->getTables(); + systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { - 
execplan::CalpontSystemCatalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs((*it).second); + execplan::CalpontSystemCatalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true); for (size_t col_num = 0; col_num < column_rid_list.size(); col_num++) { execplan::CalpontSystemCatalog::TableColName tcn = systemCatalogPtr->colName(column_rid_list[col_num].objnum); diff --git a/dbcon/mysql/is_columnstore_tables.cpp b/dbcon/mysql/is_columnstore_tables.cpp index 5b3bc7fc6..47ce4970c 100644 --- a/dbcon/mysql/is_columnstore_tables.cpp +++ b/dbcon/mysql/is_columnstore_tables.cpp @@ -50,6 +50,8 @@ static int is_columnstore_tables_fill(THD *thd, TABLE_LIST *tables, COND *cond) boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); + systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + const std::vector< std::pair > catalog_tables = systemCatalogPtr->getTables(); diff --git a/ddlproc/ddlproc.cpp b/ddlproc/ddlproc.cpp index 45cf09327..c14233cc6 100644 --- a/ddlproc/ddlproc.cpp +++ b/ddlproc/ddlproc.cpp @@ -106,7 +106,7 @@ int main(int argc, char* argv[]) idbdatafile::IDBPolicy::configIDBPolicy(); #endif - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); Dec = DistributedEngineComm::instance(rm); #ifndef _MSC_VER /* set up some signal handlers */ diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index fe542e3a4..ced77ec82 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -567,7 +567,7 @@ int main(int argc, char* argv[]) catch (...) 
{ } - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); Dec = DistributedEngineComm::instance(rm); #ifndef _MSC_VER diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 3444a84b2..b0f36305b 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -474,7 +474,7 @@ int PackageHandler::clearTableAccess() void PackageHandler::run() { - ResourceManager frm; + ResourceManager *frm = ResourceManager::instance(); dmlpackageprocessor::DMLPackageProcessor::DMLResult result; result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR; //cout << "PackageHandler handling "; @@ -890,7 +890,7 @@ void PackageHandler::run() //session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID())); fProcessor->setEngineComm(fEC); - fProcessor->setRM( &frm); + fProcessor->setRM( frm); idbassert( fTxnid != 0); result = fProcessor->processPackage(*(updatePkg.get())) ; qts.msg_type = QueryTeleStats::QT_SUMMARY; @@ -945,7 +945,7 @@ void PackageHandler::run() //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache. 
fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID())); fProcessor->setEngineComm(fEC); - fProcessor->setRM( &frm); + fProcessor->setRM( frm); idbassert( fTxnid != 0); result = fProcessor->processPackage(*(deletePkg.get())) ; qts.msg_type = QueryTeleStats::QT_SUMMARY; @@ -1116,7 +1116,7 @@ void PackageHandler::rollbackPending() void added_a_pm(int) { DistributedEngineComm *dec; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); dec = DistributedEngineComm::instance(rm); dec->Setup(); // MCOL-140 clear the waiting queue as all transactions are probably going to fail @@ -1185,10 +1185,10 @@ void DMLProcessor::operator()() //messageqcpp::ByteStream bs; uint8_t packageType; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* fEC = DistributedEngineComm::instance(rm); - uint64_t maxDeleteRows = rm.getDMLMaxDeleteRows(); + uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows(); fConcurrentSupport = true; string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions"); diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 002ba6aab..cb84213fd 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -136,7 +136,7 @@ ActiveStatementCounter *statementsRunningCount; DistributedEngineComm *ec; -ResourceManager rm(true); +ResourceManager *rm = ResourceManager::instance(true); int toInt(const string& val) { @@ -240,7 +240,7 @@ class SessionThread { public: - SessionThread(const IOSocket& ios, DistributedEngineComm* ec, ResourceManager& rm) : + SessionThread(const IOSocket& ios, DistributedEngineComm* ec, ResourceManager* rm) : fIos(ios), fEc(ec), fRm(rm), fStatsRetrieved(false), @@ -253,7 +253,7 @@ private: IOSocket fIos; DistributedEngineComm *fEc; - ResourceManager& fRm; + ResourceManager* fRm; querystats::QueryStats fStats; // Variables used to store return stats @@ -464,12 +464,12 @@ private: { case PMSMALLSIDEMEMORY: { - 
fRm.addHJPmMaxSmallSideMap(it->sessionId, it->value); + fRm->addHJPmMaxSmallSideMap(it->sessionId, it->value); break; } case UMSMALLSIDEMEMORY: { - fRm.addHJUmMaxSmallSideMap(it->sessionId, it->value); + fRm->addHJUmMaxSmallSideMap(it->sessionId, it->value); break; } default: ; @@ -1206,7 +1206,7 @@ void added_a_pm(int) void printTotalUmMemory(int sig) { - int64_t num = rm.availableMemory(); + int64_t num = rm->availableMemory(); cout << "Total UM memory available: " << num << endl; } @@ -1232,9 +1232,9 @@ void setupSignalHandlers() #endif } -void setupCwd(ResourceManager& rm) +void setupCwd(ResourceManager* rm) { - string workdir = rm.getScWorkingDir(); + string workdir = rm->getScWorkingDir(); (void)chdir(workdir.c_str()); if (access(".", W_OK) != 0) (void)chdir("/tmp"); @@ -1366,11 +1366,11 @@ int main(int argc, char* argv[]) MessageQueueServer* mqs; - statementsRunningCount = new ActiveStatementCounter(rm.getEmExecQueueSize()); + statementsRunningCount = new ActiveStatementCounter(rm->getEmExecQueueSize()); for (;;) { try { - mqs = new MessageQueueServer(ExeMgr, rm.getConfig(), ByteStream::BlockSize, 64); + mqs = new MessageQueueServer(ExeMgr, rm->getConfig(), ByteStream::BlockSize, 64); break; } catch (runtime_error& re) { @@ -1391,11 +1391,11 @@ int main(int argc, char* argv[]) } } - int serverThreads = rm.getEmServerThreads(); - int serverQueueSize = rm.getEmServerQueueSize(); - int maxPct = rm.getEmMaxPct(); - int pauseSeconds = rm.getEmSecondsBetweenMemChecks(); - int priority = rm.getEmPriority(); + int serverThreads = rm->getEmServerThreads(); + int serverQueueSize = rm->getEmServerQueueSize(); + int maxPct = rm->getEmMaxPct(); + int pauseSeconds = rm->getEmSecondsBetweenMemChecks(); + int priority = rm->getEmPriority(); if (maxPct > 0) startRssMon(maxPct, pauseSeconds); @@ -1404,10 +1404,10 @@ int main(int argc, char* argv[]) setpriority(PRIO_PROCESS, 0, priority); #endif - string teleServerHost(rm.getConfig()->getConfig("QueryTele", "Host")); + string 
teleServerHost(rm->getConfig()->getConfig("QueryTele", "Host")); if (!teleServerHost.empty()) { - int teleServerPort = toInt(rm.getConfig()->getConfig("QueryTele", "Port")); + int teleServerPort = toInt(rm->getConfig()->getConfig("QueryTele", "Port")); if (teleServerPort > 0) { gTeleServerParms.host = teleServerHost; @@ -1416,8 +1416,8 @@ int main(int argc, char* argv[]) } cout << "Starting ExeMgr: st = " << serverThreads << ", sq = " << - serverQueueSize << ", qs = " << rm.getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " << - rm.getConfig()->configFile() << endl; + serverQueueSize << ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " << + rm->getConfig()->configFile() << endl; //set ACTIVE state { diff --git a/tools/dbbuilder/systemcatalog.cpp b/tools/dbbuilder/systemcatalog.cpp index d4640e486..5bcfbb06a 100644 --- a/tools/dbbuilder/systemcatalog.cpp +++ b/tools/dbbuilder/systemcatalog.cpp @@ -68,9 +68,9 @@ void SystemCatalog::build() uint32_t partition = 0; uint16_t segment=0; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); std::map oids; - if( rm.useHdfs() ) + if( rm->useHdfs() ) { compressionType = 2; oids[OID_SYSTABLE_TABLENAME] = OID_SYSTABLE_TABLENAME; @@ -191,7 +191,7 @@ void SystemCatalog::build() // dbRoot = 1; //SYSCOLUMN - if( rm.useHdfs() ) + if( rm->useHdfs() ) { oids[OID_SYSCOLUMN_SCHEMA] = OID_SYSCOLUMN_SCHEMA; oids[DICTOID_SYSCOLUMN_SCHEMA] = DICTOID_SYSCOLUMN_SCHEMA; diff --git a/tools/pingproc/pingproc.cpp b/tools/pingproc/pingproc.cpp index fc90c3100..f02a86186 100644 --- a/tools/pingproc/pingproc.cpp +++ b/tools/pingproc/pingproc.cpp @@ -681,7 +681,7 @@ void doColScan(OidOperation& OidOp) { if (debug) cout << "beginning doColScan\n"; BRM::LBIDRange_v lbidRanges; HWM_t hwm=0; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); struct timespec ts1; struct timespec ts2; @@ -862,7 +862,7 @@ void 
doColStep(OidOperation& OidOp) { OID_t tmp; uint32_t fbo; uint32_t totalBlks=0; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; @@ -976,7 +976,7 @@ void doBatchOp_scan(OidOperation &OidOp) { struct timespec ts1, ts2, diff; JobStepAssociation injs, outjs; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; boost::shared_ptr sysCat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); @@ -1086,7 +1086,7 @@ void doBatchOp_filt(OidOperation &OidOp) { struct timespec ts1, ts2, diff; JobStepAssociation injs, outjs; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; boost::shared_ptr sysCat = CalpontSystemCatalog::makeCalpontSystemCatalog(getpid()); @@ -1227,7 +1227,7 @@ void doBatchQueryOp(OperationList& OidOps) f1.fSessionid = sessionId; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); // dec->addSession(sessionId); // dec->addStep(sessionId, sessionId); @@ -1394,7 +1394,7 @@ void doBatchOp_step(OidOperation &OidOp) OID_t tmp; uint32_t fbo; uint32_t totalBlks=0; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; JobStepAssociation injs, outjs; @@ -1581,7 +1581,7 @@ void doLoopBack(const uint64_t loopcount) struct timespec diff; uint32_t sessionid = getpid(); DBRM dbrm; - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); DistributedEngineComm* dec = DistributedEngineComm::instance(rm); ThdFcn f1; diff --git a/tools/sendPlan/sendplan.cpp b/tools/sendPlan/sendplan.cpp index 2bada6e23..27d1c9bd9 100644 --- a/tools/sendPlan/sendplan.cpp +++ 
b/tools/sendPlan/sendplan.cpp @@ -173,7 +173,7 @@ int main(int argc, char** argv) csep.verID(sm.verID()); csep.traceFlags(0); - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); jl = JobListFactory::makeJobList(&csep, rm); csep.traceFlags(tlvl); diff --git a/utils/querystats/querystats.cpp b/utils/querystats/querystats.cpp index ef593b3fc..2494646d5 100644 --- a/utils/querystats/querystats.cpp +++ b/utils/querystats/querystats.cpp @@ -192,16 +192,16 @@ void QueryStats::unserialize(ByteStream& b) */ void QueryStats::insert() { - ResourceManager rm; + ResourceManager *rm = ResourceManager::instance(); // check if query stats is enabled in Columnstore.xml - if (!rm.queryStatsEnabled()) + if (!rm->queryStatsEnabled()) return; // get configure for every query to allow only changing of connect info string host, user, pwd; uint32_t port; - if (rm.getMysqldInfo(host, user, pwd, port) == false) + if (rm->getMysqldInfo(host, user, pwd, port) == false) throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG); diff --git a/versioning/BRM/extentmap.cpp b/versioning/BRM/extentmap.cpp index 3663e940c..b464cbeef 100644 --- a/versioning/BRM/extentmap.cpp +++ b/versioning/BRM/extentmap.cpp @@ -4205,6 +4205,8 @@ void ExtentMap::getExtents(int OID, vector& entries, grabEMEntryTable(READ); emEntries = fEMShminfo->allocdSize/sizeof(struct EMEntry); + // Pre-expand entries to stop lots of small allocs + entries.reserve(emEntries); if (incOutOfService) { for (i = 0 ; i < emEntries; i++) if ((fExtentMap[i].fileID == OID) && diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index a384d25ba..d456c909b 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -179,7 +179,7 @@ namespace WriteEngine { void WEClients::Setup() { makeBusy(true); - joblist::ResourceManager rm; + joblist::ResourceManager *rm = joblist::ResourceManager::instance(); oam::Oam oam; string 
ipAddress; ModuleTypeConfig moduletypeconfig; @@ -224,7 +224,7 @@ void WEClients::Setup() string fServer (buff); boost::shared_ptr - cl(new MessageQueueClient(fServer, rm.getConfig())); + cl(new MessageQueueClient(fServer, rm->getConfig())); boost::shared_ptr nl(new boost::mutex()); //Bug 5224. Take out the retrys. If connection fails, we assume the server is down. try { diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp index fa3e0a876..4e8e49322 100644 --- a/writeengine/splitter/we_sdhandler.cpp +++ b/writeengine/splitter/we_sdhandler.cpp @@ -34,6 +34,7 @@ using namespace std; #include "we_messages.h" +#include "resourcemanager.h" #include @@ -141,7 +142,6 @@ WESDHandler::WESDHandler(WESplitterApp& Ref) : fRef(Ref), fLog(), fQId(101), // 101 - took it from air - fRm(), fOam(), fModuleTypeConfig(), fDebugLvl(0), @@ -167,6 +167,7 @@ WESDHandler::WESDHandler(WESplitterApp& Ref) : fBrmRptVec(), fpBatchLoader(0) { + fRm = joblist::ResourceManager::instance(); } //------------------------------------------------------------------------------ diff --git a/writeengine/splitter/we_sdhandler.h b/writeengine/splitter/we_sdhandler.h index a5e22c1de..2d4b20cc2 100644 --- a/writeengine/splitter/we_sdhandler.h +++ b/writeengine/splitter/we_sdhandler.h @@ -193,7 +193,7 @@ public: // for multi-table support private: unsigned int fQId; - joblist::ResourceManager fRm; + joblist::ResourceManager *fRm; oam::Oam fOam; oam::ModuleTypeConfig fModuleTypeConfig; int fDebugLvl;