Browse Source

refactor watcher.go

Xiang Li 12 years ago
parent
commit
a71838a59b
6 changed files with 169 additions and 117 deletions
  1. 3 1
      etcd_handlers.go
  2. 3 0
      store/node.go
  3. 2 1
      store/store.go
  4. 0 4
      store/store_test.go
  5. 19 111
      store/watcher.go
  6. 142 0
      store/watcher_hub.go

+ 3 - 1
etcd_handlers.go

@@ -24,12 +24,14 @@ func NewEtcdMuxer() *http.ServeMux {
 	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
 	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
+	etcdMux.HandleFunc("/test/", TestHttpHandler)
 
+	// backward support
 	etcdMux.Handle("/v1/keys/", errorHandler(MultiplexerV1))
 	etcdMux.Handle("/v1/leader", errorHandler(LeaderHttpHandler))
 	etcdMux.Handle("/v1/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/v1/stats/", errorHandler(StatsHttpHandler))
-	etcdMux.HandleFunc("/test/", TestHttpHandler)
+
 	return etcdMux
 }
 

+ 3 - 0
store/node.go

@@ -349,6 +349,9 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 
 func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
 	if !n.IsPermanent() {
+		// check if the node has been expired
+		// if the node is not expired, we need to stop the go routine associated with
+		// that node.
 		expired, _ := n.IsExpired()
 
 		if !expired {

+ 2 - 1
store/store.go

@@ -226,7 +226,8 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6
 	}
 
 	callback := func(path string) { // notify function
-		s.WatcherHub.notifyWithPath(e, path, true)
+		// notify the watchers with delted set true
+		s.WatcherHub.notifyWatchers(e, path, true)
 	}
 
 	err = n.Remove(recursive, callback)

+ 0 - 4
store/store_test.go

@@ -124,10 +124,6 @@ func TestUpdateFile(t *testing.T) {
 		t.Fatalf("cannot get sub dir before expiration [%s]", err.Error())
 	}
 
-	/*if e.KVPairs[2].Key != "/foo/foo/foo2/boo" || e.KVPairs[2].Value != "boo1" {
-		t.Fatalf("cannot get sub node of sub dir before expiration [%s]", err.Error())
-	}*/
-
 	// wait for expiration
 	time.Sleep(time.Second * 3)
 	e, err = s.Get("/foo/foo", true, false, 7, 1)

+ 19 - 111
store/watcher.go

@@ -1,125 +1,33 @@
 package store
 
-import (
-	"container/list"
-	"path"
-	"strings"
-	"sync/atomic"
-
-	etcdErr "github.com/coreos/etcd/error"
-)
-
-type watcherHub struct {
-	watchers     map[string]*list.List
-	count        int64 // current number of watchers.
-	EventHistory *EventHistory
-}
-
 type watcher struct {
 	eventChan  chan *Event
 	recursive  bool
 	sinceIndex uint64
 }
 
-func newWatchHub(capacity int) *watcherHub {
-	return &watcherHub{
-		watchers:     make(map[string]*list.List),
-		EventHistory: newEventHistory(capacity),
-	}
-}
-
-// watch function returns an Event channel.
-// If recursive is true, the first change after index under prefix will be sent to the event channel.
-// If recursive is false, the first change after index at prefix will be sent to the event channel.
-// If index is zero, watch will start from the current index + 1.
-func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
-	eventChan := make(chan *Event, 1)
-
-	e, err := wh.EventHistory.scan(prefix, index)
-
-	if err != nil {
-		return nil, err
-	}
-
-	if e != nil {
-		eventChan <- e
-		return eventChan, nil
-	}
-
-	w := &watcher{
-		eventChan:  eventChan,
-		recursive:  recursive,
-		sinceIndex: index - 1, // to catch Expire()
-	}
-
-	l, ok := wh.watchers[prefix]
-
-	if ok { // add the new watcher to the back of the list
-		l.PushBack(w)
-
-	} else { // create a new list and add the new watcher
-		l := list.New()
-		l.PushBack(w)
-		wh.watchers[prefix] = l
-	}
-
-	atomic.AddInt64(&wh.count, 1)
-
-	return eventChan, nil
-}
-
-func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
-	l, ok := wh.watchers[path]
-
-	if ok {
-		curr := l.Front()
-		notifiedAll := true
-
-		for {
-			if curr == nil { // we have reached the end of the list
-				if notifiedAll {
-					// if we have notified all watcher in the list
-					// we can delete the list
-					delete(wh.watchers, path)
-				}
-
-				break
-			}
-
-			next := curr.Next() // save the next
+// notify function notifies the watcher. If the watcher interests in the given path,
+// the function will return true.
+func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
+	// watcher is interested the path in three cases and under one condition
+	// the condition is that the event happens after the watcher's sinceIndex
 
-			w, _ := curr.Value.(*watcher)
-			if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
-				w.eventChan <- e
-				l.Remove(curr)
-				atomic.AddInt64(&wh.count, -1)
-			} else {
-				notifiedAll = false
-			}
+	// 1. the path at which the event happens is the path the watcher is watching at.
+	// For example if the watcher is watching at "/foo" and the event happens at "/foo",
+	// the watcher must be interested in that event.
 
-			curr = next // go to the next one
-		}
-	}
-}
-
-func (wh *watcherHub) notify(e *Event) {
-	e = wh.EventHistory.addEvent(e)
-
-	segments := strings.Split(e.Key, "/")
-
-	currPath := "/"
-
-	// walk through all the paths
-	for _, segment := range segments {
-		currPath = path.Join(currPath, segment)
-		wh.notifyWithPath(e, currPath, false)
-	}
-}
+	// 2. the watcher is a recursive watcher, it interests in the event happens after
+	// its watching path. For example if watcher A watches at "/foo" and it is a recursive
+	// one, it will interest in the event happens at "/foo/bar".
 
