package main import ( "context" "crypto/tls" "fmt" "io" "log" "math/rand" "net/http" "os" "strconv" "time" "github.com/google/uuid" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) type PRNG struct { rem int64 } func (r *PRNG) Read(p []byte) (n int, err error) { //log.Printf("rem=%d, buf=%d\n", r.rem, len(p)) if int64(len(p)) > r.rem { p = p[:r.rem] } if int64(len(p)) > r.rem { log.Fatal("LOGIC ERROR") } n, err = rand.Read(p) if err != nil { return } r.rem -= int64(n) if r.rem <= 0 { err = io.EOF //log.Printf("PRNG file has been fully read. rem=%d,n=%d,err=%s\n", r.rem, n, err) } return } func putObj(mc *minio.Client, buck string, size int64) error { prng := new(PRNG) prng.rem = size key := uuid.New().String() _, err := mc.PutObject( context.Background(), buck, key, prng, size, minio.PutObjectOptions{ContentType: "application/octet-stream"}, ) return err } func main() { fmt.Printf("total_objects,batch_dur_nanoseconds\n") minio.MaxRetry = 1 _, isSSL := os.LookupEnv("SSL") opts := minio.Options{ Creds: credentials.NewStaticV4(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), ""), Secure: isSSL, } if region, ok := os.LookupEnv("AWS_REGION"); ok { opts.Region = region } if _, ok := os.LookupEnv("SSL_INSECURE"); ok { opts.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}} } mc, err := minio.New(os.Getenv("ENDPOINT"), &opts) if err != nil { log.Fatal("failed connect", err) return } thread := 32 if env_thread, ok := os.LookupEnv("THREAD"); ok { tmp, err := strconv.Atoi(env_thread) if err != nil { log.Fatalf("invalid value for THREAD: %v\n", env_thread) } thread = tmp } batch_size := 256 if env_batch_size, ok := os.LookupEnv("BATCH_SIZE"); ok { tmp, err := strconv.Atoi(env_batch_size) if err != nil { log.Fatalf("invalid value for BATCH_SIZE: %v\n", env_batch_size) } batch_size = tmp } batch_count := 128 if env_batch_count, ok := os.LookupEnv("BATCH_COUNT"); ok { tmp, err := strconv.Atoi(env_batch_count) if err != nil { log.Fatalf("invalid value for BATCH_COUNT: %v\n", env_batch_count) } batch_count = tmp } obj_size := 16 if env_obj_size, ok := os.LookupEnv("OBJ_SIZE"); ok { tmp, err := strconv.Atoi(env_obj_size) if err != nil { log.Fatalf("invalid value for OBJ_SIZE: %v\n", env_obj_size) } obj_size = tmp } total_obj := thread * batch_size * batch_count total_size := total_obj * obj_size log.Printf("if bench succeed, %v objects (%v bytes) will be created\n", total_obj, total_size) // Create Bucket buck := uuid.New().String() err = mc.MakeBucket(context.Background(), buck, minio.MakeBucketOptions{}) if err != nil { log.Fatal(err) return } log.Printf("created bucket %s\n", buck) // Start sending... for bc := 0; bc < batch_count; bc++ { log.Printf("batch %d/%d - start\n", bc+1, batch_count) start := time.Now() syn := make(chan error) for tc := 0; tc < thread; tc++ { go func() { for bs := 0; bs < batch_size; bs++ { err := putObj(mc, buck, int64(obj_size)) if err != nil { syn <- err return } } syn <- nil }() } log.Printf("batch %d/%d - all threads started\n", bc+1, batch_count) errCount := 0 for tc := 0; tc < thread; tc++ { cerr := <-syn if cerr != nil { errCount += 1 log.Printf("thread %d/%d failed with %s\n", tc, thread, cerr) } } if errCount > 0 { log.Fatal("Too many errors, exiting...") return } elapsed := time.Since(start) fmt.Printf("%d,%v\n", bc * thread * batch_size, elapsed.Nanoseconds()) log.Printf("batch %d/%d - all threads returned\n", bc+1, batch_count) } }