mirror of
https://github.com/MariaDB/server.git
synced 2025-08-09 22:24:09 +03:00
MDEV-27900: aio handle partial reads/writes
As btrfs showed, a partial read of data in AIO /O_DIRECT circumstances can really confuse MariaDB. Filipe Manana (SuSE)[1] showed how database programmers can assume O_DIRECT is all or nothing. While a fix was done in the kernel side, we can do better in our code by requesting that the rest of the block be read/written synchronously if we do only get a partial read/write. Per the APIs, a partial read/write can occur before an error, so reattempting the request will leave the caller with a concrete error to handle. [1] https://lore.kernel.org/linux-btrfs/CABVffENfbsC6HjGbskRZGR2NvxbnQi17gAuW65eOM+QRzsr8Bg@mail.gmail.com/T/#mb2738e675e48e0e0778a2e8d1537dec5ec0d3d3a Also spell synchronously correctly in other files.
This commit is contained in:
@@ -297,7 +297,7 @@ nothing_read:
|
|||||||
/* Trx sys header is so low in the latching order that we play
|
/* Trx sys header is so low in the latching order that we play
|
||||||
safe and do not leave the i/o-completion to an asynchronous
|
safe and do not leave the i/o-completion to an asynchronous
|
||||||
i/o-thread. Change buffer pages must always be read with
|
i/o-thread. Change buffer pages must always be read with
|
||||||
syncronous i/o, to make sure they do not get involved in
|
synchronous i/o, to make sure they do not get involved in
|
||||||
thread deadlocks. */
|
thread deadlocks. */
|
||||||
sync = true;
|
sync = true;
|
||||||
}
|
}
|
||||||
|
@@ -2807,7 +2807,7 @@ os_file_set_eof(
|
|||||||
|
|
||||||
#endif /* !_WIN32*/
|
#endif /* !_WIN32*/
|
||||||
|
|
||||||
/** Does a syncronous read or write depending upon the type specified
|
/** Does a synchronous read or write depending upon the type specified
|
||||||
In case of partial reads/writes the function tries
|
In case of partial reads/writes the function tries
|
||||||
NUM_RETRIES_ON_PARTIAL_IO times to read/write the complete data.
|
NUM_RETRIES_ON_PARTIAL_IO times to read/write the complete data.
|
||||||
@param[in] type, IO flags
|
@param[in] type, IO flags
|
||||||
|
@@ -131,6 +131,8 @@ class aio_linux final : public aio
|
|||||||
{
|
{
|
||||||
iocb->m_ret_len= event.res;
|
iocb->m_ret_len= event.res;
|
||||||
iocb->m_err= 0;
|
iocb->m_err= 0;
|
||||||
|
if (iocb->m_ret_len != iocb->m_len)
|
||||||
|
finish_synchronous(iocb);
|
||||||
}
|
}
|
||||||
iocb->m_internal_task.m_func= iocb->m_callback;
|
iocb->m_internal_task.m_func= iocb->m_callback;
|
||||||
iocb->m_internal_task.m_arg= iocb;
|
iocb->m_internal_task.m_arg= iocb;
|
||||||
|
@@ -136,32 +136,7 @@ public:
|
|||||||
static void simulated_aio_callback(void *param)
|
static void simulated_aio_callback(void *param)
|
||||||
{
|
{
|
||||||
aiocb *cb= (aiocb *) param;
|
aiocb *cb= (aiocb *) param;
|
||||||
#ifdef _WIN32
|
synchronous(cb);
|
||||||
size_t ret_len;
|
|
||||||
#else
|
|
||||||
ssize_t ret_len;
|
|
||||||
#endif
|
|
||||||
int err= 0;
|
|
||||||
switch (cb->m_opcode)
|
|
||||||
{
|
|
||||||
case aio_opcode::AIO_PREAD:
|
|
||||||
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
|
||||||
break;
|
|
||||||
case aio_opcode::AIO_PWRITE:
|
|
||||||
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
#ifdef _WIN32
|
|
||||||
if (static_cast<int>(ret_len) < 0)
|
|
||||||
err= GetLastError();
|
|
||||||
#else
|
|
||||||
if (ret_len < 0)
|
|
||||||
err= errno;
|
|
||||||
#endif
|
|
||||||
cb->m_ret_len = ret_len;
|
|
||||||
cb->m_err = err;
|
|
||||||
cb->m_internal_task.m_func= cb->m_callback;
|
cb->m_internal_task.m_func= cb->m_callback;
|
||||||
thread_pool *pool= (thread_pool *)cb->m_internal;
|
thread_pool *pool= (thread_pool *)cb->m_internal;
|
||||||
pool->submit_task(&cb->m_internal_task);
|
pool->submit_task(&cb->m_internal_task);
|
||||||
|
@@ -156,7 +156,7 @@ class aio
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
Submit asyncronous IO.
|
Submit asynchronous IO.
|
||||||
On completion, cb->m_callback is executed.
|
On completion, cb->m_callback is executed.
|
||||||
*/
|
*/
|
||||||
virtual int submit_io(aiocb *cb)= 0;
|
virtual int submit_io(aiocb *cb)= 0;
|
||||||
@@ -165,6 +165,10 @@ public:
|
|||||||
/** "Unind" file to AIO handler (used on Windows only) */
|
/** "Unind" file to AIO handler (used on Windows only) */
|
||||||
virtual int unbind(const native_file_handle &fd)= 0;
|
virtual int unbind(const native_file_handle &fd)= 0;
|
||||||
virtual ~aio(){};
|
virtual ~aio(){};
|
||||||
|
protected:
|
||||||
|
static void synchronous(aiocb *cb);
|
||||||
|
/** finish a partial read/write callback synchronously */
|
||||||
|
static void finish_synchronous(aiocb *cb);
|
||||||
};
|
};
|
||||||
|
|
||||||
class timer
|
class timer
|
||||||
|
@@ -47,6 +47,58 @@ namespace tpool
|
|||||||
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
|
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
|
||||||
static const int OVERSUBSCRIBE_FACTOR = 2;
|
static const int OVERSUBSCRIBE_FACTOR = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
Process the cb synchronously
|
||||||
|
*/
|
||||||
|
void aio::synchronous(aiocb *cb)
|
||||||
|
{
|
||||||
|
#ifdef _WIN32
|
||||||
|
size_t ret_len;
|
||||||
|
#else
|
||||||
|
ssize_t ret_len;
|
||||||
|
#endif
|
||||||
|
int err= 0;
|
||||||
|
switch (cb->m_opcode)
|
||||||
|
{
|
||||||
|
case aio_opcode::AIO_PREAD:
|
||||||
|
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
||||||
|
break;
|
||||||
|
case aio_opcode::AIO_PWRITE:
|
||||||
|
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#ifdef _WIN32
|
||||||
|
if (static_cast<int>(ret_len) < 0)
|
||||||
|
err= GetLastError();
|
||||||
|
#else
|
||||||
|
if (ret_len < 0)
|
||||||
|
{
|
||||||
|
err= errno;
|
||||||
|
ret_len= 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
cb->m_ret_len = ret_len;
|
||||||
|
cb->m_err = err;
|
||||||
|
if (!err && cb->m_ret_len != cb->m_len)
|
||||||
|
finish_synchronous(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
A partial read/write has occured, continue synchronously.
|
||||||
|
*/
|
||||||
|
void aio::finish_synchronous(aiocb *cb)
|
||||||
|
{
|
||||||
|
assert(cb->m_ret_len != (unsigned int) cb->m_len && !cb->m_err);
|
||||||
|
/* partial read/write */
|
||||||
|
cb->m_buffer= (char *) cb->m_buffer + cb->m_ret_len;
|
||||||
|
cb->m_len-= (unsigned int) cb->m_ret_len;
|
||||||
|
cb->m_offset+= cb->m_ret_len;
|
||||||
|
synchronous(cb);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Implementation of generic threadpool.
|
Implementation of generic threadpool.
|
||||||
This threadpool consists of the following components
|
This threadpool consists of the following components
|
||||||
|
Reference in New Issue
Block a user