1
0
mirror of https://github.com/moby/buildkit.git synced 2025-07-30 15:03:06 +03:00

compression: introduce compression.Type interface

Introduce a new compression.Type interface, which needs
to be implemented for each compression type, by that we can
reduce the number of switch case statements and ensure that
we don't miss the handle of any compression types, and also
make more easily for supporting new compression types.

This is a commit for code improvement, so no logical changes.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
This commit is contained in:
Yan Song
2022-09-28 11:17:19 +00:00
parent a3e10ee36c
commit 6a1430e7cf
16 changed files with 424 additions and 328 deletions

69
cache/blobs.go vendored
View File

@ -1,19 +1,15 @@
package cache
import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"strconv"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/diff/walking"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
@ -57,8 +53,6 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter)
}
type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error)
func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}) error {
eg, ctx := errgroup.WithContext(ctx)
switch sr.kind() {
@ -92,28 +86,8 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.WithStack(ErrNoBlobs)
}
var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch comp.Type {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return gzipWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = compressEStargz(comp)
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return zstdWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", comp.Type)
}
compressorFunc, finalize := comp.Type.Compress(comp)
mediaType := comp.Type.MediaType()
var lowerRef *immutableRef
switch sr.kind() {
@ -206,7 +180,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}
}
if desc.Digest == "" && !isTypeWindows(sr) && (comp.Type == compression.Zstd || comp.Type == compression.EStargz) {
if desc.Digest == "" && !isTypeWindows(sr) && comp.Type.NeedsComputeDiffBySelf() {
// These compression types aren't supported by containerd differ. So try to compute diff on buildkit side.
// This case can be happen on containerd worker + non-overlayfs snapshotter (e.g. native).
// See also: https://github.com/containerd/containerd/issues/4263
@ -433,7 +407,7 @@ func isTypeWindows(sr *immutableRef) bool {
// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return nil, err
@ -480,38 +454,3 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
})
return err
}
func gzipWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := gzip.DefaultCompression
if comp.Level != nil {
level = *comp.Level
}
return gzip.NewWriterLevel(dest, level)
}
}
func zstdWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := zstd.SpeedDefault
if comp.Level != nil {
level = toZstdEncoderLevel(*comp.Level)
}
return zstd.NewWriter(dest, zstd.WithEncoderLevel(level))
}
}
func toZstdEncoderLevel(level int) zstd.EncoderLevel {
// map zstd compression levels to go-zstd levels
// once we also have c based implementation move this to helper pkg
if level < 0 {
return zstd.SpeedDefault
} else if level < 3 {
return zstd.SpeedFastest
} else if level < 7 {
return zstd.SpeedDefault
} else if level < 9 {
return zstd.SpeedBetterCompression
}
return zstd.SpeedBestCompression
}

View File

