Code cleanup and simplification
- Print in human-readable format, not Amazon's raw format
- Send stat() before accessing an object; fail gracefully on error
- Move client and bucket API operations to their own files
- Remove unused functions, constants and variables
- Rename API operations to sensible names
* GetBucket --> ListObjects
* Buckets --> ListBuckets
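For reference, a minimal sketch of how call sites change under these renames (illustrative only; s3c is a *s3.Client, as used in s3-fs-ls.go below):

	buckets, err := s3c.ListBuckets()                                // was: s3c.Buckets()
	items, _, err := s3c.ListObjects(bucket, "", "", "", s3.MaxKeys) // was: s3c.GetBucket(...)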
@@ -48,7 +48,6 @@ import (
	"io"
	"io/ioutil"
	"net"
	//"log"
	"net/http"
	"net/url"
	"sort"
pkg/s3/bucket.go (new file, 175 lines)
@@ -0,0 +1,175 @@
package s3

import (
	"bytes"
	"fmt"
	"io"
	"sort"
	"strings"
	"time"

	"encoding/xml"
	"io/ioutil"
	"net/http"
	"net/url"
)

// bySize implements sort.Interface for []Item based on the Size field.
type bySize []*Item

func (a bySize) Len() int           { return len(a) }
func (a bySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a bySize) Less(i, j int) bool { return a[i].Size < a[j].Size }

/// Bucket API operations

// ListBuckets - Get list of buckets
func (c *Client) ListBuckets() ([]*Bucket, error) {
	req := newReq(c.endpoint() + "/")
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("s3: Unexpected status code %d fetching bucket list", res.StatusCode)
	}
	return parseListAllMyBuckets(res.Body)
}

// PutBucket - create new bucket
func (c *Client) PutBucket(bucket string) error {
	var url string
	if IsValidBucket(bucket) && !strings.Contains(bucket, ".") {
		url = fmt.Sprintf("%s/%s", c.endpoint(), bucket)
	}
	req := newReq(url)
	req.Method = "PUT"
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return err
	}

	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
	}
	return nil
}

// ListObjects returns 0 to maxKeys (inclusive) items from the
// provided bucket. Keys before startAt will be skipped. (This is the S3
// 'marker' value). If the length of the returned items is equal to
// maxKeys, there is no indication whether or not the returned list is truncated.
func (c *Client) ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error) {
	var urlReq string
	var buffer bytes.Buffer

	if maxKeys <= 0 {
		return nil, nil, fmt.Errorf("negative maxKeys are invalid")
	}

	marker := startAt
	for len(items) < maxKeys {
		fetchN := maxKeys - len(items)
		if fetchN > MaxKeys {
			fetchN = MaxKeys
		}
		var bres listBucketResults
		buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.bucketURL(bucket), fetchN))
		switch true {
		case marker != "":
			buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker)))
			fallthrough
		case prefix != "":
			buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix)))
			fallthrough
		case delimiter != "":
			buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter)))
		}

		urlReq = buffer.String()
		// Try the enumerate three times, since Amazon likes to close
		// https connections a lot, and Go sucks at dealing with it:
		// https://code.google.com/p/go/issues/detail?id=3514
		const maxTries = 5
		for try := 1; try <= maxTries; try++ {
			time.Sleep(time.Duration(try-1) * 100 * time.Millisecond)
			req := newReq(urlReq)
			c.Auth.signRequest(req)
			res, err := c.transport().RoundTrip(req)
			if err != nil {
				if try < maxTries {
					continue
				}
				return nil, nil, err
			}
			if res.StatusCode != http.StatusOK {
				if res.StatusCode < 500 {
					body, _ := ioutil.ReadAll(io.LimitReader(res.Body, 1<<20))
					aerr := &Error{
						Op:     "ListBucket",
						Code:   res.StatusCode,
						Body:   body,
						Header: res.Header,
					}
					aerr.parseXML()
					res.Body.Close()
					return nil, nil, aerr
				}
			} else {
				bres = listBucketResults{}
				var logbuf bytes.Buffer
				err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(&bres)
				if err != nil {
					fmt.Printf("Error parsing s3 XML response: %v for %q", err, logbuf.Bytes())
				} else if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker {
					err = fmt.Errorf("Unexpected parse from server: %#v from: %s", bres, logbuf.Bytes())
					fmt.Print(err)
				}
			}
			res.Body.Close()
			if err != nil {
				if try < maxTries-1 {
					continue
				}
				fmt.Print(err)
				return nil, nil, err
			}
			break
		}
		for _, it := range bres.Contents {
			if it.Key == marker && it.Key != startAt {
				// Skip first dup on pages 2 and higher.
				continue
			}
			if it.Key < startAt {
				return nil, nil, fmt.Errorf("Unexpected response from Amazon: item key %q but wanted greater than %q", it.Key, startAt)
			}
			items = append(items, it)
			marker = it.Key
		}

		for _, pre := range bres.CommonPrefixes {
			if pre.Prefix != "" {
				prefixes = append(prefixes, pre)
			}
		}

		if !bres.IsTruncated {
			break
		}

		if len(items) == 0 {
			return nil, nil, fmt.Errorf("No items replied")
		}
	}
	sort.Sort(bySize(items))
	return items, prefixes, nil
}
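A usage sketch for the new bucket API, assuming a valid *s3.Auth obtained elsewhere; the bucket name "mybucket" and the demo function are hypothetical, not part of this commit:

	package main

	import (
		"fmt"
		"log"

		"github.com/minio-io/mc/pkg/s3"
	)

	// demo is illustrative only.
	func demo(auth *s3.Auth) {
		c := s3.NewClient(auth)
		if err := c.PutBucket("mybucket"); err != nil {
			log.Fatal(err)
		}
		// ListObjects follows the S3 marker internally, so one call
		// pages through up to MaxKeys objects.
		items, _, err := c.ListObjects("mybucket", "", "", "", s3.MaxKeys)
		if err != nil {
			log.Fatal(err)
		}
		for _, it := range items {
			fmt.Println(it.Key, it.Size) // returned items are sorted by size
		}
	}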
pkg/s3/client.go (476 lines changed)
@@ -40,35 +40,64 @@ limitations under the License.
package s3

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"encoding/xml"
)

const xmlTimeFormat = "2006-01-02T15:04:05.000Z"
// Date format
const (
	xmlTimeFormat = "2006-01-02T15:04:05.000Z"
)

// Total max object list
const (
	MaxKeys = 1000
)

type xmlTime struct {
	time.Time
}

func parseTime(t string) time.Time {
	ti, _ := time.Parse(xmlTimeFormat, t)
	return ti
// Client is an Amazon S3 client.
type Client struct {
	*Auth
	Transport http.RoundTripper // or nil for the default
}

// Bucket - carries s3 bucket reply header
type Bucket struct {
	Name         string
	CreationDate xmlTime // 2006-02-03T16:45:09.000Z
}

// Item - object item list
type Item struct {
	Key          string
	LastModified xmlTime
	Size         int64
}

// Prefix - common prefix
type Prefix struct {
	Prefix string
}

type listBucketResults struct {
	Contents       []*Item
	IsTruncated    bool
	MaxKeys        int
	Name           string // bucket name
	Marker         string
	Delimiter      string
	Prefix         string
	CommonPrefixes []*Prefix
}

func (c *xmlTime) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
@@ -85,21 +114,8 @@ func (c *xmlTime) UnmarshalXMLAttr(attr xml.Attr) error {
	return nil
}

// Total max object list
const (
	MaxKeys = 1000
)

// Client is an Amazon S3 client.
type Client struct {
	*Auth
	Transport http.RoundTripper // or nil for the default
}

// Bucket - carries s3 bucket reply header
type Bucket struct {
	Name         string
	CreationDate xmlTime
func (c *xmlTime) format() string {
	return c.Time.Format(xmlTimeFormat)
}

func (c *Client) transport() http.RoundTripper {
@@ -170,355 +186,6 @@ func parseListAllMyBuckets(r io.Reader) ([]*Bucket, error) {
	return res.Buckets.Bucket, nil
}

/// Object API operations

// Buckets - Get list of buckets
func (c *Client) Buckets() ([]*Bucket, error) {
	req := newReq(c.endpoint() + "/")
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("s3: Unexpected status code %d fetching bucket list", res.StatusCode)
	}
	return parseListAllMyBuckets(res.Body)
}

// Stat - returns 0, "", os.ErrNotExist if not on S3
func (c *Client) Stat(key, bucket string) (size int64, date time.Time, reterr error) {
	req := newReq(c.keyURL(bucket, key))
	req.Method = "HEAD"
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	if err != nil {
		return 0, date, err
	}

	switch res.StatusCode {
	case http.StatusNotFound:
		return 0, date, os.ErrNotExist
	case http.StatusOK:
		size, err = strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
		if err != nil {
			return 0, date, err
		}
		if dateStr := res.Header.Get("Last-Modified"); dateStr != "" {
			// AWS S3 uses RFC1123 standard for Date in HTTP header, unlike XML content
			date, err := time.Parse(time.RFC1123, dateStr)
			if err != nil {
				return 0, date, err
			}
			return size, date, nil
		}
	default:
		return 0, date, fmt.Errorf("s3: Unexpected status code %d statting object %v", res.StatusCode, key)
	}
	return
}

// PutBucket - create new bucket
func (c *Client) PutBucket(bucket string) error {
	var url string
	if IsValidBucket(bucket) && !strings.Contains(bucket, ".") {
		url = fmt.Sprintf("%s/%s", c.endpoint(), bucket)
	}
	req := newReq(url)
	req.Method = "PUT"
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return err
	}

	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
	}
	return nil
}

// Put - upload new object to bucket
func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error {
	req := newReq(c.keyURL(bucket, key))
	req.Method = "PUT"
	req.ContentLength = size

	h := md5.New()
	// Memory where data is present
	sink := new(bytes.Buffer)
	mw := io.MultiWriter(h, sink)
	written, err := io.Copy(mw, contents)
	if written != size {
		return fmt.Errorf("Data read mismatch")
	}
	if err != nil {
		return err
	}
	req.Body = ioutil.NopCloser(sink)
	b64 := base64.StdEncoding.EncodeToString(h.Sum(nil))
	req.Header.Set("Content-MD5", b64)
	c.Auth.signRequest(req)

	res, err := c.transport().RoundTrip(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}

	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		// res.Write(os.Stderr)
		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
	}
	return nil
}

// Item - object item list
type Item struct {
	Key          string
	LastModified xmlTime
	Size         int64
}

// Prefix - common prefix
type Prefix struct {
	Prefix string
}

// BySize implements sort.Interface for []Item based on
// the Size field.
type BySize []*Item

func (a BySize) Len() int           { return len(a) }
func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySize) Less(i, j int) bool { return a[i].Size < a[j].Size }

type listBucketResults struct {
	Contents       []*Item
	IsTruncated    bool
	MaxKeys        int
	Name           string // bucket name
	Marker         string
	Delimiter      string
	Prefix         string
	CommonPrefixes []*Prefix
}

// BucketLocation - returns the S3 endpoint to be used with the given bucket.
func (c *Client) BucketLocation(bucket string) (location string, err error) {
	if !strings.HasSuffix(c.endpoint(), "amazonaws.com") {
		return "", errors.New("BucketLocation not implemented for non-Amazon S3 endpoints")
	}
	urlReq := fmt.Sprintf("%s/%s/?location", c.endpoint(), url.QueryEscape(bucket))
	req := newReq(urlReq)
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return
	}
	var xres xmlLocationConstraint
	if err := xml.NewDecoder(res.Body).Decode(&xres); err != nil {
		return "", err
	}
	if xres.Location == "" {
		return strings.TrimPrefix(c.endpoint(), "https://"), nil
	}
	return "s3-" + xres.Location + ".amazonaws.com", nil
}

// GetBucket (List Objects) returns 0 to maxKeys (inclusive) items from the
// provided bucket. Keys before startAt will be skipped. (This is the S3
// 'marker' value). If the length of the returned items is equal to
// maxKeys, there is no indication whether or not the returned list is truncated.
func (c *Client) GetBucket(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error) {
	var urlReq string
	var buffer bytes.Buffer

	if maxKeys <= 0 {
		return nil, nil, errors.New("negative maxKeys are invalid")
	}

	marker := startAt
	for len(items) < maxKeys {
		fetchN := maxKeys - len(items)
		if fetchN > MaxKeys {
			fetchN = MaxKeys
		}
		var bres listBucketResults
		buffer.WriteString(fmt.Sprintf("%s?max-keys=%d", c.bucketURL(bucket), fetchN))
		switch true {
		case marker != "":
			buffer.WriteString(fmt.Sprintf("&marker=%s", url.QueryEscape(marker)))
			fallthrough
		case prefix != "":
			buffer.WriteString(fmt.Sprintf("&prefix=%s", url.QueryEscape(prefix)))
			fallthrough
		case delimiter != "":
			buffer.WriteString(fmt.Sprintf("&delimiter=%s", url.QueryEscape(delimiter)))
		}

		urlReq = buffer.String()
		// Try the enumerate three times, since Amazon likes to close
		// https connections a lot, and Go sucks at dealing with it:
		// https://code.google.com/p/go/issues/detail?id=3514
		const maxTries = 5
		for try := 1; try <= maxTries; try++ {
			time.Sleep(time.Duration(try-1) * 100 * time.Millisecond)
			req := newReq(urlReq)
			c.Auth.signRequest(req)
			res, err := c.transport().RoundTrip(req)
			if err != nil {
				if try < maxTries {
					continue
				}
				return nil, nil, err
			}
			if res.StatusCode != http.StatusOK {
				if res.StatusCode < 500 {
					body, _ := ioutil.ReadAll(io.LimitReader(res.Body, 1<<20))
					aerr := &Error{
						Op:     "ListBucket",
						Code:   res.StatusCode,
						Body:   body,
						Header: res.Header,
					}
					aerr.parseXML()
					res.Body.Close()
					return nil, nil, aerr
				}
			} else {
				bres = listBucketResults{}
				var logbuf bytes.Buffer
				err = xml.NewDecoder(io.TeeReader(res.Body, &logbuf)).Decode(&bres)
				if err != nil {
					log.Printf("Error parsing s3 XML response: %v for %q", err, logbuf.Bytes())
				} else if bres.MaxKeys != fetchN || bres.Name != bucket || bres.Marker != marker {
					err = fmt.Errorf("Unexpected parse from server: %#v from: %s", bres, logbuf.Bytes())
					log.Print(err)
				}
			}
			res.Body.Close()
			if err != nil {
				if try < maxTries-1 {
					continue
				}
				log.Print(err)
				return nil, nil, err
			}
			break
		}
		for _, it := range bres.Contents {
			if it.Key == marker && it.Key != startAt {
				// Skip first dup on pages 2 and higher.
				continue
			}
			if it.Key < startAt {
				return nil, nil, fmt.Errorf("Unexpected response from Amazon: item key %q but wanted greater than %q", it.Key, startAt)
			}
			items = append(items, it)
			marker = it.Key
		}

		for _, pre := range bres.CommonPrefixes {
			if pre.Prefix != "" {
				prefixes = append(prefixes, pre)
			}
		}

		if !bres.IsTruncated {
			break
		}

		if len(items) == 0 {
			return nil, nil, errors.New("No items replied")
		}
	}
	return items, prefixes, nil
}

// Get - download a requested object from a given bucket
func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) {
	req := newReq(c.keyURL(bucket, key))
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return
	}
	switch res.StatusCode {
	case http.StatusOK:
		return res.Body, res.ContentLength, nil
	case http.StatusNotFound:
		res.Body.Close()
		return nil, 0, os.ErrNotExist
	default:
		res.Body.Close()
		return nil, 0, fmt.Errorf("Amazon HTTP error on GET: %d", res.StatusCode)
	}
}

// GetPartial fetches part of the s3 key object in bucket.
// If length is negative, the rest of the object is returned.
// The caller must close rc.
func (c *Client) GetPartial(bucket, key string, offset, length int64) (rc io.ReadCloser, err error) {
	if offset < 0 {
		return nil, errors.New("invalid negative length")
	}

	req := newReq(c.keyURL(bucket, key))
	if length >= 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
	} else {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
	}
	c.Auth.signRequest(req)

	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return
	}
	switch res.StatusCode {
	case http.StatusOK, http.StatusPartialContent:
		return res.Body, nil
	case http.StatusNotFound:
		res.Body.Close()
		return nil, os.ErrNotExist
	default:
		res.Body.Close()
		return nil, fmt.Errorf("Amazon HTTP error on GET: %d", res.StatusCode)
	}
}

/* Not supporting Delete's
func (c *Client) Delete(bucket, key string) error {
	req := newReq(c.keyURL(bucket, key))
	req.Method = "DELETE"
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return err
	}
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusNoContent ||
		res.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("Amazon HTTP error on DELETE: %d", res.StatusCode)
}
*/

// NewClient - get new client
func NewClient(auth *Auth) (client *Client) {
	client = &Client{auth, http.DefaultTransport}
@@ -541,54 +208,3 @@ func IsValidBucket(bucket string) bool {
	match, _ := regexp.MatchString("^[a-zA-Z][a-zA-Z0-9\\-]+[a-zA-Z0-9]$", bucket)
	return match
}

// Error is the type returned by some API operations.
type Error struct {
	Op     string
	Code   int         // HTTP status code
	Body   []byte      // response body
	Header http.Header // response headers

	// UsedEndpoint and AmazonCode are the XML response's Endpoint and
	// Code fields, respectively.
	UseEndpoint string // if a temporary redirect (wrong endpoint)
	AmazonCode  string
}

func (e *Error) Error() string {
	if bytes.Contains(e.Body, []byte("<Error>")) {
		return fmt.Sprintf("s3.%s: status %d: %s", e.Op, e.Code, e.Body)
	}
	return fmt.Sprintf("s3.%s: status %d", e.Op, e.Code)
}

func (e *Error) parseXML() {
	var xe xmlError
	_ = xml.NewDecoder(bytes.NewReader(e.Body)).Decode(&xe)
	e.AmazonCode = xe.Code
	if xe.Code == "TemporaryRedirect" {
		e.UseEndpoint = xe.Endpoint
	}
	if xe.Code == "SignatureDoesNotMatch" {
		want, _ := hex.DecodeString(strings.Replace(xe.StringToSignBytes, " ", "", -1))
		log.Printf("S3 SignatureDoesNotMatch. StringToSign should be %d bytes: %q (%x)", len(want), want, want)
	}
}

// xmlError is the Error response from Amazon.
type xmlError struct {
	XMLName           xml.Name `xml:"Error"`
	Code              string
	Message           string
	RequestID         string
	Bucket            string
	Endpoint          string
	StringToSignBytes string
}

// xmlLocationConstraint is the LocationConstraint returned from BucketLocation.
type xmlLocationConstraint struct {
	XMLName  xml.Name `xml:"LocationConstraint"`
	Location string   `xml:",chardata"`
}
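A short sketch of BucketLocation from the caller's side (only meaningful against an Amazon endpoint, per the guard in the function; c is an existing *s3.Client and the bucket name is hypothetical):

	loc, err := c.BucketLocation("mybucket")
	if err != nil {
		log.Fatal(err)
	}
	// For a region-constrained bucket this yields "s3-<region>.amazonaws.com";
	// otherwise the default endpoint host is returned.
	fmt.Println(loc)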
pkg/s3/error.go (new file, 56 lines)
@@ -0,0 +1,56 @@
package s3

import (
	"bytes"
	"fmt"
	"strings"

	"encoding/hex"
	"encoding/xml"
	"net/http"
)

// Error is the type returned by some API operations.
type Error struct {
	Op     string
	Code   int         // HTTP status code
	Body   []byte      // response body
	Header http.Header // response headers

	// UsedEndpoint and AmazonCode are the XML response's Endpoint and
	// Code fields, respectively.
	UseEndpoint string // if a temporary redirect (wrong endpoint)
	AmazonCode  string
}

// xmlError is the Error response from Amazon.
type xmlError struct {
	XMLName           xml.Name `xml:"Error"`
	Code              string
	Message           string
	RequestID         string
	Bucket            string
	Endpoint          string
	StringToSignBytes string
}

func (e *Error) Error() string {
	if bytes.Contains(e.Body, []byte("<Error>")) {
		return fmt.Sprintf("s3.%s: status %d: %s", e.Op, e.Code, e.Body)
	}
	return fmt.Sprintf("s3.%s: status %d", e.Op, e.Code)
}

func (e *Error) parseXML() {
	var xe xmlError
	_ = xml.NewDecoder(bytes.NewReader(e.Body)).Decode(&xe)
	e.AmazonCode = xe.Code
	if xe.Code == "TemporaryRedirect" {
		e.UseEndpoint = xe.Endpoint
	}
	if xe.Code == "SignatureDoesNotMatch" {
		want, _ := hex.DecodeString(strings.Replace(xe.StringToSignBytes, " ", "", -1))
		fmt.Printf("S3 SignatureDoesNotMatch. StringToSign should be %d bytes: %q (%x)", len(want), want, want)
	}
}
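A sketch of how a caller can inspect this structured error, e.g. after ListObjects fails with a 4xx reply (the call site and bucket name are hypothetical):

	_, _, err := c.ListObjects("mybucket", "", "", "", s3.MaxKeys)
	if err != nil {
		if aerr, ok := err.(*s3.Error); ok {
			// parseXML has already filled the Amazon-specific fields.
			fmt.Println(aerr.Code, aerr.AmazonCode, aerr.UseEndpoint)
		}
	}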
pkg/s3/object.go (new file, 141 lines)
@@ -0,0 +1,141 @@
package s3

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"

	"crypto/md5"
	"encoding/base64"
	"io/ioutil"
	"net/http"
)

/// Object API operations

// Put - upload new object to bucket
func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error {
	req := newReq(c.keyURL(bucket, key))
	req.Method = "PUT"
	req.ContentLength = size

	h := md5.New()
	// Memory where data is present
	sink := new(bytes.Buffer)
	mw := io.MultiWriter(h, sink)
	written, err := io.Copy(mw, contents)
	if written != size {
		return fmt.Errorf("Data read mismatch")
	}
	if err != nil {
		return err
	}
	req.Body = ioutil.NopCloser(sink)
	b64 := base64.StdEncoding.EncodeToString(h.Sum(nil))
	req.Header.Set("Content-MD5", b64)
	c.Auth.signRequest(req)

	res, err := c.transport().RoundTrip(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}

	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		// res.Write(os.Stderr)
		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
	}
	return nil
}

// Stat - returns 0, "", os.ErrNotExist if not on S3
func (c *Client) Stat(key, bucket string) (size int64, date time.Time, reterr error) {
	req := newReq(c.keyURL(bucket, key))
	req.Method = "HEAD"
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	if err != nil {
		return 0, date, err
	}

	switch res.StatusCode {
	case http.StatusNotFound:
		return 0, date, os.ErrNotExist
	case http.StatusOK:
		size, err = strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
		if err != nil {
			return 0, date, err
		}
		if dateStr := res.Header.Get("Last-Modified"); dateStr != "" {
			// AWS S3 uses RFC1123 standard for Date in HTTP header, unlike XML content
			date, err := time.Parse(time.RFC1123, dateStr)
			if err != nil {
				return 0, date, err
			}
			return size, date, nil
		}
	default:
		return 0, date, fmt.Errorf("s3: Unexpected status code %d statting object %v", res.StatusCode, key)
	}
	return
}

// Get - download a requested object from a given bucket
func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) {
	req := newReq(c.keyURL(bucket, key))
	c.Auth.signRequest(req)
	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return
	}
	switch res.StatusCode {
	case http.StatusOK:
		return res.Body, res.ContentLength, nil
	case http.StatusNotFound:
		res.Body.Close()
		return nil, 0, os.ErrNotExist
	default:
		res.Body.Close()
		return nil, 0, fmt.Errorf("Amazon HTTP error on GET: %d", res.StatusCode)
	}
}

// GetPartial fetches part of the s3 key object in bucket.
// If length is negative, the rest of the object is returned.
// The caller must close rc.
func (c *Client) GetPartial(bucket, key string, offset, length int64) (rc io.ReadCloser, err error) {
	if offset < 0 {
		return nil, fmt.Errorf("invalid negative length")
	}

	req := newReq(c.keyURL(bucket, key))
	if length >= 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
	} else {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
	}
	c.Auth.signRequest(req)

	res, err := c.transport().RoundTrip(req)
	if err != nil {
		return
	}
	switch res.StatusCode {
	case http.StatusOK, http.StatusPartialContent:
		return res.Body, nil
	case http.StatusNotFound:
		res.Body.Close()
		return nil, os.ErrNotExist
	default:
		res.Body.Close()
		return nil, fmt.Errorf("Amazon HTTP error on GET: %d", res.StatusCode)
	}
}
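A sketch of the stat-before-get pattern this commit introduces (mirrored by the doFsCopy change below); note the argument order: Stat takes (key, bucket) while Get takes (bucket, key). Bucket and key names are hypothetical:

	if _, _, err := c.Stat("mykey", "mybucket"); err != nil {
		log.Fatal(err) // os.ErrNotExist when the object is absent
	}
	rc, size, err := c.Get("mybucket", "mykey")
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	fmt.Println("downloading", size, "bytes")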
s3-errors.go (11 lines changed)
@@ -18,14 +18,13 @@ package main

import "errors"

var (
	// fs
var errFspath = errors.New("Arguments missing <S3Path> or <LocalPath>")
var errFskey = errors.New("Key is needed to get the file")
	errFskey = errors.New("Key is needed to get the file")

	// uri
var errInvalidScheme = errors.New("Invalid URI scheme only s3:// supported")
	errInvalidScheme = errors.New("Invalid URI scheme only s3:// supported")

	// common
var errMissingaccess = errors.New("Partial credentials found in the env, missing : AWS_ACCESS_KEY_ID")
var errMissingsecret = errors.New("Partial credentials found in the env, missing : AWS_SECRET_ACCESS_KEY")
var errInvalidbucket = errors.New("Invalid bucket name")
	errInvalidbucket = errors.New("Invalid bucket name")
)
@@ -85,6 +85,11 @@ func doFsCopy(c *cli.Context) {
	var objectReader io.ReadCloser
	var objectSize int64

	// Send HEAD request to validate if file exists.
	if _, _, err := s3c.Stat(fsoptions.key, fsoptions.bucket); err != nil {
		fatal(err.Error())
	}

	objectReader, objectSize, err = s3c.Get(fsoptions.bucket, fsoptions.key)
	if err != nil {
		fatal(err.Error())
s3-fs-ls.go (21 lines changed)
@@ -19,7 +19,6 @@ package main
import (
	"fmt"
	"os"
	"sort"
	"time"

	"github.com/cheggaaa/pb"
@@ -27,16 +26,20 @@ import (
	"github.com/minio-io/mc/pkg/s3"
)

const (
	humanReadableFormat = "2006-01-02 15:04:05"
)

func printBuckets(v []*s3.Bucket) {
	for _, b := range v {
		msg := fmt.Sprintf("%v %13s %s", b.CreationDate.Local(), "", b.Name)
		msg := fmt.Sprintf("%v %9s %s", b.CreationDate.Format(humanReadableFormat), "", b.Name)
		info(msg)
	}
}

func printObjects(v []*s3.Item) {
	if len(v) > 0 {
		sort.Sort(s3.BySize(v))
		// Items are already sorted
		for _, b := range v {
			printObject(b.LastModified.Time, b.Size, b.Key)
		}
@@ -44,7 +47,7 @@ func printObjects(v []*s3.Item) {
}

func printObject(date time.Time, v int64, key string) {
	msg := fmt.Sprintf("%v %13s %s", date.Local(), pb.FormatBytes(v), key)
	msg := fmt.Sprintf("%v %9s %s", date.Format(humanReadableFormat), pb.FormatBytes(v), key)
	info(msg)
}

@@ -66,13 +69,13 @@ func doFsList(c *cli.Context) {
	}
	switch true {
	case fsoptions.bucket == "": // List all buckets
		buckets, err := s3c.Buckets()
		buckets, err := s3c.ListBuckets()
		if err != nil {
			fatal(err.Error())
		}
		printBuckets(buckets)
	case fsoptions.key == "": // List the objects in a bucket
		items, _, err = s3c.GetBucket(fsoptions.bucket, "", "", "", s3.MaxKeys)
	case fsoptions.key == "": // List objects in a bucket
		items, _, err = s3c.ListObjects(fsoptions.bucket, "", "", "", s3.MaxKeys)
		if err != nil {
			fatal(err.Error())
		}
@@ -83,11 +86,11 @@ func doFsList(c *cli.Context) {

	size, date, err = s3c.Stat(fsoptions.key, fsoptions.bucket)
	switch err {
	case nil: // List a single object. Exact key prefix match
	case nil: // List a single object. Exact key
		printObject(date, size, fsoptions.key)
	case os.ErrNotExist:
		// List all objects matching the key prefix
		items, _, err = s3c.GetBucket(fsoptions.bucket, "", fsoptions.key, "", s3.MaxKeys)
		items, _, err = s3c.ListObjects(fsoptions.bucket, "", fsoptions.key, "", s3.MaxKeys)
		if err != nil {
			fatal(err.Error())
		}
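For illustration, the new humanReadableFormat applied to a hypothetical timestamp (Go's reference-time layout):

	t := time.Date(2015, time.March, 1, 12, 30, 0, 0, time.UTC)
	fmt.Println(t.Format("2006-01-02 15:04:05")) // prints: 2015-03-01 12:30:00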