Initial commit.

This commit is contained in:
Jeff Flowers 2017-06-09 11:39:42 -04:00
parent f614cdab67
commit 350dcc3651
3 changed files with 406 additions and 1 deletions

View File

@ -1,2 +1,35 @@
# s3-benchmark
Performance test for comparison of AWS versus Wasabi S3 systems.
s3-benchmark is a program for performing S3 operations PUT, GET, and DELETE for objects. Besides the
bucket configuration, the object size and number of threads can be given for different tests.
The test is loosely based on the Nasuni benchmark used to test the performance of different cloud
storage providers.
# Building the Program
If the test is being run on the Ubuntu version 16.04 LTS (the current long term release), the binary
executable s3-benchmark.ubuntu will run the benchmark without building.
Otherwise, to build the test, you must install the Go 1.7 system along with the supporting libraries.
# Command Line Arguments
Below are the command line arguments to the program:
-a string
Access key
-b string
Bucket for testing (default "wasabi-benchmark-bucket")
-d int
Duration of each test in seconds (default 60)
-l int
Number of times to repeat test (default 1)
-s string
Secret key
-t int
Number of threads to run (default 1)
-u string
URL for host with method prefix (default "http://s3.wasabisys.com")
-z string
Size of objects in bytes with postfix K, M, and G (default "1M")
# Example Benchmark

372
s3-benchmark.go Normal file
View File

