// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/shirou/gopsutil/v3/mem"
)

const (
	// Maximum number of parallel workers
	maxParallelWorkers = 128

	// Monitor tick to decide to add new workers
	monitorPeriod = 4 * time.Second
)

// Number of workers added per bandwidth-monitoring tick.
var defaultWorkerFactor = runtime.GOMAXPROCS(0)

// A task is a copy/mirror action that needs to be executed
type task struct {
	// The function to execute in this task
	fn func() URLs

	// If set to true, ensure no tasks are
	// executed in parallel to this one.
	barrier bool

	// The total size of the information that we need to upload
	uploadSize int64
}

// ParallelManager - helps manage parallel workers to run tasks
type ParallelManager struct {
	// Accumulates the number of bytes sent.
	// Keep this as first element of struct because it guarantees 64bit
	// alignment on 32 bit machines. atomic.* functions crash if operand is not
	// aligned at 64bit. See https://github.com/golang/go/issues/599
	sentBytes int64

	// Synchronize workers
	wg          *sync.WaitGroup
	barrierSync sync.RWMutex

	// Current number of workers
	workersNum uint32

	// Channel to receive tasks to run
	queueCh chan task

	// Channel to send back results
	resultCh chan URLs

	// Channel to signal the bandwidth monitor to quit
	stopMonitorCh chan struct{}

	// The maximum memory to use
	maxMem uint64
}

// addWorker creates a new worker to process tasks
func (p *ParallelManager) addWorker() {
	if atomic.LoadUint32(&p.workersNum) >= maxParallelWorkers {
		// Maximum number of workers is reached, no need
		// to create a new one.
		return
	}

	// Update number of workers
	atomic.AddUint32(&p.workersNum, 1)

	// Start a new worker
	p.wg.Add(1)
	go func() {
		for {
			// Wait for jobs
			t, ok := <-p.queueCh
			if !ok {
				// No more tasks, quit
				p.wg.Done()
				return
			}

			// Execute the task and send the result to the result channel.
			p.resultCh <- t.fn()

			// Release the lock taken in doQueueTask: a barrier
			// task holds the write lock, a regular task a read lock.
			if t.barrier {
				p.barrierSync.Unlock()
			} else {
				p.barrierSync.RUnlock()
			}
		}
	}()
}

// Read implements io.Reader but never touches b: it only counts len(b)
// bytes as sent so that monitorProgress can derive the current bandwidth.
func (p *ParallelManager) Read(b []byte) (n int, err error) {
	atomic.AddInt64(&p.sentBytes, int64(len(b)))
	return len(b), nil
}
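
// A hypothetical wiring sketch (not shown in this file): since
// ParallelManager implements io.Reader, it could be passed to minio-go
// as the progress reader so that every uploaded chunk is counted into
// sentBytes, e.g.:
//
//	_, err := client.PutObject(ctx, bucket, object, reader, size,
//		minio.PutObjectOptions{Progress: p})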

// monitorProgress monitors the realtime transfer speed of data
// and adds workers until it reaches the maximum number of workers
// or notices no further improvement in transfer speed.
func (p *ParallelManager) monitorProgress() {
	go func() {
		ticker := time.NewTicker(monitorPeriod)
		defer ticker.Stop()

		var prevSentBytes, maxBandwidth int64
		var retry int

		for {
			select {
			case <-p.stopMonitorCh:
				// Ordered to quit immediately
				return
			case <-ticker.C:
				// Compute new bandwidth from counted sent bytes
				sentBytes := atomic.LoadInt64(&p.sentBytes)
				bandwidth := sentBytes - prevSentBytes
				prevSentBytes = sentBytes

				if bandwidth <= maxBandwidth {
					retry++
					// We still want to add more workers
					// until we are sure that it is not
					// useful to add more of them.
					if retry > 2 {
						return
					}
				} else {
					retry = 0
					maxBandwidth = bandwidth
				}

				for i := 0; i < defaultWorkerFactor; i++ {
					p.addWorker()
				}
			}
		}
	}()
}
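
// A worked ramp-up example, assuming GOMAXPROCS(0) == 8: every 4s tick
// adds 8 workers (addWorker caps the total at 128). A tick whose 4s byte
// delta beats the best delta seen so far resets the stall counter; after
// 3 consecutive stalled ticks the monitor returns and the worker count
// freezes at its current level.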

// queueTask queues a task for parallel execution.
func (p *ParallelManager) queueTask(fn func() URLs, uploadSize int64) {
	p.doQueueTask(task{fn: fn, uploadSize: uploadSize})
}

// queueTaskWithBarrier queues a task and ensures that no other task runs
// in parallel with it: it waits until all concurrent tasks finish before
// queueing this one, which is then executed alone.
func (p *ParallelManager) queueTaskWithBarrier(fn func() URLs, uploadSize int64) {
	p.doQueueTask(task{fn: fn, barrier: true, uploadSize: uploadSize})
}

// enoughMemForUpload checks whether there is enough memory available
// to buffer the next upload of the given size.
func (p *ParallelManager) enoughMemForUpload(uploadSize int64) bool {
	if uploadSize < 0 {
		panic("unexpected size")
	}

	if uploadSize == 0 || p.maxMem == 0 {
		return true
	}

	// Assume that up to 4 parts of a multipart upload can be
	// buffered in memory at the same time.
	estimateNeededMemoryForUpload := func(size int64) uint64 {
		partsCount, partSize, _, e := minio.OptimalPartInfo(size, 0)
		if e != nil {
			panic(e)
		}
		if partsCount >= 4 {
			return 4 * uint64(partSize)
		}
		return uint64(size)
	}

	smem := runtime.MemStats{}
	if uploadSize > 50<<20 {
		// GC if upload is bigger than 50MB.
		runtime.GC()
	}
	runtime.ReadMemStats(&smem)

	return estimateNeededMemoryForUpload(uploadSize)+smem.Alloc < p.maxMem
}
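
// Worked example: for a 1 GiB upload, minio.OptimalPartInfo(1<<30, 0)
// yields (assuming minio-go's default 16 MiB minimum part size) 64 parts
// of 16 MiB, so the estimate is 4*16 MiB = 64 MiB; the task is queued
// without a barrier only if 64 MiB plus the currently allocated heap
// stays under maxMem.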

func (p *ParallelManager) doQueueTask(t task) {
	// Check if we have enough memory to perform the next task;
	// if not, wait for all current tasks to finish first.
	if !p.enoughMemForUpload(t.uploadSize) {
		t.barrier = true
	}
	// A barrier task takes the write lock, which waits for all read
	// locks (regular tasks) to be released; regular tasks only take
	// a read lock and can run concurrently.
	if t.barrier {
		p.barrierSync.Lock()
	} else {
		p.barrierSync.RLock()
	}
	p.queueCh <- t
}

// Wait for all workers to finish tasks before shutting down Parallel
func (p *ParallelManager) stopAndWait() {
	close(p.queueCh)
	p.wg.Wait()
	close(p.stopMonitorCh)
}

const cgroupLimitFile = "/sys/fs/cgroup/memory/memory.limit_in_bytes"

func cgroupLimit(limitFile string) (limit uint64) {
	buf, e := os.ReadFile(limitFile)
	if e != nil {
		return 9223372036854771712
	}
	// The file content carries a trailing newline, trim it before parsing.
	limit, e = strconv.ParseUint(strings.TrimSpace(string(buf)), 10, 64)
	if e != nil {
		return 9223372036854771712
	}
	return limit
}

func availableMemory() (available uint64) {
	available = 4 << 30 // Default to 4 GiB when we can't find the limits.

	if runtime.GOOS == "linux" {
		available = cgroupLimit(cgroupLimitFile)

		// When no cgroup limit is set, the limit file reports the
		// highest positive signed 64-bit integer (2^63-1), rounded
		// down to a multiple of 4096 (2^12), the most common page
		// size on x86 systems.
		if available != 9223372036854771712 {
			// A cgroup memory limit is configured.
			return
		}
		// No limit set, fall through and compute the limit
		// from virtual memory.
	}

	// For all other platforms the limit is based on virtual memory.
	memStats, e := mem.VirtualMemory()
	if e == nil && memStats.Available > 0 {
		available = memStats.Available
	}

	// Always use 50% of available memory.
	available = available / 2
	return
}

// newParallelManager starts new workers that wait for tasks to execute,
// and returns a manager that queues tasks and reports results on resultCh.
func newParallelManager(resultCh chan URLs) *ParallelManager {
	p := &ParallelManager{
		wg:            &sync.WaitGroup{},
		workersNum:    0,
		stopMonitorCh: make(chan struct{}),
		queueCh:       make(chan task),
		resultCh:      resultCh,
		maxMem:        availableMemory(),
	}

	// Start with runtime.NumCPU() workers.
	for i := 0; i < runtime.NumCPU(); i++ {
		p.addWorker()
	}

	// Start monitoring task progress.
	p.monitorProgress()

	return p
}
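
// A minimal usage sketch of this file's API; copyOneObject, objectSize
// and handleResult are hypothetical placeholders:
//
//	resultCh := make(chan URLs)
//	pm := newParallelManager(resultCh)
//	go func() {
//		// Queue tasks; queueTask blocks while a barrier task
//		// holds the write lock.
//		pm.queueTask(func() URLs { return copyOneObject() }, objectSize)
//		// Shut down once everything is queued...
//		pm.stopAndWait()
//		close(resultCh)
//	}()
//	// ...while results are drained concurrently, otherwise the
//	// workers block sending on resultCh.
//	for r := range resultCh {
//		handleResult(r)
//	}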