mirror of
https://github.com/containers/image.git
synced 2025-04-18 19:44:05 +03:00
Use options.Progress and ProgressInterval directly. Signed-off-by: Miloslav Trmač <mitr@redhat.com>
188 lines
8.5 KiB
Go
188 lines
8.5 KiB
Go
package copy
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
|
||
"github.com/containers/image/v5/internal/private"
|
||
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
|
||
"github.com/containers/image/v5/types"
|
||
"github.com/sirupsen/logrus"
|
||
)
|
||
|
||
// copyBlobFromStream copies a blob with srcInfo (with known Digest and Annotations and possibly known Size) from srcReader to dest,
|
||
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
|
||
// perhaps (de/re/)compressing it if canModifyBlob,
|
||
// and returns a complete blobInfo of the copied blob.
|
||
func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
|
||
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
|
||
isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
|
||
// The copying happens through a pipeline of connected io.Readers;
|
||
// that pipeline is built by updating stream.
|
||
// === Input: srcReader
|
||
stream := sourceStream{
|
||
reader: srcReader,
|
||
info: srcInfo,
|
||
}
|
||
|
||
// === Process input through digestingReader to validate against the expected digest.
|
||
// Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader,
|
||
// use a separate validation failure indicator.
|
||
// Note that for this check we don't use the stronger "validationSucceeded" indicator, because
|
||
// dest.PutBlob may detect that the layer already exists, in which case we don't
|
||
// read stream to the end, and validation does not happen.
|
||
digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest)
|
||
if err != nil {
|
||
return types.BlobInfo{}, fmt.Errorf("preparing to verify blob %s: %w", srcInfo.Digest, err)
|
||
}
|
||
stream.reader = digestingReader
|
||
|
||
// === Update progress bars
|
||
stream.reader = bar.ProxyReader(stream.reader)
|
||
|
||
// === Decrypt the stream, if required.
|
||
decryptionStep, err := ic.blobPipelineDecryptionStep(&stream, srcInfo)
|
||
if err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
|
||
// === Detect compression of the input stream.
|
||
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression.
|
||
detectedCompression, err := blobPipelineDetectCompressionStep(&stream, srcInfo)
|
||
if err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
|
||
// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
|
||
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
|
||
if getOriginalLayerCopyWriter != nil {
|
||
stream.reader = io.TeeReader(stream.reader, getOriginalLayerCopyWriter(detectedCompression.decompressor))
|
||
originalLayerReader = stream.reader
|
||
}
|
||
|
||
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
|
||
// short-circuit conditions
|
||
canModifyBlob := !isConfig && ic.cannotModifyManifestReason == ""
|
||
// === Deal with layer compression/decompression if necessary
|
||
compressionStep, err := ic.blobPipelineCompressionStep(&stream, canModifyBlob, srcInfo, detectedCompression)
|
||
if err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
defer compressionStep.close()
|
||
|
||
// === Encrypt the stream for valid mediatypes if ociEncryptConfig provided
|
||
if decryptionStep.decrypting && toEncrypt {
|
||
// If nothing else, we can only set uploadedInfo.CryptoOperation to a single value.
|
||
// Before relaxing this, see the original pull request’s review if there are other reasons to reject this.
|
||
return types.BlobInfo{}, errors.New("Unable to support both decryption and encryption in the same copy")
|
||
}
|
||
encryptionStep, err := ic.blobPipelineEncryptionStep(&stream, toEncrypt, srcInfo, decryptionStep)
|
||
if err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
|
||
// === Report progress using the ic.c.options.Progress channel, if required.
|
||
if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 {
|
||
progressReader := newProgressReader(
|
||
stream.reader,
|
||
ic.c.options.Progress,
|
||
ic.c.options.ProgressInterval,
|
||
srcInfo,
|
||
)
|
||
defer progressReader.reportDone()
|
||
stream.reader = progressReader
|
||
}
|
||
|
||
// === Finally, send the layer stream to dest.
|
||
options := private.PutBlobOptions{
|
||
Cache: ic.c.blobInfoCache,
|
||
IsConfig: isConfig,
|
||
EmptyLayer: emptyLayer,
|
||
}
|
||
if !isConfig {
|
||
options.LayerIndex = &layerIndex
|
||
}
|
||
destBlob, err := ic.c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options)
|
||
if err != nil {
|
||
return types.BlobInfo{}, fmt.Errorf("writing blob: %w", err)
|
||
}
|
||
uploadedInfo := updatedBlobInfoFromUpload(stream.info, destBlob)
|
||
|
||
compressionStep.updateCompressionEdits(&uploadedInfo.CompressionOperation, &uploadedInfo.CompressionAlgorithm, &uploadedInfo.Annotations)
|
||
decryptionStep.updateCryptoOperation(&uploadedInfo.CryptoOperation)
|
||
if err := encryptionStep.updateCryptoOperationAndAnnotations(&uploadedInfo.CryptoOperation, &uploadedInfo.Annotations); err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
|
||
// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume
|
||
// all of the input (to compute DiffIDs), even if dest.PutBlob does not need it.
|
||
// So, read everything from originalLayerReader, which will cause the rest to be
|
||
// sent there if we are not already at EOF.
|
||
if getOriginalLayerCopyWriter != nil {
|
||
logrus.Debugf("Consuming rest of the original blob to satisfy getOriginalLayerCopyWriter")
|
||
_, err := io.Copy(io.Discard, originalLayerReader)
|
||
if err != nil {
|
||
return types.BlobInfo{}, fmt.Errorf("reading input blob %s: %w", srcInfo.Digest, err)
|
||
}
|
||
}
|
||
|
||
if digestingReader.validationFailed { // Coverage: This should never happen.
|
||
return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest)
|
||
}
|
||
if stream.info.Digest != "" && uploadedInfo.Digest != stream.info.Digest {
|
||
return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, stream.info.Digest, uploadedInfo.Digest)
|
||
}
|
||
if digestingReader.validationSucceeded {
|
||
if err := compressionStep.recordValidatedDigestData(ic.c, uploadedInfo, srcInfo, encryptionStep, decryptionStep); err != nil {
|
||
return types.BlobInfo{}, err
|
||
}
|
||
}
|
||
|
||
return uploadedInfo, nil
|
||
}
|
||
|
||
// sourceStream encapsulates an input consumed by copyBlobFromStream, in progress of being built.
|
||
// This allows handles of individual aspects to build the copy pipeline without _too much_
|
||
// specific cooperation by the caller.
|
||
//
|
||
// We are currently very far from a generalized plug-and-play API for building/consuming the pipeline
|
||
// without specific knowledge of various aspects in copyBlobFromStream; that may come one day.
|
||
type sourceStream struct {
|
||
reader io.Reader
|
||
info types.BlobInfo // corresponding to the data available in reader.
|
||
}
|
||
|
||
// errorAnnotationReader wraps the io.Reader handed to PutBlob so that failures which happen
// while reading are marked as such. Those failures come back as PutBlob errors, and without
// the marker they would misleadingly be blamed on the copy destination.
type errorAnnotationReader struct {
	reader io.Reader
}

