watcher.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package store
  2. import (
  3. "path"
  4. "strconv"
  5. "strings"
  6. )
  7. //------------------------------------------------------------------------------
  8. //
  9. // Typedefs
  10. //
  11. //------------------------------------------------------------------------------
  12. // WatcherHub is where the client register its watcher
  13. type WatcherHub struct {
  14. watchers map[string][]*Watcher
  15. }
  16. // Currently watcher only contains a response channel
  17. type Watcher struct {
  18. C chan *Response
  19. }
  20. // Create a new watcherHub
  21. func newWatcherHub() *WatcherHub {
  22. w := new(WatcherHub)
  23. w.watchers = make(map[string][]*Watcher)
  24. return w
  25. }
  26. // Create a new watcher
  27. func NewWatcher() *Watcher {
  28. return &Watcher{C: make(chan *Response, 1)}
  29. }
  30. // Add a watcher to the watcherHub
  31. func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
  32. responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
  33. prefix = path.Clean("/" + prefix)
  34. if sinceIndex != 0 && sinceIndex >= responseStartIndex {
  35. for i := sinceIndex; i <= currentIndex; i++ {
  36. if checkResponse(prefix, i, resMap) {
  37. watcher.C <- resMap[strconv.FormatUint(i, 10)]
  38. return nil
  39. }
  40. }
  41. }
  42. _, ok := w.watchers[prefix]
  43. if !ok {
  44. w.watchers[prefix] = make([]*Watcher, 0)
  45. }
  46. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  47. return nil
  48. }
  49. // Check if the response has what we are watching
  50. func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
  51. resp, ok := resMap[strconv.FormatUint(index, 10)]
  52. if !ok {
  53. // not storage system command
  54. return false
  55. } else {
  56. path := resp.Key
  57. if strings.HasPrefix(path, prefix) {
  58. prefixLen := len(prefix)
  59. if len(path) == prefixLen || path[prefixLen] == '/' {
  60. return true
  61. }
  62. }
  63. }
  64. return false
  65. }
  66. // Notify the watcher a action happened
  67. func (w *WatcherHub) notify(resp Response) error {
  68. resp.Key = path.Clean(resp.Key)
  69. segments := strings.Split(resp.Key, "/")
  70. currPath := "/"
  71. // walk through all the pathes
  72. for _, segment := range segments {
  73. currPath = path.Join(currPath, segment)
  74. watchers, ok := w.watchers[currPath]
  75. if ok {
  76. newWatchers := make([]*Watcher, 0)
  77. // notify all the watchers
  78. for _, watcher := range watchers {
  79. watcher.C <- &resp
  80. }
  81. if len(newWatchers) == 0 {
  82. // we have notified all the watchers at this path
  83. // delete the map
  84. delete(w.watchers, currPath)
  85. } else {
  86. w.watchers[currPath] = newWatchers
  87. }
  88. }
  89. }
  90. return nil
  91. }
  92. // stopWatchers stops all the watchers
  93. // This function is used when the etcd recovery from a snapshot at runtime
  94. func (w *WatcherHub) stopWatchers() {
  95. for _, subWatchers := range w.watchers {
  96. for _, watcher := range subWatchers {
  97. watcher.C <- nil
  98. }
  99. }
  100. w.watchers = nil
  101. }