1
0
mirror of https://github.com/minio/mc.git synced 2025-11-10 13:42:32 +03:00

watch/events: Support events on Get/Head operations as well. (#2116)

Fixes #2073
This commit is contained in:
Harshavardhana
2017-04-10 11:50:52 -07:00
committed by GitHub
parent fac4ef81b2
commit b0968e29e8
15 changed files with 290 additions and 140 deletions

View File

@@ -96,7 +96,10 @@ func (f *fsClient) GetURL() clientURL {
// Watches for all fs events on an input path.
func (f *fsClient) Watch(params watchParams) (*watchObject, *probe.Error) {
eventChan := make(chan Event)
eventChan := make(chan struct {
Event Event
Source Source
})
errorChan := make(chan *probe.Error)
doneChan := make(chan bool)
// Make the channel buffered to ensure no event is dropped. Notify will drop
@@ -110,6 +113,8 @@ func (f *fsClient) Watch(params watchParams) (*watchObject, *probe.Error) {
fsEvents = append(fsEvents, EventTypePut...)
case "delete":
fsEvents = append(fsEvents, EventTypeDelete...)
case "get":
fsEvents = append(fsEvents, EventTypeGet...)
default:
return nil, errInvalidArgument().Trace(event)
}
@@ -159,19 +164,41 @@ func (f *fsClient) Watch(params watchParams) (*watchObject, *probe.Error) {
// we want files
continue
}
eventChan <- Event{
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: time.Now().Format(timeFormatFS),
Size: i.Size(),
Path: event.Path(),
Client: f,
Type: EventCreate,
},
}
} else if IsDeleteEvent(event.Event()) {
eventChan <- Event{
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: time.Now().Format(timeFormatFS),
Path: event.Path(),
Client: f,
Type: EventRemove,
},
}
} else if IsGetEvent(event.Event()) {
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: time.Now().Format(timeFormatFS),
Path: event.Path(),
Client: f,
Type: EventAccessed,
},
}
}
}

View File

