1
0
mirror of https://github.com/facebook/zstd.git synced 2025-08-08 17:22:10 +03:00

AsyncIO compression part 2 - added async read and asyncio to compression code (#3022)

* Compression asyncio:
- Added asyncio functionality for compression flow
- Added ReadPool for async reads, implemented in both comp and decomp flows
This commit is contained in:
Yonatan Komornik
2022-01-31 15:43:41 -08:00
committed by GitHub
parent 0b70da6277
commit cc0657f27d
8 changed files with 590 additions and 301 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
@@ -29,7 +29,8 @@
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
static unsigned
AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
@@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file,
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write decoded block : %s",
EXM_THROW(70, "Write error : cannot write block : %s",
strerror(errno));
return 0;
}
@@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file,
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write decoded block : %s",
EXM_THROW(93, "Write error : cannot write block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
@@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file,
return storedSkips;
}
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) {
@@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st
* AsyncIO functionality
************************************************************************/
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
return 1;
#else
return 0;
#endif
}
/* ***********************************
* General IoPool implementation
*************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
void *buffer;
IOJob_t *job;
job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
void* const buffer = malloc(bufferSize);
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = bufferSize;
job->usedBufferSize = 0;
@@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
/* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
EXM_THROW(102,"Failed creating write availableJobs mutex");
EXM_THROW(102,"Failed creating write availableJobs mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
if (!ctx->threadPool)
EXM_THROW(104, "Failed creating writer thread pool");
EXM_THROW(104, "Failed creating writer thread pool");
}
}
/* AIO_IOPool_init:
* Allocates and sets and a new write pool including its included availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->poolFunction = poolFunction;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
ctx->availableJobsCount = ctx->totalIoJobs;
for(i=0; i < ctx->availableJobsCount; i++) {
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
}
ctx->jobBufferSize = bufferSize;
ctx->file = NULL;
}
/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool) {
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount < MAX_IO_JOBS);
ctx->availableJobs[ctx->availableJobsCount++] = job;
assert(ctx->availableJobsCount < ctx->totalIoJobs);
ctx->availableJobs[ctx->availableJobsCount++] = job;
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 0);
ctx->availableJobsCount++;
}
}
/* AIO_IOPool_join:
@@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
/* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool) {
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 1);
ctx->availableJobsCount--;
job = (IOJob_t*)ctx->availableJobs[0];
}
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
@@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file;
}
static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
return ctx->file;
}
/* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t *job) {
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
@@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
return AIO_IOPool_acquireJob(&ctx->base);
}
@@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
@@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0);
}
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job) {
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
FILE *dstFile = ctx->base.file;
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
FILE* const dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL);
@@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
/* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
IOJob_t* const job = (IOJob_t*) opaque;
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0;
@@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
assert(ctx->storedSkips==0);
free(ctx);
}
/* ***********************************
* ReadPool implementation
*************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
int i;
for(i=0; i<ctx->completedJobsCount; i++) {
IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
AIO_IOPool_releaseIoJob(job);
}
ctx->completedJobsCount = 0;
}
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
ctx->completedJobs[ctx->completedJobsCount++] = job;
if(ctx->base.threadPool) {
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
}
}
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
* Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
* if job wasn't found returns NULL.
* IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
int i;
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
* While not strictly needed for a single threaded reader implementation (as in such a case we could expect
* reads to be completed in order) this implementation was chosen as it better fits other asyncio
* interfaces (such as io_uring) that do not provide promises regarding order of completion. */
for (i=0; i<ctx->completedJobsCount; i++) {
job = (IOJob_t *) ctx->completedJobs[i];
if (job->offset == ctx->waitingOnOffset) {
ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
return job;
}
}
return NULL;
}
/* AIO_ReadPool_numReadsInFlight:
* Returns the number of IO read jobs currrently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}
/* AIO_ReadPool_getNextCompletedJob:
* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
* Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
if (ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
/* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
}
if(job) {
assert(job->offset == ctx->waitingOnOffset);
ctx->waitingOnOffset += job->usedBufferSize;
}
if (ctx->base.threadPool)
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
return job;
}
/* AIO_ReadPool_executeReadJob:
* Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
IOJob_t* const job = (IOJob_t*) opaque;
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->reachedEof) {
job->usedBufferSize = 0;
AIO_ReadPool_addJobToCompleted(job);
return;
}
job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
if(job->usedBufferSize < job->bufferSize) {
if(ferror(job->file)) {
EXM_THROW(37, "Read error");
} else if(feof(job->file)) {
ctx->reachedEof = 1;
} else {
EXM_THROW(37, "Unexpected short read");
}
}
AIO_ReadPool_addJobToCompleted(job);
}
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
job->offset = ctx->nextReadOffset;
ctx->nextReadOffset += job->bufferSize;
AIO_IOPool_enqueueJob(job);
}
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
int i;
for (i = 0; i < ctx->base.availableJobsCount; i++) {
AIO_ReadPool_enqueueRead(ctx);
}
}
/* AIO_ReadPool_setFile:
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(&ctx->base);
AIO_ReadPool_releaseAllCompletedJobs(ctx);
if (ctx->currentJobHeld) {
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
ctx->currentJobHeld = NULL;
}
AIO_IOPool_setFile(&ctx->base, file);
ctx->nextReadOffset = 0;
ctx->waitingOnOffset = 0;
ctx->srcBuffer = ctx->coalesceBuffer;
ctx->srcBufferLoaded = 0;
ctx->reachedEof = 0;
if(file != NULL)
AIO_ReadPool_startReading(ctx);
}
/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
ctx->srcBuffer = ctx->coalesceBuffer;
ctx->srcBufferLoaded = 0;
ctx->completedJobsCount = 0;
ctx->currentJobHeld = NULL;
if(ctx->base.threadPool)
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
return ctx;
}
/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
if(AIO_ReadPool_getFile(ctx))
AIO_ReadPool_closeFile(ctx);
if(ctx->base.threadPool)
ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
AIO_IOPool_destroy(&ctx->base);
free(ctx->coalesceBuffer);
free(ctx);
}
/* AIO_ReadPool_consumeBytes:
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
assert(n <= ctx->srcBufferLoaded);
ctx->srcBufferLoaded -= n;
ctx->srcBuffer += n;
}
/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
* Release the current held job and get the next one, returns NULL if no next job available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
if (ctx->currentJobHeld) {
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
ctx->currentJobHeld = NULL;
AIO_ReadPool_enqueueRead(ctx);
}
ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
return (IOJob_t*) ctx->currentJobHeld;
}
/* AIO_ReadPool_fillBuffer:
* Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
* Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
IOJob_t *job;
int useCoalesce = 0;
if(n > ctx->base.jobBufferSize)
n = ctx->base.jobBufferSize;
/* We are good, don't read anything */
if (ctx->srcBufferLoaded >= n)
return 0;
/* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
* and coalesce the remaining bytes with the next job's buffer */
if (ctx->srcBufferLoaded > 0) {
useCoalesce = 1;
memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
ctx->srcBuffer = ctx->coalesceBuffer;
}
/* Read the next chunk */
job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
if(!job)
return 0;
if(useCoalesce) {
assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
ctx->srcBufferLoaded += job->usedBufferSize;
}
else {
ctx->srcBuffer = (U8 *) job->buffer;
ctx->srcBufferLoaded = job->usedBufferSize;
}
return job->usedBufferSize;
}
/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}
/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
FILE* const file = AIO_ReadPool_getFile(ctx);
AIO_ReadPool_setFile(ctx, NULL);
return fclose(file);
}