1
0
mirror of https://github.com/minio/mc.git synced 2025-11-26 20:03:05 +03:00

Add multisite replication support in mc replicate cmd (#3766)

Also adding a flag to `mc replicate reset` to allow 
granular replication reset by target
This commit is contained in:
Poorna Krishnamoorthy
2021-09-15 13:32:28 -04:00
committed by GitHub
parent 24456eb990
commit 6bf64d9704
9 changed files with 148 additions and 73 deletions

View File

@@ -1187,8 +1187,8 @@ func (f *fsClient) GetReplicationMetrics(ctx context.Context) (replication.Metri
// ResetReplication - kicks off replication again on previously replicated objects if existing object // ResetReplication - kicks off replication again on previously replicated objects if existing object
// replication is enabled in the replication config, not implemented // replication is enabled in the replication config, not implemented
func (f *fsClient) ResetReplication(ctx context.Context, before time.Duration) (string, *probe.Error) { func (f *fsClient) ResetReplication(ctx context.Context, before time.Duration, arn string) (rinfo replication.ResyncTargetsInfo, err *probe.Error) {
return "", probe.NewError(APINotImplemented{ return rinfo, probe.NewError(APINotImplemented{
API: "ResetReplication", API: "ResetReplication",
APIType: "filesystem", APIType: "filesystem",
}) })

View File

@@ -2498,17 +2498,17 @@ func (c *S3Client) GetReplicationMetrics(ctx context.Context) (replication.Metri
// ResetReplication - kicks off replication again on previously replicated objects if existing object // ResetReplication - kicks off replication again on previously replicated objects if existing object
// replication is enabled in the replication config.Optional to provide a timestamp // replication is enabled in the replication config.Optional to provide a timestamp
func (c *S3Client) ResetReplication(ctx context.Context, before time.Duration) (string, *probe.Error) { func (c *S3Client) ResetReplication(ctx context.Context, before time.Duration, tgtArn string) (rinfo replication.ResyncTargetsInfo, err *probe.Error) {
bucket, _ := c.url2BucketAndObject() bucket, _ := c.url2BucketAndObject()
if bucket == "" { if bucket == "" {
return "", probe.NewError(BucketNameEmpty{}) return rinfo, probe.NewError(BucketNameEmpty{})
} }
rID, e := c.api.ResetBucketReplication(ctx, bucket, before) rinfo, e := c.api.ResetBucketReplicationOnTarget(ctx, bucket, before, tgtArn)
if e != nil { if e != nil {
return "", probe.NewError(e) return rinfo, probe.NewError(e)
} }
return rID, nil return rinfo, nil
} }
// GetEncryption - gets bucket encryption info. // GetEncryption - gets bucket encryption info.

View File

@@ -160,7 +160,7 @@ type Client interface {
SetReplication(ctx context.Context, cfg *replication.Config, opts replication.Options) *probe.Error SetReplication(ctx context.Context, cfg *replication.Config, opts replication.Options) *probe.Error
RemoveReplication(ctx context.Context) *probe.Error RemoveReplication(ctx context.Context) *probe.Error
GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error) GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error)
ResetReplication(ctx context.Context, before time.Duration) (string, *probe.Error) ResetReplication(ctx context.Context, before time.Duration, arn string) (replication.ResyncTargetsInfo, *probe.Error)
// Encryption operations // Encryption operations
GetEncryption(ctx context.Context) (string, string, *probe.Error) GetEncryption(ctx context.Context) (string, string, *probe.Error)
SetEncryption(ctx context.Context, algorithm, kmsKeyID string) *probe.Error SetEncryption(ctx context.Context, algorithm, kmsKeyID string) *probe.Error

View File

@@ -86,26 +86,23 @@ EXAMPLES:
1. Add replication configuration rule on bucket "mybucket" for alias "myminio" to replicate all objects with tags 1. Add replication configuration rule on bucket "mybucket" for alias "myminio" to replicate all objects with tags
"key1=value1, key2=value2" to destbucket, including delete markers and versioned deletes. "key1=value1, key2=value2" to destbucket, including delete markers and versioned deletes.
{{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \ {{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \
--remote-bucket 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \
--storage-class "STANDARD" \ --storage-class "STANDARD" \
--arn 'arn:minio:replication::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \
--priority 1 \ --priority 1 \
--remote-bucket "destbucket"
--replicate "delete,delete-marker" --replicate "delete,delete-marker"
2. Add replication configuration rule with Disabled status on bucket "mybucket" for alias "myminio". 2. Add replication configuration rule with Disabled status on bucket "mybucket" for alias "myminio".
{{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \ {{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \
--storage-class "STANDARD" --disable \ --storage-class "STANDARD" --disable \
--arn 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \ --remote-bucket 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \
--priority 1 \ --priority 1
--remote-bucket "destbucket"
3. Add replication configuration rule with existing object replication, delete marker replication and versioned deletes 3. Add replication configuration rule with existing object replication, delete marker replication and versioned deletes
enabled on bucket "mybucket" for alias "myminio". enabled on bucket "mybucket" for alias "myminio".
{{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \ {{.Prompt}} {{.HelpName}} myminio/mybucket/prefix --tags "key1=value1&key2=value2" \
--storage-class "STANDARD" --disable \ --storage-class "STANDARD" \
--arn 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \ --remote-bucket 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' \
--priority 1 \ --priority 1 \
--remote-bucket "destbucket" \
--replicate "existing-objects,delete,delete-marker" --replicate "existing-objects,delete,delete-marker"
`, `,
} }
@@ -115,9 +112,6 @@ func checkReplicateAddSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 { if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "add", 1) // last argument is exit code cli.ShowCommandHelpAndExit(ctx, "add", 1) // last argument is exit code
} }
if ctx.String("arn") == "" {
fatal(errDummy().Trace(), "--arn flag needs to be specified.")
}
if ctx.String("remote-bucket") == "" { if ctx.String("remote-bucket") == "" {
fatal(errDummy().Trace(), "--remote-bucket flag needs to be specified.") fatal(errDummy().Trace(), "--remote-bucket flag needs to be specified.")
} }

View File

@@ -27,6 +27,7 @@ import (
"github.com/minio/cli" "github.com/minio/cli"
json "github.com/minio/colorjson" json "github.com/minio/colorjson"
"github.com/minio/mc/pkg/probe" "github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/pkg/console" "github.com/minio/pkg/console"
"maze.io/x/duration" "maze.io/x/duration"
) )
@@ -36,6 +37,10 @@ var replicateResetFlags = []cli.Flag{
Name: "older-than", Name: "older-than",
Usage: "re-replicate objects older than n days", Usage: "re-replicate objects older than n days",
}, },
cli.StringFlag{
Name: "remote-bucket",
Usage: "remote bucket ARN",
},
} }
var replicateResetCmd = cli.Command{ var replicateResetCmd = cli.Command{
@@ -56,11 +61,11 @@ FLAGS:
{{range .VisibleFlags}}{{.}} {{range .VisibleFlags}}{{.}}
{{end}} {{end}}
EXAMPLES: EXAMPLES:
1. Re-replicate previously replicated objects in bucket "mybucket" for alias "myminio". 1. Re-replicate previously replicated objects in bucket "mybucket" for alias "myminio" for remote target.
{{.Prompt}} {{.HelpName}} myminio/mybucket {{.Prompt}} {{.HelpName}} myminio/mybucket --remote-bucket "arn:minio:replication::xxx:mybucket"
2. Re-replicate all objects older than 60 days in bucket "mybucket". 2. Re-replicate all objects older than 60 days in bucket "mybucket" for remote bucket target.
{{.Prompt}} {{.HelpName}} myminio/mybucket --older-than 60d {{.Prompt}} {{.HelpName}} myminio/mybucket --older-than 60d --remote-bucket "arn:minio:replication::xxx:mybucket"
`, `,
} }
@@ -69,13 +74,17 @@ func checkReplicateResetSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 { if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "reset", 1) // last argument is exit code cli.ShowCommandHelpAndExit(ctx, "reset", 1) // last argument is exit code
} }
if ctx.String("remote-bucket") == "" {
fatal(errDummy().Trace(), "--remote-bucket flag needs to be specified.")
}
} }
type replicateResetMessage struct { type replicateResetMessage struct {
Op string `json:"op"` Op string `json:"op"`
URL string `json:"url"` URL string `json:"url"`
ResetID string `json:"resetID"` ResyncTargetsInfo replication.ResyncTargetsInfo `json:"resyncInfo"`
Status string `json:"status"` Status string `json:"status"`
TargetArn string `json:"targetArn"`
} }
func (r replicateResetMessage) JSON() string { func (r replicateResetMessage) JSON() string {
@@ -86,7 +95,11 @@ func (r replicateResetMessage) JSON() string {
} }
func (r replicateResetMessage) String() string { func (r replicateResetMessage) String() string {
return console.Colorize("replicateResetMessage", fmt.Sprintf("Replication reset started for %s with ID %s", r.URL, r.ResetID)) if len(r.ResyncTargetsInfo.Targets) == 1 {
return console.Colorize("replicateResetMessage", fmt.Sprintf("Replication reset started for %s with ID %s", r.URL, r.ResyncTargetsInfo.Targets[0].ResetID))
}
return console.Colorize("replicateResetMessage", fmt.Sprintf("Replication reset started for %s", r.URL))
} }
func mainReplicateReset(cliCtx *cli.Context) error { func mainReplicateReset(cliCtx *cli.Context) error {
@@ -119,12 +132,12 @@ func mainReplicateReset(cliCtx *cli.Context) error {
} }
} }
replicateReset, err := client.ResetReplication(ctx, olderThan) rinfo, err := client.ResetReplication(ctx, olderThan, cliCtx.String("remote-bucket"))
fatalIf(err.Trace(args...), "Unable to reset replication") fatalIf(err.Trace(args...), "Unable to reset replication")
printMsg(replicateResetMessage{ printMsg(replicateResetMessage{
Op: "status", Op: "status",
URL: aliasedURL, URL: aliasedURL,
ResetID: replicateReset, ResyncTargetsInfo: rinfo,
}) })
return nil return nil
} }

