Mirror of https://github.com/minio/mc.git
Donut command
@@ -16,7 +16,6 @@ import (
 	"path/filepath"
 
 	"github.com/codegangsta/cli"
-	"github.com/minio-io/mc/pkg/s3"
 )
 
 const (
@@ -24,8 +23,13 @@ const (
 	mcConfigFilename = "config.json"
 )
 
+type auth struct {
+	AccessKeyID     string
+	SecretAccessKey string
+}
+
 type hostConfig struct {
-	Auth s3.Auth
+	Auth auth
 }
 
 type mcConfig struct {
@@ -190,13 +194,25 @@ func parseConfigInput(c *cli.Context) (config *mcConfig, err error) {
 	}
 
 	alias := strings.Fields(c.String("alias"))
-	if len(alias) == 0 {
-		// valid case throw help
-		return nil, nil
-	}
-	if len(alias) != 2 {
-		return nil, errors.New("invalid number of arguments for --alias, requires exact 2")
+	switch true {
+	case len(alias) == 0:
+		config = &mcConfig{
+			Version:     currentConfigVersion,
+			DefaultHost: "https://s3.amazonaws.com",
+			Hosts: map[string]hostConfig{
+				"http*://s3*.amazonaws.com": {
+					Auth: auth{
+						AccessKeyID:     accessKeyID,
+						SecretAccessKey: secretAccesskey,
+					}},
+			},
+			Aliases: map[string]string{
+				"s3":        "https://s3.amazonaws.com/",
+				"localhost": "http://localhost:9000/",
+			},
 		}
+		return config, nil
+	case len(alias) == 2:
 		aliasName := alias[0]
 		url := alias[1]
 		if strings.HasPrefix(aliasName, "http") {
@@ -205,13 +221,12 @@ func parseConfigInput(c *cli.Context) (config *mcConfig, err error) {
 		if !strings.HasPrefix(url, "http") {
 			return nil, errors.New("invalid url type only supports http{s}")
 		}
-
 		config = &mcConfig{
 			Version:     currentConfigVersion,
 			DefaultHost: "https://s3.amazonaws.com",
 			Hosts: map[string]hostConfig{
 				"http*://s3*.amazonaws.com": {
-					Auth: s3.Auth{
+					Auth: auth{
 						AccessKeyID:     accessKeyID,
 						SecretAccessKey: secretAccesskey,
 					}},
@@ -222,8 +237,10 @@ func parseConfigInput(c *cli.Context) (config *mcConfig, err error) {
 				aliasName: url,
 			},
 		}
-		//config.Aliases[aliasName] = url
 		return config, nil
+	default:
+		return nil, errors.New("invalid number of arguments for --alias, requires exact 2")
+	}
 }
 
 // getHostConfig retrieves host specific configuration such as access keys, certs.

cmd-cp.go (70 changed lines)
@@ -24,7 +24,6 @@ import (
 
 	"github.com/cheggaaa/pb"
 	"github.com/codegangsta/cli"
-	"github.com/minio-io/mc/pkg/s3"
 )
 
 // Different modes of cp operation
@@ -62,7 +61,7 @@ func getMode(recursive bool, args *cmdArgs) int {
 }
 
 // First mode <Object> <S3Object> or <Object> <S3Bucket>
-func firstMode(s3c *s3.Client, args *cmdArgs) error {
+func firstMode(c *cli.Context, args *cmdArgs) error {
 	if args.source.key == "" {
 		return errors.New("invalid args")
 	}
@@ -80,13 +79,14 @@ func firstMode(s3c *s3.Client, args *cmdArgs) error {
 	if err != nil {
 		return err
 	}
 
 	// http://<bucket>.<hostname> is specified without key
 	if args.destination.key == "" {
 		args.destination.key = args.source.key
 	}
-	s3c.Host = args.destination.host
-	s3c.Scheme = args.destination.scheme
+	s3c, err := getNewClient(c, args.destination.url)
+	if err != nil {
+		return err
+	}
 	err = s3c.Put(args.destination.bucket, args.destination.key, size, source)
 	if err != nil {
 		return err
@@ -98,13 +98,17 @@ func firstMode(s3c *s3.Client, args *cmdArgs) error {
 }
 
 // Second mode <S3Object> <Object> or <S3Object> .
-func secondMode(s3c *s3.Client, args *cmdArgs) error {
+func secondMode(c *cli.Context, args *cmdArgs) error {
 	var objectReader io.ReadCloser
 	var objectSize, downloadedSize int64
 	var destination *os.File
 	var err error
 	var st os.FileInfo
 
+	s3c, err := getNewClient(c, args.source.url)
+	if err != nil {
+		return err
+	}
 	// Send HEAD request to validate if file exists.
 	objectSize, _, err = s3c.Stat(args.source.bucket, args.source.key)
 	if err != nil {
@@ -181,15 +185,17 @@ func secondMode(s3c *s3.Client, args *cmdArgs) error {
 }
 
 // <S3Object> <S3Object> or <S3Object> <S3Bucket>
-func thirdMode(s3c *s3.Client, args *cmdArgs) error {
+func thirdMode(c *cli.Context, args *cmdArgs) error {
 	var objectReader io.ReadCloser
 	var objectSize int64
 	var err error
 
-	s3c.Host = args.source.host
-	s3c.Scheme = args.source.scheme
+	s3cSource, err := getNewClient(c, args.source.url)
+	if err != nil {
+		return err
+	}
 	// Send HEAD request to validate if file exists.
-	objectSize, _, err = s3c.Stat(args.source.bucket, args.source.key)
+	objectSize, _, err = s3cSource.Stat(args.source.bucket, args.source.key)
 	if err != nil {
 		return err
 	}
@@ -199,20 +205,18 @@ func thirdMode(s3c *s3.Client, args *cmdArgs) error {
 	}
 
 	// Check if the object already exists
-	s3c.Host = args.destination.host
-	s3c.Scheme = args.destination.scheme
-	_, _, err = s3c.Stat(args.destination.bucket, args.destination.key)
-	switch os.IsNotExist(err) {
-	case true:
-		s3c.Host = args.source.host
-		s3c.Scheme = args.source.scheme
-		objectReader, _, err = s3c.Get(args.source.bucket, args.source.key)
+	s3cDest, err := getNewClient(c, args.destination.url)
 	if err != nil {
 		return err
 	}
-	s3c.Host = args.destination.host
-	s3c.Scheme = args.destination.scheme
-	err = s3c.Put(args.destination.bucket, args.destination.key, objectSize, objectReader)
+	_, _, err = s3cDest.Stat(args.destination.bucket, args.destination.key)
+	switch os.IsNotExist(err) {
+	case true:
+		objectReader, _, err = s3cSource.Get(args.source.bucket, args.source.key)
+		if err != nil {
+			return err
+		}
+		err = s3cDest.Put(args.destination.bucket, args.destination.key, objectSize, objectReader)
 	if err != nil {
 		return err
 	}
@@ -220,13 +224,13 @@ func thirdMode(s3c *s3.Client, args *cmdArgs) error {
 		return errors.New("Ranges not supported")
 	}
 
-	msg := fmt.Sprintf("http://%s/%s uploaded -- to bucket:(http://%s/%s)", args.source.bucket, args.source.key,
-		args.destination.bucket, args.destination.key)
+	msg := fmt.Sprintf("%s/%s/%s uploaded -- to bucket:(%s/%s/%s)", args.source.host, args.source.bucket, args.source.key,
+		args.destination.host, args.destination.bucket, args.destination.key)
 	info(msg)
 	return nil
 }
 
-func fourthMode(s3c *s3.Client, args *cmdArgs) error {
+func fourthMode(c *cli.Context, args *cmdArgs) error {
 	if args.source.bucket == "" {
 		_, err := os.Stat(args.source.key)
 		if os.IsNotExist(err) {
@@ -244,48 +248,40 @@ func fourthMode(s3c *s3.Client, args *cmdArgs) error {
 			os.MkdirAll(args.destination.key, 0755)
 		}
 	}
-	return doRecursiveCp(s3c, args)
+	return doRecursiveCP(c, args)
 }
 
 // doCopyCmd copies objects into and from a bucket or between buckets
 func doCopyCmd(c *cli.Context) {
 	var args *cmdArgs
 	var err error
-	var s3c *s3.Client
 
 	args, err = parseArgs(c)
 	if err != nil {
 		fatal(err.Error())
 	}
 
-	s3c, err = getNewClient(c)
-	if err != nil {
-		fatal(err.Error())
-	}
-
 	if len(c.Args()) != 2 {
 		fatal("Invalid number of args")
 	}
-	s3c.Host = args.source.host
-	s3c.Scheme = args.source.scheme
 	switch getMode(c.Bool("recursive"), args) {
 	case first:
-		err := firstMode(s3c, args)
+		err := firstMode(c, args)
 		if err != nil {
 			fatal(err.Error())
 		}
 	case second:
-		err := secondMode(s3c, args)
+		err := secondMode(c, args)
 		if err != nil {
 			fatal(err.Error())
 		}
 	case third:
-		err := thirdMode(s3c, args)
+		err := thirdMode(c, args)
 		if err != nil {
 			fatal(err.Error())
 		}
	case fourth:
-		err := fourthMode(s3c, args)
+		err := fourthMode(c, args)
 		if err != nil {
 			fatal(err.Error())
 		}

cmd-ls.go (12 changed lines)
@@ -23,6 +23,7 @@ import (
 
 	"github.com/cheggaaa/pb"
 	"github.com/codegangsta/cli"
+	"github.com/minio-io/mc/pkg/client"
 	"github.com/minio-io/mc/pkg/s3"
 )
 
@@ -31,7 +32,7 @@ const (
 )
 
 // printBuckets lists buckets and its meta-dat
-func printBuckets(v []*s3.Bucket) {
+func printBuckets(v []*client.Bucket) {
 	for _, b := range v {
 		msg := fmt.Sprintf("%23s %13s %s", b.CreationDate.Local().Format(printDate), "", b.Name)
 		info(msg)
@@ -39,7 +40,7 @@ func printBuckets(v []*s3.Bucket) {
 }
 
 // printObjects prints a meta-data of a list of objects
-func printObjects(v []*s3.Item) {
+func printObjects(v []*client.Item) {
 	if len(v) > 0 {
 		// Items are already sorted
 		for _, b := range v {
@@ -56,20 +57,17 @@ func printObject(date time.Time, v int64, key string) {
 
 // doListCmd lists objects inside a bucket
 func doListCmd(c *cli.Context) {
-	var items []*s3.Item
+	var items []*client.Item
 
 	args, err := parseArgs(c)
 	if err != nil {
 		fatal(err.Error())
 	}
 
-	s3c, err := getNewClient(c)
+	s3c, err := getNewClient(c, args.source.url)
 	if err != nil {
 		fatal(err.Error())
 	}
-	s3c.Host = args.source.host
-	s3c.Scheme = args.source.scheme
-
 	switch true {
 	case args.source.bucket == "": // List all buckets
 		buckets, err := s3c.ListBuckets()
@@ -28,13 +28,10 @@ func doMakeBucketCmd(c *cli.Context) {
 		fatal(err.Error())
 	}
 
-	s3c, err := getNewClient(c)
+	s3c, err := getNewClient(c, args.source.url)
 
 	if !s3.IsValidBucket(args.source.bucket) {
 		fatal(errInvalidbucket.Error())
 	}
-	s3c.Host = args.source.host
-	s3c.Scheme = args.source.scheme
-
 	err = s3c.PutBucket(args.source.bucket)
 	if err != nil {
@@ -17,6 +17,8 @@
 package main
 
 import (
+	"net/url"
+
 	"github.com/codegangsta/cli"
 )
 
@@ -91,6 +93,7 @@ type object struct {
 	key    string
 	host   string
 	scheme string
+	url    *url.URL
 }
 
 type cmdArgs struct {
@@ -24,11 +24,13 @@ import (
 	"path/filepath"
 	"strings"
 
+	"github.com/codegangsta/cli"
+	"github.com/minio-io/mc/pkg/client"
 	"github.com/minio-io/mc/pkg/s3"
 )
 
 type walk struct {
-	s3   *s3.Client
+	s3   client.Client
 	args *cmdArgs
 }
 
@@ -49,8 +51,8 @@ func (w *walk) putWalk(p string, i os.FileInfo, err error) error {
 	var size int64
 	size, _, err = w.s3.Stat(bucketname, key)
 	if os.IsExist(err) || size != 0 {
-		msg := fmt.Sprintf("%s is already uploaded -- to bucket:%s://%s/%s/%s",
-			key, w.s3.Scheme, w.s3.Host, bucketname, key)
+		msg := fmt.Sprintf("%s is already uploaded -- to bucket:%s/%s/%s",
+			key, w.args.destination.host, bucketname, key)
 		info(msg)
 		return nil
 	}
@@ -58,8 +60,8 @@ func (w *walk) putWalk(p string, i os.FileInfo, err error) error {
 	if err != nil {
 		return err
 	}
-	msg := fmt.Sprintf("%s uploaded -- to bucket:%s://%s/%s/%s",
-		key, w.s3.Scheme, w.s3.Host, bucketname, key)
+	msg := fmt.Sprintf("%s uploaded -- to bucket:%s/%s/%s",
+		key, w.args.destination.host, bucketname, key)
 	info(msg)
 	return nil
 }
@@ -79,7 +81,7 @@ func isValidBucketName(p string) error {
 }
 
 // isBucketExists checks if a bucket exists
-func isBucketExists(name string, v []*s3.Bucket) bool {
+func isBucketExists(name string, v []*client.Bucket) bool {
 	for _, b := range v {
 		if name == b.Name {
 			return true
@@ -89,10 +91,10 @@ func isBucketExists(name string, v []*s3.Bucket) bool {
 }
 
 // doRecursiveCP recursively copies objects from source to destination
-func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error {
+func doRecursiveCP(c *cli.Context, args *cmdArgs) error {
 	var err error
 	var st os.FileInfo
-	var buckets []*s3.Bucket
+	var buckets []*client.Bucket
 
 	switch true {
 	case args.source.bucket == "":
@@ -107,9 +109,10 @@ func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error {
 		if !st.IsDir() {
 			return errors.New("Should be a directory")
 		}
-		s3c.Host = args.destination.host
-		s3c.Scheme = args.destination.scheme
+		s3c, err := getNewClient(c, args.destination.url)
+		if err != nil {
+			return err
+		}
 		p := &walk{s3c, args}
 		buckets, err = s3c.ListBuckets()
 		if !isBucketExists(args.destination.bucket, buckets) {
@@ -124,19 +127,23 @@ func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error {
 			return err
 		}
 	case args.destination.bucket == "":
+		s3c, err := getNewClient(c, args.source.url)
+		if err != nil {
+			return err
+		}
 		items, _, err := s3c.ListObjects(args.source.bucket, "", "", "", s3.MaxKeys)
 		if err != nil {
 			return err
 		}
 		root := args.destination.key
-		writeObjects := func(v []*s3.Item) error {
+		writeObjects := func(v []*client.Item) error {
 			if len(v) > 0 {
 				// Items are already sorted
 				for _, b := range v {
 					args.source.key = b.Key
 					os.MkdirAll(path.Join(root, path.Dir(b.Key)), 0755)
 					args.destination.key = path.Join(root, b.Key)
-					err := secondMode(s3c, args)
+					err := secondMode(c, args)
 					if err != nil {
 						return err
 					}

cmd/donut/.gitignore (new file, vendored, 1 line)
@@ -0,0 +1 @@
+donut

cmd/donut/cmd-cp.go (new file, 67 lines)
@@ -0,0 +1,67 @@
+package main
+
+import (
+	"io"
+	"os"
+	"strings"
+
+	"net/url"
+
+	"github.com/codegangsta/cli"
+	"github.com/minio-io/mc/pkg/donut"
+)
+
+func doDonutCp(c *cli.Context) {
+	var e donut.Donut
+	e = donut.NewDriver("testdir")
+	switch len(c.Args()) {
+	case 2:
+		urlArg1, errArg1 := url.Parse(c.Args().Get(0))
+		if errArg1 != nil {
+			panic(errArg1)
+		}
+		urlArg2, errArg2 := url.Parse(c.Args().Get(1))
+		if errArg2 != nil {
+			panic(errArg2)
+		}
+		switch true {
+		case urlArg1.Scheme != "" && urlArg2.Scheme == "":
+			writer, err := os.Create(urlArg2.Path)
+			defer writer.Close()
+			if err != nil {
+				panic(err)
+			}
+			if urlArg1.Scheme == "donut" {
+				reader, _, err := e.Get(urlArg1.Host, strings.TrimPrefix(urlArg1.Path, "/"))
+				if err != nil {
+					panic(err)
+				}
+				_, err = io.Copy(writer, reader)
+				if err != nil {
+					panic(err)
+				}
+			}
+		case urlArg1.Scheme == "" && urlArg2.Scheme != "":
+			st, stErr := os.Stat(urlArg1.Path)
+			if os.IsNotExist(stErr) {
+				panic(stErr)
+			}
+			if st.IsDir() {
+				panic("is a directory")
+			}
+			reader, err := os.OpenFile(urlArg1.Path, 2, os.ModeAppend)
+			defer reader.Close()
+			if err != nil {
+				panic(err)
+			}
+			if urlArg2.Scheme == "donut" {
+				e.PutBucket(urlArg2.Host)
+				writer, err := e.Put(urlArg2.Host, strings.TrimPrefix(urlArg2.Path, "/"))
+				if err != nil {
+					panic(err)
+				}
+				io.Copy(writer, reader)
+			}
+		}
+	}
+}
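A side note on the upload branch above: os.OpenFile(urlArg1.Path, 2, os.ModeAppend) opens the source file with the raw flag value 2. A small sketch spelling out what that call does; the helper below is illustrative only and not part of the commit (the equivalence of flag 2 with os.O_RDWR comes from the standard os package constants):

package main

import "os"

// openSource mirrors the cmd-cp.go call above: the magic flag 2 equals
// os.O_RDWR, and os.ModeAppend is passed where a permission mode is
// expected. Since the file is only read afterwards, plain os.Open(path)
// would behave the same for this purpose.
func openSource(path string) (*os.File, error) {
	return os.OpenFile(path, os.O_RDWR, os.ModeAppend)
}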

cmd/donut/cmd-mb.go (new file, 39 lines)
@@ -0,0 +1,39 @@
+/*
+ * Minimalist Object Storage, (C) 2014,2015 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"net/url"
+
+	"github.com/codegangsta/cli"
+	"github.com/minio-io/mc/pkg/donut"
+)
+
+// doMakeBucketCmd creates a new bucket
+func doMakeBucketCmd(c *cli.Context) {
+	urlArg1, err := url.Parse(c.Args().Get(0))
+	if err != nil {
+		panic(err)
+	}
+	var e donut.Donut
+	e = donut.NewDriver("testdir")
+
+	err = e.PutBucket(urlArg1.Path)
+	if err != nil {
+		panic(err)
+	}
+}

cmd/donut/main.go (new file, 58 lines)
@@ -0,0 +1,58 @@
+/*
+ * Minimalist Object Storage, (C) 2014,2015 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"log"
+	"os"
+	"os/user"
+
+	"github.com/codegangsta/cli"
+)
+
+func init() {
+	// Check for the environment early on and gracefuly report.
+	_, err := user.Current()
+	if err != nil {
+		log.Fatalf("mc: Unable to obtain user's home directory. \nError: %s\n", err)
+	}
+}
+
+var cp = cli.Command{
+	Name:   "cp",
+	Action: doDonutCp,
+}
+
+var mb = cli.Command{
+	Name:   "mb",
+	Action: doMakeBucketCmd,
+}
+
+var options = []cli.Command{
+	cp,
+	mb,
+}
+
+func main() {
+	app := cli.NewApp()
+	app.Usage = "Minio Client for S3 Compatible Object Storage"
+	app.Version = "0.1.0"
+	app.Commands = options
+	app.Author = "Minio.io"
+	app.EnableBashCompletion = true
+	app.Run(os.Args)
+}

common.go (15 changed lines)
@@ -26,6 +26,7 @@ import (
 
 	"github.com/cheggaaa/pb"
 	"github.com/codegangsta/cli"
+	"github.com/minio-io/mc/pkg/client"
 	"github.com/minio-io/mc/pkg/s3"
 )
 
@@ -46,8 +47,7 @@ func startBar(size int64) *pb.ProgressBar {
 }
 
 // NewClient - get new client
-func getNewClient(c *cli.Context) (client *s3.Client, err error) {
-
+func getNewClient(c *cli.Context, url *url.URL) (cl client.Client, err error) {
 	config, err := getMcConfig()
 	if err != nil {
 		return nil, err
@@ -58,7 +58,7 @@ func getNewClient(c *cli.Context) (client *s3.Client, err error) {
 		return nil, err
 	}
 
-	var auth s3.Auth
+	var auth client.Auth
 	auth.AccessKeyID = hostCfg.Auth.AccessKeyID
 	auth.SecretAccessKey = hostCfg.Auth.SecretAccessKey
 
@@ -69,12 +69,12 @@ func getNewClient(c *cli.Context) (client *s3.Client, err error) {
 			Writer: nil,
 		}
 		traceTransport := s3.GetNewTraceTransport(trace, http.DefaultTransport)
-		client = s3.GetNewClient(&auth, traceTransport)
+		cl = s3.GetNewClient(&auth, url, traceTransport)
 	} else {
-		client = s3.GetNewClient(&auth, http.DefaultTransport)
+		cl = s3.GetNewClient(&auth, url, http.DefaultTransport)
 	}
 
-	return client, nil
+	return cl, nil
 }
 
 // Parse subcommand options
@@ -95,6 +95,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) {
 		return nil, err
 	}
 	args.source.scheme = urlParsed.Scheme
+	args.source.url = urlParsed
 	if urlParsed.Scheme != "" {
 		if urlParsed.Host == "" {
 			return nil, errHostname
@@ -130,6 +131,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) {
 	}
 	args.source.scheme = urlParsed.Scheme
 	args.source.host = urlParsed.Host
+	args.source.url = urlParsed
 	urlSplits := strings.Split(urlParsed.Path, "/")
 	if len(urlSplits) > 1 {
 		args.source.bucket = urlSplits[1]
@@ -171,6 +173,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) {
 	}
 	args.destination.host = urlParsed.Host
 	args.destination.scheme = urlParsed.Scheme
+	args.destination.url = urlParsed
 	urlSplits := strings.Split(urlParsed.Path, "/")
 	if len(urlSplits) > 1 {
 		args.destination.bucket = urlSplits[1]

pkg/client/client.go (new file, 64 lines)
@@ -0,0 +1,64 @@
+package client
+
+import (
+	"io"
+	"net/http"
+	"net/url"
+	"time"
+)
+
+// Client - Minio client interface
+type Client interface {
+	Get(bucket, object string) (body io.ReadCloser, size int64, err error)
+	GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error)
+	Put(bucket, object string, size int64, body io.Reader) error
+	Stat(bucket, object string) (size int64, date time.Time, err error)
+	PutBucket(bucket string) error
+	ListBuckets() ([]*Bucket, error)
+	ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error)
+}
+
+// Bucket - carries s3 bucket reply header
+type Bucket struct {
+	Name         string
+	CreationDate XMLTime // 2006-02-03T16:45:09.000Z
+}
+
+// Item - object item list
+type Item struct {
+	Key          string
+	LastModified XMLTime
+	Size         int64
+}
+
+// Prefix - common prefix
+type Prefix struct {
+	Prefix string
+}
+
+// Meta holds Amazon S3 client credentials and flags.
+type Meta struct {
+	*Auth                       // AWS auth credentials
+	Transport http.RoundTripper // or nil for the default behavior
+
+	// Supports URL in following formats
+	//  - http://<ipaddress>/<bucketname>/<object>
+	//  - http://<bucketname>.<domain>/<object>
+	URL *url.URL
+}
+
+// Auth - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
+type Auth struct {
+	AccessKeyID     string
+	SecretAccessKey string
+
+	// Used for SSL transport layer
+	CertPEM string
+	KeyPEM  string
+}
+
+// TLSConfig - TLS cert and key configuration
+type TLSConfig struct {
+	CertPEMBlock []byte
+	KeyPEMBlock  []byte
+}
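With every mc command now programmed against this interface rather than a concrete *s3.Client, backends become swappable behind getNewClient. A minimal illustrative consumer, not part of the commit (the helper printLatest and its logic are invented; only the client package API above is from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/minio-io/mc/pkg/client"
)

// printLatest works identically for the s3 backend returned by
// getNewClient or any future backend that satisfies client.Client.
func printLatest(cl client.Client, bucket string) error {
	items, _, err := cl.ListObjects(bucket, "", "", "", 1000)
	if err != nil {
		return err
	}
	var newest *client.Item
	for _, item := range items {
		if newest == nil || item.LastModified.Time.After(newest.LastModified.Time) {
			newest = item
		}
	}
	if newest == nil {
		fmt.Println("bucket is empty")
		return nil
	}
	fmt.Printf("%s (%d bytes, %s)\n", newest.Key, newest.Size,
		newest.LastModified.Time.Format(time.RFC822))
	return nil
}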
@@ -1,25 +1,3 @@
-// Original license //
-// ---------------- //
-
-/*
-Copyright 2011 Google Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// All other modifications and improvements //
-// ---------------------------------------- //
-
 /*
  * Minimalist Object Storage, (C) 2015 Minio, Inc.
  *
@@ -36,7 +14,7 @@ limitations under the License.
  * limitations under the License.
  */
 
-package s3
+package client
 
 import (
 	"time"
@@ -49,24 +27,28 @@ const (
 	iso8601Format = "2006-01-02T15:04:05.000Z"
 )
 
-type xmlTime struct {
+// XMLTime - time wrapper
+type XMLTime struct {
 	time.Time
 }
 
-func (c *xmlTime) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+// UnmarshalXML - unmarshal incoming xml
+func (c *XMLTime) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
 	var v string
 	d.DecodeElement(&v, &start)
 	parse, _ := time.Parse(iso8601Format, v)
-	*c = xmlTime{parse}
+	*c = XMLTime{parse}
 	return nil
 }
 
-func (c *xmlTime) UnmarshalXMLAttr(attr xml.Attr) error {
+// UnmarshalXMLAttr - unmarshal specific attr
+func (c *XMLTime) UnmarshalXMLAttr(attr xml.Attr) error {
 	t, _ := time.Parse(iso8601Format, attr.Value)
-	*c = xmlTime{t}
+	*c = XMLTime{t}
 	return nil
 }
 
-func (c *xmlTime) String() string {
+// String - xml to string
+func (c *XMLTime) String() string {
 	return c.Time.Format(iso8601Format)
 }

pkg/donut/bucketdriver.go (new file, 14 lines)
@@ -0,0 +1,14 @@
+package donut
+
+type bucketDriver struct {
+	nodes   []string
+	objects map[string][]byte
+}
+
+func (b bucketDriver) GetNodes() ([]string, error) {
+	var nodes []string
+	for _, node := range b.nodes {
+		nodes = append(nodes, node)
+	}
+	return nodes, nil
+}
@@ -1,127 +0,0 @@
-package disk
-
-import (
-	"io"
-	"os"
-	"path"
-	"strconv"
-
-	"github.com/minio-io/mc/pkg/utils/split"
-)
-
-type Disk struct {
-	*disk
-}
-
-type disk struct {
-	root   string
-	bucket string
-	object string
-	chunk  string
-	file   *os.File
-}
-
-func New(bucket, object string) *Disk {
-	d := &Disk{&disk{bucket: bucket, object: object}}
-	return d
-}
-
-func openFile(path string, flag int) (fl *os.File, err error) {
-	fl, err = os.OpenFile(path, flag, 0644)
-	if err != nil {
-		return nil, err
-	}
-	return fl, nil
-}
-
-func (d *Disk) Create() error {
-	p := path.Join(d.bucket, d.object, "$"+d.chunk)
-	fl, err := openFile(p, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL)
-	if err != nil {
-		return err
-	}
-	d.file = fl
-	return err
-}
-
-func (d *Disk) Open() error {
-	p := path.Join(d.bucket, d.object, "$"+d.chunk)
-	fl, err := openFile(p, os.O_RDONLY)
-	if err != nil {
-		return err
-	}
-	d.file = fl
-	return err
-}
-
-func (d *Disk) Write(b []byte) (n int, err error) {
-	if d == nil {
-		return 0, os.ErrInvalid
-	}
-	n, e := d.file.Write(b)
-	if n < 0 {
-		n = 0
-	}
-	if n != len(b) {
-		err = io.ErrShortWrite
-	}
-	if e != nil {
-		err = e
-	}
-	return n, err
-}
-
-func (d *Disk) Read(b []byte) (n int, err error) {
-	if d == nil {
-		return 0, os.ErrInvalid
-	}
-	n, e := d.file.Read(b)
-	if n < 0 {
-		n = 0
-	}
-	if n == 0 && len(b) > 0 && e == nil {
-		return 0, io.EOF
-	}
-	if e != nil {
-		err = e
-	}
-	return n, err
-}
-
-func (d *Disk) Close() error {
-	if d == nil {
-		return os.ErrInvalid
-	}
-	return d.file.Close()
-}
-
-func (d *Disk) SplitAndWrite(data io.Reader, chunkSize int) (int, error) {
-	if d == nil {
-		return 0, os.ErrInvalid
-	}
-	splits := split.Stream(data, uint64(chunkSize))
-	i := 0
-	n := 0
-	for chunk := range splits {
-		if chunk.Err != nil {
-			return 0, chunk.Err
-		}
-		d.chunk = strconv.Itoa(i)
-		if err := d.Create(); err != nil {
-			return 0, err
-		}
-		m, err := d.Write(chunk.Data)
-		defer d.Close()
-		if err != nil {
-			return 0, err
-		}
-		n = n + m
-		i = i + 1
-	}
-	return n, nil
-}
-
-func (d *Disk) JoinAndRead() (io.Reader, error) {
-	dirname := path.Join(d.root, d.bucket, d.object)
-	return split.JoinFiles(dirname, d.object)
-}
@@ -2,9 +2,46 @@ package donut
 
 import "io"
 
+// INTERFACES
+
+// Donut interface
 type Donut interface {
-	Get(bucket string, object string) (body io.Reader, err error)
-	Put(bucket string, object string, size int, body io.Reader) error
-	ListObjects(bucket string) (objects map[string]string, err error)
-	ListBuckets() (buckets map[string]string, err error)
+	PutBucket(bucket string) error
+	Get(bucket, object string) (io.ReadCloser, int64, error)
+	Put(bucket, object string) (ObjectWriter, error)
+	Stat(bucket, object string) (map[string]string, error)
+	ListBuckets() ([]string, error)
+	ListObjects(bucket string) ([]string, error)
+}
+
+// Bucket interface
+type Bucket interface {
+	GetNodes() ([]string, error)
+}
+
+// Node interface
+type Node interface {
+	GetBuckets() ([]string, error)
+	GetDonutDriverMetadata(bucket, object string) (map[string]string, error)
+	GetMetadata(bucket, object string) (map[string]string, error)
+	GetReader(bucket, object string) (io.ReadCloser, error)
+	GetWriter(bucket, object string) (Writer, error)
+	ListObjects(bucket string) ([]string, error)
+}
+
+// ObjectWriter interface
+type ObjectWriter interface {
+	Close() error
+	CloseWithError(error) error
+	GetMetadata() (map[string]string, error)
+	SetMetadata(map[string]string) error
+	Write([]byte) (int, error)
+}
+
+// Writer interface
+type Writer interface {
+	ObjectWriter
+
+	GetDonutDriverMetadata() (map[string]string, error)
+	SetDonutDriverMetadata(map[string]string) error
 }

pkg/donut/donutdriver_test.go (new file, 206 lines)
@@ -0,0 +1,206 @@
+package donut
+
+import (
+	"testing"
+
+	"bytes"
+	. "gopkg.in/check.v1"
+	"io"
+	"io/ioutil"
+	"os"
+)
+
+func Test(t *testing.T) { TestingT(t) }
+
+type MySuite struct{}
+
+var _ = Suite(&MySuite{})
+
+func (s *MySuite) TestEmptyBucket(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+
+	// check buckets are empty
+	buckets, err := donut.ListBuckets()
+	c.Assert(err, IsNil)
+	c.Assert(buckets, IsNil)
+}
+
+func (s *MySuite) TestBucketWithoutNameFails(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+	// fail to create new bucket without a name
+	err = donut.PutBucket("")
+	c.Assert(err, Not(IsNil))
+
+	err = donut.PutBucket(" ")
+	c.Assert(err, Not(IsNil))
+}
+
+func (s *MySuite) TestPutBucketAndList(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+	// create bucket
+	err = donut.PutBucket("foo")
+	c.Assert(err, IsNil)
+
+	// check bucket exists
+	buckets, err := donut.ListBuckets()
+	c.Assert(err, IsNil)
+	c.Assert(buckets, DeepEquals, []string{"foo"})
+}
+
+func (s *MySuite) TestPutBucketWithSameNameFails(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+	err = donut.PutBucket("foo")
+	c.Assert(err, IsNil)
+
+	err = donut.PutBucket("foo")
+	c.Assert(err, Not(IsNil))
+}
+
+func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+	// add a second bucket
+	err = donut.PutBucket("foo")
+	c.Assert(err, IsNil)
+
+	err = donut.PutBucket("bar")
+	c.Assert(err, IsNil)
+
+	buckets, err := donut.ListBuckets()
+	c.Assert(err, IsNil)
+	c.Assert(buckets, DeepEquals, []string{"bar", "foo"})
+
+	err = donut.PutBucket("foobar")
+	c.Assert(err, IsNil)
+
+	buckets, err = donut.ListBuckets()
+	c.Assert(err, IsNil)
+	c.Assert(buckets, DeepEquals, []string{"bar", "foo", "foobar"})
+}
+
+func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+
+	writer, err := donut.Put("foo", "obj")
+	c.Assert(err, Not(IsNil))
+	c.Assert(writer, IsNil)
+}
+
+func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+
+	writer, err := donut.Put("foo", "")
+	c.Assert(err, Not(IsNil))
+	c.Assert(writer, IsNil)
+
+	writer, err = donut.Put("foo", " ")
+	c.Assert(err, Not(IsNil))
+	c.Assert(writer, IsNil)
+}
+
+func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+
+	err = donut.PutBucket("foo")
+	c.Assert(err, IsNil)
+
+	writer, err := donut.Put("foo", "obj")
+	c.Assert(err, IsNil)
+
+	data := "Hello World"
+	length, err := writer.Write([]byte(data))
+	c.Assert(length, Equals, len(data))
+
+	expectedMetadata := map[string]string{
+		"foo":     "bar",
+		"created": "one",
+		"hello":   "world",
+	}
+
+	err = writer.SetMetadata(expectedMetadata)
+	c.Assert(err, IsNil)
+
+	err = writer.Close()
+	c.Assert(err, IsNil)
+
+	actualWriterMetadata, err := writer.GetMetadata()
+	c.Assert(err, IsNil)
+	c.Assert(actualWriterMetadata, DeepEquals, expectedMetadata)
+
+	c.Assert(err, IsNil)
+
+	reader, _, err := donut.Get("foo", "obj")
+	c.Assert(err, IsNil)
+
+	var actualData bytes.Buffer
+	_, err = io.Copy(&actualData, reader)
+	c.Assert(err, IsNil)
+	c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
+
+	actualMetadata, err := donut.Stat("foo", "obj")
+	c.Assert(err, IsNil)
+	c.Assert(actualMetadata, DeepEquals, expectedMetadata)
+}
+
+func (s *MySuite) TestMultipleNewObjects(c *C) {
+	root, err := ioutil.TempDir(os.TempDir(), "donut-")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(root)
+	donut := NewDriver(root)
+
+	c.Assert(donut.PutBucket("foo"), IsNil)
+	writer, err := donut.Put("foo", "obj1")
+	c.Assert(err, IsNil)
+	writer.Write([]byte("one"))
+	writer.Close()
+
+	writer, err = donut.Put("foo", "obj2")
+	c.Assert(err, IsNil)
+	writer.Write([]byte("two"))
+	writer.Close()
+
+	// c.Skip("not complete")
+
+	reader, _, err := donut.Get("foo", "obj1")
+	c.Assert(err, IsNil)
+	var readerBuffer1 bytes.Buffer
+	_, err = io.Copy(&readerBuffer1, reader)
+	c.Assert(err, IsNil)
+	// c.Skip("Not Implemented")
+	c.Assert(readerBuffer1.Bytes(), DeepEquals, []byte("one"))
+
+	reader, _, err = donut.Get("foo", "obj2")
+	c.Assert(err, IsNil)
+	var readerBuffer2 bytes.Buffer
+	_, err = io.Copy(&readerBuffer2, reader)
+	c.Assert(err, IsNil)
+	c.Assert(readerBuffer2.Bytes(), DeepEquals, []byte("two"))
+
+	// test list objects
+	listObjects, err := donut.ListObjects("foo")
+	c.Assert(err, IsNil)
+	c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"})
+}

pkg/donut/driver.go (new file, 133 lines)
@@ -0,0 +1,133 @@
+package donut
+
+import (
+	"errors"
+	"io"
+	"sort"
+	"strconv"
+	"strings"
+)
+
+type donutDriver struct {
+	buckets map[string]Bucket
+	nodes   map[string]Node
+}
+
+// NewDriver - instantiate new donut driver
+func NewDriver(root string) Donut {
+	nodes := make(map[string]Node)
+	nodes["localhost"] = localDirectoryNode{root: root}
+	driver := donutDriver{
+		buckets: make(map[string]Bucket),
+		nodes:   nodes,
+	}
+	return driver
+}
+
+func (driver donutDriver) PutBucket(bucketName string) error {
+	if _, ok := driver.buckets[bucketName]; ok == false {
+		bucketName = strings.TrimSpace(bucketName)
+		if bucketName == "" {
+			return errors.New("Cannot create bucket with no name")
+		}
+		// assign nodes
+		// TODO assign other nodes
+		nodes := make([]string, 16)
+		for i := 0; i < 16; i++ {
+			nodes[i] = "localhost"
+		}
+		bucket := bucketDriver{
+			nodes: nodes,
+		}
+		driver.buckets[bucketName] = bucket
+		return nil
+	}
+	return errors.New("Bucket exists")
+}
+
+func (driver donutDriver) ListBuckets() ([]string, error) {
+	var buckets []string
+	for bucket := range driver.buckets {
+		buckets = append(buckets, bucket)
+	}
+	sort.Strings(buckets)
+	return buckets, nil
+}
+
+func (driver donutDriver) Put(bucketName, objectName string) (ObjectWriter, error) {
+	if bucket, ok := driver.buckets[bucketName]; ok == true {
+		writers := make([]Writer, 16)
+		nodes, err := bucket.GetNodes()
+		if err != nil {
+			return nil, err
+		}
+		for i, nodeID := range nodes {
+			if node, ok := driver.nodes[nodeID]; ok == true {
+				writer, _ := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName)
+				writers[i] = writer
+			}
+		}
+		return newErasureWriter(writers), nil
+	}
+	return nil, errors.New("Bucket not found")
+}
+
+func (driver donutDriver) Get(bucketName, objectName string) (io.ReadCloser, int64, error) {
+	r, w := io.Pipe()
+	if bucket, ok := driver.buckets[bucketName]; ok == true {
+		readers := make([]io.ReadCloser, 16)
+		nodes, err := bucket.GetNodes()
+		if err != nil {
+			return nil, 0, err
+		}
+		var metadata map[string]string
+		for i, nodeID := range nodes {
+			if node, ok := driver.nodes[nodeID]; ok == true {
+				bucketID := bucketName + ":0:" + strconv.Itoa(i)
+				reader, err := node.GetReader(bucketID, objectName)
+				if err != nil {
+					return nil, 0, err
+				}
+				readers[i] = reader
+				if metadata == nil {
+					metadata, err = node.GetDonutDriverMetadata(bucketID, objectName)
+					if err != nil {
+						return nil, 0, err
+					}
+				}
+			}
+		}
+		go erasureReader(readers, metadata, w)
+		totalLength, _ := strconv.ParseInt(metadata["totalLength"], 10, 64)
+		return r, totalLength, nil
+	}
+	return nil, 0, errors.New("Bucket not found")
+}
+
+// Stat returns metadata for a given object in a bucket
+func (driver donutDriver) Stat(bucketName, object string) (map[string]string, error) {
+	if bucket, ok := driver.buckets[bucketName]; ok {
+		nodes, err := bucket.GetNodes()
+		if err != nil {
+			return nil, err
+		}
+		if node, ok := driver.nodes[nodes[0]]; ok {
+			return node.GetMetadata(bucketName+":0:0", object)
+		}
+		return nil, errors.New("Cannot connect to node: " + nodes[0])
+	}
+	return nil, errors.New("Bucket not found")
+}
+
+func (driver donutDriver) ListObjects(bucketName string) ([]string, error) {
+	if bucket, ok := driver.buckets[bucketName]; ok {
+		nodes, err := bucket.GetNodes()
+		if err != nil {
+			return nil, err
+		}
+		if node, ok := driver.nodes[nodes[0]]; ok {
+			return node.ListObjects(bucketName + ":0:0")
+		}
+	}
+	return nil, errors.New("Bucket not found")
+}
@@ -1,48 +0,0 @@
-package encoder
-
-import (
-	"errors"
-	"io"
-
-	// "github.com/minio-io/mc/pkg/encoding/erasure"
-	"github.com/minio-io/mc/pkg/donut/disk"
-)
-
-type encoder struct{}
-
-const (
-	chunkSize = 10 * 1024 * 1024
-)
-
-func New() *encoder {
-	e := encoder{}
-	return &e
-}
-
-func (e *encoder) Put(bucket, object string, size int, body io.Reader) error {
-	d := disk.New(bucket, object)
-	n, err := d.SplitAndWrite(body, chunkSize)
-	if err != nil {
-		return err
-	}
-	if n > size {
-		return io.ErrUnexpectedEOF
-	}
-	if n < size {
-		return io.ErrShortWrite
-	}
-	return nil
-}
-
-func (e *encoder) Get(bucket, object string) (body io.Reader, err error) {
-	d := disk.New(bucket, object)
-	return d.JoinAndRead()
-}
-
-func (e *encoder) ListObjects(bucket string) (map[string]string, error) {
-	return nil, errors.New("Not implemented")
-}
-
-func (e *encoder) ListBuckets() (map[string]string, error) {
-	return nil, errors.New("Not implemented")
-}
pkg/donut/erasure.go (new file, 137 lines)
@@ -0,0 +1,137 @@
package donut

import (
    "bytes"
    "io"
    "strconv"
    "time"

    "github.com/minio-io/mc/pkg/encoding/erasure"
    "github.com/minio-io/mc/pkg/utils/split"
)

func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) {
    totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"])
    totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"])
    blockSize, _ := strconv.Atoi(donutMetadata["blockSize"])
    params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
    encoder := erasure.NewEncoder(params)
    for _, reader := range readers {
        defer reader.Close()
    }
    for i := 0; i < totalChunks; i++ {
        encodedBytes := make([][]byte, 16)
        for i, reader := range readers {
            var bytesBuffer bytes.Buffer
            io.Copy(&bytesBuffer, reader)
            encodedBytes[i] = bytesBuffer.Bytes()
        }
        curBlockSize := totalLeft
        if blockSize < totalLeft {
            curBlockSize = blockSize
        }
        decodedData, err := encoder.Decode(encodedBytes, curBlockSize)
        if err != nil {
            writer.CloseWithError(err)
            return
        }
        io.Copy(writer, bytes.NewBuffer(decodedData))
        totalLeft = totalLeft - blockSize
    }
    writer.Close()
}

// erasure writer
type erasureWriter struct {
    writers       []Writer
    metadata      map[string]string
    donutMetadata map[string]string // not exposed
    writer        *io.PipeWriter
    isClosed      <-chan bool
}

func newErasureWriter(writers []Writer) ObjectWriter {
    r, w := io.Pipe()
    isClosed := make(chan bool)
    writer := erasureWriter{
        writers:  writers,
        metadata: make(map[string]string),
        writer:   w,
        isClosed: isClosed,
    }
    go erasureGoroutine(r, writer, isClosed)
    return writer
}

func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) {
    chunks := split.Stream(r, 10*1024*1024)
    params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
    encoder := erasure.NewEncoder(params)
    chunkCount := 0
    totalLength := 0
    for chunk := range chunks {
        if chunk.Err == nil {
            totalLength = totalLength + len(chunk.Data)
            encodedBlocks, _ := encoder.Encode(chunk.Data)
            for blockIndex, block := range encodedBlocks {
                io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block))
            }
        }
        chunkCount = chunkCount + 1
    }
    metadata := make(map[string]string)
    metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024)
    metadata["chunkCount"] = strconv.Itoa(chunkCount)
    metadata["created"] = time.Now().Format(time.RFC3339Nano)
    metadata["erasureK"] = "8"
    metadata["erasureM"] = "8"
    metadata["erasureTechnique"] = "Cauchy"
    metadata["totalLength"] = strconv.Itoa(totalLength)
    for _, nodeWriter := range eWriter.writers {
        if nodeWriter != nil {
            nodeWriter.SetMetadata(eWriter.metadata)
            nodeWriter.SetDonutDriverMetadata(metadata)
            nodeWriter.Close()
        }
    }
    isClosed <- true
}

func (d erasureWriter) Write(data []byte) (int, error) {
    io.Copy(d.writer, bytes.NewBuffer(data))
    return len(data), nil
}

func (d erasureWriter) Close() error {
    d.writer.Close()
    <-d.isClosed
    return nil
}

func (d erasureWriter) CloseWithError(err error) error {
    for _, writer := range d.writers {
        if writer != nil {
            writer.CloseWithError(err)
        }
    }
    return nil
}

func (d erasureWriter) SetMetadata(metadata map[string]string) error {
    for k := range d.metadata {
        delete(d.metadata, k)
    }
    for k, v := range metadata {
        d.metadata[k] = v
    }
    return nil
}

func (d erasureWriter) GetMetadata() (map[string]string, error) {
    metadata := make(map[string]string)
    for k, v := range d.metadata {
        metadata[k] = v
    }
    return metadata, nil
}
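Note: the reader's curBlockSize bookkeeping decodes full 10 MiB blocks until the final chunk, which only carries whatever remains of totalLength. A standalone sketch of that accounting (the sizes are invented for illustration):

package main

import "fmt"

func main() {
    const blockSize = 10 * 1024 * 1024 // matches metadata["blockSize"] above
    totalLeft := 25 * 1024 * 1024      // hypothetical totalLength of 25 MiB

    for chunk := 0; totalLeft > 0; chunk++ {
        // every chunk is blockSize bytes except possibly the last
        curBlockSize := totalLeft
        if blockSize < totalLeft {
            curBlockSize = blockSize
        }
        fmt.Printf("chunk %d: decode %d bytes\n", chunk, curBlockSize)
        totalLeft -= blockSize
    }
    // chunk 0: decode 10485760 bytes
    // chunk 1: decode 10485760 bytes
    // chunk 2: decode 5242880 bytes
}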
pkg/donut/local.go (new file, 76 lines)
@@ -0,0 +1,76 @@
package donut

import (
    "errors"
    "io"
    "os"
    "path"
    "sort"
    "strings"

    "encoding/json"
    "path/filepath"
)

type localDirectoryNode struct {
    root string
}

func (d localDirectoryNode) GetBuckets() ([]string, error) {
    return nil, errors.New("Not Implemented")
}

func (d localDirectoryNode) GetWriter(bucket, object string) (Writer, error) {
    objectPath := path.Join(d.root, bucket, object)
    err := os.MkdirAll(objectPath, 0700)
    if err != nil {
        return nil, err
    }
    return newDonutFileWriter(objectPath)
}

func (d localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) {
    return os.Open(path.Join(d.root, bucket, object, "data"))
}

func (d localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) {
    return d.getMetadata(bucket, object, "metadata.json")
}

func (d localDirectoryNode) GetDonutDriverMetadata(bucket, object string) (map[string]string, error) {
    return d.getMetadata(bucket, object, "donutMetadata.json")
}

func (d localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) {
    file, err := os.Open(path.Join(d.root, bucket, object, fileName))
    if err != nil {
        return nil, err
    }
    // defer only after the error check; deferring first would call
    // Close on a nil *os.File when Open fails
    defer file.Close()
    metadata := make(map[string]string)
    decoder := json.NewDecoder(file)
    if err := decoder.Decode(&metadata); err != nil {
        return nil, err
    }
    return metadata, nil
}

func (d localDirectoryNode) ListObjects(bucketName string) ([]string, error) {
    prefix := path.Join(d.root, bucketName)
    var objects []string
    if err := filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.IsDir() && strings.HasSuffix(path, "data") {
            object := strings.TrimPrefix(path, prefix+"/")
            object = strings.TrimSuffix(object, "/data")
            objects = append(objects, object)
        }
        return nil
    }); err != nil {
        return nil, err
    }
    sort.Strings(objects)
    return objects, nil
}
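Note: GetReader, getMetadata, and ListObjects together imply a fixed on-disk layout: each object is a directory containing a data file plus JSON metadata files. A hypothetical sketch that builds such a tree and lists it with the same Walk logic (paths assume a Unix separator):

package main

import (
    "fmt"
    "os"
    "path/filepath"
    "strings"
)

func main() {
    root, _ := os.MkdirTemp("", "donut")
    defer os.RemoveAll(root)

    // layout assumed by localDirectoryNode: <root>/<bucket>/<object>/data
    for _, object := range []string{"a/b/one", "two"} {
        dir := filepath.Join(root, "mybucket", object)
        os.MkdirAll(dir, 0700)
        os.WriteFile(filepath.Join(dir, "data"), []byte("payload"), 0600)
    }

    prefix := filepath.Join(root, "mybucket")
    var objects []string
    filepath.Walk(prefix, func(p string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        // an object exists wherever a "data" file is found
        if !info.IsDir() && strings.HasSuffix(p, "data") {
            object := strings.TrimPrefix(p, prefix+"/")
            objects = append(objects, strings.TrimSuffix(object, "/data"))
        }
        return nil
    })
    fmt.Println(objects) // [a/b/one two]
}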
@@ -1,35 +0,0 @@
-package node
-
-import (
-    "errors"
-
-    "github.com/minio-io/mc/pkg/os/scsi"
-)
-
-type Node struct {
-    *node
-}
-
-type node struct {
-    devices *scsi.Devices
-    bucket  string
-}
-
-func New(bucket string) *Node {
-    n := &Node{&node{bucket: bucket}}
-    return n
-}
-
-func (n *Node) GetDisks() error {
-    if n == nil {
-        return errors.New("invalid argument")
-    }
-    n.devices = &scsi.Devices{}
-    if err := n.devices.Get(); err != nil {
-        return err
-    }
-    return nil
-}
-
-func (n *Node) CreateBucket() error {
-}
pkg/donut/objectwriter.go (new file, 39 lines)
@@ -0,0 +1,39 @@
package donut

import (
    "errors"
)

type objectWriter struct {
    metadata map[string]string
}

// Write is a stub: it discards the data and reports a fixed count of 11 bytes.
func (obj objectWriter) Write(data []byte) (length int, err error) {
    return 11, nil
}

func (obj objectWriter) Close() error {
    return nil
}

func (obj objectWriter) CloseWithError(err error) error {
    return errors.New("Not Implemented")
}

func (obj objectWriter) SetMetadata(metadata map[string]string) error {
    for k := range obj.metadata {
        delete(obj.metadata, k)
    }
    for k, v := range metadata {
        obj.metadata[k] = v
    }
    return nil
}

func (obj objectWriter) GetMetadata() (map[string]string, error) {
    ret := make(map[string]string)
    for k, v := range obj.metadata {
        ret[k] = v
    }
    return ret, nil
}
pkg/donut/writer.go (new file, 88 lines)
@@ -0,0 +1,88 @@
package donut

import (
    "encoding/json"
    "io/ioutil"
    "os"
    "path"
)

func newDonutFileWriter(objectDir string) (Writer, error) {
    dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0600)
    if err != nil {
        return nil, err
    }
    return donutFileWriter{
        root:          objectDir,
        file:          dataFile,
        metadata:      make(map[string]string),
        donutMetadata: make(map[string]string),
    }, nil
}

type donutFileWriter struct {
    root          string
    file          *os.File
    metadata      map[string]string
    donutMetadata map[string]string
    err           error
}

func (d donutFileWriter) Write(data []byte) (int, error) {
    return d.file.Write(data)
}

func (d donutFileWriter) Close() error {
    if d.err != nil {
        return d.err
    }
    metadata, _ := json.Marshal(d.metadata)
    ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600)
    donutMetadata, _ := json.Marshal(d.donutMetadata)
    ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600)

    return d.file.Close()
}

// Note: with a value receiver the assignment to d.err does not persist past
// this call, and the condition only overwrites an already-set error.
func (d donutFileWriter) CloseWithError(err error) error {
    if d.err != nil {
        d.err = err
    }
    return d.Close()
}

func (d donutFileWriter) SetMetadata(metadata map[string]string) error {
    for k := range d.metadata {
        delete(d.metadata, k)
    }
    for k, v := range metadata {
        d.metadata[k] = v
    }
    return nil
}

func (d donutFileWriter) GetMetadata() (map[string]string, error) {
    metadata := make(map[string]string)
    for k, v := range d.metadata {
        metadata[k] = v
    }
    return metadata, nil
}

func (d donutFileWriter) SetDonutDriverMetadata(metadata map[string]string) error {
    for k := range d.donutMetadata {
        delete(d.donutMetadata, k)
    }
    for k, v := range metadata {
        d.donutMetadata[k] = v
    }
    return nil
}

func (d donutFileWriter) GetDonutDriverMetadata() (map[string]string, error) {
    donutMetadata := make(map[string]string)
    for k, v := range d.donutMetadata {
        donutMetadata[k] = v
    }
    return donutMetadata, nil
}
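Note: each donutFileWriter therefore leaves three files behind per object: the raw data, plus metadata.json and donutMetadata.json serialized at Close. A stdlib-only sketch of that flush-on-close shape (hypothetical names, not the commit's API):

package main

import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
)

func main() {
    dir, _ := os.MkdirTemp("", "object")
    defer os.RemoveAll(dir)

    // write the payload first, as donutFileWriter.Write does
    os.WriteFile(filepath.Join(dir, "data"), []byte("payload"), 0600)

    // metadata is buffered in memory and only serialized on Close
    meta := map[string]string{"contentType": "application/octet-stream"}
    buf, _ := json.Marshal(meta)
    os.WriteFile(filepath.Join(dir, "metadata.json"), buf, 0600)

    entries, _ := os.ReadDir(dir)
    for _, e := range entries {
        fmt.Println(e.Name()) // data, metadata.json
    }
}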
@@ -41,7 +41,7 @@ type Disk struct {
     Diskattrmap map[string][]byte
 }
 
-// Partitions - struct which carries per partition name, and its attributes
+// Partition - struct which carries per partition name, and its attributes
 type Partition struct {
     Name             string
     Partitionattrmap map[string][]byte
@@ -53,25 +53,11 @@ import (
     "sort"
     "strings"
     "time"
+
+    "github.com/minio-io/mc/pkg/client"
 )
 
-// Auth - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
-type Auth struct {
-    AccessKeyID     string
-    SecretAccessKey string
-
-    // Used for SSL transport layer
-    CertPEM string
-    KeyPEM  string
-}
-
-// TLSConfig - TLS cert and key configuration
-type TLSConfig struct {
-    CertPEMBlock []byte
-    KeyPEMBlock  []byte
-}
-
-func (a *Auth) loadKeys(cert string, key string) (*TLSConfig, error) {
+func (a *s3Client) loadKeys(cert string, key string) (*client.TLSConfig, error) {
     certBlock, err := ioutil.ReadFile(cert)
     if err != nil {
         return nil, err
@@ -80,13 +66,13 @@ func (a *Auth) loadKeys(cert string, key string) (*TLSConfig, error) {
     if err != nil {
         return nil, err
     }
-    t := &TLSConfig{}
+    t := &client.TLSConfig{}
     t.CertPEMBlock = certBlock
     t.KeyPEMBlock = keyBlock
     return t, nil
 }
 
-func (a *Auth) getTLSTransport() (*http.Transport, error) {
+func (a *s3Client) getTLSTransport() (*http.Transport, error) {
     if a.CertPEM == "" || a.KeyPEM == "" {
         return &http.Transport{
             Dial: (&net.Dialer{
@@ -118,7 +104,7 @@ func (a *Auth) getTLSTransport() (*http.Transport, error) {
     return transport, nil
 }
 
-func (a *Auth) signRequest(req *http.Request, host string) {
+func (a *s3Client) signRequest(req *http.Request, host string) {
     if date := req.Header.Get("Date"); date == "" {
         req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
     }
@@ -152,7 +138,7 @@ func firstNonEmptyString(strs ...string) string {
 // Date + "\n" +
 // CanonicalizedAmzHeaders +
 // CanonicalizedResource;
-func (a *Auth) stringToSign(req *http.Request, host string) string {
+func (a *s3Client) stringToSign(req *http.Request, host string) string {
     buf := new(bytes.Buffer)
     buf.WriteString(req.Method)
     buf.WriteByte('\n')
@@ -181,7 +167,7 @@ func hasPrefixCaseInsensitive(s, pfx string) bool {
     return shead == pfx || shead == strings.ToLower(pfx)
 }
 
-func (a *Auth) writeCanonicalizedAmzHeaders(buf *bytes.Buffer, req *http.Request) {
+func (a *s3Client) writeCanonicalizedAmzHeaders(buf *bytes.Buffer, req *http.Request) {
     var amzHeaders []string
     vals := make(map[string][]string)
     for k, vv := range req.Header {
@@ -243,7 +229,7 @@ var subResList = []string{
 // CanonicalizedResource = [ "/" + Bucket ] +
 // <HTTP-Request-URI, from the protocol name up to the query string> +
 // [ sub-resource, if present. For example "?acl", "?location", "?logging", or "?torrent"];
-func (a *Auth) writeCanonicalizedResource(buf *bytes.Buffer, req *http.Request, host string) {
+func (a *s3Client) writeCanonicalizedResource(buf *bytes.Buffer, req *http.Request, host string) {
     bucket := a.bucketFromHost(req, host)
     if bucket != "" {
         buf.WriteByte('/')
@@ -277,7 +263,7 @@ func hasDotSuffix(s string, suffix string) bool {
     return len(s) >= len(suffix)+1 && strings.HasSuffix(s, suffix) && s[len(s)-len(suffix)-1] == '.'
 }
 
-func (a *Auth) bucketFromHost(req *http.Request, host string) string {
+func (a *s3Client) bucketFromHost(req *http.Request, host string) string {
     reqHost := req.Host
     if reqHost == "" {
         host = req.URL.Host
@@ -42,8 +42,11 @@ import (
     "bufio"
     "fmt"
     "net/http"
+    "net/url"
     "strings"
     "testing"
+
+    "github.com/minio-io/mc/pkg/client"
 )
 
 type reqAndExpected struct {
@@ -59,7 +62,7 @@ func req(s string) *http.Request {
 }
 
 func TestStringToSign(t *testing.T) {
-    var a Auth
+    var a s3Client
     tests := []reqAndExpected{
         {`GET /photos/puppy.jpg HTTP/1.1
 Host: johnsmith.s3.amazonaws.com
@@ -118,7 +121,7 @@ Content-Length: 5913339
 }
 
 func TestBucketFromHostname(t *testing.T) {
-    var a Auth
+    var a s3Client
     tests := []reqAndExpected{
         {"GET / HTTP/1.0\n\n", "", ""},
         {"GET / HTTP/1.0\nHost: s3.amazonaws.com\n\n", "", "s3.amazonaws.com"},
@@ -136,13 +139,19 @@ func TestBucketFromHostname(t *testing.T) {
 
 func TestsignRequest(t *testing.T) {
     r := req("GET /foo HTTP/1.1\n\n")
-    auth := &Auth{AccessKeyID: "key", SecretAccessKey: "secretkey"}
-    auth.signRequest(r, "localhost:9000")
+    auth := &client.Auth{AccessKeyID: "key", SecretAccessKey: "secretkey"}
+    url, _ := url.Parse("localhost:9000")
+    cl := &s3Client{&client.Meta{
+        Auth:      auth,
+        Transport: http.DefaultTransport,
+        URL:       url,
+    }}
+    cl.signRequest(r, "localhost:9000")
     if r.Header.Get("Date") == "" {
         t.Error("expected a Date set")
     }
     r.Header.Set("Date", "Sat, 02 Apr 2011 04:23:52 GMT")
-    auth.signRequest(r, "localhost:9000")
+    cl.signRequest(r, "localhost:9000")
     if e, g := r.Header.Get("Authorization"), "AWS key:kHpCR/N7Rw3PwRlDd8+5X40CFVc="; e != g {
         t.Errorf("got header %q; expected %q", g, e)
     }
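Note: the refactor above folds the exported Auth type into an s3Client that embeds *client.Meta, so fields like Auth, Transport, and URL become reachable directly on the client through Go's field promotion. A minimal standalone illustration of that pattern (the types here are invented for the sketch):

package main

import "fmt"

// Meta plays the role of client.Meta: shared wiring every client needs.
type Meta struct {
    AccessKeyID string
    Endpoint    string
}

// s3Client embeds *Meta, so Meta's fields are promoted onto it.
type s3Client struct {
    *Meta
}

// Methods declared on s3Client can use the promoted fields directly.
func (c *s3Client) describe() string {
    return fmt.Sprintf("%s@%s", c.AccessKeyID, c.Endpoint)
}

func main() {
    c := &s3Client{&Meta{AccessKeyID: "key", Endpoint: "s3.amazonaws.com"}}
    fmt.Println(c.describe()) // key@s3.amazonaws.com
}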
@@ -50,10 +50,12 @@ import (
     "encoding/xml"
     "net/http"
     "net/url"
+
+    "github.com/minio-io/mc/pkg/client"
 )
 
 // bySize implements sort.Interface for []Item based on the Size field.
-type bySize []*Item
+type bySize []*client.Item
 
 func (a bySize) Len() int      { return len(a) }
 func (a bySize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@@ -62,10 +64,10 @@ func (a bySize) Less(i, j int) bool { return a[i].Size < a[j].Size }
 /// Bucket API operations
 
 // ListBuckets - Get list of buckets
-func (c *Client) ListBuckets() ([]*Bucket, error) {
-    url := fmt.Sprintf("%s://%s/", c.Scheme, c.Host)
+func (c *s3Client) ListBuckets() ([]*client.Bucket, error) {
+    url := fmt.Sprintf("%s://%s/", c.URL.Scheme, c.URL.Host)
     req := newReq(url)
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
 
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
@@ -81,14 +83,14 @@ func (c *Client) ListBuckets() ([]*Bucket, error) {
 }
 
 // PutBucket - create new bucket
-func (c *Client) PutBucket(bucket string) error {
+func (c *s3Client) PutBucket(bucket string) error {
     var url string
     if IsValidBucket(bucket) && !strings.Contains(bucket, ".") {
-        url = fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
+        url = fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket)
     }
     req := newReq(url)
     req.Method = "PUT"
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
         return err
@@ -106,7 +108,7 @@ func (c *Client) PutBucket(bucket string) error {
 // provided bucket. Keys before startAt will be skipped. (This is the S3
 // 'marker' value). If the length of the returned items is equal to
 // maxKeys, there is no indication whether or not the returned list is truncated.
-func (c *Client) ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error) {
+func (c *s3Client) ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*client.Item, prefixes []*client.Prefix, err error) {
     var urlReq string
     var buffer bytes.Buffer
@@ -141,7 +143,7 @@ func (c *Client) ListObjects(bucket string, startAt, prefix, delimiter string, m
     for try := 1; try <= maxTries; try++ {
         time.Sleep(time.Duration(try-1) * 100 * time.Millisecond)
         req := newReq(urlReq)
-        c.Auth.signRequest(req, c.Host)
+        c.signRequest(req, c.URL.Host)
         res, err := c.Transport.RoundTrip(req)
         if err != nil {
             if try < maxTries {
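Note: the listing loop retries with a linearly growing delay: attempt n sleeps (n-1) x 100 ms before re-issuing the request. A self-contained sketch of that schedule (the operation is a stand-in for the signed HTTP round trip):

package main

import (
    "errors"
    "fmt"
    "time"
)

func main() {
    const maxTries = 3
    op := func(try int) error { // stand-in for c.Transport.RoundTrip
        if try < maxTries {
            return errors.New("transient failure")
        }
        return nil
    }
    for try := 1; try <= maxTries; try++ {
        // attempt 1 sleeps 0 ms, attempt 2 sleeps 100 ms, attempt 3 sleeps 200 ms
        time.Sleep(time.Duration(try-1) * 100 * time.Millisecond)
        if err := op(try); err != nil {
            fmt.Printf("try %d: %v\n", try, err)
            continue
        }
        fmt.Printf("try %d: ok\n", try)
        break
    }
}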
@@ -44,10 +44,13 @@ import (
     "io"
     "net"
     "net/http"
+    "net/url"
     "regexp"
     "strings"
 
     "encoding/xml"
+
+    "github.com/minio-io/mc/pkg/client"
 )
 
 // Total max object list
@@ -55,80 +58,55 @@ const (
     MaxKeys = 1000
 )
 
-// Bucket - carries s3 bucket reply header
-type Bucket struct {
-    Name         string
-    CreationDate xmlTime // 2006-02-03T16:45:09.000Z
-}
-
-// Item - object item list
-type Item struct {
-    Key          string
-    LastModified xmlTime
-    Size         int64
-}
-
-// Prefix - common prefix
-type Prefix struct {
-    Prefix string
-}
-
 type listBucketResults struct {
-    Contents       []*Item
+    Contents       []*client.Item
     IsTruncated    bool
     MaxKeys        int
     Name           string // bucket name
     Marker         string
     Delimiter      string
     Prefix         string
-    CommonPrefixes []*Prefix
+    CommonPrefixes []*client.Prefix
 }
 
-// Client holds Amazon S3 client credentials and flags.
-type Client struct {
-    *Auth             // AWS auth credentials
-    Transport http.RoundTripper // or nil for the default behavior
-
-    // Supports URL in following formats
-    //  - http://<ipaddress>/<bucketname>/<object>
-    //  - http://<bucketname>.<domain>/<object>
-    Host   string
-    Scheme string
+type s3Client struct {
+    *client.Meta
 }
 
-// GetNewClient returns an initialized S3.Client structure.
-func GetNewClient(auth *Auth, transport http.RoundTripper) *Client {
-    return &Client{
+// GetNewClient returns an initialized s3Client structure.
+func GetNewClient(auth *client.Auth, u *url.URL, transport http.RoundTripper) client.Client {
+    return &s3Client{&client.Meta{
         Auth:      auth,
         Transport: GetNewTraceTransport(s3Verify{}, transport),
-    }
+        URL:       u,
+    }}
 }
 
 // bucketURL returns the URL prefix of the bucket, with trailing slash
-func (c *Client) bucketURL(bucket string) string {
+func (c *s3Client) bucketURL(bucket string) string {
     var url string
     if IsValidBucket(bucket) && !strings.Contains(bucket, ".") {
         // if localhost use PathStyle
-        if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") {
-            return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
+        if strings.Contains(c.URL.Host, "localhost") || strings.Contains(c.URL.Host, "127.0.0.1") {
+            return fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket)
         }
         // Verify if its ip address, use PathStyle
-        host, _, _ := net.SplitHostPort(c.Host)
+        host, _, _ := net.SplitHostPort(c.URL.Host)
         if net.ParseIP(host) != nil {
-            return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket)
+            return fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket)
         }
         // For DNS hostname or amazonaws.com use subdomain style
-        url = fmt.Sprintf("%s://%s.%s/", c.Scheme, bucket, c.Host)
+        url = fmt.Sprintf("%s://%s.%s/", c.URL.Scheme, bucket, c.URL.Host)
     }
     return url
 }
 
-func (c *Client) keyURL(bucket, key string) string {
+func (c *s3Client) keyURL(bucket, key string) string {
     url := c.bucketURL(bucket)
-    if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") {
+    if strings.Contains(c.URL.Host, "localhost") || strings.Contains(c.URL.Host, "127.0.0.1") {
         return url + "/" + key
     }
-    host, _, _ := net.SplitHostPort(c.Host)
+    host, _, _ := net.SplitHostPort(c.URL.Host)
     if net.ParseIP(host) != nil {
         return url + "/" + key
     }
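Note: bucketURL picks path-style URLs for localhost and raw IP endpoints (where bucket subdomains cannot resolve) and virtual-host style for DNS hostnames. A stripped-down sketch of the same decision, without the bucket validation (illustrative only):

package main

import (
    "fmt"
    "net"
    "strings"
)

// bucketBaseURL mirrors the choice in bucketURL: path style for
// localhost/IP endpoints, virtual-host style for DNS hostnames.
func bucketBaseURL(scheme, endpoint, bucket string) string {
    host, _, err := net.SplitHostPort(endpoint)
    if err != nil {
        host = endpoint // no port present
    }
    if strings.Contains(host, "localhost") || net.ParseIP(host) != nil {
        return fmt.Sprintf("%s://%s/%s", scheme, endpoint, bucket)
    }
    return fmt.Sprintf("%s://%s.%s/", scheme, bucket, endpoint)
}

func main() {
    fmt.Println(bucketBaseURL("http", "localhost:9000", "photos"))    // http://localhost:9000/photos
    fmt.Println(bucketBaseURL("http", "10.0.0.5:9000", "photos"))     // http://10.0.0.5:9000/photos
    fmt.Println(bucketBaseURL("https", "s3.amazonaws.com", "photos")) // https://photos.s3.amazonaws.com/
}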
@@ -140,14 +118,14 @@ func newReq(url string) *http.Request {
     if err != nil {
         panic(fmt.Sprintf("s3 client; invalid URL: %v", err))
     }
-    req.Header.Set("User-Agent", "Minio Client")
+    req.Header.Set("User-Agent", "Minio s3Client")
     return req
 }
 
-func parseListAllMyBuckets(r io.Reader) ([]*Bucket, error) {
+func parseListAllMyBuckets(r io.Reader) ([]*client.Bucket, error) {
     type allMyBuckets struct {
         Buckets struct {
-            Bucket []*Bucket
+            Bucket []*client.Bucket
         }
     }
     var res allMyBuckets
@@ -43,9 +43,16 @@ import (
     "strings"
     "testing"
     "time"
+
+    "github.com/minio-io/mc/pkg/client"
 )
 
-var tc *Client
+// Date format
+const (
+    iso8601Format = "2006-01-02T15:04:05.000Z"
+)
+
+var tc *s3Client
 
 func TestParseBuckets(t *testing.T) {
     res := "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<ListAllMyBucketsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"><Owner><ID>ownerIDField</ID><DisplayName>bobDisplayName</DisplayName></Owner><Buckets><Bucket><Name>bucketOne</Name><CreationDate>2006-06-21T07:04:31.000Z</CreationDate></Bucket><Bucket><Name>bucketTwo</Name><CreationDate>2006-06-21T07:04:32.000Z</CreationDate></Bucket></Buckets></ListAllMyBucketsResult>"
@@ -59,11 +66,14 @@ func TestParseBuckets(t *testing.T) {
 
     t1, err := time.Parse(iso8601Format, "2006-06-21T07:04:31.000Z")
     t2, err := time.Parse(iso8601Format, "2006-06-21T07:04:32.000Z")
-    want := []*Bucket{
-        {Name: "bucketOne", CreationDate: xmlTime{t1}},
-        {Name: "bucketTwo", CreationDate: xmlTime{t2}},
+    xmlT1 := client.XMLTime{t1}
+    xmlT2 := client.XMLTime{t2}
+
+    want := []*client.Bucket{
+        {Name: "bucketOne", CreationDate: xmlT1},
+        {Name: "bucketTwo", CreationDate: xmlT2},
     }
-    dump := func(v []*Bucket) {
+    dump := func(v []*client.Bucket) {
         for i, b := range v {
             t.Logf("Bucket #%d: %#v", i, b)
         }
@@ -56,7 +56,7 @@ import (
 /// Object API operations
 
 // Put - upload new object to bucket
-func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error {
+func (c *s3Client) Put(bucket, key string, size int64, contents io.Reader) error {
     req := newReq(c.keyURL(bucket, key))
     req.Method = "PUT"
     req.ContentLength = size
@@ -75,7 +75,7 @@ func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error {
     req.Body = ioutil.NopCloser(sink)
     b64 := base64.StdEncoding.EncodeToString(h.Sum(nil))
     req.Header.Set("Content-MD5", b64)
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
 
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
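Note: Put sends a Content-MD5 header, the base64-encoded MD5 digest of the payload, which lets the server reject corrupted uploads. A self-contained sketch of computing that header value:

package main

import (
    "crypto/md5"
    "encoding/base64"
    "fmt"
    "io"
    "strings"
)

func main() {
    payload := strings.NewReader("hello world")

    // hash the payload while draining it, much as Put hashes its tee'd sink
    h := md5.New()
    io.Copy(h, payload)

    contentMD5 := base64.StdEncoding.EncodeToString(h.Sum(nil))
    fmt.Println("Content-MD5:", contentMD5) // XrY7u+Ae7tCTyyK7j1rNww==
}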
@@ -90,10 +90,10 @@ func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error {
 }
 
 // Stat - returns 0, "", os.ErrNotExist if not on S3
-func (c *Client) Stat(bucket, key string) (size int64, date time.Time, reterr error) {
+func (c *s3Client) Stat(bucket, key string) (size int64, date time.Time, reterr error) {
     req := newReq(c.keyURL(bucket, key))
     req.Method = "HEAD"
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
         return 0, date, err
@@ -123,9 +123,9 @@ func (c *Client) Stat(bucket, key string) (size int64, date time.Time, reterr er
 }
 
 // Get - download a requested object from a given bucket
-func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) {
+func (c *s3Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) {
     req := newReq(c.keyURL(bucket, key))
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
         return nil, 0, err
@@ -140,7 +140,7 @@ func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err er
 
 // GetPartial fetches part of the s3 key object in bucket.
 // If length is negative, the rest of the object is returned.
-func (c *Client) GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error) {
+func (c *s3Client) GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error) {
     if offset < 0 {
         return nil, 0, errors.New("invalid negative length")
     }
@@ -151,7 +151,7 @@ func (c *Client) GetPartial(bucket, key string, offset, length int64) (body io.R
     } else {
         req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
     }
-    c.Auth.signRequest(req, c.Host)
+    c.signRequest(req, c.URL.Host)
 
     res, err := c.Transport.RoundTrip(req)
     if err != nil {
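Note: GetPartial maps its (offset, length) pair onto an HTTP Range header. The branch shown above sets the open-ended form `bytes=offset-` for a negative length; the sketch below assumes the usual closed form, with an inclusive end byte, for the other branch:

package main

import "fmt"

// rangeHeader mirrors GetPartial's header construction: a negative length
// requests everything from offset onward; otherwise the end byte index is
// inclusive, so a 50-byte read from offset 100 ends at byte 149.
func rangeHeader(offset, length int64) string {
    if length < 0 {
        return fmt.Sprintf("bytes=%d-", offset)
    }
    return fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)
}

func main() {
    fmt.Println(rangeHeader(100, 50)) // bytes=100-149
    fmt.Println(rangeHeader(100, -1)) // bytes=100-
}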
@@ -23,6 +23,7 @@ import (
     "io"
     "io/ioutil"
     "os"
+    "path"
     "strconv"
     "strings"
 )
@@ -81,12 +82,12 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) {
         bytesWriter.Flush()
         // if we have data available, send it over the channel
         if bytesBuffer.Len() != 0 {
-            ch <- Message{bytesBuffer.Bytes(), nil}
+            ch <- Message{Data: bytesBuffer.Bytes(), Err: nil}
         }
     }
     // if we have an error other than an EOF, send it over the channel
     if readError != io.EOF {
-        ch <- Message{nil, readError}
+        ch <- Message{Data: nil, Err: readError}
     }
     // close the channel, signaling the channel reader that the stream is complete
     close(ch)
@@ -103,14 +104,13 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) {
 // fmt.Println(buf)
 // }
 //
-func JoinFiles(dirname string, inputPrefix string) (io.Reader, error) {
+func JoinFiles(dirname string, inputPrefix string) (reader io.Reader, err error) {
     reader, writer := io.Pipe()
     fileInfos, readError := ioutil.ReadDir(dirname)
     if readError != nil {
         writer.CloseWithError(readError)
         return nil, readError
     }
 
     var newfileInfos []os.FileInfo
     for _, fi := range fileInfos {
         if strings.Contains(fi.Name(), inputPrefix) == true {
@@ -124,25 +124,20 @@
         return nil, nofilesError
     }
 
-    go joinFilesGoRoutine(newfileInfos, writer)
-    return reader, nil
-}
-
-func joinFilesGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) {
-    for _, fileInfo := range fileInfos {
-        file, err := os.Open(fileInfo.Name())
+    for _, fileInfo := range newfileInfos {
+        file, err := os.Open(path.Join(dirname, fileInfo.Name()))
         defer file.Close()
         for err != nil {
             writer.CloseWithError(err)
-            return
+            return nil, err
         }
         _, err = io.Copy(writer, file)
         if err != nil {
             writer.CloseWithError(err)
-            return
+            return nil, err
         }
     }
-    writer.Close()
+    return reader, nil
 }
 
 // FileWithPrefix - Takes a file and splits it into chunks with size chunkSize. The output
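Note: split.Stream's consumer-facing contract is a channel of Message values, each carrying either Data or Err, closed when the stream ends; erasureGoroutine in erasure.go ranges over exactly this shape. A tiny self-contained sketch of the pattern (the chunker here is a simplified stand-in, not the package's implementation):

package main

import (
    "fmt"
    "io"
    "strings"
)

// Message mirrors the split package's chunk envelope: data or an error.
type Message struct {
    Data []byte
    Err  error
}

// stream emits fixed-size chunks over a channel and closes it when done.
func stream(r io.Reader, chunkSize int) <-chan Message {
    ch := make(chan Message)
    go func() {
        defer close(ch)
        buf := make([]byte, chunkSize)
        for {
            n, err := io.ReadFull(r, buf)
            if n > 0 {
                data := make([]byte, n)
                copy(data, buf[:n])
                ch <- Message{Data: data}
            }
            if err != nil {
                // a short final chunk is normal; anything else is reported
                if err != io.EOF && err != io.ErrUnexpectedEOF {
                    ch <- Message{Err: err}
                }
                return
            }
        }
    }()
    return ch
}

func main() {
    for chunk := range stream(strings.NewReader("abcdefghij"), 4) {
        fmt.Printf("%q %v\n", chunk.Data, chunk.Err)
    }
    // "abcd" <nil>
    // "efgh" <nil>
    // "ij" <nil>
}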