watcher.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package store
  2. import (
  3. "path"
  4. "strings"
  5. )
  // Watcher types, stored in Watcher.wType and inspected by notify.
  const (
  	// SHORT marks a watcher that notify removes once it has been notified.
  	SHORT = iota
  	// LONG marks a watcher that notify keeps registered after notifying it.
  	LONG
  )
  10. type WatcherHub struct {
  11. watchers map[string][]Watcher
  12. }
  13. type Watcher struct {
  14. c chan Response
  15. wType int
  16. }
  17. // global watcher
  18. var w *WatcherHub
  19. // init the global watcher
  20. func init() {
  21. w = createWatcherHub()
  22. }
  23. // create a new watcher
  24. func createWatcherHub() *WatcherHub {
  25. w := new(WatcherHub)
  26. w.watchers = make(map[string][]Watcher)
  27. return w
  28. }
  29. func GetWatcherHub() *WatcherHub {
  30. return w
  31. }
  32. // register a function with channel and prefix to the watcher
  33. func AddWatcher(prefix string, c chan Response, wType int) error {
  34. prefix = "/" + path.Clean(prefix)
  35. _, ok := w.watchers[prefix]
  36. if !ok {
  37. w.watchers[prefix] = make([]Watcher, 0)
  38. watcher := Watcher{c, wType}
  39. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  40. } else {
  41. watcher := Watcher{c, wType}
  42. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  43. }
  44. return nil
  45. }
  46. // notify the watcher a action happened
  47. func notify(resp Response) error {
  48. resp.Key = path.Clean(resp.Key)
  49. segments := strings.Split(resp.Key, "/")
  50. currPath := "/"
  51. // walk through all the pathes
  52. for _, segment := range segments {
  53. currPath = path.Join(currPath, segment)
  54. watchers, ok := w.watchers[currPath]
  55. if ok {
  56. newWatchers := make([]Watcher, 0)
  57. // notify all the watchers
  58. for _, watcher := range watchers {
  59. watcher.c <- resp
  60. if watcher.wType == LONG {
  61. newWatchers = append(newWatchers, watcher)
  62. }
  63. }
  64. if len(newWatchers) == 0 {
  65. // we have notified all the watchers at this path
  66. // delete the map
  67. delete(w.watchers, currPath)
  68. } else {
  69. w.watchers[currPath] = newWatchers
  70. }
  71. }
  72. }
  73. return nil
  74. }