1
0
mirror of https://github.com/facebook/zstd.git synced 2025-11-03 20:33:11 +03:00

changed name to syncIO for clarity

This commit is contained in:
Yann Collet
2025-10-25 11:23:16 -07:00
parent ccadc33a59
commit 41f2673acd

View File

@@ -50,6 +50,8 @@ UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
/* ************************************* /* *************************************
* Synchronous compression IO helpers * Synchronous compression IO helpers
* Lightweight wrapper used by compression paths to manage buffered
* reads/writes without the async job machinery.
***************************************/ ***************************************/
typedef struct { typedef struct {
const FIO_prefs_t* prefs; const FIO_prefs_t* prefs;
@@ -1478,7 +1480,7 @@ FIO_compressGzFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize, const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize) int compressionLevel, U64* readsize)
{ {
FIO_SyncCompressIO* const io = &ress->io; FIO_SyncCompressIO* const syncIO = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0; unsigned long long inFileSize = 0, outFileSize = 0;
z_stream strm; z_stream strm;
@@ -1498,34 +1500,34 @@ FIO_compressGzFrame(cRess_t* ress,
strm.next_in = 0; strm.next_in = 0;
strm.avail_in = 0; strm.avail_in = 0;
strm.next_out = (Bytef*)io->outBuffer; strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)io->outCapacity; strm.avail_out = (uInt)syncIO->outCapacity;
while (1) { while (1) {
int ret; int ret;
if (strm.avail_in == 0) { if (strm.avail_in == 0) {
size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
if (io->srcBufferLoaded == 0) break; if (syncIO->srcBufferLoaded == 0) break;
inFileSize += added; inFileSize += added;
*readsize += added; *readsize += added;
strm.next_in = (z_const unsigned char*)io->srcBuffer; strm.next_in = (z_const unsigned char*)syncIO->srcBuffer;
strm.avail_in = (uInt)io->srcBufferLoaded; strm.avail_in = (uInt)syncIO->srcBufferLoaded;
} }
{ {
size_t const availBefore = strm.avail_in; size_t const availBefore = strm.avail_in;
ret = deflate(&strm, Z_NO_FLUSH); ret = deflate(&strm, Z_NO_FLUSH);
FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in);
} }
if (ret != Z_OK) if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
{ size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out);
if (cSize) { if (cSize) {
FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize);
outFileSize += cSize; outFileSize += cSize;
strm.next_out = (Bytef*)io->outBuffer; strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)io->outCapacity; strm.avail_out = (uInt)syncIO->outCapacity;
} } } }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE_PROGRESS( DISPLAYUPDATE_PROGRESS(
@@ -1541,12 +1543,12 @@ FIO_compressGzFrame(cRess_t* ress,
while (1) { while (1) {
int const ret = deflate(&strm, Z_FINISH); int const ret = deflate(&strm, Z_FINISH);
{ size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out); { size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out);
if (cSize) { if (cSize) {
FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize);
outFileSize += cSize; outFileSize += cSize;
strm.next_out = (Bytef*)io->outBuffer; strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)io->outCapacity; strm.avail_out = (uInt)syncIO->outCapacity;
} } } }
if (ret == Z_STREAM_END) break; if (ret == Z_STREAM_END) break;
if (ret != Z_BUF_ERROR) if (ret != Z_BUF_ERROR)
@@ -1558,7 +1560,7 @@ FIO_compressGzFrame(cRess_t* ress,
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
} } } }
*readsize = inFileSize; *readsize = inFileSize;
FIO_SyncCompressIO_finish(io); FIO_SyncCompressIO_finish(syncIO);
return outFileSize; return outFileSize;
} }
#endif #endif
@@ -1570,7 +1572,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize, const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize, int plain_lzma) int compressionLevel, U64* readsize, int plain_lzma)
{ {
FIO_SyncCompressIO* const io = &ress->io; FIO_SyncCompressIO* const syncIO = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0; unsigned long long inFileSize = 0, outFileSize = 0;
lzma_stream strm = LZMA_STREAM_INIT; lzma_stream strm = LZMA_STREAM_INIT;
lzma_action action = LZMA_RUN; lzma_action action = LZMA_RUN;
@@ -1592,35 +1594,35 @@ FIO_compressLzmaFrame(cRess_t* ress,
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
} }
strm.next_out = (BYTE*)io->outBuffer; strm.next_out = (BYTE*)syncIO->outBuffer;
strm.avail_out = io->outCapacity; strm.avail_out = syncIO->outCapacity;
strm.next_in = 0; strm.next_in = 0;
strm.avail_in = 0; strm.avail_in = 0;
while (1) { while (1) {
if (strm.avail_in == 0) { if (strm.avail_in == 0) {
size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
if (io->srcBufferLoaded == 0) action = LZMA_FINISH; if (syncIO->srcBufferLoaded == 0) action = LZMA_FINISH;
inFileSize += added; inFileSize += added;
*readsize += added; *readsize += added;
strm.next_in = (BYTE const*)io->srcBuffer; strm.next_in = (BYTE const*)syncIO->srcBuffer;
strm.avail_in = io->srcBufferLoaded; strm.avail_in = syncIO->srcBufferLoaded;
} }
{ {
size_t const availBefore = strm.avail_in; size_t const availBefore = strm.avail_in;
ret = lzma_code(&strm, action); ret = lzma_code(&strm, action);
FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in); FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in);
} }
if (ret != LZMA_OK && ret != LZMA_STREAM_END) if (ret != LZMA_OK && ret != LZMA_STREAM_END)
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
{ size_t const compBytes = io->outCapacity - strm.avail_out; { size_t const compBytes = syncIO->outCapacity - strm.avail_out;
if (compBytes) { if (compBytes) {
FIO_SyncCompressIO_commitOut(io, io->outBuffer, compBytes); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, compBytes);
outFileSize += compBytes; outFileSize += compBytes;
strm.next_out = (BYTE*)io->outBuffer; strm.next_out = (BYTE*)syncIO->outBuffer;
strm.avail_out = io->outCapacity; strm.avail_out = syncIO->outCapacity;
} } } }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%", DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%",
@@ -1636,7 +1638,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
lzma_end(&strm); lzma_end(&strm);
*readsize = inFileSize; *readsize = inFileSize;
FIO_SyncCompressIO_finish(io); FIO_SyncCompressIO_finish(syncIO);
return outFileSize; return outFileSize;
} }
@@ -1657,7 +1659,7 @@ FIO_compressLz4Frame(cRess_t* ress,
int compressionLevel, int checksumFlag, int compressionLevel, int checksumFlag,
U64* readsize) U64* readsize)
{ {
FIO_SyncCompressIO* const io = &ress->io; FIO_SyncCompressIO* const syncIO = &ress->io;
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB); const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
unsigned long long inFileSize = 0, outFileSize = 0; unsigned long long inFileSize = 0, outFileSize = 0;
@@ -1670,7 +1672,7 @@ FIO_compressLz4Frame(cRess_t* ress,
memset(&prefs, 0, sizeof(prefs)); memset(&prefs, 0, sizeof(prefs));
assert(blockSize <= io->inCapacity); assert(blockSize <= syncIO->inCapacity);
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */ /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
prefs.autoFlush = 0; prefs.autoFlush = 0;
@@ -1681,26 +1683,26 @@ FIO_compressLz4Frame(cRess_t* ress,
#if LZ4_VERSION_NUMBER >= 10600 #if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif #endif
assert(LZ4F_compressBound(blockSize, &prefs) <= io->outCapacity); assert(LZ4F_compressBound(blockSize, &prefs) <= syncIO->outCapacity);
{ {
size_t headerSize = LZ4F_compressBegin(ctx, io->outBuffer, io->outCapacity, &prefs); size_t headerSize = LZ4F_compressBegin(ctx, syncIO->outBuffer, syncIO->outCapacity, &prefs);
if (LZ4F_isError(headerSize)) if (LZ4F_isError(headerSize))
EXM_THROW(33, "File header generation failed : %s", EXM_THROW(33, "File header generation failed : %s",
LZ4F_getErrorName(headerSize)); LZ4F_getErrorName(headerSize));
FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize);
outFileSize += headerSize; outFileSize += headerSize;
{ {
size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize);
inFileSize += added; inFileSize += added;
*readsize += added; *readsize += added;
} }
while (io->srcBufferLoaded) { while (syncIO->srcBufferLoaded) {
size_t const inSize = MIN(blockSize, io->srcBufferLoaded); size_t const inSize = MIN(blockSize, syncIO->srcBufferLoaded);
size_t const outSize = LZ4F_compressUpdate(ctx, io->outBuffer, io->outCapacity, size_t const outSize = LZ4F_compressUpdate(ctx, syncIO->outBuffer, syncIO->outCapacity,
io->srcBuffer, inSize, NULL); syncIO->srcBuffer, inSize, NULL);
if (LZ4F_isError(outSize)) if (LZ4F_isError(outSize))
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
srcFileName, LZ4F_getErrorName(outSize)); srcFileName, LZ4F_getErrorName(outSize));
@@ -1715,27 +1717,27 @@ FIO_compressLz4Frame(cRess_t* ress,
(double)outFileSize/(double)inFileSize*100); (double)outFileSize/(double)inFileSize*100);
} }
FIO_SyncCompressIO_commitOut(io, io->outBuffer, outSize); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outSize);
FIO_SyncCompressIO_consumeBytes(io, inSize); FIO_SyncCompressIO_consumeBytes(syncIO, inSize);
{ {
size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize); size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize);
inFileSize += added; inFileSize += added;
*readsize += added; *readsize += added;
} }
} }
headerSize = LZ4F_compressEnd(ctx, io->outBuffer, io->outCapacity, NULL); headerSize = LZ4F_compressEnd(ctx, syncIO->outBuffer, syncIO->outCapacity, NULL);
if (LZ4F_isError(headerSize)) if (LZ4F_isError(headerSize))
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
srcFileName, LZ4F_getErrorName(headerSize)); srcFileName, LZ4F_getErrorName(headerSize));
FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize);
outFileSize += headerSize; outFileSize += headerSize;
} }
LZ4F_freeCompressionContext(ctx); LZ4F_freeCompressionContext(ctx);
FIO_SyncCompressIO_finish(io); FIO_SyncCompressIO_finish(syncIO);
return outFileSize; return outFileSize;
} }
@@ -1748,7 +1750,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
const char* srcFileName, U64 fileSize, const char* srcFileName, U64 fileSize,
int compressionLevel, U64* readsize) int compressionLevel, U64* readsize)
{ {
FIO_SyncCompressIO* const io = &ress->io; FIO_SyncCompressIO* const syncIO = &ress->io;
U64 compressedfilesize = 0; U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue; ZSTD_EndDirective directive = ZSTD_e_continue;
@@ -1800,12 +1802,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
do { do {
size_t stillToFlush; size_t stillToFlush;
/* Fill input Buffer */ /* Fill input Buffer */
size_t const inSize = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize()); size_t const inSize = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
ZSTD_inBuffer inBuff = setInBuffer( io->srcBuffer, io->srcBufferLoaded, 0 ); ZSTD_inBuffer inBuff = setInBuffer( syncIO->srcBuffer, syncIO->srcBufferLoaded, 0 );
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
*readsize += inSize; *readsize += inSize;
if ((io->srcBufferLoaded == 0) || (*readsize == fileSize)) if ((syncIO->srcBufferLoaded == 0) || (*readsize == fileSize))
directive = ZSTD_e_end; directive = ZSTD_e_end;
stillToFlush = 1; stillToFlush = 1;
@@ -1813,10 +1815,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|| (directive == ZSTD_e_end && stillToFlush != 0) ) { || (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos; size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = setOutBuffer( io->outBuffer, io->outCapacity, 0 ); ZSTD_outBuffer outBuff = setOutBuffer( syncIO->outBuffer, syncIO->outCapacity, 0 );
size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx); size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive)); CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive));
FIO_SyncCompressIO_consumeBytes(io, inBuff.pos - oldIPos); FIO_SyncCompressIO_consumeBytes(syncIO, inBuff.pos - oldIPos);
/* count stats */ /* count stats */
inputPresented++; inputPresented++;
@@ -1827,7 +1829,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) { if (outBuff.pos) {
FIO_SyncCompressIO_commitOut(io, io->outBuffer, outBuff.pos); FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outBuff.pos);
compressedfilesize += outBuff.pos; compressedfilesize += outBuff.pos;
} }
@@ -1970,7 +1972,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
(unsigned long long)*readsize, (unsigned long long)fileSize); (unsigned long long)*readsize, (unsigned long long)fileSize);
} }
FIO_SyncCompressIO_finish(io); FIO_SyncCompressIO_finish(syncIO);
return compressedfilesize; return compressedfilesize;
} }