diff --git a/sql/handler.cc b/sql/handler.cc index cbe32dd4529..4b1938fe569 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2808,7 +2808,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, int ha_init_pagecache(const char *name, PAGECACHE *pagecache) { - DBUG_ENTER("ha_init_key_cache"); + DBUG_ENTER("ha_init_pagecache"); if (!pagecache->inited) { diff --git a/storage/maria/Makefile.am b/storage/maria/Makefile.am index 2d11d2f470b..6e15b1df056 100644 --- a/storage/maria/Makefile.am +++ b/storage/maria/Makefile.am @@ -30,8 +30,8 @@ DEFS = @DEFS@ # "." is needed first because tests in unittest need libmaria SUBDIRS = . unittest -EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in -pkgdata_DATA = ma_test_all ma_test_all.res +EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in ma_test_recovery +pkgdata_DATA = ma_test_all ma_test_all.res ma_test_recovery pkglib_LIBRARIES = libmaria.a bin_PROGRAMS = maria_chk maria_pack maria_ftdump maria_read_log maria_chk_DEPENDENCIES= $(LIBRARIES) @@ -61,7 +61,7 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \ ma_ft_eval.h trnman.h lockman.h tablockman.h \ ma_control_file.h ha_maria.h ma_blockrec.h \ ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \ - ma_commit.h + ma_recovery.h ma_commit.h ma_test1_DEPENDENCIES= $(LIBRARIES) ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \ $(top_builddir)/storage/myisam/libmyisam.a \ @@ -120,7 +120,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \ ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \ ma_sp_key.c ma_control_file.c ma_loghandler.c \ ma_pagecache.c ma_pagecaches.c \ - ma_commit.c + ma_recovery.c ma_commit.c CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? SUFFIXES = .sh diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index da701feda10..8a2b8ad99ac 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -37,6 +37,15 @@ #define trans_register_ha(A, B, C) do { /* nothing */ } while(0) #endif +/** + @todo For now there is no way for a user to set a different value of + maria_recover_options, i.e. auto-check-and-repair is always disabled. + We could enable it. As the auto-repair is initiated when opened from the + SQL layer (open_unireg_entry(), check_and_repair()), it does not happen + when Maria's Recovery internally opens the table to apply log records to + it, which is good. It would happen only after Recovery, if the table is + still corrupted. +*/ ulong maria_recover_options= HA_RECOVER_NONE; static handlerton *maria_hton; @@ -1867,6 +1876,10 @@ int ha_maria::external_lock(THD *thd, int lock_type) corresponding unlock (they just stay locked and are later dropped while locked); if a tmp table was transactional, "SELECT FROM non_tmp, tmp" would never commit as its "locked_tables" count would stay 1. + When Maria has has_transactions()==TRUE, open_temporary_table() + (sql_base.cc) will use TRANSACTIONAL_TMP_TABLE and thus the + external_lock(F_UNLCK) will happen and we can then allow the user to + create transactional temporary tables. */ if (!file->s->base.born_transactional) goto skip_transaction; diff --git a/storage/maria/ma_bitmap.c b/storage/maria/ma_bitmap.c index 9b2741037c5..6bb4d3c95f3 100644 --- a/storage/maria/ma_bitmap.c +++ b/storage/maria/ma_bitmap.c @@ -130,6 +130,7 @@ #define FULL_HEAD_PAGE 4 #define FULL_TAIL_PAGE 7 +/** all bitmap pages end with this 2-byte signature */ uchar maria_bitmap_marker[2]= {(uchar) 'b',(uchar) 'm'}; static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, @@ -244,7 +245,7 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share) /* - Flush bitmap to disk + Send updated bitmap to the page cache SYNOPSIS _ma_flush_bitmap() @@ -286,7 +287,7 @@ my_bool _ma_flush_bitmap(MARIA_SHARE *share) share Share handler NOTES - This is called on ma_delete_all (truncate data file). + This is called on maria_delete_all_rows (truncate data file). */ void _ma_bitmap_delete_all(MARIA_SHARE *share) @@ -294,8 +295,9 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share) MARIA_FILE_BITMAP *bitmap= &share->bitmap; if (bitmap->map) /* Not in create */ { - bzero(bitmap->map, share->block_size); - memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2); + bzero(bitmap->map, bitmap->block_size); + memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker), + maria_bitmap_marker, sizeof(maria_bitmap_marker)); bitmap->changed= 1; bitmap->page= 0; bitmap->used_size= bitmap->total_size; @@ -497,6 +499,10 @@ static void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap) TODO Update 'bitmap->used_size' to real size of used bitmap + NOTE + We don't always have share->bitmap.bitmap_lock here + (when called from_ma_check_bitmap_data() for example). + RETURN 0 ok 1 error (Error writing old bitmap or reading bitmap page) @@ -516,7 +522,8 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, { share->state.state.data_file_length= position + bitmap->block_size; bzero(bitmap->map, bitmap->block_size); - memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2); + memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker), + maria_bitmap_marker, sizeof(maria_bitmap_marker)); bitmap->used_size= 0; #ifndef DBUG_OFF memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); @@ -525,11 +532,14 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, } bitmap->used_size= bitmap->total_size; DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); - res= pagecache_read(share->pagecache, - (PAGECACHE_FILE*)&bitmap->file, page, 0, - (uchar*) bitmap->map, - PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == 0; + res= (pagecache_read(share->pagecache, + (PAGECACHE_FILE*)&bitmap->file, page, 0, + (uchar*) bitmap->map, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL) | + memcmp(bitmap->map + bitmap->block_size - + sizeof(maria_bitmap_marker), + maria_bitmap_marker, sizeof(maria_bitmap_marker)); #ifndef DBUG_OFF if (!res) memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); @@ -1630,9 +1640,16 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, bitmap->changed= 1; DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap);); - if (fill_pattern != 3 && fill_pattern != 7 && - bitmap_page < info->s->state.first_bitmap_with_space) - info->s->state.first_bitmap_with_space= bitmap_page; + if (fill_pattern != 3 && fill_pattern != 7) + set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page); + /* + Note that if the condition above is false (page is full), and all pages of + this bitmap are now full, and that bitmap page was + first_bitmap_with_space, we don't modify first_bitmap_with_space, indeed + its value still tells us where to start our search for a bitmap with space + (which is for sure after this full one). + That does mean that first_bitmap_with_space is only a lower bound. + */ DBUG_RETURN(0); } @@ -1747,8 +1764,7 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, tmp= (1 << bit_count) - 1; *data&= ~tmp; } - if (bitmap_page < info->s->state.first_bitmap_with_space) - info->s->state.first_bitmap_with_space= bitmap_page; + set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page); bitmap->changed= 1; DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap);); DBUG_RETURN(0); @@ -2014,3 +2030,28 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info, DBUG_ASSERT(0); return 1; } + + +/** + @brief create the first bitmap page of a freshly created data file + + @param share table's share + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +int _ma_bitmap_create_first(MARIA_SHARE *share) +{ + uint block_size= share->bitmap.block_size; + File file= share->bitmap.file.file; + if (my_chsize(file, block_size, 0, MYF(MY_WME)) || + my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker), + block_size - sizeof(maria_bitmap_marker), + MYF(MY_NABP | MY_WME))) + return 1; + share->state.state.data_file_length= block_size; + _ma_bitmap_delete_all(share); + return 0; +} diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c index 6376a3fef87..c89f7465f26 100644 --- a/storage/maria/ma_blockrec.c +++ b/storage/maria/ma_blockrec.c @@ -398,7 +398,8 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share) File must be synced as it is going out of the maria_open_list and so becoming unknown to Checkpoint. */ - if (my_sync(share->bitmap.file.file, MYF(MY_WME)) || + if ((share->now_transactional && + my_sync(share->bitmap.file.file, MYF(MY_WME))) || my_close(share->bitmap.file.file, MYF(MY_WME))) res= 1; /* @@ -1455,9 +1456,6 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row) static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) { - uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + - ROW_EXTENT_SIZE]; - LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; my_bool res= 0; if (pagecache_delete_pages(info->s->pagecache, &info->dfile, @@ -1467,12 +1465,16 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) if (info->s->now_transactional) { LSN lsn; + /** @todo unify log_data's shape with delete_head_or_tail() */ + uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + ROW_EXTENT_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; DBUG_ASSERT(info->trn->rec_lsn); pagerange_store(log_data + FILEID_STORE_SIZE, 1); - int5store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, + page_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page); - int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + 5, - count); + int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGE_STORE_SIZE, count); log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); @@ -1967,8 +1969,8 @@ static my_bool write_block_record(MARIA_HA *info, ((last_head_block - head_block) - 2) * ROW_EXTENT_SIZE; } DBUG_ASSERT(uint2korr(extent_data+5) & TAIL_BIT); - int5store(extent_data, head_tail_block->page); - int2store(extent_data + 5, head_tail_block->page_count); + page_store(extent_data, head_tail_block->page); + int2store(extent_data + PAGE_STORE_SIZE, head_tail_block->page_count); } } else @@ -2225,7 +2227,11 @@ disk_err: and this hook will mark the table corrupted. Maybe hook should be stored in the pagecache's block structure, or in a hash "file->maria_ha*". - */ + + @todo RECOVERY we should distinguish below between log write error and + table write error. The former should stop Maria immediately, the latter + should mark the table corrupted. + */ /* Unpin all pinned pages to not cause problems for disk cache */ _ma_unpin_all_pages(info, 0); @@ -2340,7 +2346,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) { LSN lsn; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; - uchar log_data[LSN_STORE_SIZE]; + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]; /* Write UNDO record @@ -2351,16 +2357,28 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) */ /** @todo RECOVERY BUG - We will soon change that: we will here execute the UNDO records - generated while we were trying to write the row; this will log some - CLRs which will replace this LOGREC_UNDO_PURGE. + We do need the code above (delete_head_or_tail() etc) for + non-transactional tables. + For transactional tables we can either also use it or execute the + UNDO_INSERT. If we crash before this + _ma_write_abort_block_record(), Recovery will do the work of this + function by executing UNDO_INSERT. + For transactional tables, we will remove this LOGREC_UNDO_PURGE and + replace it with a LOGREC_CLR_END: we should go back the UNDO chain + until we reach the UNDO which inserted the row into the data file, and + use its previous_undo_lsn. + Same logic for when we remove inserted keys (in case of error in + maria_write(): we come to the present function only after removing the + inserted keys... as long as we unpin the key pages only after writing + the CLR_END, this would be recovery-safe...). */ lsn_store(log_data, info->trn->undo_lsn); log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); if (translog_write_record(&lsn, LOGREC_UNDO_ROW_PURGE, - info->trn, NULL, sizeof(log_data), - TRANSLOG_INTERNAL_PARTS + 1, log_array, NULL)) + info->trn, info->s, sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, log_array, + log_data + LSN_STORE_SIZE)) res= 1; } _ma_unpin_all_pages(info, info->trn->undo_lsn); @@ -2390,6 +2408,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ulonglong page; struct st_row_pos_info row_pos; MARIA_SHARE *share= info->s; + my_bool res; DBUG_ENTER("_ma_update_block_record"); DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos)); @@ -2486,8 +2505,8 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, row_pos.dir= dir; row_pos.data= buff + uint2korr(dir); row_pos.length= head_length; - DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1, - &row_pos)); + res= write_block_record(info, oldrec, record, new_row, blocks, 1, &row_pos); + DBUG_RETURN(res); err: _ma_unpin_all_pages(info, 0); @@ -2609,7 +2628,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info, res= delete_dir_entry(buff, block_size, record_number, &empty_space); if (res < 0) DBUG_RETURN(1); - if (res == 0) + if (res == 0) /* after our deletion, page is still not empty */ { uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; @@ -2637,14 +2656,13 @@ static my_bool delete_head_or_tail(MARIA_HA *info, PAGECACHE_WRITE_DELAY, &page_link.link)) DBUG_RETURN(1); } - else + else /* page is now empty */ { - uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + - PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE]; - LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; - if (info->s->now_transactional) { + uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; pagerange_store(log_data + FILEID_STORE_SIZE, 1); page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page); pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + @@ -2849,7 +2867,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info, uint page_count; extent->extent= extent_info; extent->extent_count= extents; - extent->page= uint5korr(extent_info); /* First extent */ + extent->page= page_korr(extent_info); /* First extent */ page_count= uint2korr(extent_info + ROW_EXTENT_PAGE_SIZE); extent->page_count= page_count & ~TAIL_BIT; extent->tail= page_count & TAIL_BIT; @@ -2889,7 +2907,7 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent, if (!--extent->extent_count) goto crashed; extent->extent+= ROW_EXTENT_SIZE; - extent->page= uint5korr(extent->extent); + extent->page= page_korr(extent->extent); page_count= uint2korr(extent->extent+ROW_EXTENT_PAGE_SIZE); if (!page_count) goto crashed; @@ -4123,15 +4141,21 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, uint block_size= share->block_size; uint rec_offset; uchar *buff= info->keyread_buff, *dir; - DBUG_ENTER("_ma_apply_redo_insert_row_head"); + DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail"); info->keyread_buff_used= 1; page= page_korr(header); rownr= dirpos_korr(header+PAGE_STORE_SIZE); - if (page * info->s->block_size > info->state->data_file_length) + if (((page + 1) * info->s->block_size) > info->state->data_file_length) { - /* New page at end of file */ + /* + New page at end of file. Note that the test above is also positive if + data_file_length is not a multiple of block_size (system crashed while + writing the last page): in this case we just extend the last page and + fill it entirely with zeroes, then the REDO will put correct data on + it. + */ DBUG_ASSERT(rownr == 0); if (rownr != 0) goto err; @@ -4141,7 +4165,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE; /* Update that file is extended */ - info->state->data_file_length= page * info->s->block_size; + info->state->data_file_length= (page + 1) * info->s->block_size; } else { @@ -4294,8 +4318,6 @@ err: lsn LSN to put on page page_type HEAD_PAGE or TAIL_PAGE header Header (without FILEID) - data Data to be put on page - data_length Length of data NOTES This function is very similar to delete_head_or_tail() @@ -4340,6 +4362,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, if (delete_dir_entry(buff, block_size, record_number, &empty_space) < 0) DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + lsn_store(buff, lsn); if (pagecache_write(share->pagecache, &info->dfile, page, 0, buff, PAGECACHE_PLAIN_PAGE, @@ -4354,3 +4377,91 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, DBUG_RETURN(0); } + + +/** + @brief Apply LOGREC_REDO_PURGE_BLOCKS + + @param info Maria handler + @param header Header (without FILEID) + + @note It marks the page free in the bitmap, and sets the directory's count + to 0. + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +uint _ma_apply_redo_purge_blocks(MARIA_HA *info, + LSN lsn, const byte *header) +{ + MARIA_SHARE *share= info->s; + ulonglong page; + uint page_range; + uint res; + byte *buff= info->keyread_buff; + uint block_size= share->block_size; + DBUG_ENTER("_ma_apply_redo_purge_blocks"); + + info->keyread_buff_used= 1; + page_range= pagerange_korr(header); + /* works only for a one-page range for now */ + DBUG_ASSERT(page_range == 1); // for now + header+= PAGERANGE_STORE_SIZE; + page= page_korr(header); + header+= PAGE_STORE_SIZE; + page_range= pagerange_korr(header); + DBUG_ASSERT(page_range == 1); // for now + + if (!(buff= pagecache_read(share->pagecache, + &info->dfile, + page, 0, + buff, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) + DBUG_RETURN(my_errno); + + if (lsn_korr(buff) >= lsn) + { + /* Already applied */ + goto mark_free_in_bitmap; + } + + buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE; + + /* + Strictly speaking, we don't need to zero the last directory entry of this + page; setting the directory's count to zero is enough (it makes the last + directory entry invisible, irrelevant). + But as the "runtime" code (delete_head_or_tail()) called + delete_dir_entry() which zeroed the entry, if we don't do it here, we get + a difference between runtime and log-applying. Irrelevant, but it's + time-consuming to differentiate irrelevant differences from relevant + ones. So we remove the difference by zeroing the entry. + */ + { + uint rownr= ((uint) ((uchar *) buff)[DIR_COUNT_OFFSET]) - 1; + byte *dir= (buff + block_size - DIR_ENTRY_SIZE * rownr - + DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE); + dir[0]= dir[1]= 0; /* Delete entry */ + } + + buff[DIR_COUNT_OFFSET]= 0; + + lsn_store(buff, lsn); + if (pagecache_write(share->pagecache, + &info->dfile, page, 0, + buff, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DELAY, 0)) + DBUG_RETURN(my_errno); + +mark_free_in_bitmap: + /** @todo leave bitmap lock to the bitmap code... */ + pthread_mutex_lock(&share->bitmap.bitmap_lock); + res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1); + pthread_mutex_unlock(&share->bitmap.bitmap_lock); + + DBUG_RETURN(res); +} diff --git a/storage/maria/ma_blockrec.h b/storage/maria/ma_blockrec.h index c11c341f782..71feb33cabb 100644 --- a/storage/maria/ma_blockrec.h +++ b/storage/maria/ma_blockrec.h @@ -105,8 +105,6 @@ enum en_page_type { UNALLOCATED_PAGE, HEAD_PAGE, TAIL_PAGE, BLOB_PAGE, MAX_PAGE_ /* Don't allocate memory for too many row extents on the stack */ #define ROW_EXTENTS_ON_STACK 32 -extern uchar maria_bitmap_marker[2]; - /* Functions to convert MARIA_RECORD_POS to/from page:offset */ static inline MARIA_RECORD_POS ma_recordpos(ulonglong page, uint dir_entry) @@ -178,6 +176,7 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info, ulonglong page, uint *bitmap_pattern); void _ma_bitmap_delete_all(MARIA_SHARE *share); +int _ma_bitmap_create_first(MARIA_SHARE *share); uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, uint page_type, const uchar *header, @@ -186,3 +185,5 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, uint page_type, const uchar *header); +uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn, + const uchar *header); diff --git a/storage/maria/ma_close.c b/storage/maria/ma_close.c index b52ce113540..f287aa1bb68 100644 --- a/storage/maria/ma_close.c +++ b/storage/maria/ma_close.c @@ -87,7 +87,7 @@ int maria_close(register MARIA_HA *info) may be using the file at this point IF using --external-locking, which does not apply to Maria. */ - if (share->mode != O_RDONLY && maria_is_crashed(info)) + if (share->mode != O_RDONLY) _ma_state_info_write(share->kfile.file, &share->state, 1); if (my_close(share->kfile.file, MYF(0))) error= my_errno; diff --git a/storage/maria/ma_control_file.c b/storage/maria/ma_control_file.c index 66f0c37f4a3..4174a0e797e 100644 --- a/storage/maria/ma_control_file.c +++ b/storage/maria/ma_control_file.c @@ -51,6 +51,8 @@ uint32 last_logno= FILENO_IMPOSSIBLE; it is called at startup. */ my_bool maria_multi_threaded= FALSE; +/** @brief if currently doing a recovery */ +my_bool maria_in_recovery= FALSE; /* Control file is less then 512 bytes (a disk sector), diff --git a/storage/maria/ma_control_file.h b/storage/maria/ma_control_file.h index d6c121b21be..d69f221abb8 100644 --- a/storage/maria/ma_control_file.h +++ b/storage/maria/ma_control_file.h @@ -18,6 +18,9 @@ First version written by Guilhem Bichot on 2006-04-27. */ +#ifndef _ma_control_file_h +#define _ma_control_file_h + #define CONTROL_FILE_BASE_NAME "maria_log_control" /* Here is the interface of this module */ @@ -33,7 +36,7 @@ extern LSN last_checkpoint_lsn; */ extern uint32 last_logno; -extern my_bool maria_multi_threaded; +extern my_bool maria_multi_threaded, maria_in_recovery; typedef enum enum_control_file_error { CONTROL_FILE_OK= 0, @@ -74,3 +77,4 @@ int ma_control_file_end(); #ifdef __cplusplus } #endif +#endif diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c index 88374872ce2..1736e24a7b6 100644 --- a/storage/maria/ma_create.c +++ b/storage/maria/ma_create.c @@ -677,7 +677,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, /* max_data_file_length and max_key_file_length are recalculated on open */ if (tmp_table) share.base.max_data_file_length= (my_off_t) ci->data_file_length; - else if (ci->transactional && translog_inited) + else if (ci->transactional && translog_inited && !maria_in_recovery) { /* we have checked translog_inited above, because maria_chk may call us @@ -940,23 +940,31 @@ int maria_create(const char *name, enum data_file_type datafile_type, for (i= TRANSLOG_INTERNAL_PARTS; i < (sizeof(log_array)/sizeof(log_array[0])); i++) total_rec_length+= log_array[i].length; - /* - For this record to be of any use for Recovery, we need the upper - MySQL layer to be crash-safe, which it is not now (that would require - work using the ddl_log of sql/sql_table.cc); when it is, we should - reconsider the moment of writing this log record (before or after op, - under THR_LOCK_maria or not...), how to use it in Recovery. - For now this record can serve when we apply logs to a backup, - so we sync it. This happens before the data file is created. If the data - file was created before, and we crashed before writing the log record, - at restart the table may be used, so we would not have a trustable - history in the log (impossible to apply this log to a backup). The way - we do it, if we crash before writing the log record then there is no - data file and the table cannot be used. - Note that in case of TRUNCATE TABLE we also come here. - When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not called - external_lock(), so have no TRN. It does not matter, as all these - operations are non-transactional and sync their files. + /** + For this record to be of any use for Recovery, we need the upper + MySQL layer to be crash-safe, which it is not now (that would require + work using the ddl_log of sql/sql_table.cc); when it is, we should + reconsider the moment of writing this log record (before or after op, + under THR_LOCK_maria or not...), how to use it in Recovery. + For now this record can serve when we apply logs to a backup, + so we sync it. This happens before the data file is created. If the + data file was created before, and we crashed before writing the log + record, at restart the table may be used, so we would not have a + trustable history in the log (impossible to apply this log to a + backup). The way we do it, if we crash before writing the log record + then there is no data file and the table cannot be used. + @todo Note that in case of TRUNCATE TABLE we also come here; for + Recovery to be able to finish TRUNCATE TABLE, instead of leaving a + half-truncated table, we should log the record at start of + maria_create(); for that we shouldn't write to the index file but to a + buffer (DYNAMIC_STRING), put the buffer into the record, then put the + buffer into the index file (so, change _ma_keydef_write() etc). That + would also enable Recovery to finish a CREATE TABLE. The final result + would be that we would be able to finish what the SQL layer has asked + for: it would be atomic. + When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not + called external_lock(), so have no TRN. It does not matter, as all + these operations are non-transactional and sync their files. */ if (unlikely(translog_write_record(&share.state.create_rename_lsn, LOGREC_REDO_CREATE_TABLE, @@ -1016,6 +1024,20 @@ int maria_create(const char *name, enum data_file_type datafile_type, goto err; errpos=3; + /* + QQ: this sets data_file_length from 0 to 8192, but we wrote the state + already to the index file (because: + - log record is built from index header so state must be written before + log record + - data file must be created after log record, so that "missing log + record" implies "unusable table"). + Thus, we below create a 8192-byte data file, but its recorded size is 0, + so next time we read the bitmap (a maria_write() for example) we'll + overwrite the bitmap we just created below. + It's not very efficient. Though there is no bug. + Why do we absolutely want to create a 8192-byte page for a freshly + created, empty table? Why don't we leave the data file empty? + */ if (_ma_initialize_data_file(&share, dfile)) goto err; } @@ -1159,11 +1181,14 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile) { if (share->data_file_type == BLOCK_RECORD) { - if (my_chsize(dfile, share->base.block_size, 0, MYF(MY_WME))) - return 1; - share->state.state.data_file_length= share->base.block_size; - _ma_bitmap_delete_all(share); + share->bitmap.block_size= share->base.block_size; + share->bitmap.file.file = dfile; + return _ma_bitmap_create_first(share); } + /* + So, in BLOCK_RECORD, a freshly created datafile is one page long; while in + other formats it is 0-byte long. + */ return 0; } diff --git a/storage/maria/ma_delete_table.c b/storage/maria/ma_delete_table.c index 6d6b9d032fd..693c68c7e5f 100644 --- a/storage/maria/ma_delete_table.c +++ b/storage/maria/ma_delete_table.c @@ -64,7 +64,8 @@ int maria_delete_table(const char *name) raid_type= info->s->base.raid_type; raid_chunks= info->s->base.raid_chunks; #endif - sync_dir= (info->s->now_transactional && !info->s->temporary) ? + sync_dir= (info->s->now_transactional && !info->s->temporary && + !maria_in_recovery) ? MY_SYNC_DIR : 0; maria_close(info); } @@ -85,7 +86,7 @@ int maria_delete_table(const char *name) LSN lsn; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char *)name; - log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name); + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name) + 1; if (unlikely(translog_write_record(&lsn, LOGREC_REDO_DROP_TABLE, &dummy_transaction_object, NULL, log_array[TRANSLOG_INTERNAL_PARTS + diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index d5c4d59c45f..fa604f71b4d 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -181,10 +181,10 @@ static MARIA_SHARE **id_to_share= NULL; static my_atomic_rwlock_t LOCK_id_to_share; static my_bool write_hook_for_redo(enum translog_record_type type, - TRN *trn, LSN *lsn, + TRN *trn, MARIA_SHARE *share, LSN *lsn, struct st_translog_parts *parts); static my_bool write_hook_for_undo(enum translog_record_type type, - TRN *trn, LSN *lsn, + TRN *trn, MARIA_SHARE *share, LSN *lsn, struct st_translog_parts *parts); /* @@ -197,27 +197,27 @@ LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; static LOG_DESC INIT_LOGREC_FIXED_RECORD_0LSN_EXAMPLE= {LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0, - "fixed0example", FALSE, NULL, NULL}; + "fixed0example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0, -"variable0example", FALSE, NULL, NULL}; +"variable0example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_FIXED_RECORD_1LSN_EXAMPLE= {LOGRECTYPE_PSEUDOFIXEDLENGTH, 7, 7, NULL, NULL, NULL, 1, -"fixed1example", FALSE, NULL, NULL}; +"fixed1example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_1LSN_EXAMPLE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 12, NULL, NULL, NULL, 1, -"variable1example", FALSE, NULL, NULL}; +"variable1example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_FIXED_RECORD_2LSN_EXAMPLE= {LOGRECTYPE_PSEUDOFIXEDLENGTH, 23, 23, NULL, NULL, NULL, 2, -"fixed2example", FALSE, NULL, NULL}; +"fixed2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 19, NULL, NULL, NULL, 2, -"variable2example", FALSE, NULL, NULL}; +"variable2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; void example_loghandler_init() @@ -239,157 +239,172 @@ void example_loghandler_init() static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23= {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0, - "reserved", FALSE, NULL, NULL }; + "reserved", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL }; static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_HEAD= {LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_insert_row_head", FALSE, NULL, NULL}; + "redo_insert_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_TAIL= {LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_insert_row_tail", FALSE, NULL, NULL}; + "redo_insert_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOB= {LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0, - "redo_insert_row_blob", FALSE, NULL, NULL}; + "redo_insert_row_blob", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; /*QQQ:TODO:header???*/ static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS= {LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_insert_row_blobs", FALSE, NULL, NULL}; + "redo_insert_row_blobs", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_HEAD= {LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_purge_row_head", FALSE, NULL, NULL}; + "redo_purge_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL= {LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_purge_row_tail", FALSE, NULL, NULL}; + "redo_purge_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; /* QQQ: TODO: variable and fixed size??? */ static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS= {LOGRECTYPE_VARIABLE_LENGTH, - 0, - FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE, + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_purge_blocks", FALSE, NULL, NULL}; + "redo_purge_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW= {LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0, - "redo_delete_row", FALSE, NULL, NULL}; + "redo_delete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_UPDATE_ROW_HEAD= {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0, - "redo_update_row_head", FALSE, NULL, NULL}; + "redo_update_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_INDEX= {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0, - "redo_index", FALSE, NULL, NULL}; + "redo_index", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW= {LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0, - "redo_undelete_row", FALSE, NULL, NULL}; + "redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_CLR_END= {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1, - "clr_end", TRUE, NULL, NULL}; + "clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_PURGE_END= {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1, - "purge_end", TRUE, NULL, NULL}; + "purge_end", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT= {LOGRECTYPE_FIXEDLENGTH, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_undo, NULL, 0, - "undo_row_insert", TRUE, NULL, NULL}; + "undo_row_insert", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE= {LOGRECTYPE_VARIABLE_LENGTH, 0, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_undo, NULL, 0, - "undo_row_delete", TRUE, NULL, NULL}; + "undo_row_delete", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE= {LOGRECTYPE_VARIABLE_LENGTH, 0, LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, write_hook_for_undo, NULL, 1, - "undo_row_update", TRUE, NULL, NULL}; + "undo_row_update", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_ROW_PURGE= -{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE, LSN_STORE_SIZE, - NULL, NULL, NULL, 1, - "undo_row_purge", TRUE, NULL, NULL}; +{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE + FILEID_STORE_SIZE, + LSN_STORE_SIZE + FILEID_STORE_SIZE, + NULL, write_hook_for_undo, NULL, 1, + "undo_row_purge", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT= {LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, write_hook_for_undo, NULL, 1, - "undo_key_insert", TRUE, NULL, NULL}; + "undo_key_insert", LOGREC_LAST_IN_GROUP, NULL, NULL}; static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, write_hook_for_undo, NULL, 0, - "undo_key_delete", TRUE, NULL, NULL}; // QQ: why not compressed? + "undo_key_delete", LOGREC_LAST_IN_GROUP, NULL, NULL}; // QQ: why not compressed? static LOG_DESC INIT_LOGREC_PREPARE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0, - "prepare", TRUE, NULL, NULL}; + "prepare", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_PREPARE_WITH_UNDO_PURGE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1, - "prepare_with_undo_purge", TRUE, NULL, NULL}; + "prepare_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_COMMIT= -{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0, - "commit", TRUE, NULL, NULL}; +{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, + NULL, NULL, 0, "commit", LOGREC_IS_GROUP_ITSELF, NULL, + NULL}; static LOG_DESC INIT_LOGREC_COMMIT_WITH_UNDO_PURGE= {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1, - "commit_with_undo_purge", TRUE, NULL, NULL}; + "commit_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_CHECKPOINT= {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0, - "checkpoint", TRUE, NULL, NULL}; + "checkpoint", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_CREATE_TABLE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 1 + 2, NULL, NULL, NULL, 0, -"redo_create_table", TRUE, NULL, NULL}; +"redo_create_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_RENAME_TABLE= {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0, - "redo_rename_table", TRUE, NULL, NULL}; + "redo_rename_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; +/** + @todo LOG BUG + the "1" below is a hack to overcome a bug in the log handler where a 0-byte + header is considered a read failure: + translog_read_record() calls translog_init_reader_data() which calls + translog_read_record_header_scan() which calls + translog_read_record_header_from_buffer() which calls + translog_variable_length_header() which returns 0 (normal); + translog_init_reader_data() considers this 0 as a problem, + and thus translog_read_record() fails. +*/ static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE= -{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0, - "redo_drop_table", TRUE, NULL, NULL}; +{LOGRECTYPE_VARIABLE_LENGTH, 0, 1, NULL, NULL, NULL, 0, + "redo_drop_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL= {LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE, NULL, write_hook_for_redo, NULL, 0, - "redo_delete_all", TRUE, NULL, NULL}; + "redo_delete_all", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE= {LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + 4, FILEID_STORE_SIZE + 4, NULL, NULL, NULL, 0, - "redo_repair_table", TRUE, NULL, NULL}; + "redo_repair_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_FILE_ID= {LOGRECTYPE_VARIABLE_LENGTH, 0, 2, NULL, NULL, NULL, 0, - "file_id", TRUE, NULL, NULL}; + "file_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; static LOG_DESC INIT_LOGREC_LONG_TRANSACTION_ID= {LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0, - "long_transaction_id", TRUE, NULL, NULL}; + "long_transaction_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL; @@ -3045,6 +3060,7 @@ static translog_size_t translog_get_current_group_size() static my_bool translog_write_variable_record_1group(LSN *lsn, enum translog_record_type type, + MARIA_SHARE *share, SHORT_TRANSACTION_ID short_trid, struct st_translog_parts *parts, struct st_translog_buffer @@ -3062,7 +3078,8 @@ translog_write_variable_record_1group(LSN *lsn, *lsn= horizon= log_descriptor.horizon; if (log_record_type_descriptor[type].inwrite_hook && - (*log_record_type_descriptor[type].inwrite_hook)(type, trn, lsn, parts)) + (*log_record_type_descriptor[type].inwrite_hook)(type, trn, share, + lsn, parts)) { translog_unlock(); DBUG_RETURN(1); @@ -3199,6 +3216,7 @@ translog_write_variable_record_1group(LSN *lsn, static my_bool translog_write_variable_record_1chunk(LSN *lsn, enum translog_record_type type, + MARIA_SHARE *share, SHORT_TRANSACTION_ID short_trid, struct st_translog_parts *parts, struct st_translog_buffer @@ -3214,7 +3232,7 @@ translog_write_variable_record_1chunk(LSN *lsn, *lsn= log_descriptor.horizon; if (log_record_type_descriptor[type].inwrite_hook && - (*log_record_type_descriptor[type].inwrite_hook)(type, trn, + (*log_record_type_descriptor[type].inwrite_hook)(type, trn, share, lsn, parts)) { translog_unlock(); @@ -3567,6 +3585,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts, static my_bool translog_write_variable_record_mgroup(LSN *lsn, enum translog_record_type type, + MARIA_SHARE *share, SHORT_TRANSACTION_ID short_trid, struct st_translog_parts *parts, struct st_translog_buffer @@ -3909,7 +3928,7 @@ translog_write_variable_record_mgroup(LSN *lsn, first_chunk0= 0; *lsn= horizon; if (log_record_type_descriptor[type].inwrite_hook && - (*log_record_type_descriptor[type].inwrite_hook) (type, trn, + (*log_record_type_descriptor[type].inwrite_hook) (type, trn, share, lsn, parts)) goto err; } @@ -3995,6 +4014,7 @@ err: static my_bool translog_write_variable_record(LSN *lsn, enum translog_record_type type, + MARIA_SHARE *share, SHORT_TRANSACTION_ID short_trid, struct st_translog_parts *parts, TRN *trn) @@ -4007,6 +4027,7 @@ static my_bool translog_write_variable_record(LSN *lsn, /* Max number of such LSNs per record is 2 */ uchar compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD * COMPRESSED_LSN_MAX_STORE_SIZE]; + my_bool res; DBUG_ENTER("translog_write_variable_record"); translog_lock(); @@ -4071,9 +4092,11 @@ static my_bool translog_write_variable_record(LSN *lsn, if (page_rest >= parts->record_length + header_length1) { /* following function makes translog_unlock(); */ - DBUG_RETURN(translog_write_variable_record_1chunk(lsn, type, short_trid, - parts, buffer_to_flush, - header_length1, trn)); + res= translog_write_variable_record_1chunk(lsn, type, share, + short_trid, + parts, buffer_to_flush, + header_length1, trn); + DBUG_RETURN(res); } buffer_rest= translog_get_current_group_size(); @@ -4081,15 +4104,19 @@ static my_bool translog_write_variable_record(LSN *lsn, if (buffer_rest >= parts->record_length + header_length1 - page_rest) { /* following function makes translog_unlock(); */ - DBUG_RETURN(translog_write_variable_record_1group(lsn, type, short_trid, - parts, buffer_to_flush, - header_length1, trn)); + res= translog_write_variable_record_1group(lsn, type, share, + short_trid, + parts, buffer_to_flush, + header_length1, trn); + DBUG_RETURN(res); } /* following function makes translog_unlock(); */ - DBUG_RETURN(translog_write_variable_record_mgroup(lsn, type, short_trid, - parts, buffer_to_flush, - header_length1, - buffer_rest, trn)); + res= translog_write_variable_record_mgroup(lsn, type, share, + short_trid, + parts, buffer_to_flush, + header_length1, + buffer_rest, trn); + DBUG_RETURN(res); } @@ -4112,6 +4139,7 @@ static my_bool translog_write_variable_record(LSN *lsn, static my_bool translog_write_fixed_record(LSN *lsn, enum translog_record_type type, + MARIA_SHARE *share, SHORT_TRANSACTION_ID short_trid, struct st_translog_parts *parts, TRN *trn) @@ -4164,7 +4192,7 @@ static my_bool translog_write_fixed_record(LSN *lsn, *lsn= log_descriptor.horizon; if (log_record_type_descriptor[type].inwrite_hook && - (*log_record_type_descriptor[type].inwrite_hook) (type, trn, + (*log_record_type_descriptor[type].inwrite_hook) (type, trn, share, lsn, parts)) { rc= 1; @@ -4363,11 +4391,13 @@ my_bool translog_write_record(LSN *lsn, { switch (log_record_type_descriptor[type].class) { case LOGRECTYPE_VARIABLE_LENGTH: - rc= translog_write_variable_record(lsn, type, short_trid, &parts, trn); + rc= translog_write_variable_record(lsn, type, share, + short_trid, &parts, trn); break; case LOGRECTYPE_PSEUDOFIXEDLENGTH: case LOGRECTYPE_FIXEDLENGTH: - rc= translog_write_fixed_record(lsn, type, short_trid, &parts, trn); + rc= translog_write_fixed_record(lsn, type, share, + short_trid, &parts, trn); break; case LOGRECTYPE_NOT_ALLOWED: default: @@ -4927,6 +4957,7 @@ translog_read_record_header_from_buffer(uchar *page, TRANSLOG_HEADER_BUFFER *buff, TRANSLOG_SCANNER_DATA *scanner) { + translog_size_t res; DBUG_ENTER("translog_read_record_header_from_buffer"); DBUG_ASSERT((page[page_offset] & TRANSLOG_CHUNK_TYPE) == TRANSLOG_CHUNK_LSN || @@ -4941,15 +4972,18 @@ translog_read_record_header_from_buffer(uchar *page, /* Read required bytes from the header and call hook */ switch (log_record_type_descriptor[buff->type].class) { case LOGRECTYPE_VARIABLE_LENGTH: - DBUG_RETURN(translog_variable_length_header(page, page_offset, buff, - scanner)); + res= translog_variable_length_header(page, page_offset, buff, + scanner); + break; case LOGRECTYPE_PSEUDOFIXEDLENGTH: case LOGRECTYPE_FIXEDLENGTH: - DBUG_RETURN(translog_fixed_length_header(page, page_offset, buff)); + res= translog_fixed_length_header(page, page_offset, buff); + break; default: DBUG_ASSERT(0); + res= 0; } - DBUG_RETURN(0); /* purecov: deadcode */ + DBUG_RETURN(res); } @@ -4979,7 +5013,7 @@ translog_size_t translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff) { uchar buffer[TRANSLOG_PAGE_SIZE], *page; - translog_size_t page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE; + translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE; TRANSLOG_ADDRESS addr; TRANSLOG_VALIDATOR_DATA data; DBUG_ENTER("translog_read_record_header"); @@ -4993,11 +5027,9 @@ translog_size_t translog_read_record_header(LSN lsn, data.was_recovered= 0; addr= lsn; addr-= page_offset; /* offset decreasing */ - if (!(page= translog_get_page(&data, buffer))) - DBUG_RETURN(0); - - DBUG_RETURN(translog_read_record_header_from_buffer(page, page_offset, - buff, 0)); + res= (!(page= translog_get_page(&data, buffer))) ? 0 : + translog_read_record_header_from_buffer(page, page_offset, buff, 0); + DBUG_RETURN(res); } @@ -5030,6 +5062,7 @@ translog_read_record_header_scan(TRANSLOG_SCANNER_DATA TRANSLOG_HEADER_BUFFER *buff, my_bool move_scanner) { + translog_size_t res; DBUG_ENTER("translog_read_record_header_scan"); DBUG_PRINT("enter", ("Scanner: Cur: (%lu,0x%lx) Hrz: (%lu,0x%lx) " "Lst: (%lu,0x%lx) Offset: %u(%x) fixed %d", @@ -5044,11 +5077,12 @@ translog_read_record_header_scan(TRANSLOG_SCANNER_DATA buff->groups_no= 0; buff->lsn= scanner->page_addr; buff->lsn+= scanner->page_offset; /* offset increasing */ - DBUG_RETURN(translog_read_record_header_from_buffer(scanner->page, - scanner->page_offset, - buff, - (move_scanner ? - scanner : 0))); + res= translog_read_record_header_from_buffer(scanner->page, + scanner->page_offset, + buff, + (move_scanner ? + scanner : 0)); + DBUG_RETURN(res); } @@ -5083,7 +5117,7 @@ translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA TRANSLOG_HEADER_BUFFER *buff) { uint8 chunk_type; - + translog_size_t res; buff->groups_no= 0; /* to be sure that we will free it right */ DBUG_ENTER("translog_read_next_record_header"); @@ -5114,9 +5148,11 @@ translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA /* Last record was read */ buff->lsn= LSN_IMPOSSIBLE; /* Return 'end of log' marker */ - DBUG_RETURN(TRANSLOG_RECORD_HEADER_MAX_SIZE + 1); + res= TRANSLOG_RECORD_HEADER_MAX_SIZE + 1; } - DBUG_RETURN(translog_read_record_header_scan(scanner, buff, 0)); + else + res= translog_read_record_header_scan(scanner, buff, 0); + DBUG_RETURN(res); } @@ -5610,7 +5646,9 @@ my_bool translog_flush(LSN lsn) static my_bool write_hook_for_redo(enum translog_record_type type __attribute__ ((unused)), - TRN *trn, LSN *lsn, + TRN *trn, MARIA_SHARE *share + __attribute__ ((unused)), + LSN *lsn, struct st_translog_parts *parts __attribute__ ((unused))) { @@ -5646,7 +5684,9 @@ static my_bool write_hook_for_redo(enum translog_record_type type static my_bool write_hook_for_undo(enum translog_record_type type __attribute__ ((unused)), - TRN *trn, LSN *lsn, + TRN *trn, MARIA_SHARE *share + __attribute__ ((unused)), + LSN *lsn, struct st_translog_parts *parts __attribute__ ((unused))) { diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h index 8382271a07a..011b8f4cf83 100644 --- a/storage/maria/ma_loghandler.h +++ b/storage/maria/ma_loghandler.h @@ -289,7 +289,7 @@ typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type, struct st_translog_parts *parts); typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type, - TRN *trn, + TRN *trn, struct st_maria_share *share, LSN *lsn, struct st_translog_parts *parts); @@ -309,6 +309,11 @@ enum record_class /* C++ can't bear that a variable's name is "class" */ #ifndef __cplusplus + +enum enum_record_in_group { + LOGREC_NOT_LAST_IN_GROUP= 0, LOGREC_LAST_IN_GROUP, LOGREC_IS_GROUP_ITSELF +}; + /* Descriptor of log record type Note: Don't reorder because of constructs later... @@ -338,7 +343,7 @@ typedef struct st_log_record_type_descriptor /* the rest is for maria_read_log & Recovery */ /** @brief for debug error messages or "maria_read_log" command-line tool */ const char *name; - my_bool record_ends_group; + enum enum_record_in_group record_in_group; /* a function to execute when we see the record during the REDO phase */ int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *); /* a function to execute when we see the record during the UNDO phase */ diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c index a42fbdf0458..eb802969bce 100644 --- a/storage/maria/ma_recovery.c +++ b/storage/maria/ma_recovery.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB +/* Copyright (C) 2006, 2007 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,180 +16,1097 @@ /* WL#3072 Maria recovery First version written by Guilhem Bichot on 2006-04-27. - Does not compile yet. */ /* Here is the implementation of this module */ -#include "page_cache.h" -#include "least_recently_dirtied.h" -#include "transaction.h" -#include "share.h" -#include "log.h" +#include "maria_def.h" +#include "ma_recovery.h" +#include "ma_blockrec.h" -typedef struct st_record_type_properties { - /* used for debug error messages or "maria_read_log" command-line tool: */ - char *name, - my_bool record_ends_group; - /* a function to execute when we see the record during the REDO phase */ - int (*record_execute_in_redo_phase)(RECORD *); /* param will be record header instead later */ - /* a function to execute when we see the record during the UNDO phase */ - int (*record_execute_in_undo_phase)(RECORD *); /* param will be record header instead later */ -} RECORD_TYPE_PROPERTIES; - -int no_op(RECORD *) {return 0}; - -RECORD_TYPE_PROPERTIES all_record_type_properties[]= +struct TRN_FOR_RECOVERY { - /* listed here in the order of the "log records type" enumeration */ - {"REDO_INSERT_HEAD", FALSE, redo_insert_head_execute_in_redo_phase, no_op}, - ..., - {"UNDO_INSERT" , TRUE , undo_insert_execute_in_redo_phase, undo_insert_execute_in_undo_phase}, - {"COMMIT", , TRUE , commit_execute_in_redo_phase, no_op}, - ... + LSN group_start_lsn, undo_lsn; + TrID long_trid; }; -int redo_insert_head_execute_in_redo_phase(RECORD *record) +/* Variables used by all functions of this module. Ok as single-threaded */ +static struct TRN_FOR_RECOVERY *all_active_trans; +static MARIA_HA **all_tables; +static LSN current_group_end_lsn; +FILE *tracef; /**< trace file for debugging */ + +#define prototype_exec_hook(R) \ +static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) +prototype_exec_hook(LONG_TRANSACTION_ID); +#ifdef MARIA_CHECKPOINT +prototype_exec_hook(CHECKPOINT); +#endif +prototype_exec_hook(REDO_CREATE_TABLE); +prototype_exec_hook(REDO_DROP_TABLE); +prototype_exec_hook(FILE_ID); +prototype_exec_hook(REDO_INSERT_ROW_HEAD); +prototype_exec_hook(REDO_INSERT_ROW_TAIL); +prototype_exec_hook(REDO_PURGE_ROW_HEAD); +prototype_exec_hook(REDO_PURGE_ROW_TAIL); +prototype_exec_hook(REDO_PURGE_BLOCKS); +prototype_exec_hook(REDO_DELETE_ALL); +prototype_exec_hook(UNDO_ROW_INSERT); +prototype_exec_hook(UNDO_ROW_DELETE); +prototype_exec_hook(UNDO_ROW_PURGE); +prototype_exec_hook(COMMIT); +static int end_of_redo_phase(); +static void display_record_position(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec, + uint number); +static int display_and_apply_record(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec); +static MARIA_HA *get_MARIA_HA_from_REDO_record(const + TRANSLOG_HEADER_BUFFER *rec); +static MARIA_HA *get_MARIA_HA_from_UNDO_record(const + TRANSLOG_HEADER_BUFFER *rec); +static int close_recovered_table(MARIA_HA *info); + + +/** @brief global [out] buffer for translog_read_record(); never shrinks */ +static LEX_STRING log_record_buffer; +#define enlarge_buffer(rec) \ + if (log_record_buffer.length < rec->record_length) \ + { \ + log_record_buffer.length= rec->record_length; \ + log_record_buffer.str= my_realloc(log_record_buffer.str, \ + rec->record_length, MYF(MY_WME)); \ + } + +#define ALERT_USER() DBUG_ASSERT(0) + + +/** + @brief Recovers from the last checkpoint +*/ + +int maria_recover() { - /* write the data to the proper page */ -} + my_bool res= TRUE; + LSN from_lsn; + FILE *trace_file; + DBUG_ENTER("maria_recover"); -int undo_insert_execute_in_redo_phase(RECORD *record) -{ - trans_table[short_trans_id].undo_lsn= record.lsn; - /* don't restore the old version of the row */ -} + DBUG_ASSERT(!maria_in_recovery); + maria_in_recovery= TRUE; -int undo_insert_execute_in_undo_phase(RECORD *record) -{ - /* restore the old version of the row */ - trans_table[short_trans_id].undo_lsn= record.prev_undo_lsn; -} - -int commit_execute_in_redo_phase(RECORD *record) -{ - trans_table[short_trans_id].state= COMMITTED; - /* - and that's all: the delete/update handler should not be woken up! as there - may be REDO for purge further in the log. - */ -} - -#define record_ends_group(R) \ - all_record_type_properties[(R)->type].record_ends_group) - -#define execute_log_record_in_redo_phase(R) \ - all_record_type_properties[(R).type].record_execute_in_redo_phase(R) - - -int recovery() -{ - control_file_create_or_open(); - /* - init log handler: tell it that we are going to do large reads of the - log, sequential and backward. Log handler could decide to alloc a big - read-only IO_CACHE for this, or use its usual page cache. - */ - - /* read checkpoint log record from log handler */ - RECORD *checkpoint_record= log_read_record(last_checkpoint_lsn_at_start); - - /* parse this record, build structs (dirty_pages, transactions table, file_map) */ - /* - read log records (note: sometimes only the header is needed, for ex during - REDO phase only the header of UNDO is needed, not the 4G blob in the - variable-length part, so I could use that; however for PREPARE (which is a - variable-length record) I'll need to read the full record in the REDO - phase): - */ - - /**** REDO PHASE *****/ - - record= log_read_record(min(rec_lsn, ...)); /* later, read only header */ - - /* - if log handler knows the end LSN of the log, we could print here how many - MB of log we have to read (to give an idea of the time), and print - progress notes. - */ - - while (record != NULL) + if (last_checkpoint_lsn == LSN_IMPOSSIBLE) + from_lsn= first_lsn_in_log(); + else { + DBUG_ASSERT(0); /* not yet implemented */ + /** + @todo read the checkpoint record, fill structures + and use the minimum of checkpoint_start_lsn, rec_lsn of trns, rec_lsn + of dirty pages. + */ + //from_lsn= something; + } + + /* + mysqld has not yet initialized any page cache. Let's create a dedicated + one for recovery. + */ + if ((trace_file= fopen("maria_recovery.trace", "w"))) + { + fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n"); + res= (init_pagecache(maria_pagecache, + /** @todo what size? */ + 1024*1024, + 0, 0, + maria_block_size) == 0) || + maria_apply_log(from_lsn, TRUE, trace_file); + end_pagecache(maria_pagecache, TRUE); + if (!res) + fprintf(trace_file, "SUCCESS\n"); + fclose(trace_file); + } + /** + @todo take checkpoint if log applying did some work. + Be sure to not checkpoint if no work. + */ + maria_in_recovery= FALSE; + DBUG_RETURN(res); +} + + +/** + @brief Displays and/or applies the log + + @param lsn LSN from which log reading/applying should start + @param apply if log records should be applied or not + @param trace_file trace file where progress/debug messages will go + + @todo This trace_file thing is primitive; soon we will make it similar to + ma_check_print_warning() etc, and a successful recovery does not need to + create a trace file. But for debugging now it is useful. + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file) +{ + int error= 0; + DBUG_ENTER("maria_apply_log"); + + DBUG_ASSERT(!maria_multi_threaded); + all_active_trans= (struct TRN_FOR_RECOVERY *) + my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct TRN_FOR_RECOVERY), + MYF(MY_ZEROFILL)); + all_tables= (MARIA_HA **)my_malloc((SHARE_ID_MAX + 1) * sizeof(MARIA_HA *), + MYF(MY_ZEROFILL)); + if (!all_active_trans || !all_tables) + goto err; + + tracef= trace_file; + /* install hooks for execution */ +#define install_exec_hook(R) \ + log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ + exec_LOGREC_ ## R; + install_exec_hook(LONG_TRANSACTION_ID); +#ifdef MARIA_CHECKPOINT + install_exec_hook(CHECKPOINT); +#endif + install_exec_hook(REDO_CREATE_TABLE); + install_exec_hook(REDO_DROP_TABLE); + install_exec_hook(FILE_ID); + install_exec_hook(REDO_INSERT_ROW_HEAD); + install_exec_hook(REDO_INSERT_ROW_TAIL); + install_exec_hook(REDO_PURGE_ROW_HEAD); + install_exec_hook(REDO_PURGE_ROW_TAIL); + install_exec_hook(REDO_PURGE_BLOCKS); + install_exec_hook(REDO_DELETE_ALL); + install_exec_hook(UNDO_ROW_INSERT); + install_exec_hook(UNDO_ROW_DELETE); + install_exec_hook(UNDO_ROW_PURGE); + install_exec_hook(COMMIT); + + current_group_end_lsn= LSN_IMPOSSIBLE; + + TRANSLOG_HEADER_BUFFER rec; + struct st_translog_scanner_data scanner; + uint i= 1; + + translog_size_t len= translog_read_record_header(lsn, &rec); + + /** @todo translog_read_record_header() should be fixed for 0-byte headers */ + if (len == 0) /* means error, but apparently EOF too */ + { + fprintf(tracef, "empty log\n"); + goto end; + } + + if (translog_init_scanner(lsn, 1, &scanner)) + { + fprintf(tracef, "Scanner init failed\n"); + goto err; + } + for (;;i++) + { + uint16 sid= rec.short_trid; + const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type]; + display_record_position(log_desc, &rec, i); + /* A complete group is a set of log records with an "end mark" record (e.g. a set of REDOs for an operation, terminated by an UNDO for this operation); if there is no "end mark" record the group is incomplete and won't be executed. */ - if (record_ends_group(record) + if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) || + (log_desc->record_in_group == LOGREC_LAST_IN_GROUP)) { - if (trans_table[record.short_trans_id].group_start_lsn != 0) + if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE) { - /* - There is a complete group for this transaction, containing more than - this event. - We're going to read recently read log records: - for this log_read_record() to be efficient (not touch the disk), - log handler could cache recently read pages - (can just use an IO_CACHE of 10 MB to read the log, or the normal - log handler page cache). - Without it only OS file cache will help. - */ - record2= - log_read_record(trans_table[record.short_trans_id].group_start_lsn); - - do + if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) { - if (record2.short_trans_id == record.short_trans_id) - execute_log_record_in_redo_phase(record2); /* it's in our group */ - record2= log_read_next_record(); + /* + can happen if the transaction got a table write error, then + unlocked tables thus wrote a COMMIT record. + */ + fprintf(tracef, "\nDiscarding unfinished group before this record\n"); + ALERT_USER(); + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + } + else + { + /* + There is a complete group for this transaction, containing more + than this event. + */ + fprintf(tracef, " ends a group:\n"); + struct st_translog_scanner_data scanner2; + TRANSLOG_HEADER_BUFFER rec2; + len= + translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2); + if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) + { + fprintf(tracef, "Cannot find record where it should be\n"); + goto err; + } + if (translog_init_scanner(rec2.lsn, 1, &scanner2)) + { + fprintf(tracef, "Scanner2 init failed\n"); + goto err; + } + current_group_end_lsn= rec.lsn; + do + { + if (rec2.short_trid == sid) /* it's in our group */ + { + const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type]; + display_record_position(log_desc2, &rec2, 0); + if (apply && display_and_apply_record(log_desc2, &rec2)) + goto err; + } + len= translog_read_next_record_header(&scanner2, &rec2); + if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) + { + fprintf(tracef, "Cannot find record where it should be\n"); + goto err; + } + } + while (rec2.lsn < rec.lsn); + translog_free_record_header(&rec2); + /* group finished */ + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */ + display_record_position(log_desc, &rec, 0); } - while (record2.lsn < record.lsn); - trans_table[record.short_trans_id].group_start_lsn= 0; /* group finished */ } - execute_log_record_in_redo_phase(record); + if (apply && display_and_apply_record(log_desc, &rec)) + goto err; } else /* record does not end group */ { /* just record the fact, can't know if can execute yet */ - if (trans_table[short_trans_id].group_start_lsn == 0) /* group not yet started */ - trans_table[short_trans_id].group_start_lsn= record.lsn; + if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE) + { + /* group not yet started */ + all_active_trans[sid].group_start_lsn= rec.lsn; + } + } + len= translog_read_next_record_header(&scanner, &rec); + if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) + { + fprintf(tracef, "EOF on the log\n"); + break; } - - /* - Later we can optimize: instead of "execute_log_record(record2)", do - copy_record_into_exec_buffer(record2): - this will just copy record into a multi-record (10 MB?) memory buffer, - and when buffer is full, will do sorting of REDOs per - page id and execute them. - This sorting will enable us to do more sequential reads of the - data/index pages. - Note that updating bitmap pages (when we have executed a REDO for a page - we update its bitmap page) may break the sequential read of pages, - so maybe we should read and cache bitmap pages in the beginning. - Or ok the sequence will be broken, but quickly all bitmap pages will be - in memory and so the sequence will not be broken anymore. - Sorting could even determine, based on physical device of files - ("st_dev" in stat()), that some files should be should be taken by - different threads, if we want to do parallism. - */ - /* - Here's how to read a complete variable-length record if needed: - read the header, allocate buffer of record length, read whole - record. - */ - record= log_read_next_record(); } + translog_free_record_header(&rec); /* - Earlier or here, create true transactions in TM. - If done earlier, note that TM should not wake up the delete/update handler - when it receives a commit info, as existing REDO for purge may exist in - the log, and so the delete/update handler may do changes which conflict - with these REDOs. - Even if done here, better to not wake it up now as we're going to free the - page cache. + So we have applied all REDOs. + We may now have unfinished transactions. + I don't think it's this program's job to roll them back: + to roll back and at the same time stay idempotent, it needs to write log + records (without CLRs, 2nd rollback would hit the effects of first + rollback and fail). But this standalone tool is not allowed to write to + the server's transaction log. So we do not roll back anything. + In the real Recovery code, or the code to do "recover after online + backup", yes we will roll back. + */ + if (end_of_redo_phase()) + goto err; + goto end; +err: + error= 1; + fprintf(tracef, "Recovery of tables with transaction logs FAILED\n"); +end: + my_free((gptr)all_tables, MYF(MY_ALLOW_ZERO_PTR)); + my_free((gptr)all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); + my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR)); + log_record_buffer.str= NULL; + log_record_buffer.length= 0; + DBUG_RETURN(error); +} + + +/* very basic info about the record's header */ +static void display_record_position(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec, + uint number) +{ + /* + if number==0, we're going over records which we had already seen and which + form a group, so we indent below the group's end record + */ + fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n", + number ? "" : " ", number, + (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn), + rec->short_trid, log_desc->name, rec->type, + (ulong)rec->record_length); +} + + +static int display_and_apply_record(const LOG_DESC *log_desc, + const TRANSLOG_HEADER_BUFFER *rec) +{ + int error; + if (log_desc->record_execute_in_redo_phase == NULL) + { + /* die on all not-yet-handled records :) */ + DBUG_ASSERT("one more hook" == "to write"); + return 1; + } + if ((error= (*log_desc->record_execute_in_redo_phase)(rec))) + fprintf(tracef, "Got error when executing record\n"); + return error; +} + + +prototype_exec_hook(LONG_TRANSACTION_ID) +{ + uint16 sid= rec->short_trid; + TrID long_trid= all_active_trans[sid].long_trid; + /* abort group of this trn (must be of before a crash) */ + LSN gslsn= all_active_trans[sid].group_start_lsn; + char llbuf[22]; + if (gslsn != LSN_IMPOSSIBLE) + { + fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n", + (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + } + if (long_trid != 0) + { + LSN ulsn= all_active_trans[sid].undo_lsn; + if (ulsn != LSN_IMPOSSIBLE) + { + llstr(long_trid, llbuf); + fprintf(tracef, "Found an old transaction long_trid %s short_trid %u" + " with same short id as this new transaction, and has neither" + " committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf, + sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn)); + goto err; + } + } + long_trid= uint6korr(rec->header); + all_active_trans[sid].long_trid= long_trid; + llstr(long_trid, llbuf); + fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid); + goto end; +err: + ALERT_USER(); + return 1; +end: + return 0; +} + + +#ifdef MARIA_CHECKPOINT +prototype_exec_hook(CHECKPOINT) +{ + /* the only checkpoint we care about was found via control file, ignore */ + return 0; +} +#endif + + +prototype_exec_hook(REDO_CREATE_TABLE) +{ + File dfile= -1, kfile= -1; + char *linkname_ptr, filename[FN_REFLEN]; + char *name, *ptr; + myf create_flag; + uint flags; + int error= 1, create_mode= O_RDWR | O_TRUNC; + MARIA_HA *info= NULL; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + fprintf(tracef, "Failed to read record\n"); + goto end; + } + name= log_record_buffer.str; + fprintf(tracef, "Table '%s'", name); + /* we try hard to get create_rename_lsn, to avoid mistakes if possible */ + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + /* check that we're not already using it */ + DBUG_ASSERT(share->reopen == 1); + DBUG_ASSERT(share->now_transactional == share->base.born_transactional); + if (!share->base.born_transactional) + { + /* + could be that transactional table was later dropped, and a non-trans + one was renamed to its name, thus create_rename_lsn is 0 and should + not be trusted. + */ + fprintf(tracef, ", is not transactional\n"); + ALERT_USER(); + error= 0; + goto end; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record", + (ulong) LSN_FILE_NO(rec->lsn), + (ulong) LSN_OFFSET(rec->lsn)); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + fprintf(tracef, ", is crashed, overwriting it"); + ALERT_USER(); + } + maria_close(info); + info= NULL; + } + /* if does not exist, is older, or its header is corrupted, overwrite it */ + // TODO symlinks + ptr= name + strlen(name) + 1; + if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0)) + fprintf(tracef, ", we will only touch index file"); + fn_format(filename, name, "", MARIA_NAME_IEXT, + (MY_UNPACK_FILENAME | + (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) | + MY_APPEND_EXT); + linkname_ptr= NULL; + create_flag= MY_DELETE_OLD; + fprintf(tracef, ", creating as '%s'", filename); + if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, + MYF(MY_WME|create_flag))) < 0) + { + fprintf(tracef, "Failed to create index file\n"); + goto end; + } + ptr++; + uint kfile_size_before_extension= uint2korr(ptr); + ptr+= 2; + uint keystart= uint2korr(ptr); + ptr+= 2; + /* set create_rename_lsn (for maria_read_log to be idempotent) */ + lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn); + /* we also set is_of_lsn, like maria_create() does */ + lsn_store(ptr + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE, + rec->lsn); + if (my_pwrite(kfile, ptr, + kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) || + my_chsize(kfile, keystart, 0, MYF(MY_WME))) + { + fprintf(tracef, "Failed to write to index file\n"); + goto end; + } + if (!(flags & HA_DONT_TOUCH_DATA)) + { + fn_format(filename,name,"", MARIA_NAME_DEXT, + MY_UNPACK_FILENAME | MY_APPEND_EXT); + linkname_ptr= NULL; + create_flag=MY_DELETE_OLD; + if (((dfile= + my_create_with_symlink(linkname_ptr, filename, 0, create_mode, + MYF(MY_WME | create_flag))) < 0) || + my_close(dfile, MYF(MY_WME))) + { + fprintf(tracef, "Failed to create data file\n"); + goto end; + } + /* + we now have an empty data file. To be able to + _ma_initialize_data_file() we need some pieces of the share to be + correctly filled. So we just open the table (fortunately, an empty + data file does not preclude this). + */ + if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || + _ma_initialize_data_file(info->s, info->dfile.file)) + { + fprintf(tracef, "Failed to open new table or write to data file\n"); + goto end; + } + } + error= 0; +end: + fprintf(tracef, "\n"); + if (kfile >= 0) + error|= my_close(kfile, MYF(MY_WME)); + if (info != NULL) + error|= maria_close(info); + return error; +} + + +prototype_exec_hook(REDO_DROP_TABLE) +{ + char *name; + int error= 1; + MARIA_HA *info= NULL; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + fprintf(tracef, "Failed to read record\n"); + goto end; + } + name= log_record_buffer.str; + fprintf(tracef, "Table '%s'", name); + info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); + if (info) + { + MARIA_SHARE *share= info->s; + /* + We may have open instances on this table. But it does not matter, the + maria_extra() below will take care of them. + */ + if (!share->base.born_transactional) + { + fprintf(tracef, ", is not transactional\n"); + ALERT_USER(); + error= 0; + goto end; + } + if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) + { + fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record", + (ulong) LSN_FILE_NO(rec->lsn), + (ulong) LSN_OFFSET(rec->lsn)); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + fprintf(tracef, ", is crashed, dropping it"); + ALERT_USER(); + } + /* + This maria_extra() call serves to signal that old open instances of + this table should not be used anymore, and (only on Windows) to close + open files so they can be deleted + */ + if (maria_extra(info, HA_EXTRA_PREPARE_FOR_DELETE, NULL) || + maria_close(info)) + goto end; + info= NULL; + } + /* if does not exist, is older, or its header is corrupted, drop it */ + fprintf(tracef, ", dropping '%s'", name); + if (maria_delete_table(name)) + { + fprintf(tracef, "Failed to drop table\n"); + goto end; + } + error= 0; +end: + fprintf(tracef, "\n"); + if (info != NULL) + error|= maria_close(info); + return error; +} + + +prototype_exec_hook(FILE_ID) +{ + uint16 sid; + int error= 1; + char *name, *buff; + MARIA_HA *info= NULL; + MARIA_SHARE *share; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + fprintf(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + sid= fileid_korr(buff); + name= buff + FILEID_STORE_SIZE; + info= all_tables[sid]; + if (info != NULL) + { + all_tables[sid]= NULL; + if (close_recovered_table(info)) + { + fprintf(tracef, "Failed to close table\n"); + goto end; + } + } + fprintf(tracef, "Table '%s', id %u", name, sid); + info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR); + if (info == NULL) + { + fprintf(tracef, ", is absent (must have been dropped later?)" + " or its header is so corrupted that we cannot open it;" + " we skip it\n"); + error= 0; + goto end; + } + if (maria_is_crashed(info)) + { + fprintf(tracef, "Table is crashed, can't apply log records to it\n"); + goto end; + /* + we should make an exception for REDO_REPAIR_TABLE records: if we want to + execute them, we should not reject the crashed table here. + */ + } + share= info->s; + /* check that we're not already using it */ + DBUG_ASSERT(share->reopen == 1); + DBUG_ASSERT(share->now_transactional == share->base.born_transactional); + if (!share->base.born_transactional) + { + fprintf(tracef, ", is not transactional\n"); + ALERT_USER(); + error= 0; + goto end; + } + all_tables[sid]= info; + /* don't log any records for this work */ + _ma_tmp_disable_logging_for_table(share); + /* execution of some REDO records relies on data_file_length */ + my_off_t dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME)); + my_off_t kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME)); + if ((dfile_len == MY_FILEPOS_ERROR) || + (kfile_len == MY_FILEPOS_ERROR)) + { + fprintf(tracef, ", length unknown\n"); + goto end; + } + share->state.state.data_file_length= dfile_len; + share->state.state.key_file_length= kfile_len; + if ((dfile_len == 0) || ((dfile_len % share->block_size) > 0)) + { + fprintf(tracef, ", has too short last page\n"); + /* Recovery will fix this, no error */ + ALERT_USER(); + } + fprintf(tracef, ", opened\n"); + error= 0; +end: + if (error && info != NULL) + error|= maria_close(info); + return error; +} + + +prototype_exec_hook(REDO_INSERT_ROW_HEAD) +{ + int error= 1; + byte *buff= NULL; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + /* + If REDO's LSN is > page's LSN (read from disk), we are going to modify the + page and change its LSN. The normal runtime code stores the UNDO's LSN + into the page. Here storing the REDO's LSN (rec->lsn) would work + (we are not writing to the log here, so don't have to "flush up to UNDO's + LSN"). But in a test scenario where we do updates at runtime, then remove + tables, apply the log and check that this results in the same table as at + runtime, putting the same LSN as runtime had done will decrease + differences. So we use the UNDO's LSN which is current_group_end_lsn. + */ + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + fprintf(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, + HEAD_PAGE, + buff + FILEID_STORE_SIZE, + buff + + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE, + rec->record_length - + (FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE))) + goto end; + error= 0; +end: + return error; +} + + +prototype_exec_hook(REDO_INSERT_ROW_TAIL) +{ + int error= 1; + byte *buff= NULL; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + enlarge_buffer(rec); + if (log_record_buffer.str == NULL || + translog_read_record(rec->lsn, 0, rec->record_length, + log_record_buffer.str, NULL) != + rec->record_length) + { + fprintf(tracef, "Failed to read record\n"); + goto end; + } + buff= log_record_buffer.str; + if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn, + TAIL_PAGE, + buff + FILEID_STORE_SIZE, + buff + + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE, + rec->record_length - + (FILEID_STORE_SIZE + + PAGE_STORE_SIZE + + DIRPOS_STORE_SIZE))) + goto end; + error= 0; + +end: + return error; +} + + +prototype_exec_hook(REDO_PURGE_ROW_HEAD) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, + HEAD_PAGE, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_exec_hook(REDO_PURGE_ROW_TAIL) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn, + TAIL_PAGE, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_exec_hook(REDO_PURGE_BLOCKS) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn, + rec->header + FILEID_STORE_SIZE)) + goto end; + error= 0; +end: + return error; +} + + +prototype_exec_hook(REDO_DELETE_ALL) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); + if (info == NULL) + goto end; + fprintf(tracef, " deleting all %lu rows\n", + (ulong)info->s->state.state.records); + if (maria_delete_all_rows(info)) + goto end; + error= 0; +end: + return error; +} + + +prototype_exec_hook(UNDO_ROW_INSERT) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + if (info == NULL) + goto end; + all_active_trans[rec->short_trid].undo_lsn= rec->lsn; + /* + todo: instead of above, call write_hook_for_undo, it will also set + first_undo_lsn + */ + /* + in an upcoming patch ("recovery of the state"), we introduce + state.is_of_lsn. For now, we just assume the state is old (true when we + recreate tables from scratch - but not idempotent). + */ + { + fprintf(tracef, " state older than record, updating rows' count\n"); + info->s->state.state.records++; + } + fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); + error= 0; +end: + return error; +} + + +prototype_exec_hook(UNDO_ROW_DELETE) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + if (info == NULL) + goto end; + all_active_trans[rec->short_trid].undo_lsn= rec->lsn; + /* + todo: instead of above, call write_hook_for_undo, it will also set + first_undo_lsn + */ + { + fprintf(tracef, " state older than record, updating rows' count\n"); + info->s->state.state.records--; + } + fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); + error= 0; +end: + return error; +} + + +prototype_exec_hook(UNDO_ROW_PURGE) +{ + int error= 1; + MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); + if (info == NULL) + goto end; + /* this a bit broken, but this log record type will be deleted soon */ + all_active_trans[rec->short_trid].undo_lsn= rec->lsn; + /* + todo: instead of above, call write_hook_for_undo, it will also set + first_undo_lsn + */ + { + fprintf(tracef, " state older than record, updating rows' count\n"); + info->s->state.state.records--; + } + fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); + error= 0; +end: + return error; +} + + +prototype_exec_hook(COMMIT) +{ + uint16 sid= rec->short_trid; + TrID long_trid= all_active_trans[sid].long_trid; + LSN gslsn= all_active_trans[sid].group_start_lsn; + char llbuf[22]; + if (long_trid == 0) + { + fprintf(tracef, "We don't know about transaction with short_trid %u;" + "it probably committed long ago, forget it\n", sid); + return 0; + } + llstr(long_trid, llbuf); + fprintf(tracef, "Transaction long_trid %s short_trid %u committed", llbuf, sid); + if (gslsn != LSN_IMPOSSIBLE) + { + /* + It's not an error, it may be that trn got a disk error when writing to a + table, so an unfinished group staid in the log. + */ + fprintf(tracef, ", with group at LSN (%lu,0x%lx) short_trid %u aborted\n", + (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); + all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; + } + else + fprintf(tracef, "\n"); + bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); +#ifdef MARIA_VERSIONING + /* + if real recovery: + transaction was committed, move it to some separate list for later + purging (but don't purge now! purging may have been started before, we + may find REDO_PURGE records soon). + */ +#endif + return 0; +} + + +/* Just to inform about any aborted groups or unfinished transactions */ +static int end_of_redo_phase() +{ + uint sid, unfinished= 0, error= 0; + for (sid= 0; sid <= SHORT_TRID_MAX; sid++) + { + TrID long_trid= all_active_trans[sid].long_trid; + LSN gslsn= all_active_trans[sid].group_start_lsn; + if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) + { + char llbuf[22]; + llstr(long_trid, llbuf); + fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n", + llbuf, sid); + unfinished++; + } + if (gslsn != LSN_IMPOSSIBLE) + { + fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n", + (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); + ALERT_USER(); + } + /* If real recovery: roll back unfinished transaction */ +#ifdef MARIA_VERSIONING + /* + If real recovery: transaction was committed, move it to some separate + list for soon purging. Create TRNs. + */ +#endif + } + /* + We don't close tables if there are some unfinished transactions, because + closing tables normally requires that all unfinished transactions on them + be rolled back. Unfinished transactions are symptom of a crash, we + reproduce the crash. + For example, closing will soon write the state to disk and when doing that + it will think this is a committed state, but it may not be. + */ + if (unfinished > 0) + fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be" + " left inconsistent!\n", unfinished); + for (sid= 0; sid <= SHARE_ID_MAX; sid++) + { + MARIA_HA *info= all_tables[sid]; + if (info != NULL) + { + /* if error, still close other tables */ + error|= close_recovered_table(info); + } + } + return error; +} + + +static int close_recovered_table(MARIA_HA *info) +{ + int error; + MARIA_SHARE *share= info->s; + fprintf(tracef, " Closing table '%s'\n", share->open_file_name); + _ma_reenable_logging_for_table(share); + /* + Recovery normally corrected problems, don't scare user with "table was not + closed properly" in CHECK TABLE and don't automatically check table at + next open (when we have --maria-recover). + */ + share->state.open_count= share->global_changed ? 1 : 0; + /* this var is set only by non-recovery operations (mi_write() etc) */ + DBUG_ASSERT(!share->global_changed); + if ((error= maria_close(info))) + fprintf(tracef, "Failed to close table\n"); + return error; +} + + +static MARIA_HA *get_MARIA_HA_from_REDO_record(const + TRANSLOG_HEADER_BUFFER *rec) +{ + uint16 sid; + ulonglong page; + MARIA_HA *info; + char llbuf[22]; + + sid= fileid_korr(rec->header); + page= page_korr(rec->header + FILEID_STORE_SIZE); + /* BUG not correct for REDO_PURGE_BLOCKS, page is not at this pos */ + llstr(page, llbuf); + fprintf(tracef, " For page %s of table of short id %u", llbuf, sid); + info= all_tables[sid]; + if (info == NULL) + { + fprintf(tracef, ", table skipped, so skipping record\n"); + return NULL; + } + fprintf(tracef, ", '%s'", info->s->open_file_name); + /* detect if an open instance of a dropped table (internal bug) */ + DBUG_ASSERT(info->s->last_version != 0); + if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) + { + fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log" + " record\n", + (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); + return NULL; + } + fprintf(tracef, ", applying record\n"); + return info; + /* + Soon we will also skip the page depending on the rec_lsn for this page in + the checkpoint record, but this is not absolutely needed for now (just + assume we have made no checkpoint). Btw rec_lsn and bitmap's recovery is a + an unsolved problem (rec_lsn is to ignore a REDO without reading the data + page and to do so we need to be sure the corresponding bitmap page does + not need a _ma_bitmap_set()). + */ +} + + +static MARIA_HA *get_MARIA_HA_from_UNDO_record(const + TRANSLOG_HEADER_BUFFER *rec) +{ + uint16 sid; + MARIA_HA *info; + + sid= fileid_korr(rec->header + LSN_STORE_SIZE); + fprintf(tracef, " For table of short id %u", sid); + info= all_tables[sid]; + if (info == NULL) + { + fprintf(tracef, ", table skipped, so skipping record\n"); + return NULL; + } + fprintf(tracef, ", '%s'", info->s->open_file_name); + DBUG_ASSERT(info->s->last_version != 0); + if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) + { + fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log" + " record\n", + (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); + return NULL; + } + fprintf(tracef, ", applying record\n"); + return info; + /* + Soon we will also skip the page depending on the rec_lsn for this page in + the checkpoint record, but this is not absolutely needed for now (just + assume we have made no checkpoint). + */ +} + + + + +/* some comments and pseudo-code which we keep for later */ +#if 0 + /* MikaelR suggests: support checkpoints during REDO phase too: do checkpoint after a certain amount of log records have been executed. This helps against repeated crashes. Those checkpoints could not be user-requested @@ -214,8 +1131,7 @@ int recovery() /**** UNDO PHASE *****/ - print_information_to_error_log(nb of trans to roll back, nb of prepared trans); - + print_information_to_error_log(nb of trans to roll back, nb of prepared trans /* Launch one or more threads to do the background rollback. Don't wait for them to complete their rollback (background rollback; for debugging, we @@ -265,3 +1181,4 @@ pthread_handler_decl rollback_background_thread() unlock_mutex(rollback_threads); pthread_exit(); } +#endif diff --git a/storage/maria/ma_recovery.h b/storage/maria/ma_recovery.h index 42c5071babd..0b576efc95f 100644 --- a/storage/maria/ma_recovery.h +++ b/storage/maria/ma_recovery.h @@ -22,4 +22,8 @@ /* This is the interface of this module. */ /* Performs recovery of the engine at start */ -int recovery(); + +C_MODE_START +int maria_recover(); +int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file); +C_MODE_END diff --git a/storage/maria/ma_rename.c b/storage/maria/ma_rename.c index 9dd75705229..6250b781a68 100644 --- a/storage/maria/ma_rename.c +++ b/storage/maria/ma_rename.c @@ -62,8 +62,8 @@ int maria_rename(const char *old_name, const char *new_name) this is important; make sure transactionality has been re-enabled. */ DBUG_ASSERT(share->now_transactional == share->base.born_transactional); - sync_dir= (share->now_transactional && !share->temporary) ? - MY_SYNC_DIR : 0; + sync_dir= (share->now_transactional && !share->temporary && + !maria_in_recovery) ? MY_SYNC_DIR : 0; if (sync_dir) { uchar log_data[2 + 2]; diff --git a/storage/maria/ma_test2.c b/storage/maria/ma_test2.c index 585a78b753b..8ab22c60d40 100644 --- a/storage/maria/ma_test2.c +++ b/storage/maria/ma_test2.c @@ -47,7 +47,7 @@ static void copy_key(struct st_maria_info *info,uint inx, static int verbose=0,testflag=0, first_key=0,async_io=0,pagecacheing=0,write_cacheing=0,locking=0, rec_pointer_size=0,pack_fields=1,silent=0, - opt_quick_mode=0, transactional= 0; + opt_quick_mode=0, transactional= 0, skip_update= 0; static int pack_seg=HA_SPACE_PACK,pack_type=HA_PACK_KEY,remove_count=-1; static int create_flag= 0, srand_arg= 0; static ulong pagecache_size=IO_SIZE*16; @@ -84,7 +84,24 @@ int main(int argc, char *argv[]) if (! async_io) my_disable_async_io=1; - maria_init(); + maria_data_root= "."; + /* Maria requires that we always have a page cache */ + if (maria_init() || + (init_pagecache(maria_pagecache, pagecache_size, 0, 0, + maria_block_size) == 0) || + ma_control_file_create_or_open(TRUE) || + (init_pagecache(maria_log_pagecache, + TRANSLOG_PAGECACHE_SIZE, 0, 0, + TRANSLOG_PAGE_SIZE) == 0) || + translog_init(maria_data_root, TRANSLOG_FILE_SIZE, + 0, 0, maria_log_pagecache, + TRANSLOG_DEFAULT_FLAGS) || + (transactional && trnman_init())) + { + fprintf(stderr, "Error in initialization"); + exit(1); + } + reclength=STANDARD_LENGTH+60+(use_blob ? 8 : 0); blob_pos=STANDARD_LENGTH+60; keyinfo[0].seg= &glob_keyseg[0][0]; @@ -220,22 +237,6 @@ int main(int argc, char *argv[]) goto err; if (!silent) printf("- Writing key:s\n"); - maria_data_root= "."; - /* Maria requires that we always have a page cache */ - if ((init_pagecache(maria_pagecache, pagecache_size, 0, 0, - maria_block_size) == 0) || - ma_control_file_create_or_open(TRUE) || - (init_pagecache(maria_log_pagecache, - TRANSLOG_PAGECACHE_SIZE, 0, 0, - TRANSLOG_PAGE_SIZE) == 0) || - translog_init(maria_data_root, TRANSLOG_FILE_SIZE, - 0, 0, maria_log_pagecache, - TRANSLOG_DEFAULT_FLAGS)) - { - fprintf(stderr, "Error in initialization"); - exit(1); - } - if (locking) maria_lock_database(file,F_WRLCK); if (write_cacheing) @@ -246,6 +247,14 @@ int main(int argc, char *argv[]) for (i=0 ; i < recant ; i++) { ulong blob_length; +#if 0 + /* + Starting from i==72, there was a difference between runtime and + log-appplying. This is now fixed, by not using non_header_data_len in + log-applying. + */ + if (i == 72) goto end; +#endif n1=rnd(1000); n2=rnd(100); n3=rnd(5000); sprintf(record,"%6d:%4d:%8d:Pos: %4d ",n1,n2,n3,write_count); int4store(record+STANDARD_LENGTH-4,(long) i); @@ -260,7 +269,7 @@ int main(int argc, char *argv[]) printf("Error: %d in write at record: %d\n",my_errno,i); goto err; } - if (verbose) printf(" Double key: %d\n",n3); + if (verbose) printf(" Double key: %d at record# %d\n", n3, i); } else { @@ -294,7 +303,7 @@ int main(int argc, char *argv[]) if (maria_extra(file,HA_EXTRA_NO_CACHE,0)) { puts("got error from maria_extra(HA_EXTRA_NO_CACHE)"); - goto end; + goto err; } } #ifdef REMOVE_WHEN_WE_HAVE_RESIZE @@ -376,6 +385,8 @@ int main(int argc, char *argv[]) else bmove(record+blob_pos,read_record+blob_pos,8); } + if (skip_update) + continue; if (maria_update(file,read_record,record2)) { if (my_errno != HA_ERR_FOUND_DUPP_KEY || key3[n3] == 0) @@ -423,7 +434,7 @@ int main(int argc, char *argv[]) if (memcmp(read_record,read_record2,reclength) != 0) { printf("maria_rsame didn't find same record\n"); - goto end; + goto err; } info.recpos=maria_position(file); if (maria_rfirst(file,read_record2,0) || @@ -431,7 +442,7 @@ int main(int argc, char *argv[]) memcmp(read_record,read_record2,reclength) != 0) { printf("maria_rsame_with_pos didn't find same record\n"); - goto end; + goto err; } { info.recpos= maria_position(file); @@ -442,7 +453,7 @@ int main(int argc, char *argv[]) info.recpos != maria_position(file)) { printf("maria_rsame_with_pos lost position\n"); - goto end; + goto err; } } ant=1; @@ -451,7 +462,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys) { printf("next: Found: %d keys of %d\n",ant,dupp_keys); - goto end; + goto err; } ant=0; while (maria_rprev(file,read_record3,0) == 0 && @@ -459,7 +470,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys) { printf("prev: Found: %d records of %d\n",ant,dupp_keys); - goto end; + goto err; } /* Check of maria_rnext_same */ @@ -471,7 +482,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys || my_errno != HA_ERR_END_OF_FILE) { printf("maria_rnext_same: Found: %d records of %d\n",ant,dupp_keys); - goto end; + goto err; } } @@ -482,7 +493,7 @@ int main(int argc, char *argv[]) if (maria_rfirst(file,read_record,0)) { printf("Can't find first record\n"); - goto end; + goto err; } while ((error=maria_rnext(file,read_record3,0)) == 0 && ant < write_count+10) ant++; @@ -490,7 +501,7 @@ int main(int argc, char *argv[]) { printf("next: I found: %d records of %d (error: %d)\n", ant, write_count - opt_delete, error); - goto end; + goto err; } if (maria_rlast(file,read_record2,0) || bcmp(read_record2,read_record3,reclength)) @@ -498,7 +509,7 @@ int main(int argc, char *argv[]) printf("Can't find last record\n"); DBUG_DUMP("record2",(uchar*) read_record2,reclength); DBUG_DUMP("record3",(uchar*) read_record3,reclength); - goto end; + goto err; } ant=1; while (maria_rprev(file,read_record3,0) == 0 && ant < write_count+10) @@ -506,12 +517,12 @@ int main(int argc, char *argv[]) if (ant != write_count - opt_delete) { printf("prev: I found: %d records of %d\n",ant,write_count); - goto end; + goto err; } if (bcmp(read_record,read_record3,reclength)) { printf("Can't find first record\n"); - goto end; + goto err; } if (!silent) @@ -552,7 +563,7 @@ int main(int argc, char *argv[]) if (bcmp(read_record+start,key,(uint) i)) { puts("Didn't find right record"); - goto end; + goto err; } } if (dupp_keys > 2) @@ -570,7 +581,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys-1) { printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-1); - goto end; + goto err; } } if (dupp_keys>4) @@ -588,7 +599,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys-2) { printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-2); - goto end; + goto err; } } if (dupp_keys > 6) @@ -607,7 +618,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys-3) { printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-3); - goto end; + goto err; } if (!silent) @@ -622,7 +633,7 @@ int main(int argc, char *argv[]) if (ant != dupp_keys-4) { printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-4); - goto end; + goto err; } } @@ -655,7 +666,7 @@ int main(int argc, char *argv[]) if (bcmp(read_record,read_record2,reclength) != 0) { printf("maria_rsame didn't find same record\n"); - goto end; + goto err; } } if (!silent) @@ -682,7 +693,7 @@ int main(int argc, char *argv[]) { printf("maria_records_range returned %ld; Should be about %ld\n", (long) range_records,(long) info.records); - goto end; + goto err; } if (verbose) { @@ -719,7 +730,7 @@ int main(int argc, char *argv[]) { printf("maria_records_range for key: %d returned %lu; Should be about %lu\n", i, (ulong) range_records, (ulong) records); - goto end; + goto err; } if (verbose && records) { @@ -740,6 +751,7 @@ int main(int argc, char *argv[]) puts("Wrong info from maria_info"); printf("Got: records: %lu delete: %lu i_keys: %d\n", (ulong) info.records, (ulong) info.deleted, info.keys); + goto err; } if (verbose) { @@ -764,7 +776,7 @@ int main(int argc, char *argv[]) if (locking || (!use_blob && !pack_fields)) { puts("got error from maria_extra(HA_EXTRA_CACHE)"); - goto end; + goto err; } } ant=0; @@ -777,12 +789,12 @@ int main(int argc, char *argv[]) { printf("scan with cache: I can only find: %d records of %d\n", ant,write_count-opt_delete); - goto end; + goto err; } if (maria_extra(file,HA_EXTRA_NO_CACHE,0)) { puts("got error from maria_extra(HA_EXTRA_NO_CACHE)"); - goto end; + goto err; } ant=0; @@ -794,7 +806,7 @@ int main(int argc, char *argv[]) { printf("scan with cache: I can only find: %d records of %d\n", ant,write_count-opt_delete); - goto end; + goto err; } if (testflag == 4) @@ -852,6 +864,15 @@ int main(int argc, char *argv[]) goto err; } opt_delete++; +#if 0 + / + /* + 179 is ok, 180 causes a difference between runtime and log-applying. + This is now fixed (we zero the last directory entry during + log-applying, just to eliminate this irrelevant difference). + */ + if (opt_delete==180) goto end; +#endif } else found_parts++; @@ -1021,6 +1042,9 @@ static void get_options(int argc, char **argv) case 'D': create_flag|=HA_CREATE_DELAY_KEY_WRITE; break; + case 'g': + skip_update= TRUE; + break; case '?': case 'I': case 'V': diff --git a/storage/maria/ma_test_all.sh b/storage/maria/ma_test_all.sh index a6786315afe..e8b9f1cef9a 100755 --- a/storage/maria/ma_test_all.sh +++ b/storage/maria/ma_test_all.sh @@ -6,6 +6,9 @@ # If you want to run this in Valgrind, you should use --trace-children=yes, # so that it detects problems in ma_test* and not in the shell script +# Running in a "shared memory" disk is 10 times faster; you can do +# mkdir /dev/shm/test; cd /dev/shm/test; maria_path= + # Remove # from following line if you need some more information #set -x -v -e @@ -21,6 +24,7 @@ fi # Delete temporary files rm -f *.TMD +rm -f maria_log* run_tests() { @@ -211,8 +215,14 @@ echo "$maria_path/maria_chk$suffix -sm test2 will warn that 'Datafile is almost $maria_path/maria_chk$suffix -sm test2 >ma_test2_message.txt 2>&1 cat ma_test2_message.txt grep "warning: Datafile is almost full" ma_test2_message.txt >/dev/null +rm -f ma_test2_message.txt $maria_path/maria_chk$suffix -ssm test2 +# +# Test that removing tables and applying the log leads to identical tables +# +/bin/sh $maria_path/ma_test_recovery + # # Some timing tests # diff --git a/storage/maria/ma_test_recovery b/storage/maria/ma_test_recovery new file mode 100644 index 00000000000..3393b932e18 --- /dev/null +++ b/storage/maria/ma_test_recovery @@ -0,0 +1,37 @@ +set -e + +if [ -z "$maria_path" ] +then + maria_path="." +fi + +echo "MARIA RECOVERY TESTS - success is if exit code is 0" + +# runs a program inserting/deleting rows, then moves the resulting table +# elsewhere; applies the log and checks that the data file is +# identical to the saved original. +# Does not test the index file as we don't have logging for it yet. + +rm -f maria_log* +prog="$maria_path/ma_test1 -M -T --skip-update" +echo "TEST WITH $prog" +$prog +mv -f test1.MAD test1.MAD.good +rm test1.MAI +echo "applying log" +$maria_path/maria_read_log -a > /dev/null +cmp test1.MAD test1.MAD.good +rm -f test1.* + +rm -f maria_log* +prog="$maria_path/ma_test2 -s -L -K -W -P -M -T -g" +echo "TEST WITH $prog" +$prog +mv -f test2.MAD test2.MAD.good +rm test2.MAI +echo "applying log" +$maria_path/maria_read_log -a > /dev/null +cmp test2.MAD test2.MAD.good +rm -f test2.* + +echo "ALL RECOVERY TESTS OK" diff --git a/storage/maria/maria_read_log.c b/storage/maria/maria_read_log.c index dd130f287f5..f8bfeb24826 100644 --- a/storage/maria/maria_read_log.c +++ b/storage/maria/maria_read_log.c @@ -14,7 +14,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "maria_def.h" -#include +#include "ma_recovery.h" #include #define PCACHE_SIZE (1024*1024*10) @@ -32,60 +32,6 @@ const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace"; #endif /* DBUG_OFF */ static my_bool opt_only_display, opt_display_and_apply; -struct TRN_FOR_RECOVERY -{ - LSN group_start_lsn, undo_lsn; - TrID long_trid; -}; - -struct TRN_FOR_RECOVERY all_active_trans[SHORT_TRID_MAX + 1]; -MARIA_HA *all_tables[SHORT_TRID_MAX + 1]; -LSN current_group_end_lsn= LSN_IMPOSSIBLE; - -static void end_of_redo_phase(); -static void display_record_position(const LOG_DESC *log_desc, - const TRANSLOG_HEADER_BUFFER *rec, - uint number); -static int display_and_apply_record(const LOG_DESC *log_desc, - const TRANSLOG_HEADER_BUFFER *rec); -#define prototype_exec_hook(R) \ -static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) -prototype_exec_hook(LONG_TRANSACTION_ID); -#ifdef MARIA_CHECKPOINT -prototype_exec_hook(CHECKPOINT); -#endif -prototype_exec_hook(REDO_CREATE_TABLE); -prototype_exec_hook(FILE_ID); -prototype_exec_hook(REDO_INSERT_ROW_HEAD); -prototype_exec_hook(REDO_INSERT_ROW_TAIL); -prototype_exec_hook(REDO_PURGE_ROW_HEAD); -prototype_exec_hook(REDO_PURGE_ROW_TAIL); -prototype_exec_hook(UNDO_ROW_INSERT); -prototype_exec_hook(UNDO_ROW_DELETE); -prototype_exec_hook(COMMIT); - - -/* - TODO: Avoid mallocs in exec. - - Proposed fix: - Add either a context/buffer argument to all exec_hook functions - or add 'record_buffer' and 'record_buffer_length' to - TRANSLOG_HEADER_BUFFER. - With this we could use my_realloc() instead of my_malloc() to - allocate data and save some mallocs. -*/ - -/* - To implement REDO_DROP_TABLE and REDO_RENAME_TABLE, we would need to go - through the all_tables[] array, find all open instances of the - table-to-drop-or-rename, and remove them from the array. - We however know that in real Recovery, we don't have to handle those log - records at all, same for REDO_CREATE_TABLE. - So for now, we can use this program to replay/debug a sequence of CREATE + - DMLs, but not DROP/RENAME; it is probably enough for a start. -*/ - int main(int argc, char **argv) { LSN lsn; @@ -97,6 +43,7 @@ int main(int argc, char **argv) get_options(&argc, &argv); maria_data_root= "."; + maria_in_recovery= TRUE; if (maria_init()) { @@ -114,6 +61,8 @@ int main(int argc, char **argv) fprintf(stderr, "Can't find any log\n"); goto err; } + /* same page cache for log and data; assumes same page size... */ + DBUG_ASSERT(maria_block_size == TRANSLOG_PAGE_SIZE); if (init_pagecache(maria_pagecache, PCACHE_SIZE, 0, 0, TRANSLOG_PAGE_SIZE) == 0) { @@ -133,147 +82,22 @@ int main(int argc, char **argv) goto err; } - /* install hooks for execution */ -#define install_exec_hook(R) \ - log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ - exec_LOGREC_ ## R; - install_exec_hook(LONG_TRANSACTION_ID); -#ifdef MARIA_CHECKPOINT - install_exec_hook(CHECKPOINT); -#endif - install_exec_hook(REDO_CREATE_TABLE); - install_exec_hook(FILE_ID); - install_exec_hook(REDO_INSERT_ROW_HEAD); - install_exec_hook(REDO_INSERT_ROW_TAIL); - install_exec_hook(REDO_PURGE_ROW_HEAD); - install_exec_hook(REDO_PURGE_ROW_TAIL); - install_exec_hook(UNDO_ROW_INSERT); - install_exec_hook(UNDO_ROW_DELETE); - install_exec_hook(COMMIT); - if (opt_only_display) printf("You are using --only-display, NOTHING will be written to disk\n"); - lsn= first_lsn_in_log(); /*could also be last_checkpoint_lsn */ + lsn= first_lsn_in_log(); /* LSN could be also --start-from-lsn=# */ - TRANSLOG_HEADER_BUFFER rec; - struct st_translog_scanner_data scanner; - uint i= 1; - - translog_size_t len= translog_read_record_header(lsn, &rec); - - if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) - { - printf("EOF on the log\n"); - goto end; - } - - if (translog_init_scanner(lsn, 1, &scanner)) - { - fprintf(stderr, "Scanner init failed\n"); + fprintf(stdout, "TRACE of the last maria_read_log\n"); + if (maria_apply_log(lsn, opt_display_and_apply, stdout)) goto err; - } - for (;;i++) - { - uint16 sid= rec.short_trid; - const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type]; - display_record_position(log_desc, &rec, i); + fprintf(stdout, "SUCCESS\n"); - /* - A complete group is a set of log records with an "end mark" record - (e.g. a set of REDOs for an operation, terminated by an UNDO for this - operation); if there is no "end mark" record the group is incomplete - and won't be executed. - There are pitfalls: if a table write failed, the transaction may have - put an incomplete group in the log and then a COMMIT record, that will - make a complete group which is wrong. We say that we should mark the - table corrupted if such error happens (what if it cannot be marked?). - */ - if (log_desc->record_ends_group) - { - if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE) - { - /* - There is a complete group for this transaction, containing more than - this event. - */ - printf(" ends a group:\n"); - struct st_translog_scanner_data scanner2; - TRANSLOG_HEADER_BUFFER rec2; - len= - translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2); - if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) - { - fprintf(stderr, "Cannot find record where it should be\n"); - goto err; - } - if (translog_init_scanner(rec2.lsn, 1, &scanner2)) - { - fprintf(stderr, "Scanner2 init failed\n"); - goto err; - } - current_group_end_lsn= rec.lsn; - do - { - if (rec2.short_trid == sid) /* it's in our group */ - { - const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type]; - display_record_position(log_desc2, &rec2, 0); - if (display_and_apply_record(log_desc2, &rec2)) - goto err; - } - len= translog_read_next_record_header(&scanner2, &rec2); - if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) - { - fprintf(stderr, "Cannot find record where it should be\n"); - goto err; - } - } - while (rec2.lsn < rec.lsn); - translog_free_record_header(&rec2); - /* group finished */ - all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; - current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */ - } - if (display_and_apply_record(log_desc, &rec)) - goto err; - } - else /* record does not end group */ - { - /* just record the fact, can't know if can execute yet */ - if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE) - { - /* group not yet started */ - all_active_trans[sid].group_start_lsn= rec.lsn; - } - } - len= translog_read_next_record_header(&scanner, &rec); - if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1)) - { - printf("EOF on the log\n"); - goto end; - } - } - translog_free_record_header(&rec); - - /* - So we have applied all REDOs. - We may now have unfinished transactions. - I don't think it's this program's job to roll them back: - to roll back and at the same time stay idempotent, it needs to write log - records (without CLRs, 2nd rollback would hit the effects of first - rollback and fail). But this standalone tool is not allowed to write to - the server's transaction log. So we do not roll back anything. - In the real Recovery code, or the code to do "recover after online - backup", yes we will roll back. - */ - end_of_redo_phase(); goto end; err: /* don't touch anything more, in case we hit a bug */ exit(1); end: - maria_panic(HA_PANIC_CLOSE); + maria_end(); free_defaults(default_argv); my_end(0); exit(0); @@ -284,11 +108,11 @@ end: static struct my_option my_long_options[] = { {"only-display", 'o', "display brief info about records's header", - (uchar**) &opt_only_display, (uchar**) &opt_only_display, 0, GET_BOOL, NO_ARG, + (gptr*) &opt_only_display, (gptr*) &opt_only_display, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"display-and-apply", 'a', "like --only-display but displays more info and modifies tables", - (uchar**) &opt_display_and_apply, (uchar**) &opt_display_and_apply, 0, + (gptr*) &opt_display_and_apply, (gptr*) &opt_display_and_apply, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, #ifndef DBUG_OFF {"debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", @@ -355,629 +179,3 @@ static void get_options(int *argc,char ***argv) exit(1); } } - - -/* very basic info about the record's header */ -static void display_record_position(const LOG_DESC *log_desc, - const TRANSLOG_HEADER_BUFFER *rec, - uint number) -{ - /* - if number==0, we're going over records which we had already seen and which - form a group, so we indent below the group's end record - */ - printf("%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n", - number ? "" : " ", number, - (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn), - rec->short_trid, log_desc->name, rec->type, - (ulong)rec->record_length); -} - - -static int display_and_apply_record(const LOG_DESC *log_desc, - const TRANSLOG_HEADER_BUFFER *rec) -{ - int error; - if (opt_only_display) - return 0; - if (log_desc->record_execute_in_redo_phase == NULL) - { - /* die on all not-yet-handled records :) */ - DBUG_ASSERT("one more hook" == "to write"); - } - if ((error= (*log_desc->record_execute_in_redo_phase)(rec))) - fprintf(stderr, "Got error when executing record\n"); - return error; -} - - -prototype_exec_hook(LONG_TRANSACTION_ID) -{ - uint16 sid= rec->short_trid; - TrID long_trid= all_active_trans[sid].long_trid; - /* abort group of this trn (must be of before a crash) */ - LSN gslsn= all_active_trans[sid].group_start_lsn; - char llbuf[22]; - if (gslsn != LSN_IMPOSSIBLE) - { - printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n", - (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); - all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; - } - if (long_trid != 0) - { - LSN ulsn= all_active_trans[sid].undo_lsn; - if (ulsn != LSN_IMPOSSIBLE) - { - llstr(long_trid, llbuf); - fprintf(stderr, "Found an old transaction long_trid %s short_trid %u" - " with same short id as this new transaction, and has neither" - " committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf, - sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn)); - goto err; - } - } - long_trid= uint6korr(rec->header); - all_active_trans[sid].long_trid= long_trid; - llstr(long_trid, llbuf); - printf("Transaction long_trid %s short_trid %u starts\n", llbuf, sid); - goto end; -err: - DBUG_ASSERT(0); - return 1; -end: - return 0; -} - - -#ifdef MARIA_CHECKPOINT -prototype_exec_hook(CHECKPOINT) -{ - /* the only checkpoint we care about was found via control file, ignore */ - return 0; -} -#endif - - -prototype_exec_hook(REDO_CREATE_TABLE) -{ - File dfile= -1, kfile= -1; - char *linkname_ptr, filename[FN_REFLEN]; - char *name, *ptr; - myf create_flag; - uint flags; - int error, create_mode= O_RDWR | O_TRUNC; - MARIA_HA *info= NULL; - if (((name= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) || - (translog_read_record(rec->lsn, 0, rec->record_length, name, NULL) != - rec->record_length)) - { - fprintf(stderr, "Failed to read record\n"); - goto err; - } - printf("Table '%s'", name); - /* we try hard to get create_rename_lsn, to avoid mistakes if possible */ - info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); - if (info) - { - MARIA_SHARE *share= info->s; - /* check that we're not already using it */ - DBUG_ASSERT(share->reopen == 1); - DBUG_ASSERT(share->now_transactional == share->base.born_transactional); - if (!share->base.born_transactional) - { - /* - could be that transactional table was later dropped, and a non-trans - one was renamed to its name, thus create_rename_lsn is 0 and should - not be trusted. - */ - printf(", is not transactional\n"); - DBUG_ASSERT(0); /* I want to know this */ - goto end; - } - if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0) - { - printf(", has create_rename_lsn (%lu,0x%lx) is more recent than record", - (ulong) LSN_FILE_NO(rec->lsn), - (ulong) LSN_OFFSET(rec->lsn)); - goto end; - } - if (maria_is_crashed(info)) - { - printf(", is crashed, overwriting it"); - DBUG_ASSERT(0); /* I want to know this */ - } - maria_close(info); - info= NULL; - } - /* if does not exist, is older, or its header is corrupted, overwrite it */ - // TODO symlinks - ptr= name + strlen(name) + 1; - if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0)) - printf(", we will only touch index file"); - fn_format(filename, name, "", MARIA_NAME_IEXT, - (MY_UNPACK_FILENAME | - (flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) | - MY_APPEND_EXT); - linkname_ptr= NULL; - create_flag= MY_DELETE_OLD; - printf(", creating as '%s'", filename); - if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, - MYF(MY_WME|create_flag))) < 0) - { - fprintf(stderr, "Failed to create index file\n"); - goto err; - } - ptr++; - uint kfile_size_before_extension= uint2korr(ptr); - ptr+= 2; - uint keystart= uint2korr(ptr); - ptr+= 2; - /* set create_rename_lsn (for maria_read_log to be idempotent) */ - lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn); - if (my_pwrite(kfile, ptr, - kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) || - my_chsize(kfile, keystart, 0, MYF(MY_WME))) - { - fprintf(stderr, "Failed to write to index file\n"); - goto err; - } - if (!(flags & HA_DONT_TOUCH_DATA)) - { - fn_format(filename,name,"", MARIA_NAME_DEXT, - MY_UNPACK_FILENAME | MY_APPEND_EXT); - linkname_ptr= NULL; - create_flag=MY_DELETE_OLD; - if ((dfile= - my_create_with_symlink(linkname_ptr, filename, 0, create_mode, - MYF(MY_WME | create_flag))) < 0) - { - fprintf(stderr, "Failed to create data file\n"); - goto err; - } - /* - we now have an empty data file. To be able to - _ma_initialize_data_file() we need some pieces of the share to be - correctly filled. So we just open the table (fortunately, an empty - data file does not preclude this). - */ - if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || - _ma_initialize_data_file(info->s, dfile)) - { - fprintf(stderr, "Failed to open new table or write to data file\n"); - goto err; - } - } - error= 0; - goto end; -err: - DBUG_ASSERT(0); - error= 1; -end: - printf("\n"); - if (kfile >= 0) - error|= my_close(kfile, MYF(MY_WME)); - if (dfile >= 0) - error|= my_close(dfile, MYF(MY_WME)); - if (info != NULL) - error|= maria_close(info); - my_free(name, MYF(MY_ALLOW_ZERO_PTR)); - return 0; -} - - -prototype_exec_hook(FILE_ID) -{ - uint16 sid; - int error; - char *name, *buff; - MARIA_HA *info= NULL; - MARIA_SHARE *share; - if (((buff= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) || - (translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) != - rec->record_length)) - { - fprintf(stderr, "Failed to read record\n"); - goto err; - } - sid= fileid_korr(buff); - name= buff + FILEID_STORE_SIZE; - printf("Table '%s', id %u", name, sid); - info= all_tables[sid]; - if (info != NULL) - { - printf(", closing table '%s'", info->s->open_file_name); - all_tables[sid]= NULL; - _ma_reenable_logging_for_table(info->s); /* put back the truth */ - if (maria_close(info)) - { - fprintf(stderr, "Failed to close table\n"); - goto err; - } - } - info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR); - if (info == NULL) - { - printf(", is absent (must have been dropped later?)" - " or its header is so corrupted that we cannot open it;" - " we skip it\n"); - goto end; - } - if (maria_is_crashed(info)) - { - fprintf(stderr, "Table is crashed, can't apply log records to it\n"); - goto err; - } - share= info->s; - /* check that we're not already using it */ - DBUG_ASSERT(share->reopen == 1); - DBUG_ASSERT(share->now_transactional == share->base.born_transactional); - if (!share->base.born_transactional) - { - printf(", is not transactional\n"); - DBUG_ASSERT(0); /* I want to know this */ - goto end; - } - all_tables[sid]= info; - /* don't log any records for this work */ - _ma_tmp_disable_logging_for_table(share); - printf(", opened\n"); - error= 0; - goto end; -err: - DBUG_ASSERT(0); - error= 1; - if (info != NULL) - error|= maria_close(info); -end: - my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); - return 0; -} - - -prototype_exec_hook(REDO_INSERT_ROW_HEAD) -{ - uint16 sid; - ulonglong page; - MARIA_HA *info; - char llbuf[22]; - uchar *buff= 0; - - sid= fileid_korr(rec->header); - page= page_korr(rec->header + FILEID_STORE_SIZE); - llstr(page, llbuf); - printf("For page %s of table of short id %u", llbuf, sid); - info= all_tables[sid]; - if (info == NULL) - { - printf(", table skipped, so skipping record\n"); - goto end; - } - printf(", '%s'", info->s->open_file_name); - if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) - { - printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log" - " record\n", - (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); - goto end; - } - /* - Soon we will also skip the page depending on the rec_lsn for this page in - the checkpoint record, but this is not absolutely needed for now (just - assume we have made no checkpoint). - */ - printf(", applying record\n"); - /* - If REDO's LSN is > page's LSN (read from disk), we are going to modify the - page and change its LSN. The normal runtime code stores the UNDO's LSN - into the page. Here storing the REDO's LSN (rec->lsn) would work - (we are not writing to the log here, so don't have to "flush up to UNDO's - LSN"). But in a test scenario where we do updates at runtime, then remove - tables, apply the log and check that this results in the same table as at - runtime, putting the same LSN as runtime had done will decrease - differences. So we use the UNDO's LSN which is current_group_end_lsn. - */ - - if ((!(buff= (uchar*) my_malloc(rec->record_length, MYF(MY_WME)))) || - (translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) != - rec->record_length)) - { - fprintf(stderr, "Failed to read record\n"); - goto end; - } - if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, HEAD_PAGE, - rec->header + FILEID_STORE_SIZE, - buff + (rec->record_length - - rec->non_header_data_len), - rec->non_header_data_len)) - goto end; - my_free(buff, MYF(0)); - return 0; - -end: - /* as we don't have apply working: */ - my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); - return 1; -} - - -prototype_exec_hook(REDO_INSERT_ROW_TAIL) -{ - uint16 sid; - ulonglong page; - MARIA_HA *info; - char llbuf[22]; - uchar *buff= 0; - - sid= fileid_korr(rec->header); - page= page_korr(rec->header + FILEID_STORE_SIZE); - llstr(page, llbuf); - printf("For page %s of table of short id %u", llbuf, sid); - info= all_tables[sid]; - if (info == NULL) - { - printf(", table skipped, so skipping record\n"); - goto end; - } - printf(", '%s'", info->s->open_file_name); - if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) - { - printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log" - " record\n", - (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); - goto end; - } - /* - Soon we will also skip the page depending on the rec_lsn for this page in - the checkpoint record, but this is not absolutely needed for now (just - assume we have made no checkpoint). - */ - printf(", applying record\n"); - /* - If REDO's LSN is > page's LSN (read from disk), we are going to modify the - page and change its LSN. The normal runtime code stores the UNDO's LSN - into the page. Here storing the REDO's LSN (rec->lsn) would work - (we are not writing to the log here, so don't have to "flush up to UNDO's - LSN"). But in a test scenario where we do updates at runtime, then remove - tables, apply the log and check that this results in the same table as at - runtime, putting the same LSN as runtime had done will decrease - differences. So we use the UNDO's LSN which is current_group_end_lsn. - */ - - if ((!(buff= (uchar*) my_malloc(rec->record_length, MYF(MY_WME)))) || - (translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) != - rec->record_length)) - { - fprintf(stderr, "Failed to read record\n"); - goto end; - } - if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, TAIL_PAGE, - rec->header + FILEID_STORE_SIZE, - buff + (rec->record_length - - rec->non_header_data_len), - rec->non_header_data_len)) - goto end; - - my_free(buff, MYF(0)); - return 0; - -end: - /* as we don't have apply working: */ - my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); - return 1; -} - - -prototype_exec_hook(REDO_PURGE_ROW_HEAD) -{ - uint16 sid; - ulonglong page; - MARIA_HA *info; - char llbuf[22]; - - sid= fileid_korr(rec->header); - page= page_korr(rec->header + FILEID_STORE_SIZE); - llstr(page, llbuf); - printf("For page %s of table of short id %u", llbuf, sid); - info= all_tables[sid]; - if (info == NULL) - { - printf(", table skipped, so skipping record\n"); - goto end; - } - printf(", '%s'", info->s->open_file_name); - if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) - { - printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log" - " record\n", - (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); - goto end; - } - /* - Soon we will also skip the page depending on the rec_lsn for this page in - the checkpoint record, but this is not absolutely needed for now (just - assume we have made no checkpoint). - */ - printf(", applying record\n"); - /* - If REDO's LSN is > page's LSN (read from disk), we are going to modify the - page and change its LSN. The normal runtime code stores the UNDO's LSN - into the page. Here storing the REDO's LSN (rec->lsn) would work - (we are not writing to the log here, so don't have to "flush up to UNDO's - LSN"). But in a test scenario where we do updates at runtime, then remove - tables, apply the log and check that this results in the same table as at - runtime, putting the same LSN as runtime had done will decrease - differences. So we use the UNDO's LSN which is current_group_end_lsn. - */ - - if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, HEAD_PAGE, - rec->header + FILEID_STORE_SIZE)) - goto end; - - return 0; - -end: - /* as we don't have apply working: */ - return 1; -} - - -prototype_exec_hook(REDO_PURGE_ROW_TAIL) -{ - uint16 sid; - ulonglong page; - MARIA_HA *info; - char llbuf[22]; - - sid= fileid_korr(rec->header); - page= page_korr(rec->header + FILEID_STORE_SIZE); - llstr(page, llbuf); - printf("For page %s of table of short id %u", llbuf, sid); - info= all_tables[sid]; - if (info == NULL) - { - printf(", table skipped, so skipping record\n"); - goto end; - } - printf(", '%s'", info->s->open_file_name); - if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) - { - printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log" - " record\n", - (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn)); - goto end; - } - /* - Soon we will also skip the page depending on the rec_lsn for this page in - the checkpoint record, but this is not absolutely needed for now (just - assume we have made no checkpoint). - */ - printf(", applying record\n"); - /* - If REDO's LSN is > page's LSN (read from disk), we are going to modify the - page and change its LSN. The normal runtime code stores the UNDO's LSN - into the page. Here storing the REDO's LSN (rec->lsn) would work - (we are not writing to the log here, so don't have to "flush up to UNDO's - LSN"). But in a test scenario where we do updates at runtime, then remove - tables, apply the log and check that this results in the same table as at - runtime, putting the same LSN as runtime had done will decrease - differences. So we use the UNDO's LSN which is current_group_end_lsn. - */ - - if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, TAIL_PAGE, - rec->header + FILEID_STORE_SIZE)) - goto end; - - return 0; - -end: - /* as we don't have apply working: */ - return 1; -} - - -static int exec_LOGREC_UNDO_ROW_INSERT(const TRANSLOG_HEADER_BUFFER *rec - __attribute__((unused))) -{ - /* Ignore this during the redo phase */ - return 0; -} - -static int exec_LOGREC_UNDO_ROW_DELETE(const TRANSLOG_HEADER_BUFFER *rec - __attribute__((unused))) -{ - /* Ignore this during the redo phase */ - return 0; -} - - - -prototype_exec_hook(COMMIT) -{ - uint16 sid= rec->short_trid; - TrID long_trid= all_active_trans[sid].long_trid; - LSN gslsn= all_active_trans[sid].group_start_lsn; - char llbuf[22]; - if (long_trid == 0) - { - printf("We don't know about transaction short_trid %u;" - "it probably committed long ago, forget it\n", sid); - return 0; - } - llstr(long_trid, llbuf); - printf("Transaction long_trid %s short_trid %u committed", llbuf, sid); - if (gslsn != LSN_IMPOSSIBLE) - { - /* - It's not an error, it may be that trn got a disk error when writing to a - table, so an unfinished group staid in the log. - */ - printf(", with group at LSN (%lu,0x%lx) short_trid %u aborted\n", - (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); - all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; - } - else - printf("\n"); - all_active_trans[sid].long_trid= 0; -#ifdef MARIA_VERSIONING - /* - if real recovery: - transaction was committed, move it to some separate list for later - purging (but don't purge now! purging may have been started before, we - may find REDO_PURGE records soon). - */ -#endif - return 0; -} - - -/* Just to inform about any aborted groups or unfinished transactions */ -static void end_of_redo_phase() -{ - uint sid, unfinished= 0; - for (sid= 0; sid <= SHORT_TRID_MAX; sid++) - { - TrID long_trid= all_active_trans[sid].long_trid; - LSN gslsn= all_active_trans[sid].group_start_lsn; - if (long_trid == 0) - continue; - if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) - { - char llbuf[22]; - llstr(long_trid, llbuf); - printf("Transaction long_trid %s short_trid %u unfinished\n", - llbuf, sid); - } - if (gslsn != LSN_IMPOSSIBLE) - { - printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n", - (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid); - } - /* If real recovery: roll back unfinished transaction */ -#ifdef MARIA_VERSIONING - /* - If real recovery: transaction was committed, move it to some separate - list for soon purging. - */ -#endif - } - /* - We don't close tables if there are some unfinished transactions, because - closing tables normally requires that all unfinished transactions on them - be rolled back. - For example, closing will soon write the state to disk and when doing that - it will think this is a committed state, but it may not be. - */ - if (unfinished == 0) - { - for (sid= 0; sid <= SHORT_TRID_MAX; sid++) - { - MARIA_HA *info= all_tables[sid]; - if (info != NULL) - { - _ma_reenable_logging_for_table(info->s); /* put back the truth */ - maria_close(info); - } - } - } -}