// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
package cmd
import (
"context"
"fmt"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/madmin-go/v3"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/pkg/v3/console"
)
var replicateAddFlags = []cli.Flag{
cli.StringFlag{
Name: "arn",
Usage: "unique role ARN",
Hidden: true,
},
cli.StringFlag{
Name: "id",
Usage: "id for the rule, should be a unique value",
},
cli.StringFlag{
Name: "tags",
Usage: "format '=&=&=', multiple values allowed for multiple key/value pairs",
},
cli.StringFlag{
Name: "storage-class",
Usage: `storage class for destination, valid values are either "STANDARD" or "REDUCED_REDUNDANCY"`,
},
cli.BoolFlag{
Name: "disable",
Usage: "disable the rule",
},
cli.IntFlag{
Name: "priority",
Usage: "priority of the rule, should be unique and is a required field",
},
cli.StringFlag{
Name: "remote-bucket",
Usage: "remote bucket, should be a unique value for the configuration",
},
cli.StringFlag{
Name: "replicate",
Value: `delete-marker,delete,existing-objects,metadata-sync`,
Usage: `comma separated list to enable replication of soft deletes, permanent deletes, existing objects and metadata sync`,
},
cli.StringFlag{
Name: "path",
Value: "auto",
Usage: "bucket path lookup supported by the server. Valid options are ['auto', 'on', 'off']'",
},
cli.StringFlag{
Name: "region",
Usage: "region of the destination bucket (optional)",
},
cli.StringFlag{
Name: "bandwidth",
Usage: "set bandwidth limit in bytes per second (K,B,G,T for metric and Ki,Bi,Gi,Ti for IEC units)",
},
cli.BoolFlag{
Name: "sync",
Usage: "enable synchronous replication for this target. default is async",
},
cli.UintFlag{
Name: "healthcheck-seconds",
Usage: "health check interval in seconds",
Value: 60,
},
cli.BoolFlag{
Name: "disable-proxy",
Usage: "disable proxying in active-active replication. If unset, default behavior is to proxy",
},
}
var replicateAddCmd = cli.Command{
Name: "add",
Usage: "add a server side replication configuration rule",
Action: mainReplicateAdd,
OnUsageError: onUsageError,
Before: setGlobalsFromContext,
Flags: append(globalFlags, replicateAddFlags...),
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} TARGET
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}
EXAMPLES:
1. Add replication configuration rule on bucket "sourcebucket" for alias "sourceminio" with alias "targetminio" to replicate all operations in an active-active replication setup.
{{.Prompt}} {{.HelpName}} sourceminio/sourcebucket --remote-bucket targetminio/targetbucket \
--priority 1
2. Add replication configuration rule on bucket "mybucket" for alias "myminio" to replicate all operations in an active-active replication setup.
{{.Prompt}} {{.HelpName}} myminio/mybucket --remote-bucket https://foobar:foo12345@minio.siteb.example.com/targetbucket \
--priority 1
3. Add replication configuration rule on bucket "mybucket" for alias "myminio" to replicate all objects with tags
"key1=value1, key2=value2" to targetbucket synchronously with bandwidth set to 2 gigabits per second.
{{.Prompt}} {{.HelpName}} myminio/mybucket --remote-bucket https://foobar:foo12345@minio.siteb.example.com/targetbucket \
--tags "key1=value1&key2=value2" --bandwidth "2G" --sync \
--priority 1
4. Disable a replication configuration rule on bucket "mybucket" for alias "myminio".
{{.Prompt}} {{.HelpName}} myminio/mybucket --remote-bucket https://foobar:foo12345@minio.siteb.example.com/targetbucket \
--tags "key1=value1&key2=value2" \
--priority 1 --disable
5. Add replication configuration rule with existing object replication, delete marker replication and versioned deletes
enabled on bucket "mybucket" for alias "myminio".
{{.Prompt}} {{.HelpName}} myminio/mybucket --remote-bucket https://foobar:foo12345@minio.siteb.example.com/targetbucket \
--replicate "existing-objects,delete,delete-marker" \
--priority 1
`,
}
// checkReplicateAddSyntax - validate all the passed arguments
func checkReplicateAddSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 {
showCommandHelpAndExit(ctx, 1) // last argument is exit code
}
if ctx.String("remote-bucket") == "" {
fatal(errDummy().Trace(), "--remote-bucket flag needs to be specified.")
}
}
type replicateAddMessage struct {
Op string `json:"op"`
Status string `json:"status"`
URL string `json:"url"`
ID string `json:"id"`
}
const (
enableStatus = "enable"
disableStatus = "disable"
)
func (l replicateAddMessage) JSON() string {
l.Status = "success"
jsonMessageBytes, e := json.MarshalIndent(l, "", " ")
fatalIf(probe.NewError(e), "Unable to marshal into JSON.")
return string(jsonMessageBytes)
}
func (l replicateAddMessage) String() string {
if l.ID != "" {
return console.Colorize("replicateAddMessage", "Replication configuration rule with ID `"+l.ID+"` applied to "+l.URL+".")
}
return console.Colorize("replicateAddMessage", "Replication configuration rule applied to "+l.URL+" successfully.")
}
func extractCredentialURL(argURL string) (accessKey, secretKey string, u *url.URL) {
var parsedURL string
if strings.HasPrefix(argURL, "http://") || strings.HasPrefix(argURL, "https://") {
if hostKeyTokens.MatchString(argURL) {
fatalIf(errInvalidArgument().Trace(argURL), "temporary tokens are not allowed for remote targets")
}
if hostKeys.MatchString(argURL) {
parts := hostKeys.FindStringSubmatch(argURL)
if len(parts) != 5 {
fatalIf(errInvalidArgument().Trace(argURL), "unsupported remote target format, please check --help")
}
accessKey = parts[2]
secretKey = parts[3]
parsedURL = fmt.Sprintf("%s%s", parts[1], parts[4])
}
} else {
var alias string
var aliasCfg *aliasConfigV10
// get alias config by alias url
alias, parsedURL, aliasCfg = mustExpandAlias(argURL)
if aliasCfg == nil {
fatalIf(errInvalidAliasedURL(alias).Trace(argURL), "No such alias `"+alias+"` found.")
return
}
accessKey, secretKey = aliasCfg.AccessKey, aliasCfg.SecretKey
}
var e error
if parsedURL == "" {
fatalIf(errInvalidArgument().Trace(argURL), "no valid credentials were detected")
}
u, e = url.Parse(parsedURL)
if e != nil {
fatalIf(errInvalidArgument().Trace(parsedURL), "unsupported URL format %v", e)
}
return accessKey, secretKey, u
}
// fetchRemoteTarget - returns the dest bucket, dest endpoint, access and secret key
func fetchRemoteTarget(cli *cli.Context) (bktTarget *madmin.BucketTarget) {
if !cli.IsSet("remote-bucket") {
fatalIf(probe.NewError(fmt.Errorf("missing Remote target configuration")), "unable to parse remote target")
}
p := cli.String("path")
if !isValidPath(p) {
fatalIf(errInvalidArgument().Trace(p),
"unrecognized bucket path style. Valid options are `[on, off, auto]`.")
}
tgtURL := cli.String("remote-bucket")
accessKey, secretKey, u := extractCredentialURL(tgtURL)
var tgtBucket string
if u.Path != "" {
tgtBucket = path.Clean(u.Path[1:])
}
fatalIf(probe.NewError(s3utils.CheckValidBucketName(tgtBucket)).Trace(tgtURL), "invalid target bucket")
bandwidthStr := cli.String("bandwidth")
bandwidth, e := getBandwidthInBytes(bandwidthStr)
fatalIf(probe.NewError(e).Trace(bandwidthStr), "invalid bandwidth value")
console.SetColor(cred, color.New(color.FgYellow, color.Italic))
creds := &madmin.Credentials{AccessKey: accessKey, SecretKey: secretKey}
disableproxy := cli.Bool("disable-proxy")
bktTarget = &madmin.BucketTarget{
TargetBucket: tgtBucket,
Secure: u.Scheme == "https",
Credentials: creds,
Endpoint: u.Host,
Path: p,
API: "s3v4",
Type: madmin.ServiceType("replication"),
Region: cli.String("region"),
BandwidthLimit: int64(bandwidth),
ReplicationSync: cli.Bool("sync"),
DisableProxy: disableproxy,
HealthCheckDuration: time.Duration(cli.Uint("healthcheck-seconds")) * time.Second,
}
return bktTarget
}
func getBandwidthInBytes(bandwidthStr string) (bandwidth uint64, err error) {
if bandwidthStr != "" {
bandwidth, err = humanize.ParseBytes(bandwidthStr)
if err != nil {
return
}
}
return
}
func mainReplicateAdd(cliCtx *cli.Context) error {
ctx, cancelReplicateAdd := context.WithCancel(globalContext)
defer cancelReplicateAdd()
console.SetColor("replicateAddMessage", color.New(color.FgGreen))
checkReplicateAddSyntax(cliCtx)
// Get the alias parameter from cli
args := cliCtx.Args()
aliasedURL := args.Get(0)
// Create a new Client
client, err := newClient(aliasedURL)
fatalIf(err, "unable to initialize connection.")
var sourceBucket string
switch c := client.(type) {
case *S3Client:
sourceBucket, _ = c.url2BucketAndObject()
default:
fatalIf(err.Trace(args...), "replication is not supported for filesystem")
}
// Create a new MinIO Admin Client
admclient, cerr := newAdminClient(aliasedURL)
fatalIf(cerr, "unable to initialize admin connection.")
bktTarget := fetchRemoteTarget(cliCtx)
arn, e := admclient.SetRemoteTarget(globalContext, sourceBucket, bktTarget)
fatalIf(probe.NewError(e).Trace(args...), "unable to configure remote target")
rcfg, err := client.GetReplication(ctx)
fatalIf(err.Trace(args...), "unable to fetch replication configuration")
ruleStatus := enableStatus
if cliCtx.Bool(disableStatus) {
ruleStatus = disableStatus
}
dmReplicateStatus := disableStatus
deleteReplicationStatus := disableStatus
replicaSync := enableStatus
existingReplicationStatus := disableStatus
replSlice := strings.Split(cliCtx.String("replicate"), ",")
for _, opt := range replSlice {
switch strings.TrimSpace(strings.ToLower(opt)) {
case "delete-marker":
dmReplicateStatus = enableStatus
case "delete":
deleteReplicationStatus = enableStatus
case "metadata-sync", "replica-metadata-sync":
replicaSync = enableStatus
case "existing-objects":
existingReplicationStatus = enableStatus
default:
fatalIf(probe.NewError(fmt.Errorf("invalid value for --replicate flag %s", cliCtx.String("replicate"))),
`--replicate flag takes one or more comma separated string with values "delete", "delete-marker", "metadata-sync", "existing-objects" or "" to disable these settings`)
}
}
opts := replication.Options{
TagString: cliCtx.String("tags"),
StorageClass: cliCtx.String("storage-class"),
Priority: strconv.Itoa(cliCtx.Int("priority")),
RuleStatus: ruleStatus,
ID: cliCtx.String("id"),
DestBucket: arn,
Op: replication.AddOption,
ReplicateDeleteMarkers: dmReplicateStatus,
ReplicateDeletes: deleteReplicationStatus,
ReplicaSync: replicaSync,
ExistingObjectReplicate: existingReplicationStatus,
}
fatalIf(client.SetReplication(ctx, &rcfg, opts), "unable to add replication rule")
printMsg(replicateAddMessage{
Op: cliCtx.Command.Name,
URL: aliasedURL,
ID: opts.ID,
})
return nil
}