@ -0,0 +1,372 @@
// s3-benchmark.go
// Copyright (c) 2017 Wasabi Technology, Inc.
package main
import (
"bytes"
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
"crypto/tls"
"encoding/base64"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pivotal-golang/bytefmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// Global variables
var access_key, secret_key, url_host, bucket string
var duration_secs, threads, loops int
var object_size uint64
var object_data []byte
var object_data_md5 string
var running_threads, upload_count, download_count, delete_count int32
var endtime, upload_finish, download_finish, delete_finish time.Time
func logit(msg string) {
fmt.Println(msg)
logfile, _ := os.OpenFile("benchmark.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if logfile != nil {
logfile.WriteString(time.Now().Format(http.TimeFormat) + ": " + msg + "\n")
logfile.Close()
}
}
// Our HTTP transport used for the roundtripper below
var HTTPTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 0,
// Allow an unlimited number of idle connections
MaxIdleConnsPerHost: 4096,
MaxIdleConns: 0,
// But limit their idle time
IdleConnTimeout: time.Minute,
// Ignore TLS errors
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
var httpClient = &http.Client{Transport: HTTPTransport}
func getS3Client() *s3.S3 {
// Build our config
creds := credentials.NewStaticCredentials(access_key, secret_key, "")
loglevel := aws.LogOff
// Build the rest of the configuration
awsConfig := &aws.Config{
Region: aws.String("us-east-1"),
Endpoint: aws.String(url_host),
Credentials: creds,
LogLevel: &loglevel,
S3ForcePathStyle: aws.Bool(true),
S3Disable100Continue: aws.Bool(true),
// Comment following to use default transport
HTTPClient: &http.Client{Transport: HTTPTransport},
}
session := session.New(awsConfig)
client := s3.New(session)
if client == nil {
log.Fatalf("FATAL: Unable to create new client.")
}
// Return success
return client
}
func createBucket() {
// Get a client
client := getS3Client()
// Create our bucket (may already exist without error)
in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
if _, err := client.CreateBucket(in); err != nil {
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err)
}
}
func deleteAllObjects() {
// Get a client
client := getS3Client()
// Use multiple routines to do the actual delete
var doneDeletes sync.WaitGroup
// Loop deleting our versions reading as big a list as we can
var keyMarker, versionId *string
var err error
for loop := 1; ; loop++ {
// Delete all the existing objects and versions in the bucket
in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)}
if listVersions, listErr := client.ListObjectVersions(in); listErr == nil {
delete := &s3.Delete{Quiet: aws.Bool(true)}
for _, version := range listVersions.Versions {
delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId})
}
for _, marker := range listVersions.DeleteMarkers {
delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId})
}
if len(delete.Objects) > 0 {
// Start a delete routine
doDelete := func(bucket string, delete *s3.Delete) {
if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil {
err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error())
}
doneDeletes.Done()
}
doneDeletes.Add(1)
go doDelete(bucket, delete)
}
// Advance to next versions
if listVersions.IsTruncated == nil || !*listVersions.IsTruncated {
break
}
keyMarker = listVersions.NextKeyMarker
versionId = listVersions.NextVersionIdMarker
} else {
// The bucket may not exist, just ignore in that case
if strings.HasPrefix(listErr.Error(), "NoSuchBucket") {
return
}
err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr)
break
}
}
// Wait for deletes to finish
doneDeletes.Wait()
// If error, it is fatal
if err != nil {
log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err)
}
}
// canonicalAmzHeaders -- return the x-amz headers canonicalized
func canonicalAmzHeaders(req *http.Request) string {
// Parse out all x-amz headers
var headers []string
for header := range req.Header {
norm := strings.ToLower(strings.TrimSpace(header))
if strings.HasPrefix(norm, "x-amz") {
headers = append(headers, norm)
}
}
// Put them in sorted order
sort.Strings(headers)
// Now add back the values
for n, header := range headers {
headers[n] = header + ":" + strings.Replace(req.Header.Get(header), "\n", " ", -1)
}
// Finally, put them back together
if len(headers) > 0 {
return strings.Join(headers, "\n") + "\n"
} else {
return ""
}
}
func hmacSHA1(key []byte, content string) []byte {
mac := hmac.New(sha1.New, key)
mac.Write([]byte(content))
return mac.Sum(nil)
}
func setSignature(req *http.Request) {
// Setup default parameters
dateHdr := time.Now().UTC().Format("20060102T150405Z")
req.Header.Set("X-Amz-Date", dateHdr)
// Get the canonical resource and header
canonicalResource := req.URL.EscapedPath()
canonicalHeaders := canonicalAmzHeaders(req)
stringToSign := req.Method + "\n" + req.Header.Get("Content-MD5") + "\n" + req.Header.Get("Content-Type") + "\n\n" +
canonicalHeaders + canonicalResource
hash := hmacSHA1([]byte(secret_key), stringToSign)
signature := base64.StdEncoding.EncodeToString(hash)
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
}
func runUpload(thread_num int) {
for time.Now().Before(endtime) {
objnum := atomic.AddInt32(&upload_count, 1)
fileobj := bytes.NewReader(object_data)
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
req, _ := http.NewRequest("PUT", prefix, fileobj)
req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10))
req.Header.Set("Content-MD5", object_data_md5)
setSignature(req)
if resp, err := httpClient.Do(req); err != nil {
log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
} else if resp.StatusCode != http.StatusOK {
fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
if resp.Body != nil {
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Body: %s\n", string(body))
}
}
}
// Remember last done time
upload_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
}
func runDownload(thread_num int) {
for time.Now().Before(endtime) {
atomic.AddInt32(&download_count, 1)
objnum := rand.Int31n(upload_count) + 1
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
req, _ := http.NewRequest("GET", prefix, nil)
setSignature(req)
if resp, err := httpClient.Do(req); err != nil {
log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
} else if resp != nil && resp.Body != nil {
io.Copy(ioutil.Discard, resp.Body)
}
}
// Remember last done time
download_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
}
func runDelete(thread_num int) {
for {
objnum := atomic.AddInt32(&delete_count, 1)
if objnum > upload_count {
break
}
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
req, _ := http.NewRequest("DELETE", prefix, nil)
setSignature(req)
if _, err := httpClient.Do(req); err != nil {
log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
}
}
// Remember last done time
delete_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
}
func main() {
// Hello
fmt.Println("Wasabi benchmark program v2.0")
// Parse command line
myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
myflag.StringVar(&access_key, "a", "", "Access key")
myflag.StringVar(&secret_key, "s", "", "Secret key")
myflag.StringVar(&url_host, "u", "http://s3.wasabisys.com", "URL for host with method prefix")
myflag.StringVar(&bucket, "b", "wasabi-benchmark-bucket", "Bucket for testing")
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
var sizeArg string
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
if err := myflag.Parse(os.Args[1:]); err != nil {
os.Exit(1)
}
// Check the arguments
if access_key == "" {
log.Fatal("Missing argument -a for access key.")
}
if secret_key == "" {
log.Fatal("Missing argument -s for secret key.")
}
var err error
if object_size, err = bytefmt.ToBytes(sizeArg); err != nil {
log.Fatalf("Invalid -z argument for object size: %v", err)
}
// Echo the parameters
logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, duration=%d, threads=%d, loops=%d, size=%s",
url_host, bucket, duration_secs, threads, loops, sizeArg))
// Initialize data for the bucket
object_data = make([]byte, object_size)
rand.Read(object_data)
hasher := md5.New()
hasher.Write(object_data)
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
// Create the bucket and delete all the objects
createBucket()
deleteAllObjects()
// Loop running the tests
for loop := 1; loop <= loops; loop++ {
// Run the upload case
running_threads = int32(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runUpload(n)
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
upload_time := upload_finish.Sub(starttime).Seconds()
bps := float64(uint64(upload_count)*object_size) / upload_time
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.",
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time))
// Run the download case
running_threads = int32(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runDownload(n)
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
download_time := download_finish.Sub(starttime).Seconds()
bps = float64(uint64(download_count)*object_size) / download_time
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.",
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time))
// Run the delete case
running_threads = int32(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runDelete(n)
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
delete_time := delete_finish.Sub(starttime).Seconds()
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec.",
loop, delete_time, float64(upload_count)/delete_time))
}
// All done
fmt.Println("Benchmark completed.")
}

BIN
s3-benchmark.ubuntu Executable file

Binary file not shown.