
storage: implement requestProgress

Author: Xiang Li, 9 years ago
Commit: 9143329c85
4 changed files with 118 additions and 5 deletions
  1. storage/watchable_store.go (+28, -1)
  2. storage/watcher.go (+25, -4)
  3. storage/watcher_group.go (+5, -0)
  4. storage/watcher_test.go (+60, -0)

storage/watchable_store.go (+28, -1)

@@ -34,6 +34,7 @@ const (
 
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
+	progress(w *watcher)
 	rev() int64
 }
 
@@ -168,6 +169,7 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 		watchable: s,
 		ch:        make(chan WatchResponse, chanBufLen),
 		cancels:   make(map[WatchID]cancelFunc),
+		watchers:  make(map[WatchID]*watcher),
 	}
 }
 
@@ -267,7 +269,9 @@ func (s *watchableStore) syncWatchers() {
 	evs := kvsToEvents(&s.unsynced, revs, vs)
 	tx.Unlock()
 
-	for w, eb := range newWatcherBatch(&s.unsynced, evs) {
+	wb := newWatcherBatch(&s.unsynced, evs)
+
+	for w, eb := range wb {
 		select {
 		// s.store.Rev also uses Lock, so just return directly
 		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}:
@@ -287,6 +291,15 @@ func (s *watchableStore) syncWatchers() {
 		s.unsynced.delete(w)
 	}
 
+	// bring all un-notified watchers to synced.
+	for w := range s.unsynced.watchers {
+		if !wb.contains(w) {
+			w.cur = curRev
+			s.synced.add(w)
+			s.unsynced.delete(w)
+		}
+	}
+
 	slowWatcherGauge.Set(float64(s.unsynced.size()))
 }
 
@@ -335,6 +348,20 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
+func (s *watchableStore) progress(w *watcher) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if _, ok := s.synced.watchers[w]; ok {
+		select {
+		case w.ch <- WatchResponse{WatchID: w.id, Revision: s.rev()}:
+		default:
+			// If the ch is full, this watcher is receiving events.
+			// We do not need to send progress at all.
+		}
+	}
+}
+
 type watcher struct {
 	// the watcher key
 	key []byte
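
The default case inside progress is Go's standard non-blocking send: when the watcher's channel already holds pending events, the progress report is simply dropped instead of blocking the caller. Pulled out on its own, the idiom looks like this (trySend and its caller are illustrative names only, not part of this patch):

package main

import "fmt"

// trySend delivers v on ch only if the send would not block, mirroring
// how progress skips the report when the watcher's channel is full.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, so the value is dropped
}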

storage/watcher.go (+25, -4)

@@ -42,6 +42,14 @@ type WatchStream interface {
 	// Chan returns a chan. All watch response will be sent to the returned chan.
 	Chan() <-chan WatchResponse
 
+	// RequestProgress requests the progress of the watcher with the given ID. The
+	// response will only be sent if the watcher is currently synced.
+	// The response will be sent through the WatchResponse Chan attached
+	// to this stream to ensure correct ordering.
+	// The response contains no events. The revision in the response is the
+	// progress of the watcher, since the watcher is currently synced.
+	RequestProgress(id WatchID)
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -79,9 +87,10 @@ type watchStream struct {
 
 	mu sync.Mutex // guards fields below it
 	// nextID is the ID pre-allocated for next new watcher in this stream
-	nextID  WatchID
-	closed  bool
-	cancels map[WatchID]cancelFunc
+	nextID   WatchID
+	closed   bool
+	cancels  map[WatchID]cancelFunc
+	watchers map[WatchID]*watcher
 }
 
 // Watch creates a new watcher in the stream and returns its WatchID.
@@ -96,9 +105,10 @@ func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
 	id := ws.nextID
 	ws.nextID++
 
-	_, c := ws.watchable.watch(key, end, startRev, id, ws.ch)
+	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch)
 
 	ws.cancels[id] = c
+	ws.watchers[id] = w
 	return id
 }
 
@@ -113,6 +123,7 @@ func (ws *watchStream) Cancel(id WatchID) error {
 	}
 	cancel()
 	delete(ws.cancels, id)
+	delete(ws.watchers, id)
 	return nil
 }
 
@@ -133,3 +144,13 @@ func (ws *watchStream) Rev() int64 {
 	defer ws.mu.Unlock()
 	return ws.watchable.rev()
 }
+
+func (ws *watchStream) RequestProgress(id WatchID) {
+	ws.mu.Lock()
+	w, ok := ws.watchers[id]
+	ws.mu.Unlock()
+	if !ok {
+		return
+	}
+	ws.watchable.progress(w)
+}
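
For reference, here is a rough in-package sketch of how a caller might drive the new call end to end. The newWatchableStore constructor, the use of startRev 0 to mean "watch from the current revision", and the printed messages are assumptions made for illustration, not something this commit defines:

package storage

import (
	"fmt"
	"os"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/storage/backend"
)

func exampleRequestProgress() {
	b, tmpPath := backend.NewDefaultTmpBackend()
	s := newWatchableStore(b, &lease.FakeLessor{}) // assumed constructor
	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	w := s.NewWatchStream()
	// startRev 0 is assumed to start watching from the current revision,
	// which keeps the watcher in the synced group.
	id := w.Watch([]byte("foo"), nil, 0)

	// Ask for the watcher's progress; if it is synced, an event-less
	// WatchResponse carrying the current revision arrives on Chan().
	w.RequestProgress(id)
	select {
	case resp := <-w.Chan():
		fmt.Printf("watcher %d is synced through revision %d\n", resp.WatchID, resp.Revision)
	case <-time.After(time.Second):
		fmt.Println("no progress response; the watcher is still unsynced")
	}
}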

storage/watcher_group.go (+5, -0)

@@ -75,6 +75,11 @@ func (wb watcherBatch) add(w *watcher, ev storagepb.Event) {
 	eb.add(ev)
 }
 
+func (wb watcherBatch) contains(w *watcher) bool {
+	_, ok := wb[w]
+	return ok
+}
+
 // newWatcherBatch maps watchers to their matched events. It enables quick
 // events look up by watcher.
 func newWatcherBatch(wg *watcherGroup, evs []storagepb.Event) watcherBatch {

storage/watcher_test.go (+60, -0)

@@ -16,7 +16,10 @@ package storage
 
 import (
 	"bytes"
+	"os"
+	"reflect"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/backend"
@@ -184,3 +187,60 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
 		t.Errorf("cancels = %d, want 0", l)
 	}
 }
+
+// TestWatcherRequestProgress ensures that a synced watcher can
+// correctly report its progress.
+func TestWatcherRequestProgress(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+
+	// manually create watchableStore instead of calling newWatchableStore
+	// because newWatchableStore automatically calls its syncWatchers
+	// method to sync watchers in the unsynced map. We want to keep the
+	// watchers unsynced so the test controls when they become synced.
+	s := &watchableStore{
+		store:    NewStore(b, &lease.FakeLessor{}),
+		unsynced: newWatcherGroup(),
+		synced:   newWatcherGroup(),
+	}
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	testKey := []byte("foo")
+	notTestKey := []byte("bad")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue, lease.NoLease)
+
+	w := s.NewWatchStream()
+
+	badID := WatchID(1000)
+	w.RequestProgress(badID)
+	select {
+	case resp := <-w.Chan():
+		t.Fatalf("unexpected %+v", resp)
+	default:
+	}
+
+	id := w.Watch(notTestKey, nil, 1)
+	w.RequestProgress(id)
+	select {
+	case resp := <-w.Chan():
+		t.Fatalf("unexpected %+v", resp)
+	default:
+	}
+
+	s.syncWatchers()
+
+	w.RequestProgress(id)
+	wrs := WatchResponse{WatchID: 0, Revision: 2}
+	select {
+	case resp := <-w.Chan():
+		if !reflect.DeepEqual(resp, wrs) {
+			t.Fatalf("got %+v, expect %+v", resp, wrs)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive progress")
+	}
+}