// watcher.go (2.5 KB)

  1. package store
  2. import (
  3. "container/list"
  4. "path"
  5. "strings"
  6. "sync/atomic"
  7. )
  8. type watcherHub struct {
  9. watchers map[string]*list.List
  10. count int64 // current number of watchers.
  11. EventHistory *EventHistory
  12. }
  13. type watcher struct {
  14. eventChan chan *Event
  15. recursive bool
  16. sinceIndex uint64
  17. }
  18. func newWatchHub(capacity int) *watcherHub {
  19. return &watcherHub{
  20. watchers: make(map[string]*list.List),
  21. EventHistory: newEventHistory(capacity),
  22. }
  23. }
  24. // watch function returns an Event channel.
  25. // If recursive is true, the first change after index under prefix will be sent to the event channel.
  26. // If recursive is false, the first change after index at prefix will be sent to the event channel.
  27. // If index is zero, watch will start from the current index + 1.
  28. func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, error) {
  29. eventChan := make(chan *Event, 1)
  30. e, err := wh.EventHistory.scan(prefix, index)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if e != nil {
  35. eventChan <- e
  36. return eventChan, nil
  37. }
  38. w := &watcher{
  39. eventChan: eventChan,
  40. recursive: recursive,
  41. sinceIndex: index - 1, // to catch Expire()
  42. }
  43. l, ok := wh.watchers[prefix]
  44. if ok { // add the new watcher to the back of the list
  45. l.PushBack(w)
  46. } else { // create a new list and add the new watcher
  47. l := list.New()
  48. l.PushBack(w)
  49. wh.watchers[prefix] = l
  50. }
  51. atomic.AddInt64(&wh.count, 1)
  52. return eventChan, nil
  53. }
  54. func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
  55. l, ok := wh.watchers[path]
  56. if ok {
  57. curr := l.Front()
  58. notifiedAll := true
  59. for {
  60. if curr == nil { // we have reached the end of the list
  61. if notifiedAll {
  62. // if we have notified all watcher in the list
  63. // we can delete the list
  64. delete(wh.watchers, path)
  65. }
  66. break
  67. }
  68. next := curr.Next() // save the next
  69. w, _ := curr.Value.(*watcher)
  70. if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
  71. w.eventChan <- e
  72. l.Remove(curr)
  73. atomic.AddInt64(&wh.count, -1)
  74. } else {
  75. notifiedAll = false
  76. }
  77. curr = next // go to the next one
  78. }
  79. }
  80. }
  81. func (wh *watcherHub) notify(e *Event) {
  82. e = wh.EventHistory.addEvent(e)
  83. segments := strings.Split(e.Key, "/")
  84. currPath := "/"
  85. // walk through all the paths
  86. for _, segment := range segments {
  87. currPath = path.Join(currPath, segment)
  88. wh.notifyWithPath(e, currPath, false)
  89. }
  90. }
  91. func (wh *watcherHub) clone() *watcherHub {
  92. clonedHistory := wh.EventHistory.clone()
  93. return &watcherHub{
  94. EventHistory: clonedHistory,
  95. }
  96. }