mirror of
https://github.com/facebook/zstd.git
synced 2025-07-04 07:42:31 +03:00
AsyncIO compression part 2 - added async read and asyncio to compression code (#3022)
* Compression asyncio: - Added asyncio functionality for compression flow - Added ReadPool for async reads, implemented in both comp and decomp flows
This commit is contained in:
@ -164,6 +164,7 @@ Advanced arguments :
|
|||||||
--filelist FILE : read list of files to operate upon from FILE
|
--filelist FILE : read list of files to operate upon from FILE
|
||||||
--output-dir-flat DIR : processed files are stored into DIR
|
--output-dir-flat DIR : processed files are stored into DIR
|
||||||
--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
|
--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
|
||||||
|
--[no-]asyncio : use asynchronous IO (default: enabled)
|
||||||
--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
|
--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
|
||||||
-- : All arguments after "--" are treated as files
|
-- : All arguments after "--" are treated as files
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ FIO_prefs_t* FIO_createPreferences(void)
|
|||||||
ret->literalCompressionMode = ZSTD_ps_auto;
|
ret->literalCompressionMode = ZSTD_ps_auto;
|
||||||
ret->excludeCompressedFiles = 0;
|
ret->excludeCompressedFiles = 0;
|
||||||
ret->allowBlockDevices = 0;
|
ret->allowBlockDevices = 0;
|
||||||
ret->asyncIO = 0;
|
ret->asyncIO = AIO_supported();
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -848,16 +848,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t*
|
|||||||
* Compression
|
* Compression
|
||||||
************************************************************************/
|
************************************************************************/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
FILE* srcFile;
|
|
||||||
FILE* dstFile;
|
|
||||||
void* srcBuffer;
|
|
||||||
size_t srcBufferSize;
|
|
||||||
void* dstBuffer;
|
|
||||||
size_t dstBufferSize;
|
|
||||||
void* dictBuffer;
|
void* dictBuffer;
|
||||||
size_t dictBufferSize;
|
size_t dictBufferSize;
|
||||||
const char* dictFileName;
|
const char* dictFileName;
|
||||||
ZSTD_CStream* cctx;
|
ZSTD_CStream* cctx;
|
||||||
|
WritePoolCtx_t *writeCtx;
|
||||||
|
ReadPoolCtx_t *readCtx;
|
||||||
} cRess_t;
|
} cRess_t;
|
||||||
|
|
||||||
/** ZSTD_cycleLog() :
|
/** ZSTD_cycleLog() :
|
||||||
@ -906,9 +902,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
|
|||||||
if (ress.cctx == NULL)
|
if (ress.cctx == NULL)
|
||||||
EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
|
EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
ress.srcBufferSize = ZSTD_CStreamInSize();
|
|
||||||
ress.srcBuffer = malloc(ress.srcBufferSize);
|
|
||||||
ress.dstBufferSize = ZSTD_CStreamOutSize();
|
|
||||||
|
|
||||||
/* need to update memLimit before calling createDictBuffer
|
/* need to update memLimit before calling createDictBuffer
|
||||||
* because of memLimit check inside it */
|
* because of memLimit check inside it */
|
||||||
@ -916,10 +909,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
|
|||||||
unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
|
unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
|
||||||
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
|
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
|
||||||
}
|
}
|
||||||
ress.dstBuffer = malloc(ress.dstBufferSize);
|
|
||||||
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */
|
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */
|
||||||
if (!ress.srcBuffer || !ress.dstBuffer)
|
|
||||||
EXM_THROW(31, "allocation error : not enough memory");
|
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
|
||||||
|
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
|
||||||
|
|
||||||
/* Advanced parameters, including dictionary */
|
/* Advanced parameters, including dictionary */
|
||||||
if (dictFileName && (ress.dictBuffer==NULL))
|
if (dictFileName && (ress.dictBuffer==NULL))
|
||||||
@ -982,9 +975,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
|
|||||||
|
|
||||||
static void FIO_freeCResources(const cRess_t* const ress)
|
static void FIO_freeCResources(const cRess_t* const ress)
|
||||||
{
|
{
|
||||||
free(ress->srcBuffer);
|
|
||||||
free(ress->dstBuffer);
|
|
||||||
free(ress->dictBuffer);
|
free(ress->dictBuffer);
|
||||||
|
AIO_WritePool_free(ress->writeCtx);
|
||||||
|
AIO_ReadPool_free(ress->readCtx);
|
||||||
ZSTD_freeCStream(ress->cctx); /* never fails */
|
ZSTD_freeCStream(ress->cctx); /* never fails */
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -997,6 +990,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
|||||||
{
|
{
|
||||||
unsigned long long inFileSize = 0, outFileSize = 0;
|
unsigned long long inFileSize = 0, outFileSize = 0;
|
||||||
z_stream strm;
|
z_stream strm;
|
||||||
|
IOJob_t *writeJob = NULL;
|
||||||
|
|
||||||
if (compressionLevel > Z_BEST_COMPRESSION)
|
if (compressionLevel > Z_BEST_COMPRESSION)
|
||||||
compressionLevel = Z_BEST_COMPRESSION;
|
compressionLevel = Z_BEST_COMPRESSION;
|
||||||
@ -1012,51 +1006,58 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
|||||||
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
|
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
|
||||||
} }
|
} }
|
||||||
|
|
||||||
|
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
strm.next_in = 0;
|
strm.next_in = 0;
|
||||||
strm.avail_in = 0;
|
strm.avail_in = 0;
|
||||||
strm.next_out = (Bytef*)ress->dstBuffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = (uInt)ress->dstBufferSize;
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int ret;
|
int ret;
|
||||||
if (strm.avail_in == 0) {
|
if (strm.avail_in == 0) {
|
||||||
size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
|
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
|
||||||
if (inSize == 0) break;
|
if (ress->readCtx->srcBufferLoaded == 0) break;
|
||||||
inFileSize += inSize;
|
inFileSize += ress->readCtx->srcBufferLoaded;
|
||||||
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
|
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
|
||||||
strm.avail_in = (uInt)inSize;
|
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
|
||||||
}
|
}
|
||||||
ret = deflate(&strm, Z_NO_FLUSH);
|
|
||||||
|
{
|
||||||
|
size_t const availBefore = strm.avail_in;
|
||||||
|
ret = deflate(&strm, Z_NO_FLUSH);
|
||||||
|
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
|
||||||
|
}
|
||||||
|
|
||||||
if (ret != Z_OK)
|
if (ret != Z_OK)
|
||||||
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
|
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
|
||||||
{ size_t const cSize = ress->dstBufferSize - strm.avail_out;
|
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
|
||||||
if (cSize) {
|
if (cSize) {
|
||||||
if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
|
writeJob->usedBufferSize = cSize;
|
||||||
EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno));
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
outFileSize += cSize;
|
outFileSize += cSize;
|
||||||
strm.next_out = (Bytef*)ress->dstBuffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = (uInt)ress->dstBufferSize;
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
} }
|
} }
|
||||||
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
|
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
|
||||||
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
|
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
|
||||||
(unsigned)(inFileSize>>20),
|
(unsigned)(inFileSize>>20),
|
||||||
(double)outFileSize/inFileSize*100)
|
(double)outFileSize/inFileSize*100)
|
||||||
} else {
|
} else {
|
||||||
DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
|
DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
|
||||||
(unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
|
(unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
|
||||||
(double)outFileSize/inFileSize*100);
|
(double)outFileSize/inFileSize*100);
|
||||||
} }
|
} }
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int const ret = deflate(&strm, Z_FINISH);
|
int const ret = deflate(&strm, Z_FINISH);
|
||||||
{ size_t const cSize = ress->dstBufferSize - strm.avail_out;
|
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
|
||||||
if (cSize) {
|
if (cSize) {
|
||||||
if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
|
writeJob->usedBufferSize = cSize;
|
||||||
EXM_THROW(75, "Write error : %s ", strerror(errno));
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
outFileSize += cSize;
|
outFileSize += cSize;
|
||||||
strm.next_out = (Bytef*)ress->dstBuffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = (uInt)ress->dstBufferSize;
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
} }
|
} }
|
||||||
if (ret == Z_STREAM_END) break;
|
if (ret == Z_STREAM_END) break;
|
||||||
if (ret != Z_BUF_ERROR)
|
if (ret != Z_BUF_ERROR)
|
||||||
EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
|
EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
|
||||||
@ -1067,6 +1068,8 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
|||||||
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
|
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
|
||||||
} }
|
} }
|
||||||
*readsize = inFileSize;
|
*readsize = inFileSize;
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
return outFileSize;
|
return outFileSize;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -1082,6 +1085,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
|||||||
lzma_stream strm = LZMA_STREAM_INIT;
|
lzma_stream strm = LZMA_STREAM_INIT;
|
||||||
lzma_action action = LZMA_RUN;
|
lzma_action action = LZMA_RUN;
|
||||||
lzma_ret ret;
|
lzma_ret ret;
|
||||||
|
IOJob_t *writeJob = NULL;
|
||||||
|
|
||||||
if (compressionLevel < 0) compressionLevel = 0;
|
if (compressionLevel < 0) compressionLevel = 0;
|
||||||
if (compressionLevel > 9) compressionLevel = 9;
|
if (compressionLevel > 9) compressionLevel = 9;
|
||||||
@ -1099,31 +1103,37 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
|||||||
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
|
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
strm.next_in = 0;
|
strm.next_in = 0;
|
||||||
strm.avail_in = 0;
|
strm.avail_in = 0;
|
||||||
strm.next_out = (BYTE*)ress->dstBuffer;
|
|
||||||
strm.avail_out = ress->dstBufferSize;
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (strm.avail_in == 0) {
|
if (strm.avail_in == 0) {
|
||||||
size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
|
size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
|
||||||
if (inSize == 0) action = LZMA_FINISH;
|
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
|
||||||
inFileSize += inSize;
|
inFileSize += inSize;
|
||||||
strm.next_in = (BYTE const*)ress->srcBuffer;
|
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
|
||||||
strm.avail_in = inSize;
|
strm.avail_in = ress->readCtx->srcBufferLoaded;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
size_t const availBefore = strm.avail_in;
|
||||||
|
ret = lzma_code(&strm, action);
|
||||||
|
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = lzma_code(&strm, action);
|
|
||||||
|
|
||||||
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
|
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
|
||||||
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
|
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
|
||||||
{ size_t const compBytes = ress->dstBufferSize - strm.avail_out;
|
{ size_t const compBytes = writeJob->bufferSize - strm.avail_out;
|
||||||
if (compBytes) {
|
if (compBytes) {
|
||||||
if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes)
|
writeJob->usedBufferSize = compBytes;
|
||||||
EXM_THROW(85, "Write error : %s", strerror(errno));
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
outFileSize += compBytes;
|
outFileSize += compBytes;
|
||||||
strm.next_out = (BYTE*)ress->dstBuffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = ress->dstBufferSize;
|
strm.avail_out = writeJob->bufferSize;
|
||||||
} }
|
} }
|
||||||
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
|
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
|
||||||
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
|
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
|
||||||
@ -1139,6 +1149,9 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
|||||||
lzma_end(&strm);
|
lzma_end(&strm);
|
||||||
*readsize = inFileSize;
|
*readsize = inFileSize;
|
||||||
|
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
|
|
||||||
return outFileSize;
|
return outFileSize;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -1164,15 +1177,18 @@ FIO_compressLz4Frame(cRess_t* ress,
|
|||||||
LZ4F_preferences_t prefs;
|
LZ4F_preferences_t prefs;
|
||||||
LZ4F_compressionContext_t ctx;
|
LZ4F_compressionContext_t ctx;
|
||||||
|
|
||||||
|
IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
|
|
||||||
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||||
if (LZ4F_isError(errorCode))
|
if (LZ4F_isError(errorCode))
|
||||||
EXM_THROW(31, "zstd: failed to create lz4 compression context");
|
EXM_THROW(31, "zstd: failed to create lz4 compression context");
|
||||||
|
|
||||||
memset(&prefs, 0, sizeof(prefs));
|
memset(&prefs, 0, sizeof(prefs));
|
||||||
|
|
||||||
assert(blockSize <= ress->srcBufferSize);
|
assert(blockSize <= ress->readCtx->base.jobBufferSize);
|
||||||
|
|
||||||
prefs.autoFlush = 1;
|
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
|
||||||
|
prefs.autoFlush = 0;
|
||||||
prefs.compressionLevel = compressionLevel;
|
prefs.compressionLevel = compressionLevel;
|
||||||
prefs.frameInfo.blockMode = LZ4F_blockLinked;
|
prefs.frameInfo.blockMode = LZ4F_blockLinked;
|
||||||
prefs.frameInfo.blockSizeID = LZ4F_max64KB;
|
prefs.frameInfo.blockSizeID = LZ4F_max64KB;
|
||||||
@ -1180,27 +1196,25 @@ FIO_compressLz4Frame(cRess_t* ress,
|
|||||||
#if LZ4_VERSION_NUMBER >= 10600
|
#if LZ4_VERSION_NUMBER >= 10600
|
||||||
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
|
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
|
||||||
#endif
|
#endif
|
||||||
assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
|
assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
|
||||||
|
|
||||||
{
|
{
|
||||||
size_t readSize;
|
size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
|
||||||
size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs);
|
|
||||||
if (LZ4F_isError(headerSize))
|
if (LZ4F_isError(headerSize))
|
||||||
EXM_THROW(33, "File header generation failed : %s",
|
EXM_THROW(33, "File header generation failed : %s",
|
||||||
LZ4F_getErrorName(headerSize));
|
LZ4F_getErrorName(headerSize));
|
||||||
if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize)
|
writeJob->usedBufferSize = headerSize;
|
||||||
EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno));
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
outFileSize += headerSize;
|
outFileSize += headerSize;
|
||||||
|
|
||||||
/* Read first block */
|
/* Read first block */
|
||||||
readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
|
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||||
inFileSize += readSize;
|
|
||||||
|
|
||||||
/* Main Loop */
|
/* Main Loop */
|
||||||
while (readSize>0) {
|
while (ress->readCtx->srcBufferLoaded) {
|
||||||
size_t const outSize = LZ4F_compressUpdate(ctx,
|
size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
|
||||||
ress->dstBuffer, ress->dstBufferSize,
|
size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
|
||||||
ress->srcBuffer, readSize, NULL);
|
ress->readCtx->srcBuffer, inSize, NULL);
|
||||||
if (LZ4F_isError(outSize))
|
if (LZ4F_isError(outSize))
|
||||||
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
|
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
|
||||||
srcFileName, LZ4F_getErrorName(outSize));
|
srcFileName, LZ4F_getErrorName(outSize));
|
||||||
@ -1216,33 +1230,29 @@ FIO_compressLz4Frame(cRess_t* ress,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Write Block */
|
/* Write Block */
|
||||||
{ size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
|
writeJob->usedBufferSize = outSize;
|
||||||
if (sizeCheck != outSize)
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
EXM_THROW(36, "Write error : %s", strerror(errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Read next block */
|
/* Read next block */
|
||||||
readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
|
AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
|
||||||
inFileSize += readSize;
|
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||||
}
|
}
|
||||||
if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);
|
|
||||||
|
|
||||||
/* End of Stream mark */
|
/* End of Stream mark */
|
||||||
headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
|
headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
|
||||||
if (LZ4F_isError(headerSize))
|
if (LZ4F_isError(headerSize))
|
||||||
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
|
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
|
||||||
srcFileName, LZ4F_getErrorName(headerSize));
|
srcFileName, LZ4F_getErrorName(headerSize));
|
||||||
|
|
||||||
{ size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
|
writeJob->usedBufferSize = headerSize;
|
||||||
if (sizeCheck != headerSize)
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
EXM_THROW(39, "Write error : %s (cannot write end of stream)",
|
|
||||||
strerror(errno));
|
|
||||||
}
|
|
||||||
outFileSize += headerSize;
|
outFileSize += headerSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
*readsize = inFileSize;
|
*readsize = inFileSize;
|
||||||
LZ4F_freeCompressionContext(ctx);
|
LZ4F_freeCompressionContext(ctx);
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
|
|
||||||
return outFileSize;
|
return outFileSize;
|
||||||
}
|
}
|
||||||
@ -1257,8 +1267,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
|||||||
int compressionLevel, U64* readsize)
|
int compressionLevel, U64* readsize)
|
||||||
{
|
{
|
||||||
cRess_t const ress = *ressPtr;
|
cRess_t const ress = *ressPtr;
|
||||||
FILE* const srcFile = ress.srcFile;
|
IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
|
||||||
FILE* const dstFile = ress.dstFile;
|
|
||||||
U64 compressedfilesize = 0;
|
U64 compressedfilesize = 0;
|
||||||
ZSTD_EndDirective directive = ZSTD_e_continue;
|
ZSTD_EndDirective directive = ZSTD_e_continue;
|
||||||
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
|
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
|
||||||
@ -1303,12 +1313,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
|||||||
do {
|
do {
|
||||||
size_t stillToFlush;
|
size_t stillToFlush;
|
||||||
/* Fill input Buffer */
|
/* Fill input Buffer */
|
||||||
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
|
size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
|
||||||
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
|
ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 };
|
||||||
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
|
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
|
||||||
*readsize += inSize;
|
*readsize += inSize;
|
||||||
|
|
||||||
if ((inSize == 0) || (*readsize == fileSize))
|
if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
|
||||||
directive = ZSTD_e_end;
|
directive = ZSTD_e_end;
|
||||||
|
|
||||||
stillToFlush = 1;
|
stillToFlush = 1;
|
||||||
@ -1316,9 +1326,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
|||||||
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
|
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
|
||||||
|
|
||||||
size_t const oldIPos = inBuff.pos;
|
size_t const oldIPos = inBuff.pos;
|
||||||
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
|
||||||
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
|
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
|
||||||
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
|
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
|
||||||
|
AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
|
||||||
|
|
||||||
/* count stats */
|
/* count stats */
|
||||||
inputPresented++;
|
inputPresented++;
|
||||||
@ -1327,12 +1338,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
|||||||
|
|
||||||
/* Write compressed stream */
|
/* Write compressed stream */
|
||||||
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
|
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
|
||||||
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
|
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
|
||||||
if (outBuff.pos) {
|
if (outBuff.pos) {
|
||||||
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
|
writeJob->usedBufferSize = outBuff.pos;
|
||||||
if (sizeCheck != outBuff.pos)
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
EXM_THROW(25, "Write error : %s (cannot write compressed block)",
|
|
||||||
strerror(errno));
|
|
||||||
compressedfilesize += outBuff.pos;
|
compressedfilesize += outBuff.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1464,14 +1473,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
|||||||
} /* while ((inBuff.pos != inBuff.size) */
|
} /* while ((inBuff.pos != inBuff.size) */
|
||||||
} while (directive != ZSTD_e_end);
|
} while (directive != ZSTD_e_end);
|
||||||
|
|
||||||
if (ferror(srcFile)) {
|
|
||||||
EXM_THROW(26, "Read error : I/O error");
|
|
||||||
}
|
|
||||||
if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
|
if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
|
||||||
EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
|
EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
|
||||||
(unsigned long long)*readsize, (unsigned long long)fileSize);
|
(unsigned long long)*readsize, (unsigned long long)fileSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
|
AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
|
||||||
|
|
||||||
return compressedfilesize;
|
return compressedfilesize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1572,7 +1581,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
|||||||
|
|
||||||
|
|
||||||
/*! FIO_compressFilename_dstFile() :
|
/*! FIO_compressFilename_dstFile() :
|
||||||
* open dstFileName, or pass-through if ress.dstFile != NULL,
|
* open dstFileName, or pass-through if ress.file != NULL,
|
||||||
* then start compression with FIO_compressFilename_internal().
|
* then start compression with FIO_compressFilename_internal().
|
||||||
* Manages source removal (--rm) and file permissions transfer.
|
* Manages source removal (--rm) and file permissions transfer.
|
||||||
* note : ress.srcFile must be != NULL,
|
* note : ress.srcFile must be != NULL,
|
||||||
@ -1591,8 +1600,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
|||||||
int result;
|
int result;
|
||||||
stat_t statbuf;
|
stat_t statbuf;
|
||||||
int transferMTime = 0;
|
int transferMTime = 0;
|
||||||
assert(ress.srcFile != NULL);
|
FILE *dstFile;
|
||||||
if (ress.dstFile == NULL) {
|
assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
|
||||||
|
if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
|
||||||
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
|
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
|
||||||
if ( strcmp (srcFileName, stdinmark)
|
if ( strcmp (srcFileName, stdinmark)
|
||||||
&& strcmp (dstFileName, stdoutmark)
|
&& strcmp (dstFileName, stdoutmark)
|
||||||
@ -1604,8 +1614,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
|||||||
|
|
||||||
closeDstFile = 1;
|
closeDstFile = 1;
|
||||||
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
|
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
|
||||||
ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
|
dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
|
||||||
if (ress.dstFile==NULL) return 1; /* could not open dstFileName */
|
if (dstFile==NULL) return 1; /* could not open dstFileName */
|
||||||
|
AIO_WritePool_setFile(ress.writeCtx, dstFile);
|
||||||
/* Must only be added after FIO_openDstFile() succeeds.
|
/* Must only be added after FIO_openDstFile() succeeds.
|
||||||
* Otherwise we may delete the destination file if it already exists,
|
* Otherwise we may delete the destination file if it already exists,
|
||||||
* and the user presses Ctrl-C when asked if they wish to overwrite.
|
* and the user presses Ctrl-C when asked if they wish to overwrite.
|
||||||
@ -1616,13 +1627,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
|||||||
result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
||||||
|
|
||||||
if (closeDstFile) {
|
if (closeDstFile) {
|
||||||
FILE* const dstFile = ress.dstFile;
|
|
||||||
ress.dstFile = NULL;
|
|
||||||
|
|
||||||
clearHandler();
|
clearHandler();
|
||||||
|
|
||||||
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
|
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
|
||||||
if (fclose(dstFile)) { /* error closing dstFile */
|
if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
|
||||||
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
|
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
|
||||||
result=1;
|
result=1;
|
||||||
}
|
}
|
||||||
@ -1668,6 +1676,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
|
|||||||
int compressionLevel)
|
int compressionLevel)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
FILE* srcFile;
|
||||||
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
|
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
|
||||||
|
|
||||||
/* ensure src is not a directory */
|
/* ensure src is not a directory */
|
||||||
@ -1691,13 +1700,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ress.srcFile = FIO_openSrcFile(prefs, srcFileName);
|
srcFile = FIO_openSrcFile(prefs, srcFileName);
|
||||||
if (ress.srcFile == NULL) return 1; /* srcFile could not be opened */
|
if (srcFile == NULL) return 1; /* srcFile could not be opened */
|
||||||
|
|
||||||
|
AIO_ReadPool_setFile(ress.readCtx, srcFile);
|
||||||
result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
||||||
|
AIO_ReadPool_closeFile(ress.readCtx);
|
||||||
|
|
||||||
fclose(ress.srcFile);
|
|
||||||
ress.srcFile = NULL;
|
|
||||||
if ( prefs->removeSrcFile /* --rm */
|
if ( prefs->removeSrcFile /* --rm */
|
||||||
&& result == 0 /* success */
|
&& result == 0 /* success */
|
||||||
&& strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */
|
&& strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */
|
||||||
@ -1844,23 +1853,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
|
|||||||
/* init */
|
/* init */
|
||||||
assert(outFileName != NULL || suffix != NULL);
|
assert(outFileName != NULL || suffix != NULL);
|
||||||
if (outFileName != NULL) { /* output into a single destination (stdout typically) */
|
if (outFileName != NULL) { /* output into a single destination (stdout typically) */
|
||||||
|
FILE *dstFile;
|
||||||
if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
|
if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
|
||||||
FIO_freeCResources(&ress);
|
FIO_freeCResources(&ress);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
|
dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
|
||||||
if (ress.dstFile == NULL) { /* could not open outFileName */
|
if (dstFile == NULL) { /* could not open outFileName */
|
||||||
error = 1;
|
error = 1;
|
||||||
} else {
|
} else {
|
||||||
|
AIO_WritePool_setFile(ress.writeCtx, dstFile);
|
||||||
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
|
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
|
||||||
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
|
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
|
||||||
if (!status) fCtx->nbFilesProcessed++;
|
if (!status) fCtx->nbFilesProcessed++;
|
||||||
error |= status;
|
error |= status;
|
||||||
}
|
}
|
||||||
if (fclose(ress.dstFile))
|
if (AIO_WritePool_closeFile(ress.writeCtx))
|
||||||
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
|
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
|
||||||
strerror(errno), outFileName);
|
strerror(errno), outFileName);
|
||||||
ress.dstFile = NULL;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (outMirroredRootDirName)
|
if (outMirroredRootDirName)
|
||||||
@ -1916,13 +1926,10 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
|
|||||||
/* **************************************************************************
|
/* **************************************************************************
|
||||||
* Decompression
|
* Decompression
|
||||||
***************************************************************************/
|
***************************************************************************/
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* srcBuffer;
|
|
||||||
size_t srcBufferSize;
|
|
||||||
size_t srcBufferLoaded;
|
|
||||||
ZSTD_DStream* dctx;
|
ZSTD_DStream* dctx;
|
||||||
WritePoolCtx_t *writeCtx;
|
WritePoolCtx_t *writeCtx;
|
||||||
|
ReadPoolCtx_t *readCtx;
|
||||||
} dRess_t;
|
} dRess_t;
|
||||||
|
|
||||||
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
|
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
|
||||||
@ -1940,11 +1947,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
|
|||||||
CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
|
CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
|
||||||
CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
|
CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
|
||||||
|
|
||||||
ress.srcBufferSize = ZSTD_DStreamInSize();
|
|
||||||
ress.srcBuffer = malloc(ress.srcBufferSize);
|
|
||||||
if (!ress.srcBuffer)
|
|
||||||
EXM_THROW(61, "Allocation error : not enough memory");
|
|
||||||
|
|
||||||
/* dictionary */
|
/* dictionary */
|
||||||
{ void* dictBuffer;
|
{ void* dictBuffer;
|
||||||
size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
|
size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
|
||||||
@ -1953,6 +1955,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
|
|||||||
}
|
}
|
||||||
|
|
||||||
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
|
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
|
||||||
|
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
|
||||||
|
|
||||||
return ress;
|
return ress;
|
||||||
}
|
}
|
||||||
@ -1960,47 +1963,31 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
|
|||||||
static void FIO_freeDResources(dRess_t ress)
|
static void FIO_freeDResources(dRess_t ress)
|
||||||
{
|
{
|
||||||
CHECK( ZSTD_freeDStream(ress.dctx) );
|
CHECK( ZSTD_freeDStream(ress.dctx) );
|
||||||
free(ress.srcBuffer);
|
|
||||||
AIO_WritePool_free(ress.writeCtx);
|
AIO_WritePool_free(ress.writeCtx);
|
||||||
}
|
AIO_ReadPool_free(ress.readCtx);
|
||||||
|
|
||||||
/* FIO_consumeDSrcBuffer:
|
|
||||||
* Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
|
|
||||||
static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
|
|
||||||
assert(ress->srcBufferLoaded >= len);
|
|
||||||
ress->srcBufferLoaded -= len;
|
|
||||||
memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
|
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
|
||||||
@return : 0 (no error) */
|
@return : 0 (no error) */
|
||||||
static int FIO_passThrough(const FIO_prefs_t* const prefs,
|
static int FIO_passThrough(dRess_t *ress)
|
||||||
FILE* foutput, FILE* finput,
|
|
||||||
void* buffer, size_t bufferSize,
|
|
||||||
size_t alreadyLoaded)
|
|
||||||
{
|
{
|
||||||
size_t const blockSize = MIN(64 KB, bufferSize);
|
size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize());
|
||||||
size_t readFromInput;
|
IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
unsigned storedSkips = 0;
|
AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||||
|
|
||||||
/* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */
|
while(ress->readCtx->srcBufferLoaded) {
|
||||||
{ size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput);
|
size_t writeSize;
|
||||||
if (sizeCheck != alreadyLoaded) {
|
writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
|
||||||
DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno));
|
assert(writeSize <= writeJob->bufferSize);
|
||||||
return 1;
|
memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize);
|
||||||
} }
|
writeJob->usedBufferSize = writeSize;
|
||||||
|
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||||
do {
|
AIO_ReadPool_consumeBytes(ress->readCtx, writeSize);
|
||||||
readFromInput = fread(buffer, 1, blockSize, finput);
|
AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||||
storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
|
|
||||||
} while (readFromInput == blockSize);
|
|
||||||
if (ferror(finput)) {
|
|
||||||
DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
|
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
assert(feof(finput));
|
assert(ress->readCtx->reachedEof);
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2018,7 +2005,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
/* Try to decode the frame header */
|
/* Try to decode the frame header */
|
||||||
err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
|
err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded);
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
unsigned long long const windowSize = header.windowSize;
|
unsigned long long const windowSize = header.windowSize;
|
||||||
unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
|
unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
|
||||||
@ -2041,7 +2028,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
|
|||||||
*/
|
*/
|
||||||
#define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2))
|
#define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2))
|
||||||
static unsigned long long
|
static unsigned long long
|
||||||
FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
|
FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress,
|
||||||
const FIO_prefs_t* const prefs,
|
const FIO_prefs_t* const prefs,
|
||||||
const char* srcFileName,
|
const char* srcFileName,
|
||||||
U64 alreadyDecoded) /* for multi-frames streams */
|
U64 alreadyDecoded) /* for multi-frames streams */
|
||||||
@ -2057,16 +2044,11 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
|
|||||||
ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
|
ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
|
||||||
|
|
||||||
/* Header loading : ensures ZSTD_getFrameHeader() will succeed */
|
/* Header loading : ensures ZSTD_getFrameHeader() will succeed */
|
||||||
{ size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX;
|
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX);
|
||||||
if (ress->srcBufferLoaded < toDecode) {
|
|
||||||
size_t const toRead = toDecode - ress->srcBufferLoaded;
|
|
||||||
void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
|
|
||||||
ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput);
|
|
||||||
} }
|
|
||||||
|
|
||||||
/* Main decompression Loop */
|
/* Main decompression Loop */
|
||||||
while (1) {
|
while (1) {
|
||||||
ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
|
ZSTD_inBuffer inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 };
|
||||||
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
|
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
|
||||||
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
|
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
|
||||||
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
|
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
|
||||||
@ -2088,7 +2070,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
|
|||||||
if (srcFileNameSize > 18) {
|
if (srcFileNameSize > 18) {
|
||||||
const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
|
const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
|
||||||
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ",
|
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ",
|
||||||
fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
|
fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
|
||||||
} else {
|
} else {
|
||||||
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ",
|
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ",
|
||||||
fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
|
fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
|
||||||
@ -2098,23 +2080,21 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
|
|||||||
srcFileName, hrs.precision, hrs.value, hrs.suffix);
|
srcFileName, hrs.precision, hrs.value, hrs.suffix);
|
||||||
}
|
}
|
||||||
|
|
||||||
FIO_consumeDSrcBuffer(ress, inBuff.pos);
|
AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos);
|
||||||
|
|
||||||
if (readSizeHint == 0) break; /* end of frame */
|
if (readSizeHint == 0) break; /* end of frame */
|
||||||
|
|
||||||
/* Fill input buffer */
|
/* Fill input buffer */
|
||||||
{ size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize); /* support large skippable frames */
|
{ size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize()); /* support large skippable frames */
|
||||||
if (ress->srcBufferLoaded < toDecode) {
|
if (ress->readCtx->srcBufferLoaded < toDecode) {
|
||||||
size_t const toRead = toDecode - ress->srcBufferLoaded; /* > 0 */
|
size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode);
|
||||||
void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
|
|
||||||
size_t const readSize = fread(startPosition, 1, toRead, finput);
|
|
||||||
if (readSize==0) {
|
if (readSize==0) {
|
||||||
DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
|
DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
|
||||||
srcFileName);
|
srcFileName);
|
||||||
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
return FIO_ERROR_FRAME_DECODING;
|
return FIO_ERROR_FRAME_DECODING;
|
||||||
}
|
}
|
||||||
ress->srcBufferLoaded += readSize;
|
} } }
|
||||||
} } }
|
|
||||||
|
|
||||||
AIO_WritePool_releaseIoJob(writeJob);
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
@ -2125,7 +2105,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
|
|||||||
|
|
||||||
#ifdef ZSTD_GZDECOMPRESS
|
#ifdef ZSTD_GZDECOMPRESS
|
||||||
static unsigned long long
|
static unsigned long long
|
||||||
FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
|
FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName)
|
||||||
{
|
{
|
||||||
unsigned long long outFileSize = 0;
|
unsigned long long outFileSize = 0;
|
||||||
z_stream strm;
|
z_stream strm;
|
||||||
@ -2145,16 +2125,16 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
|
|||||||
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
strm.next_out = (Bytef*)writeJob->buffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = (uInt)writeJob->bufferSize;
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
strm.avail_in = (uInt)ress->srcBufferLoaded;
|
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
|
||||||
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
|
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
|
||||||
|
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
int ret;
|
int ret;
|
||||||
if (strm.avail_in == 0) {
|
if (strm.avail_in == 0) {
|
||||||
ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
|
AIO_ReadPool_consumeAndRefill(ress->readCtx);
|
||||||
if (ress->srcBufferLoaded == 0) flush = Z_FINISH;
|
if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH;
|
||||||
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
|
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
|
||||||
strm.avail_in = (uInt)ress->srcBufferLoaded;
|
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
|
||||||
}
|
}
|
||||||
ret = inflate(&strm, flush);
|
ret = inflate(&strm, flush);
|
||||||
if (ret == Z_BUF_ERROR) {
|
if (ret == Z_BUF_ERROR) {
|
||||||
@ -2177,7 +2157,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
|
|||||||
if (ret == Z_STREAM_END) break;
|
if (ret == Z_STREAM_END) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
|
AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
|
||||||
|
|
||||||
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
|
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
|
||||||
&& (decodingError==0) ) {
|
&& (decodingError==0) ) {
|
||||||
@ -2192,7 +2172,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
|
|||||||
|
|
||||||
#ifdef ZSTD_LZMADECOMPRESS
|
#ifdef ZSTD_LZMADECOMPRESS
|
||||||
static unsigned long long
|
static unsigned long long
|
||||||
FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
|
FIO_decompressLzmaFrame(dRess_t* ress,
|
||||||
const char* srcFileName, int plain_lzma)
|
const char* srcFileName, int plain_lzma)
|
||||||
{
|
{
|
||||||
unsigned long long outFileSize = 0;
|
unsigned long long outFileSize = 0;
|
||||||
@ -2220,16 +2200,16 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
|
|||||||
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||||
strm.next_out = (Bytef*)writeJob->buffer;
|
strm.next_out = (Bytef*)writeJob->buffer;
|
||||||
strm.avail_out = (uInt)writeJob->bufferSize;
|
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||||
strm.next_in = (BYTE const*)ress->srcBuffer;
|
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
|
||||||
strm.avail_in = ress->srcBufferLoaded;
|
strm.avail_in = ress->readCtx->srcBufferLoaded;
|
||||||
|
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
lzma_ret ret;
|
lzma_ret ret;
|
||||||
if (strm.avail_in == 0) {
|
if (strm.avail_in == 0) {
|
||||||
ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
|
AIO_ReadPool_consumeAndRefill(ress->readCtx);
|
||||||
if (ress->srcBufferLoaded == 0) action = LZMA_FINISH;
|
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
|
||||||
strm.next_in = (BYTE const*)ress->srcBuffer;
|
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
|
||||||
strm.avail_in = ress->srcBufferLoaded;
|
strm.avail_in = ress->readCtx->srcBufferLoaded;
|
||||||
}
|
}
|
||||||
ret = lzma_code(&strm, action);
|
ret = lzma_code(&strm, action);
|
||||||
|
|
||||||
@ -2253,7 +2233,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
|
|||||||
if (ret == LZMA_STREAM_END) break;
|
if (ret == LZMA_STREAM_END) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
|
AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
|
||||||
lzma_end(&strm);
|
lzma_end(&strm);
|
||||||
AIO_WritePool_releaseIoJob(writeJob);
|
AIO_WritePool_releaseIoJob(writeJob);
|
||||||
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||||
@ -2263,8 +2243,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
|
|||||||
|
|
||||||
#ifdef ZSTD_LZ4DECOMPRESS
|
#ifdef ZSTD_LZ4DECOMPRESS
|
||||||
static unsigned long long
|
static unsigned long long
|
||||||
FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
|
FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName)
|
||||||
const char* srcFileName)
|
|
||||||
{
|
{
|
||||||
unsigned long long filesize = 0;
|
unsigned long long filesize = 0;
|
||||||
LZ4F_errorCode_t nextToLoad = 4;
|
LZ4F_errorCode_t nextToLoad = 4;
|
||||||
@ -2282,34 +2261,27 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
|
|||||||
|
|
||||||
/* Main Loop */
|
/* Main Loop */
|
||||||
for (;nextToLoad;) {
|
for (;nextToLoad;) {
|
||||||
size_t readSize;
|
|
||||||
size_t pos = 0;
|
size_t pos = 0;
|
||||||
size_t decodedBytes = writeJob->bufferSize;
|
size_t decodedBytes = writeJob->bufferSize;
|
||||||
int fullBufferDecoded = 0;
|
int fullBufferDecoded = 0;
|
||||||
|
|
||||||
/* Read input */
|
/* Read input */
|
||||||
nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
|
AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad);
|
||||||
readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
|
if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */
|
||||||
if(!readSize && ferror(srcFile)) {
|
|
||||||
DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
|
|
||||||
decodingError=1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
|
|
||||||
ress->srcBufferLoaded += readSize;
|
|
||||||
|
|
||||||
while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
|
while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
|
||||||
/* Decode Input (at least partially) */
|
/* Decode Input (at least partially) */
|
||||||
size_t remaining = ress->srcBufferLoaded - pos;
|
size_t remaining = ress->readCtx->srcBufferLoaded - pos;
|
||||||
decodedBytes = writeJob->bufferSize;
|
decodedBytes = writeJob->bufferSize;
|
||||||
nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
|
nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos,
|
||||||
|
&remaining, NULL);
|
||||||
if (LZ4F_isError(nextToLoad)) {
|
if (LZ4F_isError(nextToLoad)) {
|
||||||
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
|
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
|
||||||
srcFileName, LZ4F_getErrorName(nextToLoad));
|
srcFileName, LZ4F_getErrorName(nextToLoad));
|
||||||
decodingError = 1; nextToLoad = 0; break;
|
decodingError = 1; nextToLoad = 0; break;
|
||||||
}
|
}
|
||||||
pos += remaining;
|
pos += remaining;
|
||||||
assert(pos <= ress->srcBufferLoaded);
|
assert(pos <= ress->readCtx->srcBufferLoaded);
|
||||||
fullBufferDecoded = decodedBytes == writeJob->bufferSize;
|
fullBufferDecoded = decodedBytes == writeJob->bufferSize;
|
||||||
|
|
||||||
/* Write Block */
|
/* Write Block */
|
||||||
@ -2324,7 +2296,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
|
|||||||
|
|
||||||
if (!nextToLoad) break;
|
if (!nextToLoad) break;
|
||||||
}
|
}
|
||||||
FIO_consumeDSrcBuffer(ress, pos);
|
AIO_ReadPool_consumeBytes(ress->readCtx, pos);
|
||||||
}
|
}
|
||||||
if (nextToLoad!=0) {
|
if (nextToLoad!=0) {
|
||||||
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
|
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
|
||||||
@ -2348,23 +2320,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
|
|||||||
* 1 : error
|
* 1 : error
|
||||||
*/
|
*/
|
||||||
static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
||||||
dRess_t ress, FILE* srcFile,
|
dRess_t ress, const FIO_prefs_t* const prefs,
|
||||||
const FIO_prefs_t* const prefs,
|
const char* dstFileName, const char* srcFileName)
|
||||||
const char* dstFileName, const char* srcFileName)
|
|
||||||
{
|
{
|
||||||
unsigned readSomething = 0;
|
unsigned readSomething = 0;
|
||||||
unsigned long long filesize = 0;
|
unsigned long long filesize = 0;
|
||||||
assert(srcFile != NULL);
|
|
||||||
|
|
||||||
/* for each frame */
|
/* for each frame */
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
/* check magic number -> version */
|
/* check magic number -> version */
|
||||||
size_t const toRead = 4;
|
size_t const toRead = 4;
|
||||||
const BYTE* const buf = (const BYTE*)ress.srcBuffer;
|
const BYTE* buf;
|
||||||
if (ress.srcBufferLoaded < toRead) /* load up to 4 bytes for header */
|
AIO_ReadPool_fillBuffer(ress.readCtx, toRead);
|
||||||
ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded,
|
buf = (const BYTE*)ress.readCtx->srcBuffer;
|
||||||
(size_t)1, toRead - ress.srcBufferLoaded, srcFile);
|
if (ress.readCtx->srcBufferLoaded==0) {
|
||||||
if (ress.srcBufferLoaded==0) {
|
|
||||||
if (readSomething==0) { /* srcFile is empty (which is invalid) */
|
if (readSomething==0) { /* srcFile is empty (which is invalid) */
|
||||||
DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
|
DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
|
||||||
return 1;
|
return 1;
|
||||||
@ -2372,17 +2341,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
|||||||
break; /* no more input */
|
break; /* no more input */
|
||||||
}
|
}
|
||||||
readSomething = 1; /* there is at least 1 byte in srcFile */
|
readSomething = 1; /* there is at least 1 byte in srcFile */
|
||||||
if (ress.srcBufferLoaded < toRead) {
|
if (ress.readCtx->srcBufferLoaded < toRead) {
|
||||||
DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
|
DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) {
|
if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) {
|
||||||
unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize);
|
unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize);
|
||||||
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
||||||
filesize += frameSize;
|
filesize += frameSize;
|
||||||
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
|
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
|
||||||
#ifdef ZSTD_GZDECOMPRESS
|
#ifdef ZSTD_GZDECOMPRESS
|
||||||
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
|
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName);
|
||||||
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
||||||
filesize += frameSize;
|
filesize += frameSize;
|
||||||
#else
|
#else
|
||||||
@ -2392,7 +2361,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
|||||||
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|
||||||
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
|
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
|
||||||
#ifdef ZSTD_LZMADECOMPRESS
|
#ifdef ZSTD_LZMADECOMPRESS
|
||||||
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
|
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD);
|
||||||
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
||||||
filesize += frameSize;
|
filesize += frameSize;
|
||||||
#else
|
#else
|
||||||
@ -2401,7 +2370,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
|||||||
#endif
|
#endif
|
||||||
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
|
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
|
||||||
#ifdef ZSTD_LZ4DECOMPRESS
|
#ifdef ZSTD_LZ4DECOMPRESS
|
||||||
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
|
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName);
|
||||||
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
|
||||||
filesize += frameSize;
|
filesize += frameSize;
|
||||||
#else
|
#else
|
||||||
@ -2409,10 +2378,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
|||||||
return 1;
|
return 1;
|
||||||
#endif
|
#endif
|
||||||
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
|
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
|
||||||
return FIO_passThrough(prefs,
|
return FIO_passThrough(&ress);
|
||||||
AIO_WritePool_getFile(ress.writeCtx), srcFile,
|
|
||||||
ress.srcBuffer, ress.srcBufferSize,
|
|
||||||
ress.srcBufferLoaded);
|
|
||||||
} else {
|
} else {
|
||||||
DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
|
DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
|
||||||
return 1;
|
return 1;
|
||||||
@ -2432,15 +2398,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** FIO_decompressDstFile() :
|
/** FIO_decompressDstFile() :
|
||||||
open `dstFileName`,
|
open `dstFileName`, or pass-through if writeCtx's file is already != 0,
|
||||||
or path-through if ress.dstFile is already != 0,
|
|
||||||
then start decompression process (FIO_decompressFrames()).
|
then start decompression process (FIO_decompressFrames()).
|
||||||
@return : 0 : OK
|
@return : 0 : OK
|
||||||
1 : operation aborted
|
1 : operation aborted
|
||||||
*/
|
*/
|
||||||
static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
|
static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
|
||||||
FIO_prefs_t* const prefs,
|
FIO_prefs_t* const prefs,
|
||||||
dRess_t ress, FILE* srcFile,
|
dRess_t ress,
|
||||||
const char* dstFileName, const char* srcFileName)
|
const char* dstFileName, const char* srcFileName)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
@ -2472,7 +2437,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
|
|||||||
addHandler(dstFileName);
|
addHandler(dstFileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
|
result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName);
|
||||||
|
|
||||||
if (releaseDstFile) {
|
if (releaseDstFile) {
|
||||||
clearHandler();
|
clearHandler();
|
||||||
@ -2513,9 +2478,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
|
|||||||
|
|
||||||
srcFile = FIO_openSrcFile(prefs, srcFileName);
|
srcFile = FIO_openSrcFile(prefs, srcFileName);
|
||||||
if (srcFile==NULL) return 1;
|
if (srcFile==NULL) return 1;
|
||||||
ress.srcBufferLoaded = 0;
|
AIO_ReadPool_setFile(ress.readCtx, srcFile);
|
||||||
|
|
||||||
result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName);
|
result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName);
|
||||||
|
|
||||||
|
AIO_ReadPool_setFile(ress.readCtx, NULL);
|
||||||
|
|
||||||
/* Close file */
|
/* Close file */
|
||||||
if (fclose(srcFile)) {
|
if (fclose(srcFile)) {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) Yann Collet, Facebook, Inc.
|
* Copyright (c) Facebook, Inc.
|
||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
*
|
*
|
||||||
* This source code is licensed under both the BSD-style license (found in the
|
* This source code is licensed under both the BSD-style license (found in the
|
||||||
@ -29,7 +29,8 @@
|
|||||||
/** AIO_fwriteSparse() :
|
/** AIO_fwriteSparse() :
|
||||||
* @return : storedSkips,
|
* @return : storedSkips,
|
||||||
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
|
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
|
||||||
unsigned AIO_fwriteSparse(FILE* file,
|
static unsigned
|
||||||
|
AIO_fwriteSparse(FILE* file,
|
||||||
const void* buffer, size_t bufferSize,
|
const void* buffer, size_t bufferSize,
|
||||||
const FIO_prefs_t* const prefs,
|
const FIO_prefs_t* const prefs,
|
||||||
unsigned storedSkips)
|
unsigned storedSkips)
|
||||||
@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file,
|
|||||||
if (!prefs->sparseFileSupport) { /* normal write */
|
if (!prefs->sparseFileSupport) { /* normal write */
|
||||||
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
|
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
|
||||||
if (sizeCheck != bufferSize)
|
if (sizeCheck != bufferSize)
|
||||||
EXM_THROW(70, "Write error : cannot write decoded block : %s",
|
EXM_THROW(70, "Write error : cannot write block : %s",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file,
|
|||||||
storedSkips = 0;
|
storedSkips = 0;
|
||||||
/* write the rest */
|
/* write the rest */
|
||||||
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
|
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
|
||||||
EXM_THROW(93, "Write error : cannot write decoded block : %s",
|
EXM_THROW(93, "Write error : cannot write block : %s",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
}
|
}
|
||||||
ptrT += seg0SizeT;
|
ptrT += seg0SizeT;
|
||||||
@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file,
|
|||||||
return storedSkips;
|
return storedSkips;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
|
static void
|
||||||
|
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
|
||||||
{
|
{
|
||||||
if (prefs->testMode) assert(storedSkips == 0);
|
if (prefs->testMode) assert(storedSkips == 0);
|
||||||
if (storedSkips>0) {
|
if (storedSkips>0) {
|
||||||
@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st
|
|||||||
* AsyncIO functionality
|
* AsyncIO functionality
|
||||||
************************************************************************/
|
************************************************************************/
|
||||||
|
|
||||||
|
/* AIO_supported:
|
||||||
|
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
|
||||||
|
int AIO_supported(void) {
|
||||||
|
#ifdef ZSTD_MULTITHREAD
|
||||||
|
return 1;
|
||||||
|
#else
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/* ***********************************
|
/* ***********************************
|
||||||
* General IoPool implementation
|
* General IoPool implementation
|
||||||
*************************************/
|
*************************************/
|
||||||
|
|
||||||
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
|
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
|
||||||
void *buffer;
|
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
|
||||||
IOJob_t *job;
|
void* const buffer = malloc(bufferSize);
|
||||||
job = (IOJob_t*) malloc(sizeof(IOJob_t));
|
|
||||||
buffer = malloc(bufferSize);
|
|
||||||
if(!job || !buffer)
|
if(!job || !buffer)
|
||||||
EXM_THROW(101, "Allocation error : not enough memory");
|
EXM_THROW(101, "Allocation error : not enough memory");
|
||||||
job->buffer = buffer;
|
job->buffer = buffer;
|
||||||
job->bufferSize = bufferSize;
|
job->bufferSize = bufferSize;
|
||||||
job->usedBufferSize = 0;
|
job->usedBufferSize = 0;
|
||||||
@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
|
|||||||
/* AIO_IOPool_createThreadPool:
|
/* AIO_IOPool_createThreadPool:
|
||||||
* Creates a thread pool and a mutex for threaded IO pool.
|
* Creates a thread pool and a mutex for threaded IO pool.
|
||||||
* Displays warning if asyncio is requested but MT isn't available. */
|
* Displays warning if asyncio is requested but MT isn't available. */
|
||||||
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
|
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
|
||||||
ctx->threadPool = NULL;
|
ctx->threadPool = NULL;
|
||||||
if(prefs->asyncIO) {
|
if(prefs->asyncIO) {
|
||||||
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
|
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
|
||||||
EXM_THROW(102,"Failed creating write availableJobs mutex");
|
EXM_THROW(102,"Failed creating write availableJobs mutex");
|
||||||
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
|
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
|
||||||
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
|
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
|
||||||
assert(MAX_IO_JOBS >= 2);
|
assert(MAX_IO_JOBS >= 2);
|
||||||
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
|
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
|
||||||
if (!ctx->threadPool)
|
if (!ctx->threadPool)
|
||||||
EXM_THROW(104, "Failed creating writer thread pool");
|
EXM_THROW(104, "Failed creating writer thread pool");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_IOPool_init:
|
/* AIO_IOPool_init:
|
||||||
* Allocates and sets and a new write pool including its included availableJobs. */
|
* Allocates and sets and a new write pool including its included availableJobs. */
|
||||||
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
|
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
|
||||||
int i;
|
int i;
|
||||||
AIO_IOPool_createThreadPool(ctx, prefs);
|
AIO_IOPool_createThreadPool(ctx, prefs);
|
||||||
ctx->prefs = prefs;
|
ctx->prefs = prefs;
|
||||||
ctx->poolFunction = poolFunction;
|
ctx->poolFunction = poolFunction;
|
||||||
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
|
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
|
||||||
ctx->availableJobsCount = ctx->totalIoJobs;
|
ctx->availableJobsCount = ctx->totalIoJobs;
|
||||||
for(i=0; i < ctx->availableJobsCount; i++) {
|
for(i=0; i < ctx->availableJobsCount; i++) {
|
||||||
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
|
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
|
||||||
}
|
}
|
||||||
|
ctx->jobBufferSize = bufferSize;
|
||||||
ctx->file = NULL;
|
ctx->file = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* AIO_IOPool_releaseIoJob:
|
/* AIO_IOPool_releaseIoJob:
|
||||||
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
||||||
static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
|
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
|
||||||
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
|
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
|
||||||
if(ctx->threadPool) {
|
if(ctx->threadPool)
|
||||||
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
|
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
|
||||||
assert(ctx->availableJobsCount < MAX_IO_JOBS);
|
assert(ctx->availableJobsCount < ctx->totalIoJobs);
|
||||||
ctx->availableJobs[ctx->availableJobsCount++] = job;
|
ctx->availableJobs[ctx->availableJobsCount++] = job;
|
||||||
|
if(ctx->threadPool)
|
||||||
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
|
||||||
} else {
|
|
||||||
assert(ctx->availableJobsCount == 0);
|
|
||||||
ctx->availableJobsCount++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_IOPool_join:
|
/* AIO_IOPool_join:
|
||||||
@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
|
|||||||
|
|
||||||
/* AIO_IOPool_acquireJob:
|
/* AIO_IOPool_acquireJob:
|
||||||
* Returns an available io job to be used for a future io. */
|
* Returns an available io job to be used for a future io. */
|
||||||
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
|
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
|
||||||
IOJob_t *job;
|
IOJob_t *job;
|
||||||
assert(ctx->file != NULL || ctx->prefs->testMode);
|
assert(ctx->file != NULL || ctx->prefs->testMode);
|
||||||
if(ctx->threadPool) {
|
if(ctx->threadPool)
|
||||||
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
|
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
|
||||||
assert(ctx->availableJobsCount > 0);
|
assert(ctx->availableJobsCount > 0);
|
||||||
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
|
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
|
||||||
|
if(ctx->threadPool)
|
||||||
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
|
||||||
} else {
|
|
||||||
assert(ctx->availableJobsCount == 1);
|
|
||||||
ctx->availableJobsCount--;
|
|
||||||
job = (IOJob_t*)ctx->availableJobs[0];
|
|
||||||
}
|
|
||||||
job->usedBufferSize = 0;
|
job->usedBufferSize = 0;
|
||||||
job->file = ctx->file;
|
job->file = ctx->file;
|
||||||
job->offset = 0;
|
job->offset = 0;
|
||||||
@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
|
|||||||
* Sets the destination file for future files in the pool.
|
* Sets the destination file for future files in the pool.
|
||||||
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
|
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
|
||||||
* Also requires ending of sparse write if a previous file was used in sparse mode. */
|
* Also requires ending of sparse write if a previous file was used in sparse mode. */
|
||||||
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
|
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
|
||||||
assert(ctx!=NULL);
|
assert(ctx!=NULL);
|
||||||
AIO_IOPool_join(ctx);
|
AIO_IOPool_join(ctx);
|
||||||
assert(ctx->availableJobsCount == ctx->totalIoJobs);
|
assert(ctx->availableJobsCount == ctx->totalIoJobs);
|
||||||
ctx->file = file;
|
ctx->file = file;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
|
static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
|
||||||
return ctx->file;
|
return ctx->file;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_IOPool_enqueueJob:
|
/* AIO_IOPool_enqueueJob:
|
||||||
* Enqueues an io job for execution.
|
* Enqueues an io job for execution.
|
||||||
* The queued job shouldn't be used directly after queueing it. */
|
* The queued job shouldn't be used directly after queueing it. */
|
||||||
static void AIO_IOPool_enqueueJob(IOJob_t *job) {
|
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
|
||||||
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
|
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
|
||||||
if(ctx->threadPool)
|
if(ctx->threadPool)
|
||||||
POOL_add(ctx->threadPool, ctx->poolFunction, job);
|
POOL_add(ctx->threadPool, ctx->poolFunction, job);
|
||||||
else
|
else
|
||||||
@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {
|
|||||||
|
|
||||||
/* AIO_WritePool_acquireJob:
|
/* AIO_WritePool_acquireJob:
|
||||||
* Returns an available write job to be used for a future write. */
|
* Returns an available write job to be used for a future write. */
|
||||||
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
|
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
|
||||||
return AIO_IOPool_acquireJob(&ctx->base);
|
return AIO_IOPool_acquireJob(&ctx->base);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
|
|||||||
/* AIO_WritePool_sparseWriteEnd:
|
/* AIO_WritePool_sparseWriteEnd:
|
||||||
* Ends sparse writes to the current file.
|
* Ends sparse writes to the current file.
|
||||||
* Blocks on completion of all current write jobs before executing. */
|
* Blocks on completion of all current write jobs before executing. */
|
||||||
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
|
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
|
||||||
assert(ctx != NULL);
|
assert(ctx != NULL);
|
||||||
if(ctx->base.threadPool)
|
if(ctx->base.threadPool)
|
||||||
POOL_joinJobs(ctx->base.threadPool);
|
POOL_joinJobs(ctx->base.threadPool);
|
||||||
@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
|
|||||||
* Sets the destination file for future writes in the pool.
|
* Sets the destination file for future writes in the pool.
|
||||||
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
|
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
|
||||||
* Also requires ending of sparse write if a previous file was used in sparse mode. */
|
* Also requires ending of sparse write if a previous file was used in sparse mode. */
|
||||||
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
|
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
|
||||||
AIO_IOPool_setFile(&ctx->base, file);
|
AIO_IOPool_setFile(&ctx->base, file);
|
||||||
assert(ctx->storedSkips == 0);
|
assert(ctx->storedSkips == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_WritePool_getFile:
|
/* AIO_WritePool_getFile:
|
||||||
* Returns the file the writePool is currently set to write to. */
|
* Returns the file the writePool is currently set to write to. */
|
||||||
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
|
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
|
||||||
return AIO_IOPool_getFile(&ctx->base);
|
return AIO_IOPool_getFile(&ctx->base);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_WritePool_releaseIoJob:
|
/* AIO_WritePool_releaseIoJob:
|
||||||
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
||||||
void AIO_WritePool_releaseIoJob(IOJob_t *job) {
|
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
|
||||||
AIO_IOPool_releaseIoJob(job);
|
AIO_IOPool_releaseIoJob(job);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_WritePool_closeFile:
|
/* AIO_WritePool_closeFile:
|
||||||
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
|
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
|
||||||
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
|
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
|
||||||
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
|
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
|
||||||
FILE *dstFile = ctx->base.file;
|
FILE* const dstFile = ctx->base.file;
|
||||||
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
|
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
|
||||||
AIO_WritePool_sparseWriteEnd(ctx);
|
AIO_WritePool_sparseWriteEnd(ctx);
|
||||||
AIO_IOPool_setFile(&ctx->base, NULL);
|
AIO_IOPool_setFile(&ctx->base, NULL);
|
||||||
@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
|
|||||||
/* AIO_WritePool_executeWriteJob:
|
/* AIO_WritePool_executeWriteJob:
|
||||||
* Executes a write job synchronously. Can be used as a function for a thread pool. */
|
* Executes a write job synchronously. Can be used as a function for a thread pool. */
|
||||||
static void AIO_WritePool_executeWriteJob(void* opaque){
|
static void AIO_WritePool_executeWriteJob(void* opaque){
|
||||||
IOJob_t* job = (IOJob_t*) opaque;
|
IOJob_t* const job = (IOJob_t*) opaque;
|
||||||
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
|
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
|
||||||
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
|
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
|
||||||
AIO_IOPool_releaseIoJob(job);
|
AIO_IOPool_releaseIoJob(job);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* AIO_WritePool_create:
|
/* AIO_WritePool_create:
|
||||||
* Allocates and sets and a new write pool including its included jobs. */
|
* Allocates and sets and a new write pool including its included jobs. */
|
||||||
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
|
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
|
||||||
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
|
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
|
||||||
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
|
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
|
||||||
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
|
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
|
||||||
ctx->storedSkips = 0;
|
ctx->storedSkips = 0;
|
||||||
@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
|
|||||||
assert(ctx->storedSkips==0);
|
assert(ctx->storedSkips==0);
|
||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* ***********************************
|
||||||
|
* ReadPool implementation
|
||||||
|
*************************************/
|
||||||
|
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
|
||||||
|
int i;
|
||||||
|
for(i=0; i<ctx->completedJobsCount; i++) {
|
||||||
|
IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
|
||||||
|
AIO_IOPool_releaseIoJob(job);
|
||||||
|
}
|
||||||
|
ctx->completedJobsCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
|
||||||
|
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
|
||||||
|
if(ctx->base.threadPool)
|
||||||
|
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
|
||||||
|
assert(ctx->completedJobsCount < MAX_IO_JOBS);
|
||||||
|
ctx->completedJobs[ctx->completedJobsCount++] = job;
|
||||||
|
if(ctx->base.threadPool) {
|
||||||
|
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
|
||||||
|
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
|
||||||
|
* Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
|
||||||
|
* if job wasn't found returns NULL.
|
||||||
|
* IMPORTANT: assumes ioJobsMutex is locked. */
|
||||||
|
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
|
||||||
|
IOJob_t *job = NULL;
|
||||||
|
int i;
|
||||||
|
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
|
||||||
|
* While not strictly needed for a single threaded reader implementation (as in such a case we could expect
|
||||||
|
* reads to be completed in order) this implementation was chosen as it better fits other asyncio
|
||||||
|
* interfaces (such as io_uring) that do not provide promises regarding order of completion. */
|
||||||
|
for (i=0; i<ctx->completedJobsCount; i++) {
|
||||||
|
job = (IOJob_t *) ctx->completedJobs[i];
|
||||||
|
if (job->offset == ctx->waitingOnOffset) {
|
||||||
|
ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_numReadsInFlight:
|
||||||
|
* Returns the number of IO read jobs currrently in flight. */
|
||||||
|
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
|
||||||
|
const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
|
||||||
|
return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_getNextCompletedJob:
|
||||||
|
* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
|
||||||
|
* Would block. */
|
||||||
|
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
|
||||||
|
IOJob_t *job = NULL;
|
||||||
|
if (ctx->base.threadPool)
|
||||||
|
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
|
||||||
|
|
||||||
|
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
|
||||||
|
|
||||||
|
/* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
|
||||||
|
while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
|
||||||
|
assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
|
||||||
|
ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
|
||||||
|
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(job) {
|
||||||
|
assert(job->offset == ctx->waitingOnOffset);
|
||||||
|
ctx->waitingOnOffset += job->usedBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctx->base.threadPool)
|
||||||
|
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* AIO_ReadPool_executeReadJob:
|
||||||
|
* Executes a read job synchronously. Can be used as a function for a thread pool. */
|
||||||
|
static void AIO_ReadPool_executeReadJob(void* opaque){
|
||||||
|
IOJob_t* const job = (IOJob_t*) opaque;
|
||||||
|
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
|
||||||
|
if(ctx->reachedEof) {
|
||||||
|
job->usedBufferSize = 0;
|
||||||
|
AIO_ReadPool_addJobToCompleted(job);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
|
||||||
|
if(job->usedBufferSize < job->bufferSize) {
|
||||||
|
if(ferror(job->file)) {
|
||||||
|
EXM_THROW(37, "Read error");
|
||||||
|
} else if(feof(job->file)) {
|
||||||
|
ctx->reachedEof = 1;
|
||||||
|
} else {
|
||||||
|
EXM_THROW(37, "Unexpected short read");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
AIO_ReadPool_addJobToCompleted(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
|
||||||
|
IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
|
||||||
|
job->offset = ctx->nextReadOffset;
|
||||||
|
ctx->nextReadOffset += job->bufferSize;
|
||||||
|
AIO_IOPool_enqueueJob(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < ctx->base.availableJobsCount; i++) {
|
||||||
|
AIO_ReadPool_enqueueRead(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_setFile:
|
||||||
|
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
|
||||||
|
* Waits for all current enqueued tasks to complete if a previous file was set. */
|
||||||
|
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
|
||||||
|
assert(ctx!=NULL);
|
||||||
|
AIO_IOPool_join(&ctx->base);
|
||||||
|
AIO_ReadPool_releaseAllCompletedJobs(ctx);
|
||||||
|
if (ctx->currentJobHeld) {
|
||||||
|
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
|
||||||
|
ctx->currentJobHeld = NULL;
|
||||||
|
}
|
||||||
|
AIO_IOPool_setFile(&ctx->base, file);
|
||||||
|
ctx->nextReadOffset = 0;
|
||||||
|
ctx->waitingOnOffset = 0;
|
||||||
|
ctx->srcBuffer = ctx->coalesceBuffer;
|
||||||
|
ctx->srcBufferLoaded = 0;
|
||||||
|
ctx->reachedEof = 0;
|
||||||
|
if(file != NULL)
|
||||||
|
AIO_ReadPool_startReading(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_create:
|
||||||
|
* Allocates and sets and a new readPool including its included jobs.
|
||||||
|
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
|
||||||
|
* as our basic read size. */
|
||||||
|
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
|
||||||
|
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
|
||||||
|
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
|
||||||
|
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
|
||||||
|
|
||||||
|
ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
|
||||||
|
ctx->srcBuffer = ctx->coalesceBuffer;
|
||||||
|
ctx->srcBufferLoaded = 0;
|
||||||
|
ctx->completedJobsCount = 0;
|
||||||
|
ctx->currentJobHeld = NULL;
|
||||||
|
|
||||||
|
if(ctx->base.threadPool)
|
||||||
|
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
|
||||||
|
EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
|
||||||
|
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_free:
|
||||||
|
* Frees and releases a readPool and its resources. Closes source file. */
|
||||||
|
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
|
||||||
|
if(AIO_ReadPool_getFile(ctx))
|
||||||
|
AIO_ReadPool_closeFile(ctx);
|
||||||
|
if(ctx->base.threadPool)
|
||||||
|
ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
|
||||||
|
AIO_IOPool_destroy(&ctx->base);
|
||||||
|
free(ctx->coalesceBuffer);
|
||||||
|
free(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_consumeBytes:
|
||||||
|
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
|
||||||
|
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
|
||||||
|
assert(n <= ctx->srcBufferLoaded);
|
||||||
|
ctx->srcBufferLoaded -= n;
|
||||||
|
ctx->srcBuffer += n;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
|
||||||
|
* Release the current held job and get the next one, returns NULL if no next job available. */
|
||||||
|
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
|
||||||
|
if (ctx->currentJobHeld) {
|
||||||
|
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
|
||||||
|
ctx->currentJobHeld = NULL;
|
||||||
|
AIO_ReadPool_enqueueRead(ctx);
|
||||||
|
}
|
||||||
|
ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
|
||||||
|
return (IOJob_t*) ctx->currentJobHeld;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_fillBuffer:
|
||||||
|
* Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
|
||||||
|
* Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
|
||||||
|
* Return value is the number of bytes added to the buffer.
|
||||||
|
* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
|
||||||
|
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
|
||||||
|
IOJob_t *job;
|
||||||
|
int useCoalesce = 0;
|
||||||
|
if(n > ctx->base.jobBufferSize)
|
||||||
|
n = ctx->base.jobBufferSize;
|
||||||
|
|
||||||
|
/* We are good, don't read anything */
|
||||||
|
if (ctx->srcBufferLoaded >= n)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
/* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
|
||||||
|
* and coalesce the remaining bytes with the next job's buffer */
|
||||||
|
if (ctx->srcBufferLoaded > 0) {
|
||||||
|
useCoalesce = 1;
|
||||||
|
memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
|
||||||
|
ctx->srcBuffer = ctx->coalesceBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Read the next chunk */
|
||||||
|
job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
|
||||||
|
if(!job)
|
||||||
|
return 0;
|
||||||
|
if(useCoalesce) {
|
||||||
|
assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
|
||||||
|
memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
|
||||||
|
ctx->srcBufferLoaded += job->usedBufferSize;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ctx->srcBuffer = (U8 *) job->buffer;
|
||||||
|
ctx->srcBufferLoaded = job->usedBufferSize;
|
||||||
|
}
|
||||||
|
return job->usedBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_consumeAndRefill:
|
||||||
|
* Consumes the current buffer and refills it with bufferSize bytes. */
|
||||||
|
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
|
||||||
|
AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
|
||||||
|
return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_getFile:
|
||||||
|
* Returns the current file set for the read pool. */
|
||||||
|
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
|
||||||
|
return AIO_IOPool_getFile(&ctx->base);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* AIO_ReadPool_closeFile:
|
||||||
|
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
|
||||||
|
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
|
||||||
|
FILE* const file = AIO_ReadPool_getFile(ctx);
|
||||||
|
AIO_ReadPool_setFile(ctx, NULL);
|
||||||
|
return fclose(file);
|
||||||
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) Yann Collet, Facebook, Inc.
|
* Copyright (c) Facebook, Inc.
|
||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
*
|
*
|
||||||
* This source code is licensed under both the BSD-style license (found in the
|
* This source code is licensed under both the BSD-style license (found in the
|
||||||
@ -28,7 +28,7 @@ typedef struct {
|
|||||||
/* These struct fields should be set only on creation and not changed afterwards */
|
/* These struct fields should be set only on creation and not changed afterwards */
|
||||||
POOL_ctx* threadPool;
|
POOL_ctx* threadPool;
|
||||||
int totalIoJobs;
|
int totalIoJobs;
|
||||||
FIO_prefs_t* prefs;
|
const FIO_prefs_t* prefs;
|
||||||
POOL_function poolFunction;
|
POOL_function poolFunction;
|
||||||
|
|
||||||
/* Controls the file we currently write to, make changes only by using provided utility functions */
|
/* Controls the file we currently write to, make changes only by using provided utility functions */
|
||||||
@ -39,8 +39,36 @@ typedef struct {
|
|||||||
ZSTD_pthread_mutex_t ioJobsMutex;
|
ZSTD_pthread_mutex_t ioJobsMutex;
|
||||||
void* availableJobs[MAX_IO_JOBS];
|
void* availableJobs[MAX_IO_JOBS];
|
||||||
int availableJobsCount;
|
int availableJobsCount;
|
||||||
|
size_t jobBufferSize;
|
||||||
} IOPoolCtx_t;
|
} IOPoolCtx_t;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
IOPoolCtx_t base;
|
||||||
|
|
||||||
|
/* State regarding the currently read file */
|
||||||
|
int reachedEof;
|
||||||
|
U64 nextReadOffset;
|
||||||
|
U64 waitingOnOffset;
|
||||||
|
|
||||||
|
/* We may hold an IOJob object as needed if we actively expose its buffer. */
|
||||||
|
void *currentJobHeld;
|
||||||
|
|
||||||
|
/* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
|
||||||
|
* the first of them. Shouldn't be accessed from outside ot utility functions. */
|
||||||
|
U8 *coalesceBuffer;
|
||||||
|
|
||||||
|
/* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
|
||||||
|
* change when consuming / refilling buffer. */
|
||||||
|
U8 *srcBuffer;
|
||||||
|
size_t srcBufferLoaded;
|
||||||
|
|
||||||
|
/* We need to know what tasks completed so we can use their buffers when their time comes.
|
||||||
|
* Should only be accessed after locking base.ioJobsMutex . */
|
||||||
|
void* completedJobs[MAX_IO_JOBS];
|
||||||
|
int completedJobsCount;
|
||||||
|
ZSTD_pthread_cond_t jobCompletedCond;
|
||||||
|
} ReadPoolCtx_t;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
IOPoolCtx_t base;
|
IOPoolCtx_t base;
|
||||||
unsigned storedSkips;
|
unsigned storedSkips;
|
||||||
@ -59,15 +87,10 @@ typedef struct {
|
|||||||
U64 offset;
|
U64 offset;
|
||||||
} IOJob_t;
|
} IOJob_t;
|
||||||
|
|
||||||
/** AIO_fwriteSparse() :
|
/* AIO_supported:
|
||||||
* @return : storedSkips,
|
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
|
||||||
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
|
int AIO_supported(void);
|
||||||
unsigned AIO_fwriteSparse(FILE* file,
|
|
||||||
const void* buffer, size_t bufferSize,
|
|
||||||
const FIO_prefs_t* const prefs,
|
|
||||||
unsigned storedSkips);
|
|
||||||
|
|
||||||
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
|
|
||||||
|
|
||||||
/* AIO_WritePool_releaseIoJob:
|
/* AIO_WritePool_releaseIoJob:
|
||||||
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
* Releases an acquired job back to the pool. Doesn't execute the job. */
|
||||||
@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
|
|||||||
|
|
||||||
/* AIO_WritePool_getFile:
|
/* AIO_WritePool_getFile:
|
||||||
* Returns the file the writePool is currently set to write to. */
|
* Returns the file the writePool is currently set to write to. */
|
||||||
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
|
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
|
||||||
|
|
||||||
/* AIO_WritePool_closeFile:
|
/* AIO_WritePool_closeFile:
|
||||||
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
|
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
|
||||||
@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
|
|||||||
/* AIO_WritePool_create:
|
/* AIO_WritePool_create:
|
||||||
* Allocates and sets and a new write pool including its included jobs.
|
* Allocates and sets and a new write pool including its included jobs.
|
||||||
* bufferSize should be set to the maximal buffer we want to write to at a time. */
|
* bufferSize should be set to the maximal buffer we want to write to at a time. */
|
||||||
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
|
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
|
||||||
|
|
||||||
/* AIO_WritePool_free:
|
/* AIO_WritePool_free:
|
||||||
* Frees and releases a writePool and its resources. Closes destination file. */
|
* Frees and releases a writePool and its resources. Closes destination file. */
|
||||||
void AIO_WritePool_free(WritePoolCtx_t* ctx);
|
void AIO_WritePool_free(WritePoolCtx_t* ctx);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_create:
|
||||||
|
* Allocates and sets and a new readPool including its included jobs.
|
||||||
|
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
|
||||||
|
* as our basic read size. */
|
||||||
|
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_free:
|
||||||
|
* Frees and releases a readPool and its resources. Closes source file. */
|
||||||
|
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_consumeBytes:
|
||||||
|
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
|
||||||
|
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_fillBuffer:
|
||||||
|
* Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize).
|
||||||
|
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
|
||||||
|
* Return value is the number of bytes added to the buffer.
|
||||||
|
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
|
||||||
|
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_consumeAndRefill:
|
||||||
|
* Consumes the current buffer and refills it with bufferSize bytes. */
|
||||||
|
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_setFile:
|
||||||
|
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
|
||||||
|
* Waits for all current enqueued tasks to complete if a previous file was set. */
|
||||||
|
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_getFile:
|
||||||
|
* Returns the current file set for the read pool. */
|
||||||
|
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
|
||||||
|
|
||||||
|
/* AIO_ReadPool_closeFile:
|
||||||
|
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
|
||||||
|
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
|
||||||
|
|
||||||
#if defined (__cplusplus)
|
#if defined (__cplusplus)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) Yann Collet, Facebook, Inc.
|
* Copyright (c) Facebook, Inc.
|
||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
*
|
*
|
||||||
* This source code is licensed under both the BSD-style license (found in the
|
* This source code is licensed under both the BSD-style license (found in the
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) Yann Collet, Facebook, Inc.
|
* Copyright (c) Facebook, Inc.
|
||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
*
|
*
|
||||||
* This source code is licensed under both the BSD-style license (found in the
|
* This source code is licensed under both the BSD-style license (found in the
|
||||||
@ -70,4 +70,4 @@ typedef struct FIO_prefs_s {
|
|||||||
int allowBlockDevices;
|
int allowBlockDevices;
|
||||||
} FIO_prefs_t;
|
} FIO_prefs_t;
|
||||||
|
|
||||||
#endif /* FILEIO_TYPES_HEADER */
|
#endif /* FILEIO_TYPES_HEADER */
|
||||||
|
@ -46,6 +46,7 @@
|
|||||||
# include "zstdcli_trace.h"
|
# include "zstdcli_trace.h"
|
||||||
#endif
|
#endif
|
||||||
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
|
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
|
||||||
|
#include "fileio_asyncio.h"
|
||||||
|
|
||||||
|
|
||||||
/*-************************************
|
/*-************************************
|
||||||
@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
|
|||||||
#ifdef UTIL_HAS_MIRRORFILELIST
|
#ifdef UTIL_HAS_MIRRORFILELIST
|
||||||
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
|
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
|
||||||
#endif
|
#endif
|
||||||
|
if (AIO_supported())
|
||||||
|
DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");
|
||||||
|
|
||||||
#ifndef ZSTD_NOCOMPRESS
|
#ifndef ZSTD_NOCOMPRESS
|
||||||
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
|
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
|
||||||
@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
|
|||||||
DISPLAYOUT( " -l : print information about zstd compressed files \n");
|
DISPLAYOUT( " -l : print information about zstd compressed files \n");
|
||||||
DISPLAYOUT( "--test : test compressed file integrity \n");
|
DISPLAYOUT( "--test : test compressed file integrity \n");
|
||||||
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
|
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
|
||||||
#ifdef ZSTD_MULTITHREAD
|
|
||||||
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
|
|
||||||
#endif
|
|
||||||
# if ZSTD_SPARSE_DEFAULT
|
# if ZSTD_SPARSE_DEFAULT
|
||||||
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
|
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
|
||||||
# else
|
# else
|
||||||
@ -1459,6 +1458,7 @@ int main(int argCount, const char* argv[])
|
|||||||
FIO_setTargetCBlockSize(prefs, targetCBlockSize);
|
FIO_setTargetCBlockSize(prefs, targetCBlockSize);
|
||||||
FIO_setSrcSizeHint(prefs, srcSizeHint);
|
FIO_setSrcSizeHint(prefs, srcSizeHint);
|
||||||
FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
|
FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
|
||||||
|
FIO_setSparseWrite(prefs, 0);
|
||||||
if (adaptMin > cLevel) cLevel = adaptMin;
|
if (adaptMin > cLevel) cLevel = adaptMin;
|
||||||
if (adaptMax < cLevel) cLevel = adaptMax;
|
if (adaptMax < cLevel) cLevel = adaptMax;
|
||||||
|
|
||||||
|
@ -260,10 +260,13 @@ zstd -dc - < tmp.zst > $INTOVOID
|
|||||||
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
|
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
|
||||||
zstd -d - < tmp.zst > $INTOVOID
|
zstd -d - < tmp.zst > $INTOVOID
|
||||||
println "test : impose memory limitation (must fail)"
|
println "test : impose memory limitation (must fail)"
|
||||||
zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
|
datagen -g500K > tmplimit
|
||||||
zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
zstd -f tmplimit
|
||||||
zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
|
||||||
zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
||||||
|
zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
||||||
|
zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
|
||||||
|
rm -f tmplimit tmplimit.zst
|
||||||
println "test : overwrite protection"
|
println "test : overwrite protection"
|
||||||
zstd -q tmp && die "overwrite check failed!"
|
zstd -q tmp && die "overwrite check failed!"
|
||||||
println "test : force overwrite"
|
println "test : force overwrite"
|
||||||
@ -1596,11 +1599,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
println "\n===> zstd asyncio decompression tests "
|
println "\n===> zstd asyncio tests "
|
||||||
|
|
||||||
addFrame() {
|
addFrame() {
|
||||||
datagen -g2M -s$2 >> tmp_uncompressed
|
datagen -g2M -s$2 >> tmp_uncompressed
|
||||||
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
|
datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
|
||||||
}
|
}
|
||||||
|
|
||||||
addTwoFrames() {
|
addTwoFrames() {
|
||||||
|
Reference in New Issue
Block a user