mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
WL #1034 updates after review
sql/event.cc: - fix TODO (remove things already done) - check the length of the event's name and body during creation and report an error if longer than what can be fit into mysql.event (nothing like non-strict mode here) - report to sql_parse.cc and error when open table failed, otherwise send_ok() was being called and the error have become an warning. - update function documentation a bit - evex_db_find_routine_aux returns 0 and not EVEX_OK sql/event_executor.cc: - CS changes to definitions of the main and worker thread routines - reorder code a bit to prevent crashes because of reading of already freed data -> first wait all events to finish their work, namely all worker threads to finish, and then destroy in-memory structures - more error checking and error reporting at the place of failure. sql/event_priv.h: code simplifying macro sql/event_timed.cc: CS cosmetics
This commit is contained in:
@ -21,7 +21,7 @@
|
||||
|
||||
#define DBUG_FAULTY_THR2
|
||||
|
||||
static uint workers_count;
|
||||
extern ulong thread_created;
|
||||
|
||||
|
||||
pthread_mutex_t LOCK_event_arrays,
|
||||
@ -33,10 +33,8 @@ bool evex_is_running= false;
|
||||
|
||||
ulong opt_event_executor;
|
||||
my_bool event_executor_running_global_var= false;
|
||||
|
||||
extern ulong thread_created;
|
||||
|
||||
static my_bool evex_mutexes_initted= false;
|
||||
static uint workers_count;
|
||||
|
||||
static int
|
||||
evex_load_events_from_db(THD *thd);
|
||||
@ -48,8 +46,11 @@ evex_load_events_from_db(THD *thd);
|
||||
the main thread or not.
|
||||
*/
|
||||
|
||||
pthread_handler_t event_executor_worker(void *arg);
|
||||
pthread_handler_t event_executor_main(void *arg);
|
||||
pthread_handler_t
|
||||
event_executor_worker(void *arg);
|
||||
|
||||
pthread_handler_t
|
||||
event_executor_main(void *arg);
|
||||
|
||||
static
|
||||
void evex_init_mutexes()
|
||||
@ -142,8 +143,8 @@ init_event_thread(THD* thd)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
pthread_handler_t event_executor_main(void *arg)
|
||||
pthread_handler_t
|
||||
event_executor_main(void *arg)
|
||||
{
|
||||
THD *thd; /* needs to be first for thread_stack */
|
||||
ulonglong iter_num= 0;
|
||||
@ -152,13 +153,10 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
DBUG_ENTER("event_executor_main");
|
||||
DBUG_PRINT("event_executor_main", ("EVEX thread started"));
|
||||
|
||||
VOID(pthread_mutex_lock(&LOCK_evex_running));
|
||||
evex_is_running= true;
|
||||
event_executor_running_global_var= opt_event_executor;
|
||||
VOID(pthread_mutex_unlock(&LOCK_evex_running));
|
||||
|
||||
// init memory root
|
||||
init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
|
||||
|
||||
|
||||
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
|
||||
my_thread_init();
|
||||
@ -196,6 +194,15 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
VOID(my_init_dynamic_array(&evex_executing_queue, sizeof(event_timed *), 50, 100));
|
||||
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
|
||||
|
||||
/*
|
||||
eventually manifest that we are running, not to crashe because of
|
||||
usage of non-initialized memory structures.
|
||||
*/
|
||||
VOID(pthread_mutex_lock(&LOCK_evex_running));
|
||||
evex_is_running= true;
|
||||
event_executor_running_global_var= opt_event_executor;
|
||||
VOID(pthread_mutex_unlock(&LOCK_evex_running));
|
||||
|
||||
if (evex_load_events_from_db(thd))
|
||||
goto err;
|
||||
|
||||
@ -208,7 +215,6 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
my_ulonglong cnt;
|
||||
|
||||
DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt));
|
||||
// sql_print_information("[EVEX] External Loop!");
|
||||
thd->proc_info = "Sleeping";
|
||||
my_sleep(1000000);// sleep 1s
|
||||
if (!event_executor_running_global_var)
|
||||
@ -216,16 +222,13 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
time(&now);
|
||||
my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
|
||||
|
||||
|
||||
VOID(pthread_mutex_lock(&LOCK_event_arrays));
|
||||
for (i= 0; (i < evex_executing_queue.elements) && !thd->killed; ++i)
|
||||
{
|
||||
event_timed **p_et=dynamic_element(&evex_executing_queue,i,event_timed**);
|
||||
event_timed *et= *p_et;
|
||||
// sql_print_information("[EVEX] External Loop 2!");
|
||||
|
||||
event_timed *et= *dynamic_element(&evex_executing_queue,i,event_timed**);
|
||||
// printf("%llu\n", TIME_to_ulonglong_datetime(&et->m_execute_at));
|
||||
if (!event_executor_running_global_var)
|
||||
break;// soon we will do only continue (see the code a bit above)
|
||||
break;
|
||||
|
||||
thd->proc_info = "Iterating";
|
||||
THD_CHECK_SENTRY(thd);
|
||||
@ -233,7 +236,7 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
if this is the first event which is after time_now then no
|
||||
more need to iterate over more elements since the array is sorted.
|
||||
*/
|
||||
if (et->m_execute_at.year &&
|
||||
if (et->m_execute_at.year > 1969 &&
|
||||
my_time_compare(&time_now, &et->m_execute_at) == -1)
|
||||
break;
|
||||
|
||||
@ -250,8 +253,7 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
|
||||
{
|
||||
sql_print_error("Problem while trying to create a thread");
|
||||
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
|
||||
goto err; // for now finish execution of the Executor
|
||||
UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
|
||||
}
|
||||
#else
|
||||
event_executor_worker((void *) et);
|
||||
@ -272,12 +274,12 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
j= 0;
|
||||
while (j < i && j < evex_executing_queue.elements)
|
||||
{
|
||||
event_timed **p_et= dynamic_element(&evex_executing_queue, j, event_timed**);
|
||||
event_timed *et= *p_et;
|
||||
event_timed *et= *dynamic_element(&evex_executing_queue, j, event_timed**);
|
||||
if (et->m_flags & EVENT_EXEC_NO_MORE || et->m_status == MYSQL_EVENT_DISABLED)
|
||||
{
|
||||
delete_dynamic_element(&evex_executing_queue, j);
|
||||
DBUG_PRINT("", ("DELETING FROM EXECUTION QUEUE [%s.%s]",et->m_db.str, et->m_name.str));
|
||||
DBUG_PRINT("EVEX main thread", ("DELETING FROM EXECUTION QUEUE [%s.%s]",
|
||||
et->m_db.str, et->m_name.str));
|
||||
// nulling the position, will delete later
|
||||
if (et->m_dropped)
|
||||
{
|
||||
@ -301,29 +303,44 @@ pthread_handler_t event_executor_main(void *arg)
|
||||
);
|
||||
|
||||
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
|
||||
}// while (!thd->killed)
|
||||
}
|
||||
|
||||
err:
|
||||
// First manifest that this thread does not work and then destroy
|
||||
VOID(pthread_mutex_lock(&LOCK_evex_running));
|
||||
evex_is_running= false;
|
||||
VOID(pthread_mutex_unlock(&LOCK_evex_running));
|
||||
|
||||
sql_print_information("Event executor stopping");
|
||||
// LEX_STRINGs reside in the memory root and will be destroyed with it.
|
||||
// Hence no need of delete but only freeing of SP
|
||||
for (i=0; i < events_array.elements; ++i)
|
||||
{
|
||||
event_timed *et= dynamic_element(&events_array, i, event_timed*);
|
||||
et->free_sp();
|
||||
}
|
||||
// TODO Andrey: USE lock here!
|
||||
|
||||
/*
|
||||
TODO: A better will be with a conditional variable
|
||||
*/
|
||||
/*
|
||||
Read workers_count without lock, no need for locking.
|
||||
In the worst case we have to wait 1sec more.
|
||||
*/
|
||||
while (workers_count)
|
||||
my_sleep(1000000);// 1s
|
||||
|
||||
/*
|
||||
LEX_STRINGs reside in the memory root and will be destroyed with it.
|
||||
Hence no need of delete but only freeing of SP
|
||||
*/
|
||||
for (i= 0; i < events_array.elements; ++i)
|
||||
dynamic_element(&events_array, i, event_timed*)->free_sp();
|
||||
|
||||
VOID(pthread_mutex_lock(&LOCK_event_arrays));
|
||||
// No need to use lock here if EVEX is not running but anyway
|
||||
delete_dynamic(&evex_executing_queue);
|
||||
delete_dynamic(&events_array);
|
||||
VOID(pthread_mutex_unlock(&LOCK_evex_running));
|
||||
|
||||
thd->proc_info = "Clearing";
|
||||
DBUG_ASSERT(thd->net.buff != 0);
|
||||
net_end(&thd->net); // destructor will not free it, because we are weird
|
||||
THD_CHECK_SENTRY(thd);
|
||||
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
@ -331,28 +348,6 @@ err:
|
||||
delete thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
/*
|
||||
sleeping some time may help not crash the server. sleeping
|
||||
is done to wait for spawned threads to finish.
|
||||
|
||||
TODO: A better will be with a conditional variable
|
||||
*/
|
||||
{
|
||||
uint tries= 0;
|
||||
while (tries++ < 5)
|
||||
{
|
||||
VOID(pthread_mutex_lock(&LOCK_workers_count));
|
||||
if (!workers_count)
|
||||
{
|
||||
VOID(pthread_mutex_unlock(&LOCK_workers_count));
|
||||
break;
|
||||
}
|
||||
VOID(pthread_mutex_unlock(&LOCK_workers_count));
|
||||
DBUG_PRINT("info", ("Sleep %d", tries));
|
||||
my_sleep(1000000 * tries);// 1s
|
||||
}
|
||||
DBUG_PRINT("info", ("Maybe now it is ok to kill the thread and evex MRoot"));
|
||||
}
|
||||
|
||||
err_no_thd:
|
||||
VOID(pthread_mutex_lock(&LOCK_evex_running));
|
||||
@ -362,30 +357,25 @@ err_no_thd:
|
||||
free_root(&evex_mem_root, MYF(0));
|
||||
sql_print_information("Event executor stopped");
|
||||
|
||||
// shutdown_events();
|
||||
|
||||
my_thread_end();
|
||||
pthread_exit(0);
|
||||
DBUG_RETURN(0); // Can't return anything here
|
||||
DBUG_RETURN(0);// Can't return anything here
|
||||
}
|
||||
|
||||
|
||||
pthread_handler_t event_executor_worker(void *event_void)
|
||||
pthread_handler_t
|
||||
event_executor_worker(void *event_void)
|
||||
{
|
||||
THD *thd; /* needs to be first for thread_stack */
|
||||
List<Item> empty_item_list;
|
||||
event_timed *event = (event_timed *) event_void;
|
||||
MEM_ROOT mem_root;
|
||||
MEM_ROOT worker_mem_root;
|
||||
|
||||
DBUG_ENTER("event_executor_worker");
|
||||
VOID(pthread_mutex_lock(&LOCK_workers_count));
|
||||
++workers_count;
|
||||
VOID(pthread_mutex_unlock(&LOCK_workers_count));
|
||||
|
||||
init_alloc_root(&mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
|
||||
|
||||
//we pass this empty list as parameter to the SP_HEAD of the event
|
||||
empty_item_list.empty();
|
||||
init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
|
||||
|
||||
my_thread_init();
|
||||
|
||||
@ -395,8 +385,10 @@ pthread_handler_t event_executor_worker(void *event_void)
|
||||
goto err_no_thd;
|
||||
}
|
||||
thd->thread_stack = (char*)&thd; // remember where our stack is
|
||||
thd->mem_root= &mem_root;
|
||||
thd->mem_root= &worker_mem_root;
|
||||
|
||||
pthread_detach(pthread_self());
|
||||
|
||||
if (init_event_thread(thd))
|
||||
goto err;
|
||||
|
||||
@ -425,7 +417,7 @@ pthread_handler_t event_executor_worker(void *event_void)
|
||||
my_TIME_to_str(&event->m_execute_at, exec_time);
|
||||
DBUG_PRINT("info", (" EVEX EXECUTING event for event %s.%s [EXPR:%d][EXECUTE_AT:%s]", event->m_db.str, event->m_name.str,(int) event->m_expr, exec_time));
|
||||
sql_print_information(" EVEX EXECUTING event for event %s.%s [EXPR:%d][EXECUTE_AT:%s]", event->m_db.str, event->m_name.str,(int) event->m_expr, exec_time);
|
||||
ret= event->execute(thd, &mem_root);
|
||||
ret= event->execute(thd, &worker_mem_root);
|
||||
sql_print_information(" EVEX EXECUTED event for event %s.%s [EXPR:%d][EXECUTE_AT:%s]. RetCode=%d", event->m_db.str, event->m_name.str,(int) event->m_expr, exec_time, ret);
|
||||
DBUG_PRINT("info", (" EVEX EXECUTED event for event %s.%s [EXPR:%d][EXECUTE_AT:%s]", event->m_db.str, event->m_name.str,(int) event->m_expr, exec_time));
|
||||
}
|
||||
@ -454,8 +446,7 @@ err:
|
||||
|
||||
err_no_thd:
|
||||
|
||||
free_root(&mem_root, MYF(0));
|
||||
// sql_print_information(" Worker thread exiting");
|
||||
free_root(&worker_mem_root, MYF(0));
|
||||
|
||||
VOID(pthread_mutex_lock(&LOCK_workers_count));
|
||||
--workers_count;
|
||||
@ -496,29 +487,38 @@ evex_load_events_from_db(THD *thd)
|
||||
}
|
||||
DBUG_PRINT("evex_load_events_from_db", ("Loading event from row."));
|
||||
|
||||
if (et->load_from_row(&evex_mem_root, table))
|
||||
//error loading!
|
||||
continue;
|
||||
if ((ret= et->load_from_row(&evex_mem_root, table)))
|
||||
{
|
||||
sql_print_error("Error while loading from mysql.event. "
|
||||
"Table probably corrupted");
|
||||
goto end;
|
||||
}
|
||||
|
||||
DBUG_PRINT("evex_load_events_from_db",
|
||||
("Event %s loaded from row. Time to compile", et->m_name.str));
|
||||
|
||||
if (et->compile(thd, &evex_mem_root))
|
||||
//problem during compile
|
||||
continue;
|
||||
if ((ret= et->compile(thd, &evex_mem_root)))
|
||||
{
|
||||
sql_print_error("Error while compiling %s.%s. Aborting load.",
|
||||
et->m_db.str, et->m_name.str);
|
||||
goto end;
|
||||
}
|
||||
// let's find when to be executed
|
||||
et->compute_next_execution_time();
|
||||
|
||||
DBUG_PRINT("evex_load_events_from_db",
|
||||
("Adding %s to the executor list.", et->m_name.str));
|
||||
VOID(push_dynamic(&events_array,(gptr) et));
|
||||
// we always add at the end so the number of elements - 1 is the place
|
||||
// in the buffer
|
||||
/*
|
||||
We always add at the end so the number of elements - 1 is the place
|
||||
in the buffer.
|
||||
DYNAMIC_ARRAY copies the object bit by bit so we have a hollow copy
|
||||
in event_array. We don't need the original therefore we delete it.
|
||||
*/
|
||||
et_copy= dynamic_element(&events_array, events_array.elements - 1,
|
||||
event_timed*);
|
||||
event_timed*);
|
||||
VOID(push_dynamic(&evex_executing_queue,(gptr) &et_copy));
|
||||
et->m_free_sphead_on_delete= false;
|
||||
DBUG_PRINT("info", (""));
|
||||
delete et;
|
||||
}
|
||||
end_read_record(&read_record_info);
|
||||
@ -536,8 +536,7 @@ evex_load_events_from_db(THD *thd)
|
||||
end:
|
||||
close_thread_tables(thd);
|
||||
|
||||
DBUG_PRINT("evex_load_events_from_db",
|
||||
("Events loaded from DB. Status code %d", ret));
|
||||
DBUG_PRINT("info", ("Finishing with status code %d", ret));
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
@ -547,7 +546,8 @@ bool sys_var_event_executor::update(THD *thd, set_var *var)
|
||||
{
|
||||
// here start the thread if not running.
|
||||
VOID(pthread_mutex_lock(&LOCK_evex_running));
|
||||
if ((my_bool) var->save_result.ulong_value && !evex_is_running) {
|
||||
if ((my_bool) var->save_result.ulong_value && !evex_is_running)
|
||||
{
|
||||
VOID(pthread_mutex_unlock(&LOCK_evex_running));
|
||||
init_events();
|
||||
} else
|
||||
|
Reference in New Issue
Block a user