azureblob: make all operations work from the root #3421

s3-about
Nick Craig-Wood 2019-08-16 10:10:56 +01:00
parent d8e9b1a67c
commit 8a0775ce3c
1 changed files with 200 additions and 231 deletions

View File

@ -13,11 +13,9 @@ import (
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"net/url"
"path"
"regexp"
"strconv"
"strings"
"sync"
@ -34,6 +32,7 @@ import (
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/pacer"
)
@ -144,19 +143,20 @@ type Options struct {
// Fs represents a remote azure server
type Fs struct {
name string // name of this remote
root string // the path we are working on if any
opt Options // parsed config options
features *fs.Features // optional features
client *http.Client // http client we are using
svcURL *azblob.ServiceURL // reference to serviceURL
cntURL *azblob.ContainerURL // reference to containerURL
container string // the container we are working on
containerOKMu sync.Mutex // mutex to protect container OK
containerOK bool // true if we have created the container
containerDeleted bool // true if we have deleted the container
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
name string // name of this remote
root string // the path we are working on if any
opt Options // parsed config options
features *fs.Features // optional features
client *http.Client // http client we are using
svcURL *azblob.ServiceURL // reference to serviceURL
cntURLcacheMu sync.Mutex // mutex to protect cntURLcache
cntURLcache map[string]*azblob.ContainerURL // reference to containerURL per container
rootContainer string // container part of root (if any)
rootDirectory string // directory part of root (if any)
isLimited bool // if limited to one container
cache *bucket.Cache // cache for container creation status
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
}
// Object describes a azure object
@ -180,18 +180,18 @@ func (f *Fs) Name() string {
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
if f.root == "" {
return f.container
}
return f.container + "/" + f.root
return f.root
}
// String converts this Fs to a string
func (f *Fs) String() string {
if f.root == "" {
return fmt.Sprintf("Azure container %s", f.container)
if f.rootContainer == "" {
return fmt.Sprintf("Azure root")
}
return fmt.Sprintf("Azure container %s path %s", f.container, f.root)
if f.rootDirectory == "" {
return fmt.Sprintf("Azure container %s", f.rootContainer)
}
return fmt.Sprintf("Azure container %s path %s", f.rootContainer, f.rootDirectory)
}
// Features returns the optional features of this Fs
@ -199,21 +199,23 @@ func (f *Fs) Features() *fs.Features {
return f.features
}
// Pattern to match a azure path
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
// parseParse parses a azure 'url'
func parsePath(path string) (container, directory string, err error) {
parts := matcher.FindStringSubmatch(path)
if parts == nil {
err = errors.Errorf("couldn't find container in azure path %q", path)
} else {
container, directory = parts[1], parts[2]
directory = strings.Trim(directory, "/")
}
// parsePath parses a remote 'url'
func parsePath(path string) (root string) {
root = strings.Trim(path, "/")
return
}
// split returns container and containerPath from the rootRelativePath
// relative to f.root
func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) {
return bucket.Split(path.Join(f.root, rootRelativePath))
}
// split returns container and containerPath from the object
func (o *Object) split() (container, containerPath string) {
return o.fs.split(o.remote)
}
// validateAccessTier checks if azureblob supports user supplied tier
func validateAccessTier(tier string) bool {
switch tier {
@ -318,6 +320,12 @@ func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
}
// setRoot changes the root of the Fs
func (f *Fs) setRoot(root string) {
f.root = parsePath(root)
f.rootContainer, f.rootDirectory = bucket.Split(f.root)
}
// NewFs constructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
ctx := context.Background()
@ -339,10 +347,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if opt.ListChunkSize > maxListChunkSize {
return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize)
}
container, directory, err := parsePath(root)
if err != nil {
return nil, err
}
if opt.Endpoint == "" {
opt.Endpoint = storageDefaultBaseURL
}
@ -357,24 +361,25 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f := &Fs{
name: name,
opt: *opt,
container: container,
root: directory,
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
client: fshttp.NewClient(fs.Config),
cache: bucket.NewCache(),
cntURLcache: make(map[string]*azblob.ContainerURL, 1),
}
f.setRoot(root)
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
SetTier: true,
GetTier: true,
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
GetTier: true,
}).Fill(f)
var (
u *url.URL
serviceURL azblob.ServiceURL
containerURL azblob.ContainerURL
u *url.URL
serviceURL azblob.ServiceURL
)
switch {
case opt.UseEmulator:
@ -388,7 +393,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
}
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case opt.Account != "" && opt.Key != "":
credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil {
@ -401,7 +405,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
}
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case opt.SASURL != "":
u, err = url.Parse(opt.SASURL)
if err != nil {
@ -412,38 +415,30 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Check if we have container level SAS or account level sas
parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" {
if container != "" && parts.ContainerName != container {
if f.rootContainer != "" && parts.ContainerName != f.rootContainer {
return nil, errors.New("Container name in SAS URL and container provided in command do not match")
}
f.container = parts.ContainerName
containerURL = azblob.NewContainerURL(*u, pipeline)
containerURL := azblob.NewContainerURL(*u, pipeline)
f.cntURLcache[parts.ContainerName] = &containerURL
f.isLimited = true
} else {
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
}
default:
return nil, errors.New("Need account+key or connectionString or sasURL")
}
f.svcURL = &serviceURL
f.cntURL = &containerURL
if f.root != "" {
f.root += "/"
if f.rootContainer != "" && f.rootDirectory != "" {
// Check to see if the (container,directory) is actually an existing file
oldRoot := f.root
remote := path.Base(directory)
f.root = path.Dir(directory)
if f.root == "." {
f.root = ""
} else {
f.root += "/"
}
_, err := f.NewObject(ctx, remote)
newRoot, leaf := path.Split(oldRoot)
f.setRoot(newRoot)
_, err := f.NewObject(ctx, leaf)
if err != nil {
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
// File doesn't exist or is a directory so return old f
f.root = oldRoot
f.setRoot(oldRoot)
return f, nil
}
return nil, err
@ -454,6 +449,20 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
return f, nil
}
// return the container URL for the container passed in
func (f *Fs) cntURL(container string) (containerURL *azblob.ContainerURL) {
f.cntURLcacheMu.Lock()
defer f.cntURLcacheMu.Unlock()
var ok bool
if containerURL, ok = f.cntURLcache[container]; !ok {
cntURL := f.svcURL.NewContainerURL(container)
containerURL = &cntURL
f.cntURLcache[container] = containerURL
}
return containerURL
}
// Return an Object from a path
//
// If it can't be found it returns the error fs.ErrorObjectNotFound.
@ -483,8 +492,8 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
}
// getBlobReference creates an empty blob reference with no metadata
func (f *Fs) getBlobReference(remote string) azblob.BlobURL {
return f.cntURL.NewBlobURL(f.root + remote)
func (f *Fs) getBlobReference(container, containerPath string) azblob.BlobURL {
return f.cntURL(container).NewBlobURL(containerPath)
}
// updateMetadataWithModTime adds the modTime passed in to o.meta.
@ -520,16 +529,18 @@ type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error
// the container and root supplied
//
// dir is the starting directory, "" for root
func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint, fn listFn) error {
f.containerOKMu.Lock()
deleted := f.containerDeleted
f.containerOKMu.Unlock()
if deleted {
//
// The remote has prefix removed from it and if addContainer is set then
// it adds the container to the start.
func (f *Fs) list(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, maxResults uint, fn listFn) error {
if f.cache.IsDeleted(container) {
return fs.ErrorDirNotFound
}
root := f.root
if dir != "" {
root += dir + "/"
if prefix != "" {
prefix += "/"
}
if directory != "" {
directory += "/"
}
delimiter := ""
if !recurse {
@ -544,15 +555,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
UncommittedBlobs: false,
Deleted: false,
},
Prefix: root,
Prefix: directory,
MaxResults: int32(maxResults),
}
directoryMarkers := map[string]struct{}{}
for marker := (azblob.Marker{}); marker.NotDone(); {
var response *azblob.ListBlobsHierarchySegmentResponse
err := f.pacer.Call(func() (bool, error) {
var err error
response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
response, err = f.cntURL(container).ListBlobsHierarchySegment(ctx, marker, delimiter, options)
return f.shouldRetry(err)
})
@ -572,26 +582,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
// return nil
// }
if !strings.HasPrefix(file.Name, f.root) {
if !strings.HasPrefix(file.Name, prefix) {
fs.Debugf(f, "Odd name received %q", file.Name)
continue
}
remote := file.Name[len(f.root):]
remote := file.Name[len(prefix):]
if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) {
if strings.HasSuffix(remote, "/") {
remote = remote[:len(remote)-1]
}
err = fn(remote, file, true)
if err != nil {
return err
}
// Keep track of directory markers. If recursing then
// there will be no Prefixes so no need to keep track
if !recurse {
directoryMarkers[remote] = struct{}{}
}
continue // skip directory marker
}
if addContainer {
remote = path.Join(container, remote)
}
// Send object
err = fn(remote, file, false)
if err != nil {
@ -601,14 +602,13 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
// Send the subdirectories
for _, remote := range response.Segment.BlobPrefixes {
remote := strings.TrimRight(remote.Name, "/")
if !strings.HasPrefix(remote, f.root) {
if !strings.HasPrefix(remote, prefix) {
fs.Debugf(f, "Odd directory name received %q", remote)
continue
}
remote = remote[len(f.root):]
// Don't send if already sent as a directory marker
if _, found := directoryMarkers[remote]; found {
continue
remote = remote[len(prefix):]
if addContainer {
remote = path.Join(container, remote)
}
// Send object
err = fn(remote, nil, true)
@ -633,19 +633,9 @@ func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory
return o, nil
}
// mark the container as being OK
func (f *Fs) markContainerOK() {
if f.container != "" {
f.containerOKMu.Lock()
f.containerOK = true
f.containerDeleted = false
f.containerOKMu.Unlock()
}
}
// listDir lists a single directory
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
err = f.list(ctx, dir, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
@ -659,7 +649,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
return nil, err
}
// container must be present if listing succeeded
f.markContainerOK()
f.cache.MarkOK(container)
return entries, nil
}
@ -668,8 +658,18 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
if dir != "" {
return nil, fs.ErrorListBucketRequired
}
if f.isLimited {
f.cntURLcacheMu.Lock()
for container := range f.cntURLcache {
d := fs.NewDir(container, time.Time{})
entries = append(entries, d)
}
f.cntURLcacheMu.Unlock()
return entries, nil
}
err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
d := fs.NewDir(container.Name, container.Properties.LastModified)
f.cache.MarkOK(container.Name)
entries = append(entries, d)
return nil
})
@ -689,10 +689,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
if f.container == "" {
return f.listContainers(dir)
container, directory := f.split(dir)
if container == "" {
return f.listContainers(directory)
}
return f.listDir(ctx, dir)
return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")
}
// ListR lists the objects and directories of the Fs starting
@ -712,22 +713,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
if f.container == "" {
return fs.ErrorListBucketRequired
}
container, directory := f.split(dir)
list := walk.NewListRHelper(callback)
err = f.list(ctx, dir, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
listR := func(container, directory, prefix string, addContainer bool) error {
return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
}
return list.Add(entry)
})
}
if container == "" {
entries, err := f.listContainers("")
if err != nil {
return err
}
for _, entry := range entries {
err = list.Add(entry)
if err != nil {
return err
}
container := entry.Remote()
err = listR(container, "", f.rootDirectory, true)
if err != nil {
return err
}
}
} else {
err = listR(container, directory, f.rootDirectory, f.rootContainer == "")
if err != nil {
return err
}
return list.Add(entry)
})
if err != nil {
return err
}
// container must be present if listing succeeded
f.markContainerOK()
f.cache.MarkOK(container)
return list.Flush()
}
@ -777,86 +797,38 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
return fs, fs.Update(ctx, in, src, options...)
}
// Check if the container exists
//
// NB this can return incorrect results if called immediately after container deletion
func (f *Fs) dirExists() (bool, error) {
options := azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
MaxResults: 1,
}
err := f.pacer.Call(func() (bool, error) {
ctx := context.Background()
_, err := f.cntURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, "", options)
return f.shouldRetry(err)
})
if err == nil {
return true, nil
}
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
return false, nil
}
return false, err
}
// Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
f.containerOKMu.Lock()
defer f.containerOKMu.Unlock()
if f.containerOK {
return nil
}
if !f.containerDeleted {
exists, err := f.dirExists()
if err == nil {
f.containerOK = exists
}
if err != nil || exists {
return err
}
}
// now try to create the container
err := f.pacer.Call(func() (bool, error) {
ctx := context.Background()
_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
if storageErr, ok := err.(azblob.StorageError); ok {
switch storageErr.ServiceCode() {
case azblob.ServiceCodeContainerAlreadyExists:
f.containerOK = true
return false, nil
case azblob.ServiceCodeContainerBeingDeleted:
// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
// When a container is deleted, a container with the same name cannot be created
// for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
f.containerDeleted = true
return true, err
container, _ := f.split(dir)
return f.cache.Create(container, func() error {
// now try to create the container
return f.pacer.Call(func() (bool, error) {
_, err := f.cntURL(container).Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
if storageErr, ok := err.(azblob.StorageError); ok {
switch storageErr.ServiceCode() {
case azblob.ServiceCodeContainerAlreadyExists:
return false, nil
case azblob.ServiceCodeContainerBeingDeleted:
// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
// When a container is deleted, a container with the same name cannot be created
// for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
f.cache.MarkDeleted(container)
return true, err
}
}
}
}
return f.shouldRetry(err)
})
if err == nil {
f.containerOK = true
f.containerDeleted = false
}
return errors.Wrap(err, "failed to make container")
return f.shouldRetry(err)
})
}, nil)
}
// isEmpty checks to see if a given directory is empty and returns an error if not
func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) {
// isEmpty checks to see if a given (container, directory) is empty and returns an error if not
func (f *Fs) isEmpty(ctx context.Context, container, directory string) (err error) {
empty := true
err = f.list(ctx, dir, true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
err = f.list(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
empty = false
return nil
})
@ -871,47 +843,42 @@ func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) {
// deleteContainer deletes the container. It can delete a full
// container so use isEmpty if you don't want that.
func (f *Fs) deleteContainer() error {
f.containerOKMu.Lock()
defer f.containerOKMu.Unlock()
options := azblob.ContainerAccessConditions{}
ctx := context.Background()
err := f.pacer.Call(func() (bool, error) {
_, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
if err == nil {
_, err = f.cntURL.Delete(ctx, options)
}
func (f *Fs) deleteContainer(ctx context.Context, container string) error {
return f.cache.Remove(container, func() error {
options := azblob.ContainerAccessConditions{}
return f.pacer.Call(func() (bool, error) {
_, err := f.cntURL(container).GetProperties(ctx, azblob.LeaseAccessConditions{})
if err == nil {
_, err = f.cntURL(container).Delete(ctx, options)
}
if err != nil {
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
return false, fs.ErrorDirNotFound
if err != nil {
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
return false, fs.ErrorDirNotFound
}
return f.shouldRetry(err)
}
return f.shouldRetry(err)
}
return f.shouldRetry(err)
})
})
if err == nil {
f.containerOK = false
f.containerDeleted = true
}
return errors.Wrap(err, "failed to delete container")
}
// Rmdir deletes the container if the fs is at the root
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err := f.isEmpty(ctx, dir)
container, directory := f.split(dir)
if container == "" || directory != "" {
return nil
}
err := f.isEmpty(ctx, container, directory)
if err != nil {
return err
}
if f.root != "" || dir != "" {
return nil
}
return f.deleteContainer()
return f.deleteContainer(ctx, container)
}
// Precision of the remote
@ -927,11 +894,12 @@ func (f *Fs) Hashes() hash.Set {
// Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context) error {
dir := "" // forward compat!
if f.root != "" || dir != "" {
// Delegate to caller if not root container
container, directory := f.split(dir)
if container == "" || directory != "" {
// Delegate to caller if not root of a container
return fs.ErrorCantPurge
}
return f.deleteContainer()
return f.deleteContainer(ctx, container)
}
// Copy src to this remote using server side copy operations.
@ -944,6 +912,7 @@ func (f *Fs) Purge(ctx context.Context) error {
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
dstContainer, dstPath := f.split(remote)
err := f.Mkdir(ctx, "")
if err != nil {
return nil, err
@ -953,7 +922,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
dstBlobURL := f.getBlobReference(remote)
dstBlobURL := f.getBlobReference(dstContainer, dstPath)
srcBlobURL := srcObj.getBlobReference()
source, err := url.Parse(srcBlobURL.String())
@ -1086,7 +1055,8 @@ func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) {
// getBlobReference creates an empty blob reference with no metadata
func (o *Object) getBlobReference() azblob.BlobURL {
return o.fs.getBlobReference(o.remote)
container, directory := o.split()
return o.fs.getBlobReference(container, directory)
}
// clearMetaData clears enough metadata so readMetaData will re-read it
@ -1206,7 +1176,6 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
ac := azblob.BlobAccessConditions{}
var dowloadResponse *azblob.DownloadResponse
err = o.fs.pacer.Call(func() (bool, error) {
log.Printf("offset=%d, count=%v", offset, count)
dowloadResponse, err = blob.Download(ctx, offset, count, ac, false)
return o.fs.shouldRetry(err)
})