mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Merge bk-internal.mysql.com:/home/bk/mysql-5.1
into bodhi.local:/opt/local/work/mysql-5.1-runtime-merge
This commit is contained in:
@ -17,6 +17,44 @@
|
||||
|
||||
/* Insert of records */
|
||||
|
||||
/*
|
||||
INSERT DELAYED
|
||||
|
||||
Insert delayed is distinguished from a normal insert by lock_type ==
|
||||
TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
|
||||
"delayed" table (delayed_get_table()), but falls back to
|
||||
open_and_lock_tables() on error and proceeds as normal insert then.
|
||||
|
||||
Opening a "delayed" table means to find a delayed insert thread that
|
||||
has the table open already. If this fails, a new thread is created and
|
||||
waited for to open and lock the table.
|
||||
|
||||
If accessing the thread succeeded, in
|
||||
delayed_insert::get_local_table() the table of the thread is copied
|
||||
for local use. A copy is required because the normal insert logic
|
||||
works on a target table, but the other threads table object must not
|
||||
be used. The insert logic uses the record buffer to create a record.
|
||||
And the delayed insert thread uses the record buffer to pass the
|
||||
record to the table handler. So there must be different objects. Also
|
||||
the copied table is not included in the lock, so that the statement
|
||||
can proceed even if the real table cannot be accessed at this moment.
|
||||
|
||||
Copying a table object is not a trivial operation. Besides the TABLE
|
||||
object there are the field pointer array, the field objects and the
|
||||
record buffer. After copying the field objects, their pointers into
|
||||
the record must be "moved" to point to the new record buffer.
|
||||
|
||||
After this setup the normal insert logic is used. Only that for
|
||||
delayed inserts write_delayed() is called instead of write_record().
|
||||
It inserts the rows into a queue and signals the delayed insert thread
|
||||
instead of writing directly to the table.
|
||||
|
||||
The delayed insert thread awakes from the signal. It locks the table,
|
||||
inserts the rows from the queue, unlocks the table, and waits for the
|
||||
next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
|
||||
|
||||
*/
|
||||
|
||||
#include "mysql_priv.h"
|
||||
#include "sp_head.h"
|
||||
#include "sql_trigger.h"
|
||||
@ -26,8 +64,8 @@
|
||||
static int check_null_fields(THD *thd,TABLE *entry);
|
||||
#ifndef EMBEDDED_LIBRARY
|
||||
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
|
||||
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
|
||||
char *query, uint query_length, bool log_on);
|
||||
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
|
||||
LEX_STRING query, bool ignore, bool log_on);
|
||||
static void end_delayed_insert(THD *thd);
|
||||
pthread_handler_t handle_delayed_insert(void *arg);
|
||||
static void unlink_blobs(register TABLE *table);
|
||||
@ -407,7 +445,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
table->next_number_field=table->found_next_number_field;
|
||||
|
||||
error=0;
|
||||
id=0;
|
||||
thd->proc_info="update";
|
||||
if (duplic != DUP_ERROR || ignore)
|
||||
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
|
||||
@ -512,22 +549,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
#ifndef EMBEDDED_LIBRARY
|
||||
if (lock_type == TL_WRITE_DELAYED)
|
||||
{
|
||||
error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
|
||||
LEX_STRING const st_query = { query, thd->query_length };
|
||||
error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
|
||||
query=0;
|
||||
}
|
||||
else
|
||||
#endif
|
||||
error=write_record(thd, table ,&info);
|
||||
/*
|
||||
If auto_increment values are used, save the first one
|
||||
for LAST_INSERT_ID() and for the update log.
|
||||
We can't use insert_id() as we don't want to touch the
|
||||
last_insert_id_used flag.
|
||||
*/
|
||||
if (! id && thd->insert_id_used)
|
||||
{ // Get auto increment value
|
||||
id= thd->last_insert_id;
|
||||
}
|
||||
if (error)
|
||||
break;
|
||||
thd->row_count++;
|
||||
@ -535,6 +563,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
|
||||
free_underlaid_joins(thd, &thd->lex->select_lex);
|
||||
joins_freed= TRUE;
|
||||
table->file->ha_release_auto_increment();
|
||||
|
||||
/*
|
||||
Now all rows are inserted. Time to update logs and sends response to
|
||||
@ -545,7 +574,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
id=0; // No auto_increment id
|
||||
info.copied=values_list.elements;
|
||||
end_delayed_insert(thd);
|
||||
}
|
||||
@ -559,11 +587,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
table->file->print_error(my_errno,MYF(0));
|
||||
error=1;
|
||||
}
|
||||
if (id && values_list.elements != 1)
|
||||
thd->insert_id(id); // For update log
|
||||
else if (table->next_number_field && info.copied)
|
||||
id=table->next_number_field->val_int(); // Return auto_increment value
|
||||
|
||||
transactional_table= table->file->has_transactions();
|
||||
|
||||
if ((changed= (info.copied || info.deleted || info.updated)))
|
||||
@ -612,21 +635,30 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
}
|
||||
}
|
||||
thd->proc_info="end";
|
||||
/*
|
||||
We'll report to the client this id:
|
||||
- if the table contains an autoincrement column and we successfully
|
||||
inserted an autogenerated value, the autogenerated value.
|
||||
- if the table contains no autoincrement column and LAST_INSERT_ID(X) was
|
||||
called, X.
|
||||
- if the table contains an autoincrement column, and some rows were
|
||||
inserted, the id of the last "inserted" row (if IGNORE, that value may not
|
||||
have been really inserted but ignored).
|
||||
*/
|
||||
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
|
||||
thd->first_successful_insert_id_in_cur_stmt :
|
||||
(thd->arg_of_last_insert_id_function ?
|
||||
thd->first_successful_insert_id_in_prev_stmt :
|
||||
((table->next_number_field && info.copied) ?
|
||||
table->next_number_field->val_int() : 0));
|
||||
table->next_number_field=0;
|
||||
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
|
||||
thd->next_insert_id=0; // Reset this if wrongly used
|
||||
if (duplic != DUP_ERROR || ignore)
|
||||
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
|
||||
if (duplic == DUP_REPLACE &&
|
||||
(!table->triggers || !table->triggers->has_delete_triggers()))
|
||||
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
|
||||
|
||||
/* Reset value of LAST_INSERT_ID if no rows where inserted */
|
||||
if (!info.copied && thd->insert_id_used)
|
||||
{
|
||||
thd->insert_id(0);
|
||||
id=0;
|
||||
}
|
||||
if (error)
|
||||
goto abort;
|
||||
if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
|
||||
@ -648,8 +680,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
||||
thd->row_count_func= info.copied+info.deleted+info.updated;
|
||||
::send_ok(thd, (ulong) thd->row_count_func, id, buff);
|
||||
}
|
||||
if (table != NULL)
|
||||
table->file->release_auto_increment();
|
||||
thd->abort_on_warning= 0;
|
||||
DBUG_RETURN(FALSE);
|
||||
|
||||
@ -659,7 +689,7 @@ abort:
|
||||
end_delayed_insert(thd);
|
||||
#endif
|
||||
if (table != NULL)
|
||||
table->file->release_auto_increment();
|
||||
table->file->ha_release_auto_increment();
|
||||
if (!joins_freed)
|
||||
free_underlaid_joins(thd, &thd->lex->select_lex);
|
||||
thd->abort_on_warning= 0;
|
||||
@ -968,6 +998,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
int error, trg_error= 0;
|
||||
char *key=0;
|
||||
MY_BITMAP *save_read_set, *save_write_set;
|
||||
ulonglong prev_insert_id= table->file->next_insert_id;
|
||||
ulonglong insert_id_for_cur_row= 0;
|
||||
DBUG_ENTER("write_record");
|
||||
|
||||
info->records++;
|
||||
@ -980,10 +1012,20 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
while ((error=table->file->ha_write_row(table->record[0])))
|
||||
{
|
||||
uint key_nr;
|
||||
/*
|
||||
If we do more than one iteration of this loop, from the second one the
|
||||
row will have an explicit value in the autoinc field, which was set at
|
||||
the first call of handler::update_auto_increment(). So we must save
|
||||
the autogenerated value to avoid thd->insert_id_for_cur_row to become
|
||||
0.
|
||||
*/
|
||||
if (table->file->insert_id_for_cur_row > 0)
|
||||
insert_id_for_cur_row= table->file->insert_id_for_cur_row;
|
||||
else
|
||||
table->file->insert_id_for_cur_row= insert_id_for_cur_row;
|
||||
bool is_duplicate_key_error;
|
||||
if (table->file->is_fatal_error(error, HA_CHECK_DUP))
|
||||
goto err;
|
||||
table->file->restore_auto_increment(); // it's too early here! BUG#20188
|
||||
is_duplicate_key_error= table->file->is_fatal_error(error, 0);
|
||||
if (!is_duplicate_key_error)
|
||||
{
|
||||
@ -1011,7 +1053,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
if (info->handle_duplicates == DUP_REPLACE &&
|
||||
table->next_number_field &&
|
||||
key_nr == table->s->next_number_index &&
|
||||
table->file->auto_increment_column_changed)
|
||||
(insert_id_for_cur_row > 0))
|
||||
goto err;
|
||||
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
|
||||
{
|
||||
@ -1070,22 +1112,29 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
if (res == VIEW_CHECK_ERROR)
|
||||
goto before_trg_err;
|
||||
|
||||
if (thd->clear_next_insert_id)
|
||||
{
|
||||
/* Reset auto-increment cacheing if we do an update */
|
||||
thd->clear_next_insert_id= 0;
|
||||
thd->next_insert_id= 0;
|
||||
}
|
||||
if ((error=table->file->ha_update_row(table->record[1],
|
||||
table->record[0])))
|
||||
{
|
||||
if (info->ignore &&
|
||||
!table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
|
||||
{
|
||||
table->file->restore_auto_increment(prev_insert_id);
|
||||
goto ok_or_after_trg_err;
|
||||
}
|
||||
goto err;
|
||||
}
|
||||
info->updated++;
|
||||
|
||||
/*
|
||||
If ON DUP KEY UPDATE updates a row instead of inserting one, and
|
||||
there is an auto_increment column, then SELECT LAST_INSERT_ID()
|
||||
returns the id of the updated row:
|
||||
*/
|
||||
if (table->next_number_field)
|
||||
{
|
||||
longlong field_val= table->next_number_field->val_int();
|
||||
thd->record_first_successful_insert_id_in_cur_stmt(field_val);
|
||||
table->file->adjust_next_insert_id_after_explicit_value(field_val);
|
||||
}
|
||||
trg_error= (table->triggers &&
|
||||
table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
|
||||
TRG_ACTION_AFTER, TRUE));
|
||||
@ -1114,16 +1163,11 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
|
||||
(!table->triggers || !table->triggers->has_delete_triggers()))
|
||||
{
|
||||
if (thd->clear_next_insert_id)
|
||||
{
|
||||
/* Reset auto-increment cacheing if we do an update */
|
||||
thd->clear_next_insert_id= 0;
|
||||
thd->next_insert_id= 0;
|
||||
}
|
||||
if ((error=table->file->ha_update_row(table->record[1],
|
||||
table->record[0])))
|
||||
goto err;
|
||||
info->deleted++;
|
||||
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
|
||||
/*
|
||||
Since we pretend that we have done insert we should call
|
||||
its after triggers.
|
||||
@ -1152,6 +1196,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
}
|
||||
}
|
||||
}
|
||||
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
|
||||
/*
|
||||
Restore column maps if they where replaced during an duplicate key
|
||||
problem.
|
||||
@ -1165,12 +1210,13 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
|
||||
if (!info->ignore ||
|
||||
table->file->is_fatal_error(error, HA_CHECK_DUP))
|
||||
goto err;
|
||||
table->file->restore_auto_increment();
|
||||
table->file->restore_auto_increment(prev_insert_id);
|
||||
goto ok_or_after_trg_err;
|
||||
}
|
||||
|
||||
after_trg_n_copied_inc:
|
||||
info->copied++;
|
||||
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
|
||||
trg_error= (table->triggers &&
|
||||
table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
|
||||
TRG_ACTION_AFTER, TRUE));
|
||||
@ -1190,6 +1236,7 @@ err:
|
||||
table->file->print_error(error,MYF(0));
|
||||
|
||||
before_trg_err:
|
||||
table->file->restore_auto_increment(prev_insert_id);
|
||||
if (key)
|
||||
my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
|
||||
table->column_bitmaps_set(save_read_set, save_write_set);
|
||||
@ -1252,14 +1299,20 @@ public:
|
||||
char *record;
|
||||
enum_duplicates dup;
|
||||
time_t start_time;
|
||||
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
|
||||
ulonglong last_insert_id;
|
||||
bool query_start_used, ignore, log_query;
|
||||
bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
|
||||
ulonglong first_successful_insert_id_in_prev_stmt;
|
||||
timestamp_auto_set_type timestamp_field_type;
|
||||
LEX_STRING query;
|
||||
|
||||
delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
|
||||
:record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
|
||||
delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
|
||||
bool ignore_arg, bool log_query_arg)
|
||||
: record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
|
||||
query(query_arg)
|
||||
{}
|
||||
~delayed_row()
|
||||
{
|
||||
x_free(query.str);
|
||||
x_free(record);
|
||||
}
|
||||
};
|
||||
@ -1267,9 +1320,6 @@ public:
|
||||
|
||||
class delayed_insert :public ilink {
|
||||
uint locks_in_memory;
|
||||
char *query;
|
||||
ulong query_length;
|
||||
ulong query_allocated;
|
||||
public:
|
||||
THD thd;
|
||||
TABLE *table;
|
||||
@ -1283,7 +1333,7 @@ public:
|
||||
TABLE_LIST table_list; // Argument
|
||||
|
||||
delayed_insert()
|
||||
:locks_in_memory(0), query(0), query_length(0), query_allocated(0),
|
||||
:locks_in_memory(0),
|
||||
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
|
||||
group_count(0)
|
||||
{
|
||||
@ -1294,6 +1344,11 @@ public:
|
||||
thd.command=COM_DELAYED_INSERT;
|
||||
thd.lex->current_select= 0; // for my_message_sql
|
||||
thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
|
||||
/*
|
||||
Statement-based replication of INSERT DELAYED has problems with RAND()
|
||||
and user vars, so in mixed mode we go to row-based.
|
||||
*/
|
||||
thd.set_current_stmt_binlog_row_based_if_mixed();
|
||||
|
||||
bzero((char*) &thd.net, sizeof(thd.net)); // Safety
|
||||
bzero((char*) &table_list, sizeof(table_list)); // Safety
|
||||
@ -1309,7 +1364,6 @@ public:
|
||||
}
|
||||
~delayed_insert()
|
||||
{
|
||||
my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
|
||||
/* The following is not really needed, but just for safety */
|
||||
delayed_row *row;
|
||||
while ((row=rows.get()))
|
||||
@ -1329,25 +1383,6 @@ public:
|
||||
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
|
||||
}
|
||||
|
||||
int set_query(char const *q, ulong qlen) {
|
||||
if (q && qlen > 0)
|
||||
{
|
||||
if (query_allocated < qlen + 1)
|
||||
{
|
||||
ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
|
||||
query= my_realloc(query, qlen + 1, MYF(flags));
|
||||
if (query == 0)
|
||||
return HA_ERR_OUT_OF_MEM;
|
||||
query_allocated= qlen;
|
||||
}
|
||||
query_length= qlen;
|
||||
memcpy(query, q, qlen + 1);
|
||||
}
|
||||
else
|
||||
query_length= 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* The following is for checking when we can delete ourselves */
|
||||
inline void lock()
|
||||
{
|
||||
@ -1520,6 +1555,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
|
||||
TABLE *copy;
|
||||
TABLE_SHARE *share= table->s;
|
||||
byte *bitmap;
|
||||
DBUG_ENTER("delayed_insert::get_local_table");
|
||||
|
||||
/* First request insert thread to get a lock */
|
||||
status=1;
|
||||
@ -1543,6 +1579,13 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Allocate memory for the TABLE object, the field pointers array, and
|
||||
one record buffer of reclength size. Normally a table has three
|
||||
record buffers of rec_buff_length size, which includes alignment
|
||||
bytes. Since the table copy is used for creating one record only,
|
||||
the other record buffers and alignment are unnecessary.
|
||||
*/
|
||||
client_thd->proc_info="allocating local table";
|
||||
copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
|
||||
(share->fields+1)*sizeof(Field**)+
|
||||
@ -1550,23 +1593,28 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
|
||||
share->column_bitmap_size*2);
|
||||
if (!copy)
|
||||
goto error;
|
||||
|
||||
/* Copy the TABLE object. */
|
||||
*copy= *table;
|
||||
|
||||
/* We don't need to change the file handler here */
|
||||
field= copy->field= (Field**) (copy+1);
|
||||
bitmap= (byte*) (field+share->fields+1);
|
||||
copy->record[0]= (bitmap+ share->column_bitmap_size*2);
|
||||
memcpy((char*) copy->record[0],(char*) table->record[0],share->reclength);
|
||||
|
||||
/* Make a copy of all fields */
|
||||
|
||||
adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]);
|
||||
|
||||
found_next_number_field=table->found_next_number_field;
|
||||
for (org_field=table->field ; *org_field ; org_field++,field++)
|
||||
/* Assign the pointers for the field pointers array and the record. */
|
||||
field= copy->field= (Field**) (copy + 1);
|
||||
bitmap= (byte*) (field + share->fields + 1);
|
||||
copy->record[0]= (bitmap + share->column_bitmap_size * 2);
|
||||
memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
|
||||
/*
|
||||
Make a copy of all fields.
|
||||
The copied fields need to point into the copied record. This is done
|
||||
by copying the field objects with their old pointer values and then
|
||||
"move" the pointers by the distance between the original and copied
|
||||
records. That way we preserve the relative positions in the records.
|
||||
*/
|
||||
adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
|
||||
found_next_number_field= table->found_next_number_field;
|
||||
for (org_field= table->field; *org_field; org_field++, field++)
|
||||
{
|
||||
if (!(*field= (*org_field)->new_field(client_thd->mem_root,copy)))
|
||||
return 0;
|
||||
if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
|
||||
DBUG_RETURN(0);
|
||||
(*field)->orig_table= copy; // Remove connection
|
||||
(*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
|
||||
if (*org_field == found_next_number_field)
|
||||
@ -1599,26 +1647,27 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
|
||||
copy->read_set= ©->def_read_set;
|
||||
copy->write_set= ©->def_write_set;
|
||||
|
||||
return copy;
|
||||
DBUG_RETURN(copy);
|
||||
|
||||
/* Got fatal error */
|
||||
error:
|
||||
tables_in_use--;
|
||||
status=1;
|
||||
pthread_cond_signal(&cond); // Inform thread about abort
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/* Put a question in queue */
|
||||
|
||||
static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
|
||||
bool ignore, char *query, uint query_length,
|
||||
bool log_on)
|
||||
static int
|
||||
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
|
||||
LEX_STRING query, bool ignore, bool log_on)
|
||||
{
|
||||
delayed_row *row=0;
|
||||
delayed_row *row;
|
||||
delayed_insert *di=thd->di;
|
||||
DBUG_ENTER("write_delayed");
|
||||
DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
|
||||
|
||||
thd->proc_info="waiting for handler insert";
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
@ -1626,18 +1675,44 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
|
||||
pthread_cond_wait(&di->cond_client,&di->mutex);
|
||||
thd->proc_info="storing row into queue";
|
||||
|
||||
if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
|
||||
if (thd->killed)
|
||||
goto err;
|
||||
|
||||
/*
|
||||
Take a copy of the query string, if there is any. The string will
|
||||
be free'ed when the row is destroyed. If there is no query string,
|
||||
we don't do anything special.
|
||||
*/
|
||||
|
||||
if (query.str)
|
||||
{
|
||||
char *str;
|
||||
if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
|
||||
goto err;
|
||||
query.str= str;
|
||||
}
|
||||
row= new delayed_row(query, duplic, ignore, log_on);
|
||||
if (row == NULL)
|
||||
{
|
||||
my_free(query.str, MYF(MY_WME));
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
|
||||
goto err;
|
||||
memcpy(row->record, table->record[0], table->s->reclength);
|
||||
di->set_query(query, query_length);
|
||||
row->start_time= thd->start_time;
|
||||
row->query_start_used= thd->query_start_used;
|
||||
row->last_insert_id_used= thd->last_insert_id_used;
|
||||
row->insert_id_used= thd->insert_id_used;
|
||||
row->last_insert_id= thd->last_insert_id;
|
||||
/*
|
||||
those are for the binlog: LAST_INSERT_ID() has been evaluated at this
|
||||
time, so record does not need it, but statement-based binlogging of the
|
||||
INSERT will need when the row is actually inserted.
|
||||
As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
|
||||
*/
|
||||
row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
|
||||
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
|
||||
row->first_successful_insert_id_in_prev_stmt=
|
||||
thd->first_successful_insert_id_in_prev_stmt;
|
||||
row->timestamp_field_type= table->timestamp_field_type;
|
||||
|
||||
di->rows.push_back(row);
|
||||
@ -1891,6 +1966,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
|
||||
MYSQL_LOCK *lock=thd->lock;
|
||||
thd->lock=0;
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
di->table->file->ha_release_auto_increment();
|
||||
mysql_unlock_tables(thd, lock);
|
||||
di->group_count=0;
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
@ -1990,7 +2066,7 @@ bool delayed_insert::handle_inserts(void)
|
||||
if (thd.killed || table->s->version != refresh_version)
|
||||
{
|
||||
thd.killed= THD::KILL_CONNECTION;
|
||||
max_rows= ~(ulong)0; // Do as much as possible
|
||||
max_rows= ULONG_MAX; // Do as much as possible
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2002,13 +2078,6 @@ bool delayed_insert::handle_inserts(void)
|
||||
table->file->extra(HA_EXTRA_WRITE_CACHE);
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
/* Reset auto-increment cacheing */
|
||||
if (thd.clear_next_insert_id)
|
||||
{
|
||||
thd.next_insert_id= 0;
|
||||
thd.clear_next_insert_id= 0;
|
||||
}
|
||||
|
||||
while ((row=rows.get()))
|
||||
{
|
||||
stacked_inserts--;
|
||||
@ -2017,9 +2086,12 @@ bool delayed_insert::handle_inserts(void)
|
||||
|
||||
thd.start_time=row->start_time;
|
||||
thd.query_start_used=row->query_start_used;
|
||||
thd.last_insert_id=row->last_insert_id;
|
||||
thd.last_insert_id_used=row->last_insert_id_used;
|
||||
thd.insert_id_used=row->insert_id_used;
|
||||
/* for the binlog, forget auto_increment ids generated by previous rows */
|
||||
// thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
|
||||
thd.first_successful_insert_id_in_prev_stmt=
|
||||
row->first_successful_insert_id_in_prev_stmt;
|
||||
thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
|
||||
row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
|
||||
table->timestamp_field_type= row->timestamp_field_type;
|
||||
|
||||
info.ignore= row->ignore;
|
||||
@ -2044,6 +2116,7 @@ bool delayed_insert::handle_inserts(void)
|
||||
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
|
||||
row->log_query = 0;
|
||||
}
|
||||
|
||||
if (using_ignore)
|
||||
{
|
||||
using_ignore=0;
|
||||
@ -2054,6 +2127,22 @@ bool delayed_insert::handle_inserts(void)
|
||||
using_opt_replace= 0;
|
||||
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
|
||||
}
|
||||
|
||||
if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
|
||||
{
|
||||
/*
|
||||
If the query has several rows to insert, only the first row will come
|
||||
here. In row-based binlogging, this means that the first row will be
|
||||
written to binlog as one Table_map event and one Rows event (due to an
|
||||
event flush done in binlog_query()), then all other rows of this query
|
||||
will be binlogged together as one single Table_map event and one
|
||||
single Rows event.
|
||||
*/
|
||||
thd.binlog_query(THD::ROW_QUERY_TYPE,
|
||||
row->query.str, row->query.length,
|
||||
FALSE, FALSE);
|
||||
}
|
||||
|
||||
if (table->s->blob_fields)
|
||||
free_delayed_insert_blobs(table);
|
||||
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
|
||||
@ -2100,13 +2189,25 @@ bool delayed_insert::handle_inserts(void)
|
||||
pthread_cond_broadcast(&cond_client); // If waiting clients
|
||||
}
|
||||
}
|
||||
|
||||
thd.proc_info=0;
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
/* After releasing the mutex, to prevent deadlocks. */
|
||||
if (mysql_bin_log.is_open())
|
||||
thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
|
||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||
/*
|
||||
We need to flush the pending event when using row-based
|
||||
replication since the flushing normally done in binlog_query() is
|
||||
not done last in the statement: for delayed inserts, the insert
|
||||
statement is logged *before* all rows are inserted.
|
||||
|
||||
We can flush the pending event without checking the thd->lock
|
||||
since the delayed insert *thread* is not inside a stored function
|
||||
or trigger.
|
||||
|
||||
TODO: Move the logging to last in the sequence of rows.
|
||||
*/
|
||||
if (thd.current_stmt_binlog_row_based)
|
||||
thd.binlog_flush_pending_rows_event(TRUE);
|
||||
#endif /* HAVE_ROW_BASED_REPLICATION */
|
||||
|
||||
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
|
||||
{ // This shouldn't happen
|
||||
@ -2194,7 +2295,7 @@ select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
|
||||
enum_duplicates duplic,
|
||||
bool ignore_check_option_errors)
|
||||
:table_list(table_list_par), table(table_par), fields(fields_par),
|
||||
last_insert_id(0),
|
||||
autoinc_value_of_last_inserted_row(0),
|
||||
insert_into_view(table_list_par && table_list_par->view != 0)
|
||||
{
|
||||
bzero((char*) &info,sizeof(info));
|
||||
@ -2409,16 +2510,21 @@ bool select_insert::send_data(List<Item> &values)
|
||||
}
|
||||
if (table->next_number_field)
|
||||
{
|
||||
/*
|
||||
If no value has been autogenerated so far, we need to remember the
|
||||
value we just saw, we may need to send it to client in the end.
|
||||
*/
|
||||
if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
|
||||
autoinc_value_of_last_inserted_row=
|
||||
table->next_number_field->val_int();
|
||||
/*
|
||||
Clear auto-increment field for the next record, if triggers are used
|
||||
we will clear it twice, but this should be cheap.
|
||||
*/
|
||||
table->next_number_field->reset();
|
||||
if (!last_insert_id && thd->insert_id_used)
|
||||
last_insert_id= thd->insert_id();
|
||||
}
|
||||
}
|
||||
table->file->release_auto_increment();
|
||||
table->file->ha_release_auto_increment();
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
@ -2480,8 +2586,6 @@ void select_insert::send_error(uint errcode,const char *err)
|
||||
{
|
||||
if (!table->file->has_transactions())
|
||||
{
|
||||
if (last_insert_id)
|
||||
thd->insert_id(last_insert_id); // For binary log
|
||||
if (mysql_bin_log.is_open())
|
||||
{
|
||||
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
|
||||
@ -2501,6 +2605,7 @@ void select_insert::send_error(uint errcode,const char *err)
|
||||
bool select_insert::send_eof()
|
||||
{
|
||||
int error,error2;
|
||||
ulonglong id;
|
||||
DBUG_ENTER("select_insert::send_eof");
|
||||
|
||||
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
|
||||
@ -2527,8 +2632,6 @@ bool select_insert::send_eof()
|
||||
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
|
||||
}
|
||||
|
||||
if (last_insert_id)
|
||||
thd->insert_id(last_insert_id); // For binary log
|
||||
/*
|
||||
Write to binlog before commiting transaction. No statement will
|
||||
be written by the binlog_query() below in RBR mode. All the
|
||||
@ -2558,7 +2661,13 @@ bool select_insert::send_eof()
|
||||
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
|
||||
(ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
|
||||
thd->row_count_func= info.copied+info.deleted+info.updated;
|
||||
::send_ok(thd, (ulong) thd->row_count_func, last_insert_id, buff);
|
||||
|
||||
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
|
||||
thd->first_successful_insert_id_in_cur_stmt :
|
||||
(thd->arg_of_last_insert_id_function ?
|
||||
thd->first_successful_insert_id_in_prev_stmt :
|
||||
(info.copied ? autoinc_value_of_last_inserted_row : 0));
|
||||
::send_ok(thd, (ulong) thd->row_count_func, id, buff);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
@ -2724,21 +2833,6 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
|
||||
}
|
||||
|
||||
|
||||
class MY_HOOKS : public TABLEOP_HOOKS
|
||||
{
|
||||
public:
|
||||
MY_HOOKS(select_create *x) : ptr(x) { }
|
||||
virtual void do_prelock(TABLE **tables, uint count)
|
||||
{
|
||||
if (ptr->get_thd()->current_stmt_binlog_row_based)
|
||||
ptr->binlog_show_create_table(tables, count);
|
||||
}
|
||||
|
||||
private:
|
||||
select_create *ptr;
|
||||
};
|
||||
|
||||
|
||||
int
|
||||
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
|
||||
{
|
||||
@ -2751,8 +2845,9 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
|
||||
MY_HOOKS(select_create *x) : ptr(x) { }
|
||||
virtual void do_prelock(TABLE **tables, uint count)
|
||||
{
|
||||
if (ptr->get_thd()->current_stmt_binlog_row_based)
|
||||
ptr->binlog_show_create_table(tables, count);
|
||||
if (ptr->get_thd()->current_stmt_binlog_row_based &&
|
||||
!(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE))
|
||||
ptr->binlog_show_create_table(tables, count);
|
||||
}
|
||||
|
||||
private:
|
||||
|
Reference in New Issue
Block a user