mirror of
https://github.com/MariaDB/server.git
synced 2026-01-06 05:22:24 +03:00
Merge chilla.local:/home/mydev/mysql-4.1-bug8283-one
into chilla.local:/home/mydev/mysql-5.0-bug8283 include/my_sys.h: Auto merged include/myisam.h: Auto merged myisam/mi_check.c: Auto merged myisam/mi_open.c: Auto merged myisam/mi_packrec.c: Auto merged myisam/sort.c: Auto merged mysql-test/t/myisam.test: Auto merged myisam/myisamdef.h: Bug#8283 - OPTIMIZE TABLE causes data loss Merge from 4.1 mysql-test/r/myisam.result: Bug#8283 - OPTIMIZE TABLE causes data loss Merge from 4.1 mysys/mf_iocache.c: Bug#8283 - OPTIMIZE TABLE causes data loss Merge from 4.1
This commit is contained in:
@@ -70,7 +70,6 @@ static void my_aiowait(my_aio_result *result);
|
||||
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
|
||||
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
|
||||
|
||||
|
||||
/*
|
||||
Setup internal pointers inside IO_CACHE
|
||||
|
||||
@@ -502,65 +501,366 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
#ifdef THREAD
|
||||
/* Prepare IO_CACHE for shared use */
|
||||
void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
|
||||
/*
|
||||
Prepare IO_CACHE for shared use.
|
||||
|
||||
SYNOPSIS
|
||||
init_io_cache_share()
|
||||
read_cache A read cache. This will be copied for
|
||||
every thread after setup.
|
||||
cshare The share.
|
||||
write_cache If non-NULL a write cache that is to be
|
||||
synchronized with the read caches.
|
||||
num_threads Number of threads sharing the cache
|
||||
including the write thread if any.
|
||||
|
||||
DESCRIPTION
|
||||
|
||||
The shared cache is used so: One IO_CACHE is initialized with
|
||||
init_io_cache(). This includes the allocation of a buffer. Then a
|
||||
share is allocated and init_io_cache_share() is called with the io
|
||||
cache and the share. Then the io cache is copied for each thread. So
|
||||
every thread has its own copy of IO_CACHE. But the allocated buffer
|
||||
is shared because cache->buffer is the same for all caches.
|
||||
|
||||
One thread reads data from the file into the buffer. All threads
|
||||
read from the buffer, but every thread maintains its own set of
|
||||
pointers into the buffer. When all threads have used up the buffer
|
||||
contents, one of the threads reads the next block of data into the
|
||||
buffer. To accomplish this, each thread enters the cache lock before
|
||||
accessing the buffer. They wait in lock_io_cache() until all threads
|
||||
joined the lock. The last thread entering the lock is in charge of
|
||||
reading from file to buffer. It wakes all threads when done.
|
||||
|
||||
Synchronizing a write cache to the read caches works so: Whenever
|
||||
the write buffer needs a flush, the write thread enters the lock and
|
||||
waits for all other threads to enter the lock too. They do this when
|
||||
they have used up the read buffer. When all threads are in the lock,
|
||||
the write thread copies the write buffer to the read buffer and
|
||||
wakes all threads.
|
||||
|
||||
share->running_threads is the number of threads not being in the
|
||||
cache lock. When entering lock_io_cache() the number is decreased.
|
||||
When the thread that fills the buffer enters unlock_io_cache() the
|
||||
number is reset to the number of threads. The condition
|
||||
running_threads == 0 means that all threads are in the lock. Bumping
|
||||
up the number to the full count is non-intuitive. But increasing the
|
||||
number by one for each thread that leaves the lock could lead to a
|
||||
solo run of one thread. The last thread to join a lock reads from
|
||||
file to buffer, wakes the other threads, processes the data in the
|
||||
cache and enters the lock again. If no other thread left the lock
|
||||
meanwhile, it would think it's the last one again and read the next
|
||||
block...
|
||||
|
||||
The share has copies of 'error', 'buffer', 'read_end', and
|
||||
'pos_in_file' from the thread that filled the buffer. We may not be
|
||||
able to access this information directly from its cache because the
|
||||
thread may be removed from the share before the variables could be
|
||||
copied by all other threads. Or, if a write buffer is synchronized,
|
||||
it would change its 'pos_in_file' after waking the other threads,
|
||||
possibly before they could copy its value.
|
||||
|
||||
However, the 'buffer' variable in the share is for a synchronized
|
||||
write cache. It needs to know where to put the data. Otherwise it
|
||||
would need access to the read cache of one of the threads that is
|
||||
not yet removed from the share.
|
||||
|
||||
RETURN
|
||||
void
|
||||
*/
|
||||
|
||||
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
|
||||
IO_CACHE *write_cache, uint num_threads)
|
||||
{
|
||||
DBUG_ASSERT(info->type == READ_CACHE);
|
||||
pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init (&s->cond, 0);
|
||||
s->total=s->count=num_threads-1;
|
||||
s->active=0;
|
||||
info->share=s;
|
||||
info->read_function=_my_b_read_r;
|
||||
info->current_pos= info->current_end= 0;
|
||||
DBUG_ENTER("init_io_cache_share");
|
||||
DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
|
||||
"write_cache: 0x%lx threads: %u",
|
||||
read_cache, cshare, write_cache, num_threads));
|
||||
|
||||
DBUG_ASSERT(num_threads > 1);
|
||||
DBUG_ASSERT(read_cache->type == READ_CACHE);
|
||||
DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
|
||||
|
||||
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init(&cshare->cond, 0);
|
||||
pthread_cond_init(&cshare->cond_writer, 0);
|
||||
|
||||
cshare->running_threads= num_threads;
|
||||
cshare->total_threads= num_threads;
|
||||
cshare->error= 0; /* Initialize. */
|
||||
cshare->buffer= read_cache->buffer;
|
||||
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
|
||||
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
|
||||
cshare->source_cache= write_cache; /* Can be NULL. */
|
||||
|
||||
read_cache->share= cshare;
|
||||
read_cache->read_function= _my_b_read_r;
|
||||
read_cache->current_pos= NULL;
|
||||
read_cache->current_end= NULL;
|
||||
|
||||
if (write_cache)
|
||||
write_cache->share= cshare;
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Remove a thread from shared access to IO_CACHE
|
||||
Every thread should do that on exit for not
|
||||
to deadlock other threads
|
||||
Remove a thread from shared access to IO_CACHE.
|
||||
|
||||
SYNOPSIS
|
||||
remove_io_thread()
|
||||
cache The IO_CACHE to be removed from the share.
|
||||
|
||||
NOTE
|
||||
|
||||
Every thread must do that on exit for not to deadlock other threads.
|
||||
|
||||
The last thread destroys the pthread resources.
|
||||
|
||||
A writer flushes its cache first.
|
||||
|
||||
RETURN
|
||||
void
|
||||
*/
|
||||
void remove_io_thread(IO_CACHE *info)
|
||||
|
||||
void remove_io_thread(IO_CACHE *cache)
|
||||
{
|
||||
IO_CACHE_SHARE *s=info->share;
|
||||
IO_CACHE_SHARE *cshare= cache->share;
|
||||
uint total;
|
||||
DBUG_ENTER("remove_io_thread");
|
||||
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
s->total--;
|
||||
if (! s->count--)
|
||||
pthread_cond_signal(&s->cond);
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
}
|
||||
/* If the writer goes, it needs to flush the write cache. */
|
||||
if (cache == cshare->source_cache)
|
||||
flush_io_cache(cache);
|
||||
|
||||
static int lock_io_cache(IO_CACHE *info, my_off_t pos)
|
||||
{
|
||||
int total;
|
||||
IO_CACHE_SHARE *s=info->share;
|
||||
pthread_mutex_lock(&cshare->mutex);
|
||||
DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
|
||||
(cache == cshare->source_cache) ?
|
||||
"writer" : "reader", cache));
|
||||
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
if (!s->count)
|
||||
/* Remove from share. */
|
||||
total= --cshare->total_threads;
|
||||
DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
|
||||
|
||||
/* Detach from share. */
|
||||
cache->share= NULL;
|
||||
|
||||
/* If the writer goes, let the readers know. */
|
||||
if (cache == cshare->source_cache)
|
||||
{
|
||||
s->count=s->total;
|
||||
return 1;
|
||||
DBUG_PRINT("io_cache_share", ("writer leaves"));
|
||||
cshare->source_cache= NULL;
|
||||
}
|
||||
|
||||
total=s->total;
|
||||
s->count--;
|
||||
while (!s->active || s->active->pos_in_file < pos)
|
||||
pthread_cond_wait(&s->cond, &s->mutex);
|
||||
/* If all threads are waiting for me to join the lock, wake them. */
|
||||
if (!--cshare->running_threads)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
|
||||
pthread_cond_signal(&cshare->cond_writer);
|
||||
pthread_cond_broadcast(&cshare->cond);
|
||||
}
|
||||
|
||||
if (s->total < total &&
|
||||
(!s->active || s->active->pos_in_file < pos))
|
||||
return 1;
|
||||
pthread_mutex_unlock(&cshare->mutex);
|
||||
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
return 0;
|
||||
if (!total)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
|
||||
pthread_cond_destroy (&cshare->cond_writer);
|
||||
pthread_cond_destroy (&cshare->cond);
|
||||
pthread_mutex_destroy(&cshare->mutex);
|
||||
}
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static void unlock_io_cache(IO_CACHE *info)
|
||||
|
||||
/*
|
||||
Lock IO cache and wait for all other threads to join.
|
||||
|
||||
SYNOPSIS
|
||||
lock_io_cache()
|
||||
cache The cache of the thread entering the lock.
|
||||
pos File position of the block to read.
|
||||
Unused for the write thread.
|
||||
|
||||
DESCRIPTION
|
||||
|
||||
Wait for all threads to finish with the current buffer. We want
|
||||
all threads to proceed in concert. The last thread to join
|
||||
lock_io_cache() will read the block from file and all threads start
|
||||
to use it. Then they will join again for reading the next block.
|
||||
|
||||
The waiting threads detect a fresh buffer by comparing
|
||||
cshare->pos_in_file with the position they want to process next.
|
||||
Since the first block may start at position 0, we take
|
||||
cshare->read_end as an additional condition. This variable is
|
||||
initialized to NULL and will be set after a block of data is written
|
||||
to the buffer.
|
||||
|
||||
RETURN
|
||||
1 OK, lock in place, go ahead and read.
|
||||
0 OK, unlocked, another thread did the read.
|
||||
*/
|
||||
|
||||
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
|
||||
{
|
||||
pthread_cond_broadcast(&info->share->cond);
|
||||
pthread_mutex_unlock(&info->share->mutex);
|
||||
IO_CACHE_SHARE *cshare= cache->share;
|
||||
DBUG_ENTER("lock_io_cache");
|
||||
|
||||
/* Enter the lock. */
|
||||
pthread_mutex_lock(&cshare->mutex);
|
||||
cshare->running_threads--;
|
||||
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
|
||||
(cache == cshare->source_cache) ?
|
||||
"writer" : "reader", cache, (ulong) pos,
|
||||
cshare->running_threads));
|
||||
|
||||
if (cshare->source_cache)
|
||||
{
|
||||
/* A write cache is synchronized to the read caches. */
|
||||
|
||||
if (cache == cshare->source_cache)
|
||||
{
|
||||
/* The writer waits until all readers are here. */
|
||||
while (cshare->running_threads)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("writer waits in lock"));
|
||||
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
|
||||
}
|
||||
DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
|
||||
|
||||
/* Stay locked. Leave the lock later by unlock_io_cache(). */
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
/* The last thread wakes the writer. */
|
||||
if (!cshare->running_threads)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("waking writer"));
|
||||
pthread_cond_signal(&cshare->cond_writer);
|
||||
}
|
||||
|
||||
/*
|
||||
Readers wait until the data is copied from the writer. Another
|
||||
reason to stop waiting is the removal of the write thread. If this
|
||||
happens, we leave the lock with old data in the buffer.
|
||||
*/
|
||||
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
|
||||
cshare->source_cache)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
|
||||
pthread_cond_wait(&cshare->cond, &cshare->mutex);
|
||||
}
|
||||
|
||||
/*
|
||||
If the writer was removed from the share while this thread was
|
||||
asleep, we need to simulate an EOF condition. The writer cannot
|
||||
reset the share variables as they might still be in use by readers
|
||||
of the last block. When we awake here then because the last
|
||||
joining thread signalled us. If the writer is not the last, it
|
||||
will not signal. So it is safe to clear the buffer here.
|
||||
*/
|
||||
if (!cshare->read_end || (cshare->pos_in_file < pos))
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
|
||||
cshare->read_end= cshare->buffer; /* Empty buffer. */
|
||||
cshare->error= 0; /* EOF is not an error. */
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
There are read caches only. The last thread arriving in
|
||||
lock_io_cache() continues with a locked cache and reads the block.
|
||||
*/
|
||||
if (!cshare->running_threads)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
|
||||
/* Stay locked. Leave the lock later by unlock_io_cache(). */
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
/*
|
||||
All other threads wait until the requested block is read by the
|
||||
last thread arriving. Another reason to stop waiting is the
|
||||
removal of a thread. If this leads to all threads being in the
|
||||
lock, we have to continue also. The first of the awaken threads
|
||||
will then do the read.
|
||||
*/
|
||||
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
|
||||
cshare->running_threads)
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
|
||||
pthread_cond_wait(&cshare->cond, &cshare->mutex);
|
||||
}
|
||||
|
||||
/* If the block is not yet read, continue with a locked cache and read. */
|
||||
if (!cshare->read_end || (cshare->pos_in_file < pos))
|
||||
{
|
||||
DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
|
||||
/* Stay locked. Leave the lock later by unlock_io_cache(). */
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
/* Another thread did read the block already. */
|
||||
}
|
||||
DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
|
||||
cshare->read_end ? (uint)
|
||||
(cshare->read_end - cshare->buffer) : 0));
|
||||
|
||||
/*
|
||||
Leave the lock. Do not call unlock_io_cache() later. The thread that
|
||||
filled the buffer did this and marked all threads as running.
|
||||
*/
|
||||
pthread_mutex_unlock(&cshare->mutex);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Unlock IO cache.
|
||||
|
||||
SYNOPSIS
|
||||
unlock_io_cache()
|
||||
cache The cache of the thread leaving the lock.
|
||||
|
||||
NOTE
|
||||
This is called by the thread that filled the buffer. It marks all
|
||||
threads as running and awakes them. This must not be done by any
|
||||
other thread.
|
||||
|
||||
Do not signal cond_writer. Either there is no writer or the writer
|
||||
is the only one who can call this function.
|
||||
|
||||
The reason for resetting running_threads to total_threads before
|
||||
waking all other threads is that it could be possible that this
|
||||
thread is so fast with processing the buffer that it enters the lock
|
||||
before even one other thread has left it. If every awoken thread
|
||||
would increase running_threads by one, this thread could think that
|
||||
he is again the last to join and would not wait for the other
|
||||
threads to process the data.
|
||||
|
||||
RETURN
|
||||
void
|
||||
*/
|
||||
|
||||
static void unlock_io_cache(IO_CACHE *cache)
|
||||
{
|
||||
IO_CACHE_SHARE *cshare= cache->share;
|
||||
DBUG_ENTER("unlock_io_cache");
|
||||
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
|
||||
(cache == cshare->source_cache) ?
|
||||
"writer" : "reader",
|
||||
cache, (ulong) cshare->pos_in_file,
|
||||
cshare->total_threads));
|
||||
|
||||
cshare->running_threads= cshare->total_threads;
|
||||
pthread_cond_broadcast(&cshare->cond);
|
||||
pthread_mutex_unlock(&cshare->mutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -569,7 +869,7 @@ static void unlock_io_cache(IO_CACHE *info)
|
||||
|
||||
SYNOPSIS
|
||||
_my_b_read_r()
|
||||
info IO_CACHE pointer
|
||||
cache IO_CACHE pointer
|
||||
Buffer Buffer to retrieve count bytes from file
|
||||
Count Number of bytes to read into Buffer
|
||||
|
||||
@@ -581,7 +881,7 @@ static void unlock_io_cache(IO_CACHE *info)
|
||||
|
||||
It works as follows: when a thread tries to read from a file (that
|
||||
is, after using all the data from the (shared) buffer), it just
|
||||
hangs on lock_io_cache(), wating for other threads. When the very
|
||||
hangs on lock_io_cache(), waiting for other threads. When the very
|
||||
last thread attempts a read, lock_io_cache() returns 1, the thread
|
||||
does actual IO and unlock_io_cache(), which signals all the waiting
|
||||
threads that data is in the buffer.
|
||||
@@ -601,16 +901,17 @@ static void unlock_io_cache(IO_CACHE *info)
|
||||
1 Error: can't read requested characters
|
||||
*/
|
||||
|
||||
int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
|
||||
int _my_b_read_r(register IO_CACHE *cache, byte *Buffer, uint Count)
|
||||
{
|
||||
my_off_t pos_in_file;
|
||||
uint length, diff_length, left_length;
|
||||
IO_CACHE_SHARE *cshare= cache->share;
|
||||
DBUG_ENTER("_my_b_read_r");
|
||||
|
||||
if ((left_length= (uint) (info->read_end - info->read_pos)))
|
||||
if ((left_length= (uint) (cache->read_end - cache->read_pos)))
|
||||
{
|
||||
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
|
||||
memcpy(Buffer, info->read_pos, (size_t) (left_length));
|
||||
memcpy(Buffer, cache->read_pos, (size_t) (left_length));
|
||||
Buffer+= left_length;
|
||||
Count-= left_length;
|
||||
}
|
||||
@@ -618,55 +919,133 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
|
||||
{
|
||||
int cnt, len;
|
||||
|
||||
pos_in_file= info->pos_in_file + (info->read_end - info->buffer);
|
||||
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
|
||||
diff_length= (uint) (pos_in_file & (IO_SIZE-1));
|
||||
length=IO_ROUND_UP(Count+diff_length)-diff_length;
|
||||
length=(length <= info->read_length) ?
|
||||
length + IO_ROUND_DN(info->read_length - length) :
|
||||
length - IO_ROUND_UP(length - info->read_length) ;
|
||||
if (info->type != READ_FIFO &&
|
||||
(length > (info->end_of_file - pos_in_file)))
|
||||
length= (uint) (info->end_of_file - pos_in_file);
|
||||
length= ((length <= cache->read_length) ?
|
||||
length + IO_ROUND_DN(cache->read_length - length) :
|
||||
length - IO_ROUND_UP(length - cache->read_length));
|
||||
if (cache->type != READ_FIFO &&
|
||||
(length > (cache->end_of_file - pos_in_file)))
|
||||
length= (uint) (cache->end_of_file - pos_in_file);
|
||||
if (length == 0)
|
||||
{
|
||||
info->error= (int) left_length;
|
||||
cache->error= (int) left_length;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
if (lock_io_cache(info, pos_in_file))
|
||||
if (lock_io_cache(cache, pos_in_file))
|
||||
{
|
||||
info->share->active=info;
|
||||
if (info->seek_not_done) /* File touched, do seek */
|
||||
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
|
||||
len=(int)my_read(info->file,info->buffer, length, info->myflags);
|
||||
info->read_end=info->buffer + (len == -1 ? 0 : len);
|
||||
info->error=(len == (int)length ? 0 : len);
|
||||
info->pos_in_file=pos_in_file;
|
||||
unlock_io_cache(info);
|
||||
/* With a synchronized write/read cache we won't come here... */
|
||||
DBUG_ASSERT(!cshare->source_cache);
|
||||
/*
|
||||
... unless the writer has gone before this thread entered the
|
||||
lock. Simulate EOF in this case. It can be distinguished by
|
||||
cache->file.
|
||||
*/
|
||||
if (cache->file < 0)
|
||||
len= 0;
|
||||
else
|
||||
{
|
||||
if (cache->seek_not_done) /* File touched, do seek */
|
||||
VOID(my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0)));
|
||||
len= (int) my_read(cache->file, cache->buffer, length, cache->myflags);
|
||||
}
|
||||
DBUG_PRINT("io_cache_share", ("read %d bytes", len));
|
||||
|
||||
cache->read_end= cache->buffer + (len == -1 ? 0 : len);
|
||||
cache->error= (len == (int)length ? 0 : len);
|
||||
cache->pos_in_file= pos_in_file;
|
||||
|
||||
/* Copy important values to the share. */
|
||||
cshare->error= cache->error;
|
||||
cshare->read_end= cache->read_end;
|
||||
cshare->pos_in_file= pos_in_file;
|
||||
|
||||
/* Mark all threads as running and wake them. */
|
||||
unlock_io_cache(cache);
|
||||
}
|
||||
else
|
||||
{
|
||||
info->error= info->share->active->error;
|
||||
info->read_end= info->share->active->read_end;
|
||||
info->pos_in_file= info->share->active->pos_in_file;
|
||||
len= (int) (info->error == -1 ? -1 : info->read_end-info->buffer);
|
||||
/*
|
||||
With a synchronized write/read cache readers always come here.
|
||||
Copy important values from the share.
|
||||
*/
|
||||
cache->error= cshare->error;
|
||||
cache->read_end= cshare->read_end;
|
||||
cache->pos_in_file= cshare->pos_in_file;
|
||||
|
||||
len= (int) ((cache->error == -1) ? -1 : cache->read_end - cache->buffer);
|
||||
}
|
||||
info->read_pos=info->buffer;
|
||||
info->seek_not_done=0;
|
||||
cache->read_pos= cache->buffer;
|
||||
cache->seek_not_done= 0;
|
||||
if (len <= 0)
|
||||
{
|
||||
info->error= (int) left_length;
|
||||
DBUG_PRINT("io_cache_share", ("reader error. len %d left %u",
|
||||
len, left_length));
|
||||
cache->error= (int) left_length;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
cnt= ((uint) len > Count) ? (int) Count : len;
|
||||
memcpy(Buffer, info->read_pos, (size_t) cnt);
|
||||
memcpy(Buffer, cache->read_pos, (size_t) cnt);
|
||||
Count -= cnt;
|
||||
Buffer+= cnt;
|
||||
left_length+= cnt;
|
||||
info->read_pos+= cnt;
|
||||
cache->read_pos+= cnt;
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
Copy data from write cache to read cache.
|
||||
|
||||
SYNOPSIS
|
||||
copy_to_read_buffer()
|
||||
write_cache The write cache.
|
||||
write_buffer The source of data, mostly the cache buffer.
|
||||
write_length The number of bytes to copy.
|
||||
|
||||
NOTE
|
||||
The write thread will wait for all read threads to join the cache
|
||||
lock. Then it copies the data over and wakes the read threads.
|
||||
|
||||
RETURN
|
||||
void
|
||||
*/
|
||||
|
||||
static void copy_to_read_buffer(IO_CACHE *write_cache,
|
||||
const byte *write_buffer, uint write_length)
|
||||
{
|
||||
IO_CACHE_SHARE *cshare= write_cache->share;
|
||||
|
||||
DBUG_ASSERT(cshare->source_cache == write_cache);
|
||||
/*
|
||||
write_length is usually less or equal to buffer_length.
|
||||
It can be bigger if _my_b_write() is called with a big length.
|
||||
*/
|
||||
while (write_length)
|
||||
{
|
||||
uint copy_length= min(write_length, write_cache->buffer_length);
|
||||
int __attribute__((unused)) rc;
|
||||
|
||||
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
|
||||
/* The writing thread does always have the lock when it awakes. */
|
||||
DBUG_ASSERT(rc);
|
||||
|
||||
memcpy(cshare->buffer, write_buffer, copy_length);
|
||||
|
||||
cshare->error= 0;
|
||||
cshare->read_end= cshare->buffer + copy_length;
|
||||
cshare->pos_in_file= write_cache->pos_in_file;
|
||||
|
||||
/* Mark all threads as running and wake them. */
|
||||
unlock_io_cache(write_cache);
|
||||
|
||||
write_buffer+= copy_length;
|
||||
write_length-= copy_length;
|
||||
}
|
||||
}
|
||||
#endif /*THREAD*/
|
||||
|
||||
|
||||
/*
|
||||
@@ -1018,6 +1397,7 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
|
||||
Buffer+=rest_length;
|
||||
Count-=rest_length;
|
||||
info->write_pos+=rest_length;
|
||||
|
||||
if (my_b_flush_io_cache(info,1))
|
||||
return 1;
|
||||
if (Count >= IO_SIZE)
|
||||
@@ -1030,6 +1410,23 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
|
||||
}
|
||||
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
|
||||
return info->error= -1;
|
||||
|
||||
#ifdef THREAD
|
||||
/*
|
||||
In case of a shared I/O cache with a writer we normally do direct
|
||||
write cache to read cache copy. Simulate this here by direct
|
||||
caller buffer to read cache copy. Do it after the write so that
|
||||
the cache readers actions on the flushed part can go in parallel
|
||||
with the write of the extra stuff. copy_to_read_buffer()
|
||||
synchronizes writer and readers so that after this call the
|
||||
readers can act on the extra stuff while the writer can go ahead
|
||||
and prepare the next output. copy_to_read_buffer() relies on
|
||||
info->pos_in_file.
|
||||
*/
|
||||
if (info->share)
|
||||
copy_to_read_buffer(info, Buffer, length);
|
||||
#endif
|
||||
|
||||
Count-=length;
|
||||
Buffer+=length;
|
||||
info->pos_in_file+=length;
|
||||
@@ -1050,6 +1447,14 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
|
||||
{
|
||||
uint rest_length,length;
|
||||
|
||||
#ifdef THREAD
|
||||
/*
|
||||
Assert that we cannot come here with a shared cache. If we do one
|
||||
day, we might need to add a call to copy_to_read_buffer().
|
||||
*/
|
||||
DBUG_ASSERT(!info->share);
|
||||
#endif
|
||||
|
||||
lock_append_buffer(info);
|
||||
rest_length=(uint) (info->write_end - info->write_pos);
|
||||
if (Count <= rest_length)
|
||||
@@ -1110,6 +1515,14 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
|
||||
uint length;
|
||||
int error=0;
|
||||
|
||||
#ifdef THREAD
|
||||
/*
|
||||
Assert that we cannot come here with a shared cache. If we do one
|
||||
day, we might need to add a call to copy_to_read_buffer().
|
||||
*/
|
||||
DBUG_ASSERT(!info->share);
|
||||
#endif
|
||||
|
||||
if (pos < info->pos_in_file)
|
||||
{
|
||||
/* Of no overlap, write everything without buffering */
|
||||
@@ -1186,6 +1599,17 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
|
||||
|
||||
if ((length=(uint) (info->write_pos - info->write_buffer)))
|
||||
{
|
||||
#ifdef THREAD
|
||||
/*
|
||||
In case of a shared I/O cache with a writer we do direct write
|
||||
cache to read cache copy. Do it before the write here so that
|
||||
the readers can work in parallel with the write.
|
||||
copy_to_read_buffer() relies on info->pos_in_file.
|
||||
*/
|
||||
if (info->share)
|
||||
copy_to_read_buffer(info, info->write_buffer, length);
|
||||
#endif
|
||||
|
||||
pos_in_file=info->pos_in_file;
|
||||
/*
|
||||
If we have append cache, we always open the file with
|
||||
@@ -1265,16 +1689,10 @@ int end_io_cache(IO_CACHE *info)
|
||||
|
||||
#ifdef THREAD
|
||||
/*
|
||||
if IO_CACHE is shared between several threads, only one
|
||||
thread needs to call end_io_cache() - just as init_io_cache()
|
||||
should be called only once and then memcopy'ed
|
||||
Every thread must call remove_io_thread(). The last one destroys
|
||||
the share elements.
|
||||
*/
|
||||
if (info->share)
|
||||
{
|
||||
pthread_cond_destroy(&info->share->cond);
|
||||
pthread_mutex_destroy(&info->share->mutex);
|
||||
info->share=0;
|
||||
}
|
||||
DBUG_ASSERT(!info->share || !info->share->total_threads);
|
||||
#endif
|
||||
|
||||
if ((pre_close=info->pre_close))
|
||||
|
||||
Reference in New Issue
Block a user