Browse Source

Merge pull request #3895 from yichengq/storage-watchid

storage: add watch ID to identify watchings
Xiang Li 10 years ago
parent
commit
d435d443bb

+ 9 - 2
storage/kv_test.go

@@ -735,7 +735,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 	w := s.NewWatcher()
 
-	cancel := w.Watch([]byte("foo"), true, 0)
+	wid, cancel := w.Watch([]byte("foo"), true, 0)
 	defer cancel()
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -750,6 +750,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    1,
 				Version:        1,
 			},
+			WatchID: wid,
 		}
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
@@ -770,6 +771,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    2,
 				Version:        1,
 			},
+			WatchID: wid,
 		}
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
@@ -778,7 +780,10 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 	}
 
-	cancel = w.Watch([]byte("foo1"), false, 1)
+	w.Close()
+
+	w = s.NewWatcher()
+	wid, cancel = w.Watch([]byte("foo1"), false, 1)
 	defer cancel()
 
 	select {
@@ -792,6 +797,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    2,
 				Version:        1,
 			},
+			WatchID: wid,
 		}
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)
@@ -812,6 +818,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				ModRevision:    3,
 				Version:        2,
 			},
+			WatchID: wid,
 		}
 		if !reflect.DeepEqual(ev, wev) {
 			t.Errorf("watched event = %+v, want %+v", ev, wev)

+ 26 - 1
storage/storagepb/kv.pb.go

@@ -68,7 +68,8 @@ type Event struct {
 	// a put event contains the current key-value
 	// a delete/expire event contains the previous
 	// key-value
-	Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
+	Kv      *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
+	WatchID int64     `protobuf:"varint,3,opt,name=watchID,proto3" json:"watchID,omitempty"`
 }
 
 func (m *Event) Reset()         { *m = Event{} }
@@ -157,6 +158,11 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
 		}
 		i += n1
 	}
+	if m.WatchID != 0 {
+		data[i] = 0x18
+		i++
+		i = encodeVarintKv(data, i, uint64(m.WatchID))
+	}
 	return i, nil
 }
 
@@ -224,6 +230,9 @@ func (m *Event) Size() (n int) {
 		l = m.Kv.Size()
 		n += 1 + l + sovKv(uint64(l))
 	}
+	if m.WatchID != 0 {
+		n += 1 + sovKv(uint64(m.WatchID))
+	}
 	return n
 }
 
@@ -448,6 +457,22 @@ func (m *Event) Unmarshal(data []byte) error {
 				return err
 			}
 			iNdEx = postIndex
+		case 3:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field WatchID", wireType)
+			}
+			m.WatchID = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.WatchID |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		default:
 			var sizeOfWire int
 			for {

+ 2 - 1
storage/storagepb/kv.proto

@@ -32,5 +32,6 @@ message Event {
   // a delete/expire event contains the previous
   // key-value
   KeyValue kv = 2;
+  // watchID is the ID of watching this event is sent to.
+  int64 watchID = 3;
 }
-

+ 8 - 4
storage/watchable_store.go

@@ -32,7 +32,7 @@ const (
 )
 
 type watchable interface {
-	watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
+	watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
 }
 
 type watchableStore struct {
@@ -173,7 +173,7 @@ func (s *watchableStore) NewWatcher() Watcher {
 	}
 }
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -181,6 +181,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
 		key:    key,
 		prefix: prefix,
 		cur:    startRev,
+		id:     id,
 		ch:     ch,
 	}
 
@@ -273,8 +274,9 @@ func (s *watchableStore) syncWatchings() {
 			}
 
 			w.ch <- storagepb.Event{
-				Type: evt,
-				Kv:   &kv,
+				Type:    evt,
+				Kv:      &kv,
+				WatchID: w.id,
 			}
 			pendingEventsGauge.Inc()
 		}
