mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Fixed memory handling
This commit is contained in:
@ -37,7 +37,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
NdbReceiver(Ndb *aNdb);
|
NdbReceiver(Ndb *aNdb);
|
||||||
void init(ReceiverType type, void* owner, bool keyInfo);
|
void init(ReceiverType type, void* owner);
|
||||||
void release();
|
void release();
|
||||||
~NdbReceiver();
|
~NdbReceiver();
|
||||||
|
|
||||||
|
@ -131,7 +131,8 @@ protected:
|
|||||||
int doSendScan(int ProcessorId);
|
int doSendScan(int ProcessorId);
|
||||||
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
|
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
|
||||||
|
|
||||||
int fix_receivers(Uint32 parallel, bool keyInfo);
|
int fix_receivers(Uint32 parallel);
|
||||||
|
Uint32* m_array; // containing all arrays below
|
||||||
Uint32 m_allocated_receivers;
|
Uint32 m_allocated_receivers;
|
||||||
NdbReceiver** m_receivers; // All receivers
|
NdbReceiver** m_receivers; // All receivers
|
||||||
|
|
||||||
|
@ -60,6 +60,48 @@
|
|||||||
// seen only when we debug the product
|
// seen only when we debug the product
|
||||||
#ifdef VM_TRACE
|
#ifdef VM_TRACE
|
||||||
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
|
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbOut &
|
||||||
|
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
|
||||||
|
out << (int)state;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
#else
|
#else
|
||||||
#define DEBUG(x)
|
#define DEBUG(x)
|
||||||
#endif
|
#endif
|
||||||
@ -7177,7 +7219,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||||||
ScanFragRef * ref;
|
ScanFragRef * ref;
|
||||||
const Uint32 transid1 = scanFragReq->transId1;
|
const Uint32 transid1 = scanFragReq->transId1;
|
||||||
const Uint32 transid2 = scanFragReq->transId2;
|
const Uint32 transid2 = scanFragReq->transId2;
|
||||||
Uint32 errorCode;
|
Uint32 errorCode= 0;
|
||||||
Uint32 senderData;
|
Uint32 senderData;
|
||||||
Uint32 hashIndex;
|
Uint32 hashIndex;
|
||||||
TcConnectionrecPtr nextHashptr;
|
TcConnectionrecPtr nextHashptr;
|
||||||
@ -8466,7 +8508,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||||||
const Uint32 type = getNodeInfo(nodeId).m_type;
|
const Uint32 type = getNodeInfo(nodeId).m_type;
|
||||||
const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
|
const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
|
||||||
const bool old_dest = (getNodeInfo(nodeId).m_version < MAKE_VERSION(3,5,0));
|
const bool old_dest = (getNodeInfo(nodeId).m_version < MAKE_VERSION(3,5,0));
|
||||||
const bool longable = is_api && !old_dest;
|
const bool longable = true; // TODO is_api && !old_dest;
|
||||||
|
|
||||||
Uint32 * dst = keyInfo->keyData;
|
Uint32 * dst = keyInfo->keyData;
|
||||||
dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
|
dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
|
||||||
@ -15736,7 +15778,7 @@ void Dblqh::completedLogPage(Signal* signal, Uint32 clpType)
|
|||||||
/* ---------------------------------------------------------------- */
|
/* ---------------------------------------------------------------- */
|
||||||
void Dblqh::deleteFragrec(Uint32 fragId)
|
void Dblqh::deleteFragrec(Uint32 fragId)
|
||||||
{
|
{
|
||||||
Uint32 indexFound;
|
Uint32 indexFound= RNIL;
|
||||||
fragptr.i = RNIL;
|
fragptr.i = RNIL;
|
||||||
for (Uint32 i = (NO_OF_FRAG_PER_NODE - 1); (Uint32)~i; i--) {
|
for (Uint32 i = (NO_OF_FRAG_PER_NODE - 1); (Uint32)~i; i--) {
|
||||||
jam();
|
jam();
|
||||||
|
@ -220,7 +220,7 @@ void Dbtup::sendReadAttrinfo(Signal* signal,
|
|||||||
*/
|
*/
|
||||||
Uint32 routeBlockref = regOperPtr->coordinatorTC;
|
Uint32 routeBlockref = regOperPtr->coordinatorTC;
|
||||||
|
|
||||||
if(is_api && !old_dest){
|
if(true){ // TODO is_api && !old_dest){
|
||||||
ljam();
|
ljam();
|
||||||
transIdAI->attrData[0] = recBlockref;
|
transIdAI->attrData[0] = recBlockref;
|
||||||
LinearSectionPtr ptr[3];
|
LinearSectionPtr ptr[3];
|
||||||
|
@ -1515,7 +1515,8 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
|
|||||||
ndbout_c("Failed to unpack buffer");
|
ndbout_c("Failed to unpack buffer");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delete prop;
|
||||||
return (ndb_mgm_configuration*)cvf.m_cfg;
|
return (ndb_mgm_configuration*)cvf.m_cfg;
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
|
@ -230,10 +230,6 @@ NdbColumnImpl::assign(const NdbColumnImpl& org)
|
|||||||
NdbTableImpl::NdbTableImpl()
|
NdbTableImpl::NdbTableImpl()
|
||||||
: NdbDictionary::Table(* this), m_facade(this)
|
: NdbDictionary::Table(* this), m_facade(this)
|
||||||
{
|
{
|
||||||
m_noOfKeys = 0;
|
|
||||||
m_sizeOfKeysInWords = 0;
|
|
||||||
m_noOfBlobs = 0;
|
|
||||||
m_index = 0;
|
|
||||||
init();
|
init();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1149,7 +1145,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
|
|||||||
|
|
||||||
Uint32 keyInfoPos = 0;
|
Uint32 keyInfoPos = 0;
|
||||||
Uint32 keyCount = 0;
|
Uint32 keyCount = 0;
|
||||||
Uint32 blobCount;
|
Uint32 blobCount = 0;
|
||||||
|
|
||||||
for(Uint32 i = 0; i < tableDesc.NoOfAttributes; i++) {
|
for(Uint32 i = 0; i < tableDesc.NoOfAttributes; i++) {
|
||||||
DictTabInfo::Attribute attrDesc; attrDesc.init();
|
DictTabInfo::Attribute attrDesc; attrDesc.init();
|
||||||
|
@ -37,8 +37,6 @@
|
|||||||
#include <signaldata/IndxKeyInfo.hpp>
|
#include <signaldata/IndxKeyInfo.hpp>
|
||||||
#include <signaldata/IndxAttrInfo.hpp>
|
#include <signaldata/IndxAttrInfo.hpp>
|
||||||
|
|
||||||
#define CHECK_NULL(v) assert(v == NULL); v = NULL;
|
|
||||||
|
|
||||||
NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
|
NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
|
||||||
NdbOperation(aNdb),
|
NdbOperation(aNdb),
|
||||||
m_theIndex(NULL),
|
m_theIndex(NULL),
|
||||||
@ -52,7 +50,7 @@ NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
|
|||||||
/**
|
/**
|
||||||
* Change receiver type
|
* Change receiver type
|
||||||
*/
|
*/
|
||||||
theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this, false);
|
theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
NdbIndexOperation::~NdbIndexOperation()
|
NdbIndexOperation::~NdbIndexOperation()
|
||||||
|
@ -92,7 +92,7 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
|
|||||||
theBoundATTRINFO(NULL),
|
theBoundATTRINFO(NULL),
|
||||||
theBlobList(NULL)
|
theBlobList(NULL)
|
||||||
{
|
{
|
||||||
theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
|
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
|
||||||
theError.code = 0;
|
theError.code = 0;
|
||||||
}
|
}
|
||||||
/*****************************************************************************
|
/*****************************************************************************
|
||||||
@ -195,7 +195,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
|
|||||||
tcKeyReq->scanInfo = 0;
|
tcKeyReq->scanInfo = 0;
|
||||||
theKEYINFOptr = &tcKeyReq->keyInfo[0];
|
theKEYINFOptr = &tcKeyReq->keyInfo[0];
|
||||||
theATTRINFOptr = &tcKeyReq->attrInfo[0];
|
theATTRINFOptr = &tcKeyReq->attrInfo[0];
|
||||||
theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
|
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -291,6 +291,7 @@ NdbOperation::release()
|
|||||||
theNdb->releaseNdbBlob(tSaveBlob);
|
theNdb->releaseNdbBlob(tSaveBlob);
|
||||||
}
|
}
|
||||||
theBlobList = NULL;
|
theBlobList = NULL;
|
||||||
|
theReceiver.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
NdbRecAttr*
|
NdbRecAttr*
|
||||||
|
@ -29,10 +29,20 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
|
|||||||
m_owner(0)
|
m_owner(0)
|
||||||
{
|
{
|
||||||
theCurrentRecAttr = theFirstRecAttr = 0;
|
theCurrentRecAttr = theFirstRecAttr = 0;
|
||||||
|
m_defined_rows = 0;
|
||||||
|
m_rows = new NdbRecAttr*[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NdbReceiver::~NdbReceiver()
|
||||||
|
{
|
||||||
|
if (m_id != NdbObjectIdMap::InvalidId) {
|
||||||
|
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
|
||||||
|
}
|
||||||
|
delete[] m_rows;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
|
NdbReceiver::init(ReceiverType type, void* owner)
|
||||||
{
|
{
|
||||||
theMagicNumber = 0x11223344;
|
theMagicNumber = 0x11223344;
|
||||||
m_type = type;
|
m_type = type;
|
||||||
@ -44,8 +54,6 @@ NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
|
|||||||
|
|
||||||
theFirstRecAttr = NULL;
|
theFirstRecAttr = NULL;
|
||||||
theCurrentRecAttr = NULL;
|
theCurrentRecAttr = NULL;
|
||||||
m_key_info = (keyInfo ? 1 : 0);
|
|
||||||
m_defined_rows = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -61,13 +69,6 @@ NdbReceiver::release(){
|
|||||||
theCurrentRecAttr = NULL;
|
theCurrentRecAttr = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
NdbReceiver::~NdbReceiver()
|
|
||||||
{
|
|
||||||
if (m_id != NdbObjectIdMap::InvalidId) {
|
|
||||||
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
NdbRecAttr *
|
NdbRecAttr *
|
||||||
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
|
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
|
||||||
NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
|
NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
|
||||||
@ -90,9 +91,13 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
|
|||||||
|
|
||||||
void
|
void
|
||||||
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
|
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
|
||||||
m_defined_rows = rows;
|
if(rows > m_defined_rows){
|
||||||
m_rows = new NdbRecAttr*[rows + 1]; m_rows[rows] = 0;
|
delete[] m_rows;
|
||||||
|
m_defined_rows = rows;
|
||||||
|
m_rows = new NdbRecAttr*[rows + 1];
|
||||||
|
}
|
||||||
|
m_rows[rows] = 0;
|
||||||
|
|
||||||
NdbColumnImpl key;
|
NdbColumnImpl key;
|
||||||
if(key_size){
|
if(key_size){
|
||||||
key.m_attrId = KEY_ATTR_ID;
|
key.m_attrId = KEY_ATTR_ID;
|
||||||
@ -159,7 +164,6 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
|
|||||||
{
|
{
|
||||||
bool ok = true;
|
bool ok = true;
|
||||||
NdbRecAttr* currRecAttr = theCurrentRecAttr;
|
NdbRecAttr* currRecAttr = theCurrentRecAttr;
|
||||||
NdbRecAttr* prevRecAttr = currRecAttr;
|
|
||||||
|
|
||||||
for (Uint32 used = 0; used < aLength ; used++){
|
for (Uint32 used = 0; used < aLength ; used++){
|
||||||
AttributeHeader ah(* aDataPtr++);
|
AttributeHeader ah(* aDataPtr++);
|
||||||
@ -171,18 +175,21 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
|
|||||||
*/
|
*/
|
||||||
while(currRecAttr && currRecAttr->attrId() != tAttrId){
|
while(currRecAttr && currRecAttr->attrId() != tAttrId){
|
||||||
ok &= currRecAttr->setNULL();
|
ok &= currRecAttr->setNULL();
|
||||||
prevRecAttr = currRecAttr;
|
|
||||||
currRecAttr = currRecAttr->next();
|
currRecAttr = currRecAttr->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
|
if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
|
||||||
used += tAttrSize;
|
used += tAttrSize;
|
||||||
aDataPtr += tAttrSize;
|
aDataPtr += tAttrSize;
|
||||||
prevRecAttr = currRecAttr;
|
|
||||||
currRecAttr = currRecAttr->next();
|
currRecAttr = currRecAttr->next();
|
||||||
} else {
|
} else {
|
||||||
ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p",
|
ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p",
|
||||||
this,ok, tAttrId, currRecAttr);
|
this,ok, tAttrId, currRecAttr);
|
||||||
|
currRecAttr = theCurrentRecAttr;
|
||||||
|
while(currRecAttr != 0){
|
||||||
|
ndbout_c("%d ", currRecAttr->attrId());
|
||||||
|
currRecAttr = currRecAttr->next();
|
||||||
|
}
|
||||||
abort();
|
abort();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -57,11 +57,15 @@ NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
|
|||||||
m_conf_receivers = 0;
|
m_conf_receivers = 0;
|
||||||
m_sent_receivers = 0;
|
m_sent_receivers = 0;
|
||||||
m_receivers = 0;
|
m_receivers = 0;
|
||||||
|
m_array = new Uint32[1]; // skip if on delete in fix_receivers
|
||||||
}
|
}
|
||||||
|
|
||||||
NdbScanOperation::~NdbScanOperation()
|
NdbScanOperation::~NdbScanOperation()
|
||||||
{
|
{
|
||||||
fix_receivers(0, false);
|
for(Uint32 i = 0; i<m_allocated_receivers; i++){
|
||||||
|
theNdb->releaseNdbScanRec(m_receivers[i]);
|
||||||
|
}
|
||||||
|
delete[] m_array;
|
||||||
if (m_resultSet)
|
if (m_resultSet)
|
||||||
delete m_resultSet;
|
delete m_resultSet;
|
||||||
}
|
}
|
||||||
@ -130,31 +134,23 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection)
|
|||||||
|
|
||||||
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||||
Uint32 batch,
|
Uint32 batch,
|
||||||
Uint32 parallell)
|
Uint32 parallel)
|
||||||
{
|
{
|
||||||
m_ordered = 0;
|
m_ordered = 0;
|
||||||
|
|
||||||
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
||||||
|
|
||||||
if(batch + parallell == 0){ // Max speed
|
if (batch + parallel == 0) {
|
||||||
batch = 16;
|
batch = 16;
|
||||||
parallell = fragCount;
|
parallel= fragCount;
|
||||||
}
|
} else {
|
||||||
|
if (batch == 0 && parallel > 0) { // Backward
|
||||||
if(batch == 0 && parallell > 0){ // Backward
|
batch = (parallel >= 16 ? 16 : parallel);
|
||||||
batch = (parallell >= 16 ? 16 : parallell & 15);
|
parallel = (parallel + 15) / 16;
|
||||||
parallell = (parallell + 15) / 16;
|
}
|
||||||
|
if (parallel > fragCount || parallel == 0)
|
||||||
if(parallell == 0)
|
parallel = fragCount;
|
||||||
parallell = 1;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if(parallell > fragCount)
|
|
||||||
parallell = fragCount;
|
|
||||||
else if(parallell == 0)
|
|
||||||
parallell = fragCount;
|
|
||||||
|
|
||||||
assert(parallell > 0);
|
|
||||||
|
|
||||||
// It is only possible to call openScan if
|
// It is only possible to call openScan if
|
||||||
// 1. this transcation don't already contain another scan operation
|
// 1. this transcation don't already contain another scan operation
|
||||||
@ -179,7 +175,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||||||
lockHoldMode = true;
|
lockHoldMode = true;
|
||||||
readCommitted = false;
|
readCommitted = false;
|
||||||
break;
|
break;
|
||||||
case NdbScanOperation::LM_Dirty:
|
case NdbScanOperation::LM_CommittedRead:
|
||||||
lockExcl = false;
|
lockExcl = false;
|
||||||
lockHoldMode = false;
|
lockHoldMode = false;
|
||||||
readCommitted = true;
|
readCommitted = true;
|
||||||
@ -204,10 +200,10 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||||||
range = true;
|
range = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
theParallelism = parallell;
|
theParallelism = parallel;
|
||||||
theBatchSize = batch;
|
theBatchSize = batch;
|
||||||
|
|
||||||
if(fix_receivers(parallell, lockExcl) == -1){
|
if(fix_receivers(parallel) == -1){
|
||||||
setErrorCodeAbort(4000);
|
setErrorCodeAbort(4000);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -226,7 +222,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||||||
req->buddyConPtr = theNdbCon->theBuddyConPtr;
|
req->buddyConPtr = theNdbCon->theBuddyConPtr;
|
||||||
|
|
||||||
Uint32 reqInfo = 0;
|
Uint32 reqInfo = 0;
|
||||||
ScanTabReq::setParallelism(reqInfo, parallell);
|
ScanTabReq::setParallelism(reqInfo, parallel);
|
||||||
ScanTabReq::setScanBatch(reqInfo, batch);
|
ScanTabReq::setScanBatch(reqInfo, batch);
|
||||||
ScanTabReq::setLockMode(reqInfo, lockExcl);
|
ScanTabReq::setLockMode(reqInfo, lockExcl);
|
||||||
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
|
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
|
||||||
@ -244,38 +240,38 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
|
NdbScanOperation::fix_receivers(Uint32 parallel){
|
||||||
if(parallell == 0 || parallell > m_allocated_receivers){
|
assert(parallel > 0);
|
||||||
if(m_prepared_receivers) delete[] m_prepared_receivers;
|
if(parallel > m_allocated_receivers){
|
||||||
if(m_receivers) delete[] m_receivers;
|
const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
|
||||||
if(m_api_receivers) delete[] m_api_receivers;
|
|
||||||
if(m_conf_receivers) delete[] m_conf_receivers;
|
|
||||||
if(m_sent_receivers) delete[] m_sent_receivers;
|
|
||||||
|
|
||||||
m_allocated_receivers = parallell;
|
|
||||||
if(parallell == 0){
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
m_prepared_receivers = new Uint32[parallell];
|
|
||||||
m_receivers = new NdbReceiver*[parallell];
|
|
||||||
m_api_receivers = new NdbReceiver*[parallell];
|
|
||||||
m_conf_receivers = new NdbReceiver*[parallell];
|
|
||||||
m_sent_receivers = new NdbReceiver*[parallell];
|
|
||||||
|
|
||||||
|
Uint32 * tmp = new Uint32[(sz+3)/4];
|
||||||
|
// Save old receivers
|
||||||
|
memcpy(tmp+parallel, m_receivers, m_allocated_receivers*sizeof(char*));
|
||||||
|
delete[] m_array;
|
||||||
|
m_array = tmp;
|
||||||
|
|
||||||
|
m_prepared_receivers = tmp;
|
||||||
|
m_receivers = (NdbReceiver**)(tmp + parallel);
|
||||||
|
m_api_receivers = m_receivers + parallel;
|
||||||
|
m_conf_receivers = m_api_receivers + parallel;
|
||||||
|
m_sent_receivers = m_conf_receivers + parallel;
|
||||||
|
|
||||||
|
// Only get/init "new" receivers
|
||||||
NdbReceiver* tScanRec;
|
NdbReceiver* tScanRec;
|
||||||
for (Uint32 i = 0; i < parallell; i ++) {
|
for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
|
||||||
tScanRec = theNdb->getNdbScanRec();
|
tScanRec = theNdb->getNdbScanRec();
|
||||||
if (tScanRec == NULL) {
|
if (tScanRec == NULL) {
|
||||||
setErrorCodeAbort(4000);
|
setErrorCodeAbort(4000);
|
||||||
return -1;
|
return -1;
|
||||||
}//if
|
}//if
|
||||||
m_receivers[i] = tScanRec;
|
m_receivers[i] = tScanRec;
|
||||||
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this, keyInfo);
|
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
|
||||||
}
|
}
|
||||||
|
m_allocated_receivers = parallel;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(Uint32 i = 0; i<parallell; i++){
|
for(Uint32 i = 0; i<parallel; i++){
|
||||||
m_receivers[i]->m_list_index = i;
|
m_receivers[i]->m_list_index = i;
|
||||||
m_prepared_receivers[i] = m_receivers[i]->getId();
|
m_prepared_receivers[i] = m_receivers[i]->getId();
|
||||||
m_sent_receivers[i] = m_receivers[i];
|
m_sent_receivers[i] = m_receivers[i];
|
||||||
@ -285,7 +281,7 @@ NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
|
|||||||
|
|
||||||
m_api_receivers_count = 0;
|
m_api_receivers_count = 0;
|
||||||
m_current_api_receiver = 0;
|
m_current_api_receiver = 0;
|
||||||
m_sent_receivers_count = parallell;
|
m_sent_receivers_count = parallel;
|
||||||
m_conf_receivers_count = 0;
|
m_conf_receivers_count = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1242,7 +1238,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
|
|||||||
int
|
int
|
||||||
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
||||||
|
|
||||||
Uint32 u_idx, u_last;
|
Uint32 u_idx = 0, u_last = 0;
|
||||||
Uint32 s_idx = m_current_api_receiver; // first sorted
|
Uint32 s_idx = m_current_api_receiver; // first sorted
|
||||||
Uint32 s_last = theParallelism; // last sorted
|
Uint32 s_last = theParallelism; // last sorted
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
#include <mgmapi.h>
|
#include <mgmapi.h>
|
||||||
#include <Vector.hpp>
|
#include <Vector.hpp>
|
||||||
|
#include <BaseString.hpp>
|
||||||
|
|
||||||
class NdbRestarter {
|
class NdbRestarter {
|
||||||
public:
|
public:
|
||||||
@ -85,8 +86,8 @@ protected:
|
|||||||
Vector<ndb_mgm_node_state> apiNodes;
|
Vector<ndb_mgm_node_state> apiNodes;
|
||||||
|
|
||||||
bool connected;
|
bool connected;
|
||||||
const char* addr;
|
BaseString addr;
|
||||||
const char* host;
|
BaseString host;
|
||||||
int port;
|
int port;
|
||||||
NdbMgmHandle handle;
|
NdbMgmHandle handle;
|
||||||
ndb_mgm_configuration * m_config;
|
ndb_mgm_configuration * m_config;
|
||||||
|
@ -71,7 +71,7 @@ NdbBackup::getFileSystemPathForNode(int _node_id){
|
|||||||
*/
|
*/
|
||||||
ConfigRetriever cr;
|
ConfigRetriever cr;
|
||||||
|
|
||||||
ndb_mgm_configuration * p = cr.getConfig(host, port, 0);
|
ndb_mgm_configuration * p = cr.getConfig(host.c_str(), port, 0);
|
||||||
if(p == 0){
|
if(p == 0){
|
||||||
const char * s = cr.getErrorString();
|
const char * s = cr.getErrorString();
|
||||||
if(s == 0)
|
if(s == 0)
|
||||||
@ -156,7 +156,7 @@ NdbBackup::execRestore(bool _restore_data,
|
|||||||
|
|
||||||
snprintf(buf, 255, "ndb_restore -c \"nodeid=%d;host=%s\" -n %d -b %d %s %s .",
|
snprintf(buf, 255, "ndb_restore -c \"nodeid=%d;host=%s\" -n %d -b %d %s %s .",
|
||||||
ownNodeId,
|
ownNodeId,
|
||||||
addr,
|
addr.c_str(),
|
||||||
_node_id,
|
_node_id,
|
||||||
_backup_id,
|
_backup_id,
|
||||||
_restore_data?"-r":"",
|
_restore_data?"-r":"",
|
||||||
|
@ -33,13 +33,11 @@
|
|||||||
|
|
||||||
NdbRestarter::NdbRestarter(const char* _addr):
|
NdbRestarter::NdbRestarter(const char* _addr):
|
||||||
connected(false),
|
connected(false),
|
||||||
addr(_addr),
|
|
||||||
host(NULL),
|
|
||||||
port(-1),
|
port(-1),
|
||||||
handle(NULL),
|
handle(NULL),
|
||||||
m_config(0)
|
m_config(0)
|
||||||
{
|
{
|
||||||
if (addr == NULL){
|
if (_addr == NULL){
|
||||||
LocalConfig lcfg;
|
LocalConfig lcfg;
|
||||||
if(!lcfg.init()){
|
if(!lcfg.init()){
|
||||||
lcfg.printError();
|
lcfg.printError();
|
||||||
@ -60,20 +58,20 @@ NdbRestarter::NdbRestarter(const char* _addr):
|
|||||||
case MgmId_TCP:
|
case MgmId_TCP:
|
||||||
char buf[255];
|
char buf[255];
|
||||||
snprintf(buf, 255, "%s:%d", m->data.tcp.remoteHost, m->data.tcp.port);
|
snprintf(buf, 255, "%s:%d", m->data.tcp.remoteHost, m->data.tcp.port);
|
||||||
addr = strdup(buf);
|
addr.assign(buf);
|
||||||
host = strdup(m->data.tcp.remoteHost);
|
host.assign(m->data.tcp.remoteHost);
|
||||||
port = m->data.tcp.port;
|
port = m->data.tcp.port;
|
||||||
|
return;
|
||||||
break;
|
break;
|
||||||
case MgmId_File:
|
case MgmId_File:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (addr != NULL)
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
addr.assign(_addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
NdbRestarter::~NdbRestarter(){
|
NdbRestarter::~NdbRestarter(){
|
||||||
@ -398,10 +396,10 @@ NdbRestarter::connect(){
|
|||||||
g_err << "handle == NULL" << endl;
|
g_err << "handle == NULL" << endl;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
g_info << "Connecting to mgmsrv at " << addr << endl;
|
g_info << "Connecting to mgmsrv at " << addr.c_str() << endl;
|
||||||
if (ndb_mgm_connect(handle, addr) == -1) {
|
if (ndb_mgm_connect(handle, addr.c_str()) == -1) {
|
||||||
MGMERR(handle);
|
MGMERR(handle);
|
||||||
g_err << "Connection to " << addr << " failed" << endl;
|
g_err << "Connection to " << addr.c_str() << " failed" << endl;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user