clientv3: watcher implementation

Anthony Romano, 10 years ago
commit 580c563ed6
2 changed files with 701 additions and 0 deletions
  1. clientv3/integration/watch_test.go (+262, -0)
  2. clientv3/watch.go (+439, -0)
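
For orientation, here is a minimal usage sketch of the API this commit introduces, pieced together from the tests below. The client construction and the endpoint are illustrative assumptions, not part of the diff; the Watcher and KV calls mirror watch_test.go.

package main

import (
	"fmt"

	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
	"github.com/coreos/etcd/clientv3"
)

func main() {
	// assumption: a single etcd member listening on localhost:2379
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	w := clientv3.NewWatcher(cli)
	defer w.Close()

	// watch "foo" starting from the current revision (rev == 0)
	ch := w.Watch(context.TODO(), "foo", 0)

	kv := clientv3.NewKV(cli)
	if _, err := kv.Put("foo", "bar", 0); err != nil {
		panic(err)
	}

	// the put above arrives as a watch event
	resp := <-ch
	for _, ev := range resp.Events {
		fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	}
}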

clientv3/integration/watch_test.go (+262, -0)

@@ -0,0 +1,262 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+	storagepb "github.com/coreos/etcd/storage/storagepb"
+)
+
+type watcherTest func(*testing.T, *watchctx)
+
+type watchctx struct {
+	clus    *integration.ClusterV3
+	w       clientv3.Watcher
+	wclient *clientv3.Client
+	kv      clientv3.KV
+	ch      <-chan clientv3.WatchResponse
+}
+
+func runWatchTest(t *testing.T, f watcherTest) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	wclient := clus.RandClient()
+	w := clientv3.NewWatcher(wclient)
+	defer w.Close()
+	// select a client distinct from wclient so that puts still
+	// succeed if a test knocks out the watcher's client
+	kvclient := clus.RandClient()
+	for kvclient == wclient {
+		kvclient = clus.RandClient()
+	}
+	kv := clientv3.NewKV(kvclient)
+
+	wctx := &watchctx{clus, w, wclient, kv, nil}
+	f(t, wctx)
+}
+
+// TestWatchMultiWatcher modifies multiple keys and observes the changes.
+func TestWatchMultiWatcher(t *testing.T) {
+	runWatchTest(t, testWatchMultiWatcher)
+}
+
+func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
+	numKeyUpdates := 4
+	keys := []string{"foo", "bar", "baz"}
+
+	donec := make(chan struct{})
+	readyc := make(chan struct{})
+	for _, k := range keys {
+		// key watcher
+		go func(key string) {
+			ch := wctx.w.Watch(context.TODO(), key, 0)
+			if ch == nil {
+				t.Fatalf("expected watcher channel, got nil")
+			}
+			readyc <- struct{}{}
+			for i := 0; i < numKeyUpdates; i++ {
+				resp, ok := <-ch
+				if !ok {
+					t.Fatalf("watcher unexpectedly closed")
+				}
+				v := fmt.Sprintf("%s-%d", key, i)
+				gotv := string(resp.Events[0].Kv.Value)
+				if gotv != v {
+					t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
+				}
+			}
+			donec <- struct{}{}
+		}(k)
+	}
+	// prefix watcher on "b" (bar and baz)
+	go func() {
+		prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0)
+		if prefixc == nil {
+			t.Fatalf("expected watcher channel, got nil")
+		}
+		readyc <- struct{}{}
+		evs := []*storagepb.Event{}
+		for i := 0; i < numKeyUpdates*2; i++ {
+			resp, ok := <-prefixc
+			if !ok {
+				t.Fatalf("watcher unexpectedly closed")
+			}
+			evs = append(evs, resp.Events...)
+		}
+
+		// check response
+		expected := []string{}
+		bkeys := []string{"bar", "baz"}
+		for _, k := range bkeys {
+			for i := 0; i < numKeyUpdates; i++ {
+				expected = append(expected, fmt.Sprintf("%s-%d", k, i))
+			}
+		}
+		got := []string{}
+		for _, ev := range evs {
+			got = append(got, string(ev.Kv.Value))
+		}
+		sort.Strings(got)
+		if !reflect.DeepEqual(expected, got) {
+			t.Errorf("got %v, expected %v", got, expected)
+		}
+
+		// ensure no extra data
+		select {
+		case resp, ok := <-prefixc:
+			if !ok {
+				t.Fatalf("watcher unexpectedly closed")
+			}
+			t.Fatalf("unexpected event %+v", resp)
+		case <-time.After(time.Second):
+		}
+		donec <- struct{}{}
+	}()
+
+	// wait for all watchers to be ready
+	for i := 0; i < len(keys)+1; i++ {
+		<-readyc
+	}
+	// generate events
+	for i := 0; i < numKeyUpdates; i++ {
+		for _, k := range keys {
+			v := fmt.Sprintf("%s-%d", k, i)
+			if _, err := wctx.kv.Put(k, v, 0); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+	// wait for all watchers to finish
+	for i := 0; i < len(keys)+1; i++ {
+		<-donec
+	}
+}
+
+// TestWatchReconnInit tests that a watcher resumes correctly if the
+// connection is lost before any data was sent.
+func TestWatchReconnInit(t *testing.T) {
+	runWatchTest(t, testWatchReconnInit)
+}
+
+func testWatchReconnInit(t *testing.T, wctx *watchctx) {
+	if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil channel")
+	}
+	// take down watcher connection
+	wctx.wclient.ActiveConnection().Close()
+	// watcher should recover
+	putAndWatch(t, wctx, "a", "a")
+}
+
+// TestWatchReconnRunning tests that a watcher resumes correctly if the
+// connection is lost after data was sent.
+func TestWatchReconnRunning(t *testing.T) {
+	runWatchTest(t, testWatchReconnRunning)
+}
+
+func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
+	if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil channel")
+	}
+	putAndWatch(t, wctx, "a", "a")
+	// take down watcher connection
+	wctx.wclient.ActiveConnection().Close()
+	// watcher should recover
+	putAndWatch(t, wctx, "a", "b")
+}
+
+// TestWatchCancelInit tests that a watcher closes correctly when canceled before receiving any events.
+func TestWatchCancelInit(t *testing.T) {
+	runWatchTest(t, testWatchCancelInit)
+}
+
+func testWatchCancelInit(t *testing.T, wctx *watchctx) {
+	ctx, cancel := context.WithCancel(context.Background())
+	if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil watcher channel")
+	}
+	cancel()
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("took too long to cancel")
+	case _, ok := <-wctx.ch:
+		if ok {
+			t.Fatalf("expected watcher channel to close")
+		}
+	}
+}
+
+// TestWatchCancelRunning tests that a watcher closes correctly when canceled after receiving events.
+func TestWatchCancelRunning(t *testing.T) {
+	runWatchTest(t, testWatchCancelRunning)
+}
+
+func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
+	ctx, cancel := context.WithCancel(context.Background())
+	if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
+		t.Fatalf("expected non-nil watcher channel")
+	}
+	if _, err := wctx.kv.Put("a", "a", 0); err != nil {
+		t.Fatal(err)
+	}
+	cancel()
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("took too long to cancel")
+	case v, ok := <-wctx.ch:
+		if !ok {
+			// closed before getting put; OK
+			break
+		}
+		// got the PUT; should close next
+		select {
+		case <-time.After(time.Second):
+			t.Fatalf("took too long to close")
+		case v, ok = <-wctx.ch:
+			if ok {
+				t.Fatalf("expected watcher channel to close, got %v", v)
+			}
+		}
+	}
+}
+
+func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
+	if _, err := wctx.kv.Put(key, val, 0); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("watch timed out")
+	case v, ok := <-wctx.ch:
+		if !ok {
+			t.Fatalf("unexpected watch close")
+		}
+		if string(v.Events[0].Kv.Value) != val {
+			t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
+		}
+	}
+}
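
Before the implementation itself: watch.go returns each subscriber its event channel through a channel-of-channels handoff. watch() posts a watchRequest carrying retc to run(), and addStream() sends the established channel back over retc. A stripped-down, self-contained sketch of that pattern (all names here are illustrative, not from the diff):

package main

import "fmt"

type request struct {
	key string
	// retc carries the event channel back to the subscriber
	// once the watcher is established
	retc chan chan string
}

// run accepts requests and hands each subscriber its own channel,
// loosely mirroring watcher.run() and watcher.addStream() below.
func run(reqc chan *request) {
	for req := range reqc {
		events := make(chan string, 1)
		go func(key string, out chan string) {
			out <- key + "-event" // stand-in for a WatchResponse
		}(req.key, events)
		req.retc <- events
	}
}

func main() {
	reqc := make(chan *request)
	go run(reqc)

	// mirrors watcher.watch(): submit a request, then wait for the channel
	req := &request{key: "foo", retc: make(chan chan string, 1)}
	reqc <- req
	events := <-req.retc
	fmt.Println(<-events) // foo-event
}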

clientv3/watch.go (+439, -0)

@@ -15,7 +15,11 @@
 package clientv3
 
 import (
+	"fmt"
+	"sync"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	storagepb "github.com/coreos/etcd/storage/storagepb"
 )
@@ -41,3 +45,438 @@ type WatchResponse struct {
 	Header pb.ResponseHeader
 	Events []*storagepb.Event
 }
+
+// watcher implements the Watcher interface
+type watcher struct {
+	c      *Client
+	conn   *grpc.ClientConn
+	remote pb.WatchClient
+
+	// ctx controls internal remote.Watch requests
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// streams holds all active watchers
+	streams map[int64]*watcherStream
+	// mu protects the streams map
+	mu sync.RWMutex
+
+	// reqc sends a watch request from Watch() to the main goroutine
+	reqc chan *watchRequest
+	// respc receives data from the watch client
+	respc chan *pb.WatchResponse
+	// stopc signals the main goroutine to stop all processing
+	stopc chan struct{}
+	// donec closes to broadcast shutdown
+	donec chan struct{}
+	// errc transmits errors from grpc Recv
+	errc chan error
+}
+
+// watchRequest is issued by the subscriber to start a new watcher
+type watchRequest struct {
+	ctx    context.Context
+	key    string
+	prefix string
+	rev    int64
+	// retc receives a chan WatchResponse once the watcher is established
+	retc chan chan WatchResponse
+}
+
+// watcherStream represents a registered watcher
+type watcherStream struct {
+	initReq watchRequest
+
+	// outc publishes watch responses to subscriber
+	outc chan<- WatchResponse
+	// recvc buffers watch responses before publishing
+	recvc chan *WatchResponse
+	id    int64
+
+	// lastRev is the last revision successfully sent over outc
+	lastRev int64
+	// resumec indicates the stream must recover at a given revision
+	resumec chan int64
+}
+
+// NewWatcher creates a Watcher over the client's active gRPC connection.
+func NewWatcher(c *Client) Watcher {
+	ctx, cancel := context.WithCancel(context.Background())
+	conn := c.ActiveConnection()
+
+	w := &watcher{
+		c:      c,
+		conn:   conn,
+		remote: pb.NewWatchClient(conn),
+
+		ctx:     ctx,
+		cancel:  cancel,
+		streams: make(map[int64]*watcherStream),
+
+		respc: make(chan *pb.WatchResponse),
+		reqc:  make(chan *watchRequest),
+		stopc: make(chan struct{}),
+		donec: make(chan struct{}),
+		errc:  make(chan error, 1),
+	}
+	go w.run()
+	return w
+}
+
+func (w *watcher) Watch(ctx context.Context, key string, rev int64) <-chan WatchResponse {
+	return w.watch(ctx, key, "", rev)
+}
+
+func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) <-chan WatchResponse {
+	return w.watch(ctx, "", prefix, rev)
+}
+
+func (w *watcher) Close() error {
+	select {
+	case w.stopc <- struct{}{}:
+	case <-w.donec:
+	}
+	<-w.donec
+	return <-w.errc
+}
+
+// watch posts a watch request to run() and waits for a new watcher channel
+func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) <-chan WatchResponse {
+	retc := make(chan chan WatchResponse, 1)
+	wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
+	// submit request
+	select {
+	case w.reqc <- wr:
+	case <-wr.ctx.Done():
+		return nil
+	case <-w.donec:
+		return nil
+	}
+	// receive channel
+	select {
+	case ret := <-retc:
+		return ret
+	case <-ctx.Done():
+		return nil
+	case <-w.donec:
+		return nil
+	}
+}
+
+func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
+	if pendingReq == nil {
+		// no pending request; ignore
+		return
+	} else if resp.WatchId == -1 || resp.Compacted {
+		// failed; no channel
+		pendingReq.retc <- nil
+		return
+	}
+
+	ret := make(chan WatchResponse)
+	ws := &watcherStream{
+		initReq: *pendingReq,
+		id:      resp.WatchId,
+		outc:    ret,
+		// buffered so unlikely to block on sending while holding mu
+		recvc:   make(chan *WatchResponse, 4),
+		resumec: make(chan int64),
+	}
+
+	w.mu.Lock()
+	w.streams[ws.id] = ws
+	w.mu.Unlock()
+
+	// send messages to subscriber
+	go w.serveStream(ws)
+
+	// pass back the subscriber channel for the watcher
+	pendingReq.retc <- ret
+}
+
+// closeStream closes the watcher resources and removes it
+func (w *watcher) closeStream(ws *watcherStream) {
+	// cancels request stream; subscriber receives nil channel
+	close(ws.initReq.retc)
+	// close subscriber's channel
+	close(ws.outc)
+	// shutdown serveStream
+	close(ws.recvc)
+	delete(w.streams, ws.id)
+}
+
+// run is the root of the goroutines for managing a watcher client
+func (w *watcher) run() {
+	defer func() {
+		close(w.donec)
+		w.cancel()
+	}()
+
+	// start a stream with the etcd grpc server
+	wc, wcerr := w.newWatchClient()
+	if wcerr != nil {
+		w.errc <- wcerr
+		return
+	}
+
+	var pendingReq, failedReq *watchRequest
+	curReqC := w.reqc
+	cancelSet := make(map[int64]struct{})
+
+	for {
+		select {
+		// Watch() requested
+		case pendingReq = <-curReqC:
+			// no more watch requests until there's a response
+			curReqC = nil
+			if err := wc.Send(pendingReq.toPB()); err == nil {
+				// pendingReq now waits on w.respc
+				break
+			}
+			failedReq = pendingReq
+		// New events from the watch client
+		case pbresp := <-w.respc:
+			switch {
+			case pbresp.Canceled:
+				delete(cancelSet, pbresp.WatchId)
+			case pbresp.Compacted:
+				w.mu.Lock()
+				if ws, ok := w.streams[pbresp.WatchId]; ok {
+					w.closeStream(ws)
+				}
+				w.mu.Unlock()
+			case pbresp.Created:
+				// response to pending req, try to add
+				w.addStream(pbresp, pendingReq)
+				pendingReq = nil
+				curReqC = w.reqc
+			default:
+				// dispatch to appropriate watch stream
+				if ok := w.dispatchEvent(pbresp); ok {
+					break
+				}
+				// watch response on unexpected watch id; cancel id
+				if _, ok := cancelSet[pbresp.WatchId]; ok {
+					break
+				}
+				cancelSet[pbresp.WatchId] = struct{}{}
+				cr := &pb.WatchRequest_CancelRequest{
+					CancelRequest: &pb.WatchCancelRequest{
+						WatchId: pbresp.WatchId,
+					},
+				}
+				req := &pb.WatchRequest{RequestUnion: cr}
+				wc.Send(req)
+			}
+		// watch client failed to recv; spawn another if possible
+		// TODO report watch client errors from errc?
+		case <-w.errc:
+			if wc, wcerr = w.newWatchClient(); wcerr != nil {
+				w.errc <- wcerr
+				return
+			}
+			curReqC = w.reqc
+			if pendingReq != nil {
+				failedReq = pendingReq
+			}
+			cancelSet = make(map[int64]struct{})
+		case <-w.stopc:
+			w.errc <- nil
+			return
+		}
+
+		// send failed; queue for retry
+		if failedReq != nil {
+			// capture the failed request as a parameter; pendingReq
+			// is reset to nil below, before the goroutine may run
+			go func(wr *watchRequest) {
+				select {
+				case w.reqc <- wr:
+				case <-wr.ctx.Done():
+				case <-w.donec:
+				}
+			}(failedReq)
+			failedReq = nil
+			pendingReq = nil
+		}
+	}
+}
+
+// dispatchEvent sends a WatchResponse to the appropriate watcher stream
+func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
+	w.mu.RLock()
+	defer w.mu.RUnlock()
+	ws, ok := w.streams[pbresp.WatchId]
+	if ok {
+		wr := &WatchResponse{*pbresp.Header, pbresp.Events}
+		ws.recvc <- wr
+	}
+	return ok
+}
+
+// serveWatchClient forwards messages from the grpc stream to run()
+func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
+	for {
+		resp, err := wc.Recv()
+		if err != nil {
+			select {
+			case w.errc <- err:
+			case <-w.donec:
+			}
+			return
+		}
+		select {
+		case w.respc <- resp:
+		case <-w.donec:
+			return
+		}
+	}
+}
+
+// serveStream forwards watch responses from run() to the subscriber
+func (w *watcher) serveStream(ws *watcherStream) {
+	emptyWr := &WatchResponse{}
+	wrs := []*WatchResponse{}
+	resuming := false
+	closing := false
+	for !closing {
+		curWr := emptyWr
+		outc := ws.outc
+		if len(wrs) > 0 {
+			curWr = wrs[0]
+		} else {
+			outc = nil
+		}
+		select {
+		case outc <- *curWr:
+			newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
+			if newRev != ws.lastRev {
+				ws.lastRev = newRev
+			}
+			wrs[0] = nil
+			wrs = wrs[1:]
+		case wr, ok := <-ws.recvc:
+			if !ok {
+				// shutdown from closeStream
+				return
+			}
+			// resume up to last seen event if disconnected
+			if resuming {
+				resuming = false
+				// trim events already seen
+				for i := 0; i < len(wr.Events); i++ {
+					if wr.Events[i].Kv.ModRevision > ws.lastRev {
+						wr.Events = wr.Events[i:]
+						break
+					}
+				}
+				// only forward new events
+				if wr.Events[0].Kv.ModRevision == ws.lastRev {
+					break
+				}
+			}
+			// TODO don't keep buffering if subscriber stops reading
+			wrs = append(wrs, wr)
+		case resumeRev := <-ws.resumec:
+			if resumeRev != ws.lastRev {
+				panic("unexpected resume revision")
+			}
+			wrs = nil
+			resuming = true
+		case <-w.donec:
+			closing = true
+		case <-ws.initReq.ctx.Done():
+			closing = true
+		}
+	}
+	w.mu.Lock()
+	w.closeStream(ws)
+	w.mu.Unlock()
+	// if more events arrive on this now-missing id, run() will
+	// lazily send a cancel message for it
+}
+
+func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
+	ws, rerr := w.resume()
+	if rerr != nil {
+		return nil, rerr
+	}
+	go w.serveWatchClient(ws)
+	return ws, nil
+}
+
+// resume creates a new WatchClient with all current watchers reestablished
+func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
+	for {
+		if ws, err = w.openWatchClient(); err != nil {
+			break
+		} else if err = w.resumeWatchers(ws); err == nil {
+			break
+		}
+	}
+	return ws, err
+}
+
+// openWatchClient retries opening a watch client until retryConnection fails
+func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+	for {
+		if ws, err = w.remote.Watch(w.ctx); ws != nil {
+			break
+		} else if isRPCError(err) {
+			return nil, err
+		}
+		newConn, nerr := w.c.retryConnection(w.conn, nil)
+		if nerr != nil {
+			return nil, nerr
+		}
+		w.conn = newConn
+		w.remote = pb.NewWatchClient(w.conn)
+	}
+	return ws, nil
+}
+
+// resumeWatchers rebuilds every registered watcher on a new client
+func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
+	streams := []*watcherStream{}
+	w.mu.RLock()
+	for _, ws := range w.streams {
+		streams = append(streams, ws)
+	}
+	w.mu.RUnlock()
+
+	for _, ws := range streams {
+		// reconstruct watcher from initial request
+		if ws.lastRev != 0 {
+			ws.initReq.rev = ws.lastRev
+		}
+		if err := wc.Send(ws.initReq.toPB()); err != nil {
+			return err
+		}
+
+		// wait for request ack
+		resp, err := wc.Recv()
+		if err != nil {
+			return err
+		} else if len(resp.Events) != 0 || !resp.Created {
+			return fmt.Errorf("watcher: unexpected response (%+v)", resp)
+		}
+
+		// the watch id may differ on the new remote watcher; update the map
+		w.mu.Lock()
+		delete(w.streams, ws.id)
+		ws.id = resp.WatchId
+		w.streams[ws.id] = ws
+		w.mu.Unlock()
+
+		ws.resumec <- ws.lastRev
+	}
+	return nil
+}
+
+// toPB converts an internal watch request structure to its protobuf message
+func (wr *watchRequest) toPB() *pb.WatchRequest {
+	req := &pb.WatchCreateRequest{StartRevision: wr.rev}
+	if wr.key != "" {
+		req.Key = []byte(wr.key)
+	} else {
+		req.Prefix = []byte(wr.prefix)
+	}
+	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
+	return &pb.WatchRequest{RequestUnion: cr}
+}
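
After a reconnect, serveStream resumes at ws.lastRev and trims events the subscriber already saw by comparing each event's ModRevision against lastRev. A standalone sketch of that trimming step, with the event type simplified and names invented for illustration; unlike the loop above, this sketch also drops the response outright when every event was already seen:

package main

import "fmt"

type event struct{ modRev int64 }

// trimSeen drops events at or below lastRev, mirroring the resume
// loop in serveStream that skips already-delivered events.
func trimSeen(evs []event, lastRev int64) []event {
	for i := range evs {
		if evs[i].modRev > lastRev {
			return evs[i:]
		}
	}
	return nil // every event was already seen
}

func main() {
	evs := []event{{1}, {2}, {3}, {4}}
	fmt.Println(trimSeen(evs, 2)) // [{3} {4}]
}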