/**
 * Copyright (c) 2016-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree. An additional grant
 * of patent rights can be found in the PATENTS file in the same directory.
 */
#pragma once

#include <atomic>
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>

namespace pzstd {

/**
 * Coordinates graceful shutdown of the pzstd pipeline.
 *
 * Holds a single "first error wins" flag plus its message. The flag is an
 * atomic, so `hasError()`, `setError()` and `check()` may be called from
 * several threads. The message itself is a plain `std::string`: it is written
 * by the one caller that wins `setError()` and is moved out by `getError()`,
 * so `getError()` must only be called once no other thread can still be
 * reporting an error.
 */
class ErrorHolder {
  std::atomic<bool> error_;
  std::string message_;

 public:
  ErrorHolder() : error_(false) {}

  /// Returns true iff an error has been recorded and not yet retrieved.
  bool hasError() const noexcept {
    return error_.load();
  }

  /**
   * Records `message` as the error, unless an error is already recorded.
   *
   * @param message  Description of the failure
   */
  void setError(std::string message) noexcept {
    // Given multiple possibly concurrent calls, exactly one will ever succeed.
    bool expected = false;
    if (error_.compare_exchange_strong(expected, true)) {
      message_ = std::move(message);
    }
  }

  /**
   * Records `message` as the error if `predicate` is false.
   *
   * @param predicate  Condition that must hold
   * @param message    Error to record when `predicate` is false
   * @returns          true iff no error is recorded, by this call or by an
   *                   earlier one. A true `predicate` can therefore still
   *                   yield false.
   */
  bool check(bool predicate, std::string message) noexcept {
    if (!predicate) {
      setError(std::move(message));
    }
    return !hasError();
  }

  /**
   * Clears the error flag and hands the recorded message to the caller.
   *
   * @returns  The recorded message, moved out of the holder
   */
  std::string getError() noexcept {
    error_.store(false);
    return std::move(message_);
  }

  ~ErrorHolder() {
    // Destructors are implicitly noexcept, so throwing here would always end
    // in std::terminate(). State the invariant instead: every recorded error
    // must have been retrieved with getError() before destruction.
    assert(!hasError() && "ErrorHolder destroyed with an unhandled error");
  }
};
}
# ##########################################################################
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################

# Location of the zstd library and command line program sources.
ZSTDDIR = ../../lib
PROGDIR = ../../programs

CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder -I$(PROGDIR) -I.
CFLAGS  ?= -O3
# These flags are only ever handed to $(CXX). -Wdeclaration-after-statement and
# -Wstrict-prototypes are C/ObjC-only warnings that the C++ front end rejects
# with a diagnostic on every compile, so they are not listed here.
CFLAGS  += -Wall -Wextra -Wcast-qual -Wcast-align -Wstrict-aliasing=1 \
           -Wswitch-enum -Wundef \
           -std=c++11
CFLAGS  += $(MOREFLAGS)
FLAGS    = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS)


# Sources of libzstd. They are only used as prerequisites, so that libzstd.a
# is rebuilt when one of them changes; the library itself is built by the
# Makefile in $(ZSTDDIR).
ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
ZSTDCOMP_FILES   := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
ZSTD_FILES       := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)


# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
else
EXT =
endif

.PHONY: default all test clean

default: pzstd

all: pzstd


libzstd.a: $(ZSTD_FILES)
	$(MAKE) -C $(ZSTDDIR) libzstd
	@cp $(ZSTDDIR)/libzstd.a .


Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
	$(CXX) $(FLAGS) -c Pzstd.cpp -o $@

SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
	$(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@

Options.o: Options.h Options.cpp
	$(CXX) $(FLAGS) -c Options.cpp -o $@

main.o: main.cpp *.h utils/*.h
	$(CXX) $(FLAGS) -c main.cpp -o $@

pzstd: libzstd.a Pzstd.o SkippableFrame.o Options.o main.o
	$(CXX) $(FLAGS) $^ -o $@$(EXT)

# The unit tests live in sub-directories with their own Makefiles.
test: libzstd.a Pzstd.o Options.o SkippableFrame.o
	$(MAKE) -C utils/test test
	$(MAKE) -C test test

clean:
	$(MAKE) -C $(ZSTDDIR) clean
	$(MAKE) -C utils/test clean
	$(MAKE) -C test clean
	@$(RM) libzstd.a *.o pzstd$(EXT)
	@echo Cleaning completed
+ * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#include "Options.h" + +#include + +namespace pzstd { + +namespace { +unsigned parseUnsigned(const char* arg) { + unsigned result = 0; + while (*arg >= '0' && *arg <= '9') { + result *= 10; + result += *arg - '0'; + ++arg; + } + return result; +} + +const std::string zstdExtension = ".zst"; +constexpr unsigned defaultCompressionLevel = 3; +constexpr unsigned maxNonUltraCompressionLevel = 19; + +void usage() { + std::fprintf(stderr, "Usage:\n"); + std::fprintf(stderr, "\tpzstd [args] FILE\n"); + std::fprintf(stderr, "Parallel ZSTD options:\n"); + std::fprintf(stderr, "\t-n/--num-threads #: Number of threads to spawn\n"); + std::fprintf(stderr, "\t-p/--pzstd-headers: Write pzstd headers to enable parallel decompression\n"); + + std::fprintf(stderr, "ZSTD options:\n"); + std::fprintf(stderr, "\t-u/--ultra : enable levels beyond %i, up to %i (requires more memory)\n", maxNonUltraCompressionLevel, ZSTD_maxCLevel()); + std::fprintf(stderr, "\t-h/--help : display help and exit\n"); + std::fprintf(stderr, "\t-V/--version : display version number and exit\n"); + std::fprintf(stderr, "\t-d/--decompress : decompression\n"); + std::fprintf(stderr, "\t-f/--force : overwrite output\n"); + std::fprintf(stderr, "\t-o/--output file : result stored into `file`\n"); + std::fprintf(stderr, "\t-c/--stdout : write output to standard output\n"); + std::fprintf(stderr, "\t-# : # compression level (1-%d, default:%d)\n", maxNonUltraCompressionLevel, defaultCompressionLevel); +} +} // anonymous namespace + +Options::Options() + : numThreads(0), + maxWindowLog(23), + compressionLevel(defaultCompressionLevel), + decompress(false), + overwrite(false), + pzstdHeaders(false) {} + +bool Options::parse(int argc, const char** argv) { + 
bool ultra = false; + for (int i = 1; i < argc; ++i) { + const char* arg = argv[i]; + // Arguments with a short option + char option = 0; + if (!std::strcmp(arg, "--num-threads")) { + option = 'n'; + } else if (!std::strcmp(arg, "--pzstd-headers")) { + option = 'p'; + } else if (!std::strcmp(arg, "--ultra")) { + option = 'u'; + } else if (!std::strcmp(arg, "--version")) { + option = 'V'; + } else if (!std::strcmp(arg, "--help")) { + option = 'h'; + } else if (!std::strcmp(arg, "--decompress")) { + option = 'd'; + } else if (!std::strcmp(arg, "--force")) { + option = 'f'; + } else if (!std::strcmp(arg, "--output")) { + option = 'o'; + } else if (!std::strcmp(arg, "--stdout")) { + option = 'c'; + }else if (arg[0] == '-' && arg[1] != 0) { + // Parse the compression level or short option + if (arg[1] >= '0' && arg[1] <= '9') { + compressionLevel = parseUnsigned(arg + 1); + continue; + } + option = arg[1]; + } else if (inputFile.empty()) { + inputFile = arg; + continue; + } else { + std::fprintf(stderr, "Invalid argument: %s.\n", arg); + return false; + } + + switch (option) { + case 'n': + if (++i == argc) { + std::fprintf(stderr, "Invalid argument: -n requires an argument.\n"); + return false; + } + numThreads = parseUnsigned(argv[i]); + if (numThreads == 0) { + std::fprintf(stderr, "Invalid argument: # of threads must be > 0.\n"); + } + break; + case 'p': + pzstdHeaders = true; + break; + case 'u': + ultra = true; + maxWindowLog = 0; + break; + case 'V': + std::fprintf(stderr, "ZSTD version: %s.\n", ZSTD_VERSION_STRING); + return false; + case 'h': + usage(); + return false; + case 'd': + decompress = true; + break; + case 'f': + overwrite = true; + break; + case 'o': + if (++i == argc) { + std::fprintf(stderr, "Invalid argument: -o requires an argument.\n"); + return false; + } + outputFile = argv[i]; + break; + case 'c': + outputFile = '-'; + break; + default: + std::fprintf(stderr, "Invalid argument: %s.\n", arg); + return false; + } + } + // Determine input file 
if not specified + if (inputFile.empty()) { + inputFile = "-"; + } + // Determine output file if not specified + if (outputFile.empty()) { + if (inputFile == "-") { + std::fprintf( + stderr, + "Invalid arguments: Reading from stdin, but -o not provided.\n"); + return false; + } + // Attempt to add/remove zstd extension from the input file + if (decompress) { + int stemSize = inputFile.size() - zstdExtension.size(); + if (stemSize > 0 && inputFile.substr(stemSize) == zstdExtension) { + outputFile = inputFile.substr(0, stemSize); + } else { + std::fprintf( + stderr, "Invalid argument: Unable to determine output file.\n"); + return false; + } + } else { + outputFile = inputFile + zstdExtension; + } + } + // Check compression level + { + unsigned maxCLevel = ultra ? ZSTD_maxCLevel() : maxNonUltraCompressionLevel; + if (compressionLevel > maxCLevel) { + std::fprintf( + stderr, "Invalid compression level %u.\n", compressionLevel); + } + } + // Check that numThreads is set + if (numThreads == 0) { + std::fprintf(stderr, "Invalid arguments: # of threads not specified.\n"); + return false; + } + return true; +} +} diff --git a/contrib/pzstd/Options.h b/contrib/pzstd/Options.h new file mode 100644 index 000000000..47c5f78a6 --- /dev/null +++ b/contrib/pzstd/Options.h @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ +#pragma once + +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd.h" +#undef ZSTD_STATIC_LINKING_ONLY + +#include +#include + +namespace pzstd { + +struct Options { + unsigned numThreads; + unsigned maxWindowLog; + unsigned compressionLevel; + bool decompress; + std::string inputFile; + std::string outputFile; + bool overwrite; + bool pzstdHeaders; + + Options(); + Options( + unsigned numThreads, + unsigned maxWindowLog, + unsigned compressionLevel, + bool decompress, + const std::string& inputFile, + const std::string& outputFile, + bool overwrite, + bool pzstdHeaders) + : numThreads(numThreads), + maxWindowLog(maxWindowLog), + compressionLevel(compressionLevel), + decompress(decompress), + inputFile(inputFile), + outputFile(outputFile), + overwrite(overwrite), + pzstdHeaders(pzstdHeaders) {} + + bool parse(int argc, const char** argv); + + ZSTD_parameters determineParameters() const { + ZSTD_parameters params = ZSTD_getParams(compressionLevel, 0, 0); + if (maxWindowLog != 0 && params.cParams.windowLog > maxWindowLog) { + params.cParams.windowLog = maxWindowLog; + params.cParams = ZSTD_adjustCParams(params.cParams, 0, 0); + } + return params; + } +}; +} diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp new file mode 100644 index 000000000..84f6a2e4c --- /dev/null +++ b/contrib/pzstd/Pzstd.cpp @@ -0,0 +1,462 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ +#include "Pzstd.h" +#include "SkippableFrame.h" +#include "utils/FileSystem.h" +#include "utils/Range.h" +#include "utils/ScopeGuard.h" +#include "utils/ThreadPool.h" +#include "utils/WorkQueue.h" + +#include +#include +#include +#include + +namespace pzstd { + +namespace { +#ifdef _WIN32 +const std::string nullOutput = "nul"; +#else +const std::string nullOutput = "/dev/null"; +#endif +} + +using std::size_t; + +size_t pzstdMain(const Options& options, ErrorHolder& errorHolder) { + // Open the input file and attempt to determine its size + FILE* inputFd = stdin; + size_t inputSize = 0; + if (options.inputFile != "-") { + inputFd = std::fopen(options.inputFile.c_str(), "rb"); + if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) { + return 0; + } + std::error_code ec; + inputSize = file_size(options.inputFile, ec); + if (ec) { + inputSize = 0; + } + } + auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); + + // Check if the output file exists and then open it + FILE* outputFd = stdout; + if (options.outputFile != "-") { + if (!options.overwrite && options.outputFile != nullOutput) { + outputFd = std::fopen(options.outputFile.c_str(), "rb"); + if (!errorHolder.check(outputFd == nullptr, "Output file exists")) { + return 0; + } + } + outputFd = std::fopen(options.outputFile.c_str(), "wb"); + if (!errorHolder.check( + outputFd != nullptr, "Failed to open output file")) { + return 0; + } + } + auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); + + // WorkQueue outlives ThreadPool so in the case of error we are certain + // we don't accidently try to call push() on it after it is destroyed. 
+ WorkQueue> outs; + size_t bytesWritten; + { + // Initialize the thread pool with numThreads + ThreadPool executor(options.numThreads); + if (!options.decompress) { + // Add a job that reads the input and starts all the compression jobs + executor.add( + [&errorHolder, &outs, &executor, inputFd, inputSize, &options] { + asyncCompressChunks( + errorHolder, + outs, + executor, + inputFd, + inputSize, + options.numThreads, + options.determineParameters()); + }); + // Start writing + bytesWritten = + writeFile(errorHolder, outs, outputFd, options.pzstdHeaders); + } else { + // Add a job that reads the input and starts all the decompression jobs + executor.add([&errorHolder, &outs, &executor, inputFd] { + asyncDecompressFrames(errorHolder, outs, executor, inputFd); + }); + // Start writing + bytesWritten = writeFile( + errorHolder, outs, outputFd, /* writeSkippableFrames */ false); + } + } + return bytesWritten; +} + +/// Construct a `ZSTD_inBuffer` that points to the data in `buffer`. +static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) { + return ZSTD_inBuffer{buffer.data(), buffer.size(), 0}; +} + +/** + * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by + * `inBuffer.pos`. + */ +void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) { + auto pos = inBuffer.pos; + inBuffer.src = static_cast(inBuffer.src) + pos; + inBuffer.size -= pos; + inBuffer.pos = 0; + return buffer.advance(pos); +} + +/// Construct a `ZSTD_outBuffer` that points to the data in `buffer`. +static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) { + return ZSTD_outBuffer{buffer.data(), buffer.size(), 0}; +} + +/** + * Split `buffer` and advance `outBuffer` by the amount of data written, as + * indicated by `outBuffer.pos`. 
+ */ +Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { + auto pos = outBuffer.pos; + outBuffer.dst = static_cast(outBuffer.dst) + pos; + outBuffer.size -= pos; + outBuffer.pos = 0; + return buffer.splitAt(pos); +} + +/** + * Stream chunks of input from `in`, compress it, and stream it out to `out`. + * + * @param errorHolder Used to report errors and check if an error occured + * @param in Queue that we `pop()` input buffers from + * @param out Queue that we `push()` compressed output buffers to + * @param maxInputSize An upper bound on the size of the input + * @param parameters The zstd parameters to use for compression + */ +static void compress( + ErrorHolder& errorHolder, + std::shared_ptr in, + std::shared_ptr out, + size_t maxInputSize, + ZSTD_parameters parameters) { + auto guard = makeScopeGuard([&] { out->finish(); }); + // Initialize the CCtx + std::unique_ptr ctx( + ZSTD_createCStream(), ZSTD_freeCStream); + if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) { + return; + } + { + auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + } + + // Allocate space for the result + auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize)); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + { + Buffer inBuffer; + // Read a buffer in from the input queue + while (in->pop(inBuffer) && !errorHolder.hasError()) { + auto zstdInBuffer = makeZstdInBuffer(inBuffer); + // Compress the whole buffer and send it to the output queue + while (!inBuffer.empty() && !errorHolder.hasError()) { + if (!errorHolder.check( + !outBuffer.empty(), "ZSTD_compressBound() was too small")) { + return; + } + // Compress + auto err = + ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + // Split the compressed data off outBuffer and pass to the output 
queue + out->push(split(outBuffer, zstdOutBuffer)); + // Forget about the data we already compressed + advance(inBuffer, zstdInBuffer); + } + } + } + // Write the epilog + size_t bytesLeft; + do { + if (!errorHolder.check( + !outBuffer.empty(), "ZSTD_compressBound() was too small")) { + return; + } + bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer); + if (!errorHolder.check( + !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) { + return; + } + out->push(split(outBuffer, zstdOutBuffer)); + } while (bytesLeft != 0 && !errorHolder.hasError()); +} + +/** + * Calculates how large each independently compressed frame should be. + * + * @param size The size of the source if known, 0 otherwise + * @param numThreads The number of threads available to run compression jobs on + * @param params The zstd parameters to be used for compression + */ +static size_t +calculateStep(size_t size, size_t numThreads, const ZSTD_parameters& params) { + size_t step = 1ul << (params.cParams.windowLog + 2); + // If file size is known, see if a smaller step will spread work more evenly + if (size != 0) { + size_t newStep = size / numThreads; + if (newStep != 0) { + step = std::min(step, newStep); + } + } + return step; +} + +namespace { +enum class FileStatus { Continue, Done, Error }; +} // anonymous namespace + +/** + * Reads `size` data in chunks of `chunkSize` and puts it into `queue`. + * Will read less if an error or EOF occurs. + * Returns the status of the file after all of the reads have occurred. 
+ */ +static FileStatus +readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { + Buffer buffer(size); + while (!buffer.empty()) { + auto bytesRead = + std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); + queue.push(buffer.splitAt(bytesRead)); + if (std::feof(fd)) { + return FileStatus::Done; + } else if (std::ferror(fd) || bytesRead == 0) { + return FileStatus::Error; + } + } + return FileStatus::Continue; +} + +void asyncCompressChunks( + ErrorHolder& errorHolder, + WorkQueue>& chunks, + ThreadPool& executor, + FILE* fd, + size_t size, + size_t numThreads, + ZSTD_parameters params) { + auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); + + // Break the input up into chunks of size `step` and compress each chunk + // independently. + size_t step = calculateStep(size, numThreads, params); + auto status = FileStatus::Continue; + while (status == FileStatus::Continue && !errorHolder.hasError()) { + // Make a new input queue that we will put the chunk's input data into. + auto in = std::make_shared(); + auto inGuard = makeScopeGuard([&] { in->finish(); }); + // Make a new output queue that compress will put the compressed data into. + auto out = std::make_shared(); + // Start compression in the thread pool + executor.add([&errorHolder, in, out, step, params] { + return compress( + errorHolder, std::move(in), std::move(out), step, params); + }); + // Pass the output queue to the writer thread. + chunks.push(std::move(out)); + // Fill the input queue for the compression job we just started + status = readData(*in, ZSTD_CStreamInSize(), step, fd); + } + errorHolder.check(status != FileStatus::Error, "Error reading input"); +} + +/** + * Decompress a frame, whose data is streamed into `in`, and stream the output + * to `out`. + * + * @param errorHolder Used to report errors and check if an error occured + * @param in Queue that we `pop()` input buffers from. It contains + * exactly one compressed frame. 
+ * @param out Queue that we `push()` decompressed output buffers to + */ +static void decompress( + ErrorHolder& errorHolder, + std::shared_ptr in, + std::shared_ptr out) { + auto guard = makeScopeGuard([&] { out->finish(); }); + // Initialize the DCtx + std::unique_ptr ctx( + ZSTD_createDStream(), ZSTD_freeDStream); + if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) { + return; + } + { + auto err = ZSTD_initDStream(ctx.get()); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + } + + const size_t outSize = ZSTD_DStreamOutSize(); + Buffer inBuffer; + size_t returnCode = 0; + // Read a buffer in from the input queue + while (in->pop(inBuffer) && !errorHolder.hasError()) { + auto zstdInBuffer = makeZstdInBuffer(inBuffer); + // Decompress the whole buffer and send it to the output queue + while (!inBuffer.empty() && !errorHolder.hasError()) { + // Allocate a buffer with at least outSize bytes. + Buffer outBuffer(outSize); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + // Decompress + returnCode = + ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check( + !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { + return; + } + // Pass the buffer with the decompressed data to the output queue + out->push(split(outBuffer, zstdOutBuffer)); + // Advance past the input we already read + advance(inBuffer, zstdInBuffer); + if (returnCode == 0) { + // The frame is over, prepare to (maybe) start a new frame + ZSTD_initDStream(ctx.get()); + } + } + } + if (!errorHolder.check(returnCode <= 1, "Incomplete block")) { + return; + } + // We've given ZSTD_decompressStream all of our data, but there may still + // be data to read. + while (returnCode == 1) { + // Allocate a buffer with at least outSize bytes. + Buffer outBuffer(outSize); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + // Pass in no input. 
+ ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0}; + // Decompress + returnCode = + ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check( + !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { + return; + } + // Pass the buffer with the decompressed data to the output queue + out->push(split(outBuffer, zstdOutBuffer)); + } +} + +void asyncDecompressFrames( + ErrorHolder& errorHolder, + WorkQueue>& frames, + ThreadPool& executor, + FILE* fd) { + auto framesGuard = makeScopeGuard([&] { frames.finish(); }); + // Split the source up into its component frames. + // If we find our recognized skippable frame we know the next frames size + // which means that we can decompress each standard frame in independently. + // Otherwise, we will decompress using only one decompression task. + const size_t chunkSize = ZSTD_DStreamInSize(); + auto status = FileStatus::Continue; + while (status == FileStatus::Continue && !errorHolder.hasError()) { + // Make a new input queue that we will put the frames's bytes into. + auto in = std::make_shared(); + auto inGuard = makeScopeGuard([&] { in->finish(); }); + // Make a output queue that decompress will put the decompressed data into + auto out = std::make_shared(); + + size_t frameSize; + { + // Calculate the size of the next frame. + // frameSize is 0 if the frame info can't be decoded. 
+ Buffer buffer(SkippableFrame::kSize); + auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); + if (bytesRead == 0 && status != FileStatus::Continue) { + break; + } + buffer.subtract(buffer.size() - bytesRead); + frameSize = SkippableFrame::tryRead(buffer.range()); + in->push(std::move(buffer)); + } + // Start decompression in the thread pool + executor.add([&errorHolder, in, out] { + return decompress(errorHolder, std::move(in), std::move(out)); + }); + // Pass the output queue to the writer thread + frames.push(std::move(out)); + if (frameSize == 0) { + // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted + // Pass the rest of the source to this decompression task + while (status == FileStatus::Continue && !errorHolder.hasError()) { + status = readData(*in, chunkSize, chunkSize, fd); + } + break; + } + // Fill the input queue for the decompression job we just started + status = readData(*in, chunkSize, frameSize, fd); + } + errorHolder.check(status != FileStatus::Error, "Error reading input"); +} + +/// Write `data` to `fd`, returns true iff success. +static bool writeData(ByteRange data, FILE* fd) { + while (!data.empty()) { + data.advance(std::fwrite(data.begin(), 1, data.size(), fd)); + if (std::ferror(fd)) { + return false; + } + } + return true; +} + +size_t writeFile( + ErrorHolder& errorHolder, + WorkQueue>& outs, + FILE* outputFd, + bool writeSkippableFrames) { + size_t bytesWritten = 0; + std::shared_ptr out; + // Grab the output queue for each decompression job (in order). + while (outs.pop(out) && !errorHolder.hasError()) { + if (writeSkippableFrames) { + // If we are compressing and want to write skippable frames we can't + // start writing before compression is done because we need to know the + // compressed size. 
+ // Wait for the compressed size to be available and write skippable frame + SkippableFrame frame(out->size()); + if (!writeData(frame.data(), outputFd)) { + errorHolder.setError("Failed to write output"); + return bytesWritten; + } + bytesWritten += frame.kSize; + } + // For each chunk of the frame: Pop it from the queue and write it + Buffer buffer; + while (out->pop(buffer) && !errorHolder.hasError()) { + if (!writeData(buffer.range(), outputFd)) { + errorHolder.setError("Failed to write output"); + return bytesWritten; + } + bytesWritten += buffer.size(); + } + } + return bytesWritten; +} +} diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h new file mode 100644 index 000000000..617aecb3f --- /dev/null +++ b/contrib/pzstd/Pzstd.h @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include "ErrorHolder.h" +#include "Options.h" +#include "utils/Buffer.h" +#include "utils/Range.h" +#include "utils/ThreadPool.h" +#include "utils/WorkQueue.h" +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd.h" +#undef ZSTD_STATIC_LINKING_ONLY + +#include +#include + +namespace pzstd { +/** + * Runs pzstd with `options` and returns the number of bytes written. + * An error occurred if `errorHandler.hasError()`. + * + * @param options The pzstd options to use for (de)compression + * @param errorHolder Used to report errors and coordinate early shutdown + * if an error occured + * @returns The number of bytes written. + */ +std::size_t pzstdMain(const Options& options, ErrorHolder& errorHolder); + +/** + * Streams input from `fd`, breaks input up into chunks, and compresses each + * chunk independently. 
Output of each chunk gets streamed to a queue, and + * the output queues get put into `chunks` in order. + * + * @param errorHolder Used to report errors and coordinate early shutdown + * @param chunks Each compression jobs output queue gets `pushed()` here + * as soon as it is available + * @param executor The thread pool to run compression jobs in + * @param fd The input file descriptor + * @param size The size of the input file if known, 0 otherwise + * @param numThreads The number of threads in the thread pool + * @param parameters The zstd parameters to use for compression + */ +void asyncCompressChunks( + ErrorHolder& errorHolder, + WorkQueue>& chunks, + ThreadPool& executor, + FILE* fd, + std::size_t size, + std::size_t numThreads, + ZSTD_parameters parameters); + +/** + * Streams input from `fd`. If pzstd headers are available it breaks the input + * up into independent frames. It sends each frame to an independent + * decompression job. Output of each frame gets streamed to a queue, and + * the output queues get put into `frames` in order. + * + * @param errorHolder Used to report errors and coordinate early shutdown + * @param frames Each decompression jobs output queue gets `pushed()` here + * as soon as it is available + * @param executor The thread pool to run compression jobs in + * @param fd The input file descriptor + */ +void asyncDecompressFrames( + ErrorHolder& errorHolder, + WorkQueue>& frames, + ThreadPool& executor, + FILE* fd); + +/** + * Streams input in from each queue in `outs` in order, and writes the data to + * `outputFd`. + * + * @param errorHolder Used to report errors and coordinate early exit + * @param outs A queue of output queues, one for each + * (de)compression job. + * @param outputFd The file descriptor to write to + * @param writeSkippableFrames Should we write pzstd headers? 
+ * @returns The number of bytes written + */ +std::size_t writeFile( + ErrorHolder& errorHolder, + WorkQueue>& outs, + FILE* outputFd, + bool writeSkippableFrames); +} diff --git a/contrib/pzstd/README.md b/contrib/pzstd/README.md new file mode 100644 index 000000000..1a5a0105d --- /dev/null +++ b/contrib/pzstd/README.md @@ -0,0 +1,47 @@ +# Parallel Zstandard (PZstandard) + +Parallel Zstandard provides Zstandard format compatible compression and decompression that is able to utilize multiple cores. +It breaks the input up into equal sized chunks and compresses each chunk independently into a Zstandard frame. +It then concatenates the frames together to produce the final compressed output. +Optionally, with the `-p` option, PZstandard will write a 12 byte header for each frame that is a skippable frame in the Zstandard format, which tells PZstandard the size of the next compressed frame. +When `-p` is specified for compression, PZstandard can decompress the output in parallel. + +## Usage + +Basic usage + + pzstd input-file -o output-file -n num-threads [ -p ] -# # Compression + pzstd -d input-file -o output-file -n num-threads # Decompression + +PZstandard also supports piping and fifo pipes + + cat input-file | pzstd -n num-threads [ -p ] -# -c > /dev/null + +For more options + + pzstd --help + +## Benchmarks + +As a reference, PZstandard and Pigz were compared on an Intel Core i7 @ 3.1 GHz, each using 4 threads, with the [Silesia compression corpus](http://sun.aei.polsl.pl/~sdeor/index.php?page=silesia). + +Compression Speed vs Ratio with 4 Threads | Decompression Speed with 4 Threads +------------------------------------------|----------------------------------- +![Compression Speed vs Ratio](images/Cspeed.png "Compression Speed vs Ratio") | ![Decompression Speed](images/Dspeed.png "Decompression Speed") + +The test procedure was to run each of the following commands 2 times for each compression level, and take the minimum time. 
+ + time ./pzstd -# -n 4 -p -c silesia.tar > silesia.tar.zst + time ./pzstd -d -n 4 -c silesia.tar.zst > /dev/null + + time pigz -# -p 4 -k -c silesia.tar > silesia.tar.gz + time pigz -d -p 4 -k -c silesia.tar.gz > /dev/null + +PZstandard was tested using compression levels 1-19, and Pigz was tested using compression levels 1-9. +Pigz cannot decompress in parallel; it only runs reading, decompression, and writing on separate threads. + +## Tests + +Tests require that you have [gtest](https://github.com/google/googletest) installed. +Modify `GTEST_INC` and `GTEST_LIB` in `test/Makefile` and `utils/test/Makefile` to match your gtest installation. +Then run `make test` in the `contrib/pzstd` directory. diff --git a/contrib/pzstd/SkippableFrame.cpp b/contrib/pzstd/SkippableFrame.cpp new file mode 100644 index 000000000..20ad4cc8e --- /dev/null +++ b/contrib/pzstd/SkippableFrame.cpp @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ +#include "SkippableFrame.h" +#include "common/mem.h" +#include "utils/Range.h" + +#include + +using namespace pzstd; + +SkippableFrame::SkippableFrame(std::uint32_t size) : frameSize_(size) { + MEM_writeLE32(data_.data(), kSkippableFrameMagicNumber); + MEM_writeLE32(data_.data() + 4, kFrameContentsSize); + MEM_writeLE32(data_.data() + 8, frameSize_); +} + +/* static */ std::size_t SkippableFrame::tryRead(ByteRange bytes) { + if (bytes.size() < SkippableFrame::kSize || + MEM_readLE32(bytes.begin()) != kSkippableFrameMagicNumber || + MEM_readLE32(bytes.begin() + 4) != kFrameContentsSize) { + return 0; + } + return MEM_readLE32(bytes.begin() + 8); +} diff --git a/contrib/pzstd/SkippableFrame.h b/contrib/pzstd/SkippableFrame.h new file mode 100644 index 000000000..9dc95c1f5 --- /dev/null +++ b/contrib/pzstd/SkippableFrame.h @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include "utils/Range.h" + +#include +#include +#include +#include + +namespace pzstd { +/** + * We put a skippable frame before each frame. + * It contains a skippable frame magic number, the size of the skippable frame, + * and the size of the next frame. + * Each skippable frame is exactly 12 bytes in little endian format. + * The first 8 bytes are for compatibility with the ZSTD format. + * If we have N threads, the output will look like + * + * [0x184D2A50|4|size1] [frame1 of size size1] + * [0x184D2A50|4|size2] [frame2 of size size2] + * ... + * [0x184D2A50|4|sizeN] [frameN of size sizeN] + * + * Each sizeX is 4 bytes. + * + * These skippable frames should allow us to skip through the compressed file + * and only load at most N pages. 
+ */ +class SkippableFrame { + public: + static constexpr std::size_t kSize = 12; + + private: + std::uint32_t frameSize_; + std::array data_; + static constexpr std::uint32_t kSkippableFrameMagicNumber = 0x184D2A50; + // Could be improved if the size fits in less bytes + static constexpr std::uint32_t kFrameContentsSize = kSize - 8; + + public: + // Write the skippable frame to data_ in LE format. + explicit SkippableFrame(std::uint32_t size); + + // Read the skippable frame from bytes in LE format. + static std::size_t tryRead(ByteRange bytes); + + ByteRange data() const { + return {data_.data(), data_.size()}; + } + + // Size of the next frame. + std::size_t frameSize() const { + return frameSize_; + } +}; +} diff --git a/contrib/pzstd/bench.cpp b/contrib/pzstd/bench.cpp new file mode 100644 index 000000000..56bad3915 --- /dev/null +++ b/contrib/pzstd/bench.cpp @@ -0,0 +1,146 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
// Prints how many ns it was in scope for upon destruction.
// Used for rough estimates of how long things took.
//
// The elapsed time is written to `fd` right-aligned in a field at least six
// characters wide, followed by " | ".
struct BenchmarkTimer {
  // A monotonic clock: the wall clock may be adjusted while the timer is
  // running, which would make the measured interval wrong or even negative.
  using Clock = std::chrono::steady_clock;
  // NOTE(review): the duration type was lost from the source this block was
  // recovered from; nanoseconds matches the "ns" in the comment above --
  // confirm against the original file.
  using Resolution = std::chrono::nanoseconds;

  Clock::time_point start;
  FILE* fd;

  explicit BenchmarkTimer(FILE* fd = stdout) : start(Clock::now()), fd(fd) {}

  // A copy would report the same interval a second time on destruction.
  BenchmarkTimer(const BenchmarkTimer&) = delete;
  BenchmarkTimer& operator=(const BenchmarkTimer&) = delete;

  ~BenchmarkTimer() {
    const auto elapsed =
        std::chrono::duration_cast<Resolution>(Clock::now() - start).count();
    // Report at least one tick so the column is never "0".
    const std::size_t ticks =
        elapsed < 1 ? std::size_t{1} : static_cast<std::size_t>(elapsed);
    std::fprintf(fd, "%6zu | ", ticks);
  }
};
asyncCompressChunks( + errorHolder, + outs, + executor, + inputFd, + inputSize, + options.numThreads, + options.determineParameters()); + }); + bytesWritten = writeFile(errorHolder, outs, outputFd, true); + } else { + executor.add([&errorHolder, &outs, &executor, inputFd] { + asyncDecompressFrames(errorHolder, outs, executor, inputFd); + }); + bytesWritten = writeFile( + errorHolder, outs, outputFd, /* writeSkippableFrames */ false); + } + } + if (errorHolder.hasError()) { + std::fprintf(stderr, "Error: %s.\n", errorHolder.getError().c_str()); + std::abort(); + } + std::printf("%zu\n", bytesWritten); +} + +int main(int argc, const char** argv) { + if (argc < 3) { + return 1; + } + Options options(0, 23, 0, false, "", "", true, true); + // Benchmarking code + for (size_t i = 0; i < 2; ++i) { + for (size_t compressionLevel = 1; compressionLevel <= 16; + compressionLevel <<= 1) { + for (size_t numThreads = 1; numThreads <= 16; numThreads <<= 1) { + options.numThreads = numThreads; + options.compressionLevel = compressionLevel; + options.decompress = false; + options.inputFile = argv[1]; + options.outputFile = argv[2]; + testMain(options); + options.decompress = true; + options.inputFile = argv[2]; + options.outputFile = std::string(argv[1]) + ".d"; + testMain(options); + std::fflush(stdout); + } + } + } + return 0; +} diff --git a/contrib/pzstd/images/Cspeed.png b/contrib/pzstd/images/Cspeed.png new file mode 100644 index 000000000..516d09807 Binary files /dev/null and b/contrib/pzstd/images/Cspeed.png differ diff --git a/contrib/pzstd/images/Dspeed.png b/contrib/pzstd/images/Dspeed.png new file mode 100644 index 000000000..e48881bcd Binary files /dev/null and b/contrib/pzstd/images/Dspeed.png differ diff --git a/contrib/pzstd/main.cpp b/contrib/pzstd/main.cpp new file mode 100644 index 000000000..7ff2cef74 --- /dev/null +++ b/contrib/pzstd/main.cpp @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#include "ErrorHolder.h" +#include "Options.h" +#include "Pzstd.h" +#include "utils/FileSystem.h" +#include "utils/Range.h" +#include "utils/ScopeGuard.h" +#include "utils/ThreadPool.h" +#include "utils/WorkQueue.h" + +using namespace pzstd; + +int main(int argc, const char** argv) { + Options options; + if (!options.parse(argc, argv)) { + return 1; + } + + ErrorHolder errorHolder; + pzstdMain(options, errorHolder); + + if (errorHolder.hasError()) { + std::fprintf(stderr, "Error: %s.\n", errorHolder.getError().c_str()); + return 1; + } + return 0; +} diff --git a/contrib/pzstd/test/Makefile b/contrib/pzstd/test/Makefile new file mode 100644 index 000000000..3b0ffec89 --- /dev/null +++ b/contrib/pzstd/test/Makefile @@ -0,0 +1,46 @@ +# ########################################################################## +# Copyright (c) 2016-present, Facebook, Inc. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. An additional grant +# of patent rights can be found in the PATENTS file in the same directory. +# ########################################################################## + +# Set GTEST_INC and GTEST_LIB to work with your install of gtest +GTEST_INC ?= -isystem googletest/googletest/include +GTEST_LIB ?= -L googletest/build/googlemock/gtest + +# Define *.exe as extension for Windows systems +ifneq (,$(filter Windows%,$(OS))) +EXT =.exe +else +EXT = +endif + +PZSTDDIR = .. 
datagen.o: $(PROGDIR)/datagen.*
	$(CXX) $(FLAGS) $(PROGDIR)/datagen.c -c -o $@

# Build one gtest binary per test source.
# Link order matters for single-pass linkers: objects come first, then the
# static libzstd archive they reference, then the gtest libraries.
%: %.cpp *.h datagen.o
	$(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -lgtest -lgtest_main -o $@$(EXT)

.PHONY: test clean

test: OptionsTest PzstdTest RoundTripTest
	@./OptionsTest$(EXT)
	@./PzstdTest$(EXT)
	@./RoundTripTest$(EXT)

# Remove the binaries under the same names the pattern rule creates them with
# (including $(EXT) on Windows).
clean:
	@rm -f datagen.o OptionsTest$(EXT) PzstdTest$(EXT) RoundTripTest$(EXT)
+ */ +#include "Options.h" + +#include +#include + +using namespace pzstd; + +namespace pzstd { +bool operator==(const Options& lhs, const Options& rhs) { + return lhs.numThreads == rhs.numThreads && + lhs.maxWindowLog == rhs.maxWindowLog && + lhs.compressionLevel == rhs.compressionLevel && + lhs.decompress == rhs.decompress && lhs.inputFile == rhs.inputFile && + lhs.outputFile == rhs.outputFile && lhs.overwrite == rhs.overwrite && + lhs.pzstdHeaders == rhs.pzstdHeaders; +} +} + +TEST(Options, ValidInputs) { + { + Options options; + std::array args = { + {nullptr, "--num-threads", "5", "-o", "-", "-f"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {5, 23, 3, false, "-", "-", true, false}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = { + {nullptr, "-n", "1", "input", "-19", "-p"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {1, 23, 19, false, "input", "input.zst", false, true}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = {{nullptr, + "--ultra", + "-22", + "-n", + "1", + "--output", + "x", + "-d", + "x.zst", + "-f"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {1, 0, 22, true, "x.zst", "x", true, false}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = {{nullptr, + "--num-threads", + "100", + "hello.zst", + "--decompress", + "--force"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {100, 23, 3, true, "hello.zst", "hello", true, false}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = {{nullptr, "-", "-n", "1", "-c"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {1, 23, 3, false, "-", "-", false, false}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = {{nullptr, "-", "-n", "1", "--stdout"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + 
Options expected = {1, 23, 3, false, "-", "-", false, false}; + EXPECT_EQ(expected, options); + } + { + Options options; + std::array args = {{nullptr, + "-n", + "1", + "-", + "-5", + "-o", + "-", + "-u", + "-d", + "--pzstd-headers"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = {1, 0, 5, true, "-", "-", false, true}; + } + { + Options options; + std::array args = { + {nullptr, "silesia.tar", "-o", "silesia.tar.pzstd", "-n", "2"}}; + EXPECT_TRUE(options.parse(args.size(), args.data())); + Options expected = { + 2, 23, 3, false, "silesia.tar", "silesia.tar.pzstd", false, false}; + } +} + +TEST(Options, BadNumThreads) { + { + Options options; + std::array args = {{nullptr, "-o", "-"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "-n", "0", "-o", "-"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "-n", "-o", "-"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, BadCompressionLevel) { + { + Options options; + std::array args = {{nullptr, "x", "-20"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "x", "-u", "-23"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, InvalidOption) { + { + Options options; + std::array args = {{nullptr, "x", "-x"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, BadOutputFile) { + { + Options options; + std::array args = {{nullptr, "notzst", "-d", "-n", "1"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "-n", "1"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "-", "-n", "1"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, 
Extras) { + { + Options options; + std::array args = {{nullptr, "-h"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } + { + Options options; + std::array args = {{nullptr, "-V"}}; + EXPECT_FALSE(options.parse(args.size(), args.data())); + } +} diff --git a/contrib/pzstd/test/PzstdTest.cpp b/contrib/pzstd/test/PzstdTest.cpp new file mode 100644 index 000000000..a6eb74596 --- /dev/null +++ b/contrib/pzstd/test/PzstdTest.cpp @@ -0,0 +1,112 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#include "datagen.h" +#include "Pzstd.h" +#include "test/RoundTrip.h" +#include "utils/ScopeGuard.h" + +#include +#include +#include +#include + +using namespace std; +using namespace pzstd; + +TEST(Pzstd, SmallSizes) { + for (unsigned len = 1; len < 1028; ++len) { + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + static uint8_t buf[1028]; + RDG_genBuffer(buf, len, 0.5, 0.0, 42); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf, 1, len, fd); + std::fclose(fd); + ASSERT_EQ(written, len); + } + for (unsigned headers = 0; headers <= 1; ++headers) { + for (unsigned numThreads = 1; numThreads <= 4; numThreads *= 2) { + for (unsigned level = 1; level <= 8; level *= 8) { + auto errorGuard = makeScopeGuard([&] { + guard.dismiss(); + std::fprintf(stderr, "file: %s\n", inputFile.c_str()); + std::fprintf(stderr, "pzstd headers: %u\n", headers); + std::fprintf(stderr, "# threads: %u\n", numThreads); + std::fprintf(stderr, "compression level: %u\n", level); + }); + Options options; + options.pzstdHeaders = headers; + options.overwrite = true; + options.inputFile = inputFile; + options.numThreads = numThreads; 
+ options.compressionLevel = level; + ASSERT_TRUE(roundTrip(options)); + errorGuard.dismiss(); + } + } + } + } +} + +TEST(Pzstd, LargeSizes) { + for (unsigned len = 1 << 20; len <= (1 << 24); len *= 2) { + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + std::unique_ptr buf(new uint8_t[len]); + RDG_genBuffer(buf.get(), len, 0.5, 0.0, 42); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf.get(), 1, len, fd); + std::fclose(fd); + ASSERT_EQ(written, len); + } + for (unsigned headers = 0; headers <= 1; ++headers) { + for (unsigned numThreads = 1; numThreads <= 16; numThreads *= 4) { + for (unsigned level = 1; level <= 4; level *= 2) { + auto errorGuard = makeScopeGuard([&] { + guard.dismiss(); + std::fprintf(stderr, "file: %s\n", inputFile.c_str()); + std::fprintf(stderr, "pzstd headers: %u\n", headers); + std::fprintf(stderr, "# threads: %u\n", numThreads); + std::fprintf(stderr, "compression level: %u\n", level); + }); + Options options; + options.pzstdHeaders = headers; + options.overwrite = true; + options.inputFile = inputFile; + options.numThreads = numThreads; + options.compressionLevel = level; + ASSERT_TRUE(roundTrip(options)); + errorGuard.dismiss(); + } + } + } + } +} + +TEST(Pzstd, ExtremelyCompressible) { + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + std::unique_ptr buf(new uint8_t[10000]); + std::memset(buf.get(), 'a', 10000); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf.get(), 1, 10000, fd); + std::fclose(fd); + ASSERT_EQ(written, 10000); + } + Options options; + options.pzstdHeaders = false; + options.overwrite = true; + options.inputFile = inputFile; + options.numThreads = 1; + options.compressionLevel = 1; + ASSERT_TRUE(roundTrip(options)); +} diff --git a/contrib/pzstd/test/RoundTrip.h b/contrib/pzstd/test/RoundTrip.h new file 
/**
 * Compares the contents of two files byte for byte.
 *
 * @param source        Path of the first file.
 * @param decompressed  Path of the second file.
 * @returns             True iff both files can be opened and read without
 *                      error and have identical contents.
 */
inline bool check(std::string source, std::string decompressed) {
  constexpr std::size_t kChunkSize = 1024;
  // Closes the stream on every return path; a null stream is never closed.
  using FilePtr = std::unique_ptr<std::FILE, int (*)(std::FILE*)>;

  FilePtr sFd(std::fopen(source.c_str(), "rb"), &std::fclose);
  FilePtr dFd(std::fopen(decompressed.c_str(), "rb"), &std::fclose);
  if (!sFd || !dFd) {
    // A file that cannot be opened can never match.
    return false;
  }

  std::unique_ptr<std::uint8_t[]> sBuf(new std::uint8_t[kChunkSize]);
  std::unique_ptr<std::uint8_t[]> dBuf(new std::uint8_t[kChunkSize]);

  std::size_t sRead = 0;
  std::size_t dRead = 0;
  do {
    sRead = std::fread(sBuf.get(), 1, kChunkSize, sFd.get());
    dRead = std::fread(dBuf.get(), 1, kChunkSize, dFd.get());
    if (std::ferror(sFd.get()) || std::ferror(dFd.get())) {
      return false;
    }
    // Different chunk lengths mean the files have different sizes.
    if (sRead != dRead) {
      return false;
    }
    for (std::size_t i = 0; i < sRead; ++i) {
      if (sBuf[i] != dBuf[i]) {
        return false;
      }
    }
    // A full chunk means there may be more data; a short one means EOF.
  } while (sRead == kChunkSize);

  return std::feof(sFd.get()) && std::feof(dFd.get());
}
options.outputFile = decompressedFile; + ErrorHolder errorHolder; + pzstdMain(options, errorHolder); + if (errorHolder.hasError()) { + errorHolder.getError(); + return false; + } + } + return check(source, decompressedFile); +} +} diff --git a/contrib/pzstd/test/RoundTripTest.cpp b/contrib/pzstd/test/RoundTripTest.cpp new file mode 100644 index 000000000..01c1c8113 --- /dev/null +++ b/contrib/pzstd/test/RoundTripTest.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#include "datagen.h" +#include "Options.h" +#include "test/RoundTrip.h" +#include "utils/ScopeGuard.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace pzstd; + +namespace { +string +writeData(size_t size, double matchProba, double litProba, unsigned seed) { + std::unique_ptr buf(new uint8_t[size]); + RDG_genBuffer(buf.get(), size, matchProba, litProba, seed); + string file = tmpnam(nullptr); + auto fd = std::fopen(file.c_str(), "wb"); + auto guard = makeScopeGuard([&] { std::fclose(fd); }); + auto bytesWritten = std::fwrite(buf.get(), 1, size, fd); + if (bytesWritten != size) { + std::abort(); + } + return file; +} + +template +string generateInputFile(Generator& gen) { + // Use inputs ranging from 1 Byte to 2^16 Bytes + std::uniform_int_distribution size{1, 1 << 16}; + std::uniform_real_distribution<> prob{0, 1}; + return writeData(size(gen), prob(gen), prob(gen), gen()); +} + +template +Options generateOptions(Generator& gen, const string& inputFile) { + Options options; + options.inputFile = inputFile; + options.overwrite = true; + + std::bernoulli_distribution pzstdHeaders{0.75}; + std::uniform_int_distribution numThreads{1, 32}; + std::uniform_int_distribution 
compressionLevel{1, 10}; + + options.pzstdHeaders = pzstdHeaders(gen); + options.numThreads = numThreads(gen); + options.compressionLevel = compressionLevel(gen); + + return options; +} +} + +int main(int argc, char** argv) { + std::mt19937 gen(std::random_device{}()); + + auto newlineGuard = makeScopeGuard([] { std::fprintf(stderr, "\n"); }); + for (unsigned i = 0; i < 10000; ++i) { + if (i % 100 == 0) { + std::fprintf(stderr, "Progress: %u%%\r", i / 100); + } + auto inputFile = generateInputFile(gen); + auto inputGuard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + for (unsigned i = 0; i < 10; ++i) { + auto options = generateOptions(gen, inputFile); + if (!roundTrip(options)) { + std::fprintf(stderr, "numThreads: %u\n", options.numThreads); + std::fprintf(stderr, "level: %u\n", options.compressionLevel); + std::fprintf(stderr, "decompress? %u\n", (unsigned)options.decompress); + std::fprintf( + stderr, "pzstd headers? %u\n", (unsigned)options.pzstdHeaders); + std::fprintf(stderr, "file: %s\n", inputFile.c_str()); + return 1; + } + } + } + return 0; +} diff --git a/contrib/pzstd/utils/Buffer.h b/contrib/pzstd/utils/Buffer.h new file mode 100644 index 000000000..ab25bac9c --- /dev/null +++ b/contrib/pzstd/utils/Buffer.h @@ -0,0 +1,99 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include "utils/Range.h" + +#include +#include +#include + +namespace pzstd { + +/** + * A `Buffer` has a pointer to a shared buffer, and a range of the buffer that + * it owns. + * The idea is that you can allocate one buffer, and write chunks into it + * and break off those chunks. 
class Buffer {
  // Shared handle to the whole underlying allocation.
  std::shared_ptr<unsigned char> buffer_;
  // The part of the underlying allocation that this `Buffer` exposes.
  MutableByteRange range_;

  // Deleter given to `buffer_`: the storage is allocated with `new[]`, so it
  // has to be released with `delete[]`.
  static void delete_buffer(unsigned char* buffer) {
    delete[] buffer;
  }

 public:
  /// Construct an empty buffer that owns no data.
  explicit Buffer() {}

  /// Construct a `Buffer` that owns a new underlying buffer of size `size`.
  explicit Buffer(std::size_t size)
      : buffer_(new unsigned char[size], delete_buffer),
        range_(buffer_.get(), buffer_.get() + size) {}

  /// Construct a `Buffer` that shares `buffer` and exposes the range `data`.
  /// `data` is stored as given; it is not checked against `buffer`.
  explicit Buffer(std::shared_ptr<unsigned char> buffer, MutableByteRange data)
      : buffer_(buffer), range_(data) {}

  // Only move operations are declared, so a `Buffer` cannot be copied.
  // Move assignment is restricted to lvalues by the `&` qualifier.
  Buffer(Buffer&&) = default;
  Buffer& operator=(Buffer&&) & = default;

  /**
   * Splits the data into two pieces: [begin, begin + n), [begin + n, end).
   * Their data both points into the same underlying buffer.
   * Modifies the original `Buffer` to point to only [begin + n, end).
   *
   * @param n  The offset to split at.
   * @returns  A buffer that owns the data [begin, begin + n).
   */
  Buffer splitAt(std::size_t n) {
    // Take the first piece before advancing, while `range_` still starts at
    // the original beginning.
    auto firstPiece = range_.subpiece(0, n);
    range_.advance(n);
    return Buffer(buffer_, firstPiece);
  }

  /// Modifies the buffer to point to the range [begin + n, end).
  void advance(std::size_t n) {
    range_.advance(n);
  }

  /// Modifies the buffer to point to the range [begin, end - n).
  void subtract(std::size_t n) {
    range_.subtract(n);
  }

  /// Returns a read only `Range` pointing to the `Buffer`s data.
  ByteRange range() const {
    return range_;
  }
  /// Returns a mutable `Range` pointing to the `Buffer`s data.
  MutableByteRange range() {
    return range_;
  }

  /// Returns a read only pointer to the first byte of the `Buffer`s range.
  const unsigned char* data() const {
    return range_.data();
  }

  /// Returns a mutable pointer to the first byte of the `Buffer`s range.
  unsigned char* data() {
    return range_.data();
  }

  /// Returns the number of bytes in the `Buffer`s range.
  std::size_t size() const {
    return range_.size();
  }

  /// Returns true iff the `Buffer`s range contains no bytes.
  bool empty() const {
    return range_.empty();
  }
};
+ +namespace pzstd { + +using file_status = struct stat; + +/// http://en.cppreference.com/w/cpp/filesystem/status +inline file_status status(StringPiece path, std::error_code& ec) noexcept { + file_status status; + if (stat(path.data(), &status)) { + ec.assign(errno, std::generic_category()); + } else { + ec.clear(); + } + return status; +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(file_status status) noexcept { + return S_ISREG(status.st_mode); +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(StringPiece path, std::error_code& ec) noexcept { + return is_regular_file(status(path, ec)); +} + +/// http://en.cppreference.com/w/cpp/filesystem/file_size +inline std::uintmax_t file_size( + StringPiece path, + std::error_code& ec) noexcept { + auto stat = status(path, ec); + if (ec) { + return -1; + } + if (!is_regular_file(stat)) { + ec.assign(ENOTSUP, std::generic_category()); + return -1; + } + ec.clear(); + return stat.st_size; +} +} diff --git a/contrib/pzstd/utils/Likely.h b/contrib/pzstd/utils/Likely.h new file mode 100644 index 000000000..c8ea102b1 --- /dev/null +++ b/contrib/pzstd/utils/Likely.h @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +/** + * Compiler hints to indicate the fast path of an "if" branch: whether + * the if condition is likely to be true or false. 
+ * + * @author Tudor Bosman (tudorb@fb.com) + */ + +#pragma once + +#undef LIKELY +#undef UNLIKELY + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif diff --git a/contrib/pzstd/utils/Range.h b/contrib/pzstd/utils/Range.h new file mode 100644 index 000000000..3df15976d --- /dev/null +++ b/contrib/pzstd/utils/Range.h @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +/** + * A subset of `folly/Range.h`. + * All code copied verbatim modulo formatting + */ +#pragma once + +#include "utils/Likely.h" + +#include +#include +#include +#include + +namespace pzstd { + +namespace detail { +/* + *Use IsCharPointer<T>::type to enable const char* or char*. + *Use IsCharPointer<T>::const_type to enable only const char*. 
/**
 * A view of the half-open range [begin, end) described by two iterators of
 * type `Iter`. Only the two iterators are stored; the elements are not copied.
 */
template <class Iter>
class Range {
  Iter b_; // Start of the range.
  Iter e_; // One past the last element of the range.

 public:
  using size_type = std::size_t;
  using iterator = Iter;
  using const_iterator = Iter;
  using value_type = typename std::remove_reference<
      typename std::iterator_traits<Iter>::reference>::type;
  using reference = typename std::iterator_traits<Iter>::reference;

  /// Constructs an empty range from two value-initialized iterators.
  constexpr Range() : b_(), e_() {}
  /// Constructs the range [begin, end).
  constexpr Range(Iter begin, Iter end) : b_(begin), e_(end) {}

  /// Constructs the range [begin, begin + size).
  constexpr Range(Iter begin, size_type size) : b_(begin), e_(begin + size) {}

  /// Constructs a range over a C string; its length is taken with
  /// `std::strlen`. Participates in overload resolution only when
  /// `detail::IsCharPointer<Iter>::type` exists.
  template <class T = Iter, typename detail::IsCharPointer<T>::type = 0>
  /* implicit */ Range(Iter str) : b_(str), e_(str + std::strlen(str)) {}

  /// Constructs a range over the characters of `str`. Participates in
  /// overload resolution only when `detail::IsCharPointer<Iter>::const_type`
  /// exists.
  template <class T = Iter, typename detail::IsCharPointer<T>::const_type = 0>
  /* implicit */ Range(const std::string& str)
      : b_(str.data()), e_(b_ + str.size()) {}

  // Allow implicit conversion from Range<From> to Range<To> if From is
  // implicitly convertible to To.
  template <
      class OtherIter,
      typename std::enable_if<
          (!std::is_same<Iter, OtherIter>::value &&
           std::is_convertible<OtherIter, Iter>::value),
          int>::type = 0>
  constexpr /* implicit */ Range(const Range<OtherIter>& other)
      : b_(other.begin()), e_(other.end()) {}

  Range(const Range&) = default;
  Range(Range&&) = default;

  // The `&` qualifier restricts assignment to lvalues.
  Range& operator=(const Range&) & = default;
  Range& operator=(Range&&) & = default;

  /// Number of elements in the range.
  constexpr size_type size() const {
    return e_ - b_;
  }
  /// True iff the range contains no elements.
  bool empty() const {
    return b_ == e_;
  }
  /// Iterator to the start of the range (same as `begin()`).
  Iter data() const {
    return b_;
  }
  Iter begin() const {
    return b_;
  }
  Iter end() const {
    return e_;
  }

  /// Drops the first `n` elements.
  /// Throws `std::out_of_range` if `n > size()`.
  void advance(size_type n) {
    if (UNLIKELY(n > size())) {
      throw std::out_of_range("index out of range");
    }
    b_ += n;
  }

  /// Drops the last `n` elements.
  /// Throws `std::out_of_range` if `n > size()`.
  void subtract(size_type n) {
    if (UNLIKELY(n > size())) {
      throw std::out_of_range("index out of range");
    }
    e_ -= n;
  }

  /// Returns the sub-range that starts at offset `first` and holds at most
  /// `length` elements; `length` is clamped to what is left after `first`,
  /// so the default yields everything from `first` to the end.
  /// Throws `std::out_of_range` if `first > size()`.
  Range subpiece(size_type first, size_type length = std::string::npos) const {
    if (UNLIKELY(first > size())) {
      throw std::out_of_range("index out of range");
    }

    return Range(b_ + first, std::min(length, size() - first));
  }
};
+ *
+ * Example:
+ *
+ *   auto guard = makeScopeGuard([&] { cleanup(); });
+ */
+template <typename Function>
+class ScopeGuard {
+  Function function;
+  bool dismissed;
+
+ public:
+  explicit ScopeGuard(Function&& function)
+      : function(std::move(function)), dismissed(false) {}
+
+  // Transfers responsibility for running the callable to the new guard.
+  // Without this, returning a guard by value (makeScopeGuard) falls back to
+  // the implicit copy constructor, and whenever the copy is not elided the
+  // temporary's destructor runs the callable a second time.
+  ScopeGuard(ScopeGuard&& other)
+      : function(std::move(other.function)), dismissed(other.dismissed) {
+    other.dismissed = true;
+  }
+
+  void dismiss() {
+    dismissed = true;
+  }
+
+  ~ScopeGuard() noexcept {
+    if (!dismissed) {
+      function();
+    }
+  }
+};
+
+/// Creates a scope guard from `function`.
+template <typename Function>
+ScopeGuard<Function> makeScopeGuard(Function&& function) {
+  return ScopeGuard<Function>(std::forward<Function>(function));
+}
+}
diff --git a/contrib/pzstd/utils/ThreadPool.h b/contrib/pzstd/utils/ThreadPool.h
new file mode 100644
index 000000000..a1d1fc0b9
--- /dev/null
+++ b/contrib/pzstd/utils/ThreadPool.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include "utils/WorkQueue.h"
+
+#include <cstddef>
+#include <functional>
+#include <thread>
+#include <vector>
+
+namespace pzstd {
+/// A simple thread pool that pulls tasks off its queue in FIFO order.
+class ThreadPool {
+  std::vector<std::thread> threads_;
+
+  WorkQueue<std::function<void()>> tasks_;
+
+ public:
+  /// Constructs a thread pool with `numThreads` threads.
+  explicit ThreadPool(std::size_t numThreads) {
+    threads_.reserve(numThreads);
+    for (std::size_t i = 0; i < numThreads; ++i) {
+      threads_.emplace_back([&] {
+        std::function<void()> task;
+        while (tasks_.pop(task)) {
+          task();
+        }
+      });
+    }
+  }
+
+  /// Finishes all tasks currently in the queue.
+  ~ThreadPool() {
+    tasks_.finish();
+    for (auto& thread : threads_) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Adds `task` to the queue of tasks to execute. Since `task` is a
+   * `std::function<>`, it cannot be a move only type. So any lambda passed must
+   * not capture move only types (like `std::unique_ptr`).
+   *
+   * @param task  The task to execute.
+   */
+  void add(std::function<void()> task) {
+    tasks_.push(std::move(task));
+  }
+};
+}
diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h
new file mode 100644
index 000000000..3d926cc80
--- /dev/null
+++ b/contrib/pzstd/utils/WorkQueue.h
@@ -0,0 +1,144 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include "utils/Buffer.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstddef>
+#include <functional>
+#include <mutex>
+#include <queue>
+
+namespace pzstd {
+
+/// Unbounded thread-safe work queue.
+template <typename T>
+class WorkQueue {
+  // Protects all member variable access
+  std::mutex mutex_;
+  std::condition_variable cv_;
+
+  std::queue<T> queue_;
+  bool done_;
+
+ public:
+  /// Constructs an empty work queue.
+  WorkQueue() : done_(false) {}
+
+  /**
+   * Push an item onto the work queue.  Notify a single thread that work is
+   * available.  If `finish()` has been called, do nothing and return false.
+   *
+   * @param item  Item to push onto the queue.
+   * @returns     True upon success, false if `finish()` has been called.  An
+   *               item was pushed iff `push()` returns true.
+   */
+  bool push(T item) {
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (done_) {
+        return false;
+      }
+      queue_.push(std::move(item));
+    }
+    cv_.notify_one();
+    return true;
+  }
+
+  /**
+   * Attempts to pop an item off the work queue.  It will block until data is
+   * available or `finish()` has been called.
+   *
+   * @param[out] item  If `pop` returns `true`, it contains the popped item.
+   *                    If `pop` returns `false`, it is unmodified.
+   * @returns          True upon success.  False if the queue is empty and
+   *                    `finish()` has been called.
+   */
+  bool pop(T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    while (queue_.empty() && !done_) {
+      cv_.wait(lock);
+    }
+    if (queue_.empty()) {
+      assert(done_);
+      return false;
+    }
+    item = std::move(queue_.front());
+    queue_.pop();
+    return true;
+  }
+
+  /**
+   * Promise that `push()` won't be called again, so once the queue is empty
+   * there will never be any more work.
+   */
+  void finish() {
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      assert(!done_);
+      done_ = true;
+    }
+    cv_.notify_all();
+  }
+
+  /// Blocks until `finish()` has been called (but the queue may not be empty).
+  void waitUntilFinished() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    while (!done_) {
+      cv_.wait(lock);
+      // If we were woken by a push, we need to wake a thread waiting on pop().
+      if (!done_) {
+        lock.unlock();
+        cv_.notify_one();
+        lock.lock();
+      }
+    }
+  }
+};
+
+/// Work queue for `Buffer`s that knows the total number of bytes in the queue.
+class BufferWorkQueue {
+  WorkQueue<Buffer> queue_;
+  std::atomic<std::size_t> size_;
+
+ public:
+  BufferWorkQueue() : size_(0) {}
+
+  /// Enqueues `buffer` and adds its size to the byte count. If `finish()` has
+  /// already been called the buffer is dropped and the count is left unchanged.
+  void push(Buffer buffer) {
+    const std::size_t bytes = buffer.size();
+    // Count before publishing so that a concurrent pop() can never subtract
+    // these bytes first and wrap the unsigned counter.
+    size_.fetch_add(bytes);
+    if (!queue_.push(std::move(buffer))) {
+      // The queue rejected the buffer, so it must not be counted.
+      size_.fetch_sub(bytes);
+    }
+  }
+
+  bool pop(Buffer& buffer) {
+    bool result = queue_.pop(buffer);
+    if (result) {
+      size_.fetch_sub(buffer.size());
+    }
+    return result;
+  }
+
+  void finish() {
+    queue_.finish();
+  }
+
+  /**
+   * Blocks until `finish()` has been called.
+   *
+   * @returns The total number of bytes of all the `Buffer`s currently in the
+   *          queue.
+   */
+  std::size_t size() {
+    queue_.waitUntilFinished();
+    return size_.load();
+  }
+};
+}
diff --git a/contrib/pzstd/utils/test/BufferTest.cpp b/contrib/pzstd/utils/test/BufferTest.cpp
new file mode 100644
index 000000000..66ec961e2
--- /dev/null
+++ b/contrib/pzstd/utils/test/BufferTest.cpp
@@ -0,0 +1,89 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/Buffer.h"
+#include "utils/Range.h"
+
+#include <gtest/gtest.h>
+#include <memory>
+
+using namespace pzstd;
+
+namespace {
+void deleter(const unsigned char* buf) {
+  delete[] buf;
+}
+}
+
+TEST(Buffer, Constructors) {
+  Buffer empty;
+  EXPECT_TRUE(empty.empty());
+  EXPECT_EQ(0, empty.size());
+
+  Buffer sized(5);
+  EXPECT_FALSE(sized.empty());
+  EXPECT_EQ(5, sized.size());
+
+  // Check the destination of each move, not the moved-from source.
+  Buffer moved(std::move(sized));
+  EXPECT_FALSE(moved.empty());
+  EXPECT_EQ(5, moved.size());
+
+  Buffer assigned;
+  assigned = std::move(moved);
+  EXPECT_FALSE(assigned.empty());
+  EXPECT_EQ(5, assigned.size());
+}
+
+TEST(Buffer, BufferManagement) {
+  std::shared_ptr<unsigned char> buf(new unsigned char[10], deleter);
+  {
+    Buffer acquired(buf, MutableByteRange(buf.get(), buf.get() + 10));
+    EXPECT_EQ(2, buf.use_count());
+    Buffer moved(std::move(acquired));
+    EXPECT_EQ(2, buf.use_count());
+    Buffer assigned;
+    assigned = std::move(moved);
+    EXPECT_EQ(2, buf.use_count());
+
+    Buffer split = assigned.splitAt(5);
+    EXPECT_EQ(3, buf.use_count());
+
+    split.advance(1);
+    assigned.subtract(1);
+    EXPECT_EQ(3, buf.use_count());
+  }
+  EXPECT_EQ(1, buf.use_count());
+}
+
+TEST(Buffer, Modifiers) {
+  Buffer buf(10);
+  {
+    unsigned char i = 0;
+    for (auto& byte : buf.range()) {
+      byte = i++;
+    }
+  }
+
+  auto prefix = buf.splitAt(2);
+
+  ASSERT_EQ(2, prefix.size());
+  EXPECT_EQ(0, *prefix.data());
+
+  ASSERT_EQ(8, buf.size());
+  EXPECT_EQ(2, *buf.data());
+
+  buf.advance(2);
+  EXPECT_EQ(4, *buf.data());
+
+  EXPECT_EQ(9, *(buf.range().end() - 1));
+
+  buf.subtract(2);
+  EXPECT_EQ(7, *(buf.range().end() - 1));
+
+  EXPECT_EQ(4, buf.size());
+}
diff --git a/contrib/pzstd/utils/test/Makefile b/contrib/pzstd/utils/test/Makefile
new file mode 100644
index 000000000..4c6906330
--- /dev/null
+++ b/contrib/pzstd/utils/test/Makefile
@@ -0,0 +1,41 @@
+# ##########################################################################
+# Copyright (c) 2016-present, Facebook, Inc.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree. An additional grant
+# of patent rights can be found in the PATENTS file in the same directory.
+# ##########################################################################
+
+GTEST_INC ?= -isystem googletest/googletest/include
+GTEST_LIB ?= -L googletest/build/googlemock/gtest
+
+# Define *.exe as extension for Windows systems
+ifneq (,$(filter Windows%,$(OS)))
+EXT =.exe
+else
+EXT =
+endif
+
+PZSTDDIR = ../..
+
+CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
+CFLAGS ?= -O3
+CFLAGS += -std=c++11
+CFLAGS += $(MOREFLAGS)
+FLAGS = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS)
+
+# Libraries must follow the source that references them: the linker resolves
+# symbols left to right. -pthread is required by gtest and by std::thread.
+%: %.cpp
+	$(CXX) $(FLAGS) $^ -lgtest -lgtest_main -pthread -o $@$(EXT)
+
+.PHONY: test clean
+
+test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
+	@./BufferTest$(EXT)
+	@./RangeTest$(EXT)
+	@./ScopeGuardTest$(EXT)
+	@./ThreadPoolTest$(EXT)
+	@./WorkQueueTest$(EXT)
+
+clean:
+	@rm -f BufferTest$(EXT) RangeTest$(EXT) ScopeGuardTest$(EXT) ThreadPoolTest$(EXT) WorkQueueTest$(EXT)
diff --git a/contrib/pzstd/utils/test/RangeTest.cpp b/contrib/pzstd/utils/test/RangeTest.cpp
new file mode 100644
index 000000000..c761c8aff
--- /dev/null
+++ b/contrib/pzstd/utils/test/RangeTest.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/Range.h"
+
+#include <gtest/gtest.h>
+#include <string>
+
+using namespace pzstd;
+
+// Range is directly copied from folly.
+// Just some sanity tests to make sure everything seems to work.
+
+// Every supported way of building a Range must view the same five characters.
+TEST(Range, Constructors) {
+  StringPiece empty;
+  EXPECT_TRUE(empty.empty());
+  EXPECT_EQ(0, empty.size());
+
+  std::string str = "hello";
+  {
+    Range<std::string::const_iterator> piece(str.begin(), str.end());
+    EXPECT_EQ(5, piece.size());
+    EXPECT_EQ('h', *piece.data());
+    EXPECT_EQ('o', *(piece.end() - 1));
+  }
+
+  {
+    StringPiece piece(str.data(), str.size());
+    EXPECT_EQ(5, piece.size());
+    EXPECT_EQ('h', *piece.data());
+    EXPECT_EQ('o', *(piece.end() - 1));
+  }
+
+  {
+    StringPiece piece(str);
+    EXPECT_EQ(5, piece.size());
+    EXPECT_EQ('h', *piece.data());
+    EXPECT_EQ('o', *(piece.end() - 1));
+  }
+
+  {
+    StringPiece piece(str.c_str());
+    EXPECT_EQ(5, piece.size());
+    EXPECT_EQ('h', *piece.data());
+    EXPECT_EQ('o', *(piece.end() - 1));
+  }
+}
+
+// subpiece() returns a new view; subtract() and advance() shrink a copy in
+// place. The source range must be left untouched by all three.
+TEST(Range, Modifiers) {
+  StringPiece range("hello world");
+  ASSERT_EQ(11, range.size());
+
+  {
+    auto hello = range.subpiece(0, 5);
+    EXPECT_EQ(5, hello.size());
+    EXPECT_EQ('h', *hello.data());
+    EXPECT_EQ('o', *(hello.end() - 1));
+  }
+  {
+    auto hello = range;
+    hello.subtract(6);
+    EXPECT_EQ(5, hello.size());
+    EXPECT_EQ('h', *hello.data());
+    EXPECT_EQ('o', *(hello.end() - 1));
+  }
+  {
+    auto world = range;
+    world.advance(6);
+    EXPECT_EQ(5, world.size());
+    EXPECT_EQ('w', *world.data());
+    EXPECT_EQ('d', *(world.end() - 1));
+  }
+
+  std::string expected = "hello world";
+  EXPECT_EQ(expected, std::string(range.begin(), range.end()));
+  EXPECT_EQ(expected, std::string(range.data(), range.size()));
+}
diff --git a/contrib/pzstd/utils/test/ScopeGuardTest.cpp b/contrib/pzstd/utils/test/ScopeGuardTest.cpp
new file mode 100644
index 000000000..0c4dc0357
--- /dev/null
+++ b/contrib/pzstd/utils/test/ScopeGuardTest.cpp
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/ScopeGuard.h"
+
+#include <gtest/gtest.h>
+
+using namespace pzstd;
+
+TEST(ScopeGuard, Dismiss) {
+  {
+    auto guard = makeScopeGuard([&] { EXPECT_TRUE(false); });
+    guard.dismiss();
+  }
+}
+
+TEST(ScopeGuard, Executes) {
+  bool executed = false;
+  {
+    auto guard = makeScopeGuard([&] { executed = true; });
+  }
+  EXPECT_TRUE(executed);
+}
diff --git a/contrib/pzstd/utils/test/ThreadPoolTest.cpp b/contrib/pzstd/utils/test/ThreadPoolTest.cpp
new file mode 100644
index 000000000..9b9868cb1
--- /dev/null
+++ b/contrib/pzstd/utils/test/ThreadPoolTest.cpp
@@ -0,0 +1,67 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/ThreadPool.h"
+
+#include <gtest/gtest.h>
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <vector>
+
+using namespace pzstd;
+
+TEST(ThreadPool, Ordering) {
+  std::vector<int> results;
+
+  {
+    ThreadPool executor(1);
+    for (int i = 0; i < 100; ++i) {
+      executor.add([ &results, i ] { results.push_back(i); });
+    }
+  }
+
+  // Stop here if tasks were lost; otherwise the loop below would read past
+  // the end of `results`.
+  ASSERT_EQ(100, results.size());
+  for (int i = 0; i < 100; ++i) {
+    EXPECT_EQ(i, results[i]);
+  }
+}
+
+TEST(ThreadPool, AllJobsFinished) {
+  std::atomic<unsigned> numFinished{0};
+  std::atomic<bool> start{false};
+  {
+    ThreadPool executor(5);
+    for (int i = 0; i < 1000; ++i) {
+      executor.add([ &numFinished, &start ] {
+        while (!start.load()) {
+          // spin
+        }
+        ++numFinished;
+      });
+    }
+    start.store(true);
+  }
+  EXPECT_EQ(1000, numFinished.load());
+}
+
+TEST(ThreadPool, AddJobWhileJoining) {
+  std::atomic<bool> done{false};
+  {
+    ThreadPool executor(1);
+    executor.add([&executor, &done] {
+      while (!done.load()) {
+        std::this_thread::yield();
+      }
+      // Sleep for a second to be sure that we are joining
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      executor.add([] {
+        EXPECT_TRUE(false);
+      });
+    });
+    done.store(true);
+  }
+}
diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp
new file mode 100644
index 000000000..1b548d160
--- /dev/null
+++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp
@@ -0,0 +1,176 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/Buffer.h"
+#include "utils/WorkQueue.h"
+
+#include <gtest/gtest.h>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+using namespace pzstd;
+
+namespace {
+// Consumer used by the multi-consumer tests: records every popped value `v`
+// at `results[v]` until the queue reports that it is finished and empty.
+struct Popper {
+  WorkQueue<int>* queue;
+  int* results;
+  std::mutex* mutex;
+
+  void operator()() {
+    int result;
+    while (queue->pop(result)) {
+      std::lock_guard<std::mutex> lock(*mutex);
+      results[result] = result;
+    }
+  }
+};
+}
+
+TEST(WorkQueue, SingleThreaded) {
+  WorkQueue<int> queue;
+  int result;
+
+  queue.push(5);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(5, result);
+
+  queue.push(1);
+  queue.push(2);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(1, result);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(2, result);
+
+  queue.push(1);
+  queue.push(2);
+  queue.finish();
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(1, result);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(2, result);
+  EXPECT_FALSE(queue.pop(result));
+
+  queue.waitUntilFinished();
+}
+
+TEST(WorkQueue, SPSC) {
+  WorkQueue<int> queue;
+  const int max = 100;
+
+  for (int i = 0; i < 10; ++i) {
+    queue.push(i);
+  }
+
+  std::thread thread([ &queue, max ] {
+    int result;
+    for (int i = 0;; ++i) {
+      if (!queue.pop(result)) {
+        EXPECT_EQ(i, max);
+        break;
+      }
+      EXPECT_EQ(i, result);
+    }
+  });
+
+  std::this_thread::yield();
+  for (int i = 10; i < max; ++i) {
+    queue.push(i);
+  }
+  queue.finish();
+
+  thread.join();
+}
+
+TEST(WorkQueue, SPMC) {
+  WorkQueue<int> queue;
+  std::vector<int> results(10000, -1);
+  std::mutex mutex;
+  std::vector<std::thread> threads;
+  for (int i = 0; i < 100; ++i) {
+    threads.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  for (int i = 0; i < 10000; ++i) {
+    queue.push(i);
+  }
+  queue.finish();
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+
+  for (int i = 0; i < 10000; ++i) {
+    EXPECT_EQ(i, results[i]);
+  }
+}
+
+TEST(WorkQueue, MPMC) {
+  WorkQueue<int> queue;
+  std::vector<int> results(10000, -1);
+  std::mutex mutex;
+  std::vector<std::thread> popperThreads;
+  for (int i = 0; i < 100; ++i) {
+    popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  std::vector<std::thread> pusherThreads;
+  for (int i = 0; i < 10; ++i) {
+    auto min = i * 1000;
+    auto max = (i + 1) * 1000;
+    pusherThreads.emplace_back(
+        [ &queue, min, max ] {
+          for (int value = min; value < max; ++value) {
+            queue.push(value);
+          }
+        });
+  }
+
+  for (auto& thread : pusherThreads) {
+    thread.join();
+  }
+  queue.finish();
+
+  for (auto& thread : popperThreads) {
+    thread.join();
+  }
+
+  for (int i = 0; i < 10000; ++i) {
+    EXPECT_EQ(i, results[i]);
+  }
+}
+
+TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
+  {
+    BufferWorkQueue queue;
+    queue.finish();
+    EXPECT_EQ(0, queue.size());
+  }
+  {
+    BufferWorkQueue queue;
+    queue.push(Buffer(10));
+    queue.finish();
+    EXPECT_EQ(10, queue.size());
+  }
+  {
+    BufferWorkQueue queue;
+    queue.push(Buffer(10));
+    queue.push(Buffer(5));
+    queue.finish();
+    EXPECT_EQ(15, queue.size());
+  }
+  {
+    BufferWorkQueue queue;
+    queue.push(Buffer(10));
+    queue.push(Buffer(5));
+    queue.finish();
+    Buffer buffer;
+    // The pop must succeed for the remaining-size check to mean anything.
+    ASSERT_TRUE(queue.pop(buffer));
+    EXPECT_EQ(5, queue.size());
+  }
+}