NDB: raise scan parallelism limits; wrap LQH scan ACC-pointer bookkeeping

What this patch changes (all statements below are read off the hunks):
  * ndb_limits.h      MAX_PARALLEL_OP_PER_SCAN 64 -> 512,
                      MAX_SCAN_BATCH_SIZE 196608 -> 262144.
  * ScanFrag.hpp      KeyInfo20::getScanOp() mask 0x1023 -> 0x3FF
                      (a contiguous 10-bit mask).
  * DbdihMain.cpp     AllNodesSmall/Medium/LargeTable all get
                      csystemnodes fragments.
  * Dblqh.hpp/.cpp    ScanRecord::scanAccOpPtr[] becomes scan_acc_op_ptr[]
                      plus a fill counter scan_acc_index; every access goes
                      through seize/release/init/get/set helpers that
                      ndbrequire() the index.  initScanAccOp() and a dead
                      "#if 0" tail of initScanrec() are removed.
  * Dbtc.hpp          scanInfo, scanNode and scanTakeOverInd become plain
                      Uint32 members; packedCacheVar shrinks from 6 to 4.
  * testScanPerf.cpp  prints the time of every iteration as well as the
                      average.

Review corrections applied to the patch as originally posted:
  1. continueScanAfterBlockedLab(), ZSCAN_NEXT_COMMIT branch: the index
     passed to get_acc_ptr_from_scan_record() is again
     scanCompletedOperations - 1, as in the code being replaced.  Without
     the "- 1" the getter's "index < scan_acc_index" check cannot hold
     for the slot that was just completed.
  2. Same function, ZSCAN_NEXT_ABORT branch: scan_acc_index is stepped
     back after the aborted slot has been read, so that the next
     set_acc_ptr_in_scan_record() at the same scanCompletedOperations
     index satisfies the setter's "scan_acc_index == index" check.
     NOTE(review): this relies on scanCompletedOperations not being
     advanced for an aborted row, which is what the old indexing
     ([n] for abort, [n - 1] for commit) implies - confirm against the
     TUPKEYREF path, which is outside this patch.
     The new-side length of that hunk and the new-side start lines of the
     following hunks in DblqhMain.cpp are adjusted by one accordingly.
  3. testScanPerf.cpp: the rows/sec divisions are guarded against a
     measured time of 0 ms.
  4. Two context lines whose "<...>" parts had been lost in transit
     (Ptr<CacheRecord>, "<<TscanPtr.p->...") are restored.
     NOTE(review): leading whitespace of all lines was re-flowed; apply
     with whitespace tolerance or regenerate against the tree.

diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h
index 8ee47158377..62a10c600f6 100644
--- a/ndb/include/kernel/ndb_limits.h
+++ b/ndb/include/kernel/ndb_limits.h
@@ -68,7 +68,7 @@
  * API can order a multiple of this number of records at a time since
  * fragments can be scanned in parallel.
  */
-#define MAX_PARALLEL_OP_PER_SCAN 64
+#define MAX_PARALLEL_OP_PER_SCAN 512
 /*
  * When calculating the number of records sent from LQH in each batch
  * one uses SCAN_BATCH_SIZE divided by the expected size of signals
@@ -83,7 +83,7 @@
  * batch size from all nodes. This parameter should most likely be
  * configurable, or dependent on sendBufferSize.
  */
-#define MAX_SCAN_BATCH_SIZE 196608
+#define MAX_SCAN_BATCH_SIZE 262144
 /*
  * Maximum number of Parallel Scan queries on one hash index fragment
  */
