fs: Add support for --max-transfer-cutoff modes #2672

This also adds max transfer cut off check for server side copies too
s3-about
Shing Kit Chan 2019-10-31 03:23:17 +08:00 committed by Nick Craig-Wood
parent 7d70eb0346
commit 6f1766dd9e
9 changed files with 155 additions and 11 deletions

View File

@ -736,6 +736,20 @@ When the limit is reached all transfers will stop immediately.
Rclone will exit with exit code 8 if the transfer limit is reached.
### --max-transfer-(hard,soft,cautious) ###
This modifies the behavior of `--max-transfer`
Defaults to `--max-transfer-hard`.
Specifiying `--max-transfer-hard` will stop transferring immediately
when Rclone reaches the limit.
Specifiying `--max-transfer-soft` will stop starting new transfers
when Rclone reaches the limit.
Specifiying `--max-transfer-cautious` will try to prevent Rclone
from reaching the limit.
### --modify-window=TIME ###
When checking whether a file has been modified, this is the maximum

View File

@ -61,7 +61,10 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str
exit: make(chan struct{}),
avg: 0,
lpTime: time.Now(),
max: int64(fs.Config.MaxTransfer),
max: -1,
}
if fs.Config.MaxTransferMode != fs.MaxTransferModeSoft {
acc.max = int64((fs.Config.MaxTransfer))
}
go acc.averageLoop()
stats.inProgress.set(acc.name, acc)

View File

@ -197,9 +197,12 @@ func TestAccountAccounter(t *testing.T) {
func TestAccountMaxTransfer(t *testing.T) {
old := fs.Config.MaxTransfer
oldMode := fs.Config.MaxTransferMode
fs.Config.MaxTransfer = 15
defer func() {
fs.Config.MaxTransfer = old
fs.Config.MaxTransferMode = oldMode
}()
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
@ -218,6 +221,20 @@ func TestAccountMaxTransfer(t *testing.T) {
assert.Equal(t, 0, n)
assert.Equal(t, ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
stats = NewStats()
acc = newAccountSizeName(stats, in, 1, "test")
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
}
func TestShortenName(t *testing.T) {

View File

@ -382,6 +382,22 @@ func (s *StatsInfo) GetBytes() int64 {
return s.bytes
}
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
func (s *StatsInfo) GetBytesWithPending() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
pending := int64(0)
for _, tr := range s.startedTransfers {
if tr.acc != nil {
bytes, size := tr.acc.progress()
if bytes < size {
pending += size - bytes
}
}
}
return s.bytes + pending
}
// Errors updates the stats for errors
func (s *StatsInfo) Errors(errors int64) {
s.mu.Lock()

View File

@ -93,6 +93,7 @@ type ConfigInfo struct {
UseServerModTime bool
MaxTransfer SizeSuffix
MaxDuration time.Duration
MaxTransferMode MaxTransferMode
MaxBacklog int
MaxStatsGroups int
StatsOneLine bool

View File

@ -20,15 +20,18 @@ import (
var (
// these will get interpreted into fs.Config via SetFlags() below
verbose int
quiet bool
dumpHeaders bool
dumpBodies bool
deleteBefore bool
deleteDuring bool
deleteAfter bool
bindAddr string
disableFeatures string
verbose int
quiet bool
dumpHeaders bool
dumpBodies bool
deleteBefore bool
deleteDuring bool
deleteAfter bool
bindAddr string
disableFeatures string
maxTransferHard bool
maxTransferSoft bool
maxTransferCautious bool
)
// AddFlags adds the non filing system specific flags to the command
@ -94,6 +97,9 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.")
flags.BoolVarP(flagSet, &maxTransferHard, "max-transfer-hard", "", false, "When transferring, stop immediately when --max-transfer is reached")
flags.BoolVarP(flagSet, &maxTransferSoft, "max-transfer-soft", "", false, "When transferring, stop starting new transfers when --max-transfer is reached")
flags.BoolVarP(flagSet, &maxTransferCautious, "max-transfer-cautious", "", false, "When transferring, try to avoid reaching --max-transfer")
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.")
flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.")
@ -178,6 +184,20 @@ func SetFlags() {
fs.Config.DeleteMode = fs.DeleteModeDefault
}
switch {
case maxTransferHard && (maxTransferSoft || maxTransferCautious),
maxTransferSoft && maxTransferCautious:
log.Fatalf(`Only one of --max-trans fer-hard, --max-transfer-soft or --max-transfer-cautious can be used.`)
case maxTransferHard:
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
case maxTransferSoft:
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
case maxTransferCautious:
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
default:
fs.Config.MaxTransferMode = fs.MaxTransferModeDefault
}
if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" {
log.Fatalf(`Can't use --compare-dest with --copy-dest.`)
}

12
fs/maxtransfermode.go Normal file
View File

@ -0,0 +1,12 @@
package fs
// MaxTransferMode describes the possible delete modes in the config
type MaxTransferMode byte
// MaxTransferMode constants
const (
MaxTransferModeHard MaxTransferMode = iota
MaxTransferModeSoft
MaxTransferModeCautious
MaxTransferModeDefault = MaxTransferModeHard
)

View File

@ -364,7 +364,8 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
actionTaken = "Copied (server side copy)"
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
// Check transfer limit for server side copies
if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) {
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
return nil, accounting.ErrorMaxTransferLimitReached
}
in := tr.Account(nil) // account the transfer
@ -385,6 +386,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
}
// If can't server side copy, do it manually
if err == fs.ErrorCantCopy {
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
return nil, accounting.ErrorMaxTransferLimitReached
}
if doMultiThreadCopy(f, src) {
// Number of streams proportional to size
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)

View File

@ -39,6 +39,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
@ -1527,3 +1528,58 @@ func TestRcatSize(t *testing.T) {
// Check files exist
fstest.CheckItems(t, r.Fremote, file1, file2)
}
func TestCopyFileMaxTransfer(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
old := fs.Config.MaxTransfer
oldMode := fs.Config.MaxTransferMode
defer func() {
fs.Config.MaxTransfer = old
fs.Config.MaxTransferMode = oldMode
accounting.Stats(context.Background()).ResetCounters()
}()
file1 := r.WriteFile("file1", "file1 contents", t1)
file2 := r.WriteFile("file2", "file2 contents...........", t2)
rfile1 := file1
rfile1.Path = "sub/file1"
rfile2 := file2
rfile2.Path = "sub/file2"
fs.Config.MaxTransfer = 15
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
accounting.Stats(context.Background()).ResetCounters()
err := operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile1.Path, file1.Path)
require.NoError(t, err)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
require.NoError(t, err)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1, rfile2)
}