Browse Source

fix(store/watch): fix the slow consumer bug

Cenk Alti 12 years ago
parent
commit
8bed1e1f15
2 changed files with 10 additions and 13 deletions
  1. 9 9
      store/watcher.go
  2. 1 4
      store/watcher_hub.go

+ 9 - 9
store/watcher.go

@@ -21,6 +21,7 @@ type Watcher struct {
 	stream     bool
 	stream     bool
 	recursive  bool
 	recursive  bool
 	sinceIndex uint64
 	sinceIndex uint64
+	hub        *watcherHub
 	removed    bool
 	removed    bool
 	remove     func()
 	remove     func()
 }
 }
@@ -51,8 +52,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 		select {
 		select {
 		case w.EventChan <- e:
 		case w.EventChan <- e:
 		default:
 		default:
-			// We have missed a notification. Close the channel to indicate this situation.
-			close(w.EventChan)
+			// We have missed a notification. Remove the watcher.
+			// Removing the watcher also closes the EventChan.
+			w.remove()
 		}
 		}
 		return true
 		return true
 	}
 	}
@@ -62,11 +64,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 // Remove removes the watcher from watcherHub
 // Remove removes the watcher from watcherHub
 // The actual remove function is guaranteed to only be executed once
 // The actual remove function is guaranteed to only be executed once
 func (w *Watcher) Remove() {
 func (w *Watcher) Remove() {
-	if w.remove != nil {
-		w.remove()
-	} else {
-		// We attached a remove function to watcher
-		// Other pkg cannot change it, so this should not happen
-		panic("missing Watcher remove function")
-	}
+	w.hub.mutex.Lock()
+	defer w.hub.mutex.Unlock()
+
+	close(w.EventChan)
+	w.remove()
 }
 }

+ 1 - 4
store/watcher_hub.go

@@ -50,6 +50,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
 		recursive:  recursive,
 		recursive:  recursive,
 		stream:     stream,
 		stream:     stream,
 		sinceIndex: index,
 		sinceIndex: index,
+		hub:        wh,
 	}
 	}
 
 
 	if event != nil {
 	if event != nil {
@@ -77,10 +78,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
 		if w.removed { // avoid remove it twice
 		if w.removed { // avoid remove it twice
 			return
 			return
 		}
 		}
-
-		wh.mutex.Lock()
-		defer wh.mutex.Unlock()
-
 		w.removed = true
 		w.removed = true
 		l.Remove(elem)
 		l.Remove(elem)
 		atomic.AddInt64(&wh.count, -1)
 		atomic.AddInt64(&wh.count, -1)