mirror of
https://github.com/minio/mc.git
synced 2025-11-10 13:42:32 +03:00
add support for upload/download limits (#4388)
This commit is contained in:
2
.github/workflows/go.yml
vendored
2
.github/workflows/go.yml
vendored
@@ -19,7 +19,7 @@ jobs:
|
|||||||
GO111MODULE: on
|
GO111MODULE: on
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [1.18.x, 1.19.x]
|
go-version: [1.19.x]
|
||||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||||
steps:
|
steps:
|
||||||
- name: Set up Go ${{ matrix.go-version }} on ${{ matrix.os }}
|
- name: Set up Go ${{ matrix.go-version }} on ${{ matrix.os }}
|
||||||
|
|||||||
@@ -190,6 +190,8 @@ func probeS3Signature(ctx context.Context, accessKey, secretKey, url string, pee
|
|||||||
Debug: globalDebug,
|
Debug: globalDebug,
|
||||||
ConnReadDeadline: globalConnReadDeadline,
|
ConnReadDeadline: globalConnReadDeadline,
|
||||||
ConnWriteDeadline: globalConnWriteDeadline,
|
ConnWriteDeadline: globalConnWriteDeadline,
|
||||||
|
UploadLimit: int64(globalLimitUpload),
|
||||||
|
DownloadLimit: int64(globalLimitDownload),
|
||||||
}
|
}
|
||||||
if peerCert != nil {
|
if peerCert != nil {
|
||||||
configurePeerCertificate(s3Config, peerCert)
|
configurePeerCertificate(s3Config, peerCert)
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/mc/pkg/deadlineconn"
|
"github.com/minio/mc/pkg/deadlineconn"
|
||||||
"github.com/minio/mc/pkg/httptracer"
|
"github.com/minio/mc/pkg/httptracer"
|
||||||
|
"github.com/minio/mc/pkg/limiter"
|
||||||
"github.com/minio/mc/pkg/probe"
|
"github.com/minio/mc/pkg/probe"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -145,6 +146,7 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
|
|||||||
hostName = googleHostName
|
hostName = googleHostName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate a hash out of s3Conf.
|
// Generate a hash out of s3Conf.
|
||||||
confHash := fnv.New32a()
|
confHash := fnv.New32a()
|
||||||
confHash.Write([]byte(hostName + config.AccessKey + config.SecretKey + config.SessionToken))
|
confHash.Write([]byte(hostName + config.AccessKey + config.SecretKey + config.SessionToken))
|
||||||
@@ -211,6 +213,8 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
|
|||||||
transport = tr
|
transport = tr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
transport = limiter.New(config.UploadLimit, config.DownloadLimit, transport)
|
||||||
|
|
||||||
if config.Debug {
|
if config.Debug {
|
||||||
if strings.EqualFold(config.Signature, "S3v4") {
|
if strings.EqualFold(config.Signature, "S3v4") {
|
||||||
transport = httptracer.GetNewTraceTransport(newTraceV4(), transport)
|
transport = httptracer.GetNewTraceTransport(newTraceV4(), transport)
|
||||||
|
|||||||
@@ -225,6 +225,8 @@ type Config struct {
|
|||||||
Lookup minio.BucketLookupType
|
Lookup minio.BucketLookupType
|
||||||
ConnReadDeadline time.Duration
|
ConnReadDeadline time.Duration
|
||||||
ConnWriteDeadline time.Duration
|
ConnWriteDeadline time.Duration
|
||||||
|
UploadLimit int64
|
||||||
|
DownloadLimit int64
|
||||||
Transport *http.Transport
|
Transport *http.Transport
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,14 @@ var globalFlags = []cli.Flag{
|
|||||||
Name: "insecure",
|
Name: "insecure",
|
||||||
Usage: "disable SSL certificate verification",
|
Usage: "disable SSL certificate verification",
|
||||||
},
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "limit-upload",
|
||||||
|
Usage: "limits uploads to a maximum rate in KiB/s, MiB/s, GiB/s. (default: unlimited)",
|
||||||
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "limit-download",
|
||||||
|
Usage: "limits downloads to a maximum rate in KiB/s, MiB/s, GiB/s. (default: unlimited)",
|
||||||
|
},
|
||||||
cli.DurationFlag{
|
cli.DurationFlag{
|
||||||
Name: "conn-read-deadline",
|
Name: "conn-read-deadline",
|
||||||
Usage: "custom connection READ deadline",
|
Usage: "custom connection READ deadline",
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/dustin/go-humanize"
|
||||||
"github.com/minio/cli"
|
"github.com/minio/cli"
|
||||||
"github.com/minio/madmin-go/v2"
|
"github.com/minio/madmin-go/v2"
|
||||||
"github.com/minio/pkg/console"
|
"github.com/minio/pkg/console"
|
||||||
@@ -72,6 +73,9 @@ var (
|
|||||||
globalConnReadDeadline time.Duration
|
globalConnReadDeadline time.Duration
|
||||||
globalConnWriteDeadline time.Duration
|
globalConnWriteDeadline time.Duration
|
||||||
|
|
||||||
|
globalLimitUpload uint64
|
||||||
|
globalLimitDownload uint64
|
||||||
|
|
||||||
globalContext, globalCancel = context.WithCancel(context.Background())
|
globalContext, globalCancel = context.WithCancel(context.Background())
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -110,6 +114,39 @@ func setGlobalsFromContext(ctx *cli.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
globalConnReadDeadline = ctx.Duration("conn-read-deadline")
|
globalConnReadDeadline = ctx.Duration("conn-read-deadline")
|
||||||
|
if globalConnReadDeadline <= 0 {
|
||||||
|
globalConnReadDeadline = ctx.GlobalDuration("conn-read-deadline")
|
||||||
|
}
|
||||||
|
|
||||||
globalConnWriteDeadline = ctx.Duration("conn-write-deadline")
|
globalConnWriteDeadline = ctx.Duration("conn-write-deadline")
|
||||||
|
if globalConnWriteDeadline <= 0 {
|
||||||
|
globalConnWriteDeadline = ctx.GlobalDuration("conn-write-deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
limitUploadStr := ctx.String("limit-upload")
|
||||||
|
if limitUploadStr == "" {
|
||||||
|
limitUploadStr = ctx.GlobalString("limit-upload")
|
||||||
|
}
|
||||||
|
if limitUploadStr != "" {
|
||||||
|
var e error
|
||||||
|
globalLimitUpload, e = humanize.ParseBytes(limitUploadStr)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
limitDownloadStr := ctx.String("limit-download")
|
||||||
|
if limitDownloadStr == "" {
|
||||||
|
limitDownloadStr = ctx.GlobalString("limit-download")
|
||||||
|
}
|
||||||
|
|
||||||
|
if limitDownloadStr != "" {
|
||||||
|
var e error
|
||||||
|
globalLimitDownload, e = humanize.ParseBytes(limitDownloadStr)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,6 +141,8 @@ func NewS3Config(urlStr string, aliasCfg *aliasConfigV10) *Config {
|
|||||||
s3Config.Insecure = globalInsecure
|
s3Config.Insecure = globalInsecure
|
||||||
s3Config.ConnReadDeadline = globalConnReadDeadline
|
s3Config.ConnReadDeadline = globalConnReadDeadline
|
||||||
s3Config.ConnWriteDeadline = globalConnWriteDeadline
|
s3Config.ConnWriteDeadline = globalConnWriteDeadline
|
||||||
|
s3Config.UploadLimit = int64(globalLimitUpload)
|
||||||
|
s3Config.DownloadLimit = int64(globalLimitDownload)
|
||||||
|
|
||||||
s3Config.HostURL = urlStr
|
s3Config.HostURL = urlStr
|
||||||
if aliasCfg != nil {
|
if aliasCfg != nil {
|
||||||
|
|||||||
3
go.mod
3
go.mod
@@ -34,7 +34,7 @@ require (
|
|||||||
github.com/rs/xid v1.4.0
|
github.com/rs/xid v1.4.0
|
||||||
github.com/shirou/gopsutil/v3 v3.22.9
|
github.com/shirou/gopsutil/v3 v3.22.9
|
||||||
github.com/tidwall/gjson v1.14.3
|
github.com/tidwall/gjson v1.14.3
|
||||||
golang.org/x/crypto v0.3.0
|
golang.org/x/crypto v0.3.0 // indirect
|
||||||
golang.org/x/net v0.2.0
|
golang.org/x/net v0.2.0
|
||||||
golang.org/x/text v0.4.0
|
golang.org/x/text v0.4.0
|
||||||
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b
|
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b
|
||||||
@@ -47,6 +47,7 @@ require (
|
|||||||
github.com/charmbracelet/lipgloss v0.6.0
|
github.com/charmbracelet/lipgloss v0.6.0
|
||||||
github.com/gdamore/tcell/v2 v2.5.3
|
github.com/gdamore/tcell/v2 v2.5.3
|
||||||
github.com/golang-jwt/jwt/v4 v4.4.2
|
github.com/golang-jwt/jwt/v4 v4.4.2
|
||||||
|
github.com/juju/ratelimit v1.0.2
|
||||||
github.com/muesli/reflow v0.3.0
|
github.com/muesli/reflow v0.3.0
|
||||||
github.com/navidys/tvxwidgets v0.1.1
|
github.com/navidys/tvxwidgets v0.1.1
|
||||||
github.com/olekukonko/tablewriter v0.0.5
|
github.com/olekukonko/tablewriter v0.0.5
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -427,6 +427,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
|
|||||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||||
|
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
|
||||||
|
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
|
||||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
|
|||||||
95
pkg/limiter/limiter.go
Normal file
95
pkg/limiter/limiter.go
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||||
|
//
|
||||||
|
// This file is part of MinIO Object Storage stack
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// Package limiter implements throughput upload and download limits via http.RoundTripper
|
||||||
|
package limiter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/juju/ratelimit"
|
||||||
|
)
|
||||||
|
|
||||||
|
type limiter struct {
|
||||||
|
upload *ratelimit.Bucket
|
||||||
|
download *ratelimit.Bucket
|
||||||
|
transport http.RoundTripper // HTTP transport that needs to be intercepted
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l limiter) limitReader(r io.Reader, b *ratelimit.Bucket) io.Reader {
|
||||||
|
if b == nil {
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
return ratelimit.Reader(r, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoundTrip executes user provided request and response hooks for each HTTP call.
|
||||||
|
func (l limiter) RoundTrip(req *http.Request) (res *http.Response, err error) {
|
||||||
|
if l.transport == nil {
|
||||||
|
return nil, errors.New("Invalid Argument")
|
||||||
|
}
|
||||||
|
|
||||||
|
type readCloser struct {
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Body != nil {
|
||||||
|
req.Body = &readCloser{
|
||||||
|
Reader: l.limitReader(req.Body, l.upload),
|
||||||
|
Closer: req.Body,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = l.transport.RoundTrip(req)
|
||||||
|
if res != nil && res.Body != nil {
|
||||||
|
res.Body = &readCloser{
|
||||||
|
Reader: l.limitReader(res.Body, l.download),
|
||||||
|
Closer: res.Body,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// New return a ratelimited transport
|
||||||
|
func New(uploadLimit, downloadLimit int64, transport http.RoundTripper) http.RoundTripper {
|
||||||
|
if uploadLimit == 0 && downloadLimit == 0 {
|
||||||
|
return transport
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
uploadBucket *ratelimit.Bucket
|
||||||
|
downloadBucket *ratelimit.Bucket
|
||||||
|
)
|
||||||
|
|
||||||
|
if uploadLimit > 0 {
|
||||||
|
uploadBucket = ratelimit.NewBucketWithRate(float64(uploadLimit), uploadLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
if downloadLimit > 0 {
|
||||||
|
downloadBucket = ratelimit.NewBucketWithRate(float64(downloadLimit), downloadLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &limiter{
|
||||||
|
upload: uploadBucket,
|
||||||
|
download: downloadBucket,
|
||||||
|
transport: transport,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user