1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Merge c-0c0be253.1238-1-64736c10.cust.bredbandsbolaget.se:/home/pappa/mysql-5.1-new

into  c-0c0be253.1238-1-64736c10.cust.bredbandsbolaget.se:/home/pappa/wl2604-push


sql/ha_ndbcluster.cc:
  Auto merged
sql/ha_ndbcluster.h:
  Auto merged
sql/ha_ndbcluster_binlog.cc:
  Auto merged
This commit is contained in:
unknown
2006-01-17 09:36:55 -05:00
71 changed files with 11855 additions and 2085 deletions

View File

@ -52,6 +52,8 @@ static const int parallelism= 0;
// createable against NDB from this handler
static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
static uint ndbcluster_partition_flags();
static uint ndbcluster_alter_table_flags(uint flags);
static bool ndbcluster_init(void);
static int ndbcluster_end(ha_panic_function flag);
static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
@ -72,6 +74,23 @@ static handler *ndbcluster_create_handler(TABLE_SHARE *table)
return new ha_ndbcluster(table);
}
static uint ndbcluster_partition_flags()
{
return (HA_CAN_PARTITION | HA_CAN_UPDATE_PARTITION_KEY |
HA_CAN_PARTITION_UNIQUE | HA_USE_AUTO_PARTITION);
}
static uint ndbcluster_alter_table_flags(uint flags)
{
if (flags & ALTER_DROP_PARTITION)
return 0;
else
return (HA_ONLINE_ADD_INDEX | HA_ONLINE_DROP_INDEX |
HA_ONLINE_ADD_UNIQUE_INDEX | HA_ONLINE_DROP_UNIQUE_INDEX |
HA_PARTITION_FUNCTION_SUPPORTED);
}
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
#define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
@ -117,10 +136,6 @@ static int rename_share(NDB_SHARE *share, const char *new_key);
#endif
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
static int unpackfrm(const void **data, uint *len,
const void* pack_data);
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
@ -348,7 +363,7 @@ struct Ndb_local_table_statistics {
void ha_ndbcluster::set_rec_per_key()
{
DBUG_ENTER("ha_ndbcluster::get_status_const");
for (uint i=0 ; i < table->s->keys ; i++)
for (uint i=0 ; i < table_share->keys ; i++)
{
table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
}
@ -447,7 +462,7 @@ void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
*/
void
ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
ha_ndbcluster::invalidate_dictionary_cache(TABLE_SHARE *share, Ndb *ndb,
const char *tabname, bool global)
{
NDBDICT *dict= ndb->getDictionary();
@ -470,16 +485,16 @@ ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
}
else
dict->removeCachedTable(tabname);
table->s->version=0L; /* Free when thread is ready */
share->version=0L; /* Free when thread is ready */
DBUG_VOID_RETURN;
}
void ha_ndbcluster::invalidate_dictionary_cache(bool global)
{
NDBDICT *dict= get_ndb()->getDictionary();
invalidate_dictionary_cache(table, get_ndb(), m_tabname, global);
invalidate_dictionary_cache(table_share, get_ndb(), m_tabname, global);
/* Invalidate indexes */
for (uint i= 0; i < table->s->keys; i++)
for (uint i= 0; i < table_share->keys; i++)
{
NDBINDEX *index = (NDBINDEX *) m_index[i].index;
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
@ -549,7 +564,7 @@ int ha_ndbcluster::ndb_err(NdbTransaction *trans)
if (res == HA_ERR_FOUND_DUPP_KEY)
{
if (m_rows_to_insert == 1)
m_dupkey= table->s->primary_key;
m_dupkey= table_share->primary_key;
else
{
/* We are batching inserts, offending key is not available */
@ -788,7 +803,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
for (int loop= 0; loop <= 1; loop++)
{
uint32 offset= 0;
for (uint i= 0; i < table->s->fields; i++)
for (uint i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
NdbValue value= m_value[i];
@ -892,10 +907,10 @@ int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
*/
bool ha_ndbcluster::uses_blob_value()
{
if (table->s->blob_fields == 0)
if (table_share->blob_fields == 0)
return FALSE;
{
uint no_fields= table->s->fields;
uint no_fields= table_share->fields;
int i;
// They always put blobs at the end..
for (i= no_fields - 1; i >= 0; i--)
@ -1423,7 +1438,7 @@ static void shrink_varchar(Field* field, const byte* & ptr, char* buf)
int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
{
KEY* key_info= table->key_info + table->s->primary_key;
KEY* key_info= table->key_info + table_share->primary_key;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
DBUG_ENTER("set_primary_key");
@ -1445,7 +1460,7 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *record)
{
KEY* key_info= table->key_info + table->s->primary_key;
KEY* key_info= table->key_info + table_share->primary_key;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
DBUG_ENTER("set_primary_key_from_record");
@ -1490,7 +1505,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
DBUG_ENTER("define_read_attrs");
// Define attributes to read
for (i= 0; i < table->s->fields; i++)
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
if (ha_get_bit_in_read_set(i+1) ||
@ -1505,11 +1520,11 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
}
}
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
DBUG_PRINT("info", ("Getting hidden key"));
// Scanning table with no primary key
int hidden_no= table->s->fields;
int hidden_no= table_share->fields;
#ifndef DBUG_OFF
const NDBTAB *tab= (const NDBTAB *) m_table;
if (!tab->getColumn(hidden_no))
@ -1529,7 +1544,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
uint32 part_id)
{
uint no_fields= table->s->fields;
uint no_fields= table_share->fields;
NdbConnection *trans= m_active_trans;
NdbOperation *op;
@ -1547,7 +1562,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
if (m_use_partition_function)
op->setPartitionId(part_id);
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
@ -1587,7 +1602,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data,
uint32 old_part_id)
{
uint no_fields= table->s->fields, i;
uint no_fields= table_share->fields, i;
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
DBUG_ENTER("complemented_pk_read");
@ -2135,13 +2150,13 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_ENTER("write_row");
if (!m_use_write && m_ignore_dup_key && table->s->primary_key != MAX_KEY)
if (!m_use_write && m_ignore_dup_key && table_share->primary_key != MAX_KEY)
{
int peek_res= peek_row(record);
if (!peek_res)
{
m_dupkey= table->s->primary_key;
m_dupkey= table_share->primary_key;
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
}
if (peek_res != HA_ERR_KEY_NOT_FOUND)
@ -2171,7 +2186,7 @@ int ha_ndbcluster::write_row(byte *record)
op->setPartitionId(part_id);
}
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
// Table has hidden primary key
Ndb *ndb= get_ndb();
@ -2184,7 +2199,7 @@ int ha_ndbcluster::write_row(byte *record)
ndb->getNdbError().status == NdbError::TemporaryError);
if (auto_value == NDB_FAILED_AUTO_INCREMENT)
ERR_RETURN(ndb->getNdbError());
if (set_hidden_key(op, table->s->fields, (const byte*)&auto_value))
if (set_hidden_key(op, table_share->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError());
}
else
@ -2208,7 +2223,7 @@ int ha_ndbcluster::write_row(byte *record)
// Set non-key attribute(s)
bool set_blob_value= FALSE;
for (i= 0; i < table->s->fields; i++)
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
@ -2349,8 +2364,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
/* Check for update of primary key for special handling */
if ((table->s->primary_key != MAX_KEY) &&
(key_cmp(table->s->primary_key, old_data, new_data)) ||
if ((table_share->primary_key != MAX_KEY) &&
(key_cmp(table_share->primary_key, old_data, new_data)) ||
(old_part_id != new_part_id))
{
int read_res, insert_res, delete_res, undo_res;
@ -2424,14 +2439,14 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (m_use_partition_function)
op->setPartitionId(new_part_id);
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->s->fields;
uint no_fields= table_share->fields;
const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
@ -2450,7 +2465,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
m_rows_changed++;
// Set non-key attribute(s)
for (i= 0; i < table->s->fields; i++)
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
if (ha_get_bit_in_write_set(i+1) &&
@ -2529,11 +2544,11 @@ int ha_ndbcluster::delete_row(const byte *record)
no_uncommitted_rows_update(-1);
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->s->fields;
uint no_fields= table_share->fields;
const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
@ -2656,10 +2671,10 @@ void ha_ndbcluster::unpack_record(byte *buf)
ndb_unpack_record(table, m_value, 0, buf);
#ifndef DBUG_OFF
// Read and print all values that was fetched
if (table->s->primary_key == MAX_KEY)
if (table_share->primary_key == MAX_KEY)
{
// Table with hidden primary key
int hidden_no= table->s->fields;
int hidden_no= table_share->fields;
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
const NdbRecAttr* rec= m_value[hidden_no].rec;
@ -2686,7 +2701,7 @@ void ha_ndbcluster::print_results()
char buf_type[MAX_FIELD_WIDTH], buf_val[MAX_FIELD_WIDTH];
String type(buf_type, sizeof(buf_type), &my_charset_bin);
String val(buf_val, sizeof(buf_val), &my_charset_bin);
for (uint f= 0; f < table->s->fields; f++)
for (uint f= 0; f < table_share->fields; f++)
{
/* Use DBUG_PRINT since DBUG_FILE cannot be filtered out */
char buf[2000];
@ -2953,7 +2968,7 @@ int ha_ndbcluster::rnd_init(bool scan)
DBUG_RETURN(-1);
}
}
index_init(table->s->primary_key, 0);
index_init(table_share->primary_key, 0);
DBUG_RETURN(0);
}
@ -3051,9 +3066,9 @@ void ha_ndbcluster::position(const byte *record)
byte *buff;
DBUG_ENTER("position");
if (table->s->primary_key != MAX_KEY)
if (table_share->primary_key != MAX_KEY)
{
key_info= table->key_info + table->s->primary_key;
key_info= table->key_info + table_share->primary_key;
key_part= key_info->key_part;
end= key_part + key_info->key_parts;
buff= ref;
@ -3095,7 +3110,7 @@ void ha_ndbcluster::position(const byte *record)
{
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->s->fields;
int hidden_no= table_share->fields;
const NdbRecAttr* rec= m_value[hidden_no].rec;
memcpy(ref, (const void*)rec->aRef(), ref_length);
#ifndef DBUG_OFF
@ -4057,7 +4072,7 @@ int ha_ndbcluster::create(const char *name,
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
if ((my_errno= write_ndb_file()))
if ((my_errno= write_ndb_file(name)))
DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG
if (ndb_binlog_thread_running > 0)
@ -4164,20 +4179,10 @@ int ha_ndbcluster::create(const char *name,
// Check partition info
partition_info *part_info= form->part_info;
if (part_info)
if ((my_errno= set_up_partition_info(part_info, form, (void*)&tab)))
{
int error;
if ((error= set_up_partition_info(part_info, form, (void*)&tab)))
{
DBUG_RETURN(error);
}
DBUG_RETURN(my_errno);
}
else
{
ndb_set_fragmentation(tab, form, pk_length);
}
if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
@ -4199,7 +4204,7 @@ int ha_ndbcluster::create(const char *name,
my_errno= create_indexes(ndb, form);
if (!my_errno)
my_errno= write_ndb_file();
my_errno= write_ndb_file(name);
else
{
/*
@ -4921,9 +4926,9 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
primary key to be written in the ref variable
*/
if (table->s->primary_key != MAX_KEY)
if (table_share->primary_key != MAX_KEY)
{
key= table->key_info+table->s->primary_key;
key= table->key_info+table_share->primary_key;
ref_length= key->key_length;
DBUG_PRINT("info", (" ref_length: %d", ref_length));
}
@ -4945,10 +4950,23 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
if (!res)
info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
DBUG_RETURN(res);
}
/*
Set partition info
SYNOPSIS
set_part_info()
part_info
RETURN VALUE
NONE
DESCRIPTION
Set up partition info when handler object created
*/
void ha_ndbcluster::set_part_info(partition_info *part_info)
{
m_part_info= part_info;
@ -5572,6 +5590,8 @@ static bool ndbcluster_init()
h.panic= ndbcluster_end; /* Panic call */
h.show_status= ndbcluster_show_status; /* Show status */
h.alter_tablespace= ndbcluster_alter_tablespace; /* Show status */
h.partition_flags= ndbcluster_partition_flags; /* Partition flags */
h.alter_table_flags=ndbcluster_alter_table_flags; /* Alter table flags */
#ifdef HAVE_NDB_BINLOG
ndbcluster_binlog_init_handlerton();
#endif
@ -5723,6 +5743,20 @@ static int ndbcluster_end(ha_panic_function type)
DBUG_RETURN(0);
}
void ha_ndbcluster::print_error(int error, myf errflag)
{
DBUG_ENTER("ha_ndbcluster::print_error");
DBUG_PRINT("enter", ("error = %d", error));
if (error == HA_ERR_NO_PARTITION_FOUND)
my_error(ER_NO_PARTITION_FOR_GIVEN_VALUE, MYF(0),
(int)m_part_info->part_expr->val_int());
else
handler::print_error(error, errflag);
DBUG_VOID_RETURN;
}
/*
Static error print function called from
static handler method ndbcluster_commit
@ -5749,8 +5783,10 @@ void ndbcluster_print_error(int error, const NdbOperation *error_op)
*/
void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
{
char *end, *ptr;
char *end, *ptr, *tmp_name;
char tmp_buff[FN_REFLEN];
tmp_name= tmp_buff;
/* Scan name from the end */
ptr= strend(path_name)-1;
while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
@ -5762,18 +5798,19 @@ void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
ptr--;
}
uint name_len= end - ptr;
memcpy(dbname, ptr + 1, name_len);
dbname[name_len]= '\0';
memcpy(tmp_name, ptr + 1, name_len);
tmp_name[name_len]= '\0';
#ifdef __WIN__
/* Put to lower case */
ptr= dbname;
ptr= tmp_name;
while (*ptr != '\0') {
*ptr= tolower(*ptr);
ptr++;
}
#endif
filename_to_tablename(tmp_name, dbname, FN_REFLEN);
}
/*
@ -5792,8 +5829,10 @@ void ha_ndbcluster::set_dbname(const char *path_name)
void
ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
{
char *end, *ptr;
char *end, *ptr, *tmp_name;
char tmp_buff[FN_REFLEN];
tmp_name= tmp_buff;
/* Scan name from the end */
end= strend(path_name)-1;
ptr= end;
@ -5801,17 +5840,18 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
ptr--;
}
uint name_len= end - ptr;
memcpy(tabname, ptr + 1, end - ptr);
tabname[name_len]= '\0';
memcpy(tmp_name, ptr + 1, end - ptr);
tmp_name[name_len]= '\0';
#ifdef __WIN__
/* Put to lower case */
ptr= tabname;
ptr= tmp_name;
while (*ptr != '\0') {
*ptr= tolower(*ptr);
ptr++;
}
#endif
filename_to_tablename(tmp_name, tabname, FN_REFLEN);
}
/*
@ -6583,104 +6623,6 @@ void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
}
/*
Internal representation of the frm blob
*/
struct frm_blob_struct
{
struct frm_blob_header
{
uint ver; // Version of header
uint orglen; // Original length of compressed data
uint complen; // Compressed length of data, 0=uncompressed
} head;
char data[1];
};
static int packfrm(const void *data, uint len,
const void **pack_data, uint *pack_len)
{
int error;
ulong org_len, comp_len;
uint blob_len;
frm_blob_struct* blob;
DBUG_ENTER("packfrm");
DBUG_PRINT("enter", ("data: 0x%lx len: %d", data, len));
error= 1;
org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
DBUG_PRINT("info", ("org_len: %d comp_len: %d", org_len, comp_len));
DBUG_DUMP("compressed", (char*)data, org_len);
error= 2;
blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
goto err;
// Store compressed blob in machine independent format
int4store((char*)(&blob->head.ver), 1);
int4store((char*)(&blob->head.orglen), comp_len);
int4store((char*)(&blob->head.complen), org_len);
// Copy frm data into blob, already in machine independent format
memcpy(blob->data, data, org_len);
*pack_data= blob;
*pack_len= blob_len;
error= 0;
DBUG_PRINT("exit",
("pack_data: 0x%lx pack_len: %d", *pack_data, *pack_len));
err:
DBUG_RETURN(error);
}
static int unpackfrm(const void **unpack_data, uint *unpack_len,
const void *pack_data)
{
const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data));
complen= uint4korr((char*)&blob->head.complen);
orglen= uint4korr((char*)&blob->head.orglen);
ver= uint4korr((char*)&blob->head.ver);
DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
ver,complen,orglen));
DBUG_DUMP("blob->data", (char*) blob->data, complen);
if (ver != 1)
DBUG_RETURN(1);
if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
DBUG_RETURN(2);
memcpy(data, blob->data, complen);
if (my_uncompress(data, &complen, &orglen))
{
my_free((char*)data, MYF(0));
DBUG_RETURN(3);
}
*unpack_data= data;
*unpack_len= complen;
DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len));
DBUG_RETURN(0);
}
static
int
ndb_get_table_statistics(Ndb* ndb, const char * table,
@ -6763,17 +6705,17 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
that the table with this name is a ndb table
*/
int ha_ndbcluster::write_ndb_file()
int ha_ndbcluster::write_ndb_file(const char *name)
{
File file;
bool error=1;
char path[FN_REFLEN];
DBUG_ENTER("write_ndb_file");
DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
DBUG_PRINT("enter", ("name: %s", name));
(void)strxnmov(path, FN_REFLEN-1,
mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
mysql_data_home,"/",name,ha_ndb_ext,NullS);
if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
{
@ -6797,7 +6739,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
int res;
KEY* key_info= table->key_info + active_index;
NDB_INDEX_TYPE index_type= get_index_type(active_index);
ulong reclength= table->s->reclength;
ulong reclength= table_share->reclength;
NdbOperation* op;
if (uses_blob_value())
@ -7004,7 +6946,7 @@ ha_ndbcluster::read_multi_range_next(KEY_MULTI_RANGE ** multi_range_found_p)
int res;
int range_no;
ulong reclength= table->s->reclength;
ulong reclength= table_share->reclength;
const NdbOperation* op= m_current_multi_operation;
for (;multi_range_curr < m_multi_range_defined; multi_range_curr++)
{
@ -7153,7 +7095,7 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
Field **field, **end;
NdbValue *value= m_value;
end= table->field + table->s->fields;
end= table->field + table_share->fields;
for (field= table->field; field < end; field++, value++)
{
@ -8874,11 +8816,116 @@ int ha_ndbcluster::get_default_no_partitions(ulonglong max_rows)
uint reported_frags;
uint no_fragments= get_no_fragments(max_rows);
uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
adjusted_frag_count(no_fragments, no_nodes, reported_frags);
if (adjusted_frag_count(no_fragments, no_nodes, reported_frags))
{
push_warning(current_thd,
MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
"Ndb might have problems storing the max amount of rows specified");
}
return (int)reported_frags;
}
/*
Set-up auto-partitioning for NDB Cluster
SYNOPSIS
set_auto_partitions()
part_info Partition info struct to set-up
RETURN VALUE
NONE
DESCRIPTION
Set-up auto partitioning scheme for tables that didn't define any
partitioning. We'll use PARTITION BY KEY() in this case which
translates into partition by primary key if a primary key exists
and partition by hidden key otherwise.
*/
void ha_ndbcluster::set_auto_partitions(partition_info *part_info)
{
DBUG_ENTER("ha_ndbcluster::set_auto_partitions");
part_info->list_of_part_fields= TRUE;
part_info->part_type= HASH_PARTITION;
switch (opt_ndb_distribution_id)
{
case ND_KEYHASH:
part_info->linear_hash_ind= FALSE;
break;
case ND_LINHASH:
part_info->linear_hash_ind= TRUE;
break;
}
DBUG_VOID_RETURN;
}
int ha_ndbcluster::set_range_data(void *tab_ref, partition_info *part_info)
{
NDBTAB *tab= (NDBTAB*)tab_ref;
int32 *range_data= (int32*)my_malloc(part_info->no_parts*sizeof(int32),
MYF(0));
uint i;
int error= 0;
DBUG_ENTER("set_range_data");
if (!range_data)
{
mem_alloc_error(part_info->no_parts*sizeof(int32));
DBUG_RETURN(1);
}
for (i= 0; i < part_info->no_parts; i++)
{
longlong range_val= part_info->range_int_array[i];
if (range_val < INT_MIN32 || range_val > INT_MAX32)
{
my_error(ER_LIMITED_PART_RANGE, MYF(0), "NDB");
error= 1;
goto error;
}
range_data[i]= (int32)range_val;
}
tab->setRangeListData(range_data, sizeof(int32)*part_info->no_parts);
error:
my_free((char*)range_data, MYF(0));
DBUG_RETURN(error);
}
int ha_ndbcluster::set_list_data(void *tab_ref, partition_info *part_info)
{
NDBTAB *tab= (NDBTAB*)tab_ref;
int32 *list_data= (int32*)my_malloc(part_info->no_list_values * 2
* sizeof(int32), MYF(0));
uint32 *part_id, i;
int error= 0;
DBUG_ENTER("set_list_data");
if (!list_data)
{
mem_alloc_error(part_info->no_list_values*2*sizeof(int32));
DBUG_RETURN(1);
}
for (i= 0; i < part_info->no_list_values; i++)
{
LIST_PART_ENTRY *list_entry= &part_info->list_array[i];
longlong list_val= list_entry->list_value;
if (list_val < INT_MIN32 || list_val > INT_MAX32)
{
my_error(ER_LIMITED_PART_RANGE, MYF(0), "NDB");
error= 1;
goto error;
}
list_data[2*i]= (int32)list_val;
part_id= (uint32*)&list_data[2*i+1];
*part_id= list_entry->partition_id;
}
tab->setRangeListData(list_data, 2*sizeof(int32)*part_info->no_list_values);
error:
my_free((char*)list_data, MYF(0));
DBUG_RETURN(error);
}
/*
User defined partitioning set-up. We need to check how many fragments the
user wants defined and which node groups to put those into. Later we also
@ -8896,12 +8943,18 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
TABLE *table,
void *tab_par)
{
DBUG_ENTER("ha_ndbcluster::set_up_partition_info");
ushort node_group[MAX_PARTITIONS];
ulong ng_index= 0, i, j;
uint16 frag_data[MAX_PARTITIONS];
char *ts_names[MAX_PARTITIONS];
ulong ts_index= 0, fd_index= 0, i, j;
NDBTAB *tab= (NDBTAB*)tab_par;
NDBTAB::FragmentType ftype= NDBTAB::UserDefined;
partition_element *part_elem;
bool first= TRUE;
uint ts_id, ts_version, part_count= 0, tot_ts_name_len;
List_iterator<partition_element> part_it(part_info->partitions);
int error;
char *name_ptr;
DBUG_ENTER("ha_ndbcluster::set_up_partition_info");
if (part_info->part_type == HASH_PARTITION &&
part_info->list_of_part_fields == TRUE)
@ -8920,93 +8973,60 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
col->setPartitionKey(TRUE);
}
}
List_iterator<partition_element> part_it(part_info->partitions);
for (i= 0; i < part_info->no_parts; i++)
else if (part_info->part_type == RANGE_PARTITION)
{
if ((error= set_range_data((void*)tab, part_info)))
{
DBUG_RETURN(error);
}
}
else if (part_info->part_type == LIST_PARTITION)
{
if ((error= set_list_data((void*)tab, part_info)))
{
DBUG_RETURN(error);
}
}
tab->setFragmentType(ftype);
i= 0;
tot_ts_name_len= 0;
do
{
uint ng;
part_elem= part_it++;
if (!is_sub_partitioned(part_info))
{
node_group[ng_index++]= part_elem->nodegroup_id;
//Here we should insert tablespace id based on tablespace name
ng= part_elem->nodegroup_id;
if (first && ng == UNDEF_NODEGROUP)
ng= 0;
ts_names[fd_index]= part_elem->tablespace_name;
frag_data[fd_index++]= ng;
}
else
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
for (j= 0; j < part_info->no_subparts; j++)
j= 0;
do
{
part_elem= sub_it++;
node_group[ng_index++]= part_elem->nodegroup_id;
//Here we should insert tablespace id based on tablespace name
}
ng= part_elem->nodegroup_id;
if (first && ng == UNDEF_NODEGROUP)
ng= 0;
ts_names[fd_index]= part_elem->tablespace_name;
frag_data[fd_index++]= ng;
} while (++j < part_info->no_subparts);
}
}
{
uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
if (ng_index > 4 * no_nodes)
{
DBUG_RETURN(1300);
}
}
tab->setNodeGroupIds(&node_group, ng_index);
tab->setFragmentType(ftype);
first= FALSE;
} while (++i < part_info->no_parts);
tab->setDefaultNoPartitionsFlag(part_info->use_default_no_partitions);
tab->setMaxRows(table->s->max_rows);
tab->setTablespaceNames(ts_names, fd_index*sizeof(char*));
tab->setFragmentCount(fd_index);
tab->setFragmentData(&frag_data, fd_index*2);
DBUG_RETURN(0);
}
/*
This routine is used to set-up fragmentation when the user has only specified
ENGINE = NDB and no user defined partitioning what so ever. Thus all values
will be based on default values. We will choose Linear Hash or Hash with
perfect spread dependent on a session variable defined in MySQL.
*/
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
{
NDBTAB::FragmentType ftype= NDBTAB::DistrKeyHash;
ushort node_group[MAX_PARTITIONS];
uint no_nodes= g_ndb_cluster_connection->no_db_nodes(), no_fragments, i;
DBUG_ENTER("ndb_set_fragmentation");
if (form->s->max_rows == (ha_rows) 0)
{
no_fragments= no_nodes;
}
else
{
/*
Ensure that we get enough fragments to handle all rows and ensure that
the table is fully distributed by keeping the number of fragments a
multiple of the number of nodes.
*/
uint fragments= get_no_fragments(form->s->max_rows);
if (adjusted_frag_count(fragments, no_nodes, no_fragments))
{
push_warning(current_thd,
MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
"Ndb might have problems storing the max amount of rows specified");
}
}
/*
Always start with node group 0 and continue with next node group from
there
*/
node_group[0]= 0;
for (i= 1; i < no_fragments; i++)
node_group[i]= UNDEF_NODEGROUP;
switch (opt_ndb_distribution_id)
{
case ND_KEYHASH:
ftype= NDBTAB::DistrKeyHash;
break;
case ND_LINHASH:
ftype= NDBTAB::DistrKeyLin;
break;
}
tab.setFragmentType(ftype);
tab.setNodeGroupIds(&node_group, no_fragments);
DBUG_VOID_RETURN;
}
bool ha_ndbcluster::check_if_incompatible_data(HA_CREATE_INFO *info,
uint table_changes)
{
@ -9259,3 +9279,41 @@ ndberror:
DBUG_RETURN(1);
}
bool ha_ndbcluster::get_no_parts(const char *name, uint *no_parts)
{
Ndb *ndb;
NDBDICT *dict;
const NDBTAB *tab;
int err;
DBUG_ENTER("ha_ndbcluster::get_no_parts");
set_dbname(name);
set_tabname(name);
do
{
if (check_ndb_connection())
{
err= HA_ERR_NO_CONNECTION;
break;
}
ndb= get_ndb();
dict= ndb->getDictionary();
if (!(tab= dict->getTable(m_tabname)))
ERR_BREAK(dict->getNdbError(), err);
// Check if thread has stale local cache
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
invalidate_dictionary_cache(FALSE);
if (!(tab= dict->getTable(m_tabname)))
ERR_BREAK(dict->getNdbError(), err);
}
*no_parts= tab->getFragmentCount();
DBUG_RETURN(FALSE);
} while (1);
end:
print_error(err, MYF(0));
DBUG_RETURN(TRUE);
}