Browse Source

storage: change type of WatchIDs from int64 to WatchID

Anthony Romano 10 years ago
parent
commit
21a6ade53d

+ 5 - 3
etcdserver/api/v3rpc/watch.go

@@ -88,12 +88,12 @@ func (sws *serverWatchStream) recvLoop() error {
 			id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
 			sws.ctrlStream <- &pb.WatchResponse{
 				// TODO: fill in response header.
-				WatchId: id,
+				WatchId: int64(id),
 				Created: true,
 			}
 		case req.CancelRequest != nil:
 			id := req.CancelRequest.WatchId
-			err := sws.watchStream.Cancel(id)
+			err := sws.watchStream.Cancel(storage.WatchID(id))
 			if err == nil {
 				sws.ctrlStream <- &pb.WatchResponse{
 					// TODO: fill in response header.
@@ -125,7 +125,9 @@ func (sws *serverWatchStream) sendLoop() {
 				events[i] = &evs[i]
 			}
 
-			err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events})
+			err := sws.gRPCStream.Send(&pb.WatchResponse{
+				WatchId: int64(wresp.WatchID),
+				Events:  events})
 			storage.ReportEventReceived()
 			if err != nil {
 				return

+ 4 - 4
storage/watchable_store.go

@@ -33,7 +33,7 @@ const (
 )
 
 type watchable interface {
-	watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc)
+	watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
 }
 
 type watchableStore struct {
@@ -186,11 +186,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	return &watchStream{
 		watchable: s,
 		ch:        make(chan WatchResponse, chanBufLen),
-		cancels:   make(map[int64]cancelFunc),
+		cancels:   make(map[WatchID]cancelFunc),
 	}
 }
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -431,7 +431,7 @@ type watcher struct {
 	// If cur is behind the current revision of the KV,
 	// watcher is unsynced and needs to catch up.
 	cur int64
-	id  int64
+	id  WatchID
 
 	// a chan to send out the watch response.
 	// The chan might be shared with other watchers.

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -60,7 +60,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	benchSampleN := b.N
 	watcherN := k * benchSampleN
 
-	watchIDs := make([]int64, watcherN)
+	watchIDs := make([]WatchID, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// non-0 value to keep watchers in unsynced
 		watchIDs[i] = w.Watch(testKey, true, 1)
@@ -98,7 +98,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	// put 1 million watchers on the same key
 	const watcherN = 1000000
 
-	watchIDs := make([]int64, watcherN)
+	watchIDs := make([]WatchID, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// 0 for startRev to keep watchers in synced
 		watchIDs[i] = w.Watch(testKey, true, 0)

+ 1 - 1
storage/watchable_store_test.go

@@ -99,7 +99,7 @@ func TestCancelUnsynced(t *testing.T) {
 	watcherN := 100
 
 	// create watcherN of watch ids to cancel
-	watchIDs := make([]int64, watcherN)
+	watchIDs := make([]WatchID, watcherN)
 	for i := 0; i < watcherN; i++ {
 		// use 1 to keep watchers in unsynced
 		watchIDs[i] = w.Watch(testKey, true, 1)

+ 10 - 8
storage/watcher.go

@@ -25,6 +25,8 @@ var (
 	ErrWatcherNotExist = errors.New("storage: watcher does not exist")
 )
 
+type WatchID int64
+
 type WatchStream interface {
 	// Watch creates a watcher. The watcher watches the events happening or
 	// happened on the given key or key prefix from the given startRev.
@@ -36,22 +38,22 @@ type WatchStream interface {
 	// The returned `id` is the ID of this watcher. It appears as WatchID
 	// in events that are sent to the created watcher through stream channel.
 	//
-	Watch(key []byte, prefix bool, startRev int64) int64
+	Watch(key []byte, prefix bool, startRev int64) WatchID
 
 	// Chan returns a chan. All watch response will be sent to the returned chan.
 	Chan() <-chan WatchResponse
 
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
-	Cancel(id int64) error
+	Cancel(id WatchID) error
 
 	// Close closes the WatchChan and release all related resources.
 	Close()
 }
 
 type WatchResponse struct {
-	// WatchID is the ID of the watcher this response sent to.
-	WatchID int64
+	// WatchID is the WatchID of the watcher this response sent to.
+	WatchID WatchID
 	// Events contains all the events that needs to send.
 	Events []storagepb.Event
 }
@@ -64,13 +66,13 @@ type watchStream struct {
 
 	mu sync.Mutex // guards fields below it
 	// nextID is the ID pre-allocated for next new watcher in this stream
-	nextID  int64
+	nextID  WatchID
 	closed  bool
-	cancels map[int64]cancelFunc
+	cancels map[WatchID]cancelFunc
 }
 
 // TODO: return error if ws is closed?
-func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) int64 {
+func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) WatchID {
 	ws.mu.Lock()
 	defer ws.mu.Unlock()
 	if ws.closed {
@@ -90,7 +92,7 @@ func (ws *watchStream) Chan() <-chan WatchResponse {
 	return ws.ch
 }
 
-func (ws *watchStream) Cancel(id int64) error {
+func (ws *watchStream) Cancel(id WatchID) error {
 	cancel, ok := ws.cancels[id]
 	if !ok {
 		return ErrWatcherNotExist

+ 2 - 2
storage/watcher_test.go

@@ -25,7 +25,7 @@ func TestWatcherWatchID(t *testing.T) {
 	w := s.NewWatchStream()
 	defer w.Close()
 
-	idm := make(map[int64]struct{})
+	idm := make(map[WatchID]struct{})
 
 	for i := 0; i < 10; i++ {
 		id := w.Watch([]byte("foo"), false, 0)
@@ -79,7 +79,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	id := w.Watch([]byte("foo"), false, 0)
 
 	tests := []struct {
-		cancelID int64
+		cancelID WatchID
 		werr     error
 	}{
 		// no error should be returned when cancel the created watcher.