1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

SQL, IB: Copy history via CREATE .. SELECT [closes #157, #152]

This commit is contained in:
Aleksey Midenkov
2017-03-24 16:00:42 +03:00
parent 7a525e7e93
commit 67cd92b6f4
29 changed files with 636 additions and 276 deletions

View File

@ -6599,6 +6599,14 @@ bool Vers_parse_info::is_trx_end(const char *name) const
DBUG_ASSERT(name);
return generated_as_row.end && !strcmp(generated_as_row.end->c_ptr(), name);
}
bool Vers_parse_info::is_trx_start(const Create_field &f) const
{
return f.flags & VERS_SYS_START_FLAG;
}
bool Vers_parse_info::is_trx_end(const Create_field &f) const
{
return f.flags & VERS_SYS_END_FLAG;
}
static bool create_string(MEM_ROOT *mem_root, String **s, const char *value)
{
@ -6675,12 +6683,28 @@ bool Vers_parse_info::check_and_fix_implicit(
bool integer_fields=
create_info->db_type->flags & HTON_SUPPORTS_SYS_VERSIONING;
if (vers_force) {
SELECT_LEX &slex= thd->lex->select_lex;
int vers_tables= 0;
bool from_select= slex.item_list.elements ? true : false;
if (from_select)
{
for (TABLE_LIST *table= slex.table_list.first; table; table= table->next_local)
{
if (table->table && table->table->versioned())
vers_tables++;
}
}
// CREATE ... SELECT: if at least one table in SELECT is versioned,
// then created table will be versioned.
if (vers_force || vers_tables > 0)
{
declared_with_system_versioning= true;
create_info->options|= HA_VERSIONED_TABLE;
}
if (!need_to_check())
if (!need_check())
return false;
if (declared_without_system_versioning)
@ -6697,11 +6721,43 @@ bool Vers_parse_info::check_and_fix_implicit(
return true;
}
TABLE *orig_table= NULL;
List_iterator<Create_field> it(alter_info->create_list);
while (Create_field *f= it++)
{
if (is_trx_start(f->field_name) || is_trx_end(f->field_name))
if (is_trx_start(*f))
{
if (!generated_as_row.start) // not inited in CREATE ... SELECT
{
DBUG_ASSERT(vers_tables > 0);
if (orig_table && orig_table != f->field->orig_table)
{
err_different_tables:
my_error(ER_VERS_WRONG_PARAMS, MYF(0), table_name,
"system fields selected from different tables");
return true;
}
orig_table= f->field->orig_table;
generated_as_row.start= new (thd->mem_root) String(f->field_name, system_charset_info);
period_for_system_time.start= generated_as_row.start;
}
continue;
}
if (is_trx_end(*f))
{
if (!generated_as_row.end)
{
DBUG_ASSERT(vers_tables > 0);
if (orig_table && orig_table != f->field->orig_table)
{
goto err_different_tables;
}
orig_table= f->field->orig_table;
generated_as_row.end= new (thd->mem_root) String(f->field_name, system_charset_info);
period_for_system_time.end= generated_as_row.end;
}
continue;
}
if ((f->versioning == Column_definition::VERSIONING_NOT_SET &&
!declared_with_system_versioning) ||
@ -6711,32 +6767,50 @@ bool Vers_parse_info::check_and_fix_implicit(
}
}
if (fix_implicit(thd, alter_info, integer_fields))
if (vers_tables > 0)
{
if (!generated_as_row.start && !generated_as_row.end)
{
declared_with_system_versioning= false;
create_info->options&= ~HA_VERSIONED_TABLE;
return false;
}
if (!generated_as_row.start || !generated_as_row.end)
{
my_error(ER_VERS_WRONG_PARAMS, MYF(0), table_name,
"both ROW START and ROW END system fields required in SELECT resultset");
return true;
}
}
else if (fix_implicit(thd, alter_info, integer_fields))
return true;
int not_set= 0;
int with= 0;
int plain_cols= 0; // column doesn't have WITH or WITHOUT SYSTEM VERSIONING
int vers_cols= 0; // column has WITH SYSTEM VERSIONING
it.rewind();
while (const Create_field *f= it++)
{
if (is_trx_start(f->field_name) || is_trx_end(f->field_name))
if (is_trx_start(*f) || is_trx_end(*f))
continue;
if (f->versioning == Column_definition::VERSIONING_NOT_SET)
not_set++;
plain_cols++;
else if (f->versioning == Column_definition::WITH_VERSIONING)
with++;
vers_cols++;
}
bool table_with_system_versioning=
generated_as_row.start || generated_as_row.end ||
period_for_system_time.start || period_for_system_time.end;
if (!thd->lex->tmp_table() && with == 0 &&
(not_set == 0 || !table_with_system_versioning))
if (!thd->lex->tmp_table() &&
// CREATE from SELECT (Create_fields are not yet added)
!from_select &&
vers_cols == 0 &&
(plain_cols == 0 || !table_with_system_versioning))
{
my_error(ER_VERS_WRONG_PARAMS, MYF(0), table_name,
"versioned fields missing");
"no columns defined with system versioning!");
return true;
}
@ -6762,7 +6836,7 @@ bool Vers_parse_info::check_and_fix_alter(THD *thd, Alter_info *alter_info,
create_info->db_type->flags & HTON_SUPPORTS_SYS_VERSIONING;
const char *table_name= share->table_name.str;
if (!need_to_check() && !share->versioned)
if (!need_check() && !share->versioned)
return false;
if (declared_without_system_versioning)
@ -6934,7 +7008,7 @@ bool Vers_parse_info::check_generated_type(const char *table_name,
List_iterator<Create_field> it(alter_info->create_list);
while (Create_field *f= it++)
{
if (is_trx_start(f->field_name) || is_trx_end(f->field_name))
if (is_trx_start(*f) || is_trx_end(*f))
{
if (integer_fields)
{