diff --git a/include/my_base.h b/include/my_base.h index 617cdb8c3f0..503d858a41c 100644 --- a/include/my_base.h +++ b/include/my_base.h @@ -180,7 +180,12 @@ enum ha_extra_function { These flags are reset by the handler::extra(HA_EXTRA_RESET) call. */ HA_EXTRA_DELETE_CANNOT_BATCH, - HA_EXTRA_UPDATE_CANNOT_BATCH + HA_EXTRA_UPDATE_CANNOT_BATCH, + /* + Inform handler that an "INSERT...ON DUPLICATE KEY UPDATE" will be + executed. This condition is unset by HA_EXTRA_NO_IGNORE_DUP_KEY. + */ + HA_EXTRA_INSERT_WITH_UPDATE }; /* The following is parameter to ha_panic() */ diff --git a/mysql-test/r/federated.result b/mysql-test/r/federated.result index 6e0c139ee14..42228fac2c9 100644 --- a/mysql-test/r/federated.result +++ b/mysql-test/r/federated.result @@ -1843,6 +1843,45 @@ C3A4C3B6C3BCC39F D18DD184D184D0B5D0BAD182D0B8D0B2D0BDD183D18E drop table federated.t1; drop table federated.t1; +create table federated.t1 (a int primary key, b varchar(64)) +DEFAULT CHARSET=utf8; +create table federated.t1 (a int primary key, b varchar(64)) +ENGINE=FEDERATED +connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1' + DEFAULT CHARSET=utf8; +insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; +a b +1 Larry +2 Curly +truncate federated.t1; +replace into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; +a b +1 Moe +2 Curly +update ignore federated.t1 set a=a+1; +select * from federated.t1; +a b +1 Moe +3 Curly +drop table federated.t1; +drop table federated.t1; +create table federated.t1 (a int primary key, b varchar(64)) +DEFAULT CHARSET=utf8; +create table federated.t1 (a int primary key, b varchar(64)) +ENGINE=FEDERATED +connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1' + DEFAULT CHARSET=utf8; +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe") +on duplicate key update a=a+100; +ERROR 23000: Can't write; duplicate key in table 't1' +select * from federated.t1; +a b +1 Larry +2 Curly +drop table federated.t1; +drop table federated.t1; DROP TABLE IF EXISTS federated.t1; DROP DATABASE IF EXISTS federated; DROP TABLE IF EXISTS federated.t1; diff --git a/mysql-test/r/federated_innodb.result b/mysql-test/r/federated_innodb.result new file mode 100644 index 00000000000..70ba3acb279 --- /dev/null +++ b/mysql-test/r/federated_innodb.result @@ -0,0 +1,34 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +stop slave; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +create table federated.t1 (a int primary key, b varchar(64)) +engine=myisam; +create table federated.t1 (a int primary key, b varchar(64)) +engine=federated +connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'; +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +ERROR 23000: Can't write; duplicate key in table 't1' +select * from federated.t1; +a b +1 Larry +2 Curly +truncate federated.t1; +alter table federated.t1 engine=innodb; +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +ERROR 23000: Can't write; duplicate key in table 't1' +select * from federated.t1; +a b +drop table federated.t1; +drop table federated.t1; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; diff --git a/mysql-test/t/federated.test b/mysql-test/t/federated.test index 0eaead8011e..fa65568e9cc 100644 --- a/mysql-test/t/federated.test +++ b/mysql-test/t/federated.test @@ -1630,4 +1630,57 @@ connection slave; drop table federated.t1; +# +# BUG#21019 Federated Engine does not support REPLACE/INSERT IGNORE/UPDATE IGNORE +# +connection slave; +create table federated.t1 (a int primary key, b varchar(64)) + DEFAULT CHARSET=utf8; +connection master; +--replace_result $SLAVE_MYPORT SLAVE_PORT +eval create table federated.t1 (a int primary key, b varchar(64)) + ENGINE=FEDERATED + connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1' + DEFAULT CHARSET=utf8; + +insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +truncate federated.t1; +replace into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +update ignore federated.t1 set a=a+1; +select * from federated.t1; + +drop table federated.t1; +connection slave; +drop table federated.t1; + +# +# BUG#25511 Federated Insert failures. +# +# When the user performs a INSERT...ON DUPLICATE KEY UPDATE, we want +# it to fail if a duplicate key exists instead of ignoring it. +# +connection slave; +create table federated.t1 (a int primary key, b varchar(64)) + DEFAULT CHARSET=utf8; +connection master; +--replace_result $SLAVE_MYPORT SLAVE_PORT +eval create table federated.t1 (a int primary key, b varchar(64)) + ENGINE=FEDERATED + connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1' + DEFAULT CHARSET=utf8; + +--error ER_DUP_KEY +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe") +on duplicate key update a=a+100; +select * from federated.t1; + +drop table federated.t1; +connection slave; +drop table federated.t1; + + source include/federated_cleanup.inc; diff --git a/mysql-test/t/federated_innodb-slave.opt b/mysql-test/t/federated_innodb-slave.opt new file mode 100644 index 00000000000..627becdbfb5 --- /dev/null +++ b/mysql-test/t/federated_innodb-slave.opt @@ -0,0 +1 @@ +--innodb diff --git a/mysql-test/t/federated_innodb.test b/mysql-test/t/federated_innodb.test new file mode 100644 index 00000000000..772e37a2929 --- /dev/null +++ b/mysql-test/t/federated_innodb.test @@ -0,0 +1,34 @@ +source include/federated.inc; +source include/have_innodb.inc; + +# +# Bug#25513 Federated transaction failures +# +connection slave; +create table federated.t1 (a int primary key, b varchar(64)) + engine=myisam; +connection master; +--replace_result $SLAVE_MYPORT SLAVE_PORT +eval create table federated.t1 (a int primary key, b varchar(64)) + engine=federated + connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'; + +--error ER_DUP_KEY +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +connection slave; +truncate federated.t1; +alter table federated.t1 engine=innodb; +connection master; + +--error ER_DUP_KEY +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +drop table federated.t1; +connection slave; +drop table federated.t1; + + +source include/federated_cleanup.inc; diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index c0efa621422..72a381aeac2 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -695,6 +695,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, if (duplic == DUP_REPLACE && (!table->triggers || !table->triggers->has_delete_triggers())) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + if (duplic == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); /* let's *try* to start bulk inserts. It won't necessary start them as values_list.elements should be greater than @@ -2538,6 +2540,8 @@ bool Delayed_insert::handle_inserts(void) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); using_opt_replace= 1; } + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); thd.clear_error(); // reset error for binlog if (write_record(&thd, table, &info)) { @@ -2882,6 +2886,8 @@ select_insert::prepare(List &values, SELECT_LEX_UNIT *u) if (info.handle_duplicates == DUP_REPLACE && (!table->triggers || !table->triggers->has_delete_triggers())) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); thd->no_trans_update.stmt= FALSE; thd->abort_on_warning= (!info.ignore && (thd->variables.sql_mode & @@ -3469,6 +3475,8 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) if (info.handle_duplicates == DUP_REPLACE && (!table->triggers || !table->triggers->has_delete_triggers())) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); if (!thd->prelocked_mode) table->file->ha_start_bulk_insert((ha_rows) 0); thd->no_trans_update.stmt= FALSE; diff --git a/storage/federated/ha_federated.cc b/storage/federated/ha_federated.cc index 96ce0013587..18460ad4cfc 100644 --- a/storage/federated/ha_federated.cc +++ b/storage/federated/ha_federated.cc @@ -388,6 +388,11 @@ /* Variables for federated share methods */ static HASH federated_open_tables; // To track open tables pthread_mutex_t federated_mutex; // To init the hash +static char ident_quote_char= '`'; // Character for quoting + // identifiers +static char value_quote_char= '\''; // Character for quoting + // literals +static const int bulk_padding= 64; // bytes "overhead" in packet /* Variables used when chopping off trailing characters */ static const uint sizeof_trailing_comma= sizeof(", ") - 1; @@ -415,7 +420,7 @@ static handler *federated_create_handler(handlerton *hton, /* Function we use in the creation of our hash to get key */ static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length, - my_bool not_used __attribute__ ((unused))) + my_bool not_used __attribute__ ((unused))) { *length= share->share_key_length; return (uchar*) share->share_key; @@ -477,6 +482,57 @@ int federated_done(void *p) } +/** + @brief Append identifiers to the string. + + @param[in,out] string The target string. + @param[in] name Identifier name + @param[in] length Length of identifier name in bytes + @param[in] quote_char Quote char to use for quoting identifier. + + @return Operation Status + @retval FALSE OK + @retval TRUE There was an error appending to the string. + + @note This function is based upon the append_identifier() function + in sql_show.cc except that quoting always occurs. +*/ + +static bool append_ident(String *string, const char *name, uint length, + const char quote_char) +{ + bool result; + uint clen; + const char *name_end; + DBUG_ENTER("append_ident"); + + if (quote_char) + { + string->reserve(length * 2 + 2); + if ((result= string->append("e_char, 1, system_charset_info))) + goto err; + + for (name_end= name+length; name < name_end; name+= clen) + { + uchar c= *(uchar *) name; + if (!(clen= my_mbcharlen(system_charset_info, c))) + clen= 1; + if (clen == 1 && c == (uchar) quote_char && + (result= string->append("e_char, 1, system_charset_info))) + goto err; + if ((result= string->append(name, clen, string->charset()))) + goto err; + } + result= string->append("e_char, 1, system_charset_info); + } + else + result= string->append(name, length, system_charset_info); + +err: + DBUG_RETURN(result); +} + + /* Check (in create) whether the tables exists, and that it can be connected to @@ -495,7 +551,6 @@ int federated_done(void *p) static int check_foreign_data_source(FEDERATED_SHARE *share, bool table_create_flag) { - char escaped_table_name[NAME_LEN*2]; char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; uint error_code; @@ -536,7 +591,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, } else { - int escaped_table_name_length= 0; /* Since we do not support transactions at this version, we can let the client API silently reconnect. For future versions, we will need more @@ -551,14 +605,10 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, the query will be: SELECT * FROM `tablename` WHERE 1=0 */ - query.append(STRING_WITH_LEN("SELECT * FROM `")); - escaped_table_name_length= - escape_string_for_mysql(&my_charset_bin, (char*)escaped_table_name, - sizeof(escaped_table_name), - share->table_name, - share->table_name_length); - query.append(escaped_table_name, escaped_table_name_length); - query.append(STRING_WITH_LEN("` WHERE 1=0")); + query.append(STRING_WITH_LEN("SELECT * FROM ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); + query.append(STRING_WITH_LEN(" WHERE 1=0"); if (mysql_real_query(mysql, query.ptr(), query.length())) { @@ -907,6 +957,7 @@ ha_federated::ha_federated(handlerton *hton, mysql(0), stored_result(0) { trx_next= 0; + bzero(&bulk_insert, sizeof(bulk_insert)); } @@ -969,9 +1020,8 @@ uint ha_federated::convert_row_to_internal_format(uchar *record, static bool emit_key_part_name(String *to, KEY_PART_INFO *part) { DBUG_ENTER("emit_key_part_name"); - if (to->append(STRING_WITH_LEN("`")) || - to->append(part->field->field_name) || - to->append(STRING_WITH_LEN("`"))) + if (append_ident(to, part->field->field_name, + strlen(part->field->field_name), ident_quote_char)) DBUG_RETURN(1); // Out of memory DBUG_RETURN(0); } @@ -1515,20 +1565,20 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) query.append(STRING_WITH_LEN("SELECT ")); for (field= table->field; *field; field++) { - query.append(STRING_WITH_LEN("`")); - query.append((*field)->field_name); - query.append(STRING_WITH_LEN("`, ")); + append_ident(&query, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); + query.append(STRING_WITH_LEN(", ")); } /* chops off trailing comma */ query.length(query.length() - sizeof_trailing_comma); query.append(STRING_WITH_LEN(" FROM `")); - query.append(tmp_share.table_name, tmp_share.table_name_length); - query.append(STRING_WITH_LEN("`")); - DBUG_PRINT("info", ("calling alloc_root")); + + append_ident(&query, tmp_share.table_name, + tmp_share.table_name_length, ident_quote_char); if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) || - !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length()))) + !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1))) goto error; share->use_count= 0; @@ -1669,6 +1719,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) table->s->reclength); DBUG_PRINT("info", ("ref_length: %u", ref_length)); + reset(); + DBUG_RETURN(0); } @@ -1741,6 +1793,83 @@ static inline uint field_in_record_is_null(TABLE *table, } +/** + @brief Construct the INSERT statement. + + @details This method will construct the INSERT statement and appends it to + the supplied query string buffer. + + @return + @retval FALSE No error + @retval TRUE Failure +*/ + +bool ha_federated::append_stmt_insert(String *query) +{ + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + Field **field; + uint tmp_length; + + /* The main insert query string */ + String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); + DBUG_ENTER("ha_federated::append_stmt_insert"); + + insert_string.length(0); + + if (replace_duplicates) + insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); + else if (ignore_duplicates && !insert_dup_update) + insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); + else + insert_string.append(STRING_WITH_LEN("INSERT INTO ")); + append_ident(&insert_string, share->table_name, share->table_name_length, + ident_quote_char); + insert_string.append(FEDERATED_OPENPAREN); + tmp_length= insert_string.length() - strlen(STRING_WITH_LEN(", ")); + + /* + loop through the field pointer array, add any fields to both the values + list and the fields list that match the current query id + */ + for (field= table->field; *field; field++) + { + if (bitmap_is_set(table->write_set, (*field)->field_index)) + { + /* append the field name */ + append_ident(&insert_string, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); + + /* append commas between both fields and fieldnames */ + /* + unfortunately, we can't use the logic if *(fields + 1) to + make the following appends conditional as we don't know if the + next field is in the write set + */ + insert_string.append(STRING_WITH_LEN(", ")); + } + } + + /* + remove trailing comma + */ + insert_string.length(insert_string.length() - sizeof_trailing_comma); + + /* + if there were no fields, we don't want to add a closing paren + AND, we don't want to chop off the last char '(' + insert will be "INSERT INTO t1 VALUES ();" + */ + if (insert_string.length() > tmp_length) + { + insert_string.append(STRING_WITH_LEN(") "); + } + + insert_string.append(STRING_WITH_LEN(" VALUES ")); + + DBUG_RETURN(query->append(insert_string)); +} + + /* write_row() inserts a row. No extra() hint is given currently if a bulk load is happeneding. buf() is a byte array of data. You can use the field @@ -1757,69 +1886,14 @@ static inline uint field_in_record_is_null(TABLE *table, int ha_federated::write_row(uchar *buf) { - /* - I need a bool again, in 5.0, I used table->s->fields to accomplish this. - This worked as a flag that says there are fields with values or not. - In 5.1, this value doesn't work the same, and I end up with the code - truncating open parenthesis: - - the statement "INSERT INTO t1 VALUES ()" ends up being first built - in two strings - "INSERT INTO t1 (" - and - " VALUES (" - - If there are fields with values, they get appended, with commas, and - the last loop, a trailing comma is there - - "INSERT INTO t1 ( col1, col2, colN, " - - " VALUES ( 'val1', 'val2', 'valN', " - - Then, if there are fields, it should decrement the string by ", " length. - - "INSERT INTO t1 ( col1, col2, colN" - " VALUES ( 'val1', 'val2', 'valN'" - - Then it adds a close paren to both - if there are fields - - "INSERT INTO t1 ( col1, col2, colN)" - " VALUES ( 'val1', 'val2', 'valN')" - - Then appends both together - "INSERT INTO t1 ( col1, col2, colN) VALUES ( 'val1', 'val2', 'valN')" - - So... the problem, is if you have the original statement: - - "INSERT INTO t1 VALUES ()" - - Which is legitimate, but if the code thinks there are fields - - "INSERT INTO t1 (" - " VALUES ( " - - If the field flag is set, but there are no commas, reduces the - string by strlen(", ") - - "INSERT INTO t1 " - " VALUES " - - Then adds the close parenthesis - - "INSERT INTO t1 )" - " VALUES )" - - So, I have to use a bool as before, set in the loop where fields and commas - are appended to the string - */ - my_bool commas_added= FALSE; - char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; Field **field; + uint tmp_length; + int error= 0; + bool use_bulk_insert; + bool auto_increment_update_required= (table->next_number_field != NULL); - /* The main insert query string */ - String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); /* The string containing the values to be added to the insert */ String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); /* The actual value of the field, to be added to the values_string */ @@ -1830,7 +1904,6 @@ int ha_federated::write_row(uchar *buf) DBUG_ENTER("ha_federated::write_row"); values_string.length(0); - insert_string.length(0); insert_field_value_string.length(0); ha_statistic_increment(&SSV::ha_write_count); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) @@ -1838,14 +1911,19 @@ int ha_federated::write_row(uchar *buf) /* start both our field and field values strings + We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE" + Ignore duplicates is always true when insert_dup_update is true. + When replace_duplicates == TRUE, we can safely enable multi-row insert. + When performing multi-row insert, we only collect the columns values for + the row. The start of the statement is only created when the first + row is copied in to the bulk_insert string. */ - insert_string.append(STRING_WITH_LEN("INSERT INTO `")); - insert_string.append(share->table_name, share->table_name_length); - insert_string.append('`'); - insert_string.append(STRING_WITH_LEN(" (")); + if (!(use_bulk_insert= bulk_insert.str && + (!insert_dup_update || replace_duplicates))) + append_stmt_insert(&values_string); - values_string.append(STRING_WITH_LEN(" VALUES ")); values_string.append(STRING_WITH_LEN(" (")); + tmp_length= values_string.length(); /* loop through the field pointer array, add any fields to both the values @@ -1855,7 +1933,6 @@ int ha_federated::write_row(uchar *buf) { if (bitmap_is_set(table->write_set, (*field)->field_index)) { - commas_added= TRUE; if ((*field)->is_null()) values_string.append(STRING_WITH_LEN(" NULL ")); else @@ -1863,15 +1940,13 @@ int ha_federated::write_row(uchar *buf) bool needs_quote= (*field)->str_needs_quotes(); (*field)->val_str(&insert_field_value_string); if (needs_quote) - values_string.append('\''); + values_string.append(value_quote_char); insert_field_value_string.print(&values_string); if (needs_quote) - values_string.append('\''); + values_string.append(value_quote_char); insert_field_value_string.length(0); } - /* append the field name */ - insert_string.append((*field)->field_name); /* append commas between both fields and fieldnames */ /* @@ -1879,7 +1954,6 @@ int ha_federated::write_row(uchar *buf) make the following appends conditional as we don't know if the next field is in the write set */ - insert_string.append(STRING_WITH_LEN(", ")); values_string.append(STRING_WITH_LEN(", ")); } } @@ -1890,26 +1964,53 @@ int ha_federated::write_row(uchar *buf) AND, we don't want to chop off the last char '(' insert will be "INSERT INTO t1 VALUES ();" */ - if (commas_added) + if (values_string.length() > tmp_length) { - insert_string.length(insert_string.length() - sizeof_trailing_comma); - /* chops off leading commas */ + /* chops off trailing comma */ values_string.length(values_string.length() - sizeof_trailing_comma); - insert_string.append(STRING_WITH_LEN(") ")); } - else - { - /* chops off trailing ) */ - insert_string.length(insert_string.length() - sizeof_trailing_closeparen); - } - /* we always want to append this, even if there aren't any fields */ values_string.append(STRING_WITH_LEN(") ")); - /* add the values */ - insert_string.append(values_string); + if (use_bulk_insert) + { + /* + Send the current bulk insert out if appending the current row would + cause the statement to overflow the packet size, otherwise set + auto_increment_update_required to FALSE as no query was executed. + */ + if (bulk_insert.length + values_string.length() + bulk_padding > + mysql->net.max_packet_size && bulk_insert.length) + { + error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length); + bulk_insert.length= 0; + } + else + auto_increment_update_required= FALSE; + + if (bulk_insert.length == 0) + { + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + String insert_string(insert_buffer, sizeof(insert_buffer), + &my_charset_bin); + insert_string.length(0); + append_stmt_insert(&insert_string); + dynstr_append_mem(&bulk_insert, insert_string.ptr(), + insert_string.length()); + } + else + dynstr_append_mem(&bulk_insert, ",", 1); - if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) + dynstr_append_mem(&bulk_insert, values_string.ptr(), + values_string.length()); + } + else + { + error= mysql_real_query(mysql, values_string.ptr(), + values_string.length()); + } + + if (error) { DBUG_RETURN(stash_remote_error()); } @@ -1917,12 +2018,79 @@ int ha_federated::write_row(uchar *buf) If the table we've just written a record to contains an auto_increment field, then store the last_insert_id() value from the foreign server */ - if (table->next_number_field) + if (auto_increment_update_required) update_auto_increment(); DBUG_RETURN(0); } + +/** + @brief Prepares the storage engine for bulk inserts. + + @param[in] rows estimated number of rows in bulk insert + or 0 if unknown. + + @details Initializes memory structures required for bulk insert. +*/ + +void ha_federated::start_bulk_insert(ha_rows rows) +{ + uint page_size; + DBUG_ENTER("ha_federated::start_bulk_insert"); + + dynstr_free(&bulk_insert); + + /** + We don't bother with bulk-insert semantics when the estimated rows == 1 + The rows value will be 0 if the server does not know how many rows + would be inserted. This can occur when performing INSERT...SELECT + */ + + if (rows == 1) + DBUG_VOID_RETURN; + + page_size= (uint) my_getpagesize(); + + if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size)) + DBUG_VOID_RETURN; + + bulk_insert.length= 0; + DBUG_VOID_RETURN; +} + + +/** + @brief End bulk insert. + + @details This method will send any remaining rows to the remote server. + Finally, it will deinitialize the bulk insert data structure. + + @return Operation status + @retval 0 No error + @retval != 0 Error occured at remote server. Also sets my_errno. +*/ + +int ha_federated::end_bulk_insert() +{ + int error= 0; + DBUG_ENTER("ha_federated::end_bulk_insert"); + + if (bulk_insert.str && bulk_insert.length) + { + if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length)) + error= stash_remote_error(); + else + if (table->next_number_field) + update_auto_increment(); + } + + dynstr_free(&bulk_insert); + + DBUG_RETURN(my_errno= error); +} + + /* ha_federated::update_auto_increment @@ -1952,9 +2120,9 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt) query.length(0); query.set_charset(system_charset_info); - query.append(STRING_WITH_LEN("OPTIMIZE TABLE `")); - query.append(share->table_name, share->table_name_length); - query.append(STRING_WITH_LEN("`")); + query.append(STRING_WITH_LEN("OPTIMIZE TABLE ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); if (mysql_real_query(mysql, query.ptr(), query.length())) { @@ -1974,9 +2142,9 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt) query.length(0); query.set_charset(system_charset_info); - query.append(STRING_WITH_LEN("REPAIR TABLE `")); - query.append(share->table_name, share->table_name_length); - query.append(STRING_WITH_LEN("`")); + query.append(STRING_WITH_LEN("REPAIR TABLE ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); if (check_opt->flags & T_QUICK) query.append(STRING_WITH_LEN(" QUICK")); if (check_opt->flags & T_EXTEND) @@ -2053,9 +2221,13 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data) update_string.length(0); where_string.length(0); - update_string.append(STRING_WITH_LEN("UPDATE `")); - update_string.append(share->table_name); - update_string.append(STRING_WITH_LEN("` SET ")); + if (ignore_duplicates) + update_string.append(STRING_WITH_LEN("UPDATE IGNORE ")); + else + update_string.append(STRING_WITH_LEN("UPDATE ")); + append_ident(&update_string, share->table_name, + share->table_name_length, ident_quote_char); + update_string.append(STRING_WITH_LEN(" SET ")); /* In this loop, we want to match column names to values being inserted @@ -2071,7 +2243,9 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data) { if (bitmap_is_set(table->write_set, (*field)->field_index)) { - update_string.append((*field)->field_name); + uint field_name_length= strlen((*field)->field_name); + append_ident(&update_string, (*field)->field_name, field_name_length, + ident_quote_char); update_string.append(STRING_WITH_LEN(" = ")); if ((*field)->is_null()) @@ -2083,10 +2257,10 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data) bool needs_quote= (*field)->str_needs_quotes(); (*field)->val_str(&field_value); if (needs_quote) - update_string.append('\''); + update_string.append(value_quote_char); field_value.print(&update_string); if (needs_quote) - update_string.append('\''); + update_string.append(value_quote_char); field_value.length(0); tmp_restore_column_map(table->read_set, old_map); } @@ -2095,7 +2269,9 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data) if (bitmap_is_set(table->read_set, (*field)->field_index)) { - where_string.append((*field)->field_name); + uint field_name_length= strlen((*field)->field_name); + append_ident(&where_string, (*field)->field_name, field_name_length, + ident_quote_char); if (field_in_record_is_null(table, *field, (char*) old_data)) where_string.append(STRING_WITH_LEN(" IS NULL ")); else @@ -2105,10 +2281,10 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data) (*field)->val_str(&field_value, (old_data + (*field)->offset(record))); if (needs_quote) - where_string.append('\''); + where_string.append(value_quote_char); field_value.print(&where_string); if (needs_quote) - where_string.append('\''); + where_string.append(value_quote_char); field_value.length(0); } where_string.append(STRING_WITH_LEN(" AND ")); @@ -2165,9 +2341,10 @@ int ha_federated::delete_row(const uchar *buf) DBUG_ENTER("ha_federated::delete_row"); delete_string.length(0); - delete_string.append(STRING_WITH_LEN("DELETE FROM `")); - delete_string.append(share->table_name); - delete_string.append(STRING_WITH_LEN("` WHERE ")); + delete_string.append(STRING_WITH_LEN("DELETE FROM ")); + append_ident(&delete_string, share->table_name, + share->table_name_length, ident_quote_char); + delete_string.append(STRING_WITH_LEN(" WHERE ")); for (Field **field= table->field; *field; field++) { @@ -2175,8 +2352,9 @@ int ha_federated::delete_row(const uchar *buf) found++; if (bitmap_is_set(table->read_set, cur_field->field_index)) { + append_ident(&delete_string, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); data_string.length(0); - delete_string.append(cur_field->field_name); if (cur_field->is_null()) { delete_string.append(STRING_WITH_LEN(" IS NULL ")); @@ -2187,13 +2365,12 @@ int ha_federated::delete_row(const uchar *buf) delete_string.append(STRING_WITH_LEN(" = ")); cur_field->val_str(&data_string); if (needs_quote) - delete_string.append('\''); + delete_string.append(value_quote_char); data_string.print(&delete_string); if (needs_quote) - delete_string.append('\''); + delete_string.append(value_quote_char); } delete_string.append(STRING_WITH_LEN(" AND ")); - } } // Remove trailing AND @@ -2680,7 +2857,6 @@ int ha_federated::info(uint flag) { char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; - char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE]; int error; uint error_code; MYSQL_RES *result= 0; @@ -2693,13 +2869,9 @@ int ha_federated::info(uint flag) if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST)) { status_query_string.length(0); - status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE '")); - escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name, - sizeof(escaped_table_name), - share->table_name, - share->table_name_length); - status_query_string.append(escaped_table_name); - status_query_string.append(STRING_WITH_LEN("'")); + status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE ")); + append_ident(&status_query_string, share->table_name, + share->table_name_length, value_quote_char); if (mysql_real_query(mysql, status_query_string.ptr(), status_query_string.length())) @@ -2770,6 +2942,51 @@ error: } +/** + @brief Handles extra signals from MySQL server + + @param[in] operation Hint for storage engine + + @return Operation Status + @retval 0 OK + */ +int ha_federated::extra(ha_extra_function operation) +{ + DBUG_ENTER("ha_federated::extra"); + switch (operation) { + case HA_EXTRA_IGNORE_DUP_KEY: + ignore_duplicates= TRUE; + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + break; + case HA_EXTRA_WRITE_CAN_REPLACE: + replace_duplicates= TRUE; + break; + case HA_EXTRA_WRITE_CANNOT_REPLACE: + /* + We use this flag to ensure that we do not create an "INSERT IGNORE" + statement when inserting new rows into the remote table. + */ + replace_duplicates= FALSE; + break; + case HA_EXTRA_INSERT_WITH_UPDATE: + insert_dup_update= TRUE; + break; + case HA_EXTRA_RESET: + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + replace_duplicates= FALSE; + break; + default: + /* do nothing */ + DBUG_PRINT("info",("unhandled operation: %d", (uint) operation)); + } + DBUG_RETURN(0); +} + + /* Used to delete all rows in a table. Both for cases of truncate and for cases where the optimizer realizes that all rows will be @@ -2791,9 +3008,9 @@ int ha_federated::delete_all_rows() query.length(0); query.set_charset(system_charset_info); - query.append(STRING_WITH_LEN("TRUNCATE `")); - query.append(share->table_name); - query.append(STRING_WITH_LEN("`")); + query.append(STRING_WITH_LEN("TRUNCATE ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); /* TRUNCATE won't return anything in mysql_affected_rows @@ -2901,6 +3118,9 @@ int ha_federated::stash_remote_error() DBUG_ENTER("ha_federated::stash_remote_error()"); remote_error_number= mysql_errno(mysql); strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1); + if (remote_error_number == ER_DUP_ENTRY || + remote_error_number == ER_DUP_KEY) + DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM); } diff --git a/storage/federated/ha_federated.h b/storage/federated/ha_federated.h index 861f50fad06..399451888e2 100644 --- a/storage/federated/ha_federated.h +++ b/storage/federated/ha_federated.h @@ -88,6 +88,9 @@ class ha_federated: public handler MYSQL_ROW_OFFSET current_position; // Current position used by ::position() int remote_error_number; char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; + bool ignore_duplicates, replace_duplicates; + bool insert_dup_update; + DYNAMIC_STRING bulk_insert; private: /* @@ -102,6 +105,14 @@ private: bool records_in_range, bool eq_range); int stash_remote_error(); + bool append_stmt_insert(String *query); + + int read_next(byte *buf, MYSQL_RES *result); + int index_read_idx_with_result_set(byte *buf, uint index, + const byte *key, + uint key_len, + ha_rkey_function find_flag, + MYSQL_RES **result); public: ha_federated(handlerton *hton, TABLE_SHARE *table_arg); ~ha_federated() {} @@ -189,6 +200,8 @@ public: int open(const char *name, int mode, uint test_if_locked); // required int close(void); // required + void start_bulk_insert(ha_rows rows); + int end_bulk_insert(); int write_row(uchar *buf); int update_row(const uchar *old_data, uchar *new_data); int delete_row(const uchar *buf); @@ -217,6 +230,7 @@ public: int rnd_pos(uchar *buf, uchar *pos); //required void position(const uchar *record); //required int info(uint); //required + int extra(ha_extra_function operation); void update_auto_increment(void); int repair(THD* thd, HA_CHECK_OPT* check_opt); @@ -231,18 +245,11 @@ public: THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type); //required - virtual bool get_error_message(int error, String *buf); + bool get_error_message(int error, String *buf); int external_lock(THD *thd, int lock_type); int connection_commit(); int connection_rollback(); int connection_autocommit(bool state); int execute_simple_query(const char *query, int len); - - int read_next(uchar *buf, MYSQL_RES *result); - int index_read_idx_with_result_set(uchar *buf, uint index, - const uchar *key, - uint key_len, - ha_rkey_function find_flag, - MYSQL_RES **result); };