1
0
mirror of https://github.com/containers/image.git synced 2025-04-18 19:44:05 +03:00

Merge pull request #1980 from giuseppe/use-computed-digest

copy: do not fail if digest mismatches
This commit is contained in:
Daniel J Walsh 2023-12-06 10:45:45 -05:00 committed by GitHub
commit 28a299f3d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 257 additions and 93 deletions

View File

@ -20,6 +20,7 @@ import (
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/transports"
"github.com/containers/image/v5/types"
chunkedToc "github.com/containers/storage/pkg/chunked/toc"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
@ -694,6 +695,13 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
requiredCompression = ic.compressionFormat
originalCompression = srcInfo.CompressionAlgorithm
}
// Check if we have a chunked layer in storage that's based on that blob. These layers are stored by their TOC digest.
tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations)
if err != nil {
return types.BlobInfo{}, "", err
}
reused, reusedBlob, err := ic.c.dest.TryReusingBlobWithOptions(ctx, srcInfo, private.TryReusingBlobOptions{
Cache: ic.c.blobInfoCache,
CanSubstitute: canSubstitute,
@ -702,6 +710,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
SrcRef: srcRef,
RequiredCompression: requiredCompression,
OriginalCompression: originalCompression,
TOCDigest: tocDigest,
})
if err != nil {
return types.BlobInfo{}, "", fmt.Errorf("trying to reuse blob %s at destination: %w", srcInfo.Digest, err)

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01
github.com/containers/ocicrypt v1.1.9
github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078
github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5
github.com/cyberphone/json-canonicalization v0.0.0-20231011164504-785e29786b46
github.com/distribution/reference v0.5.0
github.com/docker/cli v24.0.7+incompatible

4
go.sum
View File

@ -51,8 +51,8 @@ github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01 h1:Qzk5C6cYgle
github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01/go.mod h1:9rfv8iPl1ZP7aqh9YA68wnZv2NUDbXdcdPHVz0pFbPY=
github.com/containers/ocicrypt v1.1.9 h1:2Csfba4jse85Raxk5HIyEk8OwZNjRvfkhEGijOjIdEM=
github.com/containers/ocicrypt v1.1.9/go.mod h1:dTKx1918d8TDkxXvarscpNVY+lyPakPNFN4jwA9GBys=
github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078 h1:b9vDfBHAbKKWotUoInxAP/rUf6KmqGRZBND6lSzUcbo=
github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078/go.mod h1:FHXkEBvKRmsTeB1JQIFfXnSyXCp+wVrt172O2ZlSzM4=
github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5 h1:eiCkAt+i9BYRjR7KEKPI3iORCSABhY+spM/w8BkI2lo=
github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5/go.mod h1:pMhG1O3eMGlQKpuEuv7ves+K3BsK8/UJs8ctV5fEaoI=
github.com/coreos/go-oidc/v3 v3.7.0 h1:FTdj0uexT4diYIPlF4yoFVI5MRO1r5+SEcIpEw9vC0o=
github.com/coreos/go-oidc/v3 v3.7.0/go.mod h1:yQzSCqBnK3e6Fs5l+f5i0F8Kwf0zpH9bPEsbY00KanM=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=

View File

@ -117,6 +117,7 @@ type TryReusingBlobOptions struct {
EmptyLayer bool // True if the blob is an "empty"/"throwaway" layer, and may not necessarily be physically represented.
LayerIndex *int // If the blob is a layer, a zero-based index of the layer within the image; nil otherwise.
SrcRef reference.Named // A reference to the source image that contains the input blob.
TOCDigest *digest.Digest // If specified, the blob can be looked up in the destination also by its TOC digest.
}
// ReusedBlob is information about a blob reused in a destination.

View File

@ -9,6 +9,7 @@ import (
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
ociencspec "github.com/containers/ocicrypt/spec"
chunkedToc "github.com/containers/storage/pkg/chunked/toc"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
@ -235,7 +236,7 @@ func (m *OCI1) Inspect(configGetter func(types.BlobInfo) ([]byte, error)) (*type
}
// ImageID computes an ID which can uniquely identify this image by its contents.
func (m *OCI1) ImageID([]digest.Digest) (string, error) {
func (m *OCI1) ImageID(diffIDs []digest.Digest) (string, error) {
// The way m.Config.Digest “uniquely identifies” an image is
// by containing RootFS.DiffIDs, which identify the layers of the image.
// For non-image artifacts, we can’t expect the config to change
@ -259,9 +260,44 @@ func (m *OCI1) ImageID([]digest.Digest) (string, error) {
if err := m.Config.Digest.Validate(); err != nil {
return "", err
}
// If there is any layer that is using partial content, we calculate the image ID
// in a different way since the diffID cannot be validated as for regular pulled images.
for _, layer := range m.Layers {
toc, err := chunkedToc.GetTOCDigest(layer.Annotations)
if err != nil {
return "", fmt.Errorf("error looking up annotation for layer %q: %w", layer.Digest, err)
}
if toc != nil {
return m.calculateImageIDForPartialImage(diffIDs)
}
}
return m.Config.Digest.Hex(), nil
}
// calculateImageIDForPartialImage computes the image ID for a manifest in which at least one
// layer carries a TOC digest annotation (ImageID calls it in exactly that situation).
//
// The ID is the hex form of a canonical digest computed over, for every layer in manifest
// order, the hex value of diffIDs[i] followed by the hex value of the layer's TOC digest when
// the layer has one.
//
// diffIDs must provide at least one entry per layer in m.Layers; entries past len(m.Layers)
// are ignored. An error is returned when diffIDs is too short, when a layer's TOC annotation
// cannot be looked up, or when feeding the hash fails.
func (m *OCI1) calculateImageIDForPartialImage(diffIDs []digest.Digest) (string, error) {
	// diffIDs is indexed in lockstep with m.Layers below; reject a short (or nil) slice
	// up front instead of panicking with an index-out-of-range.
	if len(diffIDs) < len(m.Layers) {
		return "", fmt.Errorf("computing image ID: only %d diffIDs provided for %d layers", len(diffIDs), len(m.Layers))
	}

	newID := digest.Canonical.Digester()
	hash := newID.Hash()
	for i, layer := range m.Layers {
		diffID := diffIDs[i]
		if _, err := hash.Write([]byte(diffID.Hex())); err != nil {
			return "", fmt.Errorf("error writing diffID %q: %w", diffID, err)
		}

		toc, err := chunkedToc.GetTOCDigest(layer.Annotations)
		if err != nil {
			return "", fmt.Errorf("error looking up annotation for layer %q: %w", layer.Digest, err)
		}
		// Layers without a TOC digest contribute only their diffID.
		if toc != nil {
			if _, err := hash.Write([]byte(toc.Hex())); err != nil {
				return "", fmt.Errorf("error writing TOC %q: %w", toc, err)
			}
		}
	}
	return newID.Digest().Hex(), nil
}
// CanChangeLayerCompression returns true if we can compress/decompress layers with mimeType in the current image
// (and the code can handle that).
// NOTE: Even if this returns true, the relevant format might not accept all compression algorithms; the set of accepted

View File

@ -77,13 +77,13 @@ type storageImageDestination struct {
indexToStorageID map[int]*string
// All accesses to below data are protected by `lock` which is made
// *explicit* in the code.
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output
uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output
}
// addedLayerInfo records data about a layer to use in this image.
@ -117,18 +117,18 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
HasThreadSafePutBlob: true,
}),
imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
blobDiffIDs: make(map[digest.Digest]digest.Digest),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
@ -227,7 +227,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
// Record information about the blob.
s.lock.Lock()
s.blobDiffIDs[blobDigest] = diffID.Digest()
s.uncompressedOrTocDigest[blobDigest] = diffID.Digest()
s.fileSizes[blobDigest] = counter.Count
s.filenames[blobDigest] = filename
s.lock.Unlock()
@ -289,7 +289,7 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
blobDigest := srcInfo.Digest
s.lock.Lock()
s.blobDiffIDs[blobDigest] = blobDigest
s.uncompressedOrTocDigest[blobDigest] = blobDigest
s.fileSizes[blobDigest] = 0
s.filenames[blobDigest] = ""
s.diffOutputs[blobDigest] = out
@ -321,7 +321,7 @@ func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context,
})
}
// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (digest, size or -1), filling s.blobDiffIDs and other metadata.
// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (digest, size or -1), filling s.uncompressedOrTocDigest and other metadata.
// The caller must arrange the blob to be eventually committed using s.commitLayer().
func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, size int64, options *private.TryReusingBlobOptions) (bool, private.ReusedBlob, error) {
// lock the entire method as it executes fairly quickly
@ -335,7 +335,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[digest] = aLayer.UncompressedDigest()
s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[digest] = aLayer
return true, private.ReusedBlob{
Digest: digest,
@ -366,7 +366,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Save this for completeness.
s.blobDiffIDs[digest] = layers[0].UncompressedDigest
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].UncompressedSize,
@ -380,7 +380,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[digest] = layers[0].UncompressedDigest
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].CompressedSize,
@ -398,7 +398,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
if size != -1 {
s.blobDiffIDs[digest] = layers[0].UncompressedDigest
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: size,
@ -407,7 +407,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
if !options.CanSubstitute {
return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest)
}
s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest
s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: uncompressedDigest,
Size: layers[0].UncompressedSize,
@ -416,6 +416,25 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
}
tocDigest := digest
if options.TOCDigest != nil {
tocDigest = *options.TOCDigest
}
// Check if we have a chunked layer in storage with the same TOC digest.
layers, err = s.imageRef.transport.store.LayersByTOCDigest(tocDigest)
if err != nil && !errors.Is(err, storage.ErrLayerUnknown) {
return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, tocDigest, err)
}
if len(layers) > 0 {
// Save this for completeness.
s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest
return true, private.ReusedBlob{
Digest: layers[0].TOCDigest,
Size: layers[0].UncompressedSize,
}, nil
}
// Nope, we don't have it.
return false, private.ReusedBlob{}, nil
}
@ -438,16 +457,20 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string {
continue
}
blobSum := m.FSLayers[i].BlobSum
diffID, ok := s.blobDiffIDs[blobSum]
diffID, ok := s.uncompressedOrTocDigest[blobSum]
if !ok {
logrus.Infof("error looking up diffID for layer %q", blobSum.String())
return ""
}
diffIDs = append([]digest.Digest{diffID}, diffIDs...)
}
case *manifest.Schema2, *manifest.OCI1:
// We know the ID calculation for these formats doesn't actually use the diffIDs,
// so we don't need to populate the diffID list.
case *manifest.Schema2:
// We know the ID calculation doesn't actually use the diffIDs, so we don't need to populate
// the diffID list.
case *manifest.OCI1:
for _, l := range m.Layers {
diffIDs = append(diffIDs, l.Digest)
}
default:
return ""
}
@ -518,7 +541,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
}
s.lock.Unlock()
// Note: commitLayer locks on-demand.
if err := s.commitLayer(index, info, -1); err != nil {
if stopQueue, err := s.commitLayer(index, info, -1); stopQueue || err != nil {
return err
}
s.lock.Lock()
@ -532,18 +555,32 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
return nil
}
// getDiffIDOrTOCDigest looks up the digest recorded for the given blob digest.
//
// If a differ output is registered for the digest in s.diffOutputs, its TOCDigest is returned.
// Otherwise the value stored in s.uncompressedOrTocDigest is returned. The boolean reports
// whether either lookup found an entry.
//
// s.lock is taken for the duration of the call, so the caller must not already hold it.
func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest.Digest) (digest.Digest, bool) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// A differ output takes precedence over the plain digest map.
	if output, ok := s.diffOutputs[uncompressedDigest]; ok {
		return output.TOCDigest, true
	}

	recorded, ok := s.uncompressedOrTocDigest[uncompressedDigest]
	return recorded, ok
}
// commitLayer commits the specified layer with the given index to the storage.
// size can usually be -1; it can be provided if the layer is not known to be already present in blobDiffIDs.
// size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest.
//
// If the layer cannot be committed yet, the function returns (true, nil).
//
// Note that the previous layer is expected to already be committed.
//
// Caution: this function must be called without holding `s.lock`. Callers
// must guarantee that, at any given time, at most one goroutine may execute
// `commitLayer()`.
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) error {
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) (bool, error) {
// Already committed? Return early.
if _, alreadyCommitted := s.indexToStorageID[index]; alreadyCommitted {
return nil
return false, nil
}
// Start with an empty string or the previous layer ID. Note that
@ -557,68 +594,96 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Carry over the previous ID for empty non-base layers.
if info.emptyLayer {
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}
// Check if there's already a layer with the ID that we'd give to the result of applying
// this layer blob to its parent, if it has one, or the blob's hex value otherwise.
s.lock.Lock()
diffID, haveDiffID := s.blobDiffIDs[info.digest]
s.lock.Unlock()
if !haveDiffID {
// The diffIDOrTOCDigest refers either to the DiffID or the digest of the TOC.
diffIDOrTOCDigest, haveDiffIDOrTOCDigest := s.getDiffIDOrTOCDigest(info.digest)
if !haveDiffIDOrTOCDigest {
// Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(),
// or to even check if we had it.
// Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller
// that relies on using a blob digest that has never been seen by the store had better call
// TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only
// so far we are going to accommodate that (if we should be doing that at all).
logrus.Debugf("looking for diffID for blob %+v", info.digest)
logrus.Debugf("looking for diffID or TOC digest for blob %+v", info.digest)
// Use tryReusingBlobAsPending, not the top-level TryReusingBlobWithOptions, to prevent recursion via queueOrCommit.
has, _, err := s.tryReusingBlobAsPending(info.digest, size, &private.TryReusingBlobOptions{
Cache: none.NoCache,
CanSubstitute: false,
})
if err != nil {
return fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
}
if !has {
return fmt.Errorf("error determining uncompressed digest for blob %q", info.digest.String())
return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String())
}
diffID, haveDiffID = s.blobDiffIDs[info.digest]
if !haveDiffID {
return fmt.Errorf("we have blob %q, but don't know its uncompressed digest", info.digest.String())
diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest)
if !haveDiffIDOrTOCDigest {
return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String())
}
}
id := diffID.Hex()
id := diffIDOrTOCDigest.Hex()
if lastLayer != "" {
id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex()
id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffIDOrTOCDigest.Hex())).Hex()
}
if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil {
// There's already a layer that should have the right contents, just reuse it.
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}
s.lock.Lock()
diffOutput, ok := s.diffOutputs[info.digest]
s.lock.Unlock()
if ok {
layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
if err != nil {
return err
if s.manifest == nil {
logrus.Debugf("Skipping commit for TOC=%q, manifest not yet available", id)
return true, nil
}
// FIXME: what to do with the uncompressed digest?
diffOutput.UncompressedDigest = info.digest
man, err := manifest.FromBlob(s.manifest, manifest.GuessMIMEType(s.manifest))
if err != nil {
return false, fmt.Errorf("parsing manifest: %w", err)
}
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil {
cb, err := s.getConfigBlob(man.ConfigInfo())
if err != nil {
return false, err
}
// retrieve the expected uncompressed digest from the config blob.
configOCI := &imgspecv1.Image{}
if err := json.Unmarshal(cb, configOCI); err != nil {
return false, err
}
if index >= len(configOCI.RootFS.DiffIDs) {
return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index)
}
layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
if err != nil {
return false, err
}
// let the storage layer know what was the original uncompressed layer.
flags := make(map[string]interface{})
flags[expectedLayerDiffIDFlag] = configOCI.RootFS.DiffIDs[index]
logrus.Debugf("Setting uncompressed digest to %q for layer %q", configOCI.RootFS.DiffIDs[index], id)
options := &graphdriver.ApplyDiffWithDifferOpts{
Flags: flags,
}
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, options); err != nil {
_ = s.imageRef.transport.store.Delete(layer.ID)
return err
return false, err
}
s.indexToStorageID[index] = &layer.ID
return nil
return false, nil
}
s.lock.Lock()
@ -627,11 +692,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
if ok {
layer, err := al.PutAs(id, lastLayer, nil)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return fmt.Errorf("failed to put layer from digest and labels: %w", err)
return false, fmt.Errorf("failed to put layer from digest and labels: %w", err)
}
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}
// Check if we previously cached a file with that blob's contents. If we didn't,
@ -642,7 +707,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
if !ok {
// Try to find the layer with contents matching that blobsum.
layer := ""
layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffID)
layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffIDOrTOCDigest)
if err2 == nil && len(layers) > 0 {
layer = layers[0].ID
} else {
@ -652,7 +717,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
}
if layer == "" {
return fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
return false, fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
}
// Read the layer's contents.
noCompression := archive.Uncompressed
@ -661,17 +726,17 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions)
if err2 != nil {
return fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
return false, fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
}
// Copy the layer diff to a file. Diff() takes a lock that it holds
// until the ReadCloser that it returns is closed, and PutLayer() wants
// the same lock, so the diff can't just be directly streamed from one
// to the other.
filename = s.computeNextBlobCacheFile()
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0o600)
if err != nil {
diff.Close()
return fmt.Errorf("creating temporary file %q: %w", filename, err)
return false, fmt.Errorf("creating temporary file %q: %w", filename, err)
}
// Copy the data to the file.
// TODO: This can take quite some time, and should ideally be cancellable using
@ -680,7 +745,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
diff.Close()
file.Close()
if err != nil {
return fmt.Errorf("storing blob to file %q: %w", filename, err)
return false, fmt.Errorf("storing blob to file %q: %w", filename, err)
}
// Make sure that we can find this file later, should we need the layer's
// contents again.
@ -691,21 +756,21 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Read the cached blob and use it as a diff.
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("opening file %q: %w", filename, err)
return false, fmt.Errorf("opening file %q: %w", filename, err)
}
defer file.Close()
// Build the new layer using the diff, regardless of where it came from.
// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, &storage.LayerOptions{
OriginalDigest: info.digest,
UncompressedDigest: diffID,
UncompressedDigest: diffIDOrTOCDigest,
}, file)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
}
s.indexToStorageID[index] = &layer.ID
return nil
return false, nil
}
// Commit marks the process of storing the image as successful and asks for the image to be persisted.
@ -752,11 +817,13 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
// Extract, commit, or find the layers.
for i, blob := range layerBlobs {
if err := s.commitLayer(i, addedLayerInfo{
if stopQueue, err := s.commitLayer(i, addedLayerInfo{
digest: blob.Digest,
emptyLayer: blob.EmptyLayer,
}, blob.Size); err != nil {
return err
} else if stopQueue {
return fmt.Errorf("Internal error: storageImageDestination.Commit(): commitLayer() not ready to commit for layer %q", blob.Digest)
}
}
var lastLayer string

View File

@ -29,21 +29,33 @@ import (
"github.com/sirupsen/logrus"
)
// getBlobMutexProtected groups the state of a storageImageSource that is guarded by its
// getBlobMutex; both maps are read and written only while that mutex is held.
type getBlobMutexProtected struct {
// digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the
// layer ID in the store. It is filled by LayerInfosForCopy and consulted by GetBlob.
digestToLayerID map[digest.Digest]string
// layerPosition stores where we are in reading a blob's layers: for each digest, a counter
// that getBlobAndLayerID increments on every call and uses (modulo the number of candidate
// layers) to pick which layer to read.
layerPosition map[digest.Digest]int
}
type storageImageSource struct {
impl.Compat
impl.PropertyMethodsInitialize
stubs.NoGetBlobAtInitialize
imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
layerPosition map[digest.Digest]int // Where we are in reading a blob's layers
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutexProtected getBlobMutexProtected
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
}
// expectedLayerDiffIDFlag is the layer flag key under which commitLayer records the DiffID
// taken from the image config (RootFS.DiffIDs) for a layer applied from a differ output.
// LayerInfosForCopy reads the same flag back, expecting a string value, for layers that
// have a TOC digest.
const expectedLayerDiffIDFlag = "expected-layer-diffid"
// newImageSource sets up an image for reading.
func newImageSource(sys *types.SystemContext, imageRef storageReference) (*storageImageSource, error) {
// First, locate the image.
@ -62,9 +74,12 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora
imageRef: imageRef,
systemContext: sys,
image: img,
layerPosition: make(map[digest.Digest]int),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
getBlobMutexProtected: getBlobMutexProtected{
digestToLayerID: make(map[digest.Digest]string),
layerPosition: make(map[digest.Digest]int),
},
}
image.Compat = impl.AddCompat(image)
if img.Metadata != "" {
@ -91,6 +106,7 @@ func (s *storageImageSource) Close() error {
func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (rc io.ReadCloser, n int64, err error) {
// We need a valid digest value.
digest := info.Digest
err = digest.Validate()
if err != nil {
return nil, 0, err
@ -100,10 +116,24 @@ func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, c
return io.NopCloser(bytes.NewReader(image.GzippedEmptyLayer)), int64(len(image.GzippedEmptyLayer)), nil
}
// Check if the blob corresponds to a diff that was used to initialize any layers. Our
// callers should try to retrieve layers using their uncompressed digests, so no need to
// check if they're using one of the compressed digests, which we can't reproduce anyway.
layers, _ := s.imageRef.transport.store.LayersByUncompressedDigest(digest)
var layers []storage.Layer
// If the digest was overridden by LayerInfosForCopy, then we need to use the TOC digest
// to retrieve it from the storage.
s.getBlobMutex.Lock()
layerID, found := s.getBlobMutexProtected.digestToLayerID[digest]
s.getBlobMutex.Unlock()
if found {
if layer, err := s.imageRef.transport.store.Layer(layerID); err == nil {
layers = []storage.Layer{*layer}
}
} else {
// Check if the blob corresponds to a diff that was used to initialize any layers. Our
// callers should try to retrieve layers using their uncompressed digests, so no need to
// check if they're using one of the compressed digests, which we can't reproduce anyway.
layers, _ = s.imageRef.transport.store.LayersByUncompressedDigest(digest)
}
// If it's not a layer, then it must be a data item.
if len(layers) == 0 {
@ -174,8 +204,8 @@ func (s *storageImageSource) getBlobAndLayerID(digest digest.Digest, layers []st
// which claim to have the same contents, that we actually do have multiple layers, otherwise we could
// just go ahead and use the first one every time.
s.getBlobMutex.Lock()
i := s.layerPosition[digest]
s.layerPosition[digest] = i + 1
i := s.getBlobMutexProtected.layerPosition[digest]
s.getBlobMutexProtected.layerPosition[digest] = i + 1
s.getBlobMutex.Unlock()
if len(layers) > 0 {
layer = layers[i%len(layers)]
@ -267,14 +297,35 @@ func (s *storageImageSource) LayerInfosForCopy(ctx context.Context, instanceDige
if err != nil {
return nil, fmt.Errorf("reading layer %q in image %q: %w", layerID, s.image.ID, err)
}
if layer.UncompressedDigest == "" {
return nil, fmt.Errorf("uncompressed digest for layer %q is unknown", layerID)
if layer.UncompressedDigest == "" && layer.TOCDigest == "" {
return nil, fmt.Errorf("uncompressed digest and TOC digest for layer %q is unknown", layerID)
}
if layer.UncompressedSize < 0 {
return nil, fmt.Errorf("uncompressed size for layer %q is unknown", layerID)
}
blobDigest := layer.UncompressedDigest
if layer.TOCDigest != "" {
if layer.Flags == nil || layer.Flags[expectedLayerDiffIDFlag] == nil {
return nil, fmt.Errorf("TOC digest %q for layer %q is present but %q flag is not set", layer.TOCDigest, layerID, expectedLayerDiffIDFlag)
}
if expectedDigest, ok := layer.Flags[expectedLayerDiffIDFlag].(string); ok {
// if the layer is stored by its TOC, report the expected diffID as the layer Digest
// but store the TOC digest so we can later retrieve it from the storage.
blobDigest, err = digest.Parse(expectedDigest)
if err != nil {
return nil, fmt.Errorf("parsing expected diffID %q for layer %q: %w", expectedDigest, layerID, err)
}
} else {
return nil, fmt.Errorf("TOC digest %q for layer %q is present but %q flag is not a string", layer.TOCDigest, layerID, expectedLayerDiffIDFlag)
}
}
s.getBlobMutex.Lock()
s.getBlobMutexProtected.digestToLayerID[blobDigest] = layer.ID
s.getBlobMutex.Unlock()
blobInfo := types.BlobInfo{
Digest: layer.UncompressedDigest,
Digest: blobDigest,
Size: layer.UncompressedSize,
MediaType: uncompressedLayerType,
}
@ -384,7 +435,7 @@ func (s *storageImageSource) getSize() (int64, error) {
if err != nil {
return -1, err
}
if layer.UncompressedDigest == "" || layer.UncompressedSize < 0 {
if (layer.TOCDigest == "" && layer.UncompressedDigest == "") || layer.UncompressedSize < 0 {
return -1, fmt.Errorf("size for layer %q is unknown, failing getSize()", layerID)
}
sum += layer.UncompressedSize