Browse Source

etcdserver: use Context to communicate timeout, and add tests

Jonathan Boulle 11 years ago
parent
commit
ddc30c0a33
2 changed files with 152 additions and 5 deletions
  1. 8 5
      etcdserver/etcdhttp/http.go
  2. 144 0
      etcdserver/etcdhttp/http_test.go

+ 8 - 5
etcdserver/etcdhttp/http.go

@@ -100,7 +100,9 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
 			log.Println("error writing event: %v", err)
 		}
 	case resp.Watcher != nil:
-		handleWatch(w, resp.Watcher, rr.Stream)
+		ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
+		defer cancel()
+		handleWatch(ctx, w, resp.Watcher, rr.Stream)
 	default:
 		writeError(w, errors.New("received response with no Event/Watcher!"))
 	}
@@ -313,17 +315,17 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
 	return json.NewEncoder(w).Encode(ev)
 }
 
-func handleWatch(w http.ResponseWriter, wa store.Watcher, stream bool) {
+func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
 	defer wa.Remove()
 	ech := wa.EventChan()
-	tch := time.After(defaultWatchTimeout)
 	var nch <-chan bool
 	if x, ok := w.(http.CloseNotifier); ok {
 		nch = x.CloseNotify()
 	}
 
 	w.Header().Set("Content-Type", "application/json")
-	// WriteHeader will implicitly write a Transfer-Encoding: chunked header, so no need to do it explicitly
+	// WriteHeader will do this implicitly, but best to be explicit.
+	w.Header().Set("Transfer-Encoding", "chunked")
 	w.WriteHeader(http.StatusOK)
 
 	// Ensure headers are flushed early, in case of long polling
@@ -336,7 +338,8 @@ func handleWatch(w http.ResponseWriter, wa store.Watcher, stream bool) {
 		case <-nch:
 			// Client closed connection. Nothing to do.
 			return
-		case <-tch:
+		case <-ctx.Done():
+			// Timed out. Close the connection gracefully.
 			cw.Close()
 			return
 		case ev, ok := <-ech:

+ 144 - 0
etcdserver/etcdhttp/http_test.go

@@ -974,3 +974,147 @@ func TestServeKeysWatch(t *testing.T) {
 		t.Errorf("got body=%#v, want %#v", g, wbody)
 	}
 }
+
+func TestHandleWatch(t *testing.T) {
+	rw := httptest.NewRecorder()
+	wa := &dummyWatcher{
+		echan: make(chan *store.Event, 1),
+	}
+	wa.echan <- &store.Event{
+		Action: store.Get,
+		Node:   &store.NodeExtern{},
+	}
+
+	handleWatch(context.Background(), rw, wa, false)
+
+	wcode := http.StatusOK
+	wte := "chunked"
+	wct := "application/json"
+	wbody := mustMarshalEvent(
+		t,
+		&store.Event{
+			Action: store.Get,
+			Node:   &store.NodeExtern{},
+		},
+	)
+	wbody = fmt.Sprintf("%x\r\n%s\r\n", len(wbody), wbody)
+
+	if rw.Code != wcode {
+		t.Errorf("got code=%d, want %d", rw.Code, wcode)
+	}
+	h := rw.Header()
+	if ct := h.Get("Content-Type"); ct != wct {
+		t.Errorf("Content-Type=%q, want %q", ct, wct)
+	}
+	if te := h.Get("Transfer-Encoding"); te != wte {
+		t.Errorf("Transfer-Encoding=%q, want %q", te, wte)
+	}
+	g := rw.Body.String()
+	if g != wbody {
+		t.Errorf("got body=%#v, want %#v", g, wbody)
+	}
+}
+
+func TestHandleWatchNoEvent(t *testing.T) {
+	rw := httptest.NewRecorder()
+	wa := &dummyWatcher{
+		echan: make(chan *store.Event, 1),
+	}
+	close(wa.echan)
+
+	handleWatch(context.Background(), rw, wa, false)
+
+	wcode := http.StatusOK
+	wte := "chunked"
+	wct := "application/json"
+	wbody := ""
+
+	if rw.Code != wcode {
+		t.Errorf("got code=%d, want %d", rw.Code, wcode)
+	}
+	h := rw.Header()
+	if ct := h.Get("Content-Type"); ct != wct {
+		t.Errorf("Content-Type=%q, want %q", ct, wct)
+	}
+	if te := h.Get("Transfer-Encoding"); te != wte {
+		t.Errorf("Transfer-Encoding=%q, want %q", te, wte)
+	}
+	g := rw.Body.String()
+	if g != wbody {
+		t.Errorf("got body=%#v, want %#v", g, wbody)
+	}
+}
+
+type recordingCloseNotifier struct {
+	*httptest.ResponseRecorder
+	cn chan bool
+}
+
+func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool {
+	return rcn.cn
+}
+
+func TestHandleWatchCloseNotified(t *testing.T) {
+	rw := &recordingCloseNotifier{
+		ResponseRecorder: httptest.NewRecorder(),
+		cn:               make(chan bool, 1),
+	}
+	rw.cn <- true
+	wa := &dummyWatcher{}
+
+	handleWatch(context.Background(), rw, wa, false)
+
+	wcode := http.StatusOK
+	wte := "chunked"
+	wct := "application/json"
+	wbody := ""
+
+	if rw.Code != wcode {
+		t.Errorf("got code=%d, want %d", rw.Code, wcode)
+	}
+	h := rw.Header()
+	if ct := h.Get("Content-Type"); ct != wct {
+		t.Errorf("Content-Type=%q, want %q", ct, wct)
+	}
+	if te := h.Get("Transfer-Encoding"); te != wte {
+		t.Errorf("Transfer-Encoding=%q, want %q", te, wte)
+	}
+	g := rw.Body.String()
+	if g != wbody {
+		t.Errorf("got body=%#v, want %#v", g, wbody)
+	}
+}
+
+func TestHandleWatchTimeout(t *testing.T) {
+	rw := httptest.NewRecorder()
+	wa := &dummyWatcher{}
+	// Simulate a timed-out context
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	handleWatch(ctx, rw, wa, false)
+
+	wcode := http.StatusOK
+	wte := "chunked"
+	wct := "application/json"
+	wbody := "0\r\n"
+
+	if rw.Code != wcode {
+		t.Errorf("got code=%d, want %d", rw.Code, wcode)
+	}
+	h := rw.Header()
+	if ct := h.Get("Content-Type"); ct != wct {
+		t.Errorf("Content-Type=%q, want %q", ct, wct)
+	}
+	if te := h.Get("Transfer-Encoding"); te != wte {
+		t.Errorf("Transfer-Encoding=%q, want %q", te, wte)
+	}
+	g := rw.Body.String()
+	if g != wbody {
+		t.Errorf("got body=%#v, want %#v", g, wbody)
+	}
+}
+
+func TestHandleWatchStreaming(t *testing.T) {
+	// TODO(jonboulle): me
+}