p2c.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package p2c
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "math/rand"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "github.com/tal-tech/go-zero/core/syncx"
  13. "github.com/tal-tech/go-zero/core/timex"
  14. "github.com/tal-tech/go-zero/zrpc/internal/codes"
  15. "google.golang.org/grpc/balancer"
  16. "google.golang.org/grpc/balancer/base"
  17. "google.golang.org/grpc/resolver"
  18. )
  // Tuning parameters of the p2c balancer.
  const (
  	// Name is the name of p2c balancer.
  	Name = "p2c_ewma"

  	// decayTime is the time constant, in nanoseconds, of the exponential
  	// decay applied to the lag and success averages.
  	decayTime = int64(10 * time.Second) // default value from finagle
  	// forcePick is how long, in nanoseconds, the more loaded candidate may
  	// go without being picked before it is chosen regardless of its load.
  	forcePick = int64(time.Second)
  	// initSuccess is the starting success score of a connection and the
  	// sample recorded for a successful request.
  	initSuccess = 1000
  	// throttleSuccess is the score a connection must exceed to be healthy.
  	throttleSuccess = initSuccess / 2
  	// penalty is the load reported when the computed load is zero.
  	penalty = int64(math.MaxInt32)
  	// pickTimes bounds the attempts at drawing a pair of healthy connections.
  	pickTimes = 3
  	// logInterval is the minimum time between two statistics log lines.
  	logInterval = time.Minute
  )
  30. func init() {
  31. balancer.Register(newBuilder())
  32. }
  33. type p2cPickerBuilder struct{}
  34. func newBuilder() balancer.Builder {
  35. return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
  36. }
  37. func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
  38. if len(readySCs) == 0 {
  39. return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  40. }
  41. var conns []*subConn
  42. for addr, conn := range readySCs {
  43. conns = append(conns, &subConn{
  44. addr: addr,
  45. conn: conn,
  46. success: initSuccess,
  47. })
  48. }
  49. return &p2cPicker{
  50. conns: conns,
  51. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  52. stamp: syncx.NewAtomicDuration(),
  53. }
  54. }
  55. type p2cPicker struct {
  56. conns []*subConn
  57. r *rand.Rand
  58. stamp *syncx.AtomicDuration
  59. lock sync.Mutex
  60. }
  61. func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
  62. conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
  63. p.lock.Lock()
  64. defer p.lock.Unlock()
  65. var chosen *subConn
  66. switch len(p.conns) {
  67. case 0:
  68. return nil, nil, balancer.ErrNoSubConnAvailable
  69. case 1:
  70. chosen = p.choose(p.conns[0], nil)
  71. case 2:
  72. chosen = p.choose(p.conns[0], p.conns[1])
  73. default:
  74. var node1, node2 *subConn
  75. for i := 0; i < pickTimes; i++ {
  76. a := p.r.Intn(len(p.conns))
  77. b := p.r.Intn(len(p.conns) - 1)
  78. if b >= a {
  79. b++
  80. }
  81. node1 = p.conns[a]
  82. node2 = p.conns[b]
  83. if node1.healthy() && node2.healthy() {
  84. break
  85. }
  86. }
  87. chosen = p.choose(node1, node2)
  88. }
  89. atomic.AddInt64(&chosen.inflight, 1)
  90. atomic.AddInt64(&chosen.requests, 1)
  91. return chosen.conn, p.buildDoneFunc(chosen), nil
  92. }
  93. func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
  94. start := int64(timex.Now())
  95. return func(info balancer.DoneInfo) {
  96. atomic.AddInt64(&c.inflight, -1)
  97. now := timex.Now()
  98. last := atomic.SwapInt64(&c.last, int64(now))
  99. td := int64(now) - last
  100. if td < 0 {
  101. td = 0
  102. }
  103. w := math.Exp(float64(-td) / float64(decayTime))
  104. lag := int64(now) - start
  105. if lag < 0 {
  106. lag = 0
  107. }
  108. olag := atomic.LoadUint64(&c.lag)
  109. if olag == 0 {
  110. w = 0
  111. }
  112. atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
  113. success := initSuccess
  114. if info.Err != nil && !codes.Acceptable(info.Err) {
  115. success = 0
  116. }
  117. osucc := atomic.LoadUint64(&c.success)
  118. atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
  119. stamp := p.stamp.Load()
  120. if now-stamp >= logInterval {
  121. if p.stamp.CompareAndSwap(stamp, now) {
  122. p.logStats()
  123. }
  124. }
  125. }
  126. }
  127. func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
  128. start := int64(timex.Now())
  129. if c2 == nil {
  130. atomic.StoreInt64(&c1.pick, start)
  131. return c1
  132. }
  133. if c1.load() > c2.load() {
  134. c1, c2 = c2, c1
  135. }
  136. pick := atomic.LoadInt64(&c2.pick)
  137. if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
  138. return c2
  139. }
  140. atomic.StoreInt64(&c1.pick, start)
  141. return c1
  142. }
  143. func (p *p2cPicker) logStats() {
  144. var stats []string
  145. p.lock.Lock()
  146. defer p.lock.Unlock()
  147. for _, conn := range p.conns {
  148. stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d",
  149. conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0)))
  150. }
  151. logx.Statf("p2c - %s", strings.Join(stats, "; "))
  152. }
  153. type subConn struct {
  154. addr resolver.Address
  155. conn balancer.SubConn
  156. lag uint64
  157. inflight int64
  158. success uint64
  159. requests int64
  160. last int64
  161. pick int64
  162. }
  163. func (c *subConn) healthy() bool {
  164. return atomic.LoadUint64(&c.success) > throttleSuccess
  165. }
  166. func (c *subConn) load() int64 {
  167. // plus one to avoid multiply zero
  168. lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
  169. load := lag * (atomic.LoadInt64(&c.inflight) + 1)
  170. if load == 0 {
  171. return penalty
  172. }
  173. return load
  174. }