watcher.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package fileSystem
  2. import (
  3. "container/list"
  4. "path"
  5. "strings"
  6. )
  7. type watcherHub struct {
  8. watchers map[string]*list.List
  9. count uint64 // current number of watchers
  10. EventHistory *EventHistory
  11. }
  12. func newWatchHub(capacity int) *watcherHub {
  13. return &watcherHub{
  14. watchers: make(map[string]*list.List),
  15. EventHistory: newEventHistory(capacity),
  16. }
  17. }
  18. func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) {
  19. eventChan := make(chan *Event, 1)
  20. e, err := wh.EventHistory.scan(prefix, index)
  21. if err != nil {
  22. return err, nil
  23. }
  24. if e != nil {
  25. eventChan <- e
  26. return nil, eventChan
  27. }
  28. l, ok := wh.watchers[prefix]
  29. if ok {
  30. l.PushBack(eventChan)
  31. } else {
  32. l := list.New()
  33. l.PushBack(eventChan)
  34. wh.watchers[prefix] = l
  35. }
  36. return nil, eventChan
  37. }
  38. func (wh *watcherHub) notify(e *Event) {
  39. segments := strings.Split(e.Key, "/")
  40. currPath := "/"
  41. // walk through all the paths
  42. for _, segment := range segments {
  43. currPath = path.Join(currPath, segment)
  44. l, ok := wh.watchers[currPath]
  45. if ok {
  46. for {
  47. element := l.Front()
  48. if element == nil {
  49. delete(wh.watchers, currPath)
  50. break
  51. }
  52. c, _ := element.Value.(chan *Event)
  53. c <- e
  54. l.Remove(element)
  55. }
  56. }
  57. }
  58. }