From 9011082eb83fd6f5e36603ece2ee87ee46e3e56e Mon Sep 17 00:00:00 2001 From: "joreland@mysql.com" <> Date: Mon, 3 Jan 2005 12:07:47 +0100 Subject: [PATCH] ndb Updated example --- .../ndbapi_scan_example/ndbapi_scan.cpp | 384 ++++++++---------- ndb/include/ndbapi/NdbScanFilter.hpp | 2 +- 2 files changed, 180 insertions(+), 206 deletions(-) diff --git a/ndb/examples/ndbapi_scan_example/ndbapi_scan.cpp b/ndb/examples/ndbapi_scan_example/ndbapi_scan.cpp index 22641bc5b57..65457873423 100644 --- a/ndb/examples/ndbapi_scan_example/ndbapi_scan.cpp +++ b/ndb/examples/ndbapi_scan_example/ndbapi_scan.cpp @@ -30,15 +30,19 @@ * getDictionary() * startTransaction() * closeTransaction() - * sendPreparedTransactions() - * pollNdb() * - * NdbConnection - * getNdbOperation() - * executeAsynchPrepare() - * getNdbError() - * executeScan() - * nextScanResult() + * NdbTransaction + * getNdbScanOperation() + * execute() + * + * NdbResultSet + * + * NdbScanOperation + * getValue() + * readTuples() + * nextResult() + * deleteCurrentTuple() + * updateCurrentTuple() * * NdbDictionary::Dictionary * getTable() @@ -60,33 +64,15 @@ * insertTuple() * equal() * setValue() - * openScanRead() - * openScanExclusive() - * - * NdbRecAttr - * aRef() - * u_32_value() - * - * NdbResultSet - * nextResult() - * deleteTuple() - * updateTuple() - * - * NdbScanOperation - * getValue() - * readTuplesExclusive() * * NdbScanFilter * begin() * eq() * end() * - * */ -#include - #include #include // Used for cout @@ -114,30 +100,12 @@ milliSleep(int milliseconds){ << error.code << ", msg: " << error.message << "." << std::endl; \ exit(-1); } -/* - * callback : This is called when the transaction is polled - * - * (This function must have three arguments: - * - The result of the transaction, - * - The NdbConnection object, and - * - A pointer to an arbitrary object.) - */ -static void -callback(int result, NdbConnection* myTrans, void* aObject) +struct Car { - if (result == -1) { - std::cout << "In callback: " << std::endl; - /** - * Put error checking code here (see ndb_async_example) - */ - APIERROR(myTrans->getNdbError()); - } else { - /** - * Ok! - */ - return; - } -} + unsigned int reg_no; + char brand[20]; + char color[20]; +}; /** * Function to create table @@ -161,6 +129,8 @@ int create_table(Ndb * myNdb) exit(1); } } + + Car car; myTable.setName("GARAGE"); @@ -173,7 +143,7 @@ int create_table(Ndb * myNdb) myColumn.setName("BRAND"); myColumn.setType(NdbDictionary::Column::Char); - myColumn.setLength(20); + myColumn.setLength(sizeof(car.brand)); myColumn.setPrimaryKey(false); myColumn.setNullable(false); myTable.addColumn(myColumn); @@ -181,7 +151,7 @@ int create_table(Ndb * myNdb) myColumn.setName("COLOR"); myColumn.setType(NdbDictionary::Column::Char); - myColumn.setLength(20); + myColumn.setLength(sizeof(car.color)); myColumn.setPrimaryKey(false); myColumn.setNullable(false); myTable.addColumn(myColumn); @@ -196,93 +166,66 @@ int create_table(Ndb * myNdb) int populate(Ndb * myNdb) { - NdbConnection* myNdbConnection[15]; // For transactions - NdbOperation* myNdbOperation; // For operations - /****************************************************** - * Insert (we do 15 insert transactions in parallel) * - ******************************************************/ + int i; + Car cars[15]; + /** * Five blue mercedes */ - for (int i = 0; i < 5; i++) + for (i = 0; i < 5; i++) { - myNdbConnection[i] = myNdb->startTransaction(); - if (myNdbConnection[i] == NULL) - APIERROR(myNdb->getNdbError()); - myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE"); - // Error check. If error, then maybe table GARAGE is not in database - if (myNdbOperation == NULL) - APIERROR(myNdbConnection[i]->getNdbError()); - myNdbOperation->insertTuple(); - myNdbOperation->equal("REG_NO", i); - myNdbOperation->setValue("BRAND", "Mercedes"); - myNdbOperation->setValue("COLOR", "Blue"); - // Prepare transaction (the transaction is NOT yet sent to NDB) - myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL); + cars[i].reg_no = i; + sprintf(cars[i].brand, "Mercedes"); + sprintf(cars[i].color, "Blue"); } - /** * Five black bmw */ - for (int i = 5; i < 10; i++) + for (i = 5; i < 10; i++) { - myNdbConnection[i] = myNdb->startTransaction(); - if (myNdbConnection[i] == NULL) - APIERROR(myNdb->getNdbError()); - myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE"); - // Error check. If error, then maybe table MYTABLENAME is not in database - if (myNdbOperation == NULL) - APIERROR(myNdbConnection[i]->getNdbError()); - myNdbOperation->insertTuple(); - myNdbOperation->equal("REG_NO", i); - myNdbOperation->setValue("BRAND", "BMW"); - myNdbOperation->setValue("COLOR", "Black"); - // Prepare transaction (the transaction is NOT yet sent to NDB) - myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL); + cars[i].reg_no = i; + sprintf(cars[i].brand, "BMW"); + sprintf(cars[i].color, "Black"); } /** * Five pink toyotas */ - for (int i = 10; i < 15; i++) { - myNdbConnection[i] = myNdb->startTransaction(); - if (myNdbConnection[i] == NULL) APIERROR(myNdb->getNdbError()); - myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE"); + for (i = 10; i < 15; i++) + { + cars[i].reg_no = i; + sprintf(cars[i].brand, "Toyota"); + sprintf(cars[i].color, "Pink"); + } + + NdbTransaction* myTrans = myNdb->startTransaction(); + if (myTrans == NULL) + APIERROR(myNdb->getNdbError()); + + for (i = 0; i < 15; i++) + { + NdbOperation* myNdbOperation = myTrans->getNdbOperation("GARAGE"); // Error check. If error, then maybe table MYTABLENAME is not in database - if (myNdbOperation == NULL) APIERROR(myNdbConnection[i]->getNdbError()); + if (myNdbOperation == NULL) + APIERROR(myTrans->getNdbError()); myNdbOperation->insertTuple(); - myNdbOperation->equal("REG_NO", i); - myNdbOperation->setValue("BRAND", "Toyota"); - myNdbOperation->setValue("COLOR", "Pink"); - // Prepare transaction (the transaction is NOT yet sent to NDB) - myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL); + myNdbOperation->equal("REG_NO", cars[i].reg_no); + myNdbOperation->setValue("BRAND", cars[i].brand); + myNdbOperation->setValue("COLOR", cars[i].color); } - // Send all transactions to NDB - myNdb->sendPreparedTransactions(0); - // Poll all transactions - myNdb->pollNdb(3000, 0); + int check = myTrans->execute(Commit); - // it is also possible to use sendPollNdb instead of - // myNdb->sendPreparedTransactions(0); and myNdb->pollNdb(3000, 15); above. - // myNdb->sendPollNdb(3000,0); - // Note! Neither sendPollNdb or pollNdb returs until all 15 callbacks have - // executed. + myTrans->close(); - // Close all transactions. It is also possible to close transactions - // in the callback. - for (int i = 0; i < 15; i++) - myNdb->closeTransaction(myNdbConnection[i]); - return 1; + return check != -1; } int scan_delete(Ndb* myNdb, - int parallelism, int column, - int column_len, const char * color) - + { // Scan all records exclusive and delete @@ -342,20 +285,20 @@ int scan_delete(Ndb* myNdb, /** * Define a result set for the scan. */ - NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism); - if( rs == 0 ) { + if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } - + /** * Use NdbScanFilter to define a search critera */ NdbScanFilter filter(myScanOp) ; if(filter.begin(NdbScanFilter::AND) < 0 || - filter.eq(column, color, column_len, false) <0|| - filter.end() <0) + filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 || + filter.end() < 0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); @@ -384,9 +327,11 @@ int scan_delete(Ndb* myNdb, * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ - while((check = rs->nextResult(true)) == 0){ - do { - if (rs->deleteTuple() != 0){ + while((check = myScanOp->nextResult(true)) == 0){ + do + { + if (myScanOp->deleteCurrentTuple() != 0) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; @@ -398,21 +343,32 @@ int scan_delete(Ndb* myNdb, * cached in the NDBAPI are modified before * fetching more rows from NDB. */ - } while((check = rs->nextResult(false)) == 0); + } while((check = myScanOp->nextResult(false)) == 0); /** * Commit when all cached tuple have been marked for deletion */ - if(check != -1){ + if(check != -1) + { check = myTrans->execute(Commit); - myTrans->releaseCompletedOperations(); } + + if(check == -1) + { + /** + * Create a new transaction, while keeping scan open + */ + check = myTrans->restart(); + } + /** * Check for errors */ err = myTrans->getNdbError(); - if(check == -1){ - if(err.status == NdbError::TemporaryError){ + if(check == -1) + { + if(err.status == NdbError::TemporaryError) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); @@ -426,10 +382,10 @@ int scan_delete(Ndb* myNdb, std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return 0; - - } - if(myTrans!=0) { + + if(myTrans!=0) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); } @@ -438,10 +394,7 @@ int scan_delete(Ndb* myNdb, int scan_update(Ndb* myNdb, - int parallelism, - int column_len, int update_column, - const char * column_name, const char * before_color, const char * after_color) @@ -505,8 +458,8 @@ int scan_update(Ndb* myNdb, /** * Define a result set for the scan. */ - NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism); - if( rs == 0 ) { + if( myScanOp->readTuplesExclusive(NdbOperation::LM_Exclusive) ) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; @@ -517,7 +470,7 @@ int scan_update(Ndb* myNdb, */ NdbScanFilter filter(myScanOp) ; if(filter.begin(NdbScanFilter::AND) < 0 || - filter.eq(update_column, before_color, column_len, false) <0|| + filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0|| filter.end() <0) { std::cout << myTrans->getNdbError().message << std::endl; @@ -528,7 +481,8 @@ int scan_update(Ndb* myNdb, /** * Start scan (NoCommit since we are only reading at this stage); */ - if(myTrans->execute(NoCommit) != 0){ + if(myTrans->execute(NoCommit) != 0) + { err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; @@ -541,49 +495,49 @@ int scan_update(Ndb* myNdb, return -1; } - /** - * Define an update operation - */ - NdbOperation * myUpdateOp; - /** - * start of loop: nextResult(true) means that "parallelism" number of - * rows are fetched from NDB and cached in NDBAPI - */ - while((check = rs->nextResult(true)) == 0){ + /** + * start of loop: nextResult(true) means that "parallelism" number of + * rows are fetched from NDB and cached in NDBAPI + */ + while((check = myScanOp->nextResult(true)) == 0){ do { /** * Get update operation */ - myUpdateOp = rs->updateTuple(); - if (myUpdateOp == 0){ + NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple(); + if (myUpdateOp == 0) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } updatedRows++; + /** * do the update */ - myUpdateOp->setValue(update_column,after_color); + myUpdateOp->setValue(update_column, after_color); /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ - } while((check = rs->nextResult(false)) == 0); + } while((check = myScanOp->nextResult(false)) == 0); /** - * Commit when all cached tuple have been updated + * NoCommit when all cached tuple have been updated */ - if(check != -1){ - check = myTrans->execute(Commit); - myTrans->releaseCompletedOperations(); + if(check != -1) + { + check = myTrans->execute(NoCommit); } + /** * Check for errors */ err = myTrans->getNdbError(); - if(check == -1){ + if(check == -1) + { if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); @@ -595,13 +549,28 @@ int scan_update(Ndb* myNdb, * End of loop */ } + + /** + * Commit all prepared operations + */ + if(myTrans->execute(Commit) == -1) + { + if(err.status == NdbError::TemporaryError){ + std::cout << myTrans->getNdbError().message << std::endl; + myNdb->closeTransaction(myTrans); + milliSleep(50); + continue; + } + } + std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); - return 0; - - + return 0; } - if(myTrans!=0) { + + + if(myTrans!=0) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); } @@ -610,9 +579,7 @@ int scan_update(Ndb* myNdb, -int scan_print(Ndb * myNdb, int parallelism, - int column_len_brand, - int column_len_color) +int scan_print(Ndb * myNdb) { // Scan all records exclusive and update // them one by one @@ -674,10 +641,10 @@ int scan_print(Ndb * myNdb, int parallelism, } /** - * Define a result set for the scan. - */ - NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism); - if( rs == 0 ) { + * Read without locks, without being placed in lock queue + */ + if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1) + { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; @@ -719,7 +686,7 @@ int scan_print(Ndb * myNdb, int parallelism, * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ - while((check = rs->nextResult(true)) == 0){ + while((check = myScanOp->nextResult(true)) == 0){ do { fetchedRows++; @@ -727,28 +694,23 @@ int scan_print(Ndb * myNdb, int parallelism, * print REG_NO unsigned int */ std::cout << myRecAttr[0]->u_32_value() << "\t"; - char * buf_brand = new char[column_len_brand+1]; - char * buf_color = new char[column_len_color+1]; + /** * print BRAND character string */ - memcpy(buf_brand, myRecAttr[1]->aRef(), column_len_brand); - buf_brand[column_len_brand] = 0; - std::cout << buf_brand << "\t"; - delete [] buf_brand; + std::cout << myRecAttr[1]->aRef() << "\t"; + /** * print COLOR character string */ - memcpy(buf_color, myRecAttr[2]->aRef(), column_len_color); - buf_brand[column_len_color] = 0; - std::cout << buf_color << std::endl; - delete [] buf_color; + std::cout << myRecAttr[2]->aRef() << std::endl; + /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ - } while((check = rs->nextResult(false)) == 0); + } while((check = myScanOp->nextResult(false)) == 0); } myNdb->closeTransaction(myTrans); @@ -762,55 +724,67 @@ int scan_print(Ndb * myNdb, int parallelism, int main() { ndb_init(); - Ndb* myNdb = new Ndb( "TEST_DB" ); // Object representing the database + Ndb_cluster_connection cluster_connection; + + if (cluster_connection.connect(12, 5, 1)) + { + std::cout << "Unable to connect to cluster within 30 secs." << std::endl; + exit(-1); + } + + if (cluster_connection.wait_until_ready(30,30)) + { + std::cout << "Cluster was not ready within 30 secs." << std::endl; + exit(-1); + } + Ndb myNdb(&cluster_connection,"TEST_DB" ); /******************************************* * Initialize NDB and wait until its ready * *******************************************/ - if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions - APIERROR(myNdb->getNdbError()); + if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions + APIERROR(myNdb.getNdbError()); exit(-1); } - if (myNdb->waitUntilReady(30) != 0) { - std::cout << "NDB was not ready within 30 secs." << std::endl; - exit(-1); - } - create_table(myNdb); + create_table(&myNdb); - NdbDictionary::Dictionary* myDict = myNdb->getDictionary(); + NdbDictionary::Dictionary* myDict = myNdb.getDictionary(); int column_color = myDict->getTable("GARAGE")->getColumn("COLOR")->getColumnNo(); - int column_len_color = - myDict->getTable("GARAGE")->getColumn("COLOR")->getLength(); - int column_len_brand = - myDict->getTable("GARAGE")->getColumn("BRAND")->getLength(); - int parallelism = 16; - - if(populate(myNdb) > 0) + if(populate(&myNdb) > 0) std::cout << "populate: Success!" << std::endl; - - if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0) + + if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; std::cout << "Going to delete all pink cars!" << std::endl; - if(scan_delete(myNdb, parallelism, column_color, - column_len_color, "Pink") > 0) - std::cout << "scan_delete: Success!" << std::endl << std::endl; + + { + /** + * Note! color needs to be of exact the same size as column defined + */ + char color[20] = "Pink"; + if(scan_delete(&myNdb, column_color, color) > 0) + std::cout << "scan_delete: Success!" << std::endl << std::endl; + } - if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0) + if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; - std::cout << "Going to update all blue cars to black cars!" << std::endl; - if(scan_update(myNdb, parallelism, column_len_color, column_color, - "COLOR", "Blue", "Black") > 0) { - std::cout << "scan_update: Success!" << std::endl << std::endl; + /** + * Note! color1 & 2 need to be of exact the same size as column defined + */ + char color1[20] = "Blue"; + char color2[20] = "Black"; + std::cout << "Going to update all " << color1 + << " cars to " << color2 << " cars!" << std::endl; + if(scan_update(&myNdb, column_color, color1, color2) > 0) + std::cout << "scan_update: Success!" << std::endl << std::endl; } - if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0) + if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; - - delete myNdb; } diff --git a/ndb/include/ndbapi/NdbScanFilter.hpp b/ndb/include/ndbapi/NdbScanFilter.hpp index 6acb0456333..5f214451321 100644 --- a/ndb/include/ndbapi/NdbScanFilter.hpp +++ b/ndb/include/ndbapi/NdbScanFilter.hpp @@ -88,7 +88,7 @@ public: /** * Compare column ColId with val */ - int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len); + int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len = 0); /** * @name Integer Comparators