1
0
mirror of https://github.com/facebook/zstd.git synced 2025-08-08 17:22:10 +03:00

[pzstd] Fix frame size for small files + add logging

This commit is contained in:
Nick Terrell
2016-11-15 16:39:09 -08:00
parent 6a7b7ec112
commit f147fccd0c
2 changed files with 16 additions and 11 deletions

View File

@@ -341,15 +341,7 @@ static size_t calculateStep(
std::uintmax_t size, std::uintmax_t size,
size_t numThreads, size_t numThreads,
const ZSTD_parameters &params) { const ZSTD_parameters &params) {
size_t step = size_t{1} << (params.cParams.windowLog + 2); return size_t{1} << (params.cParams.windowLog + 2);
// If file size is known, see if a smaller step will spread work more evenly
if (size != 0) {
const std::uintmax_t newStep = size / numThreads;
if (newStep != 0 && newStep <= std::numeric_limits<size_t>::max()) {
step = std::min(step, static_cast<size_t>(newStep));
}
}
return step;
} }
namespace { namespace {
@@ -401,6 +393,7 @@ std::uint64_t asyncCompressChunks(
// Break the input up into chunks of size `step` and compress each chunk // Break the input up into chunks of size `step` and compress each chunk
// independently. // independently.
size_t step = calculateStep(size, numThreads, params); size_t step = calculateStep(size, numThreads, params);
state.log(DEBUG, "Chosen frame size: %zu\n", step);
auto status = FileStatus::Continue; auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !state.errorHolder.hasError()) { while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into. // Make a new input queue that we will put the chunk's input data into.
@@ -415,6 +408,7 @@ std::uint64_t asyncCompressChunks(
}); });
// Pass the output queue to the writer thread. // Pass the output queue to the writer thread.
chunks.push(std::move(out)); chunks.push(std::move(out));
state.log(VERBOSE, "Starting a new frame\n");
// Fill the input queue for the compression job we just started // Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
} }
@@ -551,11 +545,14 @@ std::uint64_t asyncDecompressFrames(
if (frameSize == 0) { if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task // Pass the rest of the source to this decompression task
state.log(VERBOSE,
"Input not in pzstd format, falling back to serial decompression\n");
while (status == FileStatus::Continue && !state.errorHolder.hasError()) { while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
} }
break; break;
} }
state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
// Fill the input queue for the decompression job we just started // Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
} }

View File

@@ -40,7 +40,8 @@ class SharedState {
if (!options.decompress) { if (!options.decompress) {
auto parameters = options.determineParameters(); auto parameters = options.determineParameters();
cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
[parameters]() -> ZSTD_CStream* { [this, parameters]() -> ZSTD_CStream* {
this->log(VERBOSE, "Creating new ZSTD_CStream\n");
auto zcs = ZSTD_createCStream(); auto zcs = ZSTD_createCStream();
if (zcs) { if (zcs) {
auto err = ZSTD_initCStream_advanced( auto err = ZSTD_initCStream_advanced(
@@ -57,7 +58,8 @@ class SharedState {
}}); }});
} else { } else {
dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
[]() -> ZSTD_DStream* { [this]() -> ZSTD_DStream* {
this->log(VERBOSE, "Creating new ZSTD_DStream\n");
auto zds = ZSTD_createDStream(); auto zds = ZSTD_createDStream();
if (zds) { if (zds) {
auto err = ZSTD_initDStream(zds); auto err = ZSTD_initDStream(zds);
@@ -74,6 +76,12 @@ class SharedState {
} }
} }
~SharedState() {
// The resource pools have references to this, so destroy them first.
cStreamPool.reset();
dStreamPool.reset();
}
Logger log; Logger log;
ErrorHolder errorHolder; ErrorHolder errorHolder;
std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;