diff --git a/sql/event.cc b/sql/event.cc index f8bfde44dde..362af209f9a 100644 --- a/sql/event.cc +++ b/sql/event.cc @@ -23,8 +23,6 @@ - The default value of created/modified should not be 0000-00-00 because of STRICT mode restricions. - - Use timestamps instead of datetime. - - CREATE EVENT should not go into binary log! Does it now? The SQL statements issued by the EVENT are replicated. I have an idea how to solve the problem at failover. So the status field @@ -43,13 +41,14 @@ - Maybe move all allocations during parsing to evex_mem_root thus saving double parsing in evex_create_event! - - If the server is killed (stopping) try to kill executing events.. + - If the server is killed (stopping) try to kill executing events? - What happens if one renames an event in the DB while it is in memory? Or even deleting it? - Consider using conditional variable when doing shutdown instead of - waiting till all worker threads end. + waiting till all worker threads end. + - Make event_timed::get_show_create_event() work - Add function documentation whenever needed. @@ -58,10 +57,6 @@ - Move comparison code to class event_timed - - Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY. - This will skip copy operation as well as will simplify the code which is - now aware of events_array DYNAMIC_ARRAY - Warning: - For now parallel execution is not possible because the same sp_head cannot be executed few times!!! There is still no lock attached to particular event. @@ -72,7 +67,6 @@ Warning: bool mysql_event_table_exists= 1; -DYNAMIC_ARRAY events_array; QUEUE EVEX_EQ_NAME; MEM_ROOT evex_mem_root; @@ -81,55 +75,13 @@ MEM_ROOT evex_mem_root; void evex_queue_init(EVEX_QUEUE_TYPE *queue) { -#ifndef EVEX_USE_QUEUE - VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100)); -#else if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 100 /*auto_extent*/)) sql_print_error("Insufficient memory to initialize executing queue."); -#endif } -int -evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element) -{ -#ifndef EVEX_USE_QUEUE - VOID(push_dynamic(queue, element)); - return 0; -#else - return queue_insert_safe(queue, element); -#endif -} - -void -evex_queue_top_updated(EVEX_QUEUE_TYPE *queue) -{ -#ifdef EVEX_USE_QUEUE - queue_replaced(queue); -#endif -} - -void -evex_queue_sort(EVEX_QUEUE_TYPE *queue) -{ -#ifndef EVEX_USE_QUEUE - qsort((gptr) dynamic_element(queue, 0, event_timed**), - queue->elements, - sizeof(event_timed **), - (qsort_cmp) event_timed_compare); -#endif -} - -/* NOTE Andrey: Document better - Compares two TIME structures. - - a > b -> 1 - a = b -> 0 - a < b -> -1 -*/ - static int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) { @@ -714,7 +666,7 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) { int ret= 0; MEM_ROOT *tmp_mem_root; - event_timed *ett, *ett_copy; + event_timed *ett; DBUG_ENTER("db_load_and_compile_event"); DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str)); @@ -737,18 +689,12 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) if (use_lock) VOID(pthread_mutex_lock(&LOCK_event_arrays)); - VOID(push_dynamic(&events_array,(gptr) ett)); - ett_copy= dynamic_element(&events_array, events_array.elements - 1, - event_timed*); - - evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy); + evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett); /* There is a copy in the array which we don't need. sphead won't be destroyed. */ - ett->free_sphead_on_delete= false; - delete ett; if (use_lock) VOID(pthread_mutex_unlock(&LOCK_event_arrays)); @@ -783,43 +729,14 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock) if (!sortcmp_lex_string(*name, et->name, system_charset_info) && !sortcmp_lex_string(*db, et->dbname, system_charset_info)) { - int idx= get_index_dynamic(&events_array, (gptr) et); - //we are lucky the event is in the executing queue, no need of second pass - //destruct first and then remove. the destructor will delete sp_head et->free_sp(); - delete_dynamic_element(&events_array, idx); + delete et; evex_queue_delete_element(&EVEX_EQ_NAME, i); // ok, we have cleaned goto done; } } - /* - ToDo Andrey : Think about whether second pass is needed. All events - that are in memory are enabled. If an event is being - disabled (by a SQL stmt) it will be uncached. Hmm... - However is this true for events that has been - disabled because of another reason like - no need - to be executed because ENDS is in the past? - For instance, second_pass is needed when an event - was created as DISABLED but then altered as ENABLED. - */ - /* - we haven't found the event in the executing queue. This is nice! :) - Look for it in the events_array. - */ - for (i= 0; i < events_array.elements; ++i) - { - event_timed *et= dynamic_element(&events_array, i, event_timed*); - - if (!sortcmp_lex_string(*name, et->name, system_charset_info) && - !sortcmp_lex_string(*db, et->dbname, system_charset_info)) - { - delete_dynamic_element(&events_array, i); - break; - } - } - done: if (use_lock) VOID(pthread_mutex_unlock(&LOCK_event_arrays)); diff --git a/sql/event_executor.cc b/sql/event_executor.cc index ea5ed6c059b..530625c8b03 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -160,6 +160,7 @@ event_executor_main(void *arg) THD *thd; /* needs to be first for thread_stack */ ulonglong iter_num= 0; uint i=0, j=0; + my_ulonglong cnt= 0; DBUG_ENTER("event_executor_main"); DBUG_PRINT("event_executor_main", ("EVEX thread started")); @@ -194,24 +195,16 @@ event_executor_main(void *arg) thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - DBUG_PRINT("EVEX main thread", ("Initing events_array")); - - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - /* - my_malloc is used as underlying allocator which does not use a mem_root - thus data should be freed at later stage. - */ - VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100)); - - evex_queue_init(&EVEX_EQ_NAME); - - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_PRINT("EVEX main thread", ("Initing events_queuey")); /* eventually manifest that we are running, not to crashe because of usage of non-initialized memory structures. */ VOID(pthread_mutex_lock(&LOCK_evex_running)); + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + evex_queue_init(&EVEX_EQ_NAME); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); evex_is_running= true; event_executor_running_global_var= opt_event_executor; VOID(pthread_mutex_unlock(&LOCK_evex_running)); @@ -222,15 +215,16 @@ event_executor_main(void *arg) THD_CHECK_SENTRY(thd); /* Read queries from the IO/THREAD until this thread is killed */ evex_main_thread_id= thd->thread_id; - + sql_print_information("Scheduler thread started"); while (!thd->killed) { TIME time_now; my_time_t now; - my_ulonglong cnt; event_timed *et; - DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt)); + cnt++; + DBUG_PRINT("info", ("EVEX External Loop %d", cnt)); + if (cnt > 1000) continue; thd->proc_info = "Sleeping"; if (!evex_queue_num_elements(EVEX_EQ_NAME) || !event_executor_running_global_var) @@ -326,18 +320,12 @@ event_executor_main(void *arg) } if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) { - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top if (et->dropped) - { - // we have to drop the event - int idx; et->drop(thd); - idx= get_index_dynamic(&events_array, (gptr) et); - DBUG_ASSERT(idx != -1); - delete_dynamic_element(&events_array, idx); - } + delete et; + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top } else - evex_queue_first_updated(&EVEX_EQ_NAME); + evex_queue_first_updated(&EVEX_EQ_NAME); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); }// while @@ -365,15 +353,15 @@ err: LEX_STRINGs reside in the memory root and will be destroyed with it. Hence no need of delete but only freeing of SP */ - for (i= 0; i < events_array.elements; ++i) - dynamic_element(&events_array, i, event_timed*)->free_sp(); - - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - // No need to use lock here if EVEX is not running but anyway - delete_queue(&executing_queue); + // First we free all objects ... + for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) + { + event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); + et->free_sp(); + delete et; + } + // ... then we can thras the whole queue at once evex_queue_destroy(&EVEX_EQ_NAME); - delete_dynamic(&events_array); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); thd->proc_info = "Clearing"; DBUG_ASSERT(thd->net.buff != 0); @@ -529,7 +517,7 @@ evex_load_events_from_db(THD *thd) init_read_record(&read_record_info, thd, table ,NULL,1,0); while (!(read_record_info.read_record(&read_record_info))) { - event_timed *et, *et_copy; + event_timed *et; if (!(et= new event_timed)) { DBUG_PRINT("evex_load_events_from_db", ("Out of memory")); @@ -564,20 +552,10 @@ evex_load_events_from_db(THD *thd) et->compute_next_execution_time(); DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list.")); - VOID(push_dynamic(&events_array,(gptr) et)); - /* - We always add at the end so the number of elements - 1 is the place - in the buffer. - DYNAMIC_ARRAY copies the object bit by bit so we have a hollow copy - in event_array. We don't need the original therefore we delete it. - */ - et_copy= dynamic_element(&events_array, events_array.elements - 1, - event_timed*); - evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy); - printf("%p %s\n", et_copy, et_copy->name.str); - et->free_sphead_on_delete= false; - delete et; + evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et); + DBUG_PRINT("evex_load_events_from_db", ("%p %*s", + et, et->name.length,et->name.str)); } ret= 0; diff --git a/sql/event_priv.h b/sql/event_priv.h index 1b0ad078f5a..786d65ea94d 100644 --- a/sql/event_priv.h +++ b/sql/event_priv.h @@ -67,52 +67,29 @@ evex_time_diff(TIME *a, TIME *b); #define EXEC_QUEUE_QUEUE_NAME executing_queue #define EXEC_QUEUE_DARR_NAME evex_executing_queue -#ifdef EVEX_USE_QUEUE - #define EVEX_QUEUE_TYPE QUEUE - #define EVEX_PTOQEL byte * - #define EVEX_EQ_NAME executing_queue - #define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue)) - #define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue)) - #define evex_queue_delete_element(queue, idx) queue_remove(queue, idx) - #define evex_queue_destroy(queue) delete_queue(queue) - #define evex_queue_first_updated(queue) queue_replaced(queue) - #define evex_queue_insert(queue, element) queue_insert_safe(queue, element); +#define EVEX_QUEUE_TYPE QUEUE +#define EVEX_PTOQEL byte * -#else - #define EVEX_QUEUE_TYPE DYNAMIC_ARRAY - #define EVEX_PTOQEL gptr - #define EVEX_EQ_NAME evex_executing_queue +#define EVEX_EQ_NAME executing_queue +#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue)) +#define evex_queue_element(queue, idx, __cast) ((__cast)queue_element(queue, idx)) +#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx) +#define evex_queue_destroy(queue) delete_queue(queue) +#define evex_queue_first_updated(queue) queue_replaced(queue) +#define evex_queue_insert(queue, element) queue_insert_safe(queue, element); - #define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast) - #define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx); - #define evex_queue_destroy(queue) delete_dynamic(queue) -/* - push_dynamic() expects ptr to the memory to put in, to make things fast - so when a pointer has to be put inside a ptr-to-ptr is being passed -*/ - #define evex_queue_first_updated(queue) - #define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element)) - - -#endif void evex_queue_init(EVEX_QUEUE_TYPE *queue); -int -evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element); - -void -evex_queue_sort(EVEX_QUEUE_TYPE *queue); - #define evex_queue_num_elements(queue) queue.elements extern bool evex_is_running; extern bool mysql_event_table_exists; -extern DYNAMIC_ARRAY events_array; +//extern DYNAMIC_ARRAY events_array; extern MEM_ROOT evex_mem_root; extern pthread_mutex_t LOCK_event_arrays, LOCK_workers_count, diff --git a/sql/event_timed.cc b/sql/event_timed.cc index 19f42a44ec6..db5e031b09b 100644 --- a/sql/event_timed.cc +++ b/sql/event_timed.cc @@ -939,6 +939,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) sphead->optimize(); ret= 0; done: + delete lex.et; lex_end(&lex); thd->lex= old_lex; thd->query= old_query; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index cd01a0e7181..15dc3bde0a5 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3685,9 +3685,11 @@ end_with_restore_list: res= true; break; } + if (check_access(thd, EVENT_ACL, lex->et->dbname.str, 0, 0, 0, is_schema_db(lex->et->dbname.str))) break; + switch (lex->sql_command) { case SQLCOM_CREATE_EVENT: res= evex_create_event(thd, lex->et, (uint) lex->create_info.options); @@ -5652,6 +5654,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length) delete thd->lex->sphead; thd->lex->sphead= NULL; } + if (thd->lex->et) + { + delete thd->lex->et; + thd->lex->et= NULL; + } } else { @@ -5687,6 +5694,11 @@ void mysql_parse(THD *thd, char *inBuf, uint length) delete thd->lex->sphead; thd->lex->sphead= NULL; } + if (thd->lex->et) + { + delete thd->lex->et; + thd->lex->et= NULL; + } } thd->proc_info="freeing items"; thd->end_statement();