1
0
mirror of https://github.com/minio/mc.git synced 2025-11-10 13:42:32 +03:00

mirror: For non-s3 make sure we copy and exit properly. (#1995)

This commit is contained in:
Harshavardhana
2017-02-03 15:55:44 -08:00
committed by GitHub
parent 2ffa265d8c
commit cca8c10f33
6 changed files with 79 additions and 182 deletions

View File

@@ -409,17 +409,25 @@ func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
// Start listening on all bucket events. // Start listening on all bucket events.
eventsCh := c.api.ListenBucketNotification(bucket, params.prefix, params.suffix, events, doneCh) eventsCh := c.api.ListenBucketNotification(bucket, params.prefix, params.suffix, events, doneCh)
wo := &watchObject{
events: eventChan,
errors: errorChan,
done: doneChan,
}
// wait for events to occur and sent them through the eventChan and errorChan // wait for events to occur and sent them through the eventChan and errorChan
go func() { go func() {
defer wo.Close()
for notificationInfo := range eventsCh { for notificationInfo := range eventsCh {
if notificationInfo.Err != nil { if notificationInfo.Err != nil {
if nErr, ok := notificationInfo.Err.(minio.ErrorResponse); ok && nErr.Code == "APINotSupported" { if nErr, ok := notificationInfo.Err.(minio.ErrorResponse); ok && nErr.Code == "APINotSupported" {
errorChan <- probe.NewError(errors.New("The specified S3 target is not supported. " + errorChan <- probe.NewError(APINotImplemented{
"Only Minio based storages provide monitoring feature.")) API: "Watch",
} else { APIType: c.targetURL.Scheme + "://" + c.targetURL.Host,
errorChan <- probe.NewError(notificationInfo.Err) })
return
} }
continue errorChan <- probe.NewError(notificationInfo.Err)
} }
for _, record := range notificationInfo.Records { for _, record := range notificationInfo.Records {
@@ -454,11 +462,7 @@ func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
} }
}() }()
return &watchObject{ return wo, nil
events: eventChan,
errors: errorChan,
done: doneChan,
}, nil
} }
// Get - get object with metadata. // Get - get object with metadata.

View File

@@ -163,12 +163,6 @@ func (u clientURL) String() string {
return buf.String() return buf.String()
} }
func isURLVirtualHostStyle(hostURL string) bool {
matchS3, _ := filepath.Match("*.s3*.amazonaws.com", hostURL)
matchGoogle, _ := filepath.Match("*.storage.googleapis.com", hostURL)
return matchS3 || matchGoogle
}
// urlJoinPath Join a path to existing URL. // urlJoinPath Join a path to existing URL.
func urlJoinPath(url1, url2 string) string { func urlJoinPath(url1, url2 string) string {
u1 := newClientURL(url1) u1 := newClientURL(url1)

View File

@@ -49,13 +49,10 @@ func checkCopySyntax(ctx *cli.Context) {
// Check if bucket name is passed for URL type arguments. // Check if bucket name is passed for URL type arguments.
url := newClientURL(tgtURL) url := newClientURL(tgtURL)
if url.Host != "" { if url.Host != "" {
// This check is for type URL.
if !isURLVirtualHostStyle(url.Host) {
if url.Path == string(url.Separator) { if url.Path == string(url.Separator) {
fatalIf(errInvalidArgument().Trace(), fmt.Sprintf("Target %s does not contain bucket name.", tgtURL)) fatalIf(errInvalidArgument().Trace(), fmt.Sprintf("Target %s does not contain bucket name.", tgtURL))
} }
} }
}
// Guess CopyURLsType based on source and target URLs. // Guess CopyURLsType based on source and target URLs.
copyURLsType, err := guessCopyURLType(srcURLs, tgtURL, isRecursive) copyURLsType, err := guessCopyURLType(srcURLs, tgtURL, isRecursive)

View File

@@ -102,25 +102,17 @@ type mirrorJob struct {
// channel for errors // channel for errors
errorCh chan *probe.Error errorCh chan *probe.Error
// Contains if watcher is currently running.
watcherRunning bool
// the global watcher object, which receives notifications of created // the global watcher object, which receives notifications of created
// and deleted files // and deleted files
watcher *Watcher watcher *Watcher
// the queue of objects to be created or removed
mirrorQueue chan URLs
// waitgroup for mirror goroutine, waits till all
// mirror actions have been completed
wgMirror *sync.WaitGroup
// waitgroup for harvest goroutine
wgHarvest *sync.WaitGroup
// channel to halt harvest routine
harvestStop chan bool
// channel for urls to harvest
harvestCh chan URLs
// Hold operation status information // Hold operation status information
status Status status Status
scanBar scanBarFunc scanBar scanBarFunc
// waitgroup for status goroutine, waits till all status // waitgroup for status goroutine, waits till all status
// messages have been written and received // messages have been written and received
wgStatus *sync.WaitGroup wgStatus *sync.WaitGroup
@@ -266,36 +258,10 @@ func (mj *mirrorJob) startStatus() {
}() }()
} }
func (mj *mirrorJob) startMirror() {
isRemove := mj.context.Bool("remove")
mj.wgMirror.Add(1)
// wait for new urls to mirror or delete in the queue, and
// run the actual mirror or remove.
defer mj.wgMirror.Done()
loop:
for {
select {
case sURLs, ok := <-mj.mirrorQueue:
if !ok {
break loop
}
// Save total count.
sURLs.TotalCount = mj.TotalObjects
// Save totalSize.
sURLs.TotalSize = mj.TotalBytes
if sURLs.SourceContent != nil {
mj.statusCh <- mj.doMirror(sURLs)
} else if sURLs.TargetContent != nil && isRemove {
mj.statusCh <- mj.doRemove(sURLs)
}
}
}
}
// this goroutine will watch for notifications, and add modified objects to the queue // this goroutine will watch for notifications, and add modified objects to the queue
func (mj *mirrorJob) watch() { func (mj *mirrorJob) watchMirror() {
isForce := mj.context.Bool("force") isForce := mj.context.Bool("force")
isRemove := mj.context.Bool("remove")
for { for {
select { select {
@@ -304,9 +270,10 @@ func (mj *mirrorJob) watch() {
// channel closed // channel closed
return return
} }
// this code seemj complicated, it will change the expanded alias back to the alias
// again, by replacing the sourceUrlFull with the sourceAlias. This url will be // It will change the expanded alias back to the alias
// used to mirror. // again, by replacing the sourceUrlFull with the sourceAlias.
// This url will be used to mirror.
sourceAlias, sourceURLFull, _ := mustExpandAlias(mj.sourceURL) sourceAlias, sourceURLFull, _ := mustExpandAlias(mj.sourceURL)
// If the passed source URL points to fs, fetch the absolute src path // If the passed source URL points to fs, fetch the absolute src path
@@ -319,7 +286,6 @@ func (mj *mirrorJob) watch() {
} }
sourceURL := newClientURL(event.Path) sourceURL := newClientURL(event.Path)
aliasedPath := strings.Replace(event.Path, sourceURLFull, mj.sourceURL, -1) aliasedPath := strings.Replace(event.Path, sourceURLFull, mj.sourceURL, -1)
// build target path, it is the relative of the event.Path with the sourceUrl // build target path, it is the relative of the event.Path with the sourceUrl
@@ -331,8 +297,6 @@ func (mj *mirrorJob) watch() {
targetAlias, expandedTargetPath, _ := mustExpandAlias(targetPath) targetAlias, expandedTargetPath, _ := mustExpandAlias(targetPath)
targetURL := newClientURL(expandedTargetPath) targetURL := newClientURL(expandedTargetPath)
// todo(nl5887): do we want all those actions here? those could cause the channels to
// block in case of large num of changes
if event.Type == EventCreate { if event.Type == EventCreate {
// we are checking if a destination file exists now, and if we only // we are checking if a destination file exists now, and if we only
// overwrite it when force is enabled. // overwrite it when force is enabled.
@@ -372,10 +336,9 @@ func (mj *mirrorJob) watch() {
if shouldQueue || isForce { if shouldQueue || isForce {
mirrorURL.TotalCount = mj.TotalObjects mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes mirrorURL.TotalSize = mj.TotalBytes
mj.mirrorQueue <- mirrorURL // adjust total, because we want to show progress of the item still queued to be copied.
// adjust total, because we want to show progress of the itemj stiil
// queued to be copied.
mj.status.SetTotal(mj.status.Total() + sourceContent.Size).Update() mj.status.SetTotal(mj.status.Total() + sourceContent.Size).Update()
mj.statusCh <- mj.doMirror(mirrorURL)
} }
continue continue
} }
@@ -397,9 +360,9 @@ func (mj *mirrorJob) watch() {
mirrorURL.SourceContent.Size = event.Size mirrorURL.SourceContent.Size = event.Size
mirrorURL.TotalCount = mj.TotalObjects mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes mirrorURL.TotalSize = mj.TotalBytes
mj.mirrorQueue <- mirrorURL
// adjust total, because we want to show progress of the itemj stiil queued to be copied. // adjust total, because we want to show progress of the itemj stiil queued to be copied.
mj.status.SetTotal(mj.status.Total() + event.Size).Update() mj.status.SetTotal(mj.status.Total() + event.Size).Update()
mj.statusCh <- mj.doMirror(mirrorURL)
} }
} else if event.Type == EventRemove { } else if event.Type == EventRemove {
mirrorURL := URLs{ mirrorURL := URLs{
@@ -410,13 +373,22 @@ func (mj *mirrorJob) watch() {
} }
mirrorURL.TotalCount = mj.TotalObjects mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes mirrorURL.TotalSize = mj.TotalBytes
mj.mirrorQueue <- mirrorURL if mirrorURL.TargetContent != nil && isRemove && isForce {
mj.statusCh <- mj.doRemove(mirrorURL)
}
} }
case err := <-mj.watcher.Errors(): case err := <-mj.watcher.Errors():
switch err.ToGoError().(type) {
case APINotImplemented:
// Ignore error if API is not implemented.
mj.watcherRunning = false
return
default:
errorIf(err, "Unexpected error during monitoring.") errorIf(err, "Unexpected error during monitoring.")
} }
} }
}
} }
func (mj *mirrorJob) watchSourceURL(recursive bool) *probe.Error { func (mj *mirrorJob) watchSourceURL(recursive bool) *probe.Error {
@@ -427,43 +399,22 @@ func (mj *mirrorJob) watchSourceURL(recursive bool) *probe.Error {
return err return err
} }
func (mj *mirrorJob) harvestSourceUrls(recursive bool) { // Fetch urls that need to be mirrored
func (mj *mirrorJob) startMirror() {
var totalBytes int64
var totalObjects int64
isForce := mj.context.Bool("force") isForce := mj.context.Bool("force")
isFake := mj.context.Bool("fake") isFake := mj.context.Bool("fake")
isRemove := mj.context.Bool("remove") isRemove := mj.context.Bool("remove")
defer close(mj.harvestCh)
URLsCh := prepareMirrorURLs(mj.sourceURL, mj.targetURL, isForce, isFake, isRemove) URLsCh := prepareMirrorURLs(mj.sourceURL, mj.targetURL, isForce, isFake, isRemove)
for { for {
select { select {
case <-mj.harvestStop: case sURLs, ok := <-URLsCh:
return
case url := <-URLsCh:
// Send harvested urls.
mj.harvestCh <- url
}
}
}
// Fetch urls that need to be mirrored
func (mj *mirrorJob) harvest(recursive bool) {
mj.wgHarvest.Add(1)
defer mj.wgHarvest.Done()
// harvest urls from source urls
go mj.harvestSourceUrls(recursive)
var totalBytes int64
var totalObjects int64
loop:
for {
select {
case sURLs, ok := <-mj.harvestCh:
if !ok { if !ok {
// finished harvesting urls // finished harvesting urls
break loop return
} }
if sURLs.Error != nil { if sURLs.Error != nil {
if strings.Contains(sURLs.Error.ToGoError().Error(), " is a folder.") { if strings.Contains(sURLs.Error.ToGoError().Error(), " is a folder.") {
@@ -477,27 +428,30 @@ loop:
// copy // copy
totalBytes += sURLs.SourceContent.Size totalBytes += sURLs.SourceContent.Size
} }
mj.mirrorQueue <- sURLs
totalObjects++ totalObjects++
mj.status.SetTotal(totalBytes)
case err := <-mj.errorCh:
mj.status.errorIf(err, "Unable to harvest URL for copying.")
continue
}
}
mj.TotalBytes = totalBytes mj.TotalBytes = totalBytes
mj.TotalObjects = totalObjects mj.TotalObjects = totalObjects
// update progressbar and accounting reader
mj.status.SetTotal(totalBytes) mj.status.SetTotal(totalBytes)
// Save total count.
sURLs.TotalCount = mj.TotalObjects
// Save totalSize.
sURLs.TotalSize = mj.TotalBytes
if sURLs.SourceContent != nil {
mj.statusCh <- mj.doMirror(sURLs)
} else if sURLs.TargetContent != nil && isRemove && isForce {
mj.statusCh <- mj.doRemove(sURLs)
}
case <-mj.trapCh:
os.Exit(0)
}
}
} }
// when using a struct for copying, we could save a lot of passing of variables // when using a struct for copying, we could save a lot of passing of variables
func (mj *mirrorJob) mirror() { func (mj *mirrorJob) mirror() {
recursive := mj.context.Bool("recursive")
if globalQuiet || globalJSON { if globalQuiet || globalJSON {
} else { } else {
// Enable progress bar reader only during default mode // Enable progress bar reader only during default mode
@@ -507,18 +461,6 @@ func (mj *mirrorJob) mirror() {
// start the status go routine // start the status go routine
mj.startStatus() mj.startStatus()
// harvest urls to copy
go mj.harvest(recursive)
// wait for trap signal to close properly
go func() {
// on SIGTERM shutdown and stop
<-mj.trapCh
// Shutdown gracefully.
mj.shutdown()
}()
// monitor mode will watch the source folders for changes, // monitor mode will watch the source folders for changes,
// and queue them for copying. Monitor mode can be stopped // and queue them for copying. Monitor mode can be stopped
// only by SIGTERM. // only by SIGTERM.
@@ -526,45 +468,16 @@ func (mj *mirrorJob) mirror() {
mj.status.fatalIf(err, fmt.Sprintf("Failed to start monitoring.")) mj.status.fatalIf(err, fmt.Sprintf("Failed to start monitoring."))
} }
go mj.watch() // Start watching and mirroring.
go mj.watchMirror()
// Start mirroring.
mj.startMirror() mj.startMirror()
}
func (mj *mirrorJob) stopWatcher() { // Wait if watcher is running.
// Stop events watcher if mj.watcherRunning {
mj.watcher.Stop() <-mj.trapCh
} }
func (mj *mirrorJob) stopHarvester() {
// Ask harvester to stop searching for new objects to mirror
close(mj.harvestStop)
mj.wgHarvest.Wait()
}
func (mj *mirrorJob) stopMirror() {
// wait for current copy action to finish and stop copying
close(mj.mirrorQueue)
mj.wgMirror.Wait()
}
func (mj *mirrorJob) stopStatus() {
// Gracefully stopping status
close(mj.statusCh)
mj.wgStatus.Wait()
}
// Called upon signal trigger.
func (mj *mirrorJob) shutdown() {
// make sure only one shutdown can be active
mj.m.Lock()
// Stop everything
mj.stopWatcher()
mj.stopHarvester()
mj.stopMirror()
mj.stopStatus()
} }
func newMirrorJob(ctx *cli.Context) *mirrorJob { func newMirrorJob(ctx *cli.Context) *mirrorJob {
@@ -587,20 +500,11 @@ func newMirrorJob(ctx *cli.Context) *mirrorJob {
sourceURL: args[0], sourceURL: args[0],
targetURL: args[len(args)-1], // Last one is target targetURL: args[len(args)-1], // Last one is target
errorCh: make(chan *probe.Error),
harvestStop: make(chan bool),
harvestCh: make(chan URLs, 10000),
wgHarvest: new(sync.WaitGroup),
status: status, status: status,
scanBar: func(s string) {}, scanBar: func(s string) {},
statusCh: make(chan URLs), statusCh: make(chan URLs),
wgStatus: new(sync.WaitGroup), wgStatus: new(sync.WaitGroup),
watcherRunning: true,
mirrorQueue: make(chan URLs, 1000),
wgMirror: new(sync.WaitGroup),
watcher: NewWatcher(time.Now().UTC()), watcher: NewWatcher(time.Now().UTC()),
} }

View File

@@ -59,13 +59,11 @@ func checkMirrorSyntax(ctx *cli.Context) {
url := newClientURL(tgtURL) url := newClientURL(tgtURL)
if url.Host != "" { if url.Host != "" {
if !isURLVirtualHostStyle(url.Host) {
if url.Path == string(url.Separator) { if url.Path == string(url.Separator) {
fatalIf(errInvalidArgument().Trace(tgtURL), fatalIf(errInvalidArgument().Trace(tgtURL),
fmt.Sprintf("Target %s does not contain bucket name.", tgtURL)) fmt.Sprintf("Target %s does not contain bucket name.", tgtURL))
} }
} }
}
_, _, err := url2Stat(tgtURL) _, _, err := url2Stat(tgtURL)
// we die on any error other than PathNotFound - destination directory need not exist. // we die on any error other than PathNotFound - destination directory need not exist.
switch err.ToGoError().(type) { switch err.ToGoError().(type) {

View File

@@ -181,7 +181,7 @@ func mainWatch(ctx *cli.Context) error {
if !ok { if !ok {
return return
} }
errorIf(err, "Cannot watch on events.") errorIf(err, "Unable to watch for events.")
return return
} }
} }