|
@@ -16,6 +16,7 @@ package mvcc
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"bytes"
|
|
"bytes"
|
|
|
|
|
+ "fmt"
|
|
|
"os"
|
|
"os"
|
|
|
"reflect"
|
|
"reflect"
|
|
|
"testing"
|
|
"testing"
|
|
@@ -23,6 +24,7 @@ import (
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/lease"
|
|
"github.com/coreos/etcd/lease"
|
|
|
"github.com/coreos/etcd/mvcc/backend"
|
|
"github.com/coreos/etcd/mvcc/backend"
|
|
|
|
|
+ "github.com/coreos/etcd/mvcc/mvccpb"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// TestWatcherWatchID tests that each watcher provides unique watchID,
|
|
// TestWatcherWatchID tests that each watcher provides unique watchID,
|
|
@@ -151,6 +153,43 @@ func TestWatcherWatchPrefix(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func TestWatchDeleteRange(t *testing.T) {
|
|
|
|
|
+ b, tmpPath := backend.NewDefaultTmpBackend()
|
|
|
|
|
+ s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
|
|
|
|
+
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ s.store.Close()
|
|
|
|
|
+ os.Remove(tmpPath)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ testKeyPrefix := []byte("foo")
|
|
|
|
|
+
|
|
|
|
|
+ for i := 0; i < 3; i++ {
|
|
|
|
|
+ s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ w := s.NewWatchStream()
|
|
|
|
|
+ from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
|
|
|
|
|
+ w.Watch(from, to, 0)
|
|
|
|
|
+
|
|
|
|
|
+ s.DeleteRange(from, to)
|
|
|
|
|
+
|
|
|
|
|
+ we := []mvccpb.Event{
|
|
|
|
|
+ {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
|
|
|
|
|
+ {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
|
|
|
|
|
+ {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case r := <-w.Chan():
|
|
|
|
|
+ if !reflect.DeepEqual(r.Events, we) {
|
|
|
|
|
+ t.Errorf("event = %v, want %v", r.Events, we)
|
|
|
|
|
+ }
|
|
|
|
|
+ case <-time.After(10 * time.Second):
|
|
|
|
|
+ t.Fatal("failed to receive event after 10 seconds!")
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
|
|
// TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
|
|
|
// with given id inside watchStream.
|
|
// with given id inside watchStream.
|
|
|
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
|
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|