@ -12,6 +12,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/overlay"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
@ -24,7 +25,7 @@ var emptyDesc = ocispecs.Descriptor{}
// diff between lower and upper snapshot. If the passed mounts cannot
// be computed (e.g. because the mounts aren't overlayfs), it returns
// an error.
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
// Get upperdir location if mounts are overlayfs that can be processed by this differ.
upperdir, err := overlay.GetUpperdir(lower, upper)
if err != nil {

View File

@ -6,11 +6,12 @@ package cache
import (
"context"
"github.com/moby/buildkit/util/compression"
"github.com/containerd/containerd/mount"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
return ocispecs.Descriptor{}, true, errors.Errorf("overlayfs-based diff computing is unsupported")
}

133
cache/converter.go vendored
View File

@ -7,10 +7,8 @@ import (
"io"
"sync"
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/labels"
"github.com/moby/buildkit/identity"
@ -21,106 +19,33 @@ import (
"github.com/pkg/errors"
)
// needsConversion indicates whether a conversion is needed for the specified descriptor to
// be the compressionType.
func needsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (bool, error) {
mediaType := desc.MediaType
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed {
return false, nil
}
case compression.Gzip:
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if (!images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip) && !esgz {
return false, nil
}
case compression.Zstd:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd {
return false, nil
}
case compression.EStargz:
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if !images.IsLayerType(mediaType) || esgz {
return false, nil
}
default:
return false, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
return true, nil
}
// getConverter returns converter function according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, comp compression.Config) (converter.ConvertFunc, error) {
if needs, err := needsConversion(ctx, cs, desc, comp.Type); err != nil {
if needs, err := comp.Type.NeedsConversion(ctx, cs, desc); err != nil {
return nil, errors.Wrapf(err, "failed to determine conversion needs")
} else if !needs {
// No conversion. No need to return an error here.
return nil, nil
}
from, err := compression.FromMediaType(desc.MediaType)
if err != nil {
return nil, err
}
c := conversion{target: comp}
from := compression.FromMediaType(desc.MediaType)
switch from {
case compression.Uncompressed:
case compression.Gzip, compression.Zstd:
c.decompress = func(ctx context.Context, desc ocispecs.Descriptor) (r io.ReadCloser, err error) {
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return nil, err
} else if esgz {
r, err = decompressEStargz(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
} else {
r, err = cdcompression.DecompressStream(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
}
return &readCloser{r, ra.Close}, nil
}
default:
return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
}
switch comp.Type {
case compression.Uncompressed:
case compression.Gzip:
c.compress = gzipWriter(comp)
case compression.Zstd:
c.compress = zstdWriter(comp)
case compression.EStargz:
compressorFunc, finalize := compressEStargz(comp)
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
}
c.finalize = finalize
default:
return nil, errors.Errorf("unknown target compression type during conversion: %q", comp.Type)
}
c.compress, c.finalize = comp.Type.Compress(comp)
c.decompress = from.Decompress
return (&c).convert, nil
}
type conversion struct {
target compression.Config
decompress func(context.Context, ocispecs.Descriptor) (io.ReadCloser, error)
compress func(w io.Writer) (io.WriteCloser, error)
finalize func(context.Context, content.Store) (map[string]string, error)
decompress compression.Decompressor
compress compression.Compressor
finalize compression.Finalizer
}
var bufioPool = sync.Pool{
@ -151,34 +76,20 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
bufW = bufio.NewWriterSize(w, 128*1024)
}
defer bufioPool.Put(bufW)
var zw io.WriteCloser = &nopWriteCloser{bufW}
if c.compress != nil {
zw, err = c.compress(zw)
zw, err := c.compress(&nopWriteCloser{bufW}, c.target.Type.MediaType())
if err != nil {
return nil, err
}
}
zw = &onceWriteCloser{WriteCloser: zw}
defer zw.Close()
// convert this layer
diffID := digest.Canonical.Digester()
var rdr io.Reader
if c.decompress == nil {
ra, err := cs.ReaderAt(ctx, desc)
rdr, err := c.decompress(ctx, cs, desc)
if err != nil {
return nil, err
}
defer ra.Close()
rdr = io.NewSectionReader(ra, 0, ra.Size())
} else {
rc, err := c.decompress(ctx, desc)
if err != nil {
return nil, err
}
defer rc.Close()
rdr = rc
}
defer rdr.Close()
if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil {
return nil, err
}
@ -201,7 +112,7 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
}
newDesc := desc
newDesc.MediaType = c.target.Type.DefaultMediaType()
newDesc.MediaType = c.target.Type.MediaType()
newDesc.Digest = info.Digest
newDesc.Size = info.Size
newDesc.Annotations = map[string]string{labels.LabelUncompressed: diffID.Digest().String()}
@ -217,20 +128,6 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
return &newDesc, nil
}
type readCloser struct {
io.ReadCloser
closeFunc func() error
}
func (rc *readCloser) Close() error {
err1 := rc.ReadCloser.Close()
err2 := rc.closeFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}
type nopWriteCloser struct {
io.Writer
}

14
cache/manager_test.go vendored
View File

