
Merge pull request #4117 from xiang90/rm_watching

storage: rename watching -> watcher
Xiang Li, 10 years ago
commit 4fa0cd5765

+ 8 - 8
storage/metrics.go

@@ -67,20 +67,20 @@ var (
 			Help:      "Total number of watch streams.",
 		})
 
-	watchingGauge = prometheus.NewGauge(
+	watcherGauge = prometheus.NewGauge(
 		prometheus.GaugeOpts{
 			Namespace: "etcd",
 			Subsystem: "storage",
-			Name:      "watching_total",
-			Help:      "Total number of watchings.",
+			Name:      "watcher_total",
+			Help:      "Total number of watchers.",
 		})
 
-	slowWatchingGauge = prometheus.NewGauge(
+	slowWatcherGauge = prometheus.NewGauge(
 		prometheus.GaugeOpts{
 			Namespace: "etcd",
 			Subsystem: "storage",
-			Name:      "slow_watching_total",
-			Help:      "Total number of unsynced slow watchings.",
+			Name:      "slow_watcher_total",
+			Help:      "Total number of unsynced slow watchers.",
 		})
 
 	totalEventsCounter = prometheus.NewCounter(
@@ -144,8 +144,8 @@ func init() {
 	prometheus.MustRegister(txnCounter)
 	prometheus.MustRegister(keysGauge)
 	prometheus.MustRegister(watchStreamGauge)
-	prometheus.MustRegister(watchingGauge)
-	prometheus.MustRegister(slowWatchingGauge)
+	prometheus.MustRegister(watcherGauge)
+	prometheus.MustRegister(slowWatcherGauge)
 	prometheus.MustRegister(totalEventsCounter)
 	prometheus.MustRegister(pendingEventsGauge)
 	prometheus.MustRegister(indexCompactionPauseDurations)
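These are plain Prometheus gauges, so the rename changes the exported metric names (`etcd_storage_watcher_total`, `etcd_storage_slow_watcher_total`). A minimal standalone sketch, not part of this commit, of the declare/register/update cycle these metrics follow:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// watcherGauge mirrors the renamed metric above: a gauge rather than a
// counter, because the number of watchers goes down as well as up.
var watcherGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "etcd",
	Subsystem: "storage",
	Name:      "watcher_total",
	Help:      "Total number of watchers.",
})

func init() {
	// MustRegister panics on duplicate metric names, which would
	// surface a stale watching_total registration at startup.
	prometheus.MustRegister(watcherGauge)
}

func main() {
	watcherGauge.Inc() // a watcher is created
	watcherGauge.Dec() // a watcher is cancelled
}
```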

+ 65 - 65
storage/watchable_store.go

@@ -33,7 +33,7 @@ const (
 )
 
 type watchable interface {
-	watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc)
+	watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc)
 }
 
 type watchableStore struct {
@@ -41,12 +41,12 @@ type watchableStore struct {
 
 	*store
 
-	// contains all unsynced watching that needs to sync events that have happened
-	unsynced map[*watching]struct{}
+	// contains all unsynced watchers that need to sync with events that have happened
+	unsynced map[*watcher]struct{}
 
-	// contains all synced watching that are tracking the events that will happen
-	// The key of the map is the key that the watching is watching on.
-	synced map[string]map[*watching]struct{}
+	// contains all synced watchers that are in sync with the progress of the store.
+	// The key of the map is the key that the watcher watches on.
+	synced map[string]map[*watcher]struct{}
 	tx     *ongoingTx
 
 	stopc chan struct{}
@@ -56,12 +56,12 @@ type watchableStore struct {
 func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 		store:    newDefaultStore(path),
-		unsynced: make(map[*watching]struct{}),
-		synced:   make(map[string]map[*watching]struct{}),
+		unsynced: make(map[*watcher]struct{}),
+		synced:   make(map[string]map[*watcher]struct{}),
 		stopc:    make(chan struct{}),
 	}
 	s.wg.Add(1)
-	go s.syncWatchingsLoop()
+	go s.syncWatchersLoop()
 	return s
 }
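newWatchableStore starts the sync loop in the background; pairing the stopc channel with the WaitGroup declared in the struct above is the usual way such a loop is torn down on Close. A reduced, self-contained sketch of that lifecycle pattern under that assumption, with the etcd-specific work elided:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type syncer struct {
	wg    sync.WaitGroup
	stopc chan struct{}
}

func newSyncer() *syncer {
	s := &syncer{stopc: make(chan struct{})}
	s.wg.Add(1)
	go s.loop() // mirrors go s.syncWatchersLoop()
	return s
}

func (s *syncer) loop() {
	defer s.wg.Done()
	for {
		// one round of work goes here (syncWatchers in the store)
		select {
		case <-time.After(100 * time.Millisecond):
		case <-s.stopc:
			return
		}
	}
}

// Close stops the loop and waits for it to exit before returning.
func (s *syncer) Close() {
	close(s.stopc)
	s.wg.Wait()
}

func main() {
	s := newSyncer()
	time.Sleep(250 * time.Millisecond) // let a few rounds run
	s.Close()
	fmt.Println("loop stopped cleanly")
}
```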
 
@@ -185,11 +185,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	}
 }
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	wa := &watching{
+	wa := &watcher{
 		key:    key,
 		prefix: prefix,
 		cur:    startRev,
@@ -199,23 +199,23 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
 
 	k := string(key)
 	if startRev == 0 {
-		if err := unsafeAddWatching(&s.synced, k, wa); err != nil {
-			log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
+		if err := unsafeAddWatcher(&s.synced, k, wa); err != nil {
+			log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
 		}
 	} else {
-		slowWatchingGauge.Inc()
+		slowWatcherGauge.Inc()
 		s.unsynced[wa] = struct{}{}
 	}
-	watchingGauge.Inc()
+	watcherGauge.Inc()
 
 	cancel := CancelFunc(func() {
 		s.mu.Lock()
 		defer s.mu.Unlock()
-		// remove global references of the watching
+		// remove global references of the watcher
 		if _, ok := s.unsynced[wa]; ok {
 			delete(s.unsynced, wa)
-			slowWatchingGauge.Dec()
-			watchingGauge.Dec()
+			slowWatcherGauge.Dec()
+			watcherGauge.Dec()
 			return
 		}
 
@@ -227,7 +227,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
 				if len(v) == 0 {
 					delete(s.synced, k)
 				}
-				watchingGauge.Dec()
+				watcherGauge.Dec()
 			}
 		}
 		// If we cannot find it, it should have finished watch.
@@ -236,13 +236,13 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
 	return wa, cancel
 }
 
-// syncWatchingsLoop syncs the watching in the unsyncd map every 100ms.
-func (s *watchableStore) syncWatchingsLoop() {
+// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
+func (s *watchableStore) syncWatchersLoop() {
 	defer s.wg.Done()
 
 	for {
 		s.mu.Lock()
-		s.syncWatchings()
+		s.syncWatchers()
 		s.mu.Unlock()
 
 		select {
@@ -253,12 +253,12 @@ func (s *watchableStore) syncWatchingsLoop() {
 	}
 }
 
-// syncWatchings periodically syncs unsynced watchings by: Iterate all unsynced
-// watchings to get the minimum revision within its range, skipping the
-// watching if its current revision is behind the compact revision of the
+// syncWatchers periodically syncs unsynced watchers by: iterating over all
+// unsynced watchers to find the minimum revision within its range, skipping any
+// watcher whose current revision is behind the compact revision of the
 // store. And use this minimum revision to get all key-value pairs. Then send
-// those events to watchings.
-func (s *watchableStore) syncWatchings() {
+// those events to watchers.
+func (s *watchableStore) syncWatchers() {
 	s.store.mu.Lock()
 	defer s.store.mu.Unlock()
 
@@ -266,7 +266,7 @@ func (s *watchableStore) syncWatchings() {
 		return
 	}
 
-	// in order to find key-value pairs from unsynced watchings, we need to
+	// in order to find key-value pairs from unsynced watchers, we need to
 	// find min revision index, and these revisions can be used to
 	// query the backend store of key-value pairs
 	minRev := int64(math.MaxInt64)
@@ -275,17 +275,17 @@ func (s *watchableStore) syncWatchings() {
 	compactionRev := s.store.compactMainRev
 
 	// TODO: change the unsynced field's type to match this map
-	keyToUnsynced := make(map[string]map[*watching]struct{})
+	keyToUnsynced := make(map[string]map[*watcher]struct{})
 
 	for w := range s.unsynced {
 		k := string(w.key)
 
 		if w.cur > curRev {
-			panic("watching current revision should not exceed current revision")
+			panic("watcher current revision should not exceed the store's current revision")
 		}
 
 		if w.cur < compactionRev {
-			// TODO: return error compacted to that watching instead of
+		// TODO: return a compacted-revision error to that watcher instead of
 		// just removing it silently from unsynced.
 			delete(s.unsynced, w)
 			continue
@@ -296,7 +296,7 @@ func (s *watchableStore) syncWatchings() {
 		}
 
 		if _, ok := keyToUnsynced[k]; !ok {
-			keyToUnsynced[k] = make(map[*watching]struct{})
+			keyToUnsynced[k] = make(map[*watcher]struct{})
 		}
 		keyToUnsynced[k][w] = struct{}{}
 	}
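keyToUnsynced, like synced, is a map of sets: Go has no set type, so map[*watcher]struct{} plays that role with zero-sized values, nested under the watched key. A tiny self-contained sketch of the idiom, with hypothetical names:

```go
package main

import "fmt"

type watcher struct{ key string }

func main() {
	index := make(map[string]map[*watcher]struct{})

	add := func(w *watcher) {
		// create the inner set lazily, as syncWatchers does
		if _, ok := index[w.key]; !ok {
			index[w.key] = make(map[*watcher]struct{})
		}
		index[w.key][w] = struct{}{}
	}

	w1, w2 := &watcher{key: "foo"}, &watcher{key: "foo"}
	add(w1)
	add(w2)
	fmt.Println(len(index["foo"])) // 2: both watchers indexed under "foo"
}
```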
@@ -338,35 +338,35 @@ func (s *watchableStore) syncWatchings() {
 		evs = append(evs, ev)
 	}
 
-	for w, es := range newWatchingToEventMap(keyToUnsynced, evs) {
+	for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
 		select {
 		case w.ch <- es:
 			pendingEventsGauge.Add(float64(len(es)))
 		default:
-			// TODO: handle the full unsynced watchings.
-			// continue to process other watchings for now, the full ones
+			// TODO: handle the full unsynced watchers.
+			// continue to process other watchers for now, the full ones
 			// will be processed next time and hopefully they will not be full then.
 			continue
 		}
 		k := string(w.key)
-		if err := unsafeAddWatching(&s.synced, k, w); err != nil {
-			log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
+		if err := unsafeAddWatcher(&s.synced, k, w); err != nil {
+			log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
 		}
 		delete(s.unsynced, w)
 	}
 
-	slowWatchingGauge.Set(float64(len(s.unsynced)))
+	slowWatcherGauge.Set(float64(len(s.unsynced)))
 }
 
-// handle handles the change of the happening event on all watchings.
+// handle notifies all watchers of the events that happened at the given rev.
 func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
 	s.notify(rev, evs)
 }
 
 // notify notifies the fact that given event at the given rev just happened to
-// watchings that watch on the key of the event.
+// watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
-	we := newWatchingToEventMap(s.synced, evs)
+	we := newWatcherToEventMap(s.synced, evs)
 	for _, wm := range s.synced {
 		for w := range wm {
 			if _, ok := we[w]; !ok {
@@ -377,11 +377,11 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 			case w.ch <- es:
 				pendingEventsGauge.Add(float64(len(es)))
 			default:
-				// move slow watching to unsynced
+				// move the slow watcher to unsynced
 				w.cur = rev
 				s.unsynced[w] = struct{}{}
 				delete(wm, w)
-				slowWatchingGauge.Inc()
+				slowWatcherGauge.Inc()
 			}
 		}
 	}
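The select with a default branch is a non-blocking send: delivery is attempted once, and a full channel demotes the watcher to unsynced instead of stalling notify. A self-contained sketch of just that pattern, with illustrative names:

```go
package main

import "fmt"

func main() {
	ch := make(chan string, 1) // small buffer stands in for the watcher chan
	events := []string{"ev1", "ev2"}

	for _, ev := range events {
		select {
		case ch <- ev:
			// delivered; in the store this bumps pendingEventsGauge
		default:
			// channel full: in the store the watcher is marked slow,
			// moved to unsynced, and retried by syncWatchers later
			fmt.Println("slow consumer, deferring:", ev)
		}
	}
}
```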
@@ -414,52 +414,52 @@ func (tx *ongoingTx) del(k string) {
 	}
 }
 
-type watching struct {
-	// the watching key
+type watcher struct {
+	// the watcher key
 	key []byte
-	// prefix indicates if watching is on a key or a prefix.
-	// If prefix is true, the watching is on a prefix.
+	// prefix indicates if the watcher is on a key or a prefix.
+	// If prefix is true, the watcher is on a prefix.
 	prefix bool
-	// cur is the current watching revision.
+	// cur is the current watcher revision.
 	// If cur is behind the current revision of the KV,
-	// watching is unsynced and needs to catch up.
+	// the watcher is unsynced and needs to catch up.
 	cur int64
 	id  int64
 
 	// a chan to send out the watched events.
-	// The chan might be shared with other watchings.
+	// The chan might be shared with other watchers.
 	ch chan<- []storagepb.Event
 }
 
-// unsafeAddWatching puts watching with key k into watchableStore's synced.
+// unsafeAddWatcher puts a watcher with key k into the watchableStore's synced map.
 // Callers must hold the store's mutex before and after to keep this thread-safe.
-func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error {
+func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error {
 	if wa == nil {
-		return fmt.Errorf("nil watching received")
+		return fmt.Errorf("nil watcher received")
 	}
 	mp := *synced
 	if v, ok := mp[k]; ok {
 		if _, ok := v[wa]; ok {
-			return fmt.Errorf("put the same watch twice: %+v", wa)
+			return fmt.Errorf("put the same watcher twice: %+v", wa)
 		} else {
 			v[wa] = struct{}{}
 		}
 		return nil
 	}
 
-	mp[k] = make(map[*watching]struct{})
+	mp[k] = make(map[*watcher]struct{})
 	mp[k][wa] = struct{}{}
 	return nil
 }
 
-// newWatchingToEventMap creates a map that has watching as key and events as
-// value. It enables quick events look up by watching.
-func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event {
-	watchingToEvents := make(map[*watching][]storagepb.Event)
+// newWatcherToEventMap creates a map that has watchers as keys and events as
+// values. It enables quick event lookup by watcher.
+func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event {
+	watcherToEvents := make(map[*watcher][]storagepb.Event)
 	for _, ev := range evs {
 		key := string(ev.Kv.Key)
 
-		// check all prefixes of the key to notify all corresponded watchings
+		// check all prefixes of the key to notify all corresponding watchers
 		for i := 0; i <= len(key); i++ {
 			k := string(key[:i])
 
@@ -469,20 +469,20 @@ func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb
 			}
 
 			for w := range wm {
-				// the watching needs to be notified when either it watches prefix or
+				// the watcher needs to be notified when either it watches a prefix or
 				// the key is exactly matched.
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
 				ev.WatchID = w.id
 
-				if _, ok := watchingToEvents[w]; !ok {
-					watchingToEvents[w] = []storagepb.Event{}
+				if _, ok := watcherToEvents[w]; !ok {
+					watcherToEvents[w] = []storagepb.Event{}
 				}
-				watchingToEvents[w] = append(watchingToEvents[w], ev)
+				watcherToEvents[w] = append(watcherToEvents[w], ev)
 			}
 		}
 	}
 
-	return watchingToEvents
+	return watcherToEvents
 }
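newWatcherToEventMap walks every prefix of an event's key, so a watcher registered on a prefix is found without scanning all watchers; exact-key watchers are then filtered by the i != len(key) check above. A self-contained sketch of the prefix walk, with hypothetical data:

```go
package main

import "fmt"

func main() {
	registered := map[string]bool{"foo": true, "foobar": true}
	key := "foobar1"

	// every prefix of key, from "" up to the full key, is a candidate
	for i := 0; i <= len(key); i++ {
		if p := key[:i]; registered[p] {
			fmt.Printf("notify watchers on %q for event on %q\n", p, key)
		}
	}
	// Output:
	// notify watchers on "foo" for event on "foobar1"
	// notify watchers on "foobar" for event on "foobar1"
}
```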

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -34,11 +34,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// in unsynced for this benchmark.
 	s := &watchableStore{
 		store:    newDefaultStore(tmpPath),
-		unsynced: make(map[*watching]struct{}),
+		unsynced: make(map[*watcher]struct{}),
 
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
-		synced: make(map[string]map[*watching]struct{}),
+		synced: make(map[string]map[*watcher]struct{}),
 	}
 
 	defer func() {
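As the comment notes, the benchmark builds watchableStore by hand so no background syncWatchersLoop races with the measurement. A condensed sketch of that shape, written as if inside the storage package (imports elided), illustrative only:

```go
func BenchmarkWatchableStoreUnsyncedCancelSketch(b *testing.B) {
	// construct directly: newWatchableStore would start
	// syncWatchersLoop, which would drain unsynced during the run
	s := &watchableStore{
		store:    newDefaultStore(tmpPath),
		unsynced: make(map[*watcher]struct{}),
		synced:   make(map[string]map[*watcher]struct{}),
	}
	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	// a non-zero startRev keeps every new watcher in unsynced
	w := s.NewWatchStream()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, cancel := w.Watch([]byte("foo"), false, 1)
		cancel()
	}
}
```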

+ 49 - 50
storage/watchable_store_test.go

@@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) {
 	}
 }
 
-// TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced.
+// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
 func TestCancelUnsynced(t *testing.T) {
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore automatically calls syncWatchers
@@ -71,11 +71,11 @@ func TestCancelUnsynced(t *testing.T) {
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
 		store:    newDefaultStore(tmpPath),
-		unsynced: make(map[*watching]struct{}),
+		unsynced: make(map[*watcher]struct{}),
 
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
-		synced: make(map[string]map[*watching]struct{}),
+		synced: make(map[string]map[*watcher]struct{}),
 	}
 
 	defer func() {
@@ -112,21 +112,20 @@ func TestCancelUnsynced(t *testing.T) {
 	// After running CancelFunc
 	//
 	// unsynced should be empty
-	// because cancel removes watching from unsynced
+	// because cancel removes the watcher from unsynced
 	if len(s.unsynced) != 0 {
 		t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
 	}
 }
 
-// TestSyncWatchings populates unsynced watching map and
-// tests syncWatchings method to see if it correctly sends
-// events to channel of unsynced watchings and moves these
-// watchings to synced.
-func TestSyncWatchings(t *testing.T) {
+// TestSyncWatchers populates the unsynced watcher map and tests that the
+// syncWatchers method correctly sends events to the channel of unsynced
+// watchers and moves these watchers to synced.
+func TestSyncWatchers(t *testing.T) {
 	s := &watchableStore{
 		store:    newDefaultStore(tmpPath),
-		unsynced: make(map[*watching]struct{}),
-		synced:   make(map[string]map[*watching]struct{}),
+		unsynced: make(map[*watcher]struct{}),
+		synced:   make(map[string]map[*watcher]struct{}),
 	}
 
 	defer func() {
@@ -148,7 +147,7 @@ func TestSyncWatchings(t *testing.T) {
 		w.Watch(testKey, true, 1)
 	}
 
-	// Before running s.syncWatchings()
+	// Before running s.syncWatchers()
 	//
 	// synced should be empty
 	// because we manually populate unsynced only
@@ -161,27 +160,27 @@ func TestSyncWatchings(t *testing.T) {
 		t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
 	}
 
-	// this should move all unsynced watchings
+	// this should move all unsynced watchers
 	// to synced ones
-	s.syncWatchings()
+	s.syncWatchers()
 
-	// After running s.syncWatchings()
+	// After running s.syncWatchers()
 	//
 	// synced should not be empty
-	// because syncWatchings populates synced
+	// because syncWatchers populates synced
 	// in this test case
 	if len(s.synced[string(testKey)]) == 0 {
 		t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
 	}
 	// unsynced should be empty
-	// because syncWatchings is expected to move
-	// all watchings from unsynced to synced
+	// because syncWatchers is expected to move
+	// all watchers from unsynced to synced
 	// in this test case
 	if len(s.unsynced) != 0 {
 		t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
 	}
 
-	// All of the watchings actually share one channel
+	// All of the watchers actually share one channel
 	// so we only need to check one shared channel
 	// (See watcher.go for more detail).
 	if len(w.(*watchStream).ch) != watcherN {
@@ -202,7 +201,7 @@ func TestSyncWatchings(t *testing.T) {
 	}
 }
 
-func TestUnsafeAddWatching(t *testing.T) {
+func TestUnsafeAddWatcher(t *testing.T) {
 	s := newWatchableStore(tmpPath)
 	defer func() {
 		s.store.Close()
@@ -213,18 +212,18 @@ func TestUnsafeAddWatching(t *testing.T) {
 	s.Put(testKey, testValue)
 
 	size := 10
-	ws := make([]*watching, size)
+	ws := make([]*watcher, size)
 	for i := 0; i < size; i++ {
-		ws[i] = &watching{
+		ws[i] = &watcher{
 			key:    testKey,
 			prefix: true,
 			cur:    0,
 		}
 	}
-	// to test if unsafeAddWatching is correctly updating
-	// synced map when adding new watching.
+	// to test if unsafeAddWatcher correctly updates the
+	// synced map when adding a new watcher.
 	for i, wa := range ws {
-		if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil {
+		if err := unsafeAddWatcher(&s.synced, string(testKey), wa); err != nil {
 			t.Errorf("#%d: error = %v, want nil", i, err)
 		}
 		if v, ok := s.synced[string(testKey)]; !ok {
@@ -240,11 +239,11 @@ func TestUnsafeAddWatching(t *testing.T) {
 	}
 }
 
-func TestNewMapWatchingToEventMap(t *testing.T) {
+func TestNewMapWatcherToEventMap(t *testing.T) {
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
 
-	ws := []*watching{{key: k0}, {key: k1}, {key: k2}}
+	ws := []*watcher{{key: k0}, {key: k1}, {key: k2}}
 
 	evs := []storagepb.Event{
 		{
@@ -262,63 +261,63 @@ func TestNewMapWatchingToEventMap(t *testing.T) {
 	}
 
 	tests := []struct {
-		sync map[string]map[*watching]struct{}
+		sync map[string]map[*watcher]struct{}
 		evs  []storagepb.Event
 
-		wwe map[*watching][]storagepb.Event
+		wwe map[*watcher][]storagepb.Event
 	}{
-		// no watching in sync, some events should return empty wwe
+		// no watcher in sync, some events should return empty wwe
 		{
-			map[string]map[*watching]struct{}{},
+			map[string]map[*watcher]struct{}{},
 			evs,
-			map[*watching][]storagepb.Event{},
+			map[*watcher][]storagepb.Event{},
 		},
 
-		// one watching in sync, one event that does not match the key of that
-		// watching should return empty wwe
+		// one watcher in sync, one event that does not match the key of that
+		// watcher should return empty wwe
 		{
-			map[string]map[*watching]struct{}{
+			map[string]map[*watcher]struct{}{
 				string(k2): {ws[2]: struct{}{}},
 			},
 			evs[:1],
-			map[*watching][]storagepb.Event{},
+			map[*watcher][]storagepb.Event{},
 		},
 
-		// one watching in sync, one event that matches the key of that
-		// watching should return wwe with that matching watching
+		// one watcher in sync, one event that matches the key of that
+		// watcher should return wwe with that matching watcher
 		{
-			map[string]map[*watching]struct{}{
+			map[string]map[*watcher]struct{}{
 				string(k1): {ws[1]: struct{}{}},
 			},
 			evs[1:2],
-			map[*watching][]storagepb.Event{
+			map[*watcher][]storagepb.Event{
 				ws[1]: evs[1:2],
 			},
 		},
 
-		// two watchings in sync that watches two different keys, one event
-		// that matches the key of only one of the watching should return wwe
-		// with the matching watching
+		// two watchers in sync that watch two different keys, one event
+		// that matches the key of only one of the watchers should return wwe
+		// with the matching watcher
 		{
-			map[string]map[*watching]struct{}{
+			map[string]map[*watcher]struct{}{
 				string(k0): {ws[0]: struct{}{}},
 				string(k2): {ws[2]: struct{}{}},
 			},
 			evs[2:],
-			map[*watching][]storagepb.Event{
+			map[*watcher][]storagepb.Event{
 				ws[2]: evs[2:],
 			},
 		},
 
-		// two watchings in sync that watches the same key, two events that
-		// match the keys should return wwe with those two watchings
+		// two watchers in sync that watch the same key, two events that
+		// match the keys should return wwe with those two watchers
 		{
-			map[string]map[*watching]struct{}{
+			map[string]map[*watcher]struct{}{
 				string(k0): {ws[0]: struct{}{}},
 				string(k1): {ws[1]: struct{}{}},
 			},
 			evs[:2],
-			map[*watching][]storagepb.Event{
+			map[*watcher][]storagepb.Event{
 				ws[0]: evs[:1],
 				ws[1]: evs[1:2],
 			},
@@ -326,7 +325,7 @@ func TestNewMapWatchingToEventMap(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		gwe := newWatchingToEventMap(tt.sync, tt.evs)
+		gwe := newWatcherToEventMap(tt.sync, tt.evs)
 		if len(gwe) != len(tt.wwe) {
 			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
 		}

+ 10 - 7
storage/watcher.go

@@ -21,13 +21,15 @@ import (
 )
 
 type WatchStream interface {
-	// Watch watches the events happening or happened on the given key
-	// or key prefix from the given startRev.
+	// Watch creates a watcher. The watcher watches events that happen or
+	// happened on the given key or key prefix, from the given startRev.
+	//
 	// The whole event history can be watched unless compacted.
 	// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
 	// If `startRev` <=0, watch observes events after currentRev.
-	// The returned `id` is the ID of this watching. It appears as WatchID
-	// in events that are sent to this watching.
+	//
+	// The returned `id` is the ID of this watcher. It appears as WatchID
+	// in events that are sent to the created watcher through the stream channel.
 	Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
 
 	// Chan returns a chan. All watched events will be sent to the returned chan.
@@ -37,14 +39,15 @@ type WatchStream interface {
 	Close()
 }
 
-// watchStream contains a collection of watching that share
+// watchStream contains a collection of watchers that share
 // one streaming chan to send out watched events and other control events.
 type watchStream struct {
 	watchable watchable
 	ch        chan []storagepb.Event
 
-	mu      sync.Mutex // guards fields below it
-	nextID  int64      // nextID is the ID allocated for next new watching
+	mu sync.Mutex // guards fields below it
+	// nextID is the ID pre-allocated for the next new watcher in this stream
+	nextID  int64
 	closed  bool
 	cancels []CancelFunc
 }
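Putting the renamed pieces together, here is a hedged usage sketch of the WatchStream API as documented above. It assumes the package's WatchableKV exposes NewWatchStream (as the tests suggest) and uses the 2015-era import path; treat it as illustration, not the package's documented example:

```go
package example // illustration only, outside the etcd tree

import (
	"fmt"

	"github.com/coreos/etcd/storage" // import path assumed (2015 layout)
)

func watchFooPrefix(s storage.WatchableKV) {
	w := s.NewWatchStream()
	defer w.Close()

	// prefix=true watches everything under "foo"; startRev=1 replays
	// history from the first revision, startRev<=0 would start at now.
	id, cancel := w.Watch([]byte("foo"), true, 1)
	defer cancel()

	for evs := range w.Chan() {
		for _, ev := range evs {
			if ev.WatchID == id { // the stream chan may be shared
				fmt.Printf("event on %s\n", ev.Kv.Key)
			}
		}
	}
}
```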

+ 3 - 3
storage/watcher_test.go

@@ -16,8 +16,8 @@ package storage
 
 import "testing"
 
-// TestWatcherWatchID tests that each watcher provides unique watch ID,
-// and the watched event attaches the correct watch ID.
+// TestWatcherWatchID tests that each watcher is assigned a unique watchID,
+// and that each watched event carries the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	s := WatchableKV(newWatchableStore(tmpPath))
 	defer cleanup(s, tmpPath)
@@ -47,7 +47,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 	s.Put([]byte("foo2"), []byte("bar"))
 
-	// unsynced watchings
+	// unsynced watchers
 	for i := 10; i < 20; i++ {
 		id, cancel := w.Watch([]byte("foo2"), false, 1)
 		if _, ok := idm[id]; ok {