diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 0687b42fcbc..2b8b0f3bb91 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -3636,11 +3636,7 @@ static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id) DBUG_ASSERT(t->first->versioned_by_id()); DBUG_ASSERT(trx->rsegs.m_redo.rseg); - mutex_enter(&trx_sys.mutex); - trx_id_t commit_id = trx_sys.get_new_trx_id(); - mutex_exit(&trx_sys.mutex); - - return commit_id; + return trx_sys.get_new_trx_id(); } } @@ -19742,9 +19738,7 @@ wsrep_fake_trx_id( handlerton *hton, THD *thd) /*!< in: user thread handle */ { - mutex_enter(&trx_sys.mutex); trx_id_t trx_id = trx_sys.get_new_trx_id(); - mutex_exit(&trx_sys.mutex); WSREP_DEBUG("innodb fake trx id: " TRX_ID_FMT " thd: %s", trx_id, wsrep_thd_query(thd)); wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id); diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h index 89f87346074..848cb33dbca 100644 --- a/storage/innobase/include/read0types.h +++ b/storage/innobase/include/read0types.h @@ -165,8 +165,10 @@ public: private: /** Opens a read view where exactly the transactions serialized before this - point in time are seen in the view. */ - inline void clone(); + point in time are seen in the view. + + @param[in,out] trx transaction */ + void open(trx_t *trx); /** Copy state from another view. diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index e7b0e75de10..0d3c916ee81 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -804,6 +804,10 @@ private: MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_max_trx_id; + + /** Solves race condition between register_rw() and snapshot_ids(). */ + MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_rw_trx_hash_version; + bool m_initialised; public: @@ -830,16 +834,6 @@ public: transactions that have not yet been started in InnoDB. */ - MY_ALIGNED(CACHE_LINE_SIZE) - trx_ids_t rw_trx_ids; /*!< Array of Read write transaction IDs - for MVCC snapshot. A ReadView would take - a snapshot of these transactions whose - changes are not visible to it. We should - remove transactions from the list before - committing in memory and releasing locks - to ensure right order of removal and - consistent snapshot. */ - MY_ALIGNED(CACHE_LINE_SIZE) /** Temporary rollback segments */ trx_rseg_t* temp_rsegs[TRX_SYS_N_RSEGS]; @@ -870,13 +864,11 @@ public: /** Constructor. - We only initialise rw_trx_ids here as it is impossible to postpone it's - initialisation to create(). + Some members may require late initialisation, thus we just mark object as + uninitialised. Real initialisation happens in create(). */ - trx_sys_t(): m_initialised(false), - rw_trx_ids(ut_allocator(mem_key_trx_sys_t_rw_trx_ids)) - {} + trx_sys_t(): m_initialised(false) {} /** @@ -920,15 +912,54 @@ public: trx_id_t get_new_trx_id() { - ut_ad(mutex_own(&mutex)); - return static_cast(my_atomic_add64_explicit( - reinterpret_cast(&m_max_trx_id), 1, MY_MEMORY_ORDER_RELAXED)); + trx_id_t id= get_new_trx_id_no_refresh(); + refresh_rw_trx_hash_version(); + return id; } + /** + Takes MVCC snapshot. + + To reduce malloc probablility we reserver rw_trx_hash.size() + 32 elements + in ids. + + For details about get_rw_trx_hash_version() != get_max_trx_id() spin + @sa register_rw(). + + We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so + that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash. + + To optimise snapshot creation rw_trx_hash.iterate() is being used instead + of rw_trx_hash.iterate_no_dups(). It means that some transaction + identifiers may appear multiple times in ids. + + @param[in,out] caller_trx used to get access to rw_trx_hash_pins + @param[out] ids array to store registered transaction identifiers + @param[out] max_trx_id variable to store m_max_trx_id value + */ + + void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id) + { + snapshot_ids_arg arg(ids); + + while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id()) + ut_delay(1); + + ids->clear(); + ids->reserve(rw_trx_hash.size() + 32); + *max_trx_id= arg.m_id; + rw_trx_hash.iterate(caller_trx, + reinterpret_cast(copy_one_id), + &arg); + std::sort(ids->begin(), ids->end()); + } + + + /** Initialiser for m_max_trx_id and m_rw_trx_hash_version. */ void init_max_trx_id(trx_id_t value) { - m_max_trx_id= value; + m_max_trx_id= m_rw_trx_hash_version= value; } @@ -945,14 +976,44 @@ public: ulint any_active_transactions(); - /** Registers read-write transaction. */ + /** + Registers read-write transaction. + + Transaction becomes visible to MVCC. + + There's a gap between m_max_trx_id increment and transaction becoming + visible through rw_trx_hash. While we're in this gap concurrent thread may + come and do MVCC snapshot. As a result concurrent read view will be able to + observe records owned by this transaction even before it was committed. + + m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has + to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively + means that all transactions up to m_max_trx_id are available through + rw_trx_hash. + + We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so + that m_rw_trx_hash_version increment happens after transaction becomes + visible through rw_trx_hash. + */ + void register_rw(trx_t *trx) { - mutex_enter(&mutex); - trx->id= get_new_trx_id(); - rw_trx_ids.push_back(trx->id); - mutex_exit(&mutex); + trx->id= get_new_trx_id_no_refresh(); rw_trx_hash.insert(trx); + refresh_rw_trx_hash_version(); + } + + + /** + Deregisters read-write transaction. + + Transaction is removed from rw_trx_hash, which releases all implicit locks. + MVCC snapshot won't see this transaction anymore. + */ + + void deregister_rw(trx_t *trx) + { + rw_trx_hash.erase(trx); } @@ -982,6 +1043,60 @@ private: } return 0; } + + + struct snapshot_ids_arg + { + snapshot_ids_arg(trx_ids_t *ids): m_ids(ids) {} + trx_ids_t *m_ids; + trx_id_t m_id; + }; + + + static my_bool copy_one_id(rw_trx_hash_element_t *element, + snapshot_ids_arg *arg) + { + if (element->id < arg->m_id) + arg->m_ids->push_back(element->id); + return 0; + } + + + /** Getter for m_rw_trx_hash_version, must issue ACQUIRE memory barrier. */ + trx_id_t get_rw_trx_hash_version() + { + return static_cast + (my_atomic_load64_explicit(reinterpret_cast + (&m_rw_trx_hash_version), + MY_MEMORY_ORDER_ACQUIRE)); + } + + + /** Increments m_rw_trx_hash_version, must issue RELEASE memory barrier. */ + void refresh_rw_trx_hash_version() + { + my_atomic_add64_explicit(reinterpret_cast(&m_rw_trx_hash_version), + 1, MY_MEMORY_ORDER_RELEASE); + } + + + /** + Allocates new transaction id without refreshing rw_trx_hash version. + + This method is extracted for exclusive use by register_rw() where + transaction must be inserted into rw_trx_hash between new transaction id + allocation and rw_trx_hash version refresh. + + @sa get_new_trx_id() + + @return new transaction id + */ + + trx_id_t get_new_trx_id_no_refresh() + { + return static_cast(my_atomic_add64_explicit( + reinterpret_cast(&m_max_trx_id), 1, MY_MEMORY_ORDER_RELAXED)); + } }; diff --git a/storage/innobase/include/ut0new.h b/storage/innobase/include/ut0new.h index 955e7b026c7..d51fb40a73e 100644 --- a/storage/innobase/include/ut0new.h +++ b/storage/innobase/include/ut0new.h @@ -172,7 +172,6 @@ extern PSI_memory_key mem_key_other; extern PSI_memory_key mem_key_row_log_buf; extern PSI_memory_key mem_key_row_merge_sort; extern PSI_memory_key mem_key_std; -extern PSI_memory_key mem_key_trx_sys_t_rw_trx_ids; extern PSI_memory_key mem_key_partitioning; /** Setup the internal objects needed for UT_NEW() to operate. diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index b2dea25aa9f..93a20f84f67 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -207,49 +207,25 @@ MVCC::validate() const } #endif /* UNIV_DEBUG */ + /** -Opens a read view where exactly the transactions serialized before this -point in time are seen in the view. */ + Opens a read view where exactly the transactions serialized before this + point in time are seen in the view. -void ReadView::clone() + @param[in,out] trx transaction +*/ + +void ReadView::open(trx_t *trx) { - ut_ad(mutex_own(&trx_sys.mutex)); - m_low_limit_no = m_low_limit_id = trx_sys.get_max_trx_id(); + ut_ad(mutex_own(&trx_sys.mutex)); + trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id); + m_low_limit_no= m_low_limit_id; + m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front(); + ut_ad(m_up_limit_id <= m_low_limit_id); - m_ids= trx_sys.rw_trx_ids; -#ifdef UNIV_DEBUG - /* Original assertion was here to make sure that rw_trx_ids and - rw_trx_hash are in sync and they hold either ACTIVE or PREPARED - transaction. - - Now rw_trx_hash_t::find() does - ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || - trx_state_eq(trx, TRX_STATE_PREPARED)). - No need to repeat it here. We even can't repeat it here: it'll be race - condition because we need trx->element->mutex locked to perform this - check (see how it is done in find()). - - Now rw_trx_ids and rw_trx_hash may get out of sync for a short while: - when transaction is registered it first gets added into rw_trx_ids - under trx_sys.mutex protection and then to rw_trx_hash without mutex - protection. Thus we need repeat this lookup. */ - for (trx_ids_t::const_iterator it = trx_sys.rw_trx_ids.begin(); - it != trx_sys.rw_trx_ids.end(); ++it) { - while (!trx_sys.is_registered(current_trx(), *it)); - } -#endif /* UNIV_DEBUG */ - m_up_limit_id = m_ids.empty() ? m_low_limit_id : m_ids.front(); - ut_ad(m_up_limit_id <= m_low_limit_id); - - if (UT_LIST_GET_LEN(trx_sys.serialisation_list) > 0) { - const trx_t* trx; - - trx = UT_LIST_GET_FIRST(trx_sys.serialisation_list); - - if (trx->no < m_low_limit_no) { - m_low_limit_no = trx->no; - } - } + if (const trx_t *trx= UT_LIST_GET_FIRST(trx_sys.serialisation_list)) + if (trx->no < m_low_limit_no) + m_low_limit_no= trx->no; } @@ -324,7 +300,7 @@ void MVCC::view_open(trx_t* trx) } mutex_enter(&trx_sys.mutex); - trx->read_view.clone(); + trx->read_view.open(trx); if (trx->read_view.is_registered()) UT_LIST_REMOVE(m_views, &trx->read_view); else @@ -375,7 +351,7 @@ MVCC::clone_oldest_view(ReadView* view) { mutex_enter(&trx_sys.mutex); /* Find oldest view. */ - for (ReadView *oldest_view = UT_LIST_GET_LAST(m_views); + for (const ReadView *oldest_view = UT_LIST_GET_LAST(m_views); oldest_view != NULL; oldest_view = UT_LIST_GET_PREV(m_view_list, oldest_view)) { @@ -387,7 +363,7 @@ MVCC::clone_oldest_view(ReadView* view) } } /* No views in the list: snapshot current state. */ - view->clone(); + view->open(0); mutex_exit(&trx_sys.mutex); } diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index 20244b02964..7e7870f53ce 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -859,7 +859,7 @@ void trx_rollback_recovered(bool all) ut_ad(!srv_undo_sources); ut_ad(srv_fast_shutdown); discard: - trx_sys.rw_trx_hash.erase(trx); + trx_sys.deregister_rw(trx); trx_free_at_shutdown(trx); } else diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 056bddb5d73..bb808db7e48 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -878,7 +878,6 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, trx_sys.rw_trx_hash.insert(trx); trx_sys.rw_trx_hash.put_pins(trx); - trx_sys.rw_trx_ids.push_back(trx->id); trx_resurrect_table_locks(trx, undo); if (trx_state_eq(trx, TRX_STATE_ACTIVE)) *rows_to_undo+= trx->undo_no; @@ -973,8 +972,6 @@ trx_lists_init_at_db_start() ib::info() << "Trx id counter is " << trx_sys.get_max_trx_id(); } - - std::sort(trx_sys.rw_trx_ids.begin(), trx_sys.rw_trx_ids.end()); trx_sys.mvcc.clone_oldest_view(&purge_sys->view); } @@ -1519,9 +1516,7 @@ trx_update_mod_tables_timestamp( /** Erase the transaction from running transaction lists and serialization -list. Active RW transaction list of a MVCC snapshot(ReadView::prepare) -won't include this transaction after this call. All implicit locks are -also released by this call as trx is removed from rw_trx_hash. +list. @param[in] trx Transaction to erase, must have an ID > 0 @param[in] serialised true if serialisation log was written */ static @@ -1538,15 +1533,8 @@ trx_erase_lists( } else { mutex_enter(&trx_sys.mutex); } - - trx_ids_t::iterator it = std::lower_bound( - trx_sys.rw_trx_ids.begin(), - trx_sys.rw_trx_ids.end(), - trx->id); - ut_ad(*it == trx->id); - trx_sys.rw_trx_ids.erase(it); mutex_exit(&trx_sys.mutex); - trx_sys.rw_trx_hash.erase(trx); + trx_sys.deregister_rw(trx); } /****************************************************************//** diff --git a/storage/innobase/ut/ut0new.cc b/storage/innobase/ut/ut0new.cc index bf5515f4de0..1815c26a288 100644 --- a/storage/innobase/ut/ut0new.cc +++ b/storage/innobase/ut/ut0new.cc @@ -43,7 +43,6 @@ PSI_memory_key mem_key_other; PSI_memory_key mem_key_row_log_buf; PSI_memory_key mem_key_row_merge_sort; PSI_memory_key mem_key_std; -PSI_memory_key mem_key_trx_sys_t_rw_trx_ids; PSI_memory_key mem_key_partitioning; #ifdef UNIV_PFS_MEMORY @@ -72,7 +71,6 @@ static PSI_memory_info pfs_info[] = { {&mem_key_row_log_buf, "row_log_buf", 0}, {&mem_key_row_merge_sort, "row_merge_sort", 0}, {&mem_key_std, "std", 0}, - {&mem_key_trx_sys_t_rw_trx_ids, "trx_sys_t::rw_trx_ids", 0}, {&mem_key_partitioning, "partitioning", 0}, };