@@ -1,7 +1,7 @@
// +build linux
/*
* Minio Client (C) 2015 Minio, Inc.
* Minio Client (C) 2015, 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this fs except in compliance with the License.
@@ -23,34 +23,40 @@ import (
)
var (
// EventTypePut contains the notify events that will cause a put
// EventTypePut contains the notify events that will cause a put (write)
EventTypePut = []notify.Event{notify.InCloseWrite | notify.InMovedTo}
// EventTypeDelete contains the notify events that will cause a delete
// EventTypeDelete contains the notify events that will cause a delete (remove)
EventTypeDelete = []notify.Event{notify.InDelete | notify.InDeleteSelf | notify.InMovedFrom}
// EventTypeGet contains the notify events that will cause a get (read)
EventTypeGet = []notify.Event{notify.InAccess | notify.InOpen}
)
// IsGetEvent checks if the event return is a get event.
func IsGetEvent(event notify.Event) bool {
for _, ev := range EventTypeGet {
if event == ev {
return true
}
}
return false
}
// IsPutEvent checks if the event returned is a put event
func IsPutEvent(event notify.Event) bool {
switch event {
case notify.InCloseWrite:
return true
case notify.InMovedTo:
for _, ev := range EventTypePut {
if event == ev {
return true
}
}
return false
}
// IsDeleteEvent checks if the event returned is a delete event
func IsDeleteEvent(event notify.Event) bool {
switch event {
case notify.InDelete:
return true
case notify.InDeleteSelf:
return true
case notify.InMovedFrom:
for _, ev := range EventTypeDelete {
if event == ev {
return true
}
}
return false
}

View File

@@ -1,7 +1,7 @@
// +build !linux
// +build darwin,freebsd,solaris
/*
* Minio Client (C) 2015 Minio, Inc.
* Minio Client (C) 2015, 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this fs except in compliance with the License.
@@ -21,32 +21,30 @@ package cmd
import "github.com/rjeczalik/notify"
var (
// EventTypePut contains the notify events that will cause a put
// EventTypePut contains the notify events that will cause a put (writer)
EventTypePut = []notify.Event{notify.Create, notify.Write, notify.Rename}
// EventTypeDelete contains the notify events that will cause a delete
// EventTypeDelete contains the notify events that will cause a delete (remove)
EventTypeDelete = []notify.Event{notify.Remove}
// EventTypeGet contains the notify events that will cause a get (read)
EventTypeGet = []notify.Event{} // On macOS, FreeBSD, Solaris this is not available.
)
// IsGetEvent checks if the event return is a get event.
func IsGetEvent(event notify.Event) bool {
return false
}
// IsPutEvent checks if the event returned is a put event
func IsPutEvent(event notify.Event) bool {
switch event {
case notify.Create:
return true
case notify.Rename:
return true
case notify.Write:
for _, ev := range EventTypePut {
if event == ev {
return true
}
}
return false
}
// IsDeleteEvent checks if the event returned is a delete event
func IsDeleteEvent(event notify.Event) bool {
switch event {
case notify.Remove:
return true
}
return false
return event == notify.Remove
}

50
cmd/client-fs_windows.go Normal file
View File

@@ -0,0 +1,50 @@
// +build windows
/*
* Minio Client (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this fs except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "github.com/rjeczalik/notify"
var (
// EventTypePut contains the notify events that will cause a put (writer)
EventTypePut = []notify.Event{notify.Create, notify.Write, notify.Rename}
// EventTypeDelete contains the notify events that will cause a delete (remove)
EventTypeDelete = []notify.Event{notify.Remove}
// EventTypeGet contains the notify events that will cause a get (read)
EventTypeGet = []notify.Event{notify.FileNotifyChangeLastAccess}
)
// IsGetEvent checks if the event return is a get event.
func IsGetEvent(event notify.Event) bool {
return event == notify.FileNotifyChangeLastAccess
}
// IsPutEvent checks if the event returned is a put event
func IsPutEvent(event notify.Event) bool {
for _, ev := range EventTypePut {
if event == ev {
return true
}
}
return false
}
// IsDeleteEvent checks if the event returned is a delete event
func IsDeleteEvent(event notify.Event) bool {
return event == notify.Remove
}

View File

@@ -1,5 +1,5 @@
/*
* Minio Client (C) 2015 Minio, Inc.
* Minio Client (C) 2015, 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -206,6 +206,8 @@ func (c *s3Client) AddNotificationConfig(arn string, events []string, prefix, su
nc.AddEvents(minio.ObjectCreatedAll)
case "delete":
nc.AddEvents(minio.ObjectRemovedAll)
case "get":
nc.AddEvents(minio.ObjectAccessedAll)
default:
return errInvalidArgument().Trace(events...)
}
@@ -366,7 +368,10 @@ func (c *s3Client) ListNotificationConfigs(arn string) ([]notificationConfig, *p
// Start watching on all bucket events for a given account ID.
func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
eventChan := make(chan Event)
eventChan := make(chan struct {
Event Event
Source Source
})
errorChan := make(chan *probe.Error)
doneChan := make(chan bool)
@@ -384,6 +389,8 @@ func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
events = append(events, string(minio.ObjectCreatedAll))
case "delete":
events = append(events, string(minio.ObjectRemovedAll))
case "get":
events = append(events, string(minio.ObjectAccessedAll))
default:
return nil, errInvalidArgument().Trace(event)
}
@@ -441,22 +448,77 @@ func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
u := *c.targetURL
u.Path = path.Join(string(u.Separator), bucketName, key)
if strings.HasPrefix(record.EventName, "s3:ObjectCreated:") {
eventChan <- Event{
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Client: c,
Type: EventCreate,
},
Source: Source{
IP: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
},
}
} else if strings.HasPrefix(record.EventName, "s3:ObjectRemoved:") {
eventChan <- Event{
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: record.EventTime,
Path: u.String(),
Client: c,
Type: EventRemove,
},
Source: Source{
IP: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
},
}
} else if record.EventName == minio.ObjectAccessedGet {
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Client: c,
Type: EventAccessedRead,
},
Source: Source{
IP: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
},
}
} else if record.EventName == minio.ObjectAccessedHead {
eventChan <- struct {
Event Event
Source Source
}{
Event: Event{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Client: c,
Type: EventAccessedStat,
},
Source: Source{
IP: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
},
}
} else {
// ignore other events
}
}
}

View File

@@ -30,7 +30,7 @@ var (
eventsAddFlags = []cli.Flag{
cli.StringFlag{
Name: "events",
Value: "put,delete",
Value: "put,delete,get",
Usage: "Filter specific type of events. Defaults to all events.",
},
cli.StringFlag{
@@ -64,7 +64,7 @@ EXAMPLES:
$ {{.HelpName}} myminio/mybucket arn:aws:sqs:us-west-2:444455556666:your-queue
2. Enable bucket notification with filters parameters
$ {{.HelpName}} s3/mybucket arn:aws:sqs:us-west-2:444455556666:your-queue --events put,delete --prefix photos/ --suffix .jpg
$ {{.HelpName}} s3/mybucket arn:aws:sqs:us-west-2:444455556666:your-queue --events put,delete,get --prefix photos/ --suffix .jpg
`,
}

View File

@@ -289,19 +289,19 @@ func (mj *mirrorJob) watchMirror() {
}
}
sourceURL := newClientURL(event.Path)
aliasedPath := strings.Replace(event.Path, sourceURLFull, mj.sourceURL, -1)
sourceURL := newClientURL(event.Event.Path)
aliasedPath := strings.Replace(event.Event.Path, sourceURLFull, mj.sourceURL, -1)
// build target path, it is the relative of the event.Path with the sourceUrl
// joined to the targetURL.
sourceSuffix := strings.TrimPrefix(event.Path, sourceURLFull)
sourceSuffix := strings.TrimPrefix(event.Event.Path, sourceURLFull)
targetPath := urlJoinPath(mj.targetURL, sourceSuffix)
// newClient needs the unexpanded path, newCLientURL needs the expanded path
targetAlias, expandedTargetPath, _ := mustExpandAlias(targetPath)
targetURL := newClientURL(expandedTargetPath)
if event.Type == EventCreate {
if event.Event.Type == EventCreate {
// we are checking if a destination file exists now, and if we only
// overwrite it when force is enabled.
mirrorURL := URLs{
@@ -310,7 +310,7 @@ func (mj *mirrorJob) watchMirror() {
TargetAlias: targetAlias,
TargetContent: &clientContent{URL: *targetURL},
}
if event.Size == 0 {
if event.Event.Size == 0 {
sourceClient, err := newClient(aliasedPath)
if err != nil {
// cannot create sourceclient
@@ -361,14 +361,14 @@ func (mj *mirrorJob) watchMirror() {
shouldQueue = true
}
if shouldQueue || mj.isForce {
mirrorURL.SourceContent.Size = event.Size
mirrorURL.SourceContent.Size = event.Event.Size
mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes
// adjust total, because we want to show progress of the itemj stiil queued to be copied.
mj.status.SetTotal(mj.status.Total() + event.Size).Update()
mj.status.SetTotal(mj.status.Total() + event.Event.Size).Update()
mj.statusCh <- mj.doMirror(mirrorURL)
}
} else if event.Type == EventRemove {
} else if event.Event.Type == EventRemove {
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: nil,

View File

@@ -36,7 +36,7 @@ var (
watchFlags = []cli.Flag{
cli.StringFlag{
Name: "events",
Value: "put,delete",
Value: "put,delete,get",
Usage: "Filter specific type of events. Defaults to all events by default.",
},
cli.StringFlag{
@@ -64,7 +64,7 @@ var watchCmd = cli.Command{
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} [FLAGS]
{{.HelpName}} PATH [FLAGS]
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
@@ -98,6 +98,7 @@ func checkWatchSyntax(ctx *cli.Context) {
type watchMessage struct {
Status string `json:"status"`
Event Event `json:"events"`
Source Source `json:"source,omitempty"`
}
func (u watchMessage) JSON() string {
@@ -175,7 +176,10 @@ func mainWatch(ctx *cli.Context) error {
if !ok {
return
}
msg := watchMessage{Event: event}
msg := watchMessage{
Event: event.Event,
Source: event.Source,
}
printMsg(msg)
case err, ok := <-wo.Errors():
if !ok {

View File

@@ -24,14 +24,20 @@ import (
"github.com/minio/minio/pkg/probe"
)
// EventType is the type of the event that occurred
// EventType represents the type of the event occurred.
type EventType string
const (
// EventCreate notifies when a new object has been created
// EventCreate notifies when a new object is created
EventCreate EventType = "ObjectCreated"
// EventRemove notifies when a new object has been deleted
// EventRemove notifies when a new object is deleted
EventRemove = "ObjectRemoved"
// EventAccessed notifies when an object is accessed.
EventAccessed = "ObjectAccessed"
// EventAccessedRead notifies when an object is accessed (specifically read).
EventAccessedRead = "ObjectAccessed:Read"
// EventAccessedStat notifies when an object is accessed (specifically stat).
EventAccessedStat = "ObjectAccessed:Stat"
)
// Event contains the information of the event that occurred
@@ -43,6 +49,13 @@ type Event struct {
Type EventType `json:"type"`
}
// Source obtains the information of the client which generated the event.
type Source struct {
IP string `json:"ip"`
Port string `json:"port"`
UserAgent string `json:"userAgent"`
}
type watchParams struct {
accountID string
prefix string
@@ -53,7 +66,10 @@ type watchParams struct {
type watchObject struct {
// events will be put on this chan
events chan Event
events chan struct {
Event Event
Source Source
}
// errors will be put on this chan
errors chan *probe.Error
// will stop the watcher goroutines
@@ -61,7 +77,10 @@ type watchObject struct {
}
// Events returns the chan receiving events
func (w *watchObject) Events() chan Event {
func (w *watchObject) Events() chan struct {
Event Event
Source Source
} {
return w.events
}
@@ -82,7 +101,10 @@ type Watcher struct {
// all errors will be added to this chan
errorsChan chan *probe.Error
// all events will be added to this chan
eventsChan chan Event
eventsChan chan struct {
Event Event
Source Source
}
// array of watchers joined
o []*watchObject
@@ -96,7 +118,10 @@ func NewWatcher(sessionStartTime time.Time) *Watcher {
return &Watcher{
sessionStartTime: sessionStartTime,
errorsChan: make(chan *probe.Error),
eventsChan: make(chan Event),
eventsChan: make(chan struct {
Event Event
Source Source
}),
o: []*watchObject{},
}
}
@@ -107,7 +132,10 @@ func (w *Watcher) Errors() chan *probe.Error {
}
// Events returns a channel which will receive events
func (w *Watcher) Events() chan Event {
func (w *Watcher) Events() chan struct {
Event Event
Source Source
} {
return w.eventsChan
}

View File

@@ -168,14 +168,14 @@ func (c Client) ListObjectsV2(bucketName, objectPrefix string, recursive bool, d
// ?delimiter - A delimiter is a character you use to group keys.
// ?prefix - Limits the response to keys that begin with the specified prefix.
// ?max-keys - Sets the maximum number of keys returned in the response body.
func (c Client) listObjectsV2Query(bucketName, objectPrefix, continuationToken string, fetchOwner bool, delimiter string, maxkeys int) (listBucketV2Result, error) {
func (c Client) listObjectsV2Query(bucketName, objectPrefix, continuationToken string, fetchOwner bool, delimiter string, maxkeys int) (ListBucketV2Result, error) {
// Validate bucket name.
if err := isValidBucketName(bucketName); err != nil {
return listBucketV2Result{}, err
return ListBucketV2Result{}, err
}
// Validate object prefix.
if err := isValidObjectPrefix(objectPrefix); err != nil {
return listBucketV2Result{}, err
return ListBucketV2Result{}, err
}
// Get resources properly escaped and lined up before
// using them in http request.
@@ -216,16 +216,16 @@ func (c Client) listObjectsV2Query(bucketName, objectPrefix, continuationToken s
})
defer closeResponse(resp)
if err != nil {
return listBucketV2Result{}, err
return ListBucketV2Result{}, err
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return listBucketV2Result{}, httpRespToErrorResponse(resp, bucketName, "")
return ListBucketV2Result{}, httpRespToErrorResponse(resp, bucketName, "")
}
}
// Decode listBuckets XML.
listBucketResult := listBucketV2Result{}
listBucketResult := ListBucketV2Result{}
err = xmlDecoder(resp.Body, &listBucketResult)
if err != nil {
return listBucketResult, err

View File

@@ -102,6 +102,14 @@ type eventMeta struct {
Object objectMeta `json:"object"`
}
// sourceInfo represents information on the client that
// triggered the event notification.
type sourceInfo struct {
Host string `json:"host"`
Port string `json:"port"`
UserAgent string `json:"userAgent"`
}
// NotificationEvent represents an Amazon an S3 bucket notification event.
type NotificationEvent struct {
EventVersion string `json:"eventVersion"`
@@ -113,6 +121,7 @@ type NotificationEvent struct {
RequestParameters map[string]string `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 eventMeta `json:"s3"`
Source sourceInfo `json:"source"`
}
// NotificationInfo - represents the collection of notification events, additionally

View File

@@ -42,7 +42,7 @@ type commonPrefix struct {
}
// listBucketResult container for listObjects V2 response.
type listBucketV2Result struct {
type ListBucketV2Result struct {
// A response can contain CommonPrefixes only if you have
// specified a delimiter.
CommonPrefixes []commonPrefix

View File

@@ -543,9 +543,9 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
// For errors verify if its retryable otherwise fail quickly.
errResponse := ToErrorResponse(httpRespToErrorResponse(res, metadata.bucketName, metadata.objectName))
// Bucket region if set in error response, we can retry the
// request with the new region.
if errResponse.Region != "" {
// Bucket region if set in error response and the error code dictates invalid region,
// we can retry the request with the new region.
if errResponse.Code == "InvalidRegion" && errResponse.Region != "" {
c.bucketLocCache.Set(metadata.bucketName, errResponse.Region)
continue // Retry.
}
@@ -625,14 +625,6 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R
return req, nil
}
// FIXME: Enable this when Google Cloud Storage properly supports 100-continue.
// Skip setting 'expect' header for Google Cloud Storage, there
// are some known issues - https://github.com/restic/restic/issues/520
if !s3utils.IsGoogleEndpoint(c.endpointURL) && c.s3AccelerateEndpoint == "" {
// Set 'Expect' header for the request.
req.Header.Set("Expect", "100-continue")
}
// Set 'User-Agent' header for the request.
c.setUserAgent(req)

View File

@@ -17,7 +17,6 @@
package minio
import "io"
import "encoding/hex"
// Inherits Client and adds new methods to expose the low level S3 APIs.
type Core struct {
@@ -40,24 +39,14 @@ func (c Core) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int)
return c.listObjectsQuery(bucket, prefix, marker, delimiter, maxKeys)
}
// ListObjectsV2 - List the objects.
func (c Core) ListObjectsV2(bucketName, objectPrefix, continuationToken string, fetchOwner bool, delimiter string, maxkeys int) (ListBucketV2Result, error) {
return c.listObjectsV2Query(bucketName, objectPrefix, continuationToken, fetchOwner, delimiter, maxkeys)
}
// PutObject - Upload object. Uploads using single PUT call.
func (c Core) PutObject(bucket, object string, size int64, data io.Reader, md5Sum string, sha256Sum string, metadata map[string][]string) (ObjectInfo, error) {
var md5SumBytes []byte
var sha256SumBytes []byte
var err error
if md5Sum != "" {
md5SumBytes, err = hex.DecodeString(md5Sum)
if err != nil {
return ObjectInfo{}, err
}
}
if sha256Sum != "" {
sha256SumBytes, err = hex.DecodeString(sha256Sum)
if err != nil {
return ObjectInfo{}, err
}
}
return c.putObjectDo(bucket, object, data, md5SumBytes, sha256SumBytes, size, metadata)
func (c Core) PutObject(bucket, object string, size int64, data io.Reader, md5Sum, sha256Sum []byte, metadata map[string][]string) (ObjectInfo, error) {
return c.putObjectDo(bucket, object, data, md5Sum, sha256Sum, size, metadata)
}
// NewMultipartUpload - Initiates new multipart upload and returns the new uploaID.
@@ -72,23 +61,8 @@ func (c Core) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, de
}
// PutObjectPart - Upload an object part.
func (c Core) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Sum, sha256Sum string) (ObjectPart, error) {
var md5SumBytes []byte
var sha256SumBytes []byte
var err error
if md5Sum != "" {
md5SumBytes, err = hex.DecodeString(md5Sum)
if err != nil {
return ObjectPart{}, err
}
}
if sha256Sum != "" {
sha256SumBytes, err = hex.DecodeString(sha256Sum)
if err != nil {
return ObjectPart{}, err
}
}
return c.uploadPart(bucket, object, uploadID, data, partID, md5SumBytes, sha256SumBytes, size)
func (c Core) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Sum, sha256Sum []byte) (ObjectPart, error) {
return c.uploadPart(bucket, object, uploadID, data, partID, md5Sum, sha256Sum, size)
}
// ListObjectParts - List uploaded parts of an incomplete upload.

6
vendor/vendor.json vendored
View File

@@ -50,10 +50,10 @@
"revisionTime": "2015-10-24T22:24:27-07:00"
},
{
"checksumSHA1": "q2mnNtdkxvuzEx8d5W0uCO0wmgI=",
"checksumSHA1": "mDmRRka//itovrS4jtcpVdr0Weg=",
"path": "github.com/minio/minio-go",
"revision": "ef0dc7e81b008739bd2ab9cb5b5cfbdc22bf32a5",
"revisionTime": "2017-04-01T21:28:19Z"
"revision": "6d434a3827cdfa3fcf43bde6864fbdd2aad19089",
"revisionTime": "2017-04-08T08:09:03Z"
},
{
"checksumSHA1": "qTxOBp3GVxCC70ykb7Hxg6UgWwA=",