1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

PARTITION BY SYSTEM_TIME INTERVAL ...

Lots of changes:
* calculate the current history partition in ::external_lock(),
  not in ::write_row() or ::update_row()
* remove dynamically collected per-partition row_end stats
* no full table scan in open_table_from_share to calculate these
  stats, no manual MDL/thr_locks in open_table_from_share
* no shared stats in TABLE_SHARE = no mutexes or condition waits when
  calculating current history partition
* always compare timestamps, don't convert them to MYSQL_TIME
  (avoid DST ambiguity, and it's faster too)
* correct interval handling, 1 month = 1 month, not 30 * 24 * 3600 seconds
* save/restore first partition start time, and count intervals from there
* only allow to drop first partitions if INTERVAL
* when adding new history partitions, split the data in the last history
  parition, if it was overflowed
* show partition boundaries in INFORMATION_SCHEMA.PARTITIONS
This commit is contained in:
Sergei Golubchik
2018-02-21 15:16:19 +01:00
parent 7961bc4b89
commit e36c5ec0a5
16 changed files with 463 additions and 974 deletions

View File

@ -137,10 +137,6 @@ x A B
execute select_pn;
x C D
1 1 1
## pruning check
explain partitions select * from tN;
id select_type table partitions type possible_keys key key_len ref rows Extra
N SIMPLE tN pN,pn system NULL NULL NULL NULL N
set @str= concat('select row_start from t1 partition (pn) into @ts0');
prepare stmt from @str;
execute stmt;
@ -225,33 +221,41 @@ t1 CREATE TABLE `t1` (
PARTITION `pn` CURRENT ENGINE = DEFAULT_ENGINE)
alter table t1 drop partition non_existent;
ERROR HY000: Error in list of partitions to DROP
insert into t1 values (1), (2), (3);
insert into t1 values (1), (2), (3), (4), (5), (6);
select * from t1 partition (pn);
x
1
2
3
### warn about partition switching
4
5
6
delete from t1 where x < 4;
delete from t1;
Warnings:
Note 4114 Versioned table `test`.`t1`: switching from partition `p0` to `p1`
select * from t1 partition (p0);
x
1
2
3
select * from t1 partition (p1);
x
3
insert into t1 values (4), (5);
4
5
6
insert into t1 values (7), (8);
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
### warn about full partition
delete from t1;
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
select * from t1 partition (p1) order by x;
x
3
4
5
6
7
8
### Assertion in ALTER on warning from partitioning LIMIT [#446]
create or replace table t1 (x int) with system versioning;
insert into t1 values (1), (2);
@ -259,8 +263,6 @@ delete from t1;
alter table t1 partition by system_time limit 1 (
partition p1 history,
partition pn current);
Warnings:
Note 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
## rotation by INTERVAL
create or replace table t1 (x int)
with system versioning
@ -269,6 +271,16 @@ partition p0 history,
partition p1 history,
partition pn current);
ERROR HY000: Wrong parameters for partitioned `t1`: wrong value for 'INTERVAL'
create or replace table t1 (x int)
with system versioning
partition by system_time interval 1 second starts 12345 (
partition p0 history,
partition p1 history,
partition pn current);
ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'starts 12345 (
partition p0 history,
partition p1 history,
partition pn current)' at line 3
### ha_partition::update_row() check
create or replace table t1 (x int)
with system versioning
@ -286,7 +298,7 @@ x
delete from t1 where x < 3;
delete from t1;
Warnings:
Note 4114 Versioned table `test`.`t1`: switching from partition `p0` to `p1`
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
select * from t1 partition (p0) order by x;
x
1
@ -306,7 +318,7 @@ insert into t1 values (1);
update t1 set x= 2;
update t1 set x= 3;
Warnings:
Note 4114 Versioned table `test`.`t1`: switching from partition `p0` to `p1`
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
select * from t1 partition (p0);
x
1
@ -332,24 +344,23 @@ select * from t1 partition (pnsp1);
x
2
4
### warn about partition switching and about full partition
### warn about full partition
delete from t1 where x < 3;
delete from t1;
delete from t1;
Warnings:
Note 4114 Versioned table `test`.`t1`: switching from partition `p0` to `p1`
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
select * from t1 partition (p0sp0);
x
1
3
select * from t1 partition (p0sp1);
x
select * from t1 partition (p1sp0);
x
5
select * from t1 partition (p1sp1);
select * from t1 partition (p0sp1);
x
2
4
select * from t1 partition (p1sp0);
x
select * from t1 partition (p1sp1);
x
create or replace table t1 (a bigint)
with system versioning
partition by range (a)
@ -419,14 +430,8 @@ alter table t1 partition by system_time limit 1 (
partition p1 history,
partition p2 history,
partition pn current);
Warnings:
Note 4114 Versioned table `test`.`t1`: switching from partition `p1` to `p2`
delete from t1 where x = 1;
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p2` is full, add more HISTORY partitions
delete from t1 where x = 2;
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p2` is full, add more HISTORY partitions
# MDEV-14923 Assertion upon INSERT into locked versioned partitioned table
create or replace table t1 (x int) with system versioning
partition by system_time (partition p1 history, partition pn current);
@ -435,6 +440,95 @@ alter table t1 add partition (partition p1 history);
ERROR HY000: Duplicate partition name p1
insert into t1 values (1);
unlock tables;
create or replace table t1 (pk int) with system versioning
partition by system_time interval 10 year (
partition p1 history,
partition p2 history,
partition pn current
);
ERROR 22003: TIMESTAMP value is out of range in 'INTERVAL'
create or replace table t1 (i int) with system versioning
partition by system_time interval 1 hour (
partition p0 history, partition pn current);
set @ts=(select partition_description from information_schema.partitions
where table_schema='test' and table_name='t1' and partition_name='p0');
alter table t1 add column b int;
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
partition_name partition_ordinal_position partition_method timediff(partition_description, @ts)
p0 1 SYSTEM_TIME 00:00:00.000000
pn 2 SYSTEM_TIME NULL
Warnings:
Warning 1292 Truncated incorrect time value: 'CURRENT'
alter table t1 add partition (partition p1 history, partition p2 history);
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
partition_name partition_ordinal_position partition_method timediff(partition_description, @ts)
p0 1 SYSTEM_TIME 00:00:00.000000
p1 2 SYSTEM_TIME 01:00:00.000000
p2 3 SYSTEM_TIME 02:00:00.000000
pn 4 SYSTEM_TIME NULL
Warnings:
Warning 1292 Truncated incorrect time value: 'CURRENT'
alter table t1 drop partition p0;
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
partition_name partition_ordinal_position partition_method timediff(partition_description, @ts)
p1 1 SYSTEM_TIME 01:00:00.000000
p2 2 SYSTEM_TIME 02:00:00.000000
pn 3 SYSTEM_TIME NULL
Warnings:
Warning 1292 Truncated incorrect time value: 'CURRENT'
alter table t1 drop partition p2;
ERROR HY000: Can only drop oldest partitions when rotating by INTERVAL
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
partition_name partition_ordinal_position partition_method timediff(partition_description, @ts)
p1 1 SYSTEM_TIME 01:00:00.000000
p2 2 SYSTEM_TIME 02:00:00.000000
pn 3 SYSTEM_TIME NULL
Warnings:
Warning 1292 Truncated incorrect time value: 'CURRENT'
create or replace table t1 (i int) with system versioning
partition by system_time interval 1 second
subpartition by key (i) subpartitions 2
(partition p1 history, partition pn current);
insert t1 values (1);
delete from t1;
insert t1 values (2);
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
delete from t1;
Warnings:
Warning 4112 Versioned table `test`.`t1`: partition `p1` is full, add more HISTORY partitions
alter table t1 add partition (partition p0 history, partition p2 history);
select subpartition_name,table_rows from information_schema.partitions where table_schema='test' and table_name='t1';
subpartition_name table_rows
p1sp0 1
p1sp1 0
p0sp0 0
p0sp1 1
p2sp0 0
p2sp1 0
pnsp0 0
pnsp1 0
## pruning check
set @ts=(select partition_description from information_schema.partitions
where table_schema='test' and table_name='t1' and partition_name='p0' limit 1);
insert into t1 values (4),(5);
select * from t1;
i
4
5
explain partitions select * from t1;
id select_type table partitions type possible_keys key key_len ref rows Extra
1 SIMPLE t1 pn_pnsp0,pn_pnsp1 ALL NULL NULL NULL NULL 2 Using where
explain partitions select * from t1 for system_time as of from_unixtime(@ts);
id select_type table partitions type possible_keys key key_len ref rows Extra
1 SIMPLE t1 p1_p1sp0,p1_p1sp1,p0_p0sp0,p0_p0sp1,p2_p2sp0,p2_p2sp1,pn_pnsp0,pn_pnsp1 ALL NULL NULL NULL NULL # Using where
set @ts=(select row_end from t1 for system_time all where i=1);
select * from t1 for system_time all where row_end = @ts;
i
1
explain partitions select * from t1 for system_time all where row_end = @ts;
id select_type table partitions type possible_keys key key_len ref rows Extra
1 SIMPLE t1 p1_p1sp0,p1_p1sp1 # NULL NULL NULL NULL # #
# Test cleanup
drop database test;
create database test;

