123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- // Copyright 2015 CoreOS, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package store
- import (
- "container/list"
- "path"
- "strings"
- "sync"
- "sync/atomic"
- etcdErr "github.com/coreos/etcd/error"
- )
- // A watcherHub contains all subscribed watchers
- // watchers is a map with watched path as key and watcher as value
- // EventHistory keeps the old events for watcherHub. It is used to help
- // watcher to get a continuous event history. Or a watcher might miss the
- // event happens between the end of the first watch command and the start
- // of the second command.
- type watcherHub struct {
- // count must be the first element to keep 64-bit alignment for atomic
- // access
- count int64 // current number of watchers.
- mutex sync.Mutex
- watchers map[string]*list.List
- EventHistory *EventHistory
- }
- // newWatchHub creates a watchHub. The capacity determines how many events we will
- // keep in the eventHistory.
- // Typically, we only need to keep a small size of history[smaller than 20K].
- // Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
- func newWatchHub(capacity int) *watcherHub {
- return &watcherHub{
- watchers: make(map[string]*list.List),
- EventHistory: newEventHistory(capacity),
- }
- }
- // Watch function returns a Watcher.
- // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
- // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
- // If index is zero, watch will start from the current index + 1.
- func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
- reportWatchRequest()
- event, err := wh.EventHistory.scan(key, recursive, index)
- if err != nil {
- err.Index = storeIndex
- return nil, err
- }
- w := &watcher{
- eventChan: make(chan *Event, 100), // use a buffered channel
- recursive: recursive,
- stream: stream,
- sinceIndex: index,
- startIndex: storeIndex,
- hub: wh,
- }
- wh.mutex.Lock()
- defer wh.mutex.Unlock()
- // If the event exists in the known history, append the EtcdIndex and return immediately
- if event != nil {
- event.EtcdIndex = storeIndex
- w.eventChan <- event
- return w, nil
- }
- l, ok := wh.watchers[key]
- var elem *list.Element
- if ok { // add the new watcher to the back of the list
- elem = l.PushBack(w)
- } else { // create a new list and add the new watcher
- l = list.New()
- elem = l.PushBack(w)
- wh.watchers[key] = l
- }
- w.remove = func() {
- if w.removed { // avoid removing it twice
- return
- }
- w.removed = true
- l.Remove(elem)
- atomic.AddInt64(&wh.count, -1)
- reportWatcherRemoved()
- if l.Len() == 0 {
- delete(wh.watchers, key)
- }
- }
- atomic.AddInt64(&wh.count, 1)
- reportWatcherAdded()
- return w, nil
- }
- // notify function accepts an event and notify to the watchers.
- func (wh *watcherHub) notify(e *Event) {
- e = wh.EventHistory.addEvent(e) // add event into the eventHistory
- segments := strings.Split(e.Node.Key, "/")
- currPath := "/"
- // walk through all the segments of the path and notify the watchers
- // if the path is "/foo/bar", it will notify watchers with path "/",
- // "/foo" and "/foo/bar"
- for _, segment := range segments {
- currPath = path.Join(currPath, segment)
- // notify the watchers who interests in the changes of current path
- wh.notifyWatchers(e, currPath, false)
- }
- }
- func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
- wh.mutex.Lock()
- defer wh.mutex.Unlock()
- l, ok := wh.watchers[nodePath]
- if ok {
- curr := l.Front()
- for curr != nil {
- next := curr.Next() // save reference to the next one in the list
- w, _ := curr.Value.(*watcher)
- originalPath := (e.Node.Key == nodePath)
- if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
- if !w.stream { // do not remove the stream watcher
- // if we successfully notify a watcher
- // we need to remove the watcher from the list
- // and decrease the counter
- w.removed = true
- l.Remove(curr)
- atomic.AddInt64(&wh.count, -1)
- reportWatcherRemoved()
- }
- }
- curr = next // update current to the next element in the list
- }
- if l.Len() == 0 {
- // if we have notified all watcher in the list
- // we can delete the list
- delete(wh.watchers, nodePath)
- }
- }
- }
- // clone function clones the watcherHub and return the cloned one.
- // only clone the static content. do not clone the current watchers.
- func (wh *watcherHub) clone() *watcherHub {
- clonedHistory := wh.EventHistory.clone()
- return &watcherHub{
- EventHistory: clonedHistory,
- }
- }
// isHidden reports whether keyPath is hidden from a watcher on watchPath,
// i.e. whether the part of keyPath below watchPath has an element that starts
// with "_" (a hidden key, or a key inside a hidden directory).
//
// When watchPath is longer than keyPath the result is always false: deleting
// a directory such as /foo must still notify watchers on /foo/bar.
func isHidden(watchPath, keyPath string) bool {
	if len(watchPath) <= len(keyPath) {
		// When watchPath is just "/", the remainder has no leading "/", so
		// one is prepended; path.Clean collapses the doubled slash otherwise.
		remainder := keyPath[len(watchPath):]
		return strings.Contains(path.Clean("/"+remainder), "/_")
	}
	return false
}
|