main.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "os"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tal-tech/go-zero/core/breaker"
  10. "github.com/tal-tech/go-zero/core/lang"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "gopkg.in/cheggaaa/pb.v1"
  13. )
  // Tunables for the circuit-breaker simulation.
  const (
  	// duration is how long main keeps sending requests through the breaker.
  	duration = 5 * time.Minute
  	// breakRange bounds, in seconds, how long the simulated server stays in
  	// state 0 (the state in which serve reports failure): 1..breakRange.
  	breakRange = 20
  	// workRange bounds, in seconds, how long the simulated server stays in
  	// state 1 (the state in which serve reports success): 1..workRange.
  	workRange = 50
  	// requestInterval is the pause between two consecutive requests.
  	requestInterval = time.Millisecond
  	// stateFator scales the 0/1 server state to half the number of requests
  	// issued per second, so the state line is visible next to the call counts
  	// when the CSV is plotted. The identifier's spelling is kept as-is because
  	// other code refers to it by this name.
  	stateFator = 0.5 * float64(time.Second/requestInterval)
  )
  // server is the simulated backend. Its state field holds 0 or 1 and is
  // read and written with the sync/atomic helpers.
  type server struct {
  	state int32
  }

  // metric counts calls. The calls field is accessed only through the
  // sync/atomic helpers so it can be shared between goroutines.
  type metric struct {
  	calls int64
  }
  30. func (m *metric) addCall() {
  31. atomic.AddInt64(&m.calls, 1)
  32. }
  33. func (m *metric) reset() int64 {
  34. return atomic.SwapInt64(&m.calls, 0)
  35. }
  36. func newServer() *server {
  37. return &server{}
  38. }
  39. func (s *server) serve(m *metric) bool {
  40. m.addCall()
  41. return atomic.LoadInt32(&s.state) == 1
  42. }
  43. func (s *server) start() {
  44. go func() {
  45. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  46. var state int32
  47. for {
  48. var v int32
  49. if state == 0 {
  50. v = r.Int31n(breakRange)
  51. } else {
  52. v = r.Int31n(workRange)
  53. }
  54. time.Sleep(time.Second * time.Duration(v+1))
  55. state ^= 1
  56. atomic.StoreInt32(&s.state, state)
  57. }
  58. }()
  59. }
  60. func runBreaker(s *server, br breaker.Breaker, duration time.Duration, m *metric) {
  61. ticker := time.NewTicker(requestInterval)
  62. defer ticker.Stop()
  63. done := make(chan lang.PlaceholderType)
  64. go func() {
  65. time.Sleep(duration)
  66. close(done)
  67. }()
  68. for {
  69. select {
  70. case <-ticker.C:
  71. _ = br.Do(func() error {
  72. if s.serve(m) {
  73. return nil
  74. } else {
  75. return breaker.ErrServiceUnavailable
  76. }
  77. })
  78. case <-done:
  79. return
  80. }
  81. }
  82. }
  83. func main() {
  84. srv := newServer()
  85. srv.start()
  86. gb := breaker.NewBreaker()
  87. fp, err := os.Create("result.csv")
  88. logx.Must(err)
  89. defer fp.Close()
  90. fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls")
  91. var gm, nm metric
  92. go func() {
  93. ticker := time.NewTicker(time.Second)
  94. defer ticker.Stop()
  95. var seconds int
  96. for range ticker.C {
  97. seconds++
  98. gcalls := gm.reset()
  99. ncalls := nm.reset()
  100. fmt.Fprintf(fp, "%d,%.2f,%d,%d\n",
  101. seconds, float64(atomic.LoadInt32(&srv.state))*stateFator, gcalls, ncalls)
  102. }
  103. }()
  104. var waitGroup sync.WaitGroup
  105. waitGroup.Add(1)
  106. go func() {
  107. runBreaker(srv, gb, duration, &gm)
  108. waitGroup.Done()
  109. }()
  110. go func() {
  111. bar := pb.New(int(duration / time.Second)).Start()
  112. ticker := time.NewTicker(time.Second)
  113. defer ticker.Stop()
  114. for range ticker.C {
  115. bar.Increment()
  116. }
  117. bar.Finish()
  118. }()
  119. waitGroup.Wait()
  120. }