watcher.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package main
  2. import (
  3. "path"
  4. "strings"
  5. //"fmt"
  6. "time"
  7. )
  8. type Watcher struct {
  9. chanMap map[string][]chan Response
  10. }
  11. // global watcher
  12. var w *Watcher
  13. // init the global watcher
  14. func init() {
  15. w = createWatcher()
  16. }
  17. // create a new watcher
  18. func createWatcher() *Watcher {
  19. w := new(Watcher)
  20. w.chanMap = make(map[string][]chan Response)
  21. return w
  22. }
  23. // register a function with channel and prefix to the watcher
  24. func (w *Watcher) add(prefix string, c chan Response) error {
  25. prefix = "/" + path.Clean(prefix)
  26. debug("Add a watche at ", prefix)
  27. _, ok := w.chanMap[prefix]
  28. if !ok {
  29. w.chanMap[prefix] = make([]chan Response, 0)
  30. w.chanMap[prefix] = append(w.chanMap[prefix], c)
  31. } else {
  32. w.chanMap[prefix] = append(w.chanMap[prefix], c)
  33. }
  34. return nil
  35. }
  36. // notify the watcher a action happened
  37. func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error {
  38. key = path.Clean(key)
  39. segments := strings.Split(key, "/")
  40. currPath := "/"
  41. // walk through all the pathes
  42. for _, segment := range segments {
  43. currPath = path.Join(currPath, segment)
  44. chans, ok := w.chanMap[currPath]
  45. if ok {
  46. debug("Notify at %s", currPath)
  47. n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)}
  48. // notify all the watchers
  49. for _, c := range chans {
  50. c <- n
  51. }
  52. // we have notified all the watchers at this path
  53. // delete the map
  54. delete(w.chanMap, currPath)
  55. }
  56. }
  57. return nil
  58. }