Browse Source

Merge pull request #5685 from heyitsanthony/multictx-watcher

clientv3: watch with arbitrary ctx values
Anthony Romano 9 years ago
parent
commit
ce180bbaf1
2 changed files with 206 additions and 36 deletions
  1. 52 0
      clientv3/integration/watch_test.go
  2. 154 36
      clientv3/watch.go

+ 52 - 0
clientv3/integration/watch_test.go

@@ -621,3 +621,55 @@ func TestWatchAfterClose(t *testing.T) {
 	case <-donec:
 	}
 }
+
+// TestWatchWithRequireLeader checks the watch channel closes when no leader.
+func TestWatchWithRequireLeader(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	// something for the non-require leader watch to read as an event
+	if _, err := clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
+		t.Fatal(err)
+	}
+
+	clus.Members[1].Stop(t)
+	clus.Members[2].Stop(t)
+	clus.Client(1).Close()
+	clus.Client(2).Close()
+	clus.TakeClient(1)
+	clus.TakeClient(2)
+
+	// wait for election timeout, then member[0] will not have a leader.
+	tickDuration := 10 * time.Millisecond
+	time.Sleep(time.Duration(3*clus.Members[0].ElectionTicks) * tickDuration)
+
+	chLeader := clus.Client(0).Watch(clientv3.WithRequireLeader(context.TODO()), "foo", clientv3.WithRev(1))
+	chNoLeader := clus.Client(0).Watch(context.TODO(), "foo", clientv3.WithRev(1))
+
+	select {
+	case resp, ok := <-chLeader:
+		if !ok {
+			t.Fatalf("expected %v watch channel, got closed channel", rpctypes.ErrNoLeader)
+		}
+		if resp.Err() != rpctypes.ErrNoLeader {
+			t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
+		}
+	case <-time.After(3 * time.Second):
+		t.Fatal("watch without leader took too long to close")
+	}
+
+	select {
+	case resp, ok := <-chLeader:
+		if ok {
+			t.Fatalf("expected closed channel, got response %v", resp)
+		}
+	case <-time.After(3 * time.Second):
+		t.Fatal("waited too long for channel to close")
+	}
+
+	if _, ok := <-chNoLeader; !ok {
+		t.Fatalf("expected response, got closed channel")
+	}
+}

+ 154 - 36
clientv3/watch.go

