123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package main
- import (
- "flag"
- "fmt"
- "io"
- "math"
- "math/rand"
- "os"
- "sync"
- "sync/atomic"
- "time"
- "github.com/tal-tech/go-zero/core/collection"
- "github.com/tal-tech/go-zero/core/executors"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/syncx"
- "gopkg.in/cheggaaa/pb.v1"
- )
// Tunables for the load simulation.
const (
	// interval is the period of the outer ticker in main (one batch is
	// released per tick) and the span over which a batch's requests are paced.
	interval = time.Second
	// factor scales the batch size: batch i issues (i+1)*factor requests.
	factor = 5
	// beta is the weight kept from the previous value in the exponentially
	// weighted averages of in-flight requests; a new sample gets 1-beta.
	beta = 0.9
	// total feeds the CSV throttle threshold (interval * total / 100).
	total = 400
)
- var (
- seconds = flag.Int("d", 400, "duration to go")
- flying uint64
- avgFlyingAggressive float64
- aggressiveLock syncx.SpinLock
- avgFlyingLazy float64
- lazyLock syncx.SpinLock
- avgFlyingBoth float64
- bothLock syncx.SpinLock
- lessWriter *executors.LessExecutor
- passCounter = collection.NewRollingWindow(50, time.Millisecond*100)
- rtCounter = collection.NewRollingWindow(50, time.Millisecond*100)
- index int32
- )
- func main() {
- flag.Parse()
- // only log 100 records
- lessWriter = executors.NewLessExecutor(interval * total / 100)
- fp, err := os.Create("result.csv")
- logx.Must(err)
- defer fp.Close()
- fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying")
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- bar := pb.New(*seconds * 2).Start()
- var waitGroup sync.WaitGroup
- batchRequests := func(i int) {
- <-ticker.C
- requests := (i + 1) * factor
- func() {
- it := time.NewTicker(interval / time.Duration(requests))
- defer it.Stop()
- for j := 0; j < requests; j++ {
- <-it.C
- waitGroup.Add(1)
- go func() {
- issueRequest(fp, atomic.AddInt32(&index, 1))
- waitGroup.Done()
- }()
- }
- bar.Increment()
- }()
- }
- for i := 0; i < *seconds; i++ {
- batchRequests(i)
- }
- for i := *seconds; i > 0; i-- {
- batchRequests(i)
- }
- bar.Finish()
- waitGroup.Wait()
- }
- func issueRequest(writer io.Writer, idx int32) {
- v := atomic.AddUint64(&flying, 1)
- aggressiveLock.Lock()
- af := avgFlyingAggressive*beta + float64(v)*(1-beta)
- avgFlyingAggressive = af
- aggressiveLock.Unlock()
- bothLock.Lock()
- bf := avgFlyingBoth*beta + float64(v)*(1-beta)
- avgFlyingBoth = bf
- bothLock.Unlock()
- duration := time.Millisecond * time.Duration(rand.Int63n(10)+1)
- job(duration)
- passCounter.Add(1)
- rtCounter.Add(float64(duration) / float64(time.Millisecond))
- v1 := atomic.AddUint64(&flying, ^uint64(0))
- lazyLock.Lock()
- lf := avgFlyingLazy*beta + float64(v1)*(1-beta)
- avgFlyingLazy = lf
- lazyLock.Unlock()
- bothLock.Lock()
- bf = avgFlyingBoth*beta + float64(v1)*(1-beta)
- avgFlyingBoth = bf
- bothLock.Unlock()
- lessWriter.DoOrDiscard(func() {
- fmt.Fprintf(writer, "%d,%d,%d,%.2f,%.2f,%.2f\n", idx, maxFlight(), v, af, lf, bf)
- })
- }
// job stands in for the work of one request: it blocks the calling goroutine
// until duration has elapsed. A zero or negative duration returns right away.
func job(duration time.Duration) {
	<-time.After(duration)
}
- func maxFlight() int64 {
- return int64(math.Max(1, float64(maxPass()*10)*(minRt()/1e3)))
- }
- func maxPass() int64 {
- var result float64 = 1
- passCounter.Reduce(func(b *collection.Bucket) {
- if b.Sum > result {
- result = b.Sum
- }
- })
- return int64(result)
- }
- func minRt() float64 {
- var result float64 = 1000
- rtCounter.Reduce(func(b *collection.Bucket) {
- if b.Count <= 0 {
- return
- }
- avg := math.Round(b.Sum / float64(b.Count))
- if avg < result {
- result = avg
- }
- })
- return result
- }
|