From e8d9458f1548c655e461d51f3ad9dd2698eb5b7c Mon Sep 17 00:00:00 2001 From: "petr@mysql.com" <> Date: Fri, 19 May 2006 01:26:19 +0400 Subject: [PATCH] WL#3244 "CSV engine: convert mmap to read/write calls" --- mysql-test/r/csv.result | 4 +- storage/csv/ha_tina.cc | 463 +++++++++++++++++++++++++--------------- storage/csv/ha_tina.h | 69 +++++- 3 files changed, 357 insertions(+), 179 deletions(-) diff --git a/mysql-test/r/csv.result b/mysql-test/r/csv.result index 3adcc895474..04f0636d400 100644 --- a/mysql-test/r/csv.result +++ b/mysql-test/r/csv.result @@ -4944,10 +4944,10 @@ val UPDATE bug13894 SET val=6 WHERE val=10; SELECT * FROM bug13894; val +6 +6 5 11 -6 -6 DROP TABLE bug13894; DROP TABLE IF EXISTS bug14672; CREATE TABLE bug14672 (c1 integer) engine = CSV; diff --git a/storage/csv/ha_tina.cc b/storage/csv/ha_tina.cc index de69df90ed5..409364f59c6 100644 --- a/storage/csv/ha_tina.cc +++ b/storage/csv/ha_tina.cc @@ -61,7 +61,7 @@ TODO: /* The file extension */ #define CSV_EXT ".CSV" // The data file -#define CSN_EXT ".CSN" // Files used during repair +#define CSN_EXT ".CSN" // Files used during repair and update #define CSM_EXT ".CSM" // Meta file @@ -114,9 +114,59 @@ handlerton tina_hton= { NULL, /* Fill FILES Table */ HTON_CAN_RECREATE, NULL, /* binlog_func */ - NULL /* binlog_log_query */ + NULL, /* binlog_log_query */ + NULL /* release_temporary_latches */ }; + +off_t Transparent_file::read_next() +{ + off_t bytes_read; + + /* + No need to seek here, as the file managed by Transparent_file class + always points to upper_bound byte + */ + if ((bytes_read= my_read(filedes, buff, buff_size, MYF(0))) == MY_FILE_ERROR) + return -1; + + /* end of file */ + if (!bytes_read) + return -1; + + lower_bound= upper_bound; + upper_bound+= bytes_read; + + return lower_bound; +} + + +char Transparent_file::get_value(off_t offset) +{ + off_t bytes_read; + + /* check boundaries */ + if ((lower_bound <= offset) && (offset < upper_bound)) + return buff[offset - lower_bound]; + else + { + VOID(my_seek(filedes, offset, MY_SEEK_SET, MYF(0))); + /* read appropriate portion of the file */ + if ((bytes_read= my_read(filedes, buff, buff_size, + MYF(0))) == MY_FILE_ERROR) + return 0; + + lower_bound= offset; + upper_bound= lower_bound + bytes_read; + + /* end of file */ + if (upper_bound == offset) + return 0; + + return buff[0]; + } +} + /***************************************************************************** ** TINA tables *****************************************************************************/ @@ -130,7 +180,7 @@ int sort_set (tina_set *a, tina_set *b) We assume that intervals do not intersect. So, it is enought to compare any two points. Here we take start of intervals for comparison. */ - return ( a->begin > b->begin ? -1 : ( a->begin < b->begin ? 1 : 0 ) ); + return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) ); } static byte* tina_get_key(TINA_SHARE *share,uint *length, @@ -140,48 +190,6 @@ static byte* tina_get_key(TINA_SHARE *share,uint *length, return (byte*) share->table_name; } -/* - Reloads the mmap file. -*/ -int get_mmap(TINA_SHARE *share, int write) -{ - DBUG_ENTER("ha_tina::get_mmap"); - if (share->mapped_file && my_munmap(share->mapped_file, - share->file_stat.st_size)) - DBUG_RETURN(1); - - if (my_fstat(share->data_file, &share->file_stat, MYF(MY_WME)) == -1) - DBUG_RETURN(1); - - if (share->file_stat.st_size) - { - if (write) - share->mapped_file= (byte *)my_mmap(NULL, share->file_stat.st_size, - PROT_READ|PROT_WRITE, MAP_SHARED, - share->data_file, 0); - else - share->mapped_file= (byte *)my_mmap(NULL, share->file_stat.st_size, - PROT_READ, MAP_PRIVATE, - share->data_file, 0); - if ((share->mapped_file == MAP_FAILED)) - { - /* - Bad idea you think? See the problem is that nothing actually checks - the return value of ::rnd_init(), so tossing an error is about - it for us. - Never going to happen right? :) - */ - my_message(errno, "Woops, blew up opening a mapped file", 0); - DBUG_ASSERT(0); - DBUG_RETURN(1); - } - } - else - share->mapped_file= NULL; - - DBUG_RETURN(0); -} - static int tina_init_func() { @@ -218,6 +226,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) { TINA_SHARE *share; char meta_file_name[FN_REFLEN]; + MY_STAT file_stat; /* Stat information for the data file */ char *tmp_name; uint length; @@ -250,6 +259,8 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) share->table_name= tmp_name; share->crashed= FALSE; share->rows_recorded= 0; + share->update_file_opened= FALSE; + share->tina_write_opened= FALSE; strmov(share->table_name, table_name); fn_format(share->data_file_name, table_name, "", CSV_EXT, MY_REPLACE_EXT|MY_UNPACK_FILENAME); @@ -277,27 +288,16 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) */ if (read_meta_file(share->meta_file, &share->rows_recorded)) share->crashed= TRUE; - else - (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE); - if ((share->data_file= my_open(share->data_file_name, O_RDWR|O_APPEND, - MYF(0))) == -1) + if (my_stat(share->data_file_name, &file_stat, MYF(MY_WME)) == NULL) goto error2; - - share->mapped_file= NULL; // We don't know the state as we just allocated it - if (get_mmap(share, 0) > 0) - goto error3; - - /* init file length value used by readers */ - share->saved_data_file_length= share->file_stat.st_size; + share->saved_data_file_length= file_stat.st_size; } share->use_count++; pthread_mutex_unlock(&tina_mutex); return share; -error3: - my_close(share->data_file,MYF(0)); error2: thr_lock_delete(&share->lock); pthread_mutex_destroy(&share->mutex); @@ -425,6 +425,30 @@ bool ha_tina::check_and_repair(THD *thd) } +int ha_tina::init_tina_writer() +{ + DBUG_ENTER("ha_tina::init_tina_writer"); + + /* + Mark the file as crashed. We will set the flag back when we close + the file. In the case of the crash it will remain marked crashed, + which enforce recovery. + */ + (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE); + + if ((share->tina_write_filedes= + my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1) + { + DBUG_PRINT("info", ("Could not open tina file writes")); + share->crashed= TRUE; + DBUG_RETURN(1); + } + share->tina_write_opened= TRUE; + + DBUG_RETURN(0); +} + + bool ha_tina::is_crashed() const { DBUG_ENTER("ha_tina::is_crashed"); @@ -445,10 +469,13 @@ static int free_share(TINA_SHARE *share) share->crashed ? TRUE :FALSE); if (my_close(share->meta_file, MYF(0))) result_code= 1; - if (share->mapped_file) - my_munmap(share->mapped_file, share->file_stat.st_size); - share->mapped_file= NULL; - result_code= my_close(share->data_file,MYF(0)); + if (share->tina_write_opened) + { + if (my_close(share->tina_write_filedes, MYF(0))) + result_code= 1; + share->tina_write_opened= FALSE; + } + hash_delete(&tina_open_tables, (byte*) share); thr_lock_delete(&share->lock); pthread_mutex_destroy(&share->mutex); @@ -468,16 +495,16 @@ int tina_end(ha_panic_function type) Finds the end of a line. Currently only supports files written on a UNIX OS. */ -byte * find_eoln(byte *data, off_t begin, off_t end) + +off_t find_eoln_buff(Transparent_file *data_buff, off_t begin, off_t end) { for (off_t x= begin; x < end; x++) - if (data[x] == '\n') - return data + x; + if (data_buff->get_value(x) == '\n') + return x; return 0; } - static handler *tina_create_handler(TABLE_SHARE *table) { return new ha_tina(table); @@ -491,7 +518,7 @@ ha_tina::ha_tina(TABLE_SHARE *table_arg) They are not probably completely right. */ current_position(0), next_position(0), local_saved_data_file_length(0), - chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH), + file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH), records_is_known(0) { /* Set our original buffers from pre-allocated memory */ @@ -619,50 +646,50 @@ int ha_tina::chain_append() */ int ha_tina::find_current_row(byte *buf) { - byte *mapped_ptr; - byte *end_ptr; + off_t end_offset, curr_offset= current_position; DBUG_ENTER("ha_tina::find_current_row"); - mapped_ptr= (byte *)share->mapped_file + current_position; - /* We do not read further then local_saved_data_file_length in order not to conflict with undergoing concurrent insert. */ - if ((end_ptr= find_eoln(share->mapped_file, current_position, - local_saved_data_file_length)) == 0) + if ((end_offset= find_eoln_buff(file_buff, current_position, + local_saved_data_file_length)) == 0) DBUG_RETURN(HA_ERR_END_OF_FILE); for (Field **field=table->field ; *field ; field++) { buffer.length(0); - if (*mapped_ptr == '"') - mapped_ptr++; // Increment past the first quote + if (file_buff->get_value(curr_offset) == '"') + curr_offset++; // Incrementpast the first quote else DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); - for(;mapped_ptr != end_ptr; mapped_ptr++) + for(;curr_offset != end_offset; curr_offset++) { // Need to convert line feeds! - if (*mapped_ptr == '"' && - (((mapped_ptr[1] == ',') && (mapped_ptr[2] == '"')) || - (mapped_ptr == end_ptr -1 ))) + if (file_buff->get_value(curr_offset) == '"' && + (((file_buff->get_value(curr_offset + 1) == ',') && + (file_buff->get_value(curr_offset + 2) == '"')) || + (curr_offset == end_offset -1 ))) { - mapped_ptr += 2; // Move past the , and the " + curr_offset+= 2; // Move past the , and the " break; } - if (*mapped_ptr == '\\' && mapped_ptr != (end_ptr - 1)) + if (file_buff->get_value(curr_offset) == '\\' && + curr_offset != (end_offset - 1)) { - mapped_ptr++; - if (*mapped_ptr == 'r') + curr_offset++; + if (file_buff->get_value(curr_offset) == 'r') buffer.append('\r'); - else if (*mapped_ptr == 'n' ) + else if (file_buff->get_value(curr_offset) == 'n' ) buffer.append('\n'); - else if ((*mapped_ptr == '\\') || (*mapped_ptr == '"')) - buffer.append(*mapped_ptr); + else if ((file_buff->get_value(curr_offset) == '\\') || + (file_buff->get_value(curr_offset) == '"')) + buffer.append(file_buff->get_value(curr_offset)); else /* This could only happed with an externally created file */ { buffer.append('\\'); - buffer.append(*mapped_ptr); + buffer.append(file_buff->get_value(curr_offset)); } } else // ordinary symbol @@ -671,14 +698,14 @@ int ha_tina::find_current_row(byte *buf) We are at final symbol and no last quote was found => we are working with a damaged file. */ - if (mapped_ptr == end_ptr -1) + if (curr_offset == end_offset - 1) DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); - buffer.append(*mapped_ptr); + buffer.append(file_buff->get_value(curr_offset)); } } (*field)->store(buffer.ptr(), buffer.length(), system_charset_info); } - next_position= (end_ptr - share->mapped_file)+1; + next_position= end_offset + 1; /* Maybe use \N for null? */ memset(buf, 0, table->s->null_bytes); /* We do not implement nulls! */ @@ -776,7 +803,7 @@ void ha_tina::get_status() void ha_tina::update_status() { /* correct local_saved_data_file_length for writers */ - share->saved_data_file_length= share->file_stat.st_size; + share->saved_data_file_length= local_saved_data_file_length; } @@ -827,6 +854,11 @@ int ha_tina::open(const char *name, int mode, uint open_options) DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); } + if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1) + DBUG_RETURN(0); + + file_buff= new Transparent_file(data_file); + /* Init locking. Pass handler object to the locking routines, so that they could save/update local_saved_data_file_length value @@ -845,12 +877,14 @@ int ha_tina::open(const char *name, int mode, uint open_options) /* Close a database file. We remove ourselves from the shared strucutre. - If it is empty we destroy it and free the mapped file. + If it is empty we destroy it. */ int ha_tina::close(void) { + int rc= 0; DBUG_ENTER("ha_tina::close"); - DBUG_RETURN(free_share(share)); + rc= my_close(data_file, MYF(0)); + DBUG_RETURN(free_share(share) || rc); } /* @@ -873,22 +907,17 @@ int ha_tina::write_row(byte * buf) size= encode_quote(buf); - if (my_write(share->data_file, (byte*)buffer.ptr(), size, + if (!share->tina_write_opened) + if (init_tina_writer()) + DBUG_RETURN(-1); + + /* use pwrite, as concurrent reader could have changed the position */ + if (my_write(share->tina_write_filedes, buffer.ptr(), size, MYF(MY_WME | MY_NABP))) DBUG_RETURN(-1); - /* - Ok, this is means that we will be doing potentially bad things - during a bulk insert on some OS'es. What we need is a cleanup - call for ::write_row that would let us fix up everything after the bulk - insert. The archive handler does this with an extra mutx call, which - might be a solution for this. - */ - if (get_mmap(share, 0) > 0) - DBUG_RETURN(-1); - /* update local copy of the max position to see our own changes */ - local_saved_data_file_length= share->file_stat.st_size; + local_saved_data_file_length+= size; /* update shared info */ pthread_mutex_lock(&share->mutex); @@ -903,6 +932,23 @@ int ha_tina::write_row(byte * buf) } +int ha_tina::open_update_temp_file_if_needed() +{ + char updated_fname[FN_REFLEN]; + + if (!share->update_file_opened) + { + if ((update_temp_file= + my_create(fn_format(updated_fname, share->table_name, + "", CSN_EXT, + MY_REPLACE_EXT | MY_UNPACK_FILENAME), + 0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0) + return 1; + share->update_file_opened= TRUE; + } + return 0; +} + /* This is called for an update. Make sure you put in code to increment the auto increment, also @@ -926,16 +972,16 @@ int ha_tina::update_row(const byte * old_data, byte * new_data) if (chain_append()) DBUG_RETURN(-1); - if (my_write(share->data_file, (byte*)buffer.ptr(), size, + if (open_update_temp_file_if_needed()) + DBUG_RETURN(-1); + + if (my_write(update_temp_file, (byte*)buffer.ptr(), size, MYF(MY_WME | MY_NABP))) DBUG_RETURN(-1); /* UPDATE should never happen on the log tables */ DBUG_ASSERT(!share->is_log_table); - /* update local copy of the max position to see our own changes */ - local_saved_data_file_length= share->file_stat.st_size; - DBUG_RETURN(0); } @@ -1001,6 +1047,8 @@ int ha_tina::rnd_init(bool scan) { DBUG_ENTER("ha_tina::rnd_init"); + /* set buffer to the beginning of the file */ + file_buff->init_buff(data_file); if (share->crashed) DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); @@ -1008,11 +1056,6 @@ int ha_tina::rnd_init(bool scan) records= 0; records_is_known= 0; chain_ptr= chain; -#ifdef HAVE_MADVISE - if (scan) - (void) madvise(share->mapped_file, share->file_stat.st_size, - MADV_SEQUENTIAL); -#endif DBUG_RETURN(0); } @@ -1042,8 +1085,11 @@ int ha_tina::rnd_next(byte *buf) ha_statistic_increment(&SSV::ha_read_rnd_next_count); current_position= next_position; - if (!share->mapped_file) + + /* don't scan an empty file */ + if (!local_saved_data_file_length) DBUG_RETURN(HA_ERR_END_OF_FILE); + if ((rc= find_current_row(buf))) DBUG_RETURN(rc); @@ -1112,6 +1158,22 @@ int ha_tina::extra(enum ha_extra_function operation) DBUG_RETURN(0); } +/* + Set end_pos to the last valid byte of continuous area, closest + to the given "hole", stored in the buffer. "Valid" here means, + not listed in the chain of deleted records ("holes"). +*/ +bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole) +{ + if (closest_hole == chain_ptr) /* no more chains */ + *end_pos= file_buff->end(); + else + *end_pos= min(file_buff->end(), + closest_hole->begin); + return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin); +} + + /* Called after each table scan. In particular after deletes, and updates. In the last case we employ chain of deleted @@ -1120,53 +1182,99 @@ int ha_tina::extra(enum ha_extra_function operation) */ int ha_tina::rnd_end() { + char updated_fname[FN_REFLEN]; + off_t file_buffer_start= 0; DBUG_ENTER("ha_tina::rnd_end"); records_is_known= 1; - /* First position will be truncate position, second will be increment */ if ((chain_ptr - chain) > 0) { - tina_set *ptr; - size_t length; + tina_set *ptr= chain; /* - Setting up writable map, this will contain all of the data after the - get_mmap call that we have added to the file. + Re-read the beginning of a file (as the buffer should point to the + end of file after the scan). */ - if (get_mmap(share, 1) > 0) - DBUG_RETURN(-1); - length= share->file_stat.st_size; + file_buff->init_buff(data_file); /* - The sort handles updates/deletes with random orders. - It also sorts so that we move the final blocks to the - beginning so that we move the smallest amount of data possible. + The sort is needed when there were updates/deletes with random orders. + It sorts so that we move the firts blocks to the beginning. */ qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set), (qsort_cmp)sort_set); - for (ptr= chain; ptr < chain_ptr; ptr++) + + off_t write_begin= 0, write_end; + + /* create the file to write updated table if it wasn't yet created */ + if (open_update_temp_file_if_needed()) + DBUG_RETURN(-1); + + /* write the file with updated info */ + while ((file_buffer_start != -1)) // while not end of file { - memmove(share->mapped_file + ptr->begin, share->mapped_file + ptr->end, - length - (size_t)ptr->end); - length= length - (size_t)(ptr->end - ptr->begin); + bool in_hole= get_write_pos(&write_end, ptr); + + /* if there is something to write, write it */ + if ((write_end - write_begin) && + (my_write(update_temp_file, + file_buff->ptr() + (write_begin - file_buff->start()), + write_end - write_begin, MYF_RW))) + goto error; + + if (in_hole) + { + /* skip hole */ + while (file_buff->end() <= ptr->end && file_buffer_start != -1) + file_buffer_start= file_buff->read_next(); + write_begin= ptr->end; + ptr++; + } + else + write_begin= write_end; + + if (write_end == file_buff->end()) + file_buffer_start= file_buff->read_next(); /* shift the buffer */ + } - /* Unmap the file before the new size is set */ - if (my_munmap(share->mapped_file, share->file_stat.st_size)) + if (my_close(update_temp_file, MYF(0))) DBUG_RETURN(-1); - /* We set it to null so that get_mmap() won't try to unmap it */ - share->mapped_file= NULL; + share->update_file_opened= FALSE; - /* Set the file to the new size */ - if (my_chsize(share->data_file, length, 0, MYF(MY_WME))) + if (share->tina_write_opened) + { + if (my_close(share->tina_write_filedes, MYF(0))) + DBUG_RETURN(-1); + /* + Mark that the writer fd is closed, so that init_tina_writer() + will reopen it later. + */ + share->tina_write_opened= FALSE; + } + + /* + Close opened fildes's. Then move updated file in place + of the old datafile. + */ + if (my_close(data_file, MYF(0)) || + my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT, + MY_REPLACE_EXT | MY_UNPACK_FILENAME), + share->data_file_name, MYF(0))) DBUG_RETURN(-1); - if (get_mmap(share, 0) > 0) + /* Open the file again and sync it */ + if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1) + || my_sync(data_file, MYF(MY_WME))) DBUG_RETURN(-1); } DBUG_RETURN(0); +error: + my_close(update_temp_file, MYF(0)); + share->update_file_opened= FALSE; + DBUG_RETURN(-1); } @@ -1195,10 +1303,11 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) File repair_file; int rc; ha_rows rows_repaired= 0; + off_t write_begin= 0, write_end; DBUG_ENTER("ha_tina::repair"); /* empty file */ - if (!share->mapped_file) + if (!share->saved_data_file_length) { share->rows_recorded= 0; goto end; @@ -1207,12 +1316,15 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) DBUG_RETURN(HA_ERR_OUT_OF_MEM); + /* position buffer to the start of the file */ + file_buff->init_buff(data_file); + /* Local_saved_data_file_length is initialized during the lock phase. Sometimes this is not getting executed before ::repair (e.g. for the log tables). We set it manually here. */ - local_saved_data_file_length= share->file_stat.st_size; + local_saved_data_file_length= share->saved_data_file_length; /* set current position to the beginning of the file */ current_position= next_position= 0; @@ -1227,11 +1339,10 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) if (rc == HA_ERR_END_OF_FILE) { - /* All rows were read ok until end of file, the file does not need repair. */ - /* - If rows_recorded != rows_repaired, we should update - rows_recorded value to the current amount of rows. + All rows were read ok until end of file, the file does not need repair. + If rows_recorded != rows_repaired, we should update rows_recorded value + to the current amount of rows. */ share->rows_recorded= rows_repaired; goto end; @@ -1247,36 +1358,45 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) 0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR); - if (my_write(repair_file, (byte*)share->mapped_file, current_position, - MYF(MY_NABP))) - DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); - my_close(repair_file, MYF(0)); + file_buff->init_buff(data_file); + + /* we just truncated the file up to the first bad row. update rows count. */ share->rows_recorded= rows_repaired; - if (my_munmap(share->mapped_file, share->file_stat.st_size)) - DBUG_RETURN(-1); - /* We set it to null so that get_mmap() won't try to unmap it */ - share->mapped_file= NULL; + /* write repaired file */ + while (1) + { + write_end= min(file_buff->end(), current_position); + if ((write_end - write_begin) && + (my_write(repair_file, file_buff->ptr(), + write_end - write_begin, MYF_RW))) + DBUG_RETURN(-1); + + write_begin= write_end; + if (write_end== current_position) + break; + else + file_buff->read_next(); /* shift the buffer */ + } /* - Close the "to"-file before renaming - On Windows one cannot rename a file, which descriptor - is still open. EACCES will be returned when trying to delete - the "to"-file in my_rename() + Close the files and rename repaired file to the datafile. + We have to close the files, as on Windows one cannot rename + a file, which descriptor is still open. EACCES will be returned + when trying to delete the "to"-file in my_rename(). */ - my_close(share->data_file,MYF(0)); - - if (my_rename(repaired_fname, share->data_file_name, MYF(0))) + if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) || + my_rename(repaired_fname, share->data_file_name, MYF(0))) DBUG_RETURN(-1); /* Open the file again, it should now be repaired */ - if ((share->data_file= my_open(share->data_file_name, O_RDWR|O_APPEND, - MYF(0))) == -1) + if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND, + MYF(0))) == -1) DBUG_RETURN(-1); - if (get_mmap(share, 0) > 0) - DBUG_RETURN(-1); + /* Set new file size. The file size will be updated by ::update_status() */ + local_saved_data_file_length= (size_t) current_position; end: share->crashed= FALSE; @@ -1295,17 +1415,12 @@ int ha_tina::delete_all_rows() if (!records_is_known) DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND); - /* Unmap the file before the new size is set */ - if (share->mapped_file && my_munmap(share->mapped_file, - share->file_stat.st_size)) - DBUG_RETURN(-1); - share->mapped_file= NULL; + if (!share->tina_write_opened) + if (init_tina_writer()) + DBUG_RETURN(-1); /* Truncate the file to zero size */ - rc= my_chsize(share->data_file, 0, 0, MYF(MY_WME)); - - if (get_mmap(share, 0) > 0) - DBUG_RETURN(-1); + rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME)); records=0; DBUG_RETURN(rc); @@ -1367,12 +1482,15 @@ int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt) if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) DBUG_RETURN(HA_ERR_OUT_OF_MEM); + /* position buffer to the start of the file */ + file_buff->init_buff(data_file); + /* Local_saved_data_file_length is initialized during the lock phase. Check does not use store_lock in certain cases. So, we set it manually here. */ - local_saved_data_file_length= share->file_stat.st_size; + local_saved_data_file_length= share->saved_data_file_length; /* set current position to the beginning of the file */ current_position= next_position= 0; /* Read the file row-by-row. If everything is ok, repair is not needed. */ @@ -1412,6 +1530,7 @@ mysql_declare_plugin(csv) tina_init_func, /* Plugin Init */ tina_done_func, /* Plugin Deinit */ 0x0100 /* 1.0 */, + 0 } mysql_declare_plugin_end; diff --git a/storage/csv/ha_tina.h b/storage/csv/ha_tina.h index d155a614780..eb30f150bbe 100644 --- a/storage/csv/ha_tina.h +++ b/storage/csv/ha_tina.h @@ -19,6 +19,7 @@ #include #define DEFAULT_CHAIN_LENGTH 512 +#define DEFAULT_FILE_WINDOW_SIZE 4096 /* Version for file format. 1 - Initial Version. That is, the version when the metafile was introduced. @@ -29,15 +30,12 @@ typedef struct st_tina_share { char *table_name; char data_file_name[FN_REFLEN]; - byte *mapped_file; /* mapped region of file */ uint table_name_length, use_count; /* Below flag is needed to make log tables work with concurrent insert. For more details see comment to ha_tina::update_status. */ my_bool is_log_table; - MY_STAT file_stat; /* Stat information for the data file */ - File data_file; /* Current open data file */ /* Here we save the length of the file for readers. This is updated by inserts, updates and deletes. The var is initialized along with the @@ -46,7 +44,10 @@ typedef struct st_tina_share { off_t saved_data_file_length; pthread_mutex_t mutex; THR_LOCK lock; + bool update_file_opened; + bool tina_write_opened; File meta_file; /* Meta file we use */ + File tina_write_filedes; /* File handler for readers */ bool crashed; /* Meta file is crashed */ ha_rows rows_recorded; /* Number of rows in tables */ } TINA_SHARE; @@ -56,6 +57,54 @@ struct tina_set { off_t end; }; +class Transparent_file +{ + File filedes; + byte *buff; /* in-memory window to the file or mmaped area */ + /* current window sizes */ + off_t lower_bound; + off_t upper_bound; + uint buff_size; + + public: + + Transparent_file(File filedes_arg) : lower_bound(0), + buff_size(DEFAULT_FILE_WINDOW_SIZE) + { + buff= (byte *) my_malloc(buff_size*sizeof(byte), MYF(MY_WME)); + /* read the beginning of the file */ + init_buff(filedes_arg); + } + + ~Transparent_file() + { my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); } + + void init_buff(File filedes_arg) + { + filedes= filedes_arg; + /* read the beginning of the file */ + lower_bound= 0; + VOID(my_seek(filedes, 0, MY_SEEK_SET, MYF(0))); + if (filedes && buff) + upper_bound= my_read(filedes, buff, buff_size, MYF(0)); + } + + byte *ptr() + { return buff; } + + off_t start() + { return lower_bound; } + + off_t end() + { return upper_bound; } + + /* get a char from the given position in the file */ + char get_value (off_t offset); + /* shift a buffer windows to see the next part of the file */ + off_t read_next(); + +}; + class ha_tina: public handler { THR_LOCK_DATA lock; /* MySQL lock */ @@ -64,6 +113,9 @@ class ha_tina: public handler off_t next_position; /* Next position in the file scan */ off_t local_saved_data_file_length; /* save position for reads */ byte byte_buffer[IO_SIZE]; + Transparent_file *file_buff; + File data_file; /* File handler for readers */ + File update_temp_file; String buffer; /* The chain contains "holes" in the file, occured because of @@ -77,12 +129,19 @@ class ha_tina: public handler uint32 chain_size; bool records_is_known; +private: + bool get_write_pos(off_t *end_pos, tina_set *closest_hole); + int open_update_temp_file_if_needed(); + int init_tina_writer(); + public: ha_tina(TABLE_SHARE *table_arg); - ~ha_tina() + ~ha_tina() { if (chain_alloced) - my_free((gptr)chain,0); + my_free((gptr)chain, 0); + if (file_buff) + delete file_buff; } const char *table_type() const { return "CSV"; } const char *index_type(uint inx) { return "NONE"; }