mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Added configuration parameters for batch size, batch byte size
and max scan batch size Removed some debug printouts
This commit is contained in:
@ -64,24 +64,30 @@
|
||||
|
||||
#define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1)
|
||||
/*
|
||||
* Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The
|
||||
* Max Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The
|
||||
* API can order a multiple of this number of records at a time since
|
||||
* fragments can be scanned in parallel.
|
||||
*/
|
||||
#define MAX_PARALLEL_OP_PER_SCAN 992
|
||||
/*
|
||||
* The default batch size. Configurable parameter.
|
||||
*/
|
||||
#define DEF_BATCH_SIZE 64
|
||||
/*
|
||||
* When calculating the number of records sent from LQH in each batch
|
||||
* one uses SCAN_BATCH_SIZE divided by the expected size of signals
|
||||
* per row. This gives the batch size used for the scan. The NDB API
|
||||
* will receive one batch from each node at a time so there has to be
|
||||
* some care taken also so that the NDB API is not overloaded with
|
||||
* signals.
|
||||
* This parameter is configurable, this is the default value.
|
||||
*/
|
||||
#define SCAN_BATCH_SIZE 32768
|
||||
/*
|
||||
* To protect the NDB API from overload we also define a maximum total
|
||||
* batch size from all nodes. This parameter should most likely be
|
||||
* configurable, or dependent on sendBufferSize.
|
||||
* This parameter is configurable, this is the default value.
|
||||
*/
|
||||
#define MAX_SCAN_BATCH_SIZE 262144
|
||||
/*
|
||||
|
@ -121,6 +121,14 @@
|
||||
|
||||
#define CFG_REP_HEARTBEAT_INTERVAL 700
|
||||
|
||||
/**
|
||||
* API Config variables
|
||||
*
|
||||
*/
|
||||
#define CFG_MAX_SCAN_BATCH_SIZE 800
|
||||
#define CFG_BATCH_BYTE_SIZE 801
|
||||
#define CFG_BATCH_SIZE 802
|
||||
|
||||
/**
|
||||
* Internal
|
||||
*/
|
||||
|
@ -1166,6 +1166,42 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
0,
|
||||
MAX_INT_RNIL },
|
||||
|
||||
{
|
||||
CFG_MAX_SCAN_BATCH_SIZE,
|
||||
"MaxScanBatchSize",
|
||||
"API",
|
||||
"The maximum collective batch size for one scan",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
MAX_SCAN_BATCH_SIZE,
|
||||
32768,
|
||||
(1024*1024*16) },
|
||||
|
||||
{
|
||||
CFG_BATCH_BYTE_SIZE,
|
||||
"BatchByteSize",
|
||||
"API",
|
||||
"The default batch size in bytes",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
SCAN_BATCH_SIZE,
|
||||
1024,
|
||||
(1024*1024) },
|
||||
|
||||
{
|
||||
CFG_BATCH_SIZE,
|
||||
"BatchSize",
|
||||
"API",
|
||||
"The default batch size in number of records",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
DEF_BATCH_SIZE,
|
||||
1,
|
||||
MAX_PARALLEL_OP_PER_SCAN },
|
||||
|
||||
/****************************************************************************
|
||||
* MGM
|
||||
***************************************************************************/
|
||||
|
@ -8386,9 +8386,6 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
||||
scanptr.p->scanNumber = ~0;
|
||||
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
|
||||
|
||||
ndbout << "batch_size = " << scanptr.p->batch_size;
|
||||
ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations;
|
||||
ndbout << endl;
|
||||
if ((scanptr.p->scanConcurrentOperations == 0) ||
|
||||
(scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
|
||||
jam();
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <NdbRecAttr.hpp>
|
||||
#include <AttributeHeader.hpp>
|
||||
#include <NdbConnection.hpp>
|
||||
#include <TransporterFacade.hpp>
|
||||
|
||||
NdbReceiver::NdbReceiver(Ndb *aNdb) :
|
||||
theMagicNumber(0),
|
||||
@ -96,6 +97,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
|
||||
Uint32& batch_byte_size,
|
||||
Uint32& first_batch_size)
|
||||
{
|
||||
TransporterFacade *tp= TransporterFacade::instance();
|
||||
Uint32 max_scan_batch_size= tp->get_scan_batch_size();
|
||||
Uint32 max_batch_byte_size= tp->get_batch_byte_size();
|
||||
Uint32 max_batch_size= tp->get_batch_size();
|
||||
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
|
||||
NdbRecAttr *rec_attr= theFirstRecAttr;
|
||||
while (rec_attr != NULL) {
|
||||
@ -112,20 +117,20 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
|
||||
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
|
||||
* batch.
|
||||
*/
|
||||
batch_byte_size= SCAN_BATCH_SIZE;
|
||||
if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) {
|
||||
batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism;
|
||||
batch_byte_size= max_batch_byte_size;
|
||||
if (batch_byte_size * parallelism > max_scan_batch_size) {
|
||||
batch_byte_size= max_scan_batch_size / parallelism;
|
||||
}
|
||||
batch_size= batch_byte_size / tot_size;
|
||||
#ifdef VM_TRACE
|
||||
ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = ";
|
||||
ndbout << batch_size << "tot_size = " << tot_size << endl;
|
||||
#endif
|
||||
if (batch_size == 0) {
|
||||
batch_size= 1;
|
||||
} else {
|
||||
if (batch_size > max_batch_size) {
|
||||
batch_size= max_batch_size;
|
||||
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
|
||||
batch_size= MAX_PARALLEL_OP_PER_SCAN;
|
||||
}
|
||||
}
|
||||
first_batch_size= batch_size;
|
||||
return;
|
||||
}
|
||||
|
@ -550,6 +550,9 @@ TransporterFacade::TransporterFacade() :
|
||||
theArbitMgr = NULL;
|
||||
theStartNodeId = 1;
|
||||
m_open_count = 0;
|
||||
m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
|
||||
m_batch_byte_size= SCAN_BATCH_SIZE;
|
||||
m_batch_size= DEF_BATCH_SIZE;
|
||||
}
|
||||
|
||||
bool
|
||||
@ -593,7 +596,18 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
|
||||
iter.get(CFG_NODE_ARBIT_DELAY, &delay);
|
||||
theArbitMgr->setDelay(delay);
|
||||
}
|
||||
|
||||
Uint32 scan_batch_size= 0;
|
||||
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
|
||||
m_scan_batch_size= scan_batch_size;
|
||||
}
|
||||
Uint32 batch_byte_size= 0;
|
||||
if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
|
||||
m_batch_byte_size= batch_byte_size;
|
||||
}
|
||||
Uint32 batch_size= 0;
|
||||
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
|
||||
m_batch_size= batch_size;
|
||||
}
|
||||
#if 0
|
||||
}
|
||||
#endif
|
||||
|
@ -113,6 +113,11 @@ public:
|
||||
// Close this block number
|
||||
int close_local(BlockNumber blockNumber);
|
||||
|
||||
// Scan batch configuration parameters
|
||||
Uint32 get_scan_batch_size();
|
||||
Uint32 get_batch_byte_size();
|
||||
Uint32 get_batch_size();
|
||||
|
||||
private:
|
||||
/**
|
||||
* Send a signal unconditional of node status (used by ClusterMgr)
|
||||
@ -146,6 +151,11 @@ private:
|
||||
|
||||
void calculateSendLimit();
|
||||
|
||||
// Scan batch configuration parameters
|
||||
Uint32 m_scan_batch_size;
|
||||
Uint32 m_batch_byte_size;
|
||||
Uint32 m_batch_size;
|
||||
|
||||
// Declarations for the receive and send thread
|
||||
int theStopReceive;
|
||||
|
||||
@ -325,4 +335,24 @@ TransporterFacade::getNodeSequence(NodeId n) const {
|
||||
return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
TransporterFacade::get_scan_batch_size() {
|
||||
return m_scan_batch_size;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
TransporterFacade::get_batch_byte_size() {
|
||||
return m_batch_byte_size;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
TransporterFacade::get_batch_size() {
|
||||
return m_batch_size;
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endif // TransporterFacade_H
|
||||
|
Reference in New Issue
Block a user