diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h index f35cc617e86..8ee47158377 100644 --- a/ndb/include/kernel/ndb_limits.h +++ b/ndb/include/kernel/ndb_limits.h @@ -68,7 +68,22 @@ * API can order a multiple of this number of records at a time since * fragments can be scanned in parallel. */ -#define MAX_PARALLEL_OP_PER_SCAN 16 +#define MAX_PARALLEL_OP_PER_SCAN 64 +/* +* When calculating the number of records sent from LQH in each batch +* one uses SCAN_BATCH_SIZE divided by the expected size of signals +* per row. This gives the batch size used for the scan. The NDB API +* will receive one batch from each node at a time so there has to be +* some care taken also so that the NDB API is not overloaded with +* signals. +*/ +#define SCAN_BATCH_SIZE 32768 +/* +* To protect the NDB API from overload we also define a maximum total +* batch size from all nodes. This parameter should most likely be +* configurable, or dependent on sendBufferSize. +*/ +#define MAX_SCAN_BATCH_SIZE 196608 /* * Maximum number of Parallel Scan queries on one hash index fragment */ diff --git a/ndb/include/kernel/signaldata/ScanFrag.hpp b/ndb/include/kernel/signaldata/ScanFrag.hpp index 2b37e544e1f..ec6517b64b8 100644 --- a/ndb/include/kernel/signaldata/ScanFrag.hpp +++ b/ndb/include/kernel/signaldata/ScanFrag.hpp @@ -33,7 +33,7 @@ class ScanFragReq { */ friend class Dblqh; public: - STATIC_CONST( SignalLength = 25 ); + STATIC_CONST( SignalLength = 13 ); public: Uint32 senderData; @@ -45,9 +45,11 @@ public: Uint32 schemaVersion; Uint32 transId1; Uint32 transId2; - Uint32 clientOpPtr[MAX_PARALLEL_OP_PER_SCAN]; + Uint32 clientOpPtr; + Uint32 concurrency; + Uint32 batch_byte_size; + Uint32 first_batch_size; - static Uint32 getConcurrency(const Uint32 & requestInfo); static Uint32 getLockMode(const Uint32 & requestInfo); static Uint32 getHoldLockFlag(const Uint32 & requestInfo); static Uint32 getKeyinfoFlag(const Uint32 & requestInfo); @@ -56,7 +58,6 @@ public: static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo); - static void setConcurrency(Uint32 & requestInfo, Uint32 concurrency); static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); @@ -79,7 +80,6 @@ class KeyInfo20 { friend class NdbOperation; friend class NdbScanReceiver; public: - //STATIC_CONST( SignalLength = 21 ); STATIC_CONST( HeaderLength = 5); STATIC_CONST( DataLength = 20 ); @@ -110,15 +110,15 @@ class ScanFragConf { friend class Backup; friend class Suma; public: - STATIC_CONST( SignalLength = 21 ); + STATIC_CONST( SignalLength = 6 ); public: Uint32 senderData; Uint32 completedOps; Uint32 fragmentCompleted; - Uint32 opReturnDataLen[16]; Uint32 transId1; Uint32 transId2; + Uint32 total_len; }; class ScanFragRef { @@ -188,7 +188,6 @@ public: * Request Info * * a = Length of attrinfo - 16 Bits (16-31) - * c = Concurrency - 5 Bits (0-4) -> Max 31 * l = Lock Mode - 1 Bit 5 * h = Hold lock - 1 Bit 7 * k = Keyinfo - 1 Bit 8 @@ -198,11 +197,8 @@ public: * * 1111111111222222222233 * 01234567890123456789012345678901 - * ccccclxhkr ppppaaaaaaaaaaaaaaaa + * lxhkr ppppaaaaaaaaaaaaaaaa */ -#define SF_CONCURRENCY_SHIFT (0) -#define SF_CONCURRENCY_MASK (31) - #define SF_LOCK_MODE_SHIFT (5) #define SF_LOCK_MODE_MASK (1) @@ -217,12 +213,6 @@ public: #define SF_PRIO_SHIFT 12 #define SF_PRIO_MASK 15 -inline -Uint32 -ScanFragReq::getConcurrency(const Uint32 & requestInfo){ - return (requestInfo >> SF_CONCURRENCY_SHIFT) & SF_CONCURRENCY_MASK; -} - inline Uint32 ScanFragReq::getLockMode(const Uint32 & requestInfo){ @@ -272,13 +262,6 @@ ScanFragReq::setScanPrio(UintR & requestInfo, UintR val){ requestInfo |= (val << SF_PRIO_SHIFT); } -inline -void -ScanFragReq::setConcurrency(UintR & requestInfo, UintR val){ - ASSERT_MAX(val, SF_CONCURRENCY_MASK, "ScanFragReq::setConcurrency"); - requestInfo |= (val << SF_CONCURRENCY_SHIFT); -} - inline void ScanFragReq::setLockMode(UintR & requestInfo, UintR val){ @@ -324,7 +307,7 @@ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){ inline Uint32 KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){ - ASSERT_MAX(opNo, 15, "KeyInfo20::setScanInfo"); + ASSERT_MAX(opNo, 1023, "KeyInfo20::setScanInfo"); ASSERT_MAX(scanNo, 255, "KeyInfo20::setScanInfo"); return (opNo << 8) + scanNo; } @@ -338,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){ inline Uint32 KeyInfo20::getScanOp(Uint32 scanInfo){ - return (scanInfo >> 8) & 0xF; + return (scanInfo >> 8) & 0x1023; } #endif diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp index ab2978e48da..6fca313c9bd 100644 --- a/ndb/include/kernel/signaldata/ScanTab.hpp +++ b/ndb/include/kernel/signaldata/ScanTab.hpp @@ -45,7 +45,7 @@ public: /** * Length of signal */ - STATIC_CONST( SignalLength = 25 ); + STATIC_CONST( StaticLength = 11 ); private: @@ -63,7 +63,8 @@ private: UintR transId1; // DATA 6 UintR transId2; // DATA 7 UintR buddyConPtr; // DATA 8 - UintR apiOperationPtr[16]; // DATA 9-25 + UintR batch_byte_size; // DATA 9 + UintR first_batch_size; // DATA 10 /** * Get:ers for requestInfo @@ -95,11 +96,11 @@ private: h = Hold lock mode - 1 Bit 10 c = Read Committed - 1 Bit 11 x = Range Scan (TUX) - 1 Bit 15 - b = Scan batch - 5 Bit 16-19 (max 15) + b = Scan batch - 10 Bit 16-25 (max 1023) 1111111111222222222233 01234567890123456789012345678901 - ppppppppl hc xbbbbb + ppppppppl hc xbbbbbbbbbb */ #define PARALLELL_SHIFT (0) @@ -118,7 +119,7 @@ private: #define RANGE_SCAN_MASK (1) #define SCAN_BATCH_SHIFT (16) -#define SCAN_BATCH_MASK (31) +#define SCAN_BATCH_MASK (1023) inline Uint8 @@ -201,6 +202,7 @@ inline void ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch"); + requestInfo &= ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT); requestInfo |= (flag << SCAN_BATCH_SHIFT); } @@ -250,8 +252,8 @@ private: Uint32 info; }; - static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 5; }; - static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 31;} + static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 10; }; + static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 1023;} }; /** diff --git a/ndb/include/kernel/signaldata/TcKeyReq.hpp b/ndb/include/kernel/signaldata/TcKeyReq.hpp index f7d3c2e3282..ba0f1a5ed8c 100644 --- a/ndb/include/kernel/signaldata/TcKeyReq.hpp +++ b/ndb/include/kernel/signaldata/TcKeyReq.hpp @@ -228,21 +228,21 @@ private: * Scan Info * t = Scan take over indicator - 1 Bit - n = Take over node - 16 Bits -> max 65535 - p = Scan Info - 12 Bits -> max 4095 + n = Take over node - 12 Bits -> max 65535 + p = Scan Info - 18 Bits -> max 4095 1111111111222222222233 01234567890123456789012345678901 - tpppppppppppp nnnnnnnnnnnnnnnn + tpppppppppppppppppp nnnnnnnnnnnn */ #define TAKE_OVER_SHIFT (0) -#define TAKE_OVER_NODE_SHIFT (16) -#define TAKE_OVER_NODE_MASK (65535) +#define TAKE_OVER_NODE_SHIFT (20) +#define TAKE_OVER_NODE_MASK (4095) #define SCAN_INFO_SHIFT (1) -#define SCAN_INFO_MASK (4095) +#define SCAN_INFO_MASK (262143) /** * Attr Len diff --git a/ndb/include/ndbapi/NdbReceiver.hpp b/ndb/include/ndbapi/NdbReceiver.hpp index 13898fc8e5f..b7f73bb618d 100644 --- a/ndb/include/ndbapi/NdbReceiver.hpp +++ b/ndb/include/ndbapi/NdbReceiver.hpp @@ -75,6 +75,7 @@ private: class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr); void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size); void prepareSend(); + void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&); int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len); int execTRANSID_AI(const Uint32* ptr, Uint32 len); diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index c7ae029e742..638ca39409a 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -122,7 +122,6 @@ protected: NdbConnection *m_transConnection; // Scan related variables - Uint32 theBatchSize; Uint32 theParallelism; Uint32 m_keyInfo; NdbApiSignal* theSCAN_TABREQ; diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 4b057171963..1dc40da8d2d 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -27,7 +27,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv const UintR requestInfo = sig->requestInfo; - fprintf(output, " apiConnectPtr: H\'%.8x\n", + fprintf(output, " apiConnectPtr: H\'%.8x", sig->apiConnectPtr); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n", @@ -42,23 +42,8 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n", sig->transId1, sig->transId2, sig->storedProcId); - - fprintf(output, " OperationPtr(s):\n "); - Uint32 restLen = (len - 9); - const Uint32 * rest = &sig->apiOperationPtr[0]; - while(restLen >= 7){ - fprintf(output, - " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n", - rest[0], rest[1], rest[2], rest[3], - rest[4], rest[5], rest[6]); - restLen -= 7; - rest += 7; - } - if(restLen > 0){ - for(Uint32 i = 0; ibatch_byte_size, sig->first_batch_size); return false; } diff --git a/ndb/src/kernel/blocks/backup/Backup.cpp b/ndb/src/kernel/blocks/backup/Backup.cpp index 52a543dbcdc..d0337b32bc1 100644 --- a/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/ndb/src/kernel/blocks/backup/Backup.cpp @@ -3324,20 +3324,16 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) req->requestInfo = 0; req->savePointId = 0; req->tableId = table.tableId; - ScanFragReq::setConcurrency(req->requestInfo, parallelism); + //ScanFragReq::setConcurrency(req->requestInfo, parallelism); ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 1); ScanFragReq::setAttrLen(req->requestInfo,attrLen); req->transId1 = 0; req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); - - Uint32 i; - for(i = 0; iclientOpPtr[i] = filePtr.i; - }//for - sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB); + req->clientOpPtr= filePtr.i; + sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, + ScanFragReq::SignalLength, JBB); signal->theData[0] = filePtr.i; signal->theData[1] = 0; @@ -3351,6 +3347,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) signal->theData[7] = 0; Uint32 dataPos = 8; + Uint32 i; for(i = 0; ioperation; - op.scanConf(conf->completedOps, conf->opReturnDataLen); + //op.scanConf(conf->completedOps, conf->opReturnDataLen); const Uint32 completed = conf->fragmentCompleted; if(completed != 2) { diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index e0994955818..bdda05f35af 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -533,9 +533,11 @@ public: COPY = 2 }; UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN]; - UintR scanApiOpPtr[MAX_PARALLEL_OP_PER_SCAN]; - UintR scanOpLength[MAX_PARALLEL_OP_PER_SCAN]; + UintR scanApiOpPtr; UintR scanLocalref[2]; + Uint32 scan_batch_len; + Uint32 first_batch_size; + Uint32 batch_byte_size; UintR copyPtr; union { Uint32 nextPool; diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 28a8a3c0065..8a0c8aa43d9 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -890,7 +890,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal) &ctcConnectrecFileSize)); clogFileFileSize = 4 * cnoLogFiles; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize)); - cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG; + cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN; initRecords(); initialiseRecordsLab(signal, 0, ref, senderData); @@ -2099,8 +2099,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal) c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec); ndbout << " scanState = " << TscanPtr.p->scanState << endl; //TscanPtr.p->scanAccOpPtr[16]; - //TscanPtr.p->scanApiOpPtr[16]; - //TscanPtr.p->scanOpLength[16]; //TscanPtr.p->scanLocalref[2]; ndbout << " copyPtr="<copyPtr << " scanAccPtr="<scanAccPtr @@ -6988,6 +6986,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) initScanAccOp(signal); scanptr.p->scanCompletedOperations = 0; + scanptr.p->scan_batch_len= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() @@ -7142,6 +7141,7 @@ void Dblqh::closeScanRequestLab(Signal* signal) }//if tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE; scanptr.p->scanCompletedOperations = 0; + scanptr.p->scan_batch_len= 0; sendScanFragConf(signal, ZTRUE); break; case TcConnectionrec::SCAN_TUPKEY: @@ -7225,7 +7225,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal) * ------------------------------------------------------------------------- */ void Dblqh::execSCAN_FRAGREQ(Signal* signal) { - const ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0]; + ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0]; ScanFragRef * ref; const Uint32 transid1 = scanFragReq->transId1; const Uint32 transid2 = scanFragReq->transId2; @@ -7238,7 +7238,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) const Uint32 reqinfo = scanFragReq->requestInfo; const Uint32 fragId = scanFragReq->fragmentNo; tabptr.i = scanFragReq->tableId; - const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo); + const Uint32 scanConcurrentOperations = scanFragReq->concurrency; const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo); const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo); const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo); @@ -7256,9 +7256,9 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) tcConnectptr.p->savePointId = scanFragReq->savePointId; } else { jam(); - /* --------------------------------------------------------------------- - * NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST. - * --------------------------------------------------------------------- */ + /* -------------------------------------------------------------------- + * NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST. + * -------------------------------------------------------------------- */ errorCode = ZNO_TC_CONNECT_ERROR; senderData = scanFragReq->senderData; goto error_handler_early; @@ -7871,8 +7871,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1]; tupKeyReq->attrBufLen = 0; ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); - tupKeyReq->opRef = - scanptr.p->scanApiOpPtr[scanptr.p->scanCompletedOperations]; + tupKeyReq->opRef = scanptr.p->scanApiOpPtr; tupKeyReq->applRef = scanptr.p->scanApiBlockref; tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion; tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId; @@ -7963,7 +7962,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell }//if ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); - scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4; + scanptr.p->scan_batch_len+= tdata4; scanptr.p->scanCompletedOperations++; if ((scanptr.p->scanCompletedOperations == scanptr.p->scanConcurrentOperations) && @@ -8217,6 +8216,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) } else { jam(); scanptr.p->scanCompletedOperations = 0; + scanptr.p->scan_batch_len= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); @@ -8249,7 +8249,7 @@ void Dblqh::initScanAccOp(Signal* signal) Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) { const Uint32 reqinfo = scanFragReq->requestInfo; - const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo); + const Uint32 scanConcurrentOperations = scanFragReq->concurrency; const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo); const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo); const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo); @@ -8267,7 +8267,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion; scanptr.p->scanCompletedOperations = 0; + scanptr.p->scan_batch_len= 0; scanptr.p->scanConcurrentOperations = scanConcurrentOperations; + scanptr.p->batch_byte_size= scanFragReq->batch_byte_size; + scanptr.p->first_batch_size= scanFragReq->first_batch_size; scanptr.p->scanErrorCounter = 0; scanptr.p->scanLockMode = scanLockMode; scanptr.p->readCommitted = readCommitted; @@ -8279,11 +8282,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanLocalFragid = 0; scanptr.p->scanTcWaiting = ZTRUE; scanptr.p->scanNumber = ~0; - + scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; for (Uint32 i = 0; i < scanConcurrentOperations; i++) { - jam(); - scanptr.p->scanApiOpPtr[i] = scanFragReq->clientOpPtr[i]; - scanptr.p->scanOpLength[i] = 0; scanptr.p->scanAccOpPtr[i] = 0; }//for @@ -8547,11 +8547,11 @@ void Dblqh::sendKeyinfo20(Signal* signal, TdataBuf.i = TdataBuf.p->nextDatabuf; } - keyInfo->clientOpPtr = scanP->scanApiOpPtr[scanOp]; + keyInfo->clientOpPtr = scanP->scanApiOpPtr; keyInfo->keyLen = keyLen; keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp, scanP->scanNumber)+ - (getOwnNodeId() << 16); + (getOwnNodeId() << 20); keyInfo->transId1 = tcConP->transid[0]; keyInfo->transId2 = tcConP->transid[1]; @@ -8632,23 +8632,27 @@ void Dblqh::sendKeyinfo20(Signal* signal, * ------------------------------------------------------------------------ */ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) { + Uint32 completed_ops= scanptr.p->scanCompletedOperations; + Uint32 total_len= scanptr.p->scan_batch_len; + scanptr.p->scanTcWaiting = ZFALSE; + if(ERROR_INSERTED(5037)){ CLEAR_ERROR_INSERT_VALUE; return; } - - scanptr.p->scanTcWaiting = ZFALSE; ScanFragConf * conf = (ScanFragConf*)&signal->theData[0]; + NodeId tc_node_id= refToNode(tcConnectptr.p->clientBlockref); + Uint32 trans_id1= tcConnectptr.p->transid[0]; + Uint32 trans_id2= tcConnectptr.p->transid[1]; conf->senderData = tcConnectptr.p->clientConnectrec; - conf->completedOps = scanptr.p->scanCompletedOperations; + conf->completedOps = completed_ops; conf->fragmentCompleted = scanCompleted; - for(Uint32 i = 0; iopReturnDataLen[i] = scanptr.p->scanOpLength[i]; - conf->transId1 = tcConnectptr.p->transid[0]; - conf->transId2 = tcConnectptr.p->transid[1]; + conf->transId1 = trans_id1; + conf->transId2 = trans_id2; + conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, - signal, ScanFragConf::SignalLength, JBB); + signal, ScanFragConf::SignalLength, JBB); }//Dblqh::sendScanFragConf() /* ######################################################################### */ diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index 095ba9b0bbe..b0de535b48b 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1184,7 +1184,11 @@ public: Uint32 scanTableref; // Number of operation records per scanned fragment + // Number of operations in first batch + // Max number of bytes per batch Uint16 noOprecPerFrag; + Uint16 first_batch_size; + Uint32 batch_byte_size; // Shall the locks be held until the application have read the // records @@ -1417,17 +1421,13 @@ private: UintR anApiConnectPtr); void handleScanStop(Signal* signal, UintR aFailedNode); void initScanTcrec(Signal* signal); - void initScanApirec(Signal* signal, - Uint32 buddyPtr, - UintR transid1, - UintR transid2); - void initScanrec(ScanRecordPtr, const class ScanTabReq*, + void initScanrec(ScanRecordPtr, const class ScanTabReq*, const UintR scanParallel, const UintR noOprecPerFrag); void initScanfragrec(Signal* signal); void releaseScanResources(ScanRecordPtr); ScanRecordPtr seizeScanrec(Signal* signal); - void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*); + void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*); void sendScanTabConf(Signal* signal, ScanRecord*); void close_scan_req(Signal*, ScanRecordPtr, bool received_req); void close_scan_req_send_conf(Signal*, ScanRecordPtr); diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index ed467db1c6c..a7697a81f4d 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -2630,8 +2630,9 @@ void Dbtc::execTCKEYREQ(Signal* signal) { Uint32 TDistrGHIndex = tcKeyReq->getScanIndFlag(Treqinfo); Uint32 TDistrKeyIndex = TDistrGHIndex + TDistrGroupFlag; - Uint32 TscanNode = tcKeyReq->getTakeOverScanNode(TOptionalDataPtr[0]); - Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]); + + Uint32 TscanNode = tcKeyReq->getTakeOverScanNode(TOptionalDataPtr[0]); + Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]); regCachePtr->scanTakeOverInd = TDistrGHIndex; regCachePtr->scanNode = TscanNode; @@ -8405,11 +8406,11 @@ void Dbtc::systemErrorLab(Signal* signal) void Dbtc::execSCAN_TABREQ(Signal* signal) { const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0]; - const UintR reqinfo = scanTabReq->requestInfo; + const Uint32 reqinfo = scanTabReq->requestInfo; const Uint32 aiLength = scanTabReq->attrLen; const Uint32 schemaVersion = scanTabReq->tableSchemaVersion; - const UintR transid1 = scanTabReq->transId1; - const UintR transid2 = scanTabReq->transId2; + const Uint32 transid1 = scanTabReq->transId1; + const Uint32 transid2 = scanTabReq->transId2; const Uint32 tmpXX = scanTabReq->buddyConPtr; const Uint32 buddyPtr = (tmpXX == 0xFFFFFFFF ? RNIL : tmpXX); Uint32 currSavePointId = 0; @@ -8420,17 +8421,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) Uint32 errCode; ScanRecordPtr scanptr; - if(noOprecPerFrag == 0){ - jam(); - scanParallel = (scanConcurrency + 15) / 16; - noOprecPerFrag = (scanConcurrency >= 16 ? 16 : scanConcurrency & 15); - } + jamEntry(); + + SegmentedSectionPtr api_op_ptr; + signal->getSection(api_op_ptr, 0); + copy(&cdata[0], api_op_ptr); + releaseSections(signal); - jamEntry(); apiConnectptr.i = scanTabReq->apiConnectPtr; tabptr.i = scanTabReq->tableId; - for(int i=0; i<16; i++) - cdata[i] = scanTabReq->apiOperationPtr[i]; if (apiConnectptr.i >= capiConnectFilesize || tabptr.i >= ctabrecFilesize) { @@ -8440,7 +8439,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) }//if ptrAss(apiConnectptr, apiConnectRecord); ApiConnectRecord * transP = apiConnectptr.p; - if (transP->apiConnectstate != CS_CONNECTED) { jam(); // could be left over from TCKEYREQ rollback @@ -8454,50 +8452,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) } } ptrAss(tabptr, tableRecord); - - if (aiLength == 0) { - jam() - errCode = ZSCAN_AI_LEN_ERROR; - goto SCAN_TAB_error; - }//if - if (!tabptr.p->checkTable(schemaVersion)){ - jam(); - goto SCAN_TAB_schema_error; - }//if - /***************************************************************** - * THE CONCURRENCY LEVEL SPECIFIED BY THE APPLICATION. IT MUST BE - * BETWEEN 1 AND 240. IF IT IS 16 OR GREATER IT MUST BE A MULTIPLE - * OF 16. CONCURRENCY LEVELS UPTO 16 ONLY SCAN ONE FRAGMENT AT A - * TIME. IF WE SPECIFY 32 IT WILL SCAN TWO FRAGMENTS AT A TIME AND - * SO FORTH. MAXIMUM 15 PARALLEL SCANS ARE ALLOWED - ******************************************************************/ - if (scanConcurrency == 0) { - jam(); - errCode = ZNO_CONCURRENCY_ERROR; - goto SCAN_TAB_error; - }//if - - /********************************************************** - * CALCULATE THE NUMBER OF SCAN_TABINFO SIGNALS THAT WILL - * ARRIVE TO DEFINE THIS SCAN. THIS ALSO DEFINES THE NUMBER - * OF PARALLEL SCANS AND IT ALSO DEFINES THE NUMBER OF SCAN - * OPERATION POINTER RECORDS TO ALLOCATE. - **********************************************************/ - if (cfirstfreeTcConnect == RNIL) { - jam(); - errCode = ZNO_FREE_TC_CONNECTION; - goto SCAN_TAB_error; - }//if - - if (cfirstfreeScanrec == RNIL) { - jam(); - errCode = ZNO_SCANREC_ERROR; - goto SCAN_TAB_error; - }//if - + if ((aiLength == 0) || + (!tabptr.p->checkTable(schemaVersion)) || + (scanConcurrency == 0) || + (cfirstfreeTcConnect == RNIL) || + (cfirstfreeScanrec == RNIL)) { + goto SCAN_error_check; + } if (buddyPtr != RNIL) { jam(); - ApiConnectRecordPtr buddyApiPtr; buddyApiPtr.i = buddyPtr; ptrCheckGuard(buddyApiPtr, capiConnectFilesize, apiConnectRecord); @@ -8527,7 +8490,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag); - //initScanApirec(signal, buddyPtr, transid1, transid2); transP->apiScanRec = scanptr.i; transP->returncode = 0; transP->transid[0] = transid1; @@ -8553,10 +8515,32 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) scanptr.p->scanState = ScanRecord::WAIT_AI; return; - SCAN_TAB_schema_error: + SCAN_error_check: + if (aiLength == 0) { + jam() + errCode = ZSCAN_AI_LEN_ERROR; + goto SCAN_TAB_error; + }//if + if (!tabptr.p->checkTable(schemaVersion)){ + jam(); + errCode = tabptr.p->getErrorCode(schemaVersion); + goto SCAN_TAB_error; + }//if + if (scanConcurrency == 0) { + jam(); + errCode = ZNO_CONCURRENCY_ERROR; + goto SCAN_TAB_error; + }//if + if (cfirstfreeTcConnect == RNIL) { + jam(); + errCode = ZNO_FREE_TC_CONNECTION; + goto SCAN_TAB_error; + }//if + ndbrequire(cfirstfreeScanrec == RNIL); jam(); - errCode = tabptr.p->getErrorCode(schemaVersion); - + errCode = ZNO_SCANREC_ERROR; + goto SCAN_TAB_error; + SCAN_TAB_error: jam(); ScanTabRef * ref = (ScanTabRef*)&signal->theData[0]; @@ -8567,23 +8551,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) ref->closeNeeded = 0; sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF, signal, ScanTabRef::SignalLength, JBB); - return; }//Dbtc::execSCAN_TABREQ() - -void Dbtc::initScanApirec(Signal* signal, - Uint32 buddyPtr, UintR transid1, UintR transid2) -{ -}//Dbtc::initScanApirec() - void Dbtc::initScanrec(ScanRecordPtr scanptr, const ScanTabReq * scanTabReq, UintR scanParallel, UintR noOprecPerFrag) { const UintR reqinfo = scanTabReq->requestInfo; - ndbrequire(scanParallel < 16); scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanApiRec = apiConnectptr.i; @@ -8592,6 +8568,8 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; scanptr.p->noOprecPerFrag = noOprecPerFrag; + scanptr.p->first_batch_size= scanTabReq->first_batch_size; + scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo); scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo); scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo); @@ -8599,7 +8577,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanStoredProcId = scanTabReq->storedProcId; scanptr.p->scanState = ScanRecord::RUNNING; scanptr.p->m_queued_count = 0; - + ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags); for (Uint32 i = 0; i < scanParallel; i++) { @@ -9079,6 +9057,7 @@ void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode) ************************************************************/ void Dbtc::execSCAN_FRAGCONF(Signal* signal) { + Uint32 transid1, transid2, total_len; jamEntry(); const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; @@ -9094,8 +9073,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) apiConnectptr.i = scanptr.p->scanApiRec; ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); - Uint32 transid1 = apiConnectptr.p->transid[0] ^ conf->transId1; - Uint32 transid2 = apiConnectptr.p->transid[1] ^ conf->transId2; + transid1 = apiConnectptr.p->transid[0] ^ conf->transId1; + transid2 = apiConnectptr.p->transid[1] ^ conf->transId2; + total_len= conf->total_len; transid1 = transid1 | transid2; if (transid1 != 0) { jam(); @@ -9145,15 +9125,13 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); return; } - - Uint32 chksum = 0; + /* Uint32 totalLen = 0; for(Uint32 i = 0; iopReturnDataLen[i]; - chksum += (tmp << i); totalLen += tmp; } - + */ { ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags); @@ -9164,8 +9142,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) } scanFragptr.p->m_ops = noCompletedOps; - scanFragptr.p->m_chksum = chksum; - scanFragptr.p->m_totalLen = totalLen; + scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; scanFragptr.p->stopFragTimer(); @@ -9477,9 +9454,10 @@ Dbtc::seizeScanrec(Signal* signal) { void Dbtc::sendScanFragReq(Signal* signal, ScanRecord* scanP, - ScanFragRec* scanFragP){ + ScanFragRec* scanFragP) +{ Uint32 requestInfo = 0; - ScanFragReq::setConcurrency(requestInfo, scanFragP->scanFragConcurrency); + ScanFragReq * const req = (ScanFragReq *)&signal->theData[0]; ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode); ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold); if(scanP->scanLockMode == 1){ // Not read -> keyinfo @@ -9491,24 +9469,24 @@ void Dbtc::sendScanFragReq(Signal* signal, ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength); ScanFragReq::setScanPrio(requestInfo, 1); apiConnectptr.i = scanP->scanApiRec; - ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); - ScanFragReq * const req = (ScanFragReq *)&signal->theData[0]; - req->senderData = scanFragptr.i; - req->resultRef = apiConnectptr.p->ndbapiBlockref; - req->requestInfo = requestInfo; - req->savePointId = apiConnectptr.p->currSavePointId; req->tableId = scanP->scanTableref; - req->fragmentNo = scanFragP->scanFragId; req->schemaVersion = scanP->scanSchemaVersion; + ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); + req->senderData = scanFragptr.i; + req->requestInfo = requestInfo; + req->fragmentNo = scanFragP->scanFragId; + req->resultRef = apiConnectptr.p->ndbapiBlockref; + req->savePointId = apiConnectptr.p->currSavePointId; req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; - for(int i = 0; i<16; i++){ - req->clientOpPtr[i] = scanFragP->m_apiPtr; - } - sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB); + req->concurrency= scanFragP->scanFragConcurrency; + req->clientOpPtr = scanFragP->m_apiPtr; + req->batch_byte_size= scanP->batch_byte_size; + req->first_batch_size= scanP->first_batch_size; + sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, + ScanFragReq::SignalLength, JBB); updateBuddyTimer(apiConnectptr); scanFragP->startFragTimer(ctcTimer); - }//Dbtc::sendScanFragReq() @@ -9537,7 +9515,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { * ops++ = curr.p->m_apiPtr; * ops++ = curr.i; - * ops++ = (curr.p->m_totalLen << 5) + curr.p->m_ops; + * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); if(curr.p->m_ops > 0){ diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index ec9dc4a3766..2f7e4403209 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -1844,7 +1844,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->tableId = tabPtr.p->m_tableId; req->requestInfo = 0; req->savePointId = 0; - ScanFragReq::setConcurrency(req->requestInfo, parallelism); + //ScanFragReq::setConcurrency(req->requestInfo, parallelism); ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); @@ -1853,9 +1853,10 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->schemaVersion = tabPtr.p->m_schemaVersion; req->transId1 = 0; req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); - + for(unsigned int i = 0; iclientOpPtr[i] = (ptrI << 16) + (i + 1); + //req->clientOpPtr[i] = (ptrI << 16) + (i + 1); + req->clientOpPtr = (ptrI << 16) + (i + 1); } suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB); diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index 550c6313058..cbf89faf7fb 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -511,7 +511,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ /*-----------------------------------------------------------------------*/ cfg.put(CFG_ACC_OP_RECS, ((11 * noOfOperations) / 10 + 50) + - (noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) + + (noOfLocalScanRecords * MAX_PARALLEL_OP_PER_SCAN) + NODE_RECOVERY_SCAN_OP_RECORDS); cfg.put(CFG_ACC_OVERFLOW_RECS, diff --git a/ndb/src/ndbapi/NdbApiSignal.cpp b/ndb/src/ndbapi/NdbApiSignal.cpp index 6f5e1e50d2c..d7b2b74b2bf 100644 --- a/ndb/src/ndbapi/NdbApiSignal.cpp +++ b/ndb/src/ndbapi/NdbApiSignal.cpp @@ -177,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) theTrace = TestOrd::TraceAPI; theReceiversBlockNumber = DBTC; theVerId_signalNumber = GSN_SCAN_TABREQ; - theLength = 9; // ScanTabReq::SignalLength; + theLength = ScanTabReq::StaticLength; } break; @@ -186,7 +186,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) theTrace = TestOrd::TraceAPI; theReceiversBlockNumber = DBTC; theVerId_signalNumber = GSN_SCAN_NEXTREQ; - theLength = 4; + theLength = ScanNextReq::SignalLength; } break; diff --git a/ndb/src/ndbapi/NdbApiSignal.hpp b/ndb/src/ndbapi/NdbApiSignal.hpp index 9d5bc0847be..52c3be2256c 100644 --- a/ndb/src/ndbapi/NdbApiSignal.hpp +++ b/ndb/src/ndbapi/NdbApiSignal.hpp @@ -71,6 +71,8 @@ public: const Uint32 * getDataPtr() const; Uint32 * getDataPtrSend(); + NodeId get_sender_node(); + /** * Fragmentation */ @@ -103,6 +105,17 @@ private: NdbApiSignal *theNextSignal; Uint32 *theRealData; }; +/********************************************************************** +NodeId get_sender_node +Remark: Get the node id of the sender +***********************************************************************/ +inline +NodeId +NdbApiSignal::get_sender_node() +{ + return refToNode(theSendersBlockRef); +} + /********************************************************************** void getLength Remark: Get the length of the signal. diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index 0c4490015ff..78a2114584b 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -99,11 +99,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, } for(Uint32 i = 0; iint2void(ptrI); assert(tPtr); // For now diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index bdb5e6c7e78..0d85ca205b3 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -89,6 +89,47 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){ #define KEY_ATTR_ID (~0) +void +NdbReceiver::calculate_batch_size(Uint32 key_size, + Uint32 parallelism, + Uint32& batch_size, + Uint32& batch_byte_size, + Uint32& first_batch_size) +{ + Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead + NdbRecAttr *rec_attr= theFirstRecAttr; + while (rec_attr != NULL) { + Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize(); + attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead + tot_size+= attr_size; + rec_attr= rec_attr->next(); + } + tot_size+= 32; //include signal overhead + + /** + * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE + * bytes sent for each batch from each node. We do however ensure that + * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per + * batch. + */ + batch_byte_size= SCAN_BATCH_SIZE; + if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) { + batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism; + } + batch_size= batch_byte_size / tot_size; +#ifdef VM_TRACE + ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = "; + ndbout << batch_size << "tot_size = " << tot_size << endl; +#endif + if (batch_size == 0) { + batch_size= 1; + } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) { + batch_size= MAX_PARALLEL_OP_PER_SCAN; + } + first_batch_size= batch_size; + return; +} + void NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ if(rows > m_defined_rows){ @@ -139,7 +180,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ } prepareSend(); - return ; //0; + return; } void diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 7d51974da7c..c5fcbec855a 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -140,17 +140,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 fragCount = m_currentTable->m_fragmentCount; - if (batch + parallel == 0) { - batch = 16; - parallel= fragCount; - } else { - if (batch == 0 && parallel > 0) { // Backward - batch = (parallel >= 16 ? 16 : parallel); - parallel = (parallel + 15) / 16; - } - if (parallel > fragCount || parallel == 0) + if (parallel > fragCount || parallel == 0) { parallel = fragCount; - } + } // It is only possible to call openScan if // 1. this transcation don't already contain another scan operation @@ -201,7 +193,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } theParallelism = parallel; - theBatchSize = batch; if(fix_receivers(parallel) == -1){ setErrorCodeAbort(4000); @@ -223,7 +214,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); - ScanTabReq::setScanBatch(reqInfo, batch); + ScanTabReq::setScanBatch(reqInfo, 0); ScanTabReq::setLockMode(reqInfo, lockExcl); ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); @@ -815,8 +806,23 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, theReceiver.prepareSend(); bool keyInfo = m_keyInfo; Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; + /** + * The number of records sent by each LQH is calculated and the kernel + * is informed of this number by updating the SCAN_TABREQ signal + */ + Uint32 batch_size, batch_byte_size, first_batch_size; + theReceiver.calculate_batch_size(key_size, + theParallelism, + batch_size, + batch_byte_size, + first_batch_size); + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + ScanTabReq::setScanBatch(req->requestInfo, batch_size); + req->batch_byte_size= batch_byte_size; + req->first_batch_size= first_batch_size; + for(Uint32 i = 0; ido_get_value(&theReceiver, theBatchSize, key_size); + m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); } return 0; } @@ -856,23 +862,13 @@ NdbScanOperation::doSendScan(int aProcessorId) if (theOperationType == OpenRangeScanRequest) req->attrLen += theTotalBoundAI_Len; TransporterFacade *tp = TransporterFacade::instance(); - if(theParallelism > 16){ - LinearSectionPtr ptr[3]; - ptr[0].p = m_prepared_receivers; - ptr[0].sz = theParallelism; - if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { - setErrorCode(4002); - return -1; - } - } else { - tSignal->setLength(9+theParallelism); - memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism); - if (tp->sendSignal(tSignal, aProcessorId) == -1) { - setErrorCode(4002); - return -1; - } - } - + LinearSectionPtr ptr[3]; + ptr[0].p = m_prepared_receivers; + ptr[0].sz = theParallelism; + if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { + setErrorCode(4002); + return -1; + } if (theOperationType == OpenRangeScanRequest) { // must have at least one signal since it contains attrLen for bounds assert(theBoundATTRINFO != NULL); @@ -969,8 +965,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ } const Uint32 * src = (Uint32*)tRecAttr->aRef(); - const Uint32 tScanInfo = src[len] & 0xFFFF; - const Uint32 tTakeOverNode = src[len] >> 16; + const Uint32 tScanInfo = src[len] & 0x3FFFF; + const Uint32 tTakeOverNode = src[len] >> 20; { UintR scanInfo = 0; TcKeyReq::setTakeOverScanFlag(scanInfo, 1);