diff --git a/programs/fileio.c b/programs/fileio.c index fec0e2d66..1e767ee64 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -48,6 +48,36 @@ FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto}; UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; +/* ************************************* +* Synchronous compression IO helpers +***************************************/ +typedef struct { + const FIO_prefs_t* prefs; + FILE* srcFile; + FILE* dstFile; + unsigned storedSkips; + U8* inBuffer; + size_t inCapacity; + U8* srcBuffer; + size_t srcBufferLoaded; + U8* outBuffer; + size_t outCapacity; +} FIO_SyncCompressIO; + +static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io, + const FIO_prefs_t* prefs, + size_t inCapacity, + size_t outCapacity); +static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io); +static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file); +static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io); +static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file); +static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io); +static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave); +static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n); +static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size); +static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io); + #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "../lib/zstd.h" #include "../lib/zstd_errors.h" /* ZSTD_error_frameParameter_windowTooLarge */ @@ -125,6 +155,233 @@ char const* FIO_lzmaVersion(void) #define TEMPORARY_FILE_PERMISSIONS (0600) #endif +static unsigned FIO_sparseWrite(FILE* file, + const void* buffer, size_t bufferSize, + const FIO_prefs_t* const prefs, + unsigned storedSkips) +{ + const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ + size_t bufferSizeT = bufferSize / sizeof(size_t); + const size_t* const bufferTEnd = bufferT + bufferSizeT; + const size_t* ptrT = bufferT; + static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ + + if (prefs->testMode) return 0; /* do not output anything in test mode */ + + if (!prefs->sparseFileSupport) { /* normal write */ + size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); + if (sizeCheck != bufferSize) + EXM_THROW(70, "Write error : cannot write block : %s", + strerror(errno)); + return 0; + } + + /* avoid int overflow */ + if (storedSkips > 1 GB) { + if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) + EXM_THROW(91, "1 GB skip error (sparse file support)"); + storedSkips -= 1 GB; + } + + while (ptrT < bufferTEnd) { + size_t nb0T; + + /* adjust last segment if < 32 KB */ + size_t seg0SizeT = segmentSizeT; + if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; + bufferSizeT -= seg0SizeT; + + /* count leading zeroes */ + for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; + storedSkips += (unsigned)(nb0T * sizeof(size_t)); + + if (nb0T != seg0SizeT) { /* not all 0s */ + size_t const nbNon0ST = seg0SizeT - nb0T; + /* skip leading zeros */ + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + storedSkips = 0; + /* write the rest */ + if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) + EXM_THROW(93, "Write error : cannot write block : %s", + strerror(errno)); + } + ptrT += seg0SizeT; + } + + { static size_t const maskT = sizeof(size_t)-1; + if (bufferSize & maskT) { + /* size not multiple of sizeof(size_t) : implies end of block */ + const char* const restStart = (const char*)bufferTEnd; + const char* restPtr = restStart; + const char* const restEnd = (const char*)buffer + bufferSize; + assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); + for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; + storedSkips += (unsigned) (restPtr - restStart); + if (restPtr != restEnd) { + /* not all remaining bytes are 0 */ + size_t const restSize = (size_t)(restEnd - restPtr); + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + if (fwrite(restPtr, 1, restSize, file) != restSize) + EXM_THROW(95, "Write error : cannot write end of decoded block : %s", + strerror(errno)); + storedSkips = 0; + } } } + + return storedSkips; +} + +static void FIO_sparseWriteEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) +{ + if (file == NULL) return; + if (prefs->testMode) { + assert(storedSkips == 0); + return; + } + if (storedSkips>0) { + assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ + if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) + EXM_THROW(69, "Final skip error (sparse file support)"); + /* last zero must be explicitly written, + * so that skipped ones get implicitly translated as zero by FS */ + { const char lastZeroByte[1] = { 0 }; + if (fwrite(lastZeroByte, 1, 1, file) != 1) + EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); + } + } +} + +static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io, + const FIO_prefs_t* prefs, + size_t inCapacity, + size_t outCapacity) +{ + memset(io, 0, sizeof(*io)); + io->prefs = prefs; + io->inCapacity = inCapacity; + io->outCapacity = outCapacity; + io->inBuffer = (U8*)malloc(inCapacity); + if (!io->inBuffer) + EXM_THROW(101, "Allocation error : not enough memory"); + io->outBuffer = (U8*)malloc(outCapacity); + if (!io->outBuffer) { + free(io->inBuffer); + io->inBuffer = NULL; + EXM_THROW(101, "Allocation error : not enough memory"); + } + io->srcBuffer = io->inBuffer; + io->srcBufferLoaded = 0; +} + +static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io) +{ + if (!io) return; + free(io->inBuffer); + free(io->outBuffer); + io->inBuffer = NULL; + io->outBuffer = NULL; + io->srcBuffer = NULL; + io->srcBufferLoaded = 0; + io->srcFile = NULL; + io->dstFile = NULL; + io->storedSkips = 0; +} + +static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file) +{ + io->srcFile = file; + io->srcBuffer = io->inBuffer; + io->srcBufferLoaded = 0; +} + +static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io) +{ + io->srcFile = NULL; + io->srcBuffer = io->inBuffer; + io->srcBufferLoaded = 0; +} + +static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file) +{ + io->dstFile = file; + io->storedSkips = 0; +} + +static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io) +{ + int result = 0; + if (io->dstFile != NULL) { + FIO_SyncCompressIO_finish(io); + result = fclose(io->dstFile); + io->dstFile = NULL; + } + return result; +} + +static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave) +{ + size_t added = 0; + if (io->srcFile == NULL) + return 0; + + if (minToHave > io->inCapacity) + minToHave = io->inCapacity; + + if (io->srcBufferLoaded >= minToHave) + return 0; + + if (io->srcBuffer != io->inBuffer) { + if (io->srcBufferLoaded > 0) + memmove(io->inBuffer, io->srcBuffer, io->srcBufferLoaded); + io->srcBuffer = io->inBuffer; + } + + while (io->srcBufferLoaded < minToHave) { + size_t const toRead = io->inCapacity - io->srcBufferLoaded; + size_t const readBytes = fread(io->inBuffer + io->srcBufferLoaded, 1, toRead, io->srcFile); + if (readBytes == 0) { + if (ferror(io->srcFile)) + EXM_THROW(37, "Read error"); + break; /* EOF */ + } + io->srcBufferLoaded += readBytes; + added += readBytes; + if (readBytes < toRead) + break; + } + + return added; +} + +static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n) +{ + assert(n <= io->srcBufferLoaded); + io->srcBuffer += n; + io->srcBufferLoaded -= n; + if (io->srcBufferLoaded == 0) + io->srcBuffer = io->inBuffer; +} + +static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size) +{ + if (size == 0) + return; + if (io->dstFile == NULL) { + assert(io->prefs->testMode); + return; + } + io->storedSkips = FIO_sparseWrite(io->dstFile, buffer, size, io->prefs, io->storedSkips); +} + +static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io) +{ + if (io->dstFile == NULL) + return; + FIO_sparseWriteEnd(io->prefs, io->dstFile, io->storedSkips); + io->storedSkips = 0; +} + /*-************************************ * Signal (Ctrl-C trapping) **************************************/ @@ -1078,8 +1335,7 @@ typedef struct { const char* dictFileName; stat_t dictFileStat; ZSTD_CStream* cctx; - WritePoolCtx_t *writeCtx; - ReadPoolCtx_t *readCtx; + FIO_SyncCompressIO io; } cRess_t; /** ZSTD_cycleLog() : @@ -1147,14 +1403,7 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict; FIO_initDict(&ress.dict, dictFileName, prefs, &ress.dictFileStat, dictBufferType); /* works with dictFileName==NULL */ - { - /* Compression paths stay synchronous for now: lower overhead and easier upkeep. */ - int const savedAsyncIO = prefs->asyncIO; - prefs->asyncIO = 0; - ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize()); - ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize()); - prefs->asyncIO = savedAsyncIO; - } + FIO_SyncCompressIO_init(&ress.io, prefs, ZSTD_CStreamInSize(), ZSTD_CStreamOutSize()); /* Advanced parameters, including dictionary */ if (dictFileName && (ress.dict.dictBuffer==NULL)) @@ -1218,21 +1467,20 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, static void FIO_freeCResources(cRess_t* const ress) { FIO_freeDict(&(ress->dict)); - AIO_WritePool_free(ress->writeCtx); - AIO_ReadPool_free(ress->readCtx); + FIO_SyncCompressIO_free(&ress->io); ZSTD_freeCStream(ress->cctx); /* never fails */ } #ifdef ZSTD_GZCOMPRESS static unsigned long long -FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but not changed */ +FIO_compressGzFrame(cRess_t* ress, const char* srcFileName, U64 const srcFileSize, int compressionLevel, U64* readsize) { + FIO_SyncCompressIO* const io = &ress->io; unsigned long long inFileSize = 0, outFileSize = 0; z_stream strm; - IOJob_t *writeJob = NULL; if (compressionLevel > Z_BEST_COMPRESSION) compressionLevel = Z_BEST_COMPRESSION; @@ -1248,37 +1496,36 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret); } } - writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (Bytef*)writeJob->buffer; - strm.avail_out = (uInt)writeJob->bufferSize; + strm.next_out = (Bytef*)io->outBuffer; + strm.avail_out = (uInt)io->outCapacity; while (1) { int ret; if (strm.avail_in == 0) { - AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); - if (ress->readCtx->srcBufferLoaded == 0) break; - inFileSize += ress->readCtx->srcBufferLoaded; - strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; - strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; + size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); + if (io->srcBufferLoaded == 0) break; + inFileSize += added; + *readsize += added; + strm.next_in = (z_const unsigned char*)io->srcBuffer; + strm.avail_in = (uInt)io->srcBufferLoaded; } { size_t const availBefore = strm.avail_in; ret = deflate(&strm, Z_NO_FLUSH); - AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); + FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); } if (ret != Z_OK) EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); - { size_t const cSize = writeJob->bufferSize - strm.avail_out; + { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); if (cSize) { - writeJob->usedBufferSize = cSize; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); outFileSize += cSize; - strm.next_out = (Bytef*)writeJob->buffer; - strm.avail_out = (uInt)writeJob->bufferSize; + strm.next_out = (Bytef*)io->outBuffer; + strm.avail_out = (uInt)io->outCapacity; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { DISPLAYUPDATE_PROGRESS( @@ -1294,13 +1541,12 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no while (1) { int const ret = deflate(&strm, Z_FINISH); - { size_t const cSize = writeJob->bufferSize - strm.avail_out; + { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); if (cSize) { - writeJob->usedBufferSize = cSize; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); outFileSize += cSize; - strm.next_out = (Bytef*)writeJob->buffer; - strm.avail_out = (uInt)writeJob->bufferSize; + strm.next_out = (Bytef*)io->outBuffer; + strm.avail_out = (uInt)io->outCapacity; } } if (ret == Z_STREAM_END) break; if (ret != Z_BUF_ERROR) @@ -1312,8 +1558,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); } } *readsize = inFileSize; - AIO_WritePool_releaseIoJob(writeJob); - AIO_WritePool_sparseWriteEnd(ress->writeCtx); + FIO_SyncCompressIO_finish(io); return outFileSize; } #endif @@ -1325,11 +1570,11 @@ FIO_compressLzmaFrame(cRess_t* ress, const char* srcFileName, U64 const srcFileSize, int compressionLevel, U64* readsize, int plain_lzma) { + FIO_SyncCompressIO* const io = &ress->io; unsigned long long inFileSize = 0, outFileSize = 0; lzma_stream strm = LZMA_STREAM_INIT; lzma_action action = LZMA_RUN; lzma_ret ret; - IOJob_t *writeJob = NULL; if (compressionLevel < 0) compressionLevel = 0; if (compressionLevel > 9) compressionLevel = 9; @@ -1347,37 +1592,35 @@ FIO_compressLzmaFrame(cRess_t* ress, EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); } - writeJob =AIO_WritePool_acquireJob(ress->writeCtx); - strm.next_out = (BYTE*)writeJob->buffer; - strm.avail_out = writeJob->bufferSize; + strm.next_out = (BYTE*)io->outBuffer; + strm.avail_out = io->outCapacity; strm.next_in = 0; strm.avail_in = 0; while (1) { if (strm.avail_in == 0) { - size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); - if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; - inFileSize += inSize; - strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; - strm.avail_in = ress->readCtx->srcBufferLoaded; + size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); + if (io->srcBufferLoaded == 0) action = LZMA_FINISH; + inFileSize += added; + *readsize += added; + strm.next_in = (BYTE const*)io->srcBuffer; + strm.avail_in = io->srcBufferLoaded; } { size_t const availBefore = strm.avail_in; ret = lzma_code(&strm, action); - AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); + FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); } - if (ret != LZMA_OK && ret != LZMA_STREAM_END) EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); - { size_t const compBytes = writeJob->bufferSize - strm.avail_out; + { size_t const compBytes = io->outCapacity - strm.avail_out; if (compBytes) { - writeJob->usedBufferSize = compBytes; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, compBytes); outFileSize += compBytes; - strm.next_out = (BYTE*)writeJob->buffer; - strm.avail_out = writeJob->bufferSize; + strm.next_out = (BYTE*)io->outBuffer; + strm.avail_out = io->outCapacity; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%", @@ -1393,8 +1636,7 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_end(&strm); *readsize = inFileSize; - AIO_WritePool_releaseIoJob(writeJob); - AIO_WritePool_sparseWriteEnd(ress->writeCtx); + FIO_SyncCompressIO_finish(io); return outFileSize; } @@ -1415,21 +1657,20 @@ FIO_compressLz4Frame(cRess_t* ress, int compressionLevel, int checksumFlag, U64* readsize) { + FIO_SyncCompressIO* const io = &ress->io; const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB); unsigned long long inFileSize = 0, outFileSize = 0; LZ4F_preferences_t prefs; LZ4F_compressionContext_t ctx; - IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx); - LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(errorCode)) EXM_THROW(31, "zstd: failed to create lz4 compression context"); memset(&prefs, 0, sizeof(prefs)); - assert(blockSize <= ress->readCtx->base.jobBufferSize); + assert(blockSize <= io->inCapacity); /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */ prefs.autoFlush = 0; @@ -1440,25 +1681,26 @@ FIO_compressLz4Frame(cRess_t* ress, #if LZ4_VERSION_NUMBER >= 10600 prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; #endif - assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize); + assert(LZ4F_compressBound(blockSize, &prefs) <= io->outCapacity); { - size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs); + size_t headerSize = LZ4F_compressBegin(ctx, io->outBuffer, io->outCapacity, &prefs); if (LZ4F_isError(headerSize)) EXM_THROW(33, "File header generation failed : %s", LZ4F_getErrorName(headerSize)); - writeJob->usedBufferSize = headerSize; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); outFileSize += headerSize; - /* Read first block */ - inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); + { + size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); + inFileSize += added; + *readsize += added; + } - /* Main Loop */ - while (ress->readCtx->srcBufferLoaded) { - size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded); - size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize, - ress->readCtx->srcBuffer, inSize, NULL); + while (io->srcBufferLoaded) { + size_t const inSize = MIN(blockSize, io->srcBufferLoaded); + size_t const outSize = LZ4F_compressUpdate(ctx, io->outBuffer, io->outCapacity, + io->srcBuffer, inSize, NULL); if (LZ4F_isError(outSize)) EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", srcFileName, LZ4F_getErrorName(outSize)); @@ -1473,30 +1715,27 @@ FIO_compressLz4Frame(cRess_t* ress, (double)outFileSize/(double)inFileSize*100); } - /* Write Block */ - writeJob->usedBufferSize = outSize; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, outSize); - /* Read next block */ - AIO_ReadPool_consumeBytes(ress->readCtx, inSize); - inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); + FIO_SyncCompressIO_consumeBytes(io, inSize); + { + size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); + inFileSize += added; + *readsize += added; + } } - /* End of Stream mark */ - headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL); + headerSize = LZ4F_compressEnd(ctx, io->outBuffer, io->outCapacity, NULL); if (LZ4F_isError(headerSize)) EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", srcFileName, LZ4F_getErrorName(headerSize)); - writeJob->usedBufferSize = headerSize; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); outFileSize += headerSize; } - *readsize = inFileSize; LZ4F_freeCompressionContext(ctx); - AIO_WritePool_releaseIoJob(writeJob); - AIO_WritePool_sparseWriteEnd(ress->writeCtx); + FIO_SyncCompressIO_finish(io); return outFileSize; } @@ -1505,12 +1744,11 @@ FIO_compressLz4Frame(cRess_t* ress, static unsigned long long FIO_compressZstdFrame(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - const cRess_t* ressPtr, + cRess_t* ress, const char* srcFileName, U64 fileSize, int compressionLevel, U64* readsize) { - cRess_t const ress = *ressPtr; - IOJob_t* writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx); + FIO_SyncCompressIO* const io = &ress->io; U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; @@ -1535,16 +1773,16 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, /* init */ if (fileSize != UTIL_FILESIZE_UNKNOWN) { pledgedSrcSize = fileSize; - CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize)); + CHECK(ZSTD_CCtx_setPledgedSrcSize(ress->cctx, fileSize)); } else if (prefs->streamSrcSize > 0) { /* unknown source size; use the declared stream size */ pledgedSrcSize = prefs->streamSrcSize; - CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) ); + CHECK( ZSTD_CCtx_setPledgedSrcSize(ress->cctx, prefs->streamSrcSize) ); } { int windowLog; UTIL_HumanReadableSize_t windowSize; - CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog)); + CHECK(ZSTD_CCtx_getParameter(ress->cctx, ZSTD_c_windowLog, &windowLog)); if (windowLog == 0) { if (prefs->ldmFlag) { /* If long mode is set without a window size libzstd will set this size internally */ @@ -1562,12 +1800,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, do { size_t stillToFlush; /* Fill input Buffer */ - size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize()); - ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 ); + size_t const inSize = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); + ZSTD_inBuffer inBuff = setInBuffer( io->srcBuffer, io->srcBufferLoaded, 0 ); DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); *readsize += inSize; - if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize)) + if ((io->srcBufferLoaded == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; stillToFlush = 1; @@ -1575,10 +1813,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, || (directive == ZSTD_e_end && stillToFlush != 0) ) { size_t const oldIPos = inBuff.pos; - ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 ); - size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx); - CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive)); - AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos); + ZSTD_outBuffer outBuff = setOutBuffer( io->outBuffer, io->outCapacity, 0 ); + size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx); + CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive)); + FIO_SyncCompressIO_consumeBytes(io, inBuff.pos - oldIPos); /* count stats */ inputPresented++; @@ -1589,14 +1827,13 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); if (outBuff.pos) { - writeJob->usedBufferSize = outBuff.pos; - AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + FIO_SyncCompressIO_commitOut(io, io->outBuffer, outBuff.pos); compressedfilesize += outBuff.pos; } /* adaptive mode : statistics measurement and speed correction */ if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) { - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx); lastAdaptTime = UTIL_getTime(); @@ -1669,14 +1906,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel(); if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel; compressionLevel += (compressionLevel == 0); /* skip 0 */ - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); + ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel); } if (speedChange == faster) { DISPLAYLEVEL(6, "faster speed , lighter compression \n") compressionLevel --; if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel; compressionLevel -= (compressionLevel == 0); /* skip 0 */ - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); + ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel); } speedChange = noChange; @@ -1686,7 +1923,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, /* display notification */ if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) { - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx); double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed); UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed); @@ -1733,8 +1970,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, (unsigned long long)*readsize, (unsigned long long)fileSize); } - AIO_WritePool_releaseIoJob(writeJob); - AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx); + FIO_SyncCompressIO_finish(io); return compressedfilesize; } @@ -1747,7 +1983,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, static int FIO_compressFilename_internal(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - cRess_t ress, + cRess_t* ress, const char* dstFileName, const char* srcFileName, int compressionLevel) { @@ -1762,12 +1998,12 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, switch (prefs->compressionType) { default: case FIO_zstdCompression: - compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, &ress, srcFileName, fileSize, compressionLevel, &readsize); + compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, ress, srcFileName, fileSize, compressionLevel, &readsize); break; case FIO_gzipCompression: #ifdef ZSTD_GZCOMPRESS - compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize); + compressedfilesize = FIO_compressGzFrame(ress, srcFileName, fileSize, compressionLevel, &readsize); #else (void)compressionLevel; EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n", @@ -1778,7 +2014,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, case FIO_xzCompression: case FIO_lzmaCompression: #ifdef ZSTD_LZMACOMPRESS - compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression); + compressedfilesize = FIO_compressLzmaFrame(ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression); #else (void)compressionLevel; EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n", @@ -1788,7 +2024,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, case FIO_lz4Compression: #ifdef ZSTD_LZ4COMPRESS - compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize); + compressedfilesize = FIO_compressLz4Frame(ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize); #else (void)compressionLevel; EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n", @@ -1844,7 +2080,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, */ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - cRess_t ress, + cRess_t* ress, const char* dstFileName, const char* srcFileName, const stat_t* srcFileStat, @@ -1855,8 +2091,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, int transferStat = 0; int dstFd = -1; - assert(AIO_ReadPool_getFile(ress.readCtx) != NULL); - if (AIO_WritePool_getFile(ress.writeCtx) == NULL) { + if (ress->io.dstFile == NULL) { int dstFileInitialPermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp (srcFileName, stdinmark) && strcmp (dstFileName, stdoutmark) @@ -1867,15 +2102,13 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, closeDstFile = 1; DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName); - { FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions); + { + FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions); if (dstFile==NULL) return 1; /* could not open dstFileName */ dstFd = fileno(dstFile); - AIO_WritePool_setFile(ress.writeCtx, dstFile); + FIO_SyncCompressIO_setDst(&ress->io, dstFile); } - /* Must only be added after FIO_openDstFile() succeeds. - * Otherwise we may delete the destination file if it already exists, - * and the user presses Ctrl-C when asked if they wish to overwrite. - */ + /* Must only be added after FIO_openDstFile() succeeds. */ addHandler(dstFileName); } @@ -1889,7 +2122,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, } DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName); - if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */ + if (FIO_SyncCompressIO_closeDst(&ress->io)) { /* error closing file */ DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result=1; } @@ -1898,10 +2131,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, UTIL_utime(dstFileName, srcFileStat); } - if ( (result != 0) /* operation failure */ - && strcmp(dstFileName, stdoutmark) /* special case : don't remove() stdout */ - ) { - FIO_removeFile(dstFileName); /* remove compression artefact; note don't do anything special if remove() fails */ + if ( (result != 0) + && strcmp(dstFileName, stdoutmark) ) { + FIO_removeFile(dstFileName); } } @@ -2035,7 +2267,7 @@ static const char *compressedFileExtensions[] = { static int FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - cRess_t ress, + cRess_t* ress, const char* dstFileName, const char* srcFileName, int compressionLevel) @@ -2057,7 +2289,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, } /* ensure src is not the same as dict (if present) */ - if (ress.dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress.dictFileName, &srcFileStat, &ress.dictFileStat)) { + if (ress->dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress->dictFileName, &srcFileStat, &ress->dictFileStat)) { DISPLAYLEVEL(1, "zstd: cannot use %s as an input file and dictionary \n", srcFileName); return 1; } @@ -2076,19 +2308,21 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat); if (srcFile == NULL) return 1; /* srcFile could not be opened */ - /* AsyncIO is disabled for compression to favor predictable performance and simpler upkeep. */ if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */ fileSize = UTIL_getFileSizeStat(&srcFileStat); (void)fileSize; - AIO_ReadPool_setAsync(ress.readCtx, 0); - AIO_WritePool_setAsync(ress.writeCtx, 0); - AIO_ReadPool_setFile(ress.readCtx, srcFile); + FIO_SyncCompressIO_setSrc(&ress->io, srcFile); result = FIO_compressFilename_dstFile( fCtx, prefs, ress, dstFileName, srcFileName, &srcFileStat, compressionLevel); - AIO_ReadPool_closeFile(ress.readCtx); + FIO_SyncCompressIO_clearSrc(&ress->io); + + if (srcFile != NULL && fclose(srcFile)) { + DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno)); + return 1; + } if ( prefs->removeSrcFile /* --rm */ && result == 0 /* success */ @@ -2155,7 +2389,7 @@ int FIO_compressFilename(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, const int compressionLevel, ZSTD_compressionParameters comprParams) { cRess_t ress = FIO_createCResources(prefs, dictFileName, UTIL_getFileSize(srcFileName), compressionLevel, comprParams); - int const result = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); + int const result = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel); #define DISPLAY_LEVEL_DEFAULT 2 @@ -2252,13 +2486,13 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, if (dstFile == NULL) { /* could not open outFileName */ error = 1; } else { - AIO_WritePool_setFile(ress.writeCtx, dstFile); + FIO_SyncCompressIO_setDst(&ress.io, dstFile); for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) { - status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); + status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if (AIO_WritePool_closeFile(ress.writeCtx)) + if (FIO_SyncCompressIO_closeDst(&ress.io)) EXM_THROW(29, "Write error (%s) : cannot properly close %s", strerror(errno), outFileName); } @@ -2282,7 +2516,7 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, } else { dstFileName = FIO_determineCompressedName(srcFileName, outDirName, suffix); /* cannot fail */ } - status = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); + status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel); if (!status) fCtx->nbFilesProcessed++; error |= status; }