1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

MDEV-10139 Support for SEQUENCE objects

- SETVAL(sequence_name, next_value, is_used, round)
- ALTER SEQUENCE, including RESTART WITH

Other things:
- Added handler::extra() option HA_EXTRA_PREPARE_FOR_ALTER_TABLE to signal
  ha_sequence() that it should allow write_row statments.
- ALTER ONLINE TABLE now works with SEQUENCE:s
This commit is contained in:
Monty
2017-05-08 02:44:55 +03:00
parent 1e04ad284c
commit 71fa413c16
31 changed files with 1399 additions and 85 deletions

View File

@ -22,6 +22,7 @@
#include "sql_base.h"
#include "transaction.h"
#include "lock.h"
#include "sql_acl.h"
struct Field_definition
{
@ -162,7 +163,7 @@ void sequence_definition::store_fields(TABLE *table)
/*
Check the sequence fields through seq_fields when create sequence.qq
Check the sequence fields through seq_fields when createing a sequence.
RETURN VALUES
false Success
@ -269,7 +270,6 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *table_list)
Reprepare_observer *save_reprepare_observer;
sequence_definition *seq= lex->create_info.seq_create_info;
bool temporary_table= table_list->table != 0;
MY_BITMAP *save_write_set;
DBUG_ENTER("sequence_insert");
/* If not temporary table */
@ -316,33 +316,7 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *table_list)
}
seq->reserved_until= seq->start;
seq->store_fields(table);
/* Store the sequence values in table share */
table->s->sequence->copy(seq);
/*
Sequence values will be replicated as a statement
like 'create sequence'. So disable binary log temporarily
*/
tmp_disable_binlog(thd);
save_write_set= table->write_set;
table->write_set= &table->s->all_set;
table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE;
error= table->file->ha_write_row(table->record[0]);
table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED;
reenable_binlog(thd);
table->write_set= save_write_set;
if (error)
table->file->print_error(error, MYF(0));
else
{
/*
Sequence structure is up to date and table has one row,
sequence is now usable
*/
table->s->sequence->initialized= SEQUENCE::SEQ_READY_TO_USE;
}
error= seq->write_initial_sequence(table);
trans_commit_stmt(thd);
trans_commit_implicit(thd);
@ -463,7 +437,7 @@ int SEQUENCE::read_stored_values()
DBUG_RETURN(error);
}
read_fields(table);
adjust_values();
adjust_values(reserved_until);
all_values_used= 0;
DBUG_RETURN(0);
@ -474,12 +448,12 @@ int SEQUENCE::read_stored_values()
Adjust values after reading a the stored state
*/
void SEQUENCE::adjust_values()
void SEQUENCE::adjust_values(longlong next_value)
{
offset= 0;
next_free_value= reserved_until;
next_free_value= next_value;
if (!(real_increment= increment))
{
longlong offset= 0;
longlong off, to_add;
/* Use auto_increment_increment and auto_increment_offset */
@ -514,6 +488,72 @@ void SEQUENCE::adjust_values()
}
/**
Write initial sequence information for CREATE and ALTER to sequence table
*/
int sequence_definition::write_initial_sequence(TABLE *table)
{
int error;
THD *thd= table->in_use;
MY_BITMAP *save_write_set;
store_fields(table);
/* Store the sequence values in table share */
table->s->sequence->copy(this);
/*
Sequence values will be replicated as a statement
like 'create sequence'. So disable binary log temporarily
*/
tmp_disable_binlog(thd);
save_write_set= table->write_set;
table->write_set= &table->s->all_set;
table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE;
error= table->file->ha_write_row(table->record[0]);
table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED;
reenable_binlog(thd);
table->write_set= save_write_set;
if (error)
table->file->print_error(error, MYF(0));
else
{
/*
Sequence structure is up to date and table has one row,
sequence is now usable
*/
table->s->sequence->initialized= SEQUENCE::SEQ_READY_TO_USE;
}
return error;
}
/**
Store current sequence values into the sequence table
*/
int sequence_definition::write(TABLE *table)
{
int error;
MY_BITMAP *save_rpl_write_set, *save_write_set;
/* Log a full insert (ok as table is small) */
save_rpl_write_set= table->rpl_write_set;
/* Update table */
save_write_set= table->write_set;
table->rpl_write_set= table->write_set= &table->s->all_set;
store_fields(table);
/* Tell ha_sequence::write_row that we already hold the mutex */
((ha_sequence*) table->file)->sequence_locked= 1;
if ((error= table->file->ha_write_row(table->record[0])))
table->file->print_error(error, MYF(0));
((ha_sequence*) table->file)->sequence_locked= 0;
table->rpl_write_set= save_rpl_write_set;
table->write_set= save_write_set;
return error;
}
/**
Get next value for sequence
@ -546,7 +586,6 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
{
longlong res_value, org_reserved_until, add_to;
bool out_of_values;
MY_BITMAP *save_rpl_write_set, *save_write_set;
DBUG_ENTER("SEQUENCE::next_value");
*error= 0;
@ -554,24 +593,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
lock();
res_value= next_free_value;
/* Increment next_free_value */
if (real_increment > 0)
{
if (next_free_value + real_increment > max_value ||
next_free_value > max_value - real_increment)
next_free_value= max_value + 1;
else
next_free_value+= real_increment;
}
else
{
if (next_free_value + real_increment < min_value ||
next_free_value < min_value - real_increment)
next_free_value= min_value - 1;
else
next_free_value+= real_increment;
}
next_free_value= increment_value(next_free_value);
if ((real_increment > 0 && res_value < reserved_until) ||
(real_increment < 0 && res_value > reserved_until))
@ -621,7 +643,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
goto err;
round++;
reserved_until= real_increment >0 ? min_value : max_value;
adjust_values(); // Fix next_free_value
adjust_values(reserved_until); // Fix next_free_value
/*
We have to do everything again to ensure that the given range was
not empty, which could happen if increment == 0
@ -629,25 +651,11 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
DBUG_RETURN(next_value(table, 1, error));
}
/* Log a full insert (ok as table is small) */
save_rpl_write_set= table->rpl_write_set;
/* Update table */
save_write_set= table->write_set;
table->rpl_write_set= table->write_set= &table->s->all_set;
store_fields(table);
/* Tell ha_sequence::write_row that we already hold the mutex */
((ha_sequence*) table->file)->sequence_locked= 1;
if ((*error= table->file->ha_write_row(table->record[0])))
if ((*error= write(table)))
{
table->file->print_error(*error, MYF(0));
/* Restore original range */
reserved_until= org_reserved_until;
next_free_value= res_value;
}
((ha_sequence*) table->file)->sequence_locked= 0;
table->rpl_write_set= save_rpl_write_set;
table->write_set= save_write_set;
unlock();
DBUG_RETURN(res_value);
@ -681,3 +689,179 @@ void SEQUENCE_LAST_VALUE::set_version(TABLE *table)
{
memcpy(table_version, table->s->tabledef_version.str, MY_UUID_SIZE);
}
/**
Set the next value for sequence
@param in table Sequence table
@param in next_val Next free value
@param in next_round Round for 'next_value' (in cace of cycles)
@param in is_used 1 if next_val is already used
<20>@retval 0 ok, value adjusted
1 value was less than current value or
error when storing value
@comment
A new value is set only if "nextval,next_round" is less than
"next_free_value,round". This is needed as in replication
setvalue() calls may come out to the slave out-of-order.
Storing only the highest value ensures that sequence object will always
contain the highest used value when the slave is promoted to a master.
*/
bool SEQUENCE::set_value(TABLE *table, longlong next_val, ulonglong next_round,
bool is_used)
{
bool error= 1;
bool needs_to_be_stored= 0;
longlong org_reserved_until= reserved_until;
longlong org_next_free_value= next_free_value;
ulonglong org_round= round;
DBUG_ENTER("SEQUENCE::set_value");
lock();
if (is_used)
next_val= increment_value(next_val);
if (round > next_round)
goto end;
if (round == next_round)
{
if (real_increment > 0 ?
next_val < next_free_value :
next_val > next_free_value)
goto end;
if (next_val == next_free_value)
{
error= 0;
goto end;
}
}
else if (cycle == 0)
goto end; // round < next_round && no cycles
else
needs_to_be_stored= 1;
round= next_round;
adjust_values(next_val);
if ((real_increment > 0 ?
next_free_value > reserved_until :
next_free_value < reserved_until) ||
needs_to_be_stored)
{
reserved_until= next_free_value;
if (write(table))
{
reserved_until= org_reserved_until;
next_free_value= org_next_free_value;
round= org_round;
goto end;
}
}
error= 0;
end:
unlock();
DBUG_RETURN(error);
}
bool Sql_cmd_alter_sequence::execute(THD *thd)
{
int error= 0;
int trapped_errors= 0;
LEX *lex= thd->lex;
TABLE_LIST *first_table= lex->query_tables;
TABLE *table;
sequence_definition *new_seq= lex->create_info.seq_create_info;
SEQUENCE *seq;
No_such_table_error_handler no_such_table_handler;
DBUG_ENTER("Sql_cmd_alter_sequence::execute");
if (check_access(thd, ALTER_ACL, first_table->db,
&first_table->grant.privilege,
&first_table->grant.m_internal,
0, 0))
DBUG_RETURN(TRUE); /* purecov: inspected */
if (check_grant(thd, ALTER_ACL, first_table, FALSE, UINT_MAX, FALSE))
DBUG_RETURN(TRUE); /* purecov: inspected */
if (lex->check_exists)
thd->push_internal_handler(&no_such_table_handler);
error= open_and_lock_tables(thd, first_table, FALSE, 0);
if (lex->check_exists)
{
trapped_errors= no_such_table_handler.safely_trapped_errors();
thd->pop_internal_handler();
}
if (error)
{
if (trapped_errors)
{
StringBuffer<FN_REFLEN> tbl_name;
tbl_name.append(first_table->db);
tbl_name.append('.');
tbl_name.append(first_table->table_name);
push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_UNKNOWN_SEQUENCES,
ER_THD(thd, ER_UNKNOWN_SEQUENCES),
tbl_name.c_ptr_safe());
my_ok(thd);
DBUG_RETURN(FALSE);
}
DBUG_RETURN(TRUE);
}
table= first_table->table;
seq= table->s->sequence;
new_seq->reserved_until= seq->reserved_until;
/* Copy from old sequence those fields that the user didn't specified */
if (!(new_seq->used_fields & seq_field_used_increment))
new_seq->increment= seq->increment;
if (!(new_seq->used_fields & seq_field_used_min_value))
new_seq->min_value= seq->min_value;
if (!(new_seq->used_fields & seq_field_used_max_value))
new_seq->max_value= seq->max_value;
if (!(new_seq->used_fields & seq_field_used_start))
new_seq->start= seq->start;
if (!(new_seq->used_fields & seq_field_used_cache))
new_seq->cache= seq->cache;
if (!(new_seq->used_fields & seq_field_used_cycle))
new_seq->cycle= seq->cycle;
/* If we should restart from a new value */
if (new_seq->used_fields & seq_field_used_restart)
{
if (!(new_seq->used_fields & seq_field_used_restart_value))
new_seq->restart= new_seq->start;
new_seq->reserved_until= new_seq->restart;
}
/* Let check_and_adjust think all fields are used */
new_seq->used_fields= ~0;
if (new_seq->check_and_adjust())
{
my_error(ER_SEQUENCE_INVALID_DATA, MYF(0),
first_table->db,
first_table->table_name);
error= 1;
goto end;
}
if (!(error= new_seq->write(table)))
{
/* Store the sequence values in table share */
table->s->sequence->copy(new_seq);
}
trans_commit_stmt(thd);
trans_commit_implicit(thd);
if (!error)
my_ok(thd);
end:
close_thread_tables(thd);
DBUG_RETURN(error);
}