mirror of
https://github.com/MariaDB/server.git
synced 2025-08-07 00:04:31 +03:00
Cassandra SE
- Support UPDATE statements - Follow what CQL does: don't show deleted rows (they show up as rows without any columns in reads)
This commit is contained in:
@@ -41,7 +41,6 @@ select * from t1;
|
|||||||
pk data1 data2
|
pk data1 data2
|
||||||
rowkey12 data1-value3 454
|
rowkey12 data1-value3 454
|
||||||
rowkey10 data1-value 123456
|
rowkey10 data1-value 123456
|
||||||
rowkey11 NULL NULL
|
|
||||||
delete from t1;
|
delete from t1;
|
||||||
select * from t1;
|
select * from t1;
|
||||||
pk data1 data2
|
pk data1 data2
|
||||||
@@ -381,3 +380,30 @@ alter table t2 column_family='cf12';
|
|||||||
Writes made during ALTER TABLE
|
Writes made during ALTER TABLE
|
||||||
0
|
0
|
||||||
drop table t2;
|
drop table t2;
|
||||||
|
#
|
||||||
|
# UPDATE command support
|
||||||
|
#
|
||||||
|
create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
|
||||||
|
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
|
||||||
|
insert into t1 values ('rowkey10', 'data1-value', 123456);
|
||||||
|
insert into t1 values ('rowkey11', 'data1-value2', 34543);
|
||||||
|
insert into t1 values ('rowkey12', 'data1-value3', 454);
|
||||||
|
select * from t1;
|
||||||
|
pk data1 data2
|
||||||
|
rowkey12 data1-value3 454
|
||||||
|
rowkey10 data1-value 123456
|
||||||
|
rowkey11 data1-value2 34543
|
||||||
|
update t1 set data1='updated-1' where pk='rowkey11';
|
||||||
|
select * from t1;
|
||||||
|
pk data1 data2
|
||||||
|
rowkey12 data1-value3 454
|
||||||
|
rowkey10 data1-value 123456
|
||||||
|
rowkey11 updated-1 34543
|
||||||
|
update t1 set pk='new-rowkey12' where pk='rowkey12';
|
||||||
|
select * from t1;
|
||||||
|
pk data1 data2
|
||||||
|
rowkey10 data1-value 123456
|
||||||
|
new-rowkey12 data1-value3 454
|
||||||
|
rowkey11 updated-1 34543
|
||||||
|
delete from t1;
|
||||||
|
drop table t1;
|
||||||
|
@@ -492,6 +492,25 @@ eval select ($c2 - $c1) as 'Writes made during ALTER TABLE';
|
|||||||
|
|
||||||
drop table t2;
|
drop table t2;
|
||||||
|
|
||||||
|
--echo #
|
||||||
|
--echo # UPDATE command support
|
||||||
|
--echo #
|
||||||
|
create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
|
||||||
|
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
|
||||||
|
|
||||||
|
insert into t1 values ('rowkey10', 'data1-value', 123456);
|
||||||
|
insert into t1 values ('rowkey11', 'data1-value2', 34543);
|
||||||
|
insert into t1 values ('rowkey12', 'data1-value3', 454);
|
||||||
|
select * from t1;
|
||||||
|
|
||||||
|
update t1 set data1='updated-1' where pk='rowkey11';
|
||||||
|
select * from t1;
|
||||||
|
update t1 set pk='new-rowkey12' where pk='rowkey12';
|
||||||
|
select * from t1;
|
||||||
|
|
||||||
|
delete from t1;
|
||||||
|
drop table t1;
|
||||||
|
|
||||||
############################################################################
|
############################################################################
|
||||||
## Cassandra cleanup
|
## Cassandra cleanup
|
||||||
############################################################################
|
############################################################################
|
||||||
|
@@ -99,6 +99,8 @@ public:
|
|||||||
void clear_insert_buffer();
|
void clear_insert_buffer();
|
||||||
void start_row_insert(const char *key, int key_len);
|
void start_row_insert(const char *key, int key_len);
|
||||||
void add_insert_column(const char *name, const char *value, int value_len);
|
void add_insert_column(const char *name, const char *value, int value_len);
|
||||||
|
void add_row_deletion(const char *key, int key_len,
|
||||||
|
Column_name_enumerator *col_names);
|
||||||
|
|
||||||
bool do_insert();
|
bool do_insert();
|
||||||
|
|
||||||
@@ -313,6 +315,42 @@ void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
|
||||||
|
Column_name_enumerator *col_names)
|
||||||
|
{
|
||||||
|
std::string key_to_delete;
|
||||||
|
key_to_delete.assign(key, key_len);
|
||||||
|
|
||||||
|
batch_mutation[key_to_delete]= ColumnFamilyToMutation();
|
||||||
|
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete];
|
||||||
|
|
||||||
|
cf_mut[column_family]= std::vector<Mutation>();
|
||||||
|
std::vector<Mutation> &mutation_list= cf_mut[column_family];
|
||||||
|
|
||||||
|
Mutation mut;
|
||||||
|
mut.__isset.deletion= true;
|
||||||
|
mut.deletion.__isset.timestamp= true;
|
||||||
|
mut.deletion.timestamp= get_i64_timestamp();
|
||||||
|
mut.deletion.__isset.predicate= true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
Attempting to delete columns with SliceRange causes exception with message
|
||||||
|
"Deletion does not yet support SliceRange predicates".
|
||||||
|
|
||||||
|
Delete all columns individually.
|
||||||
|
*/
|
||||||
|
SlicePredicate slice_pred;
|
||||||
|
slice_pred.__isset.column_names= true;
|
||||||
|
const char *col_name;
|
||||||
|
while ((col_name= col_names->get_next_name()))
|
||||||
|
slice_pred.column_names.push_back(std::string(col_name));
|
||||||
|
|
||||||
|
mut.deletion.predicate= slice_pred;
|
||||||
|
|
||||||
|
mutation_list.push_back(mut);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
|
void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
|
||||||
int value_len)
|
int value_len)
|
||||||
{
|
{
|
||||||
@@ -531,7 +569,13 @@ restart:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key))
|
/*
|
||||||
|
(1) - skip the last row that we have read in the previous batch.
|
||||||
|
(2) - Rows that were deleted show up as rows without any columns. Skip
|
||||||
|
them, like CQL does.
|
||||||
|
*/
|
||||||
|
if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1)
|
||||||
|
key_slice_it->columns.size() == 0) // (2)
|
||||||
{
|
{
|
||||||
key_slice_it++;
|
key_slice_it++;
|
||||||
goto restart;
|
goto restart;
|
||||||
|
@@ -20,6 +20,14 @@ typedef enum
|
|||||||
THREE = 8-1,
|
THREE = 8-1,
|
||||||
} enum_cassandra_consistency_level;
|
} enum_cassandra_consistency_level;
|
||||||
|
|
||||||
|
|
||||||
|
class Column_name_enumerator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual const char* get_next_name()=0;
|
||||||
|
virtual ~Column_name_enumerator(){}
|
||||||
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Interface to one cassandra column family, i.e. one 'table'
|
Interface to one cassandra column family, i.e. one 'table'
|
||||||
*/
|
*/
|
||||||
@@ -45,6 +53,8 @@ public:
|
|||||||
|
|
||||||
/* Writes */
|
/* Writes */
|
||||||
virtual void clear_insert_buffer()=0;
|
virtual void clear_insert_buffer()=0;
|
||||||
|
virtual void add_row_deletion(const char *key, int key_len,
|
||||||
|
Column_name_enumerator *col_names)=0;
|
||||||
virtual void start_row_insert(const char *key, int key_len)=0;
|
virtual void start_row_insert(const char *key, int key_len)=0;
|
||||||
virtual void add_insert_column(const char *name, const char *value,
|
virtual void add_insert_column(const char *name, const char *value,
|
||||||
int value_len)=0;
|
int value_len)=0;
|
||||||
|
@@ -1605,11 +1605,95 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Column_name_enumerator_impl : public Column_name_enumerator
|
||||||
|
{
|
||||||
|
ha_cassandra *obj;
|
||||||
|
uint idx;
|
||||||
|
public:
|
||||||
|
Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
|
||||||
|
const char* get_next_name()
|
||||||
|
{
|
||||||
|
if (idx == obj->table->s->fields)
|
||||||
|
return NULL;
|
||||||
|
else
|
||||||
|
return obj->table->field[idx++]->field_name;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
|
int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
|
||||||
{
|
{
|
||||||
|
my_bitmap_map *old_map;
|
||||||
DBUG_ENTER("ha_cassandra::update_row");
|
DBUG_ENTER("ha_cassandra::update_row");
|
||||||
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
|
/* Currently, it is guaranteed that new_data == table->record[0] */
|
||||||
|
|
||||||
|
/* For now, just rewrite the full record */
|
||||||
|
se->clear_insert_buffer();
|
||||||
|
|
||||||
|
|
||||||
|
old_map= dbug_tmp_use_all_columns(table, table->read_set);
|
||||||
|
|
||||||
|
char *old_key;
|
||||||
|
int old_key_len;
|
||||||
|
se->get_read_rowkey(&old_key, &old_key_len);
|
||||||
|
|
||||||
|
/* Get the key we're going to write */
|
||||||
|
char *new_key;
|
||||||
|
int new_key_len;
|
||||||
|
if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
|
||||||
|
{
|
||||||
|
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
|
||||||
|
rowkey_converter->field->field_name, insert_lineno);
|
||||||
|
dbug_tmp_restore_column_map(table->read_set, old_map);
|
||||||
|
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Compare it to the key we've read. For all types that Cassandra supports,
|
||||||
|
binary byte-wise comparison can be used
|
||||||
|
*/
|
||||||
|
bool new_primary_key;
|
||||||
|
if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
|
||||||
|
new_primary_key= true;
|
||||||
|
else
|
||||||
|
new_primary_key= false;
|
||||||
|
|
||||||
|
|
||||||
|
if (new_primary_key)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Primary key value changed. This is essentially a DELETE + INSERT.
|
||||||
|
Add a DELETE operation into the batch
|
||||||
|
*/
|
||||||
|
Column_name_enumerator_impl name_enumerator(this);
|
||||||
|
se->add_row_deletion(old_key, old_key_len, &name_enumerator);
|
||||||
|
}
|
||||||
|
|
||||||
|
se->start_row_insert(new_key, new_key_len);
|
||||||
|
|
||||||
|
/* Convert other fields */
|
||||||
|
for (uint i= 1; i < table->s->fields; i++)
|
||||||
|
{
|
||||||
|
char *cass_data;
|
||||||
|
int cass_data_len;
|
||||||
|
if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
|
||||||
|
{
|
||||||
|
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
|
||||||
|
field_converters[i]->field->field_name, insert_lineno);
|
||||||
|
dbug_tmp_restore_column_map(table->read_set, old_map);
|
||||||
|
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
|
||||||
|
}
|
||||||
|
se->add_insert_column(field_converters[i]->field->field_name,
|
||||||
|
cass_data, cass_data_len);
|
||||||
|
}
|
||||||
|
dbug_tmp_restore_column_map(table->read_set, old_map);
|
||||||
|
|
||||||
|
bool res= se->do_insert();
|
||||||
|
|
||||||
|
if (res)
|
||||||
|
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
|
||||||
|
|
||||||
|
DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -45,6 +45,7 @@ struct ha_table_option_struct;
|
|||||||
*/
|
*/
|
||||||
class ha_cassandra: public handler
|
class ha_cassandra: public handler
|
||||||
{
|
{
|
||||||
|
friend class Column_name_enumerator_impl;
|
||||||
THR_LOCK_DATA lock; ///< MySQL lock
|
THR_LOCK_DATA lock; ///< MySQL lock
|
||||||
CASSANDRA_SHARE *share; ///< Shared lock info
|
CASSANDRA_SHARE *share; ///< Shared lock info
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user