rclone/fs/accounting/stats.go

676 lines
17 KiB
Go

package accounting
import (
"bytes"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/rc"
)
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
var MaxCompletedTransfers = 100
// StatsInfo accounts all transfers
type StatsInfo struct {
mu sync.RWMutex
bytes int64
errors int64
lastError error
fatalError bool
retryError bool
retryAfter time.Time
checks int64
checking *stringSet
checkQueue int
checkQueueSize int64
transfers int64
transferring *stringSet
transferQueue int
transferQueueSize int64
renameQueue int
renameQueueSize int64
deletes int64
inProgress *inProgress
startedTransfers []*Transfer // currently active transfers
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
oldDuration time.Duration // duration of transfers we have culled
group string
}
// NewStats creates an initialised StatsInfo
func NewStats() *StatsInfo {
return &StatsInfo{
checking: newStringSet(fs.Config.Checkers, "checking"),
transferring: newStringSet(fs.Config.Transfers, "transferring"),
inProgress: newInProgress(),
}
}
// RemoteStats returns stats for rc
func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
out = make(rc.Params)
s.mu.RLock()
out["speed"] = s.Speed()
out["bytes"] = s.bytes
out["errors"] = s.errors
out["fatalError"] = s.fatalError
out["retryError"] = s.retryError
out["checks"] = s.checks
out["transfers"] = s.transfers
out["deletes"] = s.deletes
out["elapsedTime"] = s.totalDuration().Seconds()
s.mu.RUnlock()
if !s.checking.empty() {
var c []string
s.checking.mu.RLock()
defer s.checking.mu.RUnlock()
for name := range s.checking.items {
c = append(c, name)
}
out["checking"] = c
}
if !s.transferring.empty() {
s.transferring.mu.RLock()
var t []rc.Params
for name := range s.transferring.items {
if acc := s.inProgress.get(name); acc != nil {
t = append(t, acc.RemoteStats())
} else {
t = append(t, s.transferRemoteStats(name))
}
}
out["transferring"] = t
s.transferring.mu.RUnlock()
}
if s.errors > 0 {
out["lastError"] = s.lastError.Error()
}
return out, nil
}
// Speed returns the average speed of the transfer in bytes/second
func (s *StatsInfo) Speed() float64 {
dt := s.totalDuration()
dtSeconds := dt.Seconds()
speed := 0.0
if dt > 0 {
speed = float64(s.bytes) / dtSeconds
}
return speed
}
func (s *StatsInfo) transferRemoteStats(name string) rc.Params {
s.mu.RLock()
defer s.mu.RUnlock()
for _, tr := range s.startedTransfers {
if tr.remote == name {
return rc.Params{
"name": name,
"size": tr.size,
}
}
}
return rc.Params{"name": name}
}
// timeRange is a start and end time of a transfer
type timeRange struct {
start time.Time
end time.Time
}
// timeRanges is a list of non-overlapping start and end times for
// transfers
type timeRanges []timeRange
// merge all the overlapping time ranges
func (trs *timeRanges) merge() {
Trs := *trs
// Sort by the starting time.
sort.Slice(Trs, func(i, j int) bool {
return Trs[i].start.Before(Trs[j].start)
})
// Merge overlaps and add distinctive ranges together
var (
newTrs = Trs[:0]
i, j = 0, 1
)
for i < len(Trs) {
if j < len(Trs) {
if !Trs[i].end.Before(Trs[j].start) {
if Trs[i].end.Before(Trs[j].end) {
Trs[i].end = Trs[j].end
}
j++
continue
}
}
newTrs = append(newTrs, Trs[i])
i = j
j++
}
*trs = newTrs
}
// cull remove any ranges whose start and end are before cutoff
// returning their duration sum
func (trs *timeRanges) cull(cutoff time.Time) (d time.Duration) {
var newTrs = (*trs)[:0]
for _, tr := range *trs {
if cutoff.Before(tr.start) || cutoff.Before(tr.end) {
newTrs = append(newTrs, tr)
} else {
d += tr.end.Sub(tr.start)
}
}
*trs = newTrs
return d
}
// total the time out of the time ranges
func (trs timeRanges) total() (total time.Duration) {
for _, tr := range trs {
total += tr.end.Sub(tr.start)
}
return total
}
// Total duration is union of durations of all transfers belonging to this
// object.
// Needs to be protected by mutex.
func (s *StatsInfo) totalDuration() time.Duration {
// copy of s.oldTimeRanges with extra room for the current transfers
timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers))
copy(timeRanges, s.oldTimeRanges)
// Extract time ranges of all transfers.
now := time.Now()
for i := range s.startedTransfers {
start, end := s.startedTransfers[i].TimeRange()
if end.IsZero() {
end = now
}
timeRanges = append(timeRanges, timeRange{start, end})
}
timeRanges.merge()
return s.oldDuration + timeRanges.total()
}
// eta returns the ETA of the current operation,
// rounded to full seconds.
// If the ETA cannot be determined 'ok' returns false.
func eta(size, total int64, rate float64) (eta time.Duration, ok bool) {
if total <= 0 || size < 0 || rate <= 0 {
return 0, false
}
remaining := total - size
if remaining < 0 {
return 0, false
}
seconds := float64(remaining) / rate
return time.Second * time.Duration(seconds), true
}
// etaString returns the ETA of the current operation,
// rounded to full seconds.
// If the ETA cannot be determined it returns "-"
func etaString(done, total int64, rate float64) string {
d, ok := eta(done, total, rate)
if !ok {
return "-"
}
return fs.Duration(d).ReadableString()
}
// percent returns a/b as a percentage rounded to the nearest integer
// as a string
//
// if the percentage is invalid it returns "-"
func percent(a int64, b int64) string {
if a < 0 || b <= 0 {
return "-"
}
return fmt.Sprintf("%d%%", int(float64(a)*100/float64(b)+0.5))
}
// String convert the StatsInfo to a string for printing
func (s *StatsInfo) String() string {
// checking and transferring have their own locking so read
// here before lock to prevent deadlock on GetBytes
transferring, checking := s.transferring.count(), s.checking.count()
transferringBytesDone, transferringBytesTotal := s.transferring.progress(s)
s.mu.RLock()
dt := s.totalDuration()
dtSeconds := dt.Seconds()
dtSecondsOnly := dt.Truncate(time.Second/10) % time.Minute
speed := 0.0
if dt > 0 {
speed = float64(s.bytes) / dtSeconds
}
displaySpeed := speed
if fs.Config.DataRateUnit == "bits" {
displaySpeed *= 8
}
var (
totalChecks = int64(s.checkQueue) + s.checks + int64(checking)
totalTransfer = int64(s.transferQueue) + s.transfers + int64(transferring)
// note that s.bytes already includes transferringBytesDone so
// we take it off here to avoid double counting
totalSize = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone
currentSize = s.bytes
buf = &bytes.Buffer{}
xfrchkString = ""
dateString = ""
)
if !fs.Config.StatsOneLine {
_, _ = fmt.Fprintf(buf, "\nTransferred: ")
} else {
xfrchk := []string{}
if totalTransfer > 0 && s.transferQueue > 0 {
xfrchk = append(xfrchk, fmt.Sprintf("xfr#%d/%d", s.transfers, totalTransfer))
}
if totalChecks > 0 && s.checkQueue > 0 {
xfrchk = append(xfrchk, fmt.Sprintf("chk#%d/%d", s.checks, totalChecks))
}
if len(xfrchk) > 0 {
xfrchkString = fmt.Sprintf(" (%s)", strings.Join(xfrchk, ", "))
}
if fs.Config.StatsOneLineDate {
t := time.Now()
dateString = t.Format(fs.Config.StatsOneLineDateFormat) // Including the separator so people can customize it
}
}
_, _ = fmt.Fprintf(buf, "%s%10s / %s, %s, %s, ETA %s%s\n",
dateString,
fs.SizeSuffix(s.bytes),
fs.SizeSuffix(totalSize).Unit("Bytes"),
percent(s.bytes, totalSize),
fs.SizeSuffix(displaySpeed).Unit(strings.Title(fs.Config.DataRateUnit)+"/s"),
etaString(currentSize, totalSize, speed),
xfrchkString,
)
if !fs.Config.StatsOneLine {
errorDetails := ""
switch {
case s.fatalError:
errorDetails = " (fatal error encountered)"
case s.retryError:
errorDetails = " (retrying may help)"
case s.errors != 0:
errorDetails = " (no need to retry)"
}
// Add only non zero stats
if s.errors != 0 {
_, _ = fmt.Fprintf(buf, "Errors: %10d%s\n",
s.errors, errorDetails)
}
if s.checks != 0 || totalChecks != 0 {
_, _ = fmt.Fprintf(buf, "Checks: %10d / %d, %s\n",
s.checks, totalChecks, percent(s.checks, totalChecks))
}
if s.deletes != 0 {
_, _ = fmt.Fprintf(buf, "Deleted: %10d\n", s.deletes)
}
if s.transfers != 0 || totalTransfer != 0 {
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s\n",
s.transfers, totalTransfer, percent(s.transfers, totalTransfer))
}
_, _ = fmt.Fprintf(buf, "Elapsed time: %10ss\n", strings.TrimRight(dt.Truncate(time.Minute).String(), "0s")+fmt.Sprintf("%.1f", dtSecondsOnly.Seconds()))
}
// checking and transferring have their own locking so unlock
// here to prevent deadlock on GetBytes
s.mu.RUnlock()
// Add per transfer stats if required
if !fs.Config.StatsOneLine {
if !s.checking.empty() {
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress, s.transferring))
}
if !s.transferring.empty() {
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress, nil))
}
}
return buf.String()
}
// Transferred returns list of all completed transfers including checked and
// failed ones.
func (s *StatsInfo) Transferred() []TransferSnapshot {
ts := make([]TransferSnapshot, 0, len(s.startedTransfers))
for _, tr := range s.startedTransfers {
if tr.IsDone() {
ts = append(ts, tr.Snapshot())
}
}
return ts
}
// Log outputs the StatsInfo to the log
func (s *StatsInfo) Log() {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s)
}
// Bytes updates the stats for bytes bytes
func (s *StatsInfo) Bytes(bytes int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.bytes += bytes
}
// GetBytes returns the number of bytes transferred so far
func (s *StatsInfo) GetBytes() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.bytes
}
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
func (s *StatsInfo) GetBytesWithPending() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
pending := int64(0)
for _, tr := range s.startedTransfers {
if tr.acc != nil {
bytes, size := tr.acc.progress()
if bytes < size {
pending += size - bytes
}
}
}
return s.bytes + pending
}
// Errors updates the stats for errors
func (s *StatsInfo) Errors(errors int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.errors += errors
}
// GetErrors reads the number of errors
func (s *StatsInfo) GetErrors() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.errors
}
// GetLastError returns the lastError
func (s *StatsInfo) GetLastError() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lastError
}
// GetChecks returns the number of checks
func (s *StatsInfo) GetChecks() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.checks
}
// FatalError sets the fatalError flag
func (s *StatsInfo) FatalError() {
s.mu.Lock()
defer s.mu.Unlock()
s.fatalError = true
}
// HadFatalError returns whether there has been at least one FatalError
func (s *StatsInfo) HadFatalError() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.fatalError
}
// RetryError sets the retryError flag
func (s *StatsInfo) RetryError() {
s.mu.Lock()
defer s.mu.Unlock()
s.retryError = true
}
// HadRetryError returns whether there has been at least one non-NoRetryError
func (s *StatsInfo) HadRetryError() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.retryError
}
// Deletes updates the stats for deletes
func (s *StatsInfo) Deletes(deletes int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()
s.deletes += deletes
return s.deletes
}
// ResetCounters sets the counters (bytes, checks, errors, transfers, deletes) to 0 and resets lastError, fatalError and retryError
func (s *StatsInfo) ResetCounters() {
s.mu.Lock()
defer s.mu.Unlock()
s.bytes = 0
s.errors = 0
s.lastError = nil
s.fatalError = false
s.retryError = false
s.retryAfter = time.Time{}
s.checks = 0
s.transfers = 0
s.deletes = 0
s.startedTransfers = nil
s.oldDuration = 0
}
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
func (s *StatsInfo) ResetErrors() {
s.mu.Lock()
defer s.mu.Unlock()
s.errors = 0
s.lastError = nil
s.fatalError = false
s.retryError = false
s.retryAfter = time.Time{}
}
// Errored returns whether there have been any errors
func (s *StatsInfo) Errored() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.errors != 0
}
// Error adds a single error into the stats, assigns lastError and eventually sets fatalError or retryError
func (s *StatsInfo) Error(err error) error {
if err == nil || fserrors.IsCounted(err) {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.errors++
s.lastError = err
err = fserrors.FsError(err)
fserrors.Count(err)
switch {
case fserrors.IsFatalError(err):
s.fatalError = true
case fserrors.IsRetryAfterError(err):
retryAfter := fserrors.RetryAfterErrorTime(err)
if s.retryAfter.IsZero() || retryAfter.Sub(s.retryAfter) > 0 {
s.retryAfter = retryAfter
}
s.retryError = true
case !fserrors.IsNoRetryError(err):
s.retryError = true
}
return err
}
// RetryAfter returns the time to retry after if it is set. It will
// be Zero if it isn't set.
func (s *StatsInfo) RetryAfter() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.retryAfter
}
// NewCheckingTransfer adds a checking transfer to the stats, from the object.
func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer {
s.checking.add(obj.Remote())
return newCheckingTransfer(s, obj)
}
// DoneChecking removes a check from the stats
func (s *StatsInfo) DoneChecking(remote string) {
s.checking.del(remote)
s.mu.Lock()
s.checks++
s.mu.Unlock()
}
// GetTransfers reads the number of transfers
func (s *StatsInfo) GetTransfers() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.transfers
}
// NewTransfer adds a transfer to the stats from the object.
func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer {
s.transferring.add(obj.Remote())
return newTransfer(s, obj)
}
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
s.transferring.add(remote)
return newTransferRemoteSize(s, remote, size, false)
}
// DoneTransferring removes a transfer from the stats
//
// if ok is true then it increments the transfers count
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.transferring.del(remote)
if ok {
s.mu.Lock()
s.transfers++
s.mu.Unlock()
}
}
// SetCheckQueue sets the number of queued checks
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
s.mu.Lock()
s.checkQueue = n
s.checkQueueSize = size
s.mu.Unlock()
}
// SetTransferQueue sets the number of queued transfers
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
s.mu.Lock()
s.transferQueue = n
s.transferQueueSize = size
s.mu.Unlock()
}
// SetRenameQueue sets the number of queued transfers
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
s.mu.Lock()
s.renameQueue = n
s.renameQueueSize = size
s.mu.Unlock()
}
// AddTransfer adds reference to the started transfer.
func (s *StatsInfo) AddTransfer(transfer *Transfer) {
s.mu.Lock()
s.startedTransfers = append(s.startedTransfers, transfer)
s.mu.Unlock()
}
// removeTransfer removes a reference to the started transfer in
// position i.
//
// Must be called with the lock held
func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) {
now := time.Now()
// add finished transfer onto old time ranges
start, end := transfer.TimeRange()
if end.IsZero() {
end = now
}
s.oldTimeRanges = append(s.oldTimeRanges, timeRange{start, end})
s.oldTimeRanges.merge()
// remove the found entry
s.startedTransfers = append(s.startedTransfers[:i], s.startedTransfers[i+1:]...)
// Find youngest active transfer
oldestStart := now
for i := range s.startedTransfers {
start, _ := s.startedTransfers[i].TimeRange()
if start.Before(oldestStart) {
oldestStart = start
}
}
// remove old entries older than that
s.oldDuration += s.oldTimeRanges.cull(oldestStart)
}
// RemoveTransfer removes a reference to the started transfer.
func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
s.mu.Lock()
for i, tr := range s.startedTransfers {
if tr == transfer {
s.removeTransfer(tr, i)
break
}
}
s.mu.Unlock()
}
// PruneTransfers makes sure there aren't too many old transfers by removing
// single finished transfer.
func (s *StatsInfo) PruneTransfers() {
if MaxCompletedTransfers < 0 {
return
}
s.mu.Lock()
// remove a transfer from the start if we are over quota
if len(s.startedTransfers) > MaxCompletedTransfers+fs.Config.Transfers {
for i, tr := range s.startedTransfers {
if tr.IsDone() {
s.removeTransfer(tr, i)
break
}
}
}
s.mu.Unlock()
}