@@ -17,6 +17,7 @@ package clientv3
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -27,6 +28,8 @@ import (
 const (
 	EventTypeDelete = mvccpb.DELETE
 	EventTypePut    = mvccpb.PUT
+
+	closeSendErrTimeout = 250 * time.Millisecond
 )
 
 type Event mvccpb.Event
@@ -56,6 +59,8 @@ type WatchResponse struct {
 	// If the watch failed and the stream was about to close, before the channel is closed,
 	// the channel sends a final response that has Canceled set to true with a non-nil Err().
 	Canceled bool
+
+	closeErr error
 }
 
 // IsCreate returns true if the event tells that the key is newly created.
@@ -70,10 +75,12 @@ func (e *Event) IsModify() bool {
 
 // Err is the error value if this WatchResponse holds an error.
 func (wr *WatchResponse) Err() error {
-	if wr.CompactRevision != 0 {
+	switch {
+	case wr.closeErr != nil:
+		return v3rpc.Error(wr.closeErr)
+	case wr.CompactRevision != 0:
 		return v3rpc.ErrCompacted
-	}
-	if wr.Canceled {
+	case wr.Canceled:
 		return v3rpc.ErrFutureRev
 	}
 	return nil
@@ -88,14 +95,26 @@ func (wr *WatchResponse) IsProgressNotify() bool {
 type watcher struct {
 	remote pb.WatchClient
 
+	// mu protects the grpc streams map
+	mu sync.RWMutex
+	// streams holds all the active grpc streams keyed by ctx value.
+	streams map[string]*watchGrpcStream
+}
+
+type watchGrpcStream struct {
+	owner  *watcher
+	remote pb.WatchClient
+
 	// ctx controls internal remote.Watch requests
-	ctx    context.Context
+	ctx context.Context
+	// ctxKey is the key used when looking up this stream's context
+	ctxKey string
 	cancel context.CancelFunc
 
-	// streams holds all active watchers
-	streams map[int64]*watcherStream
 	// mu protects the streams map
 	mu sync.RWMutex
+	// streams holds all active watchers
+	streams map[int64]*watcherStream
 
 	// reqc sends a watch request from Watch() to the main goroutine
 	reqc chan *watchRequest
@@ -105,8 +124,11 @@ type watcher struct {
 	stopc chan struct{}
 	// donec closes to broadcast shutdown
 	donec chan struct{}
-	// errc transmits errors from grpc Recv
+	// errc transmits errors from grpc Recv to the watch stream reconn logic
 	errc chan error
+
+	// the error that closed the watch stream
+	closeErr error
 }
 
 // watchRequest is issued by the subscriber to start a new watcher
@@ -123,6 +145,7 @@ type watchRequest struct {
 
 // watcherStream represents a registered watcher
 type watcherStream struct {
+	// initReq is the request that initiated this request
 	initReq watchRequest
 
 	// outc publishes watch responses to subscriber
@@ -138,10 +161,30 @@ type watcherStream struct {
 }
 
 func NewWatcher(c *Client) Watcher {
-	ctx, cancel := context.WithCancel(context.Background())
-	w := &watcher{
+	return &watcher{
 		remote:  pb.NewWatchClient(c.conn),
+		streams: make(map[string]*watchGrpcStream),
+	}
+}
+
+// never closes
+var valCtxCh = make(chan struct{})
+var zeroTime = time.Unix(0, 0)
+
+// ctx with only the values; never Done
+type valCtx struct{ context.Context }
+
+func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
+func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
+func (vc *valCtx) Err() error                  { return nil }
+
+func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
+	ctx, cancel := context.WithCancel(&valCtx{inctx})
+	wgs := &watchGrpcStream{
+		owner:   w,
+		remote:  w.remote,
 		ctx:     ctx,
+		ctxKey:  fmt.Sprintf("%v", inctx),
 		cancel:  cancel,
 		streams: make(map[int64]*watcherStream),
 
@@ -151,8 +194,8 @@ func NewWatcher(c *Client) Watcher {
 		donec: make(chan struct{}),
 		errc:  make(chan error, 1),
 	}
-	go w.run()
-	return w
+	go wgs.run()
+	return wgs
 }
 
 // Watch posts a watch request to run() and waits for a new watcher channel
@@ -170,13 +213,41 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	}
 
 	ok := false
+	ctxKey := fmt.Sprintf("%v", ctx)
+
+	// find or allocate appropriate grpc watch stream
+	w.mu.Lock()
+	if w.streams == nil {
+		// closed
+		w.mu.Unlock()
+		ch := make(chan WatchResponse)
+		close(ch)
+		return ch
+	}
+	wgs := w.streams[ctxKey]
+	if wgs == nil {
+		wgs = w.newWatcherGrpcStream(ctx)
+		w.streams[ctxKey] = wgs
+	}
+	donec := wgs.donec
+	reqc := wgs.reqc
+	w.mu.Unlock()
+
+	// couldn't create channel; return closed channel
+	closeCh := make(chan WatchResponse, 1)
 
 	// submit request
 	select {
-	case w.reqc <- wr:
+	case reqc <- wr:
 		ok = true
 	case <-wr.ctx.Done():
-	case <-w.donec:
+	case <-donec:
+		if wgs.closeErr != nil {
+			closeCh <- WatchResponse{closeErr: wgs.closeErr}
+			break
+		}
+		// retry; may have dropped stream from no ctxs
+		return w.Watch(ctx, key, opts...)
 	}
 
 	// receive channel
@@ -185,23 +256,44 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 		case ret := <-retc:
 			return ret
 		case <-ctx.Done():
-		case <-w.donec:
+		case <-donec:
+			if wgs.closeErr != nil {
+				closeCh <- WatchResponse{closeErr: wgs.closeErr}
+				break
+			}
+			// retry; may have dropped stream from no ctxs
+			return w.Watch(ctx, key, opts...)
 		}
 	}
 
-	// couldn't create channel; return closed channel
-	ch := make(chan WatchResponse)
-	close(ch)
-	return ch
+	close(closeCh)
+	return closeCh
 }
 
-func (w *watcher) Close() error {
+func (w *watcher) Close() (err error) {
+	w.mu.Lock()
+	streams := w.streams
+	w.streams = nil
+	w.mu.Unlock()
+	for _, wgs := range streams {
+		if werr := wgs.Close(); werr != nil {
+			err = werr
+		}
+	}
+	return err
+}
+
+func (w *watchGrpcStream) Close() (err error) {
 	close(w.stopc)
 	<-w.donec
-	return toErr(w.ctx, <-w.errc)
+	select {
+	case err = <-w.errc:
+	default:
+	}
+	return toErr(w.ctx, err)
 }
 
-func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
+func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 	if pendingReq == nil {
 		// no pending request; ignore
 		return
@@ -254,27 +346,27 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 }
 
 // closeStream closes the watcher resources and removes it
-func (w *watcher) closeStream(ws *watcherStream) {
+func (w *watchGrpcStream) closeStream(ws *watcherStream) {
 	// cancels request stream; subscriber receives nil channel
 	close(ws.initReq.retc)
 	// close subscriber's channel
 	close(ws.outc)
-	// shutdown serveStream
-	close(ws.recvc)
 	delete(w.streams, ws.id)
 }
 
 // run is the root of the goroutines for managing a watcher client
-func (w *watcher) run() {
+func (w *watchGrpcStream) run() {
 	var wc pb.Watch_WatchClient
 	var closeErr error
 
 	defer func() {
-		select {
-		case w.errc <- closeErr:
-		default:
+		w.owner.mu.Lock()
+		w.closeErr = closeErr
+		if w.owner.streams != nil {
+			delete(w.owner.streams, w.ctxKey)
 		}
 		close(w.donec)
+		w.owner.mu.Unlock()
 		w.cancel()
 	}()
 
@@ -308,6 +400,18 @@ func (w *watcher) run() {
 				curReqC = w.reqc
 			case pbresp.Canceled:
 				delete(cancelSet, pbresp.WatchId)
+				// shutdown serveStream, if any
+				w.mu.Lock()
+				if ws, ok := w.streams[pbresp.WatchId]; ok {
+					close(ws.recvc)
+					delete(w.streams, ws.id)
+				}
+				numStreams := len(w.streams)
+				w.mu.Unlock()
+				if numStreams == 0 {
+					// don't leak watcher streams
+					return
+				}
 			default:
 				// dispatch to appropriate watch stream
 				if ok := w.dispatchEvent(pbresp); ok {
@@ -328,7 +432,11 @@ func (w *watcher) run() {
 			}
 		// watch client failed to recv; spawn another if possible
 		// TODO report watch client errors from errc?
-		case <-w.errc:
+		case err := <-w.errc:
+			if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
+				closeErr = err
+				return
+			}
 			if wc, closeErr = w.newWatchClient(); closeErr != nil {
 				return
 			}
@@ -357,7 +465,7 @@ func (w *watcher) run() {
 }
 
 // dispatchEvent sends a WatchResponse to the appropriate watcher stream
-func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
+func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 	ws, ok := w.streams[pbresp.WatchId]
@@ -377,7 +485,7 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
 }
 
 // serveWatchClient forwards messages from the grpc stream to run()
-func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
+func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
 	for {
 		resp, err := wc.Recv()
 		if err != nil {
@@ -396,7 +504,7 @@ func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
 }
 
 // serveStream forwards watch responses from run() to the subscriber
-func (w *watcher) serveStream(ws *watcherStream) {
+func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 	emptyWr := &WatchResponse{}
 	wrs := []*WatchResponse{}
 	resuming := false
@@ -465,13 +573,23 @@ func (w *watcher) serveStream(ws *watcherStream) {
 			closing = true
 		}
 	}
+
+	// try to send off close error
+	if w.closeErr != nil {
+		select {
+		case ws.outc <- WatchResponse{closeErr: w.closeErr}:
+		case <-w.donec:
+		case <-time.After(closeSendErrTimeout):
+		}
+	}
+
 	w.mu.Lock()
 	w.closeStream(ws)
 	w.mu.Unlock()
 	// lazily send cancel message if events on missing id
 }
 
-func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
+func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
 	ws, rerr := w.resume()
 	if rerr != nil {
 		return nil, rerr
@@ -481,7 +599,7 @@ func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
 }
 
 // resume creates a new WatchClient with all current watchers reestablished
-func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
+func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
 	for {
 		if ws, err = w.openWatchClient(); err != nil {
 			break
@@ -493,7 +611,7 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
 }
 
 // openWatchClient retries opening a watchclient until retryConnection fails
-func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
 		select {
 		case <-w.stopc:
@@ -514,7 +632,7 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 }
 
 // resumeWatchers rebuilds every registered watcher on a new client
-func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
+func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
 	w.mu.RLock()
 	streams := make([]*watcherStream, 0, len(w.streams))
 	for _, ws := range w.streams {