diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h index 16456ad10d0..9411a98f091 100644 --- a/ndb/include/kernel/ndb_limits.h +++ b/ndb/include/kernel/ndb_limits.h @@ -64,24 +64,30 @@ #define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1) /* - * Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The + * Max Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The * API can order a multiple of this number of records at a time since * fragments can be scanned in parallel. */ #define MAX_PARALLEL_OP_PER_SCAN 992 /* +* The default batch size. Configurable parameter. +*/ +#define DEF_BATCH_SIZE 64 +/* * When calculating the number of records sent from LQH in each batch * one uses SCAN_BATCH_SIZE divided by the expected size of signals * per row. This gives the batch size used for the scan. The NDB API * will receive one batch from each node at a time so there has to be * some care taken also so that the NDB API is not overloaded with * signals. +* This parameter is configurable, this is the default value. */ #define SCAN_BATCH_SIZE 32768 /* * To protect the NDB API from overload we also define a maximum total * batch size from all nodes. This parameter should most likely be * configurable, or dependent on sendBufferSize. +* This parameter is configurable, this is the default value. */ #define MAX_SCAN_BATCH_SIZE 262144 /* diff --git a/ndb/include/mgmapi/mgmapi_config_parameters.h b/ndb/include/mgmapi/mgmapi_config_parameters.h index 3eca49055fe..a8ad2ad379b 100644 --- a/ndb/include/mgmapi/mgmapi_config_parameters.h +++ b/ndb/include/mgmapi/mgmapi_config_parameters.h @@ -121,6 +121,14 @@ #define CFG_REP_HEARTBEAT_INTERVAL 700 +/** + * API Config variables + * + */ +#define CFG_MAX_SCAN_BATCH_SIZE 800 +#define CFG_BATCH_BYTE_SIZE 801 +#define CFG_BATCH_SIZE 802 + /** * Internal */ diff --git a/ndb/src/common/mgmcommon/ConfigInfo.cpp b/ndb/src/common/mgmcommon/ConfigInfo.cpp index 1dffd6751b5..924c391e1c7 100644 --- a/ndb/src/common/mgmcommon/ConfigInfo.cpp +++ b/ndb/src/common/mgmcommon/ConfigInfo.cpp @@ -1166,6 +1166,42 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { 0, MAX_INT_RNIL }, + { + CFG_MAX_SCAN_BATCH_SIZE, + "MaxScanBatchSize", + "API", + "The maximum collective batch size for one scan", + ConfigInfo::USED, + false, + ConfigInfo::INT, + MAX_SCAN_BATCH_SIZE, + 32768, + (1024*1024*16) }, + + { + CFG_BATCH_BYTE_SIZE, + "BatchByteSize", + "API", + "The default batch size in bytes", + ConfigInfo::USED, + false, + ConfigInfo::INT, + SCAN_BATCH_SIZE, + 1024, + (1024*1024) }, + + { + CFG_BATCH_SIZE, + "BatchSize", + "API", + "The default batch size in number of records", + ConfigInfo::USED, + false, + ConfigInfo::INT, + DEF_BATCH_SIZE, + 1, + MAX_PARALLEL_OP_PER_SCAN }, + /**************************************************************************** * MGM ***************************************************************************/ diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 87a6eebda6f..591d9562d06 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -8386,9 +8386,6 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanNumber = ~0; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; - ndbout << "batch_size = " << scanptr.p->batch_size; - ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations; - ndbout << endl; if ((scanptr.p->scanConcurrentOperations == 0) || (scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) { jam(); diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index 0d85ca205b3..8cc17a9d1d7 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -20,6 +20,7 @@ #include #include #include +#include NdbReceiver::NdbReceiver(Ndb *aNdb) : theMagicNumber(0), @@ -96,6 +97,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, Uint32& batch_byte_size, Uint32& first_batch_size) { + TransporterFacade *tp= TransporterFacade::instance(); + Uint32 max_scan_batch_size= tp->get_scan_batch_size(); + Uint32 max_batch_byte_size= tp->get_batch_byte_size(); + Uint32 max_batch_size= tp->get_batch_size(); Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead NdbRecAttr *rec_attr= theFirstRecAttr; while (rec_attr != NULL) { @@ -112,19 +117,19 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per * batch. */ - batch_byte_size= SCAN_BATCH_SIZE; - if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) { - batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism; + batch_byte_size= max_batch_byte_size; + if (batch_byte_size * parallelism > max_scan_batch_size) { + batch_byte_size= max_scan_batch_size / parallelism; } batch_size= batch_byte_size / tot_size; -#ifdef VM_TRACE - ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = "; - ndbout << batch_size << "tot_size = " << tot_size << endl; -#endif if (batch_size == 0) { batch_size= 1; - } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) { - batch_size= MAX_PARALLEL_OP_PER_SCAN; + } else { + if (batch_size > max_batch_size) { + batch_size= max_batch_size; + } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) { + batch_size= MAX_PARALLEL_OP_PER_SCAN; + } } first_batch_size= batch_size; return; diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index d1e57e874ee..36ec9579124 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -550,6 +550,9 @@ TransporterFacade::TransporterFacade() : theArbitMgr = NULL; theStartNodeId = 1; m_open_count = 0; + m_scan_batch_size= MAX_SCAN_BATCH_SIZE; + m_batch_byte_size= SCAN_BATCH_SIZE; + m_batch_size= DEF_BATCH_SIZE; } bool @@ -593,7 +596,18 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) iter.get(CFG_NODE_ARBIT_DELAY, &delay); theArbitMgr->setDelay(delay); } - + Uint32 scan_batch_size= 0; + if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) { + m_scan_batch_size= scan_batch_size; + } + Uint32 batch_byte_size= 0; + if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) { + m_batch_byte_size= batch_byte_size; + } + Uint32 batch_size= 0; + if (!iter.get(CFG_BATCH_SIZE, &batch_size)) { + m_batch_size= batch_size; + } #if 0 } #endif diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 906bb7c34b2..bf29802cae3 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -113,6 +113,11 @@ public: // Close this block number int close_local(BlockNumber blockNumber); + // Scan batch configuration parameters + Uint32 get_scan_batch_size(); + Uint32 get_batch_byte_size(); + Uint32 get_batch_size(); + private: /** * Send a signal unconditional of node status (used by ClusterMgr) @@ -146,6 +151,11 @@ private: void calculateSendLimit(); + // Scan batch configuration parameters + Uint32 m_scan_batch_size; + Uint32 m_batch_byte_size; + Uint32 m_batch_size; + // Declarations for the receive and send thread int theStopReceive; @@ -325,4 +335,24 @@ TransporterFacade::getNodeSequence(NodeId n) const { return theClusterMgr->getNodeInfo(n).m_info.m_connectCount; } +inline +Uint32 +TransporterFacade::get_scan_batch_size() { + return m_scan_batch_size; +} + +inline +Uint32 +TransporterFacade::get_batch_byte_size() { + return m_batch_byte_size; +} + +inline +Uint32 +TransporterFacade::get_batch_size() { + return m_batch_size; +} + + + #endif // TransporterFacade_H