1
0
mirror of https://github.com/minio/mc.git synced 2025-11-12 01:02:26 +03:00

Full cleanup of s3 client for mc

This commit is contained in:
Harshavardhana
2015-05-01 15:52:08 -07:00
parent 28315e7492
commit 78a78c072a
23 changed files with 1008 additions and 875 deletions

View File

@@ -92,12 +92,12 @@ func doUpdateAccessCmd(methods clientMethods, targetURL, targetACL string, targe
}
func doUpdateAccess(clnt client.Client, targetURL, targetACL string) (string, error) {
err := clnt.PutBucket(targetACL)
err := clnt.PutBucketACL(targetACL)
for i := 0; i < globalMaxRetryFlag && err != nil && isValidRetry(err); i++ {
fmt.Println(console.Retry("Retrying ... %d", i))
// Progressively longer delays
time.Sleep(time.Duration(i*i) * time.Second)
err = clnt.PutBucket(targetACL)
err = clnt.PutBucketACL(targetACL)
}
if err != nil {
err := iodine.New(err, nil)

View File

@@ -89,12 +89,12 @@ func doMakeBucketCmd(methods clientMethods, targetURL string, targetConfig *host
// doMakeBucket - wrapper around PutBucket() API
func doMakeBucket(clnt client.Client, targetURL string) (string, error) {
err := clnt.PutBucket("")
err := clnt.PutBucket()
for i := 0; i < globalMaxRetryFlag && err != nil && isValidRetry(err); i++ {
fmt.Println(console.Retry("Retrying ... %d", i))
// Progressively longer delays
time.Sleep(time.Duration(i*i) * time.Second)
err = clnt.PutBucket("")
err = clnt.PutBucket()
}
if err != nil {
err := iodine.New(err, nil)

View File

@@ -132,7 +132,7 @@ func (s *CmdTestSuite) TestCopyRecursive(c *C) {
wg.Done()
}()
items := []*client.Item{
contents := []*client.Content{
{Name: "hello1", Time: time.Now(), Size: dataLen1},
{Name: "hello2", Time: time.Now(), Size: dataLen2},
}
@@ -150,17 +150,17 @@ func (s *CmdTestSuite) TestCopyRecursive(c *C) {
targetURLConfigMap[targetURL] = targetConfig
methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
contentCh := make(chan client.ContentOnChannel)
go func() {
defer close(itemCh)
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
defer close(contentCh)
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}()
cl1.On("ListRecursive").Return(itemCh).Once()
cl1.On("ListRecursive").Return(contentCh).Once()
sourceReader1 := ioutil.NopCloser(bytes.NewBufferString(data1))
sourceReader2 := ioutil.NopCloser(bytes.NewBufferString(data2))
methods.On("getSourceReader", sourceURL+"hello1", sourceConfig).Return(sourceReader1, dataLen1, etag1, nil).Once()
@@ -281,7 +281,7 @@ func (s *CmdTestSuite) TestLsCmdWithBucket(c *C) {
data2 := "hello world 2"
dataLen2 := int64(len(data2))
items := []*client.Item{
contents := []*client.Content{
{Name: "hello1", Time: time.Now(), Size: dataLen1},
{Name: "hello2", Time: time.Now(), Size: dataLen2},
}
@@ -293,17 +293,17 @@ func (s *CmdTestSuite) TestLsCmdWithBucket(c *C) {
sourceURLConfigMap[sourceURL] = sourceConfig
methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
contentCh := make(chan client.ContentOnChannel)
go func() {
defer close(itemCh)
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
defer close(contentCh)
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}()
cl1.On("ListRecursive").Return(itemCh).Once()
cl1.On("ListRecursive").Return(contentCh).Once()
err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false)
c.Assert(err, IsNil)
@@ -324,7 +324,7 @@ func (s *CmdTestSuite) TestLsCmdWithFilePath(c *C) {
data2 := "hello world 2"
dataLen2 := int64(len(data2))
items := []*client.Item{
contents := []*client.Content{
{Name: "hello1", Time: time.Now(), Size: dataLen1},
{Name: "hello2", Time: time.Now(), Size: dataLen2},
}
@@ -337,17 +337,17 @@ func (s *CmdTestSuite) TestLsCmdWithFilePath(c *C) {
methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
contentCh := make(chan client.ContentOnChannel)
go func() {
defer close(itemCh)
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
defer close(contentCh)
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}()
cl1.On("ListRecursive").Return(itemCh).Once()
cl1.On("ListRecursive").Return(contentCh).Once()
err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false)
c.Assert(err, IsNil)
@@ -362,7 +362,7 @@ func (s *CmdTestSuite) TestLsCmdListsBuckets(c *C) {
methods := &MockclientMethods{}
cl1 := &clientMocks.Client{}
buckets := []*client.Item{
buckets := []*client.Content{
{Name: "bucket1", Time: time.Now()},
{Name: "bucket2", Time: time.Now()},
}
@@ -374,17 +374,17 @@ func (s *CmdTestSuite) TestLsCmdListsBuckets(c *C) {
sourceURLConfigMap[sourceURL] = sourceConfig
methods.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
contentCh := make(chan client.ContentOnChannel)
go func() {
defer close(itemCh)
defer close(contentCh)
for _, bucket := range buckets {
itemCh <- client.ItemOnChannel{
Item: bucket,
Err: nil,
contentCh <- client.ContentOnChannel{
Content: bucket,
Err: nil,
}
}
}()
cl1.On("ListRecursive").Return(itemCh).Once()
cl1.On("ListRecursive").Return(contentCh).Once()
err = doListRecursiveCmd(methods, sourceURL, sourceConfig, false)
c.Assert(err, IsNil)
@@ -406,7 +406,7 @@ func (s *CmdTestSuite) TestMbCmd(c *C) {
targetURLConfigMap[targetURL] = targetConfig
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "").Return(nil).Once()
cl1.On("PutBucket").Return(nil).Once()
msg, err := doMakeBucketCmd(methods, targetURL, targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
@@ -429,19 +429,19 @@ func (s *CmdTestSuite) TestAccessCmd(c *C) {
targetURLConfigMap[targetURL] = targetConfig
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "private").Return(nil).Once()
cl1.On("PutBucketACL", "private").Return(nil).Once()
msg, err := doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "public").Return(nil).Once()
cl1.On("PutBucketACL", "public").Return(nil).Once()
msg, err = doUpdateAccessCmd(methods, targetURL, "public", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "readonly").Return(nil).Once()
cl1.On("PutBucketACL", "readonly").Return(nil).Once()
msg, err = doUpdateAccessCmd(methods, targetURL, "readonly", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
@@ -469,8 +469,8 @@ func (s *CmdTestSuite) TestAccessCmdFailures(c *C) {
c.Assert(err, Not(IsNil))
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "private").Return(&net.DNSError{}).Once()
cl1.On("PutBucket", "private").Return(nil).Once()
cl1.On("PutBucketACL", "private").Return(&net.DNSError{}).Once()
cl1.On("PutBucketACL", "private").Return(nil).Once()
msg, err = doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
@@ -484,7 +484,7 @@ func (s *CmdTestSuite) TestAccessCmdFailures(c *C) {
err.Op = "dial"
err.Net = "tcp"
err.Err = errors.New("Another expected error")
cl1.On("PutBucket", "private").Return(err).Once()
cl1.On("PutBucketACL", "private").Return(err).Once()
}
msg, err = doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false)
globalMaxRetryFlag = retries
@@ -514,8 +514,8 @@ func (s *CmdTestSuite) TestMbCmdFailures(c *C) {
c.Assert(err, Not(IsNil))
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "").Return(&net.DNSError{}).Once()
cl1.On("PutBucket", "").Return(nil).Once()
cl1.On("PutBucket").Return(&net.DNSError{}).Once()
cl1.On("PutBucket").Return(nil).Once()
msg, err = doMakeBucketCmd(methods, targetURL, targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
@@ -529,7 +529,7 @@ func (s *CmdTestSuite) TestMbCmdFailures(c *C) {
err.Op = "dial"
err.Net = "tcp"
err.Err = errors.New("Another expected error")
cl1.On("PutBucket", "").Return(err).Once()
cl1.On("PutBucket").Return(err).Once()
}
msg, err = doMakeBucketCmd(methods, targetURL, targetConfig, false)
globalMaxRetryFlag = retries
@@ -554,19 +554,19 @@ func (s *CmdTestSuite) TestAccessCmdOnFile(c *C) {
targetURLConfigMap[targetURL] = targetConfig
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "private").Return(nil).Once()
cl1.On("PutBucketACL", "private").Return(nil).Once()
msg, err := doUpdateAccessCmd(methods, targetURL, "private", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "public").Return(nil).Once()
cl1.On("PutBucketACL", "public").Return(nil).Once()
msg, err = doUpdateAccessCmd(methods, targetURL, "public", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "readonly").Return(nil).Once()
cl1.On("PutBucketACL", "readonly").Return(nil).Once()
msg, err = doUpdateAccessCmd(methods, targetURL, "readonly", targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)
@@ -589,7 +589,7 @@ func (s *CmdTestSuite) TestMbCmdOnFile(c *C) {
targetURLConfigMap[targetURL] = targetConfig
methods.On("getNewClient", targetURL, targetConfig, false).Return(cl1, nil).Once()
cl1.On("PutBucket", "").Return(nil).Once()
cl1.On("PutBucket").Return(nil).Once()
msg, err := doMakeBucketCmd(methods, targetURL, targetConfig, false)
c.Assert(msg, Equals, "")
c.Assert(err, IsNil)

6
cp.go
View File

@@ -85,11 +85,11 @@ func doCopySingleSourceRecursive(methods clientMethods, sourceURL, targetURL str
if err != nil {
return iodine.New(err, nil)
}
for itemCh := range sourceClnt.ListRecursive() {
if itemCh.Err != nil {
for contentCh := range sourceClnt.ListRecursive() {
if contentCh.Err != nil {
continue
}
newSourceURL, newTargetURL := getNewURLRecursive(sourceURL, targetURL, itemCh.Item.Name)
newSourceURL, newTargetURL := getNewURLRecursive(sourceURL, targetURL, contentCh.Content.Name)
if err := doCopySingleSource(methods, newSourceURL, newTargetURL, sourceConfig, targetConfig); err != nil {
// verify for directory related errors, if "open" failed on directories ignore those errors
switch e := iodine.ToError(err).(type) {

20
ls.go
View File

@@ -35,8 +35,8 @@ const (
printDate = "2006-01-02 15:04:05 MST"
)
// printItem prints item meta-data
func printItem(date time.Time, v int64, name string, fileType os.FileMode) {
// printContent prints content meta-data
func printContent(date time.Time, v int64, name string, fileType os.FileMode) {
fmt.Printf(console.Time("[%s] ", date.Local().Format(printDate)))
fmt.Printf(console.Size("%6s ", humanize.IBytes(uint64(v))))
@@ -57,12 +57,12 @@ func printItem(date time.Time, v int64, name string, fileType os.FileMode) {
// doList - list all entities inside a folder
func doList(clnt client.Client, targetURL string) error {
var err error
for itemCh := range clnt.List() {
if itemCh.Err != nil {
err = itemCh.Err
for contentCh := range clnt.List() {
if contentCh.Err != nil {
err = contentCh.Err
break
}
printItem(itemCh.Item.Time, itemCh.Item.Size, itemCh.Item.Name, itemCh.Item.FileType)
printContent(contentCh.Content.Time, contentCh.Content.Size, contentCh.Content.Name, contentCh.Content.FileType)
}
if err != nil {
return iodine.New(err, map[string]string{"Target": targetURL})
@@ -73,12 +73,12 @@ func doList(clnt client.Client, targetURL string) error {
// doListRecursive - list all entities inside folders and sub-folders recursively
func doListRecursive(clnt client.Client, targetURL string) error {
var err error
for itemCh := range clnt.ListRecursive() {
if itemCh.Err != nil {
err = itemCh.Err
for contentCh := range clnt.ListRecursive() {
if contentCh.Err != nil {
err = contentCh.Err
break
}
printItem(itemCh.Item.Time, itemCh.Item.Size, itemCh.Item.Name, itemCh.Item.FileType)
printContent(contentCh.Content.Time, contentCh.Content.Size, contentCh.Content.Name, contentCh.Content.FileType)
}
if err != nil {
return iodine.New(err, map[string]string{"Target": targetURL})

View File

@@ -29,12 +29,13 @@ type Client interface {
MultipartUpload
// Common operations
Stat() (item *Item, err error)
List() <-chan ItemOnChannel
ListRecursive() <-chan ItemOnChannel
Stat() (content *Content, err error)
List() <-chan ContentOnChannel
ListRecursive() <-chan ContentOnChannel
// Bucket operations
PutBucket(acl string) error
PutBucket() error
PutBucketACL(acl string) error
// Object operations
Get() (body io.ReadCloser, size int64, md5 string, err error)
@@ -48,7 +49,7 @@ type MultipartUpload interface {
UploadPart(uploadID string, body io.ReadSeeker, contentLength, partNumber int64) (md5hex string, err error)
CompleteMultiPartUpload(uploadID string) (location, md5hex string, err error)
AbortMultiPartUpload(uploadID string) error
ListParts(uploadID string) (items *PartItems, err error)
ListParts(uploadID string) (contents *PartContents, err error)
}
// Part - part xml response
@@ -59,22 +60,22 @@ type Part struct {
Size int64
}
// PartItems - part xml items response
type PartItems struct {
// PartContents - part xml contents response
type PartContents struct {
Key string
UploadID string
IsTruncated bool
Parts []*Part
}
// ItemOnChannel - List items on channel
type ItemOnChannel struct {
Item *Item
Err error
// ContentOnChannel - List contents on channel
type ContentOnChannel struct {
Content *Content
Err error
}
// Item - object item list
type Item struct {
// Content - object content list
type Content struct {
Name string
Time time.Time
Size int64

View File

@@ -59,11 +59,11 @@ func (f *fsClient) fsStat() (os.FileInfo, error) {
// Get - download an object from bucket
func (f *fsClient) Get() (io.ReadCloser, int64, string, error) {
item, err := f.getFSMetadata()
content, err := f.getFSMetadata()
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
if item.FileType.IsDir() {
if content.FileType.IsDir() {
return nil, 0, "", iodine.New(ISFolder{path: f.path}, nil)
}
body, err := os.Open(f.path)
@@ -82,7 +82,7 @@ func (f *fsClient) Get() (io.ReadCloser, int64, string, error) {
return nil, 0, "", iodine.New(err, nil)
}
md5Str := hex.EncodeToString(h.Sum(nil))
return body, item.Size, md5Str, nil
return body, content.Size, md5Str, nil
}
// GetPartial - download a partial object from bucket
@@ -90,14 +90,14 @@ func (f *fsClient) GetPartial(offset, length int64) (io.ReadCloser, int64, strin
if offset < 0 {
return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil)
}
item, err := f.getFSMetadata()
content, err := f.getFSMetadata()
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
if item.FileType.IsDir() {
if content.FileType.IsDir() {
return nil, 0, "", iodine.New(ISFolder{path: f.path}, nil)
}
if offset > item.Size || (offset+length-1) > item.Size {
if offset > content.Size || (offset+length-1) > content.Size {
return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil)
}
body, err := os.Open(f.path)
@@ -124,26 +124,26 @@ func (f *fsClient) GetPartial(offset, length int64) (io.ReadCloser, int64, strin
}
// List - list files and folders
func (f *fsClient) List() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go f.list(itemCh)
return itemCh
func (f *fsClient) List() <-chan client.ContentOnChannel {
contentCh := make(chan client.ContentOnChannel)
go f.list(contentCh)
return contentCh
}
func (f *fsClient) list(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
func (f *fsClient) list(contentCh chan client.ContentOnChannel) {
defer close(contentCh)
dir, err := os.Open(f.path)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
}
fi, err := os.Lstat(f.path)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
}
defer dir.Close()
@@ -156,46 +156,46 @@ func (f *fsClient) list(itemCh chan client.ItemOnChannel) {
// large quantities of files
files, err := dir.Readdir(-1)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
}
for _, file := range files {
item := &client.Item{
content := &client.Content{
Name: file.Name(),
Time: file.ModTime(),
Size: file.Size(),
FileType: file.Mode(),
}
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
default:
item := &client.Item{
content := &client.Content{
Name: dir.Name(),
Time: fi.ModTime(),
Size: fi.Size(),
FileType: fi.Mode(),
}
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}
// ListRecursive - list files and folders recursively
func (f *fsClient) ListRecursive() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go f.listRecursive(itemCh)
return itemCh
func (f *fsClient) ListRecursive() <-chan client.ContentOnChannel {
contentCh := make(chan client.ContentOnChannel)
go f.listRecursive(contentCh)
return contentCh
}
func (f *fsClient) listRecursive(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
func (f *fsClient) listRecursive(contentCh chan client.ContentOnChannel) {
defer close(contentCh)
visitFS := func(fp string, fi os.FileInfo, err error) error {
if err != nil {
if os.IsPermission(err) { // skip inaccessible files
@@ -203,23 +203,23 @@ func (f *fsClient) listRecursive(itemCh chan client.ItemOnChannel) {
}
return err // fatal
}
item := &client.Item{
content := &client.Content{
Name: fp,
Time: fi.ModTime(),
Size: fi.Size(),
FileType: fi.Mode(),
}
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
return nil
}
err := filepath.Walk(f.path, visitFS)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
}
}
@@ -255,7 +255,16 @@ func aclToPerm(acl string) os.FileMode {
}
// PutBucket - create a new bucket
func (f *fsClient) PutBucket(acl string) error {
func (f *fsClient) PutBucket() error {
err := os.MkdirAll(f.path, 0775)
if err != nil {
return iodine.New(err, nil)
}
return nil
}
// PutBucket - create a new bucket
func (f *fsClient) PutBucketACL(acl string) error {
if !isValidBucketACL(acl) {
return iodine.New(errors.New("invalid acl"), nil)
}
@@ -271,19 +280,19 @@ func (f *fsClient) PutBucket(acl string) error {
}
// getFSMetadata -
func (f *fsClient) getFSMetadata() (item *client.Item, err error) {
func (f *fsClient) getFSMetadata() (content *client.Content, err error) {
st, err := f.fsStat()
if err != nil {
return nil, iodine.New(err, nil)
}
item = new(client.Item)
item.Name = st.Name()
item.Size = st.Size()
item.Time = st.ModTime()
return item, nil
content = new(client.Content)
content.Name = st.Name()
content.Size = st.Size()
content.Time = st.ModTime()
return content, nil
}
// Stat - get metadata from path
func (f *fsClient) Stat() (item *client.Item, err error) {
func (f *fsClient) Stat() (content *client.Content, err error) {
return f.getFSMetadata()
}

View File

@@ -46,6 +46,6 @@ func (c *fsClient) AbortMultiPartUpload(uploadID string) error {
}
// ListParts -
func (c *fsClient) ListParts(uploadID string) (items *client.PartItems, err error) {
func (c *fsClient) ListParts(uploadID string) (contents *client.PartContents, err error) {
return nil, iodine.New(client.APINotImplemented{API: "ListParts"}, nil)
}

View File

@@ -68,12 +68,12 @@ func (s *MySuite) TestList(c *C) {
c.Assert(size, Equals, dataLen)
fsc = New(root)
var items []*client.Item
for itemCh := range fsc.ListRecursive() {
items = append(items, itemCh.Item)
var contents []*client.Content
for contentCh := range fsc.ListRecursive() {
contents = append(contents, contentCh.Content)
}
c.Assert(err, IsNil)
c.Assert(len(items), Equals, 3)
c.Assert(len(contents), Equals, 3)
}
func (s *MySuite) TestPutBucket(c *C) {
@@ -83,7 +83,7 @@ func (s *MySuite) TestPutBucket(c *C) {
bucketPath := filepath.Join(root, "bucket")
fsc := New(bucketPath)
err = fsc.PutBucket("")
err = fsc.PutBucket()
c.Assert(err, IsNil)
}
@@ -94,12 +94,26 @@ func (s *MySuite) TestStatBucket(c *C) {
bucketPath := filepath.Join(root, "bucket")
fsc := New(bucketPath)
err = fsc.PutBucket("")
err = fsc.PutBucket()
c.Assert(err, IsNil)
_, err = fsc.Stat()
c.Assert(err, IsNil)
}
func (s *MySuite) TestPutBucketACL(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "fs-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
bucketPath := filepath.Join(root, "bucket")
fsc := New(bucketPath)
err = fsc.PutBucket()
c.Assert(err, IsNil)
err = fsc.PutBucketACL("private")
c.Assert(err, IsNil)
}
func (s *MySuite) TestPutObject(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "fs-")
c.Assert(err, IsNil)
@@ -169,8 +183,8 @@ func (s *MySuite) TestStat(c *C) {
_, err = io.CopyN(writer, bytes.NewBufferString(data), dataLen)
c.Assert(err, IsNil)
item, err := fsc.Stat()
content, err := fsc.Stat()
c.Assert(err, IsNil)
c.Assert(item.Name, Equals, "object")
c.Assert(item.Size, Equals, dataLen)
c.Assert(content.Name, Equals, "object")
c.Assert(content.Size, Equals, dataLen)
}

View File

@@ -29,7 +29,16 @@ type Client struct {
}
// PutBucket is a mock method
func (m *Client) PutBucket(acl string) error {
func (m *Client) PutBucket() error {
ret := m.Called()
r0 := ret.Error(0)
return r0
}
// PutBucketACL is a mock method
func (m *Client) PutBucketACL(acl string) error {
ret := m.Called(acl)
r0 := ret.Error(0)
@@ -38,12 +47,12 @@ func (m *Client) PutBucket(acl string) error {
}
// Stat is a mock method
func (m *Client) Stat() (*client.Item, error) {
func (m *Client) Stat() (*client.Content, error) {
ret := m.Called()
var r0 *client.Item
var r0 *client.Content
if ret.Get(0) != nil {
r0 = ret.Get(0).(*client.Item)
r0 = ret.Get(0).(*client.Content)
}
r1 := ret.Error(1)
@@ -52,16 +61,16 @@ func (m *Client) Stat() (*client.Item, error) {
}
// List is a mock method
func (m *Client) List() <-chan client.ItemOnChannel {
func (m *Client) List() <-chan client.ContentOnChannel {
ret := m.Called()
r0 := ret.Get(0).(chan client.ItemOnChannel)
r0 := ret.Get(0).(chan client.ContentOnChannel)
return r0
}
// ListRecursive is a mock method
func (m *Client) ListRecursive() <-chan client.ItemOnChannel {
func (m *Client) ListRecursive() <-chan client.ContentOnChannel {
ret := m.Called()
r0 := ret.Get(0).(chan client.ItemOnChannel)
r0 := ret.Get(0).(chan client.ContentOnChannel)
return r0
}
@@ -140,12 +149,12 @@ func (m *Client) AbortMultiPartUpload(uploadID string) error {
}
// ListParts is a mock method
func (m *Client) ListParts(uploadID string) (*client.PartItems, error) {
func (m *Client) ListParts(uploadID string) (*client.PartContents, error) {
ret := m.Called(uploadID)
var r0 *client.PartItems
var r0 *client.PartContents
if ret.Get(0) != nil {
r0 = ret.Get(0).(*client.PartItems)
r0 = ret.Get(0).(*client.PartContents)
}
r1 := ret.Error(1)

View File

@@ -17,13 +17,7 @@
package s3
import (
"encoding/xml"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
"net/http"
@@ -38,8 +32,6 @@ func isValidBucketACL(acl string) bool {
case "public-read":
fallthrough
case "public-read-write":
fallthrough
case "":
return true
default:
return false
@@ -48,99 +40,27 @@ func isValidBucketACL(acl string) bool {
/// Bucket API operations
// Get list of buckets
func (c *s3Client) listBucketsInternal() ([]*client.Item, error) {
var res *http.Response
var err error
u := fmt.Sprintf("%s://%s/", c.Scheme, c.Host)
req, err := c.newRequest("GET", u, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
// do not ignore signatures for 'listBuckets()'
// it is never a public request for amazon s3
// so lets aggressively verify
if strings.Contains(c.Host, "amazonaws.com") && (c.AccessKeyID == "" || c.SecretAccessKey == "") {
msg := "Authorization key cannot be empty for listing buckets, please choose a valid bucketname if its a public request"
return nil, iodine.New(errors.New(msg), nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err = c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
if res != nil {
if res.StatusCode != http.StatusOK {
err = NewError(res)
return nil, iodine.New(err, nil)
}
}
defer res.Body.Close()
type bucket struct {
Name string
CreationDate time.Time
}
type allMyBuckets struct {
Buckets struct {
Bucket []*bucket
}
}
var buckets allMyBuckets
if err := xml.NewDecoder(res.Body).Decode(&buckets); err != nil {
return nil, iodine.New(client.UnexpectedError{
Err: errors.New("Malformed response received from server")},
map[string]string{"XMLError": err.Error()})
}
var items []*client.Item
for _, b := range buckets.Buckets.Bucket {
item := new(client.Item)
item.Name = b.Name
item.Time = b.CreationDate
item.FileType = os.ModeDir
items = append(items, item)
}
return items, nil
}
// PutBucket - create new bucket
func (c *s3Client) PutBucket(acl string) error {
if !isValidBucketACL(acl) {
return iodine.New(InvalidACL{ACL: acl}, nil)
}
bucket, _ := c.url2BucketAndObject()
if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") {
// PutBucket - create a new bucket
func (c *s3Client) PutBucket() error {
bucket, object := c.url2BucketAndObject()
if !client.IsValidBucketName(bucket) {
return iodine.New(InvalidBucketName{Bucket: bucket}, nil)
}
var req *http.Request
var err error
switch len(acl) > 0 {
case true:
u := fmt.Sprintf("%s://%s/%s?acl", c.Scheme, c.Host, bucket)
// new request
req, err = c.newRequest("PUT", u, nil)
if err != nil {
return iodine.New(err, nil)
}
// by default without acl while creating a bucket
// make it default "private"
req.Header.Add("x-amz-acl", acl)
default:
u := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
// new request
req, err = c.newRequest("PUT", u, nil)
if err != nil {
return iodine.New(err, nil)
}
// by default without acl while creating a bucket
// make it default "private"
req.Header.Add("x-amz-acl", "private")
if object != "" {
return iodine.New(InvalidQueryURL{URL: ""}, nil)
}
requestURL, err := c.getRequestURL()
if err != nil {
return iodine.New(err, nil)
}
// new request
req, err := c.newRequest("PUT", requestURL, nil)
if err != nil {
return iodine.New(err, nil)
}
// by default while creating a bucket make it default "private"
req.Header.Add("x-amz-acl", "private")
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
@@ -157,87 +77,52 @@ func (c *s3Client) PutBucket(acl string) error {
return nil
}
func (c *s3Client) getBucketMetadata(bucket string) (item *client.Item, err error) {
if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") {
return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil)
func (c *s3Client) PutBucketACL(acl string) error {
if !isValidBucketACL(acl) {
return iodine.New(InvalidACL{ACL: acl}, nil)
}
u := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
req, err := c.newRequest("HEAD", u, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
item = new(client.Item)
item.Name = bucket
item.FileType = os.ModeDir
defer res.Body.Close()
switch res.StatusCode {
case http.StatusOK:
fallthrough
case http.StatusMovedPermanently:
return item, nil
default:
return nil, iodine.New(NewError(res), nil)
}
}
// getObjectMetadata - returns nil, os.ErrNotExist if not on object storage
func (c *s3Client) getObjectMetadata(bucket, object string) (item *client.Item, err error) {
if !client.IsValidBucketName(bucket) || strings.Contains(bucket, ".") {
return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil)
}
queryURL := c.objectURL(bucket, object)
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("HEAD", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound:
return nil, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil)
case http.StatusOK:
contentLength, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
if err != nil {
return nil, iodine.New(err, nil)
}
date, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified"))
// AWS S3 uses RFC1123 standard for Date in HTTP header, unlike XML content
if err != nil {
return nil, iodine.New(err, nil)
}
item = new(client.Item)
item.Name = object
item.Time = date
item.Size = contentLength
item.FileType = 0
return item, nil
default:
return nil, iodine.New(NewError(res), nil)
}
}
// Stat - send a 'HEAD' on a bucket or object to see if exists
func (c *s3Client) Stat() (*client.Item, error) {
bucket, object := c.url2BucketAndObject()
if object == "" {
return c.getBucketMetadata(bucket)
if !client.IsValidBucketName(bucket) {
return iodine.New(InvalidBucketName{Bucket: bucket}, nil)
}
return c.getObjectMetadata(bucket, object)
if object != "" {
return iodine.New(InvalidQueryURL{URL: ""}, nil)
}
requestURL, err := c.getRequestURL()
if err != nil {
return iodine.New(err, nil)
}
// new request
u := fmt.Sprintf("%s?acl", requestURL)
req, err := c.newRequest("PUT", u, nil)
if err != nil {
return iodine.New(err, nil)
}
// by default without acl while creating a bucket
// make it default "private"
req.Header.Add("x-amz-acl", acl)
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return iodine.New(err, nil)
}
if res != nil {
if res.StatusCode != http.StatusOK {
return iodine.New(NewError(res), nil)
}
}
defer res.Body.Close()
return nil
}
// Stat - send a 'HEAD' on a bucket or object to get its metadata
func (c *s3Client) Stat() (*client.Content, error) {
bucket, object := c.url2BucketAndObject()
return c.getMetadata(bucket, object)
}

View File

@@ -1,271 +0,0 @@
/*
* Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"bytes"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/minio-io/mc/pkg/client"
"github.com/minio-io/minio/pkg/iodine"
)
/// Bucket API operations
// List - list at delimited path not recursive
func (c *s3Client) List() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go c.list(itemCh)
return itemCh
}
func (c *s3Client) list(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
var items []*client.Item
bucket, objectPrefix := c.url2BucketAndObject()
item, err := c.getObjectMetadata(bucket, objectPrefix)
switch err {
case nil: // List a single object. Exact key
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
default:
if bucket == "" {
items, err = c.listBucketsInternal()
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
return
}
// List all objects matching the key prefix
items, err = c.listObjectsInternal(bucket, "", objectPrefix, "/", globalMaxKeys)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
}
}
// ListRecursive - list buckets and objects recursively
func (c *s3Client) ListRecursive() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go c.listRecursive(itemCh)
return itemCh
}
func (c *s3Client) listRecursive(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
var items []*client.Item
bucket, objectPrefix := c.url2BucketAndObject()
item, err := c.getObjectMetadata(bucket, objectPrefix)
switch err {
case nil: // List a single object. Exact key
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
default:
if bucket == "" {
items, err = c.listBucketsInternal()
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
return
}
// List all objects matching the key prefix
items, err = c.listObjectsInternal(bucket, "", objectPrefix, "", globalMaxKeys)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
}
}
func (c *s3Client) isValidQueryURL(queryURL string) bool {
u, err := url.Parse(queryURL)
if err != nil {
return false
}
if !strings.Contains(u.Scheme, "http") {
return false
}
return true
}
// populate s3 response and decode results into listBucketResults{}
func (c *s3Client) decodeBucketResults(queryURL string) (*listBucketResults, error) {
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
bres := &listBucketResults{}
req, err := c.newRequest("GET", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, iodine.New(NewError(res), nil)
}
var logbuf bytes.Buffer
err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(bres)
if err != nil {
return nil, iodine.New(err, map[string]string{"XMLError": logbuf.String()})
}
return bres, nil
}
// filter items out of content and provide marker for future request
func (c *s3Client) filterItems(startAt, marker, prefix, delimiter string, contents []*item) (items []*client.Item, nextMarker string, err error) {
for _, it := range contents {
if it.Key == marker && it.Key != startAt {
// Skip first dup on pages 2 and higher.
continue
}
if it.Key < startAt {
msg := fmt.Sprintf("Unexpected response from Amazon: item key %q but wanted greater than %q", it.Key, startAt)
return nil, marker, iodine.New(client.UnexpectedError{Err: errors.New(msg)}, nil)
}
item := new(client.Item)
item.Name = it.Key
item.Time = it.LastModified
item.Size = it.Size
item.FileType = 0
items = append(items, item)
nextMarker = it.Key
}
return items, nextMarker, nil
}
// Populare query URL for Listobjects requests
func (c *s3Client) getQueryURL(bucket, marker, prefix, delimiter string, fetchN int) string {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.bucketURL(bucket), fetchN))
switch true {
case marker != "":
buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker)))
fallthrough
case prefix != "":
buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix)))
fallthrough
case delimiter != "":
buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter)))
}
return buffer.String()
}
// listObjectsInternal returns 0 to maxKeys (inclusive) items from the
// provided bucket. Keys before startAt will be skipped. (This is the S3
// 'marker' value). If the length of the returned items is equal to
// maxKeys, there is no indication whether or not the returned list is truncated.
func (c *s3Client) listObjectsInternal(bucket, startAt, prefix, delimiter string, maxKeys int) (items []*client.Item, err error) {
if maxKeys <= 0 {
return nil, iodine.New(InvalidMaxKeys{MaxKeys: maxKeys}, nil)
}
marker := startAt
for len(items) < maxKeys {
fetchN := maxKeys - len(items)
if fetchN > globalMaxKeys {
fetchN = globalMaxKeys
}
bres, err := c.decodeBucketResults(c.getQueryURL(bucket, marker, prefix, delimiter, fetchN))
if err != nil {
return nil, iodine.New(err, nil)
}
if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker {
msg := fmt.Sprintf("Unexpected parse from server: %#v", bres)
return nil, iodine.New(client.UnexpectedError{
Err: errors.New(msg)}, nil)
}
items, marker, err = c.filterItems(startAt, marker, prefix, delimiter, bres.Contents)
if err != nil {
return nil, iodine.New(err, nil)
}
for _, prefix := range bres.CommonPrefixes {
item := &client.Item{
Name: prefix.Prefix,
// TODO no way of fixiing this as of now
Time: time.Now(),
Size: 0,
FileType: os.ModeDir,
}
items = append(items, item)
}
if !bres.IsTruncated {
break
}
if len(items) == 0 {
errMsg := errors.New("No items replied")
return nil, iodine.New(client.UnexpectedError{Err: errMsg}, nil)
}
}
return items, nil
}

View File

@@ -0,0 +1,114 @@
/*
* Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"net/http"
"os"
"strconv"
"time"
"github.com/minio-io/mc/pkg/client"
"github.com/minio-io/minio/pkg/iodine"
)
func (c *s3Client) getMetadata(bucket, object string) (content *client.Content, err error) {
if object == "" {
return c.getBucketMetadata(bucket)
}
return c.getObjectMetadata(bucket, object)
}
func (c *s3Client) getBucketMetadata(bucket string) (content *client.Content, err error) {
queryURL, err := c.getRequestURL()
if err != nil {
return nil, iodine.New(err, nil)
}
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("HEAD", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
content = new(client.Content)
content.Name = bucket
content.FileType = os.ModeDir
defer res.Body.Close()
switch res.StatusCode {
case http.StatusOK:
fallthrough
case http.StatusMovedPermanently:
return content, nil
default:
return nil, iodine.New(NewError(res), nil)
}
}
// getObjectMetadata - returns nil, os.ErrNotExist if not on object storage
func (c *s3Client) getObjectMetadata(bucket, object string) (content *client.Content, err error) {
queryURL, err := c.getRequestURL()
if err != nil {
return nil, iodine.New(err, nil)
}
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("HEAD", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound:
return nil, iodine.New(ObjectNotFound{Bucket: bucket, Object: object}, nil)
case http.StatusOK:
// verify for Content-Length
contentLength, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
if err != nil {
return nil, iodine.New(err, nil)
}
// AWS S3 uses RFC1123 standard for Date in HTTP header
date, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified"))
if err != nil {
return nil, iodine.New(err, nil)
}
content = new(client.Content)
content.Name = object
content.Time = date
content.Size = contentLength
content.FileType = 0
return content, nil
default:
return nil, iodine.New(NewError(res), nil)
}
}

View File

@@ -0,0 +1,304 @@
/*
1;3803;0c * Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"bytes"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/minio-io/mc/pkg/client"
"github.com/minio-io/minio/pkg/iodine"
)
func (c *s3Client) listInGoRoutine(contentCh chan client.ContentOnChannel) {
defer close(contentCh)
var contents []*client.Content
bucket, objectPrefix := c.url2BucketAndObject()
content, err := c.getObjectMetadata(bucket, objectPrefix)
switch err {
case nil: // List a single object. Exact key
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
default:
if bucket == "" {
contents, err = c.listBuckets()
if err != nil {
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
return
}
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
return
}
// List all objects matching the key prefix
contents, err = c.listObjects(bucket, "", objectPrefix, "/", globalMaxKeys)
if err != nil {
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
return
}
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}
}
func (c *s3Client) listRecursiveInGoRoutine(contentCh chan client.ContentOnChannel) {
defer close(contentCh)
var contents []*client.Content
bucket, objectPrefix := c.url2BucketAndObject()
content, err := c.getObjectMetadata(bucket, objectPrefix)
switch err {
case nil: // List a single object. Exact key
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
default:
if bucket == "" {
contents, err = c.listBuckets()
if err != nil {
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
return
}
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
return
}
// List all objects matching the key prefix
contents, err = c.listObjects(bucket, "", objectPrefix, "", globalMaxKeys)
if err != nil {
contentCh <- client.ContentOnChannel{
Content: nil,
Err: iodine.New(err, nil),
}
return
}
for _, content := range contents {
contentCh <- client.ContentOnChannel{
Content: content,
Err: nil,
}
}
}
}
// populate s3 response and decode results into listBucketResults{}
func (c *s3Client) decodeBucketResults(queryURL string) (*listBucketResults, error) {
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
bres := &listBucketResults{}
req, err := c.newRequest("GET", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, iodine.New(NewError(res), nil)
}
var logbuf bytes.Buffer
err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(bres)
if err != nil {
return nil, iodine.New(err, map[string]string{"XMLError": logbuf.String()})
}
return bres, nil
}
// filter contents out of content and provide marker for future request
func (c *s3Client) filterContents(startAt, marker, prefix, delimiter string, cts []*content) ([]*client.Content, string, error) {
var contents []*client.Content
var nextMarker string
for _, ct := range cts {
if ct.Key == marker && ct.Key != startAt {
// Skip first dup on pages 2 and higher.
continue
}
if ct.Key < startAt {
msg := fmt.Sprintf("Unexpected response from Amazon: content key %q but wanted greater than %q", ct.Key, startAt)
return nil, marker, iodine.New(client.UnexpectedError{Err: errors.New(msg)}, nil)
}
content := new(client.Content)
content.Name = ct.Key
content.Time = ct.LastModified
content.Size = ct.Size
content.FileType = 0
contents = append(contents, content)
nextMarker = ct.Key
}
return contents, nextMarker, nil
}
// Populare query URL for Listobjects requests
func (c *s3Client) getQueryURL(marker, prefix, delimiter string, fetchN int) string {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.mustGetRequestURL(), fetchN))
switch true {
case marker != "":
buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker)))
fallthrough
case prefix != "":
buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix)))
fallthrough
case delimiter != "":
buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter)))
}
return buffer.String()
}
// listObjects returns 0 to maxKeys (inclusive) contents from the
// provided bucket. Keys before startAt will be skipped. (This is the S3
// 'marker' value). If the length of the returned contents is equal to
// maxKeys, there is no indication whether or not the returned list is truncated.
func (c *s3Client) listObjects(bucket, startAt, prefix, delimiter string, maxKeys int) (contents []*client.Content, err error) {
if maxKeys <= 0 {
return nil, iodine.New(InvalidMaxKeys{MaxKeys: maxKeys}, nil)
}
marker := startAt
for len(contents) < maxKeys {
fetchN := maxKeys - len(contents)
if fetchN > globalMaxKeys {
fetchN = globalMaxKeys
}
bres, err := c.decodeBucketResults(c.getQueryURL(marker, prefix, delimiter, fetchN))
if err != nil {
return nil, iodine.New(err, nil)
}
if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker {
msg := fmt.Sprintf("Unexpected parse from server: %#v", bres)
return nil, iodine.New(client.UnexpectedError{
Err: errors.New(msg)}, nil)
}
contents, marker, err = c.filterContents(startAt, marker, prefix, delimiter, bres.Contents)
if err != nil {
return nil, iodine.New(err, nil)
}
for _, prefix := range bres.CommonPrefixes {
content := &client.Content{
Name: prefix.Prefix,
// TODO no way of fixiing this as of now
Time: time.Now(),
Size: 0,
FileType: os.ModeDir,
}
contents = append(contents, content)
}
if !bres.IsTruncated {
break
}
if len(contents) == 0 {
errMsg := errors.New("No contents replied")
return nil, iodine.New(client.UnexpectedError{Err: errMsg}, nil)
}
}
return contents, nil
}
// Get list of buckets
func (c *s3Client) listBuckets() ([]*client.Content, error) {
requestURL, err := c.getRequestURL()
if err != nil {
return nil, iodine.New(err, nil)
}
req, err := c.newRequest("GET", requestURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
// do not ignore signatures for 'listBuckets()' it is never a public request for amazon s3
// so lets aggressively verify
if strings.Contains(c.Host, "amazonaws.com") && (c.AccessKeyID == "" || c.SecretAccessKey == "") {
msg := "Authorization key cannot be empty for listing buckets, please choose a valid bucketname if its a public request"
return nil, iodine.New(errors.New(msg), nil)
}
// rest we can ignore
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, iodine.New(err, nil)
}
if res != nil {
if res.StatusCode != http.StatusOK {
err = NewError(res)
return nil, iodine.New(err, nil)
}
}
defer res.Body.Close()
type bucket struct {
Name string
CreationDate time.Time
}
type allMyBuckets struct {
Buckets struct {
Bucket []*bucket
}
}
var buckets allMyBuckets
if err := xml.NewDecoder(res.Body).Decode(&buckets); err != nil {
return nil, iodine.New(client.UnexpectedError{
Err: errors.New("Malformed response received from server")},
map[string]string{"XMLError": err.Error()})
}
var contents []*client.Content
for _, b := range buckets.Buckets.Bucket {
content := new(client.Content)
content.Name = b.Name
content.Time = b.CreationDate
content.FileType = os.ModeDir
contents = append(contents, content)
}
return contents, nil
}

View File

@@ -1,203 +0,0 @@
/*
* Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/s3"
"github.com/minio-io/mc/pkg/client"
"github.com/minio-io/minio/pkg/iodine"
)
type item struct {
Key string
LastModified time.Time
ETag string
Size int64
}
type prefix struct {
Prefix string
}
type listBucketResults struct {
Contents []*item
IsTruncated bool
MaxKeys int
Name string // bucket name
Marker string
Delimiter string
Prefix string
CommonPrefixes []*prefix
}
// Meta holds Amazon S3 client credentials and flags.
type Meta struct {
*Config
*s3.S3
Transport http.RoundTripper // or nil for the default behavior
}
// Config - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
type Config struct {
AccessKeyID string
SecretAccessKey string
HostURL string
UserAgent string
Debug bool
// Used for SSL transport layer
CertPEM string
KeyPEM string
}
// TLSConfig - TLS cert and key configuration
type TLSConfig struct {
CertPEMBlock []byte
KeyPEMBlock []byte
}
type s3Client struct {
*Meta
// Supports URL in following formats
// - http://<ipaddress>/<bucketname>/<object>
// - http://<bucketname>.<domain>/<object>
*url.URL
}
// url2BucketAndObject converts URL to bucketName and objectName
func (c *s3Client) url2BucketAndObject() (bucketName, objectName string) {
splits := strings.SplitN(c.Path, "/", 3)
switch len(splits) {
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
}
return bucketName, objectName
}
// bucketURL constructs a URL (with a trailing slash) for a given
// bucket. URL is appropriately encoded based on the host's object
// storage implementation.
func (c *s3Client) bucketURL(bucket string) string {
var url string
// Avoid bucket names with "." in them
// Citing - http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
//
// =====================
// When using virtual hostedstyle buckets with SSL, the SSL wild card certificate
// only matches buckets that do not contain periods. To work around this, use HTTP
// or write your own certificate verification logic.
// =====================
//
if client.IsValidBucketName(bucket) {
// if localhost use PathStyle
if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") {
return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
}
// Verify if its ip address, use PathStyle
host, _, _ := net.SplitHostPort(c.Host)
if net.ParseIP(host) != nil {
return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
}
if strings.Contains(c.Host, "amazonaws.com") {
// amazonaws.com use subdomain style
return fmt.Sprintf("%s://%s.%s/", c.Scheme, bucket, c.Host)
}
url = fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
}
return url
}
// objectURL constructs a URL using bucket and object
func (c *s3Client) objectURL(bucket, object string) string {
url := c.bucketURL(bucket)
if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") {
return url + "/" + object
}
// Verify if its ip address, use PathStyle
host, _, _ := net.SplitHostPort(c.Host)
if net.ParseIP(host) != nil {
return url + "/" + object
}
// if not amazon do not construct a subdomain URL
if !strings.Contains(c.Host, "amazonaws.com") {
return url + "/" + object
}
return url + object
}
// Instantiate a new request
func (c *s3Client) newRequest(method, url string, body io.ReadCloser) (*http.Request, error) {
errParams := map[string]string{
"url": url,
"method": method,
"userAgent": c.UserAgent,
}
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, iodine.New(err, errParams)
}
req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
return req, nil
}
// New returns an initialized s3Client structure.
// if debug use a internal trace transport
func New(config *Config) client.Client {
u, err := url.Parse(config.HostURL)
if err != nil {
return nil
}
var traceTransport RoundTripTrace
var transport http.RoundTripper
if config.Debug {
traceTransport = GetNewTraceTransport(NewTrace(false, true, nil), http.DefaultTransport)
transport = GetNewTraceTransport(s3Verify{}, traceTransport)
} else {
transport = http.DefaultTransport
}
awsConf := aws.DefaultConfig
awsConf.Credentials = aws.Creds(config.AccessKeyID, config.SecretAccessKey, "")
awsConf.HTTPClient = &http.Client{Transport: transport}
awsConf.Logger = ioutil.Discard
s3c := &s3Client{
&Meta{
Config: config,
Transport: transport,
S3: s3.New(awsConf),
}, u,
}
return s3c
}

View File

@@ -37,7 +37,7 @@ type MySuite struct{}
var _ = Suite(&MySuite{})
func listAllMyBuckets(r io.Reader) ([]*client.Item, error) {
func listAllMyBuckets(r io.Reader) ([]*client.Content, error) {
type bucket struct {
Name string
CreationDate time.Time
@@ -53,14 +53,14 @@ func listAllMyBuckets(r io.Reader) ([]*client.Item, error) {
return nil, iodine.New(client.UnexpectedError{Err: errors.New("Malformed response received from server")},
map[string]string{"XMLError": err.Error()})
}
var items []*client.Item
var contents []*client.Content
for _, b := range buckets.Buckets.Bucket {
item := new(client.Item)
item.Name = b.Name
item.Time = b.CreationDate
items = append(items, item)
content := new(client.Content)
content.Name = b.Name
content.Time = b.CreationDate
contents = append(contents, content)
}
return items, nil
return contents, nil
}
func (s *MySuite) TestConfig(c *C) {
@@ -81,7 +81,7 @@ func (s *MySuite) TestBucketACL(c *C) {
{"private", true},
{"public-read", true},
{"public-read-write", true},
{"", true},
{"", false},
{"readonly", false},
{"invalid", false},
}
@@ -108,7 +108,7 @@ func (s *MySuite) TestParseBuckets(c *C) {
t1, err := time.Parse(time.RFC3339, "2006-06-21T07:04:31.000Z")
t2, err := time.Parse(time.RFC3339, "2006-06-21T07:04:32.000Z")
want := []*client.Item{
want := []*client.Content{
{Name: "bucketOne", Time: t1},
{Name: "bucketTwo", Time: t2},
}

56
pkg/client/s3/common.go Normal file
View File

@@ -0,0 +1,56 @@
/*
* Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"io/ioutil"
"net/http"
"net/url"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/s3"
"github.com/minio-io/mc/pkg/client"
)
// New returns an initialized s3Client structure.
// if debug use a internal trace transport
func New(config *Config) client.Client {
u, err := url.Parse(config.HostURL)
if err != nil {
return nil
}
var traceTransport RoundTripTrace
var transport http.RoundTripper
if config.Debug {
traceTransport = GetNewTraceTransport(NewTrace(false, true, nil), http.DefaultTransport)
transport = GetNewTraceTransport(s3Verify{}, traceTransport)
} else {
transport = http.DefaultTransport
}
awsConf := aws.DefaultConfig
awsConf.Credentials = aws.Creds(config.AccessKeyID, config.SecretAccessKey, "")
awsConf.HTTPClient = &http.Client{Transport: transport}
awsConf.Logger = ioutil.Discard
s3c := &s3Client{
&Meta{
Config: config,
Transport: transport,
S3: s3.New(awsConf),
}, u,
}
return s3c
}

View File

@@ -0,0 +1,68 @@
package s3
import (
"net/http"
"net/url"
"time"
"github.com/awslabs/aws-sdk-go/service/s3"
)
//
type content struct {
Key string
LastModified time.Time
ETag string
Size int64
}
// prefix
type prefix struct {
Prefix string
}
type listBucketResults struct {
Contents []*content
IsTruncated bool
MaxKeys int
Name string // bucket name
Marker string
Delimiter string
Prefix string
CommonPrefixes []*prefix
}
// Meta holds Amazon S3 client credentials and flags.
type Meta struct {
*Config
*s3.S3
Transport http.RoundTripper // or nil for the default behavior
}
// Config - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
type Config struct {
AccessKeyID string
SecretAccessKey string
HostURL string
UserAgent string
Debug bool
// Used for SSL transport layer
CertPEM string
KeyPEM string
}
// TLSConfig - TLS cert and key configuration
type TLSConfig struct {
CertPEMBlock []byte
KeyPEMBlock []byte
}
type s3Client struct {
*Meta
// Supports URL in following formats
// - http://<ipaddress>/<bucketname>/<object>
// - http://<bucketname>.<domain>/<object>
*url.URL
}

View File

@@ -0,0 +1,35 @@
/*
* Minio Client (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import "github.com/minio-io/mc/pkg/client"
/// Bucket API operations
// List - list at delimited path not recursive
func (c *s3Client) List() <-chan client.ContentOnChannel {
contentCh := make(chan client.ContentOnChannel)
go c.listInGoRoutine(contentCh)
return contentCh
}
// ListRecursive - list buckets and objects recursively
func (c *s3Client) ListRecursive() <-chan client.ContentOnChannel {
contentCh := make(chan client.ContentOnChannel)
go c.listRecursiveInGoRoutine(contentCh)
return contentCh
}

View File

@@ -29,17 +29,36 @@ import (
/// Object API operations
// Get - download a requested object from a given bucket
func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error) {
bucket, object := c.url2BucketAndObject()
if !client.IsValidBucketName(bucket) {
return nil, 0, "", iodine.New(InvalidBucketName{Bucket: bucket}, nil)
func (c *s3Client) setRange(req *http.Request, offset, length int64) (*http.Request, error) {
if offset < 0 {
return nil, iodine.New(client.InvalidRange{Offset: offset}, nil)
}
if length >= 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
} else {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
}
return req, nil
}
func (c *s3Client) get() (*http.Request, error) {
queryURL, err := c.getRequestURL()
if err != nil {
return nil, iodine.New(err, nil)
}
queryURL := c.objectURL(bucket, object)
if !c.isValidQueryURL(queryURL) {
return nil, 0, "", iodine.New(InvalidQueryURL{URL: queryURL}, nil)
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("GET", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
return req, nil
}
// Get - download a requested object from a given bucket
func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error) {
req, err := c.get()
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
@@ -61,35 +80,21 @@ func (c *s3Client) Get() (body io.ReadCloser, size int64, md5 string, err error)
// GetPartial fetches part of the s3 object in bucket.
// If length is negative, the rest of the object is returned.
func (c *s3Client) GetPartial(offset, length int64) (body io.ReadCloser, size int64, md5 string, err error) {
bucket, object := c.url2BucketAndObject()
if !client.IsValidBucketName(bucket) {
return nil, 0, "", iodine.New(InvalidBucketName{Bucket: bucket}, nil)
}
if offset < 0 {
return nil, 0, "", iodine.New(client.InvalidRange{Offset: offset}, nil)
}
queryURL := c.objectURL(bucket, object)
if !c.isValidQueryURL(queryURL) {
return nil, 0, "", iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("GET", queryURL, nil)
req, err := c.get()
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
if length >= 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
} else {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
req, err = c.setRange(req, offset, length)
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
res, err := c.Transport.RoundTrip(req)
if err != nil {
return nil, 0, "", iodine.New(err, nil)
}
switch res.StatusCode {
case http.StatusOK, http.StatusPartialContent:
return res.Body, res.ContentLength, res.Header.Get("ETag"), nil

View File

@@ -87,7 +87,7 @@ func (c *s3Client) AbortMultiPartUpload(uploadID string) error {
}
// ListParts
func (c *s3Client) ListParts(uploadID string) (items *client.PartItems, err error) {
func (c *s3Client) ListParts(uploadID string) (contents *client.PartContents, err error) {
bucket, object := c.url2BucketAndObject()
listPartsInput := new(s3.ListPartsInput)
listPartsInput.Bucket = &bucket
@@ -98,18 +98,18 @@ func (c *s3Client) ListParts(uploadID string) (items *client.PartItems, err erro
if err != nil {
return nil, iodine.New(err, nil)
}
items = new(client.PartItems)
items.Key = *listPartsOutput.Key
items.IsTruncated = *listPartsOutput.IsTruncated
items.UploadID = *listPartsOutput.UploadID
contents = new(client.PartContents)
contents.Key = *listPartsOutput.Key
contents.IsTruncated = *listPartsOutput.IsTruncated
contents.UploadID = *listPartsOutput.UploadID
for _, part := range listPartsOutput.Parts {
newPart := new(client.Part)
newPart.ETag = *part.ETag
newPart.LastModified = *part.LastModified
newPart.PartNumber = *part.PartNumber
newPart.Size = *part.Size
items.Parts = append(items.Parts, newPart)
contents.Parts = append(contents.Parts, newPart)
}
return items, nil
return contents, nil
}

View File

@@ -22,6 +22,7 @@ import (
"errors"
"io"
"net/http"
"strconv"
"strings"
"github.com/minio-io/mc/pkg/client"
@@ -30,49 +31,48 @@ import (
/// Object Operations PUT - keeping this in a separate file for readability
// Put - upload new object to bucket
func (c *s3Client) Put(md5HexString string, size int64) (io.WriteCloser, error) {
bucket, object := c.url2BucketAndObject()
if !client.IsValidBucketName(bucket) {
return nil, iodine.New(InvalidBucketName{Bucket: bucket}, nil)
func (c *s3Client) put(size int64) (*http.Request, error) {
queryURL, err := c.getRequestURL()
if err != nil {
return nil, iodine.New(err, nil)
}
queryURL := c.objectURL(bucket, object)
if !c.isValidQueryURL(queryURL) {
return nil, iodine.New(InvalidQueryURL{URL: queryURL}, nil)
}
req, err := c.newRequest("PUT", queryURL, nil)
if err != nil {
return nil, iodine.New(err, nil)
}
req.Header.Set("Content-Length", strconv.FormatInt(size, 10))
return req, nil
}
// Put - upload new object to bucket
func (c *s3Client) Put(md5HexString string, size int64) (io.WriteCloser, error) {
// if size is negative
if size < 0 {
return nil, iodine.New(client.InvalidArgument{Err: errors.New("invalid argument")}, nil)
}
req, err := c.put(size)
if err != nil {
return nil, iodine.New(err, nil)
}
// set Content-MD5 only if md5 is provided
if strings.TrimSpace(md5HexString) != "" {
md5, err := hex.DecodeString(md5HexString)
if err != nil {
return nil, iodine.New(err, nil)
}
req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5))
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
// starting Pipe session
r, w := io.Pipe()
blockingWriter := client.NewBlockingWriteCloser(w)
go func() {
if size < 0 {
err := iodine.New(client.InvalidArgument{Err: errors.New("invalid argument")}, nil)
r.CloseWithError(err)
blockingWriter.Release(err)
return
}
req, err := c.newRequest("PUT", queryURL, r)
if err != nil {
err := iodine.New(err, nil)
r.CloseWithError(err)
blockingWriter.Release(err)
return
}
req.Method = "PUT"
req.ContentLength = size
// set Content-MD5 only if md5 is provided
if strings.TrimSpace(md5HexString) != "" {
md5, err := hex.DecodeString(md5HexString)
if err != nil {
err := iodine.New(err, nil)
r.CloseWithError(err)
blockingWriter.Release(err)
return
}
req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5))
}
if c.AccessKeyID != "" && c.SecretAccessKey != "" {
c.signRequest(req, c.Host)
}
req.Body = r
// this is necessary for debug, since the underlying transport is a wrapper
res, err := c.Transport.RoundTrip(req)
if err != nil {

107
pkg/client/s3/request.go Normal file
View File

@@ -0,0 +1,107 @@
package s3
import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/minio-io/mc/pkg/client"
"github.com/minio-io/minio/pkg/iodine"
)
func (c *s3Client) isValidQueryURL(queryURL string) bool {
u, err := url.Parse(queryURL)
if err != nil {
return false
}
if !strings.Contains(u.Scheme, "http") {
return false
}
return true
}
// url2BucketAndObject gives bucketName and objectName from URL path
func (c *s3Client) url2BucketAndObject() (bucketName, objectName string) {
splits := strings.SplitN(c.Path, "/", 3)
switch len(splits) {
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
}
return bucketName, objectName
}
func (c *s3Client) getBucketRequestURL(bucket string) string {
// default to path style
url := fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
if strings.Contains(c.Host, "amazonaws.com") {
// amazonaws.com use subdomain style
url = fmt.Sprintf("%s://%s.%s", c.Scheme, bucket, c.Host)
}
return url
}
// getObjectRequestURL constructs a URL using bucket and object
func (c *s3Client) getObjectRequestURL(bucket, object string) string {
return c.getBucketRequestURL(bucket) + "/" + object
}
func (c *s3Client) mustGetRequestURL() string {
requestURL, _ := c.getRequestURL()
return requestURL
}
// getRequestURL constructs a URL. URL is appropriately encoded based on the host's object storage implementation.
func (c *s3Client) getRequestURL() (string, error) {
bucket, object := c.url2BucketAndObject()
// Avoid bucket names with "." in them
// Citing - http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
//
// =====================
// When using virtual hostedstyle buckets with SSL, the SSL wild card certificate
// only matches buckets that do not contain periods. To work around this, use HTTP
// or write your own certificate verification logic.
// =====================
//
if !client.IsValidBucketName(bucket) {
return "", iodine.New(InvalidBucketName{Bucket: bucket}, nil)
}
if !client.IsValidObject(object) {
return "", iodine.New(InvalidObjectName{Bucket: bucket, Object: object}, nil)
}
switch {
case bucket == "" && object == "":
return fmt.Sprintf("%s://%s/", c.Scheme, c.Host), nil
case bucket != "" && object == "":
return c.getBucketRequestURL(bucket), nil
case bucket != "" && object != "":
return c.getObjectRequestURL(bucket, object), nil
}
return "", iodine.New(errors.New("Unexpected error, please report this.."), nil)
}
// Instantiate a new request
func (c *s3Client) newRequest(method, url string, body io.ReadCloser) (*http.Request, error) {
errParams := map[string]string{
"url": url,
"method": method,
"userAgent": c.UserAgent,
}
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, iodine.New(err, errParams)
}
req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
return req, nil
}