1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-27 18:02:13 +03:00

Make cursor implementation uniform

Cursors now report their current row number as the boundary of the
partition. This is used by Frame_scan_cursor to compute aggregate
functions that do not support removal.
This commit is contained in:
Vicențiu Ciorbaru
2016-09-09 14:46:47 +03:00
parent ffed20c563
commit dfd3be928d
2 changed files with 390 additions and 144 deletions

View File

@ -4,6 +4,7 @@
#include "filesort.h"
#include "sql_base.h"
#include "sql_window.h"
#include "my_dbug.h"
bool
@ -549,29 +550,44 @@ public:
ref_length= info->ref_length;
}
virtual int get_next()
virtual int next()
{
/* Allow multiple get_next() calls in EOF state*/
/* Allow multiple next() calls in EOF state. */
if (cache_pos == cache_end)
return -1;
cache_pos+= ref_length;
DBUG_ASSERT(cache_pos <= cache_end);
return 0;
}
ha_rows get_rownum()
virtual int prev()
{
  /*
    Step the cursor back by one rowid.
    Returns 0 after moving, or -1 (without moving) when the cursor already
    sits on the first cached rowid, so repeated prev() calls at the start
    are harmless.
  */
  if (cache_pos != cache_start)
  {
    cache_pos-= ref_length;
    DBUG_ASSERT(cache_pos >= cache_start);
    return 0;
  }
  return -1;
}
ha_rows get_rownum() const
{
return (cache_pos - cache_start) / ref_length;
}
void move_to(ha_rows row_number)
{
cache_pos= cache_start + row_number * ref_length;
cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
DBUG_ASSERT(cache_pos <= cache_end);
}
protected:
bool at_eof() { return (cache_pos == cache_end); }
uchar *get_last_rowid()
uchar *get_prev_rowid()
{
if (cache_pos == cache_start)
return NULL;
@ -601,30 +617,25 @@ public:
void init(READ_RECORD *info)
{
Rowid_seq_cursor::init(info);
read_record= info;
table= info->table;
record= info->record;
}
virtual int get_next()
virtual int fetch()
{
if (at_eof())
return -1;
uchar* curr_rowid= get_curr_rowid();
int res= Rowid_seq_cursor::get_next();
if (!res)
{
res= read_record->table->file->ha_rnd_pos(read_record->record,
curr_rowid);
}
return res;
return table->file->ha_rnd_pos(record, curr_rowid);
}
bool restore_last_row()
bool fetch_prev_row()
{
uchar *p;
if ((p= get_last_rowid()))
if ((p= get_prev_rowid()))
{
int rc= read_record->table->file->ha_rnd_pos(read_record->record, p);
int rc= table->file->ha_rnd_pos(record, p);
if (!rc)
return true; // restored ok
}
@ -632,11 +643,10 @@ public:
}
private:
/*
Note: we don't own *read_record, somebody else is using it.
We only look at the constant part of it, e.g. table, record buffer, etc.
*/
READ_RECORD *read_record;
/* The table that is accessed by this cursor. */
TABLE *table;
/* Buffer where to store the table's record data. */
uchar *record;
// TODO(spetrunia): should move_to() also read row here?
};
@ -645,6 +655,8 @@ private:
/*
A cursor which only moves within a partition. The scan stops at the partition
end, and it needs an explicit command to move to the next partition.
This cursor can not move backwards.
*/
class Partition_read_cursor : public Table_read_cursor
@ -669,39 +681,36 @@ public:
void on_next_partition(ha_rows rownum)
{
/* Remember the sort key value from the new partition */
move_to(rownum);
bound_tracker.check_if_next_group();
end_of_partition= false;
}
/*
Moves to a new row. The row is assumed to be within the current partition.
*/
void move_to(ha_rows rownum) { Table_read_cursor::move_to(rownum); }
}
/*
This returns -1 when end of partition was reached.
*/
int get_next()
int next()
{
int res;
if (end_of_partition)
return -1;
if ((res= Table_read_cursor::get_next()))
if ((res= Table_read_cursor::next()) ||
(res= fetch()))
return res;
if (bound_tracker.compare_with_cache())
{
/* This row is part of a new partition, don't move
forward any more until we get informed of a new partition. */
Table_read_cursor::prev();
end_of_partition= true;
return -1;
}
return 0;
}
bool restore_last_row()
{
return Table_read_cursor::restore_last_row();
}
private:
Group_bound_tracker bound_tracker;
bool end_of_partition;
@ -783,6 +792,8 @@ public:
virtual void pre_next_row() {};
virtual void next_row()=0;
virtual bool is_outside_computation_bounds() const { return false; };
virtual ~Frame_cursor() {}
/*
@ -796,7 +807,7 @@ public:
}
/* Retrieves the row number that this cursor currently points at. */
virtual ha_rows get_curr_rownum()= 0;
virtual ha_rows get_curr_rownum() const= 0;
protected:
inline void add_value_to_items()
@ -962,7 +973,6 @@ public:
void next_partition(ha_rows rownum)
{
cursor.move_to(rownum);
walk_till_non_peer();
}
@ -982,26 +992,39 @@ public:
(prev_row + n) >= R
We need to check about the current row.
*/
if (cursor.restore_last_row())
{
if (order_direction * range_expr->cmp_read_only() <= 0)
return;
remove_value_from_items();
}
walk_till_non_peer();
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
bool is_outside_computation_bounds() const
{
  /* The bound is outside the computation once end_of_partition is set. */
  return end_of_partition ? true : false;
}
private:
void walk_till_non_peer()
{
if (cursor.fetch()) // ERROR
return;
// Current row is not a peer.
if (order_direction * range_expr->cmp_read_only() <= 0)
return;
remove_value_from_items();
int res;
while (!(res= cursor.get_next()))
while (!(res= cursor.next()))
{
/* Note, no need to fetch the value explicitly here. The partition
read cursor will fetch it to check if the partition has changed.
TODO(cvicentiu) make this piece of information not necessary by
reimplementing Partition_read_cursor.
*/
if (order_direction * range_expr->cmp_read_only() <= 0)
break;
remove_value_from_items();
@ -1009,6 +1032,7 @@ private:
if (res)
end_of_partition= true;
}
};
@ -1049,7 +1073,7 @@ public:
SQL_I_List<ORDER> *order_list,
bool is_preceding_arg, Item *n_val_arg) :
cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
is_preceding(is_preceding_arg)
is_preceding(is_preceding_arg), added_values(false)
{
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
@ -1085,6 +1109,7 @@ public:
cursor.on_next_partition(rownum);
end_of_partition= false;
added_values= false;
}
void next_partition(ha_rows rownum)
@ -1109,25 +1134,38 @@ public:
(prev_row + n) >= R
We need to check about the current row.
*/
if (cursor.restore_last_row())
{
if (order_direction * range_expr->cmp_read_only() < 0)
return;
add_value_to_items();
}
walk_till_non_peer();
}
ha_rows get_curr_rownum()
bool is_outside_computation_bounds() const
{
return cursor.get_rownum();
if (!added_values)
return true;
return false;
}
ha_rows get_curr_rownum() const
{
if (end_of_partition)
return cursor.get_rownum(); // Cursor does not pass over partition bound.
else
return cursor.get_rownum() - 1; // Cursor is placed on first non peer.
}
private:
bool added_values;
void walk_till_non_peer()
{
cursor.fetch();
// Current row is not a peer.
if (order_direction * range_expr->cmp_read_only() < 0)
return;
add_value_to_items(); // Add current row.
added_values= true;
int res;
while (!(res= cursor.get_next()))
while (!(res= cursor.next()))
{
if (order_direction * range_expr->cmp_read_only() < 0)
break;
@ -1179,11 +1217,8 @@ public:
// Save the value of the current_row
peer_tracker.check_if_next_group();
cursor.on_next_partition(rownum);
if (rownum != 0)
{
// Add the current row now because our cursor has already seen it
add_value_to_items();
}
// Add the current row now because our cursor has already seen it
add_value_to_items();
}
void next_partition(ha_rows rownum)
@ -1194,8 +1229,6 @@ public:
void pre_next_row()
{
dont_move= !peer_tracker.check_if_next_group();
if (!dont_move)
add_value_to_items();
}
void next_row()
@ -1213,7 +1246,7 @@ public:
walk_till_non_peer();
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@ -1225,10 +1258,14 @@ private:
Walk forward until we've met first row that's not a peer of the current
row
*/
while (!cursor.get_next())
while (!cursor.next())
{
if (peer_tracker.compare_with_cache())
{
cursor.prev(); // Move to our peer.
break;
}
add_value_to_items();
}
}
@ -1278,7 +1315,7 @@ public:
{
// Fetch the value from the first row
peer_tracker.check_if_next_group();
cursor.move_to(rownum+1);
cursor.move_to(rownum);
}
void next_partition(ha_rows rownum) {}
@ -1298,17 +1335,17 @@ public:
Our cursor is pointing at the first row that was a peer of the previous
current row. Or, it was the first row in the partition.
*/
if (cursor.restore_last_row())
{
// todo: need the following check ?
if (!peer_tracker.compare_with_cache())
return;
remove_value_from_items();
}
if (cursor.fetch())
return;
// todo: need the following check ?
if (!peer_tracker.compare_with_cache())
return;
remove_value_from_items();
do
{
if (cursor.get_next())
if (cursor.next() || cursor.fetch())
return;
if (!peer_tracker.compare_with_cache())
return;
@ -1318,7 +1355,7 @@ public:
}
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@ -1357,7 +1394,7 @@ public:
/* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return curr_rownum;
}
@ -1394,16 +1431,12 @@ public:
void next_partition(ha_rows rownum)
{
if (!rownum)
{
/* Read the first row */
if (cursor.get_next())
return;
}
/* Activate the first row */
cursor.fetch();
add_value_to_items();
/* Walk to the end of the partition, updating the SUM function */
while (!cursor.get_next())
while (!cursor.next())
{
add_value_to_items();
}
@ -1414,7 +1447,7 @@ public:
/* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@ -1432,16 +1465,12 @@ public:
void next_partition(ha_rows rownum)
{
ha_rows num_rows_in_partition= 0;
if (!rownum)
{
/* Read the first row */
if (cursor.get_next())
return;
}
if (cursor.fetch())
return;
num_rows_in_partition++;
/* Walk to the end of the partition, find how many rows there are. */
while (!cursor.get_next())
while (!cursor.next())
num_rows_in_partition++;
List_iterator_fast<Item_sum> it(sum_functions);
@ -1454,7 +1483,7 @@ public:
}
}
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@ -1474,12 +1503,12 @@ class Frame_n_rows_preceding : public Frame_cursor
const ha_rows n_rows;
/* Number of rows that we need to skip before our cursor starts moving */
ha_rows n_rows_to_skip;
ha_rows n_rows_behind;
Table_read_cursor cursor;
public:
Frame_n_rows_preceding(bool is_top_bound_arg, ha_rows n_rows_arg) :
is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), n_rows_behind(0)
{}
void init(READ_RECORD *info)
@ -1493,8 +1522,9 @@ public:
Position our cursor to point at the first row in the new partition
(for rownum=0, it is already there, otherwise, it lags behind)
*/
if (rownum != 0)
cursor.move_to(rownum);
cursor.move_to(rownum);
/* Cursor is in the same spot as current row. */
n_rows_behind= 0;
/*
Suppose the bound is ROWS 2 PRECEDING, and current row is row#n:
@ -1508,37 +1538,65 @@ public:
- bottom bound should add row #(n-2) into the window function
- top bound should remove row (#n-3) from the window function.
*/
n_rows_to_skip= n_rows + (is_top_bound? 1:0) - 1;
move_cursor_if_possible();
/* Bottom bound "ROWS 0 PRECEDING" is a special case: */
if (n_rows_to_skip == ha_rows(-1))
{
cursor.get_next();
add_value_to_items();
n_rows_to_skip= 0;
}
}
void next_row()
{
if (n_rows_to_skip)
n_rows_behind++;
move_cursor_if_possible();
}
bool is_outside_computation_bounds() const
{
/* As a bottom boundary, rows have not yet been added. */
if (!is_top_bound && n_rows - n_rows_behind)
return true;
return false;
}
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
private:
void move_cursor_if_possible()
{
int rows_difference= n_rows - n_rows_behind;
if (rows_difference > 0) /* We still have to wait. */
return;
/* The cursor points to the first row in the frame. */
if (rows_difference == 0)
{
n_rows_to_skip--;
if (!is_top_bound)
{
cursor.fetch();
add_value_to_items();
}
/* For top bound we don't have to remove anything as nothing was added. */
return;
}
if (cursor.get_next())
return; // this is not expected to happen.
/* We need to catch up by one row. */
DBUG_ASSERT(rows_difference == -1);
if (is_top_bound) // this is frame start endpoint
if (is_top_bound)
{
cursor.fetch();
remove_value_from_items();
cursor.next();
}
else
{
cursor.next();
cursor.fetch();
add_value_to_items();
}
ha_rows get_curr_rownum()
{
return cursor.get_rownum();
}
/* We've advanced one row. We are no longer behind. */
n_rows_behind--;
}
};
@ -1559,6 +1617,7 @@ public:
void pre_next_partition(ha_rows rownum)
{
add_value_to_items();
curr_rownum= rownum;
}
void next_partition(ha_rows rownum) {}
@ -1574,7 +1633,7 @@ public:
curr_rownum++;
};
ha_rows get_curr_rownum()
ha_rows get_curr_rownum() const
{
return curr_rownum;
}
@ -1643,56 +1702,197 @@ public:
at_partition_end= false;
cursor.on_next_partition(rownum);
if (rownum != 0)
{
// This is only needed for "FOLLOWING 1". It is one row behind
cursor.move_to(rownum+1);
// Current row points at the first row in the partition
if (is_top_bound) // this is frame top endpoint
remove_value_from_items();
else
add_value_to_items();
}
}
/* Move our cursor to be n_rows ahead. */
void next_partition(ha_rows rownum)
{
ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
for (ha_rows i= 0; i < i_end; i++)
{
if (next_row_intern())
break;
}
if (is_top_bound)
next_part_top(rownum);
else
next_part_bottom(rownum);
}
void next_row()
{
if (at_partition_end)
return;
next_row_intern();
if (is_top_bound)
next_row_top();
else
next_row_bottom();
}
ha_rows get_curr_rownum()
bool is_outside_computation_bounds() const
{
  /*
    Only the top bound can go over the current partition. When that
    happens, the sum function has 0 values added to it, so the bound is
    reported as outside the computation.
  */
  return at_partition_end && is_top_bound;
}
ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
private:
bool next_row_intern()
void next_part_top(ha_rows rownum)
{
if (!cursor.get_next())
for (ha_rows i= 0; i < n_rows; i++)
{
if (is_top_bound) // this is frame start endpoint
remove_value_from_items();
else
add_value_to_items();
if (cursor.fetch())
break;
remove_value_from_items();
if (cursor.next())
at_partition_end= true;
}
else
}
void next_part_bottom(ha_rows rownum)
{
  /*
    Position the bottom bound for a new partition: add the row the cursor
    currently points at, then advance up to n_rows more rows, adding each
    one. The rownum argument is not used here.
    NOTE(review): no explicit fetch() follows cursor.next(); presumably the
    cursor's next() already reads the row - confirm against the cursor type.
  */
  if (cursor.fetch())
    return;

  add_value_to_items();

  for (ha_rows remaining= n_rows; remaining != 0; remaining--)
  {
    if (cursor.next())
    {
      /* Could not advance any further; remember that and stop. */
      at_partition_end= true;
      return;
    }
    add_value_to_items();
  }
}
void next_row_top()
{
if (cursor.fetch()) // PART END OR FAILURE
{
at_partition_end= true;
return at_partition_end;
return;
}
remove_value_from_items();
if (cursor.next())
{
at_partition_end= true;
return;
}
}
void next_row_bottom()
{
  /*
    Advance the bottom bound by one row and add that row's value. Once the
    cursor fails to advance, at_partition_end is set and later calls do
    nothing.
  */
  if (!at_partition_end)
  {
    if (!cursor.next())
      add_value_to_items();
    else
      at_partition_end= true;
  }
}
};
/*
A cursor that performs a table scan between two indices. The indices
are provided by the two cursors representing the top and bottom bound
of the window function's frame definition.
Each scan clears the sum function.
NOTE:
The cursor does not alter the top and bottom cursors.
This type of cursor is computationally expensive. It is only to be
used when the sum functions do not support removal.
*/
class Frame_scan_cursor : public Frame_cursor
{
public:
Frame_scan_cursor(const Frame_cursor &top_bound,
const Frame_cursor &bottom_bound) :
top_bound(top_bound), bottom_bound(bottom_bound) {}
void init(READ_RECORD *info)
{
cursor.init(info);
}
void pre_next_partition(ha_rows rownum)
{
/* TODO(cvicentiu) Sum functions get cleared on next partition anyway during
the window function computation algorithm. Either perform this only in
cursors, or remove it from pre_next_partition.
*/
curr_rownum= rownum;
clear_sum_functions();
}
void next_partition(ha_rows rownum)
{
compute_values_for_current_row();
}
void pre_next_row()
{
clear_sum_functions();
}
void next_row()
{
curr_rownum++;
compute_values_for_current_row();
}
ha_rows get_curr_rownum() const
{
return curr_rownum;
}
private:
const Frame_cursor &top_bound;
const Frame_cursor &bottom_bound;
Table_read_cursor cursor;
ha_rows curr_rownum;
/* Clear all sum functions handled by this cursor. */
void clear_sum_functions()
{
List_iterator_fast<Item_sum> iter_sum_func(sum_functions);
Item_sum *sum_func;
while ((sum_func= iter_sum_func++))
{
sum_func->clear();
}
}
/* Scan the rows between the top bound and bottom bound. Add all the values
between them, top bound row and bottom bound row inclusive. */
void compute_values_for_current_row()
{
if (top_bound.is_outside_computation_bounds() ||
bottom_bound.is_outside_computation_bounds())
return;
ha_rows start_rownum= top_bound.get_curr_rownum();
ha_rows bottom_rownum= bottom_bound.get_curr_rownum();
DBUG_PRINT("info", ("COMPUTING (%llu %llu)", start_rownum, bottom_rownum));
cursor.move_to(start_rownum);
for (ha_rows idx= start_rownum; idx <= bottom_rownum; idx++)
{
if (cursor.fetch()) //EOF
break;
add_value_to_items();
if (cursor.next()) // EOF
break;
}
}
};
@ -1836,6 +2036,20 @@ void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
}
/*
  Returns false for CUME_DIST, ROW_NUMBER, RANK, DENSE_RANK and NTILE,
  and true for every other sum function type.
*/
static bool is_computed_with_remove(Item_sum::Sumfunctype sum_func)
{
  const bool is_excluded_type=
    sum_func == Item_sum::CUME_DIST_FUNC ||
    sum_func == Item_sum::ROW_NUMBER_FUNC ||
    sum_func == Item_sum::RANK_FUNC ||
    sum_func == Item_sum::DENSE_RANK_FUNC ||
    sum_func == Item_sum::NTILE_FUNC;
  return !is_excluded_type;
}
/*
Create required frame cursors for the list of window functions.
Register all functions to their appropriate cursors.
@ -1896,6 +2110,17 @@ void get_window_functions_required_cursors(
*/
cursor_manager->add_cursor(frame_bottom);
cursor_manager->add_cursor(frame_top);
if (is_computed_with_remove(sum_func->sum_func()) &&
!sum_func->supports_removal())
{
frame_bottom->set_no_action();
frame_top->set_no_action();
Frame_cursor *scan_cursor= new Frame_scan_cursor(*frame_top,
*frame_bottom);
scan_cursor->add_sum_func(sum_func);
cursor_manager->add_cursor(scan_cursor);
}
cursor_managers->push_back(cursor_manager);
}
}