Browse Source

*: remove CancelFunc return for Watch. Use Cancel for watch.

Gyu-Ho Lee 10 years ago
parent
commit
556d4a6932

+ 1 - 1
etcdserver/api/v3rpc/watch.go

@@ -85,7 +85,7 @@ func (sws *serverWatchStream) recvLoop() error {
 				toWatch = creq.Prefix
 				prefix = true
 			}
-			id, _ := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
+			id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
 			sws.ctrlStream <- &pb.WatchResponse{
 				// TODO: fill in response header.
 				WatchId: id,

+ 0 - 4
storage/kv.go

@@ -19,10 +19,6 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
-// CancelFunc tells an operation to abandon its work. A CancelFunc does not
-// wait for the work to stop.
-type CancelFunc func()
-
 type Snapshot backend.Snapshot
 
 type LeaseID int64

+ 3 - 6
storage/kv_test.go

@@ -734,9 +734,9 @@ func TestWatchableKVWatch(t *testing.T) {
 	defer cleanup(s, tmpPath)
 
 	w := s.NewWatchStream()
+	defer w.Close()
 
-	wid, cancel := w.Watch([]byte("foo"), true, 0)
-	defer cancel()
+	wid := w.Watch([]byte("foo"), true, 0)
 
 	s.Put([]byte("foo"), []byte("bar"), 1)
 	select {
@@ -788,11 +788,8 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 	}
 
-	w.Close()
-
 	w = s.NewWatchStream()
-	wid, cancel = w.Watch([]byte("foo1"), false, 1)
-	defer cancel()
+	wid = w.Watch([]byte("foo1"), false, 1)
 
 	select {
 	case resp := <-w.Chan():

+ 8 - 4
storage/watchable_store.go

@@ -33,7 +33,7 @@ const (
 )
 
 type watchable interface {
-	watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc)
+	watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc)
 }
 
 type watchableStore struct {
@@ -53,6 +53,10 @@ type watchableStore struct {
 	wg    sync.WaitGroup
 }
 
+// cancelFunc updates unsynced and synced maps when running
+// cancel operations.
+type cancelFunc func()
+
 func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 		store:    newDefaultStore(path),
@@ -182,11 +186,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	return &watchStream{
 		watchable: s,
 		ch:        make(chan WatchResponse, chanBufLen),
-		cancels:   make(map[int64]CancelFunc),
+		cancels:   make(map[int64]cancelFunc),
 	}
 }
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -209,7 +213,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
 	}
 	watcherGauge.Inc()
 
