Browse Source

Merge pull request #5251 from heyitsanthony/fix-watch-panic

clientv3: gracefully handle watcher resume on compacted revision
Anthony Romano 9 years ago
parent
commit
b7761530e1
3 changed files with 60 additions and 1 deletions
  1. 56 0
      clientv3/integration/watch_test.go
  2. 2 1
      clientv3/watch.go
  3. 2 0
      integration/cluster.go

+ 56 - 0
clientv3/integration/watch_test.go

@@ -334,6 +334,62 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 	}
 }
 
+// TestWatchResumeComapcted checks that the watcher gracefully closes in case
+// that it tries to resume to a revision that's been compacted out of the store.
+func TestWatchResumeCompacted(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	// create a waiting watcher at rev 1
+	w := clientv3.NewWatcher(clus.Client(0))
+	defer w.Close()
+	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1))
+	select {
+	case w := <-wch:
+		t.Errorf("unexpected message from wch %v", w)
+	default:
+	}
+	clus.Members[0].Stop(t)
+
+	ticker := time.After(time.Second * 10)
+	for clus.WaitLeader(t) <= 0 {
+		select {
+		case <-ticker:
+			t.Fatalf("failed to wait for new leader")
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
+	// put some data and compact away
+	kv := clientv3.NewKV(clus.Client(1))
+	for i := 0; i < 5; i++ {
+		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := kv.Compact(context.TODO(), 3); err != nil {
+		t.Fatal(err)
+	}
+
+	clus.Members[0].Restart(t)
+
+	// get compacted error message
+	wresp, ok := <-wch
+	if !ok {
+		t.Fatalf("expected wresp, but got closed channel")
+	}
+	if wresp.Err() != rpctypes.ErrCompacted {
+		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
+	}
+	// ensure the channel is closed
+	if wresp, ok = <-wch; ok {
+		t.Fatalf("expected closed channel, but got %v", wresp)
+	}
+}
+
 // TestWatchCompactRevision ensures the CompactRevision error is given on a
 // compaction event ahead of a watcher.
 func TestWatchCompactRevision(t *testing.T) {

+ 2 - 1
clientv3/watch.go

@@ -440,7 +440,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
 				return
 			}
 			// resume up to last seen event if disconnected
-			if resuming {
+			if resuming && wr.Err() == nil {
 				resuming = false
 				// trim events already seen
 				for i := 0; i < len(wr.Events); i++ {
@@ -454,6 +454,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
 					break
 				}
 			}
+			resuming = false
 			// TODO don't keep buffering if subscriber stops reading
 			wrs = append(wrs, wr)
 		case resumeRev := <-ws.resumec:

+ 2 - 0
integration/cluster.go

@@ -318,6 +318,8 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
 	return
 }
 
+func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
+
 func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64