Mirror of https://github.com/facebook/zstd.git

AsyncIO compression part 2 - added async read and asyncio to compression code (#3022)

* Compression asyncio:
- Added asyncio functionality for compression flow
- Added ReadPool for async reads, implemented in both comp and decomp flows
Author: Yonatan Komornik
Date: 2022-01-31 15:43:41 -08:00
Committed by: GitHub
Parent: 0b70da6277
Commit: cc0657f27d
8 changed files with 590 additions and 301 deletions
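For orientation, the sketch below distills the resource lifecycle that the fileio.c hunks in this commit converge on: compression and decompression resources hold a ReadPoolCtx_t and a WritePoolCtx_t instead of raw FILE*/buffer pairs, files are attached and detached through the pool setters, and teardown goes through the pool close/free calls. This is an illustrative reconstruction assembled from the calls visible in the diff, not code from the commit; the wrapper function, its signature, and the exact header names are assumptions.

#include <stdio.h>
#include "zstd.h"             /* ZSTD_CStreamInSize(), ZSTD_CStreamOutSize() */
#include "fileio_asyncio.h"   /* AIO_ReadPool_* / AIO_WritePool_* (programs/ internal header, assumed) */

/* Hypothetical wrapper: openedSrc/openedDst stand in for FIO_openSrcFile()/FIO_openDstFile(). */
static int sketch_pool_lifecycle(FIO_prefs_t* const prefs, FILE* openedSrc, FILE* openedDst)
{
    ReadPoolCtx_t*  readCtx  = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
    WritePoolCtx_t* writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
    int error;

    AIO_ReadPool_setFile(readCtx, openedSrc);      /* replaces ress.srcFile = ... */
    AIO_WritePool_setFile(writeCtx, openedDst);    /* replaces ress.dstFile = ... */

    /* ... per-frame loop, as in the converted FIO_compress*Frame() functions below ... */

    AIO_ReadPool_closeFile(readCtx);               /* replaces fclose(ress.srcFile) */
    error = AIO_WritePool_closeFile(writeCtx);     /* replaces fclose(ress.dstFile); non-zero on error */

    AIO_ReadPool_free(readCtx);
    AIO_WritePool_free(writeCtx);
    return error;
}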

View File

@@ -164,6 +164,7 @@ Advanced arguments :
 --filelist FILE : read list of files to operate upon from FILE
 --output-dir-flat DIR : processed files are stored into DIR
 --output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
+--[no-]asyncio : use asynchronous IO (default: enabled)
 --[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
 -- : All arguments after "--" are treated as files

View File

@@ -289,7 +289,7 @@ FIO_prefs_t* FIO_createPreferences(void)
     ret->literalCompressionMode = ZSTD_ps_auto;
     ret->excludeCompressedFiles = 0;
     ret->allowBlockDevices = 0;
-    ret->asyncIO = 0;
+    ret->asyncIO = AIO_supported();
     return ret;
 }
@@ -848,16 +848,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t*
  *  Compression
  ************************************************************************/
 typedef struct {
-    FILE* srcFile;
-    FILE* dstFile;
-    void* srcBuffer;
-    size_t srcBufferSize;
-    void* dstBuffer;
-    size_t dstBufferSize;
     void* dictBuffer;
     size_t dictBufferSize;
     const char* dictFileName;
     ZSTD_CStream* cctx;
+    WritePoolCtx_t *writeCtx;
+    ReadPoolCtx_t *readCtx;
 } cRess_t;
 /** ZSTD_cycleLog() :
@@ -906,9 +902,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
     if (ress.cctx == NULL)
         EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
                     strerror(errno));
-    ress.srcBufferSize = ZSTD_CStreamInSize();
-    ress.srcBuffer = malloc(ress.srcBufferSize);
-    ress.dstBufferSize = ZSTD_CStreamOutSize();
     /* need to update memLimit before calling createDictBuffer
      * because of memLimit check inside it */
@@ -916,10 +909,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
         unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
         FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
     }
-    ress.dstBuffer = malloc(ress.dstBufferSize);
     ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs);   /* works with dictFileName==NULL */
-    if (!ress.srcBuffer || !ress.dstBuffer)
-        EXM_THROW(31, "allocation error : not enough memory");
+    ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
+    ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
     /* Advanced parameters, including dictionary */
     if (dictFileName && (ress.dictBuffer==NULL))
@@ -982,9 +975,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
 static void FIO_freeCResources(const cRess_t* const ress)
 {
-    free(ress->srcBuffer);
-    free(ress->dstBuffer);
     free(ress->dictBuffer);
+    AIO_WritePool_free(ress->writeCtx);
+    AIO_ReadPool_free(ress->readCtx);
     ZSTD_freeCStream(ress->cctx);   /* never fails */
 }
@@ -997,6 +990,7 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but not changed */
 {
     unsigned long long inFileSize = 0, outFileSize = 0;
     z_stream strm;
+    IOJob_t *writeJob = NULL;
     if (compressionLevel > Z_BEST_COMPRESSION)
         compressionLevel = Z_BEST_COMPRESSION;
@@ -1012,51 +1006,58 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but not changed */
             EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
     }   }
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_in = 0;
     strm.avail_in = 0;
-    strm.next_out = (Bytef*)ress->dstBuffer;
-    strm.avail_out = (uInt)ress->dstBufferSize;
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
     while (1) {
         int ret;
         if (strm.avail_in == 0) {
-            size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
-            if (inSize == 0) break;
-            inFileSize += inSize;
-            strm.next_in = (z_const unsigned char*)ress->srcBuffer;
-            strm.avail_in = (uInt)inSize;
+            AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+            if (ress->readCtx->srcBufferLoaded == 0) break;
+            inFileSize += ress->readCtx->srcBufferLoaded;
+            strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+            strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
         }
-        ret = deflate(&strm, Z_NO_FLUSH);
+        {
+            size_t const availBefore = strm.avail_in;
+            ret = deflate(&strm, Z_NO_FLUSH);
+            AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
+        }
         if (ret != Z_OK)
             EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
-        {   size_t const cSize = ress->dstBufferSize - strm.avail_out;
+        {   size_t const cSize = writeJob->bufferSize - strm.avail_out;
             if (cSize) {
-                if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
-                    EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno));
+                writeJob->usedBufferSize = cSize;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += cSize;
-                strm.next_out = (Bytef*)ress->dstBuffer;
-                strm.avail_out = (uInt)ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = (uInt)writeJob->bufferSize;
         }   }
         if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
             DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
                             (unsigned)(inFileSize>>20),
                             (double)outFileSize/inFileSize*100)
         } else {
             DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
                             (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
                             (double)outFileSize/inFileSize*100);
     }   }
     while (1) {
         int const ret = deflate(&strm, Z_FINISH);
-        {   size_t const cSize = ress->dstBufferSize - strm.avail_out;
+        {   size_t const cSize = writeJob->bufferSize - strm.avail_out;
             if (cSize) {
-                if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
-                    EXM_THROW(75, "Write error : %s ", strerror(errno));
+                writeJob->usedBufferSize = cSize;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += cSize;
-                strm.next_out = (Bytef*)ress->dstBuffer;
-                strm.avail_out = (uInt)ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = (uInt)writeJob->bufferSize;
         }   }
         if (ret == Z_STREAM_END) break;
         if (ret != Z_BUF_ERROR)
            EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
@@ -1067,6 +1068,8 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but not changed */
             EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
     }   }
     *readsize = inFileSize;
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return outFileSize;
 }
 #endif
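The gz frame writer above is the first of several functions converted to the same output discipline, so the contract is worth spelling out: the caller always holds exactly one IOJob_t; AIO_WritePool_enqueueAndReacquireWriteJob() takes the job pointer by address because it queues the filled buffer for the writer thread and swaps in a fresh, empty job; and the final release plus sparseWriteEnd hand the last job back and flush any pending sparse-write skips. A hedged sketch of just that skeleton, with produce() as a hypothetical stand-in for the codec call:

/* Sketch only: produce() is a hypothetical callback standing in for deflate()/lzma_code()/
 * ZSTD_compressStream2(); it fills dst and returns the number of bytes produced (0 = done). */
static void sketch_write_side(WritePoolCtx_t* writeCtx,
                              size_t (*produce)(void* dst, size_t dstCapacity))
{
    IOJob_t* writeJob = AIO_WritePool_acquireJob(writeCtx);
    for (;;) {
        size_t const cSize = produce(writeJob->buffer, writeJob->bufferSize);
        if (cSize == 0) break;
        writeJob->usedBufferSize = cSize;                     /* how much of the buffer to write */
        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); /* queue it; writeJob now points at a fresh job */
    }
    AIO_WritePool_releaseIoJob(writeJob);     /* return the last, unused job to the pool */
    AIO_WritePool_sparseWriteEnd(writeCtx);   /* flush any pending sparse-write skips */
}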
@@ -1082,6 +1085,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
     lzma_stream strm = LZMA_STREAM_INIT;
     lzma_action action = LZMA_RUN;
     lzma_ret ret;
+    IOJob_t *writeJob = NULL;
     if (compressionLevel < 0) compressionLevel = 0;
     if (compressionLevel > 9) compressionLevel = 9;
@@ -1099,31 +1103,37 @@ FIO_compressLzmaFrame(cRess_t* ress,
             EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
     }
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
     strm.next_in = 0;
     strm.avail_in = 0;
-    strm.next_out = (BYTE*)ress->dstBuffer;
-    strm.avail_out = ress->dstBufferSize;
     while (1) {
         if (strm.avail_in == 0) {
-            size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
-            if (inSize == 0) action = LZMA_FINISH;
+            size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+            if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
             inFileSize += inSize;
-            strm.next_in = (BYTE const*)ress->srcBuffer;
-            strm.avail_in = inSize;
+            strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+            strm.avail_in = ress->readCtx->srcBufferLoaded;
+        }
+        {
+            size_t const availBefore = strm.avail_in;
+            ret = lzma_code(&strm, action);
+            AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
         }
-        ret = lzma_code(&strm, action);
         if (ret != LZMA_OK && ret != LZMA_STREAM_END)
             EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
-        {   size_t const compBytes = ress->dstBufferSize - strm.avail_out;
+        {   size_t const compBytes = writeJob->bufferSize - strm.avail_out;
             if (compBytes) {
-                if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes)
-                    EXM_THROW(85, "Write error : %s", strerror(errno));
+                writeJob->usedBufferSize = compBytes;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += compBytes;
-                strm.next_out = (BYTE*)ress->dstBuffer;
-                strm.avail_out = ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = writeJob->bufferSize;
         }   }
         if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
             DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
@@ -1139,6 +1149,9 @@ FIO_compressLzmaFrame(cRess_t* ress,
     lzma_end(&strm);
     *readsize = inFileSize;
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return outFileSize;
 }
 #endif
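On the input side, both converted writers above wrap the codec call with the same accounting: record avail_in before the call and report only the consumed delta via AIO_ReadPool_consumeBytes(), since deflate()/lzma_code() may leave part of the pool's window unread. The pool then slides its window forward and keeps the next read going. A hedged sketch of that pattern in isolation; codec_step() and the stream struct are hypothetical stand-ins, not part of the commit:

typedef struct { const unsigned char* next_in; size_t avail_in; } sketch_stream_t; /* stand-in */

static void sketch_feed_codec(ReadPoolCtx_t* readCtx, sketch_stream_t* strm,
                              int (*codec_step)(sketch_stream_t*))   /* hypothetical codec call */
{
    if (strm->avail_in == 0) {
        AIO_ReadPool_fillBuffer(readCtx, ZSTD_CStreamInSize());  /* top the window up */
        strm->next_in  = (const unsigned char*)readCtx->srcBuffer;
        strm->avail_in = readCtx->srcBufferLoaded;               /* 0 here means end of input */
    }
    {
        size_t const availBefore = strm->avail_in;
        (void)codec_step(strm);                                  /* may consume only part of the window */
        AIO_ReadPool_consumeBytes(readCtx, availBefore - strm->avail_in);
    }
}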
@@ -1164,15 +1177,18 @@ FIO_compressLz4Frame(cRess_t* ress,
     LZ4F_preferences_t prefs;
     LZ4F_compressionContext_t ctx;
+    IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
     if (LZ4F_isError(errorCode))
         EXM_THROW(31, "zstd: failed to create lz4 compression context");
     memset(&prefs, 0, sizeof(prefs));
-    assert(blockSize <= ress->srcBufferSize);
-    prefs.autoFlush = 1;
+    assert(blockSize <= ress->readCtx->base.jobBufferSize);
+    /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
+    prefs.autoFlush = 0;
     prefs.compressionLevel = compressionLevel;
     prefs.frameInfo.blockMode = LZ4F_blockLinked;
     prefs.frameInfo.blockSizeID = LZ4F_max64KB;
@@ -1180,27 +1196,25 @@ FIO_compressLz4Frame(cRess_t* ress,
 #if LZ4_VERSION_NUMBER >= 10600
     prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
 #endif
-    assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
+    assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
     {
-        size_t readSize;
-        size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs);
+        size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
         if (LZ4F_isError(headerSize))
             EXM_THROW(33, "File header generation failed : %s",
                             LZ4F_getErrorName(headerSize));
-        if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize)
-            EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno));
+        writeJob->usedBufferSize = headerSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
         outFileSize += headerSize;
         /* Read first block */
-        readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
-        inFileSize += readSize;
+        inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
         /* Main Loop */
-        while (readSize>0) {
-            size_t const outSize = LZ4F_compressUpdate(ctx,
-                                        ress->dstBuffer, ress->dstBufferSize,
-                                        ress->srcBuffer, readSize, NULL);
+        while (ress->readCtx->srcBufferLoaded) {
+            size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+            size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
+                                                       ress->readCtx->srcBuffer, inSize, NULL);
             if (LZ4F_isError(outSize))
                 EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
                             srcFileName, LZ4F_getErrorName(outSize));
@@ -1216,33 +1230,29 @@ FIO_compressLz4Frame(cRess_t* ress,
             }
             /* Write Block */
-            {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
-                if (sizeCheck != outSize)
-                    EXM_THROW(36, "Write error : %s", strerror(errno));
-            }
+            writeJob->usedBufferSize = outSize;
+            AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
             /* Read next block */
-            readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
-            inFileSize += readSize;
+            AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
+            inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
         }
-        if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);
         /* End of Stream mark */
-        headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
+        headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
         if (LZ4F_isError(headerSize))
             EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
                             srcFileName, LZ4F_getErrorName(headerSize));
-        {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
-            if (sizeCheck != headerSize)
-                EXM_THROW(39, "Write error : %s (cannot write end of stream)",
-                                strerror(errno));
-        }
+        writeJob->usedBufferSize = headerSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
         outFileSize += headerSize;
     }
     *readsize = inFileSize;
     LZ4F_freeCompressionContext(ctx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return outFileSize;
 }
@@ -1257,8 +1267,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                       int compressionLevel, U64* readsize)
 {
     cRess_t const ress = *ressPtr;
-    FILE* const srcFile = ress.srcFile;
-    FILE* const dstFile = ress.dstFile;
+    IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
     U64 compressedfilesize = 0;
     ZSTD_EndDirective directive = ZSTD_e_continue;
     U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
@@ -1303,12 +1313,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
     do {
         size_t stillToFlush;
         /* Fill input Buffer */
-        size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
-        ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
+        size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
+        ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 };
         DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
         *readsize += inSize;
-        if ((inSize == 0) || (*readsize == fileSize))
+        if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
             directive = ZSTD_e_end;
         stillToFlush = 1;
@@ -1316,9 +1326,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                || (directive == ZSTD_e_end && stillToFlush != 0) ) {
                 size_t const oldIPos = inBuff.pos;
-                ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+                ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
                 size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
                 CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
+                AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
                 /* count stats */
                 inputPresented++;
@@ -1327,12 +1338,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                 /* Write compressed stream */
                 DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                                 (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
                 if (outBuff.pos) {
-                    size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-                    if (sizeCheck != outBuff.pos)
-                        EXM_THROW(25, "Write error : %s (cannot write compressed block)",
-                                        strerror(errno));
+                    writeJob->usedBufferSize = outBuff.pos;
+                    AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                     compressedfilesize += outBuff.pos;
                 }
@@ -1464,14 +1473,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
         }   /* while ((inBuff.pos != inBuff.size) */
     } while (directive != ZSTD_e_end);
-    if (ferror(srcFile)) {
-        EXM_THROW(26, "Read error : I/O error");
-    }
     if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
         EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
                 (unsigned long long)*readsize, (unsigned long long)fileSize);
     }
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
     return compressedfilesize;
 }
@@ -1572,7 +1581,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
 /*! FIO_compressFilename_dstFile() :
- *  open dstFileName, or pass-through if ress.dstFile != NULL,
+ *  open dstFileName, or pass-through if ress.file != NULL,
  *  then start compression with FIO_compressFilename_internal().
  *  Manages source removal (--rm) and file permissions transfer.
  *  note : ress.srcFile must be != NULL,
@@ -1591,8 +1600,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
     int result;
     stat_t statbuf;
     int transferMTime = 0;
-    assert(ress.srcFile != NULL);
-    if (ress.dstFile == NULL) {
+    FILE *dstFile;
+    assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
+    if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
         int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
         if ( strcmp (srcFileName, stdinmark)
           && strcmp (dstFileName, stdoutmark)
@@ -1604,8 +1614,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
         closeDstFile = 1;
         DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
-        ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
-        if (ress.dstFile==NULL) return 1;  /* could not open dstFileName */
+        dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
+        if (dstFile==NULL) return 1;  /* could not open dstFileName */
+        AIO_WritePool_setFile(ress.writeCtx, dstFile);
         /* Must only be added after FIO_openDstFile() succeeds.
          * Otherwise we may delete the destination file if it already exists,
          * and the user presses Ctrl-C when asked if they wish to overwrite.
@@ -1616,13 +1627,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
     result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
     if (closeDstFile) {
-        FILE* const dstFile = ress.dstFile;
-        ress.dstFile = NULL;
         clearHandler();
         DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
-        if (fclose(dstFile)) { /* error closing dstFile */
+        if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
             DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
             result=1;
         }
@@ -1668,6 +1676,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
                              int compressionLevel)
 {
     int result;
+    FILE* srcFile;
     DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
     /* ensure src is not a directory */
@@ -1691,13 +1700,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
         return 0;
     }
-    ress.srcFile = FIO_openSrcFile(prefs, srcFileName);
-    if (ress.srcFile == NULL) return 1;   /* srcFile could not be opened */
+    srcFile = FIO_openSrcFile(prefs, srcFileName);
+    if (srcFile == NULL) return 1;   /* srcFile could not be opened */
+    AIO_ReadPool_setFile(ress.readCtx, srcFile);
     result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
+    AIO_ReadPool_closeFile(ress.readCtx);
-    fclose(ress.srcFile);
-    ress.srcFile = NULL;
     if ( prefs->removeSrcFile   /* --rm */
       && result == 0           /* success */
       && strcmp(srcFileName, stdinmark)   /* exception : don't erase stdin */
@@ -1844,23 +1853,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
     /* init */
     assert(outFileName != NULL || suffix != NULL);
     if (outFileName != NULL) {   /* output into a single destination (stdout typically) */
+        FILE *dstFile;
         if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
             FIO_freeCResources(&ress);
             return 1;
         }
-        ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
-        if (ress.dstFile == NULL) {  /* could not open outFileName */
+        dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
+        if (dstFile == NULL) {  /* could not open outFileName */
             error = 1;
         } else {
+            AIO_WritePool_setFile(ress.writeCtx, dstFile);
             for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
                 status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
                 if (!status) fCtx->nbFilesProcessed++;
                 error |= status;
             }
-            if (fclose(ress.dstFile))
+            if (AIO_WritePool_closeFile(ress.writeCtx))
                 EXM_THROW(29, "Write error (%s) : cannot properly close %s",
                             strerror(errno), outFileName);
-            ress.dstFile = NULL;
         }
     } else {
         if (outMirroredRootDirName)
@@ -1916,13 +1926,10 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
 /* **************************************************************************
  *  Decompression
  ***************************************************************************/
 typedef struct {
-    void*  srcBuffer;
-    size_t srcBufferSize;
-    size_t srcBufferLoaded;
     ZSTD_DStream* dctx;
     WritePoolCtx_t *writeCtx;
+    ReadPoolCtx_t *readCtx;
 } dRess_t;
 static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
@@ -1940,11 +1947,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
     CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
     CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
-    ress.srcBufferSize = ZSTD_DStreamInSize();
-    ress.srcBuffer = malloc(ress.srcBufferSize);
-    if (!ress.srcBuffer)
-        EXM_THROW(61, "Allocation error : not enough memory");
     /* dictionary */
     { void* dictBuffer;
       size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
@@ -1953,6 +1955,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
     }
     ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
+    ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
     return ress;
 }
@@ -1960,47 +1963,31 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
 static void FIO_freeDResources(dRess_t ress)
 {
     CHECK( ZSTD_freeDStream(ress.dctx) );
-    free(ress.srcBuffer);
     AIO_WritePool_free(ress.writeCtx);
-}
-
-/* FIO_consumeDSrcBuffer:
- * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
-static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
-    assert(ress->srcBufferLoaded >= len);
-    ress->srcBufferLoaded -= len;
-    memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
+    AIO_ReadPool_free(ress.readCtx);
 }
 /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
     @return : 0 (no error) */
-static int FIO_passThrough(const FIO_prefs_t* const prefs,
-                           FILE* foutput, FILE* finput,
-                           void* buffer, size_t bufferSize,
-                           size_t alreadyLoaded)
+static int FIO_passThrough(dRess_t *ress)
 {
-    size_t const blockSize = MIN(64 KB, bufferSize);
-    size_t readFromInput;
-    unsigned storedSkips = 0;
-
-    /* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */
-    {   size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput);
-        if (sizeCheck != alreadyLoaded) {
-            DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno));
-            return 1;
-    }   }
-
-    do {
-        readFromInput = fread(buffer, 1, blockSize, finput);
-        storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
-    } while (readFromInput == blockSize);
-    if (ferror(finput)) {
-        DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
-        return 1;
+    size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize());
+    IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+    AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
+
+    while(ress->readCtx->srcBufferLoaded) {
+        size_t writeSize;
+        writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+        assert(writeSize <= writeJob->bufferSize);
+        memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize);
+        writeJob->usedBufferSize = writeSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+        AIO_ReadPool_consumeBytes(ress->readCtx, writeSize);
+        AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
     }
-    assert(feof(finput));
-
-    AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
+    assert(ress->readCtx->reachedEof);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return 0;
 }
@@ -2018,7 +2005,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
         return;
     /* Try to decode the frame header */
-    err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
+    err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded);
     if (err == 0) {
         unsigned long long const windowSize = header.windowSize;
         unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
@@ -2041,7 +2028,7 @@
  */
 #define FIO_ERROR_FRAME_DECODING   ((unsigned long long)(-2))
 static unsigned long long
-FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
+FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress,
                         const FIO_prefs_t* const prefs,
                         const char* srcFileName,
                         U64 alreadyDecoded)  /* for multi-frames streams */
@@ -2057,16 +2044,11 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
     ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
     /* Header loading : ensures ZSTD_getFrameHeader() will succeed */
-    {   size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX;
-        if (ress->srcBufferLoaded < toDecode) {
-            size_t const toRead = toDecode - ress->srcBufferLoaded;
-            void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
-            ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput);
-    }   }
+    AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX);
     /* Main decompression Loop */
     while (1) {
-        ZSTD_inBuffer  inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
+        ZSTD_inBuffer  inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 };
         ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
         size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
         const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
@@ -2088,7 +2070,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
             if (srcFileNameSize > 18) {
                 const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
                 DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ",
                     fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
             } else {
                 DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ",
                     fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
@@ -2098,23 +2080,21 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                 srcFileName, hrs.precision, hrs.value, hrs.suffix);
         }
-        FIO_consumeDSrcBuffer(ress, inBuff.pos);
+        AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos);
         if (readSizeHint == 0) break;   /* end of frame */
         /* Fill input buffer */
-        {   size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize);  /* support large skippable frames */
-            if (ress->srcBufferLoaded < toDecode) {
-                size_t const toRead = toDecode - ress->srcBufferLoaded;   /* > 0 */
-                void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
-                size_t const readSize = fread(startPosition, 1, toRead, finput);
+        {   size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize());  /* support large skippable frames */
+            if (ress->readCtx->srcBufferLoaded < toDecode) {
+                size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode);
                 if (readSize==0) {
                     DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
                                     srcFileName);
+                    AIO_WritePool_releaseIoJob(writeJob);
                     return FIO_ERROR_FRAME_DECODING;
                 }
-                ress->srcBufferLoaded += readSize;
     }   }   }
     AIO_WritePool_releaseIoJob(writeJob);
     AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@@ -2125,7 +2105,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
 #ifdef ZSTD_GZDECOMPRESS
 static unsigned long long
-FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
+FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName)
 {
     unsigned long long outFileSize = 0;
     z_stream strm;
@@ -2145,16 +2125,16 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
     writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
-    strm.avail_in = (uInt)ress->srcBufferLoaded;
-    strm.next_in = (z_const unsigned char*)ress->srcBuffer;
+    strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+    strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
     for ( ; ; ) {
         int ret;
         if (strm.avail_in == 0) {
-            ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
-            if (ress->srcBufferLoaded == 0) flush = Z_FINISH;
-            strm.next_in = (z_const unsigned char*)ress->srcBuffer;
-            strm.avail_in = (uInt)ress->srcBufferLoaded;
+            AIO_ReadPool_consumeAndRefill(ress->readCtx);
+            if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH;
+            strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+            strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
         }
         ret = inflate(&strm, flush);
         if (ret == Z_BUF_ERROR) {
@@ -2177,7 +2157,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
         if (ret == Z_STREAM_END) break;
     }
-    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+    AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
     if ( (inflateEnd(&strm) != Z_OK)  /* release resources ; error detected */
       && (decodingError==0) ) {
@@ -2192,7 +2172,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
 #ifdef ZSTD_LZMADECOMPRESS
 static unsigned long long
-FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
+FIO_decompressLzmaFrame(dRess_t* ress,
                         const char* srcFileName, int plain_lzma)
 {
     unsigned long long outFileSize = 0;
@@ -2220,16 +2200,16 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
     writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
-    strm.next_in = (BYTE const*)ress->srcBuffer;
-    strm.avail_in = ress->srcBufferLoaded;
+    strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+    strm.avail_in = ress->readCtx->srcBufferLoaded;
     for ( ; ; ) {
         lzma_ret ret;
         if (strm.avail_in == 0) {
-            ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
-            if (ress->srcBufferLoaded == 0) action = LZMA_FINISH;
-            strm.next_in = (BYTE const*)ress->srcBuffer;
-            strm.avail_in = ress->srcBufferLoaded;
+            AIO_ReadPool_consumeAndRefill(ress->readCtx);
+            if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
+            strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+            strm.avail_in = ress->readCtx->srcBufferLoaded;
         }
         ret = lzma_code(&strm, action);
@@ -2253,7 +2233,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
         if (ret == LZMA_STREAM_END) break;
     }
-    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+    AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
     lzma_end(&strm);
     AIO_WritePool_releaseIoJob(writeJob);
     AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@@ -2263,8 +2243,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
 #ifdef ZSTD_LZ4DECOMPRESS
 static unsigned long long
-FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
-                       const char* srcFileName)
+FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName)
 {
     unsigned long long filesize = 0;
     LZ4F_errorCode_t nextToLoad = 4;
@@ -2282,34 +2261,27 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
     /* Main Loop */
     for (;nextToLoad;) {
-        size_t readSize;
         size_t pos = 0;
         size_t decodedBytes = writeJob->bufferSize;
         int fullBufferDecoded = 0;
         /* Read input */
-        nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
-        readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
-        if(!readSize && ferror(srcFile)) {
-            DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
-            decodingError=1;
-            break;
-        }
-        if(!readSize && !ress->srcBufferLoaded) break;   /* reached end of file */
-        ress->srcBufferLoaded += readSize;
+        AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad);
+        if(!ress->readCtx->srcBufferLoaded) break;   /* reached end of file */
-        while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) {  /* still to read, or still to flush */
+        while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) {  /* still to read, or still to flush */
             /* Decode Input (at least partially) */
-            size_t remaining = ress->srcBufferLoaded - pos;
+            size_t remaining = ress->readCtx->srcBufferLoaded - pos;
             decodedBytes = writeJob->bufferSize;
-            nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
+            nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos,
+                                         &remaining, NULL);
             if (LZ4F_isError(nextToLoad)) {
                 DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
                             srcFileName, LZ4F_getErrorName(nextToLoad));
                 decodingError = 1; nextToLoad = 0; break;
             }
             pos += remaining;
-            assert(pos <= ress->srcBufferLoaded);
+            assert(pos <= ress->readCtx->srcBufferLoaded);
             fullBufferDecoded = decodedBytes == writeJob->bufferSize;
             /* Write Block */
@@ -2324,7 +2296,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
             if (!nextToLoad) break;
         }
-        FIO_consumeDSrcBuffer(ress, pos);
+        AIO_ReadPool_consumeBytes(ress->readCtx, pos);
     }
     if (nextToLoad!=0) {
         DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
@@ -2348,23 +2320,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
  *           1 : error
  */
 static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
-                                dRess_t ress, FILE* srcFile,
-                                const FIO_prefs_t* const prefs,
-                                const char* dstFileName, const char* srcFileName)
+                                dRess_t ress, const FIO_prefs_t* const prefs,
+                                const char* dstFileName, const char* srcFileName)
 {
     unsigned readSomething = 0;
     unsigned long long filesize = 0;
-    assert(srcFile != NULL);
     /* for each frame */
     for ( ; ; ) {
         /* check magic number -> version */
         size_t const toRead = 4;
-        const BYTE* const buf = (const BYTE*)ress.srcBuffer;
-        if (ress.srcBufferLoaded < toRead)  /* load up to 4 bytes for header */
-            ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded,
-                                          (size_t)1, toRead - ress.srcBufferLoaded, srcFile);
-        if (ress.srcBufferLoaded==0) {
+        const BYTE* buf;
+        AIO_ReadPool_fillBuffer(ress.readCtx, toRead);
+        buf = (const BYTE*)ress.readCtx->srcBuffer;
+        if (ress.readCtx->srcBufferLoaded==0) {
             if (readSomething==0) {  /* srcFile is empty (which is invalid) */
                 DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
                 return 1;
@@ -2372,17 +2341,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
             break;   /* no more input */
         }
         readSomething = 1;   /* there is at least 1 byte in srcFile */
-        if (ress.srcBufferLoaded < toRead) {
+        if (ress.readCtx->srcBufferLoaded < toRead) {
             DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
             return 1;
         }
-        if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) {
-            unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize);
+        if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) {
+            unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
         } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
 #ifdef ZSTD_GZDECOMPRESS
-            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
+            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2392,7 +2361,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
         } else if ((buf[0] == 0xFD && buf[1] == 0x37)  /* xz magic number */
                 || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
 #ifdef ZSTD_LZMADECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
+            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2401,7 +2370,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 #endif
         } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
 #ifdef ZSTD_LZ4DECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
+            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2409,10 +2378,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
             return 1;
 #endif
         } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) {  /* pass-through mode */
-            return FIO_passThrough(prefs,
-                                   AIO_WritePool_getFile(ress.writeCtx), srcFile,
-                                   ress.srcBuffer, ress.srcBufferSize,
-                                   ress.srcBufferLoaded);
+            return FIO_passThrough(&ress);
         } else {
             DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
             return 1;
@@ -2432,15 +2398,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 }
 /** FIO_decompressDstFile() :
-    open `dstFileName`,
-    or path-through if ress.dstFile is already != 0,
+    open `dstFileName`, or pass-through if writeCtx's file is already != 0,
     then start decompression process (FIO_decompressFrames()).
     @return : 0 : OK
               1 : operation aborted
 */
 static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
                                  FIO_prefs_t* const prefs,
-                                 dRess_t ress, FILE* srcFile,
+                                 dRess_t ress,
                                  const char* dstFileName, const char* srcFileName)
 {
     int result;
@@ -2472,7 +2437,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
         addHandler(dstFileName);
     }
-    result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
+    result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName);
     if (releaseDstFile) {
         clearHandler();
@@ -2513,9 +2478,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
     srcFile = FIO_openSrcFile(prefs, srcFileName);
     if (srcFile==NULL) return 1;
-    ress.srcBufferLoaded = 0;
+    AIO_ReadPool_setFile(ress.readCtx, srcFile);
-    result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName);
+    result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName);
+    AIO_ReadPool_setFile(ress.readCtx, NULL);
     /* Close file */
     if (fclose(srcFile)) {
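The decompression hunks above follow the same shape as the compression side, with two wrinkles worth noting: the source FILE* is attached with AIO_ReadPool_setFile() and detached with setFile(NULL) before the caller's own fclose(), and the gz/xz decoders, which always drain their whole input window before asking for more, use AIO_ReadPool_consumeAndRefill() to swap the window in one step. A small hedged sketch of that refill step as these hunks use it; the helper name and the pointer/size out-parameters are illustrative, not from the commit:

/* Sketch: refresh a decoder's input pointers from the read pool once it has
 * consumed everything it was given, mirroring the converted gz/xz decode loops. */
static void sketch_refill_for_decoder(ReadPoolCtx_t* readCtx,
                                      const unsigned char** nextIn, size_t* availIn)
{
    if (*availIn == 0) {
        AIO_ReadPool_consumeAndRefill(readCtx);        /* drop the spent window, read a new one */
        *nextIn  = (const unsigned char*)readCtx->srcBuffer;
        *availIn = readCtx->srcBufferLoaded;           /* 0 here signals real end of input */
    }
}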

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) Yann Collet, Facebook, Inc. * Copyright (c) Facebook, Inc.
* All rights reserved. * All rights reserved.
* *
* This source code is licensed under both the BSD-style license (found in the * This source code is licensed under both the BSD-style license (found in the
@ -29,7 +29,8 @@
/** AIO_fwriteSparse() : /** AIO_fwriteSparse() :
* @return : storedSkips, * @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ * argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file, static unsigned
AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize, const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs, const FIO_prefs_t* const prefs,
unsigned storedSkips) unsigned storedSkips)
@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file,
if (!prefs->sparseFileSupport) { /* normal write */ if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize) if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write decoded block : %s", EXM_THROW(70, "Write error : cannot write block : %s",
strerror(errno)); strerror(errno));
return 0; return 0;
} }
@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file,
storedSkips = 0; storedSkips = 0;
/* write the rest */ /* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write decoded block : %s", EXM_THROW(93, "Write error : cannot write block : %s",
strerror(errno)); strerror(errno));
} }
ptrT += seg0SizeT; ptrT += seg0SizeT;
@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file,
return storedSkips; return storedSkips;
} }
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{ {
if (prefs->testMode) assert(storedSkips == 0); if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) { if (storedSkips>0) {
@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st
* AsyncIO functionality * AsyncIO functionality
************************************************************************/ ************************************************************************/
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
return 1;
#else
return 0;
#endif
}
/* *********************************** /* ***********************************
* General IoPool implementation * General IoPool implementation
*************************************/ *************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
void *buffer; IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
IOJob_t *job; void* const buffer = malloc(bufferSize);
job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
if(!job || !buffer) if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory"); EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer; job->buffer = buffer;
job->bufferSize = bufferSize; job->bufferSize = bufferSize;
job->usedBufferSize = 0; job->usedBufferSize = 0;
@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
/* AIO_IOPool_createThreadPool: /* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool. * Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */ * Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) { static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL; ctx->threadPool = NULL;
if(prefs->asyncIO) { if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
EXM_THROW(102,"Failed creating write availableJobs mutex"); EXM_THROW(102,"Failed creating write availableJobs mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2); assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
if (!ctx->threadPool) if (!ctx->threadPool)
EXM_THROW(104, "Failed creating writer thread pool"); EXM_THROW(104, "Failed creating writer thread pool");
} }
} }
/* AIO_IOPool_init: /* AIO_IOPool_init:
 * Allocates and sets up a new write pool including its availableJobs. */ * Allocates and sets up a new write pool including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) { static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i; int i;
AIO_IOPool_createThreadPool(ctx, prefs); AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs; ctx->prefs = prefs;
ctx->poolFunction = poolFunction; ctx->poolFunction = poolFunction;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1; ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
ctx->availableJobsCount = ctx->totalIoJobs; ctx->availableJobsCount = ctx->totalIoJobs;
for(i=0; i < ctx->availableJobsCount; i++) { for(i=0; i < ctx->availableJobsCount; i++) {
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize); ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
} }
ctx->jobBufferSize = bufferSize;
ctx->file = NULL; ctx->file = NULL;
} }
/* AIO_IOPool_releaseIoJob: /* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */ * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t *job) { static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx; IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool) { if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount < MAX_IO_JOBS); assert(ctx->availableJobsCount < ctx->totalIoJobs);
ctx->availableJobs[ctx->availableJobsCount++] = job; ctx->availableJobs[ctx->availableJobsCount++] = job;
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 0);
ctx->availableJobsCount++;
}
} }
/* AIO_IOPool_join: /* AIO_IOPool_join:
@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
/* AIO_IOPool_acquireJob: /* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */ * Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) { static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job; IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode); assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool) { if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount > 0); assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 1);
ctx->availableJobsCount--;
job = (IOJob_t*)ctx->availableJobs[0];
}
job->usedBufferSize = 0; job->usedBufferSize = 0;
job->file = ctx->file; job->file = ctx->file;
job->offset = 0; job->offset = 0;
@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
* Sets the destination file for future files in the pool. * Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */ * Also requires ending of sparse write if a previous file was used in sparse mode. */
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) { static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL); assert(ctx!=NULL);
AIO_IOPool_join(ctx); AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs); assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file; ctx->file = file;
} }
static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) { static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
return ctx->file; return ctx->file;
} }
/* AIO_IOPool_enqueueJob: /* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution. * Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */ * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t *job) { static void AIO_IOPool_enqueueJob(IOJob_t* job) {
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx; IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool) if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job); POOL_add(ctx->threadPool, ctx->poolFunction, job);
else else
@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {
/* AIO_WritePool_acquireJob: /* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */ * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) { IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
return AIO_IOPool_acquireJob(&ctx->base); return AIO_IOPool_acquireJob(&ctx->base);
} }
@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
/* AIO_WritePool_sparseWriteEnd: /* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file. * Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */ * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) { void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL); assert(ctx != NULL);
if(ctx->base.threadPool) if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool); POOL_joinJobs(ctx->base.threadPool);
@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
* Sets the destination file for future writes in the pool. * Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */ * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) { void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
AIO_IOPool_setFile(&ctx->base, file); AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0); assert(ctx->storedSkips == 0);
} }
/* AIO_WritePool_getFile: /* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */ * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) { FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base); return AIO_IOPool_getFile(&ctx->base);
} }
/* AIO_WritePool_releaseIoJob: /* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */ * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job) { void AIO_WritePool_releaseIoJob(IOJob_t* job) {
AIO_IOPool_releaseIoJob(job); AIO_IOPool_releaseIoJob(job);
} }
/* AIO_WritePool_closeFile: /* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL. * Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) { int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
FILE *dstFile = ctx->base.file; FILE* const dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx); AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL); AIO_IOPool_setFile(&ctx->base, NULL);
@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
/* AIO_WritePool_executeWriteJob: /* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */ * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){ static void AIO_WritePool_executeWriteJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque; IOJob_t* const job = (IOJob_t*) opaque;
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx; WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job); AIO_IOPool_releaseIoJob(job);
} }
/* AIO_WritePool_create: /* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */ * Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) { WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize); AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0; ctx->storedSkips = 0;
@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
assert(ctx->storedSkips==0); assert(ctx->storedSkips==0);
free(ctx); free(ctx);
} }
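Not part of the committed change: the short sketch below illustrates how a producer loop might drive the write-pool API above. The helper name example_writeAll, the chunk size, and the prefs/dstFile arguments are assumptions made only for illustration.

#include <stdio.h>
#include <string.h>            /* memcpy */
#include "fileio_asyncio.h"    /* WritePoolCtx_t, IOJob_t, AIO_WritePool_* (this patch) */

/* Hypothetical helper: streams srcSize bytes from src to dstFile through the write pool. */
static void example_writeAll(const FIO_prefs_t* prefs, FILE* dstFile,
                             const void* src, size_t srcSize)
{
    size_t const chunkSize = (size_t)128 * 1024;   /* arbitrary job buffer size */
    WritePoolCtx_t* const writeCtx = AIO_WritePool_create(prefs, chunkSize);
    IOJob_t* job;
    size_t pos = 0;

    AIO_WritePool_setFile(writeCtx, dstFile);      /* requires all previous jobs to be done/released */
    job = AIO_WritePool_acquireJob(writeCtx);
    while (pos < srcSize) {
        size_t const toWrite = (srcSize - pos < job->bufferSize) ? srcSize - pos : job->bufferSize;
        memcpy(job->buffer, (const char*)src + pos, toWrite);
        job->usedBufferSize = toWrite;
        pos += toWrite;
        /* Queues the filled job (written by a worker thread when asyncio is enabled)
         * and hands back a fresh job to fill next. */
        AIO_WritePool_enqueueAndReacquireWriteJob(&job);
    }
    AIO_WritePool_releaseIoJob(job);               /* last re-acquired job was never filled */
    AIO_WritePool_closeFile(writeCtx);             /* joins pending writes, ends sparse mode, closes dstFile */
    AIO_WritePool_free(writeCtx);
}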
/* ***********************************
* ReadPool implementation
*************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
int i;
for(i=0; i<ctx->completedJobsCount; i++) {
IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
AIO_IOPool_releaseIoJob(job);
}
ctx->completedJobsCount = 0;
}
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
ctx->completedJobs[ctx->completedJobsCount++] = job;
if(ctx->base.threadPool) {
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
}
}
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it;
 * returns NULL if no such job was found.
* IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
int i;
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
* While not strictly needed for a single threaded reader implementation (as in such a case we could expect
* reads to be completed in order) this implementation was chosen as it better fits other asyncio
 * interfaces (such as io_uring) that do not guarantee order of completion. */
for (i=0; i<ctx->completedJobsCount; i++) {
job = (IOJob_t *) ctx->completedJobs[i];
if (job->offset == ctx->waitingOnOffset) {
ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
return job;
}
}
return NULL;
}
/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}
/* AIO_ReadPool_getNextCompletedJob:
* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block, waiting for reads already in flight to complete. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
if (ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
/* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
}
if(job) {
assert(job->offset == ctx->waitingOnOffset);
ctx->waitingOnOffset += job->usedBufferSize;
}
if (ctx->base.threadPool)
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
return job;
}
/* AIO_ReadPool_executeReadJob:
* Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
IOJob_t* const job = (IOJob_t*) opaque;
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->reachedEof) {
job->usedBufferSize = 0;
AIO_ReadPool_addJobToCompleted(job);
return;
}
job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
if(job->usedBufferSize < job->bufferSize) {
if(ferror(job->file)) {
EXM_THROW(37, "Read error");
} else if(feof(job->file)) {
ctx->reachedEof = 1;
} else {
EXM_THROW(37, "Unexpected short read");
}
}
AIO_ReadPool_addJobToCompleted(job);
}
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
job->offset = ctx->nextReadOffset;
ctx->nextReadOffset += job->bufferSize;
AIO_IOPool_enqueueJob(job);
}
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
int i;
for (i = 0; i < ctx->base.availableJobsCount; i++) {
AIO_ReadPool_enqueueRead(ctx);
}
}
/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(&ctx->base);
AIO_ReadPool_releaseAllCompletedJobs(ctx);
if (ctx->currentJobHeld) {
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
ctx->currentJobHeld = NULL;
}
AIO_IOPool_setFile(&ctx->base, file);
ctx->nextReadOffset = 0;
ctx->waitingOnOffset = 0;
ctx->srcBuffer = ctx->coalesceBuffer;
ctx->srcBufferLoaded = 0;
ctx->reachedEof = 0;
if(file != NULL)
AIO_ReadPool_startReading(ctx);
}
/* AIO_ReadPool_create:
 * Allocates and sets up a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
ctx->srcBuffer = ctx->coalesceBuffer;
ctx->srcBufferLoaded = 0;
ctx->completedJobsCount = 0;
ctx->currentJobHeld = NULL;
if(ctx->base.threadPool)
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond condition variable");
return ctx;
}
/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
if(AIO_ReadPool_getFile(ctx))
AIO_ReadPool_closeFile(ctx);
if(ctx->base.threadPool)
ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
AIO_IOPool_destroy(&ctx->base);
free(ctx->coalesceBuffer);
free(ctx);
}
/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
assert(n <= ctx->srcBufferLoaded);
ctx->srcBufferLoaded -= n;
ctx->srcBuffer += n;
}
/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
if (ctx->currentJobHeld) {
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
ctx->currentJobHeld = NULL;
AIO_ReadPool_enqueueRead(ctx);
}
ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
return (IOJob_t*) ctx->currentJobHeld;
}
/* AIO_ReadPool_fillBuffer:
* Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
IOJob_t *job;
int useCoalesce = 0;
if(n > ctx->base.jobBufferSize)
n = ctx->base.jobBufferSize;
/* We are good, don't read anything */
if (ctx->srcBufferLoaded >= n)
return 0;
/* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
* and coalesce the remaining bytes with the next job's buffer */
if (ctx->srcBufferLoaded > 0) {
useCoalesce = 1;
memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
ctx->srcBuffer = ctx->coalesceBuffer;
}
/* Read the next chunk */
job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
if(!job)
return 0;
if(useCoalesce) {
assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
ctx->srcBufferLoaded += job->usedBufferSize;
}
else {
ctx->srcBuffer = (U8 *) job->buffer;
ctx->srcBufferLoaded = job->usedBufferSize;
}
return job->usedBufferSize;
}
/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with up to jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}
/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
FILE* const file = AIO_ReadPool_getFile(ctx);
AIO_ReadPool_setFile(ctx, NULL);
return fclose(file);
}
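Also purely illustrative (not in this diff): a minimal consumer loop over the read-pool API, under the assumption that srcFile is an open FILE*, prefs has been configured by the caller, and the 128 KB read size is arbitrary. The helper name example_readAll is hypothetical.

#include <stdio.h>
#include "fileio_asyncio.h"    /* ReadPoolCtx_t, AIO_ReadPool_* (this patch) */

/* Hypothetical helper: reads srcFile to the end, processing one loaded window at a time. */
static void example_readAll(const FIO_prefs_t* prefs, FILE* srcFile)
{
    size_t const readSize = (size_t)128 * 1024;    /* arbitrary; doubles as the job buffer size */
    ReadPoolCtx_t* const readCtx = AIO_ReadPool_create(prefs, readSize);

    AIO_ReadPool_setFile(readCtx, srcFile);        /* starts queueing async reads immediately */
    for (;;) {
        AIO_ReadPool_fillBuffer(readCtx, readSize);      /* waits for the next in-order chunk (or EOF) */
        if (readCtx->srcBufferLoaded == 0) break;        /* EOF and nothing left to consume */
        /* ... use readCtx->srcBuffer[0 .. srcBufferLoaded) here ... */
        AIO_ReadPool_consumeBytes(readCtx, readCtx->srcBufferLoaded);
    }
    AIO_ReadPool_closeFile(readCtx);               /* joins in-flight reads and closes srcFile */
    AIO_ReadPool_free(readCtx);
}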

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) Yann Collet, Facebook, Inc. * Copyright (c) Facebook, Inc.
* All rights reserved. * All rights reserved.
* *
* This source code is licensed under both the BSD-style license (found in the * This source code is licensed under both the BSD-style license (found in the
@ -28,7 +28,7 @@ typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */ /* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool; POOL_ctx* threadPool;
int totalIoJobs; int totalIoJobs;
FIO_prefs_t* prefs; const FIO_prefs_t* prefs;
POOL_function poolFunction; POOL_function poolFunction;
/* Controls the file we currently write to, make changes only by using provided utility functions */ /* Controls the file we currently write to, make changes only by using provided utility functions */
@ -39,8 +39,36 @@ typedef struct {
ZSTD_pthread_mutex_t ioJobsMutex; ZSTD_pthread_mutex_t ioJobsMutex;
void* availableJobs[MAX_IO_JOBS]; void* availableJobs[MAX_IO_JOBS];
int availableJobsCount; int availableJobsCount;
size_t jobBufferSize;
} IOPoolCtx_t; } IOPoolCtx_t;
typedef struct {
IOPoolCtx_t base;
/* State regarding the currently read file */
int reachedEof;
U64 nextReadOffset;
U64 waitingOnOffset;
/* We may hold an IOJob object as needed if we actively expose its buffer. */
void *currentJobHeld;
    /* The coalesce buffer is used to join two buffers in cases where we need to read more bytes than are left in
     * the first of them. Shouldn't be accessed from outside of utility functions. */
U8 *coalesceBuffer;
    /* The read buffer can be used by consumer code; take care when copying this pointer aside, as it might
     * change when consuming / refilling the buffer. */
U8 *srcBuffer;
size_t srcBufferLoaded;
    /* We need to know which tasks have completed so we can use their buffers when their time comes.
     * Should only be accessed after locking base.ioJobsMutex. */
void* completedJobs[MAX_IO_JOBS];
int completedJobsCount;
ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;
typedef struct { typedef struct {
IOPoolCtx_t base; IOPoolCtx_t base;
unsigned storedSkips; unsigned storedSkips;
@ -59,15 +87,10 @@ typedef struct {
U64 offset; U64 offset;
} IOJob_t; } IOJob_t;
/** AIO_fwriteSparse() : /* AIO_supported:
* @return : storedSkips, * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ int AIO_supported(void);
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips);
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
/* AIO_WritePool_releaseIoJob: /* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */ * Releases an acquired job back to the pool. Doesn't execute the job. */
@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
/* AIO_WritePool_getFile: /* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */ * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx); FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
/* AIO_WritePool_closeFile: /* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL. * Ends sparse write and closes the writePool's current file and sets the file to NULL.
@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create: /* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. * Allocates and sets and a new write pool including its included jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */ * bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize); WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_WritePool_free: /* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */ * Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx); void AIO_WritePool_free(WritePoolCtx_t* ctx);
/* AIO_ReadPool_create:
 * Allocates and sets up a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_fillBuffer:
 * Makes sure the buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
 * Returns when srcBuffer has at least n bytes loaded or when we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with up to jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
#if defined (__cplusplus) #if defined (__cplusplus)
} }
#endif #endif

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) Yann Collet, Facebook, Inc. * Copyright (c) Facebook, Inc.
* All rights reserved. * All rights reserved.
* *
* This source code is licensed under both the BSD-style license (found in the * This source code is licensed under both the BSD-style license (found in the

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) Yann Collet, Facebook, Inc. * Copyright (c) Facebook, Inc.
* All rights reserved. * All rights reserved.
* *
* This source code is licensed under both the BSD-style license (found in the * This source code is licensed under both the BSD-style license (found in the

View File

@ -46,6 +46,7 @@
# include "zstdcli_trace.h" # include "zstdcli_trace.h"
#endif #endif
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */ #include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
#include "fileio_asyncio.h"
/*-************************************ /*-************************************
@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
#ifdef UTIL_HAS_MIRRORFILELIST #ifdef UTIL_HAS_MIRRORFILELIST
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n"); DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
#endif #endif
if (AIO_supported())
DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");
#ifndef ZSTD_NOCOMPRESS #ifndef ZSTD_NOCOMPRESS
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)"); DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
DISPLAYOUT( " -l : print information about zstd compressed files \n"); DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n"); DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
#ifdef ZSTD_MULTITHREAD
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
#endif
# if ZSTD_SPARSE_DEFAULT # if ZSTD_SPARSE_DEFAULT
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n"); DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
# else # else
@ -1459,6 +1458,7 @@ int main(int argCount, const char* argv[])
FIO_setTargetCBlockSize(prefs, targetCBlockSize); FIO_setTargetCBlockSize(prefs, targetCBlockSize);
FIO_setSrcSizeHint(prefs, srcSizeHint); FIO_setSrcSizeHint(prefs, srcSizeHint);
FIO_setLiteralCompressionMode(prefs, literalCompressionMode); FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
FIO_setSparseWrite(prefs, 0);
if (adaptMin > cLevel) cLevel = adaptMin; if (adaptMin > cLevel) cLevel = adaptMin;
if (adaptMax < cLevel) cLevel = adaptMax; if (adaptMax < cLevel) cLevel = adaptMax;

View File

@ -260,10 +260,13 @@ zstd -dc - < tmp.zst > $INTOVOID
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
zstd -d - < tmp.zst > $INTOVOID zstd -d - < tmp.zst > $INTOVOID
println "test : impose memory limitation (must fail)" println "test : impose memory limitation (must fail)"
zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed" datagen -g500K > tmplimit
zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command zstd -f tmplimit
zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
rm -f tmplimit tmplimit.zst
println "test : overwrite protection" println "test : overwrite protection"
zstd -q tmp && die "overwrite check failed!" zstd -q tmp && die "overwrite check failed!"
println "test : force overwrite" println "test : force overwrite"
@ -1596,11 +1599,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
exit 1 exit 1
fi fi
println "\n===> zstd asyncio decompression tests " println "\n===> zstd asyncio tests "
addFrame() { addFrame() {
datagen -g2M -s$2 >> tmp_uncompressed datagen -g2M -s$2 >> tmp_uncompressed
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
} }
addTwoFrames() { addTwoFrames() {