etcdmon.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package main
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/tal-tech/go-zero/core/discov"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/proc"
	"github.com/tal-tech/go-zero/core/syncx"
	"go.etcd.io/etcd/clientv3"
)
  13. var (
  14. endpoints []string
  15. keys = []string{
  16. "user.rpc",
  17. "classroom.rpc",
  18. }
  19. vals = make(map[string]map[string]string)
  20. barrier syncx.Barrier
  21. )
// listener is registered with the discovery registry for one monitored key
// and mirrors the add/delete notifications it receives into vals.
type listener struct {
	// key is the monitored key; it selects the inner map of vals that this
	// listener updates.
	key string
}
  25. func init() {
  26. cluster := proc.Env("ETCD_CLUSTER")
  27. if len(cluster) > 0 {
  28. endpoints = strings.Split(cluster, ",")
  29. } else {
  30. endpoints = []string{"localhost:2379"}
  31. }
  32. }
  33. func (l listener) OnAdd(key, val string) {
  34. fmt.Printf("add, key: %s, val: %s\n", key, val)
  35. barrier.Guard(func() {
  36. if m, ok := vals[l.key]; ok {
  37. m[key] = val
  38. } else {
  39. vals[l.key] = map[string]string{key: val}
  40. }
  41. })
  42. }
  43. func (l listener) OnDelete(key string) {
  44. fmt.Printf("del, key: %s\n", key)
  45. barrier.Guard(func() {
  46. if m, ok := vals[l.key]; ok {
  47. delete(m, key)
  48. }
  49. })
  50. }
  51. func load(cli *clientv3.Client, key string) (map[string]string, error) {
  52. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  53. resp, err := cli.Get(ctx, key, clientv3.WithPrefix())
  54. cancel()
  55. if err != nil {
  56. return nil, err
  57. }
  58. ret := make(map[string]string)
  59. for _, ev := range resp.Kvs {
  60. ret[string(ev.Key)] = string(ev.Value)
  61. }
  62. return ret, nil
  63. }
  64. func loadAll(cli *clientv3.Client) (map[string]map[string]string, error) {
  65. ret := make(map[string]map[string]string)
  66. for _, key := range keys {
  67. m, err := load(cli, key)
  68. if err != nil {
  69. return nil, err
  70. }
  71. ret[key] = m
  72. }
  73. return ret, nil
  74. }
  75. func compare(a, b map[string]map[string]string) bool {
  76. if len(a) != len(b) {
  77. return false
  78. }
  79. for k := range a {
  80. av := a[k]
  81. bv := b[k]
  82. if len(av) != len(bv) {
  83. return false
  84. }
  85. for kk := range av {
  86. if av[kk] != bv[kk] {
  87. return false
  88. }
  89. }
  90. }
  91. return true
  92. }
  93. func serializeMap(m map[string]map[string]string, prefix string) string {
  94. var builder strings.Builder
  95. for k, v := range m {
  96. fmt.Fprintf(&builder, "%s%s:\n", prefix, k)
  97. for kk, vv := range v {
  98. fmt.Fprintf(&builder, "%s\t%s: %s\n", prefix, kk, vv)
  99. }
  100. }
  101. return builder.String()
  102. }
  103. func main() {
  104. registry := discov.NewFacade(endpoints)
  105. for _, key := range keys {
  106. registry.Monitor(key, listener{key: key})
  107. }
  108. ticker := time.NewTicker(time.Minute)
  109. defer ticker.Stop()
  110. for {
  111. select {
  112. case <-ticker.C:
  113. expect, err := loadAll(registry.Client().(*clientv3.Client))
  114. if err != nil {
  115. fmt.Println("[ETCD-test] can't load current keys")
  116. continue
  117. }
  118. check := func() bool {
  119. var match bool
  120. barrier.Guard(func() {
  121. match = compare(expect, vals)
  122. })
  123. if match {
  124. logx.Info("match")
  125. }
  126. return match
  127. }
  128. if check() {
  129. continue
  130. }
  131. time.AfterFunc(time.Second*5, func() {
  132. if check() {
  133. return
  134. }
  135. var builder strings.Builder
  136. builder.WriteString(fmt.Sprintf("expect:\n%s\n", serializeMap(expect, "\t")))
  137. barrier.Guard(func() {
  138. builder.WriteString(fmt.Sprintf("actual:\n%s\n", serializeMap(vals, "\t")))
  139. })
  140. fmt.Println(builder.String())
  141. })
  142. }
  143. }
  144. }