mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Port of cursors to be pushed into 5.0 tree:
- client side part is simple and may be considered stable - server side part now just joggles with THD state to save execution state and has no additional locking wisdom. Lot's of it are to be rewritten.
This commit is contained in:
@ -1105,7 +1105,8 @@ JOIN::exec()
|
||||
(zero_result_cause?zero_result_cause:"No tables used"));
|
||||
else
|
||||
{
|
||||
result->send_fields(fields_list,1);
|
||||
result->send_fields(fields_list,
|
||||
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
|
||||
if (!having || having->val_int())
|
||||
{
|
||||
if (do_send_rows && (procedure ? (procedure->send_row(fields_list) ||
|
||||
@ -1512,12 +1513,45 @@ JOIN::exec()
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
}
|
||||
/* XXX: When can we have here thd->net.report_error not zero? */
|
||||
if (thd->net.report_error)
|
||||
{
|
||||
error= thd->net.report_error;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
curr_join->having= curr_join->tmp_having;
|
||||
thd->proc_info="Sending data";
|
||||
error= thd->net.report_error ||
|
||||
do_select(curr_join, curr_fields_list, NULL, procedure);
|
||||
thd->limit_found_rows= curr_join->send_records;
|
||||
thd->examined_row_count= curr_join->examined_rows;
|
||||
curr_join->fields= curr_fields_list;
|
||||
curr_join->procedure= procedure;
|
||||
|
||||
if (unit == &thd->lex->unit &&
|
||||
(unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) &&
|
||||
thd->cursor && tables != const_tables)
|
||||
{
|
||||
/*
|
||||
We are here if this is JOIN::exec for the last select of the main unit
|
||||
and the client requested to open a cursor.
|
||||
We check that not all tables are constant because this case is not
|
||||
handled by do_select() separately, and this case is not implemented
|
||||
for cursors yet.
|
||||
*/
|
||||
DBUG_ASSERT(error == 0);
|
||||
/*
|
||||
curr_join is used only for reusable joins - that is,
|
||||
to perform SELECT for each outer row (like in subselects).
|
||||
This join is main, so we know for sure that curr_join == join.
|
||||
*/
|
||||
DBUG_ASSERT(curr_join == this);
|
||||
/* Open cursor for the last join sweep */
|
||||
error= thd->cursor->open(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
thd->proc_info="Sending data";
|
||||
error= do_select(curr_join, curr_fields_list, NULL, procedure);
|
||||
thd->limit_found_rows= curr_join->send_records;
|
||||
thd->examined_row_count= curr_join->examined_rows;
|
||||
}
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@ -1566,6 +1600,306 @@ JOIN::cleanup()
|
||||
}
|
||||
|
||||
|
||||
/************************* Cursor ******************************************/
|
||||
|
||||
void
|
||||
Cursor::init_from_thd(THD *thd)
|
||||
{
|
||||
/*
|
||||
We need to save and reset thd->mem_root, otherwise it'll be freed
|
||||
later in mysql_parse.
|
||||
*/
|
||||
mem_root= thd->mem_root;
|
||||
init_sql_alloc(&thd->mem_root,
|
||||
thd->variables.query_alloc_block_size,
|
||||
thd->variables.query_prealloc_size);
|
||||
|
||||
/*
|
||||
The same is true for open tables and lock: save tables and zero THD
|
||||
pointers to prevent table close in close_thread_tables (This is a part
|
||||
of the temporary solution to make cursors work with minimal changes to
|
||||
the current source base).
|
||||
*/
|
||||
derived_tables= thd->derived_tables;
|
||||
open_tables= thd->open_tables;
|
||||
lock= thd->lock;
|
||||
query_id= thd->query_id;
|
||||
free_list= thd->free_list;
|
||||
reset_thd(thd);
|
||||
/*
|
||||
XXX: thd->locked_tables is not changed.
|
||||
What problems can we have with it if cursor is open?
|
||||
*/
|
||||
/*
|
||||
TODO: grab thd->free_list here?
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Cursor::init_thd(THD *thd)
|
||||
{
|
||||
thd->mem_root= mem_root;
|
||||
|
||||
DBUG_ASSERT(thd->derived_tables == 0);
|
||||
thd->derived_tables= derived_tables;
|
||||
|
||||
DBUG_ASSERT(thd->open_tables == 0);
|
||||
thd->open_tables= open_tables;
|
||||
|
||||
DBUG_ASSERT(thd->lock== 0);
|
||||
thd->lock= lock;
|
||||
thd->query_id= query_id;
|
||||
thd->free_list= free_list;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Cursor::reset_thd(THD *thd)
|
||||
{
|
||||
thd->derived_tables= 0;
|
||||
thd->open_tables= 0;
|
||||
thd->lock= 0;
|
||||
thd->free_list= 0;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
Cursor::open(JOIN *join_arg)
|
||||
{
|
||||
join= join_arg;
|
||||
|
||||
THD *thd= join->thd;
|
||||
|
||||
/* First non-constant table */
|
||||
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
|
||||
|
||||
/*
|
||||
Send fields description to the client; server_status is sent
|
||||
in 'EOF' packet, which ends send_fields().
|
||||
*/
|
||||
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
|
||||
join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS);
|
||||
::send_eof(thd);
|
||||
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
|
||||
|
||||
/* Prepare JOIN for reading rows. */
|
||||
|
||||
Next_select_func end_select= join->sort_and_group || join->procedure &&
|
||||
join->procedure->flags & PROC_GROUP ?
|
||||
end_send_group : end_send;
|
||||
|
||||
join->join_tab[join->tables-1].next_select= end_select;
|
||||
join->send_records= 0;
|
||||
join->fetch_limit= join->unit->offset_limit_cnt;
|
||||
|
||||
/* Disable JOIN CACHE as it is not working with cursors yet */
|
||||
for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab)
|
||||
{
|
||||
if (tab->next_select == sub_select_cache)
|
||||
tab->next_select= sub_select;
|
||||
}
|
||||
|
||||
DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0);
|
||||
DBUG_ASSERT(join_tab->not_used_in_distinct == 0);
|
||||
/*
|
||||
null_row is set only if row not found and it's outer join: should never
|
||||
happen for the first table in join_tab list
|
||||
*/
|
||||
DBUG_ASSERT(join_tab->table->null_row == 0);
|
||||
|
||||
return join_tab->read_first_record(join_tab);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
DESCRIPTION
|
||||
Fetch next num_rows rows from the cursor and sent them to the client
|
||||
PRECONDITION:
|
||||
Cursor is open
|
||||
RETURN VALUES:
|
||||
-4 there are more rows, send_eof sent to the client
|
||||
0 no more rows, send_eof was sent to the client, cursor is closed
|
||||
other fatal fetch error, cursor is closed (error is not reported)
|
||||
*/
|
||||
|
||||
int
|
||||
Cursor::fetch(ulong num_rows)
|
||||
{
|
||||
THD *thd= join->thd;
|
||||
JOIN_TAB *join_tab= join->join_tab + join->const_tables;;
|
||||
COND *on_expr= join_tab->on_expr;
|
||||
COND *select_cond= join_tab->select_cond;
|
||||
READ_RECORD *info= &join_tab->read_record;
|
||||
|
||||
int error= 0;
|
||||
|
||||
join->fetch_limit+= num_rows;
|
||||
|
||||
/*
|
||||
Run while there are new rows in the first table;
|
||||
For each row, satisfying ON and WHERE clauses (those parts of them which
|
||||
can be evaluated early), call next_select.
|
||||
*/
|
||||
do
|
||||
{
|
||||
int no_more_rows;
|
||||
|
||||
join->examined_rows++;
|
||||
|
||||
if (thd->killed) /* Aborted by user */
|
||||
{
|
||||
my_error(ER_SERVER_SHUTDOWN,MYF(0));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (on_expr == 0 || on_expr->val_int())
|
||||
{
|
||||
if (select_cond == 0 || select_cond->val_int())
|
||||
{
|
||||
/*
|
||||
TODO: call table->unlock_row() to unlock row failed selection,
|
||||
when this feature will be used.
|
||||
*/
|
||||
error= join_tab->next_select(join, join_tab + 1, 0);
|
||||
DBUG_ASSERT(error <= 0);
|
||||
if (error)
|
||||
{
|
||||
/* real error or LIMIT/FETCH LIMIT worked */
|
||||
if (error == -4)
|
||||
{
|
||||
/*
|
||||
FETCH LIMIT, read ahead one row, and close cursor
|
||||
if there is no more rows XXX: to be fixed to support
|
||||
non-equi-joins!
|
||||
*/
|
||||
if ((no_more_rows= info->read_record(info)))
|
||||
error= no_more_rows > 0 ? -1: 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* read next row; break loop if there was an error */
|
||||
if ((no_more_rows= info->read_record(info)))
|
||||
{
|
||||
if (no_more_rows > 0)
|
||||
error= -1;
|
||||
else
|
||||
{
|
||||
enum { END_OF_RECORDS= 1 };
|
||||
error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (thd->net.report_error == 0);
|
||||
|
||||
if (thd->net.report_error)
|
||||
error= -1;
|
||||
|
||||
switch (error) {
|
||||
/* Fetch limit worked, possibly more rows are there */
|
||||
case -4:
|
||||
if (thd->transaction.all.innobase_tid)
|
||||
ha_release_temporary_latches(thd);
|
||||
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
|
||||
::send_eof(thd);
|
||||
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
|
||||
/* save references to memory, allocated during fetch */
|
||||
mem_root= thd->mem_root;
|
||||
free_list= thd->free_list;
|
||||
break;
|
||||
/* Limit clause worked: this is the same as 'no more rows' */
|
||||
case -3: /* LIMIT clause worked */
|
||||
error= 0;
|
||||
/* fallthrough */
|
||||
case 0: /* No more rows */
|
||||
if (thd->transaction.all.innobase_tid)
|
||||
ha_release_temporary_latches(thd);
|
||||
close();
|
||||
thd->server_status|= SERVER_STATUS_LAST_ROW_SENT;
|
||||
::send_eof(thd);
|
||||
thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT;
|
||||
join= 0;
|
||||
unit= 0;
|
||||
free_items(thd->free_list);
|
||||
thd->free_list= free_list= 0;
|
||||
/*
|
||||
Must be last, as some memory might be allocated for free purposes,
|
||||
like in free_tmp_table() (TODO: fix this issue)
|
||||
*/
|
||||
mem_root= thd->mem_root;
|
||||
free_root(&mem_root, MYF(0));
|
||||
break;
|
||||
default:
|
||||
close();
|
||||
join= 0;
|
||||
unit= 0;
|
||||
free_items(thd->free_list);
|
||||
thd->free_list= free_list= 0;
|
||||
/*
|
||||
Must be last, as some memory might be allocated for free purposes,
|
||||
like in free_tmp_table() (TODO: fix this issue)
|
||||
*/
|
||||
mem_root= thd->mem_root;
|
||||
free_root(&mem_root, MYF(0));
|
||||
break;
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Cursor::close()
|
||||
{
|
||||
THD *thd= join->thd;
|
||||
join->join_free(0);
|
||||
if (unit)
|
||||
{
|
||||
/* In case of UNIONs JOIN is freed inside unit->cleanup() */
|
||||
unit->cleanup();
|
||||
}
|
||||
else
|
||||
{
|
||||
join->cleanup();
|
||||
delete join;
|
||||
}
|
||||
/* XXX: Another hack: closing tables used in the cursor */
|
||||
{
|
||||
DBUG_ASSERT(lock || open_tables || derived_tables);
|
||||
|
||||
TABLE *tmp_open_tables= thd->open_tables;
|
||||
TABLE *tmp_derived_tables= thd->derived_tables;
|
||||
MYSQL_LOCK *tmp_lock= thd->lock;
|
||||
|
||||
thd->open_tables= open_tables;
|
||||
thd->derived_tables= derived_tables;
|
||||
thd->lock= lock;
|
||||
close_thread_tables(thd);
|
||||
|
||||
thd->open_tables= tmp_derived_tables;
|
||||
thd->derived_tables= tmp_derived_tables;
|
||||
thd->lock= tmp_lock;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Cursor::~Cursor()
|
||||
{
|
||||
if (is_open())
|
||||
close();
|
||||
free_items(free_list);
|
||||
/*
|
||||
Must be last, as some memory might be allocated for free purposes,
|
||||
like in free_tmp_table() (TODO: fix this issue)
|
||||
*/
|
||||
free_root(&mem_root, MYF(0));
|
||||
}
|
||||
|
||||
/*********************************************************************/
|
||||
|
||||
|
||||
int
|
||||
mysql_select(THD *thd, Item ***rref_pointer_array,
|
||||
TABLE_LIST *tables, uint wild_num, List<Item> &fields,
|
||||
@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
|
||||
|
||||
join->exec();
|
||||
|
||||
if (thd->cursor && thd->cursor->is_open())
|
||||
{
|
||||
/*
|
||||
A cursor was opened for the last sweep in exec().
|
||||
We are here only if this is mysql_select for top-level SELECT_LEX_UNIT
|
||||
and there were no error.
|
||||
*/
|
||||
free_join= 0;
|
||||
}
|
||||
|
||||
if (thd->lex->describe & DESCRIBE_EXTENDED)
|
||||
{
|
||||
select_lex->where= join->conds_history;
|
||||
@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables,
|
||||
if (having && having->val_int() == 0)
|
||||
send_row=0;
|
||||
}
|
||||
if (!(result->send_fields(fields,1)))
|
||||
if (!(result->send_fields(fields,
|
||||
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)))
|
||||
{
|
||||
if (send_row)
|
||||
{
|
||||
@ -7035,7 +7380,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
|
||||
{
|
||||
int error= 0;
|
||||
JOIN_TAB *join_tab;
|
||||
int (*end_select)(JOIN *, struct st_join_table *,bool);
|
||||
Next_select_func end_select;
|
||||
DBUG_ENTER("do_select");
|
||||
|
||||
join->procedure=procedure;
|
||||
@ -7043,7 +7388,8 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
|
||||
Tell the client how many fields there are in a row
|
||||
*/
|
||||
if (!table)
|
||||
join->result->send_fields(*fields,1);
|
||||
join->result->send_fields(*fields,
|
||||
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
|
||||
else
|
||||
{
|
||||
VOID(table->file->extra(HA_EXTRA_WRITE_CACHE));
|
||||
@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
|
||||
}
|
||||
DBUG_RETURN(-3); // Abort nicely
|
||||
}
|
||||
else if (join->send_records >= join->fetch_limit)
|
||||
{
|
||||
/*
|
||||
There is a server side cursor and all rows for
|
||||
this fetch request are sent.
|
||||
*/
|
||||
DBUG_RETURN(-4);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
|
||||
join->do_send_rows=0;
|
||||
join->unit->select_limit_cnt = HA_POS_ERROR;
|
||||
}
|
||||
else if (join->send_records >= join->fetch_limit)
|
||||
{
|
||||
/*
|
||||
There is a server side cursor and all rows
|
||||
for this fetch request are sent.
|
||||
*/
|
||||
DBUG_RETURN(-4);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
Reference in New Issue
Block a user