mirror of
https://github.com/moby/buildkit.git
synced 2025-07-30 15:03:06 +03:00
cache: support nydus compression type
Nydus image is a container accelerated image format provided by the Dragonfly image-service project, which offers the ability to pull image data on demand, without waiting for the entire image pull to complete and then start the container. It has been put in production usage and shown vast improvements over the old OCI image format in terms of container launching speed, image space, and network bandwidth efficiency, as well as data integrity. Nydus image can be flexibly configured as a FUSE-based user-space filesystem or in-kernel EROFS (from Linux kernel v5.16) with Nydus daemon in user-space, integrating with VM-based container runtime like KataContainers is much easier. Nydus has provided a conversion tool Nydusify for converting OCIv1 image to Nydus image and integrated into Harbor Acceld as a conversion driver, which assumes that the OCI image is already available in the registry, but a better way would be to build the Nydus images directly from the build system instead of using the conversion tool, which would increase the speed of the image export, so we experimentally integrated the Nydus export in Buildkit. Unlike other compression formats (gzip, estargz, etc.) in OCI image, nydus is divided into two types of layer, blob, and bootstrap, where blob serves as the data part of each layer of the image, and bootstrap serves as the metadata of the whole image, the bootstrap is equivalent to the view of the whole image filesystem after all layers overlay. For example, for an OCI image with 3 layers, the corresponding nydus image is 4 layers (3 layers of blob + 1 layer of bootstrap). The nydus-snapshotter project provides a package to do the actual layer compression, this commit imports the package to implement the export of nydus compression type. Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
This commit is contained in:
2
cache/blobs.go
vendored
2
cache/blobs.go
vendored
@ -86,7 +86,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
|
||||
return nil, errors.WithStack(ErrNoBlobs)
|
||||
}
|
||||
|
||||
compressorFunc, finalize := comp.Type.Compress(comp)
|
||||
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
|
||||
mediaType := comp.Type.MediaType()
|
||||
|
||||
var lowerRef *immutableRef
|
||||
|
9
cache/blobs_linux.go
vendored
9
cache/blobs_linux.go
vendored
@ -58,11 +58,14 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
|
||||
if err != nil {
|
||||
return emptyDesc, false, errors.Wrap(err, "failed to get compressed stream")
|
||||
}
|
||||
err = overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower)
|
||||
compressed.Close()
|
||||
if err != nil {
|
||||
// Close ensure compressorFunc does some finalization works.
|
||||
defer compressed.Close()
|
||||
if err := overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower); err != nil {
|
||||
return emptyDesc, false, errors.Wrap(err, "failed to write compressed diff")
|
||||
}
|
||||
if err := compressed.Close(); err != nil {
|
||||
return emptyDesc, false, errors.Wrap(err, "failed to close compressed diff writer")
|
||||
}
|
||||
if labels == nil {
|
||||
labels = map[string]string{}
|
||||
}
|
||||
|
2
cache/converter.go
vendored
2
cache/converter.go
vendored
@ -36,7 +36,7 @@ func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descripto
|
||||
}
|
||||
|
||||
c := conversion{target: comp}
|
||||
c.compress, c.finalize = comp.Type.Compress(comp)
|
||||
c.compress, c.finalize = comp.Type.Compress(ctx, comp)
|
||||
c.decompress = from.Decompress
|
||||
|
||||
return (&c).convert, nil
|
||||
|
25
cache/not_nydus.go
vendored
Normal file
25
cache/not_nydus.go
vendored
Normal file
@ -0,0 +1,25 @@
|
||||
//go:build !nydus
|
||||
// +build !nydus
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/moby/buildkit/cache/config"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/util/compression"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
var validAnnotations = append(compression.EStargzAnnotations, containerdUncompressed)
|
||||
|
||||
func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
|
||||
return refCfg.Compression.Force
|
||||
}
|
||||
|
||||
func PatchLayers(ctx context.Context, ref ImmutableRef, refCfg config.RefConfig, remote *solver.Remote, sg session.Group) (*solver.Remote, error) {
|
||||
return remote, nil
|
||||
}
|
164
cache/nydus.go
vendored
Normal file
164
cache/nydus.go
vendored
Normal file
@ -0,0 +1,164 @@
|
||||
//go:build nydus
|
||||
// +build nydus
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/moby/buildkit/cache/config"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/util/compression"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
|
||||
)
|
||||
|
||||
var validAnnotations = append(
|
||||
compression.EStargzAnnotations, containerdUncompressed,
|
||||
nydusify.LayerAnnotationNydusBlob, nydusify.LayerAnnotationNydusBootstrap, nydusify.LayerAnnotationNydusBlobIDs,
|
||||
)
|
||||
|
||||
// mergeNydus does two steps:
|
||||
// 1. Extracts nydus bootstrap from nydus format (nydus blob + nydus bootstrap) for each layer.
|
||||
// 2. Merge all nydus bootstraps into a final bootstrap (will as an extra layer).
|
||||
// The nydus bootstrap size is very small, so the merge operation is fast.
|
||||
func mergeNydus(ctx context.Context, ref ImmutableRef, comp compression.Config, s session.Group) (*ocispecs.Descriptor, error) {
|
||||
iref, ok := ref.(*immutableRef)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unsupported ref")
|
||||
}
|
||||
refs := iref.layerChain()
|
||||
if len(refs) == 0 {
|
||||
return nil, fmt.Errorf("refs can't be empty")
|
||||
}
|
||||
|
||||
// Extracts nydus bootstrap from nydus format for each layer.
|
||||
var cm *cacheManager
|
||||
layers := []nydusify.Layer{}
|
||||
blobIDs := []string{}
|
||||
for _, ref := range refs {
|
||||
blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, comp, s)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "get compression blob %q", comp.Type)
|
||||
}
|
||||
ra, err := ref.cm.ContentStore.ReaderAt(ctx, blobDesc)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "get reader for compression blob %q", comp.Type)
|
||||
}
|
||||
defer ra.Close()
|
||||
if cm == nil {
|
||||
cm = ref.cm
|
||||
}
|
||||
blobIDs = append(blobIDs, blobDesc.Digest.Hex())
|
||||
layers = append(layers, nydusify.Layer{
|
||||
Digest: blobDesc.Digest,
|
||||
ReaderAt: ra,
|
||||
})
|
||||
}
|
||||
|
||||
// Merge all nydus bootstraps into a final nydus bootstrap.
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
if _, err := nydusify.Merge(ctx, layers, pw, nydusify.MergeOption{
|
||||
WithTar: true,
|
||||
}); err != nil {
|
||||
pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
|
||||
}
|
||||
}()
|
||||
|
||||
// Compress final nydus bootstrap to tar.gz and write into content store.
|
||||
cw, err := content.OpenWriter(ctx, cm.ContentStore, content.WithRef("nydus-merge-"+iref.getChainID().String()))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "open content store writer")
|
||||
}
|
||||
defer cw.Close()
|
||||
|
||||
gw := gzip.NewWriter(cw)
|
||||
uncompressedDgst := digest.SHA256.Digester()
|
||||
compressed := io.MultiWriter(gw, uncompressedDgst.Hash())
|
||||
if _, err := io.Copy(compressed, pr); err != nil {
|
||||
return nil, errors.Wrapf(err, "copy bootstrap targz into content store")
|
||||
}
|
||||
if err := gw.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "close gzip writer")
|
||||
}
|
||||
|
||||
compressedDgst := cw.Digest()
|
||||
if err := cw.Commit(ctx, 0, compressedDgst, content.WithLabels(map[string]string{
|
||||
containerdUncompressed: uncompressedDgst.Digest().String(),
|
||||
})); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return nil, errors.Wrap(err, "commit to content store")
|
||||
}
|
||||
}
|
||||
if err := cw.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "close content store writer")
|
||||
}
|
||||
|
||||
info, err := cm.ContentStore.Info(ctx, compressedDgst)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get info from content store")
|
||||
}
|
||||
|
||||
blobIDsBytes, err := json.Marshal(blobIDs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "marshal blob ids")
|
||||
}
|
||||
|
||||
desc := ocispecs.Descriptor{
|
||||
Digest: compressedDgst,
|
||||
Size: info.Size,
|
||||
MediaType: ocispecs.MediaTypeImageLayerGzip,
|
||||
Annotations: map[string]string{
|
||||
containerdUncompressed: uncompressedDgst.Digest().String(),
|
||||
// Use this annotation to identify nydus bootstrap layer.
|
||||
nydusify.LayerAnnotationNydusBootstrap: "true",
|
||||
// Track all blob digests for nydus snapshotter.
|
||||
nydusify.LayerAnnotationNydusBlobIDs: string(blobIDsBytes),
|
||||
},
|
||||
}
|
||||
|
||||
return &desc, nil
|
||||
}
|
||||
|
||||
// Nydus compression type can't be mixed with other compression types in the same image,
|
||||
// so if `source` is this kind of layer, but the target is other compression type, we
|
||||
// should do the forced compression.
|
||||
func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
|
||||
if refCfg.Compression.Force {
|
||||
return true
|
||||
}
|
||||
isNydusBlob, _ := compression.Nydus.Is(ctx, cs, source)
|
||||
if refCfg.Compression.Type == compression.Nydus {
|
||||
return !isNydusBlob
|
||||
}
|
||||
return isNydusBlob
|
||||
}
|
||||
|
||||
// PatchLayers appends an extra nydus bootstrap layer
|
||||
// to the manifest of nydus image, this layer represents the
|
||||
// whole metadata of filesystem view for the entire image.
|
||||
func PatchLayers(ctx context.Context, ref ImmutableRef, refCfg config.RefConfig, remote *solver.Remote, sg session.Group) (*solver.Remote, error) {
|
||||
if refCfg.Compression.Type != compression.Nydus {
|
||||
return remote, nil
|
||||
}
|
||||
|
||||
desc, err := mergeNydus(ctx, ref, refCfg.Compression, sg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "merge nydus layer")
|
||||
}
|
||||
remote.Descriptors = append(remote.Descriptors, *desc)
|
||||
|
||||
return remote, nil
|
||||
}
|
2
cache/refs.go
vendored
2
cache/refs.go
vendored
@ -878,7 +878,7 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
|
||||
if a == nil {
|
||||
return nil
|
||||
}
|
||||
for _, k := range append(compression.EStargzAnnotations, containerdUncompressed) {
|
||||
for _, k := range validAnnotations {
|
||||
v, ok := a[k]
|
||||
if !ok {
|
||||
continue
|
||||
|
2
cache/remote.go
vendored
2
cache/remote.go
vendored
@ -212,7 +212,7 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
|
||||
}
|
||||
}
|
||||
|
||||
if refCfg.Compression.Force {
|
||||
if needsForceCompression(ctx, sr.cm.ContentStore, desc, refCfg) {
|
||||
if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil {
|
||||
return nil, err
|
||||
} else if needs {
|
||||
|
@ -88,6 +88,10 @@ func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error)
|
||||
c.EnableOCITypes(c.RefCfg.Compression.Type.String())
|
||||
}
|
||||
|
||||
if c.RefCfg.Compression.Type.NeedsForceCompression() {
|
||||
c.EnableForceCompression(c.RefCfg.Compression.Type.String())
|
||||
}
|
||||
|
||||
c.AddAnnotations(as)
|
||||
|
||||
return rest, nil
|
||||
@ -120,6 +124,18 @@ func (c *ImageCommitOpts) EnableOCITypes(reason string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ImageCommitOpts) EnableForceCompression(reason string) {
|
||||
if !c.RefCfg.Compression.Force {
|
||||
message := "forcibly turning on force-compression mode"
|
||||
if reason != "" {
|
||||
message += " for " + reason
|
||||
}
|
||||
logrus.Warn(message)
|
||||
|
||||
c.RefCfg.Compression.Force = true
|
||||
}
|
||||
}
|
||||
|
||||
func parseBool(dest *bool, key string, value string) error {
|
||||
b, err := strconv.ParseBool(value)
|
||||
if err != nil {
|
||||
|
1
go.mod
1
go.mod
@ -108,6 +108,7 @@ require (
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/containerd/cgroups v1.0.3 // indirect
|
||||
github.com/containerd/fifo v1.0.0 // indirect
|
||||
github.com/containerd/nydus-snapshotter v0.3.1 // indirect
|
||||
github.com/containerd/ttrpc v1.1.0 // indirect
|
||||
github.com/containernetworking/cni v1.1.1 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
|
||||
|
2
go.sum
2
go.sum
@ -400,6 +400,8 @@ github.com/containerd/imgcrypt v1.1.4/go.mod h1:LorQnPtzL/T0IyCeftcsMEO7AqxUDbdO
|
||||
github.com/containerd/nri v0.0.0-20201007170849-eb1350a75164/go.mod h1:+2wGSDGFYfE5+So4M5syatU0N0f0LbWpuqyMi4/BE8c=
|
||||
github.com/containerd/nri v0.0.0-20210316161719-dbaa18c31c14/go.mod h1:lmxnXF6oMkbqs39FiCt1s0R2HSMhcLel9vNL3m4AaeY=
|
||||
github.com/containerd/nri v0.1.0/go.mod h1:lmxnXF6oMkbqs39FiCt1s0R2HSMhcLel9vNL3m4AaeY=
|
||||
github.com/containerd/nydus-snapshotter v0.3.1 h1:b8WahTrPkt3XsabjG2o/leN4fw3HWZYr+qxo/Z8Mfzk=
|
||||
github.com/containerd/nydus-snapshotter v0.3.1/go.mod h1:+8R7NX7vrjlxAgtidnsstwIhpzyTlriYPssTxH++uiM=
|
||||
github.com/containerd/stargz-snapshotter v0.0.0-20201027054423-3a04e4c2c116/go.mod h1:o59b3PCKVAf9jjiKtCc/9hLAd+5p/rfhBfm6aBcTEr4=
|
||||
github.com/containerd/stargz-snapshotter v0.12.0 h1:SRKo+YxmypnlyC7eKc9KNW0Ciy1Auo102s8E/aRGWKg=
|
||||
github.com/containerd/stargz-snapshotter v0.12.0/go.mod h1:LYKrfzJBNUs0hP9GA4YXTIudE6pjZ96W0X1Ls7yI7Gs=
|
||||
|
@ -24,11 +24,12 @@ type Finalizer func(context.Context, content.Store) (map[string]string, error)
|
||||
// Type represents compression type for blob data, which needs
|
||||
// to be implemented for each compression type.
|
||||
type Type interface {
|
||||
Compress(comp Config) (compressorFunc Compressor, finalize Finalizer)
|
||||
Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer)
|
||||
Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error)
|
||||
NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error)
|
||||
NeedsComputeDiffBySelf() bool
|
||||
OnlySupportOCITypes() bool
|
||||
NeedsForceCompression() bool
|
||||
MediaType() string
|
||||
String() string
|
||||
}
|
||||
@ -83,7 +84,7 @@ const (
|
||||
|
||||
var Default gzipType = Gzip
|
||||
|
||||
func Parse(t string) (Type, error) {
|
||||
func parse(t string) (Type, error) {
|
||||
switch t {
|
||||
case Uncompressed.String():
|
||||
return Uncompressed, nil
|
||||
@ -98,15 +99,7 @@ func Parse(t string) (Type, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func IsMediaType(ct Type, mt string) bool {
|
||||
mt, ok := toOCILayerType[mt]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return mt == ct.MediaType()
|
||||
}
|
||||
|
||||
func FromMediaType(mediaType string) (Type, error) {
|
||||
func fromMediaType(mediaType string) (Type, error) {
|
||||
switch toOCILayerType[mediaType] {
|
||||
case ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerNonDistributable:
|
||||
return Uncompressed, nil
|
||||
@ -119,6 +112,14 @@ func FromMediaType(mediaType string) (Type, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func IsMediaType(ct Type, mt string) bool {
|
||||
mt, ok := toOCILayerType[mt]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return mt == ct.MediaType()
|
||||
}
|
||||
|
||||
// DetectLayerMediaType returns media type from existing blob data.
|
||||
func DetectLayerMediaType(ctx context.Context, cs content.Store, id digest.Digest, oci bool) (string, error) {
|
||||
ra, err := cs.ReaderAt(ctx, ocispecs.Descriptor{Digest: id})
|
||||
|
@ -24,7 +24,7 @@ var EStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.Store
|
||||
const containerdUncompressed = "containerd.io/uncompressed"
|
||||
const estargzLabel = "buildkit.io/compression/estargz"
|
||||
|
||||
func (c estargzType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
func (c estargzType) Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
var cInfo *compressionInfo
|
||||
var writeErr error
|
||||
var mu sync.Mutex
|
||||
@ -142,6 +142,10 @@ func (c estargzType) OnlySupportOCITypes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c estargzType) NeedsForceCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c estargzType) MediaType() string {
|
||||
return ocispecs.MediaTypeImageLayerGzip
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func (c gzipType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
func (c gzipType) Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
return func(dest io.Writer, _ string) (io.WriteCloser, error) {
|
||||
return gzipWriter(comp)(dest)
|
||||
}, nil
|
||||
@ -46,6 +46,10 @@ func (c gzipType) OnlySupportOCITypes() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c gzipType) NeedsForceCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c gzipType) MediaType() string {
|
||||
return ocispecs.MediaTypeImageLayerGzip
|
||||
}
|
||||
|
12
util/compression/not_nydus.go
Normal file
12
util/compression/not_nydus.go
Normal file
@ -0,0 +1,12 @@
|
||||
//go:build !nydus
|
||||
// +build !nydus
|
||||
|
||||
package compression
|
||||
|
||||
func Parse(t string) (Type, error) {
|
||||
return parse(t)
|
||||
}
|
||||
|
||||
func FromMediaType(mediaType string) (Type, error) {
|
||||
return fromMediaType(mediaType)
|
||||
}
|
141
util/compression/nydus.go
Normal file
141
util/compression/nydus.go
Normal file
@ -0,0 +1,141 @@
|
||||
//go:build nydus
|
||||
// +build nydus
|
||||
|
||||
package compression
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
|
||||
)
|
||||
|
||||
type nydusType struct{}
|
||||
|
||||
var Nydus = nydusType{}
|
||||
|
||||
func init() {
|
||||
toDockerLayerType[nydusify.MediaTypeNydusBlob] = nydusify.MediaTypeNydusBlob
|
||||
toOCILayerType[nydusify.MediaTypeNydusBlob] = nydusify.MediaTypeNydusBlob
|
||||
}
|
||||
|
||||
func Parse(t string) (Type, error) {
|
||||
ct, err := parse(t)
|
||||
if err != nil && t == Nydus.String() {
|
||||
return Nydus, nil
|
||||
}
|
||||
return ct, err
|
||||
}
|
||||
|
||||
func FromMediaType(mediaType string) (Type, error) {
|
||||
ct, err := fromMediaType(mediaType)
|
||||
if err != nil && mediaType == nydusify.MediaTypeNydusBlob {
|
||||
return Nydus, nil
|
||||
}
|
||||
return ct, err
|
||||
}
|
||||
|
||||
func (c nydusType) Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
digester := digest.Canonical.Digester()
|
||||
return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
|
||||
writer := io.MultiWriter(dest, digester.Hash())
|
||||
return nydusify.Pack(ctx, writer, nydusify.PackOption{})
|
||||
}, func(ctx context.Context, cs content.Store) (map[string]string, error) {
|
||||
// Fill necessary labels
|
||||
uncompressedDgst := digester.Digest().String()
|
||||
info, err := cs.Info(ctx, digester.Digest())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get info from content store")
|
||||
}
|
||||
if info.Labels == nil {
|
||||
info.Labels = make(map[string]string)
|
||||
}
|
||||
info.Labels[containerdUncompressed] = uncompressedDgst
|
||||
if _, err := cs.Update(ctx, info, "labels."+containerdUncompressed); err != nil {
|
||||
return nil, errors.Wrap(err, "update info to content store")
|
||||
}
|
||||
|
||||
// Fill annotations
|
||||
annotations := map[string]string{
|
||||
containerdUncompressed: uncompressedDgst,
|
||||
// Use this annotation to identify nydus blob layer.
|
||||
nydusify.LayerAnnotationNydusBlob: "true",
|
||||
}
|
||||
return annotations, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c nydusType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) {
|
||||
ra, err := cs.ReaderAt(ctx, desc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
if err := nydusify.Unpack(ctx, ra, pw, nydusify.UnpackOption{}); err != nil {
|
||||
pw.CloseWithError(errors.Wrap(err, "unpack nydus blob"))
|
||||
}
|
||||
}()
|
||||
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
func (c nydusType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
|
||||
if !images.IsLayerType(desc.MediaType) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if isNydusBlob, err := c.Is(ctx, cs, desc); err != nil {
|
||||
return true, nil
|
||||
} else if isNydusBlob {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (c nydusType) NeedsComputeDiffBySelf() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c nydusType) OnlySupportOCITypes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c nydusType) NeedsForceCompression() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c nydusType) MediaType() string {
|
||||
return nydusify.MediaTypeNydusBlob
|
||||
}
|
||||
|
||||
func (c nydusType) String() string {
|
||||
return "nydus"
|
||||
}
|
||||
|
||||
// Is returns true when the specified digest of content exists in
|
||||
// the content store and it's nydus format.
|
||||
func (c nydusType) Is(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) {
|
||||
if desc.Annotations == nil {
|
||||
return false, nil
|
||||
}
|
||||
hasMediaType := desc.MediaType == nydusify.MediaTypeNydusBlob
|
||||
_, hasAnno := desc.Annotations[nydusify.LayerAnnotationNydusBlob]
|
||||
|
||||
_, err := cs.Info(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return hasMediaType && hasAnno, nil
|
||||
}
|
@ -11,7 +11,7 @@ import (
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func (c uncompressedType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
func (c uncompressedType) Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
return func(dest io.Writer, mediaType string) (io.WriteCloser, error) {
|
||||
return &iohelper.NopWriteCloser{Writer: dest}, nil
|
||||
}, nil
|
||||
@ -48,6 +48,10 @@ func (c uncompressedType) OnlySupportOCITypes() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c uncompressedType) NeedsForceCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c uncompressedType) MediaType() string {
|
||||
return ocispecs.MediaTypeImageLayer
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func (c zstdType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
func (c zstdType) Compress(ctx context.Context, comp Config) (compressorFunc Compressor, finalize Finalizer) {
|
||||
return func(dest io.Writer, _ string) (io.WriteCloser, error) {
|
||||
return zstdWriter(comp)(dest)
|
||||
}, nil
|
||||
@ -42,6 +42,10 @@ func (c zstdType) OnlySupportOCITypes() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c zstdType) NeedsForceCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c zstdType) MediaType() string {
|
||||
return mediaTypeImageLayerZstd
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ type winApplier struct {
|
||||
|
||||
func (s *winApplier) Apply(ctx context.Context, desc ocispecs.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispecs.Descriptor, err error) {
|
||||
if !hasWindowsLayerMode(ctx) {
|
||||
return s.a.Apply(ctx, desc, mounts, opts...)
|
||||
return s.apply(ctx, desc, mounts, opts...)
|
||||
}
|
||||
|
||||
compressed, err := images.DiffCompression(ctx, desc.MediaType)
|
||||
|
16
util/winlayers/apply.go
Normal file
16
util/winlayers/apply.go
Normal file
@ -0,0 +1,16 @@
|
||||
//go:build !nydus
|
||||
// +build !nydus
|
||||
|
||||
package winlayers
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/mount"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func (s *winApplier) apply(ctx context.Context, desc ocispecs.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispecs.Descriptor, err error) {
|
||||
return s.a.Apply(ctx, desc, mounts, opts...)
|
||||
}
|
73
util/winlayers/apply_nydus.go
Normal file
73
util/winlayers/apply_nydus.go
Normal file
@ -0,0 +1,73 @@
|
||||
//go:build nydus
|
||||
// +build nydus
|
||||
|
||||
package winlayers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/archive"
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/mount"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
|
||||
)
|
||||
|
||||
func isNydusBlob(ctx context.Context, desc ocispecs.Descriptor) bool {
|
||||
if desc.Annotations == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
hasMediaType := desc.MediaType == nydusify.MediaTypeNydusBlob
|
||||
_, hasAnno := desc.Annotations[nydusify.LayerAnnotationNydusBlob]
|
||||
return hasMediaType && hasAnno
|
||||
}
|
||||
|
||||
func (s *winApplier) apply(ctx context.Context, desc ocispecs.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispecs.Descriptor, err error) {
|
||||
if !isNydusBlob(ctx, desc) {
|
||||
return s.a.Apply(ctx, desc, mounts, opts...)
|
||||
}
|
||||
|
||||
var ocidesc ocispecs.Descriptor
|
||||
if err := mount.WithTempMount(ctx, mounts, func(root string) error {
|
||||
ra, err := s.cs.ReaderAt(ctx, desc)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get reader from content store")
|
||||
}
|
||||
defer ra.Close()
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
if err := nydusify.Unpack(ctx, ra, pw, nydusify.UnpackOption{}); err != nil {
|
||||
pw.CloseWithError(errors.Wrap(err, "unpack nydus blob"))
|
||||
}
|
||||
}()
|
||||
defer pr.Close()
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
rc := &readCounter{
|
||||
r: io.TeeReader(pr, digester.Hash()),
|
||||
}
|
||||
|
||||
if _, err := archive.Apply(ctx, root, rc); err != nil {
|
||||
return errors.Wrap(err, "apply nydus blob")
|
||||
}
|
||||
|
||||
ocidesc = ocispecs.Descriptor{
|
||||
MediaType: ocispecs.MediaTypeImageLayer,
|
||||
Size: rc.c,
|
||||
Digest: digester.Digest(),
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return ocispecs.Descriptor{}, err
|
||||
}
|
||||
|
||||
return ocidesc, nil
|
||||
}
|
Reference in New Issue
Block a user