From 1fa12cdfb7d580955d664f6a842f7a2ab6363ad4 Mon Sep 17 00:00:00 2001 From: Sergei Petrunia Date: Sun, 6 Mar 2016 23:10:20 +0300 Subject: [PATCH] MDEV-9676: RANGE-type frames for window functions Support RANGE ... CURRENT ROW as frame's first and second bound. --- mysql-test/r/win.result | 108 +++++++++++++ mysql-test/t/win.test | 83 ++++++++++ sql/item.h | 29 ++-- sql/item_buff.cc | 85 ++++++++++ sql/sql_window.cc | 347 ++++++++++++++++++++++++++++++++-------- 5 files changed, 576 insertions(+), 76 deletions(-) diff --git a/mysql-test/r/win.result b/mysql-test/r/win.result index e7900bf8f09..6f79370cf95 100644 --- a/mysql-test/r/win.result +++ b/mysql-test/r/win.result @@ -460,3 +460,111 @@ part_id pk a CNT 1 8 1 3 1 9 1 2 drop table t0, t2; +# +# RANGE-type bounds +# +create table t3 ( +pk int, +val int +); +insert into t3 values +(0, 1), +(1, 1), +(2, 1), +(3, 2), +(4, 2), +(5, 2), +(6, 2); +select +pk, +val, +count(val) over (order by val +range between current row and +current row) +as CNT +from t3; +pk val CNT +0 1 3 +1 1 3 +2 1 3 +3 2 4 +4 2 4 +5 2 4 +6 2 4 +insert into t3 values +(7, 3), +(8, 3); +select +pk, +val, +count(val) over (order by val +range between current row and +current row) +as CNT +from t3; +pk val CNT +0 1 3 +1 1 3 +2 1 3 +3 2 4 +4 2 4 +5 2 4 +6 2 4 +7 3 2 +8 3 2 +drop table t3; +# Now, check with PARTITION BY +create table t4 ( +part_id int, +pk int, +val int +); +insert into t4 values +(1234, 100, 1), +(1234, 101, 1), +(1234, 102, 1), +(1234, 103, 2), +(1234, 104, 2), +(1234, 105, 2), +(1234, 106, 2), +(1234, 107, 3), +(1234, 108, 3), +(5678, 200, 1), +(5678, 201, 1), +(5678, 202, 1), +(5678, 203, 2), +(5678, 204, 2), +(5678, 205, 2), +(5678, 206, 2), +(5678, 207, 3), +(5678, 208, 3); +select +part_id, +pk, +val, +count(val) over (partition by part_id +order by val +range between current row and +current row) +as CNT +from t4; +part_id pk val CNT +1234 100 1 3 +1234 101 1 3 +1234 102 1 3 +1234 103 2 4 +1234 104 2 4 +1234 105 2 4 +1234 106 2 4 +1234 107 3 2 +1234 108 3 2 +5678 200 1 3 +5678 201 1 3 +5678 202 1 3 +5678 203 2 4 +5678 204 2 4 +5678 205 2 4 +5678 206 2 4 +5678 207 3 2 +5678 208 3 2 +drop table t4; diff --git a/mysql-test/t/win.test b/mysql-test/t/win.test index b5dfa20c34b..e92ab912cc1 100644 --- a/mysql-test/t/win.test +++ b/mysql-test/t/win.test @@ -303,3 +303,86 @@ from t2; drop table t0, t2; +--echo # +--echo # RANGE-type bounds +--echo # + +create table t3 ( + pk int, + val int +); + +insert into t3 values +(0, 1), +(1, 1), +(2, 1), +(3, 2), +(4, 2), +(5, 2), +(6, 2); + +select + pk, + val, + count(val) over (order by val + range between current row and + current row) + as CNT +from t3; + +insert into t3 values +(7, 3), +(8, 3); + +select + pk, + val, + count(val) over (order by val + range between current row and + current row) + as CNT +from t3; + +drop table t3; + +--echo # Now, check with PARTITION BY +create table t4 ( + part_id int, + pk int, + val int +); + +insert into t4 values +(1234, 100, 1), +(1234, 101, 1), +(1234, 102, 1), +(1234, 103, 2), +(1234, 104, 2), +(1234, 105, 2), +(1234, 106, 2), +(1234, 107, 3), +(1234, 108, 3), + +(5678, 200, 1), +(5678, 201, 1), +(5678, 202, 1), +(5678, 203, 2), +(5678, 204, 2), +(5678, 205, 2), +(5678, 206, 2), +(5678, 207, 3), +(5678, 208, 3); + +select + part_id, + pk, + val, + count(val) over (partition by part_id + order by val + range between current row and + current row) + as CNT +from t4; + +drop table t4; + diff --git a/sql/item.h b/sql/item.h index 46d09c247fe..d02a3142f23 100644 --- a/sql/item.h +++ b/sql/item.h @@ -4778,17 +4778,10 @@ public: - cmp() method that compares the saved value with the current value of the source item, and if they were not equal saves item's value into the saved value. -*/ -/* - Cached_item_XXX objects are not exactly caches. They do the following: - - Each Cached_item_XXX object has - - its source item - - saved value of the source item - - cmp() method that compares the saved value with the current value of the - source item, and if they were not equal saves item's value into the saved - value. + TODO: add here: + - a way to save the new value w/o comparison + - a way to do less/equal/greater comparison */ class Cached_item :public Sql_alloc @@ -4796,7 +4789,18 @@ class Cached_item :public Sql_alloc public: bool null_value; Cached_item() :null_value(0) {} + /* + Compare the cached value with the source value. If not equal, copy + the source value to the cache. + @return + true - Not equal + false - Equal + */ virtual bool cmp(void)=0; + + /* Compare the cached value with the source value, without copying */ + virtual int cmp_read_only()=0; + virtual ~Cached_item(); /*line -e1509 */ }; @@ -4808,6 +4812,7 @@ class Cached_item_str :public Cached_item public: Cached_item_str(THD *thd, Item *arg); bool cmp(void); + int cmp_read_only(); ~Cached_item_str(); // Deallocate String:s }; @@ -4819,6 +4824,7 @@ class Cached_item_real :public Cached_item public: Cached_item_real(Item *item_par) :item(item_par),value(0.0) {} bool cmp(void); + int cmp_read_only(); }; class Cached_item_int :public Cached_item @@ -4828,6 +4834,7 @@ class Cached_item_int :public Cached_item public: Cached_item_int(Item *item_par) :item(item_par),value(0) {} bool cmp(void); + int cmp_read_only(); }; @@ -4838,6 +4845,7 @@ class Cached_item_decimal :public Cached_item public: Cached_item_decimal(Item *item_par); bool cmp(void); + int cmp_read_only(); }; class Cached_item_field :public Cached_item @@ -4854,6 +4862,7 @@ public: buff= (uchar*) thd_calloc(thd, length= field->pack_length()); } bool cmp(void); + int cmp_read_only(); }; class Item_default_value : public Item_field diff --git a/sql/item_buff.cc b/sql/item_buff.cc index 62c2f76dc2e..3798aad06f0 100644 --- a/sql/item_buff.cc +++ b/sql/item_buff.cc @@ -98,6 +98,25 @@ bool Cached_item_str::cmp(void) return tmp; } + +int Cached_item_str::cmp_read_only() +{ + String *res= item->val_str(&tmp_value); + + if (null_value) + { + if (item->null_value) + return 0; + else + return -1; + } + if (item->null_value) + return 1; + + return sortcmp(&value, res, item->collation.collation); +} + + Cached_item_str::~Cached_item_str() { item=0; // Safety @@ -115,6 +134,23 @@ bool Cached_item_real::cmp(void) return FALSE; } + +int Cached_item_real::cmp_read_only() +{ + double nr= item->val_real(); + if (null_value) + { + if (item->null_value) + return 0; + else + return -1; + } + if (item->null_value) + return 1; + return (nr == value)? 0 : ((nr < value)? 1: -1); +} + + bool Cached_item_int::cmp(void) { longlong nr=item->val_int(); @@ -128,6 +164,22 @@ bool Cached_item_int::cmp(void) } +int Cached_item_int::cmp_read_only() +{ + longlong nr= item->val_int(); + if (null_value) + { + if (item->null_value) + return 0; + else + return -1; + } + if (item->null_value) + return 1; + return (nr == value)? 0 : ((nr < value)? 1: -1); +} + + bool Cached_item_field::cmp(void) { bool tmp= FALSE; // Value is identical @@ -148,6 +200,22 @@ bool Cached_item_field::cmp(void) } +int Cached_item_field::cmp_read_only() +{ + if (null_value) + { + if (field->is_null()) + return 0; + else + return -1; + } + if (field->is_null()) + return 1; + + return field->cmp(buff); +} + + Cached_item_decimal::Cached_item_decimal(Item *it) :item(it) { @@ -174,3 +242,20 @@ bool Cached_item_decimal::cmp() return FALSE; } + +int Cached_item_decimal::cmp_read_only() +{ + my_decimal tmp; + my_decimal *ptmp= item->val_decimal(&tmp); + if (null_value) + { + if (item->null_value) + return 0; + else + return -1; + } + if (item->null_value) + return 1; + return my_decimal_cmp(&value, ptmp); +} + diff --git a/sql/sql_window.cc b/sql/sql_window.cc index 29fa938a2a6..9d21b1fd164 100644 --- a/sql/sql_window.cc +++ b/sql/sql_window.cc @@ -221,6 +221,15 @@ public: } protected: bool at_eof() { return (cache_pos == cache_end); } + + uchar *get_last_rowid() + { + if (cache_pos == cache_start) + return NULL; + else + return cache_pos - ref_length; + } + uchar *get_curr_rowid() { return cache_pos; } }; @@ -259,6 +268,18 @@ public: return res; } + bool restore_last_row() + { + uchar *p; + if ((p= get_last_rowid())) + { + int rc= read_record->table->file->ha_rnd_pos(read_record->record, p); + if (!rc) + return true; + } + return false; + } + // todo: should move_to() also read row here? }; @@ -294,6 +315,19 @@ public: return true; return false; } + + int compare_with_cache() + { + List_iterator li(group_fields); + Cached_item *ptr; + int res; + while ((ptr= li++)) + { + if ((res= ptr->cmp_read_only())) + return res; + } + return 0; + } }; @@ -316,7 +350,8 @@ class Frame_cursor : public Sql_alloc { public: virtual void init(THD *thd, READ_RECORD *info, - SQL_I_List *partition_list) + SQL_I_List *partition_list, + SQL_I_List *order_list) {} /* @@ -335,18 +370,178 @@ public: - The callee may move tbl->file and tbl->record[0] to point to some other row. */ - virtual void next_partition(bool first, Item_sum* item)=0; + virtual void pre_next_partition(longlong rownum, Item_sum* item){}; + virtual void next_partition(longlong rownum, Item_sum* item)=0; /* The current row has moved one row forward. Move this frame bound accordingly, and update the value of aggregate function as necessary. */ + virtual void pre_next_row(Item_sum* item){}; virtual void next_row(Item_sum* item)=0; virtual ~Frame_cursor(){} }; +// +// RANGE-type frames +// + +/* + RANGE BETWEEN ... AND CURRENT ROW + + This is a bottom endpoint of RANGE-CURRENT ROW frame. + + It moves ahead of the current_row. It is located just in front of the first + peer of the currrent_row. +*/ + +class Frame_range_current_row_bottom: public Frame_cursor +{ + Table_read_cursor cursor; + Group_bound_tracker peer_tracker; + + bool dont_move; +public: + void init(THD *thd, READ_RECORD *info, + SQL_I_List *partition_list, + SQL_I_List *order_list) + { + cursor.init(info); + peer_tracker.init(thd, order_list); + } + + void pre_next_partition(longlong rownum, Item_sum* item) + { + // Save the value of the current_row + peer_tracker.check_if_next_group(); + if (rownum != 0) + item->add(); // current row is in + } + + void next_partition(longlong rownum, Item_sum* item) + { + walk_till_non_peer(item); + } + + void pre_next_row(Item_sum* item) + { + // Check if our cursor is pointing at a peer of the current row. + // If not, move forward until that becomes true + dont_move= !peer_tracker.check_if_next_group(); + if (!dont_move) + item->add(); + } + // New condition: this now assumes that table's current + // row is pointing to the current_row's position + void next_row(Item_sum* item) + { + // Check if our cursor is pointing at a peer of the current row. + // If not, move forward until that becomes true + if (dont_move) + { + /* + Our current is not a peer of the current row. + No need to move the bound. + */ + return; + } + walk_till_non_peer(item); + } + +private: + void walk_till_non_peer(Item_sum* item) + { + /* + Walk forward until we've met first row that's not a peer of the current + row + */ + while (!cursor.get_next()) + { + if (peer_tracker.compare_with_cache()) + break; + item->add(); + } + } + +}; + + +/* + RANGE BETWEEN CURRENT ROW AND ... + + This is a top endpoint of RANGE-CURRENT ROW frame. + + It moves behind the current_row. It is located right after the first peer of + the current_row. +*/ + +class Frame_range_current_row_top : public Frame_cursor +{ + Group_bound_tracker bound_tracker; + + Table_read_cursor cursor; + Group_bound_tracker peer_tracker; + + bool move; + bool at_partition_start; +public: + void init(THD *thd, READ_RECORD *info, + SQL_I_List *partition_list, + SQL_I_List *order_list) + { + bound_tracker.init(thd, partition_list); + + cursor.init(info); + peer_tracker.init(thd, order_list); + } + + void pre_next_partition(longlong rownum, Item_sum* item) + { + // fetch the value from the first row + peer_tracker.check_if_next_group(); + } + + void next_partition(longlong rownum, Item_sum* item) + { + at_partition_start= true; + cursor.move_to(rownum+1); + } + + void pre_next_row(Item_sum* item) + { + // Check if our current row is pointing to a peer of the current row. + // If not, move forward until that becomes true. + move= peer_tracker.check_if_next_group(); + } + + void next_row(Item_sum* item) + { + bool was_at_partition_start= at_partition_start; + at_partition_start= false; + if (move) + { + if (!was_at_partition_start && + cursor.restore_last_row()) + { + item->remove(); + } + + do + { + if (cursor.get_next()) + return; + if (!peer_tracker.compare_with_cache()) + return; + item->remove(); + } + while (1); + } + } +}; + + ////////////////////////////////////////////////////////////////////////////////// /* UNBOUNDED PRECEDING frame bound @@ -354,7 +549,7 @@ public: class Frame_unbounded_preceding : public Frame_cursor { public: - void next_partition(bool first, Item_sum* item) + void next_partition(longlong rownum, Item_sum* item) { /* UNBOUNDED PRECEDING frame end just stays on the first row. @@ -378,15 +573,16 @@ class Frame_unbounded_following : public Frame_cursor Group_bound_tracker bound_tracker; public: - void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list) + void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list, + SQL_I_List *order_list) { cursor.init(info); bound_tracker.init(thd, partition_list); } - void next_partition(bool first, Item_sum* item) + void next_partition(longlong rownum, Item_sum* item) { - if (first) + if (!rownum) { /* Read the first row */ if (cursor.get_next()) @@ -407,7 +603,7 @@ public: void next_row(Item_sum* item) { - /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't */ + /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */ } }; @@ -436,7 +632,8 @@ public: is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), is_preceding(is_preceding_arg) {} - void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list) + void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list, + SQL_I_List *order_list) { cursor.init(info); cursor_eof= false; @@ -444,14 +641,14 @@ public: bound_tracker.init(thd, partition_list); } - void next_partition(bool first, Item_sum* item) + void next_partition(longlong rownum, Item_sum* item) { cursor_eof= false; at_partition_start= true; at_partition_end= false; if (is_preceding) { - if (!first) + if (rownum != 0) { /* The cursor in "ROWS n PRECEDING" lags behind by n_rows rows. @@ -474,9 +671,10 @@ public: */ n_rows_to_skip= 0; - if (!first && (!is_top_bound || n_rows)) + if ((rownum != 0) && (!is_top_bound || n_rows)) { - // We are positioned at the first row in the partition: + // We are positioned at the first row in the partition anyway + //cursor.restore_cur_row(); if (is_top_bound) // this is frame top endpoint item->remove(); else @@ -485,13 +683,18 @@ public: /* Note: i_end=-1 when this is a top-endpoint "CURRENT ROW" which is implemented as "ROWS 0 FOLLOWING". - */ - longlong i_end= n_rows + (first?1:0)- is_top_bound; + */ + longlong i_end= n_rows + ((rownum==0)?1:0)- is_top_bound; for (longlong i= 0; i < i_end; i++) { if (next_row_intern(item)) break; } + if (i_end == -1) + { + if (!cursor.get_next()) + bound_tracker.check_if_next_group(); + } } } @@ -547,29 +750,49 @@ public: }; -Frame_cursor *get_frame_cursor(Window_frame_bound *bound, bool is_top_bound) +Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) { + Window_frame_bound *bound= is_top_bound? frame->top_bound : + frame->bottom_bound; + if (bound->precedence_type == Window_frame_bound::PRECEDING || - bound->precedence_type == Window_frame_bound::FOLLOWING) + bound->precedence_type == Window_frame_bound::FOLLOWING) { bool is_preceding= (bound->precedence_type == Window_frame_bound::PRECEDING); if (bound->offset == NULL) /* this is UNBOUNDED */ { + /* The following serve both RANGE and ROWS: */ if (is_preceding) return new Frame_unbounded_preceding; else return new Frame_unbounded_following; } - longlong n_rows= bound->offset->val_int(); - return new Frame_n_rows(is_top_bound, is_preceding, n_rows); + if (frame->units == Window_frame::UNITS_ROWS) + { + longlong n_rows= bound->offset->val_int(); + return new Frame_n_rows(is_top_bound, is_preceding, n_rows); + } + else + { + // todo: Frame_range_n_rows here . + DBUG_ASSERT(0); + } } if (bound->precedence_type == Window_frame_bound::CURRENT) { - return new Frame_current_row(is_top_bound); + if (frame->units == Window_frame::UNITS_ROWS) + return new Frame_current_row(is_top_bound); + else + { + if (is_top_bound) + return new Frame_range_current_row_top; + else + return new Frame_range_current_row_bottom; + } } return NULL; } @@ -624,71 +847,63 @@ bool compute_window_func_with_frames(Item_window_func *item_win, sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); Window_frame *window_frame= item_win->window_spec->window_frame; - DBUG_ASSERT(window_frame->units == Window_frame::UNITS_ROWS); - top_bound= get_frame_cursor(window_frame->top_bound, true); - bottom_bound= get_frame_cursor(window_frame->bottom_bound, false); + top_bound= get_frame_cursor(window_frame, true); + bottom_bound= get_frame_cursor(window_frame, false); - top_bound->init(thd, info, &item_win->window_spec->partition_list); - bottom_bound->init(thd, info, &item_win->window_spec->partition_list); + top_bound->init(thd, info, &item_win->window_spec->partition_list, + &item_win->window_spec->order_list); + bottom_bound->init(thd, info, &item_win->window_spec->partition_list, + &item_win->window_spec->order_list); bool is_error= false; - bool first_row= true; + longlong rownum= 0; uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0)); while (true) { - if (first_row) + /* Move the current_row */ + if ((err=info->read_record(info))) + { + break; /* End of file */ + } + bool partition_changed= (item_win->check_partition_bound() > -1)? true: + false; + tbl->file->position(tbl->record[0]); + memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length); + + /* Adjust partition bounds */ + + if (partition_changed || (rownum == 0)) { /* Start the first partition */ sum_func->clear(); - bottom_bound->next_partition(true, sum_func); - top_bound->next_partition(true, sum_func); + bottom_bound->pre_next_partition(rownum, sum_func); + top_bound->pre_next_partition(rownum, sum_func); + /* + We move bottom_bound first, because we want rows to be added into the + aggregate before top_bound attempts to remove them. + */ + bottom_bound->next_partition(rownum, sum_func); + top_bound->next_partition(rownum, sum_func); } else { + bottom_bound->pre_next_row(sum_func); + top_bound->pre_next_row(sum_func); + /* These can write into tbl->record[0] */ bottom_bound->next_row(sum_func); top_bound->next_row(sum_func); } - - if ((err=info->read_record(info))) - { - /* End of file */ - break; - } - - store_record(tbl,record[1]); - bool partition_changed= (item_win->check_partition_bound() > -1)? true: - false; - if (!first_row && partition_changed) - { - sum_func->clear(); - tbl->file->position(tbl->record[0]); - memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length); - /* - Ok, the current row is the first row in the new partition. - We move bottom_bound first, because we want rows to be added into the - aggregate before top_bound attempts to remove them. - */ - bottom_bound->next_partition(false, sum_func); - - /* - The problem is, the above call may have made tbl->record[0] to point to - some other record. - */ - tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); - top_bound->next_partition(false, sum_func); + rownum++; - /* - The same problem again. The above call may have moved table's current - record. We need to make the current row current, so that ha_update_row - call below updates the right row. - */ - tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); - } - first_row= false; - - /* Read the current row and update it */ + /* + The bounds may have made tbl->record[0] to point to some record other + than current_row. This applies to tbl->file's internal state, too. + Fix this by reading the current row again. + */ + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); + store_record(tbl,record[1]); item_win->save_in_field(item_win->result_field, true); err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); if (err && err != HA_ERR_RECORD_IS_THE_SAME)