diff --git a/ndb/include/kernel/signaldata/ScanFrag.hpp b/ndb/include/kernel/signaldata/ScanFrag.hpp
index ec6517b64b8..ad5c5c1850b 100644
--- a/ndb/include/kernel/signaldata/ScanFrag.hpp
+++ b/ndb/include/kernel/signaldata/ScanFrag.hpp
@@ -321,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){
 inline
 Uint32
 KeyInfo20::getScanOp(Uint32 scanInfo){
-  return (scanInfo >> 8) & 0x1023;
+  return (scanInfo >> 8) & 0x3FF;
 }
 
 #endif
diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
index a34f89b2119..b5fbfb808af 100644
--- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
+++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
@@ -6115,15 +6115,15 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
     switch(fragmentType){
     case DictTabInfo::AllNodesSmallTable:
       jam();
-      noOfFragments = cnoOfNodeGroups;
+      noOfFragments = csystemnodes;
       break;
     case DictTabInfo::AllNodesMediumTable:
       jam();
-      noOfFragments = 2 * cnoOfNodeGroups;
+      noOfFragments = csystemnodes;
       break;
     case DictTabInfo::AllNodesLargeTable:
       jam();
-      noOfFragments = 8 * cnoOfNodeGroups;
+      noOfFragments = csystemnodes;
       break;
     case DictTabInfo::SingleFragment:
       jam();
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 8f8da4094fe..d9eb3c465bd 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -532,7 +532,8 @@ public:
       SCAN = 1,
       COPY = 2
     };
-    UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN];
+    UintR scan_acc_op_ptr[MAX_PARALLEL_OP_PER_SCAN];
+    Uint32 scan_acc_index;
     UintR scanApiOpPtr;
     UintR scanLocalref[2];
     Uint32 scan_batch_len;
@@ -2219,6 +2220,13 @@ private:
   void execTUX_ADD_ATTRREF(Signal* signal);
 
   // Statement blocks
+
+  void init_acc_ptr_list(ScanRecord*);
+  bool seize_acc_ptr_list(ScanRecord*, Uint32);
+  void release_acc_ptr_list(ScanRecord*);
+  Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32);
+  void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
+
   void removeTable(Uint32 tableId);
   void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
   void sendEMPTY_LCP_CONF(Signal* signal, bool idle);
@@ -2248,7 +2256,6 @@ private:
   void sendAttrinfoSignal(Signal* signal);
   void sendLqhAttrinfoSignal(Signal* signal);
   void sendKeyinfoAcc(Signal* signal);
-  void initScanAccOp(Signal* signal);
   Uint32 initScanrec(const class ScanFragReq *);
   void initScanTc(Signal* signal,
                   Uint32 transid1,
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 06dc39508c4..d36d9e703ab 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -2098,7 +2098,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
         ScanRecordPtr TscanPtr;
         c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec);
         ndbout << " scanState = " << TscanPtr.p->scanState << endl;
-        //TscanPtr.p->scanAccOpPtr[16];
         //TscanPtr.p->scanLocalref[2];
         ndbout << " copyPtr="<<TscanPtr.p->copyPtr
                << " scanAccPtr="<<TscanPtr.p->scanAccPtr
@@ -3584,7 +3583,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
       takeOverErrorLab(signal);
       return;
     }//if
-    Uint32 accOpPtr = scanptr.p->scanAccOpPtr[ttcScanOp];
+    Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp);
     if (accOpPtr == RNIL) {
       jam();
       releaseActiveFrag(signal);
@@ -6984,7 +6983,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
 
   // Update timer on tcConnectRecord
   tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-  initScanAccOp(signal);
+  init_acc_ptr_list(scanptr.p);
   scanptr.p->scanCompletedOperations = 0;
   scanptr.p->scan_batch_len= 0;
   scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
@@ -7029,9 +7028,8 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
   c_scanRecordPool.getPtr(scanptr);
   scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
   signal->theData[0] = scanptr.p->scanAccPtr;
-  ndbrequire((scanptr.p->scanReleaseCounter -1) < MAX_PARALLEL_OP_PER_SCAN);
-  signal->theData[1] =
-    scanptr.p->scanAccOpPtr[scanptr.p->scanReleaseCounter -1];
+  signal->theData[1]=
+    get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1);
   signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
   if (! scanptr.p->rangeScan)
     sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
@@ -7220,6 +7218,43 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
   }//if
 }//Dblqh::scanLockReleasedLab()
 
+bool
+Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
+{
+  scanP->scan_acc_index = 0;
+  return true;
+}
+
+void
+Dblqh::release_acc_ptr_list(ScanRecord* scanP)
+{
+  scanP->scan_acc_index = 0;
+}
+
+void
+Dblqh::init_acc_ptr_list(ScanRecord* scanP)
+{
+  scanP->scan_acc_index = 0;
+}
+
+Uint32
+Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index)
+{
+  ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) &&
+             index < scanP->scan_acc_index);
+  return scanP->scan_acc_op_ptr[index];
+}
+
+void
+Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
+                                  Uint32 index, Uint32 acc)
+{
+  ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
+             (index < MAX_PARALLEL_OP_PER_SCAN));
+  scanP->scan_acc_index= index + 1;
+  scanP->scan_acc_op_ptr[index]= acc;
+}
+
 /* -------------------------------------------------------------------------
  * SCAN_FRAGREQ: Request to start scanning the specified fragment of a table.
  * ------------------------------------------------------------------------- */
