diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index f1d86224adb..1154e98475d 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -401,6 +401,79 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table, } +/** + Upgrade table-level lock of INSERT statement to TL_WRITE if + a more concurrent lock is infeasible for some reason. This is + necessary for engines without internal locking support (MyISAM). + An engine with internal locking implementation might later + downgrade the lock in handler::store_lock() method. +*/ + +void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, + enum_duplicates duplic, + bool is_multi_insert) +{ + if (duplic == DUP_UPDATE || + duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT) + { + *lock_type= TL_WRITE; + return; + } + + if (*lock_type == TL_WRITE_DELAYED) + { +#ifdef EMBEDDED_LIBRARY + /* No auxiliary threads in the embedded server. */ + *lock_type= TL_WRITE; + return; +#else + /* + We do not use delayed threads if: + - we're running in the safe mode or skip-new - the feature + is disabled in these modes + - we're running this query in statement level replication, + on a replication slave - because we must ensure serial + execution of queries on the slave + - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the + insert cannot be concurrent + */ + if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) || + thd->slave_thread || + thd->variables.max_insert_delayed_threads == 0) + { + *lock_type= TL_WRITE; + return; + } +#endif + bool log_on= (thd->options & OPTION_BIN_LOG || + ! (thd->security_ctx->master_access & SUPER_ACL)); + if (log_on && mysql_bin_log.is_open() && is_multi_insert) + { + /* + Statement-based binary logging does not work in this case, because: + a) two concurrent statements may have their rows intermixed in the + queue, leading to autoincrement replication problems on slave (because + the values generated used for one statement don't depend only on the + value generated for the first row of this statement, so are not + replicable) + b) if first row of the statement has an error the full statement is + not binlogged, while next rows of the statement may be inserted. + c) if first row succeeds, statement is binlogged immediately with a + zero error code (i.e. "no error"), if then second row fails, query + will fail on slave too and slave will stop (wrongly believing that the + master got no error). + So we fall back to non-delayed INSERT. + */ + *lock_type= TL_WRITE; + } + } +} + + +/** + INSERT statement implementation +*/ + bool mysql_insert(THD *thd,TABLE_LIST *table_list, List &fields, List &values_list, @@ -436,58 +509,31 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, DBUG_ENTER("mysql_insert"); /* - in safe mode or with skip-new change delayed insert to be regular - if we are told to replace duplicates, the insert cannot be concurrent - delayed insert changed to regular in slave thread - */ -#ifdef EMBEDDED_LIBRARY - if (lock_type == TL_WRITE_DELAYED) - lock_type=TL_WRITE; -#else - if ((lock_type == TL_WRITE_DELAYED && - ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) || - thd->slave_thread || !thd->variables.max_insert_delayed_threads)) || - (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) || - (duplic == DUP_UPDATE)) - lock_type=TL_WRITE; -#endif - if ((lock_type == TL_WRITE_DELAYED) && - log_on && mysql_bin_log.is_open() && - (values_list.elements > 1)) + Upgrade lock type if the requested lock is incompatible with + the current connection mode or table operation. + */ + upgrade_lock_type(thd, &table_list->lock_type, duplic, + values_list.elements > 1); + lock_type= table_list->lock_type; + + /* + We can't write-delayed into a table locked with LOCK TABLES: + this will lead to a deadlock, since the delayed thread will + never be able to get a lock on the table. QQQ: why not + upgrade the lock here instead? + */ + if (lock_type == TL_WRITE_DELAYED && thd->locked_tables && + find_locked_table(thd, table_list->db, table_list->table_name)) { - /* - Statement-based binary logging does not work in this case, because: - a) two concurrent statements may have their rows intermixed in the - queue, leading to autoincrement replication problems on slave (because - the values generated used for one statement don't depend only on the - value generated for the first row of this statement, so are not - replicable) - b) if first row of the statement has an error the full statement is - not binlogged, while next rows of the statement may be inserted. - c) if first row succeeds, statement is binlogged immediately with a - zero error code (i.e. "no error"), if then second row fails, query - will fail on slave too and slave will stop (wrongly believing that the - master got no error). - So we fallback to non-delayed INSERT. - */ - lock_type= TL_WRITE; + my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), + table_list->table_name); + DBUG_RETURN(TRUE); } - table_list->lock_type= lock_type; #ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { res= 1; - if (thd->locked_tables) - { - DBUG_ASSERT(table_list->db); /* Must be set in the parser */ - if (find_locked_table(thd, table_list->db, table_list->table_name)) - { - my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), - table_list->table_name); - DBUG_RETURN(TRUE); - } - } if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error) { /* @@ -1460,6 +1506,12 @@ public: } }; +/** + delayed_insert - context of a thread responsible for delayed insert + into one table. When processing delayed inserts, we create an own + thread for every distinct table. Later on all delayed inserts directed + into that table are handled by a dedicated thread. +*/ class delayed_insert :public ilink { uint locks_in_memory; @@ -1551,6 +1603,12 @@ public: I_List delayed_threads; +/** + Return an instance of delayed insert thread that can handle + inserts into a given table, if it exists. Otherwise return NULL. +*/ + +static delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) { thd->proc_info="waiting for delay_list"; @@ -1571,6 +1629,18 @@ delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) } +/** + Attempt to find or create a delayed insert thread to handle inserts + into this table. + + @return Return an instance of the table in the delayed thread + @retval NULL too many delayed threads OR + this thread ran out of resources OR + a newly created delayed insert thread ran out of resources OR + the delayed insert thread failed to open the table. + In the last three cases an error is set in THD. +*/ + static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) { int error; @@ -1679,11 +1749,17 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) } -/* - As we can't let many threads modify the same TABLE structure, we create - an own structure for each tread. This includes a row buffer to save the - column values and new fields that points to the new row buffer. - The memory is allocated in the client thread and is freed automaticly. +/** + As we can't let many client threads modify the same TABLE + structure of the dedicated delayed insert thread, we create an + own structure for each client thread. This includes a row + buffer to save the column values and new fields that point to + the new row buffer. The memory is allocated in the client + thread and is freed automatically. + + @pre This function is called from the client thread. Delayed + insert thread mutex must be acquired before invoking this + function. */ TABLE *delayed_insert::get_local_table(THD* client_thd)