From c59fac93c4916f522248996932bfe9d28573a29a Mon Sep 17 00:00:00 2001 From: "jonas@perch.ndb.mysql.com" <> Date: Tue, 27 Jun 2006 11:41:00 +0200 Subject: [PATCH] ndb - bug#20252 allow user to specify scan batch size in readTuples --- ndb/include/ndbapi/NdbIndexScanOperation.hpp | 6 +++-- ndb/include/ndbapi/NdbScanOperation.hpp | 4 +++- ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 8 +++---- ndb/src/ndbapi/NdbReceiver.cpp | 10 ++++++++- ndb/src/ndbapi/NdbScanOperation.cpp | 23 +++++++++++++------- 5 files changed, 35 insertions(+), 16 deletions(-) diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp index e9f92d84d1c..e7393bdfd17 100644 --- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -41,7 +41,9 @@ public: * @param parallel No of fragments to scan in parallel (0=max) */ virtual int readTuples(LockMode lock_mode = LM_Read, - Uint32 scan_flags = 0, Uint32 parallel = 0); + Uint32 scan_flags = 0, + Uint32 parallel = 0, + Uint32 batch = 0); #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL /** @@ -66,7 +68,7 @@ public: (SF_OrderBy & -(Int32)order_by) | (SF_Descending & -(Int32)order_desc) | (SF_ReadRangeNo & -(Int32)read_range_no); - return readTuples(lock_mode, scan_flags, parallel); + return readTuples(lock_mode, scan_flags, parallel, batch); } #endif diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 77b255dc2f4..2cab02c58a9 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -56,7 +56,9 @@ public: */ virtual int readTuples(LockMode lock_mode = LM_Read, - Uint32 scan_flags = 0, Uint32 parallel = 0); + Uint32 scan_flags = 0, + Uint32 parallel = 0, + Uint32 batch = 0); #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED /** diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 19a003f00fc..c8fd440a150 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -7349,15 +7349,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal) scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); + } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { + jam(); + closeScanLab(signal); + return; } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); - } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { - jam(); - closeScanLab(signal); - return; } else { jam(); /* diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index df16ae66915..62119880076 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per * batch. */ - batch_byte_size= max_batch_byte_size; + if (batch_size == 0) + { + batch_byte_size= max_batch_byte_size; + } + else + { + batch_byte_size= batch_size * tot_size; + } + if (batch_byte_size * parallelism > max_scan_batch_size) { batch_byte_size= max_scan_batch_size / parallelism; } diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 869704c7bb3..f14b5409ce8 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection) int NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 scan_flags, - Uint32 parallel) + Uint32 parallel, + Uint32 batch) { m_ordered = m_descending = false; Uint32 fragCount = m_currentTable->m_fragmentCount; @@ -181,9 +182,12 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, bool tupScan = (scan_flags & SF_TupScan); if (tupScan && rangeScan) tupScan = false; - - theParallelism = parallel; + if (rangeScan && (scan_flags & SF_OrderBy)) + parallel = fragCount; + + theParallelism = parallel; + if(fix_receivers(parallel) == -1){ setErrorCodeAbort(4000); return -1; @@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->tableSchemaVersion = m_accessTable->m_version; req->storedProcId = 0xFFFF; req->buddyConPtr = theNdbCon->theBuddyConPtr; + req->first_batch_size = batch; // Save user specified batch size Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); @@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, * The number of records sent by each LQH is calculated and the kernel * is informed of this number by updating the SCAN_TABREQ signal */ - Uint32 batch_size, batch_byte_size, first_batch_size; + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + Uint32 batch_size = req->first_batch_size; // User specified + Uint32 batch_byte_size, first_batch_size; theReceiver.calculate_batch_size(key_size, theParallelism, batch_size, batch_byte_size, first_batch_size); - ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); ScanTabReq::setScanBatch(req->requestInfo, batch_size); req->batch_byte_size= batch_byte_size; req->first_batch_size= first_batch_size; @@ -1206,13 +1212,14 @@ error: int NdbIndexScanOperation::readTuples(LockMode lm, Uint32 scan_flags, - Uint32 parallel) + Uint32 parallel, + Uint32 batch) { const bool order_by = scan_flags & SF_OrderBy; const bool order_desc = scan_flags & SF_Descending; const bool read_range_no = scan_flags & SF_ReadRangeNo; - - int res = NdbScanOperation::readTuples(lm, scan_flags, 0); + + int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch); if(!res && read_range_no) { m_read_range_no = 1;