You've already forked postgres_exporter
mirror of
https://github.com/prometheus-community/postgres_exporter.git
synced 2025-09-10 22:51:54 +03:00
This commit implements a massive refactor of the repository, and moves the build system over to use Mage (magefile.org) which should allow seamless building across multiple platforms.
365 lines
9.7 KiB
Go
365 lines
9.7 KiB
Go
package lz4
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"io/ioutil"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
var (
	// ErrInvalid is returned when the data being read is not an LZ4 archive,
	// that is when the LZ4 magic number detection failed.
	ErrInvalid = errors.New("invalid lz4 data")

	// errEndOfBlock is returned by readBlock once it reaches the last block
	// of a frame. It signals the end of the frame rather than a failure.
	errEndOfBlock = errors.New("end of block")
)
|
|
|
|
// Reader implements the LZ4 frame decoder.
|
|
// The Header is set after the first call to Read().
|
|
// The Header may change between Read() calls in case of concatenated frames.
|
|
type Reader struct {
|
|
Pos int64 // position within the source
|
|
Header
|
|
src io.Reader
|
|
checksum hash.Hash32 // frame hash
|
|
wg sync.WaitGroup // decompressing go routine wait group
|
|
data []byte // buffered decompressed data
|
|
window []byte // 64Kb decompressed data window
|
|
}
|
|
|
|
// NewReader returns a new LZ4 frame decoder.
|
|
// No access to the underlying io.Reader is performed.
|
|
func NewReader(src io.Reader) *Reader {
|
|
return &Reader{
|
|
src: src,
|
|
checksum: hashPool.Get(),
|
|
}
|
|
}
|
|
|
|
// readHeader checks the frame magic number and parses the frame descriptoz.
|
|
// Skippable frames are supported even as a first frame although the LZ4
|
|
// specifications recommends skippable frames not to be used as first frames.
|
|
func (z *Reader) readHeader(first bool) error {
|
|
defer z.checksum.Reset()
|
|
|
|
for {
|
|
var magic uint32
|
|
if err := binary.Read(z.src, binary.LittleEndian, &magic); err != nil {
|
|
if !first && err == io.ErrUnexpectedEOF {
|
|
return io.EOF
|
|
}
|
|
return err
|
|
}
|
|
z.Pos += 4
|
|
if magic>>8 == frameSkipMagic>>8 {
|
|
var skipSize uint32
|
|
if err := binary.Read(z.src, binary.LittleEndian, &skipSize); err != nil {
|
|
return err
|
|
}
|
|
z.Pos += 4
|
|
m, err := io.CopyN(ioutil.Discard, z.src, int64(skipSize))
|
|
z.Pos += m
|
|
if err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
if magic != frameMagic {
|
|
return ErrInvalid
|
|
}
|
|
break
|
|
}
|
|
|
|
// header
|
|
var buf [8]byte
|
|
if _, err := io.ReadFull(z.src, buf[:2]); err != nil {
|
|
return err
|
|
}
|
|
z.Pos += 2
|
|
|
|
b := buf[0]
|
|
if b>>6 != Version {
|
|
return fmt.Errorf("lz4.Read: invalid version: got %d expected %d", b>>6, Version)
|
|
}
|
|
z.BlockDependency = b>>5&1 == 0
|
|
z.BlockChecksum = b>>4&1 > 0
|
|
frameSize := b>>3&1 > 0
|
|
z.NoChecksum = b>>2&1 == 0
|
|
// z.Dict = b&1 > 0
|
|
|
|
bmsID := buf[1] >> 4 & 0x7
|
|
bSize, ok := bsMapID[bmsID]
|
|
if !ok {
|
|
return fmt.Errorf("lz4.Read: invalid block max size: %d", bmsID)
|
|
}
|
|
z.BlockMaxSize = bSize
|
|
|
|
z.checksum.Write(buf[0:2])
|
|
|
|
if frameSize {
|
|
if err := binary.Read(z.src, binary.LittleEndian, &z.Size); err != nil {
|
|
return err
|
|
}
|
|
z.Pos += 8
|
|
binary.LittleEndian.PutUint64(buf[:], z.Size)
|
|
z.checksum.Write(buf[0:8])
|
|
}
|
|
|
|
// if z.Dict {
|
|
// if err := binary.Read(z.src, binary.LittleEndian, &z.DictID); err != nil {
|
|
// return err
|
|
// }
|
|
// z.Pos += 4
|
|
// binary.LittleEndian.PutUint32(buf[:], z.DictID)
|
|
// z.checksum.Write(buf[0:4])
|
|
// }
|
|
|
|
// header checksum
|
|
if _, err := io.ReadFull(z.src, buf[:1]); err != nil {
|
|
return err
|
|
}
|
|
z.Pos++
|
|
if h := byte(z.checksum.Sum32() >> 8 & 0xFF); h != buf[0] {
|
|
return fmt.Errorf("lz4.Read: invalid header checksum: got %v expected %v", buf[0], h)
|
|
}
|
|
|
|
z.Header.done = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// Read decompresses data from the underlying source into the supplied buffer.
|
|
//
|
|
// Since there can be multiple streams concatenated, Header values may
|
|
// change between calls to Read(). If that is the case, no data is actually read from
|
|
// the underlying io.Reader, to allow for potential input buffer resizing.
|
|
//
|
|
// Data is buffered if the input buffer is too small, and exhausted upon successive calls.
|
|
//
|
|
// If the buffer is large enough (typically in multiples of BlockMaxSize) and there is
|
|
// no block dependency, then the data will be decompressed concurrently based on the GOMAXPROCS value.
|
|
func (z *Reader) Read(buf []byte) (n int, err error) {
|
|
if !z.Header.done {
|
|
if err = z.readHeader(true); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
if len(buf) == 0 {
|
|
return
|
|
}
|
|
|
|
// exhaust remaining data from previous Read()
|
|
if len(z.data) > 0 {
|
|
n = copy(buf, z.data)
|
|
z.data = z.data[n:]
|
|
if len(z.data) == 0 {
|
|
z.data = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// Break up the input buffer into BlockMaxSize blocks with at least one block.
|
|
// Then decompress into each of them concurrently if possible (no dependency).
|
|
// In case of dependency, the first block will be missing the window (except on the
|
|
// very first call), the rest will have it already since it comes from the previous block.
|
|
wbuf := buf
|
|
zn := (len(wbuf) + z.BlockMaxSize - 1) / z.BlockMaxSize
|
|
zblocks := make([]block, zn)
|
|
for zi, abort := 0, uint32(0); zi < zn && atomic.LoadUint32(&abort) == 0; zi++ {
|
|
zb := &zblocks[zi]
|
|
// last block may be too small
|
|
if len(wbuf) < z.BlockMaxSize+len(z.window) {
|
|
wbuf = make([]byte, z.BlockMaxSize+len(z.window))
|
|
}
|
|
copy(wbuf, z.window)
|
|
if zb.err = z.readBlock(wbuf, zb); zb.err != nil {
|
|
break
|
|
}
|
|
wbuf = wbuf[z.BlockMaxSize:]
|
|
if !z.BlockDependency {
|
|
z.wg.Add(1)
|
|
go z.decompressBlock(zb, &abort)
|
|
continue
|
|
}
|
|
// cannot decompress concurrently when dealing with block dependency
|
|
z.decompressBlock(zb, nil)
|
|
// the last block may not contain enough data
|
|
if len(z.window) == 0 {
|
|
z.window = make([]byte, winSize)
|
|
}
|
|
if len(zb.data) >= winSize {
|
|
copy(z.window, zb.data[len(zb.data)-winSize:])
|
|
} else {
|
|
copy(z.window, z.window[len(zb.data):])
|
|
copy(z.window[len(zb.data)+1:], zb.data)
|
|
}
|
|
}
|
|
z.wg.Wait()
|
|
|
|
// since a block size may be less then BlockMaxSize, trim the decompressed buffers
|
|
for _, zb := range zblocks {
|
|
if zb.err != nil {
|
|
if zb.err == errEndOfBlock {
|
|
return n, z.close()
|
|
}
|
|
return n, zb.err
|
|
}
|
|
bLen := len(zb.data)
|
|
if !z.NoChecksum {
|
|
z.checksum.Write(zb.data)
|
|
}
|
|
m := copy(buf[n:], zb.data)
|
|
// buffer the remaining data (this is necessarily the last block)
|
|
if m < bLen {
|
|
z.data = zb.data[m:]
|
|
}
|
|
n += m
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// readBlock reads an entire frame block from the frame.
|
|
// The input buffer is the one that will receive the decompressed data.
|
|
// If the end of the frame is detected, it returns the errEndOfBlock error.
|
|
func (z *Reader) readBlock(buf []byte, b *block) error {
|
|
var bLen uint32
|
|
if err := binary.Read(z.src, binary.LittleEndian, &bLen); err != nil {
|
|
return err
|
|
}
|
|
atomic.AddInt64(&z.Pos, 4)
|
|
|
|
switch {
|
|
case bLen == 0:
|
|
return errEndOfBlock
|
|
case bLen&(1<<31) == 0:
|
|
b.compressed = true
|
|
b.data = buf
|
|
b.zdata = make([]byte, bLen)
|
|
default:
|
|
bLen = bLen & (1<<31 - 1)
|
|
if int(bLen) > len(buf) {
|
|
return fmt.Errorf("lz4.Read: invalid block size: %d", bLen)
|
|
}
|
|
b.data = buf[:bLen]
|
|
b.zdata = buf[:bLen]
|
|
}
|
|
if _, err := io.ReadFull(z.src, b.zdata); err != nil {
|
|
return err
|
|
}
|
|
|
|
if z.BlockChecksum {
|
|
if err := binary.Read(z.src, binary.LittleEndian, &b.checksum); err != nil {
|
|
return err
|
|
}
|
|
xxh := hashPool.Get()
|
|
defer hashPool.Put(xxh)
|
|
xxh.Write(b.zdata)
|
|
if h := xxh.Sum32(); h != b.checksum {
|
|
return fmt.Errorf("lz4.Read: invalid block checksum: got %x expected %x", h, b.checksum)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// decompressBlock decompresses a frame block.
|
|
// In case of an error, the block err is set with it and abort is set to 1.
|
|
func (z *Reader) decompressBlock(b *block, abort *uint32) {
|
|
if abort != nil {
|
|
defer z.wg.Done()
|
|
}
|
|
if b.compressed {
|
|
n := len(z.window)
|
|
m, err := UncompressBlock(b.zdata, b.data, n)
|
|
if err != nil {
|
|
if abort != nil {
|
|
atomic.StoreUint32(abort, 1)
|
|
}
|
|
b.err = err
|
|
return
|
|
}
|
|
b.data = b.data[n : n+m]
|
|
}
|
|
atomic.AddInt64(&z.Pos, int64(len(b.data)))
|
|
}
|
|
|
|
// close validates the frame checksum (if any) and checks the next frame (if any).
|
|
func (z *Reader) close() error {
|
|
if !z.NoChecksum {
|
|
var checksum uint32
|
|
if err := binary.Read(z.src, binary.LittleEndian, &checksum); err != nil {
|
|
return err
|
|
}
|
|
if checksum != z.checksum.Sum32() {
|
|
return fmt.Errorf("lz4.Read: invalid frame checksum: got %x expected %x", z.checksum.Sum32(), checksum)
|
|
}
|
|
}
|
|
|
|
// get ready for the next concatenated frame, but do not change the position
|
|
pos := z.Pos
|
|
z.Reset(z.src)
|
|
z.Pos = pos
|
|
|
|
// since multiple frames can be concatenated, check for another one
|
|
return z.readHeader(false)
|
|
}
|
|
|
|
// Reset discards the Reader's state and makes it equivalent to the
|
|
// result of its original state from NewReader, but reading from r instead.
|
|
// This permits reusing a Reader rather than allocating a new one.
|
|
func (z *Reader) Reset(r io.Reader) {
|
|
z.Header = Header{}
|
|
z.Pos = 0
|
|
z.src = r
|
|
z.checksum.Reset()
|
|
z.data = nil
|
|
z.window = nil
|
|
}
|
|
|
|
// WriteTo decompresses the data from the underlying io.Reader and writes it to the io.Writer.
|
|
// Returns the number of bytes written.
|
|
func (z *Reader) WriteTo(w io.Writer) (n int64, err error) {
|
|
cpus := runtime.GOMAXPROCS(0)
|
|
var buf []byte
|
|
|
|
// The initial buffer being nil, the first Read will be only read the compressed frame options.
|
|
// The buffer can then be sized appropriately to support maximum concurrency decompression.
|
|
// If multiple frames are concatenated, Read() will return with no data decompressed but with
|
|
// potentially changed options. The buffer will be resized accordingly, always trying to
|
|
// maximize concurrency.
|
|
for {
|
|
nsize := 0
|
|
// the block max size can change if multiple streams are concatenated.
|
|
// Check it after every Read().
|
|
if z.BlockDependency {
|
|
// in case of dependency, we cannot decompress concurrently,
|
|
// so allocate the minimum buffer + window size
|
|
nsize = len(z.window) + z.BlockMaxSize
|
|
} else {
|
|
// if no dependency, allocate a buffer large enough for concurrent decompression
|
|
nsize = cpus * z.BlockMaxSize
|
|
}
|
|
if nsize != len(buf) {
|
|
buf = make([]byte, nsize)
|
|
}
|
|
|
|
m, er := z.Read(buf)
|
|
if er != nil && er != io.EOF {
|
|
return n, er
|
|
}
|
|
m, err = w.Write(buf[:m])
|
|
n += int64(m)
|
|
if err != nil || er == io.EOF {
|
|
return
|
|
}
|
|
}
|
|
}
|