@@ -7615,7 +7650,7 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
-  initScanAccOp(signal);
+  init_acc_ptr_list(scanptr.p);
   signal->theData[0] = scanptr.p->scanAccPtr;
   signal->theData[1] = RNIL;
   signal->theData[2] = NextScanReq::ZSCAN_NEXT;
@@ -7784,9 +7819,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
     return;
   }//if
 
-  ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
-  scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations] =
-    nextScanConf->accOperationPtr;
+  set_acc_ptr_in_scan_record(scanptr.p,
+                             scanptr.p->scanCompletedOperations,
+                             nextScanConf->accOperationPtr);
   scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
   scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
   scanptr.p->scanLocalFragid = nextScanConf->fragId;
@@ -7870,7 +7905,6 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
   tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
   tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
   tupKeyReq->attrBufLen = 0;
-  ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
   tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
   tupKeyReq->applRef = scanptr.p->scanApiBlockref;
   tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
@@ -7886,7 +7920,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
     EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
                    TupKeyReq::SignalLength);
   }
-}//Dblqh::nextScanConfLoopLab()
+}
 
 /* -------------------------------------------------------------------------
  * RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
@@ -8024,10 +8058,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
   if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_ABORT) {
     jam();
     scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
-    accOpPtr = scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations];
+    accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
+                                           scanptr.p->scanCompletedOperations);
+    scanptr.p->scan_acc_index--;
   } else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
     jam();
-    accOpPtr = scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations - 1];
+    accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
+                                           scanptr.p->scanCompletedOperations - 1);
   } else {
     jam();
     accOpPtr = RNIL; // The value is not used in ACC
@@ -8219,6 +8256,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
     scanptr.p->scan_batch_len= 0;
     sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
   }//if
+  release_acc_ptr_list(scanptr.p);
   finishScanrec(signal);
   releaseScanrec(signal);
   tcConnectptr.p->tcScanRec = RNIL;
@@ -8227,20 +8265,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
   releaseTcrec(signal, tcConnectptr);
 }//Dblqh::tupScanCloseConfLab()
 
-/* =========================================================================
- * =======    INITIATE SCAN_ACC_OP_PTR TO RNIL IN SCAN RECORD        =======
- *
- *       SUBROUTINE SHORT NAME = ISA
- * ========================================================================= */
-void Dblqh::initScanAccOp(Signal* signal)
-{
-  UintR tisaIndex;
-
-  for (tisaIndex = 0; tisaIndex < MAX_PARALLEL_OP_PER_SCAN; tisaIndex++) {
-    scanptr.p->scanAccOpPtr[tisaIndex] = RNIL;
-  }//for
-}//Dblqh::initScanAccOp()
-
 /* =========================================================================
  * =======                 INITIATE SCAN RECORD                      =======
  *
@@ -8283,10 +8307,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
   scanptr.p->scanTcWaiting = ZTRUE;
   scanptr.p->scanNumber = ~0;
   scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
-  for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
-    scanptr.p->scanAccOpPtr[i] = 0;
-  }//for
 
+  if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
+    jam();
+    return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
+  }
   /**
    * Used for scan take over
    */
@@ -8341,38 +8366,9 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
 #endif
     c_scanTakeOverHash.add(scanptr);
   }
+  init_acc_ptr_list(scanptr.p);
   return ZOK;