View File

@@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"fmt"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/fatih/color" "github.com/fatih/color"
@@ -72,38 +73,103 @@ func (s replicateStatusMessage) JSON() string {
return string(jsonMessageBytes) return string(jsonMessageBytes)
} }
func printReplicateStatusHeader() { func (s replicateStatusMessage) String() string {
if globalJSON { coloredDot := console.Colorize("Headers", dot)
return
}
maxLen := 15 maxLen := 15
console.Println(console.Colorize("Headers", newPrettyTable(" | ", var contents [][]string
Field{"Status", 20},
var rows string
var arntheme = []string{"Headers"}
var theme = []string{"Pending", "Failed", "Replicated", "Replica"}
contents = append(contents, []string{"Pending", humanize.IBytes(s.ReplicationStatus.PendingSize), humanize.Comma(int64(s.ReplicationStatus.PendingCount))})
contents = append(contents, []string{"Failed", humanize.IBytes(s.ReplicationStatus.FailedSize), humanize.Comma(int64(s.ReplicationStatus.FailedCount))})
contents = append(contents, []string{"Replicated", humanize.IBytes(s.ReplicationStatus.ReplicatedSize), ""})
contents = append(contents, []string{"Replica", humanize.IBytes(s.ReplicationStatus.ReplicaSize), ""})
var th string
if s.ReplicationStatus.PendingSize == 0 &&
s.ReplicationStatus.FailedSize == 0 &&
s.ReplicationStatus.ReplicaSize == 0 &&
s.ReplicationStatus.ReplicatedSize == 0 {
return "Replication status not available."
}
r := console.Colorize("THeaders", newPrettyTable(" | ",
Field{"Summary", 95},
).buildRow("Summary: "))
rows += r
rows += "\n"
hIdx := 0
for i, row := range contents {
if i%4 == 0 {
if hIdx > 0 {
rows += "\n"
}
hIdx++
rows += console.Colorize("TgtHeaders", newPrettyTable(" | ",
Field{"Status", 21},
Field{"Size", maxLen}, Field{"Size", maxLen},
Field{"Count", maxLen}, Field{"Count", maxLen},
).buildRow("Replication Status", "Size (Bytes)", "Count"))) ).buildRow("Replication Status ", "Size (Bytes)", "Count"))
rows += "\n"
} }
func (s replicateStatusMessage) String() string { idx := i % 4
maxLen := 15 th = theme[idx]
var contents = [][]string{
{"Pending", humanize.IBytes(s.ReplicationStatus.PendingSize), humanize.Comma(int64(s.ReplicationStatus.PendingCount))},
{"Failed", humanize.IBytes(s.ReplicationStatus.FailedSize), humanize.Comma(int64(s.ReplicationStatus.FailedCount))},
{"Replicated", humanize.IBytes(s.ReplicationStatus.ReplicatedSize), ""},
{"Replica", humanize.IBytes(s.ReplicationStatus.ReplicaSize), ""},
}
var rows string
var theme = []string{"Pending", "Failed", "Replica", "Replica"}
for i, row := range contents {
th := theme[i]
if row[1] == "0 B" && i == 1 {
th = theme[0]
}
r := console.Colorize(th, newPrettyTable(" | ", r := console.Colorize(th, newPrettyTable(" | ",
Field{"Status", 20}, Field{"Status", 21},
Field{"Size", maxLen}, Field{"Size", maxLen},
Field{"Count", maxLen}, Field{"Count", maxLen},
).buildRow(row[0], row[1], row[2])+"\n") ).buildRow(" "+row[0], row[1], row[2])+"\n")
rows += r
}
contents = nil
var arns []string
for arn := range s.ReplicationStatus.Stats {
arns = append(arns, arn)
}
for _, st := range s.ReplicationStatus.Stats {
contents = append(contents, []string{"Pending", humanize.IBytes(st.PendingSize), humanize.Comma(int64(st.PendingCount))})
contents = append(contents, []string{"Failed", humanize.IBytes(st.FailedSize), humanize.Comma(int64(st.FailedCount))})
contents = append(contents, []string{"Replicated", humanize.IBytes(st.ReplicatedSize), ""})
}
if len(contents) > 0 {
rows += "\n"
r := console.Colorize("THeaders", newPrettyTable(" | ",
Field{"Target statuses", 95},
).buildRow("Remote Target Statuses: "))
rows += r
rows += "\n"
}
hIdx = 0
for i, row := range contents {
if i%3 == 0 {
if hIdx > 0 {
rows += "\n"
}
th = arntheme[0]
r := console.Colorize(th, newPrettyTable(" | ",
Field{"ARN", 120},
).buildRow(fmt.Sprintf("%s %s", coloredDot, arns[hIdx])))
rows += r
rows += "\n"
hIdx++
rows += console.Colorize("TgtHeaders", newPrettyTable(" | ",
Field{"Status", 21},
Field{"Size", maxLen},
Field{"Count", maxLen},
).buildRow("Replication Status ", "Size (Bytes)", "Count"))
rows += "\n"
}
idx := i % 3
th = theme[idx]
r := console.Colorize(th, newPrettyTable(" | ",
Field{"Status", 21},
Field{"Size", maxLen},
Field{"Count", maxLen},
).buildRow(" "+row[0], row[1], row[2])+"\n")
rows += r rows += r
} }
return console.Colorize("replicateStatusMessage", rows) return console.Colorize("replicateStatusMessage", rows)
@@ -113,10 +179,13 @@ func mainReplicateStatus(cliCtx *cli.Context) error {
ctx, cancelReplicateStatus := context.WithCancel(globalContext) ctx, cancelReplicateStatus := context.WithCancel(globalContext)
defer cancelReplicateStatus() defer cancelReplicateStatus()
console.SetColor("Headers", color.New(color.FgGreen)) console.SetColor("THeaders", color.New(color.Bold, color.FgHiWhite))
console.SetColor("Replica", color.New(color.Bold, color.FgCyan)) console.SetColor("Headers", color.New(color.Bold, color.FgGreen))
console.SetColor("TgtHeaders", color.New(color.Bold, color.FgCyan))
console.SetColor("Replica", color.New(color.FgCyan))
console.SetColor("Failed", color.New(color.Bold, color.FgRed)) console.SetColor("Failed", color.New(color.Bold, color.FgRed))
console.SetColor("Pending", color.New(color.Bold, color.FgWhite)) console.SetColor("Pending", color.New(color.FgWhite))
checkReplicateStatusSyntax(cliCtx) checkReplicateStatusSyntax(cliCtx)
@@ -129,7 +198,6 @@ func mainReplicateStatus(cliCtx *cli.Context) error {
replicateStatus, err := client.GetReplicationMetrics(ctx) replicateStatus, err := client.GetReplicationMetrics(ctx)
fatalIf(err.Trace(args...), "Unable to get replication status") fatalIf(err.Trace(args...), "Unable to get replication status")
printReplicateStatusHeader()
printMsg(replicateStatusMessage{ printMsg(replicateStatusMessage{
Op: "status", Op: "status",
URL: aliasedURL, URL: aliasedURL,

View File

@@ -1811,7 +1811,7 @@ FLAGS:
*Example: Add replication configuration rule on `mybucket` on alias `myminio`. Enable delete marker replication and replication of versioned deletes for the configuration* *Example: Add replication configuration rule on `mybucket` on alias `myminio`. Enable delete marker replication and replication of versioned deletes for the configuration*
``` ```
mc replicate add myminio/mybucket/prefix --tags "key1=value1&key2=value2" --storage-class "STANDARD" --arn 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' --priority 1 --remote-bucket destbucket --replicate "delete-marker,delete" mc replicate add myminio/mybucket/prefix --tags "key1=value1&key2=value2" --storage-class "STANDARD" --remote-bucket 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' --priority 1 --replicate "delete-marker,delete"
Replication configuration rule applied to myminio/mybucket/prefix. Replication configuration rule applied to myminio/mybucket/prefix.
``` ```

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/minio/filepath v1.0.0 github.com/minio/filepath v1.0.0
github.com/minio/madmin-go v1.1.5 github.com/minio/madmin-go v1.1.5
github.com/minio/md5-simd v1.1.1 // indirect github.com/minio/md5-simd v1.1.1 // indirect
github.com/minio/minio-go/v7 v7.0.13-0.20210819151058-7877ed5b8110 github.com/minio/minio-go/v7 v7.0.14-0.20210913162205-f7d98f5e10a8
github.com/minio/pkg v1.0.10 github.com/minio/pkg v1.0.10
github.com/minio/selfupdate v0.3.1 github.com/minio/selfupdate v0.3.1
github.com/minio/sha256-simd v1.0.0 github.com/minio/sha256-simd v1.0.0

4
go.sum
View File

@@ -270,8 +270,8 @@ github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77Z
github.com/minio/md5-simd v1.1.1 h1:9ojcLbuZ4gXbB2sX53MKn8JUZ0sB/2wfwsEcRw+I08U= github.com/minio/md5-simd v1.1.1 h1:9ojcLbuZ4gXbB2sX53MKn8JUZ0sB/2wfwsEcRw+I08U=
github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw= github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw=
github.com/minio/minio-go/v7 v7.0.13-0.20210819151058-7877ed5b8110 h1:IFcUDB04Qdqbbn4Vn3A4UGCpIruEvaMHhZ0ymVnC7W8= github.com/minio/minio-go/v7 v7.0.14-0.20210913162205-f7d98f5e10a8 h1:gxTsIWqvrS1tNxKiPpUAMYjFiXOD0ygOvBIGGFsudkE=
github.com/minio/minio-go/v7 v7.0.13-0.20210819151058-7877ed5b8110/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs= github.com/minio/minio-go/v7 v7.0.14-0.20210913162205-f7d98f5e10a8/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8=
github.com/minio/pkg v1.0.10 h1:fohpAm/0ttQFf4BzmzH5r6A9JUIfg63AyGCPM0f9/9U= github.com/minio/pkg v1.0.10 h1:fohpAm/0ttQFf4BzmzH5r6A9JUIfg63AyGCPM0f9/9U=
github.com/minio/pkg v1.0.10/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14= github.com/minio/pkg v1.0.10/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14=