diff --git a/mysql-test/r/win.result b/mysql-test/r/win.result index f6cc0055c0e..3d770370f65 100644 --- a/mysql-test/r/win.result +++ b/mysql-test/r/win.result @@ -136,3 +136,120 @@ pk a b rank() over (partition by b order by a) dense_rank() over (partition by b 9 4 20 4 2 10 4 20 4 2 drop table t3; +# +# Try Aggregates as window functions. With frames. +# +create table t0 (a int); +insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); +create table t1 (pk int, c int); +insert into t1 select a+1,1 from t0; +update t1 set c=2 where pk not in (1,2,3,4); +select * from t1; +pk c +1 1 +2 1 +3 1 +4 1 +5 2 +6 2 +7 2 +8 2 +9 2 +10 2 +select +pk, c, +count(*) over (partition by c order by pk +rows between 2 preceding and 2 following) as CNT +from t1; +pk c CNT +1 1 3 +2 1 4 +3 1 4 +4 1 3 +5 2 3 +6 2 4 +7 2 5 +8 2 5 +9 2 4 +10 2 3 +select +pk, c, +count(*) over (partition by c order by pk +rows between 1 preceding and 2 following) as CNT +from t1; +pk c CNT +1 1 3 +2 1 4 +3 1 3 +4 1 2 +5 2 3 +6 2 4 +7 2 4 +8 2 4 +9 2 3 +10 2 2 +select +pk, c, +count(*) over (partition by c order by pk +rows between 2 preceding and current row) as CNT +from t1; +pk c CNT +1 1 1 +2 1 2 +3 1 3 +4 1 3 +5 2 1 +6 2 2 +7 2 3 +8 2 3 +9 2 3 +10 2 3 +select +pk,c, +count(*) over (partition by c order by pk rows +between 1 following and 2 following) as CNT +from t1; +pk c CNT +1 1 2 +2 1 2 +3 1 1 +4 1 0 +5 2 2 +6 2 2 +7 2 2 +8 2 2 +9 2 1 +10 2 0 +select +pk,c, +count(*) over (partition by c order by pk rows +between 2 preceding and 1 preceding) as CNT +from t1; +pk c CNT +1 1 0 +2 1 1 +3 1 2 +4 1 2 +5 2 0 +6 2 1 +7 2 2 +8 2 2 +9 2 2 +10 2 2 +select +pk, c, +count(*) over (partition by c order by pk +rows between current row and 1 following) as CNT +from t1; +pk c CNT +1 1 2 +2 1 2 +3 1 2 +4 1 1 +5 2 2 +6 2 2 +7 2 2 +8 2 2 +9 2 2 +10 2 1 +drop table t0,t1; diff --git a/mysql-test/t/win.test b/mysql-test/t/win.test index d7ed2e06f10..bbb706d5421 100644 --- a/mysql-test/t/win.test +++ b/mysql-test/t/win.test @@ -110,3 +110,52 @@ select pk, a, b, rank() over (order by a), dense_rank() over (order by a) from t select pk, a, b, rank() over (partition by b order by a), dense_rank() over (partition by b order by a) from t3; drop table t3; + +--echo # +--echo # Try Aggregates as window functions. With frames. +--echo # +create table t0 (a int); +insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); + +create table t1 (pk int, c int); +insert into t1 select a+1,1 from t0; +update t1 set c=2 where pk not in (1,2,3,4); +select * from t1; + +select + pk, c, + count(*) over (partition by c order by pk + rows between 2 preceding and 2 following) as CNT +from t1; + +select + pk, c, + count(*) over (partition by c order by pk + rows between 1 preceding and 2 following) as CNT +from t1; + +select + pk, c, + count(*) over (partition by c order by pk + rows between 2 preceding and current row) as CNT +from t1; + +select + pk,c, + count(*) over (partition by c order by pk rows + between 1 following and 2 following) as CNT +from t1; + +select + pk,c, + count(*) over (partition by c order by pk rows + between 2 preceding and 1 preceding) as CNT +from t1; + +select + pk, c, + count(*) over (partition by c order by pk + rows between current row and 1 following) as CNT +from t1; +drop table t0,t1; + diff --git a/sql/item_sum.cc b/sql/item_sum.cc index cb4cb802bc5..91abea866b6 100644 --- a/sql/item_sum.cc +++ b/sql/item_sum.cc @@ -1531,6 +1531,18 @@ bool Item_sum_count::add() return 0; } + +/* + Remove a row. This is used by window functions. +*/ + +void Item_sum_count::remove() +{ + /* TODO: Handle DECIMAL type */ + DBUG_ASSERT(aggr->Aggrtype() == Aggregator::SIMPLE_AGGREGATOR); + count--; +} + longlong Item_sum_count::val_int() { DBUG_ASSERT(fixed == 1); diff --git a/sql/item_sum.h b/sql/item_sum.h index e0e74efef28..c01735b00ff 100644 --- a/sql/item_sum.h +++ b/sql/item_sum.h @@ -545,6 +545,9 @@ public: virtual void clear()= 0; virtual bool add()= 0; virtual bool setup(THD *thd) { return false; } + + //psergey: for window functions: + virtual void remove() { DBUG_ASSERT(0); } virtual void cleanup(); bool check_vcol_func_processor(uchar *int_arg) @@ -784,6 +787,8 @@ class Item_sum_count :public Item_sum_int void clear(); bool add(); void cleanup(); + // psergey-added: + void remove(); public: Item_sum_count(THD *thd, Item *item_par): diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc index fbe9a728765..66aaec7280e 100644 --- a/sql/item_windowfunc.cc +++ b/sql/item_windowfunc.cc @@ -18,6 +18,8 @@ Item_window_func::fix_fields(THD *thd, Item **ref) if (window_func->fix_fields(thd, ref)) return TRUE; + max_length= window_func->max_length; + fixed= 1; force_return_blank= true; read_value_from_result_field= false; @@ -88,10 +90,14 @@ bool Item_sum_rank::add() return false; } +int Item_window_func::check_partition_bound() +{ + return test_if_group_changed(partition_fields); +} -void Item_window_func::advance_window() { - - int changed = test_if_group_changed(partition_fields); +void Item_window_func::advance_window() +{ + int changed= check_partition_bound(); if (changed > -1) { diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h index 202b0db406e..5e5d3bf36ed 100644 --- a/sql/item_windowfunc.h +++ b/sql/item_windowfunc.h @@ -275,6 +275,7 @@ class Item_sum_cume_dist: public Item_sum_num class Item_window_func : public Item_result_field { /* Window function parameters as we've got them from the parser */ +public: Item_sum *window_func; LEX_STRING *window_name; public: @@ -301,10 +302,12 @@ public: /* Computation functions. + TODO: consoder merging these with class Group_bound_tracker. */ void setup_partition_border_check(THD *thd); void advance_window(); + int check_partition_bound(); enum_field_types field_type() const { return window_func->field_type(); } enum Item::Type type() const { return Item::WINDOW_FUNC_ITEM; } @@ -367,12 +370,18 @@ public: my_decimal* val_decimal(my_decimal* dec) { if (force_return_blank) + { + my_decimal_set_zero(dec); return dec; + } return read_value_from_result_field? result_field->val_decimal(dec) : window_func->val_decimal(dec); } - void fix_length_and_dec() { } + void fix_length_and_dec() + { + window_func->fix_length_and_dec(); + } const char* func_name() const { return "WF"; } diff --git a/sql/records.cc b/sql/records.cc index e826f2b631a..81de1878ba6 100644 --- a/sql/records.cc +++ b/sql/records.cc @@ -39,7 +39,7 @@ int rr_sequential(READ_RECORD *info); static int rr_from_tempfile(READ_RECORD *info); static int rr_unpack_from_tempfile(READ_RECORD *info); static int rr_unpack_from_buffer(READ_RECORD *info); -static int rr_from_pointers(READ_RECORD *info); +int rr_from_pointers(READ_RECORD *info); static int rr_from_cache(READ_RECORD *info); static int init_rr_cache(THD *thd, READ_RECORD *info); static int rr_cmp(uchar *a,uchar *b); @@ -531,8 +531,8 @@ static int rr_unpack_from_tempfile(READ_RECORD *info) return 0; } - -static int rr_from_pointers(READ_RECORD *info) +//psergey: make this 'static' again: +int rr_from_pointers(READ_RECORD *info) { int tmp; uchar *cache_pos; diff --git a/sql/sql_window.cc b/sql/sql_window.cc index 1aa4ac03947..aa9e3e18a16 100644 --- a/sql/sql_window.cc +++ b/sql/sql_window.cc @@ -4,7 +4,6 @@ #include "sql_base.h" #include "sql_window.h" -//TODO: why pass List by value?? bool Window_spec::check_window_names(List_iterator_fast &it) @@ -83,6 +82,481 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, } +/* + Do a pass over sorted table and compute window function values. + + This function is for handling window functions that can be computed on the + fly. Examples are RANK() and ROW_NUMBER(). +*/ +bool compute_window_func_values(Item_window_func *item_win, + TABLE *tbl, READ_RECORD *info) +{ + int err; + while (!(err=info->read_record(info))) + { + store_record(tbl,record[1]); + + /* + This will cause window function to compute its value for the + current row : + */ + item_win->advance_window(); + + /* + Put the new value into temptable's field + TODO: Should this use item_win->update_field() call? + Regular aggegate function implementations seem to implement it. + */ + item_win->save_in_field(item_win->result_field, true); + err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); + if (err && err != HA_ERR_RECORD_IS_THE_SAME) + return true; + } + return false; +} + +///////////////////////////////////////////////////////////////////////////// +// Window Frames support +///////////////////////////////////////////////////////////////////////////// + +// note: make rr_from_pointers static again when not need it here anymore +int rr_from_pointers(READ_RECORD *info); + +/* + A temporary way to clone READ_RECORD structures until Monty provides the real + one. +*/ +bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst) +{ + DBUG_ASSERT(src->table->sort.record_pointers); + DBUG_ASSERT(src->read_record == rr_from_pointers); + memcpy(dst, src, sizeof(READ_RECORD)); + return false; +} + + +/* A wrapper around test_if_group_changed */ +class Group_bound_tracker +{ + List group_fields; +public: + void init(THD *thd, SQL_I_List *list) + { + for (ORDER *curr = list->first; curr; curr=curr->next) + { + Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE); + group_fields.push_back(tmp); + } + } + + bool check_if_next_group() + { + if (test_if_group_changed(group_fields) > -1) + return true; + return false; + } +}; + + +/* + Window frame bound cursor. Abstract interface. + + @detail + The cursor moves within the partition that the current row is in. + It may be ahead or behind the current row. + + The cursor also assumes that the current row moves forward through the + partition and will move to the next adjacent partition after this one. + + @todo + - if we want to allocate this on the MEM_ROOT we should make sure + it is not re-allocated for every subquery execution. +*/ + +class Frame_cursor : public Sql_alloc +{ +public: + virtual void init(THD *thd, READ_RECORD *info, + SQL_I_List *partition_list) + {} + + /* + Current row has entered the new partition (this also includes the first + partition). + //TODO: can we also say "tbl->record[0] is the first row in the partition?" + // TODO2: and if yes, will this function keep it so? (NO) + + Position the frame bound accordingly. + */ + virtual void next_partition(bool first, Item_sum* item)=0; + + /* + The current row has moved one row forward. + Move this frame bound accordingly, and update the value of aggregate + function as necessary. + */ + virtual void next_row(Item_sum* item)=0; + + virtual ~Frame_cursor(){} +}; + + +/* + UNBOUNDED (PRECEDING|FOLLOWING) frame bound +*/ + +class Frame_unbounded : public Frame_cursor +{ + const bool is_start; + const bool is_preceding; + + READ_RECORD cursor; + Group_bound_tracker bound_tracker; +public: + Frame_unbounded(bool is_start_arg, bool is_preceding_arg) : + is_start(is_start_arg), is_preceding(is_preceding_arg) + {} + + void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list) + { + if (!is_preceding) + { + // Cursor is only needed by UNBOUNDED FOLLOWING + clone_read_record(info, &cursor); + bound_tracker.init(thd, partition_list); + } + } + + void next_partition(bool first, Item_sum* item) + { + if (is_preceding) + { + /* + UNBOUNDED PRECEDING frame end just stays on the first row. + We are start of the frame, so we don't need to update the sum function. + */ + } + else + { + /* + UNBOUNDED FOLLOWING should reach the end of the partition and stay + there. + */ + if (first) + { + /* Read the first row */ + if (cursor.read_record(&cursor)) + return; + } + /* Remember what partition we are in */ + bound_tracker.check_if_next_group(); + + item->add(); + + while (!cursor.read_record(&cursor)) + { + if (bound_tracker.check_if_next_group()) + break; + item->add(); + } + } + } + + void next_row(Item_sum* item) + { + /* Do nothing, UNBOUNDED PRECEDING|FOLLOWING frame ends don't move. */ + } +}; + + +/* + ROWS $n (PRECEDING|FOLLOWING) frame bound. +*/ + +class Frame_n_rows : public Frame_cursor +{ + /* Whether this start of frame or end of rame */ + const bool is_start; + const ha_rows n_rows; + const bool is_preceding; + + ha_rows n_rows_to_skip; + + READ_RECORD cursor; + bool cursor_eof; + + Group_bound_tracker bound_tracker; + bool at_partition_start; + bool at_partition_end; +public: + Frame_n_rows(bool is_start_arg, bool is_preceding_arg, ha_rows n_rows_arg) : + is_start(is_start_arg), n_rows(n_rows_arg), is_preceding(is_preceding_arg) + {} + + void init(THD *thd, READ_RECORD *info, SQL_I_List *partition_list) + { + clone_read_record(info, &cursor); + cursor_eof= false; + at_partition_start= true; + bound_tracker.init(thd, partition_list); + } + + void next_partition(bool first, Item_sum* item) + { + cursor_eof= false; + at_partition_start= true; + at_partition_end= false; + if (is_preceding) + { + if (!first) + { + /* + The cursor in "ROWS n PRECEDING" lags behind by n_rows rows. + Catch up. + */ + while (!(cursor_eof= (0 != cursor.read_record(&cursor)))) + { + if (bound_tracker.check_if_next_group()) + break; + } + } + n_rows_to_skip= n_rows - (is_start? 0:1); + } + else + { + /* + "ROWS n FOLLOWING" is already at the first row in the next partition. + Move it to be n_rows ahead. + */ + n_rows_to_skip= 0; + + if (!first && (!is_start || n_rows)) + { + // We are positioned at the first row in the partition: + if (is_start) // this is frame start endpoint + item->remove(); + else + item->add(); + } + /* + Note: i_end=-1 when this is a start-endpoint "CURRENT ROW" which is + implemented as "ROWS 0 FOLLOWING". + */ + longlong i_end= n_rows + (first?1:0)- is_start; + for (longlong i= 0; i < i_end; i++) + { + if (next_row_intern(item)) + break; + } + } + } + + void next_row(Item_sum* item) + { + if (n_rows_to_skip) + { + n_rows_to_skip--; + return; + } + if (at_partition_end) + return; + next_row_intern(item); + } + +private: + bool next_row_intern(Item_sum *item) + { + if (!cursor_eof) + { + if (!(cursor_eof= (0 != cursor.read_record(&cursor)))) + { + bool new_group= bound_tracker.check_if_next_group(); + if (at_partition_start || !new_group) + { + if (is_start) // this is frame start endpoint + item->remove(); + else + item->add(); + + at_partition_start= false; + return false; /* Action done */ + } + else + { + at_partition_end= true; + return true; + } + } + } + return true; /* Action not done */ + } +}; + + +/* CURRENT ROW is the same as "ROWS 0 FOLLOWING" */ +class Frame_current_row : public Frame_n_rows +{ +public: + Frame_current_row(bool is_start_arg) : + Frame_n_rows(is_start_arg, false /*is_preceding*/, ha_rows(0)) + {} +}; + + +Frame_cursor *get_frame_cursor(Window_frame_bound *bound, bool is_start_bound) +{ + if (bound->precedence_type == Window_frame_bound::PRECEDING || + bound->precedence_type == Window_frame_bound::FOLLOWING) + { + bool is_preceding= (bound->precedence_type == + Window_frame_bound::PRECEDING); + if (bound->offset == NULL) + { + return new Frame_unbounded(is_start_bound, is_preceding); + } + + longlong n_rows= bound->offset->val_int(); + return new Frame_n_rows(is_start_bound, is_preceding, n_rows); + } + + if (bound->precedence_type == Window_frame_bound::CURRENT) + { + return new Frame_current_row(is_start_bound); + } + return NULL; +} + + +/* + Streamed window function computation with window frames. + + We make a single pass over the ordered temp.table, but we're using three + cursors: + - current row - the row that we're computing window func value for) + - start_bound - the start of the frame + - end_bound - the end of the frame + + All three cursors move together. + + @todo + Provided bounds have their 'cursors'... is it better to re-clone their + cursors or re-position them onto the current row? + + @detail + ROWS BETWEEN 3 PRECEDING -- frame start + AND 3 FOLLOWING -- frame end + + /------ frame end (aka BOTTOM) + Dataset start | + --------====*=======[*]========*========-------->> dataset end + | \ + | +-------- current row + | + \-------- frame start ("TOP") + + - frame_end moves forward and adds rows into the aggregate function. + - frame_start follows behind and removes rows from the aggregate function. + - current_row is the row where the value of aggregate function is stored. + + @TODO: Only the first cursor needs to check for run-out-of-partition + condition (Others can catch up by counting rows?) + +*/ + +bool compute_window_func_with_frames(Item_window_func *item_win, + TABLE *tbl, READ_RECORD *info) +{ + THD *thd= current_thd; + int err= 0; + Frame_cursor *start_bound; + Frame_cursor *end_bound; + + Item_sum *sum_func= item_win->window_func; + /* This algorithm doesn't support DISTINCT aggregator */ + sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); + + Window_frame *window_frame= item_win->window_spec->window_frame; + DBUG_ASSERT(window_frame->units == Window_frame::UNITS_ROWS); + start_bound= get_frame_cursor(window_frame->top_bound, true); + end_bound= get_frame_cursor(window_frame->bottom_bound, false); + + start_bound->init(thd, info, &item_win->window_spec->partition_list); + end_bound->init(thd, info, &item_win->window_spec->partition_list); + + bool is_error= false; + bool first_row= true; + uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0)); + + while (true) + { + if (first_row) + { + /* Start the first partition */ + sum_func->clear(); + end_bound->next_partition(true, sum_func); + start_bound->next_partition(true, sum_func); + } + else + { + /* These can write into tbl->record[0] */ + end_bound->next_row(sum_func); + start_bound->next_row(sum_func); + } + + if ((err=info->read_record(info))) + { + /* End of file */ + break; + } + + store_record(tbl,record[1]); + bool partition_changed= (item_win->check_partition_bound() > -1)? true: + false; + if (!first_row && partition_changed) + { + sum_func->clear(); + tbl->file->position(tbl->record[0]); + memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length); + /* + Ok, the current row is the first row in the new partition. + We move end_bound first, because we want rows to be added into the + aggregate before start_bound attempts to remove them. + */ + end_bound->next_partition(false, sum_func); + + /* + The problem is, the above call may have made tbl->record[0] to point to + some other record. + */ + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); + start_bound->next_partition(false, sum_func); + + /* + The same problem again. The above call may have moved table's current + record. We need to make the current row current, so that ha_update_row + call below updates the right row. + */ + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); + } + first_row= false; + + /* Read the current row and update it */ + item_win->save_in_field(item_win->result_field, true); + err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); + if (err && err != HA_ERR_RECORD_IS_THE_SAME) + { + is_error= true; + break; + } + } + + my_free(rowid_buf); + delete start_bound; + delete end_bound; + return is_error? true: false; +} + + /* @brief This function is called by JOIN::exec to compute window function values @@ -264,30 +738,34 @@ bool JOIN::process_window_functions(List *curr_fields_list) TABLE *tbl= join_tab[top_join_tab_count].table; if (init_read_record(&info, thd, tbl, select, 0, 1, FALSE)) return true; - + bool is_error= false; + item_win->setup_partition_border_check(thd); - - int err; - while (!(err=info.read_record(&info))) + + Item_sum::Sumfunctype type= item_win->window_func->sum_func(); + if (type == Item_sum::ROW_NUMBER_FUNC || + type == Item_sum::RANK_FUNC || + type == Item_sum::DENSE_RANK_FUNC) { - store_record(tbl,record[1]); - /* - This will cause window function to compute its value for the - current row : + One-pass window function computation, walk through the rows and + assign values. */ - item_win->advance_window(); - - /* - Put the new value into temptable's field - TODO: Should this use item_win->update_field() call? - Regular aggegate function implementations seem to implement it. - */ - item_win->save_in_field(item_win->result_field, true); - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); - if (err && err != HA_ERR_RECORD_IS_THE_SAME) - return true; + if (compute_window_func_values(item_win, tbl, &info)) + is_error= true; } + else if (type == Item_sum::COUNT_FUNC) + { + /* + Frame-aware window function computation. It does one pass, but + uses three cursors -frame_start, current_row, and frame_end. + */ + if (compute_window_func_with_frames(item_win, tbl, &info)) + is_error= true; + } + else + DBUG_ASSERT(0); + item_win->set_read_value_from_result_field(); /* This calls filesort_free_buffers(): */ end_read_record(&info); @@ -295,6 +773,9 @@ bool JOIN::process_window_functions(List *curr_fields_list) delete join_tab[top_join_tab_count].filesort; join_tab[top_join_tab_count].filesort= NULL; free_io_cache(tbl); + + if (is_error) + return true; } } }