mirror of
https://github.com/minio/mc.git
synced 2025-11-10 13:42:32 +03:00
484 lines
15 KiB
Go
484 lines
15 KiB
Go
/*
|
|
* MinIO Client, (C) 2018 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"

	"github.com/minio/cli"
	"github.com/minio/mc/pkg/probe"
	"github.com/minio/minio/pkg/mimedb"
)
|
|
|
|
var (
|
|
sqlFlags = []cli.Flag{
|
|
cli.StringFlag{
|
|
Name: "query, e",
|
|
Usage: "sql query expression",
|
|
Value: "select * from s3object",
|
|
},
|
|
cli.BoolFlag{
|
|
Name: "recursive, r",
|
|
Usage: "sql query recursively",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "csv-input",
|
|
Usage: "csv input serialization option",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "json-input",
|
|
Usage: "json input serialization option",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "compression",
|
|
Usage: "input compression type",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "csv-output",
|
|
Usage: "csv output serialization option",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "csv-output-header",
|
|
Usage: "optional csv output header ",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "json-output",
|
|
Usage: "json output serialization option",
|
|
},
|
|
}
|
|
)
|
|
|
|
// Display contents of a file.
|
|
var sqlCmd = cli.Command{
|
|
Name: "sql",
|
|
Usage: "run sql queries on objects",
|
|
Action: mainSQL,
|
|
Before: setGlobalsFromContext,
|
|
Flags: append(sqlFlags, globalFlags...),
|
|
CustomHelpTemplate: `NAME:
|
|
{{.HelpName}} - {{.Usage}}
|
|
|
|
USAGE:
|
|
{{.HelpName}} [FLAGS] TARGET [TARGET...]
|
|
{{if .VisibleFlags}}
|
|
FLAGS:
|
|
{{range .VisibleFlags}}{{.}}
|
|
{{end}}{{end}}
|
|
ENVIRONMENT VARIABLES:
|
|
MC_ENCRYPT_KEY: list of comma delimited prefix=secret values
|
|
|
|
SERIALIZATION OPTIONS:
|
|
For query serialization options, refer to https://docs.min.io/docs/minio-client-complete-guide#sql
|
|
|
|
EXAMPLES:
|
|
1. Run a query on a set of objects recursively on AWS S3.
|
|
{{.Prompt}} {{.HelpName}} --recursive --query "select * from S3Object" s3/personalbucket/my-large-csvs/
|
|
|
|
2. Run a query on an object on MinIO.
|
|
{{.Prompt}} {{.HelpName}} --query "select count(s.power) from S3Object" myminio/iot-devices/power-ratio.csv
|
|
|
|
3. Run a query on an encrypted object with customer provided keys.
|
|
{{.Prompt}} {{.HelpName}} --encrypt-key "myminio/iot-devices=32byteslongsecretkeymustbegiven1" \
|
|
--query "select count(s.power) from S3Object s" myminio/iot-devices/power-ratio-encrypted.csv
|
|
|
|
4. Run a query on an object on MinIO in gzip format using ; as field delimiter,
|
|
newline as record delimiter and file header to be used
|
|
{{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
|
|
--query "select count(s.power) from S3Object" myminio/iot-devices/power-ratio.csv.gz
|
|
|
|
5. Run a query on an object on MinIO in gzip format using ; as field delimiter,
|
|
newline as record delimiter and file header to be used
|
|
{{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
|
|
--json-output "rd=\n\n" --query "select * from S3Object" myminio/iot-devices/data.csv
|
|
|
|
6. Run same query as in 5., but specify csv output headers. If --csv-output-headers is
|
|
specified as "", first row of csv is interpreted as header
|
|
{{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
|
|
--csv-output "rd=\n" --csv-output-header "device_id,uptime,lat,lon" \
|
|
--query "select * from S3Object" myminio/iot-devices/data.csv
|
|
`,
|
|
}
|
|
|
|
// Keys accepted in the --csv-input, --csv-output, --json-input and
// --json-output serialization option strings, plus their abbreviations.
var (
	// CSV keys accepted for both input and output serialization.
	validCSVCommonKeys = []string{
		"FieldDelimiter",
		"QuoteChar",
		"QuoteEscChar",
	}
	// CSV keys accepted for input serialization only.
	validCSVInputKeys = []string{
		"Comments",
		"FileHeader",
		"QuotedRecordDelimiter",
		"RecordDelimiter",
	}
	// CSV keys accepted for output serialization only.
	validCSVOutputKeys = []string{
		"QuoteFields",
	}

	// JSON keys accepted for input serialization.
	validJSONInputKeys = []string{
		"Type",
	}
	// Keys accepted for both CSV and JSON output serialization.
	validJSONCSVCommonOutputKeys = []string{
		"RecordDelimiter",
	}

	// Abbreviation -> long form name of the CSV input serialization keys.
	validCSVInputAbbrKeys = map[string]string{
		"cc":  "Comments",
		"fd":  "FieldDelimiter",
		"fh":  "FileHeader",
		"qc":  "QuoteChar",
		"qec": "QuoteEscChar",
		"qrd": "QuotedRecordDelimiter",
		"rd":  "RecordDelimiter",
	}
	// Abbreviation -> long form name of the CSV output serialization keys.
	validCSVOutputAbbrKeys = map[string]string{
		"fd":  "FieldDelimiter",
		"qc":  "QuoteChar",
		"qec": "QuoteEscChar",
		"qf":  "QuoteFields",
		"rd":  "RecordDelimiter",
	}
	// Abbreviation -> long form name of the JSON output serialization keys.
	validJSONOutputAbbrKeys = map[string]string{
		"rd": "RecordDelimiter",
	}
)
|
|
|
|
// parseKVArgs parses string of the form k=v delimited by ","
|
|
// into a map of k-v pairs
|
|
func parseKVArgs(is string) (map[string]string, *probe.Error) {
|
|
kvmap := make(map[string]string)
|
|
var key, value string
|
|
var s, e int // tracking start and end of value
|
|
var index int // current index in string
|
|
if is != "" {
|
|
for index < len(is) {
|
|
i := strings.Index(is[index:], "=")
|
|
if i == -1 {
|
|
return nil, probe.NewError(errors.New("Arguments should be of the form key=value,... "))
|
|
}
|
|
key = is[index : index+i]
|
|
s = i + index + 1
|
|
e = strings.Index(is[s:], ",")
|
|
delimFound := false
|
|
for !delimFound {
|
|
if e == -1 || e+s >= len(is) {
|
|
delimFound = true
|
|
break
|
|
}
|
|
if string(is[s+e]) != "," {
|
|
delimFound = true
|
|
if string(is[s+e-1]) == "," {
|
|
e--
|
|
}
|
|
} else {
|
|
e++
|
|
}
|
|
}
|
|
var vEnd = len(is)
|
|
if e != -1 {
|
|
vEnd = s + e
|
|
}
|
|
|
|
value = is[s:vEnd]
|
|
index = vEnd + 1
|
|
if _, ok := kvmap[strings.ToLower(key)]; ok {
|
|
return nil, probe.NewError(fmt.Errorf("More than one key=value found for %s", strings.TrimSpace(key)))
|
|
}
|
|
kvmap[strings.ToLower(key)] = strings.NewReplacer(`\n`, "\n", `\t`, "\t", `\r`, "\r").Replace(value)
|
|
}
|
|
}
|
|
return kvmap, nil
|
|
}
|
|
|
|
// fmtString renders the list of accepted serialization options for use in
// error messages.
//
// When validAbbr is non-empty every entry is rendered as "Name(abbr)";
// otherwise the plain names in validKeys are listed. Entries are sorted and
// joined with ", " so the message is the same on every run (map iteration
// order is not stable).
func fmtString(validAbbr map[string]string, validKeys []string) string {
	if len(validAbbr) == 0 {
		return strings.Join(validKeys, ", ")
	}
	entries := make([]string, 0, len(validAbbr))
	for abbr, name := range validAbbr {
		entries = append(entries, name+"("+abbr+")")
	}
	sort.Strings(entries)
	return strings.Join(entries, ", ")
}
|
|
|
|
// parses the input string and constructs a k-v map, replacing any abbreviated keys with actual keys
|
|
func parseSerializationOpts(inp string, validKeys []string, validAbbrKeys map[string]string) (map[string]string, *probe.Error) {
|
|
if validAbbrKeys == nil {
|
|
validAbbrKeys = make(map[string]string)
|
|
}
|
|
validKeyFn := func(key string, validKeys []string) bool {
|
|
for _, name := range validKeys {
|
|
if strings.EqualFold(name, key) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
kv, err := parseKVArgs(inp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ikv := make(map[string]string)
|
|
for k, v := range kv {
|
|
fldName, ok := validAbbrKeys[strings.ToLower(k)]
|
|
if ok {
|
|
ikv[strings.ToLower(fldName)] = v
|
|
} else {
|
|
ikv[strings.ToLower(k)] = v
|
|
}
|
|
}
|
|
for k := range ikv {
|
|
if !validKeyFn(k, validKeys) {
|
|
return nil, probe.NewError(errors.New("Options should be key-value pairs in the form key=value,... where valid key(s) are " + fmtString(validAbbrKeys, validKeys)))
|
|
}
|
|
}
|
|
return ikv, nil
|
|
}
|
|
|
|
// gets the input serialization opts from cli context and constructs a map of csv, json or parquet options
|
|
func getInputSerializationOpts(ctx *cli.Context) map[string]map[string]string {
|
|
icsv := ctx.String("csv-input")
|
|
ijson := ctx.String("json-input")
|
|
m := make(map[string]map[string]string)
|
|
|
|
csvType := ctx.IsSet("csv-input")
|
|
jsonType := ctx.IsSet("json-input")
|
|
if csvType && jsonType {
|
|
fatalIf(errInvalidArgument(), "Only one of --csv-input or --json-input can be specified as input serialization option")
|
|
}
|
|
|
|
if icsv != "" {
|
|
kv, err := parseSerializationOpts(icsv, append(validCSVCommonKeys, validCSVInputKeys...), validCSVInputAbbrKeys)
|
|
fatalIf(err, "Invalid serialization option(s) specified for --csv-input flag")
|
|
|
|
m["csv"] = kv
|
|
}
|
|
if ijson != "" {
|
|
kv, err := parseSerializationOpts(ijson, validJSONInputKeys, nil)
|
|
|
|
fatalIf(err, "Invalid serialization option(s) specified for --json-input flag")
|
|
m["json"] = kv
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
// gets the output serialization opts from cli context and constructs a map of csv or json options
|
|
func getOutputSerializationOpts(ctx *cli.Context, csvHdrs []string) (opts map[string]map[string]string) {
|
|
m := make(map[string]map[string]string)
|
|
|
|
ocsv := ctx.String("csv-output")
|
|
ojson := ctx.String("json-output")
|
|
csvType := ctx.IsSet("csv-output")
|
|
jsonType := ctx.IsSet("json-output")
|
|
|
|
if csvType && jsonType {
|
|
fatalIf(errInvalidArgument(), "Only one of --csv-output, or --json-output can be specified as output serialization option")
|
|
}
|
|
|
|
if jsonType && len(csvHdrs) > 0 {
|
|
fatalIf(errInvalidArgument(), "--csv-output-header incompatible with --json-output option")
|
|
}
|
|
|
|
if csvType {
|
|
validKeys := append(validCSVCommonKeys, validJSONCSVCommonOutputKeys...)
|
|
kv, err := parseSerializationOpts(ocsv, append(validKeys, validCSVOutputKeys...), validCSVOutputAbbrKeys)
|
|
fatalIf(err, "Invalid value(s) specified for --csv-output flag")
|
|
m["csv"] = kv
|
|
}
|
|
|
|
if jsonType || globalJSON {
|
|
kv, err := parseSerializationOpts(ojson, validJSONCSVCommonOutputKeys, validJSONOutputAbbrKeys)
|
|
fatalIf(err, "Invalid value(s) specified for --json-output flag")
|
|
m["json"] = kv
|
|
}
|
|
return m
|
|
}
|
|
|
|
// getCSVHeader fetches the first line of csv query object
|
|
func getCSVHeader(sourceURL string, encKeyDB map[string][]prefixSSEPair) ([]string, *probe.Error) {
|
|
var r io.ReadCloser
|
|
switch sourceURL {
|
|
case "-":
|
|
r = os.Stdin
|
|
default:
|
|
var err *probe.Error
|
|
var metadata map[string]string
|
|
if r, metadata, err = getSourceStreamMetadataFromURL(sourceURL, encKeyDB); err != nil {
|
|
return nil, err.Trace(sourceURL)
|
|
}
|
|
ctype := metadata["Content-Type"]
|
|
if strings.Contains(ctype, "gzip") {
|
|
var e error
|
|
r, e = gzip.NewReader(r)
|
|
if e != nil {
|
|
return nil, probe.NewError(e)
|
|
}
|
|
defer r.Close()
|
|
} else if strings.Contains(ctype, "bzip") {
|
|
defer r.Close()
|
|
r = ioutil.NopCloser(bzip2.NewReader(r))
|
|
} else {
|
|
defer r.Close()
|
|
}
|
|
}
|
|
br := bufio.NewReader(r)
|
|
line, _, err := br.ReadLine()
|
|
if err != nil {
|
|
return nil, probe.NewError(err)
|
|
}
|
|
return strings.Split(string(line), ","), nil
|
|
}
|
|
|
|
// selectAllQueryRe matches a query that starts with "select *" followed by
// whitespace. SQL keywords are case-insensitive, hence the (?i) flag.
var selectAllQueryRe = regexp.MustCompile(`(?i)^\s*?select\s+?\*\s+?.*?$`)

// isSelectAll returns true if query is selecting all columns of the csv
// object, i.e. it has the form "select * ..." in any letter case.
func isSelectAll(query string) bool {
	return selectAllQueryRe.MatchString(query)
}
|
|
|
|
// if csv-output-header is set to a comma delimited string use it, othjerwise attempt to get the header from
|
|
// query object
|
|
func getCSVOutputHeaders(ctx *cli.Context, url string, encKeyDB map[string][]prefixSSEPair, query string) (hdrs []string) {
|
|
if !ctx.IsSet("csv-output-header") {
|
|
return
|
|
}
|
|
|
|
hdrStr := ctx.String("csv-output-header")
|
|
if hdrStr == "" && isSelectAll(query) {
|
|
// attempt to get the first line of csv as header
|
|
if hdrs, err := getCSVHeader(url, encKeyDB); err == nil {
|
|
return hdrs
|
|
}
|
|
}
|
|
hdrs = strings.Split(hdrStr, ",")
|
|
return
|
|
}
|
|
|
|
// get the Select options for sql select API
|
|
func getSQLOpts(ctx *cli.Context, csvHdrs []string) (s SelectObjectOpts) {
|
|
is := getInputSerializationOpts(ctx)
|
|
os := getOutputSerializationOpts(ctx, csvHdrs)
|
|
|
|
return SelectObjectOpts{
|
|
InputSerOpts: is,
|
|
OutputSerOpts: os,
|
|
}
|
|
}
|
|
|
|
// isCSVOrJSON reports whether inOpts carries a "csv" or a "json" entry.
func isCSVOrJSON(inOpts map[string]map[string]string) bool {
	for _, format := range []string{"csv", "json"} {
		if _, present := inOpts[format]; present {
			return true
		}
	}
	return false
}
|
|
|
|
func sqlSelect(targetURL, expression string, encKeyDB map[string][]prefixSSEPair, selOpts SelectObjectOpts, csvHdrs []string, writeHdr bool) *probe.Error {
|
|
alias, _, _, err := expandAlias(targetURL)
|
|
if err != nil {
|
|
return err.Trace(targetURL)
|
|
}
|
|
|
|
targetClnt, err := newClient(targetURL)
|
|
if err != nil {
|
|
return err.Trace(targetURL)
|
|
}
|
|
|
|
sseKey := getSSE(targetURL, encKeyDB[alias])
|
|
outputer, err := targetClnt.Select(expression, sseKey, selOpts)
|
|
if err != nil {
|
|
return err.Trace(targetURL, expression)
|
|
}
|
|
defer outputer.Close()
|
|
|
|
// write csv header to stdout
|
|
if len(csvHdrs) > 0 && writeHdr {
|
|
fmt.Println(strings.Join(csvHdrs, ","))
|
|
}
|
|
_, e := io.Copy(os.Stdout, outputer)
|
|
return probe.NewError(e)
|
|
}
|
|
|
|
func validateOpts(selOpts SelectObjectOpts, url string) {
|
|
_, targetURL, _ := mustExpandAlias(url)
|
|
if strings.HasSuffix(targetURL, ".parquet") && isCSVOrJSON(selOpts.InputSerOpts) {
|
|
fatalIf(errInvalidArgument(), "Input serialization flags --csv-input and --json-input cannot be used for object in .parquet format")
|
|
}
|
|
}
|
|
|
|
// validate args and optionally fetch the csv header of query object
|
|
func getAndValidateArgs(ctx *cli.Context, encKeyDB map[string][]prefixSSEPair, url string) (query string, csvHdrs []string, selOpts SelectObjectOpts) {
|
|
query = ctx.String("query")
|
|
csvHdrs = getCSVOutputHeaders(ctx, url, encKeyDB, query)
|
|
selOpts = getSQLOpts(ctx, csvHdrs)
|
|
validateOpts(selOpts, url)
|
|
return
|
|
}
|
|
|
|
// check sql input arguments.
|
|
func checkSQLSyntax(ctx *cli.Context) {
|
|
if len(ctx.Args()) == 0 {
|
|
cli.ShowCommandHelpAndExit(ctx, "sql", 1) // last argument is exit code.
|
|
}
|
|
}
|
|
|
|
// mainSQL is the main entry point for sql command.
|
|
func mainSQL(ctx *cli.Context) error {
|
|
var (
|
|
csvHdrs []string
|
|
selOpts SelectObjectOpts
|
|
query string
|
|
)
|
|
// Parse encryption keys per command.
|
|
encKeyDB, err := getEncKeys(ctx)
|
|
fatalIf(err, "Unable to parse encryption keys.")
|
|
|
|
// validate sql input arguments.
|
|
checkSQLSyntax(ctx)
|
|
// extract URLs.
|
|
URLs := ctx.Args()
|
|
writeHdr := true
|
|
for _, url := range URLs {
|
|
if !isAliasURLDir(url, encKeyDB) {
|
|
if writeHdr {
|
|
query, csvHdrs, selOpts = getAndValidateArgs(ctx, encKeyDB, url)
|
|
}
|
|
errorIf(sqlSelect(url, query, encKeyDB, selOpts, csvHdrs, writeHdr).Trace(url), "Unable to run sql")
|
|
writeHdr = false
|
|
continue
|
|
}
|
|
targetAlias, targetURL, _ := mustExpandAlias(url)
|
|
clnt, err := newClientFromAlias(targetAlias, targetURL)
|
|
if err != nil {
|
|
errorIf(err.Trace(url), "Unable to initialize target `"+url+"`.")
|
|
continue
|
|
}
|
|
|
|
for content := range clnt.List(ctx.Bool("recursive"), false, false, DirNone) {
|
|
if content.Err != nil {
|
|
errorIf(content.Err.Trace(url), "Unable to list on target `"+url+"`.")
|
|
continue
|
|
}
|
|
if writeHdr {
|
|
query, csvHdrs, selOpts = getAndValidateArgs(ctx, encKeyDB, targetAlias+content.URL.Path)
|
|
}
|
|
contentType := mimedb.TypeByExtension(filepath.Ext(content.URL.Path))
|
|
for _, cTypeSuffix := range supportedContentTypes {
|
|
if strings.Contains(contentType, cTypeSuffix) {
|
|
errorIf(sqlSelect(targetAlias+content.URL.Path, query,
|
|
encKeyDB, selOpts, csvHdrs, writeHdr).Trace(content.URL.String()), "Unable to run sql")
|
|
}
|
|
writeHdr = false
|
|
}
|
|
}
|
|
}
|
|
|
|
// Done.
|
|
return nil
|
|
}
|