mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
moved all ndb thread specific data into new placeholder
new methods to keep "records" up to date unset flag HA_NOT_EXACT_COUNT to make handler read "records" field, for count() optim and join optimization new methods to keep "records" up to datecorrect record field in ndbcluster handler new method for ndbcluster handler to store/retrieve table and thread specific data changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl hanged deleteKey to return ponter to deleted object moved heavy global cache fetch from inline to separate method mysql-test/r/ndb_alter_table.result: correct record field in ndbcluster handler mysql-test/r/ndb_blob.result: correct record field in ndbcluster handler ndb/include/ndbapi/NdbDictionary.hpp: new method for ndbcluster handler to store/retrieve table and thread specific data ndb/src/ndbapi/DictCache.cpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl ndb/src/ndbapi/DictCache.hpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl ndb/src/ndbapi/Ndb.cpp: replaced method DictionaryImpl::getTable with DictionaryImpl::get_local_table_info ndb/src/ndbapi/NdbDictionary.cpp: new method for ndbcluster handler to store/retrieve table and thread specific data ndb/src/ndbapi/NdbDictionaryImpl.cpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl moved heavy global cache fetch from inline to separate method ndb/src/ndbapi/NdbDictionaryImpl.hpp: replaced method DictionaryImpl::getTable with DictionaryImpl::get_local_table_info ndb/src/ndbapi/NdbLinHash.hpp: changed deleteKey to return ponter to deleted object sql/ha_ndbcluster.cc: moved all ndb thread specific data into new placeholder new methods to keep "records" up to date unset flag HA_NOT_EXACT_COUNT to make handler read "records" field, for count() optim and join optimization sql/ha_ndbcluster.h: new methods to keep "records" up to date sql/sql_class.h: moved all ndb thread specific data into new placeholder
This commit is contained in:
@ -18,12 +18,12 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
|
||||
col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
|
||||
show table status;
|
||||
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
|
||||
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
|
||||
t1 ndbcluster 9 Dynamic 0 0 0 NULL 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
|
||||
insert into t1 values
|
||||
(0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7);
|
||||
show table status;
|
||||
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
|
||||
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
|
||||
t1 ndbcluster 9 Dynamic 9 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
|
||||
select * from t1 order by col1;
|
||||
col1 col2 col3 col4 col5 col6 to_be_deleted
|
||||
0 4 3 5 PENDING 1 7
|
||||
@ -43,7 +43,7 @@ change column col2 fourth varchar(30) not null after col3,
|
||||
modify column col6 int not null first;
|
||||
show table status;
|
||||
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
|
||||
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
|
||||
t1 ndbcluster 9 Dynamic 9 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
|
||||
select * from t1 order by col1;
|
||||
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
|
||||
1 0 3 4 5 PENDING 0000-00-00 00:00:00
|
||||
@ -58,7 +58,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
|
||||
insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00');
|
||||
show table status;
|
||||
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
|
||||
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
|
||||
t1 ndbcluster 9 Dynamic 10 0 0 NULL 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
|
||||
select * from t1 order by col1;
|
||||
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
|
||||
1 0 3 4 5 PENDING 0000-00-00 00:00:00
|
||||
|
@ -150,7 +150,7 @@ insert into t1 values(9,'b9',999,'dd9');
|
||||
commit;
|
||||
explain select * from t1;
|
||||
id select_type table type possible_keys key key_len ref rows Extra
|
||||
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
|
||||
1 SIMPLE t1 ALL NULL NULL NULL NULL 9
|
||||
select * from t1 order by a;
|
||||
a b c d
|
||||
1 b1 111 dd1
|
||||
@ -185,7 +185,7 @@ insert into t1 values(2,@b2,222,@d2);
|
||||
commit;
|
||||
explain select * from t1;
|
||||
id select_type table type possible_keys key key_len ref rows Extra
|
||||
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
|
||||
1 SIMPLE t1 ALL NULL NULL NULL NULL 2
|
||||
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
|
||||
from t1 order by a;
|
||||
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
|
||||
|
@ -1066,6 +1066,8 @@ public:
|
||||
Dictionary(NdbDictionaryImpl&);
|
||||
const Table * getIndexTable(const char * indexName,
|
||||
const char * tableName);
|
||||
public:
|
||||
const Table * getTable(const char * name, void **data);
|
||||
};
|
||||
};
|
||||
|
||||
|
@ -21,6 +21,21 @@
|
||||
#include <NdbCondition.h>
|
||||
#include <NdbSleep.h>
|
||||
|
||||
Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl, Uint32 sz)
|
||||
{
|
||||
m_table_impl= table_impl;
|
||||
if (sz)
|
||||
m_local_data= malloc(sz);
|
||||
else
|
||||
m_local_data= 0;
|
||||
}
|
||||
|
||||
Ndb_local_table_info::~Ndb_local_table_info()
|
||||
{
|
||||
if (m_local_data)
|
||||
free(m_local_data);
|
||||
}
|
||||
|
||||
LocalDictCache::LocalDictCache(){
|
||||
m_tableHash.createHashTable();
|
||||
}
|
||||
@ -29,22 +44,24 @@ LocalDictCache::~LocalDictCache(){
|
||||
m_tableHash.releaseHashTable();
|
||||
}
|
||||
|
||||
NdbTableImpl *
|
||||
Ndb_local_table_info *
|
||||
LocalDictCache::get(const char * name){
|
||||
const Uint32 len = strlen(name);
|
||||
return m_tableHash.getData(name, len);
|
||||
}
|
||||
|
||||
void
|
||||
LocalDictCache::put(const char * name, NdbTableImpl * tab){
|
||||
const Uint32 id = tab->m_tableId;
|
||||
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
|
||||
const Uint32 id = tab_info->m_table_impl->m_tableId;
|
||||
|
||||
m_tableHash.insertKey(name, strlen(name), id, tab);
|
||||
m_tableHash.insertKey(name, strlen(name), id, tab_info);
|
||||
}
|
||||
|
||||
void
|
||||
LocalDictCache::drop(const char * name){
|
||||
m_tableHash.deleteKey(name, strlen(name));
|
||||
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
|
||||
DBUG_ASSERT(info != 0);
|
||||
delete info;
|
||||
}
|
||||
|
||||
/*****************************************************************
|
||||
|
@ -27,6 +27,16 @@
|
||||
#include <Ndb.hpp>
|
||||
#include "NdbLinHash.hpp"
|
||||
|
||||
class Ndb_local_table_info {
|
||||
public:
|
||||
Ndb_local_table_info(NdbTableImpl *table_impl, Uint32 sz=0);
|
||||
~Ndb_local_table_info();
|
||||
NdbTableImpl *m_table_impl;
|
||||
Uint64 m_first_tuple_id;
|
||||
Uint64 m_last_tuple_id;
|
||||
void *m_local_data;
|
||||
};
|
||||
|
||||
/**
|
||||
* A non thread safe dict cache
|
||||
*/
|
||||
@ -35,12 +45,12 @@ public:
|
||||
LocalDictCache();
|
||||
~LocalDictCache();
|
||||
|
||||
NdbTableImpl * get(const char * name);
|
||||
Ndb_local_table_info * get(const char * name);
|
||||
|
||||
void put(const char * name, NdbTableImpl *);
|
||||
void put(const char * name, Ndb_local_table_info *);
|
||||
void drop(const char * name);
|
||||
|
||||
NdbLinHash<NdbTableImpl> m_tableHash; // On name
|
||||
NdbLinHash<Ndb_local_table_info> m_tableHash; // On name
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -753,9 +753,11 @@ Uint64
|
||||
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
|
||||
{
|
||||
DEBUG_TRACE("getAutoIncrementValue");
|
||||
const NdbTableImpl* table = theDictionary->getTable(aTableName);
|
||||
if (table == 0)
|
||||
const char * internalTableName = internalizeTableName(aTableName);
|
||||
Ndb_local_table_info *info= theDictionary->get_local_table_info(internalTableName);
|
||||
if (info == 0)
|
||||
return ~0;
|
||||
const NdbTableImpl *table= info->m_table_impl;
|
||||
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
|
||||
return tupleId;
|
||||
}
|
||||
@ -832,11 +834,13 @@ bool
|
||||
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
|
||||
{
|
||||
DEBUG_TRACE("setAutoIncrementValue " << val);
|
||||
const NdbTableImpl* table = theDictionary->getTable(aTableName);
|
||||
if (table == 0) {
|
||||
const char * internalTableName= internalizeTableName(aTableName);
|
||||
Ndb_local_table_info *info= theDictionary->get_local_table_info(internalTableName);
|
||||
if (info == 0) {
|
||||
theError= theDictionary->getNdbError();
|
||||
return false;
|
||||
}
|
||||
const NdbTableImpl* table= info->m_table_impl;
|
||||
return setTupleIdInNdb(table->m_tableId, val, increase);
|
||||
}
|
||||
|
||||
|
@ -681,13 +681,18 @@ NdbDictionary::Dictionary::alterTable(const Table & t){
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbDictionary::Dictionary::getTable(const char * name){
|
||||
NdbTableImpl * t = m_impl.getTable(name);
|
||||
NdbDictionary::Dictionary::getTable(const char * name, void **data){
|
||||
NdbTableImpl * t = m_impl.getTable(name, data);
|
||||
if(t)
|
||||
return t->m_facade;
|
||||
return 0;
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbDictionary::Dictionary::getTable(const char * name){
|
||||
return getTable(name, 0);
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictionary::Dictionary::invalidateTable(const char * name){
|
||||
NdbTableImpl * t = m_impl.getTable(name);
|
||||
|
@ -595,11 +595,12 @@ static int f_dictionary_count = 0;
|
||||
|
||||
NdbDictionaryImpl::~NdbDictionaryImpl()
|
||||
{
|
||||
NdbElement_t<NdbTableImpl> * curr = m_localHash.m_tableHash.getNext(0);
|
||||
NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0);
|
||||
if(m_globalHash){
|
||||
while(curr != 0){
|
||||
m_globalHash->lock();
|
||||
m_globalHash->release(curr->theData);
|
||||
m_globalHash->release(curr->theData->m_table_impl);
|
||||
delete curr->theData;
|
||||
m_globalHash->unlock();
|
||||
|
||||
curr = m_localHash.m_tableHash.getNext(curr);
|
||||
@ -620,7 +621,39 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
|
||||
}
|
||||
}
|
||||
|
||||
Ndb_local_table_info *
|
||||
NdbDictionaryImpl::fetchGlobalTableImpl(const char * internalTableName)
|
||||
{
|
||||
NdbTableImpl *impl;
|
||||
|
||||
m_globalHash->lock();
|
||||
impl = m_globalHash->get(internalTableName);
|
||||
m_globalHash->unlock();
|
||||
|
||||
if (impl == 0){
|
||||
impl = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames());
|
||||
m_globalHash->lock();
|
||||
m_globalHash->put(internalTableName, impl);
|
||||
m_globalHash->unlock();
|
||||
|
||||
if(impl == 0){
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
Ndb_local_table_info *info= new Ndb_local_table_info(impl, 32);
|
||||
info->m_first_tuple_id= ~0;
|
||||
info->m_last_tuple_id= ~0;
|
||||
|
||||
m_localHash.put(internalTableName, info);
|
||||
|
||||
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
|
||||
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
|
||||
|
||||
addBlobTables(*impl);
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
#if 0
|
||||
bool
|
||||
@ -1504,7 +1537,6 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
|
||||
: createTable(&tSignal, ptr);
|
||||
|
||||
if (!alter && haveAutoIncrement) {
|
||||
// if (!ndb.setAutoIncrementValue(impl.m_internalName.c_str(), autoIncrementValue)) {
|
||||
if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), autoIncrementValue)) {
|
||||
if (ndb.theError.code == 0) {
|
||||
m_error.code = 4336;
|
||||
@ -1775,11 +1807,12 @@ NdbIndexImpl*
|
||||
NdbDictionaryImpl::getIndexImpl(const char * externalName,
|
||||
const char * internalName)
|
||||
{
|
||||
NdbTableImpl* tab = getTableImpl(internalName);
|
||||
if(tab == 0){
|
||||
Ndb_local_table_info * info = get_local_table_info(internalName);
|
||||
if(info == 0){
|
||||
m_error.code = 4243;
|
||||
return 0;
|
||||
}
|
||||
NdbTableImpl * tab = info->m_table_impl;
|
||||
|
||||
if(tab->m_indexType == NdbDictionary::Index::Undefined){
|
||||
// Not an index
|
||||
|
@ -390,8 +390,8 @@ public:
|
||||
int listObjects(List& list, NdbDictionary::Object::Type type);
|
||||
int listIndexes(List& list, const char * tableName);
|
||||
|
||||
NdbTableImpl * getTable(const char * tableName);
|
||||
NdbTableImpl * getTableImpl(const char * internalName);
|
||||
NdbTableImpl * getTable(const char * tableName, void **data= 0);
|
||||
Ndb_local_table_info * get_local_table_info(const char * internalName);
|
||||
NdbIndexImpl * getIndex(const char * indexName,
|
||||
const char * tableName);
|
||||
NdbIndexImpl * getIndexImpl(const char * name, const char * internalName);
|
||||
@ -410,6 +410,8 @@ public:
|
||||
|
||||
NdbDictInterface m_receiver;
|
||||
Ndb & m_ndb;
|
||||
private:
|
||||
Ndb_local_table_info * fetchGlobalTableImpl(const char * internalName);
|
||||
};
|
||||
|
||||
inline
|
||||
@ -598,45 +600,28 @@ NdbDictionaryImpl::getImpl(const NdbDictionary::Dictionary & t){
|
||||
|
||||
inline
|
||||
NdbTableImpl *
|
||||
NdbDictionaryImpl::getTable(const char * tableName)
|
||||
NdbDictionaryImpl::getTable(const char * tableName, void **data)
|
||||
{
|
||||
const char * internalTableName = m_ndb.internalizeTableName(tableName);
|
||||
|
||||
return getTableImpl(internalTableName);
|
||||
Ndb_local_table_info *info= get_local_table_info(internalTableName);
|
||||
if (info == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (data) {
|
||||
*data= info->m_local_data;
|
||||
}
|
||||
return info->m_table_impl;
|
||||
}
|
||||
|
||||
inline
|
||||
NdbTableImpl *
|
||||
NdbDictionaryImpl::getTableImpl(const char * internalTableName)
|
||||
Ndb_local_table_info *
|
||||
NdbDictionaryImpl::get_local_table_info(const char * internalTableName)
|
||||
{
|
||||
NdbTableImpl *ret = m_localHash.get(internalTableName);
|
||||
|
||||
if (ret != 0) {
|
||||
return ret; // autoincrement already initialized
|
||||
Ndb_local_table_info *info= m_localHash.get(internalTableName);
|
||||
if (info != 0) {
|
||||
return info; // autoincrement already initialized
|
||||
}
|
||||
|
||||
m_globalHash->lock();
|
||||
ret = m_globalHash->get(internalTableName);
|
||||
m_globalHash->unlock();
|
||||
|
||||
if (ret == 0){
|
||||
ret = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames());
|
||||
m_globalHash->lock();
|
||||
m_globalHash->put(internalTableName, ret);
|
||||
m_globalHash->unlock();
|
||||
|
||||
if(ret == 0){
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
m_localHash.put(internalTableName, ret);
|
||||
|
||||
m_ndb.theFirstTupleId[ret->getTableId()] = ~0;
|
||||
m_ndb.theLastTupleId[ret->getTableId()] = ~0;
|
||||
|
||||
addBlobTables(*ret);
|
||||
|
||||
return ret;
|
||||
return fetchGlobalTableImpl(internalTableName);
|
||||
}
|
||||
|
||||
inline
|
||||
@ -654,9 +639,9 @@ NdbDictionaryImpl::getIndex(const char * indexName,
|
||||
internalIndexName = m_ndb.internalizeTableName(indexName); // Index is also a table
|
||||
}
|
||||
if (internalIndexName) {
|
||||
NdbTableImpl * tab = getTableImpl(internalIndexName);
|
||||
|
||||
if (tab) {
|
||||
Ndb_local_table_info * info = get_local_table_info(internalIndexName);
|
||||
if (info) {
|
||||
NdbTableImpl * tab = info->m_table_impl;
|
||||
if (tab->m_index == 0)
|
||||
tab->m_index = getIndexImpl(indexName, internalIndexName);
|
||||
if (tab->m_index != 0)
|
||||
|
@ -59,7 +59,7 @@ public:
|
||||
void releaseHashTable(void);
|
||||
|
||||
int insertKey(const char * str, Uint32 len, Uint32 lkey1, C* data);
|
||||
int deleteKey(const char * str, Uint32 len);
|
||||
C *deleteKey(const char * str, Uint32 len);
|
||||
|
||||
C* getData(const char *, Uint32);
|
||||
Uint32* getKey(const char *, Uint32);
|
||||
@ -277,7 +277,7 @@ NdbLinHash<C>::getData( const char* str, Uint32 len ){
|
||||
|
||||
template <class C>
|
||||
inline
|
||||
int
|
||||
C *
|
||||
NdbLinHash<C>::deleteKey ( const char* str, Uint32 len){
|
||||
const Uint32 hash = Hash(str, len);
|
||||
int dir, seg;
|
||||
@ -288,19 +288,21 @@ NdbLinHash<C>::deleteKey ( const char* str, Uint32 len){
|
||||
for(NdbElement_t<C> * chain = *chainp; chain != 0; chain = chain->next){
|
||||
if(chain->len == len && !memcmp(chain->str, str, len)){
|
||||
if (oldChain == 0) {
|
||||
C *data= chain->theData;
|
||||
delete chain;
|
||||
* chainp = 0;
|
||||
return 1;
|
||||
return data;
|
||||
} else {
|
||||
C *data= chain->theData;
|
||||
oldChain->next = chain->next;
|
||||
delete chain;
|
||||
return 1;
|
||||
return data;
|
||||
}
|
||||
} else {
|
||||
oldChain = chain;
|
||||
}
|
||||
}
|
||||
return -1; /* Element doesn't exist */
|
||||
return 0; /* Element doesn't exist */
|
||||
}
|
||||
|
||||
template <class C>
|
||||
|
@ -87,7 +87,8 @@ static int unpackfrm(const void **data, uint *len,
|
||||
const void* pack_data);
|
||||
|
||||
static int ndb_get_table_statistics(Ndb*, const char *,
|
||||
Uint64* rows, Uint64* commits);
|
||||
Uint64* rows, Uint64* commits);
|
||||
|
||||
|
||||
/*
|
||||
Error handling functions
|
||||
@ -137,6 +138,93 @@ static int ndb_to_mysql_error(const NdbError *err)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Place holder for ha_ndbcluster thread specific data
|
||||
*/
|
||||
|
||||
class Thd_ndb {
|
||||
public:
|
||||
Thd_ndb();
|
||||
~Thd_ndb();
|
||||
Ndb *ndb;
|
||||
ulong count;
|
||||
uint lock_count;
|
||||
};
|
||||
|
||||
Thd_ndb::Thd_ndb()
|
||||
{
|
||||
ndb= 0;
|
||||
lock_count= 0;
|
||||
count= 0;
|
||||
}
|
||||
|
||||
Thd_ndb::~Thd_ndb()
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* manage uncommitted insert/deletes during transactio to get records correct
|
||||
*/
|
||||
|
||||
struct Ndb_table_local_info {
|
||||
int no_uncommitted_rows_count;
|
||||
ulong transaction_count;
|
||||
ha_rows records;
|
||||
};
|
||||
|
||||
void ha_ndbcluster::records_update()
|
||||
{
|
||||
DBUG_ENTER("ha_ndbcluster::records_update");
|
||||
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
|
||||
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
|
||||
((const NDBTAB *)m_table)->getTableId(),
|
||||
info->no_uncommitted_rows_count));
|
||||
if (info->records == ~(ha_rows)0)
|
||||
{
|
||||
Uint64 rows;
|
||||
if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
|
||||
info->records= rows;
|
||||
}
|
||||
}
|
||||
records= info->records+ info->no_uncommitted_rows_count;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
|
||||
{
|
||||
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
|
||||
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
|
||||
Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
|
||||
if (info->transaction_count != thd_ndb->count)
|
||||
{
|
||||
info->transaction_count = thd_ndb->count;
|
||||
info->no_uncommitted_rows_count= 0;
|
||||
info->records= ~(ha_rows)0;
|
||||
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
|
||||
((const NDBTAB *)m_table)->getTableId(),
|
||||
info->no_uncommitted_rows_count));
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void ha_ndbcluster::no_uncommitted_rows_update(int c)
|
||||
{
|
||||
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
|
||||
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
|
||||
info->no_uncommitted_rows_count+= c;
|
||||
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
|
||||
((const NDBTAB *)m_table)->getTableId(),
|
||||
info->no_uncommitted_rows_count));
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
|
||||
{
|
||||
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
|
||||
((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/*
|
||||
Take care of the error that occured in NDB
|
||||
|
||||
@ -145,6 +233,7 @@ static int ndb_to_mysql_error(const NdbError *err)
|
||||
# The mapped error code
|
||||
*/
|
||||
|
||||
|
||||
int ha_ndbcluster::ndb_err(NdbConnection *trans)
|
||||
{
|
||||
int res;
|
||||
@ -506,7 +595,7 @@ int ha_ndbcluster::get_metadata(const char *path)
|
||||
DBUG_ENTER("get_metadata");
|
||||
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
|
||||
|
||||
if (!(tab= dict->getTable(m_tabname)))
|
||||
if (!(tab= dict->getTable(m_tabname, &m_table_info)))
|
||||
ERR_RETURN(dict->getNdbError());
|
||||
DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
|
||||
|
||||
@ -556,10 +645,6 @@ int ha_ndbcluster::get_metadata(const char *path)
|
||||
|
||||
// All checks OK, lets use the table
|
||||
m_table= (void*)tab;
|
||||
Uint64 rows;
|
||||
if(false && ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
|
||||
records= rows;
|
||||
}
|
||||
|
||||
DBUG_RETURN(build_index_list(table, ILBP_OPEN));
|
||||
}
|
||||
@ -1480,6 +1565,7 @@ int ha_ndbcluster::write_row(byte *record)
|
||||
Find out how this is detected!
|
||||
*/
|
||||
rows_inserted++;
|
||||
no_uncommitted_rows_update(1);
|
||||
bulk_insert_not_flushed= true;
|
||||
if ((rows_to_insert == 1) ||
|
||||
((rows_inserted % bulk_insert_rows) == 0) ||
|
||||
@ -1701,6 +1787,8 @@ int ha_ndbcluster::delete_row(const byte *record)
|
||||
ERR_RETURN(trans->getNdbError());
|
||||
ops_pending++;
|
||||
|
||||
no_uncommitted_rows_update(-1);
|
||||
|
||||
// If deleting from cursor, NoCommit will be handled in next_result
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
@ -1711,6 +1799,8 @@ int ha_ndbcluster::delete_row(const byte *record)
|
||||
op->deleteTuple() != 0)
|
||||
ERR_RETURN(trans->getNdbError());
|
||||
|
||||
no_uncommitted_rows_update(-1);
|
||||
|
||||
if (table->primary_key == MAX_KEY)
|
||||
{
|
||||
// This table has no primary key, use "hidden" primary key
|
||||
@ -2259,7 +2349,10 @@ void ha_ndbcluster::info(uint flag)
|
||||
if (flag & HA_STATUS_CONST)
|
||||
DBUG_PRINT("info", ("HA_STATUS_CONST"));
|
||||
if (flag & HA_STATUS_VARIABLE)
|
||||
{
|
||||
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
|
||||
records_update();
|
||||
}
|
||||
if (flag & HA_STATUS_ERRKEY)
|
||||
{
|
||||
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
|
||||
@ -2558,9 +2651,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
NdbConnection* trans= NULL;
|
||||
|
||||
DBUG_ENTER("external_lock");
|
||||
DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
|
||||
thd->transaction.ndb_lock_count));
|
||||
|
||||
/*
|
||||
Check that this handler instance has a connection
|
||||
set up to the Ndb object of thd
|
||||
@ -2568,10 +2658,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
if (check_ndb_connection())
|
||||
DBUG_RETURN(1);
|
||||
|
||||
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
|
||||
|
||||
DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
|
||||
thd_ndb->lock_count));
|
||||
|
||||
if (lock_type != F_UNLCK)
|
||||
{
|
||||
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
|
||||
if (!thd->transaction.ndb_lock_count++)
|
||||
if (!thd_ndb->lock_count++)
|
||||
{
|
||||
PRINT_OPTION_FLAGS(thd);
|
||||
|
||||
@ -2584,6 +2679,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
trans= m_ndb->startTransaction();
|
||||
if (trans == NULL)
|
||||
ERR_RETURN(m_ndb->getNdbError());
|
||||
no_uncommitted_rows_reset(thd);
|
||||
thd->transaction.stmt.ndb_tid= trans;
|
||||
}
|
||||
else
|
||||
@ -2597,6 +2693,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
trans= m_ndb->startTransaction();
|
||||
if (trans == NULL)
|
||||
ERR_RETURN(m_ndb->getNdbError());
|
||||
no_uncommitted_rows_reset(thd);
|
||||
|
||||
/*
|
||||
If this is the start of a LOCK TABLE, a table look
|
||||
@ -2633,11 +2730,12 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
// Start of transaction
|
||||
retrieve_all_fields= FALSE;
|
||||
ops_pending= 0;
|
||||
no_uncommitted_rows_init(thd);
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_PRINT("info", ("lock_type == F_UNLCK"));
|
||||
if (!--thd->transaction.ndb_lock_count)
|
||||
if (!--thd_ndb->lock_count)
|
||||
{
|
||||
DBUG_PRINT("trans", ("Last external_lock"));
|
||||
PRINT_OPTION_FLAGS(thd);
|
||||
@ -2696,6 +2794,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
|
||||
trans= m_ndb->startTransaction();
|
||||
if (trans == NULL)
|
||||
ERR_RETURN(m_ndb->getNdbError());
|
||||
no_uncommitted_rows_reset(thd);
|
||||
thd->transaction.stmt.ndb_tid= trans;
|
||||
}
|
||||
m_active_trans= trans;
|
||||
@ -2715,7 +2814,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
|
||||
int ndbcluster_commit(THD *thd, void *ndb_transaction)
|
||||
{
|
||||
int res= 0;
|
||||
Ndb *ndb= (Ndb*)thd->transaction.ndb;
|
||||
Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
|
||||
NdbConnection *trans= (NdbConnection*)ndb_transaction;
|
||||
|
||||
DBUG_ENTER("ndbcluster_commit");
|
||||
@ -2745,7 +2844,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
|
||||
int ndbcluster_rollback(THD *thd, void *ndb_transaction)
|
||||
{
|
||||
int res= 0;
|
||||
Ndb *ndb= (Ndb*)thd->transaction.ndb;
|
||||
Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
|
||||
NdbConnection *trans= (NdbConnection*)ndb_transaction;
|
||||
|
||||
DBUG_ENTER("ndbcluster_rollback");
|
||||
@ -3222,9 +3321,9 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
|
||||
m_active_cursor(NULL),
|
||||
m_ndb(NULL),
|
||||
m_table(NULL),
|
||||
m_table_info(NULL),
|
||||
m_table_flags(HA_REC_NOT_IN_SEQ |
|
||||
HA_NULL_IN_KEY |
|
||||
HA_NOT_EXACT_COUNT |
|
||||
HA_NO_PREFIX_CHAR_KEYS),
|
||||
m_share(0),
|
||||
m_use_write(false),
|
||||
@ -3249,7 +3348,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
|
||||
|
||||
// TODO Adjust number of records and other parameters for proper
|
||||
// selection of scan/pk access
|
||||
records= 100;
|
||||
// records= 100;
|
||||
records= 0;
|
||||
block_size= 1024;
|
||||
|
||||
for (i= 0; i < MAX_KEY; i++)
|
||||
@ -3401,25 +3501,30 @@ int ha_ndbcluster::check_ndb_connection()
|
||||
Ndb* ndb;
|
||||
DBUG_ENTER("check_ndb_connection");
|
||||
|
||||
if (!thd->transaction.ndb)
|
||||
if (!thd->transaction.thd_ndb)
|
||||
{
|
||||
ndb= seize_ndb();
|
||||
if (!ndb)
|
||||
DBUG_RETURN(2);
|
||||
thd->transaction.ndb= ndb;
|
||||
thd->transaction.thd_ndb= new Thd_ndb();
|
||||
((Thd_ndb *)thd->transaction.thd_ndb)->ndb= ndb;
|
||||
}
|
||||
m_ndb= (Ndb*)thd->transaction.ndb;
|
||||
m_ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
|
||||
m_ndb->setDatabaseName(m_dbname);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
void ndbcluster_close_connection(THD *thd)
|
||||
{
|
||||
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
|
||||
Ndb* ndb;
|
||||
DBUG_ENTER("ndbcluster_close_connection");
|
||||
ndb= (Ndb*)thd->transaction.ndb;
|
||||
ha_ndbcluster::release_ndb(ndb);
|
||||
thd->transaction.ndb= NULL;
|
||||
if (thd_ndb)
|
||||
{
|
||||
ha_ndbcluster::release_ndb(thd_ndb->ndb);
|
||||
delete thd_ndb;
|
||||
thd->transaction.thd_ndb= NULL;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@ -3539,6 +3644,7 @@ bool ndbcluster_init()
|
||||
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
|
||||
(hash_get_key) ndbcluster_get_key,0,0);
|
||||
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
|
||||
|
||||
ndbcluster_inited= 1;
|
||||
#ifdef USE_DISCOVER_ON_STARTUP
|
||||
if (ndb_discover_tables() != 0)
|
||||
|
@ -215,6 +215,7 @@ class ha_ndbcluster: public handler
|
||||
NdbResultSet *m_active_cursor;
|
||||
Ndb *m_ndb;
|
||||
void *m_table;
|
||||
void *m_table_info;
|
||||
char m_dbname[FN_HEADLEN];
|
||||
//char m_schemaname[FN_HEADLEN];
|
||||
char m_tabname[FN_HEADLEN];
|
||||
@ -238,6 +239,11 @@ class ha_ndbcluster: public handler
|
||||
char *blobs_buffer;
|
||||
uint32 blobs_buffer_size;
|
||||
uint dupkey;
|
||||
|
||||
void records_update();
|
||||
void no_uncommitted_rows_update(int);
|
||||
void no_uncommitted_rows_init(THD *);
|
||||
void no_uncommitted_rows_reset(THD *);
|
||||
};
|
||||
|
||||
bool ndbcluster_init(void);
|
||||
|
@ -764,9 +764,8 @@ public:
|
||||
THD_TRANS all; // Trans since BEGIN WORK
|
||||
THD_TRANS stmt; // Trans for current statement
|
||||
uint bdb_lock_count;
|
||||
uint ndb_lock_count;
|
||||
#ifdef HAVE_NDBCLUSTER_DB
|
||||
void* ndb;
|
||||
void* thd_ndb;
|
||||
#endif
|
||||
bool on;
|
||||
/*
|
||||
|
Reference in New Issue
Block a user