From 56544bb2fd056716e957505210c156a99eee6d8c Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 18 Sep 2019 16:54:34 +0100 Subject: [PATCH] accounting: fix file handle leak on errors - fixes #3547 In 53a1a0e3efb2cf04 we introduced a problem where if there was an error on the file being transferred then the file was re-opened and the old one wasn't closed. This was partially fixed in bfbddab46be5d971 however this didn't address the case of the old file being closed. This is now fixed by - marking the file as open again in UpdateReader - moving the stopping the accounting machinery to a new method Done --- fs/accounting/accounting.go | 11 +++++++++-- fs/accounting/accounting_test.go | 31 +++++++++++++++++++++++-------- fs/accounting/transfer.go | 3 +++ 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 50282ff26..11018c199 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -124,6 +124,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) { acc.in = in acc.close = in acc.origIn = in + acc.closed = false if acc.withBuf { acc.WithBuffer() } @@ -243,14 +244,20 @@ func (acc *Account) Close() error { return nil } acc.closed = true - close(acc.exit) - acc.stats.inProgress.clear(acc.name) if acc.close == nil { return nil } return acc.close.Close() } +// Done with accounting - must be called to free accounting goroutine +func (acc *Account) Done() { + acc.mu.Lock() + defer acc.mu.Unlock() + close(acc.exit) + acc.stats.inProgress.clear(acc.name) +} + // progress returns bytes read as well as the size. // Size can be <= 0 if the size is unknown. func (acc *Account) progress() (bytes, size int64) { diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 3986902a8..3f365e493 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -32,6 +32,8 @@ func TestNewAccountSizeName(t *testing.T) { assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() assert.NoError(t, err) + assert.Equal(t, acc, stats.inProgress.get("test")) + acc.Done() assert.Nil(t, stats.inProgress.get("test")) } @@ -55,18 +57,31 @@ func TestAccountWithBuffer(t *testing.T) { } func TestAccountGetUpdateReader(t *testing.T) { - in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + test := func(doClose bool) func(t *testing.T) { + return func(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") - assert.Equal(t, in, acc.GetReader()) + assert.Equal(t, in, acc.GetReader()) + assert.Equal(t, acc, stats.inProgress.get("test")) - in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc.UpdateReader(in2) + if doClose { + // close the account before swapping it out + require.NoError(t, acc.Close()) + } - assert.Equal(t, in2, acc.GetReader()) + in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc.UpdateReader(in2) - assert.NoError(t, acc.Close()) + assert.Equal(t, in2, acc.GetReader()) + assert.Equal(t, acc, stats.inProgress.get("test")) + + assert.NoError(t, acc.Close()) + } + } + t.Run("NoClose", test(false)) + t.Run("Close", test(true)) } func TestAccountRead(t *testing.T) { diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index a1ab8d023..dc5e90d0e 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -96,9 +96,12 @@ func (tr *Transfer) Done(err error) { tr.mu.RUnlock() if acc != nil { + // Close the file if it is still open if err := acc.Close(); err != nil { fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err) } + // Signal done with accounting + acc.Done() } tr.mu.Lock()