View File

@ -56,8 +56,6 @@ partition pn current);
insert into t values (1);
update t set a= 2;
update t set a= 3;
Warnings:
Note 4114 Versioned table `test`.`t`: switching from partition `p0` to `p1`
delete history from t;
select * from t for system_time all;
a

View File

@ -139,10 +139,6 @@ prepare select_pn from @str;
execute select_p0;
execute select_pn;
--echo ## pruning check
--replace_regex /\d/N/ /ALL/system/ /Using where//
explain partitions select * from t1;
set @str= concat('select row_start from t1 partition (pn) into @ts0');
prepare stmt from @str; execute stmt; drop prepare stmt;
@ -212,14 +208,14 @@ show create table t1;
--error ER_DROP_PARTITION_NON_EXISTENT
alter table t1 drop partition non_existent;
insert into t1 values (1), (2), (3);
insert into t1 values (1), (2), (3), (4), (5), (6);
select * from t1 partition (pn);
--echo ### warn about partition switching
delete from t1 where x < 4;
delete from t1;
select * from t1 partition (p0);
select * from t1 partition (p1);
insert into t1 values (4), (5);
insert into t1 values (7), (8);
--echo ### warn about full partition
delete from t1;
select * from t1 partition (p1) order by x;
@ -241,6 +237,14 @@ partition by system_time interval 0 second (
partition p1 history,
partition pn current);
--error ER_PARSE_ERROR
create or replace table t1 (x int)
with system versioning
partition by system_time interval 1 second starts 12345 (
partition p0 history,
partition p1 history,
partition pn current);
--echo ### ha_partition::update_row() check
create or replace table t1 (x int)
with system versioning
@ -288,7 +292,9 @@ insert into t1 (x) values (1), (2), (3), (4), (5);
select * from t1 partition (pnsp0);
select * from t1 partition (pnsp1);
--echo ### warn about partition switching and about full partition
--echo ### warn about full partition
delete from t1 where x < 3;
delete from t1;
delete from t1;
select * from t1 partition (p0sp0);
select * from t1 partition (p0sp1);
@ -384,6 +390,56 @@ alter table t1 add partition (partition p1 history);
insert into t1 values (1);
unlock tables;
--error ER_DATA_OUT_OF_RANGE
create or replace table t1 (pk int) with system versioning
partition by system_time interval 10 year (
partition p1 history,
partition p2 history,
partition pn current
);
# INTERVAL and ALTER TABLE
create or replace table t1 (i int) with system versioning
partition by system_time interval 1 hour (
partition p0 history, partition pn current);
set @ts=(select partition_description from information_schema.partitions
where table_schema='test' and table_name='t1' and partition_name='p0');
alter table t1 add column b int;
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
alter table t1 add partition (partition p1 history, partition p2 history);
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
alter table t1 drop partition p0;
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
--error ER_VERS_DROP_PARTITION_INTERVAL
alter table t1 drop partition p2;
select partition_name,partition_ordinal_position,partition_method,timediff(partition_description, @ts) from information_schema.partitions where table_schema='test' and table_name='t1';
create or replace table t1 (i int) with system versioning
partition by system_time interval 1 second
subpartition by key (i) subpartitions 2
(partition p1 history, partition pn current);
insert t1 values (1); delete from t1;
sleep 1;
insert t1 values (2); delete from t1;
alter table t1 add partition (partition p0 history, partition p2 history);
select subpartition_name,table_rows from information_schema.partitions where table_schema='test' and table_name='t1';
--echo ## pruning check
set @ts=(select partition_description from information_schema.partitions
where table_schema='test' and table_name='t1' and partition_name='p0' limit 1);
insert into t1 values (4),(5);
--sorted_result
select * from t1;
explain partitions select * from t1;
--replace_column 10 #
explain partitions select * from t1 for system_time as of from_unixtime(@ts);
set @ts=(select row_end from t1 for system_time all where i=1);
select * from t1 for system_time all where row_end = @ts;
--replace_column 5 # 10 # 11 #
explain partitions select * from t1 for system_time all where row_end = @ts;
--echo # Test cleanup
drop database test;
create database test;

View File

@ -2037,6 +2037,11 @@ int ha_partition::copy_partitions(ulonglong * const copied,
else
set_linear_hash_mask(m_part_info, m_part_info->num_subparts);
}
else if (m_part_info->part_type == VERSIONING_PARTITION)
{
if (m_part_info->check_constants(ha_thd(), m_part_info))
goto init_error;
}
while (reorg_part < m_reorged_parts)
{
@ -3909,8 +3914,13 @@ int ha_partition::external_lock(THD *thd, int lock_type)
(void) (*file)->ha_external_lock(thd, lock_type);
} while (*(++file));
}
if (lock_type == F_WRLCK && m_part_info->part_expr)
m_part_info->part_expr->walk(&Item::register_field_in_read_map, 1, 0);
if (lock_type == F_WRLCK)
{
if (m_part_info->part_expr)
m_part_info->part_expr->walk(&Item::register_field_in_read_map, 1, 0);
if (m_part_info->part_type == VERSIONING_PARTITION)
m_part_info->vers_set_hist_part(thd);
}
DBUG_RETURN(0);
err_handler:
@ -4270,9 +4280,6 @@ int ha_partition::write_row(uchar * buf)
set_auto_increment_if_higher(table->next_number_field);
reenable_binlog(thd);
if (m_part_info->part_type == VERSIONING_PARTITION)
m_part_info->vers_update_stats(thd, part_id);
exit:
thd->variables.sql_mode= saved_sql_mode;
table->auto_increment_field_not_null= saved_auto_inc_field_not_null;
@ -4381,9 +4388,6 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
if (error)
goto exit;
if (m_part_info->part_type == VERSIONING_PARTITION)
m_part_info->vers_update_stats(thd, new_part_id);
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[old_part_id]->ha_delete_row(old_data);
reenable_binlog(thd);

View File

@ -7295,27 +7295,6 @@ bool Item_temporal_literal::eq(const Item *item, bool binary_cmp) const
&((Item_temporal_literal *) item)->cached_time);
}
bool Item_temporal_literal::operator<(const MYSQL_TIME &ltime) const
{
if (my_time_compare(&cached_time, &ltime) < 0)
return true;
return false;
}
bool Item_temporal_literal::operator>(const MYSQL_TIME &ltime) const
{
if (my_time_compare(&cached_time, &ltime) > 0)
return true;
return false;
}
bool Item_temporal_literal::operator==(const MYSQL_TIME &ltime) const
{
if (my_time_compare(&cached_time, &ltime) == 0)
return true;
return false;
}
void Item_date_literal::print(String *str, enum_query_type query_type)
{
str->append("DATE'");

View File

@ -4230,13 +4230,6 @@ public:
{ return val_decimal_from_date(decimal_value); }
int save_in_field(Field *field, bool no_conversions)
{ return save_date_in_field(field, no_conversions); }
void set_time(MYSQL_TIME *ltime)
{
cached_time= *ltime;
}
bool operator>(const MYSQL_TIME &ltime) const;
bool operator<(const MYSQL_TIME &ltime) const;
bool operator==(const MYSQL_TIME &ltime) const;
};

View File

@ -3463,13 +3463,6 @@ bool prune_partitions(THD *thd, TABLE *table, Item *pprune_cond)
DBUG_RETURN(FALSE);
}
if (part_info->part_type == VERSIONING_PARTITION &&
part_info->vers_update_range_constants(thd))
{
retval= TRUE;
goto end2;
}
dbug_tmp_use_all_columns(table, old_sets,
table->read_set, table->write_set);
range_par->thd= thd;
@ -3569,7 +3562,6 @@ all_used:
mark_all_partitions_as_used(prune_param.part_info);
end:
dbug_tmp_restore_column_maps(table->read_set, table->write_set, old_sets);
end2:
thd->no_errors=0;
thd->mem_root= range_par->old_root;
free_root(&alloc,MYF(0)); // Return memory & allocator

