Browse Source

*: WatchResponse for multiple Events with WatchID

storage/storagepb: remove watchID from Event

storage: add WatchResponse to watcher.go to wrap events, watchID

storage: watchableStore to use WatchResponse

storage: kv_test with WatchResponse

etcdserver/api/v3rpc: watch to receive storage.WatchResponse type
Gyu-Ho Lee 10 years ago
parent
commit
6540f47dfa

+ 3 - 2
etcdserver/api/v3rpc/watch.go

@@ -111,7 +111,7 @@ func (sws *serverWatchStream) recvLoop() error {
 func (sws *serverWatchStream) sendLoop() {
 	for {
 		select {
-		case evs, ok := <-sws.watchStream.Chan():
+		case wresp, ok := <-sws.watchStream.Chan():
 			if !ok {
 				return
 			}
@@ -119,12 +119,13 @@ func (sws *serverWatchStream) sendLoop() {
 			// TODO: evs is []storagepb.Event type
 			// either return []*storagepb.Event from storage package
 			// or define protocol buffer with []storagepb.Event.
+			evs := wresp.Events
 			events := make([]*storagepb.Event, len(evs))
 			for i := range evs {
 				events[i] = &evs[i]
 			}
 
-			err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events})
+			err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events})
 			storage.ReportEventReceived()
 			if err != nil {
 				return

+ 20 - 12
storage/kv_test.go

@@ -740,7 +740,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 	s.Put([]byte("foo"), []byte("bar"))
 	select {
-	case evs := <-w.Chan():
+	case resp := <-w.Chan():
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
@@ -750,9 +750,11 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    1,
 				Version:        1,
 			},
-			WatchID: wid,
 		}
-		ev := evs[0]
+		if resp.WatchID != wid {
+			t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
+		}
+		ev := resp.Events[0]
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
 		}
@@ -762,7 +764,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 	s.Put([]byte("foo1"), []byte("bar1"))
 	select {
-	case evs := <-w.Chan():
+	case resp := <-w.Chan():
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
@@ -772,9 +774,11 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    2,
 				Version:        1,
 			},
-			WatchID: wid,
 		}
-		ev := evs[0]
+		if resp.WatchID != wid {
+			t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
+		}
+		ev := resp.Events[0]
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
 		}
@@ -789,7 +793,7 @@ func TestWatchableKVWatch(t *testing.T) {
 	defer cancel()
 
 	select {
-	case evs := <-w.Chan():
+	case resp := <-w.Chan():
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
@@ -799,9 +803,11 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    2,
 				Version:        1,
 			},
-			WatchID: wid,
 		}
-		ev := evs[0]
+		if resp.WatchID != wid {
+			t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
+		}
+		ev := resp.Events[0]
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
 		}
@@ -811,7 +817,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 	s.Put([]byte("foo1"), []byte("bar11"))
 	select {
-	case evs := <-w.Chan():
+	case resp := <-w.Chan():
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
@@ -821,9 +827,11 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    3,
 				Version:        2,
 			},
-			WatchID: wid,
 		}
-		ev := evs[0]
+		if resp.WatchID != wid {
+			t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
+		}
+		ev := resp.Events[0]
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
 		}

+ 0 - 26
storage/storagepb/kv.pb.go

@@ -72,8 +72,6 @@ type Event struct {
 	// a delete/expire event contains the previous
 	// key-value
 	Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
-	// watchID is the ID of watching this event is sent to.
-	WatchID int64 `protobuf:"varint,3,opt,name=watchID,proto3" json:"watchID,omitempty"`
 }
 
 func (m *Event) Reset()         { *m = Event{} }
@@ -167,11 +165,6 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
 		}
 		i += n1
 	}
-	if m.WatchID != 0 {
-		data[i] = 0x18
-		i++
-		i = encodeVarintKv(data, i, uint64(m.WatchID))
-	}
 	return i, nil
 }
 
@@ -242,9 +235,6 @@ func (m *Event) Size() (n int) {
 		l = m.Kv.Size()
 		n += 1 + l + sovKv(uint64(l))
 	}
-	if m.WatchID != 0 {
-		n += 1 + sovKv(uint64(m.WatchID))
-	}
 	return n
 }
 
@@ -485,22 +475,6 @@ func (m *Event) Unmarshal(data []byte) error {
 				return err
 			}
 			iNdEx = postIndex
