1
0
mirror of https://github.com/regclient/regclient.git synced 2025-04-18 22:44:00 +03:00

Feat: regsync option to abort on errors

This stops regsync from retrying with backoffs when a registry is failing.
Multiple errors are also joined using "errors.Join".
Errors from the canceled context are not propagated up.

Signed-off-by: Brandon Mitchell <git@bmitch.net>
This commit is contained in:
Brandon Mitchell 2025-03-18 10:51:23 -04:00
parent 32e5be148f
commit 5c7545ec32
No known key found for this signature in database
GPG Key ID: 6E0FF28C767A8BEE
2 changed files with 124 additions and 61 deletions

View File

@ -55,6 +55,16 @@ func TestProcess(t *testing.T) {
ts := httptest.NewServer(regHandler)
tsURL, _ := url.Parse(ts.URL)
tsHost := tsURL.Host
regROHandler := olareg.New(oConfig.Config{
Storage: oConfig.ConfigStorage{
StoreType: oConfig.StoreMem,
RootDir: "../../testdata",
ReadOnly: &boolT,
},
})
tsRO := httptest.NewServer(regROHandler)
tsROURL, _ := url.Parse(tsRO.URL)
tsROHost := tsROURL.Host
t.Cleanup(func() {
ts.Close()
_ = regHandler.Close()
@ -65,6 +75,11 @@ func TestProcess(t *testing.T) {
Hostname: tsHost,
TLS: config.TLSDisabled,
},
{
Name: tsROHost,
Hostname: tsROHost,
TLS: config.TLSDisabled,
},
{
Name: "registry.example.org",
Hostname: tsHost,
@ -189,13 +204,14 @@ defaults:
// run process on each entry
tt := []struct {
name string
sync ConfigSync
action actionType
expect map[string]digest.Digest
exists []string
missing []string
expErr error
name string
sync ConfigSync
action actionType
abortOnErr bool
expect map[string]digest.Digest
exists []string
missing []string
expErr error
}{
{
name: "Action Missing",
@ -234,6 +250,17 @@ defaults:
},
expErr: nil,
},
{
name: "ReadOnly Error Abort",
sync: ConfigSync{
Source: tsHost + "/testrepo",
Target: tsROHost + "/test-readonly",
Type: "repository",
},
action: actionCopy,
abortOnErr: true,
expErr: errs.ErrHTTPStatus,
},
{
name: "Overwrite",
sync: ConfigSync{
@ -699,10 +726,11 @@ defaults:
t.Run(tc.name, func(t *testing.T) {
// run each test
rootOpts := rootOpts{
conf: conf,
rc: rc,
throttle: pq,
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})),
conf: conf,
rc: rc,
throttle: pq,
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})),
abortOnErr: tc.abortOnErr,
}
syncSetDefaults(&tc.sync, conf.Defaults)
err = rootOpts.process(ctx, tc.sync, tc.action)

View File

@ -54,15 +54,16 @@ const (
type throttle struct{}
type rootOpts struct {
confFile string
verbosity string
logopts []string
log *slog.Logger
format string // for Go template formatting of various commands
missing bool
conf *Config
rc *regclient.RegClient
throttle *pqueue.Queue[throttle]
confFile string
verbosity string
logopts []string
log *slog.Logger
format string // for Go template formatting of various commands
abortOnErr bool
missing bool
conf *Config
rc *regclient.RegClient
throttle *pqueue.Queue[throttle]
}
func NewRootCmd() (*cobra.Command, *rootOpts) {
@ -118,6 +119,9 @@ sync step is finished.`,
_ = curCmd.MarkFlagFilename("config")
_ = curCmd.MarkFlagRequired("config")
}
for _, curCmd := range []*cobra.Command{serverCmd, checkCmd, onceCmd} {
curCmd.Flags().BoolVar(&opts.abortOnErr, "abort-on-error", false, "Immediately abort on any errors")
}
var versionCmd = &cobra.Command{
Use: "version",
@ -193,33 +197,38 @@ func (opts *rootOpts) runOnce(cmd *cobra.Command, args []string) error {
if opts.missing {
action = actionMissing
}
ctx := cmd.Context()
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
var mainErr error
errs := []error{}
for _, s := range opts.conf.Sync {
if opts.conf.Defaults.Parallel > 0 {
wg.Add(1)
go func() {
defer wg.Done()
err := opts.process(ctx, s, action)
if err != nil {
if mainErr == nil {
mainErr = err
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrCanceled) {
if opts.abortOnErr {
cancel()
}
return
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()
} else {
err := opts.process(ctx, s, action)
if err != nil {
if mainErr == nil {
mainErr = err
errs = append(errs, err)
if opts.abortOnErr {
break
}
}
}
}
wg.Wait()
return mainErr
return errors.Join(errs...)
}
// runServer stays running with cron scheduled tasks
@ -228,10 +237,11 @@ func (opts *rootOpts) runServer(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
ctx := cmd.Context()
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
// TODO: switch to joining array of errors once 1.20 is the minimum version
var mainErr error
errs := []error{}
c := cron.New(cron.WithChain(
cron.SkipIfStillRunning(cron.DefaultLogger),
))
@ -246,7 +256,7 @@ func (opts *rootOpts) runServer(cmd *cobra.Command, args []string) error {
slog.String("target", s.Target),
slog.String("type", s.Type),
slog.String("sched", sched))
_, errCron := c.AddFunc(sched, func() {
_, err := c.AddFunc(sched, func() {
opts.log.Debug("Running task",
slog.String("source", s.Source),
slog.String("target", s.Target),
@ -254,18 +264,24 @@ func (opts *rootOpts) runServer(cmd *cobra.Command, args []string) error {
wg.Add(1)
defer wg.Done()
err := opts.process(ctx, s, actionCopy)
if mainErr == nil {
mainErr = err
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrCanceled) {
if opts.abortOnErr {
cancel()
}
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
})
if errCron != nil {
if err != nil {
opts.log.Error("Failed to schedule cron",
slog.String("source", s.Source),
slog.String("target", s.Target),
slog.String("sched", sched),
slog.String("err", errCron.Error()))
if mainErr != nil {
mainErr = errCron
slog.String("err", err.Error()))
errs = append(errs, err)
if opts.abortOnErr {
break
}
}
// immediately copy any images that are missing from target
@ -274,18 +290,23 @@ func (opts *rootOpts) runServer(cmd *cobra.Command, args []string) error {
go func() {
defer wg.Done()
err := opts.process(ctx, s, actionMissing)
if err != nil {
if mainErr == nil {
mainErr = err
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrCanceled) {
if opts.abortOnErr {
cancel()
}
return
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()
} else {
err := opts.process(ctx, s, actionMissing)
if err != nil {
if mainErr == nil {
mainErr = err
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrCanceled) {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
if opts.abortOnErr {
break
}
}
}
@ -296,20 +317,23 @@ func (opts *rootOpts) runServer(cmd *cobra.Command, args []string) error {
slog.String("type", s.Type))
}
}
// wait for any initial copies to finish before scheduling
// wait for any initial copies to finish
wg.Wait()
if ctx.Err() != nil {
return errors.Join(errs...)
}
// start the server and wait until interrupted
c.Start()
// wait on interrupt signal
done := ctx.Done()
if done != nil {
<-done
}
// perform a clean shutdown
opts.log.Info("Stopping server")
// clean shutdown
c.Stop()
opts.log.Debug("Waiting on running tasks")
wg.Wait()
return mainErr
return errors.Join(errs...)
}
// run check is used for a dry-run
@ -318,17 +342,18 @@ func (opts *rootOpts) runCheck(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
var mainErr error
errs := []error{}
ctx := cmd.Context()
for _, s := range opts.conf.Sync {
err := opts.process(ctx, s, actionCheck)
if err != nil {
if mainErr == nil {
mainErr = err
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrCanceled) {
errs = append(errs, err)
if opts.abortOnErr {
break
}
}
}
return mainErr
return errors.Join(errs...)
}
func (opts *rootOpts) loadConf() error {
@ -423,7 +448,8 @@ func (opts *rootOpts) process(ctx context.Context, s ConfigSync, action actionTy
func (opts *rootOpts) processRegistry(ctx context.Context, s ConfigSync, src, tgt string, action actionType) error {
last := ""
var retErr error
errs := []error{}
// loop through pages of the _catalog response
for {
repoOpts := []scheme.RepoOpts{}
if last != "" {
@ -459,11 +485,17 @@ func (opts *rootOpts) processRegistry(ctx context.Context, s ConfigSync, src, tg
}
for _, repo := range sRepoList {
if err := opts.processRepo(ctx, s, fmt.Sprintf("%s/%s", src, repo), fmt.Sprintf("%s/%s", tgt, repo), action); err != nil {
retErr = err
errs = append(errs, err)
if opts.abortOnErr {
break
}
}
}
if opts.abortOnErr && len(errs) > 0 {
break
}
}
return retErr
return errors.Join(errs...)
}
func (opts *rootOpts) processRepo(ctx context.Context, s ConfigSync, src, tgt string, action actionType) error {
@ -551,13 +583,16 @@ func (opts *rootOpts) processRepo(ctx context.Context, s ConfigSync, src, tgt st
}
}
}
var retErr error
errs := []error{}
for _, tag := range sTagList {
if err := opts.processImage(ctx, s, fmt.Sprintf("%s:%s", src, tag), fmt.Sprintf("%s:%s", tgt, tag), action); err != nil {
retErr = err
errs = append(errs, err)
if opts.abortOnErr {
break
}
}
}
return retErr
return errors.Join(errs...)
}
func (opts *rootOpts) processImage(ctx context.Context, s ConfigSync, src, tgt string, action actionType) error {