Merge pull request #5 from wasabi-tech/rob/retry-after-slowdown
Retry upload, download, and delete after a slow-down response
This commit is contained in:
commit
4103387f62
3 changed files with 49 additions and 24 deletions
BIN
s3-benchmark
Executable file
BIN
s3-benchmark
Executable file
Binary file not shown.
|
@ -33,12 +33,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Global variables
|
// Global variables
|
||||||
var access_key, secret_key, url_host, bucket string
|
var access_key, secret_key, url_host, bucket, region string
|
||||||
var duration_secs, threads, loops int
|
var duration_secs, threads, loops int
|
||||||
var object_size uint64
|
var object_size uint64
|
||||||
var object_data []byte
|
var object_data []byte
|
||||||
var object_data_md5 string
|
var object_data_md5 string
|
||||||
var running_threads, upload_count, download_count, delete_count int32
|
var running_threads, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int32
|
||||||
var endtime, upload_finish, download_finish, delete_finish time.Time
|
var endtime, upload_finish, download_finish, delete_finish time.Time
|
||||||
|
|
||||||
func logit(msg string) {
|
func logit(msg string) {
|
||||||
|
@ -76,7 +76,7 @@ func getS3Client() *s3.S3 {
|
||||||
loglevel := aws.LogOff
|
loglevel := aws.LogOff
|
||||||
// Build the rest of the configuration
|
// Build the rest of the configuration
|
||||||
awsConfig := &aws.Config{
|
awsConfig := &aws.Config{
|
||||||
Region: aws.String("us-east-1"),
|
Region: aws.String(region),
|
||||||
Endpoint: aws.String(url_host),
|
Endpoint: aws.String(url_host),
|
||||||
Credentials: creds,
|
Credentials: creds,
|
||||||
LogLevel: &loglevel,
|
LogLevel: &loglevel,
|
||||||
|
@ -94,15 +94,19 @@ func getS3Client() *s3.S3 {
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBucket() {
|
func createBucket(ignore_errors bool) {
|
||||||
// Get a client
|
// Get a client
|
||||||
client := getS3Client()
|
client := getS3Client()
|
||||||
// Create our bucket (may already exist without error)
|
// Create our bucket (may already exist without error)
|
||||||
in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
|
in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
|
||||||
if _, err := client.CreateBucket(in); err != nil {
|
if _, err := client.CreateBucket(in); err != nil {
|
||||||
|
if ignore_errors {
|
||||||
|
log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err)
|
||||||
|
} else {
|
||||||
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err)
|
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func deleteAllObjects() {
|
func deleteAllObjects() {
|
||||||
// Get a client
|
// Get a client
|
||||||
|
@ -212,7 +216,11 @@ func runUpload(thread_num int) {
|
||||||
setSignature(req)
|
setSignature(req)
|
||||||
if resp, err := httpClient.Do(req); err != nil {
|
if resp, err := httpClient.Do(req); err != nil {
|
||||||
log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
|
log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
|
||||||
} else if resp.StatusCode != http.StatusOK {
|
} else if resp != nil && resp.StatusCode != http.StatusOK {
|
||||||
|
if (resp.StatusCode == http.StatusServiceUnavailable) {
|
||||||
|
atomic.AddInt32(&upload_slowdown_count, 1)
|
||||||
|
atomic.AddInt32(&upload_count, -1)
|
||||||
|
} else {
|
||||||
fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
|
fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
|
||||||
if resp.Body != nil {
|
if resp.Body != nil {
|
||||||
body, _ := ioutil.ReadAll(resp.Body)
|
body, _ := ioutil.ReadAll(resp.Body)
|
||||||
|
@ -220,6 +228,7 @@ func runUpload(thread_num int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
upload_finish = time.Now()
|
upload_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
|
@ -229,16 +238,21 @@ func runUpload(thread_num int) {
|
||||||
func runDownload(thread_num int) {
|
func runDownload(thread_num int) {
|
||||||
for time.Now().Before(endtime) {
|
for time.Now().Before(endtime) {
|
||||||
atomic.AddInt32(&download_count, 1)
|
atomic.AddInt32(&download_count, 1)
|
||||||
objnum := rand.Int31n(upload_count) + 1
|
objnum := rand.Int31n(download_count) + 1
|
||||||
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
||||||
req, _ := http.NewRequest("GET", prefix, nil)
|
req, _ := http.NewRequest("GET", prefix, nil)
|
||||||
setSignature(req)
|
setSignature(req)
|
||||||
if resp, err := httpClient.Do(req); err != nil {
|
if resp, err := httpClient.Do(req); err != nil {
|
||||||
log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
|
log.Fatalf("FATAL: Error downloading object %s: %v", prefix, err)
|
||||||
} else if resp != nil && resp.Body != nil {
|
} else if resp != nil && resp.Body != nil {
|
||||||
|
if (resp.StatusCode == http.StatusServiceUnavailable){
|
||||||
|
atomic.AddInt32(&download_slowdown_count, 1)
|
||||||
|
atomic.AddInt32(&download_count, -1)
|
||||||
|
} else {
|
||||||
io.Copy(ioutil.Discard, resp.Body)
|
io.Copy(ioutil.Discard, resp.Body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
download_finish = time.Now()
|
download_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
|
@ -254,8 +268,11 @@ func runDelete(thread_num int) {
|
||||||
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
||||||
req, _ := http.NewRequest("DELETE", prefix, nil)
|
req, _ := http.NewRequest("DELETE", prefix, nil)
|
||||||
setSignature(req)
|
setSignature(req)
|
||||||
if _, err := httpClient.Do(req); err != nil {
|
if resp, err := httpClient.Do(req); err != nil {
|
||||||
log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
|
log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
|
||||||
|
} else if (resp != nil && resp.StatusCode == http.StatusServiceUnavailable) {
|
||||||
|
atomic.AddInt32(&delete_slowdown_count, 1)
|
||||||
|
atomic.AddInt32(&delete_count, -1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
|
@ -274,6 +291,7 @@ func main() {
|
||||||
myflag.StringVar(&secret_key, "s", "", "Secret key")
|
myflag.StringVar(&secret_key, "s", "", "Secret key")
|
||||||
myflag.StringVar(&url_host, "u", "http://s3.wasabisys.com", "URL for host with method prefix")
|
myflag.StringVar(&url_host, "u", "http://s3.wasabisys.com", "URL for host with method prefix")
|
||||||
myflag.StringVar(&bucket, "b", "wasabi-benchmark-bucket", "Bucket for testing")
|
myflag.StringVar(&bucket, "b", "wasabi-benchmark-bucket", "Bucket for testing")
|
||||||
|
myflag.StringVar(®ion, "r", "us-east-1", "Region for testing")
|
||||||
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
|
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
|
||||||
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
||||||
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
||||||
|
@ -296,8 +314,8 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Echo the parameters
|
// Echo the parameters
|
||||||
logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, duration=%d, threads=%d, loops=%d, size=%s",
|
logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
|
||||||
url_host, bucket, duration_secs, threads, loops, sizeArg))
|
url_host, bucket, region, duration_secs, threads, loops, sizeArg))
|
||||||
|
|
||||||
// Initialize data for the bucket
|
// Initialize data for the bucket
|
||||||
object_data = make([]byte, object_size)
|
object_data = make([]byte, object_size)
|
||||||
|
@ -307,12 +325,20 @@ func main() {
|
||||||
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||||
|
|
||||||
// Create the bucket and delete all the objects
|
// Create the bucket and delete all the objects
|
||||||
createBucket()
|
createBucket(true)
|
||||||
deleteAllObjects()
|
deleteAllObjects()
|
||||||
|
|
||||||
// Loop running the tests
|
// Loop running the tests
|
||||||
for loop := 1; loop <= loops; loop++ {
|
for loop := 1; loop <= loops; loop++ {
|
||||||
|
|
||||||
|
// reset counters
|
||||||
|
upload_count = 0
|
||||||
|
upload_slowdown_count = 0
|
||||||
|
download_count = 0
|
||||||
|
download_slowdown_count = 0
|
||||||
|
delete_count = 0
|
||||||
|
delete_slowdown_count = 0
|
||||||
|
|
||||||
// Run the upload case
|
// Run the upload case
|
||||||
running_threads = int32(threads)
|
running_threads = int32(threads)
|
||||||
starttime := time.Now()
|
starttime := time.Now()
|
||||||
|
@ -328,8 +354,8 @@ func main() {
|
||||||
upload_time := upload_finish.Sub(starttime).Seconds()
|
upload_time := upload_finish.Sub(starttime).Seconds()
|
||||||
|
|
||||||
bps := float64(uint64(upload_count)*object_size) / upload_time
|
bps := float64(uint64(upload_count)*object_size) / upload_time
|
||||||
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.",
|
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
|
||||||
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time))
|
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
|
||||||
|
|
||||||
// Run the download case
|
// Run the download case
|
||||||
running_threads = int32(threads)
|
running_threads = int32(threads)
|
||||||
|
@ -346,8 +372,8 @@ func main() {
|
||||||
download_time := download_finish.Sub(starttime).Seconds()
|
download_time := download_finish.Sub(starttime).Seconds()
|
||||||
|
|
||||||
bps = float64(uint64(download_count)*object_size) / download_time
|
bps = float64(uint64(download_count)*object_size) / download_time
|
||||||
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.",
|
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
|
||||||
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time))
|
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count))
|
||||||
|
|
||||||
// Run the delete case
|
// Run the delete case
|
||||||
running_threads = int32(threads)
|
running_threads = int32(threads)
|
||||||
|
@ -363,10 +389,9 @@ func main() {
|
||||||
}
|
}
|
||||||
delete_time := delete_finish.Sub(starttime).Seconds()
|
delete_time := delete_finish.Sub(starttime).Seconds()
|
||||||
|
|
||||||
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec.",
|
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d",
|
||||||
loop, delete_time, float64(upload_count)/delete_time))
|
loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count))
|
||||||
}
|
}
|
||||||
|
|
||||||
// All done
|
// All done
|
||||||
fmt.Println("Benchmark completed.")
|
|
||||||
}
|
}
|
||||||
|
|
Binary file not shown.
Loading…
Reference in a new issue