Browse Source

*: expose etcd-index in watch requests

This adds a StartIndex field to the Watcher interface, which represents
the Etcd-Index at which the Watcher is created.

Also refactors the HTTP tests to use a table for most handleWatch tests
Jonathan Boulle 11 years ago
parent
commit
1c11f6a144
6 changed files with 140 additions and 121 deletions
  1. 1 0
      etcdserver/etcdhttp/http.go
  2. 102 111
      etcdserver/etcdhttp/http_test.go
  3. 1 0
      etcdserver/server_test.go
  4. 29 10
      store/store_test.go
  5. 6 0
      store/watcher.go
  6. 1 0
      store/watcher_hub.go

+ 1 - 0
etcdserver/etcdhttp/http.go

@@ -352,6 +352,7 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
 	}
 
 	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
 	w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
 	w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
 	w.WriteHeader(http.StatusOK)

+ 102 - 111
etcdserver/etcdhttp/http_test.go

@@ -572,12 +572,14 @@ func TestWriteEvent(t *testing.T) {
 
 type dummyWatcher struct {
 	echan chan *store.Event
+	sidx  uint64
 }
 
 func (w *dummyWatcher) EventChan() chan *store.Event {
 	return w.echan
 }
-func (w *dummyWatcher) Remove() {}
+func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
+func (w *dummyWatcher) Remove()            {}
 
 func TestV2MachinesEndpoint(t *testing.T) {
 	tests := []struct {
@@ -997,75 +999,6 @@ func TestServeKeysWatch(t *testing.T) {
 	}
 }
 
-func TestHandleWatch(t *testing.T) {
-	rw := httptest.NewRecorder()
-	wa := &dummyWatcher{
-		echan: make(chan *store.Event, 1),
-	}
-	wa.echan <- &store.Event{
-		Action: store.Get,
-		Node:   &store.NodeExtern{},
-	}
-
-	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
-
-	wcode := http.StatusOK
-	wct := "application/json"
-	wri := "100"
-	wrt := "5"
-	wbody := mustMarshalEvent(
-		t,
-		&store.Event{
-			Action: store.Get,
-			Node:   &store.NodeExtern{},
-		},
-	)
-
-	if rw.Code != wcode {
-		t.Errorf("got code=%d, want %d", rw.Code, wcode)
-	}
-	h := rw.Header()
-	if ct := h.Get("Content-Type"); ct != wct {
-		t.Errorf("Content-Type=%q, want %q", ct, wct)
-	}
-	if ri := h.Get("X-Raft-Index"); ri != wri {
-		t.Errorf("X-Raft-Index=%q, want %q", ri, wri)
-	}
-	if rt := h.Get("X-Raft-Term"); rt != wrt {
-		t.Errorf("X-Raft-Term=%q, want %q", rt, wrt)
-	}
-	g := rw.Body.String()
-	if g != wbody {
-		t.Errorf("got body=%#v, want %#v", g, wbody)
-	}
-}
-
-func TestHandleWatchNoEvent(t *testing.T) {
-	rw := httptest.NewRecorder()
-	wa := &dummyWatcher{
-		echan: make(chan *store.Event, 1),
-	}
-	close(wa.echan)
-
-	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
-
-	wcode := http.StatusOK
-	wct := "application/json"
-	wbody := ""
-
-	if rw.Code != wcode {
-		t.Errorf("got code=%d, want %d", rw.Code, wcode)
-	}
-	h := rw.Header()
-	if ct := h.Get("Content-Type"); ct != wct {
-		t.Errorf("Content-Type=%q, want %q", ct, wct)
-	}
-	g := rw.Body.String()
-	if g != wbody {
-		t.Errorf("got body=%#v, want %#v", g, wbody)
-	}
-}
-
 type recordingCloseNotifier struct {
 	*httptest.ResponseRecorder
 	cn chan bool
@@ -1075,56 +1008,114 @@ func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool {
 	return rcn.cn
 }
 
-func TestHandleWatchCloseNotified(t *testing.T) {
-	rw := &recordingCloseNotifier{
-		ResponseRecorder: httptest.NewRecorder(),
-		cn:               make(chan bool, 1),
+func TestHandleWatch(t *testing.T) {
+	defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) {
+		r := httptest.NewRecorder()
+		return r, r
 	}
-	rw.cn <- true
-	wa := &dummyWatcher{}
+	noopEv := func(chan *store.Event) {}
 
-	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
+	tests := []struct {
+		getCtx   func() context.Context
+		getRwRr  func() (http.ResponseWriter, *httptest.ResponseRecorder)
+		doToChan func(chan *store.Event)
 
-	wcode := http.StatusOK
-	wct := "application/json"
-	wbody := ""
+		wbody string
+	}{
+		{
+			// Normal case: one event
+			context.Background,
+			defaultRwRr,
+			func(ch chan *store.Event) {
+				ch <- &store.Event{
+					Action: store.Get,
+					Node:   &store.NodeExtern{},
+				}
+			},
 
-	if rw.Code != wcode {
-		t.Errorf("got code=%d, want %d", rw.Code, wcode)
-	}
-	h := rw.Header()
-	if ct := h.Get("Content-Type"); ct != wct {
-		t.Errorf("Content-Type=%q, want %q", ct, wct)
-	}
-	g := rw.Body.String()
-	if g != wbody {
-		t.Errorf("got body=%#v, want %#v", g, wbody)
+			mustMarshalEvent(
+				t,
+				&store.Event{
+					Action: store.Get,
+					Node:   &store.NodeExtern{},
+				},
+			),
+		},
+		{
+			// Channel is closed, no event
+			context.Background,
+			defaultRwRr,
+			func(ch chan *store.Event) {
+				close(ch)
+			},
+
+			"",
+		},
+		{
+			// Simulate a timed-out context
+			func() context.Context {
+				ctx, cancel := context.WithCancel(context.Background())
+				cancel()
+				return ctx
+			},
+			defaultRwRr,
+			noopEv,
+
+			"",
+		},
+		{
+			// Close-notifying request
+			context.Background,
+			func() (http.ResponseWriter, *httptest.ResponseRecorder) {
+				rw := &recordingCloseNotifier{
+					ResponseRecorder: httptest.NewRecorder(),
+					cn:               make(chan bool, 1),
+				}
+				rw.cn <- true
+				return rw, rw.ResponseRecorder
+			},
+			noopEv,
+
+			"",
+		},
 	}
-}
 
-func TestHandleWatchTimeout(t *testing.T) {
-	rw := httptest.NewRecorder()
-	wa := &dummyWatcher{}
-	// Simulate a timed-out context
-	ctx, cancel := context.WithCancel(context.Background())
-	cancel()
+	for i, tt := range tests {
+		rw, rr := tt.getRwRr()
+		wa := &dummyWatcher{
+			echan: make(chan *store.Event, 1),
+			sidx:  10,
+		}
+		tt.doToChan(wa.echan)
 
-	handleWatch(ctx, rw, wa, false, dummyRaftTimer{})
+		handleWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
 
-	wcode := http.StatusOK
-	wct := "application/json"
-	wbody := ""
+		wcode := http.StatusOK
+		wct := "application/json"
+		wei := "10"
+		wri := "100"
+		wrt := "5"
 
-	if rw.Code != wcode {
-		t.Errorf("got code=%d, want %d", rw.Code, wcode)
-	}
-	h := rw.Header()
-	if ct := h.Get("Content-Type"); ct != wct {
-		t.Errorf("Content-Type=%q, want %q", ct, wct)
-	}
-	g := rw.Body.String()
-	if g != wbody {
-		t.Errorf("got body=%#v, want %#v", g, wbody)
+		if rr.Code != wcode {
+			t.Errorf("#%d: got code=%d, want %d", rr.Code, wcode)
+		}
+		h := rr.Header()
+		if ct := h.Get("Content-Type"); ct != wct {
+			t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct)
+		}
+		if ei := h.Get("X-Etcd-Index"); ei != wei {
+			t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei)
+		}
+		if ri := h.Get("X-Raft-Index"); ri != wri {
+			t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri)
+		}
+		if rt := h.Get("X-Raft-Term"); rt != wrt {
+			t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt)
+		}
+		g := rr.Body.String()
+		if g != tt.wbody {
+			t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody)
+		}
 	}
 }
 

+ 1 - 0
etcdserver/server_test.go

@@ -1059,6 +1059,7 @@ func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
 type stubWatcher struct{}
 
 func (w *stubWatcher) EventChan() chan *store.Event { return nil }
+func (w *stubWatcher) StartIndex() uint64           { return 0 }
 func (w *stubWatcher) Remove()                      {}
 
 // errStoreRecorder returns an store error on Get, Watch request

+ 29 - 10
store/store_test.go

@@ -586,10 +586,12 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 // Ensure that the store can watch for key creation.
 func TestStoreWatchCreate(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 1
+	var eidx uint64 = 0
 	w, _ := s.Watch("/foo", false, false, 0)
 	c := w.EventChan()
+	assert.Equal(t, w.StartIndex(), eidx, "")
 	s.Create("/foo", false, "bar", false, Permanent)
+	eidx = 1
 	e := nbselect(c)
 	assert.Equal(t, e.EtcdIndex, eidx, "")
 	assert.Equal(t, e.Action, "create", "")
@@ -601,8 +603,10 @@ func TestStoreWatchCreate(t *testing.T) {
 // Ensure that the store can watch for recursive key creation.
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 1
+	var eidx uint64 = 0
 	w, _ := s.Watch("/foo", true, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 1
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -613,9 +617,11 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
 // Ensure that the store can watch for key updates.
 func TestStoreWatchUpdate(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo", false, "bar", false, Permanent)
 	w, _ := s.Watch("/foo", false, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.Update("/foo", "baz", Permanent)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -626,9 +632,11 @@ func TestStoreWatchUpdate(t *testing.T) {
 // Ensure that the store can watch for recursive key updates.
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	w, _ := s.Watch("/foo", true, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.Update("/foo/bar", "baz", Permanent)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -639,9 +647,11 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
 // Ensure that the store can watch for key deletions.
 func TestStoreWatchDelete(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo", false, "bar", false, Permanent)
 	w, _ := s.Watch("/foo", false, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.Delete("/foo", false, false)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -652,9 +662,11 @@ func TestStoreWatchDelete(t *testing.T) {
 // Ensure that the store can watch for recursive key deletions.
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	w, _ := s.Watch("/foo", true, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.Delete("/foo/bar", false, false)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -665,9 +677,11 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
 // Ensure that the store can watch for CAS updates.
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo", false, "bar", false, Permanent)
 	w, _ := s.Watch("/foo", false, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -678,9 +692,11 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
 // Ensure that the store can watch for recursive CAS updates.
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 	s := newStore()
-	var eidx uint64 = 2
+	var eidx uint64 = 1
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	w, _ := s.Watch("/foo", true, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	eidx = 2
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	e := nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
@@ -698,22 +714,25 @@ func TestStoreWatchExpire(t *testing.T) {
 	}()
 	go mockSyncService(s.DeleteExpiredKeys, stopChan)
 
-	var eidx uint64 = 3
+	var eidx uint64 = 2
 	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 
 	w, _ := s.Watch("/", true, false, 0)
+	assert.Equal(t, w.StartIndex(), eidx, "")
 	c := w.EventChan()
 	e := nbselect(c)
 	assert.Nil(t, e, "")
 	time.Sleep(600 * time.Millisecond)
+	eidx = 3
 	e = nbselect(c)
 	assert.Equal(t, e.EtcdIndex, eidx, "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	w, _ = s.Watch("/", true, false, 4)
-	e = nbselect(w.EventChan())
 	eidx = 4
+	assert.Equal(t, w.StartIndex(), eidx, "")
+	e = nbselect(w.EventChan())
 	assert.Equal(t, e.EtcdIndex, eidx, "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foofoo", "")

+ 6 - 0
store/watcher.go

@@ -18,6 +18,7 @@ package store
 
 type Watcher interface {
 	EventChan() chan *Event
+	StartIndex() uint64 // The EtcdIndex at which the Watcher was created
 	Remove()
 }
 
@@ -26,6 +27,7 @@ type watcher struct {
 	stream     bool
 	recursive  bool
 	sinceIndex uint64
+	startIndex uint64
 	hub        *watcherHub
 	removed    bool
 	remove     func()
@@ -35,6 +37,10 @@ func (w *watcher) EventChan() chan *Event {
 	return w.eventChan
 }
 
+func (w *watcher) StartIndex() uint64 {
+	return w.startIndex
+}
+
 // notify function notifies the watcher. If the watcher interests in the given path,
 // the function will return true.
 func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {

+ 1 - 0
store/watcher_hub.go

@@ -51,6 +51,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
 		recursive:  recursive,
 		stream:     stream,
 		sinceIndex: index,
+		startIndex: storeIndex,
 		hub:        wh,
 	}