1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-16678 Prefer MDL to dict_sys.latch for innodb background tasks

This is joint work with Thirunarayanan Balathandayuthapani.
The MDL interface between InnoDB and the rest of the server
(in storage/innobase/dict/dict0dict.cc and in include/)
is my work, while most everything else is Thiru's.

The collection of InnoDB persistent statistics and the
defragmentation were not refactored to use MDL. They will
keep relying on lower-level interlocking with
fil_check_pending_operations().

The purge of transaction history and the background operations on
fulltext indexes will use MDL. We will revert
commit 2c4844c9e7
(MDEV-17813) because thanks to MDL, purge cannot conflict
with DDL operations anymore. For a similar reason, we will remove
the MDEV-16222 test case from gcol.innodb_virtual_debug_purge.

Purge is essentially replacing all use of the global dict_sys.latch
with MDL. Purge will skip the undo log records for tables whose names
start with #sql-ib or #sql2. Theoretically, such tables might
be renamed back to visible table names if TRUNCATE fails to
create a new table, or the final rename in ALTER TABLE...ALGORITHM=COPY
fails. In that case, purge could permanently leave some garbage
in the table. Such garbage will be tolerated; the table would not
be considered corrupted.

To avoid repeated MDL releases and acquisitions,
trx_purge_attach_undo_recs() will sort undo log records by table_id,
and purge_node_t will keep the MDL and table handle open for multiple
successive undo log records.

get_purge_table(): A new accessor, used during the purge of
history for indexed virtual columns. This interface should ideally
not exist at all.

thd_mdl_context(): Accessor of THD::mdl_context.
Wrapped in a new thd_mdl_service.

dict_get_db_name_len(): Define inline.

dict_acquire_mdl_shared(): Acquire explicit shared MDL on a table name
if needed.

dict_table_open_on_id(): Return MDL_ticket, if requested.

dict_table_close(): Release MDL ticket, if requested.

dict_fts_index_syncing(), dict_index_t::index_fts_syncing: Remove.
row_drop_table_for_mysql() no longer needs to check these, because
MDL guarantees that a fulltext index sync will not be in progress
while MDL_EXCLUSIVE is protecting a DDL operation.

dict_table_t::parse_name(): Parse the table name for acquiring MDL.

purge_node_t::undo_recs: Change the type to std::list<trx_purge_rec_t*>
(different container, and storing also roll_ptr).

purge_node_t: Add mdl_ticket, last_table_id, purge_thd, mdl_hold_recs
for acquiring MDL and for keeping the table open across multiple
undo log records.

purge_vcol_info_t, row_purge_store_vsec_cur(), row_purge_restore_vsec_cur():
Remove. We will acquire the MDL earlier.

purge_sys_t::heap: Added, for reading undo log records.

fts_sync_during_ddl(): Invoked during ALGORITHM=INPLACE operations
to ensure that fts_sync_table() will not conflict with MDL_EXCLUSIVE.
Uses fts_t::sync_message for bookkeeping.
This commit is contained in:
Marko Mäkelä
2019-12-10 15:42:50 +02:00
parent e47bd0073c
commit ea37b14409
29 changed files with 696 additions and 979 deletions

View File

@@ -0,0 +1,46 @@
/* Copyright (c) 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
#pragma once
/**
@file include/mysql/service_thd_mdl.h
This service provides functions for plugins and storage engines to access
metadata locks.
*/
#ifdef __cplusplus
extern "C" {
#endif
extern struct thd_mdl_service_st {
void *(*thd_mdl_context)(MYSQL_THD);
} *thd_mdl_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
# define thd_mdl_context(_THD) thd_mdl_service->thd_mdl_context(_THD)
#else
/**
MDL_context accessor
@param thd the current session
@return pointer to thd->mdl_context
*/
void *thd_mdl_context(MYSQL_THD thd);
#endif
#ifdef __cplusplus
}
#endif

View File

@@ -1,5 +1,5 @@
/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. /* Copyright (c) 2009, 2010, Oracle and/or its affiliates.
Copyright (c) 2012, 2017, MariaDB Copyright (c) 2012, 2019, MariaDB
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@@ -43,3 +43,4 @@
#define VERSION_thd_wait 0x0100 #define VERSION_thd_wait 0x0100
#define VERSION_wsrep 0x0202 #define VERSION_wsrep 0x0202
#define VERSION_json 0x0100 #define VERSION_json 0x0100
#define VERSION_thd_mdl 0x0100

View File

@@ -204,32 +204,5 @@ connection truncate;
disconnect truncate; disconnect truncate;
connection default; connection default;
DROP TABLE t1, t2; DROP TABLE t1, t2;
#
# MDEV-16222 Assertion `0' failed in row_purge_remove_sec_if_poss_leaf
# on table with virtual columns and indexes
#
set @saved_dbug= @@global.debug_dbug;
set global debug_dbug= "+d,ib_purge_virtual_mdev_16222_1,ib_purge_virtual_mdev_16222_2";
create table t1 (
pk serial, vb tinyblob as (b) virtual, b tinyblob,
primary key(pk), index (vb(64)))
engine innodb;
insert ignore into t1 (b) values ('foo');
select * into outfile 'load.data' from t1;
load data infile 'load.data' replace into table t1;
set debug_sync= "now WAIT_FOR latch_released";
set global debug_dbug= "-d,ib_purge_virtual_mdev_16222_1";
drop table t1;
set debug_sync= "now SIGNAL drop_started WAIT_FOR got_no_such_table";
create table t1 (
pk serial, vb tinyblob as (b) virtual, b tinyblob,
primary key(pk), index (vb(64)))
engine innodb;
insert ignore into t1 (b) values ('foo');
select * into outfile 'load.data' from t1;
load data infile 'load.data' replace into table t1;
set debug_sync= "now WAIT_FOR got_no_such_table";
set global debug_dbug= @saved_dbug;
drop table t1;
set debug_sync=reset; set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;

View File

@@ -259,54 +259,6 @@ disconnect truncate;
connection default; connection default;
DROP TABLE t1, t2; DROP TABLE t1, t2;
--echo #
--echo # MDEV-16222 Assertion `0' failed in row_purge_remove_sec_if_poss_leaf
--echo # on table with virtual columns and indexes
--echo #
--let $datadir= `select @@datadir`
set @saved_dbug= @@global.debug_dbug;
set global debug_dbug= "+d,ib_purge_virtual_mdev_16222_1,ib_purge_virtual_mdev_16222_2";
create table t1 (
pk serial, vb tinyblob as (b) virtual, b tinyblob,
primary key(pk), index (vb(64)))
engine innodb;
insert ignore into t1 (b) values ('foo');
select * into outfile 'load.data' from t1;
load data infile 'load.data' replace into table t1;
set debug_sync= "now WAIT_FOR latch_released";
set global debug_dbug= "-d,ib_purge_virtual_mdev_16222_1";
drop table t1;
--remove_file $datadir/test/load.data
set debug_sync= "now SIGNAL drop_started WAIT_FOR got_no_such_table";
create table t1 (
pk serial, vb tinyblob as (b) virtual, b tinyblob,
primary key(pk), index (vb(64)))
engine innodb;
insert ignore into t1 (b) values ('foo');
select * into outfile 'load.data' from t1;
load data infile 'load.data' replace into table t1;
set debug_sync= "now WAIT_FOR got_no_such_table";
# FIXME: Race condition here:
# 1. purge thread goes into sending got_no_such_table
# 2. test thread finishes debug_sync= "RESET" below
# 3. purge thread sends got_no_such_table
set global debug_dbug= @saved_dbug;
# cleanup
drop table t1;
--remove_file $datadir/test/load.data
--source include/wait_until_count_sessions.inc --source include/wait_until_count_sessions.inc
set debug_sync=reset; set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;

View File

@@ -21,26 +21,4 @@ ALTER TABLE t1 DROP extra;
disconnect prevent_purge; disconnect prevent_purge;
InnoDB 0 transactions not purged InnoDB 0 transactions not purged
DROP TABLE t1; DROP TABLE t1;
#
# MDEV-17813 Crash in instant ALTER TABLE due to purge
# concurrently emptying table
#
CREATE TABLE t1 (f2 INT) ENGINE=InnoDB;
INSERT INTO t1 SET f2=1;
ALTER TABLE t1 ADD COLUMN f1 INT;
connect purge_control,localhost,root;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
DELETE FROM t1;
SET DEBUG_SYNC='innodb_commit_inplace_alter_table_enter SIGNAL go WAIT_FOR do';
ALTER TABLE t1 ADD COLUMN f3 INT;
connection purge_control;
SET DEBUG_SYNC='now WAIT_FOR go';
COMMIT;
InnoDB 0 transactions not purged
SET DEBUG_SYNC='now SIGNAL do';
disconnect purge_control;
connection default;
SET DEBUG_SYNC=RESET;
DROP TABLE t1;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;

View File

@@ -76,7 +76,7 @@ DROP TABLE t1;
Warnings: Warnings:
Warning 1932 Table 'test.t1' doesn't exist in engine Warning 1932 Table 'test.t1' doesn't exist in engine
DROP TABLE t2,t3; DROP TABLE t2,t3;
FOUND 50 /\[ERROR\] InnoDB: Table `test`\.`t1` in InnoDB data dictionary contains invalid flags\. SYS_TABLES\.TYPE=1 SYS_TABLES\.MIX_LEN=511\b/ in mysqld.1.err FOUND 49 /\[ERROR\] InnoDB: Table `test`\.`t1` in InnoDB data dictionary contains invalid flags\. SYS_TABLES\.TYPE=1 SYS_TABLES\.MIX_LEN=511\b/ in mysqld.1.err
# restart # restart
ib_buffer_pool ib_buffer_pool
ib_logfile0 ib_logfile0

View File

@@ -34,42 +34,4 @@ disconnect prevent_purge;
let $wait_all_purged= 0; let $wait_all_purged= 0;
--source include/wait_all_purged.inc --source include/wait_all_purged.inc
DROP TABLE t1; DROP TABLE t1;
--echo #
--echo # MDEV-17813 Crash in instant ALTER TABLE due to purge
--echo # concurrently emptying table
--echo #
CREATE TABLE t1 (f2 INT) ENGINE=InnoDB;
INSERT INTO t1 SET f2=1;
ALTER TABLE t1 ADD COLUMN f1 INT;
connect (purge_control,localhost,root);
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
DELETE FROM t1;
if ($have_debug) {
SET DEBUG_SYNC='innodb_commit_inplace_alter_table_enter SIGNAL go WAIT_FOR do';
}
send ALTER TABLE t1 ADD COLUMN f3 INT;
connection purge_control;
if ($have_debug) {
SET DEBUG_SYNC='now WAIT_FOR go';
}
COMMIT;
--source include/wait_all_purged.inc
if ($have_debug) {
SET DEBUG_SYNC='now SIGNAL do';
}
disconnect purge_control;
connection default;
reap;
if ($have_debug) {
SET DEBUG_SYNC=RESET;
}
DROP TABLE t1;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;

View File

@@ -4752,6 +4752,12 @@ TABLE *open_purge_table(THD *thd, const char *db, size_t dblen,
DBUG_RETURN(error ? NULL : tl->table); DBUG_RETURN(error ? NULL : tl->table);
} }
TABLE *get_purge_table(THD *thd)
{
/* see above, at most one table can be opened */
DBUG_ASSERT(thd->open_tables == NULL || thd->open_tables->next == NULL);
return thd->open_tables;
}
/** Find an open table in the list of prelocked tabled /** Find an open table in the list of prelocked tabled
@@ -5303,6 +5309,18 @@ extern "C" void thd_wait_end(MYSQL_THD thd)
#endif // INNODB_COMPATIBILITY_HOOKS */ #endif // INNODB_COMPATIBILITY_HOOKS */
/**
MDL_context accessor
@param thd the current session
@return pointer to thd->mdl_context
*/
extern "C" void *thd_mdl_context(MYSQL_THD thd)
{
return &thd->mdl_context;
}
/**************************************************************************** /****************************************************************************
Handling of statement states in functions and triggers. Handling of statement states in functions and triggers.

View File

@@ -1,5 +1,5 @@
/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. /* Copyright (c) 2009, 2010, Oracle and/or its affiliates.
Copyright (c) 2012, 2014, Monty Program Ab Copyright (c) 2012, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@@ -17,6 +17,7 @@
/* support for Services */ /* support for Services */
#include <service_versions.h> #include <service_versions.h>
#include <mysql/service_wsrep.h> #include <mysql/service_wsrep.h>
#include <mysql/service_thd_mdl.h>
struct st_service_ref { struct st_service_ref {
const char *name; const char *name;
@@ -220,6 +221,11 @@ struct json_service_st json_handler=
json_unescape_json json_unescape_json
}; };
static struct thd_mdl_service_st thd_mdl_handler=
{
thd_mdl_context
};
static struct st_service_ref list_of_services[]= static struct st_service_ref list_of_services[]=
{ {
{ "base64_service", VERSION_base64, &base64_handler }, { "base64_service", VERSION_base64, &base64_handler },
@@ -243,6 +249,7 @@ static struct st_service_ref list_of_services[]=
{ "thd_timezone_service", VERSION_thd_timezone, &thd_timezone_handler }, { "thd_timezone_service", VERSION_thd_timezone, &thd_timezone_handler },
{ "thd_wait_service", VERSION_thd_wait, &thd_wait_handler }, { "thd_wait_service", VERSION_thd_wait, &thd_wait_handler },
{ "wsrep_service", VERSION_wsrep, &wsrep_handler }, { "wsrep_service", VERSION_wsrep, &wsrep_handler },
{ "json_service", VERSION_json, &json_handler } { "json_service", VERSION_json, &json_handler },
{ "thd_mdl_service", VERSION_thd_mdl, &thd_mdl_handler }
}; };

