watcher_hub.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "container/list"
  16. "path"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. etcdErr "github.com/coreos/etcd/error"
  21. )
  22. // A watcherHub contains all subscribed watchers
  23. // watchers is a map with watched path as key and watcher as value
  24. // EventHistory keeps the old events for watcherHub. It is used to help
  25. // watcher to get a continuous event history. Or a watcher might miss the
  26. // event happens between the end of the first watch command and the start
  27. // of the second command.
  28. type watcherHub struct {
  29. mutex sync.Mutex // protect the hash map
  30. watchers map[string]*list.List
  31. count int64 // current number of watchers.
  32. EventHistory *EventHistory
  33. }
  34. // newWatchHub creates a watchHub. The capacity determines how many events we will
  35. // keep in the eventHistory.
  36. // Typically, we only need to keep a small size of history[smaller than 20K].
  37. // Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
  38. func newWatchHub(capacity int) *watcherHub {
  39. return &watcherHub{
  40. watchers: make(map[string]*list.List),
  41. EventHistory: newEventHistory(capacity),
  42. }
  43. }
  44. // Watch function returns a Watcher.
  45. // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
  46. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
  47. // If index is zero, watch will start from the current index + 1.
  48. func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
  49. event, err := wh.EventHistory.scan(key, recursive, index)
  50. if err != nil {
  51. err.Index = storeIndex
  52. return nil, err
  53. }
  54. w := &watcher{
  55. eventChan: make(chan *Event, 100), // use a buffered channel
  56. recursive: recursive,
  57. stream: stream,
  58. sinceIndex: index,
  59. startIndex: storeIndex,
  60. hub: wh,
  61. }
  62. // If the event exists in the known history, append the EtcdIndex and return immediately
  63. if event != nil {
  64. event.EtcdIndex = storeIndex
  65. w.eventChan <- event
  66. return w, nil
  67. }
  68. wh.mutex.Lock()
  69. defer wh.mutex.Unlock()
  70. l, ok := wh.watchers[key]
  71. var elem *list.Element
  72. if ok { // add the new watcher to the back of the list
  73. elem = l.PushBack(w)
  74. } else { // create a new list and add the new watcher
  75. l = list.New()
  76. elem = l.PushBack(w)
  77. wh.watchers[key] = l
  78. }
  79. w.remove = func() {
  80. if w.removed { // avoid removing it twice
  81. return
  82. }
  83. w.removed = true
  84. l.Remove(elem)
  85. atomic.AddInt64(&wh.count, -1)
  86. if l.Len() == 0 {
  87. delete(wh.watchers, key)
  88. }
  89. }
  90. atomic.AddInt64(&wh.count, 1)
  91. return w, nil
  92. }
  93. // notify function accepts an event and notify to the watchers.
  94. func (wh *watcherHub) notify(e *Event) {
  95. e = wh.EventHistory.addEvent(e) // add event into the eventHistory
  96. segments := strings.Split(e.Node.Key, "/")
  97. currPath := "/"
  98. // walk through all the segments of the path and notify the watchers
  99. // if the path is "/foo/bar", it will notify watchers with path "/",
  100. // "/foo" and "/foo/bar"
  101. for _, segment := range segments {
  102. currPath = path.Join(currPath, segment)
  103. // notify the watchers who interests in the changes of current path
  104. wh.notifyWatchers(e, currPath, false)
  105. }
  106. }
  107. func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
  108. wh.mutex.Lock()
  109. defer wh.mutex.Unlock()
  110. l, ok := wh.watchers[nodePath]
  111. if ok {
  112. curr := l.Front()
  113. for curr != nil {
  114. next := curr.Next() // save reference to the next one in the list
  115. w, _ := curr.Value.(*watcher)
  116. originalPath := (e.Node.Key == nodePath)
  117. if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
  118. if !w.stream { // do not remove the stream watcher
  119. // if we successfully notify a watcher
  120. // we need to remove the watcher from the list
  121. // and decrease the counter
  122. l.Remove(curr)
  123. atomic.AddInt64(&wh.count, -1)
  124. }
  125. }
  126. curr = next // update current to the next element in the list
  127. }
  128. if l.Len() == 0 {
  129. // if we have notified all watcher in the list
  130. // we can delete the list
  131. delete(wh.watchers, nodePath)
  132. }
  133. }
  134. }
  135. // clone function clones the watcherHub and return the cloned one.
  136. // only clone the static content. do not clone the current watchers.
  137. func (wh *watcherHub) clone() *watcherHub {
  138. clonedHistory := wh.EventHistory.clone()
  139. return &watcherHub{
  140. EventHistory: clonedHistory,
  141. }
  142. }
  143. // isHidden checks to see if key path is considered hidden to watch path i.e. the
  144. // last element is hidden or it's within a hidden directory
  145. func isHidden(watchPath, keyPath string) bool {
  146. // When deleting a directory, watchPath might be deeper than the actual keyPath
  147. // For example, when deleting /foo we also need to notify watchers on /foo/bar.
  148. if len(watchPath) > len(keyPath) {
  149. return false
  150. }
  151. // if watch path is just a "/", after path will start without "/"
  152. // add a "/" to deal with the special case when watchPath is "/"
  153. afterPath := path.Clean("/" + keyPath[len(watchPath):])
  154. return strings.Contains(afterPath, "/_")
  155. }