From a5cf3a800e20e86a4469dff659e68cc1b21263e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Lindstr=C3=B6m?= Date: Fri, 7 Feb 2014 15:31:31 +0200 Subject: [PATCH] Merged latest mt-flush code to xtradb. Cleaned up thread statistic output code. --- storage/innobase/buf/buf0mtflu.cc | 116 ++--- storage/xtradb/CMakeLists.txt | 3 +- storage/xtradb/buf/buf0flu.cc | 228 +--------- storage/xtradb/buf/buf0mtflu.cc | 694 +++++++++++++++++++++++++++++ storage/xtradb/include/buf0flu.h | 57 +++ storage/xtradb/include/buf0mtflu.h | 95 ++++ storage/xtradb/include/srv0srv.h | 2 +- storage/xtradb/srv/srv0srv.cc | 2 + storage/xtradb/srv/srv0start.cc | 431 +----------------- 9 files changed, 910 insertions(+), 718 deletions(-) create mode 100644 storage/xtradb/buf/buf0mtflu.cc create mode 100644 storage/xtradb/include/buf0mtflu.h diff --git a/storage/innobase/buf/buf0mtflu.cc b/storage/innobase/buf/buf0mtflu.cc index 901f766c472..a81ccee5650 100644 --- a/storage/innobase/buf/buf0mtflu.cc +++ b/storage/innobase/buf/buf0mtflu.cc @@ -116,18 +116,13 @@ typedef struct wrk_itm /* Thread syncronization data */ typedef struct thread_sync { + ulint n_threads; /*!< Number of threads */ os_thread_id_t wthread_id; /*!< Identifier */ os_thread_t wthread; /*!< Thread id */ ib_wqueue_t *wq; /*!< Work Queue */ ib_wqueue_t *wr_cq; /*!< Write Completion Queue */ ib_wqueue_t *rd_cq; /*!< Read Completion Queue */ wthr_status_t wt_status; /*!< Worker thread status */ - ulint stat_universal_num_processed; - /*!< Total number of pages - processed by this thread */ - ulint stat_cycle_num_processed; - /*!< Number of pages processed - on this cycle */ mem_heap_t* wheap; /*!< Work heap where memory is allocated */ wrk_t* work_item; /*!< Work items to be processed */ @@ -231,6 +226,7 @@ buf_mtflu_flush_pool_instance( work_item->wr.min, work_item->wr.lsn_limit); + buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); buf_flush_common(work_item->wr.flush_type, work_item->n_flushed); @@ -239,28 +235,29 @@ buf_mtflu_flush_pool_instance( #ifdef UNIV_DEBUG /******************************************************************//** -Output work item list status, +Print flush statistics of work items. */ static void -mtflu_print_work_list( -/*==================*/ - wrk_t* wi_list) /*!< in: Work item list */ +mtflu_print_thread_stat( +/*====================*/ + wrk_t* work_item) /*!< in: Work items */ { - wrk_t* wi = wi_list; + ulint stat_tot=0; ulint i=0; - if(!wi_list) { - fprintf(stderr, "list NULL\n"); - } + for(i=0; i< MTFLUSH_MAX_WORKER; i++) { + stat_tot+=work_item[i].n_flushed; - while(wi) { - fprintf(stderr, "-\t[%p]\t[%s]\t[%lu] > %p\n", - wi, (wi->id_usr == -1)?"free":"Busy", wi->n_flushed, wi->next); - wi = wi->next; - i++; - } - fprintf(stderr, "list len: %d\n", i); + fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n", + work_item[i].id_usr, + work_item[i].n_flushed); + + if (work_item[i].next == NULL) { + break; /* No more filled work items */ + } + } + fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", stat_tot); } #endif /* UNIV_DEBUG */ @@ -282,10 +279,6 @@ mtflush_service_io( mtflush_io->wt_status = WTHR_SIG_WAITING; work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs); -#ifdef UNIV_DEBUG - mtflu_print_work_list(mtflush_io->work_item); -#endif - if (work_item) { mtflush_io->wt_status = WTHR_RUNNING; } else { @@ -345,10 +338,28 @@ DECLARE_THREAD(mtflush_io_thread)( void * arg) { thread_sync_t *mtflush_io = ((thread_sync_t *)arg); +#ifdef UNIV_DEBUG + ib_uint64_t stat_universal_num_processed = 0; + ib_uint64_t stat_cycle_num_processed = 0; + wrk_t* work_item = mtflush_io[0].work_item; + ulint i; +#endif while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { mtflush_service_io(mtflush_io); - mtflush_io->stat_cycle_num_processed = 0; + +#ifdef UNIV_DEBUG + for(i=0; i < MTFLUSH_MAX_WORKER; i++) { + stat_cycle_num_processed+= work_item[i].n_flushed; + } + + stat_universal_num_processed+=stat_cycle_num_processed; + stat_cycle_num_processed = 0; + fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n", + stat_universal_num_processed, + stat_cycle_num_processed); + mtflu_print_thread_stat(work_item); +#endif } /* This should make sure that all current work items are @@ -458,13 +469,16 @@ buf_mtflu_handler_init( work_items = (wrk_t*)mem_heap_alloc(mtflush_heap, MTFLUSH_MAX_WORKER * sizeof(wrk_t)); ut_a(work_items != NULL); + memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER); + memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER); /* Initialize work items */ - mtflu_setup_work_items(work_items, MTFLUSH_MAX_WORKER); + mtflu_setup_work_items(work_items, n_threads); /* Create threads for page-compression-flush */ for(i=0; i < n_threads; i++) { os_thread_id_t new_thread_id; + mtflush_ctx[i].n_threads = n_threads; mtflush_ctx[i].wq = mtflush_work_queue; mtflush_ctx[i].wr_cq = mtflush_write_comp_queue; mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; @@ -531,19 +545,16 @@ buf_mtflu_flush_work_items( per_pool_pages_flushed[i] = done_wi->n_flushed; } - if(done_wi->id_usr == -1 && + if((int)done_wi->id_usr == -1 && done_wi->wi_status == WRK_ITEM_SET ) { fprintf(stderr, - "**Set/Unused work_item[%d] flush_type=%lu\n", + "**Set/Unused work_item[%lu] flush_type=%lu\n", i, done_wi->wr.flush_type); ut_a(0); } n_flushed+= done_wi->n_flushed; - /* Reset for next round*/ - mtflush_ctx->work_item[i].id_usr = -1; - i++; } } @@ -551,47 +562,6 @@ buf_mtflu_flush_work_items( return(n_flushed); } -/*******************************************************************//** -Flushes dirty blocks from the end of the LRU list and also -puts replaceable clean pages from the end of the LRU list to the free -list. -NOTE: The calling thread is not allowed to own any latches on pages! -@return true if a batch was queued successfully. false if another batch -of same type was already running. */ -bool -buf_mtflu_flush_LRU( -/*================*/ - buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */ - ulint min_n, /*!< in: wished minimum mumber of blocks - flushed (it is not guaranteed that the - actual number is that big, though) */ - ulint* n_processed) /*!< out: the number of pages - which were processed is passed - back to caller. Ignored if NULL */ -{ - ulint page_count; - - if (n_processed) { - *n_processed = 0; - } - - if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) { - return(false); - } - - page_count = buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0); - - buf_flush_end(buf_pool, BUF_FLUSH_LRU); - - buf_flush_common(BUF_FLUSH_LRU, page_count); - - if (n_processed) { - *n_processed = page_count; - } - - return(true); -} - /*******************************************************************//** Multi-threaded version of buf_flush_list */ diff --git a/storage/xtradb/CMakeLists.txt b/storage/xtradb/CMakeLists.txt index 5050ca34da9..14fbb14bdd7 100644 --- a/storage/xtradb/CMakeLists.txt +++ b/storage/xtradb/CMakeLists.txt @@ -284,8 +284,7 @@ SET(INNOBASE_SOURCES buf/buf0flu.cc buf/buf0lru.cc buf/buf0rea.cc -# TODO: JAN uncomment -# buf/buf0mtflu.cc + buf/buf0mtflu.cc data/data0data.cc data/data0type.cc dict/dict0boot.cc diff --git a/storage/xtradb/buf/buf0flu.cc b/storage/xtradb/buf/buf0flu.cc index 8ed11fd674a..a080ef0ee48 100644 --- a/storage/xtradb/buf/buf0flu.cc +++ b/storage/xtradb/buf/buf0flu.cc @@ -32,6 +32,7 @@ Created 11/11/1995 Heikki Tuuri #endif #include "buf0buf.h" +#include "buf0mtflu.h" #include "buf0checksum.h" #include "srv0start.h" #include "srv0srv.h" @@ -1949,47 +1950,6 @@ void buf_pool_exit_LRU_mutex( mutex_exit(&buf_pool->LRU_list_mutex); } -/*******************************************************************//** -This utility flushes dirty blocks from the end of the LRU list and also -puts replaceable clean pages from the end of the LRU list to the free -list. -NOTE: The calling thread is not allowed to own any latches on pages! -@return true if a batch was queued successfully. false if another batch -of same type was already running. */ -static -bool -pgcomp_buf_flush_LRU( -/*==========*/ - buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */ - ulint min_n, /*!< in: wished minimum mumber of blocks - flushed (it is not guaranteed that the - actual number is that big, though) */ - ulint* n_processed) /*!< out: the number of pages - which were processed is passed - back to caller. Ignored if NULL */ -{ - flush_counters_t n; - - if (n_processed) { - *n_processed = 0; - } - - if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) { - return(false); - } - - buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0, false, &n); - - buf_flush_end(buf_pool, BUF_FLUSH_LRU); - - buf_flush_common(BUF_FLUSH_LRU, n.flushed); - - if (n_processed) { - *n_processed = n.flushed; - } - - return(true); -} /* JAN: TODO: END: */ /*******************************************************************//** @@ -2029,126 +1989,6 @@ buf_flush_LRU( return(true); } -/* JAN: TODO: */ -/*******************************************************************//**/ -extern int is_pgcomp_wrk_init_done(void); -extern int pgcomp_flush_work_items( - int buf_pool_inst, - int *pages_flushed, - buf_flush_t flush_type, - int min_n, - lsn_t lsn_limit); - -#define MT_COMP_WATER_MARK 50 - -#ifdef UNIV_DEBUG -#include -int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time) -{ - if (g_time->tv_usec < s_time->tv_usec) - { - int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1; - s_time->tv_usec -= 1000000 * nsec; - s_time->tv_sec += nsec; - } - if (g_time->tv_usec - s_time->tv_usec > 1000000) - { - int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000; - s_time->tv_usec += 1000000 * nsec; - s_time->tv_sec -= nsec; - } - d_time->tv_sec = g_time->tv_sec - s_time->tv_sec; - d_time->tv_usec = g_time->tv_usec - s_time->tv_usec; - - return 0; -} -#endif - -static os_fast_mutex_t pgcomp_mtx; - -void pgcomp_init(void) -{ - os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &pgcomp_mtx); -} - -void pgcomp_deinit(void) -{ - os_fast_mutex_free(&pgcomp_mtx); -} - -/*******************************************************************//** -Multi-threaded version of buf_flush_list -*/ -UNIV_INTERN -bool -pgcomp_buf_flush_list( -/*==================*/ - ulint min_n, /*!< in: wished minimum mumber of blocks - flushed (it is not guaranteed that the - actual number is that big, though) */ - lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all - blocks whose oldest_modification is - smaller than this should be flushed - (if their number does not exceed - min_n), otherwise ignored */ - ulint* n_processed) /*!< out: the number of pages - which were processed is passed - back to caller. Ignored if NULL */ - -{ - ulint i; - bool success = true; -#ifdef UNIV_DEBUG - struct timeval p_start_time, p_end_time, d_time; -#endif - int cnt_flush[MTFLUSH_MAX_WORKER]; - - if (n_processed) { - *n_processed = 0; - } - - if (min_n != ULINT_MAX) { - /* Ensure that flushing is spread evenly amongst the - buffer pool instances. When min_n is ULINT_MAX - we need to flush everything up to the lsn limit - so no limit here. */ - min_n = (min_n + srv_buf_pool_instances - 1) - / srv_buf_pool_instances; - } - -#ifdef UNIV_DEBUG - gettimeofday(&p_start_time, 0x0); -#endif - // os_fast_mutex_lock(&pgcomp_mtx); - pgcomp_flush_work_items(srv_buf_pool_instances, - cnt_flush, BUF_FLUSH_LIST, - min_n, lsn_limit); - // os_fast_mutex_unlock(&pgcomp_mtx); - - for (i = 0; i < srv_buf_pool_instances; i++) { - if (n_processed) { - *n_processed += cnt_flush[i]; - } - if (cnt_flush[i]) { - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_FLUSH_BATCH_TOTAL_PAGE, - MONITOR_FLUSH_BATCH_COUNT, - MONITOR_FLUSH_BATCH_PAGES, - cnt_flush[i]); - } - } -#ifdef UNIV_DEBUG - gettimeofday(&p_end_time, 0x0); - timediff(&p_end_time, &p_start_time, &d_time); - fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu %llu usec]\n", - __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed, - (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); -#endif - return(success); -} - -/* JAN: TODO: END: */ - /*******************************************************************//** This utility flushes dirty blocks from the end of the flush list of all buffer pool instances. @@ -2181,11 +2021,9 @@ buf_flush_list( bool timeout = false; ulint flush_start_time = 0; - /* JAN: TODO: */ - if (is_pgcomp_wrk_init_done()) { - return(pgcomp_buf_flush_list(min_n, lsn_limit, n_processed)); + if (buf_mtflu_init_done()) { + return(buf_mtflu_flush_list(min_n, lsn_limit, n_processed)); } - /* JAN: TODO: END: */ for (i = 0; i < srv_buf_pool_instances; i++) { requested_pages[i] = 0; @@ -2380,60 +2218,6 @@ buf_flush_single_page_from_LRU( return(freed); } -/* JAN: TODO: */ -/*********************************************************************//** -pgcomp_Clears up tail of the LRU lists: -* Put replaceable pages at the tail of LRU to the free list -* Flush dirty pages at the tail of LRU to the disk -The depth to which we scan each buffer pool is controlled by dynamic -config parameter innodb_LRU_scan_depth. -@return total pages flushed */ -UNIV_INTERN -ulint -pgcomp_buf_flush_LRU_tail(void) -/*====================*/ -{ -#ifdef UNIV_DEBUG - struct timeval p_start_time, p_end_time, d_time; -#endif - ulint total_flushed=0, i=0; - int cnt_flush[32]; - -#ifdef UNIV_DEBUG - gettimeofday(&p_start_time, 0x0); -#endif - ut_ad(is_pgcomp_wrk_init_done()); - - os_fast_mutex_lock(&pgcomp_mtx); - pgcomp_flush_work_items(srv_buf_pool_instances, - cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); - os_fast_mutex_unlock(&pgcomp_mtx); - - for (i = 0; i < srv_buf_pool_instances; i++) { - if (cnt_flush[i]) { - total_flushed += cnt_flush[i]; - - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_LRU_BATCH_TOTAL_PAGE, - MONITOR_LRU_BATCH_COUNT, - MONITOR_LRU_BATCH_PAGES, - cnt_flush[i]); - } - } - -#if UNIV_DEBUG - gettimeofday(&p_end_time, 0x0); - timediff(&p_end_time, &p_start_time, &d_time); - - fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( - srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed, - (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); -#endif - - return(total_flushed); -} - -/* JAN: TODO: END: */ /*********************************************************************//** Clears up tail of the LRU lists: * Put replaceable pages at the tail of LRU to the free list @@ -2458,12 +2242,10 @@ buf_flush_LRU_tail(void) ulint free_list_lwm = srv_LRU_scan_depth / 100 * srv_cleaner_free_list_lwm; - /* JAN: TODO: */ - if(is_pgcomp_wrk_init_done()) + if(buf_mtflu_init_done()) { - return(pgcomp_buf_flush_LRU_tail()); + return(buf_mtflu_flush_LRU_tail()); } - /* JAN: TODO: END */ for (ulint i = 0; i < srv_buf_pool_instances; i++) { diff --git a/storage/xtradb/buf/buf0mtflu.cc b/storage/xtradb/buf/buf0mtflu.cc new file mode 100644 index 00000000000..14ece48519f --- /dev/null +++ b/storage/xtradb/buf/buf0mtflu.cc @@ -0,0 +1,694 @@ +/***************************************************************************** + +Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved. +Copyright (C) 2013, 2014, SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file buf/buf0mtflu.cc +Multi-threaded flush method implementation + +Created 06/11/2013 Dhananjoy Das DDas@fusionio.com +Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com +Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com +Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +#include "buf0buf.h" +#include "buf0flu.h" +#include "buf0mtflu.h" +#include "buf0checksum.h" +#include "srv0start.h" +#include "srv0srv.h" +#include "page0zip.h" +#include "ut0byte.h" +#include "ut0lst.h" +#include "page0page.h" +#include "fil0fil.h" +#include "buf0lru.h" +#include "buf0rea.h" +#include "ibuf0ibuf.h" +#include "log0log.h" +#include "os0file.h" +#include "os0sync.h" +#include "trx0sys.h" +#include "srv0mon.h" +#include "mysql/plugin.h" +#include "mysql/service_thd_wait.h" +#include "fil0pagecompress.h" + +#define MT_COMP_WATER_MARK 50 + +/* Work item status */ +typedef enum wrk_status { + WRK_ITEM_SET=0, /*!< Work item is set */ + WRK_ITEM_START=1, /*!< Processing of work item has started */ + WRK_ITEM_DONE=2, /*!< Processing is done usually set to + SUCCESS/FAILED */ + WRK_ITEM_SUCCESS=2, /*!< Work item successfully processed */ + WRK_ITEM_FAILED=3, /*!< Work item process failed */ + WRK_ITEM_EXIT=4, /*!< Exiting */ + WRK_ITEM_STATUS_UNDEFINED +} wrk_status_t; + +/* Work item task type */ +typedef enum mt_wrk_tsk { + MT_WRK_NONE=0, /*!< Exit queue-wait */ + MT_WRK_WRITE=1, /*!< Flush operation */ + MT_WRK_READ=2, /*!< Read operation */ + MT_WRK_UNDEFINED +} mt_wrk_tsk_t; + +/* Work thread status */ +typedef enum wthr_status { + WTHR_NOT_INIT=0, /*!< Work thread not initialized */ + WTHR_INITIALIZED=1, /*!< Work thread initialized */ + WTHR_SIG_WAITING=2, /*!< Work thread wating signal */ + WTHR_RUNNING=3, /*!< Work thread running */ + WTHR_NO_WORK=4, /*!< Work thread has no work */ + WTHR_KILL_IT=5, /*!< Work thread should exit */ + WTHR_STATUS_UNDEFINED +} wthr_status_t; + +/* Write work task */ +typedef struct wr_tsk { + buf_pool_t *buf_pool; /*!< buffer-pool instance */ + buf_flush_t flush_type; /*!< flush-type for buffer-pool + flush operation */ + ulint min; /*!< minimum number of pages + requested to be flushed */ + lsn_t lsn_limit; /*!< lsn limit for the buffer-pool + flush operation */ +} wr_tsk_t; + +/* Read work task */ +typedef struct rd_tsk { + buf_pool_t *page_pool; /*!< list of pages to decompress; */ +} rd_tsk_t; + +/* Work item */ +typedef struct wrk_itm +{ + mt_wrk_tsk_t tsk; /*!< Task type. Based on task-type + one of the entries wr_tsk/rd_tsk + will be used */ + wr_tsk_t wr; /*!< Flush page list */ + rd_tsk_t rd; /*!< Decompress page list */ + ulint n_flushed; /*!< Flushed pages count */ + os_thread_t id_usr; /*!< Thread-id currently working */ + wrk_status_t wi_status; /*!< Work item status */ + struct wrk_itm *next; /*!< Next work item */ +} wrk_t; + +/* Thread syncronization data */ +typedef struct thread_sync +{ + ulint n_threads; /*!< Number of threads */ + os_thread_id_t wthread_id; /*!< Identifier */ + os_thread_t wthread; /*!< Thread id */ + ib_wqueue_t *wq; /*!< Work Queue */ + ib_wqueue_t *wr_cq; /*!< Write Completion Queue */ + ib_wqueue_t *rd_cq; /*!< Read Completion Queue */ + wthr_status_t wt_status; /*!< Worker thread status */ + mem_heap_t* wheap; /*!< Work heap where memory + is allocated */ + wrk_t* work_item; /*!< Work items to be processed */ +} thread_sync_t; + +/* QUESTION: Is this array used from several threads concurrently ? */ +// static wrk_t work_items[MTFLUSH_MAX_WORKER]; + +/* TODO: REALLY NEEDED ? */ +static int mtflush_work_initialized = -1; +static os_fast_mutex_t mtflush_mtx; +static thread_sync_t* mtflush_ctx=NULL; + +/******************************************************************//** +Initialize work items. */ +static +void +mtflu_setup_work_items( +/*===================*/ + wrk_t* work_items, /*!< inout: Work items */ + ulint n_items) /*!< in: Number of work items */ +{ + ulint i; + for(i=0; iwr.buf_pool != NULL); + + if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ +#ifdef UNIV_DEBUG + /* QUESTION: is this a really failure ? */ + fprintf(stderr, "flush_start Failed, flush_type:%d\n", + work_item->wr.flush_type); +#endif + return 0; + } + + + if (work_item->wr.flush_type == BUF_FLUSH_LRU) { + /* srv_LRU_scan_depth can be arbitrarily large value. + * We cap it with current LRU size. + */ + buf_pool_mutex_enter(work_item->wr.buf_pool); + work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU); + buf_pool_mutex_exit(work_item->wr.buf_pool); + work_item->wr.min = ut_min(srv_LRU_scan_depth,work_item->wr.min); + } + + buf_flush_batch(work_item->wr.buf_pool, + work_item->wr.flush_type, + work_item->wr.min, + work_item->wr.lsn_limit, + false, + &n); + + work_item->n_flushed = n.flushed; + + buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); + buf_flush_common(work_item->wr.flush_type, work_item->n_flushed); + + return 0; +} + +#ifdef UNIV_DEBUG +/******************************************************************//** +Print flush statistics of work items +*/ +static +void +mtflu_print_thread_stat( +/*====================*/ + wrk_t* work_item) /*!< in: Work items */ +{ + ulint stat_tot=0; + ulint i=0; + + for(i=0; i< MTFLUSH_MAX_WORKER; i++) { + stat_tot+=work_item[i].n_flushed; + + fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n", + work_item[i].id_usr, + work_item[i].n_flushed); + + if (work_item[i].next == NULL) { + break; /* No more filled work items */ + } + } + + fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", stat_tot); +} +#endif /* UNIV_DEBUG */ + +/******************************************************************//** +Worker function to wait for work items and processing them and +sending reply back. +*/ +static +void +mtflush_service_io( +/*===============*/ + thread_sync_t* mtflush_io) /*!< inout: multi-threaded flush + syncronization data */ +{ + wrk_t *work_item = NULL; + ulint n_flushed=0; + ib_time_t max_wait_usecs = 5000000; + + mtflush_io->wt_status = WTHR_SIG_WAITING; + work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs); + + if (work_item) { + mtflush_io->wt_status = WTHR_RUNNING; + } else { + /* Because of timeout this thread did not get any work */ + mtflush_io->wt_status = WTHR_NO_WORK; + return; + } + + work_item->id_usr = mtflush_io->wthread; + + switch(work_item->tsk) { + case MT_WRK_NONE: + ut_a(work_item->wi_status == WRK_ITEM_EXIT); + work_item->wi_status = WRK_ITEM_SUCCESS; + /* QUESTION: Why completed work items are inserted to + completion queue ? */ + ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); + break; + + case MT_WRK_WRITE: + work_item->wi_status = WRK_ITEM_START; + /* Process work item */ + /* QUESTION: Is this a really a error ? */ + if (0 != (n_flushed = buf_mtflu_flush_pool_instance(work_item))) { + fprintf(stderr, "FLUSH op failed ret:%lu\n", n_flushed); + work_item->wi_status = WRK_ITEM_FAILED; + } + work_item->wi_status = WRK_ITEM_SUCCESS; + ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); + break; + + case MT_WRK_READ: + /* Need to also handle the read case */ + /* TODO: ? */ + ut_a(0); + /* completed task get added to rd_cq */ + /* work_item->wi_status = WRK_ITEM_SUCCESS; + ib_wqueue_add(mtflush_io->rd_cq, work_item, mtflush_io->wheap);*/ + break; + + default: + /* None other than Write/Read handling planned */ + ut_a(0); + } + + mtflush_io->wt_status = WTHR_NO_WORK; +} + +/******************************************************************//** +Thead used to flush dirty pages when multi-threaded flush is +used. +@return a dummy parameter*/ +extern "C" UNIV_INTERN +os_thread_ret_t +DECLARE_THREAD(mtflush_io_thread)( +/*==============================*/ + void * arg) +{ + thread_sync_t *mtflush_io = ((thread_sync_t *)arg); +#ifdef UNIV_DEBUG + ib_uint64_t stat_universal_num_processed = 0; + ib_uint64_t stat_cycle_num_processed = 0; + wrk_t* work_item = mtflush_io[0].work_item; + ulint i; +#endif + + while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { + mtflush_service_io(mtflush_io); + +#ifdef UNIV_DEBUG + for(i=0; i < MTFLUSH_MAX_WORKER; i++) { + stat_cycle_num_processed+= work_item[i].n_flushed; + } + + stat_universal_num_processed+=stat_cycle_num_processed; + stat_cycle_num_processed = 0; + fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n", + stat_universal_num_processed, + stat_cycle_num_processed); + mtflu_print_thread_stat(work_item); +#endif + } + + /* This should make sure that all current work items are + processed before threads exit. */ + while (!ib_wqueue_is_empty(mtflush_io->wq)) { + mtflush_service_io(mtflush_io); + } + + os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; +} + +/******************************************************************//** +Add exit work item to work queue to signal multi-threded flush +threads that they should exit. +*/ +void +buf_mtflu_io_thread_exit(void) +/*==========================*/ +{ + ulint i; + thread_sync_t* mtflush_io = mtflush_ctx; + + ut_a(mtflush_io != NULL); + + fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n", + srv_buf_pool_instances); + + /* Send one exit work item/thread */ + for (i=0; i < srv_buf_pool_instances; i++) { + mtflush_io->work_item[i].wr.buf_pool = NULL; + mtflush_io->work_item[i].rd.page_pool = NULL; + mtflush_io->work_item[i].tsk = MT_WRK_NONE; + mtflush_io->work_item[i].wi_status = WRK_ITEM_EXIT; + + ib_wqueue_add(mtflush_io->wq, + (void *)&(mtflush_io->work_item[i]), + mtflush_io->wheap); + } + + /* Wait until all work items on a work queue are processed */ + while(!ib_wqueue_is_empty(mtflush_io->wq)) { + /* Wait about 1/2 sec */ + os_thread_sleep(50000); + } + + ut_a(ib_wqueue_is_empty(mtflush_io->wq)); + + /* Collect all work done items */ + for (i=0; i < srv_buf_pool_instances;) { + wrk_t* work_item; + + work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000); + + if (work_item) { + i++; + } + } + + ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); + ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq)); + + /* Free all queues */ + ib_wqueue_free(mtflush_io->wq); + ib_wqueue_free(mtflush_io->wr_cq); + ib_wqueue_free(mtflush_io->rd_cq); + + /* Free heap */ + mem_heap_free(mtflush_io->wheap); + + os_fast_mutex_free(&mtflush_mtx); +} + +/******************************************************************//** +Initialize multi-threaded flush thread syncronization data. +@return Initialized multi-threaded flush thread syncroniztion data. */ +void* +buf_mtflu_handler_init( +/*===================*/ + ulint n_threads, /*!< in: Number of threads to create */ + ulint wrk_cnt) /*!< in: Number of work items */ +{ + ulint i; + mem_heap_t* mtflush_heap; + ib_wqueue_t* mtflush_work_queue; + ib_wqueue_t* mtflush_write_comp_queue; + ib_wqueue_t* mtflush_read_comp_queue; + wrk_t* work_items; + + os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx); + + /* Create heap, work queue, write completion queue, read + completion queue for multi-threaded flush, and init + handler. */ + mtflush_heap = mem_heap_create(0); + ut_a(mtflush_heap != NULL); + mtflush_work_queue = ib_wqueue_create(); + ut_a(mtflush_work_queue != NULL); + mtflush_write_comp_queue = ib_wqueue_create(); + ut_a(mtflush_write_comp_queue != NULL); + mtflush_read_comp_queue = ib_wqueue_create(); + ut_a(mtflush_read_comp_queue != NULL); + + mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap, + MTFLUSH_MAX_WORKER * sizeof(thread_sync_t)); + ut_a(mtflush_ctx != NULL); + work_items = (wrk_t*)mem_heap_alloc(mtflush_heap, + MTFLUSH_MAX_WORKER * sizeof(wrk_t)); + ut_a(work_items != NULL); + memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER); + memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER); + + /* Initialize work items */ + mtflu_setup_work_items(work_items, n_threads); + + /* Create threads for page-compression-flush */ + for(i=0; i < n_threads; i++) { + os_thread_id_t new_thread_id; + mtflush_ctx[i].n_threads = n_threads; + mtflush_ctx[i].wq = mtflush_work_queue; + mtflush_ctx[i].wr_cq = mtflush_write_comp_queue; + mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; + mtflush_ctx[i].wheap = mtflush_heap; + mtflush_ctx[i].wt_status = WTHR_INITIALIZED; + mtflush_ctx[i].work_item = work_items; + + mtflush_ctx[i].wthread = os_thread_create( + mtflush_io_thread, + ((void *)(mtflush_ctx + i)), + &new_thread_id); + + mtflush_ctx[i].wthread_id = new_thread_id; + } + + buf_mtflu_work_init(); + + return((void *)mtflush_ctx); +} + +/******************************************************************//** +Flush buffer pool instances. +@return number of pages flushed. */ +ulint +buf_mtflu_flush_work_items( +/*=======================*/ + ulint buf_pool_inst, /*!< in: Number of buffer pool instances */ + ulint *per_pool_pages_flushed, /*!< out: Number of pages + flushed/instance */ + buf_flush_t flush_type, /*!< in: Type of flush */ + ulint min_n, /*!< in: Wished minimum number of + blocks to be flushed */ + lsn_t lsn_limit) /*!< in: All blocks whose + oldest_modification is smaller than + this should be flushed (if their + number does not exceed min_n) */ +{ + ulint n_flushed=0, i; + wrk_t *done_wi; + + for(i=0;iwork_item[i].tsk = MT_WRK_WRITE; + mtflush_ctx->work_item[i].rd.page_pool = NULL; + mtflush_ctx->work_item[i].wr.buf_pool = buf_pool_from_array(i); + mtflush_ctx->work_item[i].wr.flush_type = flush_type; + mtflush_ctx->work_item[i].wr.min = min_n; + mtflush_ctx->work_item[i].wr.lsn_limit = lsn_limit; + mtflush_ctx->work_item[i].id_usr = -1; + mtflush_ctx->work_item[i].wi_status = WRK_ITEM_SET; + + ib_wqueue_add(mtflush_ctx->wq, + (void *)(&(mtflush_ctx->work_item[i])), + mtflush_ctx->wheap); + } + + /* wait on the completion to arrive */ + for(i=0; i< buf_pool_inst;) { + done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000); + + if (done_wi != NULL) { + if(done_wi->n_flushed == 0) { + per_pool_pages_flushed[i] = 0; + } else { + per_pool_pages_flushed[i] = done_wi->n_flushed; + } + + if((int)done_wi->id_usr == -1 && + done_wi->wi_status == WRK_ITEM_SET ) { + fprintf(stderr, + "**Set/Unused work_item[%lu] flush_type=%lu\n", + i, + done_wi->wr.flush_type); + ut_a(0); + } + + n_flushed+= done_wi->n_flushed; + i++; + } + } + + return(n_flushed); +} + +/*******************************************************************//** +Multi-threaded version of buf_flush_list +*/ +bool +buf_mtflu_flush_list( +/*=================*/ + ulint min_n, /*!< in: wished minimum mumber of blocks + flushed (it is not guaranteed that the + actual number is that big, though) */ + lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all + blocks whose oldest_modification is + smaller than this should be flushed + (if their number does not exceed + min_n), otherwise ignored */ + ulint* n_processed) /*!< out: the number of pages + which were processed is passed + back to caller. Ignored if NULL */ + +{ + ulint i; + bool success = true; + ulint cnt_flush[MTFLUSH_MAX_WORKER]; + + if (n_processed) { + *n_processed = 0; + } + + if (min_n != ULINT_MAX) { + /* Ensure that flushing is spread evenly amongst the + buffer pool instances. When min_n is ULINT_MAX + we need to flush everything up to the lsn limit + so no limit here. */ + min_n = (min_n + srv_buf_pool_instances - 1) + / srv_buf_pool_instances; + } + + /* QUESTION: What is procted by below mutex ? */ + os_fast_mutex_lock(&mtflush_mtx); + buf_mtflu_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LIST, + min_n, lsn_limit); + os_fast_mutex_unlock(&mtflush_mtx); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (n_processed) { + *n_processed += cnt_flush[i]; + } + if (cnt_flush[i]) { + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_FLUSH_BATCH_TOTAL_PAGE, + MONITOR_FLUSH_BATCH_COUNT, + MONITOR_FLUSH_BATCH_PAGES, + cnt_flush[i]); + } + } +#ifdef UNIV_DEBUG + fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n", + __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed); +#endif + return(success); +} + +/*********************************************************************//** +Clears up tail of the LRU lists: +* Put replaceable pages at the tail of LRU to the free list +* Flush dirty pages at the tail of LRU to the disk +The depth to which we scan each buffer pool is controlled by dynamic +config parameter innodb_LRU_scan_depth. +@return total pages flushed */ +UNIV_INTERN +ulint +buf_mtflu_flush_LRU_tail(void) +/*==========================*/ +{ + ulint total_flushed=0, i; + ulint cnt_flush[MTFLUSH_MAX_WORKER]; + + ut_a(buf_mtflu_init_done()); + + /* QUESTION: What is protected by below mutex ? */ + os_fast_mutex_lock(&mtflush_mtx); + buf_mtflu_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); + os_fast_mutex_unlock(&mtflush_mtx); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (cnt_flush[i]) { + total_flushed += cnt_flush[i]; + + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_LRU_BATCH_TOTAL_PAGE, + MONITOR_LRU_BATCH_COUNT, + MONITOR_LRU_BATCH_PAGES, + cnt_flush[i]); + } + } + +#if UNIV_DEBUG + fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n", ( + srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed); +#endif + + return(total_flushed); +} + +/*********************************************************************//** +Set correct thread identifiers to io thread array based on +information we have. */ +void +buf_mtflu_set_thread_ids( +/*=====================*/ + ulint n_threads, /*!wr.buf_pool) { - fprintf(stderr, "work-item wi->buf_pool:%p [likely thread exit]\n", - wi->wr.buf_pool); - return -1; - } - - wi->t_usec = 0; - if (!buf_flush_start(wi->wr.buf_pool, wi->wr.flush_type)) { - /* We have two choices here. If lsn_limit was - specified then skipping an instance of buffer - pool means we cannot guarantee that all pages - up to lsn_limit has been flushed. We can - return right now with failure or we can try - to flush remaining buffer pools up to the - lsn_limit. We attempt to flush other buffer - pools based on the assumption that it will - help in the retry which will follow the - failure. */ - fprintf(stderr, "flush_start Failed, flush_type:%d\n", - wi->wr.flush_type); - return -1; - } - -#ifdef UNIV_DEBUG - /* Record time taken for the OP in usec */ - gettimeofday(&p_start_time, 0x0); -#endif - - if (wi->wr.flush_type == BUF_FLUSH_LRU) { - /* srv_LRU_scan_depth can be arbitrarily large value. - * We cap it with current LRU size. - */ - buf_pool_enter_LRU_mutex(wi->wr.buf_pool); - wi->wr.min = UT_LIST_GET_LEN(wi->wr.buf_pool->LRU); - buf_pool_exit_LRU_mutex(wi->wr.buf_pool); - wi->wr.min = ut_min(srv_LRU_scan_depth,wi->wr.min); - } - - wi->result = buf_flush_batch(wi->wr.buf_pool, - wi->wr.flush_type, - wi->wr.min, wi->wr.lsn_limit, - false, &n); - - buf_flush_end(wi->wr.buf_pool, wi->wr.flush_type); - buf_flush_common(wi->wr.flush_type, wi->result); - -#ifdef UNIV_DEBUG - gettimeofday(&p_end_time, 0x0); - timediff(&p_end_time, &p_start_time, &d_time); - wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); -#endif - - return 0; -} - -int service_page_comp_io(thread_sync_t * ppc) -{ - wrk_t *wi = NULL; - int ret=0; - - ppc->wt_status = WTHR_SIG_WAITING; - wi = (wrk_t *)ib_wqueue_wait(ppc->wq); - - if (wi) { - ppc->wt_status = WTHR_RUNNING; - } else { - fprintf(stderr, "%s:%d work-item is NULL\n", __FILE__, __LINE__); - ppc->wt_status = WTHR_NO_WORK; - return (0); - } - - assert(wi != NULL); - wi->id_usr = ppc->wthread; - - switch(wi->tsk) { - case MT_WRK_NONE: - assert(wi->wi_status == WRK_ITEM_EXIT); - wi->wi_status = WRK_ITEM_SUCCESS; - ib_wqueue_add(ppc->wr_cq, wi, heap_allocated); - break; - - case MT_WRK_WRITE: - wi->wi_status = WRK_ITEM_START; - /* Process work item */ - if (0 != (ret = flush_pool_instance(wi))) { - fprintf(stderr, "FLUSH op failed ret:%d\n", ret); - wi->wi_status = WRK_ITEM_FAILED; - } - wi->wi_status = WRK_ITEM_SUCCESS; - ib_wqueue_add(ppc->wr_cq, wi, heap_allocated); - break; - - case MT_WRK_READ: - /* Need to also handle the read case */ - assert(0); - /* completed task get added to rd_cq */ - /* wi->wi_status = WRK_ITEM_SUCCESS; - ib_wqueue_add(ppc->rd_cq, wi, heap_allocated);*/ - break; - - default: - /* None other than Write/Read handling planned */ - assert(0); - } - - ppc->wt_status = WTHR_NO_WORK; - return(0); -} - -void page_comp_io_thread_exit() -{ - ulint i; - - fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n", srv_buf_pool_instances); - for (i=0; istat_cycle_num_processed = 0; - } - os_thread_exit(NULL); - OS_THREAD_DUMMY_RETURN; -} - -int print_wrk_list(wrk_t *wi_list) -{ - wrk_t *wi = wi_list; - int i=0; - - if(!wi_list) { - fprintf(stderr, "list NULL\n"); - } - - while(wi) { - fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n", - wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next); - wi = wi->next; - i++; - } - fprintf(stderr, "list len: %d\n", i); - return 0; -} - -/******************************************************************//** -@return a dummy parameter*/ -int pgcomp_handler_init(int num_threads, int wrk_cnt, ib_wqueue_t *wq, ib_wqueue_t *wr_cq, ib_wqueue_t *rd_cq) -{ - int i=0; - - if(is_pgcomp_wrk_init_done()) { - fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n"); - return -1; - } - - if(!wq || !wr_cq || !rd_cq) { - fprintf(stderr, "%s() FAILED wq:%p write-cq:%p read-cq:%p\n", - __FUNCTION__, wq, wr_cq, rd_cq); - return -1; - } - - /* work-item setup */ - setup_wrk_itm(wrk_cnt); - - /* Mark each of the thread sync entires */ - for(i=0; i < MTFLUSH_MAX_WORKER; i++) { - pc_sync[i].wthread_id = i; - } - - /* Create threads for page-compression-flush */ - for(i=0; i < num_threads; i++) { - pc_sync[i].wthread_id = i; - pc_sync[i].wq = wq; - pc_sync[i].wr_cq = wr_cq; - pc_sync[i].rd_cq = rd_cq; - - os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), - thread_ids + START_OLD_THREAD_CNT + i); - pc_sync[i].wthread = (START_OLD_THREAD_CNT + i); - pc_sync[i].wt_status = WTHR_INITIALIZED; - } - set_pgcomp_wrk_init_done(); - fprintf(stderr, "%s() Worker-Threads created..\n", __FUNCTION__); - return 0; -} - -int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads) -{ - ulong stat_tot=0; - ulint i=0; - for(i=0; i