| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package store
- import (
- "path"
- "strconv"
- "strings"
- )
- //------------------------------------------------------------------------------
- //
- // Typedefs
- //
- //------------------------------------------------------------------------------
- // WatcherHub is where the client register its watcher
- type WatcherHub struct {
- watchers map[string][]*Watcher
- }
- // Currently watcher only contains a response channel
- type Watcher struct {
- C chan *Response
- }
- // Create a new watcherHub
- func newWatcherHub() *WatcherHub {
- w := new(WatcherHub)
- w.watchers = make(map[string][]*Watcher)
- return w
- }
- // Create a new watcher
- func NewWatcher() *Watcher {
- return &Watcher{C: make(chan *Response, 1)}
- }
- // Add a watcher to the watcherHub
- func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
- responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error {
- prefix = path.Clean("/" + prefix)
- if sinceIndex != 0 && sinceIndex >= responseStartIndex {
- for i := sinceIndex; i <= currentIndex; i++ {
- if checkResponse(prefix, i, resMap) {
- watcher.C <- (*resMap)[strconv.FormatUint(i, 10)]
- return nil
- }
- }
- }
- _, ok := w.watchers[prefix]
- if !ok {
- w.watchers[prefix] = make([]*Watcher, 0)
- w.watchers[prefix] = append(w.watchers[prefix], watcher)
- } else {
- w.watchers[prefix] = append(w.watchers[prefix], watcher)
- }
- return nil
- }
- // Check if the response has what we are watching
- func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool {
- resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
- if !ok {
- // not storage system command
- return false
- } else {
- path := resp.Key
- if strings.HasPrefix(path, prefix) {
- prefixLen := len(prefix)
- if len(path) == prefixLen || path[prefixLen] == '/' {
- return true
- }
- }
- }
- return false
- }
- // Notify the watcher a action happened
- func (w *WatcherHub) notify(resp Response) error {
- resp.Key = path.Clean(resp.Key)
- segments := strings.Split(resp.Key, "/")
- currPath := "/"
- // walk through all the pathes
- for _, segment := range segments {
- currPath = path.Join(currPath, segment)
- watchers, ok := w.watchers[currPath]
- if ok {
- newWatchers := make([]*Watcher, 0)
- // notify all the watchers
- for _, watcher := range watchers {
- watcher.C <- &resp
- }
- if len(newWatchers) == 0 {
- // we have notified all the watchers at this path
- // delete the map
- delete(w.watchers, currPath)
- } else {
- w.watchers[currPath] = newWatchers
- }
- }
- }
- return nil
- }
- // stopWatchers stops all the watchers
- // This function is used when the etcd recovery from a snapshot at runtime
- func (w *WatcherHub) stopWatchers() {
- for _, subWatchers := range w.watchers {
- for _, watcher := range subWatchers {
- watcher.C <- nil
- }
- }
- w.watchers = nil
- }
|