-func (wh *watcherHub) clone() *watcherHub {
-	clonedHistory := wh.EventHistory.clone()
+	// 3. when we delete a directory, we need to force notify all the watchers who watches
+	// at the file we need to delete.
+	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
+	// should get notified even if "/foo" is not the path it is watching.
 
-	return &watcherHub{
-		EventHistory: clonedHistory,
+	if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
+		w.eventChan <- e
+		return true
 	}
+	return false
 }

+ 142 - 0
store/watcher_hub.go

@@ -0,0 +1,142 @@
+package store
+
+import (
+	"container/list"
+	"path"
+	"strings"
+	"sync/atomic"
+
+	etcdErr "github.com/coreos/etcd/error"
+)
+
+// A watcherHub contains all subscribed watchers
+// watchers is a map with watched path as key and watcher as value
+// EventHistory keeps the old events for watcherHub. It is used to help
+// watcher to get a continuous event history. Or a watcher might miss the
+// event happens between the end of the first watch command and the start
+// of the second command.
+type watcherHub struct {
+	watchers     map[string]*list.List
+	count        int64 // current number of watchers.
+	EventHistory *EventHistory
+}
+
+// newWatchHub creates a watchHub. The capacity determines how many events we will
+// keep in the eventHistory.
+// Typically, we only need to keep a small size of history[smaller than 20K].
+// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
+func newWatchHub(capacity int) *watcherHub {
+	return &watcherHub{
+		watchers:     make(map[string]*list.List),
+		EventHistory: newEventHistory(capacity),
+	}
+}
+
+// watch function returns an Event channel.
+// If recursive is true, the first change after index under prefix will be sent to the event channel.
+// If recursive is false, the first change after index at prefix will be sent to the event channel.
+// If index is zero, watch will start from the current index + 1.
+func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
+	eventChan := make(chan *Event, 1)
+
+	e, err := wh.EventHistory.scan(prefix, index)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if e != nil {
+		eventChan <- e
+		return eventChan, nil
+	}
+
+	w := &watcher{
+		eventChan:  eventChan,
+		recursive:  recursive,
+		sinceIndex: index - 1, // to catch Expire()
+	}
+
+	l, ok := wh.watchers[prefix]
+
+	if ok { // add the new watcher to the back of the list
+		l.PushBack(w)
+
+	} else { // create a new list and add the new watcher
+		l := list.New()
+		l.PushBack(w)
+		wh.watchers[prefix] = l
+	}
+
+	atomic.AddInt64(&wh.count, 1)
+
+	return eventChan, nil
+}
+
+// notify function accepts an event and notify to the watchers.
+func (wh *watcherHub) notify(e *Event) {
+	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
+
+	segments := strings.Split(e.Key, "/")
+
+	currPath := "/"
+
+	// walk through all the segments of the path and notify the watchers
+	// if the path is "/foo/bar", it will notify watchers with path "/",
+	// "/foo" and "/foo/bar"
+
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+		// notify the watchers who interests in the changes of current path
+		wh.notifyWatchers(e, currPath, false)
+	}
+}
+
+func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
+	l, ok := wh.watchers[path]
+
+	if ok {
+		curr := l.Front()
+		notifiedAll := true
+
+		for {
+			if curr == nil { // we have reached the end of the list
+				if notifiedAll {
+					// if we have notified all watcher in the list
+					// we can delete the list
+					delete(wh.watchers, path)
+				}
+
+				break
+			}
+
+			next := curr.Next() // save reference to the next one in the list
+
+			w, _ := curr.Value.(*watcher)
+
+			if w.notify(e, e.Key == path, deleted) {
+				// if we successfully notify a watcher
+				// we need to remove the watcher from the list
+				// and decrease the counter
+
+				l.Remove(curr)
+				atomic.AddInt64(&wh.count, -1)
+			} else {
+				// once there is a watcher in the list is not interested
+				// in the event, we should keep the list in the map
+				notifiedAll = false
+			}
+
+			curr = next // update current to the next
+		}
+	}
+}
+
+// clone function clones the watcherHub and return the cloned one.
+// only clone the static content. do not clone the current watchers.
+func (wh *watcherHub) clone() *watcherHub {
+	clonedHistory := wh.EventHistory.clone()
+
+	return &watcherHub{
+		EventHistory: clonedHistory,
+	}
+}