-
-#if 0
-  if (! rangeScan) {
-    jam();
-    for (Int32 i = NR_ScanNo - 1; i >= 0; i--) {
-      jam();
-      if (fragptr.p->fragScanRec[i] == ZNIL) {
-        jam();
-        scanptr.p->scanNumber = i;
-        fragptr.p->fragScanRec[i] = scanptr.i;
-        return ZOK;
-      }//if
-    }//for
-  } else {
-    jam();
-    // put in second half of fragScanRec of primary table fragment
-    FragrecordPtr tFragPtr;
-    tFragPtr.i = fragptr.p->tableFragptr;
-    ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
-    for (Uint32 i = NR_MinRangeScanNo; i < NR_MaxRangeScanNo; i++) {
-      if (tFragPtr.p->fragScanRec[i] == ZNIL) {
-        jam();
-        scanptr.p->scanNumber = i;
-        tFragPtr.p->fragScanRec[i] = scanptr.i;
-        return ZOK;
-      }
-    }
-  }
-  return ZNO_FREE_FRAG_SCAN_REC_ERROR;
-#endif
-}//Dblqh::initScanrec()
+}
 
 /* =========================================================================
  * =======                 INITIATE TC RECORD AT SCAN                =======
@@ -8943,7 +8939,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
     return;
   }
 
-  scanptr.p->scanAccOpPtr[0] = nextScanConf->accOperationPtr;
+  set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
   initCopyTc(signal);
   if (tcConnectptr.p->primKeyLen > 4) {
     jam();
@@ -9217,8 +9213,9 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   tcConnectptr.p->errorCode = 0;
+  Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0);
   signal->theData[0] = scanptr.p->scanAccPtr;
-  signal->theData[1] = scanptr.p->scanAccOpPtr[0];
+  signal->theData[1] = acc_op_ptr;
   signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
   sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
   return;
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index b0de535b48b..1ffec0dce51 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -927,17 +927,22 @@ public:
     UintR distributionGroup;
     UintR nextCacheRec;
     UintR distributionKeySize;
-    Uint16 scanNode;
-    unsigned scanTakeOverInd : 1;
-    unsigned scanInfo : 15; // 12 bits used currently
+    Uint32 scanInfo;
 
     //---------------------------------------------------
-    // Third and fourth 16 byte cache line in second 64
-    // byte cache line. Not used currently.
+    // Third 16 byte cache line in second 64
+    // byte cache line. Diverse use.
     //---------------------------------------------------
+    Uint32 scanNode;
+    Uint32 scanTakeOverInd;
     UintR firstKeybuf; /* POINTER THE LINKED LIST OF KEY BUFFERS */
     UintR lastKeybuf; /* VARIABLE POINTING TO THE LAST KEY BUFFER */
-    UintR packedCacheVar[6];
+
+    //---------------------------------------------------
+    // Fourth 16 byte cache line in second 64
+    // byte cache line. Not used currently.
+    //---------------------------------------------------
+    UintR packedCacheVar[4];
   };
 
   typedef Ptr<CacheRecord> CacheRecordPtr;
diff --git a/ndb/test/ndbapi/testScanPerf.cpp b/ndb/test/ndbapi/testScanPerf.cpp
index 61af1ffb989..bfd5949da47 100644
--- a/ndb/test/ndbapi/testScanPerf.cpp
+++ b/ndb/test/ndbapi/testScanPerf.cpp
@@ -196,7 +196,6 @@ int
 clear_table(){
   if(!g_paramters[P_LOAD].value)
     return 0;
-
   int rows = g_paramters[P_ROWS].value;
 
   UtilTransactions utilTrans(* g_table);
@@ -215,8 +214,8 @@ void err(NdbError e){
 int
 run_scan(){
   int iter = g_paramters[P_LOOPS].value;
-  Uint64 start1;
-  Uint64 sum1 = 0;
+  NDB_TICKS start1, stop;
+  int sum_time= 0;
 
   Uint32 tot = g_paramters[P_ROWS].value;
 
@@ -357,12 +356,15 @@ run_scan(){
 
     pTrans->close();
 
-    Uint64 stop = NdbTick_CurrentMillisecond();
-    start1 = (stop - start1);
-    sum1 += start1;
+    stop = NdbTick_CurrentMillisecond();
+    int time_passed= (int)(stop - start1);
+    g_err.println("Time: %d ms = %u rows/sec", time_passed,
+                  (1000*tot)/(time_passed > 0 ? time_passed : 1));
+    sum_time+= time_passed;
   }
-  sum1 /= iter;
+  sum_time= sum_time / iter;
 
-  g_err.println("Avg time: %Ldms = %d rows/sec", sum1, (1000*tot)/sum1);
+  g_err.println("Avg time: %d ms = %u rows/sec", sum_time,
+                (1000*tot)/(sum_time > 0 ? sum_time : 1));
   return 0;
 }