View File

@ -90,61 +90,6 @@ typedef struct p_elem_val
struct st_ddl_log_memory_entry;
/* Used for collecting MIN/MAX stats on row_end for doing pruning
in SYSTEM_TIME partitiong. */
class Vers_min_max_stats : public Sql_alloc
{
static const uint buf_size= 4 + (TIME_SECOND_PART_DIGITS + 1) / 2;
uchar min_buf[buf_size];
uchar max_buf[buf_size];
Field_timestampf min_value;
Field_timestampf max_value;
mysql_rwlock_t lock;
public:
Vers_min_max_stats(const LEX_CSTRING *field_name, TABLE_SHARE *share) :
min_value(min_buf, NULL, 0, Field::NONE, field_name, share, 6),
max_value(max_buf, NULL, 0, Field::NONE, field_name, share, 6)
{
min_value.set_max();
memset(max_buf, 0, buf_size);
mysql_rwlock_init(key_rwlock_LOCK_vers_stats, &lock);
}
~Vers_min_max_stats()
{
mysql_rwlock_destroy(&lock);
}
bool update_unguarded(Field *from)
{
return
from->update_min(&min_value, false) +
from->update_max(&max_value, false);
}
bool update(Field *from)
{
mysql_rwlock_wrlock(&lock);
bool res= update_unguarded(from);
mysql_rwlock_unlock(&lock);
return res;
}
my_time_t min_time()
{
mysql_rwlock_rdlock(&lock);
ulong sec_part;
my_time_t res= min_value.get_timestamp(&sec_part);
mysql_rwlock_unlock(&lock);
return res;
}
my_time_t max_time()
{
mysql_rwlock_rdlock(&lock);
ulong sec_part;
my_time_t res= max_value.get_timestamp(&sec_part);
mysql_rwlock_unlock(&lock);
return res;
}
};
enum stat_trx_field
{
STAT_TRX_END= 0

View File

@ -871,487 +871,99 @@ bool partition_info::vers_init_info(THD * thd)
return false;
}
bool partition_info::vers_set_interval(const INTERVAL & i)
void partition_info::vers_set_hist_part(THD *thd)
{
if (i.neg || i.second_part)
return true;
DBUG_ASSERT(vers_info);
// TODO: INTERVAL conversion to seconds leads to mismatch with calendar intervals (MONTH and YEAR)
vers_info->interval= static_cast<my_time_t>(
i.second +
i.minute * 60 +
i.hour * 60 * 60 +
i.day * 24 * 60 * 60 +
i.month * 30 * 24 * 60 * 60 +
i.year * 365 * 30 * 24 * 60 * 60);
if (vers_info->interval == 0)
return true;
return false;
}
bool partition_info::vers_set_limit(ulonglong limit)
{
if (limit < 1)
return true;
DBUG_ASSERT(vers_info);
vers_info->limit= limit;
return false;
}
partition_element*
partition_info::vers_part_rotate(THD * thd)
{
DBUG_ASSERT(table && table->s);
DBUG_ASSERT(vers_info && vers_info->initialized());
if (table->s->hist_part_id >= vers_info->now_part->id - 1)
if (vers_info->limit)
{
DBUG_ASSERT(table->s->hist_part_id == vers_info->now_part->id - 1);
push_warning_printf(thd,
thd->lex->sql_command == SQLCOM_ALTER_TABLE ?
Sql_condition::WARN_LEVEL_NOTE :
Sql_condition::WARN_LEVEL_WARN,
WARN_VERS_PART_FULL,
ER_THD(thd, WARN_VERS_PART_FULL),
table->s->db.str, table->s->error_table_name(),
vers_info->hist_part->partition_name);
return vers_info->hist_part;
ha_partition *hp= (ha_partition*)(table->file);
partition_element *next= NULL;
List_iterator<partition_element> it(partitions);
while (next != vers_info->hist_part)
next= it++;
ha_rows records= hp->part_records(next);
while ((next= it++) != vers_info->now_part)
{
ha_rows next_records= hp->part_records(next);
if (next_records == 0)
break;
vers_info->hist_part= next;
records= next_records;
}
if (records > vers_info->limit)
{
if (next == vers_info->now_part)
goto warn;
vers_info->hist_part= next;
}
return;
}
table->s->hist_part_id++;
const char* old_part_name= vers_info->hist_part->partition_name;
vers_hist_part();
push_warning_printf(thd,
Sql_condition::WARN_LEVEL_NOTE,
WARN_VERS_PART_ROTATION,
ER_THD(thd, WARN_VERS_PART_ROTATION),
table->s->db.str, table->s->error_table_name(),
old_part_name,
vers_info->hist_part->partition_name);
return vers_info->hist_part;
}
bool partition_info::vers_set_expression(THD *thd, partition_element *el, MYSQL_TIME& t)
{
curr_part_elem= el;
init_column_part(thd);
el->list_val_list.empty();
el->list_val_list.push_back(curr_list_val, thd->mem_root);
for (uint i= 0; i < num_columns; ++i)
if (vers_info->interval.is_set())
{
part_column_list_val *col_val= add_column_value(thd);
if (el->type() == partition_element::CURRENT)
if (vers_info->hist_part->range_value > thd->system_time)
return;
partition_element *next= NULL;
List_iterator<partition_element> it(partitions);
while (next != vers_info->hist_part)
next= it++;
while ((next= it++) != vers_info->now_part)
{
col_val->max_value= true;
col_val->item_expression= NULL;
col_val->column_value= NULL;
col_val->part_info= this;
col_val->fixed= 1;
continue;
vers_info->hist_part= next;
if (next->range_value > thd->system_time)
return;
}
Item *item_expression= new (thd->mem_root)
Item_datetime_literal(thd, &t, 0);
if (!item_expression)
return true;
/* We initialize col_val with bogus max value to make fix_partition_func() and check_range_constants() happy.
Later in vers_setup_stats() it is initialized with real stat value if there will be any. */
/* FIXME: TIME_RESULT in col_val is expensive. It should be INT_RESULT
(got to be fixed when InnoDB is supported). */
init_col_val(col_val, item_expression);
DBUG_ASSERT(item_expression == el->get_col_val(i).item_expression);
} // for (num_columns)
return false;
goto warn;
}
return;
warn:
my_error(WARN_VERS_PART_FULL, MYF(ME_WARNING|ME_ERROR_LOG),
table->s->db.str, table->s->error_table_name(),
vers_info->hist_part->partition_name);
}
bool partition_info::vers_setup_expression(THD * thd, uint32 alter_add)
{
DBUG_ASSERT(part_type == VERSIONING_PARTITION);
if (!table->versioned(VERS_TIMESTAMP))
if (!table->versioned())
{
my_error(ER_VERS_ENGINE_UNSUPPORTED, MYF(0), table->s->table_name.str);
return true;
}
if (alter_add)
DBUG_ASSERT(part_type == VERSIONING_PARTITION);
DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
DBUG_ASSERT(num_columns == 1);
if (!alter_add)
{
DBUG_ASSERT(partitions.elements > alter_add + 1);
Vers_min_max_stats** old_array= table->s->stat_trx;
table->s->stat_trx= static_cast<Vers_min_max_stats**>(
alloc_root(&table->s->mem_root, sizeof(void *) * (partitions.elements * num_columns + 1)));
memcpy(table->s->stat_trx, old_array, sizeof(void *) * (partitions.elements - alter_add) * num_columns);
table->s->stat_trx[partitions.elements * num_columns]= NULL;
}
else
{
/* Prepare part_field_list */
Field *row_end= table->vers_end_field();
part_field_list.push_back(row_end->field_name.str, thd->mem_root);
DBUG_ASSERT(part_field_list.elements == num_columns);
DBUG_ASSERT(part_field_list.elements == 1);
// needed in handle_list_of_fields()
row_end->flags|= GET_FIXED_FIELDS_FLAG;
}
List_iterator<partition_element> it(partitions);
partition_element *el;
MYSQL_TIME t;
memset(&t, 0, sizeof(t));
my_time_t ts= TIMESTAMP_MAX_VALUE - partitions.elements;
uint32 id= 0;
while ((el= it++))
if (alter_add)
{
DBUG_ASSERT(el->type() != partition_element::CONVENTIONAL);
++ts;
if (alter_add)
List_iterator<partition_element> it(partitions);
partition_element *el;
for(uint32 id= 0; ((el= it++)); id++)
{
/* Non-empty historical partitions are left as is. */
if (el->type() == partition_element::HISTORY && !el->empty)
{
++id;
continue;
}
DBUG_ASSERT(el->type() != partition_element::CONVENTIONAL);
/* Newly added element is inserted before AS_OF_NOW. */
if (el->id == UINT_MAX32 || el->type() == partition_element::CURRENT)
{
DBUG_ASSERT(table && table->s);
Vers_min_max_stats *stat_trx_end= new (&table->s->mem_root)
Vers_min_max_stats(&table->s->vers_end_field()->field_name, table->s);
table->s->stat_trx[id * num_columns + STAT_TRX_END]= stat_trx_end;
el->id= id++;
el->id= id;
if (el->type() == partition_element::CURRENT)
break;
goto set_expression;
}
/* Existing element expression is recalculated. */
thd->variables.time_zone->gmt_sec_to_TIME(&t, ts);
for (uint i= 0; i < num_columns; ++i)
{
part_column_list_val &col_val= el->get_col_val(i);
static_cast<Item_datetime_literal *>(col_val.item_expression)->set_time(&t);
col_val.fixed= 0;
}
++id;
continue;
}
set_expression:
thd->variables.time_zone->gmt_sec_to_TIME(&t, ts);
if (vers_set_expression(thd, el, t))
return true;
}
return false;
}
class Table_locker
{
THD *thd;
TABLE &table;
thr_lock_type saved_type;
MYSQL_LOCK *saved_lock;
enum_locked_tables_mode saved_mode;
TABLE_LIST **saved_query_tables_own_last;
TABLE_LIST table_list;
bool locked;
public:
Table_locker(THD *_thd, TABLE &_table, thr_lock_type lock_type) :
thd(_thd),
table(_table),
saved_type(table.reginfo.lock_type),
saved_lock(_thd->lock),
saved_mode(_thd->locked_tables_mode),
saved_query_tables_own_last(_thd->lex->query_tables_own_last),
table_list(&_table, lock_type),
locked(false)
{
table.reginfo.lock_type= lock_type;
}
bool lock()
{
DBUG_ASSERT(table.file);
// FIXME: check consistency with table.reginfo.lock_type
if (table.file->get_lock_type() != F_UNLCK
|| table.s->tmp_table)
{
return false;
}
thd->lock= NULL;
thd->locked_tables_mode= LTM_NONE;
thd->lex->query_tables_own_last= NULL;
bool res= lock_tables(thd, &table_list, 1, 0);
locked= !res;
return res;
}
~Table_locker()
{
if (locked)
mysql_unlock_tables(thd, thd->lock);
table.reginfo.lock_type= saved_type;
thd->lock= saved_lock;
thd->locked_tables_mode= saved_mode;
thd->lex->query_tables_own_last= saved_query_tables_own_last;
if (locked && !thd->in_sub_stmt)
{
ha_commit_trans(thd, false);
ha_commit_trans(thd, true);
}
}
};
// scan table for min/max row_end
inline
bool partition_info::vers_scan_min_max(THD *thd, partition_element *part)
{
uint32 sub_factor= num_subparts ? num_subparts : 1;
uint32 part_id= part->id * sub_factor;
uint32 part_id_end= part_id + sub_factor;
DBUG_ASSERT(part->empty);
DBUG_ASSERT(part->type() == partition_element::HISTORY);
DBUG_ASSERT(table->s->stat_trx);
Table_locker l(thd, *table, TL_READ);
if (l.lock())
{
my_error(ER_INTERNAL_ERROR, MYF(0), "min/max scan failed on lock_tables()");
return true;
}
for (; part_id < part_id_end; ++part_id)
{
handler *file= table->file->part_handler(part_id); // requires update_partition() for ha_innopart
DBUG_ASSERT(file);
table->default_column_bitmaps();
bitmap_set_bit(table->read_set, table->vers_end_field()->field_index);
file->column_bitmaps_signal();
int rc= file->ha_rnd_init(true);
if (!rc)
{
while ((rc= file->ha_rnd_next(table->record[0])) != HA_ERR_END_OF_FILE)
{
if (part->empty)
part->empty= false;
if (thd->killed)
{
file->ha_rnd_end();
file->update_partition(part_id);
ha_commit_trans(thd, false);
return true;
}
if (rc)
{
if (rc == HA_ERR_RECORD_DELETED)
continue;
break;
}
if (table->vers_end_field()->is_max())
{
rc= HA_ERR_INTERNAL_ERROR;
push_warning_printf(thd,
Sql_condition::WARN_LEVEL_WARN,
WARN_VERS_PART_NON_HISTORICAL,
ER_THD(thd, WARN_VERS_PART_NON_HISTORICAL),
part->partition_name);
break;
}
if (table->versioned(VERS_TRX_ID))
{
uchar buf[8];
Field_timestampf fld(buf, NULL, 0, Field::NONE, &table->vers_end_field()->field_name, NULL, 6);
if (!vers_trx_id_to_ts(thd, table->vers_end_field(), fld))
{
vers_stat_trx(STAT_TRX_END, part).update_unguarded(&fld);
}
}
else
{
vers_stat_trx(STAT_TRX_END, part).update_unguarded(table->vers_end_field());
}
}
file->ha_rnd_end();
}
file->update_partition(part_id);
if (rc != HA_ERR_END_OF_FILE)
{
// TODO: print rc code
my_error(ER_INTERNAL_ERROR, MYF(0), "min/max scan failed in versioned partitions setup (see warnings)");
return true;
}
}
return false;
}
void partition_info::vers_update_col_vals(THD *thd, partition_element *el0, partition_element *el1)
{
MYSQL_TIME t;
memset(&t, 0, sizeof(t));
DBUG_ASSERT(table && table->s && table->s->stat_trx);
DBUG_ASSERT(!el0 || el1->id == el0->id + 1);
const uint idx= el1->id * num_columns;
my_time_t ts;
part_column_list_val *col_val;
Item_datetime_literal *val_item;
Vers_min_max_stats *stat_trx_x;
for (uint i= 0; i < num_columns; ++i)
{
stat_trx_x= table->s->stat_trx[idx + i];
if (el0)
{
ts= stat_trx_x->min_time();
thd->variables.time_zone->gmt_sec_to_TIME(&t, ts);
col_val= &el0->get_col_val(i);
val_item= static_cast<Item_datetime_literal*>(col_val->item_expression);
DBUG_ASSERT(val_item);
if (*val_item > t)
{
val_item->set_time(&t);
col_val->fixed= 0;
}
}
col_val= &el1->get_col_val(i);
if (!col_val->max_value)
{
ts= stat_trx_x->max_time() + 1;
thd->variables.time_zone->gmt_sec_to_TIME(&t, ts);
val_item= static_cast<Item_datetime_literal*>(col_val->item_expression);
DBUG_ASSERT(val_item);
if (*val_item < t)
{
val_item->set_time(&t);
col_val->fixed= 0;
}
}
}
}
// setup at open() phase (TABLE_SHARE is initialized)
bool partition_info::vers_setup_stats(THD * thd, bool is_create_table_ind)
{
DBUG_ASSERT(part_type == VERSIONING_PARTITION);
DBUG_ASSERT(vers_info && vers_info->initialized(false));
DBUG_ASSERT(table && table->s);
bool error= false;
TABLE_LIST tl(table, TL_READ);
MDL_auto_lock mdl_lock(thd, tl);
if (mdl_lock.acquire_error())
return true;
mysql_mutex_lock(&table->s->LOCK_rotation);
if (table->s->busy_rotation)
{
table->s->vers_wait_rotation();
vers_hist_part();
}
else
{
table->s->busy_rotation= true;
mysql_mutex_unlock(&table->s->LOCK_rotation);
DBUG_ASSERT(part_field_list.elements == num_columns);
bool dont_stat= true;
bool col_val_updated= false;
// initialize stat_trx
if (!table->s->stat_trx)
{
DBUG_ASSERT(partitions.elements > 1);
table->s->stat_trx= static_cast<Vers_min_max_stats**>(
alloc_root(&table->s->mem_root, sizeof(void *) * (partitions.elements * num_columns + 1)));
table->s->stat_trx[partitions.elements * num_columns]= NULL;
dont_stat= false;
}
// build freelist, scan min/max, assign hist_part
List_iterator<partition_element> it(partitions);
partition_element *el= NULL, *prev;
while ((prev= el, el= it++))
{
if (el->type() == partition_element::HISTORY && dont_stat)
{
if (el->id == table->s->hist_part_id)
{
vers_info->hist_part= el;
break;
}
continue;
}
{
Vers_min_max_stats *stat_trx_end= new (&table->s->mem_root)
Vers_min_max_stats(&table->s->vers_end_field()->field_name, table->s);
table->s->stat_trx[el->id * num_columns + STAT_TRX_END]= stat_trx_end;
}
if (!is_create_table_ind)
{
if (el->type() == partition_element::CURRENT)
{
uchar buf[8];
Field_timestampf fld(buf, NULL, 0, Field::NONE, &table->vers_end_field()->field_name, NULL, 6);
fld.set_max();
vers_stat_trx(STAT_TRX_END, el).update_unguarded(&fld);
el->empty= false;
}
else if (vers_scan_min_max(thd, el))
{
table->s->stat_trx= NULL; // may be a leak on endless table open
error= true;
break;
}
if (!el->empty)
{
vers_update_col_vals(thd, prev, el);
col_val_updated= true;
}
}
if (el->type() == partition_element::CURRENT)
break;
DBUG_ASSERT(el->type() == partition_element::HISTORY);
if (vers_info->hist_part)
{
if (!el->empty)
goto set_hist_part;
}
else
{
set_hist_part:
vers_info->hist_part= el;
continue;
}
} // while
if (!error && !dont_stat)
{
if (col_val_updated)
table->s->stat_serial++;
table->s->hist_part_id= vers_info->hist_part->id;
}
mysql_mutex_lock(&table->s->LOCK_rotation);
mysql_cond_broadcast(&table->s->COND_rotation);
table->s->busy_rotation= false;
}
mysql_mutex_unlock(&table->s->LOCK_rotation);
return error;
}
/*
Check that the partition/subpartition is setup to use the correct
@ -1682,11 +1294,11 @@ bool partition_info::check_partition_info(THD *thd, handlerton **eng_type,
DBUG_ASSERT(vers_info);
if (num_parts < 2 || !vers_info->now_part)
{
DBUG_ASSERT(info && info->alias.str);
DBUG_ASSERT(info);
DBUG_ASSERT(info->alias.str);
my_error(ER_VERS_WRONG_PARTS, MYF(0), info->alias.str);
goto end;
}
DBUG_ASSERT(vers_info->initialized(false));
DBUG_ASSERT(num_parts == partitions.elements);
}
i= 0;
@ -1800,7 +1412,7 @@ bool partition_info::check_partition_info(THD *thd, handlerton **eng_type,
if (hist_parts > 1)
{
if (unlikely(vers_info->limit == 0 && vers_info->interval == 0))
if (vers_info->limit == 0 && !vers_info->interval.is_set())
{
push_warning_printf(thd,
Sql_condition::WARN_LEVEL_WARN,

View File

@ -37,40 +37,43 @@ struct st_ddl_log_memory_entry;
struct Vers_part_info : public Sql_alloc
{
Vers_part_info() :
interval(0),
limit(0),
now_part(NULL),
hist_part(NULL),
stat_serial(0)
hist_part(NULL)
{
interval.type= INTERVAL_LAST;
}
Vers_part_info(Vers_part_info &src) :
interval(src.interval),
limit(src.limit),
now_part(NULL),
hist_part(NULL),
stat_serial(src.stat_serial)
hist_part(NULL)
{
}
bool initialized(bool fully= true)
bool initialized()
{
if (now_part)
{
DBUG_ASSERT(now_part->id != UINT_MAX32);
DBUG_ASSERT(now_part->type() == partition_element::CURRENT);
DBUG_ASSERT(!fully || (bool) hist_part);
DBUG_ASSERT(!hist_part || (
hist_part->id != UINT_MAX32 &&
hist_part->type() == partition_element::HISTORY));
if (hist_part)
{
DBUG_ASSERT(hist_part->id != UINT_MAX32);
DBUG_ASSERT(hist_part->type() == partition_element::HISTORY);
}
return true;
}
return false;
}
my_time_t interval;
struct {
my_time_t start;
INTERVAL step;
enum interval_type type;
bool is_set() { return type < INTERVAL_LAST; }
} interval;
ulonglong limit;
partition_element *now_part;
partition_element *hist_part;
ulonglong stat_serial;
};
class partition_info : public Sql_alloc
@ -391,38 +394,25 @@ public:
bool has_unique_name(partition_element *element);
bool vers_init_info(THD *thd);
bool vers_set_interval(const INTERVAL &i);
bool vers_set_limit(ulonglong limit);
partition_element* vers_part_rotate(THD *thd);
bool vers_set_expression(THD *thd, partition_element *el, MYSQL_TIME &t);
bool vers_setup_expression(THD *thd, uint32 alter_add= 0); /* Stage 1. */
bool vers_setup_stats(THD *thd, bool is_create_table_ind); /* Stage 2. */
bool vers_scan_min_max(THD *thd, partition_element *part);
void vers_update_col_vals(THD *thd, partition_element *el0, partition_element *el1);
partition_element *vers_hist_part()
bool vers_set_interval(Item *item, interval_type int_type, my_time_t start)
{
DBUG_ASSERT(table && table->s);
DBUG_ASSERT(vers_info && vers_info->initialized());
DBUG_ASSERT(table->s->hist_part_id != UINT_MAX32);
if (table->s->hist_part_id == vers_info->hist_part->id)
return vers_info->hist_part;
List_iterator<partition_element> it(partitions);
partition_element *el;
while ((el= it++))
{
DBUG_ASSERT(el->type() != partition_element::CONVENTIONAL);
if (el->type() == partition_element::HISTORY &&
el->id == table->s->hist_part_id)
{
vers_info->hist_part= el;
return vers_info->hist_part;
}
}
DBUG_ASSERT(0);
return NULL;
DBUG_ASSERT(part_type == VERSIONING_PARTITION);
vers_info->interval.type= int_type;
vers_info->interval.start= start;
return get_interval_value(item, int_type, &vers_info->interval.step) ||
vers_info->interval.step.neg || vers_info->interval.step.second_part ||
!(vers_info->interval.step.year || vers_info->interval.step.month ||
vers_info->interval.step.day || vers_info->interval.step.hour ||
vers_info->interval.step.minute || vers_info->interval.step.second);
}
bool vers_set_limit(ulonglong limit)
{
DBUG_ASSERT(part_type == VERSIONING_PARTITION);
vers_info->limit= limit;
return !limit;
}
void vers_set_hist_part(THD *thd);
bool vers_setup_expression(THD *thd, uint32 alter_add= 0); /* Stage 1. */
partition_element *get_partition(uint part_id)
{
List_iterator<partition_element> it(partitions);
@ -434,120 +424,7 @@ public:
}
return NULL;
}
bool vers_limit_exceed(partition_element *part= NULL)
{
DBUG_ASSERT(vers_info);
if (!vers_info->limit)
return false;
if (!part)
{
DBUG_ASSERT(vers_info->initialized());
part= vers_hist_part();
}
// TODO: cache thread-shared part_recs and increment on INSERT
return table->file->part_records(part) >= vers_info->limit;
}
Vers_min_max_stats& vers_stat_trx(stat_trx_field fld, uint32 part_element_id)
{
DBUG_ASSERT(table && table->s && table->s->stat_trx);
Vers_min_max_stats* res= table->s->stat_trx[part_element_id * num_columns + fld];
DBUG_ASSERT(res);
return *res;
}
Vers_min_max_stats& vers_stat_trx(stat_trx_field fld, partition_element *part)
{
DBUG_ASSERT(part);
return vers_stat_trx(fld, part->id);
}
bool vers_interval_exceed(my_time_t max_time, partition_element *part= NULL)
{
DBUG_ASSERT(vers_info);
if (!vers_info->interval)
return false;
if (!part)
{
DBUG_ASSERT(vers_info->initialized());
part= vers_hist_part();
}
my_time_t min_time= vers_stat_trx(STAT_TRX_END, part).min_time();
return max_time - min_time > vers_info->interval;
}
bool vers_interval_exceed(partition_element *part)
{
return vers_interval_exceed(vers_stat_trx(STAT_TRX_END, part).max_time(), part);
}
bool vers_interval_exceed()
{
return vers_interval_exceed(vers_hist_part());
}
bool vers_trx_id_to_ts(THD *thd, Field *in_trx_id, Field_timestamp &out_ts);
void vers_update_stats(THD *thd, partition_element *el)
{
DBUG_ASSERT(vers_info && vers_info->initialized());
DBUG_ASSERT(table && table->s);
DBUG_ASSERT(el && el->type() == partition_element::HISTORY);
bool updated;
mysql_rwlock_wrlock(&table->s->LOCK_stat_serial);
el->empty= false;
if (table->versioned(VERS_TRX_ID))
{
// transaction is not yet pushed to VTQ, so we use now-time
my_time_t end_ts= my_time_t(0);
uchar buf[8];
Field_timestampf fld(buf, NULL, 0, Field::NONE, &table->vers_end_field()->field_name, NULL, 6);
fld.store_TIME(end_ts, 0);
updated=
vers_stat_trx(STAT_TRX_END, el->id).update(&fld);
}
else
{
updated=
vers_stat_trx(STAT_TRX_END, el->id).update(table->vers_end_field());
}
if (updated)
table->s->stat_serial++;
mysql_rwlock_unlock(&table->s->LOCK_stat_serial);
if (updated)
{
vers_update_col_vals(thd,
el->id > 0 ? get_partition(el->id - 1) : NULL,
el);
}
}
void vers_update_stats(THD *thd, uint part_id)
{
DBUG_ASSERT(vers_info && vers_info->initialized());
uint lpart_id= num_subparts ? part_id / num_subparts : part_id;
if (lpart_id < vers_info->now_part->id)
vers_update_stats(thd, get_partition(lpart_id));
}
bool vers_update_range_constants(THD *thd)
{
DBUG_ASSERT(vers_info && vers_info->initialized());
DBUG_ASSERT(table && table->s);
mysql_rwlock_rdlock(&table->s->LOCK_stat_serial);
if (vers_info->stat_serial == table->s->stat_serial)
{
mysql_rwlock_unlock(&table->s->LOCK_stat_serial);
return false;
}
bool result= false;
for (uint i= 0; i < num_columns; ++i)
{
Field *f= part_field_array[i];
bitmap_set_bit(f->table->write_set, f->field_index);
}
MEM_ROOT *old_root= thd->mem_root;
thd->mem_root= &table->mem_root;
//result= check_range_constants(thd, false);
thd->mem_root= old_root;
vers_info->stat_serial= table->s->stat_serial;
mysql_rwlock_unlock(&table->s->LOCK_stat_serial);
return result;
}
};
uint32 get_next_partition_id_range(struct st_partition_iter* part_iter);

