From 11dc940e726012c8341679eeba44b6a876a49466 Mon Sep 17 00:00:00 2001 From: Sean Purcell Date: Fri, 21 Apr 2017 12:23:06 -0700 Subject: [PATCH] Add parallel processing example for seekable API --- contrib/seekable_format/examples/.gitignore | 1 + contrib/seekable_format/examples/Makefile | 7 +- .../examples/parallel_processing.c | 195 ++++++++++++++++++ .../examples/seekable_decompression.c | 2 +- contrib/seekable_format/zstdseek_decompress.c | 7 +- 5 files changed, 206 insertions(+), 6 deletions(-) create mode 100644 contrib/seekable_format/examples/parallel_processing.c diff --git a/contrib/seekable_format/examples/.gitignore b/contrib/seekable_format/examples/.gitignore index 4ded45619..1e1661d80 100644 --- a/contrib/seekable_format/examples/.gitignore +++ b/contrib/seekable_format/examples/.gitignore @@ -1,2 +1,3 @@ seekable_compression seekable_decompression +parallel_processing diff --git a/contrib/seekable_format/examples/Makefile b/contrib/seekable_format/examples/Makefile index fcd1d9146..a77143889 100644 --- a/contrib/seekable_format/examples/Makefile +++ b/contrib/seekable_format/examples/Makefile @@ -21,7 +21,7 @@ SEEKABLE_OBJS = ../zstdseek_compress.c ../zstdseek_decompress.c default: all -all: seekable_compression seekable_decompression +all: seekable_compression seekable_decompression parallel_processing seekable_compression : seekable_compression.c $(SEEKABLE_OBJS) $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ @@ -29,7 +29,10 @@ seekable_compression : seekable_compression.c $(SEEKABLE_OBJS) seekable_decompression : seekable_decompression.c $(SEEKABLE_OBJS) $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ +parallel_processing : parallel_processing.c $(SEEKABLE_OBJS) + $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ -pthread + clean: @rm -f core *.o tmp* result* *.zst \ - seekable_compression seekable_decompression + seekable_compression seekable_decompression parallel_processing @echo Cleaning completed diff --git a/contrib/seekable_format/examples/parallel_processing.c b/contrib/seekable_format/examples/parallel_processing.c new file mode 100644 index 000000000..cf2d0d2af --- /dev/null +++ b/contrib/seekable_format/examples/parallel_processing.c @@ -0,0 +1,195 @@ +/** + * Copyright 2017-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the license found in the + * LICENSE-examples file in the root directory of this source tree. + */ + +/* + * A simple demo that sums up all the bytes in the file in parallel using + * seekable decompression and the zstd thread pool + */ + +#include // malloc, exit +#include // fprintf, perror, feof +#include // strerror +#include // errno +#define ZSTD_STATIC_LINKING_ONLY +#include // presumes zstd library is installed +#include +#if defined(WIN32) || defined(_WIN32) +# include +# define SLEEP(x) Sleep(x) +#else +# include +# define SLEEP(x) usleep(x * 1000) +#endif + +#include "pool.h" // use zstd thread pool for demo + +#include "zstd_seekable.h" + +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + +static void* malloc_orDie(size_t size) +{ + void* const buff = malloc(size); + if (buff) return buff; + /* error */ + perror("malloc"); + exit(1); +} + +static void* realloc_orDie(void* ptr, size_t size) +{ + ptr = realloc(ptr, size); + if (ptr) return ptr; + /* error */ + perror("realloc"); + exit(1); +} + +static FILE* fopen_orDie(const char *filename, const char *instruction) +{ + FILE* const inFile = fopen(filename, instruction); + if (inFile) return inFile; + /* error */ + perror(filename); + exit(3); +} + +static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file) +{ + size_t const readSize = fread(buffer, 1, sizeToRead, file); + if (readSize == sizeToRead) return readSize; /* good */ + if (feof(file)) return readSize; /* good, reached end of file */ + /* error */ + perror("fread"); + exit(4); +} + +static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file) +{ + size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file); + if (writtenSize == sizeToWrite) return sizeToWrite; /* good */ + /* error */ + perror("fwrite"); + exit(5); +} + +static size_t fclose_orDie(FILE* file) +{ + if (!fclose(file)) return 0; + /* error */ + perror("fclose"); + exit(6); +} + +static void fseek_orDie(FILE* file, long int offset, int origin) { + if (!fseek(file, offset, origin)) { + if (!fflush(file)) return; + } + /* error */ + perror("fseek"); + exit(7); +} + +static const char* filename; + +struct sum_job { + const char* fname; + unsigned long long sum; + unsigned frameNb; + int done; +}; + +static void sumFrame(void* opaque) +{ + struct sum_job* job = (struct sum_job*)opaque; + job->done = 0; + + FILE* const fin = fopen_orDie(job->fname, "rb"); + + ZSTD_seekable* const seekable = ZSTD_seekable_create(); + if (seekable==NULL) { fprintf(stderr, "ZSTD_seekable_create() error \n"); exit(10); } + + size_t const initResult = ZSTD_seekable_initFile(seekable, fin); + if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_seekable_init() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); } + + size_t const frameSize = ZSTD_seekable_getFrameDecompressedSize(seekable, job->frameNb); + unsigned char* data = malloc_orDie(frameSize); + + size_t result = ZSTD_seekable_decompressFrame(seekable, data, frameSize, job->frameNb); + if (ZSTD_isError(result)) { fprintf(stderr, "ZSTD_seekable_decompressFrame() error : %s \n", ZSTD_getErrorName(result)); exit(12); } + + unsigned long long sum = 0; + size_t i; + for (i = 0; i < frameSize; i++) { + sum += data[i]; + } + job->sum = sum; + job->done = 1; + + fclose(fin); + ZSTD_seekable_free(seekable); + free(data); +} + +static void sumFile_orDie(const char* fname, int nbThreads) +{ + POOL_ctx* pool = POOL_create(nbThreads, nbThreads); + if (pool == NULL) { fprintf(stderr, "POOL_create() error \n"); exit(9); } + + FILE* const fin = fopen_orDie(fname, "rb"); + + ZSTD_seekable* const seekable = ZSTD_seekable_create(); + if (seekable==NULL) { fprintf(stderr, "ZSTD_seekable_create() error \n"); exit(10); } + + size_t const initResult = ZSTD_seekable_initFile(seekable, fin); + if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_seekable_init() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); } + + size_t const numFrames = ZSTD_seekable_getNumFrames(seekable); + struct sum_job* jobs = (struct sum_job*)malloc(numFrames * sizeof(struct sum_job)); + + size_t i; + for (i = 0; i < numFrames; i++) { + jobs[i] = (struct sum_job){ fname, 0, i, 0 }; + POOL_add(pool, sumFrame, &jobs[i]); + } + + unsigned long long total = 0; + + for (i = 0; i < numFrames; i++) { + while (!jobs[i].done) SLEEP(5); /* wake up every 5 milliseconds to check */ + total += jobs[i].sum; + } + + printf("Sum: %llu\n", total); + + POOL_free(pool); + ZSTD_seekable_free(seekable); + fclose(fin); + free(jobs); +} + + +int main(int argc, const char** argv) +{ + const char* const exeName = argv[0]; + + if (argc!=3) { + fprintf(stderr, "wrong arguments\n"); + fprintf(stderr, "usage:\n"); + fprintf(stderr, "%s FILE NB_THREADS\n", exeName); + return 1; + } + + { + const char* const inFilename = argv[1]; + int const nbThreads = atoi(argv[2]); + sumFile_orDie(inFilename, nbThreads); + } + + return 0; +} diff --git a/contrib/seekable_format/examples/seekable_decompression.c b/contrib/seekable_format/examples/seekable_decompression.c index d18def7cd..b765a7591 100644 --- a/contrib/seekable_format/examples/seekable_decompression.c +++ b/contrib/seekable_format/examples/seekable_decompression.c @@ -122,7 +122,7 @@ int main(int argc, const char** argv) if (argc!=4) { fprintf(stderr, "wrong arguments\n"); fprintf(stderr, "usage:\n"); - fprintf(stderr, "%s FILE\n", exeName); + fprintf(stderr, "%s FILE START END\n", exeName); return 1; } diff --git a/contrib/seekable_format/zstdseek_decompress.c b/contrib/seekable_format/zstdseek_decompress.c index a9e87577b..4a8b4e568 100644 --- a/contrib/seekable_format/zstdseek_decompress.c +++ b/contrib/seekable_format/zstdseek_decompress.c @@ -406,7 +406,8 @@ size_t ZSTD_seekable_decompress(ZSTD_seekable* zs, void* dst, size_t len, U64 of } if (zs->seekTable.checksumFlag) { - XXH64_update(&zs->xxhState, outTmp.dst, outTmp.pos); + XXH64_update(&zs->xxhState, (BYTE*)outTmp.dst + prevOutPos, + outTmp.pos - prevOutPos); } zs->decompressedOffset += outTmp.pos - prevOutPos; @@ -454,7 +455,7 @@ size_t ZSTD_seekable_decompressFrame(ZSTD_seekable* zs, void* dst, size_t dstSiz return ERROR(dstSize_tooSmall); } return ZSTD_seekable_decompress( - zs, dst, zs->seekTable.entries[frameIndex].dOffset, - decompressedSize); + zs, dst, decompressedSize, + zs->seekTable.entries[frameIndex].dOffset); } }