Browse Source

feat wathch for expiring need to be pending

Xiang Li 12 years ago
parent
commit
55058c64f5
2 changed files with 22 additions and 6 deletions
  1. 2 0
      store/store.go
  2. 20 6
      store/watcher_hub.go

+ 2 - 0
store/store.go

@@ -451,6 +451,8 @@ func (s *store) deleteExpiredKeys(cutoff time.Time) {
 		s.Stats.Inc(ExpireCount)
 		s.WatcherHub.notify(newEvent(Expire, node.Path, s.Index, s.Term))
 	}
+
+	s.WatcherHub.clearPendingWatchers()
 }
 
 // checkDir function will check whether the component is a directory under parent node.

+ 20 - 6
store/watcher_hub.go

@@ -16,9 +16,10 @@ import (
 // event happens between the end of the first watch command and the start
 // of the second command.
 type watcherHub struct {
-	watchers     map[string]*list.List
-	count        int64 // current number of watchers.
-	EventHistory *EventHistory
+	watchers        map[string]*list.List
+	count           int64 // current number of watchers.
+	EventHistory    *EventHistory
+	pendingWatchers *list.List
 }
 
 // newWatchHub creates a watchHub. The capacity determines how many events we will
@@ -27,8 +28,9 @@ type watcherHub struct {
 // Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
 func newWatchHub(capacity int) *watcherHub {
 	return &watcherHub{
-		watchers:     make(map[string]*list.List),
-		EventHistory: newEventHistory(capacity),
+		watchers:        make(map[string]*list.List),
+		EventHistory:    newEventHistory(capacity),
+		pendingWatchers: list.New(),
 	}
 }
 
@@ -117,9 +119,13 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 				// if we successfully notify a watcher
 				// we need to remove the watcher from the list
 				// and decrease the counter
-
 				l.Remove(curr)
 				atomic.AddInt64(&wh.count, -1)
+
+				if e.Action == Expire {
+					wh.pendingWatchers.PushBack(w)
+				}
+
 			} else {
 				// once there is a watcher in the list is not interested
 				// in the event, we should keep the list in the map
@@ -131,6 +137,14 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 	}
 }
 
+func (wh *watcherHub) clearPendingWatchers() {
+	for e := wh.pendingWatchers.Front(); e != nil; e = e.Next() {
+		w, _ := e.Value.(*watcher)
+		w.eventChan <- nil
+	}
+	wh.pendingWatchers = list.New()
+}
+
 // clone function clones the watcherHub and return the cloned one.
 // only clone the static content. do not clone the current watchers.
 func (wh *watcherHub) clone() *watcherHub {