@@ -311,6 +313,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
+				ev.WatchID = w.id
 				select {
 				case w.ch <- ev:
 					pendingEventsGauge.Inc()
@@ -362,6 +365,7 @@ type watching struct {
 	// If cur is behind the current revision of the KV,
 	// watching is unsynced and needs to catch up.
 	cur int64
+	id  int64
 
 	// a chan to send out the watched events.
 	// The chan might be shared with other watchings.

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -65,7 +65,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	cancels := make([]CancelFunc, watcherSize)
 	for i := 0; i < watcherSize; i++ {
 		// non-0 value to keep watchers in unsynced
-		cancel := w.Watch(testKey, true, 1)
+		_, cancel := w.Watch(testKey, true, 1)
 		cancels[i] = cancel
 	}
 
@@ -102,7 +102,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	cancels := make([]CancelFunc, watcherSize)
 	for i := 0; i < watcherSize; i++ {
 		// 0 for startRev to keep watchers in synced
-		cancel := w.Watch(testKey, true, 0)
+		_, cancel := w.Watch(testKey, true, 0)
 		cancels[i] = cancel
 	}
 

+ 1 - 1
storage/watchable_store_test.go

@@ -49,7 +49,7 @@ func TestNewWatcherCancel(t *testing.T) {
 	s.Put(testKey, testValue)
 
 	w := s.NewWatcher()
-	cancel := w.Watch(testKey, true, 0)
+	_, cancel := w.Watch(testKey, true, 0)
 
 	cancel()
 

+ 13 - 5
storage/watcher.go

@@ -26,7 +26,9 @@ type Watcher interface {
 	// The whole event history can be watched unless compacted.
 	// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
 	// If `startRev` <=0, watch observes events after currentRev.
-	Watch(key []byte, prefix bool, startRev int64) CancelFunc
+	// The returned `id` is the ID of this watching. It appears as WatchID
+	// in events that are sent to this watching.
+	Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
 
 	// Chan returns a chan. All watched events will be sent to the returned chan.
 	Chan() <-chan storagepb.Event
@@ -42,21 +44,27 @@ type watcher struct {
 	ch        chan storagepb.Event
 
 	mu      sync.Mutex // guards fields below it
+	nextID  int64      // nextID is the ID allocated for next new watching
 	closed  bool
 	cancels []CancelFunc
 }
 
 // TODO: return error if ws is closed?
-func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc {
-	_, c := ws.watchable.watch(key, prefix, startRev, ws.ch)
+func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
 	ws.mu.Lock()
 	defer ws.mu.Unlock()
 	if ws.closed {
-		return nil
+		return -1, nil
 	}
+
+	id = ws.nextID
+	ws.nextID++
+
+	_, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch)
+
 	// TODO: cancelFunc needs to be removed from the cancels when it is called.
 	ws.cancels = append(ws.cancels, c)
-	return c
+	return id, c
 }
 
 func (ws *watcher) Chan() <-chan storagepb.Event {

+ 64 - 0
storage/watcher_test.go

@@ -0,0 +1,64 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import "testing"
+
+// TestWatcherWatchID tests that each watcher provides unique watch ID,
+// and the watched event attaches the correct watch ID.
+func TestWatcherWatchID(t *testing.T) {
+	s := WatchableKV(newWatchableStore(tmpPath))
+	defer cleanup(s, tmpPath)
+
+	w := s.NewWatcher()
+	defer w.Close()
+
+	idm := make(map[int64]struct{})
+
+	// synced watchings
+	for i := 0; i < 10; i++ {
+		id, cancel := w.Watch([]byte("foo"), false, 0)
+		if _, ok := idm[id]; ok {
+			t.Errorf("#%d: id %d exists", i, id)
+		}
+		idm[id] = struct{}{}
+
+		s.Put([]byte("foo"), []byte("bar"))
+
+		ev := <-w.Chan()
+		if ev.WatchID != id {
+			t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
+		}
+
+		cancel()
+	}
+
+	s.Put([]byte("foo2"), []byte("bar"))
+	// unsynced watchings
+	for i := 10; i < 20; i++ {
+		id, cancel := w.Watch([]byte("foo2"), false, 1)
+		if _, ok := idm[id]; ok {
+			t.Errorf("#%d: id %d exists", i, id)
+		}
+		idm[id] = struct{}{}
+
+		ev := <-w.Chan()
+		if ev.WatchID != id {
+			t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
+		}
+
+		cancel()
+	}
+}