1
0
mirror of https://github.com/minio/mc.git synced 2025-11-13 12:22:45 +03:00

ListOnChannel() - now listbuckets and objects through channel replies.

Still doesn't do full recursive of all objects based on marker, that is the next step.
This commit is contained in:
Harshavardhana
2015-04-23 17:59:25 -07:00
parent 431143f7d0
commit 527b1ca96d
7 changed files with 146 additions and 18 deletions

View File

@@ -52,24 +52,29 @@ func printItem(date time.Time, v int64, name string) {
func doList(clnt client.Client, targetURL string) (string, error) {
var err error
var items []*client.Item
items, err = clnt.List()
if err != nil && isValidRetry(err) {
console.Infof("Retrying ...")
for itemOnChannel := range clnt.ListOnChannel() {
if itemOnChannel.Err != nil {
err = itemOnChannel.Err
break
}
for i := 1; i <= globalMaxRetryFlag && err != nil && isValidRetry(err); i++ {
items, err = clnt.List()
console.Errorf(" %d", i)
// Progressively longer delays
time.Sleep(time.Duration(i*i) * time.Second)
printItem(itemOnChannel.Item.Time, itemOnChannel.Item.Size, itemOnChannel.Item.Name)
}
switch iodine.ToError(err).(type) {
case client.APINotImplemented:
items, err := clnt.List()
if err != nil {
err = iodine.New(err, nil)
msg := fmt.Sprintf("mc: listing objects for URL [%s] failed with following reason: [%s]\n", targetURL, iodine.ToError(err))
return msg, err
}
printItems(items)
default:
if err != nil {
err = iodine.New(err, nil)
msg := fmt.Sprintf("mc: listing objects for URL [%s] failed with following reason: [%s]\n", targetURL, iodine.ToError(err))
return msg, err
}
}
return "", nil
}

View File

@@ -289,6 +289,15 @@ func (s *CmdTestSuite) TestLsCmdWithBucket(c *C) {
sourceURLConfigMap[sourceURL] = sourceConfig
manager.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
go func() {
defer close(itemCh)
itemCh <- client.ItemOnChannel{
Item: nil,
Err: client.APINotImplemented{},
}
}()
cl1.On("ListOnChannel").Return(itemCh).Once()
cl1.On("List").Return(items, nil).Once()
msg, err := doListCmd(manager, sourceURL, sourceConfig, false)
c.Assert(msg, Equals, "")
@@ -323,6 +332,15 @@ func (s *CmdTestSuite) TestLsCmdWithFilePath(c *C) {
sourceURLConfigMap[sourceURL] = sourceConfig
manager.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
go func() {
defer close(itemCh)
itemCh <- client.ItemOnChannel{
Item: nil,
Err: client.APINotImplemented{},
}
}()
cl1.On("ListOnChannel").Return(itemCh).Once()
cl1.On("List").Return(items, nil).Once()
msg, err := doListCmd(manager, sourceURL, sourceConfig, false)
c.Assert(msg, Equals, "")
@@ -351,6 +369,15 @@ func (s *CmdTestSuite) TestLsCmdListsBuckets(c *C) {
sourceURLConfigMap[sourceURL] = sourceConfig
manager.On("getNewClient", sourceURL, sourceConfig, false).Return(cl1, nil).Once()
itemCh := make(chan client.ItemOnChannel)
go func() {
defer close(itemCh)
itemCh <- client.ItemOnChannel{
Item: nil,
Err: client.APINotImplemented{},
}
}()
cl1.On("ListOnChannel").Return(itemCh).Once()
cl1.On("List").Return(buckets, nil).Once()
msg, err := doListCmd(manager, sourceURL, sourceConfig, false)
c.Assert(msg, Equals, "")

View File

@@ -30,7 +30,7 @@ type Client interface {
// Common operations
Stat() error
List() (items []*Item, err error)
// ListNew() error
ListOnChannel() <-chan ItemOnChannel
// Bucket operations
PutBucket(acl string) error
@@ -67,6 +67,12 @@ type PartItems struct {
Part []*Part
}
// ItemOnChannel - List items on channel
type ItemOnChannel struct {
Item *Item
Err error
}
// Item - object item list
type Item struct {
Name string

View File

@@ -110,6 +110,20 @@ func (f *fsClient) GetObjectMetadata() (item *client.Item, reterr error) {
return item, nil
}
func (f *fsClient) ListOnChannel() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go f.listInGoroutine(itemCh)
return itemCh
}
func (f *fsClient) listInGoroutine(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(client.APINotImplemented{API: "ListOnChannel"}, nil),
}
}
// List - get a list of items
func (f *fsClient) List() (items []*client.Item, err error) {
visitFS := func(fp string, fi os.FileInfo, err error) error {

View File

@@ -125,6 +125,20 @@ func (f *fsClient) listBuckets() ([]*client.Item, error) {
return results, nil
}
func (f *fsClient) ListOnChannel() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go f.listInGoroutine(itemCh)
return itemCh
}
func (f *fsClient) listInGoroutine(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(client.APINotImplemented{API: "ListOnChannel"}, nil),
}
}
// List - get a list of items
func (f *fsClient) List() (items []*client.Item, err error) {
item, err := f.GetObjectMetadata()

View File

@@ -30,6 +30,13 @@ func (m *Client) Stat() error {
return r0
}
// ListOnChannel is a mock method
func (m *Client) ListOnChannel() <-chan client.ItemOnChannel {
ret := m.Called()
r0 := ret.Get(0).(chan client.ItemOnChannel)
return r0
}
// List is a mock method
func (m *Client) List() ([]*client.Item, error) {
ret := m.Called()

View File

@@ -34,7 +34,62 @@ import (
/// Bucket API operations
// ListObjects - list objects inside a bucket or with prefix
// ListOnChannel -
func (c *s3Client) ListOnChannel() <-chan client.ItemOnChannel {
itemCh := make(chan client.ItemOnChannel)
go c.listInGoroutine(itemCh)
return itemCh
}
func (c *s3Client) listInGoroutine(itemCh chan client.ItemOnChannel) {
defer close(itemCh)
var items []*client.Item
bucket, objectPrefix := c.url2Object()
item, err := c.GetObjectMetadata()
switch err {
case nil: // List a single object. Exact key
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
default:
if bucket == "" {
items, err = c.listBucketsInternal()
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
return
}
// List all objects matching the key prefix
items, err = c.listObjectsInternal(bucket, "", objectPrefix, "", globalMaxKeys)
if err != nil {
itemCh <- client.ItemOnChannel{
Item: nil,
Err: iodine.New(err, nil),
}
return
}
for _, item := range items {
itemCh <- client.ItemOnChannel{
Item: item,
Err: nil,
}
}
}
}
// List - list objects inside a bucket or with prefix
func (c *s3Client) List() (items []*client.Item, err error) {
bucket, objectPrefix := c.url2Object()
item, err := c.GetObjectMetadata()