diff --git a/programs/fileio.c b/programs/fileio.c index a6e361821..eb489d210 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -50,6 +50,8 @@ UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; /* ************************************* * Synchronous compression IO helpers +* Lightweight wrapper used by compression paths to manage buffered +* reads/writes without the async job machinery. ***************************************/ typedef struct { const FIO_prefs_t* prefs; @@ -1478,7 +1480,7 @@ FIO_compressGzFrame(cRess_t* ress, const char* srcFileName, U64 const srcFileSize, int compressionLevel, U64* readsize) { - FIO_SyncCompressIO* const io = &ress->io; + FIO_SyncCompressIO* const syncIO = &ress->io; unsigned long long inFileSize = 0, outFileSize = 0; z_stream strm; @@ -1498,34 +1500,34 @@ FIO_compressGzFrame(cRess_t* ress, strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (Bytef*)io->outBuffer; - strm.avail_out = (uInt)io->outCapacity; + strm.next_out = (Bytef*)syncIO->outBuffer; + strm.avail_out = (uInt)syncIO->outCapacity; while (1) { int ret; if (strm.avail_in == 0) { - size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); - if (io->srcBufferLoaded == 0) break; + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); + if (syncIO->srcBufferLoaded == 0) break; inFileSize += added; *readsize += added; - strm.next_in = (z_const unsigned char*)io->srcBuffer; - strm.avail_in = (uInt)io->srcBufferLoaded; + strm.next_in = (z_const unsigned char*)syncIO->srcBuffer; + strm.avail_in = (uInt)syncIO->srcBufferLoaded; } { size_t const availBefore = strm.avail_in; ret = deflate(&strm, Z_NO_FLUSH); - FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); + FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in); } if (ret != Z_OK) EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); - { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); + { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out); if (cSize) { - FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize); outFileSize += cSize; - strm.next_out = (Bytef*)io->outBuffer; - strm.avail_out = (uInt)io->outCapacity; + strm.next_out = (Bytef*)syncIO->outBuffer; + strm.avail_out = (uInt)syncIO->outCapacity; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { DISPLAYUPDATE_PROGRESS( @@ -1541,12 +1543,12 @@ FIO_compressGzFrame(cRess_t* ress, while (1) { int const ret = deflate(&strm, Z_FINISH); - { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); + { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out); if (cSize) { - FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize); outFileSize += cSize; - strm.next_out = (Bytef*)io->outBuffer; - strm.avail_out = (uInt)io->outCapacity; + strm.next_out = (Bytef*)syncIO->outBuffer; + strm.avail_out = (uInt)syncIO->outCapacity; } } if (ret == Z_STREAM_END) break; if (ret != Z_BUF_ERROR) @@ -1558,7 +1560,7 @@ FIO_compressGzFrame(cRess_t* ress, EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); } } *readsize = inFileSize; - FIO_SyncCompressIO_finish(io); + FIO_SyncCompressIO_finish(syncIO); return outFileSize; } #endif @@ -1570,7 +1572,7 @@ FIO_compressLzmaFrame(cRess_t* ress, const char* srcFileName, U64 const srcFileSize, int compressionLevel, U64* readsize, int plain_lzma) { - FIO_SyncCompressIO* const io = &ress->io; + FIO_SyncCompressIO* const syncIO = &ress->io; unsigned long long inFileSize = 0, outFileSize = 0; lzma_stream strm = LZMA_STREAM_INIT; lzma_action action = LZMA_RUN; @@ -1592,35 +1594,35 @@ FIO_compressLzmaFrame(cRess_t* ress, EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); } - strm.next_out = (BYTE*)io->outBuffer; - strm.avail_out = io->outCapacity; + strm.next_out = (BYTE*)syncIO->outBuffer; + strm.avail_out = syncIO->outCapacity; strm.next_in = 0; strm.avail_in = 0; while (1) { if (strm.avail_in == 0) { - size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); - if (io->srcBufferLoaded == 0) action = LZMA_FINISH; + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); + if (syncIO->srcBufferLoaded == 0) action = LZMA_FINISH; inFileSize += added; *readsize += added; - strm.next_in = (BYTE const*)io->srcBuffer; - strm.avail_in = io->srcBufferLoaded; + strm.next_in = (BYTE const*)syncIO->srcBuffer; + strm.avail_in = syncIO->srcBufferLoaded; } { size_t const availBefore = strm.avail_in; ret = lzma_code(&strm, action); - FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); + FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in); } if (ret != LZMA_OK && ret != LZMA_STREAM_END) EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); - { size_t const compBytes = io->outCapacity - strm.avail_out; + { size_t const compBytes = syncIO->outCapacity - strm.avail_out; if (compBytes) { - FIO_SyncCompressIO_commitOut(io, io->outBuffer, compBytes); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, compBytes); outFileSize += compBytes; - strm.next_out = (BYTE*)io->outBuffer; - strm.avail_out = io->outCapacity; + strm.next_out = (BYTE*)syncIO->outBuffer; + strm.avail_out = syncIO->outCapacity; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%", @@ -1636,7 +1638,7 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_end(&strm); *readsize = inFileSize; - FIO_SyncCompressIO_finish(io); + FIO_SyncCompressIO_finish(syncIO); return outFileSize; } @@ -1657,7 +1659,7 @@ FIO_compressLz4Frame(cRess_t* ress, int compressionLevel, int checksumFlag, U64* readsize) { - FIO_SyncCompressIO* const io = &ress->io; + FIO_SyncCompressIO* const syncIO = &ress->io; const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB); unsigned long long inFileSize = 0, outFileSize = 0; @@ -1670,7 +1672,7 @@ FIO_compressLz4Frame(cRess_t* ress, memset(&prefs, 0, sizeof(prefs)); - assert(blockSize <= io->inCapacity); + assert(blockSize <= syncIO->inCapacity); /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */ prefs.autoFlush = 0; @@ -1681,26 +1683,26 @@ FIO_compressLz4Frame(cRess_t* ress, #if LZ4_VERSION_NUMBER >= 10600 prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; #endif - assert(LZ4F_compressBound(blockSize, &prefs) <= io->outCapacity); + assert(LZ4F_compressBound(blockSize, &prefs) <= syncIO->outCapacity); { - size_t headerSize = LZ4F_compressBegin(ctx, io->outBuffer, io->outCapacity, &prefs); + size_t headerSize = LZ4F_compressBegin(ctx, syncIO->outBuffer, syncIO->outCapacity, &prefs); if (LZ4F_isError(headerSize)) EXM_THROW(33, "File header generation failed : %s", LZ4F_getErrorName(headerSize)); - FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize); outFileSize += headerSize; { - size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize); inFileSize += added; *readsize += added; } - while (io->srcBufferLoaded) { - size_t const inSize = MIN(blockSize, io->srcBufferLoaded); - size_t const outSize = LZ4F_compressUpdate(ctx, io->outBuffer, io->outCapacity, - io->srcBuffer, inSize, NULL); + while (syncIO->srcBufferLoaded) { + size_t const inSize = MIN(blockSize, syncIO->srcBufferLoaded); + size_t const outSize = LZ4F_compressUpdate(ctx, syncIO->outBuffer, syncIO->outCapacity, + syncIO->srcBuffer, inSize, NULL); if (LZ4F_isError(outSize)) EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", srcFileName, LZ4F_getErrorName(outSize)); @@ -1715,27 +1717,27 @@ FIO_compressLz4Frame(cRess_t* ress, (double)outFileSize/(double)inFileSize*100); } - FIO_SyncCompressIO_commitOut(io, io->outBuffer, outSize); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outSize); - FIO_SyncCompressIO_consumeBytes(io, inSize); + FIO_SyncCompressIO_consumeBytes(syncIO, inSize); { - size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); + size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize); inFileSize += added; *readsize += added; } } - headerSize = LZ4F_compressEnd(ctx, io->outBuffer, io->outCapacity, NULL); + headerSize = LZ4F_compressEnd(ctx, syncIO->outBuffer, syncIO->outCapacity, NULL); if (LZ4F_isError(headerSize)) EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", srcFileName, LZ4F_getErrorName(headerSize)); - FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize); outFileSize += headerSize; } LZ4F_freeCompressionContext(ctx); - FIO_SyncCompressIO_finish(io); + FIO_SyncCompressIO_finish(syncIO); return outFileSize; } @@ -1748,7 +1750,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, const char* srcFileName, U64 fileSize, int compressionLevel, U64* readsize) { - FIO_SyncCompressIO* const io = &ress->io; + FIO_SyncCompressIO* const syncIO = &ress->io; U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; @@ -1800,12 +1802,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, do { size_t stillToFlush; /* Fill input Buffer */ - size_t const inSize = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); - ZSTD_inBuffer inBuff = setInBuffer( io->srcBuffer, io->srcBufferLoaded, 0 ); + size_t const inSize = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize()); + ZSTD_inBuffer inBuff = setInBuffer( syncIO->srcBuffer, syncIO->srcBufferLoaded, 0 ); DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); *readsize += inSize; - if ((io->srcBufferLoaded == 0) || (*readsize == fileSize)) + if ((syncIO->srcBufferLoaded == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; stillToFlush = 1; @@ -1813,10 +1815,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, || (directive == ZSTD_e_end && stillToFlush != 0) ) { size_t const oldIPos = inBuff.pos; - ZSTD_outBuffer outBuff = setOutBuffer( io->outBuffer, io->outCapacity, 0 ); + ZSTD_outBuffer outBuff = setOutBuffer( syncIO->outBuffer, syncIO->outCapacity, 0 ); size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx); CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive)); - FIO_SyncCompressIO_consumeBytes(io, inBuff.pos - oldIPos); + FIO_SyncCompressIO_consumeBytes(syncIO, inBuff.pos - oldIPos); /* count stats */ inputPresented++; @@ -1827,7 +1829,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); if (outBuff.pos) { - FIO_SyncCompressIO_commitOut(io, io->outBuffer, outBuff.pos); + FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outBuff.pos); compressedfilesize += outBuff.pos; } @@ -1970,7 +1972,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, (unsigned long long)*readsize, (unsigned long long)fileSize); } - FIO_SyncCompressIO_finish(io); + FIO_SyncCompressIO_finish(syncIO); return compressedfilesize; }