watcher.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package fileSystem
  2. import (
  3. "container/list"
  4. "path"
  5. "strings"
  6. )
  7. type watcherHub struct {
  8. watchers map[string]*list.List
  9. count uint64 // current number of watchers
  10. EventHistory *EventHistory
  11. }
  12. type watcher struct {
  13. eventChan chan *Event
  14. recursive bool
  15. }
  16. func newWatchHub(capacity int) *watcherHub {
  17. return &watcherHub{
  18. watchers: make(map[string]*list.List),
  19. EventHistory: newEventHistory(capacity),
  20. }
  21. }
  22. // watch function returns an Event channel.
  23. // If recursive is true, the first change after index under prefix will be sent to the event channel.
  24. // If recursive is false, the first change after index at prefix will be sent to the event channel.
  25. // If index is zero, watch will start from the current index + 1.
  26. func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, error) {
  27. eventChan := make(chan *Event, 1)
  28. e, err := wh.EventHistory.scan(prefix, index)
  29. if err != nil {
  30. return nil, err
  31. }
  32. if e != nil {
  33. eventChan <- e
  34. return eventChan, nil
  35. }
  36. w := &watcher{
  37. eventChan: eventChan,
  38. recursive: recursive,
  39. }
  40. l, ok := wh.watchers[prefix]
  41. if ok { // add the new watcher to the back of the list
  42. l.PushBack(w)
  43. } else { // create a new list and add the new watcher
  44. l := list.New()
  45. l.PushBack(w)
  46. wh.watchers[prefix] = l
  47. }
  48. return eventChan, nil
  49. }
  50. func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
  51. l, ok := wh.watchers[path]
  52. if ok {
  53. curr := l.Front()
  54. notifiedAll := true
  55. for {
  56. if curr == nil { // we have reached the end of the list
  57. if notifiedAll {
  58. // if we have notified all watcher in the list
  59. // we can delete the list
  60. delete(wh.watchers, path)
  61. }
  62. break
  63. }
  64. next := curr.Next() // save the next
  65. w, _ := curr.Value.(*watcher)
  66. if w.recursive || force || e.Key == path {
  67. w.eventChan <- e
  68. l.Remove(curr)
  69. } else {
  70. notifiedAll = false
  71. }
  72. curr = next // go to the next one
  73. }
  74. }
  75. }
  76. func (wh *watcherHub) notify(e *Event) {
  77. segments := strings.Split(e.Key, "/")
  78. currPath := "/"
  79. // walk through all the paths
  80. for _, segment := range segments {
  81. currPath = path.Join(currPath, segment)
  82. wh.notifyWithPath(e, currPath, false)
  83. }
  84. wh.EventHistory.addEvent(e)
  85. }