From 78a78c072ab6b775e3d0dd642731ecf8889beb51 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 1 May 2015 15:52:08 -0700 Subject: [PATCH] Full cleanup of s3 client for mc --- cmd-access.go | 4 +- cmd-mb.go | 4 +- cmd_test.go | 90 +++--- cp.go | 6 +- ls.go | 20 +- pkg/client/client.go | 27 +- pkg/client/fs/fs.go | 109 ++++--- .../fs/{fs_multi.go => fs_multipart_put.go} | 2 +- pkg/client/fs/fs_test.go | 32 +- pkg/client/mocks/Client.go | 31 +- pkg/client/s3/bucket.go | 241 ++++---------- pkg/client/s3/bucket_list.go | 271 ---------------- pkg/client/s3/bucket_object_metadata.go | 114 +++++++ pkg/client/s3/bucket_routines.go | 304 ++++++++++++++++++ pkg/client/s3/client.go | 203 ------------ pkg/client/s3/client_test.go | 18 +- pkg/client/s3/common.go | 56 ++++ pkg/client/s3/definitions.go | 68 ++++ pkg/client/s3/list_contents.go | 35 ++ pkg/client/s3/object_get.go | 55 ++-- ...bject_multi.go => object_multipart_put.go} | 14 +- pkg/client/s3/object_put.go | 72 ++--- pkg/client/s3/request.go | 107 ++++++ 23 files changed, 1008 insertions(+), 875 deletions(-) rename pkg/client/fs/{fs_multi.go => fs_multipart_put.go} (94%) delete mode 100644 pkg/client/s3/bucket_list.go create mode 100644 pkg/client/s3/bucket_object_metadata.go create mode 100644 pkg/client/s3/bucket_routines.go delete mode 100644 pkg/client/s3/client.go create mode 100644 pkg/client/s3/common.go create mode 100644 pkg/client/s3/definitions.go create mode 100644 pkg/client/s3/list_contents.go rename pkg/client/s3/{object_multi.go => object_multipart_put.go} (91%) create mode 100644 pkg/client/s3/request.go diff --git a/cmd-access.go b/cmd-access.go index 19204abc..49b97223 100644 --- a/cmd-access.go +++ b/cmd-access.go @@ -92,12 +92,12 @@ func doUpdateAccessCmd(methods clientMethods, targetURL, targetACL string, targe } func doUpdateAccess(clnt client.Client, targetURL, targetACL string) (string, error) { - err := clnt.PutBucket(targetACL) + err := clnt.PutBucketACL(targetACL) for i := 0; i < globalMaxRetryFlag && err != nil && isValidRetry(err); i++ { fmt.Println(console.Retry("Retrying ... %d", i)) // Progressively longer delays time.Sleep(time.Duration(i*i) * time.Second) - err = clnt.PutBucket(targetACL) + err = clnt.PutBucketACL(targetACL) } if err != nil { err := iodine.New(err, nil) diff --git a/cmd-mb.go b/cmd-mb.go index 1dc1a5b8..d5253c41 100644 --- a/cmd-mb.go +++ b/cmd-mb.go @@ -89,12 +89,12 @@ func doMakeBucketCmd(methods clientMethods, targetURL string, targetConfig *host // doMakeBucket - wrapper around PutBucket() API func doMakeBucket(clnt client.Client, targetURL string) (string, error) { - err := clnt.PutBucket("") + err := clnt.PutBucket() for i := 0; i < globalMaxRetryFlag && err != nil && isValidRetry(err); i++ { fmt.Println(console.Retry("Retrying ... %d", i)) // Progressively longer delays time.Sleep(time.Duration(i*i) * time.Second) - err = clnt.PutBucket("") + err = clnt.PutBucket() } if err != nil { err := iodine.New(err, nil) diff --git a/cmd_test.go b/cmd_test.go index 0bccaba2..841702e5 100644 --- a/cmd_test.go +++ b/cmd_test.go @@ -132,7 +132,7 @@ func (s *CmdTestSuite) TestCopyRecursive(c *C) { wg.Done() }() - items := []*client.Item{ + contents := []*client.Content{ {Name: "hello1", Time: time.Now(), Size: dataLen1}, {Name: "hello2", Time: time.Now(), Size: dataLen2}, } @@ -150,17 +150,17 @@ func (s *CmdTestSuite) TestCopyRecursive(c *C) { targetURLConfigMap[targetURL] = targetConfig methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once() - itemCh := make(chan client.ItemOnChannel) + contentCh := make(chan client.ContentOnChannel) go func() { - defer close(itemCh) - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + defer close(contentCh) + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } } }() - cl1.On("ListRecursive").Return(itemCh).Once() + cl1.On("ListRecursive").Return(contentCh).Once() sourceReader1 := ioutil.NopCloser(bytes.NewBufferString(data1)) sourceReader2 := ioutil.NopCloser(bytes.NewBufferString(data2)) methods.On("getSourceReader", sourceURL+"hello1", sourceConfig).Return(sourceReader1, dataLen1, etag1, nil).Once() @@ -281,7 +281,7 @@ func (s *CmdTestSuite) TestLsCmdWithBucket(c *C) { data2 := "hello world 2" dataLen2 := int64(len(data2)) - items := []*client.Item{ + contents := []*client.Content{ {Name: "hello1", Time: time.Now(), Size: dataLen1}, {Name: "hello2", Time: time.Now(), Size: dataLen2}, } @@ -293,17 +293,17 @@ func (s *CmdTestSuite) TestLsCmdWithBucket(c *C) { sourceURLConfigMap[sourceURL] = sourceConfig methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once() - itemCh := make(chan client.ItemOnChannel) + contentCh := make(chan client.ContentOnChannel) go func() { - defer close(itemCh) - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + defer close(contentCh) + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } } }() - cl1.On("ListRecursive").Return(itemCh).Once() + cl1.On("ListRecursive").Return(contentCh).Once() err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false) c.Assert(err, IsNil) @@ -324,7 +324,7 @@ func (s *CmdTestSuite) TestLsCmdWithFilePath(c *C) { data2 := "hello world 2" dataLen2 := int64(len(data2)) - items := []*client.Item{ + contents := []*client.Content{ {Name: "hello1", Time: time.Now(), Size: dataLen1}, {Name: "hello2", Time: time.Now(), Size: dataLen2}, } @@ -337,17 +337,17 @@ func (s *CmdTestSuite) TestLsCmdWithFilePath(c *C) { methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once() - itemCh := make(chan client.ItemOnChannel) + contentCh := make(chan client.ContentOnChannel) go func() { - defer close(itemCh) - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + defer close(contentCh) + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } } }() - cl1.On("ListRecursive").Return(itemCh).Once() + cl1.On("ListRecursive").Return(contentCh).Once() err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false) c.Assert(err, IsNil) @@ -362,7 +362,7 @@ func (s *CmdTestSuite) TestLsCmdListsBuckets(c *C) { methods := &MockclientMethods{} cl1 := &clientMocks.Client{} - buckets := []*client.Item{ + buckets := []*client.Content{ {Name: "bucket1", Time: time.Now()}, {Name: "bucket2", Time: time.Now()}, } @@ -374,17 +374,17 @@ func (s *CmdTestSuite) TestLsCmdListsBuckets(c *C) { sourceURLConfigMap[sourceURL] = sourceConfig methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once() - itemCh := make(chan client.ItemOnChannel) + contentCh := make(chan client.ContentOnChannel) go func() { - defer close(itemCh) + defer close(contentCh) for _, bucket := range buckets { - itemCh <- client.ItemOnChannel{ - Item: bucket, - Err: nil, + contentCh <- client.ContentOnChannel{ + Content: bucket, + Err: nil, } } }() - cl1.On("ListRecursive").Return(itemCh).Once() + cl1.On("ListRecursive").Return(contentCh).Once() err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false) c.Assert(err, IsNil) @@ -406,7 +406,7 @@ func (s *CmdTestSuite) TestMbCmd(c *C) { targetURLConfigMap[targetURL] = targetConfig methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "").Return(nil).Once() + cl1.On("PutBucket").Return(nil).Once() msg, err := doMakeBucketCmd(methods, targetURL, targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) @@ -429,19 +429,19 @@ func (s *CmdTestSuite) TestAccessCmd(c *C) { targetURLConfigMap[targetURL] = targetConfig methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "private").Return(nil).Once() + cl1.On("PutBucketACL", "private").Return(nil).Once() msg, err := doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "public").Return(nil).Once() + cl1.On("PutBucketACL", "public").Return(nil).Once() msg, err = doUpdateAccessCmd(methods, targetURL, "public", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "readonly").Return(nil).Once() + cl1.On("PutBucketACL", "readonly").Return(nil).Once() msg, err = doUpdateAccessCmd(methods, targetURL, "readonly", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) @@ -469,8 +469,8 @@ func (s *CmdTestSuite) TestAccessCmdFailures(c *C) { c.Assert(err, Not(IsNil)) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "private").Return(&net.DNSError{}).Once() - cl1.On("PutBucket", "private").Return(nil).Once() + cl1.On("PutBucketACL", "private").Return(&net.DNSError{}).Once() + cl1.On("PutBucketACL", "private").Return(nil).Once() msg, err = doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) @@ -484,7 +484,7 @@ func (s *CmdTestSuite) TestAccessCmdFailures(c *C) { err.Op = "dial" err.Net = "tcp" err.Err = errors.New("Another expected error") - cl1.On("PutBucket", "private").Return(err).Once() + cl1.On("PutBucketACL", "private").Return(err).Once() } msg, err = doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false) globalMaxRetryFlag = retries @@ -514,8 +514,8 @@ func (s *CmdTestSuite) TestMbCmdFailures(c *C) { c.Assert(err, Not(IsNil)) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "").Return(&net.DNSError{}).Once() - cl1.On("PutBucket", "").Return(nil).Once() + cl1.On("PutBucket").Return(&net.DNSError{}).Once() + cl1.On("PutBucket").Return(nil).Once() msg, err = doMakeBucketCmd(methods, targetURL, targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) @@ -529,7 +529,7 @@ func (s *CmdTestSuite) TestMbCmdFailures(c *C) { err.Op = "dial" err.Net = "tcp" err.Err = errors.New("Another expected error") - cl1.On("PutBucket", "").Return(err).Once() + cl1.On("PutBucket").Return(err).Once() } msg, err = doMakeBucketCmd(methods, targetURL, targetConfig, false) globalMaxRetryFlag = retries @@ -554,19 +554,19 @@ func (s *CmdTestSuite) TestAccessCmdOnFile(c *C) { targetURLConfigMap[targetURL] = targetConfig methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "private").Return(nil).Once() + cl1.On("PutBucketACL", "private").Return(nil).Once() msg, err := doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "public").Return(nil).Once() + cl1.On("PutBucketACL", "public").Return(nil).Once() msg, err = doUpdateAccessCmd(methods, targetURL, "public", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "readonly").Return(nil).Once() + cl1.On("PutBucketACL", "readonly").Return(nil).Once() msg, err = doUpdateAccessCmd(methods, targetURL, "readonly", targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) @@ -589,7 +589,7 @@ func (s *CmdTestSuite) TestMbCmdOnFile(c *C) { targetURLConfigMap[targetURL] = targetConfig methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once() - cl1.On("PutBucket", "").Return(nil).Once() + cl1.On("PutBucket").Return(nil).Once() msg, err := doMakeBucketCmd(methods, targetURL, targetConfig, false) c.Assert(msg, Equals, "") c.Assert(err, IsNil) diff --git a/cp.go b/cp.go index af37d01b..490a0da9 100644 --- a/cp.go +++ b/cp.go @@ -85,11 +85,11 @@ func doCopySingleSourceRecursive(methods clientMethods, sourceURL, targetURL str if err != nil { return iodine.New(err, nil) } - for itemCh := range sourceClnt.ListRecursive() { - if itemCh.Err != nil { + for contentCh := range sourceClnt.ListRecursive() { + if contentCh.Err != nil { continue } - newSourceURL, newTargetURL := getNewURLRecursive(sourceURL, targetURL, itemCh.Item.Name) + newSourceURL, newTargetURL := getNewURLRecursive(sourceURL, targetURL, contentCh.Content.Name) if err := doCopySingleSource(methods, newSourceURL, newTargetURL, sourceConfig, targetConfig); err != nil { // verify for directory related errors, if "open" failed on directories ignore those errors switch e := iodine.ToError(err).(type) { diff --git a/ls.go b/ls.go index 611b8526..392138f6 100644 --- a/ls.go +++ b/ls.go @@ -35,8 +35,8 @@ const ( printDate = "2006-01-02 15:04:05 MST" ) -// printItem prints item meta-data -func printItem(date time.Time, v int64, name string, fileType os.FileMode) { +// printContent prints content meta-data +func printContent(date time.Time, v int64, name string, fileType os.FileMode) { fmt.Printf(console.Time("[%s] ", date.Local().Format(printDate))) fmt.Printf(console.Size("%6s ", humanize.IBytes(uint64(v)))) @@ -57,12 +57,12 @@ func printItem(date time.Time, v int64, name string, fileType os.FileMode) { // doList - list all entities inside a folder func doList(clnt client.Client, targetURL string) error { var err error - for itemCh := range clnt.List() { - if itemCh.Err != nil { - err = itemCh.Err + for contentCh := range clnt.List() { + if contentCh.Err != nil { + err = contentCh.Err break } - printItem(itemCh.Item.Time, itemCh.Item.Size, itemCh.Item.Name, itemCh.Item.FileType) + printContent(contentCh.Content.Time, contentCh.Content.Size, contentCh.Content.Name, contentCh.Content.FileType) } if err != nil { return iodine.New(err, map[string]string{"Target": targetURL}) @@ -73,12 +73,12 @@ func doList(clnt client.Client, targetURL string) error { // doListRecursive - list all entities inside folders and sub-folders recursively func doListRecursive(clnt client.Client, targetURL string) error { var err error - for itemCh := range clnt.ListRecursive() { - if itemCh.Err != nil { - err = itemCh.Err + for contentCh := range clnt.ListRecursive() { + if contentCh.Err != nil { + err = contentCh.Err break } - printItem(itemCh.Item.Time, itemCh.Item.Size, itemCh.Item.Name, itemCh.Item.FileType) + printContent(contentCh.Content.Time, contentCh.Content.Size, contentCh.Content.Name, contentCh.Content.FileType) } if err != nil { return iodine.New(err, map[string]string{"Target": targetURL}) diff --git a/pkg/client/client.go b/pkg/client/client.go index 61065094..07b35aa4 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -29,12 +29,13 @@ type Client interface { MultipartUpload // Common operations - Stat() (item *Item, err error) - List() <-chan ItemOnChannel - ListRecursive() <-chan ItemOnChannel + Stat() (content *Content, err error) + List() <-chan ContentOnChannel + ListRecursive() <-chan ContentOnChannel // Bucket operations - PutBucket(acl string) error + PutBucket() error + PutBucketACL(acl string) error // Object operations Get() (body io.ReadCloser, size int64, md5 string, err error) @@ -48,7 +49,7 @@ type MultipartUpload interface { UploadPart(uploadID string, body io.ReadSeeker, contentLength, partNumber int64) (md5hex string, err error) CompleteMultiPartUpload(uploadID string) (location, md5hex string, err error) AbortMultiPartUpload(uploadID string) error - ListParts(uploadID string) (items *PartItems, err error) + ListParts(uploadID string) (contents *PartContents, err error) } // Part - part xml response @@ -59,22 +60,22 @@ type Part struct { Size int64 } -// PartItems - part xml items response -type PartItems struct { +// PartContents - part xml contents response +type PartContents struct { Key string UploadID string IsTruncated bool Parts []*Part } -// ItemOnChannel - List items on channel -type ItemOnChannel struct { - Item *Item - Err error +// ContentOnChannel - List contents on channel +type ContentOnChannel struct { + Content *Content + Err error } -// Item - object item list -type Item struct { +// Content - object content list +type Content struct { Name string Time time.Time Size int64 diff --git a/pkg/client/fs/fs.go b/pkg/client/fs/fs.go index f5cf13e1..a6f706ec 100644 --- a/pkg/client/fs/fs.go +++ b/pkg/client/fs/fs.go @@ -59,11 +59,11 @@ func (f *fsClient) fsStat() (os.FileInfo, error) { // Get - download an object from bucket func (f *fsClient) Get() (io.ReadCloser, int64, string, error) { - item, err := f.getFSMetadata() + content, err := f.getFSMetadata() if err != nil { return nil, 0, "", iodine.New(err, nil) } - if item.FileType.IsDir() { + if content.FileType.IsDir() { return nil, 0, "", iodine.New(ISFolder{path: f.path}, nil) } body, err := os.Open(f.path) @@ -82,7 +82,7 @@ func (f *fsClient) Get() (io.ReadCloser, int64, string, error) { return nil, 0, "", iodine.New(err, nil) } md5Str := hex.EncodeToString(h.Sum(nil)) - return body, item.Size, md5Str, nil + return body, content.Size, md5Str, nil } // GetPartial - download a partial object from bucket @@ -90,14 +90,14 @@ func (f *fsClient) GetPartial(offset, length int64) (io.ReadCloser, int64, strin if offset < 0 { return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil) } - item, err := f.getFSMetadata() + content, err := f.getFSMetadata() if err != nil { return nil, 0, "", iodine.New(err, nil) } - if item.FileType.IsDir() { + if content.FileType.IsDir() { return nil, 0, "", iodine.New(ISFolder{path: f.path}, nil) } - if offset > item.Size || (offset+length-1) > item.Size { + if offset > content.Size || (offset+length-1) > content.Size { return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil) } body, err := os.Open(f.path) @@ -124,26 +124,26 @@ func (f *fsClient) GetPartial(offset, length int64) (io.ReadCloser, int64, strin } // List - list files and folders -func (f *fsClient) List() <-chan client.ItemOnChannel { - itemCh := make(chan client.ItemOnChannel) - go f.list(itemCh) - return itemCh +func (f *fsClient) List() <-chan client.ContentOnChannel { + contentCh := make(chan client.ContentOnChannel) + go f.list(contentCh) + return contentCh } -func (f *fsClient) list(itemCh chan client.ItemOnChannel) { - defer close(itemCh) +func (f *fsClient) list(contentCh chan client.ContentOnChannel) { + defer close(contentCh) dir, err := os.Open(f.path) if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), } } fi, err := os.Lstat(f.path) if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), } } defer dir.Close() @@ -156,46 +156,46 @@ func (f *fsClient) list(itemCh chan client.ItemOnChannel) { // large quantities of files files, err := dir.Readdir(-1) if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), } } for _, file := range files { - item := &client.Item{ + content := &client.Content{ Name: file.Name(), Time: file.ModTime(), Size: file.Size(), FileType: file.Mode(), } - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } } default: - item := &client.Item{ + content := &client.Content{ Name: dir.Name(), Time: fi.ModTime(), Size: fi.Size(), FileType: fi.Mode(), } - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } } } // ListRecursive - list files and folders recursively -func (f *fsClient) ListRecursive() <-chan client.ItemOnChannel { - itemCh := make(chan client.ItemOnChannel) - go f.listRecursive(itemCh) - return itemCh +func (f *fsClient) ListRecursive() <-chan client.ContentOnChannel { + contentCh := make(chan client.ContentOnChannel) + go f.listRecursive(contentCh) + return contentCh } -func (f *fsClient) listRecursive(itemCh chan client.ItemOnChannel) { - defer close(itemCh) +func (f *fsClient) listRecursive(contentCh chan client.ContentOnChannel) { + defer close(contentCh) visitFS := func(fp string, fi os.FileInfo, err error) error { if err != nil { if os.IsPermission(err) { // skip inaccessible files @@ -203,23 +203,23 @@ func (f *fsClient) listRecursive(itemCh chan client.ItemOnChannel) { } return err // fatal } - item := &client.Item{ + content := &client.Content{ Name: fp, Time: fi.ModTime(), Size: fi.Size(), FileType: fi.Mode(), } - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, } return nil } err := filepath.Walk(f.path, visitFS) if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), } } } @@ -255,7 +255,16 @@ func aclToPerm(acl string) os.FileMode { } // PutBucket - create a new bucket -func (f *fsClient) PutBucket(acl string) error { +func (f *fsClient) PutBucket() error { + err := os.MkdirAll(f.path, 0775) + if err != nil { + return iodine.New(err, nil) + } + return nil +} + +// PutBucket - create a new bucket +func (f *fsClient) PutBucketACL(acl string) error { if !isValidBucketACL(acl) { return iodine.New(errors.New("invalid acl"), nil) } @@ -271,19 +280,19 @@ func (f *fsClient) PutBucket(acl string) error { } // getFSMetadata - -func (f *fsClient) getFSMetadata() (item *client.Item, err error) { +func (f *fsClient) getFSMetadata() (content *client.Content, err error) { st, err := f.fsStat() if err != nil { return nil, iodine.New(err, nil) } - item = new(client.Item) - item.Name = st.Name() - item.Size = st.Size() - item.Time = st.ModTime() - return item, nil + content = new(client.Content) + content.Name = st.Name() + content.Size = st.Size() + content.Time = st.ModTime() + return content, nil } // Stat - get metadata from path -func (f *fsClient) Stat() (item *client.Item, err error) { +func (f *fsClient) Stat() (content *client.Content, err error) { return f.getFSMetadata() } diff --git a/pkg/client/fs/fs_multi.go b/pkg/client/fs/fs_multipart_put.go similarity index 94% rename from pkg/client/fs/fs_multi.go rename to pkg/client/fs/fs_multipart_put.go index 9bd3657c..c72bd765 100644 --- a/pkg/client/fs/fs_multi.go +++ b/pkg/client/fs/fs_multipart_put.go @@ -46,6 +46,6 @@ func (c *fsClient) AbortMultiPartUpload(uploadID string) error { } // ListParts - -func (c *fsClient) ListParts(uploadID string) (items *client.PartItems, err error) { +func (c *fsClient) ListParts(uploadID string) (contents *client.PartContents, err error) { return nil, iodine.New(client.APINotImplemented{API: "ListParts"}, nil) } diff --git a/pkg/client/fs/fs_test.go b/pkg/client/fs/fs_test.go index 2a87b44d..87651777 100644 --- a/pkg/client/fs/fs_test.go +++ b/pkg/client/fs/fs_test.go @@ -68,12 +68,12 @@ func (s *MySuite) TestList(c *C) { c.Assert(size, Equals, dataLen) fsc = New(root) - var items []*client.Item - for itemCh := range fsc.ListRecursive() { - items = append(items, itemCh.Item) + var contents []*client.Content + for contentCh := range fsc.ListRecursive() { + contents = append(contents, contentCh.Content) } c.Assert(err, IsNil) - c.Assert(len(items), Equals, 3) + c.Assert(len(contents), Equals, 3) } func (s *MySuite) TestPutBucket(c *C) { @@ -83,7 +83,7 @@ func (s *MySuite) TestPutBucket(c *C) { bucketPath := filepath.Join(root, "bucket") fsc := New(bucketPath) - err = fsc.PutBucket("") + err = fsc.PutBucket() c.Assert(err, IsNil) } @@ -94,12 +94,26 @@ func (s *MySuite) TestStatBucket(c *C) { bucketPath := filepath.Join(root, "bucket") fsc := New(bucketPath) - err = fsc.PutBucket("") + err = fsc.PutBucket() c.Assert(err, IsNil) _, err = fsc.Stat() c.Assert(err, IsNil) } +func (s *MySuite) TestPutBucketACL(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "fs-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + + bucketPath := filepath.Join(root, "bucket") + fsc := New(bucketPath) + err = fsc.PutBucket() + c.Assert(err, IsNil) + + err = fsc.PutBucketACL("private") + c.Assert(err, IsNil) +} + func (s *MySuite) TestPutObject(c *C) { root, err := ioutil.TempDir(os.TempDir(), "fs-") c.Assert(err, IsNil) @@ -169,8 +183,8 @@ func (s *MySuite) TestStat(c *C) { _, err = io.CopyN(writer, bytes.NewBufferString(data), dataLen) c.Assert(err, IsNil) - item, err := fsc.Stat() + content, err := fsc.Stat() c.Assert(err, IsNil) - c.Assert(item.Name, Equals, "object") - c.Assert(item.Size, Equals, dataLen) + c.Assert(content.Name, Equals, "object") + c.Assert(content.Size, Equals, dataLen) } diff --git a/pkg/client/mocks/Client.go b/pkg/client/mocks/Client.go index 699a6958..99587390 100644 --- a/pkg/client/mocks/Client.go +++ b/pkg/client/mocks/Client.go @@ -29,7 +29,16 @@ type Client struct { } // PutBucket is a mock method -func (m *Client) PutBucket(acl string) error { +func (m *Client) PutBucket() error { + ret := m.Called() + + r0 := ret.Error(0) + + return r0 +} + +// PutBucketACL is a mock method +func (m *Client) PutBucketACL(acl string) error { ret := m.Called(acl) r0 := ret.Error(0) @@ -38,12 +47,12 @@ func (m *Client) PutBucket(acl string) error { } // Stat is a mock method -func (m *Client) Stat() (*client.Item, error) { +func (m *Client) Stat() (*client.Content, error) { ret := m.Called() - var r0 *client.Item + var r0 *client.Content if ret.Get(0) != nil { - r0 = ret.Get(0).(*client.Item) + r0 = ret.Get(0).(*client.Content) } r1 := ret.Error(1) @@ -52,16 +61,16 @@ func (m *Client) Stat() (*client.Item, error) { } // List is a mock method -func (m *Client) List() <-chan client.ItemOnChannel { +func (m *Client) List() <-chan client.ContentOnChannel { ret := m.Called() - r0 := ret.Get(0).(chan client.ItemOnChannel) + r0 := ret.Get(0).(chan client.ContentOnChannel) return r0 } // ListRecursive is a mock method -func (m *Client) ListRecursive() <-chan client.ItemOnChannel { +func (m *Client) ListRecursive() <-chan client.ContentOnChannel { ret := m.Called() - r0 := ret.Get(0).(chan client.ItemOnChannel) + r0 := ret.Get(0).(chan client.ContentOnChannel) return r0 } @@ -140,12 +149,12 @@ func (m *Client) AbortMultiPartUpload(uploadID string) error { } // ListParts is a mock method -func (m *Client) ListParts(uploadID string) (*client.PartItems, error) { +func (m *Client) ListParts(uploadID string) (*client.PartContents, error) { ret := m.Called(uploadID) - var r0 *client.PartItems + var r0 *client.PartContents if ret.Get(0) != nil { - r0 = ret.Get(0).(*client.PartItems) + r0 = ret.Get(0).(*client.PartContents) } r1 := ret.Error(1) diff --git a/pkg/client/s3/bucket.go b/pkg/client/s3/bucket.go index 31b2642a..78fcf104 100644 --- a/pkg/client/s3/bucket.go +++ b/pkg/client/s3/bucket.go @@ -17,13 +17,7 @@ package s3 import ( - "encoding/xml" - "errors" "fmt" - "os" - "strconv" - "strings" - "time" "net/http" @@ -38,8 +32,6 @@ func isValidBucketACL(acl string) bool { case "public-read": fallthrough case "public-read-write": - fallthrough - case "": return true default: return false @@ -48,99 +40,27 @@ func isValidBucketACL(acl string) bool { /// Bucket API operations -// Get list of buckets -func (c *s3Client) listBucketsInternal() ([]*client.Item, error) { - var res *http.Response - var err error - - u := fmt.Sprintf("%s://%s/", c.Scheme, c.Host) - req, err := c.newRequest("GET", u, nil) - if err != nil { - return nil, iodine.New(err, nil) - } - - // do not ignore signatures for 'listBuckets()' - // it is never a public request for amazon s3 - // so lets aggressively verify - if strings.Contains(c.Host, "amazonaws.com") && (c.AccessKeyID == "" || c.SecretAccessKey == "") { - msg := "Authorization key cannot be empty for listing buckets, please choose a valid bucketname if its a public request" - return nil, iodine.New(errors.New(msg), nil) - } - if c.AccessKeyID != "" && c.SecretAccessKey != "" { - c.signRequest(req, c.Host) - } - - res, err = c.Transport.RoundTrip(req) - if err != nil { - return nil, iodine.New(err, nil) - } - if res != nil { - if res.StatusCode != http.StatusOK { - err = NewError(res) - return nil, iodine.New(err, nil) - } - } - defer res.Body.Close() - - type bucket struct { - Name string - CreationDate time.Time - } - type allMyBuckets struct { - Buckets struct { - Bucket []*bucket - } - } - var buckets allMyBuckets - if err := xml.NewDecoder(res.Body).Decode(&buckets); err != nil { - return nil, iodine.New(client.UnexpectedError{ - Err: errors.New("Malformed response received from server")}, - map[string]string{"XMLError": err.Error()}) - } - var items []*client.Item - for _, b := range buckets.Buckets.Bucket { - item := new(client.Item) - item.Name = b.Name - item.Time = b.CreationDate - item.FileType = os.ModeDir - items = append(items, item) - } - return items, nil -} - -// PutBucket - create new bucket -func (c *s3Client) PutBucket(acl string) error { - if !isValidBucketACL(acl) { - return iodine.New(InvalidACL{ACL: acl}, nil) - } - bucket, _ := c.url2BucketAndObject() - if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") { +// PutBucket - create a new bucket +func (c *s3Client) PutBucket() error { + bucket, object := c.url2BucketAndObject() + if !client.IsValidBucketName(bucket) { return iodine.New(InvalidBucketName{Bucket: bucket}, nil) } - var req *http.Request - var err error - switch len(acl) > 0 { - case true: - u := fmt.Sprintf("%s://%s/%s?acl", c.Scheme, c.Host, bucket) - // new request - req, err = c.newRequest("PUT", u, nil) - if err != nil { - return iodine.New(err, nil) - } - // by default without acl while creating a bucket - // make it default "private" - req.Header.Add("x-amz-acl", acl) - default: - u := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) - // new request - req, err = c.newRequest("PUT", u, nil) - if err != nil { - return iodine.New(err, nil) - } - // by default without acl while creating a bucket - // make it default "private" - req.Header.Add("x-amz-acl", "private") + if object != "" { + return iodine.New(InvalidQueryURL{URL: ""}, nil) } + requestURL, err := c.getRequestURL() + if err != nil { + return iodine.New(err, nil) + } + // new request + req, err := c.newRequest("PUT", requestURL, nil) + if err != nil { + return iodine.New(err, nil) + } + // by default while creating a bucket make it default "private" + req.Header.Add("x-amz-acl", "private") + if c.AccessKeyID != "" && c.SecretAccessKey != "" { c.signRequest(req, c.Host) } @@ -157,87 +77,52 @@ func (c *s3Client) PutBucket(acl string) error { return nil } -func (c *s3Client) getBucketMetadata(bucket string) (item *client.Item, err error) { - if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") { - return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil) +func (c *s3Client) PutBucketACL(acl string) error { + if !isValidBucketACL(acl) { + return iodine.New(InvalidACL{ACL: acl}, nil) } - u := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) - req, err := c.newRequest("HEAD", u, nil) - if err != nil { - return nil, iodine.New(err, nil) - } - if c.AccessKeyID != "" && c.SecretAccessKey != "" { - c.signRequest(req, c.Host) - } - res, err := c.Transport.RoundTrip(req) - if err != nil { - return nil, iodine.New(err, nil) - } - item = new(client.Item) - item.Name = bucket - item.FileType = os.ModeDir - - defer res.Body.Close() - switch res.StatusCode { - case http.StatusOK: - fallthrough - case http.StatusMovedPermanently: - return item, nil - default: - return nil, iodine.New(NewError(res), nil) - } -} - -// getObjectMetadata - returns nil, os.ErrNotExist if not on object storage -func (c *s3Client) getObjectMetadata(bucket, object string) (item *client.Item, err error) { - if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") { - return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil) - } - queryURL := c.objectURL(bucket, object) - if !c.isValidQueryURL(queryURL) { - return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) - } - req, err := c.newRequest("HEAD", queryURL, nil) - if err != nil { - return nil, iodine.New(err, nil) - } - if c.AccessKeyID != "" && c.SecretAccessKey != "" { - c.signRequest(req, c.Host) - } - res, err := c.Transport.RoundTrip(req) - if err != nil { - return nil, iodine.New(err, nil) - } - defer res.Body.Close() - switch res.StatusCode { - case http.StatusNotFound: - return nil, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil) - case http.StatusOK: - contentLength, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64) - if err != nil { - return nil, iodine.New(err, nil) - } - date, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) - // AWS S3 uses RFC1123 standard for Date in HTTP header, unlike XML content - if err != nil { - return nil, iodine.New(err, nil) - } - item = new(client.Item) - item.Name = object - item.Time = date - item.Size = contentLength - item.FileType = 0 - return item, nil - default: - return nil, iodine.New(NewError(res), nil) - } -} - -// Stat - send a 'HEAD' on a bucket or object to see if exists -func (c *s3Client) Stat() (*client.Item, error) { bucket, object := c.url2BucketAndObject() - if object == "" { - return c.getBucketMetadata(bucket) + if !client.IsValidBucketName(bucket) { + return iodine.New(InvalidBucketName{Bucket: bucket}, nil) } - return c.getObjectMetadata(bucket, object) + if object != "" { + return iodine.New(InvalidQueryURL{URL: ""}, nil) + } + requestURL, err := c.getRequestURL() + if err != nil { + return iodine.New(err, nil) + } + + // new request + u := fmt.Sprintf("%s?acl", requestURL) + req, err := c.newRequest("PUT", u, nil) + if err != nil { + return iodine.New(err, nil) + } + + // by default without acl while creating a bucket + // make it default "private" + req.Header.Add("x-amz-acl", acl) + + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + + res, err := c.Transport.RoundTrip(req) + if err != nil { + return iodine.New(err, nil) + } + if res != nil { + if res.StatusCode != http.StatusOK { + return iodine.New(NewError(res), nil) + } + } + defer res.Body.Close() + return nil +} + +// Stat - send a 'HEAD' on a bucket or object to get its metadata +func (c *s3Client) Stat() (*client.Content, error) { + bucket, object := c.url2BucketAndObject() + return c.getMetadata(bucket, object) } diff --git a/pkg/client/s3/bucket_list.go b/pkg/client/s3/bucket_list.go deleted file mode 100644 index aec4921b..00000000 --- a/pkg/client/s3/bucket_list.go +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Minio Client (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package s3 - -import ( - "bytes" - "encoding/xml" - "errors" - "fmt" - "io" - "net/http" - "net/url" - "os" - "strings" - "time" - - "github.com/minio-io/mc/pkg/client" - "github.com/minio-io/minio/pkg/iodine" -) - -/// Bucket API operations - -// List - list at delimited path not recursive -func (c *s3Client) List() <-chan client.ItemOnChannel { - itemCh := make(chan client.ItemOnChannel) - go c.list(itemCh) - return itemCh -} - -func (c *s3Client) list(itemCh chan client.ItemOnChannel) { - defer close(itemCh) - var items []*client.Item - bucket, objectPrefix := c.url2BucketAndObject() - item, err := c.getObjectMetadata(bucket, objectPrefix) - switch err { - case nil: // List a single object. Exact key - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - default: - if bucket == "" { - items, err = c.listBucketsInternal() - if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), - } - return - } - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - } - return - } - // List all objects matching the key prefix - items, err = c.listObjectsInternal(bucket, "", objectPrefix, "/", globalMaxKeys) - if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), - } - return - } - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - } - } -} - -// ListRecursive - list buckets and objects recursively -func (c *s3Client) ListRecursive() <-chan client.ItemOnChannel { - itemCh := make(chan client.ItemOnChannel) - go c.listRecursive(itemCh) - return itemCh -} - -func (c *s3Client) listRecursive(itemCh chan client.ItemOnChannel) { - defer close(itemCh) - - var items []*client.Item - bucket, objectPrefix := c.url2BucketAndObject() - item, err := c.getObjectMetadata(bucket, objectPrefix) - switch err { - case nil: // List a single object. Exact key - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - default: - if bucket == "" { - items, err = c.listBucketsInternal() - if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), - } - return - } - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - } - return - } - // List all objects matching the key prefix - items, err = c.listObjectsInternal(bucket, "", objectPrefix, "", globalMaxKeys) - if err != nil { - itemCh <- client.ItemOnChannel{ - Item: nil, - Err: iodine.New(err, nil), - } - return - } - for _, item := range items { - itemCh <- client.ItemOnChannel{ - Item: item, - Err: nil, - } - } - } -} - -func (c *s3Client) isValidQueryURL(queryURL string) bool { - u, err := url.Parse(queryURL) - if err != nil { - return false - } - if !strings.Contains(u.Scheme, "http") { - return false - } - return true -} - -// populate s3 response and decode results into listBucketResults{} -func (c *s3Client) decodeBucketResults(queryURL string) (*listBucketResults, error) { - if !c.isValidQueryURL(queryURL) { - return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) - } - bres := &listBucketResults{} - req, err := c.newRequest("GET", queryURL, nil) - if err != nil { - return nil, iodine.New(err, nil) - } - if c.AccessKeyID != "" && c.SecretAccessKey != "" { - c.signRequest(req, c.Host) - } - res, err := c.Transport.RoundTrip(req) - if err != nil { - return nil, iodine.New(err, nil) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, iodine.New(NewError(res), nil) - } - var logbuf bytes.Buffer - err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(bres) - if err != nil { - return nil, iodine.New(err, map[string]string{"XMLError": logbuf.String()}) - } - return bres, nil -} - -// filter items out of content and provide marker for future request -func (c *s3Client) filterItems(startAt, marker, prefix, delimiter string, contents []*item) (items []*client.Item, nextMarker string, err error) { - for _, it := range contents { - if it.Key == marker && it.Key != startAt { - // Skip first dup on pages 2 and higher. - continue - } - if it.Key < startAt { - msg := fmt.Sprintf("Unexpected response from Amazon: item key %q but wanted greater than %q", it.Key, startAt) - return nil, marker, iodine.New(client.UnexpectedError{Err: errors.New(msg)}, nil) - } - item := new(client.Item) - item.Name = it.Key - item.Time = it.LastModified - item.Size = it.Size - item.FileType = 0 - items = append(items, item) - nextMarker = it.Key - } - return items, nextMarker, nil -} - -// Populare query URL for Listobjects requests -func (c *s3Client) getQueryURL(bucket, marker, prefix, delimiter string, fetchN int) string { - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.bucketURL(bucket), fetchN)) - switch true { - case marker != "": - buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker))) - fallthrough - case prefix != "": - buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix))) - fallthrough - case delimiter != "": - buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter))) - } - return buffer.String() -} - -// listObjectsInternal returns 0 to maxKeys (inclusive) items from the -// provided bucket. Keys before startAt will be skipped. (This is the S3 -// 'marker' value). If the length of the returned items is equal to -// maxKeys, there is no indication whether or not the returned list is truncated. -func (c *s3Client) listObjectsInternal(bucket, startAt, prefix, delimiter string, maxKeys int) (items []*client.Item, err error) { - if maxKeys <= 0 { - return nil, iodine.New(InvalidMaxKeys{MaxKeys: maxKeys}, nil) - } - marker := startAt - for len(items) < maxKeys { - fetchN := maxKeys - len(items) - if fetchN > globalMaxKeys { - fetchN = globalMaxKeys - } - bres, err := c.decodeBucketResults(c.getQueryURL(bucket, marker, prefix, delimiter, fetchN)) - if err != nil { - return nil, iodine.New(err, nil) - } - if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker { - msg := fmt.Sprintf("Unexpected parse from server: %#v", bres) - return nil, iodine.New(client.UnexpectedError{ - Err: errors.New(msg)}, nil) - } - items, marker, err = c.filterItems(startAt, marker, prefix, delimiter, bres.Contents) - if err != nil { - return nil, iodine.New(err, nil) - } - for _, prefix := range bres.CommonPrefixes { - item := &client.Item{ - Name: prefix.Prefix, - // TODO no way of fixiing this as of now - Time: time.Now(), - Size: 0, - FileType: os.ModeDir, - } - items = append(items, item) - } - if !bres.IsTruncated { - break - } - if len(items) == 0 { - errMsg := errors.New("No items replied") - return nil, iodine.New(client.UnexpectedError{Err: errMsg}, nil) - } - } - return items, nil -} diff --git a/pkg/client/s3/bucket_object_metadata.go b/pkg/client/s3/bucket_object_metadata.go new file mode 100644 index 00000000..65c8be8a --- /dev/null +++ b/pkg/client/s3/bucket_object_metadata.go @@ -0,0 +1,114 @@ +/* + * Minio Client (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package s3 + +import ( + "net/http" + "os" + "strconv" + "time" + + "github.com/minio-io/mc/pkg/client" + "github.com/minio-io/minio/pkg/iodine" +) + +func (c *s3Client) getMetadata(bucket, object string) (content *client.Content, err error) { + if object == "" { + return c.getBucketMetadata(bucket) + } + return c.getObjectMetadata(bucket, object) +} + +func (c *s3Client) getBucketMetadata(bucket string) (content *client.Content, err error) { + queryURL, err := c.getRequestURL() + if err != nil { + return nil, iodine.New(err, nil) + } + if !c.isValidQueryURL(queryURL) { + return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) + } + req, err := c.newRequest("HEAD", queryURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + res, err := c.Transport.RoundTrip(req) + if err != nil { + return nil, iodine.New(err, nil) + } + content = new(client.Content) + content.Name = bucket + content.FileType = os.ModeDir + + defer res.Body.Close() + switch res.StatusCode { + case http.StatusOK: + fallthrough + case http.StatusMovedPermanently: + return content, nil + default: + return nil, iodine.New(NewError(res), nil) + } +} + +// getObjectMetadata - returns nil, os.ErrNotExist if not on object storage +func (c *s3Client) getObjectMetadata(bucket, object string) (content *client.Content, err error) { + queryURL, err := c.getRequestURL() + if err != nil { + return nil, iodine.New(err, nil) + } + if !c.isValidQueryURL(queryURL) { + return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) + } + req, err := c.newRequest("HEAD", queryURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + res, err := c.Transport.RoundTrip(req) + if err != nil { + return nil, iodine.New(err, nil) + } + defer res.Body.Close() + switch res.StatusCode { + case http.StatusNotFound: + return nil, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil) + case http.StatusOK: + // verify for Content-Length + contentLength, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64) + if err != nil { + return nil, iodine.New(err, nil) + } + // AWS S3 uses RFC1123 standard for Date in HTTP header + date, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) + if err != nil { + return nil, iodine.New(err, nil) + } + content = new(client.Content) + content.Name = object + content.Time = date + content.Size = contentLength + content.FileType = 0 + return content, nil + default: + return nil, iodine.New(NewError(res), nil) + } +} diff --git a/pkg/client/s3/bucket_routines.go b/pkg/client/s3/bucket_routines.go new file mode 100644 index 00000000..5a37dd54 --- /dev/null +++ b/pkg/client/s3/bucket_routines.go @@ -0,0 +1,304 @@ +/* +1;3803;0c * Minio Client (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package s3 + +import ( + "bytes" + "encoding/xml" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/minio-io/mc/pkg/client" + "github.com/minio-io/minio/pkg/iodine" +) + +func (c *s3Client) listInGoRoutine(contentCh chan client.ContentOnChannel) { + defer close(contentCh) + var contents []*client.Content + bucket, objectPrefix := c.url2BucketAndObject() + content, err := c.getObjectMetadata(bucket, objectPrefix) + switch err { + case nil: // List a single object. Exact key + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + default: + if bucket == "" { + contents, err = c.listBuckets() + if err != nil { + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), + } + return + } + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + } + return + } + // List all objects matching the key prefix + contents, err = c.listObjects(bucket, "", objectPrefix, "/", globalMaxKeys) + if err != nil { + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), + } + return + } + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + } + } +} + +func (c *s3Client) listRecursiveInGoRoutine(contentCh chan client.ContentOnChannel) { + defer close(contentCh) + + var contents []*client.Content + bucket, objectPrefix := c.url2BucketAndObject() + content, err := c.getObjectMetadata(bucket, objectPrefix) + switch err { + case nil: // List a single object. Exact key + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + default: + if bucket == "" { + contents, err = c.listBuckets() + if err != nil { + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), + } + return + } + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + } + return + } + // List all objects matching the key prefix + contents, err = c.listObjects(bucket, "", objectPrefix, "", globalMaxKeys) + if err != nil { + contentCh <- client.ContentOnChannel{ + Content: nil, + Err: iodine.New(err, nil), + } + return + } + for _, content := range contents { + contentCh <- client.ContentOnChannel{ + Content: content, + Err: nil, + } + } + } +} + +// populate s3 response and decode results into listBucketResults{} +func (c *s3Client) decodeBucketResults(queryURL string) (*listBucketResults, error) { + if !c.isValidQueryURL(queryURL) { + return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) + } + bres := &listBucketResults{} + req, err := c.newRequest("GET", queryURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + res, err := c.Transport.RoundTrip(req) + if err != nil { + return nil, iodine.New(err, nil) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, iodine.New(NewError(res), nil) + } + var logbuf bytes.Buffer + err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(bres) + if err != nil { + return nil, iodine.New(err, map[string]string{"XMLError": logbuf.String()}) + } + return bres, nil +} + +// filter contents out of content and provide marker for future request +func (c *s3Client) filterContents(startAt, marker, prefix, delimiter string, cts []*content) ([]*client.Content, string, error) { + var contents []*client.Content + var nextMarker string + for _, ct := range cts { + if ct.Key == marker && ct.Key != startAt { + // Skip first dup on pages 2 and higher. + continue + } + if ct.Key < startAt { + msg := fmt.Sprintf("Unexpected response from Amazon: content key %q but wanted greater than %q", ct.Key, startAt) + return nil, marker, iodine.New(client.UnexpectedError{Err: errors.New(msg)}, nil) + } + content := new(client.Content) + content.Name = ct.Key + content.Time = ct.LastModified + content.Size = ct.Size + content.FileType = 0 + contents = append(contents, content) + nextMarker = ct.Key + } + return contents, nextMarker, nil +} + +// Populare query URL for Listobjects requests +func (c *s3Client) getQueryURL(marker, prefix, delimiter string, fetchN int) string { + var buffer bytes.Buffer + buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.mustGetRequestURL(), fetchN)) + switch true { + case marker != "": + buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker))) + fallthrough + case prefix != "": + buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix))) + fallthrough + case delimiter != "": + buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter))) + } + return buffer.String() +} + +// listObjects returns 0 to maxKeys (inclusive) contents from the +// provided bucket. Keys before startAt will be skipped. (This is the S3 +// 'marker' value). If the length of the returned contents is equal to +// maxKeys, there is no indication whether or not the returned list is truncated. +func (c *s3Client) listObjects(bucket, startAt, prefix, delimiter string, maxKeys int) (contents []*client.Content, err error) { + if maxKeys <= 0 { + return nil, iodine.New(InvalidMaxKeys{MaxKeys: maxKeys}, nil) + } + marker := startAt + for len(contents) < maxKeys { + fetchN := maxKeys - len(contents) + if fetchN > globalMaxKeys { + fetchN = globalMaxKeys + } + bres, err := c.decodeBucketResults(c.getQueryURL(marker, prefix, delimiter, fetchN)) + if err != nil { + return nil, iodine.New(err, nil) + } + if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker { + msg := fmt.Sprintf("Unexpected parse from server: %#v", bres) + return nil, iodine.New(client.UnexpectedError{ + Err: errors.New(msg)}, nil) + } + contents, marker, err = c.filterContents(startAt, marker, prefix, delimiter, bres.Contents) + if err != nil { + return nil, iodine.New(err, nil) + } + for _, prefix := range bres.CommonPrefixes { + content := &client.Content{ + Name: prefix.Prefix, + // TODO no way of fixiing this as of now + Time: time.Now(), + Size: 0, + FileType: os.ModeDir, + } + contents = append(contents, content) + } + if !bres.IsTruncated { + break + } + if len(contents) == 0 { + errMsg := errors.New("No contents replied") + return nil, iodine.New(client.UnexpectedError{Err: errMsg}, nil) + } + } + return contents, nil +} + +// Get list of buckets +func (c *s3Client) listBuckets() ([]*client.Content, error) { + requestURL, err := c.getRequestURL() + if err != nil { + return nil, iodine.New(err, nil) + } + req, err := c.newRequest("GET", requestURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + // do not ignore signatures for 'listBuckets()' it is never a public request for amazon s3 + // so lets aggressively verify + if strings.Contains(c.Host, "amazonaws.com") && (c.AccessKeyID == "" || c.SecretAccessKey == "") { + msg := "Authorization key cannot be empty for listing buckets, please choose a valid bucketname if its a public request" + return nil, iodine.New(errors.New(msg), nil) + } + // rest we can ignore + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + res, err := c.Transport.RoundTrip(req) + if err != nil { + return nil, iodine.New(err, nil) + } + if res != nil { + if res.StatusCode != http.StatusOK { + err = NewError(res) + return nil, iodine.New(err, nil) + } + } + defer res.Body.Close() + + type bucket struct { + Name string + CreationDate time.Time + } + type allMyBuckets struct { + Buckets struct { + Bucket []*bucket + } + } + var buckets allMyBuckets + if err := xml.NewDecoder(res.Body).Decode(&buckets); err != nil { + return nil, iodine.New(client.UnexpectedError{ + Err: errors.New("Malformed response received from server")}, + map[string]string{"XMLError": err.Error()}) + } + var contents []*client.Content + for _, b := range buckets.Buckets.Bucket { + content := new(client.Content) + content.Name = b.Name + content.Time = b.CreationDate + content.FileType = os.ModeDir + contents = append(contents, content) + } + return contents, nil +} diff --git a/pkg/client/s3/client.go b/pkg/client/s3/client.go deleted file mode 100644 index 0793e7d0..00000000 --- a/pkg/client/s3/client.go +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Minio Client (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package s3 - -import ( - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "net/url" - "strings" - "time" - - "github.com/awslabs/aws-sdk-go/aws" - "github.com/awslabs/aws-sdk-go/service/s3" - "github.com/minio-io/mc/pkg/client" - "github.com/minio-io/minio/pkg/iodine" -) - -type item struct { - Key string - LastModified time.Time - ETag string - Size int64 -} - -type prefix struct { - Prefix string -} - -type listBucketResults struct { - Contents []*item - IsTruncated bool - MaxKeys int - Name string // bucket name - Marker string - Delimiter string - Prefix string - CommonPrefixes []*prefix -} - -// Meta holds Amazon S3 client credentials and flags. -type Meta struct { - *Config - *s3.S3 - Transport http.RoundTripper // or nil for the default behavior -} - -// Config - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html -type Config struct { - AccessKeyID string - SecretAccessKey string - HostURL string - UserAgent string - Debug bool - - // Used for SSL transport layer - CertPEM string - KeyPEM string -} - -// TLSConfig - TLS cert and key configuration -type TLSConfig struct { - CertPEMBlock []byte - KeyPEMBlock []byte -} - -type s3Client struct { - *Meta - - // Supports URL in following formats - // - http://// - // - http://./ - *url.URL -} - -// url2BucketAndObject converts URL to bucketName and objectName -func (c *s3Client) url2BucketAndObject() (bucketName, objectName string) { - splits := strings.SplitN(c.Path, "/", 3) - switch len(splits) { - case 0, 1: - bucketName = "" - objectName = "" - case 2: - bucketName = splits[1] - objectName = "" - case 3: - bucketName = splits[1] - objectName = splits[2] - } - return bucketName, objectName -} - -// bucketURL constructs a URL (with a trailing slash) for a given -// bucket. URL is appropriately encoded based on the host's object -// storage implementation. -func (c *s3Client) bucketURL(bucket string) string { - var url string - // Avoid bucket names with "." in them - // Citing - http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html - // - // ===================== - // When using virtual hosted–style buckets with SSL, the SSL wild card certificate - // only matches buckets that do not contain periods. To work around this, use HTTP - // or write your own certificate verification logic. - // ===================== - // - if client.IsValidBucketName(bucket) { - // if localhost use PathStyle - if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") { - return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) - } - // Verify if its ip address, use PathStyle - host, _, _ := net.SplitHostPort(c.Host) - if net.ParseIP(host) != nil { - return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) - } - if strings.Contains(c.Host, "amazonaws.com") { - // amazonaws.com use subdomain style - return fmt.Sprintf("%s://%s.%s/", c.Scheme, bucket, c.Host) - } - url = fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) - } - return url -} - -// objectURL constructs a URL using bucket and object -func (c *s3Client) objectURL(bucket, object string) string { - url := c.bucketURL(bucket) - if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") { - return url + "/" + object - } - // Verify if its ip address, use PathStyle - host, _, _ := net.SplitHostPort(c.Host) - if net.ParseIP(host) != nil { - return url + "/" + object - } - // if not amazon do not construct a subdomain URL - if !strings.Contains(c.Host, "amazonaws.com") { - return url + "/" + object - } - return url + object -} - -// Instantiate a new request -func (c *s3Client) newRequest(method, url string, body io.ReadCloser) (*http.Request, error) { - errParams := map[string]string{ - "url": url, - "method": method, - "userAgent": c.UserAgent, - } - req, err := http.NewRequest(method, url, body) - if err != nil { - return nil, iodine.New(err, errParams) - } - req.Header.Set("User-Agent", c.UserAgent) - req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) - return req, nil -} - -// New returns an initialized s3Client structure. -// if debug use a internal trace transport -func New(config *Config) client.Client { - u, err := url.Parse(config.HostURL) - if err != nil { - return nil - } - var traceTransport RoundTripTrace - var transport http.RoundTripper - if config.Debug { - traceTransport = GetNewTraceTransport(NewTrace(false, true, nil), http.DefaultTransport) - transport = GetNewTraceTransport(s3Verify{}, traceTransport) - } else { - transport = http.DefaultTransport - } - awsConf := aws.DefaultConfig - awsConf.Credentials = aws.Creds(config.AccessKeyID, config.SecretAccessKey, "") - awsConf.HTTPClient = &http.Client{Transport: transport} - awsConf.Logger = ioutil.Discard - s3c := &s3Client{ - &Meta{ - Config: config, - Transport: transport, - S3: s3.New(awsConf), - }, u, - } - return s3c -} diff --git a/pkg/client/s3/client_test.go b/pkg/client/s3/client_test.go index e8ab3369..1ca0d4bd 100644 --- a/pkg/client/s3/client_test.go +++ b/pkg/client/s3/client_test.go @@ -37,7 +37,7 @@ type MySuite struct{} var _ = Suite(&MySuite{}) -func listAllMyBuckets(r io.Reader) ([]*client.Item, error) { +func listAllMyBuckets(r io.Reader) ([]*client.Content, error) { type bucket struct { Name string CreationDate time.Time @@ -53,14 +53,14 @@ func listAllMyBuckets(r io.Reader) ([]*client.Item, error) { return nil, iodine.New(client.UnexpectedError{Err: errors.New("Malformed response received from server")}, map[string]string{"XMLError": err.Error()}) } - var items []*client.Item + var contents []*client.Content for _, b := range buckets.Buckets.Bucket { - item := new(client.Item) - item.Name = b.Name - item.Time = b.CreationDate - items = append(items, item) + content := new(client.Content) + content.Name = b.Name + content.Time = b.CreationDate + contents = append(contents, content) } - return items, nil + return contents, nil } func (s *MySuite) TestConfig(c *C) { @@ -81,7 +81,7 @@ func (s *MySuite) TestBucketACL(c *C) { {"private", true}, {"public-read", true}, {"public-read-write", true}, - {"", true}, + {"", false}, {"readonly", false}, {"invalid", false}, } @@ -108,7 +108,7 @@ func (s *MySuite) TestParseBuckets(c *C) { t1, err := time.Parse(time.RFC3339, "2006-06-21T07:04:31.000Z") t2, err := time.Parse(time.RFC3339, "2006-06-21T07:04:32.000Z") - want := []*client.Item{ + want := []*client.Content{ {Name: "bucketOne", Time: t1}, {Name: "bucketTwo", Time: t2}, } diff --git a/pkg/client/s3/common.go b/pkg/client/s3/common.go new file mode 100644 index 00000000..4cb43c86 --- /dev/null +++ b/pkg/client/s3/common.go @@ -0,0 +1,56 @@ +/* + * Minio Client (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package s3 + +import ( + "io/ioutil" + "net/http" + "net/url" + + "github.com/awslabs/aws-sdk-go/aws" + "github.com/awslabs/aws-sdk-go/service/s3" + "github.com/minio-io/mc/pkg/client" +) + +// New returns an initialized s3Client structure. +// if debug use a internal trace transport +func New(config *Config) client.Client { + u, err := url.Parse(config.HostURL) + if err != nil { + return nil + } + var traceTransport RoundTripTrace + var transport http.RoundTripper + if config.Debug { + traceTransport = GetNewTraceTransport(NewTrace(false, true, nil), http.DefaultTransport) + transport = GetNewTraceTransport(s3Verify{}, traceTransport) + } else { + transport = http.DefaultTransport + } + awsConf := aws.DefaultConfig + awsConf.Credentials = aws.Creds(config.AccessKeyID, config.SecretAccessKey, "") + awsConf.HTTPClient = &http.Client{Transport: transport} + awsConf.Logger = ioutil.Discard + s3c := &s3Client{ + &Meta{ + Config: config, + Transport: transport, + S3: s3.New(awsConf), + }, u, + } + return s3c +} diff --git a/pkg/client/s3/definitions.go b/pkg/client/s3/definitions.go new file mode 100644 index 00000000..e697b87f --- /dev/null +++ b/pkg/client/s3/definitions.go @@ -0,0 +1,68 @@ +package s3 + +import ( + "net/http" + "net/url" + "time" + + "github.com/awslabs/aws-sdk-go/service/s3" +) + +// +type content struct { + Key string + LastModified time.Time + ETag string + Size int64 +} + +// prefix +type prefix struct { + Prefix string +} + +type listBucketResults struct { + Contents []*content + IsTruncated bool + MaxKeys int + Name string // bucket name + Marker string + Delimiter string + Prefix string + CommonPrefixes []*prefix +} + +// Meta holds Amazon S3 client credentials and flags. +type Meta struct { + *Config + *s3.S3 + Transport http.RoundTripper // or nil for the default behavior +} + +// Config - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html +type Config struct { + AccessKeyID string + SecretAccessKey string + HostURL string + UserAgent string + Debug bool + + // Used for SSL transport layer + CertPEM string + KeyPEM string +} + +// TLSConfig - TLS cert and key configuration +type TLSConfig struct { + CertPEMBlock []byte + KeyPEMBlock []byte +} + +type s3Client struct { + *Meta + + // Supports URL in following formats + // - http://// + // - http://./ + *url.URL +} diff --git a/pkg/client/s3/list_contents.go b/pkg/client/s3/list_contents.go new file mode 100644 index 00000000..d2abe704 --- /dev/null +++ b/pkg/client/s3/list_contents.go @@ -0,0 +1,35 @@ +/* + * Minio Client (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package s3 + +import "github.com/minio-io/mc/pkg/client" + +/// Bucket API operations + +// List - list at delimited path not recursive +func (c *s3Client) List() <-chan client.ContentOnChannel { + contentCh := make(chan client.ContentOnChannel) + go c.listInGoRoutine(contentCh) + return contentCh +} + +// ListRecursive - list buckets and objects recursively +func (c *s3Client) ListRecursive() <-chan client.ContentOnChannel { + contentCh := make(chan client.ContentOnChannel) + go c.listRecursiveInGoRoutine(contentCh) + return contentCh +} diff --git a/pkg/client/s3/object_get.go b/pkg/client/s3/object_get.go index 19f1582e..ceba0ab1 100644 --- a/pkg/client/s3/object_get.go +++ b/pkg/client/s3/object_get.go @@ -29,17 +29,36 @@ import ( /// Object API operations -// Get - download a requested object from a given bucket -func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error) { - bucket, object := c.url2BucketAndObject() - if !client.IsValidBucketName(bucket) { - return nil, 0, "", iodine.New(InvalidBucketName{Bucket: bucket}, nil) +func (c *s3Client) setRange(req *http.Request, offset, length int64) (*http.Request, error) { + if offset < 0 { + return nil, iodine.New(client.InvalidRange{Offset: offset}, nil) + } + if length >= 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) + } else { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + return req, nil +} + +func (c *s3Client) get() (*http.Request, error) { + queryURL, err := c.getRequestURL() + if err != nil { + return nil, iodine.New(err, nil) } - queryURL := c.objectURL(bucket, object) if !c.isValidQueryURL(queryURL) { - return nil, 0, "", iodine.New(InvalidQueryURL{URL: queryURL}, nil) + return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) } req, err := c.newRequest("GET", queryURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + return req, nil +} + +// Get - download a requested object from a given bucket +func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error) { + req, err := c.get() if err != nil { return nil, 0, "", iodine.New(err, nil) } @@ -61,35 +80,21 @@ func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error) // GetPartial fetches part of the s3 object in bucket. // If length is negative, the rest of the object is returned. func (c *s3Client) GetPartial(offset, length int64) (body io.ReadCloser, size int64, md5 string, err error) { - bucket, object := c.url2BucketAndObject() - if !client.IsValidBucketName(bucket) { - return nil, 0, "", iodine.New(InvalidBucketName{Bucket: bucket}, nil) - } - if offset < 0 { - return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil) - } - queryURL := c.objectURL(bucket, object) - if !c.isValidQueryURL(queryURL) { - return nil, 0, "", iodine.New(InvalidQueryURL{URL: queryURL}, nil) - } - req, err := c.newRequest("GET", queryURL, nil) + req, err := c.get() if err != nil { return nil, 0, "", iodine.New(err, nil) } - if length >= 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) - } else { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + req, err = c.setRange(req, offset, length) + if err != nil { + return nil, 0, "", iodine.New(err, nil) } if c.AccessKeyID != "" && c.SecretAccessKey != "" { c.signRequest(req, c.Host) } - res, err := c.Transport.RoundTrip(req) if err != nil { return nil, 0, "", iodine.New(err, nil) } - switch res.StatusCode { case http.StatusOK, http.StatusPartialContent: return res.Body, res.ContentLength, res.Header.Get("ETag"), nil diff --git a/pkg/client/s3/object_multi.go b/pkg/client/s3/object_multipart_put.go similarity index 91% rename from pkg/client/s3/object_multi.go rename to pkg/client/s3/object_multipart_put.go index b6d2622c..380eb54a 100644 --- a/pkg/client/s3/object_multi.go +++ b/pkg/client/s3/object_multipart_put.go @@ -87,7 +87,7 @@ func (c *s3Client) AbortMultiPartUpload(uploadID string) error { } // ListParts -func (c *s3Client) ListParts(uploadID string) (items *client.PartItems, err error) { +func (c *s3Client) ListParts(uploadID string) (contents *client.PartContents, err error) { bucket, object := c.url2BucketAndObject() listPartsInput := new(s3.ListPartsInput) listPartsInput.Bucket = &bucket @@ -98,18 +98,18 @@ func (c *s3Client) ListParts(uploadID string) (items *client.PartItems, err erro if err != nil { return nil, iodine.New(err, nil) } - items = new(client.PartItems) - items.Key = *listPartsOutput.Key - items.IsTruncated = *listPartsOutput.IsTruncated - items.UploadID = *listPartsOutput.UploadID + contents = new(client.PartContents) + contents.Key = *listPartsOutput.Key + contents.IsTruncated = *listPartsOutput.IsTruncated + contents.UploadID = *listPartsOutput.UploadID for _, part := range listPartsOutput.Parts { newPart := new(client.Part) newPart.ETag = *part.ETag newPart.LastModified = *part.LastModified newPart.PartNumber = *part.PartNumber newPart.Size = *part.Size - items.Parts = append(items.Parts, newPart) + contents.Parts = append(contents.Parts, newPart) } - return items, nil + return contents, nil } diff --git a/pkg/client/s3/object_put.go b/pkg/client/s3/object_put.go index 91cbdc9f..c49023d6 100644 --- a/pkg/client/s3/object_put.go +++ b/pkg/client/s3/object_put.go @@ -22,6 +22,7 @@ import ( "errors" "io" "net/http" + "strconv" "strings" "github.com/minio-io/mc/pkg/client" @@ -30,49 +31,48 @@ import ( /// Object Operations PUT - keeping this in a separate file for readability -// Put - upload new object to bucket -func (c *s3Client) Put(md5HexString string, size int64) (io.WriteCloser, error) { - bucket, object := c.url2BucketAndObject() - if !client.IsValidBucketName(bucket) { - return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil) +func (c *s3Client) put(size int64) (*http.Request, error) { + queryURL, err := c.getRequestURL() + if err != nil { + return nil, iodine.New(err, nil) } - queryURL := c.objectURL(bucket, object) if !c.isValidQueryURL(queryURL) { return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil) } + req, err := c.newRequest("PUT", queryURL, nil) + if err != nil { + return nil, iodine.New(err, nil) + } + req.Header.Set("Content-Length", strconv.FormatInt(size, 10)) + return req, nil +} + +// Put - upload new object to bucket +func (c *s3Client) Put(md5HexString string, size int64) (io.WriteCloser, error) { + // if size is negative + if size < 0 { + return nil, iodine.New(client.InvalidArgument{Err: errors.New("invalid argument")}, nil) + } + req, err := c.put(size) + if err != nil { + return nil, iodine.New(err, nil) + } + // set Content-MD5 only if md5 is provided + if strings.TrimSpace(md5HexString) != "" { + md5, err := hex.DecodeString(md5HexString) + if err != nil { + return nil, iodine.New(err, nil) + } + req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5)) + } + if c.AccessKeyID != "" && c.SecretAccessKey != "" { + c.signRequest(req, c.Host) + } + // starting Pipe session r, w := io.Pipe() blockingWriter := client.NewBlockingWriteCloser(w) go func() { - if size < 0 { - err := iodine.New(client.InvalidArgument{Err: errors.New("invalid argument")}, nil) - r.CloseWithError(err) - blockingWriter.Release(err) - return - } - req, err := c.newRequest("PUT", queryURL, r) - if err != nil { - err := iodine.New(err, nil) - r.CloseWithError(err) - blockingWriter.Release(err) - return - } - req.Method = "PUT" - req.ContentLength = size - - // set Content-MD5 only if md5 is provided - if strings.TrimSpace(md5HexString) != "" { - md5, err := hex.DecodeString(md5HexString) - if err != nil { - err := iodine.New(err, nil) - r.CloseWithError(err) - blockingWriter.Release(err) - return - } - req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5)) - } - if c.AccessKeyID != "" && c.SecretAccessKey != "" { - c.signRequest(req, c.Host) - } + req.Body = r // this is necessary for debug, since the underlying transport is a wrapper res, err := c.Transport.RoundTrip(req) if err != nil { diff --git a/pkg/client/s3/request.go b/pkg/client/s3/request.go new file mode 100644 index 00000000..d1d722d3 --- /dev/null +++ b/pkg/client/s3/request.go @@ -0,0 +1,107 @@ +package s3 + +import ( + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/minio-io/mc/pkg/client" + "github.com/minio-io/minio/pkg/iodine" +) + +func (c *s3Client) isValidQueryURL(queryURL string) bool { + u, err := url.Parse(queryURL) + if err != nil { + return false + } + if !strings.Contains(u.Scheme, "http") { + return false + } + return true +} + +// url2BucketAndObject gives bucketName and objectName from URL path +func (c *s3Client) url2BucketAndObject() (bucketName, objectName string) { + splits := strings.SplitN(c.Path, "/", 3) + switch len(splits) { + case 0, 1: + bucketName = "" + objectName = "" + case 2: + bucketName = splits[1] + objectName = "" + case 3: + bucketName = splits[1] + objectName = splits[2] + } + return bucketName, objectName +} + +func (c *s3Client) getBucketRequestURL(bucket string) string { + // default to path style + url := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) + if strings.Contains(c.Host, "amazonaws.com") { + // amazonaws.com use subdomain style + url = fmt.Sprintf("%s://%s.%s", c.Scheme, bucket, c.Host) + } + return url +} + +// getObjectRequestURL constructs a URL using bucket and object +func (c *s3Client) getObjectRequestURL(bucket, object string) string { + return c.getBucketRequestURL(bucket) + "/" + object +} + +func (c *s3Client) mustGetRequestURL() string { + requestURL, _ := c.getRequestURL() + return requestURL +} + +// getRequestURL constructs a URL. URL is appropriately encoded based on the host's object storage implementation. +func (c *s3Client) getRequestURL() (string, error) { + bucket, object := c.url2BucketAndObject() + // Avoid bucket names with "." in them + // Citing - http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html + // + // ===================== + // When using virtual hosted–style buckets with SSL, the SSL wild card certificate + // only matches buckets that do not contain periods. To work around this, use HTTP + // or write your own certificate verification logic. + // ===================== + // + if !client.IsValidBucketName(bucket) { + return "", iodine.New(InvalidBucketName{Bucket: bucket}, nil) + } + if !client.IsValidObject(object) { + return "", iodine.New(InvalidObjectName{Bucket: bucket, Object: object}, nil) + } + switch { + case bucket == "" && object == "": + return fmt.Sprintf("%s://%s/", c.Scheme, c.Host), nil + case bucket != "" && object == "": + return c.getBucketRequestURL(bucket), nil + case bucket != "" && object != "": + return c.getObjectRequestURL(bucket, object), nil + } + return "", iodine.New(errors.New("Unexpected error, please report this.."), nil) +} + +// Instantiate a new request +func (c *s3Client) newRequest(method, url string, body io.ReadCloser) (*http.Request, error) { + errParams := map[string]string{ + "url": url, + "method": method, + "userAgent": c.UserAgent, + } + req, err := http.NewRequest(method, url, body) + if err != nil { + return nil, iodine.New(err, errParams) + } + req.Header.Set("User-Agent", c.UserAgent) + req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) + return req, nil +}