Mirror of https://github.com/facebook/zstd.git, synced 2025-08-08 17:22:10 +03:00
AsyncIO performance regression for small files fix (#3474)

- Do not use threaded AsyncIO when handling small files.
- Some typo / doc fixes.
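The fix hinges on the new AIO_IOPool_setThreaded plumbing in the diff below, exposed through the AIO_ReadPool_setAsync and AIO_WritePool_setAsync entry points. A minimal call-site sketch of how threading might be gated on input size; the threshold constant, srcSize parameter, and wrapper function are illustrative assumptions, not part of this commit:

    /* Hypothetical call-site sketch: skip threaded AsyncIO for small inputs,
     * where handing jobs to a worker thread costs more than it saves.
     * SMALL_FILE_THRESHOLD and FIO_adjustAsyncIoForSize are assumed names. */
    #define SMALL_FILE_THRESHOLD (1 << 20)  /* illustrative cutoff, not from this commit */

    static void FIO_adjustAsyncIoForSize(ReadPoolCtx_t* rCtx, WritePoolCtx_t* wCtx,
                                         unsigned long long srcSize) {
        int const useAsync = srcSize >= SMALL_FILE_THRESHOLD;
        AIO_ReadPool_setAsync(rCtx, useAsync);   /* added by this commit */
        AIO_WritePool_setAsync(wCtx, useAsync);  /* added by this commit */
    }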
@@ -140,7 +140,7 @@ int AIO_supported(void) {
 }
 
 /* ***********************************
- *  General IoPool implementation
+ *  Generic IoPool implementation
  *************************************/
 
 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
@@ -163,20 +163,22 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
  * Displays warning if asyncio is requested but MT isn't available. */
 static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
     ctx->threadPool = NULL;
+    ctx->threadPoolActive = 0;
     if(prefs->asyncIO) {
         if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
-            EXM_THROW(102,"Failed creating write availableJobs mutex");
+            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
         /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
          * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
         assert(MAX_IO_JOBS >= 2);
         ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
+        ctx->threadPoolActive = 1;
         if (!ctx->threadPool)
-            EXM_THROW(104, "Failed creating writer thread pool");
+            EXM_THROW(104, "Failed creating I/O thread pool");
     }
 }
 
 /* AIO_IOPool_init:
- * Allocates and sets and a new write pool including its included availableJobs. */
+ * Allocates and sets and a new I/O thread pool including its included availableJobs. */
 static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
     int i;
     AIO_IOPool_createThreadPool(ctx, prefs);
@@ -192,27 +194,59 @@ static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_fun
 }
 
 
+/* AIO_IOPool_threadPoolActive:
+ * Check if current operation uses thread pool.
+ * Note that in some cases we have a thread pool initialized but choose not to use it. */
+static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
+    return ctx->threadPool && ctx->threadPoolActive;
+}
+
+
+/* AIO_IOPool_lockJobsMutex:
+ * Locks the IO jobs mutex if threading is active */
+static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
+    if(AIO_IOPool_threadPoolActive(ctx))
+        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+}
+
+/* AIO_IOPool_unlockJobsMutex:
+ * Unlocks the IO jobs mutex if threading is active */
+static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
+    if(AIO_IOPool_threadPoolActive(ctx))
+        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+}
+
 /* AIO_IOPool_releaseIoJob:
  * Releases an acquired job back to the pool. Doesn't execute the job. */
 static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
     IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(ctx);
     assert(ctx->availableJobsCount < ctx->totalIoJobs);
     ctx->availableJobs[ctx->availableJobsCount++] = job;
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(ctx);
 }
 
 /* AIO_IOPool_join:
  * Waits for all tasks in the pool to finish executing. */
 static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
-    if(ctx->threadPool)
+    if(AIO_IOPool_threadPoolActive(ctx))
         POOL_joinJobs(ctx->threadPool);
 }
+
+/* AIO_IOPool_setThreaded:
+ * Allows (de)activating threaded mode, to be used when the expected overhead
+ * of threading costs more than the expected gains. */
+static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
+    assert(threaded == 0 || threaded == 1);
+    assert(ctx != NULL);
+    if(ctx->threadPoolActive != threaded) {
+        AIO_IOPool_join(ctx);
+        ctx->threadPoolActive = threaded;
+    }
+}
 
 /* AIO_IOPool_free:
- * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+ * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
 static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
     int i;
     if(ctx->threadPool) {
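Two details in the hunk above are worth spelling out. First, every lock and unlock of ioJobsMutex now funnels through the new helpers, so synchronous mode pays no mutex cost at all. Second, AIO_IOPool_setThreaded joins the pool before flipping threadPoolActive, so a job queued under one mode can never be observed under the other. Conceptually, the guard the helpers implement expands to the following (an inline paraphrase for clarity, not lines from the diff):

    /* What AIO_IOPool_lockJobsMutex(ctx) amounts to, inlined: */
    if (ctx->threadPool != NULL && ctx->threadPoolActive)  /* == AIO_IOPool_threadPoolActive(ctx) */
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);        /* skipped entirely in synchronous mode */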
@@ -236,12 +270,10 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
 static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
     IOJob_t *job;
     assert(ctx->file != NULL || ctx->prefs->testMode);
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(ctx);
     assert(ctx->availableJobsCount > 0);
     job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(ctx);
     job->usedBufferSize = 0;
     job->file = ctx->file;
     job->offset = 0;
@@ -251,8 +283,7 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
 
 /* AIO_IOPool_setFile:
  * Sets the destination file for future files in the pool.
- * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
- * Also requires ending of sparse write if a previous file was used in sparse mode. */
+ * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
 static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
     assert(ctx!=NULL);
     AIO_IOPool_join(ctx);
@@ -269,7 +300,7 @@ static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
  * The queued job shouldn't be used directly after queueing it. */
 static void AIO_IOPool_enqueueJob(IOJob_t* job) {
     IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
-    if(ctx->threadPool)
+    if(AIO_IOPool_threadPoolActive(ctx))
         POOL_add(ctx->threadPool, ctx->poolFunction, job);
     else
         ctx->poolFunction(job);
@@ -300,8 +331,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
  * Blocks on completion of all current write jobs before executing. */
 void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
     assert(ctx != NULL);
-    if(ctx->base.threadPool)
-        POOL_joinJobs(ctx->base.threadPool);
+    AIO_IOPool_join(&ctx->base);
     AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
     ctx->storedSkips = 0;
 }
@@ -368,6 +398,13 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
     free(ctx);
 }
 
+/* AIO_WritePool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
+    AIO_IOPool_setThreaded(&ctx->base, async);
+}
+
 
 /* ***********************************
  * ReadPool implementation
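AIO_WritePool_setAsync is deliberately a thin wrapper over AIO_IOPool_setThreaded, and because the mode switch joins outstanding jobs first, it is safe to call between outputs. A hedged usage sketch (writeCtx and the surrounding scenario are hypothetical):

    /* Hypothetical: drop to synchronous I/O for a tiny output, then restore. */
    AIO_WritePool_setAsync(writeCtx, 0);  /* joins pending write jobs, then disables threading */
    /* ... write a small file synchronously ... */
    AIO_WritePool_setAsync(writeCtx, 1);  /* re-enable threaded writes for the next large file */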
@@ -383,14 +420,13 @@ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
 
 static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
-    if(ctx->base.threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(&ctx->base);
     assert(ctx->completedJobsCount < MAX_IO_JOBS);
     ctx->completedJobs[ctx->completedJobsCount++] = job;
-    if(ctx->base.threadPool) {
+    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
         ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
-        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
     }
+    AIO_IOPool_unlockJobsMutex(&ctx->base);
 }
 
 /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
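Note that AIO_ReadPool_addJobToCompleted now signals jobCompletedCond only when the thread pool is active; in synchronous mode there is no waiter to wake. The signal is issued while ioJobsMutex is still held, which pairs with the classic condition-variable wait loop on the consumer side (a generic sketch of the pattern, not lines from this diff):

    /* Producer, on an I/O worker thread: publish under the lock, then signal. */
    ZSTD_pthread_mutex_lock(&mutex);
    queue[count++] = job;
    ZSTD_pthread_cond_signal(&cond);
    ZSTD_pthread_mutex_unlock(&mutex);

    /* Consumer: re-check the predicate in a loop; cond_wait atomically
     * releases the mutex while waiting and re-acquires it before returning. */
    ZSTD_pthread_mutex_lock(&mutex);
    while (count == 0)
        ZSTD_pthread_cond_wait(&cond, &mutex);
    job = queue[--count];
    ZSTD_pthread_mutex_unlock(&mutex);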
@@ -426,8 +462,7 @@ static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
  * Would block. */
 static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
     IOJob_t *job = NULL;
-    if (ctx->base.threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(&ctx->base);
 
     job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
 
@@ -443,8 +478,7 @@ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
         ctx->waitingOnOffset += job->usedBufferSize;
     }
 
-    if (ctx->base.threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(&ctx->base);
     return job;
 }
 
@@ -524,7 +558,7 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize)
 
     if(ctx->base.threadPool)
         if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
-            EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
+            EXM_THROW(103,"Failed creating jobCompletedCond cond");
 
     return ctx;
 }
@@ -620,3 +654,10 @@ int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
     AIO_ReadPool_setFile(ctx, NULL);
     return fclose(file);
 }
+
+/* AIO_ReadPool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
+    AIO_IOPool_setThreaded(&ctx->base, async);
+}