Browse Source

Merge pull request #573 from cenkalti/stream-bug

Store: Fix slow consumer bug
Xiang Li 12 years ago
parent
commit
93a02b619e
3 changed files with 25 additions and 13 deletions
  1. 15 0
      store/store_test.go
  2. 9 9
      store/watcher.go
  3. 1 4
      store/watcher_hub.go

+ 15 - 0
store/store_test.go

@@ -810,6 +810,21 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
 	assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
 }
 
+// Ensure that slow consumers are handled properly.
+//
+// Since Watcher.EventChan has a buffer of size 1 we can only queue 1
+// event per watcher. If the consumer cannot consume the event on time and
+// another event arrives, the channel is closed and event is discarded.
+// This test ensures that after closing the channel, the store can continue
+// to operate correctly.
+func TestStoreWatchSlowConsumer(t *testing.T) {
+	s := newStore()
+	s.Watch("/foo", true, true, 0)       // stream must be true
+	s.Set("/foo", false, "1", Permanent) // ok
+	s.Set("/foo", false, "2", Permanent) // ok
+	s.Set("/foo", false, "3", Permanent) // must not panic
+}
+
 // Performs a non-blocking select on an event channel.
 func nbselect(c <-chan *Event) *Event {
 	select {

+ 9 - 9
store/watcher.go

@@ -21,6 +21,7 @@ type Watcher struct {
 	stream     bool
 	recursive  bool
 	sinceIndex uint64
+	hub        *watcherHub
 	removed    bool
 	remove     func()
 }
@@ -51,8 +52,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 		select {
 		case w.EventChan <- e:
 		default:
-			// We have missed a notification. Close the channel to indicate this situation.
-			close(w.EventChan)
+			// We have missed a notification. Remove the watcher.
+			// Removing the watcher also closes the EventChan.
+			w.remove()
 		}
 		return true
 	}
@@ -62,11 +64,9 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 // Remove removes the watcher from watcherHub
 // The actual remove function is guaranteed to only be executed once
 func (w *Watcher) Remove() {
-	if w.remove != nil {
-		w.remove()
-	} else {
-		// We attached a remove function to watcher
-		// Other pkg cannot change it, so this should not happen
-		panic("missing Watcher remove function")
-	}
+	w.hub.mutex.Lock()
+	defer w.hub.mutex.Unlock()
+
+	close(w.EventChan)
+	w.remove()
 }

+ 1 - 4
store/watcher_hub.go

@@ -50,6 +50,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
 		recursive:  recursive,
 		stream:     stream,
 		sinceIndex: index,
+		hub:        wh,
 	}
 
 	if event != nil {
@@ -77,10 +78,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
 		if w.removed { // avoid remove it twice
 			return
 		}
-
-		wh.mutex.Lock()
-		defer wh.mutex.Unlock()
-
 		w.removed = true
 		l.Remove(elem)
 		atomic.AddInt64(&wh.count, -1)