1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

MDEV-9526: Compute Aggregate functions as window functions

- Add temporary code: clone_read_record() clones READ_RECORD structure,
  as long as it is used for reading filesort() result that fits into
  memory.
- Add frame bounds for ROWS-type frames. ROWS n PRECEDING|FOLLOWING,
  ROWS UNBOUNDED PRECEDING|FOLLOWING, CURRENT ROW are supported.
- Add Item_sum_count::remove() which allows "streaming" computation
  of COUNT() over a moving frame.
This commit is contained in:
Sergei Petrunia
2016-02-18 01:25:26 +03:00
parent 0c223a96c1
commit be15858245
8 changed files with 706 additions and 27 deletions

View File

@ -4,7 +4,6 @@
#include "sql_base.h"
#include "sql_window.h"
//TODO: why pass List<Window_spec> by value??
bool
Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
@ -83,6 +82,481 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
}
/*
Do a pass over sorted table and compute window function values.
This function is for handling window functions that can be computed on the
fly. Examples are RANK() and ROW_NUMBER().
*/
bool compute_window_func_values(Item_window_func *item_win,
TABLE *tbl, READ_RECORD *info)
{
int err;
while (!(err=info->read_record(info)))
{
store_record(tbl,record[1]);
/*
This will cause window function to compute its value for the
current row :
*/
item_win->advance_window();
/*
Put the new value into temptable's field
TODO: Should this use item_win->update_field() call?
Regular aggegate function implementations seem to implement it.
*/
item_win->save_in_field(item_win->result_field, true);
err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
if (err && err != HA_ERR_RECORD_IS_THE_SAME)
return true;
}
return false;
}
/////////////////////////////////////////////////////////////////////////////
// Window Frames support
/////////////////////////////////////////////////////////////////////////////
// note: make rr_from_pointers static again when not need it here anymore
int rr_from_pointers(READ_RECORD *info);
/*
A temporary way to clone READ_RECORD structures until Monty provides the real
one.
*/
bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
{
DBUG_ASSERT(src->table->sort.record_pointers);
DBUG_ASSERT(src->read_record == rr_from_pointers);
memcpy(dst, src, sizeof(READ_RECORD));
return false;
}
/* A wrapper around test_if_group_changed */
class Group_bound_tracker
{
List<Cached_item> group_fields;
public:
void init(THD *thd, SQL_I_List<ORDER> *list)
{
for (ORDER *curr = list->first; curr; curr=curr->next)
{
Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
group_fields.push_back(tmp);
}
}
bool check_if_next_group()
{
if (test_if_group_changed(group_fields) > -1)
return true;
return false;
}
};
/*
Window frame bound cursor. Abstract interface.
@detail
The cursor moves within the partition that the current row is in.
It may be ahead or behind the current row.
The cursor also assumes that the current row moves forward through the
partition and will move to the next adjacent partition after this one.
@todo
- if we want to allocate this on the MEM_ROOT we should make sure
it is not re-allocated for every subquery execution.
*/
class Frame_cursor : public Sql_alloc
{
public:
virtual void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list)
{}
/*
Current row has entered the new partition (this also includes the first
partition).
//TODO: can we also say "tbl->record[0] is the first row in the partition?"
// TODO2: and if yes, will this function keep it so? (NO)
Position the frame bound accordingly.
*/
virtual void next_partition(bool first, Item_sum* item)=0;
/*
The current row has moved one row forward.
Move this frame bound accordingly, and update the value of aggregate
function as necessary.
*/
virtual void next_row(Item_sum* item)=0;
virtual ~Frame_cursor(){}
};
/*
UNBOUNDED (PRECEDING|FOLLOWING) frame bound
*/
class Frame_unbounded : public Frame_cursor
{
const bool is_start;
const bool is_preceding;
READ_RECORD cursor;
Group_bound_tracker bound_tracker;
public:
Frame_unbounded(bool is_start_arg, bool is_preceding_arg) :
is_start(is_start_arg), is_preceding(is_preceding_arg)
{}
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
{
if (!is_preceding)
{
// Cursor is only needed by UNBOUNDED FOLLOWING
clone_read_record(info, &cursor);
bound_tracker.init(thd, partition_list);
}
}
void next_partition(bool first, Item_sum* item)
{
if (is_preceding)
{
/*
UNBOUNDED PRECEDING frame end just stays on the first row.
We are start of the frame, so we don't need to update the sum function.
*/
}
else
{
/*
UNBOUNDED FOLLOWING should reach the end of the partition and stay
there.
*/
if (first)
{
/* Read the first row */
if (cursor.read_record(&cursor))
return;
}
/* Remember what partition we are in */
bound_tracker.check_if_next_group();
item->add();
while (!cursor.read_record(&cursor))
{
if (bound_tracker.check_if_next_group())
break;
item->add();
}
}
}
void next_row(Item_sum* item)
{
/* Do nothing, UNBOUNDED PRECEDING|FOLLOWING frame ends don't move. */
}
};
/*
ROWS $n (PRECEDING|FOLLOWING) frame bound.
*/
class Frame_n_rows : public Frame_cursor
{
/* Whether this start of frame or end of rame */
const bool is_start;
const ha_rows n_rows;
const bool is_preceding;
ha_rows n_rows_to_skip;
READ_RECORD cursor;
bool cursor_eof;
Group_bound_tracker bound_tracker;
bool at_partition_start;
bool at_partition_end;
public:
Frame_n_rows(bool is_start_arg, bool is_preceding_arg, ha_rows n_rows_arg) :
is_start(is_start_arg), n_rows(n_rows_arg), is_preceding(is_preceding_arg)
{}
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
{
clone_read_record(info, &cursor);
cursor_eof= false;
at_partition_start= true;
bound_tracker.init(thd, partition_list);
}
void next_partition(bool first, Item_sum* item)
{
cursor_eof= false;
at_partition_start= true;
at_partition_end= false;
if (is_preceding)
{
if (!first)
{
/*
The cursor in "ROWS n PRECEDING" lags behind by n_rows rows.
Catch up.
*/
while (!(cursor_eof= (0 != cursor.read_record(&cursor))))
{
if (bound_tracker.check_if_next_group())
break;
}
}
n_rows_to_skip= n_rows - (is_start? 0:1);
}
else
{
/*
"ROWS n FOLLOWING" is already at the first row in the next partition.
Move it to be n_rows ahead.
*/
n_rows_to_skip= 0;
if (!first && (!is_start || n_rows))
{
// We are positioned at the first row in the partition:
if (is_start) // this is frame start endpoint
item->remove();
else
item->add();
}
/*
Note: i_end=-1 when this is a start-endpoint "CURRENT ROW" which is
implemented as "ROWS 0 FOLLOWING".
*/
longlong i_end= n_rows + (first?1:0)- is_start;
for (longlong i= 0; i < i_end; i++)
{
if (next_row_intern(item))
break;
}
}
}
void next_row(Item_sum* item)
{
if (n_rows_to_skip)
{
n_rows_to_skip--;
return;
}
if (at_partition_end)
return;
next_row_intern(item);
}
private:
bool next_row_intern(Item_sum *item)
{
if (!cursor_eof)
{
if (!(cursor_eof= (0 != cursor.read_record(&cursor))))
{
bool new_group= bound_tracker.check_if_next_group();
if (at_partition_start || !new_group)
{
if (is_start) // this is frame start endpoint
item->remove();
else
item->add();
at_partition_start= false;
return false; /* Action done */
}
else
{
at_partition_end= true;
return true;
}
}
}
return true; /* Action not done */
}
};
/* CURRENT ROW is the same as "ROWS 0 FOLLOWING" */
class Frame_current_row : public Frame_n_rows
{
public:
Frame_current_row(bool is_start_arg) :
Frame_n_rows(is_start_arg, false /*is_preceding*/, ha_rows(0))
{}
};
Frame_cursor *get_frame_cursor(Window_frame_bound *bound, bool is_start_bound)
{
if (bound->precedence_type == Window_frame_bound::PRECEDING ||
bound->precedence_type == Window_frame_bound::FOLLOWING)
{
bool is_preceding= (bound->precedence_type ==
Window_frame_bound::PRECEDING);
if (bound->offset == NULL)
{
return new Frame_unbounded(is_start_bound, is_preceding);
}
longlong n_rows= bound->offset->val_int();
return new Frame_n_rows(is_start_bound, is_preceding, n_rows);
}
if (bound->precedence_type == Window_frame_bound::CURRENT)
{
return new Frame_current_row(is_start_bound);
}
return NULL;
}
/*
Streamed window function computation with window frames.
We make a single pass over the ordered temp.table, but we're using three
cursors:
- current row - the row that we're computing window func value for)
- start_bound - the start of the frame
- end_bound - the end of the frame
All three cursors move together.
@todo
Provided bounds have their 'cursors'... is it better to re-clone their
cursors or re-position them onto the current row?
@detail
ROWS BETWEEN 3 PRECEDING -- frame start
AND 3 FOLLOWING -- frame end
/------ frame end (aka BOTTOM)
Dataset start |
--------====*=======[*]========*========-------->> dataset end
| \
| +-------- current row
|
\-------- frame start ("TOP")
- frame_end moves forward and adds rows into the aggregate function.
- frame_start follows behind and removes rows from the aggregate function.
- current_row is the row where the value of aggregate function is stored.
@TODO: Only the first cursor needs to check for run-out-of-partition
condition (Others can catch up by counting rows?)
*/
bool compute_window_func_with_frames(Item_window_func *item_win,
TABLE *tbl, READ_RECORD *info)
{
THD *thd= current_thd;
int err= 0;
Frame_cursor *start_bound;
Frame_cursor *end_bound;
Item_sum *sum_func= item_win->window_func;
/* This algorithm doesn't support DISTINCT aggregator */
sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
Window_frame *window_frame= item_win->window_spec->window_frame;
DBUG_ASSERT(window_frame->units == Window_frame::UNITS_ROWS);
start_bound= get_frame_cursor(window_frame->top_bound, true);
end_bound= get_frame_cursor(window_frame->bottom_bound, false);
start_bound->init(thd, info, &item_win->window_spec->partition_list);
end_bound->init(thd, info, &item_win->window_spec->partition_list);
bool is_error= false;
bool first_row= true;
uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
while (true)
{
if (first_row)
{
/* Start the first partition */
sum_func->clear();
end_bound->next_partition(true, sum_func);
start_bound->next_partition(true, sum_func);
}
else
{
/* These can write into tbl->record[0] */
end_bound->next_row(sum_func);
start_bound->next_row(sum_func);
}
if ((err=info->read_record(info)))
{
/* End of file */
break;
}
store_record(tbl,record[1]);
bool partition_changed= (item_win->check_partition_bound() > -1)? true:
false;
if (!first_row && partition_changed)
{
sum_func->clear();
tbl->file->position(tbl->record[0]);
memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
/*
Ok, the current row is the first row in the new partition.
We move end_bound first, because we want rows to be added into the
aggregate before start_bound attempts to remove them.
*/
end_bound->next_partition(false, sum_func);
/*
The problem is, the above call may have made tbl->record[0] to point to
some other record.
*/
tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
start_bound->next_partition(false, sum_func);
/*
The same problem again. The above call may have moved table's current
record. We need to make the current row current, so that ha_update_row
call below updates the right row.
*/
tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
}
first_row= false;
/* Read the current row and update it */
item_win->save_in_field(item_win->result_field, true);
err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
if (err && err != HA_ERR_RECORD_IS_THE_SAME)
{
is_error= true;
break;
}
}
my_free(rowid_buf);
delete start_bound;
delete end_bound;
return is_error? true: false;
}
/*
@brief
This function is called by JOIN::exec to compute window function values
@ -264,30 +738,34 @@ bool JOIN::process_window_functions(List<Item> *curr_fields_list)
TABLE *tbl= join_tab[top_join_tab_count].table;
if (init_read_record(&info, thd, tbl, select, 0, 1, FALSE))
return true;
bool is_error= false;
item_win->setup_partition_border_check(thd);
int err;
while (!(err=info.read_record(&info)))
Item_sum::Sumfunctype type= item_win->window_func->sum_func();
if (type == Item_sum::ROW_NUMBER_FUNC ||
type == Item_sum::RANK_FUNC ||
type == Item_sum::DENSE_RANK_FUNC)
{
store_record(tbl,record[1]);
/*
This will cause window function to compute its value for the
current row :
One-pass window function computation, walk through the rows and
assign values.
*/
item_win->advance_window();
/*
Put the new value into temptable's field
TODO: Should this use item_win->update_field() call?
Regular aggegate function implementations seem to implement it.
*/
item_win->save_in_field(item_win->result_field, true);
err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
if (err && err != HA_ERR_RECORD_IS_THE_SAME)
return true;
if (compute_window_func_values(item_win, tbl, &info))
is_error= true;
}
else if (type == Item_sum::COUNT_FUNC)
{
/*
Frame-aware window function computation. It does one pass, but
uses three cursors -frame_start, current_row, and frame_end.
*/
if (compute_window_func_with_frames(item_win, tbl, &info))
is_error= true;
}
else
DBUG_ASSERT(0);
item_win->set_read_value_from_result_field();
/* This calls filesort_free_buffers(): */
end_read_record(&info);
@ -295,6 +773,9 @@ bool JOIN::process_window_functions(List<Item> *curr_fields_list)
delete join_tab[top_join_tab_count].filesort;
join_tab[top_join_tab_count].filesort= NULL;
free_io_cache(tbl);
if (is_error)
return true;
}
}
}