diff --git a/cmd-config.go b/cmd-config.go index 07508c7d..9b037c52 100644 --- a/cmd-config.go +++ b/cmd-config.go @@ -16,7 +16,6 @@ import ( "path/filepath" "github.com/codegangsta/cli" - "github.com/minio-io/mc/pkg/s3" ) const ( @@ -24,8 +23,13 @@ const ( mcConfigFilename = "config.json" ) +type auth struct { + AccessKeyID string + SecretAccessKey string +} + type hostConfig struct { - Auth s3.Auth + Auth auth } type mcConfig struct { @@ -190,40 +194,53 @@ func parseConfigInput(c *cli.Context) (config *mcConfig, err error) { } alias := strings.Fields(c.String("alias")) - if len(alias) == 0 { - // valid case throw help - return nil, nil - } - if len(alias) != 2 { + switch true { + case len(alias) == 0: + config = &mcConfig{ + Version: currentConfigVersion, + DefaultHost: "https://s3.amazonaws.com", + Hosts: map[string]hostConfig{ + "http*://s3*.amazonaws.com": { + Auth: auth{ + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccesskey, + }}, + }, + Aliases: map[string]string{ + "s3": "https://s3.amazonaws.com/", + "localhost": "http://localhost:9000/", + }, + } + return config, nil + case len(alias) == 2: + aliasName := alias[0] + url := alias[1] + if strings.HasPrefix(aliasName, "http") { + return nil, errors.New("invalid alias cannot use http{s}") + } + if !strings.HasPrefix(url, "http") { + return nil, errors.New("invalid url type only supports http{s}") + } + config = &mcConfig{ + Version: currentConfigVersion, + DefaultHost: "https://s3.amazonaws.com", + Hosts: map[string]hostConfig{ + "http*://s3*.amazonaws.com": { + Auth: auth{ + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccesskey, + }}, + }, + Aliases: map[string]string{ + "s3": "https://s3.amazonaws.com/", + "localhost": "http://localhost:9000/", + aliasName: url, + }, + } + return config, nil + default: return nil, errors.New("invalid number of arguments for --alias, requires exact 2") } - aliasName := alias[0] - url := alias[1] - if strings.HasPrefix(aliasName, "http") { - return nil, errors.New("invalid alias cannot use http{s}") - } - if !strings.HasPrefix(url, "http") { - return nil, errors.New("invalid url type only supports http{s}") - } - - config = &mcConfig{ - Version: currentConfigVersion, - DefaultHost: "https://s3.amazonaws.com", - Hosts: map[string]hostConfig{ - "http*://s3*.amazonaws.com": { - Auth: s3.Auth{ - AccessKeyID: accessKeyID, - SecretAccessKey: secretAccesskey, - }}, - }, - Aliases: map[string]string{ - "s3": "https://s3.amazonaws.com/", - "localhost": "http://localhost:9000/", - aliasName: url, - }, - } - //config.Aliases[aliasName] = url - return config, nil } // getHostConfig retrieves host specific configuration such as access keys, certs. diff --git a/cmd-cp.go b/cmd-cp.go index d345e9a1..9f00b230 100644 --- a/cmd-cp.go +++ b/cmd-cp.go @@ -24,7 +24,6 @@ import ( "github.com/cheggaaa/pb" "github.com/codegangsta/cli" - "github.com/minio-io/mc/pkg/s3" ) // Different modes of cp operation @@ -62,7 +61,7 @@ func getMode(recursive bool, args *cmdArgs) int { } // First mode or -func firstMode(s3c *s3.Client, args *cmdArgs) error { +func firstMode(c *cli.Context, args *cmdArgs) error { if args.source.key == "" { return errors.New("invalid args") } @@ -80,13 +79,14 @@ func firstMode(s3c *s3.Client, args *cmdArgs) error { if err != nil { return err } - // http://. is specified without key if args.destination.key == "" { args.destination.key = args.source.key } - s3c.Host = args.destination.host - s3c.Scheme = args.destination.scheme + s3c, err := getNewClient(c, args.destination.url) + if err != nil { + return err + } err = s3c.Put(args.destination.bucket, args.destination.key, size, source) if err != nil { return err @@ -98,13 +98,17 @@ func firstMode(s3c *s3.Client, args *cmdArgs) error { } // Second mode or . -func secondMode(s3c *s3.Client, args *cmdArgs) error { +func secondMode(c *cli.Context, args *cmdArgs) error { var objectReader io.ReadCloser var objectSize, downloadedSize int64 var destination *os.File var err error var st os.FileInfo + s3c, err := getNewClient(c, args.source.url) + if err != nil { + return err + } // Send HEAD request to validate if file exists. objectSize, _, err = s3c.Stat(args.source.bucket, args.source.key) if err != nil { @@ -181,15 +185,17 @@ func secondMode(s3c *s3.Client, args *cmdArgs) error { } // or -func thirdMode(s3c *s3.Client, args *cmdArgs) error { +func thirdMode(c *cli.Context, args *cmdArgs) error { var objectReader io.ReadCloser var objectSize int64 var err error - s3c.Host = args.source.host - s3c.Scheme = args.source.scheme + s3cSource, err := getNewClient(c, args.source.url) + if err != nil { + return err + } // Send HEAD request to validate if file exists. - objectSize, _, err = s3c.Stat(args.source.bucket, args.source.key) + objectSize, _, err = s3cSource.Stat(args.source.bucket, args.source.key) if err != nil { return err } @@ -199,20 +205,18 @@ func thirdMode(s3c *s3.Client, args *cmdArgs) error { } // Check if the object already exists - s3c.Host = args.destination.host - s3c.Scheme = args.destination.scheme - _, _, err = s3c.Stat(args.destination.bucket, args.destination.key) + s3cDest, err := getNewClient(c, args.destination.url) + if err != nil { + return err + } + _, _, err = s3cDest.Stat(args.destination.bucket, args.destination.key) switch os.IsNotExist(err) { case true: - s3c.Host = args.source.host - s3c.Scheme = args.source.scheme - objectReader, _, err = s3c.Get(args.source.bucket, args.source.key) + objectReader, _, err = s3cSource.Get(args.source.bucket, args.source.key) if err != nil { return err } - s3c.Host = args.destination.host - s3c.Scheme = args.destination.scheme - err = s3c.Put(args.destination.bucket, args.destination.key, objectSize, objectReader) + err = s3cDest.Put(args.destination.bucket, args.destination.key, objectSize, objectReader) if err != nil { return err } @@ -220,13 +224,13 @@ func thirdMode(s3c *s3.Client, args *cmdArgs) error { return errors.New("Ranges not supported") } - msg := fmt.Sprintf("http://%s/%s uploaded -- to bucket:(http://%s/%s)", args.source.bucket, args.source.key, - args.destination.bucket, args.destination.key) + msg := fmt.Sprintf("%s/%s/%s uploaded -- to bucket:(%s/%s/%s)", args.source.host, args.source.bucket, args.source.key, + args.destination.host, args.destination.bucket, args.destination.key) info(msg) return nil } -func fourthMode(s3c *s3.Client, args *cmdArgs) error { +func fourthMode(c *cli.Context, args *cmdArgs) error { if args.source.bucket == "" { _, err := os.Stat(args.source.key) if os.IsNotExist(err) { @@ -244,48 +248,40 @@ func fourthMode(s3c *s3.Client, args *cmdArgs) error { os.MkdirAll(args.destination.key, 0755) } } - return doRecursiveCp(s3c, args) + return doRecursiveCP(c, args) } // doCopyCmd copies objects into and from a bucket or between buckets func doCopyCmd(c *cli.Context) { var args *cmdArgs var err error - var s3c *s3.Client args, err = parseArgs(c) if err != nil { fatal(err.Error()) } - s3c, err = getNewClient(c) - if err != nil { - fatal(err.Error()) - } - if len(c.Args()) != 2 { fatal("Invalid number of args") } - s3c.Host = args.source.host - s3c.Scheme = args.source.scheme switch getMode(c.Bool("recursive"), args) { case first: - err := firstMode(s3c, args) + err := firstMode(c, args) if err != nil { fatal(err.Error()) } case second: - err := secondMode(s3c, args) + err := secondMode(c, args) if err != nil { fatal(err.Error()) } case third: - err := thirdMode(s3c, args) + err := thirdMode(c, args) if err != nil { fatal(err.Error()) } case fourth: - err := fourthMode(s3c, args) + err := fourthMode(c, args) if err != nil { fatal(err.Error()) } diff --git a/cmd-ls.go b/cmd-ls.go index bee5ae51..69e90d11 100644 --- a/cmd-ls.go +++ b/cmd-ls.go @@ -23,6 +23,7 @@ import ( "github.com/cheggaaa/pb" "github.com/codegangsta/cli" + "github.com/minio-io/mc/pkg/client" "github.com/minio-io/mc/pkg/s3" ) @@ -31,7 +32,7 @@ const ( ) // printBuckets lists buckets and its meta-dat -func printBuckets(v []*s3.Bucket) { +func printBuckets(v []*client.Bucket) { for _, b := range v { msg := fmt.Sprintf("%23s %13s %s", b.CreationDate.Local().Format(printDate), "", b.Name) info(msg) @@ -39,7 +40,7 @@ func printBuckets(v []*s3.Bucket) { } // printObjects prints a meta-data of a list of objects -func printObjects(v []*s3.Item) { +func printObjects(v []*client.Item) { if len(v) > 0 { // Items are already sorted for _, b := range v { @@ -56,20 +57,17 @@ func printObject(date time.Time, v int64, key string) { // doListCmd lists objects inside a bucket func doListCmd(c *cli.Context) { - var items []*s3.Item + var items []*client.Item args, err := parseArgs(c) if err != nil { fatal(err.Error()) } - s3c, err := getNewClient(c) + s3c, err := getNewClient(c, args.source.url) if err != nil { fatal(err.Error()) } - s3c.Host = args.source.host - s3c.Scheme = args.source.scheme - switch true { case args.source.bucket == "": // List all buckets buckets, err := s3c.ListBuckets() diff --git a/cmd-mb.go b/cmd-mb.go index 3fdb82dd..593b6b60 100644 --- a/cmd-mb.go +++ b/cmd-mb.go @@ -28,13 +28,10 @@ func doMakeBucketCmd(c *cli.Context) { fatal(err.Error()) } - s3c, err := getNewClient(c) - + s3c, err := getNewClient(c, args.source.url) if !s3.IsValidBucket(args.source.bucket) { fatal(errInvalidbucket.Error()) } - s3c.Host = args.source.host - s3c.Scheme = args.source.scheme err = s3c.PutBucket(args.source.bucket) if err != nil { diff --git a/cmd-options.go b/cmd-options.go index 65a10289..3585ca3a 100644 --- a/cmd-options.go +++ b/cmd-options.go @@ -17,6 +17,8 @@ package main import ( + "net/url" + "github.com/codegangsta/cli" ) @@ -91,6 +93,7 @@ type object struct { key string host string scheme string + url *url.URL } type cmdArgs struct { diff --git a/cmd-recursive-cp.go b/cmd-recursive-cp.go index 990fcbe7..518e8bec 100644 --- a/cmd-recursive-cp.go +++ b/cmd-recursive-cp.go @@ -24,11 +24,13 @@ import ( "path/filepath" "strings" + "github.com/codegangsta/cli" + "github.com/minio-io/mc/pkg/client" "github.com/minio-io/mc/pkg/s3" ) type walk struct { - s3 *s3.Client + s3 client.Client args *cmdArgs } @@ -49,8 +51,8 @@ func (w *walk) putWalk(p string, i os.FileInfo, err error) error { var size int64 size, _, err = w.s3.Stat(bucketname, key) if os.IsExist(err) || size != 0 { - msg := fmt.Sprintf("%s is already uploaded -- to bucket:%s://%s/%s/%s", - key, w.s3.Scheme, w.s3.Host, bucketname, key) + msg := fmt.Sprintf("%s is already uploaded -- to bucket:%s/%s/%s", + key, w.args.destination.host, bucketname, key) info(msg) return nil } @@ -58,8 +60,8 @@ func (w *walk) putWalk(p string, i os.FileInfo, err error) error { if err != nil { return err } - msg := fmt.Sprintf("%s uploaded -- to bucket:%s://%s/%s/%s", - key, w.s3.Scheme, w.s3.Host, bucketname, key) + msg := fmt.Sprintf("%s uploaded -- to bucket:%s/%s/%s", + key, w.args.destination.host, bucketname, key) info(msg) return nil } @@ -79,7 +81,7 @@ func isValidBucketName(p string) error { } // isBucketExists checks if a bucket exists -func isBucketExists(name string, v []*s3.Bucket) bool { +func isBucketExists(name string, v []*client.Bucket) bool { for _, b := range v { if name == b.Name { return true @@ -89,10 +91,10 @@ func isBucketExists(name string, v []*s3.Bucket) bool { } // doRecursiveCP recursively copies objects from source to destination -func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error { +func doRecursiveCP(c *cli.Context, args *cmdArgs) error { var err error var st os.FileInfo - var buckets []*s3.Bucket + var buckets []*client.Bucket switch true { case args.source.bucket == "": @@ -107,9 +109,10 @@ func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error { if !st.IsDir() { return errors.New("Should be a directory") } - - s3c.Host = args.destination.host - s3c.Scheme = args.destination.scheme + s3c, err := getNewClient(c, args.destination.url) + if err != nil { + return err + } p := &walk{s3c, args} buckets, err = s3c.ListBuckets() if !isBucketExists(args.destination.bucket, buckets) { @@ -124,19 +127,23 @@ func doRecursiveCp(s3c *s3.Client, args *cmdArgs) error { return err } case args.destination.bucket == "": + s3c, err := getNewClient(c, args.source.url) + if err != nil { + return err + } items, _, err := s3c.ListObjects(args.source.bucket, "", "", "", s3.MaxKeys) if err != nil { return err } root := args.destination.key - writeObjects := func(v []*s3.Item) error { + writeObjects := func(v []*client.Item) error { if len(v) > 0 { // Items are already sorted for _, b := range v { args.source.key = b.Key os.MkdirAll(path.Join(root, path.Dir(b.Key)), 0755) args.destination.key = path.Join(root, b.Key) - err := secondMode(s3c, args) + err := secondMode(c, args) if err != nil { return err } diff --git a/cmd/donut/.gitignore b/cmd/donut/.gitignore new file mode 100644 index 00000000..11abcb13 --- /dev/null +++ b/cmd/donut/.gitignore @@ -0,0 +1 @@ +donut \ No newline at end of file diff --git a/cmd/donut/cmd-cp.go b/cmd/donut/cmd-cp.go new file mode 100644 index 00000000..c6ccb7aa --- /dev/null +++ b/cmd/donut/cmd-cp.go @@ -0,0 +1,67 @@ +package main + +import ( + "io" + "os" + "strings" + + "net/url" + + "github.com/codegangsta/cli" + "github.com/minio-io/mc/pkg/donut" +) + +func doDonutCp(c *cli.Context) { + var e donut.Donut + e = donut.NewDriver("testdir") + switch len(c.Args()) { + case 2: + urlArg1, errArg1 := url.Parse(c.Args().Get(0)) + if errArg1 != nil { + panic(errArg1) + } + urlArg2, errArg2 := url.Parse(c.Args().Get(1)) + if errArg2 != nil { + panic(errArg2) + } + switch true { + case urlArg1.Scheme != "" && urlArg2.Scheme == "": + writer, err := os.Create(urlArg2.Path) + defer writer.Close() + if err != nil { + panic(err) + } + if urlArg1.Scheme == "donut" { + reader, _, err := e.Get(urlArg1.Host, strings.TrimPrefix(urlArg1.Path, "/")) + if err != nil { + panic(err) + } + _, err = io.Copy(writer, reader) + if err != nil { + panic(err) + } + } + case urlArg1.Scheme == "" && urlArg2.Scheme != "": + st, stErr := os.Stat(urlArg1.Path) + if os.IsNotExist(stErr) { + panic(stErr) + } + if st.IsDir() { + panic("is a directory") + } + reader, err := os.OpenFile(urlArg1.Path, 2, os.ModeAppend) + defer reader.Close() + if err != nil { + panic(err) + } + if urlArg2.Scheme == "donut" { + e.PutBucket(urlArg2.Host) + writer, err := e.Put(urlArg2.Host, strings.TrimPrefix(urlArg2.Path, "/")) + if err != nil { + panic(err) + } + io.Copy(writer, reader) + } + } + } +} diff --git a/cmd/donut/cmd-mb.go b/cmd/donut/cmd-mb.go new file mode 100644 index 00000000..10e12792 --- /dev/null +++ b/cmd/donut/cmd-mb.go @@ -0,0 +1,39 @@ +/* + * Minimalist Object Storage, (C) 2014,2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "net/url" + + "github.com/codegangsta/cli" + "github.com/minio-io/mc/pkg/donut" +) + +// doMakeBucketCmd creates a new bucket +func doMakeBucketCmd(c *cli.Context) { + urlArg1, err := url.Parse(c.Args().Get(0)) + if err != nil { + panic(err) + } + var e donut.Donut + e = donut.NewDriver("testdir") + + err = e.PutBucket(urlArg1.Path) + if err != nil { + panic(err) + } +} diff --git a/cmd/donut/main.go b/cmd/donut/main.go new file mode 100644 index 00000000..85b470fe --- /dev/null +++ b/cmd/donut/main.go @@ -0,0 +1,58 @@ +/* + * Minimalist Object Storage, (C) 2014,2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "log" + "os" + "os/user" + + "github.com/codegangsta/cli" +) + +func init() { + // Check for the environment early on and gracefuly report. + _, err := user.Current() + if err != nil { + log.Fatalf("mc: Unable to obtain user's home directory. \nError: %s\n", err) + } +} + +var cp = cli.Command{ + Name: "cp", + Action: doDonutCp, +} + +var mb = cli.Command{ + Name: "mb", + Action: doMakeBucketCmd, +} + +var options = []cli.Command{ + cp, + mb, +} + +func main() { + app := cli.NewApp() + app.Usage = "Minio Client for S3 Compatible Object Storage" + app.Version = "0.1.0" + app.Commands = options + app.Author = "Minio.io" + app.EnableBashCompletion = true + app.Run(os.Args) +} diff --git a/common.go b/common.go index 81239a3e..d82e1691 100644 --- a/common.go +++ b/common.go @@ -26,6 +26,7 @@ import ( "github.com/cheggaaa/pb" "github.com/codegangsta/cli" + "github.com/minio-io/mc/pkg/client" "github.com/minio-io/mc/pkg/s3" ) @@ -46,8 +47,7 @@ func startBar(size int64) *pb.ProgressBar { } // NewClient - get new client -func getNewClient(c *cli.Context) (client *s3.Client, err error) { - +func getNewClient(c *cli.Context, url *url.URL) (cl client.Client, err error) { config, err := getMcConfig() if err != nil { return nil, err @@ -58,7 +58,7 @@ func getNewClient(c *cli.Context) (client *s3.Client, err error) { return nil, err } - var auth s3.Auth + var auth client.Auth auth.AccessKeyID = hostCfg.Auth.AccessKeyID auth.SecretAccessKey = hostCfg.Auth.SecretAccessKey @@ -69,12 +69,12 @@ func getNewClient(c *cli.Context) (client *s3.Client, err error) { Writer: nil, } traceTransport := s3.GetNewTraceTransport(trace, http.DefaultTransport) - client = s3.GetNewClient(&auth, traceTransport) + cl = s3.GetNewClient(&auth, url, traceTransport) } else { - client = s3.GetNewClient(&auth, http.DefaultTransport) + cl = s3.GetNewClient(&auth, url, http.DefaultTransport) } - return client, nil + return cl, nil } // Parse subcommand options @@ -95,6 +95,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) { return nil, err } args.source.scheme = urlParsed.Scheme + args.source.url = urlParsed if urlParsed.Scheme != "" { if urlParsed.Host == "" { return nil, errHostname @@ -130,6 +131,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) { } args.source.scheme = urlParsed.Scheme args.source.host = urlParsed.Host + args.source.url = urlParsed urlSplits := strings.Split(urlParsed.Path, "/") if len(urlSplits) > 1 { args.source.bucket = urlSplits[1] @@ -171,6 +173,7 @@ func parseArgs(c *cli.Context) (args *cmdArgs, err error) { } args.destination.host = urlParsed.Host args.destination.scheme = urlParsed.Scheme + args.destination.url = urlParsed urlSplits := strings.Split(urlParsed.Path, "/") if len(urlSplits) > 1 { args.destination.bucket = urlSplits[1] diff --git a/pkg/client/client.go b/pkg/client/client.go new file mode 100644 index 00000000..6460f95c --- /dev/null +++ b/pkg/client/client.go @@ -0,0 +1,64 @@ +package client + +import ( + "io" + "net/http" + "net/url" + "time" +) + +// Client - Minio client interface +type Client interface { + Get(bucket, object string) (body io.ReadCloser, size int64, err error) + GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error) + Put(bucket, object string, size int64, body io.Reader) error + Stat(bucket, object string) (size int64, date time.Time, err error) + PutBucket(bucket string) error + ListBuckets() ([]*Bucket, error) + ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error) +} + +// Bucket - carries s3 bucket reply header +type Bucket struct { + Name string + CreationDate XMLTime // 2006-02-03T16:45:09.000Z +} + +// Item - object item list +type Item struct { + Key string + LastModified XMLTime + Size int64 +} + +// Prefix - common prefix +type Prefix struct { + Prefix string +} + +// Meta holds Amazon S3 client credentials and flags. +type Meta struct { + *Auth // AWS auth credentials + Transport http.RoundTripper // or nil for the default behavior + + // Supports URL in following formats + // - http://// + // - http://./ + URL *url.URL +} + +// Auth - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html +type Auth struct { + AccessKeyID string + SecretAccessKey string + + // Used for SSL transport layer + CertPEM string + KeyPEM string +} + +// TLSConfig - TLS cert and key configuration +type TLSConfig struct { + CertPEMBlock []byte + KeyPEMBlock []byte +} diff --git a/pkg/s3/xmltime.go b/pkg/client/xmltime.go similarity index 50% rename from pkg/s3/xmltime.go rename to pkg/client/xmltime.go index 5ff05d01..9484a568 100644 --- a/pkg/s3/xmltime.go +++ b/pkg/client/xmltime.go @@ -1,25 +1,3 @@ -// Original license // -// ---------------- // - -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// All other modifications and improvements // -// ---------------------------------------- // - /* * Minimalist Object Storage, (C) 2015 Minio, Inc. * @@ -36,7 +14,7 @@ limitations under the License. * limitations under the License. */ -package s3 +package client import ( "time" @@ -49,24 +27,28 @@ const ( iso8601Format = "2006-01-02T15:04:05.000Z" ) -type xmlTime struct { +// XMLTime - time wrapper +type XMLTime struct { time.Time } -func (c *xmlTime) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { +// UnmarshalXML - unmarshal incoming xml +func (c *XMLTime) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var v string d.DecodeElement(&v, &start) parse, _ := time.Parse(iso8601Format, v) - *c = xmlTime{parse} + *c = XMLTime{parse} return nil } -func (c *xmlTime) UnmarshalXMLAttr(attr xml.Attr) error { +// UnmarshalXMLAttr - unmarshal specific attr +func (c *XMLTime) UnmarshalXMLAttr(attr xml.Attr) error { t, _ := time.Parse(iso8601Format, attr.Value) - *c = xmlTime{t} + *c = XMLTime{t} return nil } -func (c *xmlTime) String() string { +// String - xml to string +func (c *XMLTime) String() string { return c.Time.Format(iso8601Format) } diff --git a/pkg/donut/bucketdriver.go b/pkg/donut/bucketdriver.go new file mode 100644 index 00000000..783fa987 --- /dev/null +++ b/pkg/donut/bucketdriver.go @@ -0,0 +1,14 @@ +package donut + +type bucketDriver struct { + nodes []string + objects map[string][]byte +} + +func (b bucketDriver) GetNodes() ([]string, error) { + var nodes []string + for _, node := range b.nodes { + nodes = append(nodes, node) + } + return nodes, nil +} diff --git a/pkg/donut/disk/disk.go b/pkg/donut/disk/disk.go deleted file mode 100644 index c80e50e8..00000000 --- a/pkg/donut/disk/disk.go +++ /dev/null @@ -1,127 +0,0 @@ -package disk - -import ( - "io" - "os" - "path" - "strconv" - - "github.com/minio-io/mc/pkg/utils/split" -) - -type Disk struct { - *disk -} - -type disk struct { - root string - bucket string - object string - chunk string - file *os.File -} - -func New(bucket, object string) *Disk { - d := &Disk{&disk{bucket: bucket, object: object}} - return d -} - -func openFile(path string, flag int) (fl *os.File, err error) { - fl, err = os.OpenFile(path, flag, 0644) - if err != nil { - return nil, err - } - return fl, nil -} - -func (d *Disk) Create() error { - p := path.Join(d.bucket, d.object, "$"+d.chunk) - fl, err := openFile(p, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL) - if err != nil { - return err - } - d.file = fl - return err -} - -func (d *Disk) Open() error { - p := path.Join(d.bucket, d.object, "$"+d.chunk) - fl, err := openFile(p, os.O_RDONLY) - if err != nil { - return err - } - d.file = fl - return err -} - -func (d *Disk) Write(b []byte) (n int, err error) { - if d == nil { - return 0, os.ErrInvalid - } - n, e := d.file.Write(b) - if n < 0 { - n = 0 - } - if n != len(b) { - err = io.ErrShortWrite - } - if e != nil { - err = e - } - return n, err -} - -func (d *Disk) Read(b []byte) (n int, err error) { - if d == nil { - return 0, os.ErrInvalid - } - n, e := d.file.Read(b) - if n < 0 { - n = 0 - } - if n == 0 && len(b) > 0 && e == nil { - return 0, io.EOF - } - if e != nil { - err = e - } - return n, err -} - -func (d *Disk) Close() error { - if d == nil { - return os.ErrInvalid - } - return d.file.Close() -} - -func (d *Disk) SplitAndWrite(data io.Reader, chunkSize int) (int, error) { - if d == nil { - return 0, os.ErrInvalid - } - splits := split.Stream(data, uint64(chunkSize)) - i := 0 - n := 0 - for chunk := range splits { - if chunk.Err != nil { - return 0, chunk.Err - } - d.chunk = strconv.Itoa(i) - if err := d.Create(); err != nil { - return 0, err - } - m, err := d.Write(chunk.Data) - defer d.Close() - if err != nil { - return 0, err - } - n = n + m - i = i + 1 - } - return n, nil -} - -func (d *Disk) JoinAndRead() (io.Reader, error) { - dirname := path.Join(d.root, d.bucket, d.object) - return split.JoinFiles(dirname, d.object) -} diff --git a/pkg/donut/donut.go b/pkg/donut/donut.go index d59e30d9..da093bbc 100644 --- a/pkg/donut/donut.go +++ b/pkg/donut/donut.go @@ -2,9 +2,46 @@ package donut import "io" +// INTERFACES + +// Donut interface type Donut interface { - Get(bucket string, object string) (body io.Reader, err error) - Put(bucket string, object string, size int, body io.Reader) error - ListObjects(bucket string) (objects map[string]string, err error) - ListBuckets() (buckets map[string]string, err error) + PutBucket(bucket string) error + Get(bucket, object string) (io.ReadCloser, int64, error) + Put(bucket, object string) (ObjectWriter, error) + Stat(bucket, object string) (map[string]string, error) + ListBuckets() ([]string, error) + ListObjects(bucket string) ([]string, error) +} + +// Bucket interface +type Bucket interface { + GetNodes() ([]string, error) +} + +// Node interface +type Node interface { + GetBuckets() ([]string, error) + GetDonutDriverMetadata(bucket, object string) (map[string]string, error) + GetMetadata(bucket, object string) (map[string]string, error) + GetReader(bucket, object string) (io.ReadCloser, error) + GetWriter(bucket, object string) (Writer, error) + ListObjects(bucket string) ([]string, error) +} + +// ObjectWriter interface +type ObjectWriter interface { + Close() error + CloseWithError(error) error + GetMetadata() (map[string]string, error) + SetMetadata(map[string]string) error + Write([]byte) (int, error) +} + +// Writer interface +type Writer interface { + ObjectWriter + + GetDonutDriverMetadata() (map[string]string, error) + SetDonutDriverMetadata(map[string]string) error } diff --git a/pkg/donut/donutdriver_test.go b/pkg/donut/donutdriver_test.go new file mode 100644 index 00000000..b8254959 --- /dev/null +++ b/pkg/donut/donutdriver_test.go @@ -0,0 +1,206 @@ +package donut + +import ( + "testing" + + "bytes" + . "gopkg.in/check.v1" + "io" + "io/ioutil" + "os" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) TestEmptyBucket(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + + // check buckets are empty + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, IsNil) +} + +func (s *MySuite) TestBucketWithoutNameFails(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + // fail to create new bucket without a name + err = donut.PutBucket("") + c.Assert(err, Not(IsNil)) + + err = donut.PutBucket(" ") + c.Assert(err, Not(IsNil)) +} + +func (s *MySuite) TestPutBucketAndList(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + // create bucket + err = donut.PutBucket("foo") + c.Assert(err, IsNil) + + // check bucket exists + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"foo"}) +} + +func (s *MySuite) TestPutBucketWithSameNameFails(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + err = donut.PutBucket("foo") + c.Assert(err, IsNil) + + err = donut.PutBucket("foo") + c.Assert(err, Not(IsNil)) +} + +func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + // add a second bucket + err = donut.PutBucket("foo") + c.Assert(err, IsNil) + + err = donut.PutBucket("bar") + c.Assert(err, IsNil) + + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"bar", "foo"}) + + err = donut.PutBucket("foobar") + c.Assert(err, IsNil) + + buckets, err = donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"bar", "foo", "foobar"}) +} + +func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + + writer, err := donut.Put("foo", "obj") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) +} + +func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + + writer, err := donut.Put("foo", "") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) + + writer, err = donut.Put("foo", " ") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) +} + +func (s *MySuite) TestNewObjectCanBeWritten(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + + err = donut.PutBucket("foo") + c.Assert(err, IsNil) + + writer, err := donut.Put("foo", "obj") + c.Assert(err, IsNil) + + data := "Hello World" + length, err := writer.Write([]byte(data)) + c.Assert(length, Equals, len(data)) + + expectedMetadata := map[string]string{ + "foo": "bar", + "created": "one", + "hello": "world", + } + + err = writer.SetMetadata(expectedMetadata) + c.Assert(err, IsNil) + + err = writer.Close() + c.Assert(err, IsNil) + + actualWriterMetadata, err := writer.GetMetadata() + c.Assert(err, IsNil) + c.Assert(actualWriterMetadata, DeepEquals, expectedMetadata) + + c.Assert(err, IsNil) + + reader, _, err := donut.Get("foo", "obj") + c.Assert(err, IsNil) + + var actualData bytes.Buffer + _, err = io.Copy(&actualData, reader) + c.Assert(err, IsNil) + c.Assert(actualData.Bytes(), DeepEquals, []byte(data)) + + actualMetadata, err := donut.Stat("foo", "obj") + c.Assert(err, IsNil) + c.Assert(actualMetadata, DeepEquals, expectedMetadata) +} + +func (s *MySuite) TestMultipleNewObjects(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDriver(root) + + c.Assert(donut.PutBucket("foo"), IsNil) + writer, err := donut.Put("foo", "obj1") + c.Assert(err, IsNil) + writer.Write([]byte("one")) + writer.Close() + + writer, err = donut.Put("foo", "obj2") + c.Assert(err, IsNil) + writer.Write([]byte("two")) + writer.Close() + + // c.Skip("not complete") + + reader, _, err := donut.Get("foo", "obj1") + c.Assert(err, IsNil) + var readerBuffer1 bytes.Buffer + _, err = io.Copy(&readerBuffer1, reader) + c.Assert(err, IsNil) + // c.Skip("Not Implemented") + c.Assert(readerBuffer1.Bytes(), DeepEquals, []byte("one")) + + reader, _, err = donut.Get("foo", "obj2") + c.Assert(err, IsNil) + var readerBuffer2 bytes.Buffer + _, err = io.Copy(&readerBuffer2, reader) + c.Assert(err, IsNil) + c.Assert(readerBuffer2.Bytes(), DeepEquals, []byte("two")) + + // test list objects + listObjects, err := donut.ListObjects("foo") + c.Assert(err, IsNil) + c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"}) +} diff --git a/pkg/donut/driver.go b/pkg/donut/driver.go new file mode 100644 index 00000000..0d4cf9ba --- /dev/null +++ b/pkg/donut/driver.go @@ -0,0 +1,133 @@ +package donut + +import ( + "errors" + "io" + "sort" + "strconv" + "strings" +) + +type donutDriver struct { + buckets map[string]Bucket + nodes map[string]Node +} + +// NewDriver - instantiate new donut driver +func NewDriver(root string) Donut { + nodes := make(map[string]Node) + nodes["localhost"] = localDirectoryNode{root: root} + driver := donutDriver{ + buckets: make(map[string]Bucket), + nodes: nodes, + } + return driver +} + +func (driver donutDriver) PutBucket(bucketName string) error { + if _, ok := driver.buckets[bucketName]; ok == false { + bucketName = strings.TrimSpace(bucketName) + if bucketName == "" { + return errors.New("Cannot create bucket with no name") + } + // assign nodes + // TODO assign other nodes + nodes := make([]string, 16) + for i := 0; i < 16; i++ { + nodes[i] = "localhost" + } + bucket := bucketDriver{ + nodes: nodes, + } + driver.buckets[bucketName] = bucket + return nil + } + return errors.New("Bucket exists") +} + +func (driver donutDriver) ListBuckets() ([]string, error) { + var buckets []string + for bucket := range driver.buckets { + buckets = append(buckets, bucket) + } + sort.Strings(buckets) + return buckets, nil +} + +func (driver donutDriver) Put(bucketName, objectName string) (ObjectWriter, error) { + if bucket, ok := driver.buckets[bucketName]; ok == true { + writers := make([]Writer, 16) + nodes, err := bucket.GetNodes() + if err != nil { + return nil, err + } + for i, nodeID := range nodes { + if node, ok := driver.nodes[nodeID]; ok == true { + writer, _ := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName) + writers[i] = writer + } + } + return newErasureWriter(writers), nil + } + return nil, errors.New("Bucket not found") +} + +func (driver donutDriver) Get(bucketName, objectName string) (io.ReadCloser, int64, error) { + r, w := io.Pipe() + if bucket, ok := driver.buckets[bucketName]; ok == true { + readers := make([]io.ReadCloser, 16) + nodes, err := bucket.GetNodes() + if err != nil { + return nil, 0, err + } + var metadata map[string]string + for i, nodeID := range nodes { + if node, ok := driver.nodes[nodeID]; ok == true { + bucketID := bucketName + ":0:" + strconv.Itoa(i) + reader, err := node.GetReader(bucketID, objectName) + if err != nil { + return nil, 0, err + } + readers[i] = reader + if metadata == nil { + metadata, err = node.GetDonutDriverMetadata(bucketID, objectName) + if err != nil { + return nil, 0, err + } + } + } + } + go erasureReader(readers, metadata, w) + totalLength, _ := strconv.ParseInt(metadata["totalLength"], 10, 64) + return r, totalLength, nil + } + return nil, 0, errors.New("Bucket not found") +} + +// Stat returns metadata for a given object in a bucket +func (driver donutDriver) Stat(bucketName, object string) (map[string]string, error) { + if bucket, ok := driver.buckets[bucketName]; ok { + nodes, err := bucket.GetNodes() + if err != nil { + return nil, err + } + if node, ok := driver.nodes[nodes[0]]; ok { + return node.GetMetadata(bucketName+":0:0", object) + } + return nil, errors.New("Cannot connect to node: " + nodes[0]) + } + return nil, errors.New("Bucket not found") +} + +func (driver donutDriver) ListObjects(bucketName string) ([]string, error) { + if bucket, ok := driver.buckets[bucketName]; ok { + nodes, err := bucket.GetNodes() + if err != nil { + return nil, err + } + if node, ok := driver.nodes[nodes[0]]; ok { + return node.ListObjects(bucketName + ":0:0") + } + } + return nil, errors.New("Bucket not found") +} diff --git a/pkg/donut/encoder/encoder.go b/pkg/donut/encoder/encoder.go deleted file mode 100644 index 42737879..00000000 --- a/pkg/donut/encoder/encoder.go +++ /dev/null @@ -1,48 +0,0 @@ -package encoder - -import ( - "errors" - "io" - - // "github.com/minio-io/mc/pkg/encoding/erasure" - "github.com/minio-io/mc/pkg/donut/disk" -) - -type encoder struct{} - -const ( - chunkSize = 10 * 1024 * 1024 -) - -func New() *encoder { - e := encoder{} - return &e -} - -func (e *encoder) Put(bucket, object string, size int, body io.Reader) error { - d := disk.New(bucket, object) - n, err := d.SplitAndWrite(body, chunkSize) - if err != nil { - return err - } - if n > size { - return io.ErrUnexpectedEOF - } - if n < size { - return io.ErrShortWrite - } - return nil -} - -func (e *encoder) Get(bucket, object string) (body io.Reader, err error) { - d := disk.New(bucket, object) - return d.JoinAndRead() -} - -func (e *encoder) ListObjects(bucket string) (map[string]string, error) { - return nil, errors.New("Not implemented") -} - -func (e *encoder) ListBuckets() (map[string]string, error) { - return nil, errors.New("Not implemented") -} diff --git a/pkg/donut/erasure.go b/pkg/donut/erasure.go new file mode 100644 index 00000000..c5d94fd5 --- /dev/null +++ b/pkg/donut/erasure.go @@ -0,0 +1,137 @@ +package donut + +import ( + "bytes" + "io" + "strconv" + "time" + + "github.com/minio-io/mc/pkg/encoding/erasure" + "github.com/minio-io/mc/pkg/utils/split" +) + +func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { + totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) + totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) + blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + for _, reader := range readers { + defer reader.Close() + } + for i := 0; i < totalChunks; i++ { + encodedBytes := make([][]byte, 16) + for i, reader := range readers { + var bytesBuffer bytes.Buffer + io.Copy(&bytesBuffer, reader) + encodedBytes[i] = bytesBuffer.Bytes() + } + curBlockSize := totalLeft + if blockSize < totalLeft { + curBlockSize = blockSize + } + decodedData, err := encoder.Decode(encodedBytes, curBlockSize) + if err != nil { + writer.CloseWithError(err) + return + } + io.Copy(writer, bytes.NewBuffer(decodedData)) + totalLeft = totalLeft - blockSize + } + writer.Close() +} + +// erasure writer + +type erasureWriter struct { + writers []Writer + metadata map[string]string + donutMetadata map[string]string // not exposed + writer *io.PipeWriter + isClosed <-chan bool +} + +func newErasureWriter(writers []Writer) ObjectWriter { + r, w := io.Pipe() + isClosed := make(chan bool) + writer := erasureWriter{ + writers: writers, + metadata: make(map[string]string), + writer: w, + isClosed: isClosed, + } + go erasureGoroutine(r, writer, isClosed) + return writer +} + +func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) { + chunks := split.Stream(r, 10*1024*1024) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + chunkCount := 0 + totalLength := 0 + for chunk := range chunks { + if chunk.Err == nil { + totalLength = totalLength + len(chunk.Data) + encodedBlocks, _ := encoder.Encode(chunk.Data) + for blockIndex, block := range encodedBlocks { + io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block)) + } + } + chunkCount = chunkCount + 1 + } + metadata := make(map[string]string) + metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024) + metadata["chunkCount"] = strconv.Itoa(chunkCount) + metadata["created"] = time.Now().Format(time.RFC3339Nano) + metadata["erasureK"] = "8" + metadata["erasureM"] = "8" + metadata["erasureTechnique"] = "Cauchy" + metadata["totalLength"] = strconv.Itoa(totalLength) + for _, nodeWriter := range eWriter.writers { + if nodeWriter != nil { + nodeWriter.SetMetadata(eWriter.metadata) + nodeWriter.SetDonutDriverMetadata(metadata) + nodeWriter.Close() + } + } + isClosed <- true +} + +func (d erasureWriter) Write(data []byte) (int, error) { + io.Copy(d.writer, bytes.NewBuffer(data)) + return len(data), nil +} + +func (d erasureWriter) Close() error { + d.writer.Close() + <-d.isClosed + return nil +} + +func (d erasureWriter) CloseWithError(err error) error { + for _, writer := range d.writers { + if writer != nil { + writer.CloseWithError(err) + } + } + return nil +} + +func (d erasureWriter) SetMetadata(metadata map[string]string) error { + for k := range d.metadata { + delete(d.metadata, k) + } + for k, v := range metadata { + d.metadata[k] = v + } + return nil +} + +func (d erasureWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range d.metadata { + metadata[k] = v + } + return metadata, nil +} diff --git a/pkg/donut/local.go b/pkg/donut/local.go new file mode 100644 index 00000000..0091da2e --- /dev/null +++ b/pkg/donut/local.go @@ -0,0 +1,76 @@ +package donut + +import ( + "errors" + "io" + "os" + "path" + "sort" + "strings" + + "encoding/json" + "path/filepath" +) + +type localDirectoryNode struct { + root string +} + +func (d localDirectoryNode) GetBuckets() ([]string, error) { + return nil, errors.New("Not Implemented") +} + +func (d localDirectoryNode) GetWriter(bucket, object string) (Writer, error) { + objectPath := path.Join(d.root, bucket, object) + err := os.MkdirAll(objectPath, 0700) + if err != nil { + return nil, err + } + return newDonutFileWriter(objectPath) +} + +func (d localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { + return os.Open(path.Join(d.root, bucket, object, "data")) +} + +func (d localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { + return d.getMetadata(bucket, object, "metadata.json") +} +func (d localDirectoryNode) GetDonutDriverMetadata(bucket, object string) (map[string]string, error) { + return d.getMetadata(bucket, object, "donutMetadata.json") +} + +func (d localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { + file, err := os.Open(path.Join(d.root, bucket, object, fileName)) + defer file.Close() + if err != nil { + return nil, err + } + metadata := make(map[string]string) + decoder := json.NewDecoder(file) + if err := decoder.Decode(&metadata); err != nil { + return nil, err + } + return metadata, nil + +} + +func (d localDirectoryNode) ListObjects(bucketName string) ([]string, error) { + prefix := path.Join(d.root, bucketName) + var objects []string + if err := filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && strings.HasSuffix(path, "data") { + object := strings.TrimPrefix(path, prefix+"/") + object = strings.TrimSuffix(object, "/data") + objects = append(objects, object) + } + return nil + }); err != nil { + return nil, err + } + sort.Strings(objects) + return objects, nil +} diff --git a/pkg/donut/node/node.go b/pkg/donut/node/node.go deleted file mode 100644 index 0c597e1c..00000000 --- a/pkg/donut/node/node.go +++ /dev/null @@ -1,35 +0,0 @@ -package node - -import ( - "errors" - - "github.com/minio-io/mc/pkg/os/scsi" -) - -type Node struct { - *node -} - -type node struct { - devices *scsi.Devices - bucket string -} - -func New(bucket string) *Node { - n := &Node{&node{bucket: bucket}} - return n -} - -func (n *Node) GetDisks() error { - if n == nil { - return errors.New("invalid argument") - } - n.devices = &scsi.Devices{} - if err := n.devices.Get(); err != nil { - return err - } - return nil -} - -func (n *Node) CreateBucket() error { -} diff --git a/pkg/donut/objectwriter.go b/pkg/donut/objectwriter.go new file mode 100644 index 00000000..cfba213f --- /dev/null +++ b/pkg/donut/objectwriter.go @@ -0,0 +1,39 @@ +package donut + +import ( + "errors" +) + +type objectWriter struct { + metadata map[string]string +} + +func (obj objectWriter) Write(data []byte) (length int, err error) { + return 11, nil +} + +func (obj objectWriter) Close() error { + return nil +} + +func (obj objectWriter) CloseWithError(err error) error { + return errors.New("Not Implemented") +} + +func (obj objectWriter) SetMetadata(metadata map[string]string) error { + for k := range obj.metadata { + delete(obj.metadata, k) + } + for k, v := range metadata { + obj.metadata[k] = v + } + return nil +} + +func (obj objectWriter) GetMetadata() (map[string]string, error) { + ret := make(map[string]string) + for k, v := range obj.metadata { + ret[k] = v + } + return ret, nil +} diff --git a/pkg/donut/writer.go b/pkg/donut/writer.go new file mode 100644 index 00000000..b94e4c34 --- /dev/null +++ b/pkg/donut/writer.go @@ -0,0 +1,88 @@ +package donut + +import ( + "encoding/json" + "io/ioutil" + "os" + "path" +) + +func newDonutFileWriter(objectDir string) (Writer, error) { + dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0600) + if err != nil { + return nil, err + } + return donutFileWriter{ + root: objectDir, + file: dataFile, + metadata: make(map[string]string), + donutMetadata: make(map[string]string), + }, nil +} + +type donutFileWriter struct { + root string + file *os.File + metadata map[string]string + donutMetadata map[string]string + err error +} + +func (d donutFileWriter) Write(data []byte) (int, error) { + return d.file.Write(data) +} + +func (d donutFileWriter) Close() error { + if d.err != nil { + return d.err + } + metadata, _ := json.Marshal(d.metadata) + ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600) + donutMetadata, _ := json.Marshal(d.donutMetadata) + ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600) + + return d.file.Close() +} + +func (d donutFileWriter) CloseWithError(err error) error { + if d.err != nil { + d.err = err + } + return d.Close() +} + +func (d donutFileWriter) SetMetadata(metadata map[string]string) error { + for k := range d.metadata { + delete(d.metadata, k) + } + for k, v := range metadata { + d.metadata[k] = v + } + return nil +} + +func (d donutFileWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range d.metadata { + metadata[k] = v + } + return metadata, nil +} + +func (d donutFileWriter) SetDonutDriverMetadata(metadata map[string]string) error { + for k := range d.donutMetadata { + delete(d.donutMetadata, k) + } + for k, v := range metadata { + d.donutMetadata[k] = v + } + return nil +} + +func (d donutFileWriter) GetDonutDriverMetadata() (map[string]string, error) { + donutMetadata := make(map[string]string) + for k, v := range d.donutMetadata { + donutMetadata[k] = v + } + return donutMetadata, nil +} diff --git a/pkg/os/scsi/scsi_linux.go b/pkg/os/scsi/scsi_linux.go index e5191e05..24d6937f 100644 --- a/pkg/os/scsi/scsi_linux.go +++ b/pkg/os/scsi/scsi_linux.go @@ -41,7 +41,7 @@ type Disk struct { Diskattrmap map[string][]byte } -// Partitions - struct which carries per partition name, and its attributes +// Partition - struct which carries per partition name, and its attributes type Partition struct { Name string Partitionattrmap map[string][]byte diff --git a/pkg/s3/auth.go b/pkg/s3/auth.go index 82d270b7..b50add30 100644 --- a/pkg/s3/auth.go +++ b/pkg/s3/auth.go @@ -53,25 +53,11 @@ import ( "sort" "strings" "time" + + "github.com/minio-io/mc/pkg/client" ) -// Auth - see http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html -type Auth struct { - AccessKeyID string - SecretAccessKey string - - // Used for SSL transport layer - CertPEM string - KeyPEM string -} - -// TLSConfig - TLS cert and key configuration -type TLSConfig struct { - CertPEMBlock []byte - KeyPEMBlock []byte -} - -func (a *Auth) loadKeys(cert string, key string) (*TLSConfig, error) { +func (a *s3Client) loadKeys(cert string, key string) (*client.TLSConfig, error) { certBlock, err := ioutil.ReadFile(cert) if err != nil { return nil, err @@ -80,13 +66,13 @@ func (a *Auth) loadKeys(cert string, key string) (*TLSConfig, error) { if err != nil { return nil, err } - t := &TLSConfig{} + t := &client.TLSConfig{} t.CertPEMBlock = certBlock t.KeyPEMBlock = keyBlock return t, nil } -func (a *Auth) getTLSTransport() (*http.Transport, error) { +func (a *s3Client) getTLSTransport() (*http.Transport, error) { if a.CertPEM == "" || a.KeyPEM == "" { return &http.Transport{ Dial: (&net.Dialer{ @@ -118,7 +104,7 @@ func (a *Auth) getTLSTransport() (*http.Transport, error) { return transport, nil } -func (a *Auth) signRequest(req *http.Request, host string) { +func (a *s3Client) signRequest(req *http.Request, host string) { if date := req.Header.Get("Date"); date == "" { req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) } @@ -152,7 +138,7 @@ func firstNonEmptyString(strs ...string) string { // Date + "\n" + // CanonicalizedAmzHeaders + // CanonicalizedResource; -func (a *Auth) stringToSign(req *http.Request, host string) string { +func (a *s3Client) stringToSign(req *http.Request, host string) string { buf := new(bytes.Buffer) buf.WriteString(req.Method) buf.WriteByte('\n') @@ -181,7 +167,7 @@ func hasPrefixCaseInsensitive(s, pfx string) bool { return shead == pfx || shead == strings.ToLower(pfx) } -func (a *Auth) writeCanonicalizedAmzHeaders(buf *bytes.Buffer, req *http.Request) { +func (a *s3Client) writeCanonicalizedAmzHeaders(buf *bytes.Buffer, req *http.Request) { var amzHeaders []string vals := make(map[string][]string) for k, vv := range req.Header { @@ -243,7 +229,7 @@ var subResList = []string{ // CanonicalizedResource = [ "/" + Bucket ] + // + // [ sub-resource, if present. For example "?acl", "?location", "?logging", or "?torrent"]; -func (a *Auth) writeCanonicalizedResource(buf *bytes.Buffer, req *http.Request, host string) { +func (a *s3Client) writeCanonicalizedResource(buf *bytes.Buffer, req *http.Request, host string) { bucket := a.bucketFromHost(req, host) if bucket != "" { buf.WriteByte('/') @@ -277,7 +263,7 @@ func hasDotSuffix(s string, suffix string) bool { return len(s) >= len(suffix)+1 && strings.HasSuffix(s, suffix) && s[len(s)-len(suffix)-1] == '.' } -func (a *Auth) bucketFromHost(req *http.Request, host string) string { +func (a *s3Client) bucketFromHost(req *http.Request, host string) string { reqHost := req.Host if reqHost == "" { host = req.URL.Host diff --git a/pkg/s3/auth_test.go b/pkg/s3/auth_test.go index 7ead9d66..743c8600 100644 --- a/pkg/s3/auth_test.go +++ b/pkg/s3/auth_test.go @@ -42,8 +42,11 @@ import ( "bufio" "fmt" "net/http" + "net/url" "strings" "testing" + + "github.com/minio-io/mc/pkg/client" ) type reqAndExpected struct { @@ -59,7 +62,7 @@ func req(s string) *http.Request { } func TestStringToSign(t *testing.T) { - var a Auth + var a s3Client tests := []reqAndExpected{ {`GET /photos/puppy.jpg HTTP/1.1 Host: johnsmith.s3.amazonaws.com @@ -118,7 +121,7 @@ Content-Length: 5913339 } func TestBucketFromHostname(t *testing.T) { - var a Auth + var a s3Client tests := []reqAndExpected{ {"GET / HTTP/1.0\n\n", "", ""}, {"GET / HTTP/1.0\nHost: s3.amazonaws.com\n\n", "", "s3.amazonaws.com"}, @@ -136,13 +139,19 @@ func TestBucketFromHostname(t *testing.T) { func TestsignRequest(t *testing.T) { r := req("GET /foo HTTP/1.1\n\n") - auth := &Auth{AccessKeyID: "key", SecretAccessKey: "secretkey"} - auth.signRequest(r, "localhost:9000") + auth := &client.Auth{AccessKeyID: "key", SecretAccessKey: "secretkey"} + url, _ := url.Parse("localhost:9000") + cl := &s3Client{&client.Meta{ + Auth: auth, + Transport: http.DefaultTransport, + URL: url, + }} + cl.signRequest(r, "localhost:9000") if r.Header.Get("Date") == "" { t.Error("expected a Date set") } r.Header.Set("Date", "Sat, 02 Apr 2011 04:23:52 GMT") - auth.signRequest(r, "localhost:9000") + cl.signRequest(r, "localhost:9000") if e, g := r.Header.Get("Authorization"), "AWS key:kHpCR/N7Rw3PwRlDd8+5X40CFVc="; e != g { t.Errorf("got header %q; expected %q", g, e) } diff --git a/pkg/s3/bucket.go b/pkg/s3/bucket.go index 8b64f5e2..7a21dd82 100644 --- a/pkg/s3/bucket.go +++ b/pkg/s3/bucket.go @@ -50,10 +50,12 @@ import ( "encoding/xml" "net/http" "net/url" + + "github.com/minio-io/mc/pkg/client" ) // bySize implements sort.Interface for []Item based on the Size field. -type bySize []*Item +type bySize []*client.Item func (a bySize) Len() int { return len(a) } func (a bySize) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -62,10 +64,10 @@ func (a bySize) Less(i, j int) bool { return a[i].Size < a[j].Size } /// Bucket API operations // ListBuckets - Get list of buckets -func (c *Client) ListBuckets() ([]*Bucket, error) { - url := fmt.Sprintf("%s://%s/", c.Scheme, c.Host) +func (c *s3Client) ListBuckets() ([]*client.Bucket, error) { + url := fmt.Sprintf("%s://%s/", c.URL.Scheme, c.URL.Host) req := newReq(url) - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { @@ -81,14 +83,14 @@ func (c *Client) ListBuckets() ([]*Bucket, error) { } // PutBucket - create new bucket -func (c *Client) PutBucket(bucket string) error { +func (c *s3Client) PutBucket(bucket string) error { var url string if IsValidBucket(bucket) && !strings.Contains(bucket, ".") { - url = fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) + url = fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket) } req := newReq(url) req.Method = "PUT" - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { return err @@ -106,7 +108,7 @@ func (c *Client) PutBucket(bucket string) error { // provided bucket. Keys before startAt will be skipped. (This is the S3 // 'marker' value). If the length of the returned items is equal to // maxKeys, there is no indication whether or not the returned list is truncated. -func (c *Client) ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*Item, prefixes []*Prefix, err error) { +func (c *s3Client) ListObjects(bucket string, startAt, prefix, delimiter string, maxKeys int) (items []*client.Item, prefixes []*client.Prefix, err error) { var urlReq string var buffer bytes.Buffer @@ -141,7 +143,7 @@ func (c *Client) ListObjects(bucket string, startAt, prefix, delimiter string, m for try := 1; try <= maxTries; try++ { time.Sleep(time.Duration(try-1) * 100 * time.Millisecond) req := newReq(urlReq) - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { if try < maxTries { diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 4e631ee6..6bb24e0f 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -44,10 +44,13 @@ import ( "io" "net" "net/http" + "net/url" "regexp" "strings" "encoding/xml" + + "github.com/minio-io/mc/pkg/client" ) // Total max object list @@ -55,80 +58,55 @@ const ( MaxKeys = 1000 ) -// Bucket - carries s3 bucket reply header -type Bucket struct { - Name string - CreationDate xmlTime // 2006-02-03T16:45:09.000Z -} - -// Item - object item list -type Item struct { - Key string - LastModified xmlTime - Size int64 -} - -// Prefix - common prefix -type Prefix struct { - Prefix string -} - type listBucketResults struct { - Contents []*Item + Contents []*client.Item IsTruncated bool MaxKeys int Name string // bucket name Marker string Delimiter string Prefix string - CommonPrefixes []*Prefix + CommonPrefixes []*client.Prefix } -// Client holds Amazon S3 client credentials and flags. -type Client struct { - *Auth // AWS auth credentials - Transport http.RoundTripper // or nil for the default behavior - - // Supports URL in following formats - // - http://// - // - http://./ - Host string - Scheme string +type s3Client struct { + *client.Meta } -// GetNewClient returns an initialized S3.Client structure. -func GetNewClient(auth *Auth, transport http.RoundTripper) *Client { - return &Client{ +// GetNewClient returns an initialized s3Client structure. +func GetNewClient(auth *client.Auth, u *url.URL, transport http.RoundTripper) client.Client { + return &s3Client{&client.Meta{ Auth: auth, Transport: GetNewTraceTransport(s3Verify{}, transport), - } + URL: u, + }} } // bucketURL returns the URL prefix of the bucket, with trailing slash -func (c *Client) bucketURL(bucket string) string { +func (c *s3Client) bucketURL(bucket string) string { var url string if IsValidBucket(bucket) && !strings.Contains(bucket, ".") { // if localhost use PathStyle - if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") { - return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) + if strings.Contains(c.URL.Host, "localhost") || strings.Contains(c.URL.Host, "127.0.0.1") { + return fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket) } // Verify if its ip address, use PathStyle - host, _, _ := net.SplitHostPort(c.Host) + host, _, _ := net.SplitHostPort(c.URL.Host) if net.ParseIP(host) != nil { - return fmt.Sprintf("%s://%s/%s", c.Scheme, c.Host, bucket) + return fmt.Sprintf("%s://%s/%s", c.URL.Scheme, c.URL.Host, bucket) } // For DNS hostname or amazonaws.com use subdomain style - url = fmt.Sprintf("%s://%s.%s/", c.Scheme, bucket, c.Host) + url = fmt.Sprintf("%s://%s.%s/", c.URL.Scheme, bucket, c.URL.Host) } return url } -func (c *Client) keyURL(bucket, key string) string { +func (c *s3Client) keyURL(bucket, key string) string { url := c.bucketURL(bucket) - if strings.Contains(c.Host, "localhost") || strings.Contains(c.Host, "127.0.0.1") { + if strings.Contains(c.URL.Host, "localhost") || strings.Contains(c.URL.Host, "127.0.0.1") { return url + "/" + key } - host, _, _ := net.SplitHostPort(c.Host) + host, _, _ := net.SplitHostPort(c.URL.Host) if net.ParseIP(host) != nil { return url + "/" + key } @@ -140,14 +118,14 @@ func newReq(url string) *http.Request { if err != nil { panic(fmt.Sprintf("s3 client; invalid URL: %v", err)) } - req.Header.Set("User-Agent", "Minio Client") + req.Header.Set("User-Agent", "Minio s3Client") return req } -func parseListAllMyBuckets(r io.Reader) ([]*Bucket, error) { +func parseListAllMyBuckets(r io.Reader) ([]*client.Bucket, error) { type allMyBuckets struct { Buckets struct { - Bucket []*Bucket + Bucket []*client.Bucket } } var res allMyBuckets diff --git a/pkg/s3/client_test.go b/pkg/s3/client_test.go index ca70558b..b2420b2b 100644 --- a/pkg/s3/client_test.go +++ b/pkg/s3/client_test.go @@ -43,9 +43,16 @@ import ( "strings" "testing" "time" + + "github.com/minio-io/mc/pkg/client" ) -var tc *Client +// Date format +const ( + iso8601Format = "2006-01-02T15:04:05.000Z" +) + +var tc *s3Client func TestParseBuckets(t *testing.T) { res := "\nownerIDFieldbobDisplayNamebucketOne2006-06-21T07:04:31.000ZbucketTwo2006-06-21T07:04:32.000Z" @@ -59,11 +66,14 @@ func TestParseBuckets(t *testing.T) { t1, err := time.Parse(iso8601Format, "2006-06-21T07:04:31.000Z") t2, err := time.Parse(iso8601Format, "2006-06-21T07:04:32.000Z") - want := []*Bucket{ - {Name: "bucketOne", CreationDate: xmlTime{t1}}, - {Name: "bucketTwo", CreationDate: xmlTime{t2}}, + xmlT1 := client.XMLTime{t1} + xmlT2 := client.XMLTime{t2} + + want := []*client.Bucket{ + {Name: "bucketOne", CreationDate: xmlT1}, + {Name: "bucketTwo", CreationDate: xmlT2}, } - dump := func(v []*Bucket) { + dump := func(v []*client.Bucket) { for i, b := range v { t.Logf("Bucket #%d: %#v", i, b) } diff --git a/pkg/s3/object.go b/pkg/s3/object.go index b6a3d731..f1021283 100644 --- a/pkg/s3/object.go +++ b/pkg/s3/object.go @@ -56,7 +56,7 @@ import ( /// Object API operations // Put - upload new object to bucket -func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error { +func (c *s3Client) Put(bucket, key string, size int64, contents io.Reader) error { req := newReq(c.keyURL(bucket, key)) req.Method = "PUT" req.ContentLength = size @@ -75,7 +75,7 @@ func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error { req.Body = ioutil.NopCloser(sink) b64 := base64.StdEncoding.EncodeToString(h.Sum(nil)) req.Header.Set("Content-MD5", b64) - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { @@ -90,10 +90,10 @@ func (c *Client) Put(bucket, key string, size int64, contents io.Reader) error { } // Stat - returns 0, "", os.ErrNotExist if not on S3 -func (c *Client) Stat(bucket, key string) (size int64, date time.Time, reterr error) { +func (c *s3Client) Stat(bucket, key string) (size int64, date time.Time, reterr error) { req := newReq(c.keyURL(bucket, key)) req.Method = "HEAD" - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { return 0, date, err @@ -123,9 +123,9 @@ func (c *Client) Stat(bucket, key string) (size int64, date time.Time, reterr er } // Get - download a requested object from a given bucket -func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) { +func (c *s3Client) Get(bucket, key string) (body io.ReadCloser, size int64, err error) { req := newReq(c.keyURL(bucket, key)) - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { return nil, 0, err @@ -140,7 +140,7 @@ func (c *Client) Get(bucket, key string) (body io.ReadCloser, size int64, err er // GetPartial fetches part of the s3 key object in bucket. // If length is negative, the rest of the object is returned. -func (c *Client) GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error) { +func (c *s3Client) GetPartial(bucket, key string, offset, length int64) (body io.ReadCloser, size int64, err error) { if offset < 0 { return nil, 0, errors.New("invalid negative length") } @@ -151,7 +151,7 @@ func (c *Client) GetPartial(bucket, key string, offset, length int64) (body io.R } else { req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) } - c.Auth.signRequest(req, c.Host) + c.signRequest(req, c.URL.Host) res, err := c.Transport.RoundTrip(req) if err != nil { diff --git a/pkg/utils/split/split.go b/pkg/utils/split/split.go index ba83d324..629d8eec 100644 --- a/pkg/utils/split/split.go +++ b/pkg/utils/split/split.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "os" + "path" "strconv" "strings" ) @@ -81,12 +82,12 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) { bytesWriter.Flush() // if we have data available, send it over the channel if bytesBuffer.Len() != 0 { - ch <- Message{bytesBuffer.Bytes(), nil} + ch <- Message{Data: bytesBuffer.Bytes(), Err: nil} } } // if we have an error other than an EOF, send it over the channel if readError != io.EOF { - ch <- Message{nil, readError} + ch <- Message{Data: nil, Err: readError} } // close the channel, signaling the channel reader that the stream is complete close(ch) @@ -103,14 +104,13 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) { // fmt.Println(buf) // } // -func JoinFiles(dirname string, inputPrefix string) (io.Reader, error) { +func JoinFiles(dirname string, inputPrefix string) (reader io.Reader, err error) { reader, writer := io.Pipe() fileInfos, readError := ioutil.ReadDir(dirname) if readError != nil { writer.CloseWithError(readError) return nil, readError } - var newfileInfos []os.FileInfo for _, fi := range fileInfos { if strings.Contains(fi.Name(), inputPrefix) == true { @@ -124,25 +124,20 @@ func JoinFiles(dirname string, inputPrefix string) (io.Reader, error) { return nil, nofilesError } - go joinFilesGoRoutine(newfileInfos, writer) - return reader, nil -} - -func joinFilesGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) { - for _, fileInfo := range fileInfos { - file, err := os.Open(fileInfo.Name()) + for _, fileInfo := range newfileInfos { + file, err := os.Open(path.Join(dirname, fileInfo.Name())) defer file.Close() for err != nil { writer.CloseWithError(err) - return + return nil, err } _, err = io.Copy(writer, file) if err != nil { writer.CloseWithError(err) - return + return nil, err } } - writer.Close() + return reader, nil } // FileWithPrefix - Takes a file and splits it into chunks with size chunkSize. The output