subscriber.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/discov/internal"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. type (
  9. subOptions struct {
  10. exclusive bool
  11. }
  12. // SubOption defines the method to customize a Subscriber.
  13. SubOption func(opts *subOptions)
  14. // A Subscriber is used to subscribe the given key on a etcd cluster.
  15. Subscriber struct {
  16. items *container
  17. }
  18. )
  19. // NewSubscriber returns a Subscriber.
  20. // endpoints is the hosts of the etcd cluster.
  21. // key is the key to subscribe.
  22. // opts are used to customize the Subscriber.
  23. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  24. var subOpts subOptions
  25. for _, opt := range opts {
  26. opt(&subOpts)
  27. }
  28. sub := &Subscriber{
  29. items: newContainer(subOpts.exclusive),
  30. }
  31. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  32. return nil, err
  33. }
  34. return sub, nil
  35. }
  36. // AddListener adds listener to s.
  37. func (s *Subscriber) AddListener(listener func()) {
  38. s.items.addListener(listener)
  39. }
  40. // Values returns all the subscription values.
  41. func (s *Subscriber) Values() []string {
  42. return s.items.getValues()
  43. }
  44. // Exclusive means that key value can only be 1:1,
  45. // which means later added value will remove the keys associated with the same value previously.
  46. func Exclusive() SubOption {
  47. return func(opts *subOptions) {
  48. opts.exclusive = true
  49. }
  50. }
  51. type container struct {
  52. exclusive bool
  53. values map[string][]string
  54. mapping map[string]string
  55. snapshot atomic.Value
  56. dirty *syncx.AtomicBool
  57. listeners []func()
  58. lock sync.Mutex
  59. }
  60. func newContainer(exclusive bool) *container {
  61. return &container{
  62. exclusive: exclusive,
  63. values: make(map[string][]string),
  64. mapping: make(map[string]string),
  65. dirty: syncx.ForAtomicBool(true),
  66. }
  67. }
  68. func (c *container) OnAdd(kv internal.KV) {
  69. c.addKv(kv.Key, kv.Val)
  70. c.notifyChange()
  71. }
  72. func (c *container) OnDelete(kv internal.KV) {
  73. c.removeKey(kv.Key)
  74. c.notifyChange()
  75. }
  76. // addKv adds the kv, returns if there are already other keys associate with the value
  77. func (c *container) addKv(key, value string) ([]string, bool) {
  78. c.lock.Lock()
  79. defer c.lock.Unlock()
  80. c.dirty.Set(true)
  81. keys := c.values[value]
  82. previous := append([]string(nil), keys...)
  83. early := len(keys) > 0
  84. if c.exclusive && early {
  85. for _, each := range keys {
  86. c.doRemoveKey(each)
  87. }
  88. }
  89. c.values[value] = append(c.values[value], key)
  90. c.mapping[key] = value
  91. if early {
  92. return previous, true
  93. }
  94. return nil, false
  95. }
  96. func (c *container) addListener(listener func()) {
  97. c.lock.Lock()
  98. c.listeners = append(c.listeners, listener)
  99. c.lock.Unlock()
  100. }
  101. func (c *container) doRemoveKey(key string) {
  102. server, ok := c.mapping[key]
  103. if !ok {
  104. return
  105. }
  106. delete(c.mapping, key)
  107. keys := c.values[server]
  108. remain := keys[:0]
  109. for _, k := range keys {
  110. if k != key {
  111. remain = append(remain, k)
  112. }
  113. }
  114. if len(remain) > 0 {
  115. c.values[server] = remain
  116. } else {
  117. delete(c.values, server)
  118. }
  119. }
  120. func (c *container) getValues() []string {
  121. if !c.dirty.True() {
  122. return c.snapshot.Load().([]string)
  123. }
  124. c.lock.Lock()
  125. defer c.lock.Unlock()
  126. var vals []string
  127. for each := range c.values {
  128. vals = append(vals, each)
  129. }
  130. c.snapshot.Store(vals)
  131. c.dirty.Set(false)
  132. return vals
  133. }
  134. func (c *container) notifyChange() {
  135. c.lock.Lock()
  136. listeners := append(([]func())(nil), c.listeners...)
  137. c.lock.Unlock()
  138. for _, listener := range listeners {
  139. listener()
  140. }
  141. }
  142. // removeKey removes the kv, returns true if there are still other keys associate with the value
  143. func (c *container) removeKey(key string) {
  144. c.lock.Lock()
  145. defer c.lock.Unlock()
  146. c.dirty.Set(true)
  147. c.doRemoveKey(key)
  148. }