diff --git a/build/meson/programs/meson.build b/build/meson/programs/meson.build index 4181030c2..0ae93fc10 100644 --- a/build/meson/programs/meson.build +++ b/build/meson/programs/meson.build @@ -20,14 +20,24 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/dibio.c'), join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'), # needed due to use of private symbol + -fvisibility=hidden - join_paths(zstd_rootdir, 'lib/common/xxhash.c')] - -zstd_c_args = libzstd_debug_cflags -if use_multi_thread - zstd_c_args += [ '-DZSTD_MULTITHREAD' ] -endif + join_paths(zstd_rootdir, 'lib/common/xxhash.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] zstd_deps = [ libzstd_dep ] +zstd_c_args = libzstd_debug_cflags + +zstd_frugal_deps = [ libzstd_dep ] +zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ] + +if use_multi_thread + zstd_deps += [ thread_dep ] + zstd_c_args += [ '-DZSTD_MULTITHREAD' ] + zstd_frugal_deps += [ thread_dep ] + zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ] +endif + if use_zlib zstd_deps += [ zlib_dep ] zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ] @@ -69,14 +79,17 @@ zstd = executable('zstd', zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/util.c'), - join_paths(zstd_rootdir, 'programs/fileio.c')] + join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] # Minimal target, with only zstd compression and decompression. # No bench. No legacy. executable('zstd-frugal', zstd_frugal_sources, - dependencies: libzstd_dep, - c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ], + dependencies: zstd_frugal_deps, + c_args: zstd_frugal_c_args, install: true) install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'), diff --git a/lib/common/pool.c b/lib/common/pool.c index 2e37cdd73..5c1d07d35 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) { /* If the intended queue size was 0, signal after finishing job */ ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - if (ctx->queueSize == 1) { - ZSTD_pthread_cond_signal(&ctx->queuePushCond); - } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ @@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) { ZSTD_customFree(ctx, ctx->customMem); } +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx) { + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} + void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { POOL_free (pool); } @@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } +void POOL_joinJobs(POOL_ctx* ctx){ + assert(!ctx || ctx == &g_poolCtx); + (void)ctx; +} + int POOL_resize(POOL_ctx* ctx, size_t numThreads) { (void)ctx; (void)numThreads; return 0; diff --git a/lib/common/pool.h b/lib/common/pool.h index 0ebde1805..b86a3452e 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, */ void POOL_free(POOL_ctx* ctx); + +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx); + /*! POOL_resize() : * Expands or shrinks pool's number of threads. * This is more efficient than releasing + creating a new context, diff --git a/programs/fileio.c b/programs/fileio.c index 5338fa629..b85e0806b 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -34,6 +34,8 @@ #include /* INT_MAX */ #include #include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */ +#include "../lib/common/pool.h" +#include "../lib/common/threading.h" #if defined (_MSC_VER) # include @@ -325,6 +327,7 @@ struct FIO_prefs_s { /* IO preferences */ U32 removeSrcFile; U32 overwrite; + U32 asyncIO; /* Computation resources preferences */ unsigned memLimit; @@ -395,6 +398,7 @@ FIO_prefs_t* FIO_createPreferences(void) ret->literalCompressionMode = ZSTD_ps_auto; ret->excludeCompressedFiles = 0; ret->allowBlockDevices = 0; + ret->asyncIO = 0; return ret; } @@ -558,6 +562,10 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value) prefs->contentSize = value != 0; } +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) { + prefs->asyncIO = value; +} + /* FIO_ctx_t functions */ void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) { @@ -1798,7 +1806,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, static const char* checked_index(const char* options[], size_t length, size_t index) { assert(index < length); - // Necessary to avoid warnings since -O3 will omit the above `assert` + /* Necessary to avoid warnings since -O3 will omit the above `assert` */ (void) length; return options[index]; } @@ -2000,16 +2008,124 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* ************************************************************************** * Decompression ***************************************************************************/ +#define DECOMPRESSION_MAX_WRITE_JOBS (10) + +typedef struct { + /* These struct fields should be set only on creation and not changed afterwards */ + POOL_ctx* writerPool; + int totalWriteJobs; + FIO_prefs_t* prefs; + + /* Controls the file we currently write to, make changes only by using provided utility functions */ + FILE* dstFile; + unsigned storedSkips; + + /* The jobs and availableWriteJobs fields are access by both the main and writer threads and should + * only be mutated after locking the mutex */ + ZSTD_pthread_mutex_t writeJobsMutex; + void* jobs[DECOMPRESSION_MAX_WRITE_JOBS]; + int availableWriteJobs; +} write_pool_ctx_t; + +typedef struct { + /* These fields are automaically set and shouldn't be changed by non WritePool code. */ + write_pool_ctx_t *ctx; + FILE* dstFile; + void *buffer; + size_t bufferSize; + + /* This field should be changed before a job is queued for execution and should contain the number + * of bytes to write from the buffer. */ + size_t usedBufferSize; +} write_job_t; + typedef struct { void* srcBuffer; size_t srcBufferSize; size_t srcBufferLoaded; - void* dstBuffer; - size_t dstBufferSize; ZSTD_DStream* dctx; - FILE* dstFile; + write_pool_ctx_t *writePoolCtx; } dRess_t; +static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) { + void *buffer; + write_job_t *job; + job = (write_job_t*) malloc(sizeof(write_job_t)); + buffer = malloc(ZSTD_DStreamOutSize()); + if(!job || !buffer) + EXM_THROW(101, "Allocation error : not enough memory"); + job->buffer = buffer; + job->bufferSize = ZSTD_DStreamOutSize(); + job->usedBufferSize = 0; + job->dstFile = NULL; + job->ctx = ctx; + return job; +} + +/* WritePool_createThreadPool: + * Creates a thread pool and a mutex for threaded write pool. + * Displays warning if asyncio is requested but MT isn't available. */ +static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { + ctx->writerPool = NULL; + if(prefs->asyncIO) { +#ifdef ZSTD_MULTITHREAD + if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL)) + EXM_THROW(102, "Failed creating write jobs mutex"); + /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to + * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ + assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2); + ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2); + if (!ctx->writerPool) + EXM_THROW(103, "Failed creating writer thread pool"); +#else + DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n"); +#endif + } +} + +/* WritePool_create: + * Allocates and sets and a new write pool including its included jobs. */ +static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) { + write_pool_ctx_t *ctx; + int i; + ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t)); + if(!ctx) + EXM_THROW(100, "Allocation error : not enough memory"); + WritePool_createThreadPool(ctx, prefs); + ctx->prefs = prefs; + ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1; + ctx->availableWriteJobs = ctx->totalWriteJobs; + for(i=0; i < ctx->availableWriteJobs; i++) { + ctx->jobs[i] = FIO_createWriteJob(ctx); + } + ctx->storedSkips = 0; + ctx->dstFile = NULL; + return ctx; +} + +/* WritePool_free: + * Release a previously allocated write thread pool. Makes sure all takss are done and released. */ +static void WritePool_free(write_pool_ctx_t* ctx) { + int i=0; + if(ctx->writerPool) { + /* Make sure we finish all tasks and then free the resources */ + POOL_joinJobs(ctx->writerPool); + /* Make sure we are not leaking jobs */ + assert(ctx->availableWriteJobs==ctx->totalWriteJobs); + POOL_free(ctx->writerPool); + ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex); + } + assert(ctx->dstFile==NULL); + assert(ctx->storedSkips==0); + for(i=0; iavailableWriteJobs; i++) { + write_job_t* job = (write_job_t*) ctx->jobs[i]; + free(job->buffer); + free(job); + } + free(ctx); +} + + static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName) { dRess_t ress; @@ -2027,9 +2143,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi ress.srcBufferSize = ZSTD_DStreamInSize(); ress.srcBuffer = malloc(ress.srcBufferSize); - ress.dstBufferSize = ZSTD_DStreamOutSize(); - ress.dstBuffer = malloc(ress.dstBufferSize); - if (!ress.srcBuffer || !ress.dstBuffer) + if (!ress.srcBuffer) EXM_THROW(61, "Allocation error : not enough memory"); /* dictionary */ @@ -2039,6 +2153,8 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi free(dictBuffer); } + ress.writePoolCtx = WritePool_create(prefs); + return ress; } @@ -2046,9 +2162,16 @@ static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); free(ress.srcBuffer); - free(ress.dstBuffer); + WritePool_free(ress.writePoolCtx); } +/* FIO_consumeDSrcBuffer: + * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */ +static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) { + assert(ress->srcBufferLoaded >= len); + ress->srcBufferLoaded -= len; + memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded); +} /** FIO_fwriteSparse() : * @return : storedSkips, @@ -2148,6 +2271,106 @@ FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS } } } +/* WritePool_releaseWriteJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +static void WritePool_releaseWriteJob(write_job_t *job) { + write_pool_ctx_t *ctx = job->ctx; + if(ctx->writerPool) { + ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); + assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS); + ctx->jobs[ctx->availableWriteJobs++] = job; + ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); + } else { + ctx->availableWriteJobs++; + } +} + +/* WritePool_acquireWriteJob: + * Returns an available write job to be used for a future write. */ +static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) { + write_job_t *job; + assert(ctx->dstFile!=NULL || ctx->prefs->testMode); + if(ctx->writerPool) { + ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); + assert(ctx->availableWriteJobs > 0); + job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs]; + ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); + } else { + assert(ctx->availableWriteJobs==1); + ctx->availableWriteJobs--; + job = (write_job_t*)ctx->jobs[0]; + } + job->usedBufferSize = 0; + job->dstFile = ctx->dstFile; + return job; +} + +/* WritePool_executeWriteJob: + * Executes a write job synchronously. Can be used as a function for a thread pool. */ +static void WritePool_executeWriteJob(void* opaque){ + write_job_t* job = (write_job_t*) opaque; + write_pool_ctx_t* ctx = job->ctx; + ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips); + WritePool_releaseWriteJob(job); +} + +/* WritePool_queueWriteJob: + * Queues a write job for execution. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +static void WritePool_queueWriteJob(write_job_t *job) { + write_pool_ctx_t* ctx = job->ctx; + if(ctx->writerPool) + POOL_add(ctx->writerPool, WritePool_executeWriteJob, job); + else + WritePool_executeWriteJob(job); +} + +/* WritePool_queueAndReacquireWriteJob: + * Queues a write job for execution and acquires a new one. + * After execution `job`'s pointed value would change to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +static void WritePool_queueAndReacquireWriteJob(write_job_t **job) { + WritePool_queueWriteJob(*job); + *job = WritePool_acquireWriteJob((*job)->ctx); +} + +/* WritePool_sparseWriteEnd: + * Ends sparse writes to the current dstFile. + * Blocks on completion of all current write jobs before executing. */ +static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) { + assert(ctx != NULL); + if(ctx->writerPool) + POOL_joinJobs(ctx->writerPool); + FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips); + ctx->storedSkips = 0; +} + +/* WritePool_setDstFile: + * Sets the destination file for future files in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) { + assert(ctx!=NULL); + /* We can change the dst file only if we have finished writing */ + if(ctx->writerPool) + POOL_joinJobs(ctx->writerPool); + assert(ctx->storedSkips == 0); + assert(ctx->availableWriteJobs == ctx->totalWriteJobs); + ctx->dstFile = dstFile; +} + +/* WritePool_closeDstFile: + * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ +static int WritePool_closeDstFile(write_pool_ctx_t *ctx) { + FILE *dstFile = ctx->dstFile; + assert(dstFile!=NULL || ctx->prefs->testMode!=0); + WritePool_sparseWriteEnd(ctx); + WritePool_setDstFile(ctx, NULL); + return fclose(dstFile); +} /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ @@ -2224,7 +2447,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, U64 alreadyDecoded) /* for multi-frames streams */ { U64 frameSize = 0; - U32 storedSkips = 0; + write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); /* display last 20 characters only */ { size_t const srcFileLength = strlen(srcFileName); @@ -2244,7 +2467,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, /* Main decompression Loop */ while (1) { ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 }; - ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 }; + ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff); const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2; UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize); @@ -2256,7 +2479,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, } /* Write block */ - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips); + writeJob->usedBufferSize = outBuff.pos; + WritePool_queueAndReacquireWriteJob(&writeJob); frameSize += outBuff.pos; if (fCtx->nbFilesTotal > 1) { size_t srcFileNameSize = strlen(srcFileName); @@ -2273,10 +2497,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, srcFileName, hrs.precision, hrs.value, hrs.suffix); } - if (inBuff.pos > 0) { - memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos); - ress->srcBufferLoaded -= inBuff.pos; - } + FIO_consumeDSrcBuffer(ress, inBuff.pos); if (readSizeHint == 0) break; /* end of frame */ @@ -2294,7 +2515,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ress->srcBufferLoaded += readSize; } } } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return frameSize; } @@ -2302,15 +2524,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, #ifdef ZSTD_GZDECOMPRESS static unsigned long long -FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, - const char* srcFileName) +FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) { unsigned long long outFileSize = 0; z_stream strm; int flush = Z_NO_FLUSH; int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = NULL; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; @@ -2321,8 +2541,9 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK) return FIO_ERROR_FRAME_DECODING; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.avail_in = (uInt)ress->srcBufferLoaded; strm.next_in = (z_const unsigned char*)ress->srcBuffer; @@ -2343,35 +2564,34 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, DISPLAYLEVEL(1, "zstd: %s: inflate error %d \n", srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + WritePool_queueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; } } if (ret == Z_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); + if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */ && (decodingError==0) ) { DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); decodingError = 1; } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif - #ifdef ZSTD_LZMADECOMPRESS static unsigned long long FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, const char* srcFileName, int plain_lzma) { unsigned long long outFileSize = 0; @@ -2379,7 +2599,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, lzma_action action = LZMA_RUN; lzma_ret initRet; int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = NULL; strm.next_in = 0; strm.avail_in = 0; @@ -2396,8 +2616,9 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = (BYTE const*)ress->srcBuffer; strm.avail_in = ress->srcBufferLoaded; @@ -2420,21 +2641,21 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + WritePool_queueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = writeJob->bufferSize; } } if (ret == LZMA_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); lzma_end(&strm); - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2442,60 +2663,57 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, #ifdef ZSTD_LZ4DECOMPRESS static unsigned long long FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, const char* srcFileName) { unsigned long long filesize = 0; - LZ4F_errorCode_t nextToLoad; + LZ4F_errorCode_t nextToLoad = 4; LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); if (LZ4F_isError(errorCode)) { DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n"); return FIO_ERROR_FRAME_DECODING; } - /* Init feed with magic number (already consumed from FILE* sFile) */ - { size_t inSize = 4; - size_t outSize= 0; - MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER); - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL); - if (LZ4F_isError(nextToLoad)) { - DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n", - srcFileName, LZ4F_getErrorName(nextToLoad)); - LZ4F_freeDecompressionContext(dCtx); - return FIO_ERROR_FRAME_DECODING; - } } - /* Main Loop */ for (;nextToLoad;) { size_t readSize; size_t pos = 0; - size_t decodedBytes = ress->dstBufferSize; + size_t decodedBytes = writeJob->bufferSize; + int fullBufferDecoded = 0; /* Read input */ - if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize; - readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile); - if (!readSize) break; /* reached end of file or stream */ + nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded); + readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile); + if(!readSize && ferror(srcFile)) { + DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); + decodingError=1; + break; + } + if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */ + ress->srcBufferLoaded += readSize; - while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) { /* still to read, or still to flush */ + while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */ /* Decode Input (at least partially) */ - size_t remaining = readSize - pos; - decodedBytes = ress->dstBufferSize; - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); + size_t remaining = ress->srcBufferLoaded - pos; + decodedBytes = writeJob->bufferSize; + nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); decodingError = 1; nextToLoad = 0; break; } pos += remaining; + assert(pos <= ress->srcBufferLoaded); + fullBufferDecoded = decodedBytes == writeJob->bufferSize; /* Write Block */ if (decodedBytes) { UTIL_HumanReadableSize_t hrs; - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips); + writeJob->usedBufferSize = decodedBytes; + WritePool_queueAndReacquireWriteJob(&writeJob); filesize += decodedBytes; hrs = UTIL_makeHumanReadableSize(filesize); DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix); @@ -2503,21 +2721,16 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (!nextToLoad) break; } + FIO_consumeDSrcBuffer(ress, pos); } - /* can be out because readSize == 0, which could be an fread() error */ - if (ferror(srcFile)) { - DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); - decodingError=1; - } - if (nextToLoad!=0) { DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName); decodingError=1; } LZ4F_freeDecompressionContext(dCtx); - ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */ - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : filesize; } @@ -2566,7 +2779,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, filesize += frameSize; } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */ #ifdef ZSTD_GZDECOMPRESS - unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2576,7 +2789,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */ || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */ #ifdef ZSTD_LZMADECOMPRESS - unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD); + unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2585,7 +2798,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) { #ifdef ZSTD_LZ4DECOMPRESS - unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2594,7 +2807,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */ return FIO_passThrough(prefs, - ress.dstFile, srcFile, + ress.writePoolCtx->dstFile, srcFile, ress.srcBuffer, ress.srcBufferSize, ress.srcBufferLoaded); } else { @@ -2632,7 +2845,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, int releaseDstFile = 0; int transferMTime = 0; - if ((ress.dstFile == NULL) && (prefs->testMode==0)) { + if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) { + FILE *dstFile; int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */ && strcmp(dstFileName, stdoutmark) @@ -2644,8 +2858,9 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, releaseDstFile = 1; - ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); - if (ress.dstFile==NULL) return 1; + dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); + if (dstFile==NULL) return 1; + WritePool_setDstFile(ress.writePoolCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, @@ -2657,10 +2872,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName); if (releaseDstFile) { - FILE* const dstFile = ress.dstFile; clearHandler(); - ress.dstFile = NULL; - if (fclose(dstFile)) { + if (WritePool_closeDstFile(ress.writePoolCtx)) { DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result = 1; } @@ -2874,15 +3087,16 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx, return 1; } if (!prefs->testMode) { - ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); - if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); + if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + WritePool_setDstFile(ress.writePoolCtx, dstFile); } for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) { status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if ((!prefs->testMode) && (fclose(ress.dstFile))) + if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx))) EXM_THROW(72, "Write error : %s : cannot properly close output file", strerror(errno)); } else { diff --git a/programs/fileio.h b/programs/fileio.h index 61094db83..398937a64 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -109,6 +109,7 @@ void FIO_setAllowBlockDevices(FIO_prefs_t* const prefs, int allowBlockDevices); void FIO_setPatchFromMode(FIO_prefs_t* const prefs, int value); void FIO_setContentSize(FIO_prefs_t* const prefs, int value); void FIO_displayCompressionParameters(const FIO_prefs_t* prefs); +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value); /* FIO_ctx_t functions */ void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value); diff --git a/programs/zstdcli.c b/programs/zstdcli.c index bfe18c0c1..fd563e1c2 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -239,9 +239,12 @@ static void usage_advanced(const char* programName) #ifndef ZSTD_NODECOMPRESS DISPLAYOUT( "\n"); DISPLAYOUT( "Advanced decompression arguments : \n"); - DISPLAYOUT( " -l : print information about zstd compressed files \n"); - DISPLAYOUT( "--test : test compressed file integrity \n"); - DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); + DISPLAYOUT( " -l : print information about zstd compressed files \n"); + DISPLAYOUT( "--test : test compressed file integrity \n"); + DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); +#ifdef ZSTD_MULTITHREAD + DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n"); +#endif # if ZSTD_SPARSE_DEFAULT DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n"); # else @@ -912,6 +915,8 @@ int main(int argCount, const char* argv[]) if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(prefs, 2); continue; } if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(prefs, 0); continue; } if (!strcmp(argument, "--test")) { operation=zom_test; continue; } + if (!strcmp(argument, "--asyncio")) { FIO_setAsyncIOFlag(prefs, 1); continue;} + if (!strcmp(argument, "--no-asyncio")) { FIO_setAsyncIOFlag(prefs, 0); continue;} if (!strcmp(argument, "--train")) { operation=zom_train; if (outFileName==NULL) outFileName=g_defaultDictName; continue; } if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(prefs, 0); continue; } if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(prefs, 0); continue; } diff --git a/tests/playTests.sh b/tests/playTests.sh index b7a3d88a8..78d8e742a 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -1575,6 +1575,44 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then exit 1 fi +println "\n===> zstd asyncio decompression tests " + +addFrame() { + datagen -g2M -s$2 >> tmp_uncompressed + datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst +} + +addTwoFrames() { + addFrame $1 1 + addFrame $1 2 +} + +testAsyncIO() { + roundTripTest -g2M "3 --asyncio --format=$1" + roundTripTest -g2M "3 --no-asyncio --format=$1" +} + +rm -f tmp_compressed tmp_uncompressed +testAsyncIO zstd +addTwoFrames zstd +if [ $GZIPMODE -eq 1 ]; then + testAsyncIO gzip + addTwoFrames gzip +fi +if [ $LZMAMODE -eq 1 ]; then + testAsyncIO lzma + addTwoFrames lzma +fi +if [ $LZ4MODE -eq 1 ]; then + testAsyncIO lz4 + addTwoFrames lz4 +fi +cat tmp_uncompressed | $MD5SUM > tmp2 +zstd -d tmp_compressed.zst --asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 +rm tmp1 +zstd -d tmp_compressed.zst --no-asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 if [ "$1" != "--test-large-data" ]; then println "Skipping large data tests"