1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

Implement LEAD and LAG and NTH_VALUE functions

Refactour out (into a copy for now) the logic of Item_sum_hybrid, to
allow for multiple arguments. It does not contain the comparator
members. The result is the class Item_sum_hybrid_simple.

LEAD and LAG make use of this Item to store previous rows in a chache.
It also helps in specifying the field type. Currently LEAD/LAG do not
support default values.

NTH_VALUE behaves identical to LEAD and LAG, except that the starting
position cursor is placed on the top of the frame instead of the current
row.
This commit is contained in:
Vicențiu Ciorbaru
2016-09-22 18:26:55 +02:00
parent 29b227c335
commit 53cf265b3b
9 changed files with 872 additions and 71 deletions

View File

@ -273,7 +273,7 @@ int compare_order_lists(SQL_I_List<ORDER> *part_list1,
return CMP_GT_C;
if (elem2)
return CMP_LT_C;
return CMP_EQ;
return CMP_EQ;
}
@ -686,7 +686,17 @@ public:
if ((res= Table_read_cursor::next()) ||
(res= fetch()))
{
/* TODO(cvicentiu) This does not consider table read failures.
Perhaps assuming end of table like this is fine in that case. */
/* This row is the final row in the table. To maintain semantics
that cursors always point to the last valid row, move back one step,
but mark end_of_partition as true. */
Table_read_cursor::prev();
end_of_partition= true;
return res;
}
if (bound_tracker.compare_with_cache())
{
@ -1886,19 +1896,25 @@ private:
/* A cursor that follows a target cursor. Each time a new row is added,
the window functions are cleared and only have the row at which the target
is point at added to them.
The window functions are cleared if the bounds or the position cursors are
outside computational bounds.
*/
class Frame_positional_cursor : public Frame_cursor
{
public:
Frame_positional_cursor(const Frame_cursor &position_cursor) :
position_cursor(position_cursor), bound(NULL), offset(NULL),
position_cursor(position_cursor), top_bound(NULL),
bottom_bound(NULL), offset(NULL), overflowed(false),
negative_offset(false) {}
Frame_positional_cursor(const Frame_cursor &position_cursor,
const Frame_cursor &bound,
const Frame_cursor &top_bound,
const Frame_cursor &bottom_bound,
Item &offset,
bool negative_offset) :
position_cursor(position_cursor), bound(&bound), offset(&offset),
position_cursor(position_cursor), top_bound(&top_bound),
bottom_bound(&bottom_bound), offset(&offset),
negative_offset(negative_offset) {}
void init(READ_RECORD *info)
@ -1908,35 +1924,26 @@ class Frame_positional_cursor : public Frame_cursor
void pre_next_partition(ha_rows rownum)
{
clear_sum_functions();
/* The offset is dependant on the current row values. We can only get
* it here accurately. When fetching other rows, it changes. */
save_offset_value();
}
void next_partition(ha_rows rownum)
{
ha_rows position= get_current_position();
if (position_is_within_bounds(position))
{
cursor.move_to(position);
cursor.fetch();
add_value_to_items();
}
save_positional_value();
}
void pre_next_row()
{
/* The offset is dependant on the current row values. We can only get
* it here accurately. When fetching other rows, it changes. */
save_offset_value();
}
void next_row()
{
ha_rows position= get_current_position();
if (!position_is_within_bounds(position))
clear_sum_functions();
else
{
cursor.move_to(position_cursor.get_curr_rownum());
cursor.fetch();
add_value_to_items();
}
save_positional_value();
}
ha_rows get_curr_rownum() const
@ -1947,30 +1954,25 @@ class Frame_positional_cursor : public Frame_cursor
private:
/* Check if a our position is within bounds.
* The position is passed as a parameter to avoid recalculating it. */
bool position_is_within_bounds(ha_rows position)
bool position_is_within_bounds()
{
if (!offset)
return !position_cursor.is_outside_computation_bounds();
/* No valid bound to compare to. */
if (position_cursor.is_outside_computation_bounds() ||
bound->is_outside_computation_bounds())
if (overflowed)
return false;
if (negative_offset)
{
if (position_cursor.get_curr_rownum() < position)
return false; /* Overflow below 0. */
if (position < bound->get_curr_rownum()) /* We are over the bound. */
return false;
}
else
{
if (position_cursor.get_curr_rownum() > position)
return false; /* Overflow over MAX_HA_ROWS. */
if (position > bound->get_curr_rownum()) /* We are over the bound. */
return false;
}
/* No valid bound to compare to. */
if (position_cursor.is_outside_computation_bounds() ||
top_bound->is_outside_computation_bounds() ||
bottom_bound->is_outside_computation_bounds())
return false;
/* We are over the bound. */
if (position < top_bound->get_curr_rownum())
return false;
if (position > bottom_bound->get_curr_rownum())
return false;
return true;
}
@ -1978,18 +1980,55 @@ private:
/* Get the current position, accounting for the offset value, if present.
NOTE: This function does not check over/underflow.
*/
ha_rows get_current_position()
void get_current_position()
{
ha_rows position = position_cursor.get_curr_rownum();
position = position_cursor.get_curr_rownum();
overflowed= false;
if (offset)
position += offset->val_int() * (negative_offset ? -1 : 1);
return position;
{
if (offset_value < 0 &&
position + offset_value > position)
{
overflowed= true;
}
if (offset_value > 0 &&
position + offset_value < position)
{
overflowed= true;
}
position += offset_value;
}
}
void save_offset_value()
{
if (offset)
offset_value= offset->val_int() * (negative_offset ? -1 : 1);
else
offset_value= 0;
}
void save_positional_value()
{
get_current_position();
if (!position_is_within_bounds())
clear_sum_functions();
else
{
cursor.move_to(position);
cursor.fetch();
add_value_to_items();
}
}
const Frame_cursor &position_cursor;
const Frame_cursor *bound;
const Frame_cursor *top_bound;
const Frame_cursor *bottom_bound;
Item *offset;
Table_read_cursor cursor;
ha_rows position;
longlong offset_value;
bool overflowed;
bool negative_offset;
};
@ -2107,6 +2146,7 @@ void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
{
Window_spec *spec= window_func->window_spec;
Item_sum *item_sum= window_func->window_func();
DBUG_PRINT("info", ("Get arg count: %d", item_sum->get_arg_count()));
Frame_cursor *fc;
switch (item_sum->sum_func())
{
@ -2135,6 +2175,44 @@ void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
fc->add_sum_func(item_sum);
cursor_manager->add_cursor(fc);
break;
case Item_sum::LEAD_FUNC:
case Item_sum::LAG_FUNC:
{
Frame_cursor *bottom_bound= new Frame_unbounded_following(thd,
spec->partition_list,
spec->order_list);
Frame_cursor *top_bound= new Frame_unbounded_preceding(thd,
spec->partition_list,
spec->order_list);
Frame_cursor *current_row_pos= new Frame_rows_current_row_bottom;
cursor_manager->add_cursor(bottom_bound);
cursor_manager->add_cursor(top_bound);
cursor_manager->add_cursor(current_row_pos);
DBUG_ASSERT(item_sum->fixed);
bool negative_offset= item_sum->sum_func() == Item_sum::LAG_FUNC;
fc= new Frame_positional_cursor(*current_row_pos,
*top_bound, *bottom_bound,
*item_sum->get_arg(1),
negative_offset);
fc->add_sum_func(item_sum);
cursor_manager->add_cursor(fc);
break;
}
case Item_sum::NTH_VALUE_FUNC:
{
Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
cursor_manager->add_cursor(bottom_bound);
cursor_manager->add_cursor(top_bound);
DBUG_ASSERT(item_sum->fixed);
fc= new Frame_positional_cursor(*top_bound,
*top_bound, *bottom_bound,
*item_sum->get_arg(1),
false);
fc->add_sum_func(item_sum);
cursor_manager->add_cursor(fc);
break;
}
default:
fc= new Frame_unbounded_preceding(
thd, spec->partition_list, spec->order_list);