mirror of
https://github.com/MariaDB/server.git
synced 2025-11-15 09:02:33 +03:00
Merge perch.ndb.mysql.com:/home/jonas/src/50-work
into perch.ndb.mysql.com:/home/jonas/src/mysql-5.0
This commit is contained in:
@@ -41,7 +41,9 @@ public:
|
||||
* @param parallel No of fragments to scan in parallel (0=max)
|
||||
*/
|
||||
virtual int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 scan_flags = 0, Uint32 parallel = 0);
|
||||
Uint32 scan_flags = 0,
|
||||
Uint32 parallel = 0,
|
||||
Uint32 batch = 0);
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
/**
|
||||
@@ -69,7 +71,7 @@ public:
|
||||
(SF_ReadRangeNo & -(Int32)read_range_no) |
|
||||
(SF_KeyInfo & -(Int32)keyinfo);
|
||||
|
||||
return readTuples(lock_mode, scan_flags, parallel);
|
||||
return readTuples(lock_mode, scan_flags, parallel, batch);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@@ -57,7 +57,9 @@ public:
|
||||
*/
|
||||
virtual
|
||||
int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 scan_flags = 0, Uint32 parallel = 0);
|
||||
Uint32 scan_flags = 0,
|
||||
Uint32 parallel = 0,
|
||||
Uint32 batch = 0);
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
|
||||
/**
|
||||
|
||||
@@ -7373,15 +7373,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
||||
scanptr.p->m_curr_batch_size_rows = 0;
|
||||
scanptr.p->m_curr_batch_size_bytes = 0;
|
||||
closeScanLab(signal);
|
||||
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
|
||||
jam();
|
||||
closeScanLab(signal);
|
||||
return;
|
||||
} else if (scanptr.p->check_scan_batch_completed() &&
|
||||
scanptr.p->scanLockHold != ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
sendScanFragConf(signal, ZFALSE);
|
||||
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
|
||||
jam();
|
||||
closeScanLab(signal);
|
||||
return;
|
||||
} else {
|
||||
jam();
|
||||
/*
|
||||
|
||||
@@ -7047,6 +7047,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
||||
ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags);
|
||||
for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr))
|
||||
{
|
||||
jam();
|
||||
if (refToNode(ptr.p->lqhBlockref) == failedNodeId)
|
||||
{
|
||||
jam();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(found){
|
||||
jam();
|
||||
|
||||
@@ -26,12 +26,12 @@ public:
|
||||
void init() { m_confs.clear(); m_nRefs = 0; }
|
||||
|
||||
template<typename SignalClass>
|
||||
void init(SafeCounterManager& mgr,
|
||||
bool init(SafeCounterManager& mgr,
|
||||
NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData)
|
||||
{
|
||||
init();
|
||||
SafeCounter tmp(mgr, m_sc);
|
||||
tmp.init<SignalClass>(rg, GSN, senderData);
|
||||
return tmp.init<SignalClass>(rg, GSN, senderData);
|
||||
}
|
||||
|
||||
bool ignoreRef(SafeCounterManager& mgr, Uint32 nodeId)
|
||||
|
||||
@@ -230,10 +230,13 @@ inline
|
||||
bool
|
||||
SafeCounter::init(NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData){
|
||||
|
||||
bool b = init<Ref>(rg.m_block, GSN, senderData);
|
||||
m_nodes = rg.m_nodes;
|
||||
m_count = m_nodes.count();
|
||||
return b;
|
||||
if (init<Ref>(rg.m_block, GSN, senderData))
|
||||
{
|
||||
m_nodes = rg.m_nodes;
|
||||
m_count = m_nodes.count();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
template<typename Ref>
|
||||
@@ -241,10 +244,13 @@ inline
|
||||
bool
|
||||
SafeCounter::init(NodeReceiverGroup rg, Uint32 senderData){
|
||||
|
||||
bool b = init<Ref>(rg.m_block, Ref::GSN, senderData);
|
||||
m_nodes = rg.m_nodes;
|
||||
m_count = m_nodes.count();
|
||||
return b;
|
||||
if (init<Ref>(rg.m_block, Ref::GSN, senderData))
|
||||
{
|
||||
m_nodes = rg.m_nodes;
|
||||
m_count = m_nodes.count();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
inline
|
||||
|
||||
@@ -30,6 +30,7 @@ extern my_bool opt_core;
|
||||
#define MAX_LINE_LENGTH 255
|
||||
#define KEY_INTERNAL 0
|
||||
#define MAX_INT_RNIL 0xfffffeff
|
||||
#define MAX_PORT_NO 65535
|
||||
|
||||
#define _STR_VALUE(x) #x
|
||||
#define STR_VALUE(x) _STR_VALUE(x)
|
||||
@@ -422,7 +423,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
UNDEFINED,
|
||||
"1",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
CFG_DB_NO_REPLICAS,
|
||||
@@ -1462,7 +1463,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
NDB_PORT,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
KEY_INTERNAL,
|
||||
@@ -1474,7 +1475,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
UNDEFINED,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
CFG_NODE_ARBIT_RANK,
|
||||
@@ -1616,7 +1617,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
CFG_TCP_SEND_BUFFER_SIZE,
|
||||
@@ -1722,7 +1723,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
CFG_SHM_SIGNUM,
|
||||
@@ -1944,7 +1945,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::CI_INT,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
STR_VALUE(MAX_PORT_NO) },
|
||||
|
||||
{
|
||||
CFG_SCI_HOST1_ID_0,
|
||||
|
||||
@@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
|
||||
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
|
||||
* batch.
|
||||
*/
|
||||
batch_byte_size= max_batch_byte_size;
|
||||
if (batch_size == 0)
|
||||
{
|
||||
batch_byte_size= max_batch_byte_size;
|
||||
}
|
||||
else
|
||||
{
|
||||
batch_byte_size= batch_size * tot_size;
|
||||
}
|
||||
|
||||
if (batch_byte_size * parallelism > max_scan_batch_size) {
|
||||
batch_byte_size= max_scan_batch_size / parallelism;
|
||||
}
|
||||
|
||||
@@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
|
||||
int
|
||||
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||
Uint32 scan_flags,
|
||||
Uint32 parallel)
|
||||
Uint32 parallel,
|
||||
Uint32 batch)
|
||||
{
|
||||
m_ordered = m_descending = false;
|
||||
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
||||
@@ -181,9 +182,12 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||
bool tupScan = (scan_flags & SF_TupScan);
|
||||
if (tupScan && rangeScan)
|
||||
tupScan = false;
|
||||
|
||||
theParallelism = parallel;
|
||||
|
||||
if (rangeScan && (scan_flags & SF_OrderBy))
|
||||
parallel = fragCount;
|
||||
|
||||
theParallelism = parallel;
|
||||
|
||||
if(fix_receivers(parallel) == -1){
|
||||
setErrorCodeAbort(4000);
|
||||
return -1;
|
||||
@@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||
req->tableSchemaVersion = m_accessTable->m_version;
|
||||
req->storedProcId = 0xFFFF;
|
||||
req->buddyConPtr = theNdbCon->theBuddyConPtr;
|
||||
req->first_batch_size = batch; // Save user specified batch size
|
||||
|
||||
Uint32 reqInfo = 0;
|
||||
ScanTabReq::setParallelism(reqInfo, parallel);
|
||||
@@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
|
||||
* The number of records sent by each LQH is calculated and the kernel
|
||||
* is informed of this number by updating the SCAN_TABREQ signal
|
||||
*/
|
||||
Uint32 batch_size, batch_byte_size, first_batch_size;
|
||||
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
||||
Uint32 batch_size = req->first_batch_size; // User specified
|
||||
Uint32 batch_byte_size, first_batch_size;
|
||||
theReceiver.calculate_batch_size(key_size,
|
||||
theParallelism,
|
||||
batch_size,
|
||||
batch_byte_size,
|
||||
first_batch_size);
|
||||
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
||||
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
|
||||
req->batch_byte_size= batch_byte_size;
|
||||
req->first_batch_size= first_batch_size;
|
||||
@@ -1216,13 +1222,14 @@ error:
|
||||
int
|
||||
NdbIndexScanOperation::readTuples(LockMode lm,
|
||||
Uint32 scan_flags,
|
||||
Uint32 parallel)
|
||||
Uint32 parallel,
|
||||
Uint32 batch)
|
||||
{
|
||||
const bool order_by = scan_flags & SF_OrderBy;
|
||||
const bool order_desc = scan_flags & SF_Descending;
|
||||
const bool read_range_no = scan_flags & SF_ReadRangeNo;
|
||||
|
||||
int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
|
||||
|
||||
int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
|
||||
if(!res && read_range_no)
|
||||
{
|
||||
m_read_range_no = 1;
|
||||
|
||||
Reference in New Issue
Block a user