1
0
mirror of https://github.com/MariaDB/server.git synced 2025-11-03 14:33:32 +03:00

wl1671 - Sorted scan

This commit is contained in:
joreland@mysql.com
2004-05-26 13:24:14 +02:00
parent 05b304d096
commit 4c90c593af
78 changed files with 3523 additions and 5330 deletions

View File

@@ -47,10 +47,14 @@ UtilTransactions::clearTable(Ndb* pNdb,
}
}
int
UtilTransactions::clearTable1(Ndb* pNdb,
int records,
int parallelism){
#if 1
return clearTable3(pNdb, records, 1);
#else
// Scan all records exclusive and delete
// them one by one
int retryAttempt = 0;
@@ -191,12 +195,16 @@ UtilTransactions::clearTable1(Ndb* pNdb,
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
int
UtilTransactions::clearTable2(Ndb* pNdb,
int records,
int parallelism){
#if 1
return clearTable3(pNdb, records, parallelism);
#else
// Scan all records exclusive and delete
// them one by one
int retryAttempt = 0;
@@ -336,6 +344,7 @@ UtilTransactions::clearTable2(Ndb* pNdb,
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
int
@@ -451,7 +460,7 @@ UtilTransactions::copyTableData(Ndb* pNdb,
int parallelism = 240;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
while (true){
@@ -477,14 +486,15 @@ UtilTransactions::copyTableData(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
pOp = pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->openScanRead(parallelism);
NdbResultSet* rs = pOp->readTuples(NdbScanOperation::LM_Read,
parallelism);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@@ -508,7 +518,7 @@ UtilTransactions::copyTableData(Ndb* pNdb,
}
}
check = pTrans->executeScan();
check = pTrans->execute(NoCommit);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@@ -516,39 +526,27 @@ UtilTransactions::copyTableData(Ndb* pNdb,
}
int eof;
NdbConnection* pInsTrans;
while((eof = pTrans->nextScanResult(true)) == 0){
pInsTrans = pNdb->startTransaction();
if (pInsTrans == NULL) {
const NdbError err = pNdb->getNdbError();
ERR(err);
pNdb->closeTransaction(pInsTrans);
return NDBT_FAILED;
}
while((eof = rs->nextResult(true)) == 0){
do {
insertedRows++;
if (addRowToInsert(pNdb, pInsTrans, row, destName) != 0){
pNdb->closeTransaction(pInsTrans);
if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
} while((eof = pTrans->nextScanResult(false)) == 0);
check = pInsTrans->execute(Commit);
} while((eof = rs->nextResult(false)) == 0);
check = pTrans->execute(Commit);
pTrans->releaseCompletedOperations();
if( check == -1 ) {
const NdbError err = pInsTrans->getNdbError();
const NdbError err = pTrans->getNdbError();
ERR(err);
pNdb->closeTransaction(pInsTrans);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pInsTrans);
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
@@ -562,29 +560,16 @@ UtilTransactions::copyTableData(Ndb* pNdb,
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << insertedRows << " rows copied" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
}
int
UtilTransactions::addRowToDelete(Ndb* pNdb,
NdbConnection* pDelTrans,
NdbOperation* pOrgOp){
NdbOperation* pDelOp = pOrgOp->takeOverForDelete(pDelTrans);
if (pDelOp == NULL){
ERR(pNdb->getNdbError());
return NDBT_FAILED;
}
return NDBT_OK;
}
int
UtilTransactions::addRowToInsert(Ndb* pNdb,
NdbConnection* pInsTrans,
@@ -621,101 +606,6 @@ UtilTransactions::addRowToInsert(Ndb* pNdb,
return NDBT_OK;
}
// Take over one record from pOrgOp and delete it
int
UtilTransactions::takeOverAndDeleteRecord(Ndb* pNdb,
NdbOperation* pOrgOp){
int retryAttempt = 0;
const int retryMax = 10;
int check;
NdbConnection *pDelTrans;
NdbOperation *pDelOp;
while (true){
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pDelTrans = pNdb->startTransaction();
if (pDelTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
if ((pDelOp = pOrgOp->takeOverForDelete(pDelTrans)) == NULL){
ERR(pNdb->getNdbError());
return NDBT_FAILED;
}
#if 0
// It should not be necessary to call deleteTuple HERE!!!
check = pDelOp->deleteTuple();
if( check == -1 ) {
ERR(pDelTrans->getNdbError());
pNdb->closeTransaction(pDelTrans);
return NDBT_FAILED;
}
#endif
check = pDelTrans->execute( Commit );
if(check == -1 ) {
const NdbError err = pDelTrans->getNdbError();
pNdb->closeTransaction(pDelTrans);
ERR(err);
if(err.code == 250 || err.code == 499)
return RESTART_SCAN;
switch(err.status){
case NdbError::Success:
g_info << "ERROR: NdbError reports success when transcaction failed"
<< endl;
RETURN_FAIL(err);
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(50+50*retryAttempt);
retryAttempt++;
continue;
break;
case NdbError::UnknownResult:
RETURN_FAIL(err);
break;
default:
case NdbError::PermanentError:
switch (err.classification){
default:
RETURN_FAIL(err);
break;
}
break;
}
}
else{
pNdb->closeTransaction(pDelTrans);
}
return NDBT_OK;
}
return NDBT_FAILED;
}
int
UtilTransactions::scanReadRecords(Ndb* pNdb,
@@ -730,7 +620,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
while (true){
@@ -755,18 +645,18 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
pOp = pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
if (exclusive == true)
check = pOp->openScanExclusive(parallelism);
else
check = pOp->openScanRead(parallelism);
if( check == -1 ) {
NdbResultSet * rs = pOp->readTuples(exclusive ?
NdbScanOperation::LM_Exclusive :
NdbScanOperation::LM_Read,
0, parallelism);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
@@ -778,7 +668,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Call getValue for all the attributes supplied in attrib_list
// ************************************************
for (int a = 0; a < noAttribs; a++){
@@ -793,8 +683,8 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
}
}
// *************************************************
check = pTrans->executeScan();
check = pTrans->execute(NoCommit);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
@@ -812,15 +702,14 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
while(eof == 0){
while((eof = rs->nextResult()) == 0){
rows++;
// Call callback for each record returned
if(fn != NULL)
fn(&row);
eof = pTrans->nextScanResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
@@ -856,14 +745,15 @@ UtilTransactions::selectCount(Ndb* pNdb,
int parallelism,
int* count_rows,
ScanLock lock,
NdbConnection* pBuddyTrans){
NdbConnection* pTrans){
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation *pOp;
if(!pTrans)
pTrans = pNdb->startTransaction();
while (true){
if (retryAttempt >= retryMax){
@@ -871,39 +761,27 @@ UtilTransactions::selectCount(Ndb* pNdb,
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->hupp(pBuddyTrans);
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
pOp = pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet * rs;
switch(lock){
case SL_ReadHold:
check = pOp->openScanReadHoldLock(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Read);
break;
case SL_Exclusive:
check = pOp->openScanExclusive(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive);
break;
case SL_Read:
default:
check = pOp->openScanRead(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead);
}
if( check == -1 ) {
if( rs == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
@@ -922,9 +800,9 @@ UtilTransactions::selectCount(Ndb* pNdb,
return NDBT_FAILED;
}
}
check = pTrans->executeScan();
check = pTrans->execute(NoCommit);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@@ -933,15 +811,14 @@ UtilTransactions::selectCount(Ndb* pNdb,
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
while(eof == 0){
while((eof = rs->nextResult()) == 0){
rows++;
eof = pTrans->nextScanResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
@@ -952,7 +829,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
if (count_rows != NULL){
@@ -963,7 +840,6 @@ UtilTransactions::selectCount(Ndb* pNdb,
}
return NDBT_FAILED;
}
int
UtilTransactions::verifyIndex(Ndb* pNdb,
@@ -1028,7 +904,7 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
parallelism = 1;
@@ -1055,20 +931,21 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
pOp = pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet* rs;
if(transactional){
check = pOp->openScanReadHoldLock(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
} else {
check = pOp->openScanRead(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
}
if( check == -1 ) {
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
@@ -1091,10 +968,10 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
}
}
check = pTrans->executeScan();
check = pTrans->execute(NoCommit);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
@@ -1109,14 +986,14 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
while(eof == 0){
while((eof = rs->nextResult()) == 0){
rows++;
// ndbout << row.c_str().c_str() << endl;
if (readRowFromTableAndIndex(pNdb,
pTrans,
indexName,
@@ -1124,11 +1001,6 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
eof = pTrans->nextScanResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
@@ -1265,13 +1137,13 @@ UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
* Read the record from INDEX_TABLE
*/
NdbIndexOperation* pIndexOp= NULL;
NdbScanOperation *pScanOp= NULL;
NdbIndexScanOperation *pScanOp= NULL;
{
void* pOpCheck= NULL;
if (indexType == NdbDictionary::Index::UniqueHashIndex) {
pOpCheck= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tab.getName());
} else {
pOpCheck= pScanOp= pTrans1->getNdbScanOperation(indexName, tab.getName());
pOpCheck= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tab.getName());
}
if (pOpCheck == NULL) {
@@ -1308,7 +1180,7 @@ UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
// setBound not possible for null attributes
if ( !row.attributeStore(col->getName())->isNULL() ) {
r = pScanOp->setBound(col->getName(),
NdbOperation::BoundEQ,
NdbIndexScanOperation::BoundEQ,
row.attributeStore(col->getName())->aRef());
}
}