From 2d88c4befb93c5d02f91aa01c33824d86d056188 Mon Sep 17 00:00:00 2001 From: Sergey Petrunya Date: Thu, 27 Sep 2012 11:59:14 +0400 Subject: [PATCH] Cassandra SE - Support UPDATE statements - Follow what CQL does: don't show deleted rows (they show up as rows without any columns in reads) --- mysql-test/r/cassandra.result | 28 +++++++++- mysql-test/t/cassandra.test | 19 +++++++ storage/cassandra/cassandra_se.cc | 46 +++++++++++++++- storage/cassandra/cassandra_se.h | 10 ++++ storage/cassandra/ha_cassandra.cc | 88 ++++++++++++++++++++++++++++++- storage/cassandra/ha_cassandra.h | 1 + 6 files changed, 188 insertions(+), 4 deletions(-) diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 6497eab6dd2..07720bb5b23 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -41,7 +41,6 @@ select * from t1; pk data1 data2 rowkey12 data1-value3 454 rowkey10 data1-value 123456 -rowkey11 NULL NULL delete from t1; select * from t1; pk data1 data2 @@ -381,3 +380,30 @@ alter table t2 column_family='cf12'; Writes made during ALTER TABLE 0 drop table t2; +# +# UPDATE command support +# +create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra +thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; +insert into t1 values ('rowkey10', 'data1-value', 123456); +insert into t1 values ('rowkey11', 'data1-value2', 34543); +insert into t1 values ('rowkey12', 'data1-value3', 454); +select * from t1; +pk data1 data2 +rowkey12 data1-value3 454 +rowkey10 data1-value 123456 +rowkey11 data1-value2 34543 +update t1 set data1='updated-1' where pk='rowkey11'; +select * from t1; +pk data1 data2 +rowkey12 data1-value3 454 +rowkey10 data1-value 123456 +rowkey11 updated-1 34543 +update t1 set pk='new-rowkey12' where pk='rowkey12'; +select * from t1; +pk data1 data2 +rowkey10 data1-value 123456 +new-rowkey12 data1-value3 454 +rowkey11 updated-1 34543 +delete from t1; +drop table t1; diff --git 
a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 3f5933e8b66..7e5b327580c 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -492,6 +492,25 @@ eval select ($c2 - $c1) as 'Writes made during ALTER TABLE'; drop table t2; +--echo # +--echo # UPDATE command support +--echo # +create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra + thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; + +insert into t1 values ('rowkey10', 'data1-value', 123456); +insert into t1 values ('rowkey11', 'data1-value2', 34543); +insert into t1 values ('rowkey12', 'data1-value3', 454); +select * from t1; + +update t1 set data1='updated-1' where pk='rowkey11'; +select * from t1; +update t1 set pk='new-rowkey12' where pk='rowkey12'; +select * from t1; + +delete from t1; +drop table t1; + ############################################################################ ## Cassandra cleanup ############################################################################ diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index e46d02f48d9..2239323bd20 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -99,6 +99,8 @@ public: void clear_insert_buffer(); void start_row_insert(const char *key, int key_len); void add_insert_column(const char *name, const char *value, int value_len); + void add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names); bool do_insert(); @@ -313,6 +315,42 @@ void Cassandra_se_impl::start_row_insert(const char *key, int key_len) } +void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names) +{ + std::string key_to_delete; + key_to_delete.assign(key, key_len); + + batch_mutation[key_to_delete]= ColumnFamilyToMutation(); + ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete]; + + cf_mut[column_family]= std::vector<Mutation>(); + std::vector<Mutation> 
&mutation_list= cf_mut[column_family]; + + Mutation mut; + mut.__isset.deletion= true; + mut.deletion.__isset.timestamp= true; + mut.deletion.timestamp= get_i64_timestamp(); + mut.deletion.__isset.predicate= true; + + /* + Attempting to delete columns with SliceRange causes exception with message + "Deletion does not yet support SliceRange predicates". + + Delete all columns individually. + */ + SlicePredicate slice_pred; + slice_pred.__isset.column_names= true; + const char *col_name; + while ((col_name= col_names->get_next_name())) + slice_pred.column_names.push_back(std::string(col_name)); + + mut.deletion.predicate= slice_pred; + + mutation_list.push_back(mut); +} + + void Cassandra_se_impl::add_insert_column(const char *name, const char *value, int value_len) { @@ -531,7 +569,13 @@ restart: } } - if (have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) + /* + (1) - skip the last row that we have read in the previous batch. + (2) - Rows that were deleted show up as rows without any columns. Skip + them, like CQL does. + */ + if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1) + key_slice_it->columns.size() == 0) // (2) { key_slice_it++; goto restart; diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index c6de779f8bc..069a3b238f9 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -20,6 +20,14 @@ typedef enum THREE = 8-1, } enum_cassandra_consistency_level; + +class Column_name_enumerator +{ +public: + virtual const char* get_next_name()=0; + virtual ~Column_name_enumerator(){} +}; + /* Interface to one cassandra column family, i.e. 
one 'table' */ @@ -45,6 +53,8 @@ public: /* Writes */ virtual void clear_insert_buffer()=0; + virtual void add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names)=0; virtual void start_row_insert(const char *key, int key_len)=0; virtual void add_insert_column(const char *name, const char *value, int value_len)=0; diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index f9aa8cd40a8..7ec7f7aa4eb 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -1605,11 +1605,95 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key, } +class Column_name_enumerator_impl : public Column_name_enumerator +{ + ha_cassandra *obj; + uint idx; +public: + Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {} + const char* get_next_name() + { + if (idx == obj->table->s->fields) + return NULL; + else + return obj->table->field[idx++]->field_name; + } +}; + + int ha_cassandra::update_row(const uchar *old_data, uchar *new_data) { - + my_bitmap_map *old_map; DBUG_ENTER("ha_cassandra::update_row"); - DBUG_RETURN(HA_ERR_WRONG_COMMAND); + /* Currently, it is guaranteed that new_data == table->record[0] */ + + /* For now, just rewrite the full record */ + se->clear_insert_buffer(); + + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + char *old_key; + int old_key_len; + se->get_read_rowkey(&old_key, &old_key_len); + + /* Get the key we're going to write */ + char *new_key; + int new_key_len; + if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + rowkey_converter->field->field_name, insert_lineno); + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE); + } + + /* + Compare it to the key we've read. 
For all types that Cassandra supports, + binary byte-wise comparison can be used + */ + bool new_primary_key; + if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len)) + new_primary_key= true; + else + new_primary_key= false; + + + if (new_primary_key) + { + /* + Primary key value changed. This is essentially a DELETE + INSERT. + Add a DELETE operation into the batch + */ + Column_name_enumerator_impl name_enumerator(this); + se->add_row_deletion(old_key, old_key_len, &name_enumerator); + } + + se->start_row_insert(new_key, new_key_len); + + /* Convert other fields */ + for (uint i= 1; i < table->s->fields; i++) + { + char *cass_data; + int cass_data_len; + if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + field_converters[i]->field->field_name, insert_lineno); + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE); + } + se->add_insert_column(field_converters[i]->field->field_name, + cass_data, cass_data_len); + } + dbug_tmp_restore_column_map(table->read_set, old_map); + + bool res= se->do_insert(); + + if (res) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); } diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h index 52fd46fa5ef..f94dbc1c49d 100644 --- a/storage/cassandra/ha_cassandra.h +++ b/storage/cassandra/ha_cassandra.h @@ -45,6 +45,7 @@ struct ha_table_option_struct; */ class ha_cassandra: public handler { + friend class Column_name_enumerator_impl; THR_LOCK_DATA lock; ///< MySQL lock CASSANDRA_SHARE *share; ///< Shared lock info