Browse Source

Merge pull request #16 from cenkalti/li

feat(stream watchers) end streaming if too many notifications
Xiang Li 12 years ago
parent
commit
629a827ee2
2 changed files with 14 additions and 9 deletions
  1. 8 1
      server/v2/get_handler.go
  2. 6 8
      store/watcher.go

+ 8 - 1
server/v2/get_handler.go

@@ -80,7 +80,14 @@ func handleWatch(key string, recursive, stream bool, waitIndex string, w http.Re
 			case <-closeChan:
 			case <-closeChan:
 				chunkWriter.Close()
 				chunkWriter.Close()
 				return nil
 				return nil
-			case event := <-watcher.EventChan:
+			case event, ok := <-watcher.EventChan:
+				if !ok {
+					// If the channel is closed this may be an indication of
+					// that notifications are much more than we are able to
+					// send to the client in time. Then we simply end streaming.
+					return nil
+				}
+
 				b, _ := json.Marshal(event)
 				b, _ := json.Marshal(event)
 				_, err := chunkWriter.Write(b)
 				_, err := chunkWriter.Write(b)
 				if err != nil {
 				if err != nil {

+ 6 - 8
store/watcher.go

@@ -44,17 +44,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// should get notified even if "/foo" is not the path it is watching.
 	// should get notified even if "/foo" is not the path it is watching.
 	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
 	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
+		// We cannot block here if the EventChan capacity is full, otherwise
+		// etcd will hang. EventChan capacity is full when the rate of
+		// notifications are higher than our send rate.
+		// If this happens, we close the channel.
 		select {
 		select {
 		case w.EventChan <- e:
 		case w.EventChan <- e:
-
-		// the stream watcher might be slow
-		// but we cannot block here. blocking will lead the whole etcd system to hang.
-		// create a go-routine to handle the blocking case
 		default:
 		default:
-			go func() {
-				// TODO add a warning here should be helpful
-				w.EventChan <- e
-			}()
+			// We have missed a notification. Close the channel to indicate this situation.
+			close(w.EventChan)
 		}
 		}
 		return true
 		return true
 	}
 	}