mirror of
https://github.com/facebook/zstd.git
synced 2025-08-08 17:22:10 +03:00
updated streaming API
This commit is contained in:
@@ -336,30 +336,30 @@ static int FIO_compressFilename_internal(cRess_t ress,
|
||||
DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20));
|
||||
|
||||
/* Compress using buffered streaming */
|
||||
{ ZSTD_rCursor rCursor = { ress.srcBuffer, inSize };
|
||||
ZSTD_wCursor wCursor = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
{ size_t const result = ZSTD_compressStream(ress.cctx, &wCursor, &rCursor);
|
||||
{ ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
|
||||
ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
{ size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
|
||||
if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
|
||||
if (rCursor.size != 0)
|
||||
if (inBuff.pos != inBuff.size)
|
||||
/* inBuff should be entirely consumed since buffer sizes are recommended ones */
|
||||
EXM_THROW(24, "Compression error : input block not fully consumed");
|
||||
|
||||
/* Write cBlock */
|
||||
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, wCursor.nbBytesWritten, dstFile);
|
||||
if (sizeCheck!=wCursor.nbBytesWritten) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); }
|
||||
compressedfilesize += wCursor.nbBytesWritten;
|
||||
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
|
||||
if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); }
|
||||
compressedfilesize += outBuff.pos;
|
||||
}
|
||||
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100);
|
||||
}
|
||||
|
||||
/* End of Frame */
|
||||
{ ZSTD_wCursor wCursor = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
size_t const result = ZSTD_endStream(ress.cctx, &wCursor);
|
||||
{ ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
|
||||
if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
|
||||
|
||||
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, wCursor.nbBytesWritten, dstFile);
|
||||
if (sizeCheck!=wCursor.nbBytesWritten) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
|
||||
compressedfilesize += wCursor.nbBytesWritten;
|
||||
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
|
||||
if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
|
||||
compressedfilesize += outBuff.pos;
|
||||
}
|
||||
|
||||
/* Status */
|
||||
@@ -624,19 +624,18 @@ unsigned long long FIO_decompressFrame(dRess_t ress,
|
||||
|
||||
/* Main decompression Loop */
|
||||
while (1) {
|
||||
ZSTD_rCursor rCursor = { ress.srcBuffer, readSize };
|
||||
ZSTD_wCursor wCursor = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
size_t const toRead = ZSTD_decompressStream(ress.dctx, &wCursor, &rCursor );
|
||||
ZSTD_inBuffer inBuff = { ress.srcBuffer, readSize, 0 };
|
||||
ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||
size_t const toRead = ZSTD_decompressStream(ress.dctx, &outBuff, &inBuff );
|
||||
if (ZSTD_isError(toRead)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(toRead));
|
||||
readSize = rCursor.size;
|
||||
|
||||
/* Write block */
|
||||
storedSkips = FIO_fwriteSparse(foutput, ress.dstBuffer, wCursor.nbBytesWritten, storedSkips);
|
||||
frameSize += wCursor.nbBytesWritten;
|
||||
storedSkips = FIO_fwriteSparse(foutput, ress.dstBuffer, outBuff.pos, storedSkips);
|
||||
frameSize += outBuff.pos;
|
||||
DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)(frameSize>>20) );
|
||||
|
||||
if (toRead == 0) break; /* end of frame */
|
||||
if (readSize) EXM_THROW(37, "Decoding error : should consume entire input");
|
||||
if (inBuff.size != inBuff.pos) EXM_THROW(37, "Decoding error : should consume entire input");
|
||||
|
||||
/* Fill input buffer */
|
||||
if (toRead > ress.srcBufferSize) EXM_THROW(38, "too large block");
|
||||
|
@@ -155,8 +155,8 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
|
||||
U32 testNb=0;
|
||||
ZSTD_CStream* zc = ZSTD_createCStream_advanced(customMem);
|
||||
ZSTD_DStream* zd = ZSTD_createDStream_advanced(customMem);
|
||||
ZSTD_rCursor rCursor;
|
||||
ZSTD_wCursor wCursor;
|
||||
ZSTD_inBuffer inBuff;
|
||||
ZSTD_outBuffer outBuff;
|
||||
|
||||
/* Create compressible test buffer */
|
||||
if (!CNBuffer || !compressedBuffer || !decodedBuffer || !zc || !zd) {
|
||||
@@ -173,39 +173,41 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
|
||||
/* Basic compression test */
|
||||
DISPLAYLEVEL(4, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
|
||||
ZSTD_initCStream_usingDict(zc, CNBuffer, 128 KB, 1);
|
||||
wCursor.ptr = (char*)(compressedBuffer)+cSize;
|
||||
wCursor.size = compressedBufferSize;
|
||||
wCursor.nbBytesWritten = 0;
|
||||
rCursor.ptr = CNBuffer;
|
||||
rCursor.size = CNBufferSize;
|
||||
{ size_t const r = ZSTD_compressStream(zc, &wCursor, &rCursor);
|
||||
outBuff.dst = (char*)(compressedBuffer)+cSize;
|
||||
outBuff.size = compressedBufferSize;
|
||||
outBuff.pos = 0;
|
||||
inBuff.src = CNBuffer;
|
||||
inBuff.size = CNBufferSize;
|
||||
inBuff.pos = 0;
|
||||
{ size_t const r = ZSTD_compressStream(zc, &outBuff, &inBuff);
|
||||
if (ZSTD_isError(r)) goto _output_error; }
|
||||
if (rCursor.size != 0) goto _output_error; /* entire input should be consumed */
|
||||
{ size_t const r = ZSTD_endStream(zc, &wCursor);
|
||||
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
|
||||
{ size_t const r = ZSTD_endStream(zc, &outBuff);
|
||||
if (r != 0) goto _output_error; } /*< error, or some data not flushed */
|
||||
cSize += wCursor.nbBytesWritten;
|
||||
cSize += outBuff.pos;
|
||||
DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
|
||||
|
||||
/* skippable frame test */
|
||||
DISPLAYLEVEL(4, "test%3i : decompress skippable frame : ", testNb++);
|
||||
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
|
||||
rCursor.ptr = compressedBuffer;
|
||||
rCursor.size = cSize;
|
||||
wCursor.ptr = decodedBuffer;
|
||||
wCursor.size = CNBufferSize;
|
||||
wCursor.nbBytesWritten = 0;
|
||||
{ size_t const r = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
inBuff.src = compressedBuffer;
|
||||
inBuff.size = cSize;
|
||||
inBuff.pos = 0;
|
||||
outBuff.dst = decodedBuffer;
|
||||
outBuff.size = CNBufferSize;
|
||||
outBuff.pos = 0;
|
||||
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
if (r != 0) goto _output_error; }
|
||||
if (wCursor.nbBytesWritten != 0) goto _output_error; /* skippable frame len is 0 */
|
||||
if (outBuff.pos != 0) goto _output_error; /* skippable frame len is 0 */
|
||||
DISPLAYLEVEL(4, "OK \n");
|
||||
|
||||
/* Basic decompression test */
|
||||
DISPLAYLEVEL(4, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
|
||||
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
|
||||
{ size_t const r = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
if (r != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */
|
||||
if (wCursor.nbBytesWritten != CNBufferSize) goto _output_error; /* should regenerate the same amount */
|
||||
if (rCursor.size != 0) goto _output_error; /* should have read the entire frame */
|
||||
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
|
||||
if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */
|
||||
DISPLAYLEVEL(4, "OK \n");
|
||||
|
||||
/* check regenerated data is byte exact */
|
||||
@@ -220,26 +222,27 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
|
||||
DISPLAYLEVEL(4, "test%3i : decompress byte-by-byte : ", testNb++);
|
||||
{ size_t r = 1;
|
||||
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
|
||||
rCursor.ptr = compressedBuffer;
|
||||
wCursor.ptr = decodedBuffer;
|
||||
wCursor.nbBytesWritten = 0;
|
||||
inBuff.src = compressedBuffer;
|
||||
outBuff.dst = decodedBuffer;
|
||||
inBuff.pos = 0;
|
||||
outBuff.pos = 0;
|
||||
while (r) { /* skippable frame */
|
||||
rCursor.size = 1;
|
||||
wCursor.size = 1;
|
||||
r = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
inBuff.size = inBuff.pos + 1;
|
||||
outBuff.size = outBuff.pos + 1;
|
||||
r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
if (ZSTD_isError(r)) goto _output_error;
|
||||
}
|
||||
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
|
||||
r=1;
|
||||
while (r) { /* normal frame */
|
||||
rCursor.size = 1;
|
||||
wCursor.size = 1;
|
||||
r = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
inBuff.size = inBuff.pos + 1;
|
||||
outBuff.size = outBuff.pos + 1;
|
||||
r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
if (ZSTD_isError(r)) goto _output_error;
|
||||
}
|
||||
}
|
||||
if (wCursor.nbBytesWritten != CNBufferSize) goto _output_error; /* should regenerate the same amount */
|
||||
if ((size_t)(rCursor.ptr - compressedBuffer) != cSize) goto _output_error; /* should have read the entire frame */
|
||||
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
|
||||
if (inBuff.pos != cSize) goto _output_error; /* should have read the entire frame */
|
||||
DISPLAYLEVEL(4, "OK \n");
|
||||
|
||||
/* check regenerated data is byte exact */
|
||||
@@ -345,7 +348,7 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
|
||||
const BYTE* srcBuffer;
|
||||
const BYTE* dict;
|
||||
size_t maxTestSize, dictSize;
|
||||
size_t cSize, totalTestSize, totalCSize, totalGenSize;
|
||||
size_t cSize, totalTestSize, totalGenSize;
|
||||
U32 n, nbChunks;
|
||||
XXH64_state_t xxhState;
|
||||
U64 crcOrig;
|
||||
@@ -395,31 +398,30 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
|
||||
/* multi-segments compression test */
|
||||
XXH64_reset(&xxhState, 0);
|
||||
nbChunks = (FUZ_rand(&lseed) & 127) + 2;
|
||||
{ ZSTD_wCursor wCursor = { cBuffer, cBufferSize, 0 } ;
|
||||
{ ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
|
||||
for (n=0, cSize=0, totalTestSize=0 ; (n<nbChunks) && (totalTestSize < maxTestSize) ; n++) {
|
||||
/* compress random chunk into random size dst buffer */
|
||||
{ size_t readChunkSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
{ size_t const readChunkSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - readChunkSize);
|
||||
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);
|
||||
ZSTD_rCursor rCursor = { srcBuffer+srcStart, readChunkSize };
|
||||
wCursor.size = dstBuffSize;
|
||||
ZSTD_inBuffer inBuff = { srcBuffer+srcStart, readChunkSize, 0 };
|
||||
outBuff.size = outBuff.pos + dstBuffSize;
|
||||
|
||||
{ size_t const compressionError = ZSTD_compressStream(zc, &wCursor, &rCursor);
|
||||
{ size_t const compressionError = ZSTD_compressStream(zc, &outBuff, &inBuff);
|
||||
CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); }
|
||||
readChunkSize -= rCursor.size;
|
||||
|
||||
XXH64_update(&xxhState, srcBuffer+srcStart, readChunkSize);
|
||||
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, readChunkSize);
|
||||
totalTestSize += readChunkSize;
|
||||
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
|
||||
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
|
||||
totalTestSize += inBuff.pos;
|
||||
}
|
||||
|
||||
/* random flush operation, to mess around */
|
||||
if ((FUZ_rand(&lseed) & 15) == 0) {
|
||||
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);
|
||||
wCursor.size = dstBuffSize;
|
||||
{ size_t const flushError = ZSTD_flushStream(zc, &wCursor);
|
||||
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
|
||||
outBuff.size = outBuff.pos + adjustedDstSize;
|
||||
{ size_t const flushError = ZSTD_flushStream(zc, &outBuff);
|
||||
CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError));
|
||||
} } }
|
||||
|
||||
@@ -429,33 +431,32 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
|
||||
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
|
||||
U32 const enoughDstSize = (adjustedDstSize >= remainingToFlush);
|
||||
wCursor.size = adjustedDstSize;
|
||||
remainingToFlush = ZSTD_endStream(zc, &wCursor);
|
||||
outBuff.size = outBuff.pos + adjustedDstSize;
|
||||
remainingToFlush = ZSTD_endStream(zc, &outBuff);
|
||||
CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush));
|
||||
CHECK (enoughDstSize && remainingToFlush, "ZSTD_endStream() not fully flushed (%u remaining), but enough space available", (U32)remainingToFlush);
|
||||
} }
|
||||
crcOrig = XXH64_digest(&xxhState);
|
||||
cSize = wCursor.nbBytesWritten;
|
||||
cSize = outBuff.pos;
|
||||
}
|
||||
|
||||
/* multi - fragments decompression test */
|
||||
ZSTD_initDStream_usingDict(zd, dict, dictSize);
|
||||
{ size_t decompressionResult = 1;
|
||||
ZSTD_rCursor rCursor = { cBuffer, cSize };
|
||||
ZSTD_wCursor wCursor = { dstBuffer, dstBufferSize, 0 };
|
||||
for (totalCSize = 0, totalGenSize = 0 ; decompressionResult ; ) {
|
||||
ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
|
||||
ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
|
||||
for (totalGenSize = 0 ; decompressionResult ; ) {
|
||||
size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
|
||||
rCursor.size = readCSrcSize;
|
||||
wCursor.size = dstBuffSize;
|
||||
decompressionResult = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
inBuff.size = inBuff.pos + readCSrcSize;
|
||||
outBuff.size = inBuff.pos + dstBuffSize;
|
||||
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
|
||||
totalCSize += readCSrcSize - rCursor.size;
|
||||
}
|
||||
CHECK (decompressionResult != 0, "frame not fully decoded");
|
||||
CHECK (wCursor.nbBytesWritten != totalTestSize, "decompressed data : wrong size")
|
||||
CHECK (totalCSize != cSize, "compressed data should be fully read")
|
||||
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size")
|
||||
CHECK (inBuff.pos != cSize, "compressed data should be fully read")
|
||||
{ U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0);
|
||||
if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize);
|
||||
CHECK (crcDest!=crcOrig, "decompressed data corrupted");
|
||||
@@ -475,15 +476,15 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
|
||||
|
||||
/* try decompression on noisy data */
|
||||
ZSTD_initDStream(zd);
|
||||
{ ZSTD_rCursor rCursor = { cBuffer, cSize };
|
||||
ZSTD_wCursor wCursor = { dstBuffer, dstBufferSize, 0 };
|
||||
while (wCursor.nbBytesWritten < dstBufferSize) {
|
||||
{ ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
|
||||
ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
|
||||
while (outBuff.pos < dstBufferSize) {
|
||||
size_t const randomCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
|
||||
size_t const adjustedDstSize = MIN(dstBufferSize - wCursor.nbBytesWritten, randomDstSize);
|
||||
wCursor.size = adjustedDstSize;
|
||||
rCursor.size = randomCSrcSize;
|
||||
{ size_t const decompressError = ZSTD_decompressStream(zd, &wCursor, &rCursor);
|
||||
size_t const adjustedDstSize = MIN(dstBufferSize - outBuff.pos, randomDstSize);
|
||||
outBuff.size = outBuff.pos + adjustedDstSize;
|
||||
inBuff.size = inBuff.pos + randomCSrcSize;
|
||||
{ size_t const decompressError = ZSTD_decompressStream(zd, &outBuff, &inBuff);
|
||||
if (ZSTD_isError(decompressError)) break; /* error correctly detected */
|
||||
} } } }
|
||||
DISPLAY("\r%u fuzzer tests completed \n", testNb);
|
||||
|
Reference in New Issue
Block a user