1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

MDEV-15413 Unexpected errors upon CREATE TABLE .. WITH SYSTEM VERSIONING AS SELECT ...

numerous fixes for CREATE ... SELECT with system versioning:
In CREATE ... SELECT the table is created based on the result set,
field properties do not count. That is
* field invisibility is *not* copied over
* AS ROW START/END is *not* copied over
* the history is *not* copied over
* system row_start/row_end fields can *not* be created from the SELECT part
This commit is contained in:
Sergei Golubchik
2018-04-02 19:35:27 +02:00
parent 72dd813f7e
commit 9bd3af97df
10 changed files with 114 additions and 191 deletions

View File

@ -6887,7 +6887,7 @@ static bool vers_create_sys_field(THD *thd, const char *field_name,
const LString_i Vers_parse_info::default_start= "row_start";
const LString_i Vers_parse_info::default_end= "row_end";
bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info, int *added)
bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info)
{
// If user specified some of these he must specify the others too. Do nothing.
if (*this)
@ -6903,8 +6903,6 @@ bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info, int *added)
{
return true;
}
if (added)
*added+= 2;
return false;
}
@ -6932,47 +6930,15 @@ bool Table_scope_and_contents_source_st::vers_native(THD *thd) const
}
bool Table_scope_and_contents_source_st::vers_fix_system_fields(
THD *thd,
Alter_info *alter_info,
const TABLE_LIST &create_table,
const TABLE_LIST *select_tables,
List<Item> *items,
bool *versioned_write)
THD *thd, Alter_info *alter_info, const TABLE_LIST &create_table,
bool create_select)
{
DBUG_ASSERT(!(alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING));
int vers_tables= 0;
if (select_tables)
{
for (const TABLE_LIST *table= select_tables; table; table= table->next_local)
{
if (table->table && table->table->versioned())
vers_tables++;
}
}
DBUG_EXECUTE_IF("sysvers_force", if (!tmp_table()) {
alter_info->flags|= ALTER_ADD_SYSTEM_VERSIONING;
options|= HA_VERSIONED_TABLE; });
// Possibly override default storage engine to match one used in source table.
if (vers_tables && alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING &&
!(used_fields & HA_CREATE_USED_ENGINE))
{
List_iterator_fast<Create_field> it(alter_info->create_list);
while (Create_field *f= it++)
{
if (vers_info.is_start(*f) || vers_info.is_end(*f))
{
if (f->field)
{
db_type= f->field->orig_table->file->ht;
}
break;
}
}
}
if (!vers_info.need_check(alter_info))
return false;
@ -6991,15 +6957,7 @@ bool Table_scope_and_contents_source_st::vers_fix_system_fields(
return true;
}
if (vers_tables)
{
DBUG_ASSERT(options & HA_VERSIONED_TABLE);
DBUG_ASSERT(versioned_write);
*versioned_write= true;
}
List_iterator<Create_field> it(alter_info->create_list);
bool explicit_declared= vers_info.as_row.start || vers_info.as_row.end;
while (Create_field *f= it++)
{
if ((f->versioning == Column_definition::VERSIONING_NOT_SET &&
@ -7008,81 +6966,11 @@ bool Table_scope_and_contents_source_st::vers_fix_system_fields(
{
f->flags|= VERS_UPDATE_UNVERSIONED_FLAG;
}
/* Assign selected implicit fields when no explicit fields */
if (!vers_tables || explicit_declared)
continue;
DBUG_ASSERT(versioned_write);
if (vers_info.is_start(*f) && vers_info.default_start.streq(f->field_name))
{
if (vers_info.as_row.start)
it.remove();
else
{
vers_info.set_start(f->field_name);
*versioned_write= false;
}
continue;
}
if (vers_info.is_end(*f) && vers_info.default_end.streq(f->field_name))
{
if (vers_info.as_row.end)
it.remove();
else
{
vers_info.set_end(f->field_name);
*versioned_write= false;
}
continue;
}
} // while (Create_field *f= it++)
/* Assign selected system fields to explicit system fields if any */
if (vers_tables)
{
it.rewind();
while (Create_field *f= it++)
{
uint flags_left= VERS_SYSTEM_FIELD;
if (flags_left && (vers_info.is_start(*f) || vers_info.is_end(*f)) && !f->field)
{
uint sys_flag= f->flags & flags_left;
flags_left-= sys_flag;
List_iterator_fast<Item> it2(*items);
while (Item *item= it2++)
{
if (item->type() != Item::FIELD_ITEM)
continue;
Field *fld= static_cast<Item_field *>(item)->field;
DBUG_ASSERT(fld);
if ((fld->flags & sys_flag) &&
lex_string_syseq(&f->field_name, &fld->field_name))
{
f->field= fld;
*versioned_write= false;
}
} // while (item)
} // if (flags_left ...
} // while (Create_field *f= it++)
} // if (vers_tables)
int added= 0;
if (vers_info.fix_implicit(thd, alter_info, &added))
if (vers_info.fix_implicit(thd, alter_info))
return true;
DBUG_ASSERT(added >= 0);
if (vers_tables)
{
DBUG_ASSERT(items);
while (added--)
{
Item_default_value *item= new (thd->mem_root)
Item_default_value(thd, thd->lex->current_context());
items->push_back(item, thd->mem_root);
}
}
int plain_cols= 0; // columns don't have WITH or WITHOUT SYSTEM VERSIONING
int vers_cols= 0; // columns have WITH SYSTEM VERSIONING
it.rewind();
@ -7099,25 +6987,27 @@ bool Table_scope_and_contents_source_st::vers_fix_system_fields(
if (!thd->lex->tmp_table() &&
// CREATE from SELECT (Create_fields are not yet added)
!select_tables &&
vers_cols == 0 &&
(plain_cols == 0 || !vers_info))
!create_select && vers_cols == 0 && (plain_cols == 0 || !vers_info))
{
my_error(ER_VERS_TABLE_MUST_HAVE_COLUMNS, MYF(0),
create_table.table_name.str);
return true;
}
if (vers_info.check_conditions(create_table.table_name.str, create_table.db))
return true;
bool native= vers_native(thd);
if (vers_info.check_sys_fields(create_table.table_name.str, alter_info, native))
return true;
return false;
}
bool Table_scope_and_contents_source_st::vers_check_system_fields(
THD *thd, Alter_info *alter_info, const TABLE_LIST &create_table)
{
if (!(options & HA_VERSIONED_TABLE))
return false;
return vers_info.check_sys_fields(create_table.table_name, create_table.db,
alter_info, vers_native(thd));
}
bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info,
HA_CREATE_INFO *create_info, TABLE *table)
{
@ -7221,10 +7111,8 @@ bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info,
if (alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)
{
if (check_conditions(table_name, share->db))
return true;
bool native= create_info->vers_native(thd);
if (check_sys_fields(table_name, alter_info, native))
if (check_sys_fields(table_name, share->db, alter_info, native))
return true;
}
@ -7298,19 +7186,19 @@ bool Vers_parse_info::need_check(const Alter_info *alter_info) const
alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING || *this;
}
bool Vers_parse_info::check_conditions(const char *table_name,
bool Vers_parse_info::check_conditions(const LString &table_name,
const LString &db) const
{
if (!as_row.start || !as_row.end)
{
my_error(ER_MISSING, MYF(0), table_name,
my_error(ER_MISSING, MYF(0), table_name.str,
as_row.start ? "AS ROW END" : "AS ROW START");
return true;
}
if (!system_time.start || !system_time.end)
{
my_error(ER_MISSING, MYF(0), table_name, "PERIOD FOR SYSTEM_TIME");
my_error(ER_MISSING, MYF(0), table_name.str, "PERIOD FOR SYSTEM_TIME");
return true;
}
@ -7329,9 +7217,12 @@ bool Vers_parse_info::check_conditions(const char *table_name,
return false;
}
bool Vers_parse_info::check_sys_fields(const char *table_name,
bool Vers_parse_info::check_sys_fields(const LString &table_name, const LString &db,
Alter_info *alter_info, bool native)
{
if (check_conditions(table_name, db))
return true;
List_iterator<Create_field> it(alter_info->create_list);
uint found_flag= 0;
while (Create_field *f= it++)
@ -7390,14 +7281,14 @@ bool Vers_parse_info::check_sys_fields(const char *table_name,
check_unit == VERS_TIMESTAMP ?
"TIMESTAMP(6)" :
"BIGINT(20) UNSIGNED",
table_name);
table_name.str);
return true;
}
check_unit= f_check_unit;
}
}
my_error(ER_MISSING, MYF(0), table_name, found_flag & VERS_SYS_START_FLAG ?
my_error(ER_MISSING, MYF(0), table_name.str, found_flag & VERS_SYS_START_FLAG ?
"ROW END" : found_flag ? "ROW START" : "ROW START/END");
return true;
}