diff --git a/contrib/pzstd/Logging.h b/contrib/pzstd/Logging.h new file mode 100644 index 000000000..76c982ab2 --- /dev/null +++ b/contrib/pzstd/Logging.h @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include +#include + +namespace pzstd { + +constexpr int ERROR = 1; +constexpr int INFO = 2; +constexpr int DEBUG = 3; +constexpr int VERBOSE = 4; + +class Logger { + std::mutex mutex_; + FILE* out_; + const int level_; + + using Clock = std::chrono::system_clock; + Clock::time_point lastUpdate_; + std::chrono::milliseconds refreshRate_; + + public: + explicit Logger(int level, FILE* out = stderr) + : out_(out), level_(level), lastUpdate_(Clock::now()), + refreshRate_(150) {} + + + bool logsAt(int level) { + return level <= level_; + } + + template + void operator()(int level, const char *fmt, Args... args) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + std::fprintf(out_, fmt, args...); + } + + template + void update(int level, const char *fmt, Args... args) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + auto now = Clock::now(); + if (now - lastUpdate_ > refreshRate_) { + lastUpdate_ = now; + std::fprintf(out_, "\r"); + std::fprintf(out_, fmt, args...); + } + } + + void clear(int level) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + std::fprintf(out_, "\r%79s\r", ""); + } +}; + +} diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 6b3d27b3e..c5b4ce4cb 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -85,30 +85,28 @@ static std::uint64_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(state, outs, outputFd, options.decompress, - options.verbosity); + bytesWritten = writeFile(state, outs, outputFd, options.decompress); } else { // Add a job that reads the input and starts all the decompression jobs readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(state, outs, outputFd, options.decompress, - options.verbosity); + bytesWritten = writeFile(state, outs, outputFd, options.decompress); } } - if (options.verbosity > 1 && !state.errorHolder.hasError()) { + if (!state.errorHolder.hasError()) { std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; if (!options.decompress) { double ratio = static_cast(bytesWritten) / static_cast(bytesRead + !bytesRead); - std::fprintf(stderr, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 + state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 " bytes, %s)\n", inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, outputFileName.c_str()); } else { - std::fprintf(stderr, "%-20s: %" PRIu64 " bytes \n", + state.log(INFO, "%-20s: %" PRIu64 " bytes \n", inputFileName.c_str(),bytesWritten); } } @@ -138,7 +136,7 @@ static FILE *openInputFile(const std::string &inputFile, static FILE *openOutputFile(const Options &options, const std::string &outputFile, - ErrorHolder &errorHolder) { + SharedState& state) { if (outputFile == "-") { SET_BINARY_MODE(stdout); return stdout; @@ -148,41 +146,39 @@ static FILE *openOutputFile(const Options &options, auto outputFd = std::fopen(outputFile.c_str(), "rb"); if (outputFd != nullptr) { std::fclose(outputFd); - if (options.verbosity <= 1) { - errorHolder.setError("Output file exists"); + if (!state.log.logsAt(INFO)) { + state.errorHolder.setError("Output file exists"); return nullptr; } - std::fprintf( - stderr, + state.log( + INFO, "pzstd: %s already exists; do you wish to overwrite (y/n) ? ", outputFile.c_str()); int c = getchar(); if (c != 'y' && c != 'Y') { - errorHolder.setError("Not overwritten"); + state.errorHolder.setError("Not overwritten"); return nullptr; } } } auto outputFd = std::fopen(outputFile.c_str(), "wb"); - if (!errorHolder.check( + if (!state.errorHolder.check( outputFd != nullptr, "Failed to open output file")) { - return 0; + return nullptr; } return outputFd; } int pzstdMain(const Options &options) { int returnCode = 0; - SharedState state(options.decompress, options.determineParameters()); + SharedState state(options); for (const auto& input : options.inputFiles) { // Setup the shared state auto printErrorGuard = makeScopeGuard([&] { if (state.errorHolder.hasError()) { returnCode = 1; - if (options.verbosity > 0) { - std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), - state.errorHolder.getError().c_str()); - } + state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(), + state.errorHolder.getError().c_str()); } }); // Open the input file @@ -197,7 +193,7 @@ int pzstdMain(const Options &options) { "Input file does not have extension .zst")) { continue; } - auto outputFd = openOutputFile(options, outputFile, state.errorHolder); + auto outputFd = openOutputFile(options, outputFile, state); if (outputFd == nullptr) { continue; } @@ -578,33 +574,14 @@ static bool writeData(ByteRange data, FILE* fd) { return true; } -void updateWritten(int verbosity, std::uint64_t bytesWritten) { - if (verbosity <= 1) { - return; - } - using Clock = std::chrono::system_clock; - static Clock::time_point then; - constexpr std::chrono::milliseconds refreshRate{150}; - - auto now = Clock::now(); - if (now - then > refreshRate) { - then = now; - std::fprintf(stderr, "\rWritten: %u MB ", - static_cast(bytesWritten >> 20)); - } -} - std::uint64_t writeFile( SharedState& state, WorkQueue>& outs, FILE* outputFd, - bool decompress, - int verbosity) { + bool decompress) { auto& errorHolder = state.errorHolder; - auto lineClearGuard = makeScopeGuard([verbosity] { - if (verbosity > 1) { - std::fprintf(stderr, "\r%79s\r", ""); - } + auto lineClearGuard = makeScopeGuard([&state] { + state.log.clear(INFO); }); std::uint64_t bytesWritten = 0; std::shared_ptr out; @@ -630,7 +607,8 @@ std::uint64_t writeFile( return bytesWritten; } bytesWritten += buffer.size(); - updateWritten(verbosity, bytesWritten); + state.log.update(INFO, "Written: %u MB ", + static_cast(bytesWritten >> 20)); } } return bytesWritten; diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index b02fe7b19..9fb2c4884 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -9,6 +9,7 @@ #pragma once #include "ErrorHolder.h" +#include "Logging.h" #include "Options.h" #include "utils/Buffer.h" #include "utils/Range.h" @@ -35,8 +36,9 @@ int pzstdMain(const Options& options); class SharedState { public: - SharedState(bool decompress, ZSTD_parameters parameters) { - if (!decompress) { + SharedState(const Options& options) : log(options.verbosity) { + if (!options.decompress) { + auto parameters = options.determineParameters(); cStreamPool.reset(new ResourcePool{ [parameters]() -> ZSTD_CStream* { auto zcs = ZSTD_createCStream(); @@ -72,6 +74,7 @@ class SharedState { } } + Logger log; ErrorHolder errorHolder; std::unique_ptr> cStreamPool; std::unique_ptr> dStreamPool; @@ -129,13 +132,11 @@ std::uint64_t asyncDecompressFrames( * (de)compression job. * @param outputFd The file descriptor to write to * @param decompress Are we decompressing? - * @param verbosity The verbosity level to log at * @returns The number of bytes written */ std::uint64_t writeFile( SharedState& state, WorkQueue>& outs, FILE* outputFd, - bool decompress, - int verbosity); + bool decompress); }