View File

@@ -36,6 +36,9 @@ Created 1/8/1996 Heikki Tuuri
#include "fts0fts.h" #include "fts0fts.h"
#include "fil0fil.h" #include "fil0fil.h"
#include <algorithm> #include <algorithm>
#include "sql_class.h"
#include "sql_table.h"
#include <mysql/service_thd_mdl.h>
/** dummy index for ROW_FORMAT=REDUNDANT supremum and infimum records */ /** dummy index for ROW_FORMAT=REDUNDANT supremum and infimum records */
dict_index_t* dict_ind_redundant; dict_index_t* dict_ind_redundant;
@@ -205,21 +208,6 @@ dict_remove_db_name(
return(s + 1); return(s + 1);
} }
/********************************************************************//**
Get the database name length in a table name.
@return database name length */
ulint
dict_get_db_name_len(
/*=================*/
const char* name) /*!< in: table name in the form
dbname '/' tablename */
{
const char* s;
s = strchr(name, '/');
ut_a(s);
return ulint(s - name);
}
/** Open a persistent table. /** Open a persistent table.
@param[in] table_id persistent table identifier @param[in] table_id persistent table identifier
@param[in] ignore_err errors to ignore @param[in] ignore_err errors to ignore
@@ -311,16 +299,21 @@ dict_table_try_drop_aborted_and_mutex_exit(
} }
} }
/********************************************************************//** /** Decrements the count of open handles of a table.
Decrements the count of open handles to a table. */ @param[in,out] table table
@param[in] dict_locked data dictionary locked
@param[in] try_drop try to drop any orphan indexes after
an aborted online index creation
@param[in] thd thread to release MDL
@param[in] mdl metadata lock or NULL if the thread
is a foreground one. */
void void
dict_table_close( dict_table_close(
/*=============*/ dict_table_t* table,
dict_table_t* table, /*!< in/out: table */ bool dict_locked,
ibool dict_locked, /*!< in: TRUE=data dictionary locked */ bool try_drop,
ibool try_drop) /*!< in: TRUE=try to drop any orphan THD* thd,
indexes after an aborted online MDL_ticket* mdl)
index creation */
{ {
if (!dict_locked) { if (!dict_locked) {
mutex_enter(&dict_sys.mutex); mutex_enter(&dict_sys.mutex);
@@ -359,6 +352,12 @@ dict_table_close(
dict_table_try_drop_aborted(NULL, table_id, 0); dict_table_try_drop_aborted(NULL, table_id, 0);
} }
} }
if (!thd || !mdl) {
} else if (MDL_context *mdl_context= static_cast<MDL_context*>(
thd_mdl_context(thd))) {
mdl_context->release_lock(mdl);
}
} }
/********************************************************************//** /********************************************************************//**
@@ -379,7 +378,7 @@ dict_table_close_and_drop(
ut_ad(trx->dict_operation != TRX_DICT_OP_NONE); ut_ad(trx->dict_operation != TRX_DICT_OP_NONE);
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
dict_table_close(table, TRUE, FALSE); dict_table_close(table, true, false);
#if defined UNIV_DEBUG || defined UNIV_DDL_DEBUG #if defined UNIV_DEBUG || defined UNIV_DDL_DEBUG
/* Nobody should have initialized the stats of the newly created /* Nobody should have initialized the stats of the newly created
@@ -723,17 +722,204 @@ dict_index_get_nth_field_pos(
return(ULINT_UNDEFINED); return(ULINT_UNDEFINED);
} }
/**********************************************************************//** /** Parse the table file name into table name and database name.
Returns a table object based on table id. @tparam dict_locked whether dict_sys.mutex is being held
@param[in,out] db_name database name buffer
@param[in,out] tbl_name table name buffer
@param[out] db_name_len database name length
@param[out] tbl_name_len table name length
@return whether the table name is visible to SQL */
template<bool dict_locked>
bool dict_table_t::parse_name(char (&db_name)[NAME_LEN + 1],
char (&tbl_name)[NAME_LEN + 1],
size_t *db_name_len, size_t *tbl_name_len) const
{
char db_buf[MAX_DATABASE_NAME_LEN + 1];
char tbl_buf[MAX_TABLE_NAME_LEN + 1];
if (!dict_locked)
mutex_enter(&dict_sys.mutex); /* protect against renaming */
else
ut_ad(mutex_own(&dict_sys.mutex));
const size_t db_len= name.dblen();
ut_ad(db_len <= MAX_DATABASE_NAME_LEN);
memcpy(db_buf, name.m_name, db_len);
db_buf[db_len]= 0;
size_t tbl_len= strlen(name.m_name + db_len);
memcpy(tbl_buf, name.m_name + db_len + 1, tbl_len);
tbl_len--;
if (!dict_locked)
mutex_exit(&dict_sys.mutex);
*db_name_len= db_len;
*tbl_name_len= tbl_len;
filename_to_tablename(db_buf, db_name, MAX_DATABASE_NAME_LEN + 1, true);
if (tbl_len > TEMP_FILE_PREFIX_LENGTH
&& !strncmp(tbl_buf, TEMP_FILE_PREFIX, TEMP_FILE_PREFIX_LENGTH))
return false;
if (char* is_part= strchr(tbl_buf, '#'))
*is_part = '\0';
filename_to_tablename(tbl_buf, tbl_name, MAX_TABLE_NAME_LEN + 1, true);
return true;
}
template bool
dict_table_t::parse_name<>(char(&)[NAME_LEN + 1], char(&)[NAME_LEN + 1],
size_t*, size_t*) const;
/** Acquire MDL shared for the table name.
@tparam trylock whether to use non-blocking operation
@param[in,out] table table object
@param[in,out] thd background thread
@param[out] mdl mdl ticket
@param[in] table_op operation to perform when opening
@return table object after locking MDL shared
@retval nullptr if the table is not readable, or if trylock && MDL blocked */
template<bool trylock>
dict_table_t*
dict_acquire_mdl_shared(dict_table_t *table,
THD *thd,
MDL_ticket **mdl,
dict_table_op_t table_op)
{
if (!table || !mdl)
return table;
MDL_context *mdl_context= static_cast<MDL_context*>(thd_mdl_context(thd));
size_t db_len;
if (trylock)
{
mutex_enter(&dict_sys.mutex);
db_len= dict_get_db_name_len(table->name.m_name);
mutex_exit(&dict_sys.mutex);
}
else
{
ut_ad(mutex_own(&dict_sys.mutex));
db_len= dict_get_db_name_len(table->name.m_name);
}
if (db_len == 0)
return table; /* InnoDB system tables are not covered by MDL */
if (!mdl_context)
return nullptr;
table_id_t table_id= table->id;
char db_buf[NAME_LEN + 1], db_buf1[NAME_LEN + 1];
char tbl_buf[NAME_LEN + 1], tbl_buf1[NAME_LEN + 1];
size_t tbl_len;
bool unaccessible= false;
if (!table->parse_name<!trylock>(db_buf, tbl_buf, &db_len, &tbl_len))
/* The name of an intermediate table starts with #sql */
return table;
retry:
if (!unaccessible && (!table->is_readable() || table->corrupted))
{
is_unaccessible:
if (*mdl)
{
mdl_context->release_lock(*mdl);
*mdl= nullptr;
}
unaccessible= true;
}
if (!trylock)
table->release();
if (unaccessible)
return nullptr;
if (!trylock)
mutex_exit(&dict_sys.mutex);
{
MDL_request request;
request.init(MDL_key::TABLE, db_buf, tbl_buf, MDL_SHARED, MDL_EXPLICIT);
if (trylock
? mdl_context->try_acquire_lock(&request)
: mdl_context->acquire_lock(&request,
global_system_variables.lock_wait_timeout))
{
*mdl= nullptr;
if (trylock)
return nullptr;
}
else
*mdl= request.ticket;
}
if (!trylock)
mutex_enter(&dict_sys.mutex);
else if (!*mdl)
return nullptr;
table= dict_table_open_on_id(table_id, !trylock, table_op);
if (!table)
{
/* The table was dropped. */
if (*mdl)
{
mdl_context->release_lock(*mdl);
*mdl= nullptr;
}
return nullptr;
}
if (!fil_table_accessible(table))
goto is_unaccessible;
size_t db1_len, tbl1_len;
table->parse_name<!trylock>(db_buf1, tbl_buf1, &db1_len, &tbl1_len);
if (*mdl)
{
if (db_len == db1_len && tbl_len == tbl1_len &&
!memcmp(db_buf, db_buf1, db_len) &&
!memcmp(tbl_buf, tbl_buf1, tbl_len))
return table;
/* The table was renamed. Release MDL for the old name and
try to acquire MDL for the new name. */
mdl_context->release_lock(*mdl);
*mdl= nullptr;
}
db_len= db1_len;
tbl_len= tbl1_len;
memcpy(tbl_buf, tbl_buf1, tbl_len + 1);
memcpy(db_buf, db_buf1, db_len + 1);
goto retry;
}
template dict_table_t*
dict_acquire_mdl_shared<true>(dict_table_t*,THD*,MDL_ticket**,dict_table_op_t);
/** Look up a table by numeric identifier.
@param[in] table_id table identifier
@param[in] dict_locked data dictionary locked
@param[in] table_op operation to perform when opening
@param[in,out] thd background thread, or NULL to not acquire MDL
@param[out] mdl mdl ticket, or NULL
@return table, NULL if does not exist */ @return table, NULL if does not exist */
dict_table_t* dict_table_t*
dict_table_open_on_id( dict_table_open_on_id(table_id_t table_id, bool dict_locked,
/*==================*/ dict_table_op_t table_op, THD *thd,
table_id_t table_id, /*!< in: table id */ MDL_ticket **mdl)
ibool dict_locked, /*!< in: TRUE=data dictionary locked */
dict_table_op_t table_op) /*!< in: operation to perform */
{ {
dict_table_t* table; ut_ad(!dict_locked || !thd);
if (!dict_locked) { if (!dict_locked) {
mutex_enter(&dict_sys.mutex); mutex_enter(&dict_sys.mutex);
@@ -741,7 +927,7 @@ dict_table_open_on_id(
ut_ad(mutex_own(&dict_sys.mutex)); ut_ad(mutex_own(&dict_sys.mutex));
table = dict_table_open_on_id_low( dict_table_t* table = dict_table_open_on_id_low(
table_id, table_id,
table_op == DICT_TABLE_OP_LOAD_TABLESPACE table_op == DICT_TABLE_OP_LOAD_TABLESPACE
? DICT_ERR_IGNORE_RECOVER_LOCK ? DICT_ERR_IGNORE_RECOVER_LOCK
@@ -754,11 +940,16 @@ dict_table_open_on_id(
} }
if (!dict_locked) { if (!dict_locked) {
if (thd) {
table = dict_acquire_mdl_shared<false>(
table, thd, mdl, table_op);
}
dict_table_try_drop_aborted_and_mutex_exit( dict_table_try_drop_aborted_and_mutex_exit(
table, table_op == DICT_TABLE_OP_DROP_ORPHAN); table, table_op == DICT_TABLE_OP_DROP_ORPHAN);
} }
return(table); return table;
} }
/********************************************************************//** /********************************************************************//**

View File

@@ -1842,7 +1842,6 @@ dict_load_columns(
the flag is set before the table is created. */ the flag is set before the table is created. */
if (table->fts == NULL) { if (table->fts == NULL) {
table->fts = fts_create(table); table->fts = fts_create(table);
fts_optimize_add_table(table);
} }
ut_a(table->fts->doc_col == ULINT_UNDEFINED); ut_a(table->fts->doc_col == ULINT_UNDEFINED);
@@ -3077,7 +3076,6 @@ func_exit:
/* the table->fts could be created in dict_load_column /* the table->fts could be created in dict_load_column
when a user defined FTS_DOC_ID is present, but no when a user defined FTS_DOC_ID is present, but no
FTS */ FTS */
fts_optimize_remove_table(table);
fts_free(table); fts_free(table);
} else if (fts_optimize_wq) { } else if (fts_optimize_wq) {
fts_optimize_add_table(table); fts_optimize_add_table(table);

View File

@@ -216,8 +216,6 @@ dict_mem_table_free(
|| DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID) || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
|| DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) { || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
if (table->fts) { if (table->fts) {
fts_optimize_remove_table(table);
fts_free(table); fts_free(table);
} }
} }

View File

@@ -833,10 +833,6 @@ fts_drop_index(
doc_id_t current_doc_id; doc_id_t current_doc_id;
doc_id_t first_doc_id; doc_id_t first_doc_id;
/* If we are dropping the only FTS index of the table,
remove it from optimize thread */
fts_optimize_remove_table(table);
DICT_TF2_FLAG_UNSET(table, DICT_TF2_FTS); DICT_TF2_FLAG_UNSET(table, DICT_TF2_FTS);
/* If Doc ID column is not added internally by FTS index, /* If Doc ID column is not added internally by FTS index,
@@ -850,21 +846,11 @@ fts_drop_index(
err = fts_drop_index_tables(trx, index); err = fts_drop_index_tables(trx, index);
while (index->index_fts_syncing
&& !trx_is_interrupted(trx)) {
DICT_BG_YIELD(trx);
}
fts_free(table); fts_free(table);
return(err); return(err);
} }
while (index->index_fts_syncing
&& !trx_is_interrupted(trx)) {
DICT_BG_YIELD(trx);
}
current_doc_id = table->fts->cache->next_doc_id; current_doc_id = table->fts->cache->next_doc_id;
first_doc_id = table->fts->cache->first_doc_id; first_doc_id = table->fts->cache->first_doc_id;
fts_cache_clear(table->fts->cache); fts_cache_clear(table->fts->cache);
@@ -881,10 +867,6 @@ fts_drop_index(
index_cache = fts_find_index_cache(cache, index); index_cache = fts_find_index_cache(cache, index);
if (index_cache != NULL) { if (index_cache != NULL) {
while (index->index_fts_syncing
&& !trx_is_interrupted(trx)) {
DICT_BG_YIELD(trx);
}
if (index_cache->words) { if (index_cache->words) {
fts_words_free(index_cache->words); fts_words_free(index_cache->words);
rbt_free(index_cache->words); rbt_free(index_cache->words);
@@ -4322,8 +4304,6 @@ begin_sync:
DBUG_EXECUTE_IF("fts_instrument_sync_before_syncing", DBUG_EXECUTE_IF("fts_instrument_sync_before_syncing",
os_thread_sleep(300000);); os_thread_sleep(300000););
index_cache->index->index_fts_syncing = true;
error = fts_sync_index(sync, index_cache); error = fts_sync_index(sync, index_cache);
if (error != DB_SUCCESS) { if (error != DB_SUCCESS) {
@@ -4361,13 +4341,6 @@ end_sync:
} }
rw_lock_x_lock(&cache->lock); rw_lock_x_lock(&cache->lock);
/* Clear fts syncing flags of any indexes in case sync is
interrupted */
for (i = 0; i < ib_vector_size(cache->indexes); ++i) {
static_cast<fts_index_cache_t*>(
ib_vector_get(cache->indexes, i))
->index->index_fts_syncing = false;
}
sync->interrupted = false; sync->interrupted = false;
sync->in_progress = false; sync->in_progress = false;
@@ -5323,7 +5296,7 @@ fts_t::fts_t(
bg_threads(0), bg_threads(0),
add_wq(NULL), add_wq(NULL),
cache(NULL), cache(NULL),
doc_col(ULINT_UNDEFINED), in_queue(false), doc_col(ULINT_UNDEFINED), in_queue(false), sync_message(false),
fts_heap(heap) fts_heap(heap)
{ {
ut_a(table->fts == NULL); ut_a(table->fts == NULL);

View File

@@ -45,6 +45,8 @@ static tpool::timer* timer;
static tpool::task_group task_group(1); static tpool::task_group task_group(1);
static tpool::task task(fts_optimize_callback,0, &task_group); static tpool::task task(fts_optimize_callback,0, &task_group);
/** FTS optimize thread, for MDL acquisition */
static THD *fts_opt_thd;
/** The FTS vector to store fts_slot_t */ /** The FTS vector to store fts_slot_t */
static ib_vector_t* fts_slots; static ib_vector_t* fts_slots;
@@ -2564,12 +2566,6 @@ void fts_optimize_add_table(dict_table_t* table)
return; return;
} }
/* If there is no fts index present then don't add to
optimize queue. */
if (!ib_vector_size(table->fts->indexes)) {
return;
}
/* Make sure table with FTS index cannot be evicted */ /* Make sure table with FTS index cannot be evicted */
dict_table_prevent_eviction(table); dict_table_prevent_eviction(table);
@@ -2627,6 +2623,8 @@ fts_optimize_remove_table(
remove->event = event; remove->event = event;
msg->ptr = remove; msg->ptr = remove;
ut_ad(!mutex_own(&dict_sys.mutex));
add_msg(msg, true); add_msg(msg, true);
mutex_exit(&fts_optimize_wq->mutex); mutex_exit(&fts_optimize_wq->mutex);
@@ -2664,7 +2662,7 @@ fts_optimize_request_sync_table(
add_msg(msg, true); add_msg(msg, true);
table->fts->in_queue = true; table->fts->in_queue = table->fts->sync_message = true;
mutex_exit(&fts_optimize_wq->mutex); mutex_exit(&fts_optimize_wq->mutex);
} }
@@ -2791,14 +2789,34 @@ static bool fts_is_sync_needed()
} }
/** Sync fts cache of a table /** Sync fts cache of a table
@param[in,out] table table to be synced */ @param[in,out] table table to be synced
static void fts_optimize_sync_table(dict_table_t* table) @param[in] process_message processing messages from fts_optimize_wq */
static void fts_optimize_sync_table(dict_table_t *table,
bool process_message= false)
{ {
if (table->fts && table->fts->cache && fil_table_accessible(table)) { MDL_ticket* mdl_ticket= nullptr;
fts_sync_table(table, false); dict_table_t *sync_table= dict_acquire_mdl_shared<true>(table, fts_opt_thd,
} &mdl_ticket);
DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000);); if (!sync_table)
return;
if (sync_table->fts && sync_table->fts->cache &&
fil_table_accessible(sync_table))
{
fts_sync_table(sync_table, false);
if (process_message)
{
mutex_enter(&fts_optimize_wq->mutex);
sync_table->fts->sync_message = false;
mutex_exit(&fts_optimize_wq->mutex);
}
}
DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
if (mdl_ticket)
dict_table_close(sync_table, false, false, fts_opt_thd, mdl_ticket);
} }
/**********************************************************************//** /**********************************************************************//**
@@ -2806,11 +2824,6 @@ Optimize all FTS tables.
@return Dummy return */ @return Dummy return */
static void fts_optimize_callback(void *) static void fts_optimize_callback(void *)
{ {
static ulint current = 0;
static ibool done = FALSE;
static ulint n_tables = ib_vector_size(fts_slots);
static ulint n_optimize = 0;
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (!fts_optimize_wq) { if (!fts_optimize_wq) {
@@ -2818,6 +2831,11 @@ static void fts_optimize_callback(void *)
return; return;
} }
static ulint current = 0;
static ibool done = FALSE;
static ulint n_tables = ib_vector_size(fts_slots);
static ulint n_optimize = 0;
while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) { while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
/* If there is no message in the queue and we have tables /* If there is no message in the queue and we have tables
@@ -2889,7 +2907,8 @@ static void fts_optimize_callback(void *)
os_thread_sleep(300000);); os_thread_sleep(300000););
fts_optimize_sync_table( fts_optimize_sync_table(
static_cast<dict_table_t*>(msg->ptr)); static_cast<dict_table_t*>(msg->ptr),
true);
break; break;
default: default:
@@ -2917,6 +2936,7 @@ static void fts_optimize_callback(void *)
ib_vector_free(fts_slots); ib_vector_free(fts_slots);
fts_slots = NULL; fts_slots = NULL;
innobase_destroy_background_thd(fts_opt_thd);
ib::info() << "FTS optimize thread exiting."; ib::info() << "FTS optimize thread exiting.";
os_event_set(fts_opt_shutdown_event); os_event_set(fts_opt_shutdown_event);
@@ -2946,6 +2966,7 @@ fts_optimize_init(void)
heap_alloc = ib_heap_allocator_create(heap); heap_alloc = ib_heap_allocator_create(heap);
fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4); fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer");
/* Add fts tables to fts_slots which could be skipped /* Add fts tables to fts_slots which could be skipped
during dict_load_table_one() because fts_optimize_thread during dict_load_table_one() because fts_optimize_thread
wasn't even started. */ wasn't even started. */
@@ -3004,4 +3025,24 @@ fts_optimize_shutdown()
os_event_destroy(fts_opt_shutdown_event); os_event_destroy(fts_opt_shutdown_event);
ib_wqueue_free(fts_optimize_wq); ib_wqueue_free(fts_optimize_wq);
fts_optimize_wq = NULL; fts_optimize_wq = NULL;
fts_opt_thd = NULL;
}
/** Sync the table during commit phase
@param[in] table table to be synced */
void fts_sync_during_ddl(dict_table_t* table)
{
mutex_enter(&fts_optimize_wq->mutex);
if (!table->fts->sync_message)
{
mutex_exit(&fts_optimize_wq->mutex);
return;
}
mutex_exit(&fts_optimize_wq->mutex);
fts_sync_table(table, false);
mutex_enter(&fts_optimize_wq->mutex);
table->fts->sync_message = false;
mutex_exit(&fts_optimize_wq->mutex);
} }

