registry.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/tal-tech/go-zero/core/contextx"
  11. "github.com/tal-tech/go-zero/core/lang"
  12. "github.com/tal-tech/go-zero/core/logx"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. "github.com/tal-tech/go-zero/core/threading"
  15. "go.etcd.io/etcd/clientv3"
  16. )
  17. var (
  18. registryInstance = Registry{
  19. clusters: make(map[string]*cluster),
  20. }
  21. connManager = syncx.NewResourceManager()
  22. )
  23. type Registry struct {
  24. clusters map[string]*cluster
  25. lock sync.Mutex
  26. }
  27. func GetRegistry() *Registry {
  28. return &registryInstance
  29. }
  30. func (r *Registry) getCluster(endpoints []string) *cluster {
  31. clusterKey := getClusterKey(endpoints)
  32. r.lock.Lock()
  33. defer r.lock.Unlock()
  34. c, ok := r.clusters[clusterKey]
  35. if !ok {
  36. c = newCluster(endpoints)
  37. r.clusters[clusterKey] = c
  38. }
  39. return c
  40. }
  41. func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
  42. return r.getCluster(endpoints).getClient()
  43. }
  44. func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
  45. return r.getCluster(endpoints).monitor(key, l)
  46. }
  47. type cluster struct {
  48. endpoints []string
  49. key string
  50. values map[string]map[string]string
  51. listeners map[string][]UpdateListener
  52. watchGroup *threading.RoutineGroup
  53. done chan lang.PlaceholderType
  54. lock sync.Mutex
  55. }
  56. func newCluster(endpoints []string) *cluster {
  57. return &cluster{
  58. endpoints: endpoints,
  59. key: getClusterKey(endpoints),
  60. values: make(map[string]map[string]string),
  61. listeners: make(map[string][]UpdateListener),
  62. watchGroup: threading.NewRoutineGroup(),
  63. done: make(chan lang.PlaceholderType),
  64. }
  65. }
  66. func (c *cluster) context(cli EtcdClient) context.Context {
  67. return contextx.ValueOnlyFrom(cli.Ctx())
  68. }
  69. func (c *cluster) getClient() (EtcdClient, error) {
  70. val, err := connManager.GetResource(c.key, func() (io.Closer, error) {
  71. return c.newClient()
  72. })
  73. if err != nil {
  74. return nil, err
  75. }
  76. return val.(EtcdClient), nil
  77. }
  78. func (c *cluster) handleChanges(key string, kvs []KV) {
  79. var add []KV
  80. var remove []KV
  81. c.lock.Lock()
  82. listeners := append([]UpdateListener(nil), c.listeners[key]...)
  83. vals, ok := c.values[key]
  84. if !ok {
  85. add = kvs
  86. vals = make(map[string]string)
  87. for _, kv := range kvs {
  88. vals[kv.Key] = kv.Val
  89. }
  90. c.values[key] = vals
  91. } else {
  92. m := make(map[string]string)
  93. for _, kv := range kvs {
  94. m[kv.Key] = kv.Val
  95. }
  96. for k, v := range vals {
  97. if val, ok := m[k]; !ok || v != val {
  98. remove = append(remove, KV{
  99. Key: k,
  100. Val: v,
  101. })
  102. }
  103. }
  104. for k, v := range m {
  105. if val, ok := vals[k]; !ok || v != val {
  106. add = append(add, KV{
  107. Key: k,
  108. Val: v,
  109. })
  110. }
  111. }
  112. c.values[key] = m
  113. }
  114. c.lock.Unlock()
  115. for _, kv := range add {
  116. for _, l := range listeners {
  117. l.OnAdd(kv)
  118. }
  119. }
  120. for _, kv := range remove {
  121. for _, l := range listeners {
  122. l.OnDelete(kv)
  123. }
  124. }
  125. }
  126. func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event) {
  127. c.lock.Lock()
  128. listeners := append([]UpdateListener(nil), c.listeners[key]...)
  129. c.lock.Unlock()
  130. for _, ev := range events {
  131. switch ev.Type {
  132. case clientv3.EventTypePut:
  133. c.lock.Lock()
  134. if vals, ok := c.values[key]; ok {
  135. vals[string(ev.Kv.Key)] = string(ev.Kv.Value)
  136. } else {
  137. c.values[key] = map[string]string{string(ev.Kv.Key): string(ev.Kv.Value)}
  138. }
  139. c.lock.Unlock()
  140. for _, l := range listeners {
  141. l.OnAdd(KV{
  142. Key: string(ev.Kv.Key),
  143. Val: string(ev.Kv.Value),
  144. })
  145. }
  146. case clientv3.EventTypeDelete:
  147. if vals, ok := c.values[key]; ok {
  148. delete(vals, string(ev.Kv.Key))
  149. }
  150. for _, l := range listeners {
  151. l.OnDelete(KV{
  152. Key: string(ev.Kv.Key),
  153. Val: string(ev.Kv.Value),
  154. })
  155. }
  156. default:
  157. logx.Errorf("Unknown event type: %v", ev.Type)
  158. }
  159. }
  160. }
  161. func (c *cluster) load(cli EtcdClient, key string) {
  162. var resp *clientv3.GetResponse
  163. for {
  164. var err error
  165. ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
  166. resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
  167. cancel()
  168. if err == nil {
  169. break
  170. }
  171. logx.Error(err)
  172. time.Sleep(coolDownInterval)
  173. }
  174. var kvs []KV
  175. c.lock.Lock()
  176. for _, ev := range resp.Kvs {
  177. kvs = append(kvs, KV{
  178. Key: string(ev.Key),
  179. Val: string(ev.Value),
  180. })
  181. }
  182. c.lock.Unlock()
  183. c.handleChanges(key, kvs)
  184. }
  185. func (c *cluster) monitor(key string, l UpdateListener) error {
  186. c.lock.Lock()
  187. c.listeners[key] = append(c.listeners[key], l)
  188. c.lock.Unlock()
  189. cli, err := c.getClient()
  190. if err != nil {
  191. return err
  192. }
  193. c.load(cli, key)
  194. c.watchGroup.Run(func() {
  195. c.watch(cli, key)
  196. })
  197. return nil
  198. }
  199. func (c *cluster) newClient() (EtcdClient, error) {
  200. cli, err := NewClient(c.endpoints)
  201. if err != nil {
  202. return nil, err
  203. }
  204. go c.watchConnState(cli)
  205. return cli, nil
  206. }
  207. func (c *cluster) reload(cli EtcdClient) {
  208. c.lock.Lock()
  209. close(c.done)
  210. c.watchGroup.Wait()
  211. c.done = make(chan lang.PlaceholderType)
  212. c.watchGroup = threading.NewRoutineGroup()
  213. var keys []string
  214. for k := range c.listeners {
  215. keys = append(keys, k)
  216. }
  217. c.lock.Unlock()
  218. for _, key := range keys {
  219. k := key
  220. c.watchGroup.Run(func() {
  221. c.load(cli, k)
  222. c.watch(cli, k)
  223. })
  224. }
  225. }
  226. func (c *cluster) watch(cli EtcdClient, key string) {
  227. rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
  228. for {
  229. select {
  230. case wresp, ok := <-rch:
  231. if !ok {
  232. logx.Error("etcd monitor chan has been closed")
  233. return
  234. }
  235. if wresp.Canceled {
  236. logx.Error("etcd monitor chan has been canceled")
  237. return
  238. }
  239. if wresp.Err() != nil {
  240. logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
  241. return
  242. }
  243. c.handleWatchEvents(key, wresp.Events)
  244. case <-c.done:
  245. return
  246. }
  247. }
  248. }
  249. func (c *cluster) watchConnState(cli EtcdClient) {
  250. watcher := newStateWatcher()
  251. watcher.addListener(func() {
  252. go c.reload(cli)
  253. })
  254. watcher.watch(cli.ActiveConnection())
  255. }
  256. func DialClient(endpoints []string) (EtcdClient, error) {
  257. return clientv3.New(clientv3.Config{
  258. Endpoints: endpoints,
  259. AutoSyncInterval: autoSyncInterval,
  260. DialTimeout: DialTimeout,
  261. DialKeepAliveTime: dialKeepAliveTime,
  262. DialKeepAliveTimeout: DialTimeout,
  263. RejectOldCluster: true,
  264. })
  265. }
  266. func getClusterKey(endpoints []string) string {
  267. sort.Strings(endpoints)
  268. return strings.Join(endpoints, endpointsSeparator)
  269. }
  270. func makeKeyPrefix(key string) string {
  271. return fmt.Sprintf("%s%c", key, Delimiter)
  272. }