1
0
mirror of https://github.com/regclient/regclient.git synced 2025-04-17 11:37:11 +03:00
regclient/blob.go
Brandon Mitchell 1eb1ea4b34
Feat: Refactor logging to use log/slog
This updates the regclient Go library.
Existing users of logrus will continue to work using a logrus handler to slog.
Updates to the various commands will be made in a future commit.

Signed-off-by: Brandon Mitchell <git@bmitch.net>
2024-11-10 17:14:57 -05:00

265 lines
8.8 KiB
Go

package regclient
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"time"
"github.com/regclient/regclient/internal/pqueue"
"github.com/regclient/regclient/internal/reqmeta"
"github.com/regclient/regclient/scheme"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/descriptor"
"github.com/regclient/regclient/types/errs"
"github.com/regclient/regclient/types/ref"
"github.com/regclient/regclient/types/warning"
)
const blobCBFreq = time.Millisecond * 100
type blobOpt struct {
callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)
}
// BlobOpts define options for the Image* commands.
type BlobOpts func(*blobOpt)
// BlobWithCallback provides progress data to a callback function.
func BlobWithCallback(callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)) BlobOpts {
return func(opts *blobOpt) {
opts.callback = callback
}
}
// BlobCopy copies a blob between two locations.
// If the blob already exists in the target, the copy is skipped.
// A server side cross repository blob mount is attempted.
func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d descriptor.Descriptor, opts ...BlobOpts) error {
if !refSrc.IsSetRepo() {
return fmt.Errorf("refSrc is not set: %s%.0w", refSrc.CommonName(), errs.ErrInvalidReference)
}
if !refTgt.IsSetRepo() {
return fmt.Errorf("refTgt is not set: %s%.0w", refTgt.CommonName(), errs.ErrInvalidReference)
}
var opt blobOpt
for _, optFn := range opts {
optFn(&opt)
}
// dedup warnings
if w := warning.FromContext(ctx); w == nil {
ctx = warning.NewContext(ctx, &warning.Warning{Hook: warning.DefaultHook()})
}
tDesc := d
tDesc.URLs = []string{} // ignore URLs when pushing to target
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
}
// for the same repository, there's nothing to copy
if ref.EqualRepository(refSrc, refTgt) {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.slog.Debug("Blob copy skipped, same repo",
slog.String("src", refSrc.Reference),
slog.String("tgt", refTgt.Reference),
slog.String("digest", string(d.Digest)))
return nil
}
// check if layer already exists
if _, err := rc.BlobHead(ctx, refTgt, tDesc); err == nil {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.slog.Debug("Blob copy skipped, already exists",
slog.String("src", refSrc.Reference),
slog.String("tgt", refTgt.Reference),
slog.String("digest", string(d.Digest)))
return nil
}
// acquire throttle for both src and tgt to avoid deadlocks
tList := []*pqueue.Queue[reqmeta.Data]{}
schemeSrcAPI, err := rc.schemeGet(refSrc.Scheme)
if err != nil {
return err
}
schemeTgtAPI, err := rc.schemeGet(refTgt.Scheme)
if err != nil {
return err
}
if tSrc, ok := schemeSrcAPI.(scheme.Throttler); ok {
tList = append(tList, tSrc.Throttle(refSrc, false)...)
}
if tTgt, ok := schemeTgtAPI.(scheme.Throttler); ok {
tList = append(tList, tTgt.Throttle(refTgt, true)...)
}
if len(tList) > 0 {
ctxMulti, done, err := pqueue.AcquireMulti[reqmeta.Data](ctx, reqmeta.Data{Kind: reqmeta.Blob, Size: d.Size}, tList...)
if err != nil {
return err
}
if done != nil {
defer done()
}
ctx = ctxMulti
}
// try mounting blob from the source repo is the registry is the same
if ref.EqualRegistry(refSrc, refTgt) {
err := rc.BlobMount(ctx, refSrc, refTgt, d)
if err == nil {
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.slog.Debug("Blob copy performed server side with registry mount",
slog.String("src", refSrc.Reference),
slog.String("tgt", refTgt.Reference),
slog.String("digest", string(d.Digest)))
return nil
}
rc.slog.Warn("Failed to mount blob",
slog.String("src", refSrc.Reference),
slog.String("tgt", refTgt.Reference),
slog.String("err", err.Error()))
}
// fast options failed, download layer from source and push to target
blobIO, err := rc.BlobGet(ctx, refSrc, d)
if err != nil {
if !errors.Is(err, context.Canceled) {
rc.slog.Warn("Failed to retrieve blob",
slog.String("src", refSrc.Reference),
slog.String("digest", string(d.Digest)),
slog.String("err", err.Error()))
}
return err
}
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
ticker := time.NewTicker(blobCBFreq)
done := make(chan bool)
defer func() {
close(done)
ticker.Stop()
if ctx.Err() == nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackFinished, d.Size, d.Size)
}
}()
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
offset, err := blobIO.Seek(0, io.SeekCurrent)
if err == nil && offset > 0 {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackActive, offset, d.Size)
}
}
}
}()
}
defer blobIO.Close()
if _, err := rc.BlobPut(ctx, refTgt, blobIO.GetDescriptor(), blobIO); err != nil {
if !errors.Is(err, context.Canceled) {
rc.slog.Warn("Failed to push blob",
slog.String("src", refSrc.Reference),
slog.String("tgt", refTgt.Reference),
slog.String("err", err.Error()))
}
return err
}
return nil
}
// BlobDelete removes a blob from the registry.
// This method should only be used to repair a damaged registry.
// Typically a server side garbage collection should be used to purge unused blobs.
func (rc *RegClient) BlobDelete(ctx context.Context, r ref.Ref, d descriptor.Descriptor) error {
if !r.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return err
}
return schemeAPI.BlobDelete(ctx, r, d)
}
// BlobGet retrieves a blob, returning a reader.
// This reader must be closed to free up resources that limit concurrent pulls.
func (rc *RegClient) BlobGet(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.Reader, error) {
data, err := d.GetData()
if err == nil {
return blob.NewReader(blob.WithDesc(d), blob.WithRef(r), blob.WithReader(bytes.NewReader(data))), nil
}
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return nil, err
}
return schemeAPI.BlobGet(ctx, r, d)
}
// BlobGetOCIConfig retrieves an OCI config from a blob, automatically extracting the JSON.
func (rc *RegClient) BlobGetOCIConfig(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.OCIConfig, error) {
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
b, err := rc.BlobGet(ctx, r, d)
if err != nil {
return nil, err
}
return b.ToOCIConfig()
}
// BlobHead is used to verify if a blob exists and is accessible.
func (rc *RegClient) BlobHead(ctx context.Context, r ref.Ref, d descriptor.Descriptor) (blob.Reader, error) {
if !r.IsSetRepo() {
return nil, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return nil, err
}
return schemeAPI.BlobHead(ctx, r, d)
}
// BlobMount attempts to perform a server side copy/mount of the blob between repositories.
func (rc *RegClient) BlobMount(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d descriptor.Descriptor) error {
if !refSrc.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", refSrc.CommonName(), errs.ErrInvalidReference)
}
if !refTgt.IsSetRepo() {
return fmt.Errorf("ref is not set: %s%.0w", refTgt.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(refSrc.Scheme)
if err != nil {
return err
}
return schemeAPI.BlobMount(ctx, refSrc, refTgt, d)
}
// BlobPut uploads a blob to a repository.
// Descriptor is optional, leave size and digest to zero value if unknown.
// Reader must also be an [io.Seeker] to support chunked upload fallback.
//
// This will attempt an anonymous blob mount first which some registries may support.
// It will then try doing a full put of the blob without chunking (most widely supported).
// If the full put fails, it will fall back to a chunked upload (useful for flaky networks).
func (rc *RegClient) BlobPut(ctx context.Context, r ref.Ref, d descriptor.Descriptor, rdr io.Reader) (descriptor.Descriptor, error) {
if !r.IsSetRepo() {
return descriptor.Descriptor{}, fmt.Errorf("ref is not set: %s%.0w", r.CommonName(), errs.ErrInvalidReference)
}
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return descriptor.Descriptor{}, err
}
return schemeAPI.BlobPut(ctx, r, d, rdr)
}