mirror of
https://github.com/facebook/zstd.git
synced 2025-11-02 09:13:20 +03:00
removed asyncio completely for compression path
this does not provide speed benefits, since most of the leverage happens internally within the library, and can even become detrimental in certain scenario, due to complex and wasteful memory management. At a minimum, it makes the logic simpler, easier to debug, at essentially the same performance.
This commit is contained in:
@@ -48,6 +48,36 @@
|
||||
FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
|
||||
UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
|
||||
|
||||
/* *************************************
|
||||
* Synchronous compression IO helpers
|
||||
***************************************/
|
||||
typedef struct {
|
||||
const FIO_prefs_t* prefs;
|
||||
FILE* srcFile;
|
||||
FILE* dstFile;
|
||||
unsigned storedSkips;
|
||||
U8* inBuffer;
|
||||
size_t inCapacity;
|
||||
U8* srcBuffer;
|
||||
size_t srcBufferLoaded;
|
||||
U8* outBuffer;
|
||||
size_t outCapacity;
|
||||
} FIO_SyncCompressIO;
|
||||
|
||||
static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
|
||||
const FIO_prefs_t* prefs,
|
||||
size_t inCapacity,
|
||||
size_t outCapacity);
|
||||
static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io);
|
||||
static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file);
|
||||
static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io);
|
||||
static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file);
|
||||
static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io);
|
||||
static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave);
|
||||
static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n);
|
||||
static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size);
|
||||
static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io);
|
||||
|
||||
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
|
||||
#include "../lib/zstd.h"
|
||||
#include "../lib/zstd_errors.h" /* ZSTD_error_frameParameter_windowTooLarge */
|
||||
@@ -125,6 +155,233 @@ char const* FIO_lzmaVersion(void)
|
||||
#define TEMPORARY_FILE_PERMISSIONS (0600)
|
||||
#endif
|
||||
|
||||
static unsigned FIO_sparseWrite(FILE* file,
|
||||
const void* buffer, size_t bufferSize,
|
||||
const FIO_prefs_t* const prefs,
|
||||
unsigned storedSkips)
|
||||
{
|
||||
const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
|
||||
size_t bufferSizeT = bufferSize / sizeof(size_t);
|
||||
const size_t* const bufferTEnd = bufferT + bufferSizeT;
|
||||
const size_t* ptrT = bufferT;
|
||||
static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
|
||||
|
||||
if (prefs->testMode) return 0; /* do not output anything in test mode */
|
||||
|
||||
if (!prefs->sparseFileSupport) { /* normal write */
|
||||
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
|
||||
if (sizeCheck != bufferSize)
|
||||
EXM_THROW(70, "Write error : cannot write block : %s",
|
||||
strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* avoid int overflow */
|
||||
if (storedSkips > 1 GB) {
|
||||
if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
|
||||
EXM_THROW(91, "1 GB skip error (sparse file support)");
|
||||
storedSkips -= 1 GB;
|
||||
}
|
||||
|
||||
while (ptrT < bufferTEnd) {
|
||||
size_t nb0T;
|
||||
|
||||
/* adjust last segment if < 32 KB */
|
||||
size_t seg0SizeT = segmentSizeT;
|
||||
if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
|
||||
bufferSizeT -= seg0SizeT;
|
||||
|
||||
/* count leading zeroes */
|
||||
for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
|
||||
storedSkips += (unsigned)(nb0T * sizeof(size_t));
|
||||
|
||||
if (nb0T != seg0SizeT) { /* not all 0s */
|
||||
size_t const nbNon0ST = seg0SizeT - nb0T;
|
||||
/* skip leading zeros */
|
||||
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
|
||||
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
|
||||
storedSkips = 0;
|
||||
/* write the rest */
|
||||
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
|
||||
EXM_THROW(93, "Write error : cannot write block : %s",
|
||||
strerror(errno));
|
||||
}
|
||||
ptrT += seg0SizeT;
|
||||
}
|
||||
|
||||
{ static size_t const maskT = sizeof(size_t)-1;
|
||||
if (bufferSize & maskT) {
|
||||
/* size not multiple of sizeof(size_t) : implies end of block */
|
||||
const char* const restStart = (const char*)bufferTEnd;
|
||||
const char* restPtr = restStart;
|
||||
const char* const restEnd = (const char*)buffer + bufferSize;
|
||||
assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
|
||||
for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
|
||||
storedSkips += (unsigned) (restPtr - restStart);
|
||||
if (restPtr != restEnd) {
|
||||
/* not all remaining bytes are 0 */
|
||||
size_t const restSize = (size_t)(restEnd - restPtr);
|
||||
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
|
||||
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
|
||||
if (fwrite(restPtr, 1, restSize, file) != restSize)
|
||||
EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
|
||||
strerror(errno));
|
||||
storedSkips = 0;
|
||||
} } }
|
||||
|
||||
return storedSkips;
|
||||
}
|
||||
|
||||
static void FIO_sparseWriteEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
|
||||
{
|
||||
if (file == NULL) return;
|
||||
if (prefs->testMode) {
|
||||
assert(storedSkips == 0);
|
||||
return;
|
||||
}
|
||||
if (storedSkips>0) {
|
||||
assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
|
||||
if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
|
||||
EXM_THROW(69, "Final skip error (sparse file support)");
|
||||
/* last zero must be explicitly written,
|
||||
* so that skipped ones get implicitly translated as zero by FS */
|
||||
{ const char lastZeroByte[1] = { 0 };
|
||||
if (fwrite(lastZeroByte, 1, 1, file) != 1)
|
||||
EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
|
||||
const FIO_prefs_t* prefs,
|
||||
size_t inCapacity,
|
||||
size_t outCapacity)
|
||||
{
|
||||
memset(io, 0, sizeof(*io));
|
||||
io->prefs = prefs;
|
||||
io->inCapacity = inCapacity;
|
||||
io->outCapacity = outCapacity;
|
||||
io->inBuffer = (U8*)malloc(inCapacity);
|
||||
if (!io->inBuffer)
|
||||
EXM_THROW(101, "Allocation error : not enough memory");
|
||||
io->outBuffer = (U8*)malloc(outCapacity);
|
||||
if (!io->outBuffer) {
|
||||
free(io->inBuffer);
|
||||
io->inBuffer = NULL;
|
||||
EXM_THROW(101, "Allocation error : not enough memory");
|
||||
}
|
||||
io->srcBuffer = io->inBuffer;
|
||||
io->srcBufferLoaded = 0;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io)
|
||||
{
|
||||
if (!io) return;
|
||||
free(io->inBuffer);
|
||||
free(io->outBuffer);
|
||||
io->inBuffer = NULL;
|
||||
io->outBuffer = NULL;
|
||||
io->srcBuffer = NULL;
|
||||
io->srcBufferLoaded = 0;
|
||||
io->srcFile = NULL;
|
||||
io->dstFile = NULL;
|
||||
io->storedSkips = 0;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file)
|
||||
{
|
||||
io->srcFile = file;
|
||||
io->srcBuffer = io->inBuffer;
|
||||
io->srcBufferLoaded = 0;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io)
|
||||
{
|
||||
io->srcFile = NULL;
|
||||
io->srcBuffer = io->inBuffer;
|
||||
io->srcBufferLoaded = 0;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file)
|
||||
{
|
||||
io->dstFile = file;
|
||||
io->storedSkips = 0;
|
||||
}
|
||||
|
||||
static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io)
|
||||
{
|
||||
int result = 0;
|
||||
if (io->dstFile != NULL) {
|
||||
FIO_SyncCompressIO_finish(io);
|
||||
result = fclose(io->dstFile);
|
||||
io->dstFile = NULL;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave)
|
||||
{
|
||||
size_t added = 0;
|
||||
if (io->srcFile == NULL)
|
||||
return 0;
|
||||
|
||||
if (minToHave > io->inCapacity)
|
||||
minToHave = io->inCapacity;
|
||||
|
||||
if (io->srcBufferLoaded >= minToHave)
|
||||
return 0;
|
||||
|
||||
if (io->srcBuffer != io->inBuffer) {
|
||||
if (io->srcBufferLoaded > 0)
|
||||
memmove(io->inBuffer, io->srcBuffer, io->srcBufferLoaded);
|
||||
io->srcBuffer = io->inBuffer;
|
||||
}
|
||||
|
||||
while (io->srcBufferLoaded < minToHave) {
|
||||
size_t const toRead = io->inCapacity - io->srcBufferLoaded;
|
||||
size_t const readBytes = fread(io->inBuffer + io->srcBufferLoaded, 1, toRead, io->srcFile);
|
||||
if (readBytes == 0) {
|
||||
if (ferror(io->srcFile))
|
||||
EXM_THROW(37, "Read error");
|
||||
break; /* EOF */
|
||||
}
|
||||
io->srcBufferLoaded += readBytes;
|
||||
added += readBytes;
|
||||
if (readBytes < toRead)
|
||||
break;
|
||||
}
|
||||
|
||||
return added;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n)
|
||||
{
|
||||
assert(n <= io->srcBufferLoaded);
|
||||
io->srcBuffer += n;
|
||||
io->srcBufferLoaded -= n;
|
||||
if (io->srcBufferLoaded == 0)
|
||||
io->srcBuffer = io->inBuffer;
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size)
|
||||
{
|
||||
if (size == 0)
|
||||
return;
|
||||
if (io->dstFile == NULL) {
|
||||
assert(io->prefs->testMode);
|
||||
return;
|
||||
}
|
||||
io->storedSkips = FIO_sparseWrite(io->dstFile, buffer, size, io->prefs, io->storedSkips);
|
||||
}
|
||||
|
||||
static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io)
|
||||
{
|
||||
if (io->dstFile == NULL)
|
||||
return;
|
||||
FIO_sparseWriteEnd(io->prefs, io->dstFile, io->storedSkips);
|
||||
io->storedSkips = 0;
|
||||
}
|
||||
|
||||
/*-************************************
|
||||
* Signal (Ctrl-C trapping)
|
||||
**************************************/
|
||||
@@ -1078,8 +1335,7 @@ typedef struct {
|
||||
const char* dictFileName;
|
||||
stat_t dictFileStat;
|
||||
ZSTD_CStream* cctx;
|
||||
WritePoolCtx_t *writeCtx;
|
||||
ReadPoolCtx_t *readCtx;
|
||||
FIO_SyncCompressIO io;
|
||||
} cRess_t;
|
||||
|
||||
/** ZSTD_cycleLog() :
|
||||
@@ -1147,14 +1403,7 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
|
||||
dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict;
|
||||
FIO_initDict(&ress.dict, dictFileName, prefs, &ress.dictFileStat, dictBufferType); /* works with dictFileName==NULL */
|
||||
|
||||
{
|
||||
/* Compression paths stay synchronous for now: lower overhead and easier upkeep. */
|
||||
int const savedAsyncIO = prefs->asyncIO;
|
||||
prefs->asyncIO = 0;
|
||||
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
|
||||
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
|
||||
prefs->asyncIO = savedAsyncIO;
|
||||
}
|
||||
FIO_SyncCompressIO_init(&ress.io, prefs, ZSTD_CStreamInSize(), ZSTD_CStreamOutSize());
|
||||
|
||||
/* Advanced parameters, including dictionary */
|
||||
if (dictFileName && (ress.dict.dictBuffer==NULL))
|
||||
@@ -1218,21 +1467,20 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
|
||||
static void FIO_freeCResources(cRess_t* const ress)
|
||||
{
|
||||
FIO_freeDict(&(ress->dict));
|
||||
AIO_WritePool_free(ress->writeCtx);
|
||||
AIO_ReadPool_free(ress->readCtx);
|
||||
FIO_SyncCompressIO_free(&ress->io);
|
||||
ZSTD_freeCStream(ress->cctx); /* never fails */
|
||||
}
|
||||
|
||||
|
||||
#ifdef ZSTD_GZCOMPRESS
|
||||
static unsigned long long
|
||||
FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but not changed */
|
||||
FIO_compressGzFrame(cRess_t* ress,
|
||||
const char* srcFileName, U64 const srcFileSize,
|
||||
int compressionLevel, U64* readsize)
|
||||
{
|
||||
FIO_SyncCompressIO* const io = &ress->io;
|
||||
unsigned long long inFileSize = 0, outFileSize = 0;
|
||||
z_stream strm;
|
||||
IOJob_t *writeJob = NULL;
|
||||
|
||||
if (compressionLevel > Z_BEST_COMPRESSION)
|
||||
compressionLevel = Z_BEST_COMPRESSION;
|
||||
@@ -1248,37 +1496,36 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
||||
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
|
||||
} }
|
||||
|
||||
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||
strm.next_in = 0;
|
||||
strm.avail_in = 0;
|
||||
strm.next_out = (Bytef*)writeJob->buffer;
|
||||
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||
strm.next_out = (Bytef*)io->outBuffer;
|
||||
strm.avail_out = (uInt)io->outCapacity;
|
||||
|
||||
while (1) {
|
||||
int ret;
|
||||
if (strm.avail_in == 0) {
|
||||
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
|
||||
if (ress->readCtx->srcBufferLoaded == 0) break;
|
||||
inFileSize += ress->readCtx->srcBufferLoaded;
|
||||
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
|
||||
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
|
||||
size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
|
||||
if (io->srcBufferLoaded == 0) break;
|
||||
inFileSize += added;
|
||||
*readsize += added;
|
||||
strm.next_in = (z_const unsigned char*)io->srcBuffer;
|
||||
strm.avail_in = (uInt)io->srcBufferLoaded;
|
||||
}
|
||||
|
||||
{
|
||||
size_t const availBefore = strm.avail_in;
|
||||
ret = deflate(&strm, Z_NO_FLUSH);
|
||||
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
|
||||
FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in);
|
||||
}
|
||||
|
||||
if (ret != Z_OK)
|
||||
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
|
||||
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
|
||||
{ size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out);
|
||||
if (cSize) {
|
||||
writeJob->usedBufferSize = cSize;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize);
|
||||
outFileSize += cSize;
|
||||
strm.next_out = (Bytef*)writeJob->buffer;
|
||||
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||
strm.next_out = (Bytef*)io->outBuffer;
|
||||
strm.avail_out = (uInt)io->outCapacity;
|
||||
} }
|
||||
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
|
||||
DISPLAYUPDATE_PROGRESS(
|
||||
@@ -1294,13 +1541,12 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
||||
|
||||
while (1) {
|
||||
int const ret = deflate(&strm, Z_FINISH);
|
||||
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
|
||||
{ size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out);
|
||||
if (cSize) {
|
||||
writeJob->usedBufferSize = cSize;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize);
|
||||
outFileSize += cSize;
|
||||
strm.next_out = (Bytef*)writeJob->buffer;
|
||||
strm.avail_out = (uInt)writeJob->bufferSize;
|
||||
strm.next_out = (Bytef*)io->outBuffer;
|
||||
strm.avail_out = (uInt)io->outCapacity;
|
||||
} }
|
||||
if (ret == Z_STREAM_END) break;
|
||||
if (ret != Z_BUF_ERROR)
|
||||
@@ -1312,8 +1558,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
|
||||
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
|
||||
} }
|
||||
*readsize = inFileSize;
|
||||
AIO_WritePool_releaseIoJob(writeJob);
|
||||
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||
FIO_SyncCompressIO_finish(io);
|
||||
return outFileSize;
|
||||
}
|
||||
#endif
|
||||
@@ -1325,11 +1570,11 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
||||
const char* srcFileName, U64 const srcFileSize,
|
||||
int compressionLevel, U64* readsize, int plain_lzma)
|
||||
{
|
||||
FIO_SyncCompressIO* const io = &ress->io;
|
||||
unsigned long long inFileSize = 0, outFileSize = 0;
|
||||
lzma_stream strm = LZMA_STREAM_INIT;
|
||||
lzma_action action = LZMA_RUN;
|
||||
lzma_ret ret;
|
||||
IOJob_t *writeJob = NULL;
|
||||
|
||||
if (compressionLevel < 0) compressionLevel = 0;
|
||||
if (compressionLevel > 9) compressionLevel = 9;
|
||||
@@ -1347,37 +1592,35 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
||||
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
|
||||
}
|
||||
|
||||
writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
|
||||
strm.next_out = (BYTE*)writeJob->buffer;
|
||||
strm.avail_out = writeJob->bufferSize;
|
||||
strm.next_out = (BYTE*)io->outBuffer;
|
||||
strm.avail_out = io->outCapacity;
|
||||
strm.next_in = 0;
|
||||
strm.avail_in = 0;
|
||||
|
||||
while (1) {
|
||||
if (strm.avail_in == 0) {
|
||||
size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
|
||||
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
|
||||
inFileSize += inSize;
|
||||
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
|
||||
strm.avail_in = ress->readCtx->srcBufferLoaded;
|
||||
size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
|
||||
if (io->srcBufferLoaded == 0) action = LZMA_FINISH;
|
||||
inFileSize += added;
|
||||
*readsize += added;
|
||||
strm.next_in = (BYTE const*)io->srcBuffer;
|
||||
strm.avail_in = io->srcBufferLoaded;
|
||||
}
|
||||
|
||||
{
|
||||
size_t const availBefore = strm.avail_in;
|
||||
ret = lzma_code(&strm, action);
|
||||
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
|
||||
FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in);
|
||||
}
|
||||
|
||||
|
||||
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
|
||||
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
|
||||
{ size_t const compBytes = writeJob->bufferSize - strm.avail_out;
|
||||
{ size_t const compBytes = io->outCapacity - strm.avail_out;
|
||||
if (compBytes) {
|
||||
writeJob->usedBufferSize = compBytes;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, compBytes);
|
||||
outFileSize += compBytes;
|
||||
strm.next_out = (BYTE*)writeJob->buffer;
|
||||
strm.avail_out = writeJob->bufferSize;
|
||||
strm.next_out = (BYTE*)io->outBuffer;
|
||||
strm.avail_out = io->outCapacity;
|
||||
} }
|
||||
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
|
||||
DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%",
|
||||
@@ -1393,8 +1636,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
|
||||
lzma_end(&strm);
|
||||
*readsize = inFileSize;
|
||||
|
||||
AIO_WritePool_releaseIoJob(writeJob);
|
||||
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||
FIO_SyncCompressIO_finish(io);
|
||||
|
||||
return outFileSize;
|
||||
}
|
||||
@@ -1415,21 +1657,20 @@ FIO_compressLz4Frame(cRess_t* ress,
|
||||
int compressionLevel, int checksumFlag,
|
||||
U64* readsize)
|
||||
{
|
||||
FIO_SyncCompressIO* const io = &ress->io;
|
||||
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
|
||||
unsigned long long inFileSize = 0, outFileSize = 0;
|
||||
|
||||
LZ4F_preferences_t prefs;
|
||||
LZ4F_compressionContext_t ctx;
|
||||
|
||||
IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
|
||||
|
||||
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(errorCode))
|
||||
EXM_THROW(31, "zstd: failed to create lz4 compression context");
|
||||
|
||||
memset(&prefs, 0, sizeof(prefs));
|
||||
|
||||
assert(blockSize <= ress->readCtx->base.jobBufferSize);
|
||||
assert(blockSize <= io->inCapacity);
|
||||
|
||||
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
|
||||
prefs.autoFlush = 0;
|
||||
@@ -1440,25 +1681,26 @@ FIO_compressLz4Frame(cRess_t* ress,
|
||||
#if LZ4_VERSION_NUMBER >= 10600
|
||||
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
|
||||
#endif
|
||||
assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
|
||||
assert(LZ4F_compressBound(blockSize, &prefs) <= io->outCapacity);
|
||||
|
||||
{
|
||||
size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
|
||||
size_t headerSize = LZ4F_compressBegin(ctx, io->outBuffer, io->outCapacity, &prefs);
|
||||
if (LZ4F_isError(headerSize))
|
||||
EXM_THROW(33, "File header generation failed : %s",
|
||||
LZ4F_getErrorName(headerSize));
|
||||
writeJob->usedBufferSize = headerSize;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize);
|
||||
outFileSize += headerSize;
|
||||
|
||||
/* Read first block */
|
||||
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||
{
|
||||
size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize);
|
||||
inFileSize += added;
|
||||
*readsize += added;
|
||||
}
|
||||
|
||||
/* Main Loop */
|
||||
while (ress->readCtx->srcBufferLoaded) {
|
||||
size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
|
||||
size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
|
||||
ress->readCtx->srcBuffer, inSize, NULL);
|
||||
while (io->srcBufferLoaded) {
|
||||
size_t const inSize = MIN(blockSize, io->srcBufferLoaded);
|
||||
size_t const outSize = LZ4F_compressUpdate(ctx, io->outBuffer, io->outCapacity,
|
||||
io->srcBuffer, inSize, NULL);
|
||||
if (LZ4F_isError(outSize))
|
||||
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
|
||||
srcFileName, LZ4F_getErrorName(outSize));
|
||||
@@ -1473,30 +1715,27 @@ FIO_compressLz4Frame(cRess_t* ress,
|
||||
(double)outFileSize/(double)inFileSize*100);
|
||||
}
|
||||
|
||||
/* Write Block */
|
||||
writeJob->usedBufferSize = outSize;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, outSize);
|
||||
|
||||
/* Read next block */
|
||||
AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
|
||||
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
|
||||
FIO_SyncCompressIO_consumeBytes(io, inSize);
|
||||
{
|
||||
size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize);
|
||||
inFileSize += added;
|
||||
*readsize += added;
|
||||
}
|
||||
}
|
||||
|
||||
/* End of Stream mark */
|
||||
headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
|
||||
headerSize = LZ4F_compressEnd(ctx, io->outBuffer, io->outCapacity, NULL);
|
||||
if (LZ4F_isError(headerSize))
|
||||
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
|
||||
srcFileName, LZ4F_getErrorName(headerSize));
|
||||
|
||||
writeJob->usedBufferSize = headerSize;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize);
|
||||
outFileSize += headerSize;
|
||||
}
|
||||
|
||||
*readsize = inFileSize;
|
||||
LZ4F_freeCompressionContext(ctx);
|
||||
AIO_WritePool_releaseIoJob(writeJob);
|
||||
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
|
||||
FIO_SyncCompressIO_finish(io);
|
||||
|
||||
return outFileSize;
|
||||
}
|
||||
@@ -1505,12 +1744,11 @@ FIO_compressLz4Frame(cRess_t* ress,
|
||||
static unsigned long long
|
||||
FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
FIO_prefs_t* const prefs,
|
||||
const cRess_t* ressPtr,
|
||||
cRess_t* ress,
|
||||
const char* srcFileName, U64 fileSize,
|
||||
int compressionLevel, U64* readsize)
|
||||
{
|
||||
cRess_t const ress = *ressPtr;
|
||||
IOJob_t* writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
|
||||
FIO_SyncCompressIO* const io = &ress->io;
|
||||
|
||||
U64 compressedfilesize = 0;
|
||||
ZSTD_EndDirective directive = ZSTD_e_continue;
|
||||
@@ -1535,16 +1773,16 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
/* init */
|
||||
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
|
||||
pledgedSrcSize = fileSize;
|
||||
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
|
||||
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress->cctx, fileSize));
|
||||
} else if (prefs->streamSrcSize > 0) {
|
||||
/* unknown source size; use the declared stream size */
|
||||
pledgedSrcSize = prefs->streamSrcSize;
|
||||
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) );
|
||||
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress->cctx, prefs->streamSrcSize) );
|
||||
}
|
||||
|
||||
{ int windowLog;
|
||||
UTIL_HumanReadableSize_t windowSize;
|
||||
CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog));
|
||||
CHECK(ZSTD_CCtx_getParameter(ress->cctx, ZSTD_c_windowLog, &windowLog));
|
||||
if (windowLog == 0) {
|
||||
if (prefs->ldmFlag) {
|
||||
/* If long mode is set without a window size libzstd will set this size internally */
|
||||
@@ -1562,12 +1800,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
do {
|
||||
size_t stillToFlush;
|
||||
/* Fill input Buffer */
|
||||
size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
|
||||
ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 );
|
||||
size_t const inSize = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
|
||||
ZSTD_inBuffer inBuff = setInBuffer( io->srcBuffer, io->srcBufferLoaded, 0 );
|
||||
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
|
||||
*readsize += inSize;
|
||||
|
||||
if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
|
||||
if ((io->srcBufferLoaded == 0) || (*readsize == fileSize))
|
||||
directive = ZSTD_e_end;
|
||||
|
||||
stillToFlush = 1;
|
||||
@@ -1575,10 +1813,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
|
||||
|
||||
size_t const oldIPos = inBuff.pos;
|
||||
ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
|
||||
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
|
||||
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
|
||||
AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
|
||||
ZSTD_outBuffer outBuff = setOutBuffer( io->outBuffer, io->outCapacity, 0 );
|
||||
size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx);
|
||||
CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive));
|
||||
FIO_SyncCompressIO_consumeBytes(io, inBuff.pos - oldIPos);
|
||||
|
||||
/* count stats */
|
||||
inputPresented++;
|
||||
@@ -1589,14 +1827,13 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
|
||||
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
|
||||
if (outBuff.pos) {
|
||||
writeJob->usedBufferSize = outBuff.pos;
|
||||
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
|
||||
FIO_SyncCompressIO_commitOut(io, io->outBuffer, outBuff.pos);
|
||||
compressedfilesize += outBuff.pos;
|
||||
}
|
||||
|
||||
/* adaptive mode : statistics measurement and speed correction */
|
||||
if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
|
||||
|
||||
lastAdaptTime = UTIL_getTime();
|
||||
|
||||
@@ -1669,14 +1906,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
|
||||
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
|
||||
compressionLevel += (compressionLevel == 0); /* skip 0 */
|
||||
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
|
||||
ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
|
||||
}
|
||||
if (speedChange == faster) {
|
||||
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
|
||||
compressionLevel --;
|
||||
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
|
||||
compressionLevel -= (compressionLevel == 0); /* skip 0 */
|
||||
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
|
||||
ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
|
||||
}
|
||||
speedChange = noChange;
|
||||
|
||||
@@ -1686,7 +1923,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
|
||||
/* display notification */
|
||||
if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
|
||||
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
|
||||
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
|
||||
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
|
||||
@@ -1733,8 +1970,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
(unsigned long long)*readsize, (unsigned long long)fileSize);
|
||||
}
|
||||
|
||||
AIO_WritePool_releaseIoJob(writeJob);
|
||||
AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
|
||||
FIO_SyncCompressIO_finish(io);
|
||||
|
||||
return compressedfilesize;
|
||||
}
|
||||
@@ -1747,7 +1983,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|
||||
static int
|
||||
FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
||||
FIO_prefs_t* const prefs,
|
||||
cRess_t ress,
|
||||
cRess_t* ress,
|
||||
const char* dstFileName, const char* srcFileName,
|
||||
int compressionLevel)
|
||||
{
|
||||
@@ -1762,12 +1998,12 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
||||
switch (prefs->compressionType) {
|
||||
default:
|
||||
case FIO_zstdCompression:
|
||||
compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, &ress, srcFileName, fileSize, compressionLevel, &readsize);
|
||||
compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, ress, srcFileName, fileSize, compressionLevel, &readsize);
|
||||
break;
|
||||
|
||||
case FIO_gzipCompression:
|
||||
#ifdef ZSTD_GZCOMPRESS
|
||||
compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
|
||||
compressedfilesize = FIO_compressGzFrame(ress, srcFileName, fileSize, compressionLevel, &readsize);
|
||||
#else
|
||||
(void)compressionLevel;
|
||||
EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n",
|
||||
@@ -1778,7 +2014,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
||||
case FIO_xzCompression:
|
||||
case FIO_lzmaCompression:
|
||||
#ifdef ZSTD_LZMACOMPRESS
|
||||
compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
|
||||
compressedfilesize = FIO_compressLzmaFrame(ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
|
||||
#else
|
||||
(void)compressionLevel;
|
||||
EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n",
|
||||
@@ -1788,7 +2024,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
||||
|
||||
case FIO_lz4Compression:
|
||||
#ifdef ZSTD_LZ4COMPRESS
|
||||
compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
|
||||
compressedfilesize = FIO_compressLz4Frame(ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
|
||||
#else
|
||||
(void)compressionLevel;
|
||||
EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n",
|
||||
@@ -1844,7 +2080,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
|
||||
*/
|
||||
static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
||||
FIO_prefs_t* const prefs,
|
||||
cRess_t ress,
|
||||
cRess_t* ress,
|
||||
const char* dstFileName,
|
||||
const char* srcFileName,
|
||||
const stat_t* srcFileStat,
|
||||
@@ -1855,8 +2091,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
||||
int transferStat = 0;
|
||||
int dstFd = -1;
|
||||
|
||||
assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
|
||||
if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
|
||||
if (ress->io.dstFile == NULL) {
|
||||
int dstFileInitialPermissions = DEFAULT_FILE_PERMISSIONS;
|
||||
if ( strcmp (srcFileName, stdinmark)
|
||||
&& strcmp (dstFileName, stdoutmark)
|
||||
@@ -1867,15 +2102,13 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
||||
|
||||
closeDstFile = 1;
|
||||
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
|
||||
{ FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
|
||||
{
|
||||
FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
|
||||
if (dstFile==NULL) return 1; /* could not open dstFileName */
|
||||
dstFd = fileno(dstFile);
|
||||
AIO_WritePool_setFile(ress.writeCtx, dstFile);
|
||||
FIO_SyncCompressIO_setDst(&ress->io, dstFile);
|
||||
}
|
||||
/* Must only be added after FIO_openDstFile() succeeds.
|
||||
* Otherwise we may delete the destination file if it already exists,
|
||||
* and the user presses Ctrl-C when asked if they wish to overwrite.
|
||||
*/
|
||||
/* Must only be added after FIO_openDstFile() succeeds. */
|
||||
addHandler(dstFileName);
|
||||
}
|
||||
|
||||
@@ -1889,7 +2122,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
||||
}
|
||||
|
||||
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
|
||||
if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
|
||||
if (FIO_SyncCompressIO_closeDst(&ress->io)) { /* error closing file */
|
||||
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
|
||||
result=1;
|
||||
}
|
||||
@@ -1898,10 +2131,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
|
||||
UTIL_utime(dstFileName, srcFileStat);
|
||||
}
|
||||
|
||||
if ( (result != 0) /* operation failure */
|
||||
&& strcmp(dstFileName, stdoutmark) /* special case : don't remove() stdout */
|
||||
) {
|
||||
FIO_removeFile(dstFileName); /* remove compression artefact; note don't do anything special if remove() fails */
|
||||
if ( (result != 0)
|
||||
&& strcmp(dstFileName, stdoutmark) ) {
|
||||
FIO_removeFile(dstFileName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2035,7 +2267,7 @@ static const char *compressedFileExtensions[] = {
|
||||
static int
|
||||
FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
|
||||
FIO_prefs_t* const prefs,
|
||||
cRess_t ress,
|
||||
cRess_t* ress,
|
||||
const char* dstFileName,
|
||||
const char* srcFileName,
|
||||
int compressionLevel)
|
||||
@@ -2057,7 +2289,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
|
||||
}
|
||||
|
||||
/* ensure src is not the same as dict (if present) */
|
||||
if (ress.dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress.dictFileName, &srcFileStat, &ress.dictFileStat)) {
|
||||
if (ress->dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress->dictFileName, &srcFileStat, &ress->dictFileStat)) {
|
||||
DISPLAYLEVEL(1, "zstd: cannot use %s as an input file and dictionary \n", srcFileName);
|
||||
return 1;
|
||||
}
|
||||
@@ -2076,19 +2308,21 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
|
||||
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
|
||||
if (srcFile == NULL) return 1; /* srcFile could not be opened */
|
||||
|
||||
/* AsyncIO is disabled for compression to favor predictable performance and simpler upkeep. */
|
||||
if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
|
||||
fileSize = UTIL_getFileSizeStat(&srcFileStat);
|
||||
(void)fileSize;
|
||||
AIO_ReadPool_setAsync(ress.readCtx, 0);
|
||||
AIO_WritePool_setAsync(ress.writeCtx, 0);
|
||||
|
||||
AIO_ReadPool_setFile(ress.readCtx, srcFile);
|
||||
FIO_SyncCompressIO_setSrc(&ress->io, srcFile);
|
||||
result = FIO_compressFilename_dstFile(
|
||||
fCtx, prefs, ress,
|
||||
dstFileName, srcFileName,
|
||||
&srcFileStat, compressionLevel);
|
||||
AIO_ReadPool_closeFile(ress.readCtx);
|
||||
FIO_SyncCompressIO_clearSrc(&ress->io);
|
||||
|
||||
if (srcFile != NULL && fclose(srcFile)) {
|
||||
DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
if ( prefs->removeSrcFile /* --rm */
|
||||
&& result == 0 /* success */
|
||||
@@ -2155,7 +2389,7 @@ int FIO_compressFilename(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, const
|
||||
int compressionLevel, ZSTD_compressionParameters comprParams)
|
||||
{
|
||||
cRess_t ress = FIO_createCResources(prefs, dictFileName, UTIL_getFileSize(srcFileName), compressionLevel, comprParams);
|
||||
int const result = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
||||
int const result = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
|
||||
|
||||
#define DISPLAY_LEVEL_DEFAULT 2
|
||||
|
||||
@@ -2252,13 +2486,13 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
|
||||
if (dstFile == NULL) { /* could not open outFileName */
|
||||
error = 1;
|
||||
} else {
|
||||
AIO_WritePool_setFile(ress.writeCtx, dstFile);
|
||||
FIO_SyncCompressIO_setDst(&ress.io, dstFile);
|
||||
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
|
||||
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
|
||||
status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
|
||||
if (!status) fCtx->nbFilesProcessed++;
|
||||
error |= status;
|
||||
}
|
||||
if (AIO_WritePool_closeFile(ress.writeCtx))
|
||||
if (FIO_SyncCompressIO_closeDst(&ress.io))
|
||||
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
|
||||
strerror(errno), outFileName);
|
||||
}
|
||||
@@ -2282,7 +2516,7 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
|
||||
} else {
|
||||
dstFileName = FIO_determineCompressedName(srcFileName, outDirName, suffix); /* cannot fail */
|
||||
}
|
||||
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
|
||||
status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
|
||||
if (!status) fCtx->nbFilesProcessed++;
|
||||
error |= status;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user