1
0
mirror of https://github.com/facebook/zstd.git synced 2025-07-29 11:21:22 +03:00

Update seekable API to simplify IO

This commit is contained in:
Sean Purcell
2017-04-18 16:47:28 -07:00
parent 9626cf1ac6
commit 0f7bd772e6
5 changed files with 397 additions and 326 deletions

View File

@ -7,7 +7,54 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* *********************************************************
* Turn on Large Files support (>4GB) for 32-bit Linux/Unix
***********************************************************/
#if !defined(__64BIT__) || defined(__MINGW32__) /* No point defining Large file for 64 bit but MinGW-w64 requires it */
# if !defined(_FILE_OFFSET_BITS)
# define _FILE_OFFSET_BITS 64 /* turn off_t into a 64-bit type for ftello, fseeko */
# endif
# if !defined(_LARGEFILE_SOURCE) /* obsolete macro, replaced with _FILE_OFFSET_BITS */
# define _LARGEFILE_SOURCE 1 /* Large File Support extension (LFS) - fseeko, ftello */
# endif
# if defined(_AIX) || defined(__hpux)
# define _LARGE_FILES /* Large file support on 32-bits AIX and HP-UX */
# endif
#endif
/* ************************************************************
* Avoid fseek()'s 2GiB barrier with MSVC, MacOS, *BSD, MinGW
***************************************************************/
#if defined(_MSC_VER) && _MSC_VER >= 1400
# define LONG_SEEK _fseeki64
#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */
# define LONG_SEEK fseeko
#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__)
# define LONG_SEEK fseeko64
#elif defined(_WIN32) && !defined(__DJGPP__)
# include <windows.h>
static int LONG_SEEK(FILE* file, __int64 offset, int origin) {
LARGE_INTEGER off;
DWORD method;
off.QuadPart = offset;
if (origin == SEEK_END)
method = FILE_END;
else if (origin == SEEK_CUR)
method = FILE_CURRENT;
else
method = FILE_BEGIN;
if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method))
return 0;
else
return -1;
}
#else
# define LONG_SEEK fseek
#endif
#include <stdlib.h> /* malloc, free */
#include <stdio.h> /* FILE* */
#define XXH_STATIC_LINKING_ONLY
#define XXH_NAMESPACE ZSTD_
@ -16,17 +63,74 @@
#define ZSTD_STATIC_LINKING_ONLY
#include "zstd.h"
#include "zstd_errors.h"
#include "mem.h" /* includes zstd.h */
#include "mem.h"
#include "zstd_seekable.h"
#undef ERROR
#define ERROR(name) ((size_t)-ZSTD_error_##name)
#define CHECK_IO(f) { int const errcod = (f); if (errcod < 0) return ERROR(seekableIO); }
#undef MIN
#undef MAX
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))
/* Special-case callbacks for FILE* and in-memory modes, so that we can treat
* them the same way as the advanced API */
static int ZSTD_seekable_read_FILE(void* opaque, void* buffer, size_t n)
{
size_t const result = fread(buffer, 1, n, (FILE*)opaque);
if (result != n) {
return -1;
}
return 0;
}
static int ZSTD_seekable_seek_FILE(void* opaque, S64 offset, int origin)
{
int const ret = LONG_SEEK((FILE*)opaque, offset, origin);
if (ret) return ret;
return fflush((FILE*)opaque);
}
typedef struct {
const void *ptr;
size_t size;
size_t pos;
} buffWrapper_t;
static int ZSTD_seekable_read_buff(void* opaque, void* buffer, size_t n)
{
buffWrapper_t* buff = (buffWrapper_t*) opaque;
if (buff->size + n > buff->pos) return -1;
memcpy(buffer, (const BYTE*)buff->ptr + buff->pos, n);
buff->pos += n;
return 0;
}
static int ZSTD_seekable_seek_buff(void* opaque, S64 offset, int origin)
{
buffWrapper_t* buff = (buffWrapper_t*) opaque;
unsigned long long newOffset;
switch (origin) {
case SEEK_SET:
newOffset = offset;
break;
case SEEK_CUR:
newOffset = (unsigned long long)buff->pos + offset;
break;
case SEEK_END:
newOffset = (unsigned long long)buff->size - offset;
break;
}
if (newOffset < 0 || newOffset > buff->size) {
return -1;
}
buff->pos = newOffset;
return 0;
}
typedef struct {
U64 cOffset;
U64 dOffset;
@ -40,18 +144,70 @@ typedef struct {
int checksumFlag;
} seekTable_t;
#define SEEKABLE_BUFF_SIZE ZSTD_BLOCKSIZE_ABSOLUTEMAX
struct ZSTD_seekable_s {
ZSTD_DStream* dstream;
seekTable_t seekTable;
ZSTD_seekable_customFile src;
U64 decompressedOffset;
U32 curFrame;
BYTE inBuff[SEEKABLE_BUFF_SIZE]; /* need to do our own input buffering */
BYTE outBuff[SEEKABLE_BUFF_SIZE]; /* so we can efficiently decompress the
starts of chunks before we get to the
desired section */
ZSTD_inBuffer in; /* maintain continuity across ZSTD_seekable_decompress operations */
buffWrapper_t buffWrapper; /* for `src.opaque` in in-memory mode */
XXH64_state_t xxhState;
};
ZSTD_seekable* ZSTD_seekable_create(void)
{
ZSTD_seekable* zs = malloc(sizeof(ZSTD_seekable));
if (zs == NULL) return NULL;
/* also initializes stage to zsds_init */
memset(zs, 0, sizeof(*zs));
zs->dstream = ZSTD_createDStream();
if (zs->dstream == NULL) {
free(zs);
return NULL;
}
return zs;
}
size_t ZSTD_seekable_free(ZSTD_seekable* zs)
{
if (zs == NULL) return 0; /* support free on null */
ZSTD_freeDStream(zs->dstream);
free(zs->seekTable.entries);
free(zs);
return 0;
}
/** ZSTD_seekable_offsetToFrame() :
* Performs a binary search to find the last frame with a decompressed offset
* <= pos
* @return : the frame's index */
static U32 ZSTD_seekable_offsetToFrame(const seekTable_t* table, U64 pos)
U32 ZSTD_seekable_offsetToFrame(ZSTD_seekable* const zs, U64 pos)
{
U32 lo = 0;
U32 hi = table->tableLen;
U32 hi = zs->seekTable.tableLen;
if (pos >= zs->seekTable.entries[zs->seekTable.tableLen].dOffset) {
return zs->seekTable.tableLen;
}
while (lo + 1 < hi) {
U32 const mid = lo + ((hi - lo) >> 1);
if (table->entries[mid].dOffset <= pos) {
if (zs->seekTable.entries[mid].dOffset <= pos) {
lo = mid;
} else {
hi = mid;
@ -60,75 +216,50 @@ static U32 ZSTD_seekable_offsetToFrame(const seekTable_t* table, U64 pos)
return lo;
}
/* Stream decompressor state machine stages */
enum ZSTD_seekable_DStream_stage {
zsds_init = 0,
zsds_seek,
zsds_decompress,
zsds_done,
};
struct ZSTD_seekable_DStream_s {
ZSTD_DStream* dstream;
seekTable_t seekTable;
U32 curFrame;
U64 compressedOffset;
U64 decompressedOffset;
U64 targetStart;
U64 targetEnd;
U64 nextSeek;
enum ZSTD_seekable_DStream_stage stage;
XXH64_state_t xxhState;
};
ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void)
U32 ZSTD_seekable_getNumFrames(ZSTD_seekable* const zs)
{
ZSTD_seekable_DStream* zds = malloc(sizeof(ZSTD_seekable_DStream));
if (zds == NULL) return NULL;
/* also initializes stage to zsds_init */
memset(zds, 0, sizeof(*zds));
zds->dstream = ZSTD_createDStream();
if (zds->dstream == NULL) {
free(zds);
return NULL;
}
return zds;
return zs->seekTable.tableLen;
}
size_t ZSTD_seekable_freeDStream(ZSTD_seekable_DStream* zds)
U64 ZSTD_seekable_getFrameCompressedOffset(ZSTD_seekable* const zs, U32 frameIndex)
{
if (zds == NULL) return 0; /* support free on null */
ZSTD_freeDStream(zds->dstream);
free(zds->seekTable.entries);
free(zds);
return 0;
if (frameIndex >= zs->seekTable.tableLen) return ZSTD_SEEKABLE_FRAMEINDEX_TOOLARGE;
return zs->seekTable.entries[frameIndex].cOffset;
}
size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, size_t srcSize)
U64 ZSTD_seekable_getFrameDecompressedOffset(ZSTD_seekable* const zs, U32 frameIndex)
{
const BYTE* ip = (const BYTE*)src + srcSize;
if (frameIndex >= zs->seekTable.tableLen) return ZSTD_SEEKABLE_FRAMEINDEX_TOOLARGE;
return zs->seekTable.entries[frameIndex].dOffset;
}
size_t ZSTD_seekable_getFrameCompressedSize(ZSTD_seekable* const zs, U32 frameIndex)
{
if (frameIndex >= zs->seekTable.tableLen) return ERROR(frameIndex_tooLarge);
return zs->seekTable.entries[frameIndex + 1].cOffset -
zs->seekTable.entries[frameIndex].cOffset;
}
size_t ZSTD_seekable_getFrameDecompressedSize(ZSTD_seekable* const zs, U32 frameIndex)
{
if (frameIndex > zs->seekTable.tableLen) return ERROR(frameIndex_tooLarge);
return zs->seekTable.entries[frameIndex + 1].dOffset -
zs->seekTable.entries[frameIndex].dOffset;
}
static size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable* zs)
{
int checksumFlag;
ZSTD_seekable_customFile src = zs->src;
/* read the footer, fixed size */
CHECK_IO(src.seek(src.opaque, -(int)ZSTD_seekTableFooterSize, SEEK_END));
CHECK_IO(src.read(src.opaque, zs->inBuff, ZSTD_seekTableFooterSize));
/* footer is fixed size */
if (srcSize < ZSTD_seekTableFooterSize)
return ZSTD_seekTableFooterSize;
if (MEM_readLE32(ip - 4) != ZSTD_SEEKABLE_MAGICNUMBER) {
if (MEM_readLE32(zs->inBuff + 5) != ZSTD_SEEKABLE_MAGICNUMBER) {
return ERROR(prefix_unknown);
}
{ BYTE const sfd = ip[-5];
{ BYTE const sfd = zs->inBuff[4];
checksumFlag = sfd >> 7;
/* check reserved bits */
@ -137,30 +268,36 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src,
}
}
{ U32 const numFrames = MEM_readLE32(ip-9);
{ U32 const numFrames = MEM_readLE32(zs->inBuff);
U32 const sizePerEntry = 8 + (checksumFlag?4:0);
U32 const tableSize = sizePerEntry * numFrames;
U32 const frameSize = tableSize + ZSTD_seekTableFooterSize + ZSTD_skippableHeaderSize;
const BYTE* base = ip - frameSize;
U32 remaining = frameSize - ZSTD_seekTableFooterSize; /* don't need to re-read footer */
{
U32 const toRead = MIN(remaining, SEEKABLE_BUFF_SIZE);
if (srcSize < frameSize) return frameSize;
CHECK_IO(src.seek(src.opaque, -(S64)frameSize, SEEK_END));
CHECK_IO(src.read(src.opaque, zs->inBuff, toRead));
if (MEM_readLE32(base) != (ZSTD_MAGIC_SKIPPABLE_START | 0xE)) {
remaining -= toRead;
}
if (MEM_readLE32(zs->inBuff) != (ZSTD_MAGIC_SKIPPABLE_START | 0xE)) {
return ERROR(prefix_unknown);
}
if (MEM_readLE32(base+4) + ZSTD_skippableHeaderSize != frameSize) {
if (MEM_readLE32(zs->inBuff+4) + ZSTD_skippableHeaderSize != frameSize) {
return ERROR(prefix_unknown);
}
{ /* Allocate an extra entry at the end so that we can do size
* computations on the last element without special case */
seekEntry_t* entries =
(seekEntry_t*)malloc(sizeof(seekEntry_t) * (numFrames + 1));
const BYTE* tableBase = base + ZSTD_skippableHeaderSize;
seekEntry_t* entries = (seekEntry_t*)malloc(sizeof(seekEntry_t) * (numFrames + 1));
const BYTE* tableBase = zs->inBuff + ZSTD_skippableHeaderSize;
U32 idx = 0;
U32 pos = 8;
U32 idx;
size_t pos;
U64 cOffset = 0;
U64 dOffset = 0;
@ -171,202 +308,153 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src,
}
/* compute cumulative positions */
for (idx = 0, pos = 0; idx < numFrames; idx++) {
for (; idx < numFrames; idx++) {
if (pos + sizePerEntry > SEEKABLE_BUFF_SIZE) {
U32 const toRead = MIN(remaining, SEEKABLE_BUFF_SIZE);
U32 const offset = SEEKABLE_BUFF_SIZE - pos;
memmove(zs->inBuff, zs->inBuff + pos, offset); /* move any data we haven't read yet */
CHECK_IO(src.read(src.opaque, zs->inBuff+offset, toRead));
remaining -= toRead;
pos = 0;
}
entries[idx].cOffset = cOffset;
entries[idx].dOffset = dOffset;
cOffset += MEM_readLE32(tableBase + pos);
cOffset += MEM_readLE32(zs->inBuff + pos);
pos += 4;
dOffset += MEM_readLE32(tableBase + pos);
dOffset += MEM_readLE32(zs->inBuff + pos);
pos += 4;
if (checksumFlag) {
entries[idx].checksum = MEM_readLE32(tableBase + pos);
entries[idx].checksum = MEM_readLE32(zs->inBuff + pos);
pos += 4;
}
}
entries[numFrames].cOffset = cOffset;
entries[numFrames].dOffset = dOffset;
zds->seekTable.entries = entries;
zds->seekTable.tableLen = numFrames;
zds->seekTable.checksumFlag = checksumFlag;
zs->seekTable.entries = entries;
zs->seekTable.tableLen = numFrames;
zs->seekTable.checksumFlag = checksumFlag;
return 0;
}
}
}
size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, U64 rangeStart, U64 rangeEnd)
size_t ZSTD_seekable_initBuff(ZSTD_seekable* zs, const void* src, size_t srcSize)
{
/* restrict range to the end of the file, of non-negative size */
rangeEnd = MIN(rangeEnd, zds->seekTable.entries[zds->seekTable.tableLen].dOffset);
rangeStart = MIN(rangeStart, rangeEnd);
zs->buffWrapper = (buffWrapper_t){src, srcSize, 0};
{ ZSTD_seekable_customFile srcFile = {&zs->buffWrapper,
&ZSTD_seekable_read_buff,
&ZSTD_seekable_seek_buff};
return ZSTD_seekable_initAdvanced(zs, srcFile); }
}
zds->targetStart = rangeStart;
zds->targetEnd = rangeEnd;
zds->stage = zsds_seek;
size_t ZSTD_seekable_initFile(ZSTD_seekable* zs, FILE* src)
{
ZSTD_seekable_customFile srcFile = {src, &ZSTD_seekable_read_FILE,
&ZSTD_seekable_seek_FILE};
return ZSTD_seekable_initAdvanced(zs, srcFile);
}
/* force a seek first */
zds->curFrame = (U32)-1;
zds->compressedOffset = (U64)-1;
zds->decompressedOffset = (U64)-1;
size_t ZSTD_seekable_initAdvanced(ZSTD_seekable* zs, ZSTD_seekable_customFile src)
{
zs->src = src;
if (zds->seekTable.checksumFlag) {
XXH64_reset(&zds->xxhState, 0);
}
{ const size_t seekTableInit = ZSTD_seekable_loadSeekTable(zs);
if (ZSTD_isError(seekTableInit)) return seekTableInit; }
if (rangeStart == rangeEnd) zds->stage = zsds_done;
zs->decompressedOffset = (U64)-1;
zs->curFrame = (U32)-1;
{ const size_t ret = ZSTD_initDStream(zds->dstream);
if (ZSTD_isError(ret)) return ret; }
{ const size_t dstreamInit = ZSTD_initDStream(zs->dstream);
if (ZSTD_isError(dstreamInit)) return dstreamInit; }
return 0;
}
U64 ZSTD_seekable_getSeekOffset(ZSTD_seekable_DStream* zds)
size_t ZSTD_seekable_decompress(ZSTD_seekable* zs, void* dst, size_t len, U64 offset)
{
return zds->nextSeek;
}
U32 targetFrame = ZSTD_seekable_offsetToFrame(zs, offset);
do {
/* check if we can continue from a previous decompress job */
if (targetFrame != zs->curFrame || offset != zs->decompressedOffset) {
zs->decompressedOffset = zs->seekTable.entries[targetFrame].dOffset;
zs->curFrame = targetFrame;
size_t ZSTD_seekable_updateOffset(ZSTD_seekable_DStream* zds, U64 offset)
{
if (zds->stage != zsds_seek) {
return ERROR(stage_wrong);
}
if (offset != zds->nextSeek) {
return ERROR(needSeek);
}
CHECK_IO(zs->src.seek(zs->src.opaque,
zs->seekTable.entries[targetFrame].cOffset,
SEEK_SET));
zs->in = (ZSTD_inBuffer){zs->inBuff, 0, 0};
XXH64_reset(&zs->xxhState, 0);
ZSTD_resetDStream(zs->dstream);
}
zds->stage = zsds_decompress;
zds->compressedOffset = offset;
return 0;
}
while (zs->decompressedOffset < offset + len) {
size_t toRead;
ZSTD_outBuffer outTmp;
size_t prevOutPos;
if (zs->decompressedOffset < offset) {
/* dummy decompressions until we get to the target offset */
outTmp = (ZSTD_outBuffer){zs->outBuff, MIN(SEEKABLE_BUFF_SIZE, offset - zs->decompressedOffset), 0};
} else {
outTmp = (ZSTD_outBuffer){dst, len, zs->decompressedOffset - offset};
}
size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
const seekTable_t* const jt = &zds->seekTable;
while (1) {
switch (zds->stage) {
case zsds_init:
return ERROR(init_missing); /* ZSTD_seekable_initDStream should be called first */
case zsds_decompress: {
BYTE* const outBase = (BYTE*)output->dst + output->pos;
size_t const outLen = output->size - output->pos;
while (zds->decompressedOffset < zds->targetStart) {
U64 const toDecompress =
zds->targetStart - zds->decompressedOffset;
size_t const prevInputPos = input->pos;
prevOutPos = outTmp.pos;
toRead = ZSTD_decompressStream(zs->dstream, &outTmp, &zs->in);
if (ZSTD_isError(toRead)) {
return toRead;
}
ZSTD_outBuffer outTmp = {
outBase, (size_t)MIN((U64)outLen, toDecompress), 0};
if (zs->seekTable.checksumFlag) {
XXH64_update(&zs->xxhState, outTmp.dst, outTmp.pos);
}
zs->decompressedOffset += outTmp.pos - prevOutPos;
size_t const ret =
ZSTD_decompressStream(zds->dstream, &outTmp, input);
if (toRead == 0) {
/* frame complete */
if (ZSTD_isError(ret)) return ret;
if (ret == 0) {
/* should not happen at this stage */
/* verify checksum */
if (zs->seekTable.checksumFlag &&
(XXH64_digest(&zs->xxhState) & 0xFFFFFFFFU) !=
zs->seekTable.entries[targetFrame].checksum) {
return ERROR(corruption_detected);
}
zds->compressedOffset += input->pos - prevInputPos;
zds->decompressedOffset += outTmp.pos;
if (jt->checksumFlag) {
XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos);
if (zs->decompressedOffset < offset + len) {
/* go back to the start and force a reset of the stream */
targetFrame = ZSTD_seekable_offsetToFrame(zs, zs->decompressedOffset);
}
if (input->pos == input->size) {
/* need more input */
return MIN(
ZSTD_DStreamInSize(),
(size_t)(jt->entries[zds->curFrame + 1]
.cOffset -
zds->compressedOffset));
}
}
/* do actual decompression */
{
U64 const toDecompress =
MIN(zds->targetEnd,
jt->entries[zds->curFrame + 1].dOffset) -
zds->decompressedOffset;
size_t const prevInputPos = input->pos;
ZSTD_outBuffer outTmp = {
outBase, (size_t)MIN((U64)outLen, toDecompress), 0};
size_t const ret =
ZSTD_decompressStream(zds->dstream, &outTmp, input);
if (ZSTD_isError(ret)) return ret;
zds->compressedOffset += input->pos - prevInputPos;
zds->decompressedOffset += outTmp.pos;
output->pos += outTmp.pos;
if (jt->checksumFlag) {
XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos);
if (ret == 0) {
/* verify the checksum */
U32 const digest = XXH64_digest(&zds->xxhState) & 0xFFFFFFFFU;
if (digest != jt->entries[zds->curFrame].checksum) {
return ERROR(checksum_wrong);
}
XXH64_reset(&zds->xxhState, 0);
}
}
if (zds->decompressedOffset == zds->targetEnd) {
/* done */
zds->stage = zsds_done;
return 0;
}
if (ret == 0) {
/* frame is done */
/* make sure this lines up with the expected frame border */
if (zds->decompressedOffset !=
jt->entries[zds->curFrame + 1].dOffset ||
zds->compressedOffset !=
jt->entries[zds->curFrame + 1].cOffset)
return ERROR(corruption_detected);
ZSTD_resetDStream(zds->dstream);
zds->stage = zsds_seek;
break;
}
/* need more input */
return MIN(ZSTD_DStreamInSize(), (size_t)(
jt->entries[zds->curFrame + 1].cOffset -
zds->compressedOffset));
}
}
case zsds_seek: {
U32 targetFrame;
if (zds->decompressedOffset < zds->targetStart ||
zds->decompressedOffset >= zds->targetEnd) {
/* haven't started yet */
targetFrame = ZSTD_seekable_offsetToFrame(jt, zds->targetStart);
} else {
targetFrame = ZSTD_seekable_offsetToFrame(jt, zds->decompressedOffset);
}
zds->curFrame = targetFrame;
if (zds->compressedOffset == jt->entries[targetFrame].cOffset) {
zds->stage = zsds_decompress;
break;
}
zds->nextSeek = jt->entries[targetFrame].cOffset;
zds->decompressedOffset = jt->entries[targetFrame].dOffset;
/* signal to user that a seek is required */
return ERROR(needSeek);
/* read in more data if we're done with this buffer */
if (zs->in.pos == zs->in.size) {
toRead = MIN(toRead, SEEKABLE_BUFF_SIZE);
CHECK_IO(zs->src.read(zs->src.opaque, zs->inBuff, toRead));
zs->in.size = toRead;
zs->in.pos = 0;
}
}
case zsds_done:
return 0;
} while (zs->decompressedOffset != offset + len);
return len;
}
size_t ZSTD_seekable_decompressFrame(ZSTD_seekable* zs, void* dst, size_t dstSize, U32 frameIndex)
{
if (frameIndex >= zs->seekTable.tableLen) {
return ERROR(frameIndex_tooLarge);
}
{
size_t const decompressedSize =
zs->seekTable.entries[frameIndex + 1].dOffset -
zs->seekTable.entries[frameIndex].dOffset;
if (dstSize < decompressedSize) {
return ERROR(dstSize_tooSmall);
}
return ZSTD_seekable_decompress(
zs, dst, zs->seekTable.entries[frameIndex].dOffset,
decompressedSize);
}
}