123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package load
- import (
- "math/rand"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/tal-tech/go-zero/core/collection"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/mathx"
- "github.com/tal-tech/go-zero/core/stat"
- "github.com/tal-tech/go-zero/core/syncx"
- )
// Rolling-window dimensions shared by the tests in this file.
const (
	// buckets is the bucket count passed to WithBuckets and to
	// collection.NewRollingWindow.
	buckets = 10
	// bucketDuration is the per-bucket interval; the tests also sleep for
	// this long to move on to the next bucket.
	bucketDuration = 50 * time.Millisecond
)
- func init() {
- stat.SetReporter(nil)
- }
- func TestAdaptiveShedder(t *testing.T) {
- shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
- var wg sync.WaitGroup
- var drop int64
- proba := mathx.NewProba()
- for i := 0; i < 100; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for i := 0; i < 30; i++ {
- promise, err := shedder.Allow()
- if err != nil {
- atomic.AddInt64(&drop, 1)
- } else {
- count := rand.Intn(5)
- time.Sleep(time.Millisecond * time.Duration(count))
- if proba.TrueOnProba(0.01) {
- promise.Fail()
- } else {
- promise.Pass()
- }
- }
- }
- }()
- }
- wg.Wait()
- }
- func TestAdaptiveShedderMaxPass(t *testing.T) {
- passCounter := newRollingWindow()
- for i := 1; i <= 10; i++ {
- passCounter.Add(float64(i * 100))
- time.Sleep(bucketDuration)
- }
- shedder := &adaptiveShedder{
- passCounter: passCounter,
- droppedRecently: syncx.NewAtomicBool(),
- }
- assert.Equal(t, int64(1000), shedder.maxPass())
- // default max pass is equal to 1.
- passCounter = newRollingWindow()
- shedder = &adaptiveShedder{
- passCounter: passCounter,
- droppedRecently: syncx.NewAtomicBool(),
- }
- assert.Equal(t, int64(1), shedder.maxPass())
- }
- func TestAdaptiveShedderMinRt(t *testing.T) {
- rtCounter := newRollingWindow()
- for i := 0; i < 10; i++ {
- if i > 0 {
- time.Sleep(bucketDuration)
- }
- for j := i*10 + 1; j <= i*10+10; j++ {
- rtCounter.Add(float64(j))
- }
- }
- shedder := &adaptiveShedder{
- rtCounter: rtCounter,
- }
- assert.Equal(t, float64(6), shedder.minRt())
- // default max min rt is equal to maxFloat64.
- rtCounter = newRollingWindow()
- shedder = &adaptiveShedder{
- rtCounter: rtCounter,
- droppedRecently: syncx.NewAtomicBool(),
- }
- assert.Equal(t, defaultMinRt, shedder.minRt())
- }
- func TestAdaptiveShedderMaxFlight(t *testing.T) {
- passCounter := newRollingWindow()
- rtCounter := newRollingWindow()
- for i := 0; i < 10; i++ {
- if i > 0 {
- time.Sleep(bucketDuration)
- }
- passCounter.Add(float64((i + 1) * 100))
- for j := i*10 + 1; j <= i*10+10; j++ {
- rtCounter.Add(float64(j))
- }
- }
- shedder := &adaptiveShedder{
- passCounter: passCounter,
- rtCounter: rtCounter,
- windows: buckets,
- droppedRecently: syncx.NewAtomicBool(),
- }
- assert.Equal(t, int64(54), shedder.maxFlight())
- }
- func TestAdaptiveShedderShouldDrop(t *testing.T) {
- logx.Disable()
- passCounter := newRollingWindow()
- rtCounter := newRollingWindow()
- for i := 0; i < 10; i++ {
- if i > 0 {
- time.Sleep(bucketDuration)
- }
- passCounter.Add(float64((i + 1) * 100))
- for j := i*10 + 1; j <= i*10+10; j++ {
- rtCounter.Add(float64(j))
- }
- }
- shedder := &adaptiveShedder{
- passCounter: passCounter,
- rtCounter: rtCounter,
- windows: buckets,
- dropTime: syncx.NewAtomicDuration(),
- droppedRecently: syncx.NewAtomicBool(),
- }
- // cpu >= 800, inflight < maxPass
- systemOverloadChecker = func(int64) bool {
- return true
- }
- shedder.avgFlying = 50
- assert.False(t, shedder.shouldDrop())
- // cpu >= 800, inflight > maxPass
- shedder.avgFlying = 80
- shedder.flying = 50
- assert.False(t, shedder.shouldDrop())
- // cpu >= 800, inflight > maxPass
- shedder.avgFlying = 80
- shedder.flying = 80
- assert.True(t, shedder.shouldDrop())
- // cpu < 800, inflight > maxPass
- systemOverloadChecker = func(int64) bool {
- return false
- }
- shedder.avgFlying = 80
- assert.False(t, shedder.shouldDrop())
- // cpu >= 800, inflight < maxPass
- systemOverloadChecker = func(int64) bool {
- return true
- }
- shedder.avgFlying = 80
- shedder.flying = 80
- _, err := shedder.Allow()
- assert.NotNil(t, err)
- }
- func TestAdaptiveShedderStillHot(t *testing.T) {
- logx.Disable()
- passCounter := newRollingWindow()
- rtCounter := newRollingWindow()
- for i := 0; i < 10; i++ {
- if i > 0 {
- time.Sleep(bucketDuration)
- }
- passCounter.Add(float64((i + 1) * 100))
- for j := i*10 + 1; j <= i*10+10; j++ {
- rtCounter.Add(float64(j))
- }
- }
- shedder := &adaptiveShedder{
- passCounter: passCounter,
- rtCounter: rtCounter,
- windows: buckets,
- dropTime: syncx.NewAtomicDuration(),
- droppedRecently: syncx.ForAtomicBool(true),
- }
- assert.False(t, shedder.stillHot())
- shedder.dropTime.Set(-coolOffDuration * 2)
- assert.False(t, shedder.stillHot())
- }
- func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
- logx.Disable()
- bench := func(b *testing.B) {
- var shedder = NewAdaptiveShedder()
- proba := mathx.NewProba()
- for i := 0; i < 6000; i++ {
- p, err := shedder.Allow()
- if err == nil {
- time.Sleep(time.Millisecond)
- if proba.TrueOnProba(0.01) {
- p.Fail()
- } else {
- p.Pass()
- }
- }
- }
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- p, err := shedder.Allow()
- if err == nil {
- p.Pass()
- }
- }
- }
- systemOverloadChecker = func(int64) bool {
- return true
- }
- b.Run("high load", bench)
- systemOverloadChecker = func(int64) bool {
- return false
- }
- b.Run("low load", bench)
- }
- func newRollingWindow() *collection.RollingWindow {
- return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket())
- }
|