From b9e468ebbdd7ccb31f37c6c18678993bfdb5b7f5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 8 Jan 2013 18:53:35 +0000 Subject: [PATCH] Add initial S3 support * Requires a patched goamz to enable metadata reading and writing * Still todo: updating metadata * EU region not working yet --- fs.go | 3 + fs_s3.go | 366 +++++++++++++++++++++++++++++++++++++++++++++++++++ fs_swift.go | 4 +- notes.txt | 5 + swiftsync.go | 3 +- 5 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 fs_s3.go diff --git a/fs.go b/fs.go index e490542da..ce991a292 100644 --- a/fs.go +++ b/fs.go @@ -46,6 +46,9 @@ func NewFs(path string) (Fs, error) { if swiftMatch.MatchString(path) { return NewFsSwift(path) } + if s3Match.MatchString(path) { + return NewFsS3(path) + } return NewFsLocal(path) } diff --git a/fs_s3.go b/fs_s3.go new file mode 100644 index 000000000..f4d9ae3ca --- /dev/null +++ b/fs_s3.go @@ -0,0 +1,366 @@ +// S3 interface +package main + +import ( + "errors" + "flag" + "fmt" + "github.com/ncw/swift" + "io" + "launchpad.net/goamz/aws" + "launchpad.net/goamz/s3" + "log" + "mime" + "net/http" + "os" + "path" + "regexp" + "strconv" + "strings" + "time" +) + +// Constants +const ( + metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in +) + +// FsS3 represents a remote s3 server +type FsS3 struct { + c *s3.S3 // the connection to the s3 server + b *s3.Bucket // the connection to the bucket + bucket string // the bucket we are working on + perm s3.ACL // permissions for new buckets / objects +} + +// FsObjectS3 describes a s3 object +type FsObjectS3 struct { + // Will definitely have everything but meta which may be nil + // + // List will read everything but meta - to fill that in need to call + // readMetaData + s3 *FsS3 // what this object is part of + remote string // The remote path + etag string // md5sum of the object + bytes int64 // size of the object + lastModified time.Time // Last modified + meta s3.Headers // The object metadata if known 
- may be nil +} + +// ------------------------------------------------------------ + +// Globals +var ( + // Flags + awsAccessKeyId = flag.String("aws-access-key-id", os.Getenv("AWS_ACCESS_KEY_ID"), "AWS Access Key ID. Defaults to environment var AWS_ACCESS_KEY_ID.") + awsSecretAccessKey = flag.String("aws-secret-access-key", os.Getenv("AWS_SECRET_ACCESS_KEY"), "AWS Secret Access Key (password). Defaults to environment var AWS_SECRET_ACCESS_KEY.") + // AWS endpoints: http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region + s3Endpoint = flag.String("s3-endpoint", os.Getenv("S3_ENDPOINT"), "S3 Endpoint. Defaults to environment var S3_ENDPOINT then https://s3.amazonaws.com/.") + s3LocationConstraint = flag.String("s3-location-constraint", os.Getenv("S3_LOCATION_CONSTRAINT"), "Location constraint for creating buckets only. Defaults to environment var S3_LOCATION_CONSTRAINT.") +) + +// String converts this FsS3 to a string +func (f *FsS3) String() string { + return fmt.Sprintf("S3 bucket %s", f.bucket) +} + +// Pattern to match a s3 url +var s3Match = regexp.MustCompile(`^s3://([^/]+)(.*)$`) + +// s3ParsePath parses a s3 'url' of the form s3://bucket/directory +func s3ParsePath(path string) (bucket, directory string, err error) { + parts := s3Match.FindAllStringSubmatch(path, -1) + if len(parts) != 1 || len(parts[0]) != 3 { + err = fmt.Errorf("Couldn't parse s3 url %q", path) + } else { + bucket, directory = parts[0][1], parts[0][2] + directory = strings.Trim(directory, "/") + } + return +} + +// s3Connection makes a connection to s3 +func s3Connection() (*s3.S3, error) { + // Make the auth + if *awsAccessKeyId == "" { + return nil, errors.New("Need -aws-access-key-id or environmental variable AWS_ACCESS_KEY_ID") + } + if *awsSecretAccessKey == "" { + return nil, errors.New("Need -aws-secret-access-key or environmental variable AWS_SECRET_ACCESS_KEY") + } + auth := aws.Auth{*awsAccessKeyId, *awsSecretAccessKey} + + // FIXME look through all the regions by name and use one of them if 
found + + // Synthesize the region + if *s3Endpoint == "" { + *s3Endpoint = "https://s3.amazonaws.com/" + } + region := aws.Region{ + Name: "s3", + S3Endpoint: *s3Endpoint, + S3LocationConstraint: false, + } + if *s3LocationConstraint != "" { + region.Name = *s3LocationConstraint + region.S3LocationConstraint = true + } + + c := s3.New(auth, region) + return c, nil +} + +// NewFsS3 constructs an FsS3 from a path of the form s3://bucket (directories are not supported yet) +func NewFsS3(path string) (*FsS3, error) { + bucket, directory, err := s3ParsePath(path) + if err != nil { + return nil, err + } + if directory != "" { + return nil, fmt.Errorf("Directories not supported yet in %q: %q", path, directory) + } + c, err := s3Connection() + if err != nil { + return nil, err + } + f := &FsS3{ + c: c, + bucket: bucket, + b: c.Bucket(bucket), + perm: s3.Private, // FIXME need user to specify + } + return f, nil +} + +// S3Buckets lists the buckets, printing the creation date and name of each +func S3Buckets() { + c, err := s3Connection() + if err != nil { + stats.Error() + log.Fatalf("Couldn't connect: %s", err) + } + buckets, err := c.List() + if err != nil { + stats.Error() + log.Fatalf("Couldn't list buckets: %s", err) + } + for _, bucket := range buckets.Buckets { + fmt.Printf("%12s %s\n", bucket.CreationDate, bucket.Name) + } +} + +// NewFsObjectWithInfo returns an FsObject from a path, using info if supplied or reading the metadata from the server if info is nil +// +// May return nil if an error occurred +func (f *FsS3) NewFsObjectWithInfo(remote string, info *s3.Key) FsObject { + fs := &FsObjectS3{ + s3: f, + remote: remote, + } + if info != nil { + // Set info but not meta + var err error + fs.lastModified, err = time.Parse(time.RFC3339, info.LastModified) + if err != nil { + FsLog(fs, "Failed to read last modified: %s", err) + fs.lastModified = time.Now() + } + fs.etag = info.ETag + fs.bytes = info.Size + } else { + err := fs.readMetaData() // reads info and meta, returning an error + if err != nil { + // logged already FsDebug("Failed to read info: %s", err) + return nil + } + } + return fs +} + +// NewFsObject returns an FsObject from a path +// +// May return nil if 
an error occurred +func (f *FsS3) NewFsObject(remote string) FsObject { + return f.NewFsObjectWithInfo(remote, nil) +} + +// List walks the path returning a channel of FsObjects +func (f *FsS3) List() FsObjectsChan { + out := make(FsObjectsChan, *checkers) + go func() { + // FIXME need to loop to read ALL the objects - only a single List call is made here + objects, err := f.b.List("", "", "", 10000) + if err != nil { + stats.Error() + log.Printf("Couldn't read bucket %q: %s", f.bucket, err) + } else { + for i := range objects.Contents { + object := &objects.Contents[i] + if fs := f.NewFsObjectWithInfo(object.Key, object); fs != nil { + out <- fs + } + } + } + close(out) + }() + return out +} + +// Put the FsObject into the bucket +func (f *FsS3) Put(src FsObject) { + // Temporary FsObject under construction + fs := &FsObjectS3{s3: f, remote: src.Remote()} + + in0, err := src.Open() + if err != nil { + stats.Error() + FsLog(fs, "Failed to open: %s", err) + return + } + in := NewAccount(in0) // account the transfer + defer in.Close() + + // Set the mtime in the headers + headers := s3.Headers{ + metaMtime: swift.TimeToFloatString(src.ModTime()), + } + + // Guess the content type + contentType := mime.TypeByExtension(path.Ext(fs.remote)) + if contentType == "" { + contentType = "application/octet-stream" + } + + _, err = fs.s3.b.PutReaderHeaders(fs.remote, in, src.Size(), contentType, fs.s3.perm, headers) + if err != nil { + stats.Error() + FsLog(fs, "Failed to upload: %s", err) + FsDebug(fs, "Removing failed upload") + removeErr := fs.Remove() + if removeErr != nil { + stats.Error() + FsLog(fs, "Failed to remove failed download: %s", removeErr) + } + return + } + FsDebug(fs, "Uploaded") +} + +// Mkdir creates the bucket if it doesn't exist +func (f *FsS3) Mkdir() error { + return f.b.PutBucket(f.perm) +} + +// Rmdir deletes the bucket +// +// Returns an error if it isn't empty +func (f *FsS3) Rmdir() error { + return f.b.DelBucket() +} + +// ------------------------------------------------------------ + +// Remote returns the 
remote path +func (fs *FsObjectS3) Remote() string { + return fs.remote +} + +// Md5sum returns the Md5sum of an object returning a lowercase hex string +func (fs *FsObjectS3) Md5sum() (string, error) { + return strings.Trim(strings.ToLower(fs.etag), `"`), nil +} + +// Size returns the size of an object in bytes +func (fs *FsObjectS3) Size() int64 { + return fs.bytes +} + +// readMetaData gets the metadata if it hasn't already been fetched +// +// it also sets the info +func (fs *FsObjectS3) readMetaData() (err error) { + if fs.meta != nil { + return nil + } + + headers, err := fs.s3.b.Head(fs.remote, nil) + if err != nil { + FsDebug(fs, "Failed to read info: %s", err) + return err + } + size, err := strconv.ParseInt(headers["Content-Length"], 10, 64) + if err != nil { + FsDebug(fs, "Failed to read size from: %q", headers) + return err + } + fs.etag = headers["Etag"] + fs.bytes = size + fs.meta = headers + if fs.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil { + FsLog(fs, "Failed to read last modified from HEAD: %s", err) + fs.lastModified = time.Now() + } + return nil +} + +// ModTime returns the modification time of the object +// +// It attempts to read the object's mtime and if that isn't present the +// LastModified returned in the http headers +func (fs *FsObjectS3) ModTime() time.Time { + err := fs.readMetaData() + if err != nil { + FsLog(fs, "Failed to read metadata: %s", err) + return time.Now() + } + // read mtime out of metadata if available + d, ok := fs.meta[metaMtime] + if !ok { + // FsDebug(fs, "No metadata") + return fs.lastModified + } + modTime, err := swift.FloatStringToTime(d) + if err != nil { + FsLog(fs, "Failed to read mtime from object: %s", err) + return fs.lastModified + } + return modTime +} + +// SetModTime should set the modification time of the remote object - not implemented yet, it only logs +func (fs *FsObjectS3) SetModTime(modTime time.Time) { + // FIXME goamz can't set metadata, though it is possible with a PUT and copy + FsLog(fs, "Can't set 
mod time yet: %s", modTime) + // err := fs.readMetaData() + // if err != nil { + // stats.Error() + // FsLog(fs, "Failed to read metadata: %s", err) + // return + // } + // fs.meta.SetModTime(modTime) + // err = fs.s3.c.ObjectUpdate(fs.s3.bucket, fs.remote, fs.meta.ObjectHeaders()) + // if err != nil { + // stats.Error() + // FsLog(fs, "Failed to update remote mtime: %s", err) + // } +} + +// Storable returns whether this object is storable - always true +func (fs *FsObjectS3) Storable() bool { + return true +} + +// Open opens an object for read +func (fs *FsObjectS3) Open() (in io.ReadCloser, err error) { + in, err = fs.s3.b.GetReader(fs.remote) + return +} + +// Remove removes an object +func (fs *FsObjectS3) Remove() error { + return fs.s3.b.Del(fs.remote) +} + +// Check the interfaces are satisfied +var _ Fs = &FsS3{} +var _ FsObject = &FsObjectS3{} diff --git a/fs_swift.go b/fs_swift.go index ead17e404..0ab9cae81 100644 --- a/fs_swift.go +++ b/fs_swift.go @@ -47,7 +47,7 @@ func (f *FsSwift) String() string { } // Pattern to match a swift url -var swiftMatch = regexp.MustCompile(`^([^/:]+):(.*)$`) +var swiftMatch = regexp.MustCompile(`^swift://([^/]+)(.*)$`) // parseParse parses a swift 'url' func parsePath(path string) (container, directory string, err error) { @@ -294,7 +294,7 @@ func (fs *FsObjectSwift) Storable() bool { // Open an object for read func (fs *FsObjectSwift) Open() (in io.ReadCloser, err error) { - in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.info.Name, true, nil) + in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.remote, true, nil) return } diff --git a/notes.txt b/notes.txt index 7c3cce08c..53dc9320d 100644 --- a/notes.txt +++ b/notes.txt @@ -1,5 +1,8 @@ Todo + * Factor fses into own packages + * FIXME: ls without an argument for buckets/containers? 
 * FIXME: More -dry-run checks for object transfer + * FIXME: EU Region not working * Ignoring the pseudo directories * if object.PseudoDirectory { * fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote()) @@ -14,8 +17,10 @@ Todo * Windows paths? Do we need to translate / and \? * Make a fs.Errorf and count errors and log them at a different level * add -modify-window flag - fs should keep knowledge of resolution + * Add max object size to fs metadata - 5GB for swift, infinite for local, ? for s3 Ideas + * could do encryption - put IV into metadata? * optimise remote copy container to another container using remote copy if local is same as remote * Allow subpaths container:/sub/path diff --git a/swiftsync.go b/swiftsync.go index 7fa800965..b763a7c01 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -283,7 +283,8 @@ func List(f Fs) { // Lists files in a container func list(fdst, fsrc Fs) { if fdst == nil { - SwiftContainers() + // FIXME restore SwiftContainers() - only S3 buckets are listed for now + S3Buckets() return } List(fdst)