123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package internal
- import (
- "context"
- "fmt"
- "io"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/tal-tech/go-zero/core/contextx"
- "github.com/tal-tech/go-zero/core/lang"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/syncx"
- "github.com/tal-tech/go-zero/core/threading"
- "go.etcd.io/etcd/clientv3"
- )
- var (
- registryInstance = Registry{
- clusters: make(map[string]*cluster),
- }
- connManager = syncx.NewResourceManager()
- )
- type Registry struct {
- clusters map[string]*cluster
- lock sync.Mutex
- }
- func GetRegistry() *Registry {
- return ®istryInstance
- }
- func (r *Registry) getCluster(endpoints []string) *cluster {
- clusterKey := getClusterKey(endpoints)
- r.lock.Lock()
- defer r.lock.Unlock()
- c, ok := r.clusters[clusterKey]
- if !ok {
- c = newCluster(endpoints)
- r.clusters[clusterKey] = c
- }
- return c
- }
- func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
- return r.getCluster(endpoints).getClient()
- }
- func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
- return r.getCluster(endpoints).monitor(key, l)
- }
- type cluster struct {
- endpoints []string
- key string
- values map[string]map[string]string
- listeners map[string][]UpdateListener
- watchGroup *threading.RoutineGroup
- done chan lang.PlaceholderType
- lock sync.Mutex
- }
- func newCluster(endpoints []string) *cluster {
- return &cluster{
- endpoints: endpoints,
- key: getClusterKey(endpoints),
- values: make(map[string]map[string]string),
- listeners: make(map[string][]UpdateListener),
- watchGroup: threading.NewRoutineGroup(),
- done: make(chan lang.PlaceholderType),
- }
- }
- func (c *cluster) context(cli EtcdClient) context.Context {
- return contextx.ValueOnlyFrom(cli.Ctx())
- }
- func (c *cluster) getClient() (EtcdClient, error) {
- val, err := connManager.GetResource(c.key, func() (io.Closer, error) {
- return c.newClient()
- })
- if err != nil {
- return nil, err
- }
- return val.(EtcdClient), nil
- }
- func (c *cluster) handleChanges(key string, kvs []KV) {
- var add []KV
- var remove []KV
- c.lock.Lock()
- listeners := append([]UpdateListener(nil), c.listeners[key]...)
- vals, ok := c.values[key]
- if !ok {
- add = kvs
- vals = make(map[string]string)
- for _, kv := range kvs {
- vals[kv.Key] = kv.Val
- }
- c.values[key] = vals
- } else {
- m := make(map[string]string)
- for _, kv := range kvs {
- m[kv.Key] = kv.Val
- }
- for k, v := range vals {
- if val, ok := m[k]; !ok || v != val {
- remove = append(remove, KV{
- Key: k,
- Val: v,
- })
- }
- }
- for k, v := range m {
- if val, ok := vals[k]; !ok || v != val {
- add = append(add, KV{
- Key: k,
- Val: v,
- })
- }
- }
- c.values[key] = m
- }
- c.lock.Unlock()
- for _, kv := range add {
- for _, l := range listeners {
- l.OnAdd(kv)
- }
- }
- for _, kv := range remove {
- for _, l := range listeners {
- l.OnDelete(kv)
- }
- }
- }
- func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event) {
- c.lock.Lock()
- listeners := append([]UpdateListener(nil), c.listeners[key]...)
- c.lock.Unlock()
- for _, ev := range events {
- switch ev.Type {
- case clientv3.EventTypePut:
- c.lock.Lock()
- if vals, ok := c.values[key]; ok {
- vals[string(ev.Kv.Key)] = string(ev.Kv.Value)
- } else {
- c.values[key] = map[string]string{string(ev.Kv.Key): string(ev.Kv.Value)}
- }
- c.lock.Unlock()
- for _, l := range listeners {
- l.OnAdd(KV{
- Key: string(ev.Kv.Key),
- Val: string(ev.Kv.Value),
- })
- }
- case clientv3.EventTypeDelete:
- if vals, ok := c.values[key]; ok {
- delete(vals, string(ev.Kv.Key))
- }
- for _, l := range listeners {
- l.OnDelete(KV{
- Key: string(ev.Kv.Key),
- Val: string(ev.Kv.Value),
- })
- }
- default:
- logx.Errorf("Unknown event type: %v", ev.Type)
- }
- }
- }
- func (c *cluster) load(cli EtcdClient, key string) {
- var resp *clientv3.GetResponse
- for {
- var err error
- ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
- resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
- cancel()
- if err == nil {
- break
- }
- logx.Error(err)
- time.Sleep(coolDownInterval)
- }
- var kvs []KV
- c.lock.Lock()
- for _, ev := range resp.Kvs {
- kvs = append(kvs, KV{
- Key: string(ev.Key),
- Val: string(ev.Value),
- })
- }
- c.lock.Unlock()
- c.handleChanges(key, kvs)
- }
- func (c *cluster) monitor(key string, l UpdateListener) error {
- c.lock.Lock()
- c.listeners[key] = append(c.listeners[key], l)
- c.lock.Unlock()
- cli, err := c.getClient()
- if err != nil {
- return err
- }
- c.load(cli, key)
- c.watchGroup.Run(func() {
- c.watch(cli, key)
- })
- return nil
- }
- func (c *cluster) newClient() (EtcdClient, error) {
- cli, err := NewClient(c.endpoints)
- if err != nil {
- return nil, err
- }
- go c.watchConnState(cli)
- return cli, nil
- }
- func (c *cluster) reload(cli EtcdClient) {
- c.lock.Lock()
- close(c.done)
- c.watchGroup.Wait()
- c.done = make(chan lang.PlaceholderType)
- c.watchGroup = threading.NewRoutineGroup()
- var keys []string
- for k := range c.listeners {
- keys = append(keys, k)
- }
- c.lock.Unlock()
- for _, key := range keys {
- k := key
- c.watchGroup.Run(func() {
- c.load(cli, k)
- c.watch(cli, k)
- })
- }
- }
- func (c *cluster) watch(cli EtcdClient, key string) {
- rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
- for {
- select {
- case wresp, ok := <-rch:
- if !ok {
- logx.Error("etcd monitor chan has been closed")
- return
- }
- if wresp.Canceled {
- logx.Error("etcd monitor chan has been canceled")
- return
- }
- if wresp.Err() != nil {
- logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
- return
- }
- c.handleWatchEvents(key, wresp.Events)
- case <-c.done:
- return
- }
- }
- }
- func (c *cluster) watchConnState(cli EtcdClient) {
- watcher := newStateWatcher()
- watcher.addListener(func() {
- go c.reload(cli)
- })
- watcher.watch(cli.ActiveConnection())
- }
- func DialClient(endpoints []string) (EtcdClient, error) {
- return clientv3.New(clientv3.Config{
- Endpoints: endpoints,
- AutoSyncInterval: autoSyncInterval,
- DialTimeout: DialTimeout,
- DialKeepAliveTime: dialKeepAliveTime,
- DialKeepAliveTimeout: DialTimeout,
- RejectOldCluster: true,
- })
- }
- func getClusterKey(endpoints []string) string {
- sort.Strings(endpoints)
- return strings.Join(endpoints, endpointsSeparator)
- }
- func makeKeyPrefix(key string) string {
- return fmt.Sprintf("%s%c", key, Delimiter)
- }
|