app.Before receives a cli.Context before flags are parsed, so there is no point calling ctx.Bool() or ctx.IsSet() at that stage. Flags are parsed by the time individual commands run, however, so minio initialization and the setting of global variables (globalJSON, globalQuiet, etc.) can be moved into the Before function of each command. Avoid setting command.Before on non-leaf commands; otherwise the Before function is called multiple times before the leaf command is reached.
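A minimal sketch of that pattern, assuming the per-command Before hook referenced in this file (initBeforeRunningCmd) and mc's global flag names; the body shown here is illustrative, not the actual implementation:

    // initBeforeRunningCmd is attached as Before on leaf commands only (e.g. sqlCmd below).
    // By the time it runs, the command's flags are parsed, so ctx.Bool()/ctx.IsSet() are meaningful.
    func initBeforeRunningCmd(ctx *cli.Context) error {
        globalQuiet = ctx.Bool("quiet") || ctx.GlobalBool("quiet")
        globalJSON = ctx.Bool("json") || ctx.GlobalBool("json")
        globalDebug = ctx.Bool("debug") || ctx.GlobalBool("debug")
        return nil
    }
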
/*
 * MinIO Client, (C) 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"regexp"
	"strings"

	"github.com/minio/cli"
	"github.com/minio/mc/pkg/probe"
	"github.com/minio/minio-go/v6"
	"github.com/minio/minio/pkg/mimedb"
)

var (
	sqlFlags = []cli.Flag{
		cli.StringFlag{
			Name:  "query, e",
			Usage: "sql query expression",
			Value: "select * from s3object",
		},
		cli.BoolFlag{
			Name:  "recursive, r",
			Usage: "sql query recursively",
		},
		cli.StringFlag{
			Name:  "csv-input",
			Usage: "csv input serialization option",
		},
		cli.StringFlag{
			Name:  "json-input",
			Usage: "json input serialization option",
		},
		cli.StringFlag{
			Name:  "compression",
			Usage: "input compression type",
		},
		cli.StringFlag{
			Name:  "csv-output",
			Usage: "csv output serialization option",
		},
		cli.StringFlag{
			Name:  "csv-output-header",
			Usage: "optional csv output header",
		},
		cli.StringFlag{
			Name:  "json-output",
			Usage: "json output serialization option",
		},
	}
)

// sqlCmd defines the "sql" command to run SQL queries on objects.
var sqlCmd = cli.Command{
	Name:   "sql",
	Usage:  "run sql queries on objects",
	Action: mainSQL,
	Before: initBeforeRunningCmd,
	Flags:  append(append(sqlFlags, ioFlags...), globalFlags...),
	CustomHelpTemplate: `NAME:
  {{.HelpName}} - {{.Usage}}

USAGE:
  {{.HelpName}} [FLAGS] TARGET [TARGET...]
{{if .VisibleFlags}}
FLAGS:
  {{range .VisibleFlags}}{{.}}
  {{end}}{{end}}
ENVIRONMENT VARIABLES:
  MC_ENCRYPT_KEY: list of comma delimited prefix=secret values

SERIALIZATION OPTIONS:
  For query serialization options, refer to https://docs.min.io/docs/minio-client-complete-guide#sql

EXAMPLES:
  1. Run a query on a set of objects recursively on AWS S3.
     {{.Prompt}} {{.HelpName}} --recursive --query "select * from S3Object" s3/personalbucket/my-large-csvs/

  2. Run a query on an object on MinIO.
     {{.Prompt}} {{.HelpName}} --query "select count(s.power) from S3Object" myminio/iot-devices/power-ratio.csv

  3. Run a query on an encrypted object with customer provided keys.
     {{.Prompt}} {{.HelpName}} --encrypt-key "myminio/iot-devices=32byteslongsecretkeymustbegiven1" \
          --query "select count(s.power) from S3Object s" myminio/iot-devices/power-ratio-encrypted.csv

  4. Run a query on an object on MinIO in gzip format, using ';' as the field delimiter,
     newline as the record delimiter, and the file header.
     {{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
          --query "select count(s.power) from S3Object" myminio/iot-devices/power-ratio.csv.gz

  5. Run a query on an object on MinIO in gzip format with the same csv input options as in 4.,
     returning the results as JSON records.
     {{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
          --json-output "rd=\n\n" --query "select * from S3Object" myminio/iot-devices/data.csv

  6. Run the same query as in 5., but specify csv output headers. If --csv-output-header is
     specified as "", the first row of the csv is interpreted as the header.
     {{.Prompt}} {{.HelpName}} --compression GZIP --csv-input "rd=\n,fh=USE,fd=;" \
          --csv-output "rd=\n" --csv-output-header "device_id,uptime,lat,lon" \
          --query "select * from S3Object" myminio/iot-devices/data.csv
`,
}

// valid CSV and JSON keys for input/output serialization
var (
	validCSVCommonKeys = []string{"FieldDelimiter", "QuoteChar", "QuoteEscChar"}
	validCSVInputKeys  = []string{"Comments", "FileHeader", "QuotedRecordDelimiter", "RecordDelimiter"}
	validCSVOutputKeys = []string{"QuoteFields"}

	validJSONInputKeys           = []string{"Type"}
	validJSONCSVCommonOutputKeys = []string{"RecordDelimiter"}

	// mapping of abbreviation to long form name of CSV and JSON input/output serialization keys
	validCSVInputAbbrKeys   = map[string]string{"cc": "Comments", "fh": "FileHeader", "qrd": "QuotedRecordDelimiter", "rd": "RecordDelimiter", "fd": "FieldDelimiter", "qc": "QuoteChar", "qec": "QuoteEscChar"}
	validCSVOutputAbbrKeys  = map[string]string{"qf": "QuoteFields", "rd": "RecordDelimiter", "fd": "FieldDelimiter", "qc": "QuoteChar", "qec": "QuoteEscChar"}
	validJSONOutputAbbrKeys = map[string]string{"rd": "RecordDelimiter"}
)

// parseKVArgs parses string of the form k=v delimited by ","
// into a map of k-v pairs
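// For example, `rd=\n,fh=USE,fd=;` yields {"rd": "\n", "fh": "USE", "fd": ";"}:
// keys are lowercased and the escape sequences \n, \t and \r in values are expanded.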
func parseKVArgs(is string) (map[string]string, *probe.Error) {
	kvmap := make(map[string]string)
	var key, value string
	var s, e int  // tracking start and end of value
	var index int // current index in string
	if is != "" {
		for index < len(is) {
			i := strings.Index(is[index:], "=")
			if i == -1 {
				return nil, probe.NewError(errors.New("Arguments should be of the form key=value,... "))
			}
			key = is[index : index+i]
			s = i + index + 1
			e = strings.Index(is[s:], ",")
			delimFound := false
			for !delimFound {
				if e == -1 || e+s >= len(is) {
					delimFound = true
					break
				}
				if string(is[s+e]) != "," {
					delimFound = true
					if string(is[s+e-1]) == "," {
						e--
					}
				} else {
					e++
				}
			}
			var vEnd = len(is)
			if e != -1 {
				vEnd = s + e
			}

			value = is[s:vEnd]
			index = vEnd + 1
			if _, ok := kvmap[strings.ToLower(key)]; ok {
				return nil, probe.NewError(fmt.Errorf("More than one key=value found for %s", strings.TrimSpace(key)))
			}
			kvmap[strings.ToLower(key)] = strings.NewReplacer(`\n`, "\n", `\t`, "\t", `\r`, "\r").Replace(value)
		}
	}
	return kvmap, nil
}

// returns a string with list of serialization options and abbreviation(s) if any
func fmtString(validAbbr map[string]string, validKeys []string) string {
	var sb strings.Builder
	i := 0
	for k, v := range validAbbr {
		sb.WriteString(fmt.Sprintf("%s(%s) ", v, k))
		i++
		if i != len(validAbbr) {
			sb.WriteString(",")
		}
	}
	if len(sb.String()) == 0 {
		for _, k := range validKeys {
			sb.WriteString(fmt.Sprintf("%s ", k))
		}
	}
	return sb.String()
}

// parses the input string and constructs a k-v map, replacing any abbreviated keys with actual keys
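// For example, with validCSVInputAbbrKeys, "fh=USE,fd=;" expands to {"fileheader": "USE", "fielddelimiter": ";"}.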
func parseSerializationOpts(inp string, validKeys []string, validAbbrKeys map[string]string) (map[string]string, *probe.Error) {
	if validAbbrKeys == nil {
		validAbbrKeys = make(map[string]string)
	}
	validKeyFn := func(key string, validKeys []string) bool {
		for _, name := range validKeys {
			if strings.EqualFold(name, key) {
				return true
			}
		}
		return false
	}
	kv, err := parseKVArgs(inp)
	if err != nil {
		return nil, err
	}
	ikv := make(map[string]string)
	for k, v := range kv {
		fldName, ok := validAbbrKeys[strings.ToLower(k)]
		if ok {
			ikv[strings.ToLower(fldName)] = v
		} else {
			ikv[strings.ToLower(k)] = v
		}
	}
	for k := range ikv {
		if !validKeyFn(k, validKeys) {
			return nil, probe.NewError(errors.New("Options should be key-value pairs in the form key=value,... where valid key(s) are " + fmtString(validAbbrKeys, validKeys)))
		}
	}
	return ikv, nil
}

// gets the input serialization opts from cli context and constructs a map of csv, json or parquet options
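// For example, --csv-input "rd=\n,fh=USE" produces map["csv"] = {"recorddelimiter": "\n", "fileheader": "USE"}.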
func getInputSerializationOpts(ctx *cli.Context) map[string]map[string]string {
	icsv := ctx.String("csv-input")
	ijson := ctx.String("json-input")
	m := make(map[string]map[string]string)

	csvType := ctx.IsSet("csv-input")
	jsonType := ctx.IsSet("json-input")
	if csvType && jsonType {
		fatalIf(errInvalidArgument(), "Only one of --csv-input or --json-input can be specified as input serialization option")
	}

	if icsv != "" {
		kv, err := parseSerializationOpts(icsv, append(validCSVCommonKeys, validCSVInputKeys...), validCSVInputAbbrKeys)
		fatalIf(err, "Invalid serialization option(s) specified for --csv-input flag")

		m["csv"] = kv
	}
	if ijson != "" {
		kv, err := parseSerializationOpts(ijson, validJSONInputKeys, nil)
		fatalIf(err, "Invalid serialization option(s) specified for --json-input flag")
		m["json"] = kv
	}

	return m
}

// gets the output serialization opts from cli context and constructs a map of csv or json options
func getOutputSerializationOpts(ctx *cli.Context, csvHdrs []string) (opts map[string]map[string]string) {
	m := make(map[string]map[string]string)

	ocsv := ctx.String("csv-output")
	ojson := ctx.String("json-output")
	csvType := ctx.IsSet("csv-output")
	jsonType := ctx.IsSet("json-output")

	if csvType && jsonType {
		fatalIf(errInvalidArgument(), "Only one of --csv-output, or --json-output can be specified as output serialization option")
	}

	if jsonType && len(csvHdrs) > 0 {
		fatalIf(errInvalidArgument(), "--csv-output-header incompatible with --json-output option")
	}

	if csvType {
		validKeys := append(validCSVCommonKeys, validJSONCSVCommonOutputKeys...)
		kv, err := parseSerializationOpts(ocsv, append(validKeys, validCSVOutputKeys...), validCSVOutputAbbrKeys)
		fatalIf(err, "Invalid value(s) specified for --csv-output flag")
		m["csv"] = kv
	}

	if jsonType || globalJSON {
		kv, err := parseSerializationOpts(ojson, validJSONCSVCommonOutputKeys, validJSONOutputAbbrKeys)
		fatalIf(err, "Invalid value(s) specified for --json-output flag")
		m["json"] = kv
	}
	return m
}

// getCSVHeader fetches the first line of csv query object
func getCSVHeader(sourceURL string, encKeyDB map[string][]prefixSSEPair) ([]string, *probe.Error) {
	var r io.ReadCloser
	switch sourceURL {
	case "-":
		r = os.Stdin
	default:
		var err *probe.Error
		var metadata map[string]string
		if r, metadata, err = getSourceStreamMetadataFromURL(globalContext, sourceURL, encKeyDB); err != nil {
			return nil, err.Trace(sourceURL)
		}
		ctype := metadata["Content-Type"]
		if strings.Contains(ctype, "gzip") {
			var e error
			r, e = gzip.NewReader(r)
			if e != nil {
				return nil, probe.NewError(e)
			}
			defer r.Close()
		} else if strings.Contains(ctype, "bzip") {
			defer r.Close()
			r = ioutil.NopCloser(bzip2.NewReader(r))
		} else {
			defer r.Close()
		}
	}
	br := bufio.NewReader(r)
	line, _, err := br.ReadLine()
	if err != nil {
		return nil, probe.NewError(err)
	}
	return strings.Split(string(line), ","), nil
}

// returns true if the query is selecting all columns of the csv object
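// For example, "select * from S3Object" matches, while "select s.power from S3Object s" does not.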
func isSelectAll(query string) bool {
	match, _ := regexp.MatchString("^\\s*?select\\s+?\\*\\s+?.*?$", query)
	return match
}

// if csv-output-header is set to a comma delimited string use it, otherwise attempt to get the header from
// the query object
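// For example, --csv-output-header "" on a "select *" query reads the header row from the object itself,
// while --csv-output-header "device_id,uptime,lat,lon" uses the supplied names.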
func getCSVOutputHeaders(ctx *cli.Context, url string, encKeyDB map[string][]prefixSSEPair, query string) (hdrs []string) {
	if !ctx.IsSet("csv-output-header") {
		return
	}

	hdrStr := ctx.String("csv-output-header")
	if hdrStr == "" && isSelectAll(query) {
		// attempt to get the first line of csv as header
		if hdrs, err := getCSVHeader(url, encKeyDB); err == nil {
			return hdrs
		}
	}
	hdrs = strings.Split(hdrStr, ",")
	return
}

// get the Select options for sql select API
func getSQLOpts(ctx *cli.Context, csvHdrs []string) (s SelectObjectOpts) {
	is := getInputSerializationOpts(ctx)
	os := getOutputSerializationOpts(ctx, csvHdrs)

	return SelectObjectOpts{
		InputSerOpts:    is,
		OutputSerOpts:   os,
		CompressionType: minio.SelectCompressionType(ctx.String("compression")),
	}
}

// isCSVOrJSON returns true if csv or json input serialization options were specified.
func isCSVOrJSON(inOpts map[string]map[string]string) bool {
	if _, ok := inOpts["csv"]; ok {
		return true
	}
	if _, ok := inOpts["json"]; ok {
		return true
	}
	return false
}

// sqlSelect runs the given sql expression on the target object and streams the results to stdout.
func sqlSelect(targetURL, expression string, encKeyDB map[string][]prefixSSEPair, selOpts SelectObjectOpts, csvHdrs []string, writeHdr bool) *probe.Error {
	ctx, cancelSelect := context.WithCancel(globalContext)
	defer cancelSelect()

	alias, _, _, err := expandAlias(targetURL)
	if err != nil {
		return err.Trace(targetURL)
	}

	targetClnt, err := newClient(targetURL)
	if err != nil {
		return err.Trace(targetURL)
	}

	sseKey := getSSE(targetURL, encKeyDB[alias])
	outputer, err := targetClnt.Select(ctx, expression, sseKey, selOpts)
	if err != nil {
		return err.Trace(targetURL, expression)
	}
	defer outputer.Close()

	// write csv header to stdout
	if len(csvHdrs) > 0 && writeHdr {
		fmt.Println(strings.Join(csvHdrs, ","))
	}
	_, e := io.Copy(os.Stdout, outputer)
	return probe.NewError(e)
}

// validateOpts ensures the serialization options are compatible with the target object format.
func validateOpts(selOpts SelectObjectOpts, url string) {
	_, targetURL, _ := mustExpandAlias(url)
	if strings.HasSuffix(targetURL, ".parquet") && isCSVOrJSON(selOpts.InputSerOpts) {
		fatalIf(errInvalidArgument(), "Input serialization flags --csv-input and --json-input cannot be used for object in .parquet format")
	}
}

// validate args and optionally fetch the csv header of query object
func getAndValidateArgs(ctx *cli.Context, encKeyDB map[string][]prefixSSEPair, url string) (query string, csvHdrs []string, selOpts SelectObjectOpts) {
	query = ctx.String("query")
	csvHdrs = getCSVOutputHeaders(ctx, url, encKeyDB, query)
	selOpts = getSQLOpts(ctx, csvHdrs)
	validateOpts(selOpts, url)
	return
}

// check sql input arguments.
func checkSQLSyntax(ctx *cli.Context) {
	if len(ctx.Args()) == 0 {
		cli.ShowCommandHelpAndExit(ctx, "sql", 1) // last argument is exit code.
	}
}

// mainSQL is the main entry point for sql command.
func mainSQL(cliCtx *cli.Context) error {
	ctx, cancelSQL := context.WithCancel(globalContext)
	defer cancelSQL()

	var (
		csvHdrs []string
		selOpts SelectObjectOpts
		query   string
	)
	// Parse encryption keys per command.
	encKeyDB, err := getEncKeys(cliCtx)
	fatalIf(err, "Unable to parse encryption keys.")

	// validate sql input arguments.
	checkSQLSyntax(cliCtx)
	// extract URLs.
	URLs := cliCtx.Args()
	writeHdr := true
	for _, url := range URLs {
		if _, targetContent, err := url2Stat(ctx, url, false, encKeyDB); err != nil {
			errorIf(err.Trace(url), "Unable to run sql for "+url+".")
			continue
		} else if !targetContent.Type.IsDir() {
			if writeHdr {
				query, csvHdrs, selOpts = getAndValidateArgs(cliCtx, encKeyDB, url)
			}
			errorIf(sqlSelect(url, query, encKeyDB, selOpts, csvHdrs, writeHdr).Trace(url), "Unable to run sql")
			writeHdr = false
			continue
		}
		targetAlias, targetURL, _ := mustExpandAlias(url)
		clnt, err := newClientFromAlias(targetAlias, targetURL)
		if err != nil {
			errorIf(err.Trace(url), "Unable to initialize target `"+url+"`.")
			continue
		}

		for content := range clnt.List(ctx, cliCtx.Bool("recursive"), false, false, DirNone) {
			if content.Err != nil {
				errorIf(content.Err.Trace(url), "Unable to list on target `"+url+"`.")
				continue
			}
			if writeHdr {
				query, csvHdrs, selOpts = getAndValidateArgs(cliCtx, encKeyDB, targetAlias+content.URL.Path)
			}
			contentType := mimedb.TypeByExtension(filepath.Ext(content.URL.Path))
			for _, cTypeSuffix := range supportedContentTypes {
				if strings.Contains(contentType, cTypeSuffix) {
					errorIf(sqlSelect(targetAlias+content.URL.Path, query,
						encKeyDB, selOpts, csvHdrs, writeHdr).Trace(content.URL.String()), "Unable to run sql")
				}
				writeHdr = false
			}
		}
	}

	// Done.
	return nil
}