-		case 3:
-			if wireType != 0 {
-				return fmt.Errorf("proto: wrong wireType = %d for field WatchID", wireType)
-			}
-			m.WatchID = 0
-			for shift := uint(0); ; shift += 7 {
-				if iNdEx >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[iNdEx]
-				iNdEx++
-				m.WatchID |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
 		default:
 			var sizeOfWire int
 			for {

+ 0 - 2
storage/storagepb/kv.proto

@@ -35,6 +35,4 @@ message Event {
   // a delete/expire event contains the previous
   // key-value
   KeyValue kv = 2;
-  // watchID is the ID of watching this event is sent to.
-  int64 watchID = 3;
 }

+ 9 - 8
storage/watchable_store.go

@@ -33,7 +33,7 @@ const (
 )
 
 type watchable interface {
-	watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc)
+	watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc)
 }
 
 type watchableStore struct {
@@ -181,12 +181,12 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	watchStreamGauge.Inc()
 	return &watchStream{
 		watchable: s,
-		ch:        make(chan []storagepb.Event, chanBufLen),
+		ch:        make(chan WatchResponse, chanBufLen),
 		cancels:   make(map[int64]CancelFunc),
 	}
 }
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -340,8 +340,9 @@ func (s *watchableStore) syncWatchers() {
 	}
 
 	for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
+		wr := WatchResponse{WatchID: w.id, Events: es}
 		select {
-		case w.ch <- es:
+		case w.ch <- wr:
 			pendingEventsGauge.Add(float64(len(es)))
 		default:
 			// TODO: handle the full unsynced watchers.
@@ -374,8 +375,9 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 				continue
 			}
 			es := we[w]
+			wr := WatchResponse{WatchID: w.id, Events: es}
 			select {
-			case w.ch <- es:
+			case w.ch <- wr:
 				pendingEventsGauge.Add(float64(len(es)))
 			default:
 				// move slow watcher to unsynced
@@ -427,9 +429,9 @@ type watcher struct {
 	cur int64
 	id  int64
 
-	// a chan to send out the watched events.
+	// a chan to send out the watch response.
 	// The chan might be shared with other watchers.
-	ch chan<- []storagepb.Event
+	ch chan<- WatchResponse
 }
 
 // unsafeAddWatcher puts watcher with key k into watchableStore's synced.
@@ -475,7 +477,6 @@ func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.E
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
-				ev.WatchID = w.id
 
 				if _, ok := watcherToEvents[w]; !ok {
 					watcherToEvents[w] = []storagepb.Event{}

+ 2 - 1
storage/watchable_store_test.go

@@ -186,7 +186,8 @@ func TestSyncWatchers(t *testing.T) {
 	if len(w.(*watchStream).ch) != watcherN {
 		t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
 	}
-	evs := <-w.(*watchStream).ch
+	wr := <-w.(*watchStream).ch
+	evs := wr.Events
 	if len(evs) != 1 {
 		t.Errorf("len(evs) got = %d, want = 1", len(evs))
 	}

+ 11 - 4
storage/watcher.go

@@ -39,8 +39,8 @@ type WatchStream interface {
 	// TODO: remove the returned CancelFunc. Always use Cancel.
 	Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
 
-	// Chan returns a chan. All watched events will be sent to the returned chan.
-	Chan() <-chan []storagepb.Event
+	// Chan returns a chan. All watch response will be sent to the returned chan.
+	Chan() <-chan WatchResponse
 
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
@@ -50,11 +50,18 @@ type WatchStream interface {
 	Close()
 }
 
+type WatchResponse struct {
+	// WatchID is the ID of the watcher this response sent to.
+	WatchID int64
+	// Events contains all the events that needs to send.
+	Events []storagepb.Event
+}
+
 // watchStream contains a collection of watchers that share
 // one streaming chan to send out watched events and other control events.
 type watchStream struct {
 	watchable watchable
-	ch        chan []storagepb.Event
+	ch        chan WatchResponse
 
 	mu sync.Mutex // guards fields below it
 	// nextID is the ID pre-allocated for next new watcher in this stream
@@ -80,7 +87,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64,
 	return id, c
 }
 
-func (ws *watchStream) Chan() <-chan []storagepb.Event {
+func (ws *watchStream) Chan() <-chan WatchResponse {
 	return ws.ch
 }
 

+ 7 - 10
storage/watcher_test.go

@@ -36,12 +36,11 @@ func TestWatcherWatchID(t *testing.T) {
 
 		s.Put([]byte("foo"), []byte("bar"))
 
-		evs := <-w.Chan()
-		for j, ev := range evs {
-			if ev.WatchID != id {
-				t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
-			}
+		resp := <-w.Chan()
+		if resp.WatchID != id {
+			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
 		}
+
 		cancel()
 	}
 
@@ -55,11 +54,9 @@ func TestWatcherWatchID(t *testing.T) {
 		}
 		idm[id] = struct{}{}
 
-		evs := <-w.Chan()
-		for j, ev := range evs {
-			if ev.WatchID != id {
-				t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
-			}
+		resp := <-w.Chan()
+		if resp.WatchID != id {
+			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
 		}
 
 		cancel()