mirror of
https://github.com/MariaDB/server.git
synced 2025-08-07 00:04:31 +03:00
New multi-table-update code
New (simpler) internal timestamp handling. More debuging to heap tables. Small cleanups to multi-table-delete false -> 0 and true -> 1 (We should use TRUE and FALSE)
This commit is contained in:
@@ -29,10 +29,12 @@ static bool compare_record(TABLE *table, ulong query_id)
|
||||
{
|
||||
if (!table->blob_fields)
|
||||
return cmp_record(table,1);
|
||||
/* Compare null bits */
|
||||
if (memcmp(table->null_flags,
|
||||
table->null_flags+table->rec_buff_length,
|
||||
table->null_bytes))
|
||||
return 1; // Diff in NULL value
|
||||
/* Compare updated fields */
|
||||
for (Field **ptr=table->field ; *ptr ; ptr++)
|
||||
{
|
||||
if ((*ptr)->query_id == query_id &&
|
||||
@@ -52,10 +54,11 @@ int mysql_update(THD *thd,
|
||||
ha_rows limit,
|
||||
enum enum_duplicates handle_duplicates)
|
||||
{
|
||||
bool using_limit=limit != HA_POS_ERROR, safe_update= thd->options & OPTION_SAFE_UPDATES;
|
||||
bool using_limit=limit != HA_POS_ERROR;
|
||||
bool safe_update= thd->options & OPTION_SAFE_UPDATES;
|
||||
bool used_key_is_modified, transactional_table, log_delayed;
|
||||
int error=0;
|
||||
uint save_time_stamp, used_index, want_privilege;
|
||||
uint used_index, want_privilege;
|
||||
ulong query_id=thd->query_id, timestamp_query_id;
|
||||
key_map old_used_keys;
|
||||
TABLE *table;
|
||||
@@ -67,7 +70,6 @@ int mysql_update(THD *thd,
|
||||
|
||||
if (!(table = open_ltable(thd,table_list,table_list->lock_type)))
|
||||
DBUG_RETURN(-1); /* purecov: inspected */
|
||||
save_time_stamp=table->time_stamp;
|
||||
table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
|
||||
thd->proc_info="init";
|
||||
|
||||
@@ -89,6 +91,7 @@ int mysql_update(THD *thd,
|
||||
{
|
||||
timestamp_query_id=table->timestamp_field->query_id;
|
||||
table->timestamp_field->query_id=thd->query_id-1;
|
||||
table->time_stamp= table->timestamp_field->offset() +1;
|
||||
}
|
||||
|
||||
/* Check the fields we are going to modify */
|
||||
@@ -108,7 +111,6 @@ int mysql_update(THD *thd,
|
||||
table->grant.want_privilege=(SELECT_ACL & ~table->grant.privilege);
|
||||
if (setup_fields(thd,table_list,values,0,0,0))
|
||||
{
|
||||
table->time_stamp=save_time_stamp; // Restore timestamp pointer
|
||||
DBUG_RETURN(-1); /* purecov: inspected */
|
||||
}
|
||||
|
||||
@@ -119,7 +121,6 @@ int mysql_update(THD *thd,
|
||||
(select && select->check_quick(safe_update, limit)) || !limit)
|
||||
{
|
||||
delete select;
|
||||
table->time_stamp=save_time_stamp; // Restore timestamp pointer
|
||||
if (error)
|
||||
{
|
||||
DBUG_RETURN(-1); // Error in where
|
||||
@@ -134,7 +135,6 @@ int mysql_update(THD *thd,
|
||||
if (safe_update && !using_limit)
|
||||
{
|
||||
delete select;
|
||||
table->time_stamp=save_time_stamp;
|
||||
send_error(&thd->net,ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE);
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
@@ -153,8 +153,8 @@ int mysql_update(THD *thd,
|
||||
if (used_key_is_modified || order)
|
||||
{
|
||||
/*
|
||||
** We can't update table directly; We must first search after all
|
||||
** matching rows before updating the table!
|
||||
We can't update table directly; We must first search after all
|
||||
matching rows before updating the table!
|
||||
*/
|
||||
table->file->extra(HA_EXTRA_DONT_USE_CURSOR_TO_UPDATE);
|
||||
IO_CACHE tempfile;
|
||||
@@ -162,7 +162,6 @@ int mysql_update(THD *thd,
|
||||
DISK_BUFFER_SIZE, MYF(MY_WME)))
|
||||
{
|
||||
delete select; /* purecov: inspected */
|
||||
table->time_stamp=save_time_stamp; // Restore timestamp pointer /* purecov: inspected */
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
if (old_used_keys & ((key_map) 1 << used_index))
|
||||
@@ -193,7 +192,6 @@ int mysql_update(THD *thd,
|
||||
== HA_POS_ERROR)
|
||||
{
|
||||
delete select;
|
||||
table->time_stamp=save_time_stamp; // Restore timestamp pointer
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
@@ -247,7 +245,6 @@ int mysql_update(THD *thd,
|
||||
if (error >= 0)
|
||||
{
|
||||
delete select;
|
||||
table->time_stamp=save_time_stamp; // Restore timestamp pointer
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
@@ -297,7 +294,6 @@ int mysql_update(THD *thd,
|
||||
end_read_record(&info);
|
||||
thd->proc_info="end";
|
||||
VOID(table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY));
|
||||
table->time_stamp=save_time_stamp; // Restore auto timestamp pointer
|
||||
transactional_table= table->file->has_transactions();
|
||||
log_delayed= (transactional_table || table->tmp_table);
|
||||
if (updated && (error <= 0 || !transactional_table))
|
||||
@@ -351,324 +347,338 @@ int mysql_update(THD *thd,
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
Update multiple tables from join
|
||||
***************************************************************************/
|
||||
|
||||
multi_update::multi_update(THD *thd_arg, TABLE_LIST *ut, List<Item> &fs,
|
||||
enum enum_duplicates handle_duplicates,
|
||||
uint num)
|
||||
: update_tables (ut), thd(thd_arg), updated(0), found(0), fields(fs),
|
||||
dupl(handle_duplicates), num_of_tables(num), num_fields(0), num_updated(0),
|
||||
error(0), do_update(false)
|
||||
/*
|
||||
Setup multi-update handling and call SELECT to do the join
|
||||
*/
|
||||
|
||||
int mysql_multi_update(THD *thd,
|
||||
TABLE_LIST *table_list,
|
||||
List<Item> *fields,
|
||||
List<Item> *values,
|
||||
COND *conds,
|
||||
ulong options,
|
||||
enum enum_duplicates handle_duplicates)
|
||||
{
|
||||
save_time_stamps = (uint *) sql_calloc (sizeof(uint) * num_of_tables);
|
||||
tmp_tables = (TABLE **)NULL;
|
||||
int counter=0;
|
||||
ulong timestamp_query_id;
|
||||
not_trans_safe=false;
|
||||
for (TABLE_LIST *dt=ut ; dt ; dt=dt->next,counter++)
|
||||
int res;
|
||||
multi_update *result;
|
||||
TABLE_LIST *tl;
|
||||
DBUG_ENTER("mysql_multi_update");
|
||||
|
||||
table_list->grant.want_privilege=(SELECT_ACL & ~table_list->grant.privilege);
|
||||
if ((res=open_and_lock_tables(thd,table_list)))
|
||||
DBUG_RETURN(res);
|
||||
|
||||
thd->select_limit=HA_POS_ERROR;
|
||||
if (setup_fields(thd, table_list, *fields, 1, 0, 0))
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
/*
|
||||
Count tables and setup timestamp handling
|
||||
*/
|
||||
for (tl= (TABLE_LIST*) table_list ; tl ; tl=tl->next)
|
||||
{
|
||||
TABLE *table=ut->table;
|
||||
// (void) ut->table->file->extra(HA_EXTRA_NO_KEYREAD);
|
||||
dt->table->used_keys=0;
|
||||
TABLE *table= tl->table;
|
||||
if (table->timestamp_field)
|
||||
{
|
||||
// Don't set timestamp column if this is modified
|
||||
timestamp_query_id=table->timestamp_field->query_id;
|
||||
table->timestamp_field->query_id=thd->query_id-1;
|
||||
if (table->timestamp_field->query_id == thd->query_id)
|
||||
table->time_stamp=0;
|
||||
else
|
||||
table->timestamp_field->query_id=timestamp_query_id;
|
||||
table->time_stamp=0;
|
||||
// Only set timestamp column if this is not modified
|
||||
if (table->timestamp_field->query_id != thd->query_id)
|
||||
table->time_stamp= table->timestamp_field->offset() +1;
|
||||
}
|
||||
save_time_stamps[counter]=table->time_stamp;
|
||||
}
|
||||
error = 1; // In case we do not reach prepare we have to reset timestamps
|
||||
|
||||
if (!(result=new multi_update(thd, table_list, fields, values,
|
||||
handle_duplicates)))
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
List<Item> total_list;
|
||||
res= mysql_select(thd,table_list,total_list,
|
||||
conds, (ORDER *) NULL, (ORDER *)NULL, (Item *) NULL,
|
||||
(ORDER *)NULL,
|
||||
options | SELECT_NO_JOIN_CACHE,
|
||||
result);
|
||||
|
||||
end:
|
||||
delete result;
|
||||
DBUG_RETURN(res);
|
||||
}
|
||||
|
||||
int
|
||||
multi_update::prepare(List<Item> &values)
|
||||
|
||||
multi_update::multi_update(THD *thd_arg, TABLE_LIST *table_list,
|
||||
List<Item> *field_list, List<Item> *value_list,
|
||||
enum enum_duplicates handle_duplicates_arg)
|
||||
:all_tables(table_list), thd(thd_arg), tmp_tables(0), updated(0),
|
||||
found(0), fields(field_list), values(value_list), table_count(0),
|
||||
handle_duplicates(handle_duplicates_arg), do_update(1)
|
||||
{}
|
||||
|
||||
|
||||
/*
|
||||
Connect fields with tables and create list of tables that are updated
|
||||
*/
|
||||
|
||||
int multi_update::prepare(List<Item> ¬_used_values)
|
||||
{
|
||||
TABLE_LIST *table_ref;
|
||||
SQL_LIST update;
|
||||
table_map tables_to_update= 0;
|
||||
Item_field *item;
|
||||
List_iterator_fast<Item> field_it(*fields);
|
||||
List_iterator_fast<Item> value_it(*values);
|
||||
uint i, max_fields;
|
||||
DBUG_ENTER("multi_update::prepare");
|
||||
do_update = true;
|
||||
|
||||
thd->count_cuted_fields=1;
|
||||
thd->cuted_fields=0L;
|
||||
thd->proc_info="updating the main table";
|
||||
TABLE_LIST *table_ref;
|
||||
thd->proc_info="updating main table";
|
||||
|
||||
if (thd->options & OPTION_SAFE_UPDATES)
|
||||
{
|
||||
for (table_ref=update_tables; table_ref; table_ref=table_ref->next)
|
||||
{
|
||||
TABLE *table=table_ref->table;
|
||||
if ((thd->options & OPTION_SAFE_UPDATES) && !table->quick_keys)
|
||||
{
|
||||
my_error(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,MYF(0));
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
Here I have to connect fields with tables and only update tables that
|
||||
need to be updated.
|
||||
I calculate num_updated and fill-up table_sequence
|
||||
Set table_list->shared to true or false, depending on whether table is
|
||||
to be updated or not
|
||||
*/
|
||||
while ((item= (Item_field *) field_it++))
|
||||
tables_to_update|= item->used_tables();
|
||||
|
||||
Item_field *item;
|
||||
List_iterator<Item> it(fields);
|
||||
num_fields=fields.elements;
|
||||
field_sequence = (uint *) sql_alloc(sizeof(uint)*num_fields);
|
||||
uint *int_ptr=field_sequence;
|
||||
while ((item= (Item_field *)it++))
|
||||
if (!tables_to_update)
|
||||
{
|
||||
unsigned int counter=0;
|
||||
for (table_ref=update_tables; table_ref;
|
||||
table_ref=table_ref->next, counter++)
|
||||
{
|
||||
if (table_ref->table == item->field->table)
|
||||
{
|
||||
if (!table_ref->shared)
|
||||
{
|
||||
TABLE *tbl=table_ref->table;
|
||||
num_updated++;
|
||||
table_ref->shared=1;
|
||||
if (!not_trans_safe && !table_ref->table->file->has_transactions())
|
||||
not_trans_safe=true;
|
||||
// to be moved if initialize_tables has to be used
|
||||
tbl->no_keyread=1;
|
||||
tbl->used_keys=0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!table_ref)
|
||||
{
|
||||
net_printf(&thd->net, ER_NOT_SUPPORTED_YET, "JOIN SYNTAX WITH MULTI-TABLE UPDATES");
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
else
|
||||
*int_ptr++=counter;
|
||||
}
|
||||
if (!num_updated--)
|
||||
{
|
||||
net_printf(&thd->net, ER_NOT_SUPPORTED_YET, "SET CLAUSE MUST CONTAIN TABLE.FIELD REFERENCE");
|
||||
my_error(ER_NOT_SUPPORTED_YET, MYF(0),
|
||||
"You didn't specify any tables to UPDATE");
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
/*
|
||||
Here, I have to allocate the array of temporary tables
|
||||
I have to treat a case of num_updated=1 differently in send_data() method.
|
||||
We have to check values after setup_tables to get used_keys right in
|
||||
reference tables
|
||||
*/
|
||||
if (num_updated)
|
||||
|
||||
if (setup_fields(thd, all_tables, *values, 1,0,0))
|
||||
DBUG_RETURN(1);
|
||||
|
||||
/*
|
||||
Save tables beeing updated in update_tables
|
||||
update_table->shared is position for table
|
||||
Don't use key read on tables that are updated
|
||||
*/
|
||||
|
||||
update.empty();
|
||||
for (table_ref= all_tables; table_ref; table_ref=table_ref->next)
|
||||
{
|
||||
tmp_tables = (TABLE **) sql_calloc(sizeof(TABLE *) * num_updated);
|
||||
infos = (COPY_INFO *) sql_calloc(sizeof(COPY_INFO) * num_updated);
|
||||
fields_by_tables = (List_item **)sql_calloc(sizeof(List_item *) * (num_updated + 1));
|
||||
unsigned int counter;
|
||||
List<Item> *temp_fields;
|
||||
for (table_ref=update_tables, counter = 0; table_ref; table_ref=table_ref->next)
|
||||
TABLE *table=table_ref->table;
|
||||
if (tables_to_update & table->map)
|
||||
{
|
||||
if (!table_ref->shared)
|
||||
continue;
|
||||
// Here we have to add row offset as an additional field ...
|
||||
if (!(temp_fields = (List_item *)sql_calloc(sizeof(List_item))))
|
||||
{
|
||||
error = 1; // A proper error message is due here
|
||||
TABLE_LIST *tl= (TABLE_LIST*) thd->memdup((char*) table_ref,
|
||||
sizeof(*tl));
|
||||
if (!tl)
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
temp_fields->empty();
|
||||
it.rewind(); int_ptr=field_sequence;
|
||||
while ((item= (Item_field *)it++))
|
||||
{
|
||||
if (*int_ptr++ == counter)
|
||||
temp_fields->push_back(item);
|
||||
}
|
||||
if (counter)
|
||||
{
|
||||
Field_string offset(table_ref->table->file->ref_length,false,"offset",table_ref->table,true);
|
||||
temp_fields->push_front(new Item_field(((Field *)&offset)));
|
||||
// Here I make tmp tables
|
||||
int cnt=counter-1;
|
||||
TMP_TABLE_PARAM tmp_table_param;
|
||||
bzero((char*) &tmp_table_param,sizeof(tmp_table_param));
|
||||
tmp_table_param.field_count=temp_fields->elements;
|
||||
if (!(tmp_tables[cnt]=create_tmp_table(thd, &tmp_table_param,
|
||||
*temp_fields,
|
||||
(ORDER*) 0, 1, 0, 0,
|
||||
TMP_TABLE_ALL_COLUMNS)))
|
||||
{
|
||||
error = 1; // A proper error message is due here
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
|
||||
tmp_tables[cnt]->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
|
||||
infos[cnt].handle_duplicates=DUP_IGNORE;
|
||||
temp_fields->pop(); // because we shall use those for values only ...
|
||||
}
|
||||
fields_by_tables[counter]=temp_fields;
|
||||
counter++;
|
||||
update.link_in_list((byte*) tl, (byte**) &tl->next);
|
||||
tl->shared= table_count++;
|
||||
table->no_keyread=1;
|
||||
table->used_keys=0;
|
||||
table->pos_in_table_list= tl;
|
||||
}
|
||||
}
|
||||
table_count= update.elements;
|
||||
update_tables= (TABLE_LIST*) update.first;
|
||||
|
||||
tmp_tables = (TABLE **) thd->calloc(sizeof(TABLE *) * table_count);
|
||||
tmp_table_param = (TMP_TABLE_PARAM*) thd->calloc(sizeof(TMP_TABLE_PARAM) *
|
||||
table_count);
|
||||
fields_for_table= (List_item **) thd->alloc(sizeof(List_item *) *
|
||||
table_count);
|
||||
values_for_table= (List_item **) thd->alloc(sizeof(List_item *) *
|
||||
table_count);
|
||||
if (thd->fatal_error)
|
||||
DBUG_RETURN(1);
|
||||
for (i=0 ; i < table_count ; i++)
|
||||
{
|
||||
fields_for_table[i]= new List_item;
|
||||
values_for_table[i]= new List_item;
|
||||
}
|
||||
if (thd->fatal_error)
|
||||
DBUG_RETURN(1);
|
||||
|
||||
/* Split fields into fields_for_table[] and values_by_table[] */
|
||||
|
||||
field_it.rewind();
|
||||
while ((item= (Item_field *) field_it++))
|
||||
{
|
||||
Item *value= value_it++;
|
||||
uint offset= item->field->table->pos_in_table_list->shared;
|
||||
fields_for_table[offset]->push_back(item);
|
||||
values_for_table[offset]->push_back(value);
|
||||
}
|
||||
if (thd->fatal_error)
|
||||
DBUG_RETURN(1);
|
||||
|
||||
/* Allocate copy fields */
|
||||
max_fields=0;
|
||||
for (i=0 ; i < table_count ; i++)
|
||||
set_if_bigger(max_fields, fields_for_table[i]->elements);
|
||||
copy_field= new Copy_field[max_fields];
|
||||
init_ftfuncs(thd,1);
|
||||
error = 0; // Timestamps do not need to be restored, so far ...
|
||||
DBUG_RETURN(0);
|
||||
DBUG_RETURN(thd->fatal_error != 0);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
/*
|
||||
Store first used table in main_table as this should be updated first
|
||||
This is because we know that no row in this table will be read twice.
|
||||
|
||||
Create temporary tables to store changed values for all other tables
|
||||
that are updated.
|
||||
*/
|
||||
|
||||
bool
|
||||
multi_update::initialize_tables(JOIN *join)
|
||||
{
|
||||
#ifdef NOT_YET
|
||||
We skip it as it only makes a mess ...........
|
||||
TABLE_LIST *walk;
|
||||
table_map tables_to_update_from=0;
|
||||
for (walk= update_tables ; walk ; walk=walk->next)
|
||||
tables_to_update_from|= walk->table->map;
|
||||
|
||||
walk= update_tables;
|
||||
for (JOIN_TAB *tab=join->join_tab, *end=join->join_tab+join->tables;
|
||||
tab < end;
|
||||
tab++)
|
||||
TABLE_LIST *table_ref;
|
||||
DBUG_ENTER("initialize_tables");
|
||||
|
||||
if ((thd->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
|
||||
DBUG_RETURN(1);
|
||||
main_table=join->join_tab->table;
|
||||
trans_safe= transactional_tables= main_table->file->has_transactions();
|
||||
log_delayed= trans_safe || main_table->tmp_table != NO_TMP_TABLE;
|
||||
|
||||
/* Create a temporary table for all tables after except main table */
|
||||
for (table_ref= update_tables; table_ref; table_ref=table_ref->next)
|
||||
{
|
||||
if (tab->table->map & tables_to_update_from)
|
||||
TABLE *table=table_ref->table;
|
||||
if (table != main_table)
|
||||
{
|
||||
// We are going to update from this table
|
||||
TABLE *tbl=walk->table=tab->table;
|
||||
/* Don't use KEYREAD optimization on this table */
|
||||
tbl->no_keyread=1;
|
||||
walk=walk->next;
|
||||
uint cnt= table_ref->shared;
|
||||
ORDER group;
|
||||
List<Item> temp_fields= *fields_for_table[cnt];
|
||||
TMP_TABLE_PARAM *tmp_param= tmp_table_param+cnt;
|
||||
|
||||
/*
|
||||
Create a temporary table to store all fields that are changed for this
|
||||
table. The first field in the temporary table is a pointer to the
|
||||
original row so that we can find and update it
|
||||
*/
|
||||
|
||||
/* ok to be on stack as this is not referenced outside of this func */
|
||||
Field_string offset(table->file->ref_length, 0, "offset",
|
||||
table, 1);
|
||||
if (temp_fields.push_front(new Item_field(((Field *) &offset))))
|
||||
DBUG_RETURN(1);
|
||||
|
||||
/* Make an unique key over the first field to avoid duplicated updates */
|
||||
bzero((char*) &group, sizeof(group));
|
||||
group.asc= 1;
|
||||
group.item= (Item**) temp_fields.head_ref();
|
||||
|
||||
tmp_param->quick_group=1;
|
||||
tmp_param->field_count=temp_fields.elements;
|
||||
tmp_param->group_parts=1;
|
||||
tmp_param->group_length= table->file->ref_length;
|
||||
if (!(tmp_tables[cnt]=create_tmp_table(thd,
|
||||
tmp_param,
|
||||
temp_fields,
|
||||
(ORDER*) &group, 0, 0, 0,
|
||||
TMP_TABLE_ALL_COLUMNS)))
|
||||
DBUG_RETURN(1);
|
||||
tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
multi_update::~multi_update()
|
||||
{
|
||||
int counter = 0;
|
||||
for (table_being_updated=update_tables ;
|
||||
table_being_updated ;
|
||||
counter++, table_being_updated=table_being_updated->next)
|
||||
{
|
||||
TABLE *table=table_being_updated->table;
|
||||
table->no_keyread=0;
|
||||
if (error)
|
||||
table->time_stamp=save_time_stamps[counter];
|
||||
}
|
||||
TABLE_LIST *table;
|
||||
for (table= update_tables ; table; table= table->next)
|
||||
table->table->no_keyread=0;
|
||||
|
||||
if (tmp_tables)
|
||||
for (uint counter = 0; counter < num_updated; counter++)
|
||||
{
|
||||
for (uint counter = 0; counter < table_count; counter++)
|
||||
if (tmp_tables[counter])
|
||||
free_tmp_table(thd,tmp_tables[counter]);
|
||||
}
|
||||
if (copy_field)
|
||||
delete [] copy_field;
|
||||
thd->count_cuted_fields=0; // Restore this setting
|
||||
if (!trans_safe)
|
||||
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
|
||||
}
|
||||
|
||||
|
||||
bool multi_update::send_data(List<Item> &values)
|
||||
bool multi_update::send_data(List<Item> ¬_used_values)
|
||||
{
|
||||
List<Item> real_values(values);
|
||||
for (uint counter = 0; counter < fields.elements; counter++)
|
||||
real_values.pop();
|
||||
// We have skipped fields ....
|
||||
if (!num_updated)
|
||||
TABLE_LIST *cur_table;
|
||||
DBUG_ENTER("multi_update::send_data");
|
||||
|
||||
found++;
|
||||
for (cur_table= update_tables; cur_table ; cur_table= cur_table->next)
|
||||
{
|
||||
for (table_being_updated=update_tables ;
|
||||
table_being_updated ;
|
||||
table_being_updated=table_being_updated->next)
|
||||
TABLE *table= cur_table->table;
|
||||
/* Check if we are using outer join and we didn't find the row */
|
||||
if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
|
||||
continue;
|
||||
|
||||
uint offset= cur_table->shared;
|
||||
table->file->position(table->record[0]);
|
||||
if (table == main_table)
|
||||
{
|
||||
if (!table_being_updated->shared)
|
||||
continue;
|
||||
TABLE *table=table_being_updated->table;
|
||||
/* Check if we are using outer join and we didn't find the row */
|
||||
if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
|
||||
return 0;
|
||||
table->file->position(table->record[0]);
|
||||
// Only one table being updated receives a completely different treatment
|
||||
table->status|= STATUS_UPDATED;
|
||||
store_record(table,1);
|
||||
if (fill_record(fields,real_values))
|
||||
return 1;
|
||||
found++;
|
||||
if (/* compare_record(table, query_id) && */
|
||||
!(error=table->file->update_row(table->record[1], table->record[0])))
|
||||
updated++;
|
||||
table->file->extra(HA_EXTRA_NO_CACHE);
|
||||
return error;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int secure_counter= -1;
|
||||
for (table_being_updated=update_tables ;
|
||||
table_being_updated ;
|
||||
table_being_updated=table_being_updated->next, secure_counter++)
|
||||
{
|
||||
if (!table_being_updated->shared)
|
||||
continue;
|
||||
|
||||
TABLE *table=table_being_updated->table;
|
||||
/* Check if we are using outer join and we didn't find the row */
|
||||
if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
|
||||
continue;
|
||||
table->file->position(table->record[0]);
|
||||
Item *item;
|
||||
List_iterator<Item> it(real_values);
|
||||
List <Item> values_by_table;
|
||||
uint *int_ptr=field_sequence;
|
||||
while ((item= (Item *)it++))
|
||||
store_record(table,1);
|
||||
if (fill_record(*fields_for_table[offset], *values_for_table[offset]))
|
||||
DBUG_RETURN(1);
|
||||
if (compare_record(table, thd->query_id))
|
||||
{
|
||||
if (*int_ptr++ == (uint) (secure_counter + 1))
|
||||
values_by_table.push_back(item);
|
||||
}
|
||||
// Here I am breaking values as per each table
|
||||
if (secure_counter < 0)
|
||||
{
|
||||
table->status|= STATUS_UPDATED;
|
||||
store_record(table,1);
|
||||
if (fill_record(*fields_by_tables[0],values_by_table))
|
||||
return 1;
|
||||
found++;
|
||||
if (/*compare_record(table, query_id) && */
|
||||
!(error=table->file->update_row(table->record[1], table->record[0])))
|
||||
int error;
|
||||
if (!updated++)
|
||||
{
|
||||
updated++;
|
||||
table->file->extra(HA_EXTRA_NO_CACHE);
|
||||
/*
|
||||
Inform the main table that we are going to update the table even
|
||||
while we may be scanning it. This will flush the read cache
|
||||
if it's used.
|
||||
*/
|
||||
main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
|
||||
}
|
||||
else
|
||||
if ((error=table->file->update_row(table->record[1],
|
||||
table->record[0])))
|
||||
{
|
||||
table->file->print_error(error,MYF(0));
|
||||
if (!error) error=1;
|
||||
return 1;
|
||||
updated--;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
}
|
||||
else
|
||||
}
|
||||
else
|
||||
{
|
||||
int error;
|
||||
TABLE *tmp_table= tmp_tables[offset];
|
||||
fill_record(tmp_table->field+1, *values_for_table[offset]);
|
||||
|
||||
/* Store pointer to row */
|
||||
memcpy((char*) tmp_table->field[0]->ptr,
|
||||
(char*) table->file->ref, table->file->ref_length);
|
||||
/* Write row, ignoring duplicated updates to a row */
|
||||
if ((error= tmp_table->file->write_row(tmp_table->record[0])) &&
|
||||
(error != HA_ERR_FOUND_DUPP_KEY &&
|
||||
error != HA_ERR_FOUND_DUPP_UNIQUE))
|
||||
{
|
||||
// Here we insert into each temporary table
|
||||
values_by_table.push_front(new Item_string((char*) table->file->ref,
|
||||
table->file->ref_length));
|
||||
fill_record(tmp_tables[secure_counter]->field,values_by_table);
|
||||
error= write_record(tmp_tables[secure_counter],
|
||||
&(infos[secure_counter]));
|
||||
if (error)
|
||||
if (create_myisam_from_heap(table, tmp_table_param + offset, error, 1))
|
||||
{
|
||||
error=-1;
|
||||
return 1;
|
||||
do_update=0;
|
||||
DBUG_RETURN(1); // Not a table_is_full error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
void multi_update::send_error(uint errcode,const char *err)
|
||||
{
|
||||
/* First send error what ever it is ... */
|
||||
::send_error(&thd->net,errcode,err);
|
||||
|
||||
/* reset used flags */
|
||||
// update_tables->table->no_keyread=0;
|
||||
|
||||
/* If nothing updated return */
|
||||
if (!updated)
|
||||
return;
|
||||
@@ -676,97 +686,124 @@ void multi_update::send_error(uint errcode,const char *err)
|
||||
/* Something already updated so we have to invalidate cache */
|
||||
query_cache_invalidate3(thd, update_tables, 1);
|
||||
|
||||
/* Below can happen when thread is killed early ... */
|
||||
if (!table_being_updated)
|
||||
table_being_updated=update_tables;
|
||||
|
||||
/*
|
||||
If rows from the first table only has been updated and it is transactional,
|
||||
just do rollback.
|
||||
The same if all tables are transactional, regardless of where we are.
|
||||
In all other cases do attempt updates ...
|
||||
If all tables that has been updated are trans safe then just do rollback.
|
||||
If not attempt to do remaining updates.
|
||||
*/
|
||||
if ((table_being_updated->table->file->has_transactions() &&
|
||||
table_being_updated == update_tables) || !not_trans_safe)
|
||||
|
||||
if (trans_safe)
|
||||
ha_rollback_stmt(thd);
|
||||
else if (do_update && num_updated)
|
||||
VOID(do_updates(true));
|
||||
else if (do_update && table_count > 1)
|
||||
{
|
||||
/* Add warning here */
|
||||
VOID(do_updates(0));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int multi_update::do_updates (bool from_send_error)
|
||||
int multi_update::do_updates(bool from_send_error)
|
||||
{
|
||||
int local_error= 0, counter= 0;
|
||||
TABLE_LIST *cur_table;
|
||||
int local_error;
|
||||
ha_rows org_updated;
|
||||
TABLE *table;
|
||||
DBUG_ENTER("do_updates");
|
||||
|
||||
if (from_send_error)
|
||||
do_update= 0; // Don't retry this function
|
||||
for (cur_table= update_tables; cur_table ; cur_table= cur_table->next)
|
||||
{
|
||||
/* Found out table number for 'table_being_updated' */
|
||||
for (TABLE_LIST *aux=update_tables;
|
||||
aux != table_being_updated;
|
||||
aux=aux->next)
|
||||
counter++;
|
||||
}
|
||||
else
|
||||
table_being_updated = update_tables;
|
||||
table = cur_table->table;
|
||||
if (table == main_table)
|
||||
continue; // Already updated
|
||||
|
||||
do_update = false;
|
||||
for (table_being_updated=table_being_updated->next;
|
||||
table_being_updated ;
|
||||
table_being_updated=table_being_updated->next, counter++)
|
||||
{
|
||||
if (!table_being_updated->shared)
|
||||
continue;
|
||||
org_updated= updated;
|
||||
byte *ref_pos;
|
||||
TABLE *tmp_table= tmp_tables[cur_table->shared];
|
||||
tmp_table->file->extra(HA_EXTRA_CACHE); // Change to read cache
|
||||
table->file->extra(HA_EXTRA_NO_CACHE);
|
||||
|
||||
TABLE *table = table_being_updated->table;
|
||||
TABLE *tmp_table=tmp_tables[counter];
|
||||
if (tmp_table->file->extra(HA_EXTRA_NO_CACHE))
|
||||
/*
|
||||
Setup copy functions to copy fields from temporary table
|
||||
*/
|
||||
List_iterator_fast<Item> field_it(*fields_for_table[cur_table->shared]);
|
||||
Field **field= tmp_table->field+1; // Skip row pointer
|
||||
Copy_field *copy_field_ptr= copy_field, *copy_field_end;
|
||||
for ( ; *field ; field++)
|
||||
{
|
||||
local_error=1;
|
||||
break;
|
||||
Item_field *item= (Item_field* ) field_it++;
|
||||
(copy_field_ptr++)->set(item->field, *field, 0);
|
||||
}
|
||||
List<Item> list;
|
||||
Field **ptr=tmp_table->field,*field;
|
||||
// This is supposed to be something like insert_fields
|
||||
thd->used_tables|=tmp_table->map;
|
||||
while ((field = *ptr++))
|
||||
copy_field_end=copy_field_ptr;
|
||||
|
||||
if ((local_error = tmp_table->file->rnd_init(1)))
|
||||
goto err;
|
||||
|
||||
ref_pos= (byte*) tmp_table->field[0]->ptr;
|
||||
for (;;)
|
||||
{
|
||||
list.push_back((Item *)new Item_field(field));
|
||||
if (field->query_id == thd->query_id)
|
||||
thd->dupp_field=field;
|
||||
field->query_id=thd->query_id;
|
||||
tmp_table->used_keys&=field->part_of_key;
|
||||
}
|
||||
tmp_table->used_fields=tmp_table->fields;
|
||||
local_error=0;
|
||||
list.pop(); // we get position some other way ...
|
||||
local_error = tmp_table->file->rnd_init(1);
|
||||
if (local_error)
|
||||
return local_error;
|
||||
while (!(local_error=tmp_table->file->rnd_next(tmp_table->record[0])) &&
|
||||
(!thd->killed || from_send_error || not_trans_safe))
|
||||
{
|
||||
found++;
|
||||
local_error= table->file->rnd_pos(table->record[0],
|
||||
(byte*) (*(tmp_table->field))->ptr);
|
||||
if (local_error)
|
||||
return local_error;
|
||||
table->status|= STATUS_UPDATED;
|
||||
store_record(table,1);
|
||||
local_error= (fill_record(*fields_by_tables[counter + 1],list) ||
|
||||
/* compare_record(table, query_id) || */
|
||||
table->file->update_row(table->record[1],table->record[0]));
|
||||
if (local_error)
|
||||
if (thd->killed && trans_safe)
|
||||
goto err;
|
||||
if ((local_error=tmp_table->file->rnd_next(tmp_table->record[0])))
|
||||
{
|
||||
table->file->print_error(local_error,MYF(0));
|
||||
break;
|
||||
if (local_error == HA_ERR_END_OF_FILE)
|
||||
break;
|
||||
if (local_error == HA_ERR_RECORD_DELETED)
|
||||
continue; // May happen on dup key
|
||||
goto err;
|
||||
}
|
||||
else
|
||||
found++;
|
||||
if ((local_error= table->file->rnd_pos(table->record[0], ref_pos)))
|
||||
goto err;
|
||||
table->status|= STATUS_UPDATED;
|
||||
store_record(table,1);
|
||||
|
||||
/* Copy data from temporary table to current table */
|
||||
for (copy_field_ptr=copy_field;
|
||||
copy_field_ptr != copy_field_end;
|
||||
copy_field_ptr++)
|
||||
(*copy_field_ptr->do_copy)(copy_field_ptr);
|
||||
|
||||
if (compare_record(table, thd->query_id))
|
||||
{
|
||||
if ((local_error=table->file->update_row(table->record[1],
|
||||
table->record[0])))
|
||||
{
|
||||
if (local_error != HA_ERR_FOUND_DUPP_KEY ||
|
||||
handle_duplicates != DUP_IGNORE)
|
||||
goto err;
|
||||
}
|
||||
updated++;
|
||||
if (table->tmp_table != NO_TMP_TABLE)
|
||||
log_delayed= 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (updated != org_updated)
|
||||
{
|
||||
if (table->tmp_table != NO_TMP_TABLE)
|
||||
log_delayed= 1; // Tmp tables forces delay log
|
||||
if (table->file->has_transactions())
|
||||
log_delayed= transactional_tables= 1;
|
||||
else
|
||||
trans_safe= 0; // Can't do safe rollback
|
||||
}
|
||||
if (local_error == HA_ERR_END_OF_FILE)
|
||||
local_error = 0;
|
||||
}
|
||||
return local_error;
|
||||
DBUG_RETURN(0);
|
||||
|
||||
err:
|
||||
if (!from_send_error)
|
||||
table->file->print_error(local_error,MYF(0));
|
||||
|
||||
if (updated != org_updated)
|
||||
{
|
||||
if (table->tmp_table != NO_TMP_TABLE)
|
||||
log_delayed= 1;
|
||||
if (table->file->has_transactions())
|
||||
log_delayed= transactional_tables= 1;
|
||||
else
|
||||
trans_safe= 0;
|
||||
}
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
|
||||
@@ -774,60 +811,57 @@ int multi_update::do_updates (bool from_send_error)
|
||||
|
||||
bool multi_update::send_eof()
|
||||
{
|
||||
thd->proc_info="updating the reference tables";
|
||||
char buff[80];
|
||||
thd->proc_info="updating reference tables";
|
||||
|
||||
/* Does updates for the last n - 1 tables, returns 0 if ok */
|
||||
int local_error = (num_updated) ? do_updates(false) : 0;
|
||||
|
||||
/* reset used flags */
|
||||
#ifndef NOT_USED
|
||||
update_tables->table->no_keyread=0;
|
||||
#endif
|
||||
if (local_error == -1)
|
||||
local_error= 0;
|
||||
int local_error = (table_count) ? do_updates(0) : 0;
|
||||
thd->proc_info= "end";
|
||||
if (local_error)
|
||||
send_error(local_error, "An error occured in multi-table update");
|
||||
|
||||
/*
|
||||
Write the SQL statement to the binlog if we updated
|
||||
rows and we succeeded, or also in an error case when there
|
||||
was a non-transaction-safe table involved, since
|
||||
modifications in it cannot be rolled back.
|
||||
rows and we succeeded or if we updated some non
|
||||
transacational tables
|
||||
*/
|
||||
|
||||
if (updated || not_trans_safe)
|
||||
if (updated && (local_error <= 0 || !trans_safe))
|
||||
{
|
||||
mysql_update_log.write(thd,thd->query,thd->query_length);
|
||||
Query_log_event qinfo(thd, thd->query, thd->query_length, 0);
|
||||
|
||||
/*
|
||||
mysql_bin_log is not open if binlogging or replication
|
||||
is not used
|
||||
*/
|
||||
|
||||
if (mysql_bin_log.is_open() && mysql_bin_log.write(&qinfo) &&
|
||||
!not_trans_safe)
|
||||
local_error=1; /* Log write failed: roll back the SQL statement */
|
||||
|
||||
/* Commit or rollback the current SQL statement */
|
||||
VOID(ha_autocommit_or_rollback(thd, local_error > 0));
|
||||
}
|
||||
else
|
||||
local_error= 0; // this can happen only if it is end of file error
|
||||
if (!local_error) // if the above log write did not fail ...
|
||||
{
|
||||
char buff[80];
|
||||
sprintf(buff,ER(ER_UPDATE_INFO), (long) found, (long) updated,
|
||||
(long) thd->cuted_fields);
|
||||
if (updated)
|
||||
if (mysql_bin_log.is_open())
|
||||
{
|
||||
query_cache_invalidate3(thd, update_tables, 1);
|
||||
Query_log_event qinfo(thd, thd->query, thd->query_length,
|
||||
log_delayed);
|
||||
if (mysql_bin_log.write(&qinfo) && trans_safe)
|
||||
local_error=1; // Rollback update
|
||||
}
|
||||
::send_ok(&thd->net,
|
||||
(thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated,
|
||||
thd->insert_id_used ? thd->insert_id() : 0L,buff);
|
||||
if (!log_delayed)
|
||||
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
|
||||
}
|
||||
thd->count_cuted_fields=0;
|
||||
|
||||
if (transactional_tables)
|
||||
{
|
||||
if (ha_autocommit_or_rollback(thd, local_error >= 0))
|
||||
local_error=1;
|
||||
}
|
||||
|
||||
if (local_error > 0) // if the above log write did not fail ...
|
||||
{
|
||||
/* Safety: If we haven't got an error before (should not happen) */
|
||||
my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
|
||||
MYF(0));
|
||||
::send_error(&thd->net);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
sprintf(buff,ER(ER_UPDATE_INFO), (long) found, (long) updated,
|
||||
(long) thd->cuted_fields);
|
||||
if (updated)
|
||||
{
|
||||
query_cache_invalidate3(thd, update_tables, 1);
|
||||
}
|
||||
::send_ok(&thd->net,
|
||||
(thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated,
|
||||
thd->insert_id_used ? thd->insert_id() : 0L,buff);
|
||||
return 0;
|
||||
}
|
||||
|
Reference in New Issue
Block a user