1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

A fix and a test case for Bug#6513 "Test Suite: Values inserted by using

cursor is interpreted latin1 character and Bug#9819 "Cursors: Mysql Server
Crash while fetching from table with 5 million records."
A fix for a possible memory leak when fetching into an SP cursor
in a long loop.
The patch uses a common implementation of cursors in the binary protocol and 
in stored procedures and implements materialized cursors.
For implementation details, see comments in sql_cursor.cc
This commit is contained in:
konstantin@mysql.com
2005-09-22 02:11:21 +04:00
parent e89c5220b5
commit 6f8d3c4844
30 changed files with 1567 additions and 857 deletions

View File

@ -23,6 +23,7 @@
#include "mysql_priv.h"
#include "sql_select.h"
#include "sql_cursor.h"
#include <m_ctype.h>
#include <hash.h>
@ -107,20 +108,15 @@ static bool const_expression_in_where(COND *conds,Item *item, Item **comp_item);
static bool open_tmp_table(TABLE *table);
static bool create_myisam_tmp_table(TABLE *table,TMP_TABLE_PARAM *param,
ulong options);
static Next_select_func setup_end_select_func(JOIN *join);
static int do_select(JOIN *join,List<Item> *fields,TABLE *tmp_table,
Procedure *proc);
static enum_nested_loop_state
sub_select_cache(JOIN *join, JOIN_TAB *join_tab, bool end_of_records);
static enum_nested_loop_state
evaluate_join_record(JOIN *join, JOIN_TAB *join_tab,
int error, my_bool *report_error);
static enum_nested_loop_state
evaluate_null_complemented_join_record(JOIN *join, JOIN_TAB *join_tab);
static enum_nested_loop_state
sub_select(JOIN *join,JOIN_TAB *join_tab, bool end_of_records);
static enum_nested_loop_state
flush_cached_records(JOIN *join, JOIN_TAB *join_tab, bool skip_last);
static enum_nested_loop_state
end_send(JOIN *join, JOIN_TAB *join_tab, bool end_of_records);
@ -1716,262 +1712,6 @@ JOIN::destroy()
DBUG_RETURN(error);
}
/************************* Cursor ******************************************/
Cursor::Cursor(THD *thd)
:Query_arena(&main_mem_root, INITIALIZED),
join(0), unit(0),
protocol(thd),
close_at_commit(FALSE)
{
/* We will overwrite it at open anyway. */
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
thr_lock_owner_init(&lock_id, &thd->lock_info);
bzero((void*) ht_info, sizeof(ht_info));
}
void
Cursor::init_from_thd(THD *thd)
{
Engine_info *info;
/*
We need to save and reset thd->mem_root, otherwise it'll be freed
later in mysql_parse.
We can't just change the thd->mem_root here as we want to keep the
things that are already allocated in thd->mem_root for Cursor::fetch()
*/
main_mem_root= *thd->mem_root;
state= thd->stmt_arena->state;
/* Allocate new memory root for thd */
init_sql_alloc(thd->mem_root,
thd->variables.query_alloc_block_size,
thd->variables.query_prealloc_size);
/*
The same is true for open tables and lock: save tables and zero THD
pointers to prevent table close in close_thread_tables (This is a part
of the temporary solution to make cursors work with minimal changes to
the current source base).
*/
derived_tables= thd->derived_tables;
open_tables= thd->open_tables;
lock= thd->lock;
query_id= thd->query_id;
free_list= thd->free_list;
change_list= thd->change_list;
reset_thd(thd);
/* Now we have an active cursor and can cause a deadlock */
thd->lock_info.n_cursors++;
close_at_commit= FALSE; /* reset in case we're reusing the cursor */
info= &ht_info[0];
for (handlerton **pht= thd->transaction.stmt.ht; *pht; pht++)
{
const handlerton *ht= *pht;
close_at_commit|= test(ht->flags & HTON_CLOSE_CURSORS_AT_COMMIT);
if (ht->create_cursor_read_view)
{
info->ht= ht;
info->read_view= (ht->create_cursor_read_view)();
++info;
}
}
/*
XXX: thd->locked_tables is not changed.
What problems can we have with it if cursor is open?
TODO: must be fixed because of the prelocked mode.
*/
}
void
Cursor::reset_thd(THD *thd)
{
thd->derived_tables= 0;
thd->open_tables= 0;
thd->lock= 0;
thd->free_list= 0;
thd->change_list.empty();
}
int
Cursor::open(JOIN *join_arg)
{
join= join_arg;
THD *thd= join->thd;
/* First non-constant table */
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
DBUG_ENTER("Cursor::open");
/*
Send fields description to the client; server_status is sent
in 'EOF' packet, which ends send_fields().
*/
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS);
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
/* Prepare JOIN for reading rows. */
join->tmp_table= 0;
join->join_tab[join->tables-1].next_select= setup_end_select_func(join);
join->send_records= 0;
join->fetch_limit= join->unit->offset_limit_cnt;
/* Disable JOIN CACHE as it is not working with cursors yet */
for (JOIN_TAB *tab= join_tab;
tab != join->join_tab + join->tables - 1;
tab++)
{
if (tab->next_select == sub_select_cache)
tab->next_select= sub_select;
}
DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0);
DBUG_ASSERT(join_tab->not_used_in_distinct == 0);
/*
null_row is set only if row not found and it's outer join: should never
happen for the first table in join_tab list
*/
DBUG_ASSERT(join_tab->table->null_row == 0);
DBUG_RETURN(0);
}
/*
DESCRIPTION
Fetch next num_rows rows from the cursor and sent them to the client
PRECONDITION:
Cursor is open
RETURN VALUES:
none, this function will send error or OK to network if necessary.
*/
void
Cursor::fetch(ulong num_rows)
{
THD *thd= join->thd;
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
enum_nested_loop_state error= NESTED_LOOP_OK;
Query_arena backup_arena;
Engine_info *info;
DBUG_ENTER("Cursor::fetch");
DBUG_PRINT("enter",("rows: %lu", num_rows));
DBUG_ASSERT(thd->derived_tables == 0 && thd->open_tables == 0 &&
thd->lock == 0);
thd->derived_tables= derived_tables;
thd->open_tables= open_tables;
thd->lock= lock;
thd->query_id= query_id;
thd->change_list= change_list;
/* save references to memory, allocated during fetch */
thd->set_n_backup_active_arena(this, &backup_arena);
for (info= ht_info; info->read_view ; info++)
(info->ht->set_cursor_read_view)(info->read_view);
join->fetch_limit+= num_rows;
error= sub_select(join, join_tab, 0);
if (error == NESTED_LOOP_OK || error == NESTED_LOOP_NO_MORE_ROWS)
error= sub_select(join,join_tab,1);
if (error == NESTED_LOOP_QUERY_LIMIT)
error= NESTED_LOOP_OK; /* select_limit used */
if (error == NESTED_LOOP_CURSOR_LIMIT)
join->resume_nested_loop= TRUE;
#ifdef USING_TRANSACTIONS
ha_release_temporary_latches(thd);
#endif
/* Grab free_list here to correctly free it in close */
thd->restore_active_arena(this, &backup_arena);
for (info= ht_info; info->read_view; info++)
(info->ht->set_cursor_read_view)(0);
if (error == NESTED_LOOP_CURSOR_LIMIT)
{
/* Fetch limit worked, possibly more rows are there */
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
change_list= thd->change_list;
reset_thd(thd);
}
else
{
close(TRUE);
if (error == NESTED_LOOP_OK)
{
thd->server_status|= SERVER_STATUS_LAST_ROW_SENT;
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT;
}
else if (error != NESTED_LOOP_KILLED)
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
}
DBUG_VOID_RETURN;
}
void
Cursor::close(bool is_active)
{
THD *thd= join->thd;
DBUG_ENTER("Cursor::close");
/*
In case of UNIONs JOIN is freed inside of unit->cleanup(),
otherwise in select_lex->cleanup().
*/
if (unit)
(void) unit->cleanup();
else
(void) join->select_lex->cleanup();
for (Engine_info *info= ht_info; info->read_view; info++)
{
(info->ht->close_cursor_read_view)(info->read_view);
info->read_view= 0;
info->ht= 0;
}
if (is_active)
close_thread_tables(thd);
else
{
/* XXX: Another hack: closing tables used in the cursor */
DBUG_ASSERT(lock || open_tables || derived_tables);
TABLE *tmp_derived_tables= thd->derived_tables;
MYSQL_LOCK *tmp_lock= thd->lock;
thd->open_tables= open_tables;
thd->derived_tables= derived_tables;
thd->lock= lock;
close_thread_tables(thd);
thd->open_tables= tmp_derived_tables;
thd->derived_tables= tmp_derived_tables;
thd->lock= tmp_lock;
}
thd->lock_info.n_cursors--; /* Decrease the number of active cursors */
join= 0;
unit= 0;
free_items();
change_list.empty();
DBUG_VOID_RETURN;
}
/*********************************************************************/
/*
An entry point to single-unit select (a select without UNION).
@ -2051,9 +1791,9 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
}
else
{
if (join->prepare(rref_pointer_array, tables, wild_num,
conds, og_num, order, group, having, proc_param,
select_lex, unit))
if (err= join->prepare(rref_pointer_array, tables, wild_num,
conds, og_num, order, group, having, proc_param,
select_lex, unit))
{
goto err;
}
@ -2068,9 +1808,9 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
DBUG_RETURN(TRUE);
thd->proc_info="init";
thd->used_tables=0; // Updated by setup_fields
if (join->prepare(rref_pointer_array, tables, wild_num,
conds, og_num, order, group, having, proc_param,
select_lex, unit))
if (err= join->prepare(rref_pointer_array, tables, wild_num,
conds, og_num, order, group, having, proc_param,
select_lex, unit))
{
goto err;
}
@ -2112,7 +1852,7 @@ err:
if (free_join)
{
thd->proc_info="end";
err= select_lex->cleanup();
err|= select_lex->cleanup();
DBUG_RETURN(err || thd->net.report_error);
}
DBUG_RETURN(join->error);
@ -5976,7 +5716,7 @@ void JOIN::join_free(bool full)
cleanup(full);
for (unit= select_lex->first_inner_unit(); unit; unit= unit->next_unit())
for (sl= unit->first_select_in_union(); sl; sl= sl->next_select())
for (sl= unit->first_select(); sl; sl= sl->next_select())
{
JOIN *join= sl->join;
if (join)
@ -8122,9 +7862,31 @@ Field *create_tmp_field(THD *thd, TABLE *table,Item *item, Item::Type type,
/*
Create a temp table according to a field list.
Set distinct if duplicates could be removed
Given fields field pointers are changed to point at tmp_table
for send_fields
SYNOPSIS
create_tmp_table()
thd thread handle
param a description used as input to create the table
fields list of items that will be used to define
column types of the table (also see NOTES)
group TODO document
distinct should table rows be distinct
save_sum_fields see NOTES
select_options
rows_limit
table_alias possible name of the temporary table that can be used
for name resolving; can be "".
DESCRIPTION
Given field pointers are changed to point at tmp_table for
send_fields. The table object is self contained: it's
allocated in its own memory root, as well as Field objects
created for table columns.
This function will replace Item_sum items in 'fields' list with
corresponding Item_field items, pointing at the fields in the
temporary table, unless this was prohibited by TRUE
value of argument save_sum_fields. The Item_field objects
are created in THD memory root.
*/
#define STRING_TOTAL_LENGTH_TO_PACK_ROWS 128
@ -8138,6 +7900,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
ulonglong select_options, ha_rows rows_limit,
char *table_alias)
{
MEM_ROOT *mem_root_save, own_root;
TABLE *table;
uint i,field_count,null_count,null_pack_length;
uint hidden_null_count, hidden_null_pack_length, hidden_field_count;
@ -8202,29 +7965,33 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
field_count=param->field_count+param->func_count+param->sum_func_count;
hidden_field_count=param->hidden_field_count;
if (!my_multi_malloc(MYF(MY_WME),
&table,sizeof(*table),
&reg_field, sizeof(Field*)*(field_count+1),
&blob_field, sizeof(uint)*(field_count+1),
&from_field, sizeof(Field*)*field_count,
&copy_func,sizeof(*copy_func)*(param->func_count+1),
&param->keyinfo,sizeof(*param->keyinfo),
&key_part_info,
sizeof(*key_part_info)*(param->group_parts+1),
&param->start_recinfo,
sizeof(*param->recinfo)*(field_count*2+4),
&tmpname,(uint) strlen(path)+1,
&group_buff,group && ! using_unique_constraint ?
param->group_length : 0,
NullS))
init_sql_alloc(&own_root, TABLE_ALLOC_BLOCK_SIZE, 0);
if (!multi_alloc_root(&own_root,
&table, sizeof(*table),
&reg_field, sizeof(Field*) * (field_count+1),
&blob_field, sizeof(uint)*(field_count+1),
&from_field, sizeof(Field*)*field_count,
&copy_func, sizeof(*copy_func)*(param->func_count+1),
&param->keyinfo, sizeof(*param->keyinfo),
&key_part_info,
sizeof(*key_part_info)*(param->group_parts+1),
&param->start_recinfo,
sizeof(*param->recinfo)*(field_count*2+4),
&tmpname, (uint) strlen(path)+1,
&group_buff, group && ! using_unique_constraint ?
param->group_length : 0,
NullS))
{
bitmap_clear_bit(&temp_pool, temp_pool_slot);
DBUG_RETURN(NULL); /* purecov: inspected */
}
if (!(param->copy_field=copy=new Copy_field[field_count]))
/* Copy_field belongs to TMP_TABLE_PARAM, allocate it in THD mem_root */
if (!(param->copy_field= copy= new (thd->mem_root) Copy_field[field_count]))
{
bitmap_clear_bit(&temp_pool, temp_pool_slot);
my_free((gptr) table,MYF(0)); /* purecov: inspected */
free_root(&own_root, MYF(0)); /* purecov: inspected */
DBUG_RETURN(NULL); /* purecov: inspected */
}
param->items_to_copy= copy_func;
@ -8234,6 +8001,11 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
bzero((char*) table,sizeof(*table));
bzero((char*) reg_field,sizeof(Field*)*(field_count+1));
bzero((char*) from_field,sizeof(Field*)*field_count);
table->mem_root= own_root;
mem_root_save= thd->mem_root;
thd->mem_root= &table->mem_root;
table->field=reg_field;
table->alias= table_alias;
table->reginfo.lock_type=TL_WRITE; /* Will be updated */
@ -8318,7 +8090,9 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
string_count++;
string_total_length+= new_field->pack_length();
}
thd->mem_root= mem_root_save;
thd->change_item_tree(argp, new Item_field(new_field));
thd->mem_root= &table->mem_root;
if (!(new_field->flags & NOT_NULL_FLAG))
{
null_count++;
@ -8432,7 +8206,8 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
{
uint alloc_length=ALIGN_SIZE(reclength+MI_UNIQUE_HASH_LENGTH+1);
table->s->rec_buff_length= alloc_length;
if (!(table->record[0]= (byte *) my_malloc(alloc_length*3, MYF(MY_WME))))
if (!(table->record[0]= (byte*)
alloc_root(&table->mem_root, alloc_length*3)))
goto err;
table->record[1]= table->record[0]+alloc_length;
table->s->default_values= table->record[1]+alloc_length;
@ -8618,8 +8393,10 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
table->s->uniques= 1;
}
if (!(key_part_info= (KEY_PART_INFO*)
sql_calloc((keyinfo->key_parts)*sizeof(KEY_PART_INFO))))
alloc_root(&table->mem_root,
keyinfo->key_parts * sizeof(KEY_PART_INFO))))
goto err;
bzero((void*) key_part_info, keyinfo->key_parts * sizeof(KEY_PART_INFO));
table->key_info=keyinfo;
keyinfo->key_part=key_part_info;
keyinfo->flags=HA_NOSAME | HA_NULL_ARE_EQUAL;
@ -8667,10 +8444,15 @@ create_tmp_table(THD *thd,TMP_TABLE_PARAM *param,List<Item> &fields,
if (create_myisam_tmp_table(table,param,select_options))
goto err;
}
if (!open_tmp_table(table))
DBUG_RETURN(table);
if (open_tmp_table(table))
goto err;
err:
thd->mem_root= mem_root_save;
DBUG_RETURN(table);
err:
thd->mem_root= mem_root_save;
free_tmp_table(thd,table); /* purecov: inspected */
bitmap_clear_bit(&temp_pool, temp_pool_slot);
DBUG_RETURN(NULL); /* purecov: inspected */
@ -8815,11 +8597,12 @@ static bool create_myisam_tmp_table(TABLE *table,TMP_TABLE_PARAM *param,
if (table->s->keys)
{ // Get keys for ni_create
bool using_unique_constraint=0;
HA_KEYSEG *seg= (HA_KEYSEG*) sql_calloc(sizeof(*seg) *
keyinfo->key_parts);
HA_KEYSEG *seg= (HA_KEYSEG*) alloc_root(&table->mem_root,
sizeof(*seg) * keyinfo->key_parts);
if (!seg)
goto err;
bzero(seg, sizeof(*seg) * keyinfo->key_parts);
if (keyinfo->key_length >= table->file->max_key_length() ||
keyinfo->key_parts > table->file->max_key_parts() ||
table->s->uniques)
@ -8916,13 +8699,14 @@ static bool create_myisam_tmp_table(TABLE *table,TMP_TABLE_PARAM *param,
void
free_tmp_table(THD *thd, TABLE *entry)
{
MEM_ROOT own_root= entry->mem_root;
const char *save_proc_info;
DBUG_ENTER("free_tmp_table");
DBUG_PRINT("enter",("table: %s",entry->alias));
save_proc_info=thd->proc_info;
thd->proc_info="removing tmp table";
free_blobs(entry);
if (entry->file)
{
if (entry->db_stat)
@ -8943,12 +8727,11 @@ free_tmp_table(THD *thd, TABLE *entry)
/* free blobs */
for (Field **ptr=entry->field ; *ptr ; ptr++)
(*ptr)->free();
my_free((gptr) entry->record[0],MYF(0));
free_io_cache(entry);
bitmap_clear_bit(&temp_pool, entry->temp_pool_slot);
my_free((gptr) entry,MYF(0));
free_root(&own_root, MYF(0)); /* the table is allocated in its own root */
thd->proc_info=save_proc_info;
DBUG_VOID_RETURN;
@ -9063,7 +8846,7 @@ bool create_myisam_from_heap(THD *thd, TABLE *table, TMP_TABLE_PARAM *param,
end_select function to use. This function can't fail.
*/
static Next_select_func setup_end_select_func(JOIN *join)
Next_select_func setup_end_select_func(JOIN *join)
{
TABLE *table= join->tmp_table;
Next_select_func end_select;
@ -9218,7 +9001,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
}
static enum_nested_loop_state
enum_nested_loop_state
sub_select_cache(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
{
enum_nested_loop_state rc;
@ -9359,7 +9142,7 @@ sub_select_cache(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
return one of enum_nested_loop_state, except NESTED_LOOP_NO_MORE_ROWS.
*/
static enum_nested_loop_state
enum_nested_loop_state
sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
{
join_tab->table->null_row=0;
@ -13782,8 +13565,7 @@ bool mysql_explain_union(THD *thd, SELECT_LEX_UNIT *unit, select_result *result)
unit->fake_select_lex->select_number= UINT_MAX; // jost for initialization
unit->fake_select_lex->type= "UNION RESULT";
unit->fake_select_lex->options|= SELECT_DESCRIBE;
if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK | SELECT_DESCRIBE,
"")))
if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK | SELECT_DESCRIBE)))
res= unit->exec();
res|= unit->cleanup();
}