View File

@@ -127,6 +127,7 @@ TABLE *find_fk_open_table(THD *thd, const char *db, size_t db_len,
MYSQL_THD create_background_thd(); MYSQL_THD create_background_thd();
void destroy_background_thd(MYSQL_THD thd); void destroy_background_thd(MYSQL_THD thd);
void reset_thd(MYSQL_THD thd); void reset_thd(MYSQL_THD thd);
TABLE *get_purge_table(THD *thd);
TABLE *open_purge_table(THD *thd, const char *db, size_t dblen, TABLE *open_purge_table(THD *thd, const char *db, size_t dblen,
const char *tb, size_t tblen); const char *tb, size_t tblen);
void close_thread_tables(THD* thd); void close_thread_tables(THD* thd);
@@ -5847,14 +5848,6 @@ initialize_auto_increment(dict_table_t* table, const Field* field)
int int
ha_innobase::open(const char* name, int, uint) ha_innobase::open(const char* name, int, uint)
{ {
/* TODO: If trx_rollback_recovered(bool all=false) is ever
removed, the first-time open() must hold (or acquire and release)
a table lock that conflicts with trx_resurrect_table_locks(),
to ensure that any recovered incomplete ALTER TABLE will have been
rolled back. Otherwise, dict_table_t::instant could be cleared by
the rollback invoking dict_index_t::clear_instant_alter() while
open table handles exist in client connections. */
char norm_name[FN_REFLEN]; char norm_name[FN_REFLEN];
DBUG_ENTER("ha_innobase::open"); DBUG_ENTER("ha_innobase::open");
@@ -13731,35 +13724,6 @@ innobase_rename_table(
row_mysql_lock_data_dictionary(trx); row_mysql_lock_data_dictionary(trx);
} }
dict_table_t* table = dict_table_open_on_name(
norm_from, TRUE, FALSE, DICT_ERR_IGNORE_FK_NOKEY);
/* Since DICT_BG_YIELD has sleep for 250 milliseconds,
Convert lock_wait_timeout unit from second to 250 milliseconds */
long int lock_wait_timeout = thd_lock_wait_timeout(trx->mysql_thd) * 4;
if (table != NULL) {
for (dict_index_t* index = dict_table_get_first_index(table);
index != NULL;
index = dict_table_get_next_index(index)) {
if (index->type & DICT_FTS) {
/* Found */
while (index->index_fts_syncing
&& !trx_is_interrupted(trx)
&& (lock_wait_timeout--) > 0) {
DICT_BG_YIELD(trx);
}
}
}
dict_table_close(table, TRUE, FALSE);
}
/* FTS sync is in progress. We shall timeout this operation */
if (lock_wait_timeout < 0) {
error = DB_LOCK_WAIT_TIMEOUT;
goto func_exit;
}
error = row_rename_table_for_mysql(norm_from, norm_to, trx, commit, error = row_rename_table_for_mysql(norm_from, norm_to, trx, commit,
use_fk); use_fk);
@@ -13811,7 +13775,6 @@ innobase_rename_table(
} }
} }
func_exit:
if (commit) { if (commit) {
row_mysql_unlock_data_dictionary(trx); row_mysql_unlock_data_dictionary(trx);
} }
@@ -20647,126 +20610,6 @@ ha_innobase::multi_range_read_explain_info(
return m_ds_mrr.dsmrr_explain_info(mrr_mode, str, size); return m_ds_mrr.dsmrr_explain_info(mrr_mode, str, size);
} }
/** Parse the table file name into table name and database name.
@param[in] tbl_name InnoDB table name
@param[out] dbname database name buffer (NAME_LEN + 1 bytes)
@param[out] tblname table name buffer (NAME_LEN + 1 bytes)
@param[out] dbnamelen database name length
@param[out] tblnamelen table name length
@return true if the table name is parsed properly. */
static bool table_name_parse(
const table_name_t& tbl_name,
char* dbname,
char* tblname,
ulint& dbnamelen,
ulint& tblnamelen)
{
dbnamelen = dict_get_db_name_len(tbl_name.m_name);
char db_buf[MAX_DATABASE_NAME_LEN + 1];
char tbl_buf[MAX_TABLE_NAME_LEN + 1];
ut_ad(dbnamelen > 0);
ut_ad(dbnamelen <= MAX_DATABASE_NAME_LEN);
memcpy(db_buf, tbl_name.m_name, dbnamelen);
db_buf[dbnamelen] = 0;
tblnamelen = strlen(tbl_name.m_name + dbnamelen + 1);
memcpy(tbl_buf, tbl_name.m_name + dbnamelen + 1, tblnamelen);
tbl_buf[tblnamelen] = 0;
filename_to_tablename(db_buf, dbname, MAX_DATABASE_NAME_LEN + 1, true);
if (tblnamelen > TEMP_FILE_PREFIX_LENGTH
&& !strncmp(tbl_buf, TEMP_FILE_PREFIX, TEMP_FILE_PREFIX_LENGTH)) {
return false;
}
if (char *is_part = strchr(tbl_buf, '#')) {
*is_part = '\0';
tblnamelen = is_part - tbl_buf;
}
filename_to_tablename(tbl_buf, tblname, MAX_TABLE_NAME_LEN + 1, true);
return true;
}
/** Acquire metadata lock and MariaDB table handle for an InnoDB table.
@param[in,out] thd thread handle
@param[in,out] table InnoDB table
@return MariaDB table handle
@retval NULL if the table does not exist, is unaccessible or corrupted. */
static TABLE* innodb_acquire_mdl(THD* thd, dict_table_t* table)
{
char db_buf[NAME_LEN + 1], db_buf1[NAME_LEN + 1];
char tbl_buf[NAME_LEN + 1], tbl_buf1[NAME_LEN + 1];
ulint db_buf_len, db_buf1_len;
ulint tbl_buf_len, tbl_buf1_len;
if (!table_name_parse(table->name, db_buf, tbl_buf,
db_buf_len, tbl_buf_len)) {
table->release();
return NULL;
}
DEBUG_SYNC(thd, "ib_purge_virtual_latch_released");
const table_id_t table_id = table->id;
retry_mdl:
const bool unaccessible = !table->is_readable() || table->corrupted;
table->release();
if (unaccessible) {
return NULL;
}
TABLE* mariadb_table = open_purge_table(thd, db_buf, db_buf_len,
tbl_buf, tbl_buf_len);
if (!mariadb_table)
thd_clear_error(thd);
DEBUG_SYNC(thd, "ib_purge_virtual_got_no_such_table");
table = dict_table_open_on_id(table_id, false, DICT_TABLE_OP_NORMAL);
if (table == NULL) {
/* Table is dropped. */
goto fail;
}
if (!fil_table_accessible(table)) {
release_fail:
table->release();
fail:
if (mariadb_table) {
close_thread_tables(thd);
}
return NULL;
}
if (!table_name_parse(table->name, db_buf1, tbl_buf1,
db_buf1_len, tbl_buf1_len)) {
goto release_fail;
}
if (!mariadb_table) {
} else if (!strcmp(db_buf, db_buf1) && !strcmp(tbl_buf, tbl_buf1)) {
return mariadb_table;
} else {
/* Table is renamed. So release MDL for old name and try
to acquire the MDL for new table name. */
close_thread_tables(thd);
}
strcpy(tbl_buf, tbl_buf1);
strcpy(db_buf, db_buf1);
tbl_buf_len = tbl_buf1_len;
db_buf_len = db_buf1_len;
goto retry_mdl;
}
/** Find or open a table handle for the virtual column template /** Find or open a table handle for the virtual column template
@param[in] thd thread handle @param[in] thd thread handle
@param[in,out] table InnoDB table whose virtual column template @param[in,out] table InnoDB table whose virtual column template
@@ -20790,12 +20633,13 @@ static TABLE* innodb_find_table_for_vc(THD* thd, dict_table_t* table)
STRING_WITH_LEN("ib_purge_virtual_got_no_such_table " STRING_WITH_LEN("ib_purge_virtual_got_no_such_table "
"SIGNAL got_no_such_table")));); "SIGNAL got_no_such_table"))););
if (THDVAR(thd, background_thread)) { TABLE *mysql_table;
/* Purge thread acquires dict_sys.latch while const bool bg_thread = THDVAR(thd, background_thread);
processing undo log record. Release it
before acquiring MDL on the table. */ if (bg_thread) {
rw_lock_s_unlock(&dict_sys.latch); if ((mysql_table = get_purge_table(thd))) {
return innodb_acquire_mdl(thd, table); return mysql_table;
}
} else { } else {
if (table->vc_templ->mysql_table_query_id if (table->vc_templ->mysql_table_query_id
== thd_get_query_id(thd)) { == thd_get_query_id(thd)) {
@@ -20807,15 +20651,17 @@ static TABLE* innodb_find_table_for_vc(THD* thd, dict_table_t* table)
char tbl_buf[NAME_LEN + 1]; char tbl_buf[NAME_LEN + 1];
ulint db_buf_len, tbl_buf_len; ulint db_buf_len, tbl_buf_len;
if (!table_name_parse(table->name, db_buf, tbl_buf, if (!table->parse_name(db_buf, tbl_buf, &db_buf_len, &tbl_buf_len)) {
db_buf_len, tbl_buf_len)) {
ut_ad(!"invalid table name");
return NULL; return NULL;
} }
TABLE* mysql_table = find_fk_open_table(thd, db_buf, db_buf_len, if (bg_thread) {
tbl_buf, tbl_buf_len); return open_purge_table(thd, db_buf, db_buf_len,
tbl_buf, tbl_buf_len);
}
mysql_table = find_fk_open_table(thd, db_buf, db_buf_len,
tbl_buf, tbl_buf_len);
table->vc_templ->mysql_table = mysql_table; table->vc_templ->mysql_table = mysql_table;
table->vc_templ->mysql_table_query_id = thd_get_query_id(thd); table->vc_templ->mysql_table_query_id = thd_get_query_id(thd);
return mysql_table; return mysql_table;

View File

@@ -140,30 +140,6 @@ static const alter_table_operations INNOBASE_ALTER_INSTANT
| ALTER_RENAME_INDEX | ALTER_RENAME_INDEX
| ALTER_DROP_VIRTUAL_COLUMN; | ALTER_DROP_VIRTUAL_COLUMN;
/** Acquire a page latch on the possible metadata record,
to prevent concurrent invocation of dict_index_t::clear_instant_alter()
by purge when the table turns out to be empty.
@param[in,out] index clustered index
@param[in,out] mtr mini-transaction */
static void instant_metadata_lock(dict_index_t& index, mtr_t& mtr)
{
DBUG_ASSERT(index.is_primary());
if (!index.is_instant()) {
/* dict_index_t::clear_instant_alter() cannot be called.
No need for a latch. */
return;
}
btr_cur_t btr_cur;
btr_cur_open_at_index_side(true, &index, BTR_SEARCH_LEAF,
&btr_cur, 0, &mtr);
ut_ad(page_cur_is_before_first(btr_cur_get_page_cur(&btr_cur)));
ut_ad(page_is_leaf(btr_cur_get_page(&btr_cur)));
ut_ad(!page_has_prev(btr_cur_get_page(&btr_cur)));
ut_ad(!buf_block_get_page_zip(btr_cur_get_block(&btr_cur)));
}
/** Initialize instant->field_map. /** Initialize instant->field_map.
@param[in] table table definition to copy from */ @param[in] table table definition to copy from */
inline void dict_table_t::init_instant(const dict_table_t& table) inline void dict_table_t::init_instant(const dict_table_t& table)
@@ -237,16 +213,10 @@ inline void dict_table_t::prepare_instant(const dict_table_t& old,
If that is the case, the instant ALTER TABLE would keep If that is the case, the instant ALTER TABLE would keep
the InnoDB table in its current format. */ the InnoDB table in its current format. */
dict_index_t& oindex = *old.indexes.start; const dict_index_t& oindex = *old.indexes.start;
dict_index_t& index = *indexes.start; dict_index_t& index = *indexes.start;
first_alter_pos = 0; first_alter_pos = 0;
mtr_t mtr;
mtr.start();
/* Protect oindex.n_core_fields and others, so that
purge cannot invoke dict_index_t::clear_instant_alter(). */
instant_metadata_lock(oindex, mtr);
for (unsigned i = 0; i + DATA_N_SYS_COLS < old.n_cols; i++) { for (unsigned i = 0; i + DATA_N_SYS_COLS < old.n_cols; i++) {
if (col_map[i] != i) { if (col_map[i] != i) {
first_alter_pos = 1 + i; first_alter_pos = 1 + i;
@@ -419,7 +389,6 @@ found_j:
DBUG_ASSERT(n_dropped() >= old.n_dropped()); DBUG_ASSERT(n_dropped() >= old.n_dropped());
DBUG_ASSERT(index.n_core_fields == oindex.n_core_fields); DBUG_ASSERT(index.n_core_fields == oindex.n_core_fields);
DBUG_ASSERT(index.n_core_null_bytes == oindex.n_core_null_bytes); DBUG_ASSERT(index.n_core_null_bytes == oindex.n_core_null_bytes);
mtr.commit();
} }
/** Adjust index metadata for instant ADD/DROP/reorder COLUMN. /** Adjust index metadata for instant ADD/DROP/reorder COLUMN.
@@ -439,15 +408,8 @@ inline void dict_index_t::instant_add_field(const dict_index_t& instant)
DBUG_ASSERT(n_uniq == instant.n_uniq); DBUG_ASSERT(n_uniq == instant.n_uniq);
DBUG_ASSERT(instant.n_fields >= n_fields); DBUG_ASSERT(instant.n_fields >= n_fields);
DBUG_ASSERT(instant.n_nullable >= n_nullable); DBUG_ASSERT(instant.n_nullable >= n_nullable);
/* dict_table_t::prepare_instant() initialized n_core_fields DBUG_ASSERT(instant.n_core_fields == n_core_fields);
to be equal. However, after that purge could have emptied the DBUG_ASSERT(instant.n_core_null_bytes == n_core_null_bytes);
table and invoked dict_index_t::clear_instant_alter(). */
DBUG_ASSERT(instant.n_core_fields <= n_core_fields);
DBUG_ASSERT(instant.n_core_null_bytes <= n_core_null_bytes);
DBUG_ASSERT(instant.n_core_fields == n_core_fields
|| (!is_instant() && instant.is_instant()));
DBUG_ASSERT(instant.n_core_null_bytes == n_core_null_bytes
|| (!is_instant() && instant.is_instant()));
/* instant will have all fields (including ones for columns /* instant will have all fields (including ones for columns
that have been or are being instantly dropped) in the same position that have been or are being instantly dropped) in the same position
@@ -752,11 +714,6 @@ inline void dict_table_t::rollback_instant(
{ {
ut_d(dict_sys.assert_locked()); ut_d(dict_sys.assert_locked());
dict_index_t* index = indexes.start; dict_index_t* index = indexes.start;
mtr_t mtr;
mtr.start();
/* Prevent concurrent execution of dict_index_t::clear_instant_alter()
by acquiring a latch on the leftmost leaf page. */
instant_metadata_lock(*index, mtr);
/* index->is_instant() does not necessarily hold here, because /* index->is_instant() does not necessarily hold here, because
the table may have been emptied */ the table may have been emptied */
DBUG_ASSERT(old_n_cols >= DATA_N_SYS_COLS); DBUG_ASSERT(old_n_cols >= DATA_N_SYS_COLS);
@@ -811,7 +768,6 @@ inline void dict_table_t::rollback_instant(
} }
index->fields = old_fields; index->fields = old_fields;
mtr.commit();
while ((index = dict_table_get_next_index(index)) != NULL) { while ((index = dict_table_get_next_index(index)) != NULL) {
if (index->to_be_dropped) { if (index->to_be_dropped) {
@@ -5557,12 +5513,6 @@ static bool innobase_instant_try(
dict_table_t* user_table = ctx->old_table; dict_table_t* user_table = ctx->old_table;
dict_index_t* index = dict_table_get_first_index(user_table); dict_index_t* index = dict_table_get_first_index(user_table);
mtr_t mtr;
mtr.start();
/* Prevent purge from calling dict_index_t::clear_instant_add(),
to protect index->n_core_fields, index->table->instant and others
from changing during ctx->instant_column(). */
instant_metadata_lock(*index, mtr);
const unsigned n_old_fields = index->n_fields; const unsigned n_old_fields = index->n_fields;
const dict_col_t* old_cols = user_table->cols; const dict_col_t* old_cols = user_table->cols;
DBUG_ASSERT(user_table->n_cols == ctx->old_n_cols); DBUG_ASSERT(user_table->n_cols == ctx->old_n_cols);
@@ -5570,11 +5520,6 @@ static bool innobase_instant_try(
const bool metadata_changed = ctx->instant_column(); const bool metadata_changed = ctx->instant_column();
DBUG_ASSERT(index->n_fields >= n_old_fields); DBUG_ASSERT(index->n_fields >= n_old_fields);
/* Release the page latch. Between this and the next
btr_pcur_open_at_index_side(), data fields such as
index->n_core_fields and index->table->instant could change,
but we would handle that in empty_table: below. */
mtr.commit();
/* The table may have been emptied and may have lost its /* The table may have been emptied and may have lost its
'instantness' during this ALTER TABLE. */ 'instantness' during this ALTER TABLE. */
@@ -5736,6 +5681,7 @@ add_all_virtual:
memset(roll_ptr, 0, sizeof roll_ptr); memset(roll_ptr, 0, sizeof roll_ptr);
dtuple_t* entry = index->instant_metadata(*row, ctx->heap); dtuple_t* entry = index->instant_metadata(*row, ctx->heap);
mtr_t mtr;
mtr.start(); mtr.start();
index->set_modified(mtr); index->set_modified(mtr);
btr_pcur_t pcur; btr_pcur_t pcur;
@@ -6975,6 +6921,10 @@ op_ok:
ut_a(ctx->trx->lock.n_active_thrs == 0); ut_a(ctx->trx->lock.n_active_thrs == 0);
if (ctx->old_table->fts) {
fts_sync_during_ddl(ctx->old_table);
}
error_handling: error_handling:
/* After an error, remove all those index definitions from the /* After an error, remove all those index definitions from the
dictionary which were defined. */ dictionary which were defined. */
@@ -8517,6 +8467,26 @@ innobase_rollback_sec_index(
} }
} }
/* Get the number of uncommitted fts index during rollback
operation.
@param[in] table table which undergoes rollback for alter
@return number of uncommitted fts indexes. */
static
ulint innobase_get_uncommitted_fts_indexes(const dict_table_t* table)
{
ut_ad(mutex_own(&dict_sys.mutex));
dict_index_t* index = dict_table_get_first_index(table);
ulint n_uncommitted_fts = 0;
for (; index ; index = dict_table_get_next_index(index))
{
if (index->type & DICT_FTS && !index->is_committed())
n_uncommitted_fts++;
}
return n_uncommitted_fts;
}
/** Roll back the changes made during prepare_inplace_alter_table() /** Roll back the changes made during prepare_inplace_alter_table()
and inplace_alter_table() inside the storage engine. Note that the and inplace_alter_table() inside the storage engine. Note that the
allowed level of concurrency during this operation will be the same as allowed level of concurrency during this operation will be the same as
@@ -8599,6 +8569,19 @@ rollback_inplace_alter_table(
& ALTER_ADD_PK_INDEX)); & ALTER_ADD_PK_INDEX));
DBUG_ASSERT(ctx->new_table == prebuilt->table); DBUG_ASSERT(ctx->new_table == prebuilt->table);
/* Remove the fts table from fts_optimize_wq if
there is only one fts index exist. */
if (prebuilt->table->fts
&& innobase_get_uncommitted_fts_indexes(
prebuilt->table) == 1
&& (ib_vector_is_empty(prebuilt->table->fts->indexes)
|| ib_vector_size(prebuilt->table->fts->indexes)
== 1)) {
row_mysql_unlock_data_dictionary(ctx->trx);
fts_optimize_remove_table(prebuilt->table);
row_mysql_lock_data_dictionary(ctx->trx);
}
innobase_rollback_sec_index( innobase_rollback_sec_index(
prebuilt->table, table, FALSE, ctx->trx); prebuilt->table, table, FALSE, ctx->trx);
} }
@@ -10724,6 +10707,7 @@ ha_innobase::commit_inplace_alter_table(
if (ctx->new_table->fts) { if (ctx->new_table->fts) {
ut_ad(!ctx->new_table->fts->add_wq); ut_ad(!ctx->new_table->fts->add_wq);
fts_optimize_remove_table(ctx->new_table); fts_optimize_remove_table(ctx->new_table);
fts_sync_during_ddl(ctx->new_table);
} }
/* Apply the online log of the table before acquiring /* Apply the online log of the table before acquiring
@@ -10781,44 +10765,6 @@ ha_innobase::commit_inplace_alter_table(
DICT_BG_YIELD(trx); DICT_BG_YIELD(trx);
} }
/* Make a concurrent Drop fts Index to wait until sync of that
fts index is happening in the background */
for (int retry_count = 0;;) {
bool retry = false;
for (inplace_alter_handler_ctx** pctx = ctx_array;
*pctx; pctx++) {
ha_innobase_inplace_ctx* ctx
= static_cast<ha_innobase_inplace_ctx*>(*pctx);
DBUG_ASSERT(new_clustered == ctx->need_rebuild());
if (dict_fts_index_syncing(ctx->old_table)) {
retry = true;
break;
}
if (new_clustered && dict_fts_index_syncing(ctx->new_table)) {
retry = true;
break;
}
}
if (!retry) {
break;
}
/* Print a message if waiting for a long time. */
if (retry_count < 100) {
retry_count++;
} else {
ib::info() << "Drop index waiting for background sync"
" to finish";
retry_count = 0;
}
DICT_BG_YIELD(trx);
}
/* Apply the changes to the data dictionary tables, for all /* Apply the changes to the data dictionary tables, for all
partitions. */ partitions. */

View File

@@ -33,21 +33,27 @@ Created 1/8/1996 Heikki Tuuri
#include "fsp0fsp.h" #include "fsp0fsp.h"
#include <deque> #include <deque>
class MDL_ticket;
extern bool innodb_table_stats_not_found; extern bool innodb_table_stats_not_found;
extern bool innodb_index_stats_not_found; extern bool innodb_index_stats_not_found;
/** the first table or index ID for other than hard-coded system tables */ /** the first table or index ID for other than hard-coded system tables */
constexpr uint8_t DICT_HDR_FIRST_ID= 10; constexpr uint8_t DICT_HDR_FIRST_ID= 10;
/********************************************************************//**
Get the database name length in a table name. /** Get the database name length in a table name.
@param name filename-safe encoded table name "dbname/tablename"
@return database name length */ @return database name length */
ulint inline size_t dict_get_db_name_len(const char *name)
dict_get_db_name_len( {
/*=================*/ /* table_name_t::dblen() would assert that '/' is contained */
const char* name) /*!< in: table name in the form if (const char* s= strchr(name, '/'))
dbname '/' tablename */ return size_t(s - name);
MY_ATTRIBUTE((nonnull, warn_unused_result));
return 0;
}
/*********************************************************************//** /*********************************************************************//**
Open a table from its database and table name, this is currently used by Open a table from its database and table name, this is currently used by
foreign constraint parser to get the referenced table. foreign constraint parser to get the referenced table.
@@ -118,33 +124,56 @@ enum dict_table_op_t {
DICT_TABLE_OP_OPEN_ONLY_IF_CACHED DICT_TABLE_OP_OPEN_ONLY_IF_CACHED
}; };
/**********************************************************************//** /** Acquire MDL shared for the table name.
Returns a table object based on table id. @tparam trylock whether to use non-blocking operation
@param[in,out] table table object
@param[in,out] thd background thread
@param[out] mdl mdl ticket
@param[in] table_op operation to perform when opening
@return table object after locking MDL shared
@retval NULL if the table is not readable, or if trylock && MDL blocked */
template<bool trylock>
dict_table_t*
dict_acquire_mdl_shared(dict_table_t *table,
THD *thd,
MDL_ticket **mdl,
dict_table_op_t table_op= DICT_TABLE_OP_NORMAL);
/** Look up a table by numeric identifier.
@param[in] table_id table identifier
@param[in] dict_locked data dictionary locked
@param[in] table_op operation to perform when opening
@param[in,out] thd background thread, or NULL to not acquire MDL
@param[out] mdl mdl ticket, or NULL
@return table, NULL if does not exist */ @return table, NULL if does not exist */
dict_table_t* dict_table_t*
dict_table_open_on_id( dict_table_open_on_id(table_id_t table_id, bool dict_locked,
/*==================*/ dict_table_op_t table_op, THD *thd= nullptr,
table_id_t table_id, /*!< in: table id */ MDL_ticket **mdl= nullptr)
ibool dict_locked, /*!< in: TRUE=data dictionary locked */ MY_ATTRIBUTE((warn_unused_result));
dict_table_op_t table_op) /*!< in: operation to perform */
MY_ATTRIBUTE((warn_unused_result));
/**********************************************************************//** /**********************************************************************//**
Returns a table object based on table id. Returns a table object based on table id.
@return table, NULL if does not exist */ @return table, NULL if does not exist */
dict_table_t* dict_table_open_on_index_id(index_id_t index_id) dict_table_t* dict_table_open_on_index_id(index_id_t index_id)
__attribute__((warn_unused_result)); __attribute__((warn_unused_result));
/********************************************************************//**
Decrements the count of open handles to a table. */ /** Decrements the count of open handles of a table.
@param[in,out] table table
@param[in] dict_locked data dictionary locked
@param[in] try_drop try to drop any orphan indexes after
an aborted online index creation
@param[in] thd thread to release MDL
@param[in] mdl metadata lock or NULL if the thread is a
foreground one. */
void void
dict_table_close( dict_table_close(
/*=============*/ dict_table_t* table,
dict_table_t* table, /*!< in/out: table */ bool dict_locked,
ibool dict_locked, /*!< in: TRUE=data dictionary locked */ bool try_drop,
ibool try_drop) /*!< in: TRUE=try to drop any orphan THD* thd = NULL,
indexes after an aborted online MDL_ticket* mdl = NULL);
index creation */
MY_ATTRIBUTE((nonnull));
/*********************************************************************//** /*********************************************************************//**
Closes the only open handle to a table and drops a table while assuring Closes the only open handle to a table and drops a table while assuring
that dict_sys.mutex is held the whole time. This assures that the table that dict_sys.mutex is held the whole time. This assures that the table

View File

@@ -705,28 +705,6 @@ dict_tf_to_sys_tables_type(
return(type); return(type);
} }
/*********************************************************************//**
Returns true if the particular FTS index in the table is still syncing
in the background, false otherwise.
@param [in] table Table containing FTS index
@return True if sync of fts index is still going in the background */
UNIV_INLINE
bool
dict_fts_index_syncing(
dict_table_t* table)
{
dict_index_t* index;
for (index = dict_table_get_first_index(table);
index != NULL;
index = dict_table_get_next_index(index)) {
if (index->index_fts_syncing) {
return(true);
}
}
return(false);
}
/********************************************************************//** /********************************************************************//**
Gets the number of fields in the internal representation of an index, Gets the number of fields in the internal representation of an index,
including fields added by the dictionary system. including fields added by the dictionary system.

View File

@@ -46,6 +46,7 @@ Created 1/8/1996 Heikki Tuuri
#include "gis0type.h" #include "gis0type.h"
#include "fil0fil.h" #include "fil0fil.h"
#include "fil0crypt.h" #include "fil0crypt.h"
#include "mysql_com.h"
#include <sql_const.h> #include <sql_const.h>
#include <set> #include <set>
#include <algorithm> #include <algorithm>
@@ -1039,9 +1040,6 @@ struct dict_index_t {
bool has_new_v_col; bool has_new_v_col;
/*!< whether it has a newly added virtual /*!< whether it has a newly added virtual
column in ALTER */ column in ALTER */
bool index_fts_syncing;/*!< Whether the fts index is
still syncing in the background;
FIXME: remove this and use MDL */
UT_LIST_NODE_T(dict_index_t) UT_LIST_NODE_T(dict_index_t)
indexes;/*!< list of indexes of the table */ indexes;/*!< list of indexes of the table */
#ifdef BTR_CUR_ADAPT #ifdef BTR_CUR_ADAPT
@@ -1884,6 +1882,18 @@ struct dict_table_t {
/** For overflow fields returns potential max length stored inline */ /** For overflow fields returns potential max length stored inline */
inline size_t get_overflow_field_local_len() const; inline size_t get_overflow_field_local_len() const;
/** Parse the table file name into table name and database name.
@tparam dict_locked whether dict_sys.mutex is being held
@param[in,out] db_name database name buffer
@param[in,out] tbl_name table name buffer
@param[out] db_name_len database name length
@param[out] tbl_name_len table name length
@return whether the table name is visible to SQL */
template<bool dict_locked= false>
bool parse_name(char (&db_name)[NAME_LEN + 1],
char (&tbl_name)[NAME_LEN + 1],
size_t *db_name_len, size_t *tbl_name_len) const;
private: private:
/** Initialize instant->field_map. /** Initialize instant->field_map.
@param[in] table table definition to copy from */ @param[in] table table definition to copy from */

View File

@@ -349,6 +349,10 @@ public:
protected by fts_optimize_wq mutex */ protected by fts_optimize_wq mutex */
bool in_queue; bool in_queue;
/** Whether the sync message exists in fts_optimize_wq;
protected by fts_optimize_wq mutex */
bool sync_message;
/** Heap for fts_t allocation. */ /** Heap for fts_t allocation. */
mem_heap_t* fts_heap; mem_heap_t* fts_heap;
}; };
@@ -984,4 +988,8 @@ fts_trx_t*
fts_trx_create( fts_trx_create(
trx_t* trx); trx_t* trx);
/** Sync the table during commit phase
@param[in] table table to be synced */
void fts_sync_during_ddl(dict_table_t* table);
#endif /*!< fts0fts.h */ #endif /*!< fts0fts.h */

View File

@@ -32,9 +32,11 @@ Created 3/14/1997 Heikki Tuuri
#include "btr0pcur.h" #include "btr0pcur.h"
#include "trx0types.h" #include "trx0types.h"
#include "row0types.h" #include "row0types.h"
#include "ut0vec.h"
#include "row0mysql.h" #include "row0mysql.h"
#include "mysqld.h"
#include <list>
class MDL_ticket;
/** Determines if it is possible to remove a secondary index entry. /** Determines if it is possible to remove a secondary index entry.
Removal is possible if the secondary index entry does not refer to any Removal is possible if the secondary index entry does not refer to any
not delete marked version of a clustered index record where DB_TRX_ID not delete marked version of a clustered index record where DB_TRX_ID
@@ -79,6 +81,15 @@ row_purge_step(
que_thr_t* thr) /*!< in: query thread */ que_thr_t* thr) /*!< in: query thread */
MY_ATTRIBUTE((nonnull, warn_unused_result)); MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Info required to purge a record */
struct trx_purge_rec_t
{
/** Record to purge */
trx_undo_rec_t *undo_rec;
/** File pointer to undo record */
roll_ptr_t roll_ptr;
};
/* Purge node structure */ /* Purge node structure */
struct purge_node_t{ struct purge_node_t{
@@ -86,7 +97,6 @@ struct purge_node_t{
/*----------------------*/ /*----------------------*/
/* Local storage for this graph node */ /* Local storage for this graph node */
roll_ptr_t roll_ptr;/* roll pointer to undo log record */ roll_ptr_t roll_ptr;/* roll pointer to undo log record */
ib_vector_t* undo_recs;/*!< Undo recs to purge */
undo_no_t undo_no;/*!< undo number of the record */ undo_no_t undo_no;/*!< undo number of the record */
@@ -127,21 +137,37 @@ public:
#endif #endif
trx_id_t trx_id; /*!< trx id for this purging record */ trx_id_t trx_id; /*!< trx id for this purging record */
/** Virtual column information about opening of MariaDB table. /** meta-data lock for the table name */
It resets after processing each undo log record. */ MDL_ticket* mdl_ticket;
purge_vcol_info_t vcol_info;
/** table id of the previous undo log record */
table_id_t last_table_id;
/** purge thread */
THD* purge_thd;
/** metadata lock holds for this number of undo log recs */
int mdl_hold_recs;
/** Undo recs to purge */
std::list<trx_purge_rec_t*> undo_recs;
/** Constructor */ /** Constructor */
explicit purge_node_t(que_thr_t* parent) : explicit purge_node_t(que_thr_t* parent) :
common(QUE_NODE_PURGE, parent), common(QUE_NODE_PURGE, parent),
undo_recs(NULL),
unavailable_table_id(0), unavailable_table_id(0),
table(NULL),
heap(mem_heap_create(256)), heap(mem_heap_create(256)),
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
in_progress(false), in_progress(false),
#endif #endif
vcol_info() mdl_ticket(NULL),
{} last_table_id(0),
purge_thd(NULL),
mdl_hold_recs(0)
{
undo_recs.clear();
}
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/***********************************************************//** /***********************************************************//**
@@ -154,11 +180,6 @@ public:
bool validate_pcur(); bool validate_pcur();
#endif #endif
/** Whether purge failed to open the maria table for virtual column
computation.
@return true if the table failed to open. */
bool vcol_op_failed() const { return !vcol_info.validate(); }
/** Determine if a table should be skipped in purge. /** Determine if a table should be skipped in purge.
@param[in] table_id table identifier @param[in] table_id table identifier
@return whether to skip the table lookup and processing */ @return whether to skip the table lookup and processing */
@@ -177,33 +198,72 @@ public:
def_trx_id = limit; def_trx_id = limit;
} }
/** Start processing an undo log record. */ /** Start processing an undo log record. */
void start() void start()
{ {
ut_ad(in_progress); ut_ad(in_progress);
DBUG_ASSERT(common.type == QUE_NODE_PURGE); DBUG_ASSERT(common.type == QUE_NODE_PURGE);
table = NULL; row= nullptr;
row = NULL; ref= nullptr;
ref = NULL; index= nullptr;
index = NULL; update= nullptr;
update = NULL; found_clust= FALSE;
found_clust = FALSE; rec_type= ULINT_UNDEFINED;
rec_type = ULINT_UNDEFINED; cmpl_info= ULINT_UNDEFINED;
cmpl_info = ULINT_UNDEFINED; if (!purge_thd)
} purge_thd= current_thd;
}
/** Reset the state at end
@return the query graph parent */ /** Close the existing table and release the MDL for it. */
que_node_t* end() void close_table()
{ {
DBUG_ASSERT(common.type == QUE_NODE_PURGE); last_table_id= 0;
undo_recs = NULL; if (!table)
ut_d(in_progress = false); {
vcol_info.reset(); ut_ad(!mdl_ticket);
mem_heap_empty(heap); return;
return common.parent; }
}
innobase_reset_background_thd(purge_thd);
dict_table_close(table, false, false, purge_thd, mdl_ticket);
table= nullptr;
mdl_ticket= nullptr;
}
/** Retail mdl for the table id.
@param[in] table_id table id to be processed
@return true if retain mdl */
bool retain_mdl(table_id_t table_id)
{
ut_ad(table_id);
if (last_table_id == table_id && mdl_hold_recs < 100)
{
ut_ad(table);
mdl_hold_recs++;
return true;
}
mdl_hold_recs= 0;
close_table();
return false;
}
/** Reset the state at end
@return the query graph parent */
que_node_t* end()
{
DBUG_ASSERT(common.type == QUE_NODE_PURGE);
close_table();
undo_recs.clear();
ut_d(in_progress= false);
purge_thd= nullptr;
mem_heap_empty(heap);
return common.parent;
}
}; };
#endif #endif

View File

@@ -53,98 +53,4 @@ struct row_log_t;
/* MySQL data types */ /* MySQL data types */
struct TABLE; struct TABLE;
/** Purge virtual column node information. */
struct purge_vcol_info_t
{
private:
/** Is there a possible need to evaluate virtual columns? */
bool requested;
/** Do we have to evaluate virtual columns (using mariadb_table)? */
bool used;
/** True if it is used for the first time. */
bool first_use;
/** MariaDB table opened for virtual column computation. */
TABLE* mariadb_table;
public:
/** Default constructor */
purge_vcol_info_t() :
requested(false), used(false), first_use(false),
mariadb_table(NULL)
{}
/** Reset the state. */
void reset()
{
requested = false;
used = false;
first_use = false;
mariadb_table = NULL;
}
/** Validate the virtual column information.
@return true if the mariadb table opened successfully
or doesn't try to calculate virtual column. */
bool validate() const { return !used || mariadb_table; }
/** @return the table handle for evaluating virtual columns */
TABLE* table() const { return mariadb_table; }
/** Set the table handle for evaluating virtual columns.
@param[in] table table handle */
void set_table(TABLE* table)
{
ut_ad(!table || is_first_fetch());
mariadb_table = table;
}
/** Note that virtual column information may be needed. */
void set_requested()
{
ut_ad(!used);
ut_ad(!first_use);
ut_ad(!mariadb_table);
requested = true;
}
/** @return whether the virtual column information may be needed */
bool is_requested() const { return requested; }
/** Note that the virtual column information is needed. */
void set_used()
{
ut_ad(requested);
if (first_use) {
first_use = false;
ut_ad(used);
return;
}
if (!used) {
first_use = used = true;
}
}
/** @return whether the virtual column information is needed */
bool is_used() const
{
ut_ad(!first_use || used);
ut_ad(!used || requested);
ut_ad(used || !mariadb_table);
return used;
}
/** Check whether it fetches mariadb table for the first time.
@return true if first time tries to open mariadb table. */
bool is_first_fetch() const
{
ut_ad(!first_use || used);
ut_ad(!used || requested);
ut_ad(used || !mariadb_table);
return first_use;
}
};
#endif #endif

View File

@@ -70,7 +70,6 @@ this case we return TRUE.
@param[in] ientry secondary index entry @param[in] ientry secondary index entry
@param[in] roll_ptr roll_ptr for the purge record @param[in] roll_ptr roll_ptr for the purge record
@param[in] trx_id transaction ID on the purging record @param[in] trx_id transaction ID on the purging record
@param[in,out] vcol_info virtual column information for purge thread.
@return TRUE if earlier version should have */ @return TRUE if earlier version should have */
bool bool
row_vers_old_has_index_entry( row_vers_old_has_index_entry(
@@ -80,8 +79,7 @@ row_vers_old_has_index_entry(
dict_index_t* index, dict_index_t* index,
const dtuple_t* ientry, const dtuple_t* ientry,
roll_ptr_t roll_ptr, roll_ptr_t roll_ptr,
trx_id_t trx_id, trx_id_t trx_id);
purge_vcol_info_t* vcol_info=NULL);
/*****************************************************************//** /*****************************************************************//**
Constructs the version of a clustered index record which a consistent Constructs the version of a clustered index record which a consistent

View File

@@ -140,7 +140,6 @@ public:
MY_ALIGNED(CACHE_LINE_SIZE) MY_ALIGNED(CACHE_LINE_SIZE)
rw_lock_t latch; rw_lock_t latch;
private: private:
bool m_initialized;
/** whether purge is enabled; protected by latch and std::atomic */ /** whether purge is enabled; protected by latch and std::atomic */
std::atomic<bool> m_enabled; std::atomic<bool> m_enabled;
/** number of pending stop() calls without resume() */ /** number of pending stop() calls without resume() */
@@ -213,6 +212,8 @@ public:
fil_space_t* last; fil_space_t* last;
} truncate; } truncate;
/** Heap for reading the undo log records */
mem_heap_t* heap;
/** /**
Constructor. Constructor.
@@ -220,8 +221,7 @@ public:
uninitialised. Real initialisation happens in create(). uninitialised. Real initialisation happens in create().
*/ */
purge_sys_t(): m_initialized(false), m_enabled(false) {} purge_sys_t(): m_enabled(false), heap(nullptr) {}
/** Create the instance */ /** Create the instance */
void create(); void create();
@@ -261,12 +261,6 @@ public:
/** The global data structure coordinating a purge */ /** The global data structure coordinating a purge */
extern purge_sys_t purge_sys; extern purge_sys_t purge_sys;
/** Info required to purge a record */
struct trx_purge_rec_t {
trx_undo_rec_t* undo_rec; /*!< Record to purge */
roll_ptr_t roll_ptr; /*!< File pointr to UNDO record */
};
#include "trx0purge.ic" #include "trx0purge.ic"
#endif /* trx0purge_h */ #endif /* trx0purge_h */

View File

@@ -327,6 +327,16 @@ record */
/** The search tuple corresponding to TRX_UNDO_INSERT_METADATA */ /** The search tuple corresponding to TRX_UNDO_INSERT_METADATA */
extern const dtuple_t trx_undo_metadata; extern const dtuple_t trx_undo_metadata;
/** Read the table id from an undo log record.
@param[in] rec Undo log record
@return table id stored as a part of undo log record */
inline table_id_t trx_undo_rec_get_table_id(const trx_undo_rec_t *rec)
{
rec+= 3;
mach_read_next_much_compressed(&rec);
return mach_read_next_much_compressed(&rec);
}
#include "trx0rec.ic" #include "trx0rec.ic"
#endif /* trx0rec_h */ #endif /* trx0rec_h */

View File

@@ -3373,19 +3373,6 @@ row_drop_table_for_mysql(
table records yet. Thus it is safe to release and table records yet. Thus it is safe to release and
reacquire the data dictionary latches. */ reacquire the data dictionary latches. */
if (table->fts) { if (table->fts) {
ut_ad(!table->fts->add_wq);
ut_ad(lock_trx_has_sys_table_locks(trx) == 0);
for (;;) {
bool retry = false;
if (dict_fts_index_syncing(table)) {
retry = true;
}
if (!retry) {
break;
}
DICT_BG_YIELD(trx);
}
row_mysql_unlock_data_dictionary(trx); row_mysql_unlock_data_dictionary(trx);
fts_optimize_remove_table(table); fts_optimize_remove_table(table);
row_mysql_lock_data_dictionary(trx); row_mysql_lock_data_dictionary(trx);

View File

@@ -103,15 +103,13 @@ row_purge_remove_clust_if_poss_low(
purge_node_t* node, /*!< in/out: row purge node */ purge_node_t* node, /*!< in/out: row purge node */
ulint mode) /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */ ulint mode) /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
{ {
ut_ad(rw_lock_own(&dict_sys.latch, RW_LOCK_S)
|| node->vcol_info.is_used());
dict_index_t* index = dict_table_get_first_index(node->table); dict_index_t* index = dict_table_get_first_index(node->table);
log_free_check(); log_free_check();
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
index->set_modified(mtr);
if (!row_purge_reposition_pcur(mode, node, &mtr)) { if (!row_purge_reposition_pcur(mode, node, &mtr)) {
/* The record was already removed. */ /* The record was already removed. */
@@ -119,9 +117,6 @@ row_purge_remove_clust_if_poss_low(
return true; return true;
} }
ut_d(const bool was_instant = !!index->table->instant);
index->set_modified(mtr);
rec_t* rec = btr_pcur_get_rec(&node->pcur); rec_t* rec = btr_pcur_get_rec(&node->pcur);
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_offs_init(offsets_); rec_offs_init(offsets_);
@@ -161,10 +156,6 @@ row_purge_remove_clust_if_poss_low(
} }
} }
/* Prove that dict_index_t::clear_instant_alter() was
not called with index->table->instant != NULL. */
ut_ad(!was_instant || index->table->instant);
func_exit: func_exit:
if (heap) { if (heap) {
mem_heap_free(heap); mem_heap_free(heap);
@@ -210,54 +201,6 @@ row_purge_remove_clust_if_poss(
return(false); return(false);
} }
/** Tries to store secondary index cursor before openin mysql table for
virtual index condition computation.
@param[in,out] node row purge node
@param[in] index secondary index
@param[in,out] sec_pcur secondary index cursor
@param[in,out] sec_mtr mini-transaction which holds
secondary index entry */
static void row_purge_store_vsec_cur(
purge_node_t* node,
dict_index_t* index,
btr_pcur_t* sec_pcur,
mtr_t* sec_mtr)
{
row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, sec_mtr);
if (!node->found_clust) {
return;
}
node->vcol_info.set_requested();
btr_pcur_store_position(sec_pcur, sec_mtr);
btr_pcurs_commit_specify_mtr(&node->pcur, sec_pcur, sec_mtr);
}
/** Tries to restore secondary index cursor after opening the mysql table
@param[in,out] node row purge node
@param[in] index secondary index
@param[in,out] sec_mtr mini-transaction which holds secondary index entry
@param[in] is_tree true=pessimistic purge,
false=optimistic (leaf-page only)
@return false in case of restore failure. */
static bool row_purge_restore_vsec_cur(
purge_node_t* node,
dict_index_t* index,
btr_pcur_t* sec_pcur,
mtr_t* sec_mtr,
bool is_tree)
{
sec_mtr->start();
index->set_modified(*sec_mtr);
return btr_pcur_restore_position(
is_tree ? BTR_PURGE_TREE : BTR_PURGE_LEAF,
sec_pcur, sec_mtr);
}
/** Determines if it is possible to remove a secondary index entry. /** Determines if it is possible to remove a secondary index entry.
Removal is possible if the secondary index entry does not refer to any Removal is possible if the secondary index entry does not refer to any
not delete marked version of a clustered index record where DB_TRX_ID not delete marked version of a clustered index record where DB_TRX_ID
@@ -297,53 +240,13 @@ row_purge_poss_sec(
ut_ad(!dict_index_is_clust(index)); ut_ad(!dict_index_is_clust(index));
const bool store_cur = sec_mtr && !node->vcol_info.is_used()
&& dict_index_has_virtual(index);
if (store_cur) {
row_purge_store_vsec_cur(node, index, sec_pcur, sec_mtr);
ut_ad(sec_mtr->has_committed()
== node->vcol_info.is_requested());
/* The PRIMARY KEY value was not found in the clustered
index. The secondary index record found. We can purge
the secondary index record. */
if (!node->vcol_info.is_requested()) {
ut_ad(!node->found_clust);
return true;
}
}
retry_purge_sec:
mtr_start(&mtr); mtr_start(&mtr);
can_delete = !row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, &mtr) can_delete = !row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, &mtr)
|| !row_vers_old_has_index_entry(true, || !row_vers_old_has_index_entry(true,
btr_pcur_get_rec(&node->pcur), btr_pcur_get_rec(&node->pcur),
&mtr, index, entry, &mtr, index, entry,
node->roll_ptr, node->trx_id, node->roll_ptr, node->trx_id);
&node->vcol_info);
if (node->vcol_info.is_first_fetch()) {
ut_ad(store_cur);
const TABLE* t= node->vcol_info.table();
DBUG_LOG("purge", "retry " << t
<< (is_tree ? " tree" : " leaf")
<< index->name << "," << index->table->name
<< ": " << rec_printer(entry).str());
ut_ad(mtr.has_committed());
if (t) {
node->vcol_info.set_used();
goto retry_purge_sec;
}
node->table = NULL;
sec_pcur = NULL;
return false;
}
/* Persistent cursor is closed if reposition fails. */ /* Persistent cursor is closed if reposition fails. */
if (node->found_clust) { if (node->found_clust) {
@@ -354,18 +257,6 @@ retry_purge_sec:
ut_ad(mtr.has_committed()); ut_ad(mtr.has_committed());
/* If the virtual column info is not used then reset the virtual column
info. */
if (node->vcol_info.is_requested()
&& !node->vcol_info.is_used()) {
node->vcol_info.reset();
}
if (store_cur && !row_purge_restore_vsec_cur(
node, index, sec_pcur, sec_mtr, is_tree)) {
return false;
}
return can_delete; return can_delete;
} }
@@ -482,13 +373,6 @@ row_purge_remove_sec_if_poss_tree(
} }
} }
if (node->vcol_op_failed()) {
ut_ad(mtr.has_committed());
ut_ad(!pcur.old_rec_buf);
ut_ad(pcur.pos_state == BTR_PCUR_NOT_POSITIONED);
return false;
}
func_exit: func_exit:
btr_pcur_close(&pcur); // FIXME: need this? btr_pcur_close(&pcur); // FIXME: need this?
func_exit_no_pcur: func_exit_no_pcur:
@@ -641,11 +525,6 @@ row_purge_remove_sec_if_poss_leaf(
} }
} }
if (node->vcol_op_failed()) {
btr_pcur_close(&pcur);
return false;
}
/* (The index entry is still needed, /* (The index entry is still needed,
or the deletion succeeded) */ or the deletion succeeded) */
/* fall through */ /* fall through */
@@ -692,10 +571,6 @@ row_purge_remove_sec_if_poss(
return; return;
} }
retry: retry:
if (node->vcol_op_failed()) {
return;
}
success = row_purge_remove_sec_if_poss_tree(node, index, entry); success = row_purge_remove_sec_if_poss_tree(node, index, entry);
/* The delete operation may fail if we have little /* The delete operation may fail if we have little
file space left: TODO: easiest to crash the database file space left: TODO: easiest to crash the database
@@ -762,12 +637,6 @@ row_purge_del_mark(
node->row, NULL, node->index, node->row, NULL, node->index,
heap, ROW_BUILD_FOR_PURGE); heap, ROW_BUILD_FOR_PURGE);
row_purge_remove_sec_if_poss(node, node->index, entry); row_purge_remove_sec_if_poss(node, node->index, entry);
if (node->vcol_op_failed()) {
mem_heap_free(heap);
return false;
}
mem_heap_empty(heap); mem_heap_empty(heap);
} }
@@ -785,8 +654,6 @@ whose old history can no longer be observed.
@param[in,out] mtr mini-transaction (will be started and committed) */ @param[in,out] mtr mini-transaction (will be started and committed) */
static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr) static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr)
{ {
ut_ad(rw_lock_own(&dict_sys.latch, RW_LOCK_S)
|| node->vcol_info.is_used());
/* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */ /* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */
mtr->start(); mtr->start();
@@ -869,8 +736,6 @@ row_purge_upd_exist_or_extern_func(
{ {
mem_heap_t* heap; mem_heap_t* heap;
ut_ad(rw_lock_own(&dict_sys.latch, RW_LOCK_S)
|| node->vcol_info.is_used());
ut_ad(!node->table->skip_alter_undo); ut_ad(!node->table->skip_alter_undo);
if (node->rec_type == TRX_UNDO_UPD_DEL_REC if (node->rec_type == TRX_UNDO_UPD_DEL_REC
@@ -898,11 +763,6 @@ row_purge_upd_exist_or_extern_func(
heap, ROW_BUILD_FOR_PURGE); heap, ROW_BUILD_FOR_PURGE);
row_purge_remove_sec_if_poss(node, node->index, entry); row_purge_remove_sec_if_poss(node, node->index, entry);
if (node->vcol_op_failed()) {
ut_ad(!node->table);
mem_heap_free(heap);
return;
}
ut_ad(node->table); ut_ad(node->table);
mem_heap_empty(heap); mem_heap_empty(heap);
@@ -1005,18 +865,20 @@ skip_secondaries:
row_purge_upd_exist_or_extern_func(node,undo_rec) row_purge_upd_exist_or_extern_func(node,undo_rec)
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
/***********************************************************//** /** Parses the row reference and other info in a modify undo log record.
Parses the row reference and other info in a modify undo log record. @param[in] node row undo node
@param[in] undo_rec record to purge
@param[in] thr query thread
@param[out] updated_extern true if an externally stored field was
updated
@return true if purge operation required */ @return true if purge operation required */
static static
bool bool
row_purge_parse_undo_rec( row_purge_parse_undo_rec(
/*=====================*/ purge_node_t* node,
purge_node_t* node, /*!< in: row undo node */ trx_undo_rec_t* undo_rec,
trx_undo_rec_t* undo_rec, /*!< in: record to purge */ que_thr_t* thr,
bool* updated_extern, /*!< out: true if an externally bool* updated_extern)
stored field was updated */
que_thr_t* thr) /*!< in: query thread */
{ {
dict_index_t* clust_index; dict_index_t* clust_index;
byte* ptr; byte* ptr;
@@ -1063,28 +925,27 @@ row_purge_parse_undo_rec(
return false; return false;
} }
/* Prevent DROP TABLE etc. from running when we are doing the purge
for this row */
try_again:
rw_lock_s_lock_inline(&dict_sys.latch, 0, __FILE__, __LINE__);
node->table = dict_table_open_on_id(
table_id, FALSE, DICT_TABLE_OP_NORMAL);
trx_id_t trx_id = TRX_ID_MAX; trx_id_t trx_id = TRX_ID_MAX;
if (node->table == NULL) { if (node->retain_mdl(table_id)) {
/* The table has been dropped: no need to do purge */ ut_ad(node->table != NULL);
goto already_locked;
}
try_again:
node->table = dict_table_open_on_id(
table_id, false, DICT_TABLE_OP_NORMAL, node->purge_thd,
&node->mdl_ticket);
if (node->table == NULL || node->table->name.is_temporary()) {
/* The table has been dropped: no need to do purge and
release mdl happened as a part of open process itself */
goto err_exit; goto err_exit;
} }
already_locked:
ut_ad(!node->table->is_temporary()); ut_ad(!node->table->is_temporary());
if (!fil_table_accessible(node->table)) {
goto inaccessible;
}
switch (type) { switch (type) {
case TRX_UNDO_INSERT_METADATA: case TRX_UNDO_INSERT_METADATA:
case TRX_UNDO_INSERT_REC: case TRX_UNDO_INSERT_REC:
@@ -1097,19 +958,13 @@ try_again:
/* Need server fully up for virtual column computation */ /* Need server fully up for virtual column computation */
if (!mysqld_server_started) { if (!mysqld_server_started) {
dict_table_close(node->table, FALSE, FALSE); node->close_table();
rw_lock_s_unlock(&dict_sys.latch);
if (srv_shutdown_state != SRV_SHUTDOWN_NONE) { if (srv_shutdown_state != SRV_SHUTDOWN_NONE) {
return(false); return(false);
} }
os_thread_sleep(1000000); os_thread_sleep(1000000);
goto try_again; goto try_again;
} }
node->vcol_info.set_requested();
node->vcol_info.set_used();
node->vcol_info.set_table(innobase_init_vc_templ(node->table));
node->vcol_info.set_used();
} }
clust_index = dict_table_get_first_index(node->table); clust_index = dict_table_get_first_index(node->table);
@@ -1118,21 +973,20 @@ try_again:
/* The table was corrupt in the data dictionary. /* The table was corrupt in the data dictionary.
dict_set_corrupted() works on an index, and dict_set_corrupted() works on an index, and
we do not have an index to call it with. */ we do not have an index to call it with. */
inaccessible:
DBUG_ASSERT(table_id == node->table->id); DBUG_ASSERT(table_id == node->table->id);
trx_id = node->table->def_trx_id; trx_id = node->table->def_trx_id;
if (!trx_id) { if (!trx_id) {
trx_id = TRX_ID_MAX; trx_id = TRX_ID_MAX;
} }
dict_table_close(node->table, FALSE, FALSE);
node->table = NULL;
err_exit: err_exit:
rw_lock_s_unlock(&dict_sys.latch); node->close_table();
node->skip(table_id, trx_id); node->skip(table_id, trx_id);
return(false); return(false);
} }
node->last_table_id = table_id;
if (type == TRX_UNDO_INSERT_METADATA) { if (type == TRX_UNDO_INSERT_METADATA) {
node->ref = &trx_undo_metadata; node->ref = &trx_undo_metadata;
return(true); return(true);
@@ -1165,20 +1019,21 @@ err_exit:
return(true); return(true);
} }
/***********************************************************//** /** Purges the parsed record.
Purges the parsed record. @param[in] node row purge node
@param[in] undo_rec record to purge
@param[in] thr query thread
@param[in] updated_extern whether external columns were updated
@return true if purged, false if skipped */ @return true if purged, false if skipped */
static MY_ATTRIBUTE((nonnull, warn_unused_result)) static MY_ATTRIBUTE((nonnull, warn_unused_result))
bool bool
row_purge_record_func( row_purge_record_func(
/*==================*/ purge_node_t* node,
purge_node_t* node, /*!< in: row purge node */ trx_undo_rec_t* undo_rec,
trx_undo_rec_t* undo_rec, /*!< in: record to purge */
#if defined UNIV_DEBUG || defined WITH_WSREP #if defined UNIV_DEBUG || defined WITH_WSREP
const que_thr_t*thr, /*!< in: query thread */ const que_thr_t*thr,
#endif /* UNIV_DEBUG || WITH_WSREP */ #endif /* UNIV_DEBUG || WITH_WSREP */
bool updated_extern) /*!< in: whether external columns bool updated_extern)
were updated */
{ {
dict_index_t* clust_index; dict_index_t* clust_index;
bool purged = true; bool purged = true;
@@ -1226,11 +1081,6 @@ row_purge_record_func(
node->found_clust = FALSE; node->found_clust = FALSE;
} }
if (node->table != NULL) {
dict_table_close(node->table, FALSE, FALSE);
node->table = NULL;
}
return(purged); return(purged);
} }
@@ -1258,20 +1108,13 @@ row_purge(
bool updated_extern; bool updated_extern;
while (row_purge_parse_undo_rec( while (row_purge_parse_undo_rec(
node, undo_rec, &updated_extern, thr)) { node, undo_rec, thr, &updated_extern)) {
bool purged = row_purge_record( bool purged = row_purge_record(
node, undo_rec, thr, updated_extern); node, undo_rec, thr, updated_extern);
if (!node->vcol_info.is_used()) {
rw_lock_s_unlock(&dict_sys.latch);
}
ut_ad(!rw_lock_own(&dict_sys.latch, RW_LOCK_S));
if (purged if (purged
|| srv_shutdown_state != SRV_SHUTDOWN_NONE || srv_shutdown_state != SRV_SHUTDOWN_NONE) {
|| node->vcol_op_failed()) {
return; return;
} }
@@ -1311,28 +1154,23 @@ row_purge_step(
node->start(); node->start();
if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) { if (!node->undo_recs.empty()) {
trx_purge_rec_t*purge_rec; trx_purge_rec_t* purge_rec =
node->undo_recs.front();
purge_rec = static_cast<trx_purge_rec_t*>( node->undo_recs.pop_front();
ib_vector_pop(node->undo_recs));
node->roll_ptr = purge_rec->roll_ptr; node->roll_ptr = purge_rec->roll_ptr;
row_purge(node, purge_rec->undo_rec, thr); row_purge(node, purge_rec->undo_rec, thr);
if (ib_vector_is_empty(node->undo_recs)) { if (node->undo_recs.empty()) {
row_purge_end(thr); row_purge_end(thr);
} else { } else {
thr->run_node = node; thr->run_node = node;
node->vcol_info.reset();
} }
} else { } else {
row_purge_end(thr); row_purge_end(thr);
} }
innobase_reset_background_thd(thr_get_trx(thr)->mysql_thd);
return(thr); return(thr);
} }

View File

@@ -440,16 +440,14 @@ row_vers_impl_x_locked(
@param[in,out] row the cluster index row in dtuple form @param[in,out] row the cluster index row in dtuple form
@param[in] clust_index clustered index @param[in] clust_index clustered index
@param[in] index the secondary index @param[in] index the secondary index
@param[in] heap heap used to build virtual dtuple @param[in] heap heap used to build virtual dtuple. */
@param[in,out] vcol_info virtual column information. */
static static
void void
row_vers_build_clust_v_col( row_vers_build_clust_v_col(
dtuple_t* row, dtuple_t* row,
dict_index_t* clust_index, dict_index_t* clust_index,
dict_index_t* index, dict_index_t* index,
mem_heap_t* heap, mem_heap_t* heap)
purge_vcol_info_t* vcol_info)
{ {
mem_heap_t* local_heap = NULL; mem_heap_t* local_heap = NULL;
VCOL_STORAGE *vcol_storage= NULL; VCOL_STORAGE *vcol_storage= NULL;
@@ -460,10 +458,6 @@ row_vers_build_clust_v_col(
ut_ad(dict_index_has_virtual(index)); ut_ad(dict_index_has_virtual(index));
ut_ad(index->table == clust_index->table); ut_ad(index->table == clust_index->table);
if (vcol_info != NULL) {
vcol_info->set_used();
maria_table = vcol_info->table();
}
DEBUG_SYNC(current_thd, "ib_clust_v_col_before_row_allocated"); DEBUG_SYNC(current_thd, "ib_clust_v_col_before_row_allocated");
innobase_allocate_row_for_vcol(thd, index, innobase_allocate_row_for_vcol(thd, index,
@@ -472,10 +466,7 @@ row_vers_build_clust_v_col(
&record, &record,
&vcol_storage); &vcol_storage);
if (vcol_info && !vcol_info->table()) { ut_ad(maria_table);
vcol_info->set_table(maria_table);
goto func_exit;
}
for (ulint i = 0; i < dict_index_get_n_fields(index); i++) { for (ulint i = 0; i < dict_index_get_n_fields(index); i++) {
const dict_field_t* ind_field = dict_index_get_nth_field( const dict_field_t* ind_field = dict_index_get_nth_field(
@@ -494,7 +485,6 @@ row_vers_build_clust_v_col(
} }
} }
func_exit:
if (local_heap) { if (local_heap) {
if (vcol_storage) if (vcol_storage)
innobase_free_row_for_vcol(vcol_storage); innobase_free_row_for_vcol(vcol_storage);
@@ -802,7 +792,6 @@ func_exit:
@param[in,out] heap heap memory @param[in,out] heap heap memory
@param[in,out] v_heap heap memory to keep virtual colum dtuple @param[in,out] v_heap heap memory to keep virtual colum dtuple
@param[in] mtr mtr holding the latch on rec @param[in] mtr mtr holding the latch on rec
@param[in,out] vcol_info virtual column information for purge thread
@return dtuple contains virtual column data */ @return dtuple contains virtual column data */
static static
dtuple_t* dtuple_t*
@@ -816,8 +805,7 @@ row_vers_build_cur_vrow(
trx_id_t trx_id, trx_id_t trx_id,
mem_heap_t* heap, mem_heap_t* heap,
mem_heap_t* v_heap, mem_heap_t* v_heap,
mtr_t* mtr, mtr_t* mtr)
purge_vcol_info_t* vcol_info)
{ {
dtuple_t* cur_vrow = NULL; dtuple_t* cur_vrow = NULL;
@@ -837,16 +825,8 @@ row_vers_build_cur_vrow(
rec, *clust_offsets, rec, *clust_offsets,
NULL, NULL, NULL, NULL, heap); NULL, NULL, NULL, NULL, heap);
if (vcol_info && !vcol_info->is_used()) {
mtr->commit();
}
row_vers_build_clust_v_col( row_vers_build_clust_v_col(
row, clust_index, index, heap, vcol_info); row, clust_index, index, heap);
if (vcol_info != NULL && vcol_info->is_first_fetch()) {
return NULL;
}
cur_vrow = dtuple_copy(row, v_heap); cur_vrow = dtuple_copy(row, v_heap);
dtuple_dup_v_fld(cur_vrow, v_heap); dtuple_dup_v_fld(cur_vrow, v_heap);
@@ -878,7 +858,6 @@ this case we return TRUE.
@param[in] ientry secondary index entry @param[in] ientry secondary index entry
@param[in] roll_ptr roll_ptr for the purge record @param[in] roll_ptr roll_ptr for the purge record
@param[in] trx_id transaction ID on the purging record @param[in] trx_id transaction ID on the purging record
@param[in,out] vcol_info virtual column information for purge thread.
@return TRUE if earlier version should have */ @return TRUE if earlier version should have */
bool bool
row_vers_old_has_index_entry( row_vers_old_has_index_entry(
@@ -888,8 +867,7 @@ row_vers_old_has_index_entry(
dict_index_t* index, dict_index_t* index,
const dtuple_t* ientry, const dtuple_t* ientry,
roll_ptr_t roll_ptr, roll_ptr_t roll_ptr,
trx_id_t trx_id, trx_id_t trx_id)
purge_vcol_info_t* vcol_info)
{ {
const rec_t* version; const rec_t* version;
rec_t* prev_version; rec_t* prev_version;
@@ -906,9 +884,6 @@ row_vers_old_has_index_entry(
ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
| MTR_MEMO_PAGE_S_FIX)); | MTR_MEMO_PAGE_S_FIX));
ut_ad(!rw_lock_own(&purge_sys.latch, RW_LOCK_S));
ut_ad(also_curr || !vcol_info);
clust_index = dict_table_get_first_index(index->table); clust_index = dict_table_get_first_index(index->table);
comp = page_rec_is_comp(rec); comp = page_rec_is_comp(rec);
@@ -959,17 +934,8 @@ row_vers_old_has_index_entry(
if (trx_undo_roll_ptr_is_insert(t_roll_ptr) if (trx_undo_roll_ptr_is_insert(t_roll_ptr)
|| dbug_v_purge) { || dbug_v_purge) {
if (vcol_info && !vcol_info->is_used()) {
mtr->commit();
}
row_vers_build_clust_v_col( row_vers_build_clust_v_col(
row, clust_index, index, heap, row, clust_index, index, heap);
vcol_info);
if (vcol_info && vcol_info->is_first_fetch()) {
goto unsafe_to_purge;
}
entry = row_build_index_entry( entry = row_build_index_entry(
row, ext, index, heap); row, ext, index, heap);
@@ -1045,11 +1011,7 @@ unsafe_to_purge:
cur_vrow = row_vers_build_cur_vrow( cur_vrow = row_vers_build_cur_vrow(
also_curr, rec, clust_index, &clust_offsets, also_curr, rec, clust_index, &clust_offsets,
index, roll_ptr, trx_id, heap, v_heap, mtr, vcol_info); index, roll_ptr, trx_id, heap, v_heap, mtr);
if (vcol_info && vcol_info->is_first_fetch()) {
goto unsafe_to_purge;
}
} }
version = rec; version = rec;

View File

@@ -160,6 +160,7 @@ purge_graph_build()
void purge_sys_t::create() void purge_sys_t::create()
{ {
ut_ad(this == &purge_sys); ut_ad(this == &purge_sys);
ut_ad(!heap);
ut_ad(!enabled()); ut_ad(!enabled());
m_paused= 0; m_paused= 0;
query= purge_graph_build(); query= purge_graph_build();
@@ -173,14 +174,14 @@ void purge_sys_t::create()
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex); mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
truncate.current= NULL; truncate.current= NULL;
truncate.last= NULL; truncate.last= NULL;
m_initialized= true; heap= mem_heap_create(4096);
} }
/** Close the purge subsystem on shutdown. */ /** Close the purge subsystem on shutdown. */
void purge_sys_t::close() void purge_sys_t::close()
{ {
ut_ad(this == &purge_sys); ut_ad(this == &purge_sys);
if (!m_initialized) if (!heap)
return; return;
ut_ad(!enabled()); ut_ad(!enabled());
@@ -192,7 +193,8 @@ void purge_sys_t::close()
trx_free(trx); trx_free(trx);
rw_lock_free(&latch); rw_lock_free(&latch);
mutex_free(&pq_mutex); mutex_free(&pq_mutex);
m_initialized= false; mem_heap_free(heap);
heap= nullptr;
} }
/*================ UNDO LOG HISTORY LIST =============================*/ /*================ UNDO LOG HISTORY LIST =============================*/
@@ -1114,7 +1116,7 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
node = (purge_node_t*) thr->child; node = (purge_node_t*) thr->child;
ut_ad(que_node_get_type(node) == QUE_NODE_PURGE); ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
ut_ad(node->undo_recs == NULL); ut_ad(node->undo_recs.empty());
ut_ad(!node->in_progress); ut_ad(!node->in_progress);
ut_d(node->in_progress = true); ut_d(node->in_progress = true);
} }
@@ -1133,7 +1135,9 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
i = 0; i = 0;
const ulint batch_size = srv_purge_batch_size; const ulint batch_size = srv_purge_batch_size;
std::map<table_id_t, purge_node_t*> table_id_map;
mem_heap_empty(purge_sys.heap);
while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) { while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
purge_node_t* node; purge_node_t* node;
@@ -1146,7 +1150,7 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
ut_a(que_node_get_type(node) == QUE_NODE_PURGE); ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
purge_rec = static_cast<trx_purge_rec_t*>( purge_rec = static_cast<trx_purge_rec_t*>(
mem_heap_zalloc(node->heap, sizeof(*purge_rec))); mem_heap_zalloc(purge_sys.heap, sizeof(*purge_rec)));
/* Track the max {trx_id, undo_no} for truncating the /* Track the max {trx_id, undo_no} for truncating the
UNDO logs once we have purged the records. */ UNDO logs once we have purged the records. */
@@ -1157,36 +1161,39 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
/* Fetch the next record, and advance the purge_sys.tail. */ /* Fetch the next record, and advance the purge_sys.tail. */
purge_rec->undo_rec = trx_purge_fetch_next_rec( purge_rec->undo_rec = trx_purge_fetch_next_rec(
&purge_rec->roll_ptr, &n_pages_handled, node->heap); &purge_rec->roll_ptr, &n_pages_handled,
purge_sys.heap);
if (purge_rec->undo_rec != NULL) { if (purge_rec->undo_rec == NULL) {
break;
} else if (purge_rec->undo_rec == &trx_purge_dummy_rec) {
continue;
}
if (node->undo_recs == NULL) { table_id_t table_id = trx_undo_rec_get_table_id(
node->undo_recs = ib_vector_create( purge_rec->undo_rec);
ib_heap_allocator_create(node->heap),
sizeof(trx_purge_rec_t),
batch_size);
} else {
ut_a(!ib_vector_is_empty(node->undo_recs));
}
ib_vector_push(node->undo_recs, purge_rec); auto it = table_id_map.find(table_id);
if (n_pages_handled >= batch_size) { if (it != table_id_map.end()) {
node = it->second;
break;
}
} else { } else {
thr = UT_LIST_GET_NEXT(thrs, thr);
if (!(++i % n_purge_threads)) {
thr = UT_LIST_GET_FIRST(
purge_sys.query->thrs);
}
ut_a(thr != NULL);
table_id_map.insert({table_id, node});
}
node->undo_recs.push_back(purge_rec);
if (n_pages_handled >= batch_size) {
break; break;
} }
thr = UT_LIST_GET_NEXT(thrs, thr);
if (!(++i % n_purge_threads)) {
thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
}
ut_a(thr != NULL);
} }
ut_ad(purge_sys.head <= purge_sys.tail); ut_ad(purge_sys.head <= purge_sys.tail);