Browse Source

mvcc: fix watch deleteRange

Xiang Li 9 years ago
parent
commit
3ddcc21179
2 changed files with 41 additions and 2 deletions
  1. 2 2
      mvcc/watchable_store.go
  2. 39 0
      mvcc/watcher_test.go

+ 2 - 2
mvcc/watchable_store.go

@@ -112,10 +112,10 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 	}
 
 	evs := make([]mvccpb.Event, n)
-	for i, change := range changes {
+	for i := range changes {
 		evs[i] = mvccpb.Event{
 			Type: mvccpb.DELETE,
-			Kv:   &change}
+			Kv:   &changes[i]}
 		evs[i].Kv.ModRevision = rev
 	}
 	s.notify(rev, evs)

+ 39 - 0
mvcc/watcher_test.go

@@ -16,6 +16,7 @@ package mvcc
 
 import (
 	"bytes"
+	"fmt"
 	"os"
 	"reflect"
 	"testing"
@@ -23,6 +24,7 @@ import (
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/mvcc/mvccpb"
 )
 
 // TestWatcherWatchID tests that each watcher provides unique watchID,
@@ -151,6 +153,43 @@ func TestWatcherWatchPrefix(t *testing.T) {
 	}
 }
 
+func TestWatchDeleteRange(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	testKeyPrefix := []byte("foo")
+
+	for i := 0; i < 3; i++ {
+		s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
+	}
+
+	w := s.NewWatchStream()
+	from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
+	w.Watch(from, to, 0)
+
+	s.DeleteRange(from, to)
+
+	we := []mvccpb.Event{
+		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
+		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
+		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
+	}
+
+	select {
+	case r := <-w.Chan():
+		if !reflect.DeepEqual(r.Events, we) {
+			t.Errorf("event = %v, want %v", r.Events, we)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("failed to receive event after 10 seconds!")
+	}
+}
+
 // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {