subscriber.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/discov/internal"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. type (
  9. subOptions struct {
  10. exclusive bool
  11. }
  12. SubOption func(opts *subOptions)
  13. Subscriber struct {
  14. items *container
  15. }
  16. )
  17. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  18. var subOpts subOptions
  19. for _, opt := range opts {
  20. opt(&subOpts)
  21. }
  22. sub := &Subscriber{
  23. items: newContainer(subOpts.exclusive),
  24. }
  25. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  26. return nil, err
  27. }
  28. return sub, nil
  29. }
  30. func (s *Subscriber) AddListener(listener func()) {
  31. s.items.addListener(listener)
  32. }
  33. func (s *Subscriber) Values() []string {
  34. return s.items.getValues()
  35. }
  36. // exclusive means that key value can only be 1:1,
  37. // which means later added value will remove the keys associated with the same value previously.
  38. func Exclusive() SubOption {
  39. return func(opts *subOptions) {
  40. opts.exclusive = true
  41. }
  42. }
  43. type container struct {
  44. exclusive bool
  45. values map[string][]string
  46. mapping map[string]string
  47. snapshot atomic.Value
  48. dirty *syncx.AtomicBool
  49. listeners []func()
  50. lock sync.Mutex
  51. }
  52. func newContainer(exclusive bool) *container {
  53. return &container{
  54. exclusive: exclusive,
  55. values: make(map[string][]string),
  56. mapping: make(map[string]string),
  57. dirty: syncx.ForAtomicBool(true),
  58. }
  59. }
  60. func (c *container) OnAdd(kv internal.KV) {
  61. c.addKv(kv.Key, kv.Val)
  62. c.notifyChange()
  63. }
  64. func (c *container) OnDelete(kv internal.KV) {
  65. c.removeKey(kv.Key)
  66. c.notifyChange()
  67. }
  68. // addKv adds the kv, returns if there are already other keys associate with the value
  69. func (c *container) addKv(key, value string) ([]string, bool) {
  70. c.lock.Lock()
  71. defer c.lock.Unlock()
  72. c.dirty.Set(true)
  73. keys := c.values[value]
  74. previous := append([]string(nil), keys...)
  75. early := len(keys) > 0
  76. if c.exclusive && early {
  77. for _, each := range keys {
  78. c.doRemoveKey(each)
  79. }
  80. }
  81. c.values[value] = append(c.values[value], key)
  82. c.mapping[key] = value
  83. if early {
  84. return previous, true
  85. } else {
  86. return nil, false
  87. }
  88. }
  89. func (c *container) addListener(listener func()) {
  90. c.lock.Lock()
  91. c.listeners = append(c.listeners, listener)
  92. c.lock.Unlock()
  93. }
  94. func (c *container) doRemoveKey(key string) {
  95. server, ok := c.mapping[key]
  96. if !ok {
  97. return
  98. }
  99. delete(c.mapping, key)
  100. keys := c.values[server]
  101. remain := keys[:0]
  102. for _, k := range keys {
  103. if k != key {
  104. remain = append(remain, k)
  105. }
  106. }
  107. if len(remain) > 0 {
  108. c.values[server] = remain
  109. } else {
  110. delete(c.values, server)
  111. }
  112. }
  113. func (c *container) getValues() []string {
  114. if !c.dirty.True() {
  115. return c.snapshot.Load().([]string)
  116. }
  117. c.lock.Lock()
  118. defer c.lock.Unlock()
  119. var vals []string
  120. for each := range c.values {
  121. vals = append(vals, each)
  122. }
  123. c.snapshot.Store(vals)
  124. c.dirty.Set(false)
  125. return vals
  126. }
  127. func (c *container) notifyChange() {
  128. c.lock.Lock()
  129. listeners := append(([]func())(nil), c.listeners...)
  130. c.lock.Unlock()
  131. for _, listener := range listeners {
  132. listener()
  133. }
  134. }
  135. // removeKey removes the kv, returns true if there are still other keys associate with the value
  136. func (c *container) removeKey(key string) {
  137. c.lock.Lock()
  138. defer c.lock.Unlock()
  139. c.dirty.Set(true)
  140. c.doRemoveKey(key)
  141. }