1
0
mirror of https://github.com/MariaDB/server.git synced 2025-09-02 09:41:40 +03:00
This commit is contained in:
magnus@neptunus.(none)
2004-09-13 14:49:50 +02:00
19 changed files with 635 additions and 188 deletions

View File

@@ -32,7 +32,6 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp>
#define USE_DISCOVER_ON_STARTUP
//#define USE_NDB_POOL
// Default value for parallelism
@@ -48,11 +47,13 @@ static const ha_rows autoincrement_prefetch= 32;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
static const char *ha_ndb_ext=".ndb";
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
#define ERR_PRINT(err) \
DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
#define ERR_RETURN(err) \
{ \
@@ -106,7 +107,9 @@ static const err_code_mapping err_map[]=
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
{ 241, HA_ERR_OLD_METADATA },
{ 709, HA_ERR_NO_SUCH_TABLE },
{ 284, HA_ERR_NO_SUCH_TABLE },
{ 266, HA_ERR_LOCK_WAIT_TIMEOUT },
{ 274, HA_ERR_LOCK_WAIT_TIMEOUT },
@@ -160,6 +163,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
NDBDICT *dict= m_ndb->getDictionary();
DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
dict->invalidateTable(m_tabname);
table->version=0L; /* Free when thread is ready */
break;
}
default:
@@ -199,7 +203,8 @@ bool ha_ndbcluster::get_error_message(int error,
/*
Check if type is supported by NDB.
TODO Use this once, not in every operation
TODO Use this once in open(), not in every operation
*/
static inline bool ndb_supported_type(enum_field_types type)
@@ -523,7 +528,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_PRINT("error",
("Wrong number of columns, ndb: %d mysql: %d",
ndb_columns, mysql_columns));
DBUG_RETURN(HA_ERR_OLD_METADATA);
DBUG_RETURN(3);
}
/*
@@ -547,7 +552,7 @@ int ha_ndbcluster::get_metadata(const char *path)
memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
error= HA_ERR_OLD_METADATA;
error= 2;
}
my_free((char*)data, MYF(0));
my_free((char*)pack_data, MYF(0));
@@ -2473,7 +2478,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
{ static const char *ext[1]= { NullS }; return ext; }
{ static const char *ext[]= { ".ndb", NullS }; return ext; }
/*
@@ -2654,18 +2659,26 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd->transaction.stmt.ndb_tid= 0;
}
}
if (m_active_trans)
DBUG_PRINT("warning", ("m_active_trans != NULL"));
/*
This is the place to make sure this handler instance
no longer are connected to the active transaction.
And since the handler is no longer part of the transaction
it can't have open cursors, ops or blobs pending.
*/
m_active_trans= NULL;
if (m_active_cursor)
DBUG_PRINT("warning", ("m_active_cursor != NULL"));
m_active_cursor= NULL;
if (blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0"));
blobs_pending= 0;
if (ops_pending)
DBUG_PRINT("warning", ("ops_pending != 0L"));
m_active_trans= NULL;
m_active_cursor= NULL;
ops_pending= 0;
blobs_pending= 0;
}
DBUG_RETURN(error);
}
@@ -2961,12 +2974,24 @@ int ha_ndbcluster::create(const char *name,
const void *data, *pack_data;
const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
DBUG_ENTER("create");
DBUG_PRINT("enter", ("name: %s", name));
fn_format(name2, name, "", "",2); // Remove the .frm extension
set_dbname(name2);
set_tabname(name2);
set_tabname(name2);
if (create_from_engine)
{
/*
Table alreay exists in NDB and frm file has been created by
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
my_errno= write_ndb_file();
DBUG_RETURN(my_errno);
}
DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname);
@@ -3007,16 +3032,12 @@ int ha_ndbcluster::create(const char *name,
tab.addColumn(col);
}
my_errno= 0;
if (check_ndb_connection())
{
my_errno= HA_ERR_NO_CONNECTION;
if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
}
// Create the table in NDB
NDBDICT *dict= m_ndb->getDictionary();
if (dict->createTable(tab))
if (dict->createTable(tab) != 0)
{
const NdbError err= dict->getNdbError();
ERR_PRINT(err);
@@ -3029,6 +3050,9 @@ int ha_ndbcluster::create(const char *name,
// Create secondary indexes
my_errno= build_index_list(form, ILBP_CREATE);
if (!my_errno)
my_errno= write_ndb_file();
DBUG_RETURN(my_errno);
}
@@ -3105,14 +3129,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
set_tabname(from);
set_tabname(to, new_tabname);
if (check_ndb_connection()) {
my_errno= HA_ERR_NO_CONNECTION;
DBUG_RETURN(my_errno);
}
if (check_ndb_connection())
DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
int result= alter_table_name(m_tabname, new_tabname);
if (result == 0)
{
set_tabname(to);
handler::rename_table(from, to);
}
DBUG_RETURN(result);
}
@@ -3156,6 +3182,8 @@ int ha_ndbcluster::delete_table(const char *name)
if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION);
handler::delete_table(name);
DBUG_RETURN(drop_table());
}
@@ -3324,7 +3352,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(get_metadata(name));
}
@@ -3390,24 +3418,33 @@ void ha_ndbcluster::release_ndb(Ndb* ndb)
seize a Ndb object, assign it to current THD and use it.
Having a Ndb object also means that a connection to
NDB cluster has been opened. The connection is
checked.
NDB cluster has been opened.
*/
Ndb* check_ndb_in_thd(THD* thd)
{
Ndb* ndb;
DBUG_ENTER("check_ndb_in_thd");
if (!(ndb= (Ndb*)thd->transaction.ndb))
{
ndb= ha_ndbcluster::seize_ndb();
if (!ndb)
DBUG_RETURN(NULL);
thd->transaction.ndb= ndb;
}
DBUG_RETURN(ndb);
}
int ha_ndbcluster::check_ndb_connection()
{
THD* thd= current_thd;
Ndb* ndb;
DBUG_ENTER("check_ndb_connection");
if (!thd->transaction.ndb)
{
ndb= seize_ndb();
if (!ndb)
DBUG_RETURN(2);
thd->transaction.ndb= ndb;
}
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
m_ndb= (Ndb*)thd->transaction.ndb;
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
@@ -3428,27 +3465,28 @@ void ndbcluster_close_connection(THD *thd)
Try to discover one table from NDB
*/
int ndbcluster_discover(const char *dbname, const char *name,
int ndbcluster_discover(THD* thd, const char *db, const char *name,
const void** frmblob, uint* frmlen)
{
uint len;
const void* data;
const NDBTAB* tab;
Ndb* ndb;
DBUG_ENTER("ndbcluster_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
Ndb ndb(g_ndb_cluster_connection, dbname);
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db);
if (ndb.init())
ERR_RETURN(ndb.getNdbError());
if (ndb.waitUntilReady(0))
ERR_RETURN(ndb.getNdbError());
if (!(tab= ndb.getDictionary()->getTable(name)))
{
DBUG_PRINT("info", ("Table %s not found", name));
DBUG_RETURN(1);
NDBDICT* dict= ndb->getDictionary();
dict->invalidateTable(name);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (err.code == 709)
DBUG_RETURN(1);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
@@ -3471,40 +3509,79 @@ int ndbcluster_discover(const char *dbname, const char *name,
}
#ifdef USE_DISCOVER_ON_STARTUP
int ndbcluster_can_discover(THD *thd, const char *name)
{
DBUG_ENTER("ndbcluster_can_discover");
DBUG_RETURN(!my_strcasecmp(system_charset_info, fn_ext(name), ha_ndb_ext))
}
/*
Dicover tables from NDB Cluster
- fetch a list of tables from NDB
- store the frm file for each table on disk
- if the table has an attached frm file
- if the database of the table exists
Check if a table exists in NDB
*/
int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
{
uint len;
const void* data;
const NDBTAB* tab;
Ndb* ndb;
DBUG_ENTER("ndbcluster_table_exists");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db);
NDBDICT* dict= ndb->getDictionary();
dict->invalidateTable(name);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (err.code == 709)
DBUG_RETURN(0);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
DBUG_RETURN(1);
}
/*
List tables in NDB Cluster
*/
int ndb_discover_tables()
int ndbcluster_list_tables(THD* thd, HASH *tables, const char* db)
{
uint i;
NdbDictionary::Dictionary::List list;
NdbDictionary::Dictionary* dict;
char path[FN_REFLEN];
DBUG_ENTER("ndb_discover_tables");
Ndb* ndb;
DBUG_ENTER("ndbcluster_list_tables");
DBUG_PRINT("enter", ("db: %s", db));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
/* List tables in NDB Cluster kernel */
dict= g_ndb->getDictionary();
NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
if (create_table_from_handler(t.database, t.name, true))
DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
if (strcmp(t.database, db) == 0)
{
DBUG_PRINT("info", ("my_hash_insert %s", t.name));
(void)my_hash_insert(tables, (byte*)thd->strdup(t.name));;
}
}
DBUG_RETURN(0);
}
#endif
/*
@@ -3554,10 +3631,6 @@ bool ndbcluster_init()
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
DBUG_RETURN(TRUE);
#endif
DBUG_RETURN(false);
}
@@ -3571,7 +3644,6 @@ bool ndbcluster_init()
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
@@ -3942,4 +4014,30 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(-1);
}
/*
Create a .ndb file to serve as a placeholder indicating
that the table with this name is a ndb table
*/
int ha_ndbcluster::write_ndb_file()
{
File file;
bool error=1;
char path[FN_REFLEN];
DBUG_ENTER("write_ndb_file");
DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
(void)strxnmov(path, FN_REFLEN,
mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
{
// It's an empty file
error=0;
my_close(file,MYF(0));
}
DBUG_RETURN(error);
}
#endif /* HAVE_NDBCLUSTER_DB */