mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Remove handle_delayed_insert_impl().
This commit is contained in:
@@ -2291,201 +2291,6 @@ void kill_delayed_threads(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di)
|
|
||||||
{
|
|
||||||
DBUG_ENTER("handle_delayed_insert_impl");
|
|
||||||
thd->thread_stack= (char*) &thd;
|
|
||||||
if (init_thr_lock() || thd->store_globals())
|
|
||||||
{
|
|
||||||
/* Can't use my_error since store_globals has perhaps failed */
|
|
||||||
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
|
||||||
ER(ER_OUT_OF_RESOURCES), NULL);
|
|
||||||
thd->fatal_error();
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
Open table requires an initialized lex in case the table is
|
|
||||||
partitioned. The .frm file contains a partial SQL string which is
|
|
||||||
parsed using a lex, that depends on initialized thd->lex.
|
|
||||||
*/
|
|
||||||
lex_start(thd);
|
|
||||||
thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
|
|
||||||
/*
|
|
||||||
Statement-based replication of INSERT DELAYED has problems with RAND()
|
|
||||||
and user vars, so in mixed mode we go to row-based.
|
|
||||||
*/
|
|
||||||
thd->lex->set_stmt_unsafe();
|
|
||||||
thd->set_current_stmt_binlog_row_based_if_mixed();
|
|
||||||
|
|
||||||
/* Open table */
|
|
||||||
if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
|
|
||||||
TL_WRITE_DELAYED)))
|
|
||||||
{
|
|
||||||
thd->fatal_error(); // Abort waiting inserts
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
|
|
||||||
{
|
|
||||||
my_error(ER_DELAYED_NOT_SUPPORTED, MYF(ME_FATALERROR),
|
|
||||||
di->table_list.table_name);
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
if (di->table->triggers)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
Table has triggers. This is not an error, but we do
|
|
||||||
not support triggers with delayed insert. Terminate the delayed
|
|
||||||
thread without an error and thus request lock upgrade.
|
|
||||||
*/
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
di->table->copy_blobs=1;
|
|
||||||
|
|
||||||
/* Tell client that the thread is initialized */
|
|
||||||
pthread_cond_signal(&di->cond_client);
|
|
||||||
|
|
||||||
/* Now wait until we get an insert or lock to handle */
|
|
||||||
/* We will not abort as long as a client thread uses this thread */
|
|
||||||
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
if (thd->killed == THD::KILL_CONNECTION)
|
|
||||||
{
|
|
||||||
uint lock_count;
|
|
||||||
/*
|
|
||||||
Remove this from delay insert list so that no one can request a
|
|
||||||
table from this
|
|
||||||
*/
|
|
||||||
pthread_mutex_unlock(&di->mutex);
|
|
||||||
pthread_mutex_lock(&LOCK_delayed_insert);
|
|
||||||
di->unlink();
|
|
||||||
lock_count=di->lock_count();
|
|
||||||
pthread_mutex_unlock(&LOCK_delayed_insert);
|
|
||||||
pthread_mutex_lock(&di->mutex);
|
|
||||||
if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
|
|
||||||
break; // Time to die
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!di->status && !di->stacked_inserts)
|
|
||||||
{
|
|
||||||
struct timespec abstime;
|
|
||||||
set_timespec(abstime, delayed_insert_timeout);
|
|
||||||
|
|
||||||
/* Information for pthread_kill */
|
|
||||||
di->thd.mysys_var->current_mutex= &di->mutex;
|
|
||||||
di->thd.mysys_var->current_cond= &di->cond;
|
|
||||||
thd_proc_info(&(di->thd), "Waiting for INSERT");
|
|
||||||
|
|
||||||
DBUG_PRINT("info",("Waiting for someone to insert rows"));
|
|
||||||
while (!thd->killed)
|
|
||||||
{
|
|
||||||
int error;
|
|
||||||
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
|
|
||||||
error=pthread_cond_wait(&di->cond,&di->mutex);
|
|
||||||
#else
|
|
||||||
error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
|
|
||||||
#ifdef EXTRA_DEBUG
|
|
||||||
if (error && error != EINTR && error != ETIMEDOUT)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
|
|
||||||
DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
|
|
||||||
error));
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#endif
|
|
||||||
if (thd->killed || di->status)
|
|
||||||
break;
|
|
||||||
if (error == ETIMEDOUT || error == ETIME)
|
|
||||||
{
|
|
||||||
thd->killed= THD::KILL_CONNECTION;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* We can't lock di->mutex and mysys_var->mutex at the same time */
|
|
||||||
pthread_mutex_unlock(&di->mutex);
|
|
||||||
pthread_mutex_lock(&di->thd.mysys_var->mutex);
|
|
||||||
di->thd.mysys_var->current_mutex= 0;
|
|
||||||
di->thd.mysys_var->current_cond= 0;
|
|
||||||
pthread_mutex_unlock(&di->thd.mysys_var->mutex);
|
|
||||||
pthread_mutex_lock(&di->mutex);
|
|
||||||
}
|
|
||||||
thd_proc_info(&(di->thd), 0);
|
|
||||||
|
|
||||||
if (di->tables_in_use && ! thd->lock)
|
|
||||||
{
|
|
||||||
bool not_used;
|
|
||||||
/*
|
|
||||||
Request for new delayed insert.
|
|
||||||
Lock the table, but avoid to be blocked by a global read lock.
|
|
||||||
If we got here while a global read lock exists, then one or more
|
|
||||||
inserts started before the lock was requested. These are allowed
|
|
||||||
to complete their work before the server returns control to the
|
|
||||||
client which requested the global read lock. The delayed insert
|
|
||||||
handler will close the table and finish when the outstanding
|
|
||||||
inserts are done.
|
|
||||||
*/
|
|
||||||
if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
|
|
||||||
MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
|
|
||||||
¬_used)))
|
|
||||||
{
|
|
||||||
/* Fatal error */
|
|
||||||
di->dead= 1;
|
|
||||||
thd->killed= THD::KILL_CONNECTION;
|
|
||||||
}
|
|
||||||
pthread_cond_broadcast(&di->cond_client);
|
|
||||||
}
|
|
||||||
if (di->stacked_inserts)
|
|
||||||
{
|
|
||||||
if (di->handle_inserts())
|
|
||||||
{
|
|
||||||
/* Some fatal error */
|
|
||||||
di->dead= 1;
|
|
||||||
thd->killed= THD::KILL_CONNECTION;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
di->status=0;
|
|
||||||
if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
No one is doing a insert delayed
|
|
||||||
Unlock table so that other threads can use it
|
|
||||||
*/
|
|
||||||
MYSQL_LOCK *lock=thd->lock;
|
|
||||||
thd->lock=0;
|
|
||||||
pthread_mutex_unlock(&di->mutex);
|
|
||||||
/*
|
|
||||||
We need to release next_insert_id before unlocking. This is
|
|
||||||
enforced by handler::ha_external_lock().
|
|
||||||
*/
|
|
||||||
di->table->file->ha_release_auto_increment();
|
|
||||||
mysql_unlock_tables(thd, lock);
|
|
||||||
ha_autocommit_or_rollback(thd, 0);
|
|
||||||
di->group_count=0;
|
|
||||||
pthread_mutex_lock(&di->mutex);
|
|
||||||
}
|
|
||||||
if (di->tables_in_use)
|
|
||||||
pthread_cond_broadcast(&di->cond_client); // If waiting clients
|
|
||||||
}
|
|
||||||
|
|
||||||
err:
|
|
||||||
/*
|
|
||||||
mysql_lock_tables() can potentially start a transaction and write
|
|
||||||
a table map. In the event of an error, that transaction has to be
|
|
||||||
rolled back. We only need to roll back a potential statement
|
|
||||||
transaction, since real transactions are rolled back in
|
|
||||||
close_thread_tables().
|
|
||||||
|
|
||||||
TODO: This is not true any more, table maps are generated on the
|
|
||||||
first call to ha_*_row() instead. Remove code that are used to
|
|
||||||
cover for the case outlined above.
|
|
||||||
*/
|
|
||||||
ha_autocommit_or_rollback(thd, 1);
|
|
||||||
|
|
||||||
DBUG_VOID_RETURN;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create a new delayed insert thread
|
* Create a new delayed insert thread
|
||||||
*/
|
*/
|
||||||
@@ -2518,11 +2323,201 @@ pthread_handler_t handle_delayed_insert(void *arg)
|
|||||||
/* Can't use my_error since store_globals has not yet been called */
|
/* Can't use my_error since store_globals has not yet been called */
|
||||||
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
||||||
ER(ER_OUT_OF_RESOURCES), NULL);
|
ER(ER_OUT_OF_RESOURCES), NULL);
|
||||||
goto end;
|
|
||||||
}
|
}
|
||||||
handle_delayed_insert_impl(thd, di);
|
else
|
||||||
|
{
|
||||||
|
DBUG_ENTER("handle_delayed_insert");
|
||||||
|
thd->thread_stack= (char*) &thd;
|
||||||
|
if (init_thr_lock() || thd->store_globals())
|
||||||
|
{
|
||||||
|
/* Can't use my_error since store_globals has perhaps failed */
|
||||||
|
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
||||||
|
ER(ER_OUT_OF_RESOURCES), NULL);
|
||||||
|
thd->fatal_error();
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Open table requires an initialized lex in case the table is
|
||||||
|
partitioned. The .frm file contains a partial SQL string which is
|
||||||
|
parsed using a lex, that depends on initialized thd->lex.
|
||||||
|
*/
|
||||||
|
lex_start(thd);
|
||||||
|
thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
|
||||||
|
/*
|
||||||
|
Statement-based replication of INSERT DELAYED has problems with RAND()
|
||||||
|
and user vars, so in mixed mode we go to row-based.
|
||||||
|
*/
|
||||||
|
thd->lex->set_stmt_unsafe();
|
||||||
|
thd->set_current_stmt_binlog_row_based_if_mixed();
|
||||||
|
|
||||||
|
/* Open table */
|
||||||
|
if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
|
||||||
|
TL_WRITE_DELAYED)))
|
||||||
|
{
|
||||||
|
thd->fatal_error(); // Abort waiting inserts
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
|
||||||
|
{
|
||||||
|
my_error(ER_DELAYED_NOT_SUPPORTED, MYF(ME_FATALERROR),
|
||||||
|
di->table_list.table_name);
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
if (di->table->triggers)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Table has triggers. This is not an error, but we do
|
||||||
|
not support triggers with delayed insert. Terminate the delayed
|
||||||
|
thread without an error and thus request lock upgrade.
|
||||||
|
*/
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
di->table->copy_blobs=1;
|
||||||
|
|
||||||
|
/* Tell client that the thread is initialized */
|
||||||
|
pthread_cond_signal(&di->cond_client);
|
||||||
|
|
||||||
|
/* Now wait until we get an insert or lock to handle */
|
||||||
|
/* We will not abort as long as a client thread uses this thread */
|
||||||
|
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
if (thd->killed == THD::KILL_CONNECTION)
|
||||||
|
{
|
||||||
|
uint lock_count;
|
||||||
|
/*
|
||||||
|
Remove this from delay insert list so that no one can request a
|
||||||
|
table from this
|
||||||
|
*/
|
||||||
|
pthread_mutex_unlock(&di->mutex);
|
||||||
|
pthread_mutex_lock(&LOCK_delayed_insert);
|
||||||
|
di->unlink();
|
||||||
|
lock_count=di->lock_count();
|
||||||
|
pthread_mutex_unlock(&LOCK_delayed_insert);
|
||||||
|
pthread_mutex_lock(&di->mutex);
|
||||||
|
if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
|
||||||
|
break; // Time to die
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!di->status && !di->stacked_inserts)
|
||||||
|
{
|
||||||
|
struct timespec abstime;
|
||||||
|
set_timespec(abstime, delayed_insert_timeout);
|
||||||
|
|
||||||
|
/* Information for pthread_kill */
|
||||||
|
di->thd.mysys_var->current_mutex= &di->mutex;
|
||||||
|
di->thd.mysys_var->current_cond= &di->cond;
|
||||||
|
thd_proc_info(&(di->thd), "Waiting for INSERT");
|
||||||
|
|
||||||
|
DBUG_PRINT("info",("Waiting for someone to insert rows"));
|
||||||
|
while (!thd->killed)
|
||||||
|
{
|
||||||
|
int error;
|
||||||
|
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
|
||||||
|
error=pthread_cond_wait(&di->cond,&di->mutex);
|
||||||
|
#else
|
||||||
|
error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
|
||||||
|
#ifdef EXTRA_DEBUG
|
||||||
|
if (error && error != EINTR && error != ETIMEDOUT)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
|
||||||
|
DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
|
||||||
|
error));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
if (thd->killed || di->status)
|
||||||
|
break;
|
||||||
|
if (error == ETIMEDOUT || error == ETIME)
|
||||||
|
{
|
||||||
|
thd->killed= THD::KILL_CONNECTION;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* We can't lock di->mutex and mysys_var->mutex at the same time */
|
||||||
|
pthread_mutex_unlock(&di->mutex);
|
||||||
|
pthread_mutex_lock(&di->thd.mysys_var->mutex);
|
||||||
|
di->thd.mysys_var->current_mutex= 0;
|
||||||
|
di->thd.mysys_var->current_cond= 0;
|
||||||
|
pthread_mutex_unlock(&di->thd.mysys_var->mutex);
|
||||||
|
pthread_mutex_lock(&di->mutex);
|
||||||
|
}
|
||||||
|
thd_proc_info(&(di->thd), 0);
|
||||||
|
|
||||||
|
if (di->tables_in_use && ! thd->lock)
|
||||||
|
{
|
||||||
|
bool not_used;
|
||||||
|
/*
|
||||||
|
Request for new delayed insert.
|
||||||
|
Lock the table, but avoid to be blocked by a global read lock.
|
||||||
|
If we got here while a global read lock exists, then one or more
|
||||||
|
inserts started before the lock was requested. These are allowed
|
||||||
|
to complete their work before the server returns control to the
|
||||||
|
client which requested the global read lock. The delayed insert
|
||||||
|
handler will close the table and finish when the outstanding
|
||||||
|
inserts are done.
|
||||||
|
*/
|
||||||
|
if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
|
||||||
|
MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
|
||||||
|
¬_used)))
|
||||||
|
{
|
||||||
|
/* Fatal error */
|
||||||
|
di->dead= 1;
|
||||||
|
thd->killed= THD::KILL_CONNECTION;
|
||||||
|
}
|
||||||
|
pthread_cond_broadcast(&di->cond_client);
|
||||||
|
}
|
||||||
|
if (di->stacked_inserts)
|
||||||
|
{
|
||||||
|
if (di->handle_inserts())
|
||||||
|
{
|
||||||
|
/* Some fatal error */
|
||||||
|
di->dead= 1;
|
||||||
|
thd->killed= THD::KILL_CONNECTION;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
di->status=0;
|
||||||
|
if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
No one is doing a insert delayed
|
||||||
|
Unlock table so that other threads can use it
|
||||||
|
*/
|
||||||
|
MYSQL_LOCK *lock=thd->lock;
|
||||||
|
thd->lock=0;
|
||||||
|
pthread_mutex_unlock(&di->mutex);
|
||||||
|
/*
|
||||||
|
We need to release next_insert_id before unlocking. This is
|
||||||
|
enforced by handler::ha_external_lock().
|
||||||
|
*/
|
||||||
|
di->table->file->ha_release_auto_increment();
|
||||||
|
mysql_unlock_tables(thd, lock);
|
||||||
|
ha_autocommit_or_rollback(thd, 0);
|
||||||
|
di->group_count=0;
|
||||||
|
pthread_mutex_lock(&di->mutex);
|
||||||
|
}
|
||||||
|
if (di->tables_in_use)
|
||||||
|
pthread_cond_broadcast(&di->cond_client); // If waiting clients
|
||||||
|
}
|
||||||
|
|
||||||
|
err:
|
||||||
|
/*
|
||||||
|
mysql_lock_tables() can potentially start a transaction and write
|
||||||
|
a table map. In the event of an error, that transaction has to be
|
||||||
|
rolled back. We only need to roll back a potential statement
|
||||||
|
transaction, since real transactions are rolled back in
|
||||||
|
close_thread_tables().
|
||||||
|
|
||||||
|
TODO: This is not true any more, table maps are generated on the
|
||||||
|
first call to ha_*_row() instead. Remove code that are used to
|
||||||
|
cover for the case outlined above.
|
||||||
|
*/
|
||||||
|
ha_autocommit_or_rollback(thd, 1);
|
||||||
|
|
||||||
|
DBUG_LEAVE;
|
||||||
|
}
|
||||||
|
|
||||||
end:
|
|
||||||
/*
|
/*
|
||||||
di should be unlinked from the thread handler list and have no active
|
di should be unlinked from the thread handler list and have no active
|
||||||
clients
|
clients
|
||||||
|
Reference in New Issue
Block a user