View File

@ -7837,8 +7837,8 @@ WARN_VERS_PART_FULL
WARN_VERS_PARAMETERS
eng "Maybe missing parameters: %s"
WARN_VERS_PART_ROTATION
eng "Versioned table %`s.%`s: switching from partition %`s to %`s"
ER_VERS_DROP_PARTITION_INTERVAL
eng "Can only drop oldest partitions when rotating by INTERVAL"
WARN_VERS_TRX_MISSING
eng "VTQ missing transaction ID %lu"

View File

@ -68,6 +68,7 @@
#include "sql_alter.h" // Alter_table_ctx
#include "sql_select.h"
#include "sql_tablespace.h" // check_tablespace_name
#include "tztime.h" // my_tz_OFFSET0
#include <algorithm>
using std::max;
@ -107,27 +108,12 @@ static uint32 get_next_subpartition_via_walking(PARTITION_ITERATOR*);
uint32 get_next_partition_id_range(PARTITION_ITERATOR* part_iter);
uint32 get_next_partition_id_list(PARTITION_ITERATOR* part_iter);
int get_part_iter_for_interval_via_mapping(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array,
uchar *min_value, uchar *max_value,
uint min_len, uint max_len,
uint flags,
PARTITION_ITERATOR *part_iter);
int get_part_iter_for_interval_cols_via_map(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array,
uchar *min_value, uchar *max_value,
uint min_len, uint max_len,
uint flags,
PARTITION_ITERATOR *part_iter);
int get_part_iter_for_interval_via_walking(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array,
uchar *min_value, uchar *max_value,
uint min_len, uint max_len,
uint flags,
PARTITION_ITERATOR *part_iter);
static int get_part_iter_for_interval_via_mapping(partition_info *, bool,
uint32 *, uchar *, uchar *, uint, uint, uint, PARTITION_ITERATOR *);
static int get_part_iter_for_interval_cols_via_map(partition_info *, bool,
uint32 *, uchar *, uchar *, uint, uint, uint, PARTITION_ITERATOR *);
static int get_part_iter_for_interval_via_walking(partition_info *, bool,
uint32 *, uchar *, uchar *, uint, uint, uint, PARTITION_ITERATOR *);
#ifdef WITH_PARTITION_STORAGE_ENGINE
static int cmp_rec_and_tuple(part_column_list_val *val, uint32 nvals_in_rec);
@ -1560,6 +1546,44 @@ end:
}
/* Set partition boundaries when rotating by INTERVAL */
static bool check_vers_constants(THD *thd, partition_info *part_info)
{
uint hist_parts= part_info->num_parts - 1;
Vers_part_info *vers_info= part_info->vers_info;
vers_info->hist_part= part_info->partitions.head();
vers_info->now_part= part_info->partitions.elem(hist_parts);
if (!vers_info->interval.is_set())
return 0;
part_info->range_int_array=
(longlong*) thd->alloc(hist_parts * sizeof(longlong));
MYSQL_TIME ltime;
List_iterator<partition_element> it(part_info->partitions);
partition_element *el;
my_tz_OFFSET0->gmt_sec_to_TIME(&ltime, vers_info->interval.start);
while ((el= it++)->id < hist_parts)
{
if (date_add_interval(&ltime, vers_info->interval.type,
vers_info->interval.step))
goto err;
uint error= 0;
part_info->range_int_array[el->id]= el->range_value=
my_tz_OFFSET0->TIME_to_gmt_sec(&ltime, &error);
if (error)
goto err;
if (vers_info->hist_part->range_value <= thd->system_time)
vers_info->hist_part= el;
}
return 0;
err:
my_error(ER_DATA_OUT_OF_RANGE, MYF(0), "TIMESTAMP", "INTERVAL");
return 1;
}
/*
Set up function pointers for partition function
@ -1727,6 +1751,8 @@ static void set_up_partition_func_pointers(partition_info *part_info)
part_info->check_constants= check_range_constants;
else if (part_info->part_type == LIST_PARTITION)
part_info->check_constants= check_list_constants;
else if (part_info->part_type == VERSIONING_PARTITION)
part_info->check_constants= check_vers_constants;
else
part_info->check_constants= check_no_constants;
DBUG_VOID_RETURN;
@ -2628,11 +2654,16 @@ char *generate_partition_syntax(THD *thd, partition_info *part_info,
{
Vers_part_info *vers_info= part_info->vers_info;
DBUG_ASSERT(vers_info);
if (vers_info->interval)
if (vers_info->interval.is_set())
{
err+= str.append(STRING_WITH_LEN("INTERVAL "));
err+= str.append_ulonglong(vers_info->interval);
err+= str.append(STRING_WITH_LEN(" SECOND "));
err+= append_interval(&str, vers_info->interval.type,
vers_info->interval.step);
if (create_info) // not SHOW CREATE
{
err+= str.append(STRING_WITH_LEN(" STARTS "));
err+= str.append_ulonglong(vers_info->interval.start);
}
}
if (vers_info->limit)
{
@ -3447,70 +3478,43 @@ int get_partition_id_range_col(partition_info *part_info,
}
int vers_get_partition_id(partition_info *part_info,
uint32 *part_id,
int vers_get_partition_id(partition_info *part_info, uint32 *part_id,
longlong *func_value)
{
DBUG_ENTER("vers_get_partition_id");
DBUG_ASSERT(part_info);
Field *row_end= part_info->part_field_array[STAT_TRX_END];
DBUG_ASSERT(row_end);
TABLE *table= part_info->table;
DBUG_ASSERT(table);
Vers_part_info *vers_info= part_info->vers_info;
DBUG_ASSERT(vers_info);
DBUG_ASSERT(vers_info->initialized());
DBUG_ASSERT(row_end->table == table);
DBUG_ASSERT(table->versioned());
DBUG_ASSERT(table->vers_end_field() == row_end);
if (row_end->is_max() || row_end->is_null())
{
*part_id= vers_info->now_part->id;
}
else // row is historical
{
THD *thd= current_thd;
longlong *range_value= part_info->range_int_array;
uint max_hist_id= part_info->num_parts - 2;
uint min_hist_id= 0, loc_hist_id= vers_info->hist_part->id;
ulong unused;
my_time_t ts;
switch (thd->lex->sql_command)
if (!range_value)
goto done; // fastpath
ts= row_end->get_timestamp(&unused);
if ((loc_hist_id == 0 || range_value[loc_hist_id - 1] < ts) &&
(loc_hist_id == max_hist_id || range_value[loc_hist_id] >= ts))
goto done; // fastpath
while (max_hist_id > min_hist_id)
{
case SQLCOM_DELETE:
if (thd->lex->vers_conditions)
break; // DELETE HISTORY
case SQLCOM_DELETE_MULTI:
case SQLCOM_UPDATE:
case SQLCOM_UPDATE_MULTI:
case SQLCOM_ALTER_TABLE:
mysql_mutex_lock(&table->s->LOCK_rotation);
if (table->s->busy_rotation)
{
table->s->vers_wait_rotation();
part_info->vers_hist_part();
}
loc_hist_id= (max_hist_id + min_hist_id) / 2;
if (range_value[loc_hist_id] <= ts)
min_hist_id= loc_hist_id + 1;
else
{
table->s->busy_rotation= true;
mysql_mutex_unlock(&table->s->LOCK_rotation);
// transaction is not yet pushed to VTQ, so we use now-time
ulong sec_part;
my_time_t end_ts= row_end->table->versioned(VERS_TRX_ID) ?
my_time_t(0) : row_end->get_timestamp(&sec_part);
if (part_info->vers_limit_exceed() || part_info->vers_interval_exceed(end_ts))
{
part_info->vers_part_rotate(thd);
}
mysql_mutex_lock(&table->s->LOCK_rotation);
mysql_cond_broadcast(&table->s->COND_rotation);
table->s->busy_rotation= false;
}
mysql_mutex_unlock(&table->s->LOCK_rotation);
break;
default:
;
max_hist_id= loc_hist_id;
}
*part_id= vers_info->hist_part->id;
loc_hist_id= max_hist_id;
done:
*part_id= (uint32)loc_hist_id;
}
DBUG_PRINT("exit",("partition: %d", *part_id));
DBUG_RETURN(0);
}
@ -4448,8 +4452,7 @@ bool mysql_unpack_partition(THD *thd,
{
bool result= TRUE;
partition_info *part_info;
CHARSET_INFO *old_character_set_client=
thd->variables.character_set_client;
CHARSET_INFO *old_character_set_client= thd->variables.character_set_client;
LEX *old_lex= thd->lex;
LEX lex;
PSI_statement_locker *parent_locker= thd->m_statement_psi;
@ -4464,17 +4467,9 @@ bool mysql_unpack_partition(THD *thd,
if (init_lex_with_single_table(thd, table, &lex))
goto end;
/*
All Items created is put into a free list on the THD object. This list
is used to free all Item objects after completing a query. We don't
want that to happen with the Item tree created as part of the partition
info. This should be attached to the table object and remain so until
the table object is released.
Thus we move away the current list temporarily and start a new list that
we then save in the partition info structure.
*/
*work_part_info_used= FALSE;
lex.part_info= new partition_info();/* Indicates MYSQLparse from this place */
lex.part_info= new partition_info();
lex.part_info->table= table; /* Indicates MYSQLparse from this place */
if (!lex.part_info)
{
mem_alloc_error(sizeof(partition_info));
@ -5205,16 +5200,14 @@ adding and copying partitions, the second after completing the adding
and copying and finally the third line after also dropping the partitions
that are reorganised.
*/
if (*fast_alter_table &&
tab_part_info->part_type == HASH_PARTITION)
if (*fast_alter_table && tab_part_info->part_type == HASH_PARTITION)
{
uint part_no= 0, start_part= 1, start_sec_part= 1;
uint end_part= 0, end_sec_part= 0;
uint upper_2n= tab_part_info->linear_hash_mask + 1;
uint lower_2n= upper_2n >> 1;
bool all_parts= TRUE;
if (tab_part_info->linear_hash_ind &&
num_new_partitions < upper_2n)
if (tab_part_info->linear_hash_ind && num_new_partitions < upper_2n)
{
/*
An analysis of which parts needs reorganisation shows that it is
@ -5314,11 +5307,16 @@ that are reorganised.
{
if (el->type() == partition_element::CURRENT)
{
DBUG_ASSERT(tab_part_info->vers_info && el == tab_part_info->vers_info->now_part);
it.remove();
now_part= el;
}
}
if (*fast_alter_table && tab_part_info->vers_info->interval.is_set())
{
partition_element *hist_part= tab_part_info->vers_info->hist_part;
if (hist_part->range_value <= thd->system_time)
hist_part->part_state= PART_CHANGED;
}
}
List_iterator<partition_element> alt_it(alt_part_info->partitions);
uint part_count= 0;
@ -5405,12 +5403,23 @@ that are reorganised.
if (is_name_in_list(part_elem->partition_name,
alter_info->partition_names))
{
if (tab_part_info->part_type == VERSIONING_PARTITION &&
part_elem->type() == partition_element::CURRENT)
if (tab_part_info->part_type == VERSIONING_PARTITION)
{
DBUG_ASSERT(table && table->s && table->s->table_name.str);
my_error(ER_VERS_WRONG_PARTS, MYF(0), table->s->table_name.str);
goto err;
if (part_elem->type() == partition_element::CURRENT)
{
my_error(ER_VERS_WRONG_PARTS, MYF(0), table->s->table_name.str);
goto err;
}
if (tab_part_info->vers_info->interval.is_set())
{
if (num_parts_found < part_count)
{
my_error(ER_VERS_DROP_PARTITION_INTERVAL, MYF(0));
goto err;
}
tab_part_info->vers_info->interval.start=
(my_time_t)part_elem->range_value;
}
}
/*
Set state to indicate that the partition is to be dropped.
@ -7673,7 +7682,6 @@ static void set_up_range_analysis_info(partition_info *part_info)
switch (part_info->part_type) {
case RANGE_PARTITION:
case LIST_PARTITION:
case VERSIONING_PARTITION:
if (!part_info->column_list)
{
if (part_info->part_expr->get_monotonicity_info() != NON_MONOTONIC)
@ -7960,13 +7968,11 @@ uint32 get_partition_id_cols_range_for_endpoint(partition_info *part_info,
}
int get_part_iter_for_interval_cols_via_map(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array,
uchar *min_value, uchar *max_value,
uint min_len, uint max_len,
uint flags,
PARTITION_ITERATOR *part_iter)
static int get_part_iter_for_interval_cols_via_map(partition_info *part_info,
bool is_subpart, uint32 *store_length_array,
uchar *min_value, uchar *max_value,
uint min_len, uint max_len,
uint flags, PARTITION_ITERATOR *part_iter)
{
bool can_match_multiple_values;
uint32 nparts;
@ -8087,13 +8093,12 @@ int get_part_iter_for_interval_cols_via_map(partition_info *part_info,
@retval -1 All partitions would match (iterator not initialized)
*/
int get_part_iter_for_interval_via_mapping(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array, /* ignored */
uchar *min_value, uchar *max_value,
uint min_len, uint max_len, /* ignored */
uint flags,
PARTITION_ITERATOR *part_iter)
static int get_part_iter_for_interval_via_mapping(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array, /* ignored */
uchar *min_value, uchar *max_value,
uint min_len, uint max_len, /* ignored */
uint flags, PARTITION_ITERATOR *part_iter)
{
Field *field= part_info->part_field_array[0];
uint32 UNINIT_VAR(max_endpoint_val);
@ -8334,13 +8339,12 @@ not_found:
-1 - All partitions would match, iterator not initialized
*/
int get_part_iter_for_interval_via_walking(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array, /* ignored */
uchar *min_value, uchar *max_value,
uint min_len, uint max_len, /* ignored */
uint flags,
PARTITION_ITERATOR *part_iter)
static int get_part_iter_for_interval_via_walking(partition_info *part_info,
bool is_subpart,
uint32 *store_length_array, /* ignored */
uchar *min_value, uchar *max_value,
uint min_len, uint max_len, /* ignored */
uint flags, PARTITION_ITERATOR *part_iter)
{
Field *field;
uint total_parts;

View File

@ -7196,11 +7196,8 @@ static void store_schema_partitions_record(THD *thd, TABLE *schema_table,
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
static int
get_partition_column_description(THD *thd,
partition_info *part_info,
part_elem_value *list_value,
String &tmp_str)
static int get_partition_column_description(THD *thd, partition_info *part_info,
part_elem_value *list_value, String &tmp_str)
{
uint num_elements= part_info->part_field_list.elements;
uint i;
@ -7305,8 +7302,7 @@ static int get_schema_partitions_record(THD *thd, TABLE_LIST *tables,
table->field[7]->store(tmp_res.ptr(), tmp_res.length(), cs);
break;
case VERSIONING_PARTITION:
tmp_res.length(0);
tmp_res.append(STRING_WITH_LEN("SYSTEM_TIME"));
table->field[7]->store(STRING_WITH_LEN("SYSTEM_TIME"), cs);
break;
default:
DBUG_ASSERT(0);
@ -7374,13 +7370,9 @@ static int get_schema_partitions_record(THD *thd, TABLE_LIST *tables,
List_iterator<part_elem_value> list_val_it(part_elem->list_val_list);
part_elem_value *list_value= list_val_it++;
tmp_str.length(0);
if (get_partition_column_description(thd,
part_info,
list_value,
if (get_partition_column_description(thd, part_info, list_value,
tmp_str))
{
DBUG_RETURN(1);
}
table->field[11]->store(tmp_str.ptr(), tmp_str.length(), cs);
}
else
@ -7411,13 +7403,9 @@ static int get_schema_partitions_record(THD *thd, TABLE_LIST *tables,
{
if (part_info->part_field_list.elements > 1U)
tmp_str.append(STRING_WITH_LEN("("));
if (get_partition_column_description(thd,
part_info,
list_value,
if (get_partition_column_description(thd, part_info, list_value,
tmp_str))
{
DBUG_RETURN(1);
}
if (part_info->part_field_list.elements > 1U)
tmp_str.append(")");
}
@ -7435,6 +7423,19 @@ static int get_schema_partitions_record(THD *thd, TABLE_LIST *tables,
table->field[11]->store(tmp_str.ptr(), tmp_str.length(), cs);
table->field[11]->set_notnull();
}
else if (part_info->part_type == VERSIONING_PARTITION)
{
if (part_elem == part_info->vers_info->now_part)
{
table->field[11]->store(STRING_WITH_LEN("CURRENT"), cs);
table->field[11]->set_notnull();
}
else if (part_info->vers_info->interval.is_set())
{
table->field[11]->store_timestamp((my_time_t)part_elem->range_value, 0);
table->field[11]->set_notnull();
}
}
if (part_elem->subpartitions.elements)
{

View File

@ -1740,7 +1740,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
%type <ulong_num>
ulong_num real_ulong_num merge_insert_types
ws_nweights
ws_nweights opt_versioning_interval_start
ws_level_flag_desc ws_level_flag_reverse ws_level_flags
opt_ws_levels ws_level_list ws_level_list_item ws_level_number
ws_level_range ws_level_list_or_range bool
@ -5157,12 +5157,12 @@ have_partitioning:
partition_entry:
PARTITION_SYM
{
LEX *lex= Lex;
if (!lex->part_info)
if (!Lex->part_info)
{
thd->parse_error(ER_PARTITION_ENTRY_ERROR);
MYSQL_YYABORT;
}
DBUG_ASSERT(Lex->part_info->table);
/*
We enter here when opening the frm file to translate
partition info string into part_info data structure.
@ -5844,13 +5844,10 @@ opt_part_option:
opt_versioning_interval:
/* empty */ {}
| INTERVAL_SYM expr interval
| INTERVAL_SYM expr interval opt_versioning_interval_start
{
partition_info *part_info= Lex->part_info;
DBUG_ASSERT(part_info->part_type == VERSIONING_PARTITION);
INTERVAL interval;
if (get_interval_value($2, $3, &interval) ||
part_info->vers_set_interval(interval))
if (part_info->vers_set_interval($2, $3, $4))
{
my_error(ER_PART_WRONG_VALUE, MYF(0),
Lex->create_last_non_select_table->table_name.str,
@ -5860,20 +5857,36 @@ opt_versioning_interval:
}
;
opt_versioning_interval_start:
/* empty */
{
$$= thd->system_time;
}
| remember_tok_start STARTS_SYM ulong_num
{
/* only allowed from mysql_unpack_partition() */
if (!Lex->part_info->table)
{
thd->parse_error(ER_SYNTAX_ERROR, $1);
MYSQL_YYABORT;
}
$$= (ulong)$3;
}
;
opt_versioning_limit:
/* empty */ {}
| LIMIT ulonglong_num
{
partition_info *part_info= Lex->part_info;
if (part_info->vers_set_limit($2))
{
partition_info *part_info= Lex->part_info;
DBUG_ASSERT(part_info->part_type == VERSIONING_PARTITION);
if (part_info->vers_set_limit($2))
{
my_error(ER_PART_WRONG_VALUE, MYF(0),
Lex->create_last_non_select_table->table_name.str,
"LIMIT");
MYSQL_YYABORT;
}
my_error(ER_PART_WRONG_VALUE, MYF(0),
Lex->create_last_non_select_table->table_name.str,
"LIMIT");
MYSQL_YYABORT;
}
}
;
/*

View File

@ -425,9 +425,6 @@ void TABLE_SHARE::destroy()
DBUG_ENTER("TABLE_SHARE::destroy");
DBUG_PRINT("info", ("db: %s table: %s", db.str, table_name.str));
if (versioned)
vers_destroy();
if (ha_share)
{
delete ha_share;
@ -1791,7 +1788,6 @@ int TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
DBUG_PRINT("info", ("Columns with system versioning: [%d, %d]", row_start, row_end));
versioned= VERS_TIMESTAMP;
vers_can_native= plugin_hton(se_plugin)->flags & HTON_NATIVE_SYS_VERSIONING;
vers_init();
row_start_field= row_start;
row_end_field= row_end;
} // if (system_period == NULL)
@ -3522,38 +3518,6 @@ partititon_err:
if (share->no_replicate || !binlog_filter->db_ok(share->db.str))
share->can_do_row_logging= 0; // No row based replication
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (outparam->part_info &&
outparam->part_info->part_type == VERSIONING_PARTITION)
{
Query_arena *backup_stmt_arena_ptr= thd->stmt_arena;
Query_arena backup_arena;
Query_arena part_func_arena(&outparam->mem_root,
Query_arena::STMT_INITIALIZED);
if (!work_part_info_used)
{
thd->set_n_backup_active_arena(&part_func_arena, &backup_arena);
thd->stmt_arena= &part_func_arena;
}
bool err= outparam->part_info->vers_setup_stats(thd, is_create_table);
if (!work_part_info_used)
{
thd->stmt_arena= backup_stmt_arena_ptr;
thd->restore_active_arena(&part_func_arena, &backup_arena);
}
if (err)
{
outparam->file->ha_close();
error= OPEN_FRM_OPEN_ERROR;
error_reported= true;
goto err;
}
}
#endif
/* Increment the opened_tables counter, only when open flags set. */
if (db_stat)
thd->status_var.opened_tables++;
@ -8590,20 +8554,6 @@ LEX_CSTRING *fk_option_name(enum_fk_option opt)
return names + opt;
}
void TABLE_SHARE::vers_destroy()
{
mysql_mutex_destroy(&LOCK_rotation);
mysql_cond_destroy(&COND_rotation);
mysql_rwlock_destroy(&LOCK_stat_serial);
if (stat_trx)
{
for (Vers_min_max_stats** p= stat_trx; *p; ++p)
{
delete *p;
}
}
}
enum TR_table::enabled TR_table::use_transaction_registry= TR_table::MAYBE;
TR_table::TR_table(THD* _thd, bool rw) :

View File

@ -575,8 +575,6 @@ struct TABLE_STATISTICS_CB
bool histograms_are_read;
};
class Vers_min_max_stats;
enum vers_sys_type_t
{
VERS_UNDEFINED= 0,
@ -777,27 +775,6 @@ struct TABLE_SHARE
bool vtmd;
uint16 row_start_field;
uint16 row_end_field;
uint32 hist_part_id;
Vers_min_max_stats** stat_trx;
ulonglong stat_serial; // guards check_range_constants() updates
bool busy_rotation;
mysql_mutex_t LOCK_rotation;
mysql_cond_t COND_rotation;
mysql_rwlock_t LOCK_stat_serial;
void vers_init()
{
hist_part_id= UINT_MAX32;
busy_rotation= false;
stat_trx= NULL;
stat_serial= 0;
mysql_mutex_init(key_TABLE_SHARE_LOCK_rotation, &LOCK_rotation, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_TABLE_SHARE_COND_rotation, &COND_rotation, NULL);
mysql_rwlock_init(key_rwlock_LOCK_stat_serial, &LOCK_stat_serial);
}
void vers_destroy();
Field *vers_start_field()
{
@ -809,12 +786,6 @@ struct TABLE_SHARE
return field[row_end_field];
}
void vers_wait_rotation()
{
while (busy_rotation)
mysql_cond_wait(&COND_rotation, &LOCK_rotation);
}
/**
Cache the checked structure of this table.