diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 8fded2ebd..e7382c808 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -25,7 +25,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "net/http" "net/http/httptest" @@ -431,54 +430,6 @@ func TestCat(t *testing.T) { } } -func TestRcat(t *testing.T) { - checkSumBefore := fs.Config.CheckSum - defer func() { fs.Config.CheckSum = checkSumBefore }() - - check := func(withChecksum bool) { - fs.Config.CheckSum = withChecksum - prefix := "no_checksum_" - if withChecksum { - prefix = "with_checksum_" - } - - r := fstest.NewRun(t) - defer r.Finalise() - - if *fstest.SizeLimit > 0 && int64(fs.Config.StreamingUploadCutoff) > *fstest.SizeLimit { - savedCutoff := fs.Config.StreamingUploadCutoff - defer func() { - fs.Config.StreamingUploadCutoff = savedCutoff - }() - fs.Config.StreamingUploadCutoff = fs.SizeSuffix(*fstest.SizeLimit) - t.Logf("Adjust StreamingUploadCutoff to size limit %s (was %s)", fs.Config.StreamingUploadCutoff, savedCutoff) - } - - fstest.CheckListing(t, r.Fremote, []fstest.Item{}) - - data1 := "this is some really nice test data" - path1 := prefix + "small_file_from_pipe" - - data2 := string(make([]byte, fs.Config.StreamingUploadCutoff+1)) - path2 := prefix + "big_file_from_pipe" - - in := ioutil.NopCloser(strings.NewReader(data1)) - _, err := operations.Rcat(context.Background(), r.Fremote, path1, in, t1) - require.NoError(t, err) - - in = ioutil.NopCloser(strings.NewReader(data2)) - _, err = operations.Rcat(context.Background(), r.Fremote, path2, in, t2) - require.NoError(t, err) - - file1 := fstest.NewItem(path1, data1, t1) - file2 := fstest.NewItem(path2, data2, t2) - fstest.CheckItems(t, r.Fremote, file1, file2) - } - - check(true) - check(false) -} - func TestPurge(t *testing.T) { r := fstest.NewRunIndividual(t) // make new container (azureblob has delayed mkdir after rmdir) defer r.Finalise() @@ -667,32 +618,6 @@ func TestRmdirsLeaveRoot(t *testing.T) { ) } -func TestRcatSize(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - const body = "------------------------------------------------------------" - file1 := r.WriteFile("potato1", body, t1) - file2 := r.WriteFile("potato2", body, t2) - // Test with known length - bodyReader := ioutil.NopCloser(strings.NewReader(body)) - obj, err := operations.RcatSize(context.Background(), r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime) - require.NoError(t, err) - assert.Equal(t, int64(len(body)), obj.Size()) - assert.Equal(t, file1.Path, obj.Remote()) - - // Test with unknown length - bodyReader = ioutil.NopCloser(strings.NewReader(body)) // reset Reader - ioutil.NopCloser(strings.NewReader(body)) - obj, err = operations.RcatSize(context.Background(), r.Fremote, file2.Path, bodyReader, -1, file2.ModTime) - require.NoError(t, err) - assert.Equal(t, int64(len(body)), obj.Size()) - assert.Equal(t, file2.Path, obj.Remote()) - - // Check files exist - fstest.CheckItems(t, r.Fremote, file1, file2) -} - func TestCopyURL(t *testing.T) { r := fstest.NewRun(t) defer r.Finalise() diff --git a/fs/operations/xtra_operations_test.go b/fs/operations/xtra_operations_test.go new file mode 100644 index 000000000..fe8a142bf --- /dev/null +++ b/fs/operations/xtra_operations_test.go @@ -0,0 +1,112 @@ +// Extra operations tests (xtra_operations_test.go). +// +// This group contains tests, which involve streaming uploads. +// Currently they are TestRcat and TestRcatSize, which directly +// or implicitly invoke ioutil.NopCloser(). +// Indeterminate upload size triggers multi-upload in few backends. +// +// The S3 backend additionally triggers extra large upload buffers. +// Namely, multiupload track in the Object.upload() method of S3 +// backend (rclone/backends/s3.go) selects PartSize about 512M, +// upload.init() of AWS SDK (vendor/.../s3/s3manager/upload.go) +// allocates upload.bufferPool of that size for each concurrent +// upload goroutine. Given default concurrency of 4, this results +// in 2G buffers persisting until the test executable ends. +// +// As the rclone test suite parallelizes test runs, this may +// create memory pressure on a test box and trigger kernel swap, +// which extremely slows down the test and makes probability +// of memory contention between test processes even higher. +// +// Since name of this source file deliberately starts with `x`, +// its tests will run lattermost isolating high memory at the +// very end to somewhat reduce the contention probability. +// +package operations_test + +import ( + "context" + "io/ioutil" + "strings" + "testing" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/fstest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRcat(t *testing.T) { + checkSumBefore := fs.Config.CheckSum + defer func() { fs.Config.CheckSum = checkSumBefore }() + + check := func(withChecksum bool) { + fs.Config.CheckSum = withChecksum + prefix := "no_checksum_" + if withChecksum { + prefix = "with_checksum_" + } + + r := fstest.NewRun(t) + defer r.Finalise() + + if *fstest.SizeLimit > 0 && int64(fs.Config.StreamingUploadCutoff) > *fstest.SizeLimit { + savedCutoff := fs.Config.StreamingUploadCutoff + defer func() { + fs.Config.StreamingUploadCutoff = savedCutoff + }() + fs.Config.StreamingUploadCutoff = fs.SizeSuffix(*fstest.SizeLimit) + t.Logf("Adjust StreamingUploadCutoff to size limit %s (was %s)", fs.Config.StreamingUploadCutoff, savedCutoff) + } + + fstest.CheckListing(t, r.Fremote, []fstest.Item{}) + + data1 := "this is some really nice test data" + path1 := prefix + "small_file_from_pipe" + + data2 := string(make([]byte, fs.Config.StreamingUploadCutoff+1)) + path2 := prefix + "big_file_from_pipe" + + in := ioutil.NopCloser(strings.NewReader(data1)) + _, err := operations.Rcat(context.Background(), r.Fremote, path1, in, t1) + require.NoError(t, err) + + in = ioutil.NopCloser(strings.NewReader(data2)) + _, err = operations.Rcat(context.Background(), r.Fremote, path2, in, t2) + require.NoError(t, err) + + file1 := fstest.NewItem(path1, data1, t1) + file2 := fstest.NewItem(path2, data2, t2) + fstest.CheckItems(t, r.Fremote, file1, file2) + } + + check(true) + check(false) +} + +func TestRcatSize(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + const body = "------------------------------------------------------------" + file1 := r.WriteFile("potato1", body, t1) + file2 := r.WriteFile("potato2", body, t2) + // Test with known length + bodyReader := ioutil.NopCloser(strings.NewReader(body)) + obj, err := operations.RcatSize(context.Background(), r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime) + require.NoError(t, err) + assert.Equal(t, int64(len(body)), obj.Size()) + assert.Equal(t, file1.Path, obj.Remote()) + + // Test with unknown length + bodyReader = ioutil.NopCloser(strings.NewReader(body)) // reset Reader + ioutil.NopCloser(strings.NewReader(body)) + obj, err = operations.RcatSize(context.Background(), r.Fremote, file2.Path, bodyReader, -1, file2.ModTime) + require.NoError(t, err) + assert.Equal(t, int64(len(body)), obj.Size()) + assert.Equal(t, file2.Path, obj.Remote()) + + // Check files exist + fstest.CheckItems(t, r.Fremote, file1, file2) +} diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 2505473d6..2c07b941d 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -579,112 +579,6 @@ func Run(t *testing.T, opt *Opt) { assert.Equal(t, fs.ErrorObjectNotFound, err) }) - t.Run("FsPutChunked", func(t *testing.T) { - skipIfNotOk(t) - if testing.Short() { - t.Skip("not running with -short") - } - - setUploadChunkSizer, _ := remote.(SetUploadChunkSizer) - if setUploadChunkSizer == nil { - t.Skipf("%T does not implement SetUploadChunkSizer", remote) - } - - setUploadCutoffer, _ := remote.(SetUploadCutoffer) - - minChunkSize := opt.ChunkedUpload.MinChunkSize - if minChunkSize < 100 { - minChunkSize = 100 - } - if opt.ChunkedUpload.CeilChunkSize != nil { - minChunkSize = opt.ChunkedUpload.CeilChunkSize(minChunkSize) - } - - maxChunkSize := 2 * fs.MebiByte - if maxChunkSize < 2*minChunkSize { - maxChunkSize = 2 * minChunkSize - } - if opt.ChunkedUpload.MaxChunkSize > 0 && maxChunkSize > opt.ChunkedUpload.MaxChunkSize { - maxChunkSize = opt.ChunkedUpload.MaxChunkSize - } - if opt.ChunkedUpload.CeilChunkSize != nil { - maxChunkSize = opt.ChunkedUpload.CeilChunkSize(maxChunkSize) - } - - next := func(f func(fs.SizeSuffix) fs.SizeSuffix) fs.SizeSuffix { - s := f(minChunkSize) - if s > maxChunkSize { - s = minChunkSize - } - return s - } - - chunkSizes := fs.SizeSuffixList{ - minChunkSize, - minChunkSize + (maxChunkSize-minChunkSize)/3, - next(NextPowerOfTwo), - next(NextMultipleOf(100000)), - next(NextMultipleOf(100001)), - maxChunkSize, - } - chunkSizes.Sort() - - // Set the minimum chunk size, upload cutoff and reset it at the end - oldChunkSize, err := setUploadChunkSizer.SetUploadChunkSize(minChunkSize) - require.NoError(t, err) - var oldUploadCutoff fs.SizeSuffix - if setUploadCutoffer != nil { - oldUploadCutoff, err = setUploadCutoffer.SetUploadCutoff(minChunkSize) - require.NoError(t, err) - } - defer func() { - _, err := setUploadChunkSizer.SetUploadChunkSize(oldChunkSize) - assert.NoError(t, err) - if setUploadCutoffer != nil { - _, err := setUploadCutoffer.SetUploadCutoff(oldUploadCutoff) - assert.NoError(t, err) - } - }() - - var lastCs fs.SizeSuffix - for _, cs := range chunkSizes { - if cs <= lastCs { - continue - } - if opt.ChunkedUpload.CeilChunkSize != nil { - cs = opt.ChunkedUpload.CeilChunkSize(cs) - } - lastCs = cs - - t.Run(cs.String(), func(t *testing.T) { - _, err := setUploadChunkSizer.SetUploadChunkSize(cs) - require.NoError(t, err) - if setUploadCutoffer != nil { - _, err = setUploadCutoffer.SetUploadCutoff(cs) - require.NoError(t, err) - } - - var testChunks []fs.SizeSuffix - if opt.ChunkedUpload.NeedMultipleChunks { - // If NeedMultipleChunks is set then test with > cs - testChunks = []fs.SizeSuffix{cs + 1, 2 * cs, 2*cs + 1} - } else { - testChunks = []fs.SizeSuffix{cs - 1, cs, 2*cs + 1} - } - - for _, fileSize := range testChunks { - t.Run(fmt.Sprintf("%d", fileSize), func(t *testing.T) { - TestPutLarge(t, remote, &fstest.Item{ - ModTime: fstest.Time("2001-02-03T04:05:06.499999999Z"), - Path: fmt.Sprintf("chunked-%s-%s.bin", cs.String(), fileSize.String()), - Size: int64(fileSize), - }) - }) - } - }) - } - }) - t.Run("FsPutZeroLength", func(t *testing.T) { skipIfNotOk(t) @@ -1600,7 +1494,26 @@ func Run(t *testing.T, opt *Opt) { fstest.CheckListingWithPrecision(t, remote, []fstest.Item{file2}, nil, fs.ModTimeNotSupported) }) - // TestFsPutStream tests uploading files when size is not known in advance + // TestAbout tests the About optional interface + t.Run("ObjectAbout", func(t *testing.T) { + skipIfNotOk(t) + + // Check have About + doAbout := remote.Features().About + if doAbout == nil { + t.Skip("FS does not support About") + } + + // Can't really check the output much! + usage, err := doAbout(context.Background()) + require.NoError(t, err) + require.NotNil(t, usage) + assert.NotEqual(t, int64(0), usage.Total) + }) + + // TestFsPutStream tests uploading files when size isn't known in advance. + // This may trigger large buffer allocation in some backends, keep it + // close to the end of suite. (See fs/operations/xtra_operations_test.go) t.Run("FsPutStream", func(t *testing.T) { skipIfNotOk(t) if remote.Features().PutStream == nil { @@ -1638,23 +1551,6 @@ func Run(t *testing.T, opt *Opt) { file.Check(t, obj, remote.Precision()) }) - // TestAbout tests the About optional interface - t.Run("ObjectAbout", func(t *testing.T) { - skipIfNotOk(t) - - // Check have About - doAbout := remote.Features().About - if doAbout == nil { - t.Skip("FS does not support About") - } - - // Can't really check the output much! - usage, err := doAbout(context.Background()) - require.NoError(t, err) - require.NotNil(t, usage) - assert.NotEqual(t, int64(0), usage.Total) - }) - // TestInternal calls InternalTest() on the Fs t.Run("Internal", func(t *testing.T) { skipIfNotOk(t) @@ -1667,8 +1563,120 @@ func Run(t *testing.T, opt *Opt) { }) + // TestFsPutChunked may trigger large buffer allocation with + // some backends (see fs/operations/xtra_operations_test.go), + // keep it closer to the end of suite. + t.Run("FsPutChunked", func(t *testing.T) { + skipIfNotOk(t) + if testing.Short() { + t.Skip("not running with -short") + } + + setUploadChunkSizer, _ := remote.(SetUploadChunkSizer) + if setUploadChunkSizer == nil { + t.Skipf("%T does not implement SetUploadChunkSizer", remote) + } + + setUploadCutoffer, _ := remote.(SetUploadCutoffer) + + minChunkSize := opt.ChunkedUpload.MinChunkSize + if minChunkSize < 100 { + minChunkSize = 100 + } + if opt.ChunkedUpload.CeilChunkSize != nil { + minChunkSize = opt.ChunkedUpload.CeilChunkSize(minChunkSize) + } + + maxChunkSize := 2 * fs.MebiByte + if maxChunkSize < 2*minChunkSize { + maxChunkSize = 2 * minChunkSize + } + if opt.ChunkedUpload.MaxChunkSize > 0 && maxChunkSize > opt.ChunkedUpload.MaxChunkSize { + maxChunkSize = opt.ChunkedUpload.MaxChunkSize + } + if opt.ChunkedUpload.CeilChunkSize != nil { + maxChunkSize = opt.ChunkedUpload.CeilChunkSize(maxChunkSize) + } + + next := func(f func(fs.SizeSuffix) fs.SizeSuffix) fs.SizeSuffix { + s := f(minChunkSize) + if s > maxChunkSize { + s = minChunkSize + } + return s + } + + chunkSizes := fs.SizeSuffixList{ + minChunkSize, + minChunkSize + (maxChunkSize-minChunkSize)/3, + next(NextPowerOfTwo), + next(NextMultipleOf(100000)), + next(NextMultipleOf(100001)), + maxChunkSize, + } + chunkSizes.Sort() + + // Set the minimum chunk size, upload cutoff and reset it at the end + oldChunkSize, err := setUploadChunkSizer.SetUploadChunkSize(minChunkSize) + require.NoError(t, err) + var oldUploadCutoff fs.SizeSuffix + if setUploadCutoffer != nil { + oldUploadCutoff, err = setUploadCutoffer.SetUploadCutoff(minChunkSize) + require.NoError(t, err) + } + defer func() { + _, err := setUploadChunkSizer.SetUploadChunkSize(oldChunkSize) + assert.NoError(t, err) + if setUploadCutoffer != nil { + _, err := setUploadCutoffer.SetUploadCutoff(oldUploadCutoff) + assert.NoError(t, err) + } + }() + + var lastCs fs.SizeSuffix + for _, cs := range chunkSizes { + if cs <= lastCs { + continue + } + if opt.ChunkedUpload.CeilChunkSize != nil { + cs = opt.ChunkedUpload.CeilChunkSize(cs) + } + lastCs = cs + + t.Run(cs.String(), func(t *testing.T) { + _, err := setUploadChunkSizer.SetUploadChunkSize(cs) + require.NoError(t, err) + if setUploadCutoffer != nil { + _, err = setUploadCutoffer.SetUploadCutoff(cs) + require.NoError(t, err) + } + + var testChunks []fs.SizeSuffix + if opt.ChunkedUpload.NeedMultipleChunks { + // If NeedMultipleChunks is set then test with > cs + testChunks = []fs.SizeSuffix{cs + 1, 2 * cs, 2*cs + 1} + } else { + testChunks = []fs.SizeSuffix{cs - 1, cs, 2*cs + 1} + } + + for _, fileSize := range testChunks { + t.Run(fmt.Sprintf("%d", fileSize), func(t *testing.T) { + TestPutLarge(t, remote, &fstest.Item{ + ModTime: fstest.Time("2001-02-03T04:05:06.499999999Z"), + Path: fmt.Sprintf("chunked-%s-%s.bin", cs.String(), fileSize.String()), + Size: int64(fileSize), + }) + }) + } + }) + } + }) + // TestFsUploadUnknownSize ensures Fs.Put() and Object.Update() don't panic when // src.Size() == -1 + // + // This may trigger large buffer allocation in some backends, keep it + // closer to the suite end. (See fs/operations/xtra_operations_test.go) t.Run("FsUploadUnknownSize", func(t *testing.T) { skipIfNotOk(t)