rclone/backend/cache/handle.go

639 lines
16 KiB
Go

// +build !plan9
package cache
import (
"context"
"fmt"
"io"
"path"
"runtime"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/operations"
)
var uploaderMap = make(map[string]*backgroundWriter)
var uploaderMapMx sync.Mutex
// initBackgroundUploader returns a single instance
func initBackgroundUploader(fs *Fs) (*backgroundWriter, error) {
// write lock to create one
uploaderMapMx.Lock()
defer uploaderMapMx.Unlock()
if b, ok := uploaderMap[fs.String()]; ok {
// if it was already started we close it so that it can be started again
if b.running {
b.close()
} else {
return b, nil
}
}
bb := newBackgroundWriter(fs)
uploaderMap[fs.String()] = bb
return uploaderMap[fs.String()], nil
}
// Handle is managing the read/write/seek operations on an open handle
type Handle struct {
ctx context.Context
cachedObject *Object
cfs *Fs
memory *Memory
preloadQueue chan int64
preloadOffset int64
offset int64
seenOffsets map[int64]bool
mu sync.Mutex
workersWg sync.WaitGroup
confirmReading chan bool
workers int
maxWorkerID int
UseMemory bool
closed bool
reading bool
}
// NewObjectHandle returns a new Handle for an existing Object
func NewObjectHandle(ctx context.Context, o *Object, cfs *Fs) *Handle {
r := &Handle{
ctx: ctx,
cachedObject: o,
cfs: cfs,
offset: 0,
preloadOffset: -1, // -1 to trigger the first preload
UseMemory: !cfs.opt.ChunkNoMemory,
reading: false,
}
r.seenOffsets = make(map[int64]bool)
r.memory = NewMemory(-1)
// create a larger buffer to queue up requests
r.preloadQueue = make(chan int64, r.cfs.opt.TotalWorkers*10)
r.confirmReading = make(chan bool)
r.startReadWorkers()
return r
}
// cacheFs is a convenience method to get the parent cache FS of the object's manager
func (r *Handle) cacheFs() *Fs {
return r.cfs
}
// storage is a convenience method to get the persistent storage of the object's manager
func (r *Handle) storage() *Persistent {
return r.cacheFs().cache
}
// String representation of this reader
func (r *Handle) String() string {
return r.cachedObject.abs()
}
// startReadWorkers will start the worker pool
func (r *Handle) startReadWorkers() {
if r.workers > 0 {
return
}
totalWorkers := r.cacheFs().opt.TotalWorkers
if r.cacheFs().plexConnector.isConfigured() {
if !r.cacheFs().plexConnector.isConnected() {
err := r.cacheFs().plexConnector.authenticate()
if err != nil {
fs.Errorf(r, "failed to authenticate to Plex: %v", err)
}
}
if r.cacheFs().plexConnector.isConnected() {
totalWorkers = 1
}
}
r.scaleWorkers(totalWorkers)
}
// scaleOutWorkers will increase the worker pool count by the provided amount
func (r *Handle) scaleWorkers(desired int) {
current := r.workers
if current == desired {
return
}
if current > desired {
// scale in gracefully
for r.workers > desired {
r.preloadQueue <- -1
r.workers--
}
} else {
// scale out
for r.workers < desired {
w := &worker{
r: r,
id: r.maxWorkerID,
}
r.workersWg.Add(1)
r.workers++
r.maxWorkerID++
go w.run()
}
}
// ignore first scale out from 0
if current != 0 {
fs.Debugf(r, "scale workers to %v", desired)
}
}
func (r *Handle) confirmExternalReading() {
// if we have a max value of workers
// then we skip this step
if r.workers > 1 ||
!r.cacheFs().plexConnector.isConfigured() {
return
}
if !r.cacheFs().plexConnector.isPlaying(r.cachedObject) {
return
}
fs.Infof(r, "confirmed reading by external reader")
r.scaleWorkers(r.cacheFs().opt.TotalWorkers)
}
// queueOffset will send an offset to the workers if it's different from the last one
func (r *Handle) queueOffset(offset int64) {
if offset != r.preloadOffset {
// clean past in-memory chunks
if r.UseMemory {
go r.memory.CleanChunksByNeed(offset)
}
r.confirmExternalReading()
r.preloadOffset = offset
// clear the past seen chunks
// they will remain in our persistent storage but will be removed from transient
// so they need to be picked up by a worker
for k := range r.seenOffsets {
if k < offset {
r.seenOffsets[k] = false
}
}
for i := 0; i < r.workers; i++ {
o := r.preloadOffset + int64(r.cacheFs().opt.ChunkSize)*int64(i)
if o < 0 || o >= r.cachedObject.Size() {
continue
}
if v, ok := r.seenOffsets[o]; ok && v {
continue
}
r.seenOffsets[o] = true
r.preloadQueue <- o
}
}
}
// getChunk is called by the FS to retrieve a specific chunk of known start and size from where it can find it
// it can be from transient or persistent cache
// it will also build the chunk from the cache's specific chunk boundaries and build the final desired chunk in a buffer
func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
var data []byte
var err error
// we calculate the modulus of the requested offset with the size of a chunk
offset := chunkStart % int64(r.cacheFs().opt.ChunkSize)
// we align the start offset of the first chunk to a likely chunk in the storage
chunkStart = chunkStart - offset
r.queueOffset(chunkStart)
found := false
if r.UseMemory {
data, err = r.memory.GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
}
}
if !found {
// we're gonna give the workers a chance to pickup the chunk
// and retry a couple of times
for i := 0; i < r.cacheFs().opt.ReadRetries*8; i++ {
data, err = r.storage().GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
break
}
fs.Debugf(r, "%v: chunk retry storage: %v", chunkStart, i)
time.Sleep(time.Millisecond * 500)
}
}
// not found in ram or
// the worker didn't managed to download the chunk in time so we abort and close the stream
if err != nil || len(data) == 0 || !found {
if r.workers == 0 {
fs.Errorf(r, "out of workers")
return nil, io.ErrUnexpectedEOF
}
return nil, errors.Errorf("chunk not found %v", chunkStart)
}
// first chunk will be aligned with the start
if offset > 0 {
if offset > int64(len(data)) {
fs.Errorf(r, "unexpected conditions during reading. current position: %v, current chunk position: %v, current chunk size: %v, offset: %v, chunk size: %v, file size: %v",
r.offset, chunkStart, len(data), offset, r.cacheFs().opt.ChunkSize, r.cachedObject.Size())
return nil, io.ErrUnexpectedEOF
}
data = data[int(offset):]
}
return data, nil
}
// Read a chunk from storage or len(p)
func (r *Handle) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
var buf []byte
// first reading
if !r.reading {
r.reading = true
}
// reached EOF
if r.offset >= r.cachedObject.Size() {
return 0, io.EOF
}
currentOffset := r.offset
buf, err = r.getChunk(currentOffset)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(r, "(%v/%v) error (%v) response", currentOffset, r.cachedObject.Size(), err)
}
if len(buf) == 0 && err != io.ErrUnexpectedEOF {
return 0, io.EOF
}
readSize := copy(p, buf)
newOffset := currentOffset + int64(readSize)
r.offset = newOffset
return readSize, err
}
// Close will tell the workers to stop
func (r *Handle) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.closed {
return errors.New("file already closed")
}
close(r.preloadQueue)
r.closed = true
// wait for workers to complete their jobs before returning
r.workersWg.Wait()
r.memory.db.Flush()
fs.Debugf(r, "cache reader closed %v", r.offset)
return nil
}
// Seek will move the current offset based on whence and instruct the workers to move there too
func (r *Handle) Seek(offset int64, whence int) (int64, error) {
r.mu.Lock()
defer r.mu.Unlock()
var err error
switch whence {
case io.SeekStart:
fs.Debugf(r, "moving offset set from %v to %v", r.offset, offset)
r.offset = offset
case io.SeekCurrent:
fs.Debugf(r, "moving offset cur from %v to %v", r.offset, r.offset+offset)
r.offset += offset
case io.SeekEnd:
fs.Debugf(r, "moving offset end (%v) from %v to %v", r.cachedObject.Size(), r.offset, r.cachedObject.Size()+offset)
r.offset = r.cachedObject.Size() + offset
default:
err = errors.Errorf("cache: unimplemented seek whence %v", whence)
}
chunkStart := r.offset - (r.offset % int64(r.cacheFs().opt.ChunkSize))
if chunkStart >= int64(r.cacheFs().opt.ChunkSize) {
chunkStart = chunkStart - int64(r.cacheFs().opt.ChunkSize)
}
r.queueOffset(chunkStart)
return r.offset, err
}
type worker struct {
r *Handle
rc io.ReadCloser
id int
}
// String is a representation of this worker
func (w *worker) String() string {
return fmt.Sprintf("worker-%v <%v>", w.id, w.r.cachedObject.Name)
}
// reader will return a reader depending on the capabilities of the source reader:
// - if it supports seeking it will seek to the desired offset and return the same reader
// - if it doesn't support seeking it will close a possible existing one and open at the desired offset
// - if there's no reader associated with this worker, it will create one
func (w *worker) reader(offset, end int64, closeOpen bool) (io.ReadCloser, error) {
var err error
r := w.rc
if w.rc == nil {
r, err = w.r.cacheFs().openRateLimited(func() (io.ReadCloser, error) {
return w.r.cachedObject.Object.Open(w.r.ctx, &fs.RangeOption{Start: offset, End: end - 1})
})
if err != nil {
return nil, err
}
return r, nil
}
if !closeOpen {
if do, ok := r.(fs.RangeSeeker); ok {
_, err = do.RangeSeek(w.r.ctx, offset, io.SeekStart, end-offset)
return r, err
} else if do, ok := r.(io.Seeker); ok {
_, err = do.Seek(offset, io.SeekStart)
return r, err
}
}
_ = w.rc.Close()
return w.r.cacheFs().openRateLimited(func() (io.ReadCloser, error) {
r, err = w.r.cachedObject.Object.Open(w.r.ctx, &fs.RangeOption{Start: offset, End: end - 1})
if err != nil {
return nil, err
}
return r, nil
})
}
// run is the main loop for the worker which receives offsets to preload
func (w *worker) run() {
var err error
var data []byte
defer func() {
if w.rc != nil {
_ = w.rc.Close()
}
w.r.workersWg.Done()
}()
for {
chunkStart, open := <-w.r.preloadQueue
if chunkStart < 0 || !open {
break
}
// skip if it exists
if w.r.UseMemory {
if w.r.memory.HasChunk(w.r.cachedObject, chunkStart) {
continue
}
// add it in ram if it's in the persistent storage
data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart)
if err == nil {
err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
} else {
continue
}
}
} else {
if w.r.storage().HasChunk(w.r.cachedObject, chunkStart) {
continue
}
}
chunkEnd := chunkStart + int64(w.r.cacheFs().opt.ChunkSize)
// TODO: Remove this comment if it proves to be reliable for #1896
//if chunkEnd > w.r.cachedObject.Size() {
// chunkEnd = w.r.cachedObject.Size()
//}
w.download(chunkStart, chunkEnd, 0)
}
}
func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
var err error
var data []byte
// stop retries
if retry >= w.r.cacheFs().opt.ReadRetries {
return
}
// back-off between retries
if retry > 0 {
time.Sleep(time.Second * time.Duration(retry))
}
closeOpen := false
if retry > 0 {
closeOpen = true
}
w.rc, err = w.reader(chunkStart, chunkEnd, closeOpen)
// we seem to be getting only errors so we abort
if err != nil {
fs.Errorf(w, "object open failed %v: %v", chunkStart, err)
err = w.r.cachedObject.refreshFromSource(w.r.ctx, true)
if err != nil {
fs.Errorf(w, "%v", err)
}
w.download(chunkStart, chunkEnd, retry+1)
return
}
data = make([]byte, chunkEnd-chunkStart)
var sourceRead int
sourceRead, err = io.ReadFull(w.rc, data)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err)
err = w.r.cachedObject.refreshFromSource(w.r.ctx, true)
if err != nil {
fs.Errorf(w, "%v", err)
}
w.download(chunkStart, chunkEnd, retry+1)
return
}
data = data[:sourceRead] // reslice to remove extra garbage
if err == io.ErrUnexpectedEOF {
fs.Debugf(w, "partial downloaded chunk %v", fs.SizeSuffix(chunkStart))
} else {
fs.Debugf(w, "downloaded chunk %v", chunkStart)
}
if w.r.UseMemory {
err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
}
}
err = w.r.storage().AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err)
}
}
const (
// BackgroundUploadStarted is a state for a temp file that has started upload
BackgroundUploadStarted = iota
// BackgroundUploadCompleted is a state for a temp file that has completed upload
BackgroundUploadCompleted
// BackgroundUploadError is a state for a temp file that has an error upload
BackgroundUploadError
)
// BackgroundUploadState is an entity that maps to an existing file which is stored on the temp fs
type BackgroundUploadState struct {
Remote string
Status int
Error error
}
type backgroundWriter struct {
fs *Fs
stateCh chan int
running bool
notifyCh chan BackgroundUploadState
mu sync.Mutex
}
func newBackgroundWriter(f *Fs) *backgroundWriter {
b := &backgroundWriter{
fs: f,
stateCh: make(chan int),
notifyCh: make(chan BackgroundUploadState),
}
return b
}
func (b *backgroundWriter) close() {
b.stateCh <- 2
b.mu.Lock()
defer b.mu.Unlock()
b.running = false
}
func (b *backgroundWriter) pause() {
b.stateCh <- 1
}
func (b *backgroundWriter) play() {
b.stateCh <- 0
}
func (b *backgroundWriter) isRunning() bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.running
}
func (b *backgroundWriter) notify(remote string, status int, err error) {
state := BackgroundUploadState{
Remote: remote,
Status: status,
Error: err,
}
select {
case b.notifyCh <- state:
fs.Debugf(remote, "notified background upload state: %v", state.Status)
default:
}
}
func (b *backgroundWriter) run() {
state := 0
for {
b.mu.Lock()
b.running = true
b.mu.Unlock()
select {
case s := <-b.stateCh:
state = s
default:
//
}
switch state {
case 1:
runtime.Gosched()
time.Sleep(time.Millisecond * 500)
continue
case 2:
return
}
absPath, err := b.fs.cache.getPendingUpload(b.fs.Root(), time.Duration(b.fs.opt.TempWaitTime))
if err != nil || absPath == "" || !b.fs.isRootInPath(absPath) {
time.Sleep(time.Second)
continue
}
remote := b.fs.cleanRootFromPath(absPath)
b.notify(remote, BackgroundUploadStarted, nil)
fs.Infof(remote, "background upload: started upload")
err = operations.MoveFile(context.TODO(), b.fs.UnWrap(), b.fs.tempFs, remote, remote)
if err != nil {
b.notify(remote, BackgroundUploadError, err)
_ = b.fs.cache.rollbackPendingUpload(absPath)
fs.Errorf(remote, "background upload: %v", err)
continue
}
// clean empty dirs up to root
thisDir := cleanPath(path.Dir(remote))
for thisDir != "" {
thisList, err := b.fs.tempFs.List(context.TODO(), thisDir)
if err != nil {
break
}
if len(thisList) > 0 {
break
}
err = b.fs.tempFs.Rmdir(context.TODO(), thisDir)
fs.Debugf(thisDir, "cleaned from temp path")
if err != nil {
break
}
thisDir = cleanPath(path.Dir(thisDir))
}
fs.Infof(remote, "background upload: uploaded entry")
err = b.fs.cache.removePendingUpload(absPath)
if err != nil && !strings.Contains(err.Error(), "pending upload not found") {
fs.Errorf(remote, "background upload: %v", err)
}
parentCd := NewDirectory(b.fs, cleanPath(path.Dir(remote)))
err = b.fs.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "background upload: cache expire error: %v", err)
}
b.fs.notifyChangeUpstream(remote, fs.EntryObject)
fs.Infof(remote, "finished background upload")
b.notify(remote, BackgroundUploadCompleted, nil)
}
}
// Check the interfaces are satisfied
var (
_ io.ReadCloser = (*Handle)(nil)
_ io.Seeker = (*Handle)(nil)
)