1
0
mirror of https://github.com/regclient/regclient.git synced 2025-04-18 22:44:00 +03:00

Add concurrency to image copy

Signed-off-by: Brandon Mitchell <git@bmitch.net>
This commit is contained in:
Brandon Mitchell 2023-04-17 19:27:14 -04:00
parent 499011b958
commit d2ca5fa5eb
No known key found for this signature in database
GPG Key ID: 6E0FF28C767A8BEE
32 changed files with 1780 additions and 449 deletions

View File

@ -50,7 +50,7 @@ vet: ## go vet
go vet ./...
test: ## go test
go test -cover ./...
go test -cover -race ./...
lint: lint-go lint-md ## Run all linting

45
blob.go
View File

@ -6,6 +6,8 @@ import (
"io"
"time"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/scheme"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/ref"
@ -15,14 +17,14 @@ import (
const blobCBFreq = time.Millisecond * 100
type blobOpt struct {
callback func(kind, instance, state string, cur, total int64)
callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)
}
// BlobOpts define options for the Image* commands
type BlobOpts func(*blobOpt)
// BlobWithCallback provides progress data to a callback function
func BlobWithCallback(callback func(kind, instance, state string, cur, total int64)) BlobOpts {
func BlobWithCallback(callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)) BlobOpts {
return func(opts *blobOpt) {
opts.callback = callback
}
@ -38,10 +40,13 @@ func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Re
}
tDesc := d
tDesc.URLs = []string{} // ignore URLs when pushing to target
if opt.callback != nil {
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
}
// for the same repository, there's nothing to copy
if ref.EqualRepository(refSrc, refTgt) {
if opt.callback != nil {
opt.callback("blob", d.Digest.String(), "skipped", 0, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"src": refTgt.Reference,
@ -53,7 +58,7 @@ func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Re
// check if layer already exists
if _, err := rc.BlobHead(ctx, refTgt, tDesc); err == nil {
if opt.callback != nil {
opt.callback("blob", d.Digest.String(), "skipped", 0, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"tgt": refTgt.Reference,
@ -61,12 +66,36 @@ func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Re
}).Debug("Blob copy skipped, already exists")
return nil
}
// acquire throttle for both src and tgt to avoid deadlocks
tList := []*throttle.Throttle{}
schemeSrcAPI, err := rc.schemeGet(refSrc.Scheme)
if err != nil {
return err
}
schemeTgtAPI, err := rc.schemeGet(refTgt.Scheme)
if err != nil {
return err
}
if tSrc, ok := schemeSrcAPI.(scheme.Throttler); ok {
tList = append(tList, tSrc.Throttle(refSrc, false)...)
}
if tTgt, ok := schemeTgtAPI.(scheme.Throttler); ok {
tList = append(tList, tTgt.Throttle(refTgt, true)...)
}
if len(tList) > 0 {
ctx, err = throttle.AcquireMulti(ctx, tList)
if err != nil {
return err
}
defer throttle.ReleaseMulti(ctx, tList)
}
// try mounting blob from the source repo is the registry is the same
if ref.EqualRegistry(refSrc, refTgt) {
err := rc.BlobMount(ctx, refSrc, refTgt, d)
if err == nil {
if opt.callback != nil {
opt.callback("blob", d.Digest.String(), "skipped", 0, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackSkipped, 0, d.Size)
}
rc.log.WithFields(logrus.Fields{
"src": refTgt.Reference,
@ -92,14 +121,14 @@ func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Re
return err
}
if opt.callback != nil {
opt.callback("blob", d.Digest.String(), "started", 0, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackStarted, 0, d.Size)
ticker := time.NewTicker(blobCBFreq)
done := make(chan bool)
defer func() {
close(done)
ticker.Stop()
if ctx.Err() == nil {
opt.callback("blob", d.Digest.String(), "finished", d.Size, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackFinished, d.Size, d.Size)
}
}()
go func() {
@ -110,7 +139,7 @@ func (rc *RegClient) BlobCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Re
case <-ticker.C:
offset, err := blobIO.Seek(0, io.SeekCurrent)
if err == nil && offset > 0 {
opt.callback("blob", d.Digest.String(), "active", offset, d.Size)
opt.callback(types.CallbackBlob, d.Digest.String(), types.CallbackActive, offset, d.Size)
}
}
}

View File

@ -158,9 +158,10 @@ func TestBlobGet(t *testing.T) {
tsHost := tsURL.Host
rcHosts := []config.Host{
{
Name: tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
Name: tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
ReqPerSec: 100,
},
}
log := &logrus.Logger{
@ -654,6 +655,7 @@ func TestBlobPut(t *testing.T) {
TLS: config.TLSDisabled,
BlobChunk: int64(blobChunk),
BlobMax: int64(-1),
ReqPerSec: 100,
},
}
log := &logrus.Logger{
@ -1111,6 +1113,7 @@ func TestBlobCopy(t *testing.T) {
TLS: config.TLSDisabled,
BlobChunk: int64(blobChunk),
BlobMax: int64(-1),
ReqPerSec: 100,
},
}
log := &logrus.Logger{

View File

@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"
"github.com/regclient/regclient/internal/godbg"
"github.com/sirupsen/logrus"
)
@ -20,6 +21,7 @@ func main() {
// clean shutdown
cancel()
}()
godbg.SignalTrace()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)

View File

@ -683,20 +683,21 @@ type imageProgress struct {
}
type imageProgressEntry struct {
kind, instance string
state string
start, last time.Time
cur, total int64
bps []float64
kind types.CallbackKind
instance string
state types.CallbackState
start, last time.Time
cur, total int64
bps []float64
}
func (ip *imageProgress) callback(kind, instance, state string, cur, total int64) {
func (ip *imageProgress) callback(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64) {
// track kind/instance
ip.mu.Lock()
defer ip.mu.Unlock()
ip.changed = true
now := time.Now()
if e, ok := ip.entries[kind+":"+instance]; ok {
if e, ok := ip.entries[kind.String()+":"+instance]; ok {
e.state = state
diff := now.Sub(e.last)
bps := float64(cur-e.cur) / diff.Seconds()
@ -710,7 +711,7 @@ func (ip *imageProgress) callback(kind, instance, state string, cur, total int64
e.bps = append(e.bps, bps)
}
} else {
ip.entries[kind+":"+instance] = &imageProgressEntry{
ip.entries[kind.String()+":"+instance] = &imageProgressEntry{
kind: kind,
instance: instance,
state: state,
@ -730,36 +731,45 @@ func (ip *imageProgress) display(w io.Writer, final bool) {
return // skip since no changes since last display and not the final display
}
now := time.Now()
var manifests, sum, skipped, queued int64
var manifestTotal, manifestFinished, sum, skipped, queued int64
// sort entry keys by start time
keys := make([]string, 0, len(ip.entries))
for k := range ip.entries {
keys = append(keys, k)
}
sort.Slice(keys, func(a, b int) bool {
return ip.entries[keys[a]].start.Before(ip.entries[keys[b]].start)
if ip.entries[keys[a]].state != ip.entries[keys[b]].state {
return ip.entries[keys[a]].state > ip.entries[keys[b]].state
} else if ip.entries[keys[a]].state != types.CallbackActive {
return ip.entries[keys[a]].last.Before(ip.entries[keys[b]].last)
} else {
return ip.entries[keys[a]].cur > ip.entries[keys[b]].cur
}
})
startCount, startLimit := 0, 2
finishedCount, finishedLimit := 0, 2
// hide old finished entries
for i := len(keys) - 1; i >= 0; i-- {
e := ip.entries[keys[i]]
if e.kind != "manifest" && e.state == "finished" {
if e.kind != types.CallbackManifest && e.state == types.CallbackFinished {
finishedCount++
if finishedCount > finishedLimit {
e.state = "archived"
e.state = types.CallbackArchived
}
}
}
for _, k := range keys {
e := ip.entries[k]
switch e.kind {
case "manifest":
manifests++
case types.CallbackManifest:
manifestTotal++
if e.state == types.CallbackFinished || e.state == types.CallbackSkipped {
manifestFinished++
}
default:
// show progress bars
if e.state == "active" || (e.state == "started" && startCount < startLimit) || e.state == "finished" {
if e.state == "started" {
if !final && (e.state == types.CallbackActive || (e.state == types.CallbackStarted && startCount < startLimit) || e.state == types.CallbackFinished) {
if e.state == types.CallbackStarted {
startCount++
}
pre := e.instance + " "
@ -771,16 +781,17 @@ func (ip *imageProgress) display(w io.Writer, final bool) {
ip.asciOut.Add(ip.bar.Generate(pct, pre, post))
}
// track stats
if e.cur > 0 {
if e.state == types.CallbackSkipped {
skipped += e.total
} else if e.total > 0 {
sum += e.cur
queued += e.total - e.cur
} else if e.state == "skipped" {
skipped += e.total
}
}
}
// show stats summary
ip.asciOut.Add([]byte(fmt.Sprintf("%d manifests, %s/s, %s copied, %s skipped", manifests,
ip.asciOut.Add([]byte(fmt.Sprintf("%d/%d manifests, %s/s, %s copied, %s skipped",
manifestFinished, manifestTotal,
units.HumanSize(float64(sum)/now.Sub(ip.start).Seconds()),
units.HumanSize(float64(sum)),
units.HumanSize(float64(skipped)))))
@ -915,6 +926,12 @@ func runImageGetFile(cmd *cobra.Command, args []string) error {
th, rdr, err := btr.ReadFile(filename)
if err != nil {
if errors.Is(err, types.ErrFileNotFound) {
if err := btr.Close(); err != nil {
return err
}
if err := blob.Close(); err != nil {
return err
}
continue
}
return fmt.Errorf("failed pulling from layer %d: %w", i, err)
@ -943,6 +960,12 @@ func runImageGetFile(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
if err := btr.Close(); err != nil {
return err
}
if err := blob.Close(); err != nil {
return err
}
return nil
}
// all layers exhausted, not found or deleted

View File

@ -8,6 +8,7 @@ import (
"strings"
"syscall"
"github.com/regclient/regclient/internal/godbg"
"github.com/sirupsen/logrus"
)
@ -21,6 +22,7 @@ func main() {
// clean shutdown
cancel()
}()
godbg.SignalTrace()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)

View File

@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"
"github.com/regclient/regclient/internal/godbg"
"github.com/sirupsen/logrus"
)
@ -20,6 +21,7 @@ func main() {
// clean shutdown
cancel()
}()
godbg.SignalTrace()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)

View File

@ -6,8 +6,10 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/internal/timejson"
"github.com/sirupsen/logrus"
)
@ -40,6 +42,9 @@ const (
var (
defaultExpire = time.Hour * 1
defaultCredHelperRetry = time.Second * 5
defaultConcurrent = 3
defaultReqPerSec = 10
mu = sync.Mutex{}
)
// MarshalJSON converts to a json string using MarshalText
@ -95,31 +100,32 @@ func (t *TLSConf) UnmarshalText(b []byte) error {
// Host struct contains host specific settings
type Host struct {
Name string `json:"-" yaml:"registry,omitempty"` // name of the host, read from yaml, not written in json
Scheme string `json:"scheme,omitempty" yaml:"scheme"` // TODO: deprecate, delete
TLS TLSConf `json:"tls,omitempty" yaml:"tls"` // enabled, disabled, insecure
RegCert string `json:"regcert,omitempty" yaml:"regcert"` // public pem cert of registry
ClientCert string `json:"clientcert,omitempty" yaml:"clientcert"` // public pem cert for client (mTLS)
ClientKey string `json:"clientkey,omitempty" yaml:"clientkey"` // private pem cert for client (mTLS)
DNS []string `json:"dns,omitempty" yaml:"dns"` // TODO: remove slice, single string, or remove entirely?
Hostname string `json:"hostname,omitempty" yaml:"hostname"` // replaces DNS array with single string
User string `json:"user,omitempty" yaml:"user"` // username, not used with credHelper
Pass string `json:"pass,omitempty" yaml:"pass"` // password, not used with credHelper
Token string `json:"token,omitempty" yaml:"token"` // token, experimental for specific APIs
CredHelper string `json:"credHelper,omitempty" yaml:"credHelper"` // credential helper command for requesting logins
CredExpire timejson.Duration `json:"credExpire,omitempty" yaml:"credExpire"` // time until credential expires
CredHost string `json:"credHost" yaml:"credHost"` // used when a helper hostname doesn't match Hostname
credRefresh time.Time `json:"-" yaml:"-"` // internal use, when to refresh credentials
PathPrefix string `json:"pathPrefix,omitempty" yaml:"pathPrefix"` // used for mirrors defined within a repository namespace
Mirrors []string `json:"mirrors,omitempty" yaml:"mirrors"` // list of other Host Names to use as mirrors
Priority uint `json:"priority,omitempty" yaml:"priority"` // priority when sorting mirrors, higher priority attempted first
RepoAuth bool `json:"repoAuth,omitempty" yaml:"repoAuth"` // tracks a separate auth per repo
API string `json:"api,omitempty" yaml:"api"` // experimental: registry API to use
APIOpts map[string]string `json:"apiOpts,omitempty" yaml:"apiOpts"` // options for APIs
BlobChunk int64 `json:"blobChunk,omitempty" yaml:"blobChunk"` // size of each blob chunk
BlobMax int64 `json:"blobMax,omitempty" yaml:"blobMax"` // threshold to switch to chunked upload, -1 to disable, 0 for regclient.blobMaxPut
ReqPerSec float64 `json:"reqPerSec,omitempty" yaml:"reqPerSec"` // requests per second
ReqConcurrent int64 `json:"reqConcurrent,omitempty" yaml:"reqConcurrent"` // concurrent requests
Name string `json:"-" yaml:"registry,omitempty"` // name of the host, read from yaml, not written in json
Scheme string `json:"scheme,omitempty" yaml:"scheme"` // TODO: deprecate, delete
TLS TLSConf `json:"tls,omitempty" yaml:"tls"` // enabled, disabled, insecure
RegCert string `json:"regcert,omitempty" yaml:"regcert"` // public pem cert of registry
ClientCert string `json:"clientcert,omitempty" yaml:"clientcert"` // public pem cert for client (mTLS)
ClientKey string `json:"clientkey,omitempty" yaml:"clientkey"` // private pem cert for client (mTLS)
DNS []string `json:"dns,omitempty" yaml:"dns"` // TODO: remove slice, single string, or remove entirely?
Hostname string `json:"hostname,omitempty" yaml:"hostname"` // replaces DNS array with single string
User string `json:"user,omitempty" yaml:"user"` // username, not used with credHelper
Pass string `json:"pass,omitempty" yaml:"pass"` // password, not used with credHelper
Token string `json:"token,omitempty" yaml:"token"` // token, experimental for specific APIs
CredHelper string `json:"credHelper,omitempty" yaml:"credHelper"` // credential helper command for requesting logins
CredExpire timejson.Duration `json:"credExpire,omitempty" yaml:"credExpire"` // time until credential expires
CredHost string `json:"credHost" yaml:"credHost"` // used when a helper hostname doesn't match Hostname
credRefresh time.Time `json:"-" yaml:"-"` // internal use, when to refresh credentials
PathPrefix string `json:"pathPrefix,omitempty" yaml:"pathPrefix"` // used for mirrors defined within a repository namespace
Mirrors []string `json:"mirrors,omitempty" yaml:"mirrors"` // list of other Host Names to use as mirrors
Priority uint `json:"priority,omitempty" yaml:"priority"` // priority when sorting mirrors, higher priority attempted first
RepoAuth bool `json:"repoAuth,omitempty" yaml:"repoAuth"` // tracks a separate auth per repo
API string `json:"api,omitempty" yaml:"api"` // experimental: registry API to use
APIOpts map[string]string `json:"apiOpts,omitempty" yaml:"apiOpts"` // options for APIs
BlobChunk int64 `json:"blobChunk,omitempty" yaml:"blobChunk"` // size of each blob chunk
BlobMax int64 `json:"blobMax,omitempty" yaml:"blobMax"` // threshold to switch to chunked upload, -1 to disable, 0 for regclient.blobMaxPut
ReqPerSec float64 `json:"reqPerSec,omitempty" yaml:"reqPerSec"` // requests per second
ReqConcurrent int64 `json:"reqConcurrent,omitempty" yaml:"reqConcurrent"` // concurrent requests
throttle *throttle.Throttle // limit for concurrent requests
}
type Cred struct {
@ -129,8 +135,10 @@ type Cred struct {
// HostNew creates a default Host entry
func HostNew() *Host {
h := Host{
TLS: TLSEnabled,
APIOpts: map[string]string{},
TLS: TLSEnabled,
APIOpts: map[string]string{},
ReqConcurrent: int64(defaultConcurrent),
ReqPerSec: float64(defaultReqPerSec),
}
return &h
}
@ -442,6 +450,14 @@ func (host *Host) Merge(newHost Host, log *logrus.Logger) error {
if newHost.ReqConcurrent > 0 {
if host.ReqConcurrent != 0 && host.ReqConcurrent != newHost.ReqConcurrent {
if host.throttle != nil {
log.WithFields(logrus.Fields{
"orig": host.ReqConcurrent,
"new": newHost.ReqConcurrent,
"host": name,
}).Warn("Unable to change ReqConcurrent after throttle is created")
return fmt.Errorf("unable to change ReqConcurrent after throttle is created")
}
log.WithFields(logrus.Fields{
"orig": host.ReqConcurrent,
"new": newHost.ReqConcurrent,
@ -454,6 +470,20 @@ func (host *Host) Merge(newHost Host, log *logrus.Logger) error {
return nil
}
func (host *Host) Throttle() *throttle.Throttle {
if host.ReqConcurrent <= 0 {
return nil
}
if host.throttle == nil {
mu.Lock()
defer mu.Unlock()
if host.throttle == nil {
host.throttle = throttle.New(int(host.ReqConcurrent))
}
}
return host.throttle
}
func copyMapString(src map[string]string) map[string]string {
copy := map[string]string{}
for k, v := range src {

580
image.go
View File

@ -9,6 +9,7 @@ import (
"io"
"path/filepath"
"strings"
"sync"
"time"
// crypto libraries included for go-digest
@ -71,7 +72,7 @@ type tarWriteData struct {
}
type imageOpt struct {
callback func(kind, instance, state string, cur, total int64)
callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)
checkBaseDigest string
checkBaseRef string
checkSkipConfig bool
@ -84,13 +85,21 @@ type imageOpt struct {
platforms []string
referrerConfs []scheme.ReferrerConfig
tagList []string
manifWG sync.WaitGroup
mu sync.Mutex
seen map[digest.Digest]*imageSeen
}
type imageSeen struct {
done chan struct{}
err error
}
// ImageOpts define options for the Image* commands
type ImageOpts func(*imageOpt)
// ImageWithCallback provides progress data to a callback function
func ImageWithCallback(callback func(kind, instance, state string, cur, total int64)) ImageOpts {
func ImageWithCallback(callback func(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64)) ImageOpts {
return func(opts *imageOpt) {
opts.callback = callback
}
@ -401,7 +410,9 @@ func (rc *RegClient) ImageCheckBase(ctx context.Context, r ref.Ref, opts ...Imag
// On the same registry, it will attempt to use cross-repository blob mounts to avoid pulling blobs
// Blobs are only pulled when they don't exist on the target and a blob mount fails
func (rc *RegClient) ImageCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, opts ...ImageOpts) error {
var opt imageOpt
opt := imageOpt{
seen: map[digest.Digest]*imageSeen{},
}
for _, optFn := range opts {
optFn(&opt)
}
@ -409,215 +420,192 @@ func (rc *RegClient) ImageCopy(ctx context.Context, refSrc ref.Ref, refTgt ref.R
if w := warning.FromContext(ctx); w == nil {
ctx = warning.NewContext(ctx, &warning.Warning{Hook: warning.DefaultHook()})
}
// block GC from running (in OCIDir) during the copy
schemeTgtAPI, err := rc.schemeGet(refTgt.Scheme)
if err != nil {
return err
}
if tgtGCLocker, isGCLocker := schemeTgtAPI.(scheme.GCLocker); isGCLocker {
tgtGCLocker.GCLock(refTgt)
defer tgtGCLocker.GCUnlock(refTgt)
}
// preload tag listing for digest tag copy
if opt.digestTags {
tl, err := rc.TagList(ctx, refSrc)
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"err": err,
}).Warn("Failed to list tags for digest-tag copy")
return err
}
tags, err := tl.GetTags()
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"err": err,
}).Warn("Failed to list tags for digest-tag copy")
return err
}
opt.tagList = tags
}
opt.manifWG.Add(1)
return rc.imageCopyOpt(ctx, refSrc, refTgt, types.Descriptor{}, opt.child, &opt)
}
func (rc *RegClient) imageCopyOpt(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d types.Descriptor, child bool, opt *imageOpt) error {
mOpts := []ManifestOpts{}
if child {
mOpts = append(mOpts, WithManifestChild())
}
// check if scheme/refTgt prefers parent manifests pushed first
// if so, this should automatically set forceRecursive
tgtSI, err := rc.schemeInfo(refTgt)
if err != nil {
return fmt.Errorf("failed looking up scheme for %s: %v", refTgt.CommonName(), err)
}
if tgtSI.ManifestPushFirst {
opt.forceRecursive = true
}
// check if source and destination already match
mdh, errD := rc.ManifestHead(ctx, refTgt, WithManifestRequireDigest())
if opt.forceRecursive {
// copy forced, unable to run below skips
} else if errD == nil && refTgt.Digest != "" && digest.Digest(refTgt.Digest) == mdh.GetDescriptor().Digest {
rc.log.WithFields(logrus.Fields{
"target": refTgt.Reference,
"digest": mdh.GetDescriptor().Digest.String(),
}).Info("Copy not needed, target already up to date")
if opt.callback != nil {
opt.callback("manifest", d.Digest.String(), "skipped", mdh.GetDescriptor().Size, mdh.GetDescriptor().Size)
// imageCopyOpt is a thread safe copy of a manifest and nested content
func (rc *RegClient) imageCopyOpt(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d types.Descriptor, child bool, opt *imageOpt) (err error) {
var mSrc, mTgt manifest.Manifest
var sDig digest.Digest
afterManifWG := false
defer func() {
if !afterManifWG {
opt.manifWG.Done()
}
return nil
} else if errD == nil && refTgt.Digest == "" {
msh, errS := rc.ManifestHead(ctx, refSrc, WithManifestRequireDigest())
if errS == nil && msh.GetDescriptor().Digest == mdh.GetDescriptor().Digest {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"digest": mdh.GetDescriptor().Digest.String(),
}).Info("Copy not needed, target already up to date")
}()
// check the digest of the source (if available) and skip copy if it's already been seen
if d.Digest != "" {
sDig = d.Digest
} else if refSrc.Digest != "" {
sDig = digest.Digest(refSrc.Digest)
}
if sDig != "" {
seenCB, wait := imageSeenAndWait(opt, sDig)
if seenCB != nil {
defer seenCB(err)
} else {
afterManifWG = true
opt.manifWG.Done()
return wait() // with a digest, this is an index entry, wait
}
}
// check if copy may be skipped
mTgt, err = rc.ManifestHead(ctx, refTgt, WithManifestRequireDigest())
if err == nil && !opt.forceRecursive && opt.referrerConfs == nil && !opt.digestTags {
if sDig == "" {
mSrc, err = rc.ManifestHead(ctx, refSrc, WithManifestRequireDigest())
if err != nil {
return fmt.Errorf("copy failed, error getting source: %w", err)
}
sDig = mSrc.GetDescriptor().Digest
seenCB, _ := imageSeenAndWait(opt, sDig)
if seenCB == nil {
return nil // without a digest, digest tag, do not wait, loop protection
}
defer seenCB(err)
}
if sDig == mTgt.GetDescriptor().Digest {
if opt.callback != nil {
opt.callback("manifest", d.Digest.String(), "skipped", mdh.GetDescriptor().Size, mdh.GetDescriptor().Size)
opt.callback(types.CallbackManifest, d.Digest.String(), types.CallbackSkipped, mTgt.GetDescriptor().Size, mTgt.GetDescriptor().Size)
}
return nil
}
}
// get the manifest for the source
m, err := rc.ManifestGet(ctx, refSrc, WithManifestDesc(d))
if err != nil {
rc.log.WithFields(logrus.Fields{
"ref": refSrc.Reference,
"err": err,
}).Warn("Failed to get source manifest")
return err
// some form of copy is needed, setup vars
mOpts := []ManifestOpts{}
if child {
mOpts = append(mOpts, WithManifestChild())
}
bOpt := []BlobOpts{}
if opt.callback != nil {
bOpt = append(bOpt, BlobWithCallback(opt.callback))
}
waitCh := make(chan error)
waitCount := 0
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// only need a manifestHead in some scenarios to skip copy of blobs
if mTgt != nil && mSrc == nil && !opt.forceRecursive && sDig == "" {
mSrc, err = rc.ManifestHead(ctx, refSrc, WithManifestRequireDigest())
if err != nil {
return fmt.Errorf("copy failed, error getting source: %w", err)
}
sDig = mSrc.GetDescriptor().Digest
seenCB, _ := imageSeenAndWait(opt, sDig)
if seenCB == nil {
return nil // without a digest, digest tag, do not wait, loop protection
}
defer seenCB(err)
}
// manifestGet when needed (mismatch, force, index)
if sDig == "" || mTgt == nil || sDig != mTgt.GetDescriptor().Digest || opt.forceRecursive || mTgt.IsList() {
mSrc, err = rc.ManifestGet(ctx, refSrc, WithManifestDesc(d))
if err != nil {
return fmt.Errorf("copy failed, error getting source: %w", err)
}
if sDig == "" {
sDig = mSrc.GetDescriptor().Digest
seenCB, _ := imageSeenAndWait(opt, sDig)
if seenCB == nil {
return nil // without a digest, digest tag, do not wait, loop protection
}
defer seenCB(err)
}
}
if opt.callback != nil {
opt.callback("manifest", d.Digest.String(), "started", 0, d.Size)
opt.callback(types.CallbackManifest, d.Digest.String(), types.CallbackStarted, 0, d.Size)
}
if tgtSI.ManifestPushFirst {
// push manifest to target
err = rc.ManifestPut(ctx, refTgt, m, mOpts...)
// process entries in an index
if mSrcIndex, ok := mSrc.(manifest.Indexer); ok && mSrc.IsSet() && !ref.EqualRepository(refSrc, refTgt) {
// manifest lists need to recursively copy nested images by digest
dList, err := mSrcIndex.GetManifestList()
if err != nil {
rc.log.WithFields(logrus.Fields{
"target": refTgt.Reference,
"err": err,
}).Warn("Failed to push manifest")
return err
}
if opt.callback != nil {
opt.callback("manifest", d.Digest.String(), "finished", d.Size, d.Size)
}
}
if !ref.EqualRepository(refSrc, refTgt) {
bOpt := []BlobOpts{}
if opt.callback != nil {
bOpt = append(bOpt, BlobWithCallback(opt.callback))
}
// copy components of the image if the repository is different
if mi, ok := m.(manifest.Indexer); ok {
// manifest lists need to recursively copy nested images by digest
pd, err := mi.GetManifestList()
if err != nil {
return err
}
for _, entry := range pd {
// skip copy of platforms not specifically included
if len(opt.platforms) > 0 {
match, err := imagePlatformInList(entry.Platform, opt.platforms)
if err != nil {
return err
}
if !match {
rc.log.WithFields(logrus.Fields{
"platform": entry.Platform,
}).Debug("Platform excluded from copy")
continue
}
for _, dEntry := range dList {
// skip copy of platforms not specifically included
if len(opt.platforms) > 0 {
match, err := imagePlatformInList(dEntry.Platform, opt.platforms)
if err != nil {
return err
}
if !match {
rc.log.WithFields(logrus.Fields{
"platform": dEntry.Platform,
}).Debug("Platform excluded from copy")
continue
}
}
dEntry := dEntry
opt.manifWG.Add(1)
waitCount++
go func() {
var err error
rc.log.WithFields(logrus.Fields{
"platform": entry.Platform,
"digest": entry.Digest.String(),
"platform": dEntry.Platform,
"digest": dEntry.Digest.String(),
}).Debug("Copy platform")
entrySrc := refSrc
entryTgt := refTgt
entrySrc.Tag = ""
entryTgt.Tag = ""
entrySrc.Digest = entry.Digest.String()
entryTgt.Digest = entry.Digest.String()
switch entry.MediaType {
entrySrc.Digest = dEntry.Digest.String()
entryTgt.Digest = dEntry.Digest.String()
switch dEntry.MediaType {
case types.MediaTypeDocker1Manifest, types.MediaTypeDocker1ManifestSigned,
types.MediaTypeDocker2Manifest, types.MediaTypeDocker2ManifestList,
types.MediaTypeOCI1Manifest, types.MediaTypeOCI1ManifestList:
// known manifest media type
err = rc.imageCopyOpt(ctx, entrySrc, entryTgt, entry, true, opt)
err = rc.imageCopyOpt(ctx, entrySrc, entryTgt, dEntry, true, opt)
case types.MediaTypeDocker2ImageConfig, types.MediaTypeOCI1ImageConfig,
types.MediaTypeDocker2LayerGzip, types.MediaTypeOCI1Layer, types.MediaTypeOCI1LayerGzip,
types.MediaTypeBuildkitCacheConfig:
// known blob media type
err = rc.BlobCopy(ctx, entrySrc, entryTgt, entry, bOpt...)
opt.manifWG.Done()
opt.manifWG.Wait()
err = rc.imageCopyBlob(ctx, entrySrc, entryTgt, dEntry, opt, bOpt...)
default:
// unknown media type, first try an image copy
err = rc.imageCopyOpt(ctx, entrySrc, entryTgt, entry, true, opt)
err = rc.imageCopyOpt(ctx, entrySrc, entryTgt, dEntry, true, opt)
if err != nil {
// fall back to trying to copy a blob
err = rc.BlobCopy(ctx, entrySrc, entryTgt, entry, bOpt...)
opt.manifWG.Done()
opt.manifWG.Wait()
err = rc.imageCopyBlob(ctx, entrySrc, entryTgt, dEntry, opt, bOpt...)
}
}
if err != nil {
return err
}
}
}
if mi, ok := m.(manifest.Imager); ok {
// copy components of an image
// transfer the config
cd, err := mi.GetConfig()
if err != nil {
// docker schema v1 does not have a config object, ignore if it's missing
if !errors.Is(err, types.ErrUnsupportedMediaType) {
rc.log.WithFields(logrus.Fields{
"ref": refSrc.Reference,
"err": err,
}).Warn("Failed to get config digest from manifest")
return fmt.Errorf("failed to get config digest for %s: %w", refSrc.CommonName(), err)
}
} else {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"digest": cd.Digest.String(),
}).Info("Copy config")
if err := rc.BlobCopy(ctx, refSrc, refTgt, cd, bOpt...); err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"digest": cd.Digest.String(),
"err": err,
}).Warn("Failed to copy config")
return err
}
}
// copy filesystem layers
l, err := mi.GetLayers()
if err != nil {
return err
}
for _, layerSrc := range l {
if len(layerSrc.URLs) > 0 && !opt.includeExternal {
// skip blobs where the URLs are defined, these aren't hosted and won't be pulled from the source
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
"external-urls": layerSrc.URLs,
}).Debug("Skipping external layer")
continue
}
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
}).Info("Copy layer")
if err := rc.BlobCopy(ctx, refSrc, refTgt, layerSrc, bOpt...); err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
"err": err,
}).Warn("Failed to copy layer")
return err
}
}
}
}
if !tgtSI.ManifestPushFirst {
// push manifest to target
err = rc.ManifestPut(ctx, refTgt, m, mOpts...)
if err != nil {
rc.log.WithFields(logrus.Fields{
"target": refTgt.Reference,
"err": err,
}).Warn("Failed to push manifest")
return err
}
if opt.callback != nil {
opt.callback("manifest", d.Digest.String(), "finished", d.Size, d.Size)
waitCh <- err
}()
}
}
@ -639,46 +627,38 @@ func (rc *RegClient) imageCopyOpt(ctx context.Context, refSrc ref.Ref, refTgt re
}
}
for _, rDesc := range descList {
opt.mu.Lock()
seen := opt.seen[rDesc.Digest]
opt.mu.Unlock()
if seen != nil {
continue // skip referrers that have been seen
}
referrerSrc := refSrc
referrerSrc.Tag = ""
referrerSrc.Digest = rDesc.Digest.String()
referrerTgt := refTgt
referrerTgt.Tag = ""
referrerTgt.Digest = rDesc.Digest.String()
err = rc.imageCopyOpt(ctx, referrerSrc, referrerTgt, rDesc, true, opt)
if err != nil {
rc.log.WithFields(logrus.Fields{
"digest": rDesc.Digest.String(),
"src": referrerSrc.CommonName(),
"tgt": referrerTgt.CommonName(),
}).Warn("Failed to copy referrer")
return err
}
rDesc := rDesc
opt.manifWG.Add(1)
waitCount++
go func() {
err := rc.imageCopyOpt(ctx, referrerSrc, referrerTgt, rDesc, true, opt)
if err != nil {
rc.log.WithFields(logrus.Fields{
"digest": rDesc.Digest.String(),
"src": referrerSrc.CommonName(),
"tgt": referrerTgt.CommonName(),
}).Warn("Failed to copy referrer")
}
waitCh <- err
}()
}
}
// lookup digest tags to include artifacts with image
if opt.digestTags {
if len(opt.tagList) == 0 {
tl, err := rc.TagList(ctx, refSrc)
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"err": err,
}).Warn("Failed to list tags for digest-tag copy")
return err
}
tags, err := tl.GetTags()
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"err": err,
}).Warn("Failed to list tags for digest-tag copy")
return err
}
opt.tagList = tags
}
prefix := fmt.Sprintf("%s-%s", m.GetDescriptor().Digest.Algorithm(), m.GetDescriptor().Digest.Encoded())
prefix := fmt.Sprintf("%s-%s", sDig.Algorithm(), sDig.Encoded())
for _, tag := range opt.tagList {
if strings.HasPrefix(tag, prefix) {
// skip referrers that were copied above
@ -693,22 +673,194 @@ func (rc *RegClient) imageCopyOpt(ctx context.Context, refSrc ref.Ref, refTgt re
refTagTgt := refTgt
refTagTgt.Tag = tag
refTagTgt.Digest = ""
err = rc.imageCopyOpt(ctx, refTagSrc, refTagTgt, types.Descriptor{}, false, opt)
if err != nil {
rc.log.WithFields(logrus.Fields{
"tag": tag,
"src": refTagSrc.CommonName(),
"tgt": refTagTgt.CommonName(),
}).Warn("Failed to copy digest-tag")
return err
}
tag := tag
opt.manifWG.Add(1)
waitCount++
go func() {
err := rc.imageCopyOpt(ctx, refTagSrc, refTagTgt, types.Descriptor{}, false, opt)
if err != nil {
rc.log.WithFields(logrus.Fields{
"tag": tag,
"src": refTagSrc.CommonName(),
"tgt": refTagTgt.CommonName(),
}).Warn("Failed to copy digest-tag")
}
waitCh <- err
}()
}
}
}
// wait for all tasks to finish manifest processing before continuing to blobs
opt.manifWG.Done()
opt.manifWG.Wait()
afterManifWG = true
// check for any errors and abort early if found
err = nil
done := false
for !done && waitCount > 0 {
if err == nil {
select {
case err = <-waitCh:
if err != nil {
cancel()
}
default:
done = true // happy path
}
} else {
<-waitCh
}
if !done {
waitCount--
}
}
if err != nil {
return err
}
// If source is image, copy blobs
if mSrcImg, ok := mSrc.(manifest.Imager); ok && mSrc.IsSet() && !ref.EqualRepository(refSrc, refTgt) {
// copy the config
cd, err := mSrcImg.GetConfig()
if err != nil {
// docker schema v1 does not have a config object, ignore if it's missing
if !errors.Is(err, types.ErrUnsupportedMediaType) {
rc.log.WithFields(logrus.Fields{
"ref": refSrc.Reference,
"err": err,
}).Warn("Failed to get config digest from manifest")
return fmt.Errorf("failed to get config digest for %s: %w", refSrc.CommonName(), err)
}
} else {
waitCount++
go func() {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"digest": cd.Digest.String(),
}).Info("Copy config")
err := rc.imageCopyBlob(ctx, refSrc, refTgt, cd, opt, bOpt...)
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"digest": cd.Digest.String(),
"err": err,
}).Warn("Failed to copy config")
}
waitCh <- err
}()
}
// copy filesystem layers
l, err := mSrcImg.GetLayers()
if err != nil {
return err
}
for _, layerSrc := range l {
if len(layerSrc.URLs) > 0 && !opt.includeExternal {
// skip blobs where the URLs are defined, these aren't hosted and won't be pulled from the source
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
"external-urls": layerSrc.URLs,
}).Debug("Skipping external layer")
continue
}
waitCount++
layerSrc := layerSrc
go func() {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
}).Info("Copy layer")
err := rc.imageCopyBlob(ctx, refSrc, refTgt, layerSrc, opt, bOpt...)
if err != nil {
rc.log.WithFields(logrus.Fields{
"source": refSrc.Reference,
"target": refTgt.Reference,
"layer": layerSrc.Digest.String(),
"err": err,
}).Warn("Failed to copy layer")
}
waitCh <- err
}()
}
}
// wait for background tasks to finish
err = nil
for waitCount > 0 {
if err == nil {
err = <-waitCh
if err != nil {
cancel()
}
} else {
<-waitCh
}
waitCount--
}
if err != nil {
return err
}
// push manifest
if mTgt == nil || sDig != mTgt.GetDescriptor().Digest || opt.forceRecursive {
err = rc.ManifestPut(ctx, refTgt, mSrc, mOpts...)
if err != nil {
rc.log.WithFields(logrus.Fields{
"target": refTgt.Reference,
"err": err,
}).Warn("Failed to push manifest")
return err
}
if opt.callback != nil {
opt.callback(types.CallbackManifest, d.Digest.String(), types.CallbackFinished, d.Size, d.Size)
}
} else {
if opt.callback != nil {
opt.callback(types.CallbackManifest, d.Digest.String(), types.CallbackSkipped, d.Size, d.Size)
}
}
return nil
}
func (rc *RegClient) imageCopyBlob(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref, d types.Descriptor, opt *imageOpt, bOpt ...BlobOpts) error {
seenCB, wait := imageSeenAndWait(opt, d.Digest)
if wait != nil {
return wait()
}
err := rc.BlobCopy(ctx, refSrc, refTgt, d, bOpt...)
seenCB(err)
return err
}
// imageSeenAndWait returns either a callback to report the error when the digest hasn't been seen before
// or a function to wait for another task to finish processing the digest
func imageSeenAndWait(opt *imageOpt, dig digest.Digest) (func(error), func() error) {
opt.mu.Lock()
defer opt.mu.Unlock()
if seen := opt.seen[dig]; seen != nil {
return nil, func() error {
<-seen.done
return seen.err
}
}
seen := imageSeen{
done: make(chan struct{}),
}
opt.seen[dig] = &seen
return func(err error) {
seen.err = err
close(seen.done)
}, nil
}
// ImageExport exports an image to an output stream.
// The format is compatible with "docker load" if a single image is selected and not a manifest list.
// The ref must include a tag for exporting to docker (defaults to latest), and may also include a digest.

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"testing"
"time"
"github.com/regclient/regclient/internal/rwfs"
"github.com/regclient/regclient/types"
@ -21,7 +22,9 @@ func TestImageCheckBase(t *testing.T) {
t.Errorf("failed to setup memfs copy: %v", err)
return
}
rc := New(WithFS(fsMem))
delayInit, _ := time.ParseDuration("0.05s")
delayMax, _ := time.ParseDuration("0.10s")
rc := New(WithFS(fsMem), WithRetryDelay(delayInit, delayMax))
rb1, err := ref.New("ocidir://testrepo:b1")
if err != nil {
t.Errorf("failed to setup ref: %v", err)
@ -138,7 +141,9 @@ func TestExportImport(t *testing.T) {
return
}
// create regclient
rc := New(WithFS(fsMem))
delayInit, _ := time.ParseDuration("0.05s")
delayMax, _ := time.ParseDuration("0.10s")
rc := New(WithFS(fsMem), WithRetryDelay(delayInit, delayMax))
rIn, err := ref.New("ocidir://testrepo:v1")
if err != nil {
t.Errorf("failed to parse ref: %v", err)

21
internal/godbg/godbg.go Normal file
View File

@ -0,0 +1,21 @@
// Package godbg provides tooling for debugging Go
package godbg
import (
"os"
"os/signal"
"runtime/pprof"
"syscall"
)
func SignalTrace(sigs ...os.Signal) {
if len(sigs) == 0 {
sigs = append(sigs, syscall.SIGUSR1)
}
sig := make(chan os.Signal, 1)
signal.Notify(sig, sigs...)
go func() {
<-sig
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
}()
}

49
internal/muset/muset.go Normal file
View File

@ -0,0 +1,49 @@
// Package muset is used to acquire a group of mutex locks
package muset
import "sync"
// Lock acquires a set of locks without holding some locks acquired while waiting for others to be released
func Lock(muList ...*sync.Mutex) {
if len(muList) <= 0 {
return
}
// dedup entries from the list
for i := len(muList) - 2; i >= 0; i-- {
for j := len(muList) - 1; j > i; j-- {
if muList[i] == muList[j] {
// delete j from the list
muList[j] = muList[len(muList)-1]
muList = muList[:len(muList)-1]
}
}
}
lastBlock := 0
for {
// start from last blocking mutex
muList[lastBlock].Lock()
// acquire all other locks with TryLock
nextBlock := -1
for i := range muList {
if i == lastBlock {
continue
}
acquired := muList[i].TryLock()
if !acquired {
nextBlock = i
break
}
}
// if all locks acquired, done
if nextBlock == -1 {
return
}
// unlock
for i := range muList {
if i < nextBlock || i == lastBlock {
muList[i].Unlock()
}
}
lastBlock = nextBlock
}
}

View File

@ -0,0 +1,60 @@
package muset
import (
"sync"
"testing"
"time"
)
func TestMuset(t *testing.T) {
// test acquiring sets of locks, sleep between changes to force out race conditions
var muA, muB, muC sync.Mutex
// empty set
Lock()
// noting initially locked
Lock(&muA, &muB, &muC)
muA.Unlock()
muB.Unlock()
muC.Unlock()
// repeating entries
Lock(&muA, &muA, &muB, &muA, &muB)
muB.Unlock()
// A initially locked
// rotating set of locks in different orders
finished := false
delay := time.Microsecond * 10
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
Lock(&muA, &muB, &muC)
finished = true
wg.Done()
}()
time.Sleep(delay)
if finished {
t.Error("finished before unlock")
}
muB.Lock()
muA.Unlock()
time.Sleep(delay)
if finished {
t.Error("finished before unlock")
}
muC.Lock()
muB.Unlock()
time.Sleep(delay)
if finished {
t.Error("finished before unlock")
}
muA.Lock()
muC.Unlock()
time.Sleep(delay)
if finished {
t.Error("finished before unlock")
}
muA.Unlock()
wg.Wait()
if !finished {
t.Error("did not finish")
}
}

View File

@ -28,10 +28,10 @@ import (
"github.com/opencontainers/go-digest"
"github.com/regclient/regclient/config"
"github.com/regclient/regclient/internal/auth"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/warning"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
var defaultDelayInit, _ = time.ParseDuration("1s")
@ -45,16 +45,17 @@ const (
// Client is an HTTP client wrapper
// It handles features like authentication, retries, backoff delays, TLS settings
type Client struct {
host map[string]*clientHost
httpClient *http.Client
rootCAPool [][]byte
rootCADirs []string
retryLimit int
delayInit time.Duration
delayMax time.Duration
log *logrus.Logger
userAgent string
mu sync.Mutex
getConfigHost func(string) *config.Host
host map[string]*clientHost
httpClient *http.Client
rootCAPool [][]byte
rootCADirs []string
retryLimit int
delayInit time.Duration
delayMax time.Duration
log *logrus.Logger
userAgent string
mu sync.Mutex
}
type clientHost struct {
@ -66,8 +67,7 @@ type clientHost struct {
auth map[string]auth.Auth
newAuth func() auth.Auth
mu sync.Mutex
parallel *semaphore.Weighted
throttle *time.Ticker
ratelimit *time.Ticker
}
// Req is a request to send to a registry
@ -110,7 +110,7 @@ type clientResp struct {
digester digest.Digester
reader io.Reader
readCur, readMax int64
parallel *semaphore.Weighted
throttle *throttle.Throttle
}
// Opts is used to configure client options
@ -165,18 +165,10 @@ func WithCertFiles(files []string) Opts {
}
}
// WithConfigHosts adds a list of config.Host entries to use for connection settings
func WithConfigHosts(ch []*config.Host) Opts {
// WithConfigHost adds the callback to request a config.Host struct
func WithConfigHost(gch func(string) *config.Host) Opts {
return func(c *Client) {
for _, cur := range ch {
if cur.Name == "" {
continue
}
if _, ok := c.host[cur.Name]; !ok {
c.host[cur.Name] = &clientHost{}
}
c.host[cur.Name].config = cur
}
c.getConfigHost = gch
}
}
@ -280,21 +272,23 @@ func (resp *clientResp) Next() error {
h := hosts[curHost]
resp.mirror = h.config.Name
// check that context isn't canceled/done
ctxErr := resp.ctx.Err()
if ctxErr != nil {
return ctxErr
}
api, okAPI := req.APIs[h.config.API]
if !okAPI {
api, okAPI = req.APIs[""]
}
// try each host in a closure to handle all the backoff/dropHost from one place
if h.parallel != nil {
h.parallel.Acquire(resp.ctx, 1)
// check that context isn't canceled/done
ctxErr := resp.ctx.Err()
if ctxErr != nil {
return ctxErr
}
// wait for other concurrent requests to this host
throttleErr := h.config.Throttle().Acquire(resp.ctx)
if throttleErr != nil {
return throttleErr
}
// try each host in a closure to handle all the backoff/dropHost from one place
err = func() error {
var err error
if !okAPI {
@ -410,8 +404,9 @@ func (resp *clientResp) Next() error {
}
}
if h.throttle != nil {
<-h.throttle.C
// delay for the rate limit
if h.ratelimit != nil {
<-h.ratelimit.C
}
// update http client for insecure requests and root certs
@ -475,7 +470,7 @@ func (resp *clientResp) Next() error {
case http.StatusRequestedRangeNotSatisfiable:
// if range request error (blob push), drop mirror for this req, but other requests don't need backoff
dropHost = true
case http.StatusTooManyRequests, http.StatusRequestTimeout, http.StatusGatewayTimeout:
case http.StatusTooManyRequests, http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusInternalServerError:
// server is likely overloaded, backoff but still retry
backoff = true
default:
@ -513,13 +508,14 @@ func (resp *clientResp) Next() error {
}()
// return on success
if err == nil {
resp.parallel = h.parallel
resp.throttle = h.config.Throttle()
resp.backoffClear()
return nil
}
// backoff, dropHost, and/or go to next host in the list
if h.parallel != nil {
h.parallel.Release(1)
throttleErr = h.config.Throttle().Release(resp.ctx)
if throttleErr != nil {
return throttleErr
}
if backoff {
if api.IgnoreErr {
@ -597,9 +593,9 @@ func (resp *clientResp) Read(b []byte) (int, error) {
}
func (resp *clientResp) Close() error {
if resp.parallel != nil {
resp.parallel.Release(1)
resp.parallel = nil
if resp.throttle != nil {
_ = resp.throttle.Release(resp.ctx)
resp.throttle = nil
}
if resp.resp == nil {
return types.ErrNotFound
@ -699,16 +695,17 @@ func (c *Client) getHost(host string) *clientHost {
h = &clientHost{}
}
if h.config == nil {
h.config = config.HostNewName(host)
if c.getConfigHost != nil {
h.config = c.getConfigHost(host)
} else {
h.config = config.HostNewName(host)
}
}
if h.auth == nil {
h.auth = map[string]auth.Auth{}
}
if h.throttle == nil && h.config.ReqPerSec > 0 {
h.throttle = time.NewTicker(time.Duration(float64(time.Second) / h.config.ReqPerSec))
}
if h.parallel == nil && h.config.ReqConcurrent > 0 {
h.parallel = semaphore.NewWeighted(h.config.ReqConcurrent)
if h.ratelimit == nil && h.config.ReqPerSec > 0 {
h.ratelimit = time.NewTicker(time.Duration(float64(time.Second) / h.config.ReqPerSec))
}
if h.httpClient == nil {

View File

@ -627,27 +627,27 @@ func TestRegHttp(t *testing.T) {
tsHost := tsURL.Host
// create config for various hosts, different host pointing to same dns hostname
configHosts := []*config.Host{
{
configHosts := map[string]*config.Host{
tsHost: {
Name: tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
},
{
"auth." + tsHost: {
Name: "auth." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
User: user,
Pass: pass,
},
{
"unauth." + tsHost: {
Name: "unauth." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
User: user,
Pass: "bad" + pass,
},
{
"repoauth." + tsHost: {
Name: "repoauth." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
@ -655,7 +655,7 @@ func TestRegHttp(t *testing.T) {
Pass: pass,
RepoAuth: true,
},
{
"nohead." + tsHost: {
Name: "nohead." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
@ -663,49 +663,49 @@ func TestRegHttp(t *testing.T) {
"disableHead": "true",
},
},
{
"missing." + tsHost: {
Name: "missing." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-missing",
Priority: 5,
},
{
"forbidden." + tsHost: {
Name: "forbidden." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-forbidden",
Priority: 6,
},
{
"bad-gw." + tsHost: {
Name: "bad-gw." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-bad-gw",
Priority: 7,
},
{
"gw-timeout." + tsHost: {
Name: "gw-timeout." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-gw-timeout",
Priority: 8,
},
{
"server-error." + tsHost: {
Name: "server-error." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-server-error",
Priority: 4,
},
{
"rate-limit." + tsHost: {
Name: "rate-limit." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
PathPrefix: "mirror-rate-limit",
Priority: 1,
},
{
"mirrors." + tsHost: {
Name: "mirrors." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
@ -733,7 +733,12 @@ func TestRegHttp(t *testing.T) {
delayInit, _ := time.ParseDuration("0.05s")
delayMax, _ := time.ParseDuration("0.10s")
hc := NewClient(
WithConfigHosts(configHosts),
WithConfigHost(func(name string) *config.Host {
if configHosts[name] == nil {
configHosts[name] = config.HostNewName(name)
}
return configHosts[name]
}),
WithDelay(delayInit, delayMax),
WithUserAgent(useragent),
)

View File

@ -7,7 +7,10 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
"github.com/regclient/regclient/internal/muset"
)
// in memory implementation of rwfs
@ -22,10 +25,12 @@ type MemChild interface{}
type MemDir struct {
child map[string]MemChild
mod time.Time
mu sync.Mutex
}
type MemFile struct {
b []byte
mod time.Time
mu sync.Mutex
}
type MemDirFP struct {
f *MemDir
@ -68,6 +73,8 @@ func (o *MemFS) Mkdir(name string, perm fs.FileMode) error {
Err: err,
}
}
memDir.mu.Lock()
defer memDir.mu.Unlock()
if _, ok := memDir.child[base]; ok {
return &fs.PathError{
Op: "mkdir",
@ -90,6 +97,8 @@ func (o *MemFS) OpenFile(name string, flags int, perm fs.FileMode) (RWFile, erro
Err: err,
}
}
memDir.mu.Lock()
defer memDir.mu.Unlock()
var child MemChild
if file == "." || file == "" {
child = memDir
@ -168,6 +177,8 @@ func (o *MemFS) Remove(name string) error {
Err: err,
}
}
memDir.mu.Lock()
defer memDir.mu.Unlock()
if child, ok := memDir.child[file]; ok {
switch v := child.(type) {
case *MemFile:
@ -221,6 +232,16 @@ func (o *MemFS) Rename(oldName, newName string) error {
Err: err,
}
}
lockList := []*sync.Mutex{&(memDirOld.mu)}
if dirOld != dirNew {
lockList = append(lockList, &(memDirNew.mu))
}
muset.Lock(lockList...)
defer func() {
for _, l := range lockList {
l.Unlock()
}
}()
childOld, okOld := memDirOld.child[fileOld]
if !okOld {
return &fs.PathError{
@ -338,7 +359,9 @@ func (o *MemFS) getDir(dir string) (*MemDir, error) {
if el == "." || el == "" {
continue
}
cur.mu.Lock()
next, ok := cur.child[el]
cur.mu.Unlock()
if !ok {
return nil, fs.ErrNotExist
}
@ -357,6 +380,8 @@ func (mfp *MemFileFP) Close() error {
}
func (mfp *MemFileFP) Read(b []byte) (int, error) {
mfp.f.mu.Lock()
defer mfp.f.mu.Unlock()
lc := copy(b, mfp.f.b[mfp.cur:])
mfp.cur += lc
if len(mfp.f.b) <= mfp.cur {
@ -366,6 +391,8 @@ func (mfp *MemFileFP) Read(b []byte) (int, error) {
}
func (mfp *MemFileFP) Seek(offset int64, whence int) (int64, error) {
mfp.f.mu.Lock()
defer mfp.f.mu.Unlock()
switch whence {
case io.SeekStart:
mfp.cur = whence
@ -380,6 +407,8 @@ func (mfp *MemFileFP) Seek(offset int64, whence int) (int64, error) {
}
func (mfp *MemFileFP) Stat() (fs.FileInfo, error) {
mfp.f.mu.Lock()
defer mfp.f.mu.Unlock()
fi := NewFI(mfp.name, int64(len(mfp.f.b)), time.Time{}, 0)
return fi, nil
}
@ -388,6 +417,8 @@ func (mfp *MemFileFP) Write(b []byte) (n int, err error) {
if len(b) == 0 {
return 0, nil
}
mfp.f.mu.Lock()
defer mfp.f.mu.Unlock()
// use copy to overwrite existing contents
if mfp.cur < len(mfp.f.b) {
l := copy(mfp.f.b[mfp.cur:], b)
@ -418,6 +449,8 @@ func (mdp *MemDirFP) Read(b []byte) (int, error) {
// TODO: implement func (mdp *MemDirFP) Seek
func (mdp *MemDirFP) ReadDir(n int) ([]fs.DirEntry, error) {
mdp.f.mu.Lock()
defer mdp.f.mu.Unlock()
names := mdp.filenames(mdp.cur, n)
mdp.cur += len(names)
des := make([]fs.DirEntry, len(names))
@ -453,6 +486,8 @@ func (mdp *MemDirFP) ReadDir(n int) ([]fs.DirEntry, error) {
}
func (mdp *MemDirFP) Stat() (fs.FileInfo, error) {
mdp.f.mu.Lock()
defer mdp.f.mu.Unlock()
fi := NewFI(mdp.name, 4096, mdp.f.mod, fs.ModeDir)
return fi, nil
}

View File

@ -0,0 +1,191 @@
// Package throttle is used to limit concurrent activities
package throttle
import (
"context"
"fmt"
)
type token struct{}
type Throttle struct {
ch chan token
}
type key int
type valMany struct {
tList []*Throttle
}
var keyMany key
func New(count int) *Throttle {
ch := make(chan token, count)
return &Throttle{ch: ch}
}
func (t *Throttle) checkContext(ctx context.Context) (bool, error) {
tCtx := ctx.Value(keyMany)
if tCtx == nil {
return false, nil
}
tCtxVal, ok := tCtx.(*valMany)
if !ok {
return true, fmt.Errorf("context value is not a throttle list")
}
if tCtxVal.tList == nil {
return false, nil
}
for _, cur := range tCtxVal.tList {
if cur == t {
// instance already locked
return true, nil
}
}
return true, fmt.Errorf("cannot acquire new locks during a transaction")
}
func (t *Throttle) Acquire(ctx context.Context) error {
if t == nil {
return nil
}
// check if already acquired in context
if found, err := t.checkContext(ctx); found {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case t.ch <- token{}:
return nil
}
}
func (t *Throttle) Release(ctx context.Context) error {
if t == nil {
return nil
}
// check if already acquired in context
if found, err := t.checkContext(ctx); found {
return err
}
select {
case <-t.ch:
return nil
default:
return fmt.Errorf("failed to release throttle")
}
}
func (t *Throttle) TryAcquire(ctx context.Context) (bool, error) {
if t == nil {
return true, nil
}
// check if already acquired in context
if found, err := t.checkContext(ctx); found {
return err == nil, err
}
select {
case t.ch <- token{}:
return true, nil
default:
return false, nil
}
}
func AcquireMulti(ctx context.Context, tList []*Throttle) (context.Context, error) {
// verify context not already holding locks
tCtx := ctx.Value(keyMany)
if tCtx != nil {
if tCtxVal, ok := tCtx.(*valMany); ok && tCtxVal.tList != nil {
return ctx, fmt.Errorf("throttle cannot manage concurrent transactions")
}
}
if len(tList) <= 0 {
// noop?
return ctx, nil
}
// dedup entries from the list
for i := len(tList) - 2; i >= 0; i-- {
for j := len(tList) - 1; j > i; j-- {
if tList[i] == tList[j] {
// delete j from the list
tList[j] = tList[len(tList)-1]
tList = tList[:len(tList)-1]
}
}
}
lockI := 0
for {
err := tList[lockI].Acquire(ctx)
if err != nil {
return ctx, err
}
acquired := true
i := 0
for i < len(tList) {
if i != lockI {
acquired, err = tList[i].TryAcquire(ctx)
if err != nil || !acquired {
break
}
}
i++
}
if err == nil && acquired {
break
}
// cleanup on failed attempt
if lockI > i {
tList[lockI].Release(ctx)
}
// track blocking index
lockI = i
for i > 0 {
i--
tList[i].Release(ctx)
}
// abort on errors
if err != nil {
return ctx, err
}
}
// success, update context
newCtx := context.WithValue(ctx, keyMany, &valMany{tList: tList})
return newCtx, nil
}
func ReleaseMulti(ctx context.Context, tList []*Throttle) error {
// verify context shows locked values
tCtx := ctx.Value(keyMany)
if tCtx == nil {
return fmt.Errorf("no transaction found to release")
}
tCtxVal, ok := tCtx.(*valMany)
if !ok || tCtxVal.tList == nil {
return fmt.Errorf("no transaction found to release")
}
// dedup entries from the list
for i := len(tList) - 2; i >= 0; i-- {
for j := len(tList) - 1; j > i; j-- {
if tList[i] == tList[j] {
// delete j from the list
tList[j] = tList[len(tList)-1]
tList = tList[:len(tList)-1]
}
}
}
// TODO: release from tList, tCtx, or compare and error if diff?
for _, t := range tList {
if t == nil {
continue
}
// cannot call t.Release since context has value defined
select {
case <-t.ch:
default:
return fmt.Errorf("failed to release throttle")
}
}
// modify context value to track release
tCtxVal.tList = nil
return nil
}

View File

@ -0,0 +1,493 @@
package throttle
import (
"context"
"errors"
"sync"
"testing"
"time"
)
func TestNil(t *testing.T) {
ctx := context.Background()
var tNil *Throttle
err := tNil.Acquire(ctx)
if err != nil {
t.Errorf("acquire failed: %v", err)
}
err = tNil.Release(ctx)
if err != nil {
t.Errorf("release failed: %v", err)
}
a, err := tNil.TryAcquire(ctx)
if err != nil {
t.Errorf("try acquire failed: %v", err)
}
if !a {
t.Errorf("try acquire did not succeed")
}
}
func TestSingleThrottle(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
t1 := New(1)
// simple acquire
err := t1.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
return
}
// try to acquire in a goroutine
acquired := false
wg.Add(1)
go func() {
defer wg.Done()
err := t1.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
return
}
acquired = true
}()
sleepMS(1)
// verify goroutine did not succeed and cannot be acquired
if acquired {
t.Errorf("throttle acquired before previous released")
}
a, err := t1.TryAcquire(ctx)
if err != nil {
t.Errorf("try acquire errored: %v", err)
}
if a {
t.Errorf("try acquire succeeded")
}
// release and verify goroutine acquires and returns
err = t1.Release(ctx)
if err != nil {
t.Errorf("release failed: %v", err)
}
wg.Wait()
if !acquired {
t.Errorf("throttle was not acquired by thread")
}
// start a new goroutine to acquire
acquired = false
wg.Add(1)
go func() {
defer wg.Done()
err := t1.Acquire(ctx)
if err == nil {
acquired = true
return
}
if !errors.Is(err, context.Canceled) {
t.Errorf("acquire on cancel returned: %v", err)
return
}
}()
sleepMS(1)
// verify goroutine still waiting, cancel context, and verify the return
if acquired {
t.Errorf("throttle acquired before previous released")
}
cancel()
wg.Wait()
if acquired {
t.Errorf("throttle was not acquired by thread")
}
ctx = context.Background()
a, err = t1.TryAcquire(ctx)
if err != nil {
t.Errorf("try acquire errored: %v", err)
}
if a {
t.Errorf("try acquire succeeded")
}
// release, twice, and verify try acquire can succeed
err = t1.Release(ctx)
if err != nil {
t.Errorf("release failed: %v", err)
}
err = t1.Release(context.Background())
if err == nil {
t.Errorf("second release succeeded")
}
a, err = t1.TryAcquire(context.Background())
if err != nil {
t.Errorf("try acquire errored: %v", err)
}
if !a {
t.Errorf("try acquire failed")
}
}
func TestCapacity(t *testing.T) {
t3 := New(3)
wg := sync.WaitGroup{}
ctx := context.Background()
// acquire all three with an intermediate release
err := t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
err = t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
err = t3.Release(ctx)
if err != nil {
t.Errorf("failed to release: %v", err)
}
err = t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
err = t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
// verify try acquire fails on the 4th
a, err := t3.TryAcquire(ctx)
if err != nil {
t.Errorf("failed to try acquire: %v", err)
}
if a {
t.Errorf("try acquire succeeded on full throttle")
}
// launch acquire requests in background
wg.Add(1)
a = false
go func() {
defer wg.Done()
err := t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
a = true
err = t3.Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire: %v", err)
}
}()
sleepMS(1)
if a {
t.Errorf("acquire ran in background")
}
// release two
err = t3.Release(ctx)
if err != nil {
t.Errorf("failed to release: %v", err)
}
err = t3.Release(ctx)
if err != nil {
t.Errorf("failed to release: %v", err)
}
// wait for background job to finish and verify acquired
wg.Wait()
if !a {
t.Errorf("acquire did not run in background")
}
}
func TestMulti(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tList := make([]*Throttle, 4)
for i := 0; i < 3; i++ {
tList[i] = New(1)
}
wg := sync.WaitGroup{}
// acquire duplicate throttles, one multiple times, another two in alternating sets `ababa`
tListA := []*Throttle{tList[0], tList[0], tList[0]}
tListB := []*Throttle{tList[1], tList[2], tList[1], tList[2], tList[1]}
ctxA, err := AcquireMulti(ctx, tListA)
if err != nil {
t.Errorf("failed to acquire multiple entries to same throttle: %v", err)
}
ctxB, err := AcquireMulti(ctx, tListB)
if err != nil {
t.Errorf("failed to acquire multiple entries to pair of throttles: %v", err)
}
ok, err := tList[0].TryAcquire(ctx)
if err != nil || ok {
t.Errorf("try acquire on 0 did not fail, ok=%t, err=%v", ok, err)
}
ok, err = tList[1].TryAcquire(ctx)
if err != nil || ok {
t.Errorf("try acquire on 1 did not fail, ok=%t, err=%v", ok, err)
}
ok, err = tList[2].TryAcquire(ctx)
if err != nil || ok {
t.Errorf("try acquire on 2 did not fail, ok=%t, err=%v", ok, err)
}
err = ReleaseMulti(ctxA, tListA)
if err != nil {
t.Errorf("failed to release multiple locks on A: %v", err)
}
err = ReleaseMulti(ctxB, tListB)
if err != nil {
t.Errorf("failed to release multiple locks on B: %v", err)
}
// use acquire multi on first two
_, err = AcquireMulti(ctx, tList[:0])
if err != nil {
t.Errorf("empty list acquire multi failed: %v", err)
}
ctxMulti, err := AcquireMulti(ctx, tList[:2])
if err != nil {
t.Errorf("failed to acquire multi: %v", err)
return
}
_, err = AcquireMulti(ctxMulti, tList[2:])
if err == nil {
t.Errorf("nested acquire multi did not fail")
}
// try acquiring individually with ctxMulti, first two should succeed
a, err := tList[0].TryAcquire(ctxMulti)
if err != nil {
t.Errorf("failed to try acquire on 0: %v", err)
}
if !a {
t.Errorf("try acquire on 0 did not return true")
}
a, err = tList[1].TryAcquire(ctxMulti)
if err != nil {
t.Errorf("failed to try acquire on 1: %v", err)
}
if !a {
t.Errorf("try acquire on 1 did not return true")
}
// actions on 2 should all fail because it's not in the multi list
err = tList[2].Acquire(ctxMulti)
if err == nil {
t.Errorf("acquire on 2 did not error")
}
err = tList[2].Release(ctxMulti)
if err == nil {
t.Errorf("acquire on 2 did not error")
}
a, err = tList[2].TryAcquire(ctxMulti)
if err == nil {
t.Errorf("try acquire on 2 did not error")
}
if a {
t.Errorf("try acquire on 2 returned true")
}
// try acquire with ctx, first two should be blocked, 3rd should succeed
a, err = tList[0].TryAcquire(ctx)
if err != nil {
t.Errorf("failed to try acquire on 0: %v", err)
}
if a {
t.Errorf("try acquire on 0 returned true")
}
a, err = tList[1].TryAcquire(ctx)
if err != nil {
t.Errorf("failed to try acquire on 1: %v", err)
}
if a {
t.Errorf("try acquire on 1 returned true")
}
a, err = tList[2].TryAcquire(ctx)
if err != nil {
t.Errorf("failed to try acquire on 2: %v", err)
}
if !a {
t.Errorf("try acquire on 2 returned false")
}
// run acquire in background on first two
wg.Add(1)
finished := false
go func() {
defer wg.Done()
err := tList[0].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 0: %v", err)
}
err = tList[1].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 1: %v", err)
}
finished = true
}()
sleepMS(1)
if finished {
t.Errorf("background job finished before release")
}
// release multi
err = ReleaseMulti(ctxMulti, tList[:2])
if err != nil {
t.Errorf("failed to release multi: %v", err)
}
// verify background job finished
wg.Wait()
if !finished {
t.Errorf("background job did not finish")
}
// release all
for i := 0; i < 3; i++ {
err = tList[i].Release(ctx)
if err != nil {
t.Errorf("failed to release %d: %v", i, err)
}
}
// verify acquire, try acquire, and release with old context succeeds
err = tList[0].Acquire(ctxMulti)
if err != nil {
t.Errorf("acquire on stale context failed: %v", err)
}
a, err = tList[0].TryAcquire(ctxMulti)
if err != nil {
t.Errorf("try acquire on stale context failed: %v", err)
}
if a {
t.Errorf("try acquire returned true")
}
err = tList[0].Release(ctxMulti)
if err != nil {
t.Errorf("release on stale context failed: %v", err)
}
// acquire 0 and verify acquire multi blocked
err = tList[0].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 0: %v", err)
}
wg.Add(1)
finished = false
go func() {
defer wg.Done()
ctxNew, err := AcquireMulti(ctx, tList)
if err != nil {
t.Errorf("failed to acquire multi")
}
ctxMulti = ctxNew
finished = true
}()
sleepMS(1)
err = tList[1].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 1: %v", err)
}
err = tList[0].Release(ctx)
if err != nil {
t.Errorf("failed to release 0: %v", err)
}
sleepMS(1)
err = tList[0].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 0: %v", err)
}
err = tList[1].Release(ctx)
if err != nil {
t.Errorf("failed to release 1: %v", err)
}
sleepMS(1)
err = tList[2].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire 2: %v", err)
}
err = tList[0].Release(ctx)
if err != nil {
t.Errorf("failed to release 0: %v", err)
}
sleepMS(1)
if finished {
t.Errorf("acquire multi returned before release was run")
}
// release and verify acquire multi finishes
err = tList[2].Release(ctx)
if err != nil {
t.Errorf("failed to release 2: %v", err)
}
wg.Wait()
if !finished {
t.Errorf("acquire multi did not finish")
}
// run acquire in background
wg.Add(1)
finished = false
go func() {
defer wg.Done()
err := tList[1].Acquire(ctx)
if err != nil {
t.Errorf("failed to acquire after release multi: %v", err)
}
finished = true
}()
sleepMS(1)
if finished {
t.Errorf("acquire returned before release multi was run")
}
// release multi
err = ReleaseMulti(ctx, tList)
if err == nil {
t.Errorf("release multi on wrong context succeeded")
}
err = ReleaseMulti(ctxMulti, tList)
if err != nil {
t.Errorf("release multi failed: %v", err)
}
err = ReleaseMulti(ctxMulti, tList)
if err == nil {
t.Errorf("release multi again succeeded")
}
// verify background acquire finished
wg.Wait()
if !finished {
t.Errorf("acquire did not finish")
}
// verify acquire/release with multi context works after release multi
err = tList[0].Acquire(ctxMulti)
if err != nil {
t.Errorf("failed to acquire after release multi on multi context: %v", err)
}
a, err = tList[2].TryAcquire(ctxMulti)
if err != nil {
t.Errorf("failed to try acquire after release multi on multi context: %v", err)
}
if !a {
t.Errorf("try acquire after release multi with multi context returned false")
}
err = tList[0].Release(ctxMulti)
if err != nil {
t.Errorf("failed to release 0: %v", err)
}
err = tList[2].Release(ctxMulti)
if err != nil {
t.Errorf("failed to release 2: %v", err)
}
// run acquire multi in background with 1 still blocked and cancel context
finished = false
wg.Add(1)
go func() {
defer wg.Done()
ctxMulti, err := AcquireMulti(ctx, tList)
if errors.Is(err, context.Canceled) {
// expected failure from context
return
}
t.Errorf("acquire multi did not fail on canceled context")
_ = ReleaseMulti(ctxMulti, tList)
finished = true
}()
sleepMS(1)
if finished {
t.Errorf("acquire multi finished unexpectedly")
}
// cancel the context, wait for acquire multi to fail
cancel()
wg.Wait()
err = tList[1].Release(ctx)
if err != nil {
t.Errorf("failed to release 1: %v", err)
}
}
func sleepMS(ms int64) {
time.Sleep(time.Millisecond * time.Duration(ms))
}

View File

@ -178,14 +178,16 @@ func TestManifest(t *testing.T) {
tsHost := tsURL.Host
rcHosts := []config.Host{
{
Name: tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
Name: tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
ReqPerSec: 100,
},
{
Name: "missing." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
Name: "missing." + tsHost,
Hostname: tsHost,
TLS: config.TLSDisabled,
ReqPerSec: 100,
},
{
Name: "nohead." + tsHost,
@ -194,6 +196,7 @@ func TestManifest(t *testing.T) {
APIOpts: map[string]string{
"disableHead": "true",
},
ReqPerSec: 100,
},
}
log := &logrus.Logger{

View File

@ -17,14 +17,6 @@ func (rc *RegClient) schemeGet(scheme string) (scheme.API, error) {
return s, nil
}
func (rc *RegClient) schemeInfo(r ref.Ref) (scheme.Info, error) {
schemeAPI, err := rc.schemeGet(r.Scheme)
if err != nil {
return scheme.Info{}, err
}
return schemeAPI.Info(), nil
}
// Close is used to free resources associated with a reference
// With ocidir, this may trigger a garbage collection process
func (rc *RegClient) Close(ctx context.Context, r ref.Ref) error {

View File

@ -82,6 +82,15 @@ func (o *OCIDir) BlobMount(ctx context.Context, refSrc ref.Ref, refTgt ref.Ref,
// BlobPut sends a blob to the repository, returns the digest and size when successful
func (o *OCIDir) BlobPut(ctx context.Context, r ref.Ref, d types.Descriptor, rdr io.Reader) (types.Descriptor, error) {
err := o.throttleGet(r).Acquire(ctx)
if err != nil {
return d, err
}
defer o.throttleGet(r).Release(ctx)
err = o.initIndex(r, false)
if err != nil {
return d, err
}
digester := digest.Canonical.Digester()
rdr = io.TeeReader(rdr, digester.Hash())
// if digest unavailable, read into a []byte+digest, and replace rdr
@ -105,17 +114,22 @@ func (o *OCIDir) BlobPut(ctx context.Context, r ref.Ref, d types.Descriptor, rdr
}
// write the blob to the CAS file
dir := path.Join(r.Path, "blobs", d.Digest.Algorithm().String())
err := rwfs.MkdirAll(o.fs, dir, 0777)
err = rwfs.MkdirAll(o.fs, dir, 0777)
if err != nil && !errors.Is(err, fs.ErrExist) {
return d, fmt.Errorf("failed creating %s: %w", dir, err)
}
file := path.Join(r.Path, "blobs", d.Digest.Algorithm().String(), d.Digest.Encoded())
fd, err := o.fs.Create(file)
// write to a tmp file, rename after validating
tmpFile, err := rwfs.CreateTemp(o.fs, path.Join(r.Path, "blobs", d.Digest.Algorithm().String()), d.Digest.Encoded()+".*.tmp")
if err != nil {
return d, fmt.Errorf("failed creating %s: %w", file, err)
return d, fmt.Errorf("failed creating blob tmp file: %w", err)
}
defer fd.Close()
i, err := io.Copy(fd, rdr)
fi, err := tmpFile.Stat()
if err != nil {
return d, fmt.Errorf("failed to stat blob tmpfile: %w", err)
}
tmpName := fi.Name()
i, err := io.Copy(tmpFile, rdr)
tmpFile.Close()
if err != nil {
return d, err
}
@ -126,11 +140,19 @@ func (o *OCIDir) BlobPut(ctx context.Context, r ref.Ref, d types.Descriptor, rdr
if d.Size > 0 && i != d.Size {
return d, fmt.Errorf("unexpected blob length, expected %d, received %d", d.Size, i)
}
file := path.Join(r.Path, "blobs", d.Digest.Algorithm().String(), d.Digest.Encoded())
err = o.fs.Rename(path.Join(r.Path, "blobs", d.Digest.Algorithm().String(), tmpName), file)
if err != nil {
return d, fmt.Errorf("failed to write blob (rename tmp file): %w", err)
}
d.Size = i
o.log.WithFields(logrus.Fields{
"ref": r.CommonName(),
"file": file,
}).Debug("pushed blob")
o.mu.Lock()
o.refMod(r)
o.mu.Unlock()
return d, nil
}

View File

@ -19,8 +19,8 @@ func (o *OCIDir) Close(ctx context.Context, r ref.Ref) error {
o.mu.Lock()
defer o.mu.Unlock()
if _, ok := o.modRefs[r.Path]; !ok {
// unmodified, no need to gc ref
if gc, ok := o.modRefs[r.Path]; !ok || !gc.mod || gc.locks > 0 {
// unmodified or locked, skip gc
return nil
}
@ -30,7 +30,7 @@ func (o *OCIDir) Close(ctx context.Context, r ref.Ref) error {
}).Debug("running GC")
dl := map[string]bool{}
// recurse through index, manifests, and blob lists, generating a digest list
index, err := o.readIndex(r)
index, err := o.readIndex(r, true)
if err != nil {
return err
}
@ -85,7 +85,7 @@ func (o *OCIDir) closeProcManifest(ctx context.Context, r ref.Ref, m manifest.Ma
cr.Tag = ""
cr.Digest = cur.Digest.String()
(*dl)[cr.Digest] = true
cm, err := o.ManifestGet(ctx, cr)
cm, err := o.manifestGet(ctx, cr)
if err != nil {
// ignore errors in case a manifest has been deleted or sparse copy
o.log.WithFields(logrus.Fields{

View File

@ -32,7 +32,6 @@ func TestClose(t *testing.T) {
}
rCp := r
rCp.Tag = ""
// delete some manifests
for _, d := range []string{
"sha256:e57d957b974fb4d852aee59b9b2e9dcd7cb0f04622e9356324864a270afd18a0", // armv6

View File

@ -25,6 +25,9 @@ import (
// ManifestDelete removes a manifest, including all tags that point to that manifest
func (o *OCIDir) ManifestDelete(ctx context.Context, r ref.Ref, opts ...scheme.ManifestOpts) error {
o.mu.Lock()
defer o.mu.Unlock()
if r.Digest == "" {
return wraperr.New(fmt.Errorf("digest required to delete manifest, reference %s", r.CommonName()), types.ErrMissingDigest)
}
@ -36,7 +39,7 @@ func (o *OCIDir) ManifestDelete(ctx context.Context, r ref.Ref, opts ...scheme.M
// always check for refers with ocidir
if mc.Manifest == nil {
m, err := o.ManifestGet(ctx, r)
m, err := o.manifestGet(ctx, r)
if err != nil {
return fmt.Errorf("failed to pull manifest for refers: %w", err)
}
@ -57,7 +60,7 @@ func (o *OCIDir) ManifestDelete(ctx context.Context, r ref.Ref, opts ...scheme.M
// get index
changed := false
index, err := o.readIndex(r)
index, err := o.readIndex(r, true)
if err != nil {
return fmt.Errorf("failed to read index: %w", err)
}
@ -70,7 +73,7 @@ func (o *OCIDir) ManifestDelete(ctx context.Context, r ref.Ref, opts ...scheme.M
}
// push manifest back out
if changed {
err = o.writeIndex(r, index)
err = o.writeIndex(r, index, true)
if err != nil {
return fmt.Errorf("failed to write index: %w", err)
}
@ -89,7 +92,13 @@ func (o *OCIDir) ManifestDelete(ctx context.Context, r ref.Ref, opts ...scheme.M
// ManifestGet retrieves a manifest from a repository
func (o *OCIDir) ManifestGet(ctx context.Context, r ref.Ref) (manifest.Manifest, error) {
index, err := o.readIndex(r)
o.mu.Lock()
defer o.mu.Unlock()
return o.manifestGet(ctx, r)
}
func (o *OCIDir) manifestGet(ctx context.Context, r ref.Ref) (manifest.Manifest, error) {
index, err := o.readIndex(r, true)
if err != nil {
return nil, fmt.Errorf("unable to read oci index: %w", err)
}
@ -133,7 +142,7 @@ func (o *OCIDir) ManifestGet(ctx context.Context, r ref.Ref) (manifest.Manifest,
// ManifestHead gets metadata about the manifest (existence, digest, mediatype, size)
func (o *OCIDir) ManifestHead(ctx context.Context, r ref.Ref) (manifest.Manifest, error) {
index, err := o.readIndex(r)
index, err := o.readIndex(r, false)
if err != nil {
return nil, fmt.Errorf("unable to read oci index: %w", err)
}
@ -190,6 +199,12 @@ func (o *OCIDir) ManifestHead(ctx context.Context, r ref.Ref) (manifest.Manifest
// ManifestPut sends a manifest to the repository
func (o *OCIDir) ManifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest, opts ...scheme.ManifestOpts) error {
o.mu.Lock()
defer o.mu.Unlock()
return o.manifestPut(ctx, r, m, opts...)
}
func (o *OCIDir) manifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest, opts ...scheme.ManifestOpts) error {
config := scheme.ManifestConfig{}
for _, opt := range opts {
opt(&config)
@ -197,12 +212,9 @@ func (o *OCIDir) ManifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest
if !config.Child && r.Digest == "" && r.Tag == "" {
r.Tag = "latest"
}
indexChanged := false
index, err := o.readIndex(r)
err := o.initIndex(r, true)
if err != nil {
index = indexCreate()
indexChanged = true
return err
}
desc := m.GetDescriptor()
b, err := m.RawBody()
@ -224,30 +236,31 @@ func (o *OCIDir) ManifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest
if err != nil && !errors.Is(err, fs.ErrExist) {
return fmt.Errorf("failed creating %s: %w", dir, err)
}
// write to a tmp file, rename after validating
tmpFile, err := rwfs.CreateTemp(o.fs, dir, desc.Digest.Encoded()+".*.tmp")
if err != nil {
return fmt.Errorf("failed to create manifest tmpfile: %w", err)
}
fi, err := tmpFile.Stat()
if err != nil {
return fmt.Errorf("failed to stat manifest tmpfile: %w", err)
}
tmpName := fi.Name()
_, err = tmpFile.Write(b)
tmpFile.Close()
if err != nil {
return fmt.Errorf("failed to write manifest tmpfile: %w", err)
}
file := path.Join(dir, desc.Digest.Encoded())
fd, err := o.fs.Create(file)
err = o.fs.Rename(path.Join(dir, tmpName), file)
if err != nil {
return fmt.Errorf("failed to create manifest: %w", err)
return fmt.Errorf("failed to write manifest (rename tmpfile): %w", err)
}
defer fd.Close()
_, err = fd.Write(b)
// verify/update index
err = o.updateIndex(r, desc, config.Child, true)
if err != nil {
return fmt.Errorf("failed to write manifest: %w", err)
}
// replace existing tag or create a new entry
if !config.Child {
err := indexSet(&index, r, desc)
if err != nil {
return fmt.Errorf("failed to update index: %w", err)
}
indexChanged = true
}
// write the index.json and oci-layout if it's been changed
if indexChanged {
err = o.writeIndex(r, index)
if err != nil {
return fmt.Errorf("failed to write index: %w", err)
}
return err
}
o.refMod(r)
o.log.WithFields(logrus.Fields{
@ -268,6 +281,5 @@ func (o *OCIDir) ManifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest
}
}
}
return nil
}

View File

@ -12,7 +12,7 @@ import (
"sync"
"github.com/regclient/regclient/internal/rwfs"
"github.com/regclient/regclient/scheme"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/types"
v1 "github.com/regclient/regclient/types/oci/v1"
"github.com/regclient/regclient/types/ref"
@ -23,21 +23,30 @@ const (
imageLayoutFile = "oci-layout"
aOCIRefName = "org.opencontainers.image.ref.name"
aCtrdImageName = "io.containerd.image.name"
defThrottle = 3
)
// OCIDir is used for accessing OCI Image Layouts defined as a directory
type OCIDir struct {
fs rwfs.RWFS
log *logrus.Logger
gc bool
modRefs map[string]ref.Ref
mu sync.Mutex
fs rwfs.RWFS
log *logrus.Logger
gc bool
modRefs map[string]*ociGC
throttle map[string]*throttle.Throttle
throttleDef int
mu sync.Mutex
}
type ociGC struct {
mod bool
locks int
}
type ociConf struct {
fs rwfs.RWFS
gc bool
log *logrus.Logger
fs rwfs.RWFS
gc bool
log *logrus.Logger
throttle int
}
// Opts are used for passing options to ocidir
@ -46,17 +55,20 @@ type Opts func(*ociConf)
// New creates a new OCIDir with options
func New(opts ...Opts) *OCIDir {
conf := ociConf{
log: &logrus.Logger{Out: io.Discard},
gc: true,
log: &logrus.Logger{Out: io.Discard},
gc: true,
throttle: defThrottle,
}
for _, opt := range opts {
opt(&conf)
}
return &OCIDir{
fs: conf.fs,
log: conf.log,
gc: conf.gc,
modRefs: map[string]ref.Ref{},
fs: conf.fs,
log: conf.log,
gc: conf.gc,
modRefs: map[string]*ociGC{},
throttle: map[string]*throttle.Throttle{},
throttleDef: conf.throttle,
}
}
@ -85,15 +97,98 @@ func WithLog(log *logrus.Logger) Opts {
}
}
// Info is experimental, do not use
func (o *OCIDir) Info() scheme.Info {
return scheme.Info{ManifestPushFirst: true}
// WithThrottle provides a number of concurrent write actions (blob/manifest put)
func WithThrottle(count int) Opts {
return func(c *ociConf) {
c.throttle = count
}
}
func (o *OCIDir) readIndex(r ref.Ref) (v1.Index, error) {
// GCLock is used to prevent GC on a ref
func (o *OCIDir) GCLock(r ref.Ref) {
o.mu.Lock()
defer o.mu.Unlock()
if gc, ok := o.modRefs[r.Path]; ok && gc != nil {
gc.locks++
} else {
o.modRefs[r.Path] = &ociGC{locks: 1}
}
}
// GCUnlock removes a hold on GC of a ref, this must be done before the ref is closed
func (o *OCIDir) GCUnlock(r ref.Ref) {
o.mu.Lock()
defer o.mu.Unlock()
if gc, ok := o.modRefs[r.Path]; ok && gc != nil && gc.locks > 0 {
gc.locks--
}
}
// Throttle is used to limit concurrency
func (o *OCIDir) Throttle(r ref.Ref, put bool) []*throttle.Throttle {
tList := []*throttle.Throttle{}
// throttle only applies to put requests
if !put || o.throttleDef <= 0 {
return tList
}
o.mu.Lock()
defer o.mu.Unlock()
return []*throttle.Throttle{o.throttleGet(r)}
}
func (o *OCIDir) throttleGet(r ref.Ref) *throttle.Throttle {
if t, ok := o.throttle[r.Path]; ok {
return t
}
if _, ok := o.throttle[r.Path]; !ok {
// set a default throttle
o.throttle[r.Path] = throttle.New(o.throttleDef)
}
return o.throttle[r.Path]
}
func (o *OCIDir) initIndex(r ref.Ref, locked bool) error {
if !locked {
o.mu.Lock()
defer o.mu.Unlock()
}
layoutFile := path.Join(r.Path, imageLayoutFile)
_, err := rwfs.Stat(o.fs, layoutFile)
if err == nil {
return nil
}
err = rwfs.MkdirAll(o.fs, r.Path, 0777)
if err != nil && !errors.Is(err, fs.ErrExist) {
return fmt.Errorf("failed creating %s: %w", r.Path, err)
}
// create/replace oci-layout file
layout := v1.ImageLayout{
Version: "1.0.0",
}
lb, err := json.Marshal(layout)
if err != nil {
return fmt.Errorf("cannot marshal layout: %w", err)
}
lfh, err := o.fs.Create(layoutFile)
if err != nil {
return fmt.Errorf("cannot create %s: %w", imageLayoutFile, err)
}
defer lfh.Close()
_, err = lfh.Write(lb)
if err != nil {
return fmt.Errorf("cannot write %s: %w", imageLayoutFile, err)
}
return nil
}
func (o *OCIDir) readIndex(r ref.Ref, locked bool) (v1.Index, error) {
if !locked {
o.mu.Lock()
defer o.mu.Unlock()
}
// validate dir
index := v1.Index{}
err := o.valid(r.Path)
err := o.valid(r.Path, true)
if err != nil {
return index, err
}
@ -114,7 +209,38 @@ func (o *OCIDir) readIndex(r ref.Ref) (v1.Index, error) {
return index, nil
}
func (o *OCIDir) writeIndex(r ref.Ref, i v1.Index) error {
func (o *OCIDir) updateIndex(r ref.Ref, d types.Descriptor, child bool, locked bool) error {
if !locked {
o.mu.Lock()
defer o.mu.Unlock()
}
indexChanged := false
index, err := o.readIndex(r, true)
if err != nil {
index = indexCreate()
indexChanged = true
}
if !child {
err := indexSet(&index, r, d)
if err != nil {
return fmt.Errorf("failed to update index: %w", err)
}
indexChanged = true
}
if indexChanged {
err = o.writeIndex(r, index, true)
if err != nil {
return fmt.Errorf("failed to write index: %w", err)
}
}
return nil
}
func (o *OCIDir) writeIndex(r ref.Ref, i v1.Index, locked bool) error {
if !locked {
o.mu.Lock()
defer o.mu.Unlock()
}
err := rwfs.MkdirAll(o.fs, r.Path, 0777)
if err != nil && !errors.Is(err, fs.ErrExist) {
return fmt.Errorf("failed creating %s: %w", r.Path, err)
@ -137,25 +263,38 @@ func (o *OCIDir) writeIndex(r ref.Ref, i v1.Index) error {
return fmt.Errorf("cannot write %s: %w", imageLayoutFile, err)
}
// create/replace index.json file
indexFile := path.Join(r.Path, "index.json")
fh, err := o.fs.Create(indexFile)
tmpFile, err := rwfs.CreateTemp(o.fs, r.Path, "index.json.*.tmp")
if err != nil {
return fmt.Errorf("%s cannot be created: %w", indexFile, err)
return fmt.Errorf("cannot create index tmpfile: %w", err)
}
defer fh.Close()
fi, err := tmpFile.Stat()
if err != nil {
return fmt.Errorf("failed to stat index tmpfile: %w", err)
}
tmpName := fi.Name()
b, err := json.Marshal(i)
if err != nil {
return fmt.Errorf("cannot marshal index: %w", err)
}
_, err = fh.Write(b)
_, err = tmpFile.Write(b)
tmpFile.Close()
if err != nil {
return fmt.Errorf("cannot write index: %w", err)
}
indexFile := path.Join(r.Path, "index.json")
err = o.fs.Rename(path.Join(r.Path, tmpName), indexFile)
if err != nil {
return fmt.Errorf("cannot rename tmpfile to index: %w", err)
}
return nil
}
// func valid (dir) (error) // check for `oci-layout` file and `index.json` for read
func (o *OCIDir) valid(dir string) error {
func (o *OCIDir) valid(dir string, locked bool) error {
if !locked {
o.mu.Lock()
defer o.mu.Unlock()
}
layout := v1.ImageLayout{}
reqVer := "1.0.0"
fh, err := o.fs.Open(path.Join(dir, imageLayoutFile))
@ -178,9 +317,11 @@ func (o *OCIDir) valid(dir string) error {
}
func (o *OCIDir) refMod(r ref.Ref) {
o.mu.Lock()
defer o.mu.Unlock()
o.modRefs[r.Path] = r
if gc, ok := o.modRefs[r.Path]; ok && gc != nil {
gc.mod = true
} else {
o.modRefs[r.Path] = &ociGC{mod: true}
}
}
func indexCreate() v1.Index {

View File

@ -161,11 +161,11 @@ func TestIndex(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := o.writeIndex(r, tt.index)
err := o.writeIndex(r, tt.index, false)
if err != nil {
t.Errorf("failed to write index: %v", err)
}
index, err := o.readIndex(r)
index, err := o.readIndex(r, false)
if err != nil {
t.Errorf("failed to read index: %v", err)
}

View File

@ -16,6 +16,12 @@ import (
// ReferrerList returns a list of referrers to a given reference
func (o *OCIDir) ReferrerList(ctx context.Context, r ref.Ref, opts ...scheme.ReferrerOpts) (referrer.ReferrerList, error) {
o.mu.Lock()
defer o.mu.Unlock()
return o.referrerList(ctx, r, opts...)
}
func (o *OCIDir) referrerList(ctx context.Context, r ref.Ref, opts ...scheme.ReferrerOpts) (referrer.ReferrerList, error) {
config := scheme.ReferrerConfig{}
for _, opt := range opts {
opt(&config)
@ -26,7 +32,7 @@ func (o *OCIDir) ReferrerList(ctx context.Context, r ref.Ref, opts ...scheme.Ref
}
// select a platform from a manifest list
if config.Platform != "" {
m, err := o.ManifestGet(ctx, r)
m, err := o.manifestGet(ctx, r)
if err != nil {
return rl, err
}
@ -46,7 +52,7 @@ func (o *OCIDir) ReferrerList(ctx context.Context, r ref.Ref, opts ...scheme.Ref
}
// if ref is a tag, run a head request for the digest
if r.Digest == "" {
m, err := o.ManifestHead(ctx, r)
m, err := o.manifestGet(ctx, r)
if err != nil {
return rl, err
}
@ -58,7 +64,7 @@ func (o *OCIDir) ReferrerList(ctx context.Context, r ref.Ref, opts ...scheme.Ref
if err != nil {
return rl, err
}
m, err := o.ManifestGet(ctx, rlTag)
m, err := o.manifestGet(ctx, rlTag)
if err != nil {
if errors.Is(err, types.ErrNotFound) {
// empty list, initialize a new manifest
@ -109,7 +115,7 @@ func (o *OCIDir) referrerDelete(ctx context.Context, r ref.Ref, m manifest.Manif
rSubject.Digest = subject.Digest.String()
// pull existing referrer list
rl, err := o.ReferrerList(ctx, rSubject)
rl, err := o.referrerList(ctx, rSubject)
if err != nil {
return err
}
@ -124,13 +130,13 @@ func (o *OCIDir) referrerDelete(ctx context.Context, r ref.Ref, m manifest.Manif
return err
}
if rl.IsEmpty() {
err = o.TagDelete(ctx, rlTag)
err = o.tagDelete(ctx, rlTag)
if err == nil {
return nil
}
// if delete is not supported, fall back to pushing empty list
}
return o.ManifestPut(ctx, rlTag, rl.Manifest)
return o.manifestPut(ctx, rlTag, rl.Manifest)
}
// referrerPut pushes a new referrer associated with a given reference
@ -155,7 +161,7 @@ func (o *OCIDir) referrerPut(ctx context.Context, r ref.Ref, m manifest.Manifest
rSubject.Digest = subject.Digest.String()
// pull existing referrer list
rl, err := o.ReferrerList(ctx, rSubject)
rl, err := o.referrerList(ctx, rSubject)
if err != nil {
return err
}
@ -169,5 +175,5 @@ func (o *OCIDir) referrerPut(ctx context.Context, r ref.Ref, m manifest.Manifest
if err != nil {
return err
}
return o.ManifestPut(ctx, rlTag, rl.Manifest)
return o.manifestPut(ctx, rlTag, rl.Manifest)
}

View File

@ -15,11 +15,17 @@ import (
// TagDelete removes a tag from the repository
func (o *OCIDir) TagDelete(ctx context.Context, r ref.Ref) error {
o.mu.Lock()
defer o.mu.Unlock()
return o.tagDelete(ctx, r)
}
func (o *OCIDir) tagDelete(ctx context.Context, r ref.Ref) error {
if r.Tag == "" {
return types.ErrMissingTag
}
// get index
index, err := o.readIndex(r)
index, err := o.readIndex(r, true)
if err != nil {
return fmt.Errorf("failed to read index: %w", err)
}
@ -35,7 +41,7 @@ func (o *OCIDir) TagDelete(ctx context.Context, r ref.Ref) error {
return fmt.Errorf("failed deleting %s: %w", r.CommonName(), types.ErrNotFound)
}
// push manifest back out
err = o.writeIndex(r, index)
err = o.writeIndex(r, index, true)
if err != nil {
return fmt.Errorf("failed to write index: %w", err)
}
@ -46,7 +52,7 @@ func (o *OCIDir) TagDelete(ctx context.Context, r ref.Ref) error {
// TagList returns a list of tags from the repository
func (o *OCIDir) TagList(ctx context.Context, r ref.Ref, opts ...scheme.TagOpts) (*tag.List, error) {
// get index
index, err := o.readIndex(r)
index, err := o.readIndex(r, false)
if err != nil {
return nil, err
}

View File

@ -224,7 +224,7 @@ func (reg *Reg) ManifestPut(ctx context.Context, r ref.Ref, m manifest.Manifest,
if err != nil {
return fmt.Errorf("failed to put manifest %s: %w", r.CommonName(), err)
}
defer resp.Close()
resp.Close()
if resp.HTTPResponse().StatusCode != 201 {
return fmt.Errorf("failed to put manifest %s: %w", r.CommonName(), reghttp.HTTPError(resp.HTTPResponse().StatusCode))
}

View File

@ -8,7 +8,8 @@ import (
"github.com/regclient/regclient/config"
"github.com/regclient/regclient/internal/reghttp"
"github.com/regclient/regclient/scheme"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/types/ref"
"github.com/sirupsen/logrus"
)
@ -47,6 +48,7 @@ func New(opts ...Opts) *Reg {
blobMaxPut: defaultBlobMax,
hosts: map[string]*config.Host{},
}
r.reghttpOpts = append(r.reghttpOpts, reghttp.WithConfigHost(r.hostGet))
for _, opt := range opts {
opt(&r)
}
@ -54,9 +56,23 @@ func New(opts ...Opts) *Reg {
return &r
}
// Info is experimental and may be removed in the future
func (reg *Reg) Info() scheme.Info {
return scheme.Info{}
// Throttle is used to limit concurrency
func (reg *Reg) Throttle(r ref.Ref, put bool) []*throttle.Throttle {
tList := []*throttle.Throttle{}
host := reg.hostGet(r.Registry)
t := host.Throttle()
if t != nil {
tList = append(tList, t)
}
if !put {
for _, mirror := range host.Mirrors {
t := reg.hostGet(mirror).Throttle()
if t != nil {
tList = append(tList, t)
}
}
}
return tList
}
func (reg *Reg) hostGet(hostname string) *config.Host {
@ -122,7 +138,6 @@ func WithConfigHosts(configHosts []*config.Host) Opts {
}
r.hosts[host.Name] = host
}
r.reghttpOpts = append(r.reghttpOpts, reghttp.WithConfigHosts(configHosts))
}
}

View File

@ -5,6 +5,7 @@ import (
"context"
"io"
"github.com/regclient/regclient/internal/throttle"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/manifest"
@ -15,9 +16,6 @@ import (
// API is used to interface between different methods to store images
type API interface {
// Info is experimental, do not use
Info() Info
// BlobDelete removes a blob from the repository
BlobDelete(ctx context.Context, r ref.Ref, d types.Descriptor) error
// BlobGet retrieves a blob, returning a reader
@ -52,9 +50,18 @@ type Closer interface {
Close(ctx context.Context, r ref.Ref) error
}
// Info provides details on the scheme, this is experimental, do not use
type Info struct {
ManifestPushFirst bool
// GCLocker is used to indicate locking is available for GC management
type GCLocker interface {
// GCLock a reference to prevent GC from triggering during a put, locks are not exclusive.
GCLock(r ref.Ref)
// GCUnlock a reference to allow GC (once all locks are released).
// The reference should be closed after this step and unlock should only be called once per each Lock call.
GCUnlock(r ref.Ref)
}
// Throttler is used to indicate the scheme implements Throttle
type Throttler interface {
Throttle(r ref.Ref, put bool) []*throttle.Throttle
}
// ManifestConfig is used by schemes to import ManifestOpts

29
types/callback.go Normal file
View File

@ -0,0 +1,29 @@
package types
type CallbackState int
const (
CallbackUndef CallbackState = iota
CallbackSkipped
CallbackStarted
CallbackActive
CallbackFinished
CallbackArchived
)
type CallbackKind int
const (
CallbackManifest CallbackKind = iota
CallbackBlob
)
func (k CallbackKind) String() string {
switch k {
case CallbackBlob:
return "blob"
case CallbackManifest:
return "manifest"
}
return "unknown"
}