p2c.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package p2c
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "math/rand"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "github.com/tal-tech/go-zero/core/syncx"
  13. "github.com/tal-tech/go-zero/core/timex"
  14. "github.com/tal-tech/go-zero/zrpc/internal/codes"
  15. "google.golang.org/grpc/balancer"
  16. "google.golang.org/grpc/balancer/base"
  17. "google.golang.org/grpc/resolver"
  18. )
  19. const (
  20. // Name is the name of p2c balancer.
  21. Name = "p2c_ewma"
  22. decayTime = int64(time.Second * 10) // default value from finagle
  23. forcePick = int64(time.Second)
  24. initSuccess = 1000
  25. throttleSuccess = initSuccess / 2
  26. penalty = int64(math.MaxInt32)
  27. pickTimes = 3
  28. logInterval = time.Minute
  29. )
  30. func init() {
  31. balancer.Register(newBuilder())
  32. }
  33. type p2cPickerBuilder struct {
  34. }
  35. func newBuilder() balancer.Builder {
  36. return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
  37. }
  38. func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
  39. if len(readySCs) == 0 {
  40. return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  41. }
  42. var conns []*subConn
  43. for addr, conn := range readySCs {
  44. conns = append(conns, &subConn{
  45. addr: addr,
  46. conn: conn,
  47. success: initSuccess,
  48. })
  49. }
  50. return &p2cPicker{
  51. conns: conns,
  52. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  53. stamp: syncx.NewAtomicDuration(),
  54. }
  55. }
  56. type p2cPicker struct {
  57. conns []*subConn
  58. r *rand.Rand
  59. stamp *syncx.AtomicDuration
  60. lock sync.Mutex
  61. }
  62. func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
  63. conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
  64. p.lock.Lock()
  65. defer p.lock.Unlock()
  66. var chosen *subConn
  67. switch len(p.conns) {
  68. case 0:
  69. return nil, nil, balancer.ErrNoSubConnAvailable
  70. case 1:
  71. chosen = p.choose(p.conns[0], nil)
  72. case 2:
  73. chosen = p.choose(p.conns[0], p.conns[1])
  74. default:
  75. var node1, node2 *subConn
  76. for i := 0; i < pickTimes; i++ {
  77. a := p.r.Intn(len(p.conns))
  78. b := p.r.Intn(len(p.conns) - 1)
  79. if b >= a {
  80. b++
  81. }
  82. node1 = p.conns[a]
  83. node2 = p.conns[b]
  84. if node1.healthy() && node2.healthy() {
  85. break
  86. }
  87. }
  88. chosen = p.choose(node1, node2)
  89. }
  90. atomic.AddInt64(&chosen.inflight, 1)
  91. atomic.AddInt64(&chosen.requests, 1)
  92. return chosen.conn, p.buildDoneFunc(chosen), nil
  93. }
  94. func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
  95. start := int64(timex.Now())
  96. return func(info balancer.DoneInfo) {
  97. atomic.AddInt64(&c.inflight, -1)
  98. now := timex.Now()
  99. last := atomic.SwapInt64(&c.last, int64(now))
  100. td := int64(now) - last
  101. if td < 0 {
  102. td = 0
  103. }
  104. w := math.Exp(float64(-td) / float64(decayTime))
  105. lag := int64(now) - start
  106. if lag < 0 {
  107. lag = 0
  108. }
  109. olag := atomic.LoadUint64(&c.lag)
  110. if olag == 0 {
  111. w = 0
  112. }
  113. atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
  114. success := initSuccess
  115. if info.Err != nil && !codes.Acceptable(info.Err) {
  116. success = 0
  117. }
  118. osucc := atomic.LoadUint64(&c.success)
  119. atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
  120. stamp := p.stamp.Load()
  121. if now-stamp >= logInterval {
  122. if p.stamp.CompareAndSwap(stamp, now) {
  123. p.logStats()
  124. }
  125. }
  126. }
  127. }
  128. func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
  129. start := int64(timex.Now())
  130. if c2 == nil {
  131. atomic.StoreInt64(&c1.pick, start)
  132. return c1
  133. }
  134. if c1.load() > c2.load() {
  135. c1, c2 = c2, c1
  136. }
  137. pick := atomic.LoadInt64(&c2.pick)
  138. if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
  139. return c2
  140. }
  141. atomic.StoreInt64(&c1.pick, start)
  142. return c1
  143. }
  144. func (p *p2cPicker) logStats() {
  145. var stats []string
  146. p.lock.Lock()
  147. defer p.lock.Unlock()
  148. for _, conn := range p.conns {
  149. stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d",
  150. conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0)))
  151. }
  152. logx.Statf("p2c - %s", strings.Join(stats, "; "))
  153. }
  154. type subConn struct {
  155. addr resolver.Address
  156. conn balancer.SubConn
  157. lag uint64
  158. inflight int64
  159. success uint64
  160. requests int64
  161. last int64
  162. pick int64
  163. }
  164. func (c *subConn) healthy() bool {
  165. return atomic.LoadUint64(&c.success) > throttleSuccess
  166. }
  167. func (c *subConn) load() int64 {
  168. // plus one to avoid multiply zero
  169. lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
  170. load := lag * (atomic.LoadInt64(&c.inflight) + 1)
  171. if load == 0 {
  172. return penalty
  173. }
  174. return load
  175. }