// Read forwards to the wrapped reader. A nil error and io.EOF are returned unchanged;
// any other error is wrapped to say it happened during the read. The byte count is
// always passed through as reported by the wrapped reader.
func (r errorAnnotationReader) Read(b []byte) (n int, err error) {
	n, err = r.reader.Read(b)
	switch err {
	case nil, io.EOF:
		// Success and regular end of stream need no annotation.
		return n, err
	default:
		return n, fmt.Errorf("happened during read: %w", err)
	}
}
|
||
|
||
// updatedBlobInfoFromUpload returns inputInfo updated with uploadedBlob which was created based on inputInfo.
|
||
func updatedBlobInfoFromUpload(inputInfo types.BlobInfo, uploadedBlob private.UploadedBlob) types.BlobInfo {
|
||
// The transport is only tasked with dealing with the raw blob, and possibly computing Digest/Size.
|
||
// Handling of compression, encryption, and the related MIME types and the like are all the responsibility
|
||
// of the generic code in this package.
|
||
return types.BlobInfo{
|
||
Digest: uploadedBlob.Digest,
|
||
Size: uploadedBlob.Size,
|
||
URLs: nil, // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior.
|
||
Annotations: inputInfo.Annotations,
|
||
MediaType: inputInfo.MediaType, // Mostly irrelevant, MediaType is updated based on Compression/Crypto.
|
||
CompressionOperation: inputInfo.CompressionOperation, // Expected to be unset, and only updated by copyBlobFromStream.
|
||
CompressionAlgorithm: inputInfo.CompressionAlgorithm, // Expected to be unset, and only updated by copyBlobFromStream.
|
||
CryptoOperation: inputInfo.CryptoOperation, // Expected to be unset, and only updated by copyBlobFromStream.
|
||
}
|
||
}
|