@ -1345,7 +1345,7 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut,
if err != nil {
return nil, "", err
}
return cw, testCase.a.DefaultMediaType(), nil
return cw, testCase.a.MediaType(), nil
})
require.NoError(t, err)
contentBuffer := contentutil.NewBuffer()
@ -1386,9 +1386,9 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut,
// check if all compression variables are available on the both refs
checkCompression := func(desc ocispecs.Descriptor, compressionType compression.Type) {
require.Equal(t, compressionType.DefaultMediaType(), desc.MediaType, "compression: %v", compressionType)
require.Equal(t, compressionType.MediaType(), desc.MediaType, "compression: %v", compressionType)
if compressionType == compression.EStargz {
ok, err := isEStargz(ctx, co.cs, desc.Digest)
ok, err := compression.EStargz.Is(ctx, co.cs, desc.Digest)
require.NoError(t, err, "compression: %v", compressionType)
require.True(t, ok, "compression: %v", compressionType)
}
@ -1481,7 +1481,7 @@ func getCompressor(w io.Writer, compressionType compression.Type, customized boo
}
pr.Close()
}()
return &writeCloser{pw, func() error { <-done; return nil }}, nil
return &compression.WriteCloser{WriteCloser: pw, CloseFunc: func() error { <-done; return nil }}, nil
case compression.Zstd:
if customized {
skippableFrameMagic := []byte{0x50, 0x2a, 0x4d, 0x18}
@ -1767,7 +1767,7 @@ func TestGetRemotes(t *testing.T) {
r := refChain[i]
isLazy, err := r.isLazy(egctx)
require.NoError(t, err)
needs, err := needsConversion(ctx, co.cs, desc, compressionType)
needs, err := compressionType.NeedsConversion(ctx, co.cs, desc)
require.NoError(t, err)
if needs {
require.False(t, isLazy, "layer %q requires conversion so it must be unlazied", desc.Digest)
@ -1998,7 +1998,7 @@ func checkDescriptor(ctx context.Context, t *testing.T, cs content.Store, desc o
}
// Check annotation values are valid
c := new(counter)
c := new(compression.Counter)
ra, err := cs.ReaderAt(ctx, desc)
if err != nil && errdefs.IsNotFound(err) {
return // lazy layer
@ -2013,7 +2013,7 @@ func checkDescriptor(ctx context.Context, t *testing.T, cs content.Store, desc o
require.NoError(t, err)
require.Equal(t, diffID.Digest().String(), uncompressedDgst)
if compressionType == compression.EStargz {
require.Equal(t, c.size(), uncompressedSize)
require.Equal(t, c.Size(), uncompressedSize)
}
}

7
cache/refs.go vendored
View File

@ -767,12 +767,9 @@ func (sr *immutableRef) getBlobWithCompression(ctx context.Context, compressionT
}
func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (ocispecs.Descriptor, error) {
if compressionType == compression.UnknownCompression {
return ocispecs.Descriptor{}, fmt.Errorf("cannot get unknown compression type")
}
var target *ocispecs.Descriptor
if err := walkBlob(ctx, cs, desc, func(desc ocispecs.Descriptor) bool {
if needs, err := needsConversion(ctx, cs, desc, compressionType); err == nil && !needs {
if needs, err := compressionType.NeedsConversion(ctx, cs, desc); err == nil && !needs {
target = &desc
return false
}
@ -881,7 +878,7 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
if a == nil {
return nil
}
for _, k := range append(eStargzAnnotations, containerdUncompressed) {
for _, k := range append(compression.EStargzAnnotations, containerdUncompressed) {
v, ok := a[k]
if !ok {
continue

2
cache/remote.go vendored
View File

@ -213,7 +213,7 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
}
if refCfg.Compression.Force {
if needs, err := needsConversion(ctx, sr.cm.ContentStore, desc, refCfg.Compression.Type); err != nil {
if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil {
return nil, err
} else if needs {
// ensure the compression type.

View File

@ -102,11 +102,15 @@ func getContentStore(ctx context.Context, sm *session.Manager, g session.Group,
}
func attrsToCompression(attrs map[string]string) (*compression.Config, error) {
compressionType := compression.Default
var compressionType compression.Type
if v, ok := attrs[attrLayerCompression]; ok {
if c := compression.Parse(v); c != compression.UnknownCompression {
compressionType = c
c, err := compression.Parse(v)
if err != nil {
return nil, err
}
compressionType = c
} else {
compressionType = compression.Default
}
compressionConfig := compression.New(compressionType)
if v, ok := attrs[attrForceCompression]; ok {

View File

@ -131,11 +131,15 @@ func (dsl *withDistributionSourceLabel) SnapshotLabels(descs []ocispecs.Descript
}
func attrsToCompression(attrs map[string]string) (*compression.Config, error) {
compressionType := compression.Default
var compressionType compression.Type
if v, ok := attrs[attrLayerCompression]; ok {
if c := compression.Parse(v); c != compression.UnknownCompression {
compressionType = c
c, err := compression.Parse(v)
if err != nil {
return nil, err
}
compressionType = c
} else {
compressionType = compression.Default
}
compressionConfig := compression.New(compressionType)
if v, ok := attrs[attrForceCompression]; ok {

View File

@ -276,7 +276,7 @@ func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheR
// Any of blobs in the remote must meet the specified compression option.
match := false
for _, desc := range r.result.Descriptors {
m := compressionopts.Type.IsMediaType(desc.MediaType)
m := compression.IsMediaType(compressionopts.Type, desc.MediaType)
match = match || m
if compressionopts.Force && !m {
match = false

View File

@ -39,8 +39,6 @@ type ImageCommitOpts struct {
func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error) {
rest := make(map[string]string)
esgz := false
as, optb, err := ParseAnnotations(toBytesMap(opt))
if err != nil {
return nil, err
@ -58,19 +56,7 @@ func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error)
case keyImageName:
c.ImageName = v
case keyLayerCompression:
switch v {
case "gzip":
c.RefCfg.Compression.Type = compression.Gzip
case "estargz":
c.RefCfg.Compression.Type = compression.EStargz
esgz = true
case "zstd":
c.RefCfg.Compression.Type = compression.Zstd
case "uncompressed":
c.RefCfg.Compression.Type = compression.Uncompressed
default:
err = errors.Errorf("unsupported layer compression type: %v", v)
}
c.RefCfg.Compression.Type, err = compression.Parse(v)
case keyCompressionLevel:
ii, err2 := strconv.ParseInt(v, 10, 64)
if err != nil {
@ -98,8 +84,8 @@ func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error)
}
}
if esgz {
c.EnableOCITypes("estargz")
if c.RefCfg.Compression.Type.OnlySupportOCITypes() {
c.EnableOCITypes(c.RefCfg.Compression.Type.String())
}
c.AddAnnotations(as)

View File

@ -3,8 +3,11 @@ package compression
import (
"bytes"
"context"
"fmt"
"io"
"sync"
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/stargz-snapshotter/estargz"
@ -14,24 +17,41 @@ import (
"github.com/sirupsen/logrus"
)
// Type represents compression type for blob data.
type Type int
type Compressor func(dest io.Writer, mediaType string) (io.WriteCloser, error)
type Decompressor func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error)
type Finalizer func(context.Context, content.Store) (map[string]string, error)
const (
// Type represents compression type for blob data, which needs
// to be implemented for each compression type.
type Type interface {
Compress(comp Config) (compressorFunc Compressor, finalize Finalizer)
Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error)
NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error)
NeedsComputeDiffBySelf() bool
OnlySupportOCITypes() bool
MediaType() string
String() string
}
type (
uncompressedType struct{}
gzipType struct{}
estargzType struct{}
zstdType struct{}
)
var (
// Uncompressed indicates no compression.
Uncompressed Type = iota
Uncompressed = uncompressedType{}
// Gzip is used for blob data.
Gzip
Gzip = gzipType{}
// EStargz is used for estargz data.
EStargz
EStargz = estargzType{}
// Zstd is used for Zstandard data.
Zstd
// UnknownCompression means not supported yet.
UnknownCompression Type = -1
Zstd = zstdType{}
)
type Config struct {
@ -61,69 +81,41 @@ const (
mediaTypeImageLayerZstd = ocispecs.MediaTypeImageLayer + "+zstd" // unreleased image-spec#790
)
var Default = Gzip
var Default gzipType = Gzip
func Parse(t string) Type {
func Parse(t string) (Type, error) {
switch t {
case "uncompressed":
return Uncompressed
case "gzip":
return Gzip
case "estargz":
return EStargz
case "zstd":
return Zstd
case Uncompressed.String():
return Uncompressed, nil
case Gzip.String():
return Gzip, nil
case EStargz.String():
return EStargz, nil
case Zstd.String():
return Zstd, nil
default:
return UnknownCompression
return nil, fmt.Errorf("unsupported compression type %s", t)
}
}
func (ct Type) String() string {
switch ct {
case Uncompressed:
return "uncompressed"
case Gzip:
return "gzip"
case EStargz:
return "estargz"
case Zstd:
return "zstd"
default:
return "unknown"
}
}
func (ct Type) DefaultMediaType() string {
switch ct {
case Uncompressed:
return ocispecs.MediaTypeImageLayer
case Gzip, EStargz:
return ocispecs.MediaTypeImageLayerGzip
case Zstd:
return mediaTypeImageLayerZstd
default:
return ocispecs.MediaTypeImageLayer + "+unknown"
}
}
func (ct Type) IsMediaType(mt string) bool {
func IsMediaType(ct Type, mt string) bool {
mt, ok := toOCILayerType[mt]
if !ok {
return false
}
return mt == ct.DefaultMediaType()
return mt == ct.MediaType()
}
func FromMediaType(mediaType string) Type {
func FromMediaType(mediaType string) (Type, error) {
switch toOCILayerType[mediaType] {
case ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerNonDistributable:
return Uncompressed
return Uncompressed, nil
case ocispecs.MediaTypeImageLayerGzip, ocispecs.MediaTypeImageLayerNonDistributableGzip:
return Gzip
return Gzip, nil
case mediaTypeImageLayerZstd, ocispecs.MediaTypeImageLayerNonDistributableZstd:
return Zstd
return Zstd, nil
default:
return UnknownCompression
return nil, fmt.Errorf("unsupported media type %s", mediaType)
}
}
@ -170,7 +162,7 @@ func detectCompressionType(cr *io.SectionReader) (Type, error) {
// means just create an empty layer.
//
// See issue docker/docker#18170
return UnknownCompression, err
return nil, err
}
if _, _, err := estargz.OpenFooter(cr); err == nil {
@ -241,3 +233,72 @@ func ConvertAllLayerMediaTypes(oci bool, descs ...ocispecs.Descriptor) []ocispec
}
return converted
}
func decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (r io.ReadCloser, err error) {
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
esgz, err := EStargz.Is(ctx, cs, desc.Digest)
if err != nil {
return nil, err
} else if esgz {
r, err = decompressEStargz(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
} else {
r, err = cdcompression.DecompressStream(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
}
return &readCloser{r, ra.Close}, nil
}
type readCloser struct {
io.ReadCloser
closeFunc func() error
}
func (rc *readCloser) Close() error {
err1 := rc.ReadCloser.Close()
err2 := rc.closeFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}
type WriteCloser struct {
io.WriteCloser
CloseFunc func() error
}
func (wc *WriteCloser) Close() error {
err1 := wc.WriteCloser.Close()
err2 := wc.CloseFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}
type Counter struct {
n int64
mu sync.Mutex
}
func (c *Counter) Write(p []byte) (n int, err error) {
c.mu.Lock()
c.n += int64(len(p))
c.mu.Unlock()
return len(p), nil
}
func (c *Counter) Size() (n int64) {
c.mu.Lock()
n = c.n
c.mu.Unlock()
return
}

View File

@ -1,4 +1,4 @@
package cache
package compression
import (
"archive/tar"
@ -11,23 +11,28 @@ import (
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
var eStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation}
var EStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation}
// compressEStargz writes the passed blobs stream as an eStargz-compressed blob.
// finalize function finalizes the written blob metadata and returns all eStargz annotations.
func compressEStargz(comp compression.Config) (compressorFunc compressor, finalize func(context.Context, content.Store) (map[string]string, error)) {
const containerdUncompressed = "containerd.io/uncompressed"
const estargzLabel = "buildkit.io/compression/estargz"
func (c estargzType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
var cInfo *compressionInfo
var writeErr error
var mu sync.Mutex
return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
if compression.FromMediaType(requiredMediaType) != compression.Gzip {
ct, err := FromMediaType(requiredMediaType)
if err != nil {
return nil, err
}
if ct != Gzip {
return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
}
done := make(chan struct{})
@ -76,7 +81,7 @@ func compressEStargz(comp compression.Config) (compressorFunc compressor, finali
pr.Close()
return nil
}()
return &writeCloser{pw, func() error {
return &WriteCloser{pw, func() error {
<-done // wait until the write completes
return nil
}}, nil
@ -113,11 +118,40 @@ func compressEStargz(comp compression.Config) (compressorFunc compressor, finali
}
}
const estargzLabel = "buildkit.io/compression/estargz"
func (c estargzType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) {
return decompress(ctx, cs, desc)
}
func (c estargzType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
esgz, err := c.Is(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if !images.IsLayerType(desc.MediaType) || esgz {
return false, nil
}
return true, nil
}
func (c estargzType) NeedsComputeDiffBySelf() bool {
return true
}
func (c estargzType) OnlySupportOCITypes() bool {
return true
}
func (c estargzType) MediaType() string {
return ocispecs.MediaTypeImageLayerGzip
}
func (c estargzType) String() string {
return "estargz"
}
// isEStargz returns true when the specified digest of content exists in
// the content store and it's eStargz.
func isEStargz(ctx context.Context, cs content.Store, dgst digest.Digest) (bool, error) {
func (c estargzType) Is(ctx context.Context, cs content.Store, dgst digest.Digest) (bool, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return false, nil
@ -178,39 +212,6 @@ func decompressEStargz(r *io.SectionReader) (io.ReadCloser, error) {
return estargz.Unpack(r, new(estargz.GzipDecompressor))
}
type writeCloser struct {
io.WriteCloser
closeFunc func() error
}
func (wc *writeCloser) Close() error {
err1 := wc.WriteCloser.Close()
err2 := wc.closeFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}
type counter struct {
n int64
mu sync.Mutex
}
func (c *counter) Write(p []byte) (n int, err error) {
c.mu.Lock()
c.n += int64(len(p))
c.mu.Unlock()
return len(p), nil
}
func (c *counter) size() (n int64) {
c.mu.Lock()
n = c.n
c.mu.Unlock()
return
}
type compressionInfo struct {
blobInfo
tocDigest digest.Digest
@ -227,7 +228,7 @@ func calculateBlobInfo() (io.WriteCloser, chan blobInfo) {
pr, pw := io.Pipe()
go func() {
defer pr.Close()
c := new(counter)
c := new(Counter)
dgstr := digest.Canonical.Digester()
diffID := digest.Canonical.Digester()
decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
@ -244,7 +245,7 @@ func calculateBlobInfo() (io.WriteCloser, chan blobInfo) {
pr.CloseWithError(err)
return
}
res <- blobInfo{dgstr.Digest(), diffID.Digest(), c.size()}
res <- blobInfo{dgstr.Digest(), diffID.Digest(), c.Size()}
}()
return pw, res
}

65
util/compression/gzip.go Normal file
View File

@ -0,0 +1,65 @@
package compression
import (
"compress/gzip"
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)
func (c gzipType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
return func(dest io.Writer, _ string) (io.WriteCloser, error) {
return gzipWriter(comp)(dest)
}, nil
}
func (c gzipType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) {
return decompress(ctx, cs, desc)
}
func (c gzipType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
esgz, err := EStargz.Is(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if !images.IsLayerType(desc.MediaType) {
return false, nil
}
ct, err := FromMediaType(desc.MediaType)
if err != nil {
return false, err
}
if ct == Gzip && !esgz {
return false, nil
}
return true, nil
}
func (c gzipType) NeedsComputeDiffBySelf() bool {
return false
}
func (c gzipType) OnlySupportOCITypes() bool {
return false
}
func (c gzipType) MediaType() string {
return ocispecs.MediaTypeImageLayerGzip
}
func (c gzipType) String() string {
return "gzip"
}
func gzipWriter(comp Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := gzip.DefaultCompression
if comp.Level != nil {
level = *comp.Level
}
return gzip.NewWriterLevel(dest, level)
}
}

View File

@ -0,0 +1,64 @@
package compression
import (
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/docker/docker/pkg/ioutils"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)
type nopWriteCloser struct {
io.Writer
}
func (w *nopWriteCloser) Close() error {
return nil
}
func (c uncompressedType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
return func(dest io.Writer, mediaType string) (io.WriteCloser, error) {
return &nopWriteCloser{dest}, nil
}, nil
}
func (c uncompressedType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) {
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
rdr := io.NewSectionReader(ra, 0, ra.Size())
return ioutils.NewReadCloserWrapper(rdr, ra.Close), nil
}
func (c uncompressedType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
if !images.IsLayerType(desc.MediaType) {
return false, nil
}
ct, err := FromMediaType(desc.MediaType)
if err != nil {
return false, err
}
if ct == Uncompressed {
return false, nil
}
return true, nil
}
func (c uncompressedType) NeedsComputeDiffBySelf() bool {
return false
}
func (c uncompressedType) OnlySupportOCITypes() bool {
return false
}
func (c uncompressedType) MediaType() string {
return ocispecs.MediaTypeImageLayer
}
func (c uncompressedType) String() string {
return "uncompressed"
}

76
util/compression/zstd.go Normal file
View File

@ -0,0 +1,76 @@
package compression
import (
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/klauspost/compress/zstd"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)
func (c zstdType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
return func(dest io.Writer, _ string) (io.WriteCloser, error) {
return zstdWriter(comp)(dest)
}, nil
}
func (c zstdType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) {
return decompress(ctx, cs, desc)
}
func (c zstdType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
if !images.IsLayerType(desc.MediaType) {
return false, nil
}
ct, err := FromMediaType(desc.MediaType)
if err != nil {
return false, err
}
if ct == Zstd {
return false, nil
}
return true, nil
}
func (c zstdType) NeedsComputeDiffBySelf() bool {
return true
}
func (c zstdType) OnlySupportOCITypes() bool {
return false
}
func (c zstdType) MediaType() string {
return mediaTypeImageLayerZstd
}
func (c zstdType) String() string {
return "zstd"
}
func zstdWriter(comp Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := zstd.SpeedDefault
if comp.Level != nil {
level = toZstdEncoderLevel(*comp.Level)
}
return zstd.NewWriter(dest, zstd.WithEncoderLevel(level))
}
}
func toZstdEncoderLevel(level int) zstd.EncoderLevel {
// map zstd compression levels to go-zstd levels
// once we also have c based implementation move this to helper pkg
if level < 0 {
return zstd.SpeedDefault
} else if level < 3 {
return zstd.SpeedFastest
} else if level < 7 {
return zstd.SpeedDefault
} else if level < 9 {
return zstd.SpeedBetterCompression
}
return zstd.SpeedBestCompression
}