Return private.UploadedBlob from PutBlobWithOptions
This is the subset of types.BlobInfo that the transports should actually deal with, making it much more explicit what the transports are responsible for. In practice, this should not change behavior.

Signed-off-by: Miloslav Trmač <mitr@redhat.com>
parent d12d1d1b59
commit d9a7aa0631
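Before the per-file diffs, here is a minimal, self-contained sketch of the contract this commit sets up, assuming nothing beyond what the diff itself shows: the transport reports only the raw-blob facts (digest and size), and the generic copy code merges them back into the richer blob metadata it tracks. The type, field, and function names below are local stand-ins for private.UploadedBlob, types.BlobInfo, and updatedBlobInfoFromUpload, not the real containers/image API.

// Illustrative sketch only: these names mirror the shapes introduced by this
// commit but are local stand-ins, not the containers/image API.
package main

import (
	"crypto/sha256"
	"fmt"
)

// uploadedBlob is what a transport reports back: just the raw-blob facts.
type uploadedBlob struct {
	Digest string
	Size   int64
}

// blobInfo stands in for the richer types.BlobInfo the generic copy code tracks.
type blobInfo struct {
	Digest      string
	Size        int64
	URLs        []string
	Annotations map[string]string
	MediaType   string
}

// putBlob plays the role of a transport's PutBlobWithOptions: it stores the
// bytes somewhere and computes digest and size, nothing more.
func putBlob(data []byte) uploadedBlob {
	return uploadedBlob{
		Digest: fmt.Sprintf("sha256:%x", sha256.Sum256(data)),
		Size:   int64(len(data)),
	}
}

// updatedInfoFromUpload mirrors updatedBlobInfoFromUpload in the diff below:
// the generic code keeps compression/encryption/MIME concerns to itself, takes
// only Digest/Size from the transport, and clears URLs because the digest may
// have changed during upload.
func updatedInfoFromUpload(input blobInfo, uploaded uploadedBlob) blobInfo {
	return blobInfo{
		Digest:      uploaded.Digest,
		Size:        uploaded.Size,
		URLs:        nil,
		Annotations: input.Annotations,
		MediaType:   input.MediaType,
	}
}

func main() {
	src := blobInfo{
		Digest:      "", // unknown until the transport has seen the stream
		Size:        -1,
		URLs:        []string{"https://layer.example/blob"},
		Annotations: map[string]string{"note": "example"},
		MediaType:   "application/vnd.oci.image.layer.v1.tar+gzip",
	}
	dst := putBlob([]byte("example layer contents"))
	fmt.Printf("%+v\n", updatedInfoFromUpload(src, dst))
}

Running it prints the merged blobInfo with the transport-computed digest and size, and with URLs cleared.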
copy/blob.go (22 lines changed)

@@ -104,12 +104,11 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read
 	if !isConfig {
 		options.LayerIndex = &layerIndex
 	}
-	uploadedInfo, err := ic.c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options)
+	destBlob, err := ic.c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options)
 	if err != nil {
 		return types.BlobInfo{}, fmt.Errorf("writing blob: %w", err)
 	}
-
-	uploadedInfo.Annotations = stream.info.Annotations
+	uploadedInfo := updatedBlobInfoFromUpload(stream.info, destBlob)

 	compressionStep.updateCompressionEdits(&uploadedInfo.CompressionOperation, &uploadedInfo.CompressionAlgorithm, &uploadedInfo.Annotations)
 	decryptionStep.updateCryptoOperation(&uploadedInfo.CryptoOperation)
@@ -169,3 +168,20 @@ func (r errorAnnotationReader) Read(b []byte) (n int, err error) {
 	}
 	return n, err
 }
+
+// updatedBlobInfoFromUpload returns inputInfo updated with uploadedBlob which was created based on inputInfo.
+func updatedBlobInfoFromUpload(inputInfo types.BlobInfo, uploadedBlob private.UploadedBlob) types.BlobInfo {
+	// The transport is only tasked with dealing with the raw blob, and possibly computing Digest/Size.
+	// Handling of compression, encryption, and the related MIME types and the like are all the responsibility
+	// of the generic code in this package.
+	return types.BlobInfo{
+		Digest:               uploadedBlob.Digest,
+		Size:                 uploadedBlob.Size,
+		URLs:                 nil, // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior.
+		Annotations:          inputInfo.Annotations,
+		MediaType:            inputInfo.MediaType,            // Mostly irrelevant, MediaType is updated based on Compression/Crypto.
+		CompressionOperation: inputInfo.CompressionOperation, // Expected to be unset, and only updated by copyBlobFromStream.
+		CompressionAlgorithm: inputInfo.CompressionAlgorithm, // Expected to be unset, and only updated by copyBlobFromStream.
+		CryptoOperation:      inputInfo.CryptoOperation,      // Expected to be unset, and only updated by copyBlobFromStream.
+	}
+}
copy/blob_test.go (new file, 76 lines)

@@ -0,0 +1,76 @@
+package copy
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/containers/image/v5/internal/private"
+	"github.com/containers/image/v5/pkg/compression"
+	"github.com/containers/image/v5/types"
+	imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestUpdatedBlobInfoFromUpload(t *testing.T) {
+	for _, c := range []struct {
+		srcInfo  types.BlobInfo
+		uploaded private.UploadedBlob
+		expected types.BlobInfo
+	}{
+		{ // A straightforward upload with a known size
+			srcInfo: types.BlobInfo{
+				Digest:               "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb",
+				Size:                 51354364,
+				URLs:                 []string{"https://layer.url"},
+				Annotations:          map[string]string{"test-annotation-2": "two"},
+				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
+				CompressionOperation: types.Compress,    // Might be set by blobCacheSource.LayerInfosForCopy
+				CompressionAlgorithm: &compression.Gzip, // Set e.g. in copyLayer
+				// CryptoOperation is not set by LayerInfos()
+			},
+			uploaded: private.UploadedBlob{
+				Digest: "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb",
+				Size:   51354364,
+			},
+			expected: types.BlobInfo{
+				Digest:               "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb",
+				Size:                 51354364,
+				URLs:                 nil,
+				Annotations:          map[string]string{"test-annotation-2": "two"},
+				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
+				CompressionOperation: types.Compress,    // Might be set by blobCacheSource.LayerInfosForCopy
+				CompressionAlgorithm: &compression.Gzip, // Set e.g. in copyLayer
+				// CryptoOperation is set to the zero value
+			},
+		},
+		{ // Upload determining the digest/size
+			srcInfo: types.BlobInfo{
+				Digest:               "",
+				Size:                 -1,
+				URLs:                 []string{"https://layer.url"},
+				Annotations:          map[string]string{"test-annotation-2": "two"},
+				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
+				CompressionOperation: types.Compress,    // Might be set by blobCacheSource.LayerInfosForCopy
+				CompressionAlgorithm: &compression.Gzip, // Set e.g. in copyLayer
+				// CryptoOperation is not set by LayerInfos()
+			},
+			uploaded: private.UploadedBlob{
+				Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+				Size:   513543640,
+			},
+			expected: types.BlobInfo{
+				Digest:               "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+				Size:                 513543640,
+				URLs:                 nil,
+				Annotations:          map[string]string{"test-annotation-2": "two"},
+				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
+				CompressionOperation: types.Compress,    // Might be set by blobCacheSource.LayerInfosForCopy
+				CompressionAlgorithm: &compression.Gzip, // Set e.g. in copyLayer
+				// CryptoOperation is set to the zero value
+			},
+		},
+	} {
+		res := updatedBlobInfoFromUpload(c.srcInfo, c.uploaded)
+		assert.Equal(t, c.expected, res, fmt.Sprintf("%#v", c.uploaded))
+	}
+}
@@ -132,11 +132,11 @@ func (d *dirImageDestination) Close() error {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *dirImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *dirImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	blobFile, err := os.CreateTemp(d.ref.path, "dir-put-blob")
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	succeeded := false
 	explicitClosed := false
@@ -153,14 +153,14 @@ func (d *dirImageDestination) PutBlobWithOptions(ctx context.Context, stream io.
 	// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
 	size, err := io.Copy(blobFile, stream)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	blobDigest := digester.Digest()
 	if inputInfo.Size != -1 && size != inputInfo.Size {
-		return types.BlobInfo{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
+		return private.UploadedBlob{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
 	}
 	if err := blobFile.Sync(); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}

 	// On POSIX systems, blobFile was created with mode 0600, so we need to make it readable.
@@ -169,7 +169,7 @@ func (d *dirImageDestination) PutBlobWithOptions(ctx context.Context, stream io.
 	// always fails on Windows.
 	if runtime.GOOS != "windows" {
 		if err := blobFile.Chmod(0644); err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 	}

@@ -178,10 +178,10 @@ func (d *dirImageDestination) PutBlobWithOptions(ctx context.Context, stream io.
 	blobFile.Close()
 	explicitClosed = true
 	if err := os.Rename(blobFile.Name(), blobPath); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	succeeded = true
-	return types.BlobInfo{Digest: blobDigest, Size: size}, nil
+	return private.UploadedBlob{Digest: blobDigest, Size: size}, nil
 }

 // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
@@ -132,8 +132,8 @@ func (c *sizeCounter) Write(p []byte) (n int, err error) {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	// If requested, precompute the blob digest to prevent uploading layers that already exist on the registry.
 	// This functionality is particularly useful when BlobInfoCache has not been populated with compressed digests,
 	// the source blob is uncompressed, and the destination blob is being compressed "on the fly".
@@ -141,7 +141,7 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 		logrus.Debugf("Precomputing digest layer for %s", reference.Path(d.ref.ref))
 		streamCopy, cleanup, err := streamdigest.ComputeBlobInfo(d.c.sys, stream, &inputInfo)
 		if err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 		defer cleanup()
 		stream = streamCopy
@@ -152,10 +152,10 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 		// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
 		haveBlob, reusedInfo, err := d.tryReusingExactBlob(ctx, inputInfo, options.Cache)
 		if err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 		if haveBlob {
-			return types.BlobInfo{Digest: reusedInfo.Digest, Size: reusedInfo.Size}, nil
+			return private.UploadedBlob{Digest: reusedInfo.Digest, Size: reusedInfo.Size}, nil
 		}
 	}

@@ -164,16 +164,16 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 	logrus.Debugf("Uploading %s", uploadPath)
 	res, err := d.c.makeRequest(ctx, http.MethodPost, uploadPath, nil, nil, v2Auth, nil)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	defer res.Body.Close()
 	if res.StatusCode != http.StatusAccepted {
 		logrus.Debugf("Error initiating layer upload, response %#v", *res)
-		return types.BlobInfo{}, fmt.Errorf("initiating layer upload to %s in %s: %w", uploadPath, d.c.registry, registryHTTPResponseToError(res))
+		return private.UploadedBlob{}, fmt.Errorf("initiating layer upload to %s in %s: %w", uploadPath, d.c.registry, registryHTTPResponseToError(res))
 	}
 	uploadLocation, err := res.Location()
 	if err != nil {
-		return types.BlobInfo{}, fmt.Errorf("determining upload URL: %w", err)
+		return private.UploadedBlob{}, fmt.Errorf("determining upload URL: %w", err)
 	}

 	digester, stream := putblobdigest.DigestIfCanonicalUnknown(stream, inputInfo)
@@ -201,7 +201,7 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 		return uploadLocation, nil
 	}()
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	blobDigest := digester.Digest()

@@ -212,17 +212,17 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 	uploadLocation.RawQuery = locationQuery.Encode()
 	res, err = d.c.makeRequestToResolvedURL(ctx, http.MethodPut, uploadLocation, map[string][]string{"Content-Type": {"application/octet-stream"}}, nil, -1, v2Auth, nil)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	defer res.Body.Close()
 	if res.StatusCode != http.StatusCreated {
 		logrus.Debugf("Error uploading layer, response %#v", *res)
-		return types.BlobInfo{}, fmt.Errorf("uploading layer to %s: %w", uploadLocation, registryHTTPResponseToError(res))
+		return private.UploadedBlob{}, fmt.Errorf("uploading layer to %s: %w", uploadLocation, registryHTTPResponseToError(res))
 	}

 	logrus.Debugf("Upload of layer %s complete", blobDigest)
 	options.Cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), blobDigest, newBICLocationReference(d.ref))
-	return types.BlobInfo{Digest: blobDigest, Size: sizeCounter.size}, nil
+	return private.UploadedBlob{Digest: blobDigest, Size: sizeCounter.size}, nil
 }

 // blobExists returns true iff repo contains a blob with digest, and if so, also its size.
@@ -76,15 +76,15 @@ func (d *Destination) AddRepoTags(tags []reference.NamedTagged) {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *Destination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *Destination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	// Ouch, we need to stream the blob into a temporary file just to determine the size.
 	// When the layer is decompressed, we also have to generate the digest on uncompressed data.
 	if inputInfo.Size == -1 || inputInfo.Digest == "" {
 		logrus.Debugf("docker tarfile: input with unknown size, streaming to disk first ...")
 		streamCopy, cleanup, err := streamdigest.ComputeBlobInfo(d.sysCtx, stream, &inputInfo)
 		if err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 		defer cleanup()
 		stream = streamCopy
@@ -92,35 +92,35 @@ func (d *Destination) PutBlobWithOptions(ctx context.Context, stream io.Reader,
 	}

 	if err := d.archive.lock(); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	defer d.archive.unlock()

 	// Maybe the blob has been already sent
 	ok, reusedInfo, err := d.archive.tryReusingBlobLocked(inputInfo)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	if ok {
-		return types.BlobInfo{Digest: reusedInfo.Digest, Size: reusedInfo.Size}, nil
+		return private.UploadedBlob{Digest: reusedInfo.Digest, Size: reusedInfo.Size}, nil
 	}

 	if options.IsConfig {
 		buf, err := iolimits.ReadAtMost(stream, iolimits.MaxConfigBodySize)
 		if err != nil {
-			return types.BlobInfo{}, fmt.Errorf("reading Config file stream: %w", err)
+			return private.UploadedBlob{}, fmt.Errorf("reading Config file stream: %w", err)
 		}
 		d.config = buf
 		if err := d.archive.sendFileLocked(d.archive.configPath(inputInfo.Digest), inputInfo.Size, bytes.NewReader(buf)); err != nil {
-			return types.BlobInfo{}, fmt.Errorf("writing Config file: %w", err)
+			return private.UploadedBlob{}, fmt.Errorf("writing Config file: %w", err)
 		}
 	} else {
 		if err := d.archive.sendFileLocked(d.archive.physicalLayerPath(inputInfo.Digest), inputInfo.Size, stream); err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 	}
 	d.archive.recordBlobLocked(types.BlobInfo{Digest: inputInfo.Digest, Size: inputInfo.Size})
-	return types.BlobInfo{Digest: inputInfo.Digest, Size: inputInfo.Size}, nil
+	return private.UploadedBlob{Digest: inputInfo.Digest, Size: inputInfo.Size}, nil
 }

 // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
@@ -43,10 +43,17 @@ func AddCompat(dest private.ImageDestinationInternalOnly) Compat {
 // to any other readers for download using the supplied digest.
 // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
 func (c *Compat) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
-	return c.dest.PutBlobWithOptions(ctx, stream, inputInfo, private.PutBlobOptions{
+	res, err := c.dest.PutBlobWithOptions(ctx, stream, inputInfo, private.PutBlobOptions{
 		Cache:    blobinfocache.FromBlobInfoCache(cache),
 		IsConfig: isConfig,
 	})
+	if err != nil {
+		return types.BlobInfo{}, err
+	}
+	return types.BlobInfo{
+		Digest: res.Digest,
+		Size:   res.Size,
+	}, nil
 }

 // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
@@ -46,9 +46,16 @@ func FromPublic(dest types.ImageDestination) private.ImageDestination {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (w *wrapped) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
-	return w.PutBlob(ctx, stream, inputInfo, options.Cache, options.IsConfig)
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (w *wrapped) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
+	res, err := w.PutBlob(ctx, stream, inputInfo, options.Cache, options.IsConfig)
+	if err != nil {
+		return private.UploadedBlob{}, err
+	}
+	return private.UploadedBlob{
+		Digest: res.Digest,
+		Size:   res.Size,
+	}, nil
 }

 // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
@@ -47,8 +47,8 @@ type ImageDestinationInternalOnly interface {
 	// inputInfo.MediaType describes the blob format, if known.
 	// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 	// to any other readers for download using the supplied digest.
-	// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-	PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options PutBlobOptions) (types.BlobInfo, error)
+	// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+	PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options PutBlobOptions) (UploadedBlob, error)

 	// PutBlobPartial attempts to create a blob using the data that is already present
 	// at the destination. chunkAccessor is accessed in a non-sequential way to retrieve the missing chunks.
@@ -78,6 +78,13 @@ type ImageDestination interface {
 	ImageDestinationInternalOnly
 }

+// UploadedBlob is information about a blob written to a destination.
+// It is the subset of types.BlobInfo fields the transport is responsible for setting; all fields must be provided.
+type UploadedBlob struct {
+	Digest digest.Digest
+	Size   int64
+}
+
 // PutBlobOptions are used in PutBlobWithOptions.
 type PutBlobOptions struct {
 	Cache blobinfocache.BlobInfoCache2 // Cache to optionally update with the uploaded blob / look up blob infos.
@@ -109,8 +109,8 @@ func (d *ociArchiveImageDestination) SupportsPutBlobPartial() bool {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *ociArchiveImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *ociArchiveImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	return d.unpackedDest.PutBlobWithOptions(ctx, stream, inputInfo, options)
 }

@@ -107,11 +107,11 @@ func (d *ociImageDestination) Close() error {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *ociImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *ociImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	blobFile, err := os.CreateTemp(d.ref.dir, "oci-put-blob")
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	succeeded := false
 	explicitClosed := false
@@ -128,14 +128,14 @@ func (d *ociImageDestination) PutBlobWithOptions(ctx context.Context, stream io.
 	// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
 	size, err := io.Copy(blobFile, stream)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	blobDigest := digester.Digest()
 	if inputInfo.Size != -1 && size != inputInfo.Size {
-		return types.BlobInfo{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
+		return private.UploadedBlob{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
 	}
 	if err := blobFile.Sync(); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}

 	// On POSIX systems, blobFile was created with mode 0600, so we need to make it readable.
@@ -144,26 +144,26 @@ func (d *ociImageDestination) PutBlobWithOptions(ctx context.Context, stream io.
 	// always fails on Windows.
 	if runtime.GOOS != "windows" {
 		if err := blobFile.Chmod(0644); err != nil {
-			return types.BlobInfo{}, err
+			return private.UploadedBlob{}, err
 		}
 	}

 	blobPath, err := d.ref.blobPath(blobDigest, d.sharedBlobDir)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	if err := ensureParentDirectoryExists(blobPath); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}

 	// need to explicitly close the file, since a rename won't otherwise not work on Windows
 	blobFile.Close()
 	explicitClosed = true
 	if err := os.Rename(blobFile.Name(), blobPath); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	succeeded = true
-	return types.BlobInfo{Digest: blobDigest, Size: size}, nil
+	return private.UploadedBlob{Digest: blobDigest, Size: size}, nil
 }

 // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
@@ -116,8 +116,8 @@ func (d *openshiftImageDestination) SupportsPutBlobPartial() bool {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *openshiftImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *openshiftImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	return d.docker.PutBlobWithOptions(ctx, stream, inputInfo, options)
 }

@@ -135,16 +135,16 @@ func (d *ostreeImageDestination) Close() error {
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
 // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *ostreeImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+func (d *ostreeImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	tmpDir, err := os.MkdirTemp(d.tmpDirPath, "blob")
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}

 	blobPath := filepath.Join(tmpDir, "content")
 	blobFile, err := os.Create(blobPath)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	defer blobFile.Close()

@@ -152,19 +152,19 @@ func (d *ostreeImageDestination) PutBlobWithOptions(ctx context.Context, stream
 	// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
 	size, err := io.Copy(blobFile, stream)
 	if err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}
 	blobDigest := digester.Digest()
 	if inputInfo.Size != -1 && size != inputInfo.Size {
-		return types.BlobInfo{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
+		return private.UploadedBlob{}, fmt.Errorf("Size mismatch when copying %s, expected %d, got %d", blobDigest, inputInfo.Size, size)
 	}
 	if err := blobFile.Sync(); err != nil {
-		return types.BlobInfo{}, err
+		return private.UploadedBlob{}, err
 	}

 	hash := blobDigest.Hex()
 	d.blobs[hash] = &blobToImport{Size: size, Digest: blobDigest, BlobPath: blobPath}
-	return types.BlobInfo{Digest: blobDigest, Size: size}, nil
+	return private.UploadedBlob{Digest: blobDigest, Size: size}, nil
 }

 func fixFiles(selinuxHnd *C.struct_selabel_handle, root string, dir string, usermode bool) error {
@@ -134,8 +134,8 @@ func (d *blobCacheDestination) HasThreadSafePutBlob() bool {
 // inputInfo.MediaType describes the blob format, if known.
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
-// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *blobCacheDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlobWithOptions MUST 1) fail, and 2) delete any data stored so far.
+func (d *blobCacheDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	var tempfile *os.File
 	var err error
 	var n int
@@ -164,7 +164,7 @@ func (s *storageImageDestination) computeNextBlobCacheFile() string {
 // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
 // to any other readers for download using the supplied digest.
 // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
+func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options private.PutBlobOptions) (private.UploadedBlob, error) {
 	info, err := s.putBlobToPendingFile(stream, blobinfo, &options)
 	if err != nil {
 		return info, err
@@ -182,16 +182,12 @@ func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream

 // putBlobToPendingFile implements ImageDestination.PutBlobWithOptions, storing stream into an on-disk file.
 // The caller must arrange the blob to be eventually committed using s.commitLayer().
-func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinfo types.BlobInfo, options *private.PutBlobOptions) (types.BlobInfo, error) {
+func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinfo types.BlobInfo, options *private.PutBlobOptions) (private.UploadedBlob, error) {
 	// Stores a layer or data blob in our temporary directory, checking that any information
 	// in the blobinfo matches the incoming data.
-	errorBlobInfo := types.BlobInfo{
-		Digest: "",
-		Size:   -1,
-	}
 	if blobinfo.Digest != "" {
 		if err := blobinfo.Digest.Validate(); err != nil {
-			return errorBlobInfo, fmt.Errorf("invalid digest %#v: %w", blobinfo.Digest.String(), err)
+			return private.UploadedBlob{}, fmt.Errorf("invalid digest %#v: %w", blobinfo.Digest.String(), err)
 		}
 	}

@@ -199,7 +195,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
 	filename := s.computeNextBlobCacheFile()
 	file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600)
 	if err != nil {
-		return errorBlobInfo, fmt.Errorf("creating temporary file %q: %w", filename, err)
+		return private.UploadedBlob{}, fmt.Errorf("creating temporary file %q: %w", filename, err)
 	}
 	defer file.Close()
 	counter := ioutils.NewWriteCounter(file)
@@ -207,7 +203,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
 	digester, stream := putblobdigest.DigestIfUnknown(stream, blobinfo)
 	decompressed, err := archive.DecompressStream(stream)
 	if err != nil {
-		return errorBlobInfo, fmt.Errorf("setting up to decompress blob: %w", err)
+		return private.UploadedBlob{}, fmt.Errorf("setting up to decompress blob: %w", err)
 	}

 	diffID := digest.Canonical.Digester()
@@ -216,7 +212,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
 	_, err = io.Copy(diffID.Hash(), decompressed)
 	decompressed.Close()
 	if err != nil {
-		return errorBlobInfo, fmt.Errorf("storing blob to file %q: %w", filename, err)
+		return private.UploadedBlob{}, fmt.Errorf("storing blob to file %q: %w", filename, err)
 	}

 	// Determine blob properties, and fail if information that we were given about the blob
@@ -226,7 +222,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
 	if blobSize < 0 {
 		blobSize = counter.Count
 	} else if blobinfo.Size != counter.Count {
-		return errorBlobInfo, ErrBlobSizeMismatch
+		return private.UploadedBlob{}, ErrBlobSizeMismatch
 	}

 	// Record information about the blob.
@@ -238,10 +234,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf
 	// This is safe because we have just computed diffID, and blobDigest was either computed
 	// by us, or validated by the caller (usually copy.digestingReader).
 	options.Cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
-	return types.BlobInfo{
-		Digest:    blobDigest,
-		Size:      blobSize,
-		MediaType: blobinfo.MediaType,
+	return private.UploadedBlob{
+		Digest: blobDigest,
+		Size:   blobSize,
 	}, nil
 }
