watcher.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package store
  2. import (
  3. "path"
  4. "strconv"
  5. "strings"
  6. )
  7. //------------------------------------------------------------------------------
  8. //
  9. // Typedefs
  10. //
  11. //------------------------------------------------------------------------------
// WatcherHub is where clients register their watchers.
//
// watchers maps a watched path to the watchers registered on it. addWatcher
// stores keys in cleaned, slash-rooted form (path.Clean("/" + prefix)), and
// notify looks up each ancestor path of a response key in that same form.
// NOTE(review): no lock is visible on this type; presumably the caller
// serializes access — confirm.
type WatcherHub struct {
	watchers map[string][]*Watcher
}
// Watcher is a single registered watch. For now it carries only the channel
// on which its response is delivered.
type Watcher struct {
	// C receives the *Response this watcher was waiting for; stopWatchers
	// sends nil on it instead.
	C chan *Response
}
  20. // Create a new watcherHub
  21. func newWatcherHub() *WatcherHub {
  22. w := new(WatcherHub)
  23. w.watchers = make(map[string][]*Watcher)
  24. return w
  25. }
  26. // Create a new watcher
  27. func NewWatcher() *Watcher {
  28. return &Watcher{C: make(chan *Response, 1)}
  29. }
  30. // Add a watcher to the watcherHub
  31. func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
  32. responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error {
  33. prefix = path.Clean("/" + prefix)
  34. if sinceIndex != 0 && sinceIndex >= responseStartIndex {
  35. for i := sinceIndex; i <= currentIndex; i++ {
  36. if checkResponse(prefix, i, resMap) {
  37. watcher.C <- (*resMap)[strconv.FormatUint(i, 10)]
  38. return nil
  39. }
  40. }
  41. }
  42. _, ok := w.watchers[prefix]
  43. if !ok {
  44. w.watchers[prefix] = make([]*Watcher, 0)
  45. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  46. } else {
  47. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  48. }
  49. return nil
  50. }
  51. // Check if the response has what we are watching
  52. func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool {
  53. resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
  54. if !ok {
  55. // not storage system command
  56. return false
  57. } else {
  58. path := resp.Key
  59. if strings.HasPrefix(path, prefix) {
  60. prefixLen := len(prefix)
  61. if len(path) == prefixLen || path[prefixLen] == '/' {
  62. return true
  63. }
  64. }
  65. }
  66. return false
  67. }
  68. // Notify the watcher a action happened
  69. func (w *WatcherHub) notify(resp Response) error {
  70. resp.Key = path.Clean(resp.Key)
  71. segments := strings.Split(resp.Key, "/")
  72. currPath := "/"
  73. // walk through all the pathes
  74. for _, segment := range segments {
  75. currPath = path.Join(currPath, segment)
  76. watchers, ok := w.watchers[currPath]
  77. if ok {
  78. newWatchers := make([]*Watcher, 0)
  79. // notify all the watchers
  80. for _, watcher := range watchers {
  81. watcher.C <- &resp
  82. }
  83. if len(newWatchers) == 0 {
  84. // we have notified all the watchers at this path
  85. // delete the map
  86. delete(w.watchers, currPath)
  87. } else {
  88. w.watchers[currPath] = newWatchers
  89. }
  90. }
  91. }
  92. return nil
  93. }
  94. // stopWatchers stops all the watchers
  95. // This function is used when the etcd recovery from a snapshot at runtime
  96. func (w *WatcherHub) stopWatchers() {
  97. for _, subWatchers := range w.watchers {
  98. for _, watcher := range subWatchers {
  99. watcher.C <- nil
  100. }
  101. }
  102. w.watchers = nil
  103. }