mirror of
https://github.com/MariaDB/server.git
synced 2025-08-09 22:24:09 +03:00
MDEV-32050: Do not copy undo records in purge
Also, default to innodb_purge_batch_size=1000,
replacing the old default value of processing 300 undo log pages
in a batch. Axel Schwenke found this value to help reduce purge lag
without having a significant impact on workload throughput.
In purge, we can simply acquire a shared latch on the undo log page
(to avoid a race condition like the one that was fixed in
commit b102872ad5
) and retain a buffer-fix
after releasing the latch. The buffer-fix will prevent the undo log
page from being evicted from the buffer pool. Concurrent modification
is prevented by design. Only the purge_coordinator_task
(or its accomplice purge_truncation_task) may free the undo log pages,
after any purge_worker_task have completed execution. Hence, we do not
have to worry about any overwriting or reuse of the undo log records.
trx_undo_rec_copy(): Remove. The only remaining caller would have been
trx_undo_get_undo_rec_low(), which is where the logic was merged.
purge_sys_t::m_initialized: Replaces heap.
purge_sys_t::pages: A cache of buffer-fixed pages that have been
looked up from buf_pool.page_hash.
purge_sys_t::get_page(): Return a buffer-fixed undo page, using the
pages cache.
trx_purge_t::batch_cleanup(): Renamed from clone_end_view().
Clear the pages cache and clone the end_view at the end of a batch.
purge_sys_t::n_pages_handled(): Return pages.size(). This determines
if innodb_purge_batch_size was exceeded.
purge_sys_t::rseg_get_next_history_log(): Replaces
trx_purge_rseg_get_next_history_log().
purge_sys_t::choose_next_log(): Replaces trx_purge_choose_next_log()
and trx_purge_read_undo_rec().
purge_sys_t::get_next_rec(): Replaces trx_purge_get_next_rec()
and trx_undo_get_next_rec().
purge_sys_t::fetch_next_rec(): Replaces trx_purge_fetch_next_rec()
and some use of trx_undo_get_first_rec().
trx_purge_attach_undo_recs(): Do not allow purge_sys.n_pages_handled()
exceed the innodb_purge_batch_size or ¾ of the buffer pool, whichever
is smaller.
Reviewed by: Vladislav Lesin
Tested by: Matthias Leich and Axel Schwenke
This commit is contained in:
@@ -1,19 +1,19 @@
|
||||
SET @global_start_value = @@global.innodb_purge_batch_size;
|
||||
SELECT @global_start_value;
|
||||
@global_start_value
|
||||
300
|
||||
1000
|
||||
'#--------------------FN_DYNVARS_046_01------------------------#'
|
||||
SET @@global.innodb_purge_batch_size = 1;
|
||||
SET @@global.innodb_purge_batch_size = DEFAULT;
|
||||
SELECT @@global.innodb_purge_batch_size;
|
||||
@@global.innodb_purge_batch_size
|
||||
300
|
||||
1000
|
||||
'#---------------------FN_DYNVARS_046_02-------------------------#'
|
||||
SET innodb_purge_batch_size = 1;
|
||||
ERROR HY000: Variable 'innodb_purge_batch_size' is a GLOBAL variable and should be set with SET GLOBAL
|
||||
SELECT @@innodb_purge_batch_size;
|
||||
@@innodb_purge_batch_size
|
||||
300
|
||||
1000
|
||||
SELECT local.innodb_purge_batch_size;
|
||||
ERROR 42S02: Unknown table 'local' in field list
|
||||
SET global innodb_purge_batch_size = 1;
|
||||
@@ -112,4 +112,4 @@ SELECT @@global.innodb_purge_batch_size;
|
||||
SET @@global.innodb_purge_batch_size = @global_start_value;
|
||||
SELECT @@global.innodb_purge_batch_size;
|
||||
@@global.innodb_purge_batch_size
|
||||
300
|
||||
1000
|
||||
|
@@ -307,7 +307,7 @@
|
||||
NUMERIC_MAX_VALUE 65536
|
||||
@@ -1345,7 +1345,7 @@
|
||||
SESSION_VALUE NULL
|
||||
DEFAULT_VALUE 300
|
||||
DEFAULT_VALUE 1000
|
||||
VARIABLE_SCOPE GLOBAL
|
||||
-VARIABLE_TYPE BIGINT UNSIGNED
|
||||
+VARIABLE_TYPE INT UNSIGNED
|
||||
|
@@ -1293,7 +1293,7 @@ READ_ONLY NO
|
||||
COMMAND_LINE_ARGUMENT OPTIONAL
|
||||
VARIABLE_NAME INNODB_PURGE_BATCH_SIZE
|
||||
SESSION_VALUE NULL
|
||||
DEFAULT_VALUE 300
|
||||
DEFAULT_VALUE 1000
|
||||
VARIABLE_SCOPE GLOBAL
|
||||
VARIABLE_TYPE BIGINT UNSIGNED
|
||||
VARIABLE_COMMENT Number of UNDO log pages to purge in one batch from the history list.
|
||||
|
@@ -18852,7 +18852,7 @@ static MYSQL_SYSVAR_ULONG(purge_batch_size, srv_purge_batch_size,
|
||||
PLUGIN_VAR_OPCMDARG,
|
||||
"Number of UNDO log pages to purge in one batch from the history list.",
|
||||
NULL, NULL,
|
||||
300, /* Default setting */
|
||||
1000, /* Default setting */
|
||||
1, /* Minimum value */
|
||||
5000, 0); /* Maximum value */
|
||||
|
||||
|
@@ -80,15 +80,6 @@ row_purge_step(
|
||||
que_thr_t* thr) /*!< in: query thread */
|
||||
MY_ATTRIBUTE((nonnull, warn_unused_result));
|
||||
|
||||
/** Info required to purge a record */
|
||||
struct trx_purge_rec_t
|
||||
{
|
||||
/** Record to purge */
|
||||
const trx_undo_rec_t *undo_rec;
|
||||
/** File pointer to undo record */
|
||||
roll_ptr_t roll_ptr;
|
||||
};
|
||||
|
||||
/** Purge worker context */
|
||||
struct purge_node_t
|
||||
{
|
||||
|
@@ -31,6 +31,7 @@ Created 3/26/1996 Heikki Tuuri
|
||||
#include "srw_lock.h"
|
||||
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
|
||||
/** Prepend the history list with an undo log.
|
||||
Remove the undo log segment from the rseg slot if it is too big for reuse.
|
||||
@@ -127,6 +128,7 @@ private:
|
||||
/** The control structure used in the purge operation */
|
||||
class purge_sys_t
|
||||
{
|
||||
friend TrxUndoRsegsIterator;
|
||||
public:
|
||||
/** latch protecting view, m_enabled */
|
||||
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock latch;
|
||||
@@ -134,6 +136,8 @@ private:
|
||||
/** Read view at the start of a purge batch. Any encountered index records
|
||||
that are older than view will be removed. */
|
||||
ReadViewBase view;
|
||||
/** whether the subsystem has been initialized */
|
||||
bool m_initialized{false};
|
||||
/** whether purge is enabled; protected by latch and std::atomic */
|
||||
std::atomic<bool> m_enabled{false};
|
||||
public:
|
||||
@@ -152,7 +156,34 @@ private:
|
||||
/** Read view at the end of a purge batch (copied from view). Any undo pages
|
||||
containing records older than end_view may be freed. */
|
||||
ReadViewBase end_view;
|
||||
|
||||
struct hasher
|
||||
{
|
||||
size_t operator()(const page_id_t &id) const { return size_t(id.raw()); }
|
||||
};
|
||||
|
||||
using unordered_map =
|
||||
std::unordered_map<const page_id_t, buf_block_t*, hasher,
|
||||
#if defined __GNUC__ && __GNUC__ == 4 && __GNUC_MINOR__ >= 8
|
||||
std::equal_to<page_id_t>
|
||||
/* GCC 4.8.5 would fail to find a matching allocator */
|
||||
#else
|
||||
std::equal_to<page_id_t>,
|
||||
ut_allocator<std::pair<const page_id_t, buf_block_t*>>
|
||||
#endif
|
||||
>;
|
||||
/** map of buffer-fixed undo log pages processed during a purge batch */
|
||||
unordered_map pages;
|
||||
public:
|
||||
/** @return the number of processed undo pages */
|
||||
size_t n_pages_handled() const { return pages.size(); }
|
||||
|
||||
/** Look up an undo log page.
|
||||
@param id undo page identifier
|
||||
@return undo page
|
||||
@retval nullptr in case the page is corrupted */
|
||||
buf_block_t *get_page(page_id_t id);
|
||||
|
||||
que_t* query; /*!< The query graph which will do the
|
||||
parallelized purge operation */
|
||||
|
||||
@@ -188,6 +219,7 @@ public:
|
||||
to purge */
|
||||
trx_rseg_t* rseg; /*!< Rollback segment for the next undo
|
||||
record to purge */
|
||||
private:
|
||||
uint32_t page_no; /*!< Page number for the next undo
|
||||
record to purge, page number of the
|
||||
log header, if dummy record */
|
||||
@@ -202,7 +234,7 @@ public:
|
||||
TrxUndoRsegsIterator
|
||||
rseg_iter; /*!< Iterator to get the next rseg
|
||||
to process */
|
||||
|
||||
public:
|
||||
purge_pq_t purge_queue; /*!< Binary min-heap, ordered on
|
||||
TrxUndoRsegs::trx_no. It is protected
|
||||
by the pq_mutex */
|
||||
@@ -217,17 +249,6 @@ public:
|
||||
fil_space_t* last;
|
||||
} truncate;
|
||||
|
||||
/** Heap for reading the undo log records */
|
||||
mem_heap_t* heap;
|
||||
/**
|
||||
Constructor.
|
||||
|
||||
Some members may require late initialisation, thus we just mark object as
|
||||
uninitialised. Real initialisation happens in create().
|
||||
*/
|
||||
|
||||
purge_sys_t(): m_enabled(false), heap(nullptr) {}
|
||||
|
||||
/** Create the instance */
|
||||
void create();
|
||||
|
||||
@@ -281,6 +302,32 @@ public:
|
||||
/** @return whether stop_SYS() is in effect */
|
||||
bool must_wait_FTS() const { return m_FTS_paused; }
|
||||
|
||||
private:
|
||||
/**
|
||||
Get the next record to purge and update the info in the purge system.
|
||||
@param roll_ptr undo log pointer to the record
|
||||
@return buffer-fixed reference to undo log record
|
||||
@retval {nullptr,1} if the whole undo log can skipped in purge
|
||||
@retval {nullptr,0} if nothing is left, or on corruption */
|
||||
inline trx_purge_rec_t get_next_rec(roll_ptr_t roll_ptr);
|
||||
|
||||
/** Choose the next undo log to purge.
|
||||
@return whether anything is to be purged */
|
||||
bool choose_next_log();
|
||||
|
||||
/** Update the last not yet purged history log info in rseg when
|
||||
we have purged a whole undo log. Advances also purge_trx_no
|
||||
past the purged log. */
|
||||
void rseg_get_next_history_log();
|
||||
|
||||
public:
|
||||
/**
|
||||
Fetch the next undo log record from the history list to purge.
|
||||
@return buffer-fixed reference to undo log record
|
||||
@retval {nullptr,1} if the whole undo log can skipped in purge
|
||||
@retval {nullptr,0} if nothing is left, or on corruption */
|
||||
inline trx_purge_rec_t fetch_next_rec();
|
||||
|
||||
/** Determine if the history of a transaction is purgeable.
|
||||
@param trx_id transaction identifier
|
||||
@return whether the history is purgeable */
|
||||
@@ -327,9 +374,10 @@ public:
|
||||
/** Wake up the purge threads if there is work to do. */
|
||||
void wake_if_not_active();
|
||||
|
||||
/** Update end_view at the end of a purge batch.
|
||||
@param head the new head of the purge queue */
|
||||
inline void clone_end_view(const iterator &head);
|
||||
/** Release undo pages and update end_view at the end of a purge batch.
|
||||
@retval false when nothing is to be purged
|
||||
@retval true when purge_sys.rseg->latch was locked */
|
||||
inline void batch_cleanup(const iterator &head);
|
||||
|
||||
struct view_guard
|
||||
{
|
||||
|
@@ -28,32 +28,9 @@ Created 3/26/1996 Heikki Tuuri
|
||||
|
||||
#include "trx0types.h"
|
||||
#include "row0types.h"
|
||||
#include "mtr0mtr.h"
|
||||
#include "rem0types.h"
|
||||
#include "page0types.h"
|
||||
#include "row0log.h"
|
||||
#include "que0types.h"
|
||||
|
||||
/***********************************************************************//**
|
||||
Copies the undo record to the heap.
|
||||
@param undo_rec record in an undo log page
|
||||
@param heap memory heap
|
||||
@return copy of undo_rec
|
||||
@retval nullptr if the undo log record is corrupted */
|
||||
inline trx_undo_rec_t* trx_undo_rec_copy(const trx_undo_rec_t *undo_rec,
|
||||
mem_heap_t *heap)
|
||||
{
|
||||
const size_t offset= ut_align_offset(undo_rec, srv_page_size);
|
||||
const size_t end= mach_read_from_2(undo_rec);
|
||||
if (end <= offset || end >= srv_page_size - FIL_PAGE_DATA_END)
|
||||
return nullptr;
|
||||
const size_t len= end - offset;
|
||||
trx_undo_rec_t *rec= static_cast<trx_undo_rec_t*>
|
||||
(mem_heap_dup(heap, undo_rec, len));
|
||||
mach_write_to_2(rec, len);
|
||||
return rec;
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
Reads the undo log record number.
|
||||
@return undo no */
|
||||
|
@@ -107,6 +107,15 @@ typedef byte trx_undo_rec_t;
|
||||
|
||||
/* @} */
|
||||
|
||||
/** Info required to purge a record */
|
||||
struct trx_purge_rec_t
|
||||
{
|
||||
/** Undo log record, or nullptr (roll_ptr!=0 if the log can be skipped) */
|
||||
const trx_undo_rec_t *undo_rec;
|
||||
/** File pointer to undo_rec */
|
||||
roll_ptr_t roll_ptr;
|
||||
};
|
||||
|
||||
typedef std::vector<trx_id_t, ut_allocator<trx_id_t> > trx_ids_t;
|
||||
|
||||
/** Number of std::unordered_map hash buckets expected to be needed
|
||||
|
@@ -116,31 +116,16 @@ trx_undo_page_get_next_rec(const buf_block_t *undo_page, uint16_t rec,
|
||||
trx_undo_rec_t*
|
||||
trx_undo_get_prev_rec(buf_block_t *&block, uint16_t rec, uint32_t page_no,
|
||||
uint16_t offset, bool shared, mtr_t *mtr);
|
||||
/** Get the next record in an undo log.
|
||||
@param[in,out] block undo log page
|
||||
@param[in] rec undo record offset in the page
|
||||
@param[in] page_no undo log header page number
|
||||
@param[in] offset undo log header offset on page
|
||||
@param[in,out] mtr mini-transaction
|
||||
@return undo log record, the page latched, NULL if none */
|
||||
trx_undo_rec_t*
|
||||
trx_undo_get_next_rec(const buf_block_t *&block, uint16_t rec,
|
||||
uint32_t page_no, uint16_t offset, mtr_t *mtr);
|
||||
|
||||
/** Get the first record in an undo log.
|
||||
@param[in] space undo log header space
|
||||
/** Get the first undo log record on a page.
|
||||
@param[in] block undo log page
|
||||
@param[in] page_no undo log header page number
|
||||
@param[in] offset undo log header offset on page
|
||||
@param[in] mode latching mode: RW_S_LATCH or RW_X_LATCH
|
||||
@param[out] block undo log page
|
||||
@param[in,out] mtr mini-transaction
|
||||
@param[out] err error code
|
||||
@return undo log record, the page latched
|
||||
@retval nullptr if none */
|
||||
@param[in] offset undo log header page offset
|
||||
@return pointer to first record
|
||||
@retval nullptr if none exists */
|
||||
trx_undo_rec_t*
|
||||
trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
|
||||
uint16_t offset, ulint mode, const buf_block_t*& block,
|
||||
mtr_t *mtr, dberr_t *err);
|
||||
trx_undo_page_get_first_rec(const buf_block_t *block, uint32_t page_no,
|
||||
uint16_t offset);
|
||||
|
||||
/** Initialize an undo log page.
|
||||
NOTE: This corresponds to a redo log record and must not be changed!
|
||||
|
@@ -166,7 +166,7 @@ purge_graph_build()
|
||||
void purge_sys_t::create()
|
||||
{
|
||||
ut_ad(this == &purge_sys);
|
||||
ut_ad(!heap);
|
||||
ut_ad(!m_initialized);
|
||||
ut_ad(!enabled());
|
||||
m_paused= 0;
|
||||
query= purge_graph_build();
|
||||
@@ -181,18 +181,18 @@ void purge_sys_t::create()
|
||||
mysql_mutex_init(purge_sys_pq_mutex_key, &pq_mutex, nullptr);
|
||||
truncate.current= NULL;
|
||||
truncate.last= NULL;
|
||||
heap= mem_heap_create(4096);
|
||||
m_initialized= true;
|
||||
}
|
||||
|
||||
/** Close the purge subsystem on shutdown. */
|
||||
void purge_sys_t::close()
|
||||
{
|
||||
ut_ad(this == &purge_sys);
|
||||
if (!heap)
|
||||
if (!m_initialized)
|
||||
return;
|
||||
|
||||
ut_ad(!enabled());
|
||||
trx_t* trx = query->trx;
|
||||
trx_t *trx= query->trx;
|
||||
que_graph_free(query);
|
||||
ut_ad(!trx->id);
|
||||
ut_ad(trx->state == TRX_STATE_ACTIVE);
|
||||
@@ -201,8 +201,7 @@ void purge_sys_t::close()
|
||||
latch.destroy();
|
||||
end_latch.destroy();
|
||||
mysql_mutex_destroy(&pq_mutex);
|
||||
mem_heap_free(heap);
|
||||
heap= nullptr;
|
||||
m_initialized= false;
|
||||
}
|
||||
|
||||
/** Determine if the history of a transaction is purgeable.
|
||||
@@ -825,35 +824,45 @@ not_free:
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Updates the last not yet purged history log info in rseg when we have purged
|
||||
a whole undo log. Advances also purge_sys.purge_trx_no past the purged log.
|
||||
buf_block_t *purge_sys_t::get_page(page_id_t id)
|
||||
{
|
||||
buf_block_t*& undo_page= pages[id];
|
||||
|
||||
@param n_pages_handled number of UNDO pages handled */
|
||||
static void trx_purge_rseg_get_next_history_log(ulint *n_pages_handled)
|
||||
if (undo_page)
|
||||
return undo_page;
|
||||
|
||||
mtr_t mtr;
|
||||
mtr.start();
|
||||
undo_page=
|
||||
buf_page_get_gen(id, 0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr);
|
||||
|
||||
if (UNIV_LIKELY(undo_page != nullptr))
|
||||
{
|
||||
undo_page->fix();
|
||||
mtr.commit();
|
||||
return undo_page;
|
||||
}
|
||||
|
||||
mtr.commit();
|
||||
pages.erase(id);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void purge_sys_t::rseg_get_next_history_log()
|
||||
{
|
||||
fil_addr_t prev_log_addr;
|
||||
mtr_t mtr;
|
||||
|
||||
mtr.start();
|
||||
ut_ad(rseg->latch.is_write_locked());
|
||||
ut_a(rseg->last_page_no != FIL_NULL);
|
||||
|
||||
ut_ad(purge_sys.rseg->latch.is_write_locked());
|
||||
ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
|
||||
tail.trx_no= rseg->last_trx_no() + 1;
|
||||
tail.undo_no= 0;
|
||||
next_stored= false;
|
||||
|
||||
purge_sys.tail.trx_no= purge_sys.rseg->last_trx_no() + 1;
|
||||
purge_sys.tail.undo_no= 0;
|
||||
purge_sys.next_stored= false;
|
||||
|
||||
if (const buf_block_t* undo_page=
|
||||
buf_page_get_gen(page_id_t(purge_sys.rseg->space->id,
|
||||
purge_sys.rseg->last_page_no),
|
||||
0, RW_S_LATCH, nullptr,
|
||||
BUF_GET_POSSIBLY_FREED, &mtr))
|
||||
if (buf_block_t *undo_page=
|
||||
get_page(page_id_t(rseg->space->id, rseg->last_page_no)))
|
||||
{
|
||||
const trx_ulogf_t *log_hdr=
|
||||
undo_page->page.frame + purge_sys.rseg->last_offset();
|
||||
/* Increase the purge page count by one for every handled log */
|
||||
++*n_pages_handled;
|
||||
const byte *log_hdr= undo_page->page.frame + rseg->last_offset();
|
||||
prev_log_addr= flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE);
|
||||
prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset -
|
||||
TRX_UNDO_HISTORY_NODE);
|
||||
@@ -861,242 +870,204 @@ static void trx_purge_rseg_get_next_history_log(ulint *n_pages_handled)
|
||||
else
|
||||
prev_log_addr.page= FIL_NULL;
|
||||
|
||||
mtr.commit();
|
||||
|
||||
if (prev_log_addr.page == FIL_NULL)
|
||||
purge_sys.rseg->last_page_no= FIL_NULL;
|
||||
rseg->last_page_no= FIL_NULL;
|
||||
else
|
||||
{
|
||||
/* Read the previous log header. */
|
||||
mtr.start();
|
||||
|
||||
trx_id_t trx_no= 0;
|
||||
if (const buf_block_t* undo_page=
|
||||
buf_page_get_gen(page_id_t(purge_sys.rseg->space->id,
|
||||
prev_log_addr.page),
|
||||
0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr))
|
||||
get_page(page_id_t(rseg->space->id,
|
||||
prev_log_addr.page)))
|
||||
{
|
||||
const byte *log_hdr= undo_page->page.frame + prev_log_addr.boffset;
|
||||
trx_no= mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
|
||||
ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1);
|
||||
}
|
||||
|
||||
mtr.commit();
|
||||
|
||||
if (UNIV_LIKELY(trx_no != 0))
|
||||
{
|
||||
purge_sys.rseg->last_page_no= prev_log_addr.page;
|
||||
purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no);
|
||||
rseg->last_page_no= prev_log_addr.page;
|
||||
rseg->set_last_commit(prev_log_addr.boffset, trx_no);
|
||||
|
||||
/* Purge can also produce events, however these are already
|
||||
ordered in the rollback segment and any user generated event
|
||||
will be greater than the events that Purge produces. ie. Purge
|
||||
can never produce events from an empty rollback segment. */
|
||||
|
||||
mysql_mutex_lock(&purge_sys.pq_mutex);
|
||||
purge_sys.purge_queue.push(*purge_sys.rseg);
|
||||
mysql_mutex_unlock(&purge_sys.pq_mutex);
|
||||
mysql_mutex_lock(&pq_mutex);
|
||||
purge_queue.push(*rseg);
|
||||
mysql_mutex_unlock(&pq_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
purge_sys.rseg->latch.wr_unlock();
|
||||
rseg->latch.wr_unlock();
|
||||
}
|
||||
|
||||
/** Position the purge sys "iterator" on the undo record to use for purging. */
|
||||
static void trx_purge_read_undo_rec()
|
||||
{
|
||||
uint16_t offset;
|
||||
uint32_t page_no;
|
||||
ib_uint64_t undo_no;
|
||||
|
||||
purge_sys.hdr_offset = purge_sys.rseg->last_offset();
|
||||
page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
|
||||
|
||||
if (purge_sys.rseg->needs_purge) {
|
||||
mtr_t mtr;
|
||||
mtr.start();
|
||||
const buf_block_t* undo_page;
|
||||
if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec(
|
||||
*purge_sys.rseg->space, purge_sys.hdr_page_no,
|
||||
purge_sys.hdr_offset, RW_S_LATCH,
|
||||
undo_page, &mtr, nullptr)) {
|
||||
|
||||
offset = page_offset(undo_rec);
|
||||
undo_no = trx_undo_rec_get_undo_no(undo_rec);
|
||||
page_no = undo_page->page.id().page_no();
|
||||
} else {
|
||||
offset = 0;
|
||||
undo_no = 0;
|
||||
}
|
||||
|
||||
mtr.commit();
|
||||
} else {
|
||||
offset = 0;
|
||||
undo_no = 0;
|
||||
}
|
||||
|
||||
purge_sys.offset = offset;
|
||||
purge_sys.page_no = page_no;
|
||||
purge_sys.tail.undo_no = undo_no;
|
||||
|
||||
purge_sys.next_stored = true;
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Chooses the next undo log to purge and updates the info in purge_sys. This
|
||||
function is used to initialize purge_sys when the next record to purge is
|
||||
not known, and also to update the purge system info on the next record when
|
||||
purge has handled the whole undo log for a transaction.
|
||||
/** Position the purge sys "iterator" on the undo record to use for purging.
|
||||
@retval false when nothing is to be purged
|
||||
@retval true when purge_sys.rseg->latch was locked */
|
||||
static bool trx_purge_choose_next_log()
|
||||
bool purge_sys_t::choose_next_log()
|
||||
{
|
||||
if (purge_sys.rseg_iter.set_next()) {
|
||||
trx_purge_read_undo_rec();
|
||||
return true;
|
||||
} else {
|
||||
if (!rseg_iter.set_next())
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************************//**
|
||||
Gets the next record to purge and updates the info in the purge system.
|
||||
@return copy of an undo log record
|
||||
@retval -1 if there is nothing to purge
|
||||
@retval nullptr on corruption */
|
||||
static
|
||||
trx_undo_rec_t*
|
||||
trx_purge_get_next_rec(
|
||||
/*===================*/
|
||||
ulint* n_pages_handled,/*!< in/out: number of UNDO pages
|
||||
handled */
|
||||
mem_heap_t* heap) /*!< in: memory heap where copied */
|
||||
{
|
||||
mtr_t mtr;
|
||||
hdr_offset= rseg->last_offset();
|
||||
hdr_page_no= rseg->last_page_no;
|
||||
|
||||
ut_ad(purge_sys.next_stored);
|
||||
ut_ad(purge_sys.tail.trx_no < purge_sys.low_limit_no());
|
||||
ut_ad(purge_sys.rseg->latch.is_write_locked());
|
||||
|
||||
const page_id_t page_id{purge_sys.rseg->space->id, purge_sys.page_no};
|
||||
const uint16_t offset = purge_sys.offset;
|
||||
bool locked = true;
|
||||
|
||||
if (offset == 0) {
|
||||
/* It is the dummy undo log record, which means that there is
|
||||
no need to purge this undo log */
|
||||
|
||||
trx_purge_rseg_get_next_history_log(n_pages_handled);
|
||||
|
||||
/* Look for the next undo log and record to purge */
|
||||
|
||||
if (trx_purge_choose_next_log()) {
|
||||
purge_sys.rseg->latch.wr_unlock();
|
||||
}
|
||||
return reinterpret_cast<trx_undo_rec_t*>(-1);
|
||||
}
|
||||
|
||||
mtr.start();
|
||||
trx_undo_rec_t *rec_copy = nullptr;
|
||||
const buf_block_t* undo_page
|
||||
= buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr,
|
||||
BUF_GET_POSSIBLY_FREED, &mtr);
|
||||
if (UNIV_UNLIKELY(!undo_page)) {
|
||||
func_exit:
|
||||
if (locked) {
|
||||
purge_sys.rseg->latch.wr_unlock();
|
||||
}
|
||||
mtr.commit();
|
||||
return rec_copy;
|
||||
}
|
||||
|
||||
const buf_block_t* rec2_page = undo_page;
|
||||
|
||||
const trx_undo_rec_t* rec2 = trx_undo_page_get_next_rec(
|
||||
undo_page, offset, purge_sys.hdr_page_no, purge_sys.hdr_offset);
|
||||
|
||||
if (rec2 == NULL) {
|
||||
rec2 = trx_undo_get_next_rec(rec2_page, offset,
|
||||
purge_sys.hdr_page_no,
|
||||
purge_sys.hdr_offset, &mtr);
|
||||
}
|
||||
|
||||
if (rec2 == NULL) {
|
||||
mtr_commit(&mtr);
|
||||
|
||||
trx_purge_rseg_get_next_history_log(n_pages_handled);
|
||||
|
||||
/* Look for the next undo log and record to purge */
|
||||
|
||||
locked = trx_purge_choose_next_log();
|
||||
|
||||
mtr_start(&mtr);
|
||||
|
||||
undo_page = buf_page_get_gen(page_id, 0, RW_S_LATCH,
|
||||
nullptr, BUF_GET_POSSIBLY_FREED,
|
||||
&mtr);
|
||||
if (UNIV_UNLIKELY(!undo_page)) {
|
||||
goto func_exit;
|
||||
}
|
||||
} else {
|
||||
purge_sys.offset = page_offset(rec2);
|
||||
purge_sys.page_no = rec2_page->page.id().page_no();
|
||||
purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2);
|
||||
|
||||
if (undo_page != rec2_page) {
|
||||
/* We advance to a new page of the undo log: */
|
||||
(*n_pages_handled)++;
|
||||
}
|
||||
}
|
||||
|
||||
rec_copy = trx_undo_rec_copy(undo_page->page.frame + offset, heap);
|
||||
goto func_exit;
|
||||
}
|
||||
|
||||
/********************************************************************//**
|
||||
Fetches the next undo log record from the history list to purge. It must be
|
||||
released with the corresponding release function.
|
||||
@return copy of an undo log record
|
||||
@retval -1 if the whole undo log can skipped in purge
|
||||
@retval nullptr if nothing is left, or on corruption */
|
||||
static MY_ATTRIBUTE((warn_unused_result))
|
||||
trx_undo_rec_t*
|
||||
trx_purge_fetch_next_rec(
|
||||
/*=====================*/
|
||||
roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */
|
||||
ulint* n_pages_handled,/*!< in/out: number of UNDO log pages
|
||||
handled */
|
||||
mem_heap_t* heap) /*!< in: memory heap where copied */
|
||||
{
|
||||
if (!purge_sys.next_stored)
|
||||
if (!rseg->needs_purge)
|
||||
{
|
||||
bool locked= trx_purge_choose_next_log();
|
||||
ut_ad(locked == purge_sys.next_stored);
|
||||
purge_nothing:
|
||||
page_no= hdr_page_no;
|
||||
offset= 0;
|
||||
tail.undo_no= 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
page_id_t id{rseg->space->id, hdr_page_no};
|
||||
buf_block_t *b= get_page(id);
|
||||
if (!b)
|
||||
goto purge_nothing;
|
||||
const trx_undo_rec_t *undo_rec=
|
||||
trx_undo_page_get_first_rec(b, hdr_page_no, hdr_offset);
|
||||
if (!undo_rec)
|
||||
{
|
||||
if (mach_read_from_2(b->page.frame + hdr_offset + TRX_UNDO_NEXT_LOG))
|
||||
goto purge_nothing;
|
||||
const uint32_t next=
|
||||
mach_read_from_4(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE +
|
||||
FLST_NEXT + FIL_ADDR_PAGE + b->page.frame);
|
||||
if (next == FIL_NULL)
|
||||
goto purge_nothing;
|
||||
id.set_page_no(next);
|
||||
b= get_page(id);
|
||||
if (!b)
|
||||
goto purge_nothing;
|
||||
undo_rec=
|
||||
trx_undo_page_get_first_rec(b, page_no, hdr_offset);
|
||||
if (!undo_rec)
|
||||
goto purge_nothing;
|
||||
}
|
||||
|
||||
offset= page_offset(undo_rec);
|
||||
tail.undo_no= trx_undo_rec_get_undo_no(undo_rec);
|
||||
page_no= id.page_no();
|
||||
}
|
||||
|
||||
next_stored= true;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
Get the next record to purge and update the info in the purge system.
|
||||
@param roll_ptr undo log pointer to the record
|
||||
@return buffer-fixed reference to undo log record
|
||||
@retval {nullptr,1} if the whole undo log can skipped in purge
|
||||
@retval {nullptr,0} if nothing is left, or on corruption */
|
||||
inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr)
|
||||
{
|
||||
ut_ad(next_stored);
|
||||
ut_ad(tail.trx_no < low_limit_no());
|
||||
ut_ad(rseg->latch.is_write_locked());
|
||||
|
||||
if (!offset)
|
||||
{
|
||||
/* It is the dummy undo log record, which means that there is no
|
||||
need to purge this undo log */
|
||||
rseg_get_next_history_log();
|
||||
|
||||
/* Look for the next undo log and record to purge */
|
||||
if (choose_next_log())
|
||||
rseg->latch.wr_unlock();
|
||||
return {nullptr, 1};
|
||||
}
|
||||
|
||||
ut_ad(offset == uint16_t(roll_ptr));
|
||||
|
||||
page_id_t page_id{rseg->space->id, page_no};
|
||||
bool locked= true;
|
||||
buf_block_t *b= get_page(page_id);
|
||||
if (UNIV_UNLIKELY(!b))
|
||||
{
|
||||
if (locked)
|
||||
rseg->latch.wr_unlock();
|
||||
return {nullptr, 0};
|
||||
}
|
||||
|
||||
if (const trx_undo_rec_t *rec2=
|
||||
trx_undo_page_get_next_rec(b, offset, hdr_page_no, hdr_offset))
|
||||
{
|
||||
got_rec:
|
||||
ut_ad(page_no == page_id.page_no());
|
||||
offset= page_offset(rec2);
|
||||
tail.undo_no= trx_undo_rec_get_undo_no(rec2);
|
||||
}
|
||||
else if (hdr_page_no != page_no ||
|
||||
!mach_read_from_2(b->page.frame + hdr_offset + TRX_UNDO_NEXT_LOG))
|
||||
{
|
||||
uint32_t next= mach_read_from_4(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE +
|
||||
FLST_NEXT + FIL_ADDR_PAGE + b->page.frame);
|
||||
if (next != FIL_NULL)
|
||||
{
|
||||
page_id.set_page_no(next);
|
||||
if (buf_block_t *next_page= get_page(page_id))
|
||||
{
|
||||
rec2= trx_undo_page_get_first_rec(next_page, hdr_page_no, hdr_offset);
|
||||
if (rec2)
|
||||
{
|
||||
page_no= next;
|
||||
goto got_rec;
|
||||
}
|
||||
}
|
||||
}
|
||||
goto got_no_rec;
|
||||
}
|
||||
else
|
||||
{
|
||||
got_no_rec:
|
||||
rseg_get_next_history_log();
|
||||
/* Look for the next undo log and record to purge */
|
||||
locked= choose_next_log();
|
||||
}
|
||||
|
||||
if (locked)
|
||||
rseg->latch.wr_unlock();
|
||||
|
||||
return {b->page.frame + uint16_t(roll_ptr), roll_ptr};
|
||||
}
|
||||
|
||||
inline trx_purge_rec_t purge_sys_t::fetch_next_rec()
|
||||
{
|
||||
roll_ptr_t roll_ptr;
|
||||
|
||||
if (!next_stored)
|
||||
{
|
||||
bool locked= choose_next_log();
|
||||
ut_ad(locked == next_stored);
|
||||
if (!locked)
|
||||
return nullptr;
|
||||
if (purge_sys.tail.trx_no >= purge_sys.low_limit_no())
|
||||
goto got_nothing;
|
||||
if (tail.trx_no >= low_limit_no())
|
||||
{
|
||||
purge_sys.rseg->latch.wr_unlock();
|
||||
return nullptr;
|
||||
rseg->latch.wr_unlock();
|
||||
goto got_nothing;
|
||||
}
|
||||
/* row_purge_record_func() will later set ROLL_PTR_INSERT_FLAG for
|
||||
TRX_UNDO_INSERT_REC */
|
||||
*roll_ptr= trx_undo_build_roll_ptr(false,
|
||||
trx_sys.rseg_id(purge_sys.rseg, true),
|
||||
purge_sys.page_no, purge_sys.offset);
|
||||
roll_ptr= trx_undo_build_roll_ptr(false, trx_sys.rseg_id(rseg, true),
|
||||
page_no, offset);
|
||||
}
|
||||
else if (purge_sys.tail.trx_no >= purge_sys.low_limit_no())
|
||||
return nullptr;
|
||||
else if (tail.trx_no >= low_limit_no())
|
||||
got_nothing:
|
||||
return {nullptr, 0};
|
||||
else
|
||||
{
|
||||
*roll_ptr= trx_undo_build_roll_ptr(false,
|
||||
trx_sys.rseg_id(purge_sys.rseg, true),
|
||||
purge_sys.page_no, purge_sys.offset);
|
||||
purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL);
|
||||
roll_ptr= trx_undo_build_roll_ptr(false, trx_sys.rseg_id(rseg, true),
|
||||
page_no, offset);
|
||||
rseg->latch.wr_lock(SRW_LOCK_CALL);
|
||||
}
|
||||
|
||||
/* The following will advance the purge iterator. */
|
||||
return trx_purge_get_next_rec(n_pages_handled, heap);
|
||||
return get_next_rec(roll_ptr);
|
||||
}
|
||||
|
||||
/** Close all tables that were opened in a purge batch for a worker.
|
||||
@@ -1267,13 +1238,12 @@ dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd,
|
||||
|
||||
/** Run a purge batch.
|
||||
@param n_purge_threads number of purge threads
|
||||
@return new purge_sys.head and the number of undo log pages handled */
|
||||
static std::pair<purge_sys_t::iterator,ulint>
|
||||
@return new purge_sys.head */
|
||||
static purge_sys_t::iterator
|
||||
trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
|
||||
{
|
||||
que_thr_t* thr;
|
||||
ulint i;
|
||||
ulint n_pages_handled = 0;
|
||||
|
||||
ut_a(n_purge_threads > 0);
|
||||
ut_a(UT_LIST_GET_LEN(purge_sys.query->thrs) >= n_purge_threads);
|
||||
@@ -1313,16 +1283,16 @@ trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
|
||||
|
||||
std::unordered_map<table_id_t, purge_node_t*>
|
||||
table_id_map(TRX_PURGE_TABLE_BUCKETS);
|
||||
mem_heap_empty(purge_sys.heap);
|
||||
purge_sys.m_active = true;
|
||||
|
||||
MDL_context* const mdl_context
|
||||
= static_cast<MDL_context*>(thd_mdl_context(thd));
|
||||
ut_ad(mdl_context);
|
||||
|
||||
while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
|
||||
trx_purge_rec_t purge_rec;
|
||||
const size_t max_pages = std::min(buf_pool.curr_size * 3 / 4,
|
||||
size_t{srv_purge_batch_size});
|
||||
|
||||
while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
|
||||
/* Track the max {trx_id, undo_no} for truncating the
|
||||
UNDO logs once we have purged the records. */
|
||||
|
||||
@@ -1331,14 +1301,13 @@ trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
|
||||
}
|
||||
|
||||
/* Fetch the next record, and advance the purge_sys.tail. */
|
||||
purge_rec.undo_rec = trx_purge_fetch_next_rec(
|
||||
&purge_rec.roll_ptr, &n_pages_handled,
|
||||
purge_sys.heap);
|
||||
trx_purge_rec_t purge_rec = purge_sys.fetch_next_rec();
|
||||
|
||||
if (!purge_rec.undo_rec) {
|
||||
if (!purge_rec.roll_ptr) {
|
||||
break;
|
||||
} else if (purge_rec.undo_rec
|
||||
== reinterpret_cast<trx_undo_rec_t*>(-1)) {
|
||||
}
|
||||
ut_ad(purge_rec.roll_ptr == 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1376,7 +1345,7 @@ enqueue:
|
||||
table_node->undo_recs.push(purge_rec);
|
||||
}
|
||||
|
||||
if (n_pages_handled >= srv_purge_batch_size) {
|
||||
if (purge_sys.n_pages_handled() >= max_pages) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1385,7 +1354,7 @@ enqueue:
|
||||
|
||||
ut_ad(head <= purge_sys.tail);
|
||||
|
||||
return std::make_pair(head, n_pages_handled);
|
||||
return head;
|
||||
}
|
||||
|
||||
extern tpool::waitable_task purge_worker_task;
|
||||
@@ -1408,10 +1377,15 @@ static void trx_purge_wait_for_workers_to_complete()
|
||||
ut_ad(srv_get_task_queue_length() == 0);
|
||||
}
|
||||
|
||||
/** Update end_view at the end of a purge batch. */
|
||||
TRANSACTIONAL_INLINE
|
||||
void purge_sys_t::clone_end_view(const purge_sys_t::iterator &head)
|
||||
void purge_sys_t::batch_cleanup(const purge_sys_t::iterator &head)
|
||||
{
|
||||
/* Release the undo pages. */
|
||||
for (auto p : pages)
|
||||
p.second->unfix();
|
||||
pages.clear();
|
||||
pages.reserve(srv_purge_batch_size);
|
||||
|
||||
/* This is only invoked only by the purge coordinator,
|
||||
which is the only thread that can modify our inputs head, tail, view.
|
||||
Therefore, we only need to protect end_view from concurrent reads. */
|
||||
@@ -1451,10 +1425,12 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
|
||||
THD* const thd = current_thd;
|
||||
|
||||
/* Fetch the UNDO recs that need to be purged. */
|
||||
const auto n = trx_purge_attach_undo_recs(n_tasks, thd);
|
||||
const purge_sys_t::iterator head
|
||||
= trx_purge_attach_undo_recs(n_tasks, thd);
|
||||
const size_t n_pages = purge_sys.n_pages_handled();
|
||||
|
||||
{
|
||||
ulint delay = n.second ? srv_max_purge_lag : 0;
|
||||
ulint delay = n_pages ? srv_max_purge_lag : 0;
|
||||
if (UNIV_UNLIKELY(delay)) {
|
||||
if (delay >= history_size) {
|
||||
no_throttle:
|
||||
@@ -1494,10 +1470,10 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
|
||||
node->tables.clear();
|
||||
}
|
||||
|
||||
purge_sys.clone_end_view(n.first);
|
||||
purge_sys.batch_cleanup(head);
|
||||
|
||||
MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
|
||||
MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n.second);
|
||||
MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages);
|
||||
|
||||
return n.second;
|
||||
return n_pages;
|
||||
}
|
||||
|
@@ -2061,12 +2061,23 @@ trx_undo_get_undo_rec_low(
|
||||
|
||||
mtr.start();
|
||||
|
||||
const buf_block_t* undo_page=
|
||||
buf_page_get(page_id_t(rseg->space->id, page_no), 0, RW_S_LATCH, &mtr);
|
||||
|
||||
trx_undo_rec_t *undo_rec= undo_page
|
||||
? trx_undo_rec_copy(undo_page->page.frame + offset, heap)
|
||||
: nullptr;
|
||||
trx_undo_rec_t *undo_rec= nullptr;
|
||||
if (const buf_block_t* undo_page=
|
||||
buf_page_get(page_id_t(rseg->space->id, page_no), 0, RW_S_LATCH, &mtr))
|
||||
{
|
||||
undo_rec= undo_page->page.frame + offset;
|
||||
const size_t end= mach_read_from_2(undo_rec);
|
||||
if (UNIV_UNLIKELY(end <= offset ||
|
||||
end >= srv_page_size - FIL_PAGE_DATA_END))
|
||||
undo_rec= nullptr;
|
||||
else
|
||||
{
|
||||
size_t len{end - offset};
|
||||
undo_rec=
|
||||
static_cast<trx_undo_rec_t*>(mem_heap_dup(heap, undo_rec, len));
|
||||
mach_write_to_2(undo_rec, len);
|
||||
}
|
||||
}
|
||||
|
||||
mtr.commit();
|
||||
return undo_rec;
|
||||
|
@@ -128,8 +128,8 @@ uint16_t trx_undo_page_get_start(const buf_block_t *block, uint32_t page_no,
|
||||
@param[in] page_no undo log header page number
|
||||
@param[in] offset undo log header page offset
|
||||
@return pointer to first record
|
||||
@retval NULL if none exists */
|
||||
static trx_undo_rec_t*
|
||||
@retval nullptr if none exists */
|
||||
trx_undo_rec_t*
|
||||
trx_undo_page_get_first_rec(const buf_block_t *block, uint32_t page_no,
|
||||
uint16_t offset)
|
||||
{
|
||||
@@ -253,25 +253,6 @@ trx_undo_get_next_rec_from_next_page(const buf_block_t *&block,
|
||||
return block ? trx_undo_page_get_first_rec(block, page_no, offset) : nullptr;
|
||||
}
|
||||
|
||||
/** Get the next record in an undo log.
|
||||
@param[in,out] block undo log page
|
||||
@param[in] rec undo record offset in the page
|
||||
@param[in] page_no undo log header page number
|
||||
@param[in] offset undo log header offset on page
|
||||
@param[in,out] mtr mini-transaction
|
||||
@return undo log record, the page latched, NULL if none */
|
||||
trx_undo_rec_t*
|
||||
trx_undo_get_next_rec(const buf_block_t *&block, uint16_t rec,
|
||||
uint32_t page_no, uint16_t offset, mtr_t *mtr)
|
||||
{
|
||||
if (trx_undo_rec_t *next= trx_undo_page_get_next_rec(block, rec, page_no,
|
||||
offset))
|
||||
return next;
|
||||
|
||||
return trx_undo_get_next_rec_from_next_page(block, page_no, offset,
|
||||
RW_S_LATCH, mtr);
|
||||
}
|
||||
|
||||
/** Get the first record in an undo log.
|
||||
@param[in] space undo log header space
|
||||
@param[in] page_no undo log header page number
|
||||
@@ -282,7 +263,7 @@ trx_undo_get_next_rec(const buf_block_t *&block, uint16_t rec,
|
||||
@param[out] err error code
|
||||
@return undo log record, the page latched
|
||||
@retval nullptr if none */
|
||||
trx_undo_rec_t*
|
||||
static trx_undo_rec_t*
|
||||
trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
|
||||
uint16_t offset, ulint mode, const buf_block_t*& block,
|
||||
mtr_t *mtr, dberr_t *err)
|
||||
|
Reference in New Issue
Block a user