watcher.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package store
  2. import (
  3. "path"
  4. "strings"
  5. "fmt"
  6. )
  7. type WatcherHub struct {
  8. watchers map[string][]Watcher
  9. }
  10. type Watcher struct {
  11. c chan Response
  12. }
  13. // global watcher
  14. var w *WatcherHub
  15. // init the global watcher
  16. func init() {
  17. w = createWatcherHub()
  18. }
  19. // create a new watcher
  20. func createWatcherHub() *WatcherHub {
  21. w := new(WatcherHub)
  22. w.watchers = make(map[string][]Watcher)
  23. return w
  24. }
  25. func GetWatcherHub() *WatcherHub {
  26. return w
  27. }
  28. // register a function with channel and prefix to the watcher
  29. func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
  30. prefix = "/" + path.Clean(prefix)
  31. if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
  32. for i := sinceIndex; i <= s.Index; i++ {
  33. if check(prefix, i) {
  34. c <- s.Responses[i]
  35. return nil
  36. }
  37. }
  38. }
  39. _, ok := w.watchers[prefix]
  40. if !ok {
  41. w.watchers[prefix] = make([]Watcher, 0)
  42. watcher := Watcher{c}
  43. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  44. } else {
  45. watcher := Watcher{c}
  46. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  47. }
  48. return nil
  49. }
  50. // check if the response has what we are waching
  51. func check(prefix string, index uint64) bool {
  52. index = index - s.ResponseStartIndex
  53. if index < 0 {
  54. return false
  55. }
  56. path := s.Responses[index].Key
  57. if strings.HasPrefix(path, prefix) {
  58. prefixLen := len(prefix)
  59. if len(path) == prefixLen || path[prefixLen] == '/' {
  60. return true
  61. }
  62. }
  63. return false
  64. }
  65. // notify the watcher a action happened
  66. func notify(resp Response) error {
  67. resp.Key = path.Clean(resp.Key)
  68. segments := strings.Split(resp.Key, "/")
  69. currPath := "/"
  70. // walk through all the pathes
  71. for _, segment := range segments {
  72. currPath = path.Join(currPath, segment)
  73. watchers, ok := w.watchers[currPath]
  74. if ok {
  75. newWatchers := make([]Watcher, 0)
  76. // notify all the watchers
  77. for _, watcher := range watchers {
  78. watcher.c <- resp
  79. }
  80. if len(newWatchers) == 0 {
  81. // we have notified all the watchers at this path
  82. // delete the map
  83. delete(w.watchers, currPath)
  84. } else {
  85. w.watchers[currPath] = newWatchers
  86. }
  87. }
  88. }
  89. return nil
  90. }