1
0
mirror of https://github.com/regclient/regclient.git synced 2025-04-18 22:44:00 +03:00
regclient/blob.go
Brandon Mitchell eea06e2a5c
Refactoring the type package
I feel like I need to explain, this is all to move the descriptor package.
The platform package could not use the predefined errors in types because of a circular dependency from descriptor.
The most appropriate way to reorg this is to move descriptor out of the type package since it was more complex than a self contained type.
When doing that, type aliases were needed to avoid breaking changes to existing users.
Those aliases themselves caused circular dependency loops because of the media types and errors, so those were also pulled out to separate packages.
All of the old values were aliased and deprecated, and to fix the linter, those deprecations were fixed by updating the imports... everywhere.

Signed-off-by: Brandon Mitchell <git@bmitch.net>
2024-03-04 15:43:18 -05:00

262 lines
8.4 KiB
Go

package regclient
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
"github.com/sirupsen/logrus"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/scheme"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/descriptor"
"github.com/regclient/regclient/types/errs"
"github.com/regclient/regclient/types/ref"
)
const blobCBFreq = time.Millisecond * 100
type blobOpt struct {
callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)
}
// BlobOpts define options for the Image* commands.
type BlobOpts func(*blobOpt)
// BlobWithCallback provides progress data to a callback function.
func BlobWithCallback(callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)) BlobOpts {
return func(opts *blobOpt) {
opts.callback = callback
}
}
// BlobCopy copies a blob between two locations.
// If the blob already exists in the target, the copy is skipped.
// A server side cross repository blob mount is attempted.
func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d descriptor.Descriptor, opts ...BlobOpts) error {
if !refSrc.IsSetRepo() {
return fmt.Errorf("refSrc is not set: %s%.0w", refSrc.CommonName(), errs.ErrInvalidReference)
}
if !refTgt.IsSetRepo() {
return fmt.Errorf("refTgt is not set: %s%.0w", refTgt.CommonName(), errs.ErrInvalidReference)
}
var opt blobOpt
for _, optFn := range opts {
optFn(&opt)
}
tDesc := d
tDesc.URLs = []string{} // ignore URLs when pushing to target
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
}
// for the same repository, there's nothing to copy
if ref.EqualRepository(refSrc, refTgt) {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"src": refTgt.Reference,
"tgt": refTgt.Reference,
"digest": d.Digest,
}).Debug("Blob copy skipped, same repo")
return nil
}
// check if layer already exists
if _, err := rc.BlobHead(ctx, refTgt, tDesc); err == nil {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"tgt": refTgt.Reference,
"digest": d,
}).Debug("Blob copy skipped, already exists")
return nil
}
// acquire throttle for both src and tgt to avoid deadlocks
tList := []*throttle.Throttle{}
schemeSrcAPI, err := rc.schemeGet(refSrc.Scheme)
if err != nil {
return err
}
schemeTgtAPI, err := rc.schemeGet(refTgt.Scheme)
if err != nil {
return err
}
if tSrc, ok := schemeSrcAPI.(scheme.Throttler); ok {
tList = append(tList, tSrc.Throttle(refSrc, false)...)
}
if tTgt, ok := schemeTgtAPI.(scheme.Throttler); ok {
tList = append(tList, tTgt.Throttle(refTgt, true)...)
}
if len(tList) > 0 {
ctx, err = throttle.AcquireMulti(ctx, tList)
if err != nil {
return err
}
defer throttle.ReleaseMulti(ctx, tList)
}
// try mounting blob from the source repo is the registry is the same
if ref.EqualRegistry(refSrc, refTgt) {
err := rc.BlobMount(ctx, refSrc, refTgt, d)
if err == nil {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"src": refTgt.Reference,
"tgt": refTgt.Reference,
"digest": d.Digest,
}).Debug("Blob copy performed server side with registry mount")
return nil
}
rc.log.WithFields(logrus.Fields{
"err": err,
"src": refSrc.Reference,
"tgt": refTgt.Reference,
}).Warn("Failed to mount blob")
}
// fast options failed, download layer from source and push to target
blobIO, err := rc.BlobGet(ctx, refSrc, d)
if err != nil {
if !errors.Is(err, context.Canceled) {
rc.log.WithFields(logrus.Fields{
"err": err,
"src": refSrc.Reference,
"digest": d.Digest,
}).Warn("Failed to retrieve blob")
}
return err
}
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
ticker := time.NewTicker(blobCBFreq)
done := make(chan bool)
defer func() {
close(done)
ticker.Stop()
if ctx.Err() == nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackFinished, d.Size, d.Size)
}
}()
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
offset, err := blobIO.Seek(0, io.SeekCurrent)
if err == nil && offset > 0 {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackActive, offset, d.Size)
}
}
}
}()
}
defer blobIO.Close()
if _, err := rc.BlobPut(ctx, refTgt, blobIO.GetDescriptor(), blobIO); err != nil {
if !errors.Is(err, context.Canceled) {
rc.log.WithFields(logrus.Fields{
"err": err,
"src": refSrc.Reference,
"tgt": refTgt.Reference,
}).Warn("Failed to push blob")
}
return err
}
return nil
}
// BlobDelete removes a blob from the registry.
// This method should only be used to repair a damaged registry.
// Typically a server side garbage collection should be used to purge unused blobs.
func (rc *RegClient) BlobDelete(ctx context.Context, r ref.Ref, d descriptor.Descriptor) error {
if !r.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return err
}
return schemeAPI.BlobDelete(ctx, r, d)
}
// BlobGet retrieves a blob, returning a reader.
// This reader must be closed to free up resources that limit concurrent pulls.
func (rc *RegClient) BlobGet(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.Reader, error) {
data, err := d.GetData()
if err == nil {
return blob.NewReader(blob.WithDesc(d), blob.WithRef(r), blob.WithReader(bytes.NewReader(data))), nil
}
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return nil, err
}
return schemeAPI.BlobGet(ctx, r, d)
}
// BlobGetOCIConfig retrieves an OCI config from a blob, automatically extracting the JSON.
func (rc *RegClient) BlobGetOCIConfig(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.OCIConfig, error) {
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
b, err := rc.BlobGet(ctx, r, d)
if err != nil {
return nil, err
}
return b.ToOCIConfig()
}
// BlobHead is used to verify if a blob exists and is accessible.
func (rc *RegClient) BlobHead(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.Reader, error) {
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return nil, err
}
return schemeAPI.BlobHead(ctx, r, d)
}
// BlobMount attempts to perform a server side copy/mount of the blob between repositories.
func (rc *RegClient) BlobMount(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d descriptor.Descriptor) error {
if !refSrc.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", refSrc.CommonName(), errs.ErrInvalidReference)
}
if !refTgt.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", refTgt.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(refSrc.Scheme)
if err != nil {
return err
}
return schemeAPI.BlobMount(ctx, refSrc, refTgt, d)
}
// BlobPut uploads a blob to a repository.
// Descriptor is optional, leave size and digest to zero value if unknown.
// Reader must also be an [io.Seeker] to support chunked upload fallback.
//
// This will attempt an anonymous blob mount first which some registries may support.
// It will then try doing a full put of the blob without chunking (most widely supported).
// If the full put fails, it will fall back to a chunked upload (useful for flaky networks).
func (rc *RegClient) BlobPut(ctx context.Context, r ref.Ref, d descriptor.Descriptor, rdr io.Reader) (descriptor.Descriptor, error) {
if !r.IsSetRepo() {
return descriptor.Descriptor{}, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return descriptor.Descriptor{}, err
}
return schemeAPI.BlobPut(ctx, r, d, rdr)
}