1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Merge work:/home/bk/mysql-4.0

into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
This commit is contained in:
sasha@mysql.sashanet.com
2001-11-07 16:27:30 -07:00
8 changed files with 409 additions and 72 deletions

View File

@ -41,7 +41,12 @@ static void my_aiowait(my_aio_result *result);
#include <assert.h>
#include <errno.h>
#ifdef MAIN
#include <my_dir.h>
#endif
static void init_read_function(IO_CACHE* info, enum cache_type type);
static void init_write_function(IO_CACHE* info, enum cache_type type);
static void init_read_function(IO_CACHE* info, enum cache_type type)
{
@ -65,6 +70,18 @@ static void init_read_function(IO_CACHE* info, enum cache_type type)
}
}
static void init_write_function(IO_CACHE* info, enum cache_type type)
{
switch (type)
{
case SEQ_READ_APPEND:
info->write_function = _my_b_append;
break;
default:
info->write_function = _my_b_write;
}
}
/*
** if cachesize == 0 then use default cachesize (from s-file)
** if file == -1 then real_open_cached_file() will be called.
@ -87,6 +104,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
info->alloced_buffer = 0;
if (type == READ_CACHE || type == SEQ_READ_APPEND)
{ /* Assume file isn't growing */
if (cache_myflags & MY_DONT_CHECK_FILESIZE)
@ -113,7 +131,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
}
}
}
info->alloced_buffer = 0;
if ((int) type < (int) READ_NET)
{
uint buffer_block;
@ -152,8 +169,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == SEQ_READ_APPEND)
{
info->append_pos = info->append_buffer;
info->append_end = info->append_buffer + info->buffer_length;
info->append_read_pos = info->write_pos = info->append_buffer;
info->write_end = info->append_end =
info->append_buffer + info->buffer_length;
#ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
#endif
@ -166,7 +184,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
}
else /* type == WRITE_CACHE */
{
info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
info->write_end=
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
info->write_pos = info->buffer;
}
/* end_of_file may be changed by user later */
info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
@ -174,6 +194,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->type=type;
info->error=0;
init_read_function(info,type);
init_write_function(info,type);
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
@ -234,16 +255,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{ /* use current buffer */
if (info->type == WRITE_CACHE && type == READ_CACHE)
{
info->rc_end=info->rc_pos;
info->rc_end=info->write_pos;
info->end_of_file=my_b_tell(info);
}
else if (type == WRITE_CACHE)
{
if (info->type == READ_CACHE)
info->rc_end=info->buffer+info->buffer_length;
{
info->write_end=info->buffer+info->buffer_length;
info->write_pos=info->rc_pos;
}
info->end_of_file = ~(my_off_t) 0;
}
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
if (type == WRITE_CACHE)
info->write_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
else
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
#endif
@ -277,9 +304,14 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
~(my_off_t) 0);
}
}
if (info->type == SEQ_READ_APPEND)
{
info->append_read_pos = info->write_pos = info->append_buffer;
}
info->type=type;
info->error=0;
init_read_function(info,type);
init_write_function(info,type);
#ifdef HAVE_AIOWAIT
if (type != READ_NET)
{
@ -294,7 +326,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->inited=0;
#endif
DBUG_RETURN(0);
} /* init_io_cache */
} /* reinit_io_cache */
@ -377,11 +409,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return 0;
}
/* Do sequential read from the SEQ_READ_APPEND cache
we do this in three stages:
- first read from info->buffer
- then if there are still data to read, try the file descriptor
- afterwards, if there are still data to read, try append buffer
*/
int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
uint length,diff_length,left_length,save_count;
my_off_t max_length, pos_in_file;
save_count=Count;
/* first, read the regular buffer */
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
@ -390,30 +430,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
info->end_of_file)
{
info->pos_in_file=pos_in_file;
goto read_append_buffer;
}
/* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
#ifdef THREAD
pthread_mutex_lock(&info->append_buffer_lock);
#endif
#ifdef THREAD
pthread_mutex_unlock(&info->append_buffer_lock);
#endif
/* now the second stage begins - read from file descriptor */
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
if (info->end_of_file == pos_in_file)
{ /* End of file */
info->error=(int) left_length;
return 1;
goto read_append_buffer;
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length)
{
info->error= read_length == (uint) -1 ? -1 :
(int) (read_length+left_length);
return 1;
if (read_length != (uint)-1)
{
Count -= read_length;
Buffer += read_length;
}
goto read_append_buffer;
}
Count-=length;
Buffer+=length;
@ -422,15 +465,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
diff_length=0;
}
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
(info->end_of_file - pos_in_file) < max_length)
if ((info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
info->error= left_length; /* We only got this many char */
return 1;
goto read_append_buffer;
}
length=0; /* Didn't read any chars */
}
@ -439,15 +480,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
length == (uint) -1)
{
if (length != (uint) -1)
{
memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
return 1;
Count -= length;
Buffer += length;
}
goto read_append_buffer;
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
read_append_buffer:
lock_append_buffer(info);
if (!Count) return 0;
{
uint copy_len = (uint)(info->append_read_pos -
info->write_pos);
dbug_assert(info->append_read_pos <= info->write_pos);
if (copy_len > Count)
copy_len = Count;
memcpy(Buffer, info->append_read_pos,
copy_len);
info->append_read_pos += copy_len;
Count -= copy_len;
if (Count)
info->error = save_count - Count;
}
unlock_append_buffer(info);
return Count ? 1 : 0;
}
#ifdef HAVE_AIOWAIT
@ -641,11 +703,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->rc_end - info->rc_pos);
memcpy(info->rc_pos,Buffer,(size_t) rest_length);
rest_length=(uint) (info->write_end - info->write_pos);
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->rc_pos+=rest_length;
info->write_pos+=rest_length;
if (info->pos_in_file+info->buffer_length > info->end_of_file)
{
my_errno=errno=EFBIG;
@ -667,8 +729,33 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=length;
info->pos_in_file+=length;
}
memcpy(info->rc_pos,Buffer,(size_t) Count);
info->rc_pos+=Count;
memcpy(info->write_pos,Buffer,(size_t) Count);
info->write_pos+=Count;
return 0;
}
int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->append_end -
info->write_pos);
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
{ /* Fill first intern buffer */
length=Count & (uint) ~(IO_SIZE-1);
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
return info->error= -1;
Count-=length;
Buffer+=length;
}
memcpy(info->write_pos,Buffer,(size_t) Count);
info->write_pos+=Count;
return 0;
}
@ -712,8 +799,8 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
Buffer+=length;
Count-= length;
/* Fix length of buffer if the new data was larger */
if (info->buffer+length > info->rc_pos)
info->rc_pos=info->buffer+length;
if (info->buffer+length > info->write_pos)
info->write_pos=info->buffer+length;
if (!Count)
return (error);
}
@ -723,37 +810,60 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
return error;
}
/* avoid warning about empty if body */
#ifdef THREAD
#define IF_APPEND_CACHE if (append_cache)
#else
#define IF_APPEND_CACHE
#endif
/* Flush write cache */
int flush_io_cache(IO_CACHE *info)
{
uint length;
int append_cache;
DBUG_ENTER("flush_io_cache");
if (info->type == WRITE_CACHE)
append_cache = (info->type == SEQ_READ_APPEND);
if (info->type == WRITE_CACHE || append_cache)
{
if (info->file == -1)
{
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
if (info->rc_pos != info->buffer)
IF_APPEND_CACHE
lock_append_buffer(info);
if (info->write_pos != info->buffer)
{
length=(uint) (info->rc_pos - info->buffer);
length=(uint) (info->write_pos - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
{
IF_APPEND_CACHE
unlock_append_buffer(info);
DBUG_RETURN((info->error= -1));
}
info->seek_not_done=0;
}
info->rc_pos=info->buffer;
info->write_pos=info->buffer;
info->pos_in_file+=length;
info->rc_end=(info->buffer+info->buffer_length-
info->write_end=(info->buffer+info->buffer_length-
(info->pos_in_file & (IO_SIZE-1)));
if (append_cache)
{
info->append_read_pos = info->buffer;
info->append_end = info->write_end;
}
if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
DBUG_RETURN((info->error= -1));
DBUG_RETURN(0);
info->error= -1;
else
info->error= 0;
IF_APPEND_CACHE
unlock_append_buffer(info);
DBUG_RETURN(info->error);
}
}
#ifdef HAVE_AIOWAIT
@ -781,7 +891,94 @@ int end_io_cache(IO_CACHE *info)
error=flush_io_cache(info);
my_free((gptr) info->buffer,MYF(MY_WME));
info->buffer=info->rc_pos=(byte*) 0;
info->alloced_buffer = 0;
}
DBUG_RETURN(error);
} /* end_io_cache */
#ifdef MAIN
void die(const char* fmt, ...)
{
va_list va_args;
va_start(va_args,fmt);
fprintf(stderr,"Error:");
vfprintf(stderr, fmt,va_args);
fprintf(stderr,", errno=%d\n", errno);
exit(1);
}
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
int fd;
if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
die("Could not open %s", fname);
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
die("failed in init_io_cache()");
return fd;
}
void close_file(IO_CACHE* info)
{
end_io_cache(info);
my_close(info->file, MYF(MY_WME));
}
int main(int argc, char** argv)
{
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
MY_STAT status;
const char* fname="/tmp/iocache.test";
int cache_size=16384;
char llstr_buf[22];
int max_block,total_bytes=0;
int i,num_loops=100,error=0;
char *p;
char* block, *block_end;
MY_INIT(argv[0]);
max_block = cache_size*3;
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
die("Not enough memory to allocate test block");
block_end = block + max_block;
for (p = block,i=0; p < block_end;i++)
{
*p++ = (char)i;
}
if (my_stat(fname,&status, MYF(0)) &&
my_delete(fname,MYF(MY_WME)))
{
die("Delete of %s failed, aborting", fname);
}
open_file(fname,&sra_cache, cache_size);
for (i = 0; i < num_loops; i++)
{
char buf[4];
int block_size = abs(rand() % max_block);
int4store(buf, block_size);
if (my_b_write(&sra_cache,buf,4) ||
my_b_write(&sra_cache, block, block_size))
die("write failed");
total_bytes += 4+block_size;
}
close_file(&sra_cache);
my_free(block,MYF(MY_WME));
if (!my_stat(fname,&status,MYF(MY_WME)))
die("%s failed to stat, but I had just closed it,\
wonder how that happened");
printf("Final size of %s is %s, wrote %d bytes\n",fname,
llstr(status.st_size,llstr_buf),
total_bytes);
my_delete(fname, MYF(MY_WME));
/* check correctness of tests */
if (total_bytes != status.st_size)
{
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
supposedly written\n");
error=1;
}
exit(error);
return 0;
}
#endif