mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
Merge mysql.com:/home/jonas/src/mysql-4.1-ndb
into mysql.com:/home/jonas/src/wl1873 ndb/include/ndbapi/NdbScanOperation.hpp: Auto merged ndb/src/ndbapi/NdbResultSet.cpp: Auto merged ndb/src/ndbapi/NdbScanOperation.cpp: Auto merged ndb/test/ndbapi/testScan.cpp: Auto merged
This commit is contained in:
@ -96,6 +96,11 @@ public:
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Restart
|
||||
*/
|
||||
int restart();
|
||||
|
||||
/**
|
||||
* Transfer scan operation to an updating transaction. Use this function
|
||||
* when a scan has found a record that you want to update.
|
||||
|
@ -157,6 +157,8 @@ protected:
|
||||
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
|
||||
|
||||
Uint32 m_ordered;
|
||||
|
||||
int restart();
|
||||
};
|
||||
|
||||
inline
|
||||
|
@ -89,3 +89,8 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
NdbResultSet::restart(){
|
||||
return m_operation->restart();
|
||||
}
|
||||
|
@ -470,6 +470,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
||||
if(DEBUG_NEXT_RESULT)
|
||||
ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
|
||||
|
||||
if(DEBUG_NEXT_RESULT)
|
||||
ndbout_c("nextResult(%d) idx=%d last=%d",
|
||||
fetchAllowed,
|
||||
idx, last);
|
||||
|
||||
/**
|
||||
* Check next buckets
|
||||
*/
|
||||
@ -1395,3 +1400,88 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
|
||||
tSignal.setLength(4+1);
|
||||
return tp->sendSignal(&tSignal, nodeId);
|
||||
}
|
||||
|
||||
int
|
||||
NdbScanOperation::restart(){
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
|
||||
Uint32 seq = theNdbCon->theNodeSequence;
|
||||
Uint32 nodeId = theNdbCon->theDBnode;
|
||||
|
||||
if(seq != tp->getNodeSequence(nodeId)){
|
||||
theNdbCon->theReleaseOnClose = true;
|
||||
return -1;
|
||||
}
|
||||
|
||||
while(m_sent_receivers_count){
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
switch(return_code){
|
||||
case 0:
|
||||
break;
|
||||
case -1:
|
||||
setErrorCode(4008);
|
||||
case -2:
|
||||
m_api_receivers_count = 0;
|
||||
m_conf_receivers_count = 0;
|
||||
m_sent_receivers_count = 0;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if(m_api_receivers_count+m_conf_receivers_count){
|
||||
// Send close scan
|
||||
if(send_next_scan(0, true) == -1) // Close scan
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* wait for close scan conf
|
||||
*/
|
||||
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
switch(return_code){
|
||||
case 0:
|
||||
break;
|
||||
case -1:
|
||||
setErrorCode(4008);
|
||||
case -2:
|
||||
m_api_receivers_count = 0;
|
||||
m_conf_receivers_count = 0;
|
||||
m_sent_receivers_count = 0;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset receivers
|
||||
*/
|
||||
const Uint32 parallell = theParallelism;
|
||||
|
||||
for(Uint32 i = 0; i<parallell; i++){
|
||||
m_receivers[i]->m_list_index = i;
|
||||
m_prepared_receivers[i] = m_receivers[i]->getId();
|
||||
m_sent_receivers[i] = m_receivers[i];
|
||||
m_conf_receivers[i] = 0;
|
||||
m_api_receivers[i] = 0;
|
||||
m_receivers[i]->prepareSend();
|
||||
}
|
||||
|
||||
m_api_receivers_count = 0;
|
||||
m_current_api_receiver = 0;
|
||||
m_sent_receivers_count = parallell;
|
||||
m_conf_receivers_count = 0;
|
||||
|
||||
if(m_ordered){
|
||||
m_current_api_receiver = parallell;
|
||||
}
|
||||
|
||||
if (doSendScan(nodeId) == -1)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -881,6 +881,93 @@ int runCheckInactivityBeforeClose(NDBT_Context* ctx, NDBT_Step* step){
|
||||
|
||||
}
|
||||
|
||||
int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
|
||||
int loops = ctx->getNumLoops();
|
||||
int records = ctx->getNumRecords();
|
||||
Ndb * pNdb = GETNDB(step);
|
||||
const NdbDictionary::Table* pTab = ctx->getTab();
|
||||
|
||||
HugoCalculator calc(* pTab);
|
||||
NDBT_ResultRow tmpRow(* pTab);
|
||||
|
||||
int i = 0;
|
||||
while (i<loops && !ctx->isTestStopped()) {
|
||||
g_info << i++ << ": ";
|
||||
const int record = (rand() % records);
|
||||
g_info << " row=" << record;
|
||||
|
||||
NdbConnection* pCon = pNdb->startTransaction();
|
||||
NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
|
||||
if (pOp == NULL) {
|
||||
ERR(pCon->getNdbError());
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
NdbResultSet* rs = pOp->readTuples();
|
||||
if( rs == 0 ) {
|
||||
ERR(pCon->getNdbError());
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
int check = pOp->interpret_exit_ok();
|
||||
if( check == -1 ) {
|
||||
ERR(pCon->getNdbError());
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
// Define attributes to read
|
||||
for(int a = 0; a<pTab->getNoOfColumns(); a++){
|
||||
if((tmpRow.attributeStore(a) =
|
||||
pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
|
||||
ERR(pCon->getNdbError());
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
check = pCon->execute(NoCommit);
|
||||
if( check == -1 ) {
|
||||
ERR(pCon->getNdbError());
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
int res;
|
||||
int row = 0;
|
||||
while(row < record && (res = rs->nextResult()) == 0) {
|
||||
if(calc.verifyRowValues(&tmpRow) != 0){
|
||||
abort();
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
row++;
|
||||
}
|
||||
if(row != record){
|
||||
ERR(pCon->getNdbError());
|
||||
abort();
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
g_info << " restarting" << endl;
|
||||
if((res = rs->restart()) != 0){
|
||||
ERR(pCon->getNdbError());
|
||||
abort();
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
row = 0;
|
||||
while((res = rs->nextResult()) == 0) {
|
||||
if(calc.verifyRowValues(&tmpRow) != 0){
|
||||
abort();
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
row++;
|
||||
}
|
||||
if(res != 1 || row != records){
|
||||
ERR(pCon->getNdbError());
|
||||
abort();
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
pCon->close();
|
||||
}
|
||||
return NDBT_OK;
|
||||
}
|
||||
|
||||
|
||||
NDBT_TESTSUITE(testScan);
|
||||
@ -1304,6 +1391,12 @@ TESTCASE("ScanReadWhileNodeIsDown",
|
||||
STEP(runStopAndStartNode);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("ScanRestart",
|
||||
"Verify restart functionallity"){
|
||||
INITIALIZER(runLoadTable);
|
||||
STEP(runScanRestart);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
NDBT_TESTSUITE_END(testScan);
|
||||
|
||||
int main(int argc, const char** argv){
|
||||
|
Reference in New Issue
Block a user