-	cancel := CancelFunc(func() {
+	cancel := cancelFunc(func() {
 		s.mu.Lock()
 		defer s.mu.Unlock()
 		// remove global references of the watcher

+ 10 - 8
storage/watchable_store_bench_test.go

@@ -60,11 +60,10 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	benchSampleN := b.N
 	watcherN := k * benchSampleN
 
-	cancels := make([]CancelFunc, watcherN)
+	watchIDs := make([]int64, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// non-0 value to keep watchers in unsynced
-		_, cancel := w.Watch(testKey, true, 1)
-		cancels[i] = cancel
+		watchIDs[i] = w.Watch(testKey, true, 1)
 	}
 
 	// random-cancel N watchers to make it not biased towards
@@ -76,7 +75,9 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 	// cancel N watchers
 	for _, idx := range ix[:benchSampleN] {
-		cancels[idx]()
+		if err := w.Cancel(watchIDs[idx]); err != nil {
+			b.Error(err)
+		}
 	}
 }
 
@@ -97,11 +98,10 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	// put 1 million watchers on the same key
 	const watcherN = 1000000
 
-	cancels := make([]CancelFunc, watcherN)
+	watchIDs := make([]int64, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// 0 for startRev to keep watchers in synced
-		_, cancel := w.Watch(testKey, true, 0)
-		cancels[i] = cancel
+		watchIDs[i] = w.Watch(testKey, true, 0)
 	}
 
 	// randomly cancel watchers to make it not biased towards
@@ -112,6 +112,8 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	b.ReportAllocs()
 
 	for _, idx := range ix {
-		cancels[idx]()
+		if err := w.Cancel(watchIDs[idx]); err != nil {
+			b.Error(err)
+		}
 	}
 }

+ 11 - 9
storage/watchable_store_test.go

@@ -53,9 +53,11 @@ func TestNewWatcherCancel(t *testing.T) {
 	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
-	_, cancel := w.Watch(testKey, true, 0)
+	wt := w.Watch(testKey, true, 0)
 
-	cancel()
+	if err := w.Cancel(wt); err != nil {
+		t.Error(err)
+	}
 
 	if _, ok := s.synced[string(testKey)]; ok {
 		// the key shoud have been deleted
@@ -96,17 +98,17 @@ func TestCancelUnsynced(t *testing.T) {
 	// arbitrary number for watchers
 	watcherN := 100
 
-	// create watcherN of CancelFunc of
-	// synced and unsynced
-	cancels := make([]CancelFunc, watcherN)
+	// create watcherN of watch ids to cancel
+	watchIDs := make([]int64, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// use 1 to keep watchers in unsynced
-		_, cancel := w.Watch(testKey, true, 1)
-		cancels[i] = cancel
+		watchIDs[i] = w.Watch(testKey, true, 1)
 	}
 
-	for idx := range cancels {
-		cancels[idx]()
+	for _, idx := range watchIDs {
+		if err := w.Cancel(idx); err != nil {
+			t.Error(err)
+		}
 	}
 
 	// After running CancelFunc

+ 6 - 7
storage/watcher.go

@@ -36,8 +36,7 @@ type WatchStream interface {
 	// The returned `id` is the ID of this watcher. It appears as WatchID
 	// in events that are sent to the created watcher through stream channel.
 	//
-	// TODO: remove the returned CancelFunc. Always use Cancel.
-	Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
+	Watch(key []byte, prefix bool, startRev int64) int64
 
 	// Chan returns a chan. All watch response will be sent to the returned chan.
 	Chan() <-chan WatchResponse
@@ -67,24 +66,24 @@ type watchStream struct {
 	// nextID is the ID pre-allocated for next new watcher in this stream
 	nextID  int64
 	closed  bool
-	cancels map[int64]CancelFunc
+	cancels map[int64]cancelFunc
 }
 
 // TODO: return error if ws is closed?
-func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
+func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) int64 {
 	ws.mu.Lock()
 	defer ws.mu.Unlock()
 	if ws.closed {
-		return -1, nil
+		return -1
 	}
 
-	id = ws.nextID
+	id := ws.nextID
 	ws.nextID++
 
 	_, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch)
 
 	ws.cancels[id] = c
-	return id, c
+	return id
 }
 
 func (ws *watchStream) Chan() <-chan WatchResponse {

+ 9 - 5
storage/watcher_test.go

@@ -28,7 +28,7 @@ func TestWatcherWatchID(t *testing.T) {
 	idm := make(map[int64]struct{})
 
 	for i := 0; i < 10; i++ {
-		id, cancel := w.Watch([]byte("foo"), false, 0)
+		id := w.Watch([]byte("foo"), false, 0)
 		if _, ok := idm[id]; ok {
 			t.Errorf("#%d: id %d exists", i, id)
 		}
@@ -41,14 +41,16 @@ func TestWatcherWatchID(t *testing.T) {
 			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
 		}
 
-		cancel()
+		if err := w.Cancel(id); err != nil {
+			t.Error(err)
+		}
 	}
 
 	s.Put([]byte("foo2"), []byte("bar"), NoLease)
 
 	// unsynced watchers
 	for i := 10; i < 20; i++ {
-		id, cancel := w.Watch([]byte("foo2"), false, 1)
+		id := w.Watch([]byte("foo2"), false, 1)
 		if _, ok := idm[id]; ok {
 			t.Errorf("#%d: id %d exists", i, id)
 		}
@@ -59,7 +61,9 @@ func TestWatcherWatchID(t *testing.T) {
 			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
 		}
 
-		cancel()
+		if err := w.Cancel(id); err != nil {
+			t.Error(err)
+		}
 	}
 }
 
@@ -72,7 +76,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	w := s.NewWatchStream()
 	defer w.Close()
 
-	id, _ := w.Watch([]byte("foo"), false, 0)
+	id := w.Watch([]byte("foo"), false, 0)
 
 	tests := []struct {
 		cancelID int64