Browse Source

Merge pull request #3795 from xiang90/watch_stream

storage: add watchChan
Xiang Li 10 years ago
parent
commit
154fc8e19c

+ 3 - 9
storage/kv.go

@@ -80,15 +80,9 @@ type KV interface {
 type WatchableKV interface {
 type WatchableKV interface {
 	KV
 	KV
 
 
-	// Watcher watches the events happening or happened on the given key
-	// or key prefix from the given startRev.
-	// The whole event history can be watched unless compacted.
-	// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
-	// If `startRev` <=0, watch observes events after currentRev.
-	//
-	// Canceling the watcher releases resources associated with it, so code
-	// should always call cancel as soon as watch is done.
-	Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc)
+	// NewWatcher returns a Watcher that can be used to
+	// watch events happened or happending on the KV.
+	NewWatcher() Watcher
 }
 }
 
 
 // ConsistentWatchableKV is a WatchableKV that understands the consistency
 // ConsistentWatchableKV is a WatchableKV that understands the consistency

+ 8 - 6
storage/kv_test.go

@@ -733,12 +733,14 @@ func TestWatchableKVWatch(t *testing.T) {
 	s := WatchableKV(newWatchableStore(tmpPath))
 	s := WatchableKV(newWatchableStore(tmpPath))
 	defer cleanup(s, tmpPath)
 	defer cleanup(s, tmpPath)
 
 
-	wa, cancel := s.Watcher([]byte("foo"), true, 0)
+	w := s.NewWatcher()
+
+	cancel := w.Watch([]byte("foo"), true, 0)
 	defer cancel()
 	defer cancel()
 
 
 	s.Put([]byte("foo"), []byte("bar"))
 	s.Put([]byte("foo"), []byte("bar"))
 	select {
 	select {
-	case ev := <-wa.Event():
+	case ev := <-w.Chan():
 		wev := storagepb.Event{
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{
@@ -758,7 +760,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 
 	s.Put([]byte("foo1"), []byte("bar1"))
 	s.Put([]byte("foo1"), []byte("bar1"))
 	select {
 	select {
-	case ev := <-wa.Event():
+	case ev := <-w.Chan():
 		wev := storagepb.Event{
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{
@@ -776,11 +778,11 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 		t.Fatalf("failed to watch the event")
 	}
 	}
 
 
-	wa, cancel = s.Watcher([]byte("foo1"), false, 1)
+	cancel = w.Watch([]byte("foo1"), false, 1)
 	defer cancel()
 	defer cancel()
 
 
 	select {
 	select {
-	case ev := <-wa.Event():
+	case ev := <-w.Chan():
 		wev := storagepb.Event{
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{
@@ -800,7 +802,7 @@ func TestWatchableKVWatch(t *testing.T) {
 
 
 	s.Put([]byte("foo1"), []byte("bar11"))
 	s.Put([]byte("foo1"), []byte("bar11"))
 	select {
 	select {
-	case ev := <-wa.Event():
+	case ev := <-w.Chan():
 		wev := storagepb.Event{
 		wev := storagepb.Event{
 			Type: storagepb.PUT,
 			Type: storagepb.PUT,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{

+ 62 - 23
storage/watchable_store.go

@@ -22,17 +22,29 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 )
 
 
+const (
+	// chanBufLen is the length of the buffered chan
+	// for sending out watched events.
+	// TODO: find a good buf value. 1024 is just a random one that
+	// seems to be reasonable.
+	chanBufLen = 1024
+)
+
+type watchable interface {
+	watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
+}
+
 type watchableStore struct {
 type watchableStore struct {
 	mu sync.Mutex
 	mu sync.Mutex
 
 
 	*store
 	*store
 
 
-	// contains all unsynced watchers that needs to sync events that have happened
-	unsynced map[*watcher]struct{}
+	// contains all unsynced watching that needs to sync events that have happened
+	unsynced map[*watching]struct{}
 
 
-	// contains all synced watchers that are tracking the events that will happen
-	// The key of the map is the key that the watcher is watching on.
-	synced map[string][]*watcher
+	// contains all synced watching that are tracking the events that will happen
+	// The key of the map is the key that the watching is watching on.
+	synced map[string][]*watching
 	tx     *ongoingTx
 	tx     *ongoingTx
 
 
 	stopc chan struct{}
 	stopc chan struct{}
@@ -42,12 +54,12 @@ type watchableStore struct {
 func newWatchableStore(path string) *watchableStore {
 func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 	s := &watchableStore{
 		store:    newStore(path),
 		store:    newStore(path),
-		unsynced: make(map[*watcher]struct{}),
-		synced:   make(map[string][]*watcher),
+		unsynced: make(map[*watching]struct{}),
+		synced:   make(map[string][]*watching),
 		stopc:    make(chan struct{}),
 		stopc:    make(chan struct{}),
 	}
 	}
 	s.wg.Add(1)
 	s.wg.Add(1)
-	go s.syncWatchersLoop()
+	go s.syncWatchingsLoop()
 	return s
 	return s
 }
 }
 
 
@@ -152,11 +164,24 @@ func (s *watchableStore) Close() error {
 	return s.store.Close()
 	return s.store.Close()
 }
 }
 
 
-func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) {
+func (s *watchableStore) NewWatcher() Watcher {
+	return &watcher{
+		watchable: s,
+		ch:        make(chan storagepb.Event, chanBufLen),
+	}
+}
+
+func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
-	wa := newWatcher(key, prefix, startRev)
+	wa := &watching{
+		key:    key,
+		prefix: prefix,
+		cur:    startRev,
+		ch:     ch,
+	}
+
 	k := string(key)
 	k := string(key)
 	if startRev == 0 {
 	if startRev == 0 {
 		s.synced[k] = append(s.synced[k], wa)
 		s.synced[k] = append(s.synced[k], wa)
@@ -169,9 +194,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
 	cancel := CancelFunc(func() {
 	cancel := CancelFunc(func() {
 		s.mu.Lock()
 		s.mu.Lock()
 		defer s.mu.Unlock()
 		defer s.mu.Unlock()
-		wa.stopWithError(ErrCanceled)
-
-		// remove global references of the watcher
+		// remove global references of the watching
 		if _, ok := s.unsynced[wa]; ok {
 		if _, ok := s.unsynced[wa]; ok {
 			delete(s.unsynced, wa)
 			delete(s.unsynced, wa)
 			slowWatchersGauge.Dec()
 			slowWatchersGauge.Dec()
@@ -191,13 +214,13 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
 	return wa, cancel
 	return wa, cancel
 }
 }
 
 
-// keepSyncWatchers syncs the watchers in the unsyncd map every 100ms.
-func (s *watchableStore) syncWatchersLoop() {
+// syncWatchingsLoop syncs the watching in the unsyncd map every 100ms.
+func (s *watchableStore) syncWatchingsLoop() {
 	defer s.wg.Done()
 	defer s.wg.Done()
 
 
 	for {
 	for {
 		s.mu.Lock()
 		s.mu.Lock()
-		s.syncWatchers()
+		s.syncWatchings()
 		s.mu.Unlock()
 		s.mu.Unlock()
 
 
 		select {
 		select {
@@ -208,8 +231,8 @@ func (s *watchableStore) syncWatchersLoop() {
 	}
 	}
 }
 }
 
 
-// syncWatchers syncs the watchers in the unsyncd map.
-func (s *watchableStore) syncWatchers() {
+// syncWatchings syncs the watchings in the unsyncd map.
+func (s *watchableStore) syncWatchings() {
 	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
 	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
 	for w := range s.unsynced {
 	for w := range s.unsynced {
 		var end []byte
 		var end []byte
@@ -225,7 +248,7 @@ func (s *watchableStore) syncWatchers() {
 		}
 		}
 		evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
 		evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
 		if err != nil {
 		if err != nil {
-			w.stopWithError(err)
+			// TODO: send error event to watching
 			delete(s.unsynced, w)
 			delete(s.unsynced, w)
 			continue
 			continue
 		}
 		}
@@ -247,20 +270,20 @@ func (s *watchableStore) syncWatchers() {
 	slowWatchersGauge.Set(float64(len(s.unsynced)))
 	slowWatchersGauge.Set(float64(len(s.unsynced)))
 }
 }
 
 
-// handle handles the change of the happening event on all watchers.
+// handle handles the change of the happening event on all watchings.
 func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
 func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
 	s.notify(rev, ev)
 	s.notify(rev, ev)
 }
 }
 
 
 // notify notifies the fact that given event at the given rev just happened to
 // notify notifies the fact that given event at the given rev just happened to
-// watchers that watch on the key of the event.
+// watchings that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
 func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
-	// check all prefixes of the key to notify all corresponded watchers
+	// check all prefixes of the key to notify all corresponded watchings
 	for i := 0; i <= len(ev.Kv.Key); i++ {
 	for i := 0; i <= len(ev.Kv.Key); i++ {
 		ws := s.synced[string(ev.Kv.Key[:i])]
 		ws := s.synced[string(ev.Kv.Key[:i])]
 		nws := ws[:0]
 		nws := ws[:0]
 		for _, w := range ws {
 		for _, w := range ws {
-			// the watcher needs to be notified when either it watches prefix or
+			// the watching needs to be notified when either it watches prefix or
 			// the key is exactly matched.
 			// the key is exactly matched.
 			if !w.prefix && i != len(ev.Kv.Key) {
 			if !w.prefix && i != len(ev.Kv.Key) {
 				continue
 				continue
@@ -301,3 +324,19 @@ func (tx *ongoingTx) del(k string) {
 	tx.delm[k] = true
 	tx.delm[k] = true
 	tx.putm[k] = false
 	tx.putm[k] = false
 }
 }
+
+type watching struct {
+	// the watching key
+	key []byte
+	// prefix indicates if watching is on a key or a prefix.
+	// If prefix is true, the watching is on a prefix.
+	prefix bool
+	// cur is the current watching revision.
+	// If cur is behind the current revision of the KV,
+	// watching is unsynced and needs to catch up.
+	cur int64
+
+	// a chan to send out the watched events.
+	// The chan might be shared with other watchings.
+	ch chan<- storagepb.Event
+}

+ 6 - 4
storage/watchable_store_bench_test.go

@@ -37,14 +37,14 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// in unsynced for this benchmark.
 	// in unsynced for this benchmark.
 	s := &watchableStore{
 	s := &watchableStore{
 		store:    newStore(tmpPath),
 		store:    newStore(tmpPath),
-		unsynced: make(map[*watcher]struct{}),
+		unsynced: make(map[*watching]struct{}),
 
 
 		// For previous implementation, use:
 		// For previous implementation, use:
-		// unsynced: make([]*watcher, 0),
+		// unsynced: make([]*watching, 0),
 
 
 		// to make the test not crash from assigning to nil map.
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
 		// 'synced' doesn't get populated in this test.
-		synced: make(map[string][]*watcher),
+		synced: make(map[string][]*watching),
 	}
 	}
 
 
 	defer func() {
 	defer func() {
@@ -60,10 +60,12 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	testValue := []byte("bar")
 	testValue := []byte("bar")
 	s.Put(testKey, testValue)
 	s.Put(testKey, testValue)
 
 
+	w := s.NewWatcher()
+
 	cancels := make([]CancelFunc, watcherSize)
 	cancels := make([]CancelFunc, watcherSize)
 	for i := 0; i < watcherSize; i++ {
 	for i := 0; i < watcherSize; i++ {
 		// non-0 value to keep watchers in unsynced
 		// non-0 value to keep watchers in unsynced
-		_, cancel := s.Watcher(testKey, true, 1)
+		cancel := w.Watch(testKey, true, 1)
 		cancels[i] = cancel
 		cancels[i] = cancel
 	}
 	}
 
 

+ 38 - 37
storage/watcher.go

@@ -20,55 +20,56 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 )
 
 
-// Watcher watches on the KV. It will be notified if there is an event
-// happened on the watched key or prefix.
 type Watcher interface {
 type Watcher interface {
-	// Event returns a channel that receives observed event that matches the
-	// context of watcher. When watch finishes or is canceled or aborted, the
-	// channel is closed and returns empty event.
-	// Successive calls to Event return the same value.
-	Event() <-chan storagepb.Event
+	// Watch watches the events happening or happened on the given key
+	// or key prefix from the given startRev.
+	// The whole event history can be watched unless compacted.
+	// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
+	// If `startRev` <=0, watch observes events after currentRev.
+	Watch(key []byte, prefix bool, startRev int64) CancelFunc
 
 
-	// Err returns a non-nil error value after Event is closed. Err returns
-	// Compacted if the history was compacted, Canceled if watch is canceled,
-	// or EOF if watch reaches the end revision. No other values for Err are defined.
-	// After Event is closed, successive calls to Err return the same value.
-	Err() error
+	// Chan returns a chan. All watched events will be sent to the returned chan.
+	Chan() <-chan storagepb.Event
+
+	// Close closes the WatchChan and release all related resources.
+	Close()
 }
 }
 
 
+// watcher contains a collection of watching that share
+// one chan to send out watched events and other control events.
 type watcher struct {
 type watcher struct {
-	key    []byte
-	prefix bool
-	cur    int64
+	watchable watchable
+	ch        chan storagepb.Event
 
 
-	ch  chan storagepb.Event
-	mu  sync.Mutex
-	err error
+	mu      sync.Mutex // guards fields below it
+	closed  bool
+	cancels []CancelFunc
 }
 }
 
 
-func newWatcher(key []byte, prefix bool, start int64) *watcher {
-	return &watcher{
-		key:    key,
-		prefix: prefix,
-		cur:    start,
-		ch:     make(chan storagepb.Event, 10),
+// TODO: return error if ws is closed?
+func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc {
+	_, c := ws.watchable.watch(key, prefix, startRev, ws.ch)
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	if ws.closed {
+		return nil
 	}
 	}
+	// TODO: cancelFunc needs to be removed from the cancels when it is called.
+	ws.cancels = append(ws.cancels, c)
+	return c
 }
 }
 
 
-func (w *watcher) Event() <-chan storagepb.Event { return w.ch }
-
-func (w *watcher) Err() error {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	return w.err
+func (ws *watcher) Chan() <-chan storagepb.Event {
+	return ws.ch
 }
 }
 
 
-func (w *watcher) stopWithError(err error) {
-	if w.err != nil {
-		return
+func (ws *watcher) Close() {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+
+	for _, cancel := range ws.cancels {
+		cancel()
 	}
 	}
-	close(w.ch)
-	w.mu.Lock()
-	w.err = err
-	w.mu.Unlock()
+	ws.closed = true
+	close(ws.ch)
 }
 }

+ 5 - 3
storage/watcher_bench_test.go

@@ -20,12 +20,14 @@ import (
 )
 )
 
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
-	s := newWatchableStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	watchable := newWatchableStore(tmpPath)
+	defer cleanup(watchable, tmpPath)
+
+	w := watchable.NewWatcher()
 
 
 	b.ReportAllocs()
 	b.ReportAllocs()
 	b.StartTimer()
 	b.StartTimer()
 	for i := 0; i < b.N; i++ {
 	for i := 0; i < b.N; i++ {
-		s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0)
+		w.Watch([]byte(fmt.Sprint("foo", i)), false, 0)
 	}
 	}
 }
 }