/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>     /* malloc, free */
#include <assert.h>
#include <errno.h>      /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
*  @return : storedSkips,
*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
            }   }   }

    return storedSkips;
}

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped ones get implicitly translated as zero by FS */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
        }   }
}
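
/* Illustrative aside (not part of the original file): the sparse-write logic
 * above relies on the filesystem behavior that regions skipped over with a
 * seek and never written read back as zeroes. A minimal standalone sketch,
 * assuming a filesystem with sparse-file support:
 *
 *     FILE* f = fopen("sparse.bin", "wb");
 *     fseek(f, 1 << 20, SEEK_CUR);   // skip 1 MiB of zeroes without writing them
 *     fputc(0, f);                   // write the final zero explicitly, as
 *                                    // AIO_fwriteSparseEnd() does above
 *     fclose(f);                     // file reads back as 1 MiB + 1 zero bytes,
 *                                    // yet may occupy almost no disk blocks
 */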

/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
    IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for a threaded IO pool.
 * Displays a warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O thread pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}


/* AIO_IOPool_threadPoolActive:
 * Checks if the current operation uses the thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}

/* AIO_IOPool_destroy:
 * Releases a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available IO job to be used for a future IO. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t* job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_setFile:
 * Sets the file for future IO in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}

/* AIO_IOPool_enqueueJob:
 * Enqueues an IO job for execution.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}
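
/* Illustrative aside: the generic pool above follows an acquire/enqueue/release
 * job lifecycle. A sketch of the internal flow (assuming an initialized ctx
 * whose file is already set):
 *
 *     IOJob_t* job = AIO_IOPool_acquireJob(ctx);   // take a free job and its buffer
 *     // ... fill job->buffer and set job->usedBufferSize ...
 *     AIO_IOPool_enqueueJob(job);                  // ownership passes to the pool;
 *                                                  // poolFunction runs it (threaded or inline)
 *     AIO_IOPool_join(ctx);                        // wait for all queued jobs to finish
 *
 * Each job must eventually be released back via AIO_IOPool_releaseIoJob(),
 * as the Write/Read pool implementations below arrange. */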

/* ***********************************
 *  WritePool implementation
 *************************************/

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After execution, `job`'s pointed value will change to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse writes, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
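
/* Illustrative WritePool usage, a sketch under assumed context (a valid
 * FIO_prefs_t* `prefs`, an open FILE* `dst`, and `srcSize` bytes at `src`,
 * with srcSize no larger than the job buffer size):
 *
 *     WritePoolCtx_t* wPool = AIO_WritePool_create(prefs, 128 KB);
 *     AIO_WritePool_setFile(wPool, dst);
 *     IOJob_t* wJob = AIO_WritePool_acquireJob(wPool);
 *     memcpy(wJob->buffer, src, srcSize);
 *     wJob->usedBufferSize = srcSize;
 *     AIO_WritePool_enqueueAndReacquireWriteJob(&wJob);   // queues the write, hands back a fresh job
 *     AIO_WritePool_releaseIoJob(wJob);                   // done writing: release the unused fresh job
 *     AIO_WritePool_closeFile(wPool);                     // joins pending writes, ends sparse mode
 *     AIO_WritePool_free(wPool);
 */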

/* ***********************************
 *  ReadPool implementation
 *************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i=0; i<ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * if no such job is found, returns NULL.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}
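
/* Worked example of the ordering protocol above (illustrative): with a
 * jobBufferSize of 64 KB, reads are enqueued at offsets 0, 64K, 128K, ...
 * If the job at offset 64K happens to complete first (possible with backends
 * such as io_uring), a consumer waiting on offset 0 keeps blocking on
 * jobCompletedCond; once the offset-0 job is added to completedJobs it is
 * returned, and waitingOnOffset advances by that job's usedBufferSize, so the
 * offset-64K job becomes the next one in line. */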

/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy the caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
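
/* Illustrative ReadPool usage, a sketch under assumed context (a valid
 * FIO_prefs_t* `prefs` and an open FILE* `src`):
 *
 *     ReadPoolCtx_t* rPool = AIO_ReadPool_create(prefs, 64 KB);
 *     AIO_ReadPool_setFile(rPool, src);                  // starts prefetching immediately
 *     for (;;) {
 *         AIO_ReadPool_fillBuffer(rPool, 64 KB);
 *         if (rPool->srcBufferLoaded == 0) break;        // end of file reached
 *         // ... consume rPool->srcBufferLoaded bytes at rPool->srcBuffer ...
 *         AIO_ReadPool_consumeBytes(rPool, rPool->srcBufferLoaded);
 *     }
 *     AIO_ReadPool_closeFile(rPool);                     // joins in-flight reads, resets state
 *     AIO_ReadPool_free(rPool);
 */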