From 53a1a0e3efb2cf04aace48537a7691ada00e988b Mon Sep 17 00:00:00 2001 From: Aleksandar Jankovic Date: Mon, 22 Jul 2019 21:11:46 +0200 Subject: [PATCH] accounting: add reference to completed transfers Add core/transferred call that lists completed transfers and their status. --- backend/b2/b2.go | 23 +++++--- fs/accounting/stats.go | 24 ++++++-- fs/accounting/stats_groups.go | 52 +++++++++++++++++ fs/accounting/transfer.go | 103 +++++++++++++++++++++++++++++----- fs/operations/operations.go | 38 ++++++++----- fs/sync/sync.go | 9 +-- fs/sync/sync_test.go | 24 ++++---- 7 files changed, 218 insertions(+), 55 deletions(-) diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 5ec50f59e..ee9eb5e6b 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -548,12 +548,12 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *api.Fil if info != nil { err := o.decodeMetaData(info) if err != nil { - return nil, err + return o, err } } else { err := o.readMetaData(ctx) // reads info and headers, returning an error if err != nil { - return nil, err + return o, err } } return o, nil @@ -1084,16 +1084,25 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { go func() { defer wg.Done() for object := range toBeDeleted { - accounting.Stats(ctx).Checking(object.Name) - checkErr(f.deleteByID(object.ID, object.Name)) - accounting.Stats(ctx).DoneChecking(object.Name) + oi, err := f.newObjectWithInfo(ctx, object.Name, object) + if err != nil { + fs.Errorf(object, "Can't create object %+v", err) + } + tr := accounting.Stats(ctx).NewCheckingTransfer(oi) + err = f.deleteByID(object.ID, object.Name) + checkErr(err) + tr.Done(err) } }() } last := "" checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { if !isDirectory { - accounting.Stats(ctx).Checking(remote) + oi, err := f.newObjectWithInfo(ctx, object.Name, object) + if err != nil { + fs.Errorf(object, "Can't create object %+v", err) + } + tr := accounting.Stats(ctx).NewCheckingTransfer(oi) if oldOnly && last != remote { if object.Action == "hide" { fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID) @@ -1109,7 +1118,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { toBeDeleted <- object } last = remote - accounting.Stats(ctx).DoneChecking(remote) + tr.Done(nil) } return nil })) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 8ecae96df..a3747e7b9 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -280,6 +280,20 @@ Elapsed time: %10v return buf.String() } +// Transferred returns list of all completed transfers including checked and +// failed ones. +func (s *StatsInfo) Transferred() []TransferSnapshot { + ts := make([]TransferSnapshot, 0, len(s.startedTransfers)) + + for _, tr := range s.startedTransfers { + if tr.IsDone() { + ts = append(ts, tr.Snapshot()) + } + } + + return ts +} + // Log outputs the StatsInfo to the log func (s *StatsInfo) Log() { fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s) @@ -376,6 +390,7 @@ func (s *StatsInfo) ResetCounters() { s.checks = 0 s.transfers = 0 s.deletes = 0 + s.startedTransfers = nil } // ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError @@ -427,9 +442,10 @@ func (s *StatsInfo) RetryAfter() time.Time { return s.retryAfter } -// Checking adds a check into the stats -func (s *StatsInfo) Checking(remote string) { - s.checking.add(remote) +// NewCheckingTransfer adds a checking transfer to the stats, from the object. +func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer { + s.checking.add(obj.Remote()) + return newCheckingTransfer(s, obj) } // DoneChecking removes a check from the stats @@ -456,7 +472,7 @@ func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { // NewTransferRemoteSize adds a transfer to the stats based on remote and size. func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { s.transferring.add(remote) - return newTransferRemoteSize(s, remote, size) + return newTransferRemoteSize(s, remote, size, false) } // DoneTransferring removes a transfer from the stats diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go index 089da4d4d..69ba56b36 100644 --- a/fs/accounting/stats_groups.go +++ b/fs/accounting/stats_groups.go @@ -26,6 +26,23 @@ func remoteStats(ctx context.Context, in rc.Params) (rc.Params, error) { return groups.sum().RemoteStats() } +func transferredStats(ctx context.Context, in rc.Params) (rc.Params, error) { + // Check to see if we should filter by group. + group, err := in.GetString("group") + if rc.NotErrParamNotFound(err) { + return rc.Params{}, err + } + + out := make(rc.Params) + if group != "" { + out["transferred"] = StatsGroup(group).Transferred() + } else { + out["transferred"] = groups.sum().Transferred() + } + + return out, nil +} + func init() { // Init stats container groups = newStatsGroups() @@ -74,6 +91,40 @@ Returns the following values: ` + "```" + ` Values for "transferring", "checking" and "lastError" are only assigned if data is available. The value for "eta" is null if an eta cannot be determined. +`, + }) + + rc.Add(rc.Call{ + Path: "core/transferred", + Fn: transferredStats, + Title: "Returns stats about completed transfers.", + Help: ` +This returns stats about completed transfers: + + rclone rc core/transferred + +If group is not provided then completed transfers for all groups will be +returned. + +Parameters +- group - name of the stats group (string) + +Returns the following values: +` + "```" + ` +{ + "transferred": an array of completed transfers (including failed ones): + [ + { + "name": name of the file, + "size": size of the file in bytes, + "bytes": total transferred bytes for this file, + "checked": if the transfer is only checked (skipped, deleted), + "timestamp": integer representing millisecond unix epoch, + "error": string description of the error (empty if successfull), + "jobid": id of the job that this transfer belongs to + } + ] +} `, }) } @@ -184,6 +235,7 @@ func (sg *statsGroups) sum() *StatsInfo { if sum.lastError == nil && stats.lastError != nil { sum.lastError = stats.lastError } + sum.startedTransfers = append(sum.startedTransfers, stats.startedTransfers...) } return sum } diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index 4ac1d53c6..bf1370a10 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -1,6 +1,7 @@ package accounting import ( + "encoding/json" "io" "sync" "time" @@ -8,31 +9,67 @@ import ( "github.com/ncw/rclone/fs" ) +// TransferSnapshot represents state of an account at point in time. +type TransferSnapshot struct { + Name string `json:"name"` + Size int64 `json:"size"` + Bytes int64 `json:"bytes"` + Checked bool `json:"checked"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at,omitempty"` + Error error `json:"-"` +} + +// MarshalJSON implements json.Marshaler interface. +func (as TransferSnapshot) MarshalJSON() ([]byte, error) { + err := "" + if as.Error != nil { + err = as.Error.Error() + } + type Alias TransferSnapshot + return json.Marshal(&struct { + Error string `json:"error"` + Alias + }{ + Error: err, + Alias: (Alias)(as), + }) +} + // Transfer keeps track of initiated transfers and provides access to // accounting functions. // Transfer needs to be closed on completion. type Transfer struct { - stats *StatsInfo - acc *Account - remote string - size int64 + stats *StatsInfo + remote string + size int64 + checking bool + // Protects all bellow mu sync.Mutex + acc *Account + err error startedAt time.Time completedAt time.Time } -// newTransfer instantiates new transfer -func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer { - return newTransferRemoteSize(stats, obj.Remote(), obj.Size()) +// newCheckingTransfer instantiates new checking of the object. +func newCheckingTransfer(stats *StatsInfo, obj fs.Object) *Transfer { + return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true) } -func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfer { +// newTransfer instantiates new transfer. +func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer { + return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false) +} + +func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool) *Transfer { tr := &Transfer{ stats: stats, remote: remote, size: size, startedAt: time.Now(), + checking: checking, } stats.AddTransfer(tr) return tr @@ -41,26 +78,37 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfe // Done ends the transfer. // Must be called after transfer is finished to run proper cleanups. func (tr *Transfer) Done(err error) { + tr.mu.Lock() + defer tr.mu.Unlock() + if err != nil { tr.stats.Error(err) + tr.err = err } if tr.acc != nil { if err := tr.acc.Close(); err != nil { fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err) } } - tr.stats.DoneTransferring(tr.remote, err == nil) - tr.mu.Lock() + if tr.checking { + tr.stats.DoneChecking(tr.remote) + } else { + tr.stats.DoneTransferring(tr.remote, err == nil) + } + tr.completedAt = time.Now() - tr.mu.Unlock() } // Account returns reader that knows how to keep track of transfer progress. func (tr *Transfer) Account(in io.ReadCloser) *Account { - if tr.acc != nil { - return tr.acc + tr.mu.Lock() + defer tr.mu.Unlock() + + if tr.acc == nil { + tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote) + } - return newAccountSizeName(tr.stats, in, tr.size, tr.remote) + return tr.acc } // TimeRange returns the time transfer started and ended at. If not completed @@ -70,3 +118,30 @@ func (tr *Transfer) TimeRange() (time.Time, time.Time) { defer tr.mu.Unlock() return tr.startedAt, tr.completedAt } + +// IsDone returns true if transfer is completed. +func (tr *Transfer) IsDone() bool { + tr.mu.Lock() + defer tr.mu.Unlock() + return !tr.completedAt.IsZero() +} + +// Snapshot produces stats for this account at point in time. +func (tr *Transfer) Snapshot() TransferSnapshot { + tr.mu.Lock() + defer tr.mu.Unlock() + + var s, b int64 = tr.size, 0 + if tr.acc != nil { + b, s = tr.acc.progress() + } + return TransferSnapshot{ + Name: tr.remote, + Checked: tr.checking, + Size: s, + Bytes: b, + StartedAt: tr.startedAt, + CompletedAt: tr.completedAt, + Error: tr.err, + } +} diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 3da4d5762..7d4b8f604 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -176,7 +176,7 @@ func equal(ctx context.Context, src fs.ObjectInfo, dst fs.Object, sizeOnly, chec // Size and hash the same but mtime different // Error if objects are treated as immutable if fs.Config.Immutable { - fs.Errorf(dst, "Timestamp mismatch between immutable objects") + fs.Errorf(dst, "StartedAt mismatch between immutable objects") return false } // Update the mtime of the dst object here @@ -428,9 +428,9 @@ func SameObject(src, dst fs.Object) bool { // It returns the destination object if possible. Note that this may // be nil. func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - accounting.Stats(ctx).Checking(src.Remote()) + tr := accounting.Stats(ctx).NewCheckingTransfer(src) defer func() { - accounting.Stats(ctx).DoneChecking(src.Remote()) + tr.Done(err) }() newDst = dst if fs.Config.DryRun { @@ -501,7 +501,10 @@ func SuffixName(remote string) string { // If backupDir is set then it moves the file to there instead of // deleting func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) { - accounting.Stats(ctx).Checking(dst.Remote()) + tr := accounting.Stats(ctx).NewCheckingTransfer(dst) + defer func() { + tr.Done(err) + }() numDeletes := accounting.Stats(ctx).Deletes(1) if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete { return fserrors.FatalError(errors.New("--max-delete threshold reached")) @@ -523,7 +526,6 @@ func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs } else if !fs.Config.DryRun { fs.Infof(dst, actioned) } - accounting.Stats(ctx).DoneChecking(dst.Remote()) return err } @@ -709,10 +711,13 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { // check to see if two objects are identical using the check function func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool) { - accounting.Stats(ctx).Checking(src.Remote()) - defer accounting.Stats(ctx).DoneChecking(src.Remote()) + var err error + tr := accounting.Stats(ctx).NewCheckingTransfer(src) + defer func() { + tr.Done(err) + }() if sizeDiffers(src, dst) { - err := errors.Errorf("Sizes differ") + err = errors.Errorf("Sizes differ") fs.Errorf(src, "%v", err) fs.CountError(err) return true, false @@ -930,9 +935,11 @@ func List(ctx context.Context, f fs.Fs, w io.Writer) error { // Lists in parallel which may get them out of order func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error { return ListFn(ctx, f, func(o fs.Object) { - accounting.Stats(ctx).Checking(o.Remote()) + tr := accounting.Stats(ctx).NewCheckingTransfer(o) + defer func() { + tr.Done(nil) + }() modTime := o.ModTime(ctx) - accounting.Stats(ctx).DoneChecking(o.Remote()) syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote()) }) } @@ -968,9 +975,12 @@ func DropboxHashSum(ctx context.Context, f fs.Fs, w io.Writer) error { // hashSum returns the human readable hash for ht passed in. This may // be UNSUPPORTED or ERROR. func hashSum(ctx context.Context, ht hash.Type, o fs.Object) string { - accounting.Stats(ctx).Checking(o.Remote()) + var err error + tr := accounting.Stats(ctx).NewCheckingTransfer(o) + defer func() { + tr.Done(err) + }() sum, err := o.Hash(ctx, ht) - accounting.Stats(ctx).DoneChecking(o.Remote()) if err == hash.ErrUnsupported { sum = "UNSUPPORTED" } else if err != nil { @@ -1711,11 +1721,11 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str _, err = Op(ctx, fdst, dstObj, dstFileName, srcObj) } else { - accounting.Stats(ctx).Checking(srcFileName) + tr := accounting.Stats(ctx).NewCheckingTransfer(srcObj) if !cp { err = DeleteFile(ctx, srcObj) } - defer accounting.Stats(ctx).DoneChecking(srcFileName) + tr.Done(err) } return err } diff --git a/fs/sync/sync.go b/fs/sync/sync.go index 98bd863a6..8de7e3d4e 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -215,7 +215,8 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { return } src := pair.Src - accounting.Stats(s.ctx).Checking(src.Remote()) + var err error + tr := accounting.Stats(s.ctx).NewCheckingTransfer(src) // Check to see if can store this if src.Storable() { NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir) @@ -256,7 +257,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { } } } - accounting.Stats(s.ctx).DoneChecking(src.Remote()) + tr.Done(err) } } @@ -587,12 +588,12 @@ func (s *syncCopyMove) makeRenameMap() { for obj := range in { // only create hash for dst fs.Object if its size could match if _, found := possibleSizes[obj.Size()]; found { - accounting.Stats(s.ctx).Checking(obj.Remote()) + tr := accounting.Stats(s.ctx).NewCheckingTransfer(obj) hash := s.renameHash(obj) if hash != "" { s.pushRenameMap(hash, obj) } - accounting.Stats(s.ctx).DoneChecking(obj.Remote()) + tr.Done(nil) } } }() diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index 7c91c677d..daf49a5d8 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -1233,7 +1233,7 @@ func TestSyncCompareDest(t *testing.T) { file1 := r.WriteFile("one", "one", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1247,7 +1247,7 @@ func TestSyncCompareDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1dst) fstest.CheckItems(t, r.Flocal, file1b) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1263,7 +1263,7 @@ func TestSyncCompareDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3) fstest.CheckItems(t, r.Flocal, file1c) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1275,14 +1275,14 @@ func TestSyncCompareDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3, file4) fstest.CheckItems(t, r.Flocal, file1c, file5) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, file2, file3, file4) // check new dest, new compare - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1293,7 +1293,7 @@ func TestSyncCompareDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3, file4) fstest.CheckItems(t, r.Flocal, file1c, file5b) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1324,7 +1324,7 @@ func TestSyncCopyDest(t *testing.T) { file1 := r.WriteFile("one", "one", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1338,7 +1338,7 @@ func TestSyncCopyDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1dst) fstest.CheckItems(t, r.Flocal, file1b) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1357,7 +1357,7 @@ func TestSyncCopyDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3) fstest.CheckItems(t, r.Flocal, file1c) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1374,7 +1374,7 @@ func TestSyncCopyDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4) fstest.CheckItems(t, r.Flocal, file1c, file5) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1384,7 +1384,7 @@ func TestSyncCopyDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4, file4dst) // check new dest, new copy - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1396,7 +1396,7 @@ func TestSyncCopyDest(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4, file4dst, file6) fstest.CheckItems(t, r.Flocal, file1c, file5, file7) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err)