ndb - bug#20252
allow user to specify scan batch size in readTuples
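
With this change an application can pass the desired batch size as a fourth argument to readTuples(). A minimal usage sketch follows (the Ndb object, table name, batch value and the omission of error handling are illustrative assumptions, not part of the patch):

#include <NdbApi.hpp>

// Usage sketch only: assumes an initialized Ndb object and an existing
// table "t1"; column reads, error checks and cleanup are omitted.
void scan_with_batch(Ndb *myNdb)
{
  NdbTransaction *trans = myNdb->startTransaction();
  NdbScanOperation *op = trans->getNdbScanOperation("t1");
  // New 4th argument: request roughly 64 rows per fragment and batch.
  // parallel = 0 still means "let NDB choose", and batch = 0 keeps the
  // old behaviour where NDB computes the batch size itself.
  op->readTuples(NdbScanOperation::LM_CommittedRead,
                 /* scan_flags */ 0,
                 /* parallel   */ 0,
                 /* batch      */ 64);
  trans->execute(NdbTransaction::NoCommit);
  // ... getValue()/nextResult() loop and closeTransaction() would follow ...
}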
@@ -41,7 +41,9 @@ public:
    * @param parallel No of fragments to scan in parallel (0=max)
    */
   virtual int readTuples(LockMode lock_mode = LM_Read,
-                         Uint32 scan_flags = 0, Uint32 parallel = 0);
+                         Uint32 scan_flags = 0,
+                         Uint32 parallel = 0,
+                         Uint32 batch = 0);
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   /**
@@ -66,7 +68,7 @@ public:
       (SF_OrderBy & -(Int32)order_by) |
       (SF_Descending & -(Int32)order_desc) |
       (SF_ReadRangeNo & -(Int32)read_range_no);
-    return readTuples(lock_mode, scan_flags, parallel);
+    return readTuples(lock_mode, scan_flags, parallel, batch);
   }
 #endif
 
@@ -56,7 +56,9 @@ public:
    */
   virtual
   int readTuples(LockMode lock_mode = LM_Read,
-                 Uint32 scan_flags = 0, Uint32 parallel = 0);
+                 Uint32 scan_flags = 0,
+                 Uint32 parallel = 0,
+                 Uint32 batch = 0);
 
 #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
   /**
@@ -7349,15 +7349,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
     scanptr.p->m_curr_batch_size_rows = 0;
     scanptr.p->m_curr_batch_size_bytes = 0;
     closeScanLab(signal);
+  } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
+    jam();
+    closeScanLab(signal);
+    return;
   } else if (scanptr.p->check_scan_batch_completed() &&
              scanptr.p->scanLockHold != ZTRUE) {
     jam();
     scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
     sendScanFragConf(signal, ZFALSE);
-  } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
-    jam();
-    closeScanLab(signal);
-    return;
   } else {
     jam();
     /*
@@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
    * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
    * batch.
    */
-  batch_byte_size= max_batch_byte_size;
+  if (batch_size == 0)
+  {
+    batch_byte_size= max_batch_byte_size;
+  }
+  else
+  {
+    batch_byte_size= batch_size * tot_size;
+  }
+
   if (batch_byte_size * parallelism > max_scan_batch_size) {
     batch_byte_size= max_scan_batch_size / parallelism;
   }
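
Read in isolation, the selection above boils down to the helper below (a minimal standalone sketch: the parameter names mirror calculate_batch_size() in the hunk, but the helper itself and the assumed meaning of tot_size as bytes per row are illustrative):

// Sketch of the byte-budget choice: a user batch of 0 means "use the
// per-node maximum", otherwise the budget is batch rows times row size;
// either way the total over all parallel fragments stays within
// max_scan_batch_size. Uint32 is NDB's unsigned 32-bit typedef.
static Uint32 pick_batch_byte_size(Uint32 batch_size,          // rows requested, 0 = default
                                   Uint32 tot_size,            // assumed: bytes per row
                                   Uint32 parallelism,         // fragments scanned in parallel
                                   Uint32 max_batch_byte_size,
                                   Uint32 max_scan_batch_size)
{
  Uint32 batch_byte_size = (batch_size == 0)
                             ? max_batch_byte_size
                             : batch_size * tot_size;
  if (batch_byte_size * parallelism > max_scan_batch_size)
    batch_byte_size = max_scan_batch_size / parallelism;
  return batch_byte_size;
}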
@@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
 int
 NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
                              Uint32 scan_flags,
-                             Uint32 parallel)
+                             Uint32 parallel,
+                             Uint32 batch)
 {
   m_ordered = m_descending = false;
   Uint32 fragCount = m_currentTable->m_fragmentCount;
@@ -182,6 +183,9 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
   if (tupScan && rangeScan)
     tupScan = false;
 
+  if (rangeScan && (scan_flags & SF_OrderBy))
+    parallel = fragCount;
+
   theParallelism = parallel;
 
   if(fix_receivers(parallel) == -1){
@@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
   req->tableSchemaVersion = m_accessTable->m_version;
   req->storedProcId = 0xFFFF;
   req->buddyConPtr = theNdbCon->theBuddyConPtr;
+  req->first_batch_size = batch; // Save user specified batch size
 
   Uint32 reqInfo = 0;
   ScanTabReq::setParallelism(reqInfo, parallel);
@@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
    * The number of records sent by each LQH is calculated and the kernel
    * is informed of this number by updating the SCAN_TABREQ signal
    */
-  Uint32 batch_size, batch_byte_size, first_batch_size;
+  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
+  Uint32 batch_size = req->first_batch_size; // User specified
+  Uint32 batch_byte_size, first_batch_size;
   theReceiver.calculate_batch_size(key_size,
                                    theParallelism,
                                    batch_size,
                                    batch_byte_size,
                                    first_batch_size);
-  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
   ScanTabReq::setScanBatch(req->requestInfo, batch_size);
   req->batch_byte_size= batch_byte_size;
   req->first_batch_size= first_batch_size;
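
Taken together, the readTuples() and prepareSendScan() hunks give the user value a round trip through the ScanTabReq signal; roughly as follows (every line below restates code from the hunks above, annotated only for orientation):

// 1. readTuples(lm, scan_flags, parallel, batch) stashes the raw value
//    in the request that will later go to the kernel:
//        req->first_batch_size = batch;      // 0 = "let NDB decide"
// 2. prepareSendScan() reads it back before the real sizes are computed:
//        Uint32 batch_size = req->first_batch_size;
//        theReceiver.calculate_batch_size(key_size, theParallelism,
//                                         batch_size, batch_byte_size,
//                                         first_batch_size);
// 3. The computed values then overwrite the request before it is sent:
//        ScanTabReq::setScanBatch(req->requestInfo, batch_size);
//        req->batch_byte_size  = batch_byte_size;
//        req->first_batch_size = first_batch_size;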
@@ -1206,13 +1212,14 @@ error:
 int
 NdbIndexScanOperation::readTuples(LockMode lm,
                                   Uint32 scan_flags,
-                                  Uint32 parallel)
+                                  Uint32 parallel,
+                                  Uint32 batch)
 {
   const bool order_by = scan_flags & SF_OrderBy;
   const bool order_desc = scan_flags & SF_Descending;
   const bool read_range_no = scan_flags & SF_ReadRangeNo;
 
-  int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
+  int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
   if(!res && read_range_no)
   {
     m_read_range_no = 1;