mirror of
https://github.com/MariaDB/server.git
synced 2025-08-30 11:22:14 +03:00
ndb Updated example
This commit is contained in:
@@ -30,15 +30,19 @@
|
|||||||
* getDictionary()
|
* getDictionary()
|
||||||
* startTransaction()
|
* startTransaction()
|
||||||
* closeTransaction()
|
* closeTransaction()
|
||||||
* sendPreparedTransactions()
|
|
||||||
* pollNdb()
|
|
||||||
*
|
*
|
||||||
* NdbConnection
|
* NdbTransaction
|
||||||
* getNdbOperation()
|
* getNdbScanOperation()
|
||||||
* executeAsynchPrepare()
|
* execute()
|
||||||
* getNdbError()
|
*
|
||||||
* executeScan()
|
* NdbResultSet
|
||||||
* nextScanResult()
|
*
|
||||||
|
* NdbScanOperation
|
||||||
|
* getValue()
|
||||||
|
* readTuples()
|
||||||
|
* nextResult()
|
||||||
|
* deleteCurrentTuple()
|
||||||
|
* updateCurrentTuple()
|
||||||
*
|
*
|
||||||
* NdbDictionary::Dictionary
|
* NdbDictionary::Dictionary
|
||||||
* getTable()
|
* getTable()
|
||||||
@@ -60,33 +64,15 @@
|
|||||||
* insertTuple()
|
* insertTuple()
|
||||||
* equal()
|
* equal()
|
||||||
* setValue()
|
* setValue()
|
||||||
* openScanRead()
|
|
||||||
* openScanExclusive()
|
|
||||||
*
|
|
||||||
* NdbRecAttr
|
|
||||||
* aRef()
|
|
||||||
* u_32_value()
|
|
||||||
*
|
|
||||||
* NdbResultSet
|
|
||||||
* nextResult()
|
|
||||||
* deleteTuple()
|
|
||||||
* updateTuple()
|
|
||||||
*
|
|
||||||
* NdbScanOperation
|
|
||||||
* getValue()
|
|
||||||
* readTuplesExclusive()
|
|
||||||
*
|
*
|
||||||
* NdbScanFilter
|
* NdbScanFilter
|
||||||
* begin()
|
* begin()
|
||||||
* eq()
|
* eq()
|
||||||
* end()
|
* end()
|
||||||
*
|
*
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
#include <ndb_global.h>
|
|
||||||
|
|
||||||
#include <NdbApi.hpp>
|
#include <NdbApi.hpp>
|
||||||
#include <NdbScanFilter.hpp>
|
#include <NdbScanFilter.hpp>
|
||||||
// Used for cout
|
// Used for cout
|
||||||
@@ -114,30 +100,12 @@ milliSleep(int milliseconds){
|
|||||||
<< error.code << ", msg: " << error.message << "." << std::endl; \
|
<< error.code << ", msg: " << error.message << "." << std::endl; \
|
||||||
exit(-1); }
|
exit(-1); }
|
||||||
|
|
||||||
/*
|
struct Car
|
||||||
* callback : This is called when the transaction is polled
|
|
||||||
*
|
|
||||||
* (This function must have three arguments:
|
|
||||||
* - The result of the transaction,
|
|
||||||
* - The NdbConnection object, and
|
|
||||||
* - A pointer to an arbitrary object.)
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
callback(int result, NdbConnection* myTrans, void* aObject)
|
|
||||||
{
|
{
|
||||||
if (result == -1) {
|
unsigned int reg_no;
|
||||||
std::cout << "In callback: " << std::endl;
|
char brand[20];
|
||||||
/**
|
char color[20];
|
||||||
* Put error checking code here (see ndb_async_example)
|
};
|
||||||
*/
|
|
||||||
APIERROR(myTrans->getNdbError());
|
|
||||||
} else {
|
|
||||||
/**
|
|
||||||
* Ok!
|
|
||||||
*/
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function to create table
|
* Function to create table
|
||||||
@@ -162,6 +130,8 @@ int create_table(Ndb * myNdb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Car car;
|
||||||
|
|
||||||
myTable.setName("GARAGE");
|
myTable.setName("GARAGE");
|
||||||
|
|
||||||
myColumn.setName("REG_NO");
|
myColumn.setName("REG_NO");
|
||||||
@@ -173,7 +143,7 @@ int create_table(Ndb * myNdb)
|
|||||||
|
|
||||||
myColumn.setName("BRAND");
|
myColumn.setName("BRAND");
|
||||||
myColumn.setType(NdbDictionary::Column::Char);
|
myColumn.setType(NdbDictionary::Column::Char);
|
||||||
myColumn.setLength(20);
|
myColumn.setLength(sizeof(car.brand));
|
||||||
myColumn.setPrimaryKey(false);
|
myColumn.setPrimaryKey(false);
|
||||||
myColumn.setNullable(false);
|
myColumn.setNullable(false);
|
||||||
myTable.addColumn(myColumn);
|
myTable.addColumn(myColumn);
|
||||||
@@ -181,7 +151,7 @@ int create_table(Ndb * myNdb)
|
|||||||
|
|
||||||
myColumn.setName("COLOR");
|
myColumn.setName("COLOR");
|
||||||
myColumn.setType(NdbDictionary::Column::Char);
|
myColumn.setType(NdbDictionary::Column::Char);
|
||||||
myColumn.setLength(20);
|
myColumn.setLength(sizeof(car.color));
|
||||||
myColumn.setPrimaryKey(false);
|
myColumn.setPrimaryKey(false);
|
||||||
myColumn.setNullable(false);
|
myColumn.setNullable(false);
|
||||||
myTable.addColumn(myColumn);
|
myTable.addColumn(myColumn);
|
||||||
@@ -196,91 +166,64 @@ int create_table(Ndb * myNdb)
|
|||||||
|
|
||||||
int populate(Ndb * myNdb)
|
int populate(Ndb * myNdb)
|
||||||
{
|
{
|
||||||
NdbConnection* myNdbConnection[15]; // For transactions
|
int i;
|
||||||
NdbOperation* myNdbOperation; // For operations
|
Car cars[15];
|
||||||
/******************************************************
|
|
||||||
* Insert (we do 15 insert transactions in parallel) *
|
|
||||||
******************************************************/
|
|
||||||
/**
|
/**
|
||||||
* Five blue mercedes
|
* Five blue mercedes
|
||||||
*/
|
*/
|
||||||
for (int i = 0; i < 5; i++)
|
for (i = 0; i < 5; i++)
|
||||||
{
|
{
|
||||||
myNdbConnection[i] = myNdb->startTransaction();
|
cars[i].reg_no = i;
|
||||||
if (myNdbConnection[i] == NULL)
|
sprintf(cars[i].brand, "Mercedes");
|
||||||
APIERROR(myNdb->getNdbError());
|
sprintf(cars[i].color, "Blue");
|
||||||
myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
|
|
||||||
// Error check. If error, then maybe table GARAGE is not in database
|
|
||||||
if (myNdbOperation == NULL)
|
|
||||||
APIERROR(myNdbConnection[i]->getNdbError());
|
|
||||||
myNdbOperation->insertTuple();
|
|
||||||
myNdbOperation->equal("REG_NO", i);
|
|
||||||
myNdbOperation->setValue("BRAND", "Mercedes");
|
|
||||||
myNdbOperation->setValue("COLOR", "Blue");
|
|
||||||
// Prepare transaction (the transaction is NOT yet sent to NDB)
|
|
||||||
myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Five black bmw
|
* Five black bmw
|
||||||
*/
|
*/
|
||||||
for (int i = 5; i < 10; i++)
|
for (i = 5; i < 10; i++)
|
||||||
{
|
{
|
||||||
myNdbConnection[i] = myNdb->startTransaction();
|
cars[i].reg_no = i;
|
||||||
if (myNdbConnection[i] == NULL)
|
sprintf(cars[i].brand, "BMW");
|
||||||
APIERROR(myNdb->getNdbError());
|
sprintf(cars[i].color, "Black");
|
||||||
myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
|
|
||||||
// Error check. If error, then maybe table MYTABLENAME is not in database
|
|
||||||
if (myNdbOperation == NULL)
|
|
||||||
APIERROR(myNdbConnection[i]->getNdbError());
|
|
||||||
myNdbOperation->insertTuple();
|
|
||||||
myNdbOperation->equal("REG_NO", i);
|
|
||||||
myNdbOperation->setValue("BRAND", "BMW");
|
|
||||||
myNdbOperation->setValue("COLOR", "Black");
|
|
||||||
// Prepare transaction (the transaction is NOT yet sent to NDB)
|
|
||||||
myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Five pink toyotas
|
* Five pink toyotas
|
||||||
*/
|
*/
|
||||||
for (int i = 10; i < 15; i++) {
|
for (i = 10; i < 15; i++)
|
||||||
myNdbConnection[i] = myNdb->startTransaction();
|
{
|
||||||
if (myNdbConnection[i] == NULL) APIERROR(myNdb->getNdbError());
|
cars[i].reg_no = i;
|
||||||
myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
|
sprintf(cars[i].brand, "Toyota");
|
||||||
// Error check. If error, then maybe table MYTABLENAME is not in database
|
sprintf(cars[i].color, "Pink");
|
||||||
if (myNdbOperation == NULL) APIERROR(myNdbConnection[i]->getNdbError());
|
|
||||||
myNdbOperation->insertTuple();
|
|
||||||
myNdbOperation->equal("REG_NO", i);
|
|
||||||
myNdbOperation->setValue("BRAND", "Toyota");
|
|
||||||
myNdbOperation->setValue("COLOR", "Pink");
|
|
||||||
// Prepare transaction (the transaction is NOT yet sent to NDB)
|
|
||||||
myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send all transactions to NDB
|
NdbTransaction* myTrans = myNdb->startTransaction();
|
||||||
myNdb->sendPreparedTransactions(0);
|
if (myTrans == NULL)
|
||||||
// Poll all transactions
|
APIERROR(myNdb->getNdbError());
|
||||||
myNdb->pollNdb(3000, 0);
|
|
||||||
|
|
||||||
// it is also possible to use sendPollNdb instead of
|
for (i = 0; i < 15; i++)
|
||||||
// myNdb->sendPreparedTransactions(0); and myNdb->pollNdb(3000, 15); above.
|
{
|
||||||
// myNdb->sendPollNdb(3000,0);
|
NdbOperation* myNdbOperation = myTrans->getNdbOperation("GARAGE");
|
||||||
// Note! Neither sendPollNdb or pollNdb returs until all 15 callbacks have
|
// Error check. If error, then maybe table MYTABLENAME is not in database
|
||||||
// executed.
|
if (myNdbOperation == NULL)
|
||||||
|
APIERROR(myTrans->getNdbError());
|
||||||
|
myNdbOperation->insertTuple();
|
||||||
|
myNdbOperation->equal("REG_NO", cars[i].reg_no);
|
||||||
|
myNdbOperation->setValue("BRAND", cars[i].brand);
|
||||||
|
myNdbOperation->setValue("COLOR", cars[i].color);
|
||||||
|
}
|
||||||
|
|
||||||
// Close all transactions. It is also possible to close transactions
|
int check = myTrans->execute(Commit);
|
||||||
// in the callback.
|
|
||||||
for (int i = 0; i < 15; i++)
|
myTrans->close();
|
||||||
myNdb->closeTransaction(myNdbConnection[i]);
|
|
||||||
return 1;
|
return check != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int scan_delete(Ndb* myNdb,
|
int scan_delete(Ndb* myNdb,
|
||||||
int parallelism,
|
|
||||||
int column,
|
int column,
|
||||||
int column_len,
|
|
||||||
const char * color)
|
const char * color)
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -342,8 +285,8 @@ int scan_delete(Ndb* myNdb,
|
|||||||
/**
|
/**
|
||||||
* Define a result set for the scan.
|
* Define a result set for the scan.
|
||||||
*/
|
*/
|
||||||
NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
|
if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
|
||||||
if( rs == 0 ) {
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -354,8 +297,8 @@ int scan_delete(Ndb* myNdb,
|
|||||||
*/
|
*/
|
||||||
NdbScanFilter filter(myScanOp) ;
|
NdbScanFilter filter(myScanOp) ;
|
||||||
if(filter.begin(NdbScanFilter::AND) < 0 ||
|
if(filter.begin(NdbScanFilter::AND) < 0 ||
|
||||||
filter.eq(column, color, column_len, false) <0||
|
filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
|
||||||
filter.end() <0)
|
filter.end() < 0)
|
||||||
{
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
@@ -384,9 +327,11 @@ int scan_delete(Ndb* myNdb,
|
|||||||
* start of loop: nextResult(true) means that "parallelism" number of
|
* start of loop: nextResult(true) means that "parallelism" number of
|
||||||
* rows are fetched from NDB and cached in NDBAPI
|
* rows are fetched from NDB and cached in NDBAPI
|
||||||
*/
|
*/
|
||||||
while((check = rs->nextResult(true)) == 0){
|
while((check = myScanOp->nextResult(true)) == 0){
|
||||||
do {
|
do
|
||||||
if (rs->deleteTuple() != 0){
|
{
|
||||||
|
if (myScanOp->deleteCurrentTuple() != 0)
|
||||||
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -398,21 +343,32 @@ int scan_delete(Ndb* myNdb,
|
|||||||
* cached in the NDBAPI are modified before
|
* cached in the NDBAPI are modified before
|
||||||
* fetching more rows from NDB.
|
* fetching more rows from NDB.
|
||||||
*/
|
*/
|
||||||
} while((check = rs->nextResult(false)) == 0);
|
} while((check = myScanOp->nextResult(false)) == 0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit when all cached tuple have been marked for deletion
|
* Commit when all cached tuple have been marked for deletion
|
||||||
*/
|
*/
|
||||||
if(check != -1){
|
if(check != -1)
|
||||||
|
{
|
||||||
check = myTrans->execute(Commit);
|
check = myTrans->execute(Commit);
|
||||||
myTrans->releaseCompletedOperations();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(check == -1)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Create a new transaction, while keeping scan open
|
||||||
|
*/
|
||||||
|
check = myTrans->restart();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check for errors
|
* Check for errors
|
||||||
*/
|
*/
|
||||||
err = myTrans->getNdbError();
|
err = myTrans->getNdbError();
|
||||||
if(check == -1){
|
if(check == -1)
|
||||||
if(err.status == NdbError::TemporaryError){
|
{
|
||||||
|
if(err.status == NdbError::TemporaryError)
|
||||||
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
milliSleep(50);
|
milliSleep(50);
|
||||||
@@ -426,10 +382,10 @@ int scan_delete(Ndb* myNdb,
|
|||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if(myTrans!=0) {
|
|
||||||
|
if(myTrans!=0)
|
||||||
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
}
|
}
|
||||||
@@ -438,10 +394,7 @@ int scan_delete(Ndb* myNdb,
|
|||||||
|
|
||||||
|
|
||||||
int scan_update(Ndb* myNdb,
|
int scan_update(Ndb* myNdb,
|
||||||
int parallelism,
|
|
||||||
int column_len,
|
|
||||||
int update_column,
|
int update_column,
|
||||||
const char * column_name,
|
|
||||||
const char * before_color,
|
const char * before_color,
|
||||||
const char * after_color)
|
const char * after_color)
|
||||||
|
|
||||||
@@ -505,8 +458,8 @@ int scan_update(Ndb* myNdb,
|
|||||||
/**
|
/**
|
||||||
* Define a result set for the scan.
|
* Define a result set for the scan.
|
||||||
*/
|
*/
|
||||||
NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
|
if( myScanOp->readTuplesExclusive(NdbOperation::LM_Exclusive) )
|
||||||
if( rs == 0 ) {
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -517,7 +470,7 @@ int scan_update(Ndb* myNdb,
|
|||||||
*/
|
*/
|
||||||
NdbScanFilter filter(myScanOp) ;
|
NdbScanFilter filter(myScanOp) ;
|
||||||
if(filter.begin(NdbScanFilter::AND) < 0 ||
|
if(filter.begin(NdbScanFilter::AND) < 0 ||
|
||||||
filter.eq(update_column, before_color, column_len, false) <0||
|
filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
|
||||||
filter.end() <0)
|
filter.end() <0)
|
||||||
{
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
@@ -528,7 +481,8 @@ int scan_update(Ndb* myNdb,
|
|||||||
/**
|
/**
|
||||||
* Start scan (NoCommit since we are only reading at this stage);
|
* Start scan (NoCommit since we are only reading at this stage);
|
||||||
*/
|
*/
|
||||||
if(myTrans->execute(NoCommit) != 0){
|
if(myTrans->execute(NoCommit) != 0)
|
||||||
|
{
|
||||||
err = myTrans->getNdbError();
|
err = myTrans->getNdbError();
|
||||||
if(err.status == NdbError::TemporaryError){
|
if(err.status == NdbError::TemporaryError){
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
@@ -541,49 +495,49 @@ int scan_update(Ndb* myNdb,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Define an update operation
|
|
||||||
*/
|
|
||||||
NdbOperation * myUpdateOp;
|
|
||||||
/**
|
/**
|
||||||
* start of loop: nextResult(true) means that "parallelism" number of
|
* start of loop: nextResult(true) means that "parallelism" number of
|
||||||
* rows are fetched from NDB and cached in NDBAPI
|
* rows are fetched from NDB and cached in NDBAPI
|
||||||
*/
|
*/
|
||||||
while((check = rs->nextResult(true)) == 0){
|
while((check = myScanOp->nextResult(true)) == 0){
|
||||||
do {
|
do {
|
||||||
/**
|
/**
|
||||||
* Get update operation
|
* Get update operation
|
||||||
*/
|
*/
|
||||||
myUpdateOp = rs->updateTuple();
|
NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
|
||||||
if (myUpdateOp == 0){
|
if (myUpdateOp == 0)
|
||||||
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
updatedRows++;
|
updatedRows++;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* do the update
|
* do the update
|
||||||
*/
|
*/
|
||||||
myUpdateOp->setValue(update_column,after_color);
|
myUpdateOp->setValue(update_column, after_color);
|
||||||
/**
|
/**
|
||||||
* nextResult(false) means that the records
|
* nextResult(false) means that the records
|
||||||
* cached in the NDBAPI are modified before
|
* cached in the NDBAPI are modified before
|
||||||
* fetching more rows from NDB.
|
* fetching more rows from NDB.
|
||||||
*/
|
*/
|
||||||
} while((check = rs->nextResult(false)) == 0);
|
} while((check = myScanOp->nextResult(false)) == 0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit when all cached tuple have been updated
|
* NoCommit when all cached tuple have been updated
|
||||||
*/
|
*/
|
||||||
if(check != -1){
|
if(check != -1)
|
||||||
check = myTrans->execute(Commit);
|
{
|
||||||
myTrans->releaseCompletedOperations();
|
check = myTrans->execute(NoCommit);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check for errors
|
* Check for errors
|
||||||
*/
|
*/
|
||||||
err = myTrans->getNdbError();
|
err = myTrans->getNdbError();
|
||||||
if(check == -1){
|
if(check == -1)
|
||||||
|
{
|
||||||
if(err.status == NdbError::TemporaryError){
|
if(err.status == NdbError::TemporaryError){
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
@@ -595,13 +549,28 @@ int scan_update(Ndb* myNdb,
|
|||||||
* End of loop
|
* End of loop
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commit all prepared operations
|
||||||
|
*/
|
||||||
|
if(myTrans->execute(Commit) == -1)
|
||||||
|
{
|
||||||
|
if(err.status == NdbError::TemporaryError){
|
||||||
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
|
myNdb->closeTransaction(myTrans);
|
||||||
|
milliSleep(50);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if(myTrans!=0) {
|
|
||||||
|
|
||||||
|
if(myTrans!=0)
|
||||||
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
}
|
}
|
||||||
@@ -610,9 +579,7 @@ int scan_update(Ndb* myNdb,
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int scan_print(Ndb * myNdb, int parallelism,
|
int scan_print(Ndb * myNdb)
|
||||||
int column_len_brand,
|
|
||||||
int column_len_color)
|
|
||||||
{
|
{
|
||||||
// Scan all records exclusive and update
|
// Scan all records exclusive and update
|
||||||
// them one by one
|
// them one by one
|
||||||
@@ -674,10 +641,10 @@ int scan_print(Ndb * myNdb, int parallelism,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Define a result set for the scan.
|
* Read without locks, without being placed in lock queue
|
||||||
*/
|
*/
|
||||||
NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
|
if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
|
||||||
if( rs == 0 ) {
|
{
|
||||||
std::cout << myTrans->getNdbError().message << std::endl;
|
std::cout << myTrans->getNdbError().message << std::endl;
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -719,7 +686,7 @@ int scan_print(Ndb * myNdb, int parallelism,
|
|||||||
* start of loop: nextResult(true) means that "parallelism" number of
|
* start of loop: nextResult(true) means that "parallelism" number of
|
||||||
* rows are fetched from NDB and cached in NDBAPI
|
* rows are fetched from NDB and cached in NDBAPI
|
||||||
*/
|
*/
|
||||||
while((check = rs->nextResult(true)) == 0){
|
while((check = myScanOp->nextResult(true)) == 0){
|
||||||
do {
|
do {
|
||||||
|
|
||||||
fetchedRows++;
|
fetchedRows++;
|
||||||
@@ -727,28 +694,23 @@ int scan_print(Ndb * myNdb, int parallelism,
|
|||||||
* print REG_NO unsigned int
|
* print REG_NO unsigned int
|
||||||
*/
|
*/
|
||||||
std::cout << myRecAttr[0]->u_32_value() << "\t";
|
std::cout << myRecAttr[0]->u_32_value() << "\t";
|
||||||
char * buf_brand = new char[column_len_brand+1];
|
|
||||||
char * buf_color = new char[column_len_color+1];
|
|
||||||
/**
|
/**
|
||||||
* print BRAND character string
|
* print BRAND character string
|
||||||
*/
|
*/
|
||||||
memcpy(buf_brand, myRecAttr[1]->aRef(), column_len_brand);
|
std::cout << myRecAttr[1]->aRef() << "\t";
|
||||||
buf_brand[column_len_brand] = 0;
|
|
||||||
std::cout << buf_brand << "\t";
|
|
||||||
delete [] buf_brand;
|
|
||||||
/**
|
/**
|
||||||
* print COLOR character string
|
* print COLOR character string
|
||||||
*/
|
*/
|
||||||
memcpy(buf_color, myRecAttr[2]->aRef(), column_len_color);
|
std::cout << myRecAttr[2]->aRef() << std::endl;
|
||||||
buf_brand[column_len_color] = 0;
|
|
||||||
std::cout << buf_color << std::endl;
|
|
||||||
delete [] buf_color;
|
|
||||||
/**
|
/**
|
||||||
* nextResult(false) means that the records
|
* nextResult(false) means that the records
|
||||||
* cached in the NDBAPI are modified before
|
* cached in the NDBAPI are modified before
|
||||||
* fetching more rows from NDB.
|
* fetching more rows from NDB.
|
||||||
*/
|
*/
|
||||||
} while((check = rs->nextResult(false)) == 0);
|
} while((check = myScanOp->nextResult(false)) == 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
myNdb->closeTransaction(myTrans);
|
myNdb->closeTransaction(myTrans);
|
||||||
@@ -762,55 +724,67 @@ int scan_print(Ndb * myNdb, int parallelism,
|
|||||||
int main()
|
int main()
|
||||||
{
|
{
|
||||||
ndb_init();
|
ndb_init();
|
||||||
Ndb* myNdb = new Ndb( "TEST_DB" ); // Object representing the database
|
|
||||||
|
|
||||||
|
Ndb_cluster_connection cluster_connection;
|
||||||
|
|
||||||
|
if (cluster_connection.connect(12, 5, 1))
|
||||||
|
{
|
||||||
|
std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster_connection.wait_until_ready(30,30))
|
||||||
|
{
|
||||||
|
std::cout << "Cluster was not ready within 30 secs." << std::endl;
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ndb myNdb(&cluster_connection,"TEST_DB" );
|
||||||
|
|
||||||
/*******************************************
|
/*******************************************
|
||||||
* Initialize NDB and wait until its ready *
|
* Initialize NDB and wait until its ready *
|
||||||
*******************************************/
|
*******************************************/
|
||||||
if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions
|
if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
|
||||||
APIERROR(myNdb->getNdbError());
|
APIERROR(myNdb.getNdbError());
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (myNdb->waitUntilReady(30) != 0) {
|
create_table(&myNdb);
|
||||||
std::cout << "NDB was not ready within 30 secs." << std::endl;
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
create_table(myNdb);
|
|
||||||
|
|
||||||
NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
|
NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
|
||||||
int column_color = myDict->getTable("GARAGE")->getColumn("COLOR")->getColumnNo();
|
int column_color = myDict->getTable("GARAGE")->getColumn("COLOR")->getColumnNo();
|
||||||
int column_len_color =
|
|
||||||
myDict->getTable("GARAGE")->getColumn("COLOR")->getLength();
|
|
||||||
int column_len_brand =
|
|
||||||
myDict->getTable("GARAGE")->getColumn("BRAND")->getLength();
|
|
||||||
int parallelism = 16;
|
|
||||||
|
|
||||||
|
if(populate(&myNdb) > 0)
|
||||||
if(populate(myNdb) > 0)
|
|
||||||
std::cout << "populate: Success!" << std::endl;
|
std::cout << "populate: Success!" << std::endl;
|
||||||
|
|
||||||
if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
|
if(scan_print(&myNdb) > 0)
|
||||||
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
||||||
|
|
||||||
std::cout << "Going to delete all pink cars!" << std::endl;
|
std::cout << "Going to delete all pink cars!" << std::endl;
|
||||||
if(scan_delete(myNdb, parallelism, column_color,
|
|
||||||
column_len_color, "Pink") > 0)
|
|
||||||
std::cout << "scan_delete: Success!" << std::endl << std::endl;
|
|
||||||
|
|
||||||
if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
|
{
|
||||||
|
/**
|
||||||
|
* Note! color needs to be of exact the same size as column defined
|
||||||
|
*/
|
||||||
|
char color[20] = "Pink";
|
||||||
|
if(scan_delete(&myNdb, column_color, color) > 0)
|
||||||
|
std::cout << "scan_delete: Success!" << std::endl << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(scan_print(&myNdb) > 0)
|
||||||
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
||||||
|
|
||||||
std::cout << "Going to update all blue cars to black cars!" << std::endl;
|
|
||||||
if(scan_update(myNdb, parallelism, column_len_color, column_color,
|
|
||||||
"COLOR", "Blue", "Black") > 0)
|
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Note! color1 & 2 need to be of exact the same size as column defined
|
||||||
|
*/
|
||||||
|
char color1[20] = "Blue";
|
||||||
|
char color2[20] = "Black";
|
||||||
|
std::cout << "Going to update all " << color1
|
||||||
|
<< " cars to " << color2 << " cars!" << std::endl;
|
||||||
|
if(scan_update(&myNdb, column_color, color1, color2) > 0)
|
||||||
std::cout << "scan_update: Success!" << std::endl << std::endl;
|
std::cout << "scan_update: Success!" << std::endl << std::endl;
|
||||||
}
|
}
|
||||||
if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
|
if(scan_print(&myNdb) > 0)
|
||||||
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
std::cout << "scan_print: Success!" << std::endl << std::endl;
|
||||||
|
|
||||||
delete myNdb;
|
|
||||||
}
|
}
|
||||||
|
@@ -88,7 +88,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Compare column <b>ColId</b> with <b>val</b>
|
* Compare column <b>ColId</b> with <b>val</b>
|
||||||
*/
|
*/
|
||||||
int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len);
|
int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len = 0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @name Integer Comparators
|
* @name Integer Comparators
|
||||||
|
Reference in New Issue
Block a user