Browse Source

grpcproxy: rework watcher organization

The single watcher / group watcher distinction limited and
complicated watcher coalescing more than necessary. Reworked:

Each server watcher is represented by a WatchBroadcast, each
client "Watcher" attaches to some WatchBroadcast. WatchBroadcasts
hold all WatchBroadcast instances for a range. WatchRanges holds
all WatchBroadcasts for the proxy.

WatchProxyStreams represent a grpc watch stream between the proxy and
a client. When a client requests a new watcher through its grpc stream,
the ProxyStream will allocate a Watcher and WatchRanges assigns it to
some WatchBroadcast based on its range.

Coalescing is done by WatchBroadcasts when it receives an update
notification from a WatchBroadcast.

Supports leader failure detection so watches on a bad member
can migrate to other members. Coincidentally, Fixes #6303.
Anthony Romano 9 năm trước cách đây
mục cha
commit
ec459c2185

+ 156 - 163
proxy/grpcproxy/watch.go

@@ -15,253 +15,246 @@
 package grpcproxy
 
 import (
-	"io"
 	"sync"
 
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 type watchProxy struct {
 	cw  clientv3.Watcher
-	wgs watchergroups
+	ctx context.Context
 
-	mu           sync.Mutex
-	nextStreamID int64
+	ranges *watchRanges
 
-	ctx context.Context
+	// retryLimiter controls the create watch retry rate on lost leaders.
+	retryLimiter *rate.Limiter
+
+	// mu protects leaderc updates.
+	mu      sync.RWMutex
+	leaderc chan struct{}
+
+	// wg waits until all outstanding watch servers quit.
+	wg sync.WaitGroup
 }
 
+const (
+	lostLeaderKey  = "__lostleader" // watched to detect leader l oss
+	retryPerSecond = 10
+)
+
 func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 	wp := &watchProxy{
-		cw: c.Watcher,
-		wgs: watchergroups{
-			cw:        c.Watcher,
-			groups:    make(map[watchRange]*watcherGroup),
-			idToGroup: make(map[receiverID]*watcherGroup),
-			proxyCtx:  c.Ctx(),
-		},
-		ctx: c.Ctx(),
+		cw:           c.Watcher,
+		ctx:          clientv3.WithRequireLeader(c.Ctx()),
+		retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
+		leaderc:      make(chan struct{}),
 	}
+	wp.ranges = newWatchRanges(wp)
 	go func() {
+		// a new streams without opening any watchers won't catch
+		// a lost leader event, so have a special watch to monitor it
+		rev := int64((uint64(1) << 63) - 2)
+		for wp.ctx.Err() == nil {
+			wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev))
+			for range wch {
+			}
+			wp.mu.Lock()
+			close(wp.leaderc)
+			wp.leaderc = make(chan struct{})
+			wp.mu.Unlock()
+			wp.retryLimiter.Wait(wp.ctx)
+		}
+		wp.mu.Lock()
 		<-wp.ctx.Done()
-		wp.wgs.stop()
+		wp.mu.Unlock()
+		wp.wg.Wait()
+		wp.ranges.stop()
 	}()
 	return wp
 }
 
 func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	wp.mu.Lock()
-	wp.nextStreamID++
-	sid := wp.nextStreamID
+	select {
+	case <-wp.ctx.Done():
+		wp.mu.Unlock()
+		return
+	default:
+		wp.wg.Add(1)
+	}
 	wp.mu.Unlock()
 
-	ctx, cancel := context.WithCancel(wp.ctx)
-	sws := serverWatchStream{
-		cw:       wp.cw,
-		groups:   &wp.wgs,
-		singles:  make(map[int64]*watcherSingle),
-		inGroups: make(map[int64]struct{}),
+	ctx, cancel := context.WithCancel(stream.Context())
+	wps := &watchProxyStream{
+		ranges:   wp.ranges,
+		watchers: make(map[int64]*watcher),
+		stream:   stream,
+		watchCh:  make(chan *pb.WatchResponse, 1024),
+		ctx:      ctx,
+		cancel:   cancel,
+	}
 
-		id:         sid,
-		gRPCStream: stream,
+	var leaderc <-chan struct{}
+	if md, ok := metadata.FromContext(stream.Context()); ok {
+		v := md[rpctypes.MetadataRequireLeaderKey]
+		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+			leaderc = wp.lostLeaderNotify()
+		}
+	}
 
-		watchCh: make(chan *pb.WatchResponse, 1024),
+	// post to stopc => terminate server stream; can't use a waitgroup
+	// since all goroutines will only terminate after Watch() exits.
+	stopc := make(chan struct{}, 3)
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		wps.recvLoop()
+	}()
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		wps.sendLoop()
+	}()
+	if leaderc != nil {
+		go func() {
+			defer func() { stopc <- struct{}{} }()
+			select {
+			case <-leaderc:
+			case <-ctx.Done():
+			}
+		}()
+	}
+
+	<-stopc
+	// recv/send may only shutdown after function exits;
+	// goroutine notifies proxy that stream is through
+	go func() {
+		if leaderc != nil {
+			<-stopc
+		}
+		<-stopc
+		wps.close()
+		wp.wg.Done()
+	}()
 
-		ctx:    ctx,
-		cancel: cancel,
+	select {
+	case <-leaderc:
+		return rpctypes.ErrNoLeader
+	default:
+		return wps.ctx.Err()
 	}
+}
 
-	go sws.recvLoop()
-	sws.sendLoop()
-	return wp.ctx.Err()
+func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
+	wp.mu.RLock()
+	defer wp.mu.RUnlock()
+	return wp.leaderc
 }
 
-type serverWatchStream struct {
-	id int64
-	cw clientv3.Watcher
+// watchProxyStream forwards etcd watch events to a proxied client stream.
+type watchProxyStream struct {
+	ranges *watchRanges
 
-	mu       sync.Mutex // make sure any access of groups and singles is atomic
-	groups   *watchergroups
-	singles  map[int64]*watcherSingle
-	inGroups map[int64]struct{}
+	// mu protects watchers and nextWatcherID
+	mu sync.Mutex
+	// watchers receive events from watch broadcast.
+	watchers map[int64]*watcher
+	// nextWatcherID is the id to assign the next watcher on this stream.
+	nextWatcherID int64
 
-	gRPCStream pb.Watch_WatchServer
+	stream pb.Watch_WatchServer
 
+	// watchCh receives watch responses from the watchers.
 	watchCh chan *pb.WatchResponse
 
-	nextWatcherID int64
-
 	ctx    context.Context
 	cancel context.CancelFunc
 }
 
-func (sws *serverWatchStream) close() {
+func (wps *watchProxyStream) close() {
 	var wg sync.WaitGroup
-	sws.cancel()
-	sws.mu.Lock()
-	wg.Add(len(sws.singles) + len(sws.inGroups))
-	for _, ws := range sws.singles {
-		// copy the range variable to avoid race
-		copyws := ws
-		go func() {
-			copyws.stop()
-			wg.Done()
-		}()
-	}
-	for id := range sws.inGroups {
-		// copy the range variable to avoid race
-		wid := id
-		go func() {
-			sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
+	wps.cancel()
+	wps.mu.Lock()
+	wg.Add(len(wps.watchers))
+	for _, wpsw := range wps.watchers {
+		go func(w *watcher) {
+			wps.ranges.delete(w)
 			wg.Done()
-		}()
+		}(wpsw)
 	}
-	sws.inGroups = nil
-	sws.mu.Unlock()
+	wps.watchers = nil
+	wps.mu.Unlock()
 
 	wg.Wait()
 
-	close(sws.watchCh)
+	close(wps.watchCh)
 }
 
-func (sws *serverWatchStream) recvLoop() error {
-	defer sws.close()
-
+func (wps *watchProxyStream) recvLoop() error {
 	for {
-		req, err := sws.gRPCStream.Recv()
-		if err == io.EOF {
-			return nil
-		}
+		req, err := wps.stream.Recv()
 		if err != nil {
 			return err
 		}
-
 		switch uv := req.RequestUnion.(type) {
 		case *pb.WatchRequest_CreateRequest:
 			cr := uv.CreateRequest
+			w := &watcher{
+				wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
+				id:  wps.nextWatcherID,
+				wps: wps,
 
-			watcher := watcher{
-				wr: watchRange{
-					key: string(cr.Key),
-					end: string(cr.RangeEnd),
-				},
-				id:  sws.nextWatcherID,
-				sws: sws,
-
+				nextrev:  cr.StartRevision,
 				progress: cr.ProgressNotify,
 				filters:  v3rpc.FiltersFromRequest(cr),
 			}
-			if cr.StartRevision != 0 {
-				sws.addDedicatedWatcher(watcher, cr.StartRevision)
-			} else {
-				sws.addCoalescedWatcher(watcher)
-			}
-			sws.nextWatcherID++
-
+			wps.nextWatcherID++
+			w.nextrev = cr.StartRevision
+			wps.watchers[w.id] = w
+			wps.ranges.add(w)
 		case *pb.WatchRequest_CancelRequest:
-			sws.removeWatcher(uv.CancelRequest.WatchId)
+			wps.delete(uv.CancelRequest.WatchId)
 		default:
 			panic("not implemented")
 		}
 	}
 }
 
-func (sws *serverWatchStream) sendLoop() {
+func (wps *watchProxyStream) sendLoop() {
 	for {
 		select {
-		case wresp, ok := <-sws.watchCh:
+		case wresp, ok := <-wps.watchCh:
 			if !ok {
 				return
 			}
-			if err := sws.gRPCStream.Send(wresp); err != nil {
+			if err := wps.stream.Send(wresp); err != nil {
 				return
 			}
-		case <-sws.ctx.Done():
+		case <-wps.ctx.Done():
 			return
 		}
 	}
 }
 
-func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
-	sws.mu.Lock()
-	defer sws.mu.Unlock()
+func (wps *watchProxyStream) delete(id int64) {
+	wps.mu.Lock()
+	defer wps.mu.Unlock()
 
-	rid := receiverID{streamID: sws.id, watcherID: w.id}
-	sws.groups.addWatcher(rid, w)
-	sws.inGroups[w.id] = struct{}{}
-}
-
-func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
-	ctx, cancel := context.WithCancel(sws.ctx)
-	wch := sws.cw.Watch(ctx,
-		w.wr.key, clientv3.WithRange(w.wr.end),
-		clientv3.WithRev(rev),
-		clientv3.WithProgressNotify(),
-		clientv3.WithCreatedNotify(),
-	)
-	sws.mu.Lock()
-	defer sws.mu.Unlock()
-	ws := newWatcherSingle(wch, cancel, w, sws)
-	sws.singles[w.id] = ws
-	go ws.run()
-}
-
-func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
-	sws.mu.Lock()
-	defer sws.mu.Unlock()
-
-	// do not add new watchers when stream is closing
-	if sws.inGroups == nil {
-		return false
-	}
-	if sws.groups.maybeJoinWatcherSingle(ws) {
-		delete(sws.singles, ws.w.id)
-		sws.inGroups[ws.w.id] = struct{}{}
-		return true
-	}
-	return false
-}
-
-func (sws *serverWatchStream) removeWatcher(id int64) {
-	sws.mu.Lock()
-	defer sws.mu.Unlock()
-
-	var (
-		rev int64
-		ok  bool
-	)
-
-	defer func() {
-		if !ok {
-			return
-		}
-		resp := &pb.WatchResponse{
-			Header: &pb.ResponseHeader{
-				// todo: fill in ClusterId
-				// todo: fill in MemberId:
-				Revision: rev,
-				// todo: fill in RaftTerm:
-			},
-			WatchId:  id,
-			Canceled: true,
-		}
-		sws.watchCh <- resp
-	}()
-
-	rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
-	if ok {
-		delete(sws.inGroups, id)
+	w, ok := wps.watchers[id]
+	if !ok {
 		return
 	}
-
-	var ws *watcherSingle
-	if ws, ok = sws.singles[id]; ok {
-		delete(sws.singles, id)
-		ws.stop()
-		rev = ws.lastStoreRev
+	wps.ranges.delete(w)
+	delete(wps.watchers, id)
+	resp := &pb.WatchResponse{
+		Header:   &w.lastHeader,
+		WatchId:  id,
+		Canceled: true,
 	}
+	wps.watchCh <- resp
 }

+ 135 - 0
proxy/grpcproxy/watch_broadcast.go

@@ -0,0 +1,135 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+
+	"golang.org/x/net/context"
+
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// watchBroadcast broadcasts a server watcher to many client watchers.
+type watchBroadcast struct {
+	// wbs is the backpointer to all broadcasts on this range
+	wbs *watchBroadcasts
+	// cancel stops the underlying etcd server watcher and closes ch.
+	cancel context.CancelFunc
+	donec  chan struct{}
+
+	// mu protects rev and receivers.
+	mu sync.RWMutex
+	// nextrev is the minimum expected next revision of the watcher on ch.
+	nextrev int64
+	// receivers contains all the client-side watchers to serve.
+	receivers map[*watcher]struct{}
+	// responses counts the number of responses
+	responses int
+}
+
+func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
+	cctx, cancel := context.WithCancel(wp.ctx)
+	wb := &watchBroadcast{
+		cancel:    cancel,
+		nextrev:   w.nextrev,
+		receivers: make(map[*watcher]struct{}),
+		donec:     make(chan struct{}),
+	}
+	wb.add(w)
+	go func() {
+		defer close(wb.donec)
+		// loop because leader loss will close channel
+		for cctx.Err() == nil {
+			wch := wp.cw.Watch(cctx, w.wr.key,
+				clientv3.WithRange(w.wr.end),
+				clientv3.WithProgressNotify(),
+				clientv3.WithCreatedNotify(),
+				clientv3.WithRev(wb.nextrev),
+			)
+			for wr := range wch {
+				wb.bcast(wr)
+				update(wb)
+			}
+			wp.retryLimiter.Wait(cctx)
+		}
+	}()
+	return wb
+}
+
+func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) {
+	wb.mu.Lock()
+	defer wb.mu.Unlock()
+	wb.nextrev = wr.Header.Revision + 1
+	wb.responses++
+	for r := range wb.receivers {
+		r.send(wr)
+	}
+}
+
+// add puts a watcher into receiving a broadcast if its revision at least
+// meets the broadcast revision. Returns true if added.
+func (wb *watchBroadcast) add(w *watcher) bool {
+	wb.mu.Lock()
+	defer wb.mu.Unlock()
+	if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) {
+		// wb is too far ahead, w will miss events
+		// or wb is being established with a current watcher
+		return false
+	}
+	if wb.responses == 0 {
+		// Newly created; create event will be sent by etcd.
+		wb.receivers[w] = struct{}{}
+		return true
+	}
+	// already sent by etcd; emulate create event
+	ok := w.post(&pb.WatchResponse{
+		Header: &pb.ResponseHeader{
+			// todo: fill in ClusterId
+			// todo: fill in MemberId:
+			Revision: w.nextrev,
+			// todo: fill in RaftTerm:
+		},
+		WatchId: w.id,
+		Created: true,
+	})
+	if !ok {
+		return false
+	}
+	wb.receivers[w] = struct{}{}
+	return true
+}
+func (wb *watchBroadcast) delete(w *watcher) {
+	wb.mu.Lock()
+	defer wb.mu.Unlock()
+	if _, ok := wb.receivers[w]; !ok {
+		panic("deleting missing watcher from broadcast")
+	}
+	delete(wb.receivers, w)
+}
+
+func (wb *watchBroadcast) size() int {
+	wb.mu.RLock()
+	defer wb.mu.RUnlock()
+	return len(wb.receivers)
+}
+
+func (wb *watchBroadcast) empty() bool { return wb.size() == 0 }
+
+func (wb *watchBroadcast) stop() {
+	wb.cancel()
+	<-wb.donec
+}

+ 132 - 0
proxy/grpcproxy/watch_broadcasts.go

@@ -0,0 +1,132 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+)
+
+type watchBroadcasts struct {
+	wp *watchProxy
+
+	// mu protects bcasts and watchers from the coalesce loop.
+	mu       sync.Mutex
+	bcasts   map[*watchBroadcast]struct{}
+	watchers map[*watcher]*watchBroadcast
+
+	updatec chan *watchBroadcast
+	donec   chan struct{}
+}
+
+// maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced.
+const maxCoalesceReceivers = 5
+
+func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
+	wbs := &watchBroadcasts{
+		wp:       wp,
+		bcasts:   make(map[*watchBroadcast]struct{}),
+		watchers: make(map[*watcher]*watchBroadcast),
+		updatec:  make(chan *watchBroadcast, 1),
+		donec:    make(chan struct{}),
+	}
+	go func() {
+		defer close(wbs.donec)
+		for wb := range wbs.updatec {
+			wbs.coalesce(wb)
+		}
+	}()
+	return wbs
+}
+
+func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
+	if wb.size() >= maxCoalesceReceivers {
+		return
+	}
+	wbs.mu.Lock()
+	for wbswb := range wbs.bcasts {
+		if wbswb == wb {
+			continue
+		}
+		wbswb.mu.Lock()
+		// NB: victim lock already held
+		if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 {
+			for w := range wb.receivers {
+				wbswb.receivers[w] = struct{}{}
+				wbs.watchers[w] = wbswb
+			}
+			wb.receivers = nil
+		}
+		wbswb.mu.Unlock()
+		if wb.empty() {
+			delete(wbs.bcasts, wb)
+			wb.stop()
+			break
+		}
+	}
+	wbs.mu.Unlock()
+}
+
+func (wbs *watchBroadcasts) add(w *watcher) {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+	// find fitting bcast
+	for wb := range wbs.bcasts {
+		if wb.add(w) {
+			wbs.watchers[w] = wb
+			return
+		}
+	}
+	// no fit; create a bcast
+	wb := newWatchBroadcast(wbs.wp, w, wbs.update)
+	wbs.watchers[w] = wb
+	wbs.bcasts[wb] = struct{}{}
+}
+
+func (wbs *watchBroadcasts) delete(w *watcher) {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+
+	wb, ok := wbs.watchers[w]
+	if !ok {
+		panic("deleting missing watcher from broadcasts")
+	}
+	delete(wbs.watchers, w)
+	wb.delete(w)
+	if wb.empty() {
+		delete(wbs.bcasts, wb)
+		wb.stop()
+	}
+}
+
+func (wbs *watchBroadcasts) empty() bool { return len(wbs.bcasts) == 0 }
+
+func (wbs *watchBroadcasts) stop() {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+
+	for wb := range wbs.bcasts {
+		wb.stop()
+	}
+	wbs.bcasts = nil
+	close(wbs.updatec)
+	<-wbs.donec
+}
+
+func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
+	select {
+	case wbs.updatec <- wb:
+	default:
+	}
+}

+ 29 - 9
proxy/grpcproxy/watch_client_adapter.go

@@ -32,14 +32,27 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
 }
 
 func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
-	ch1, ch2 := make(chan interface{}), make(chan interface{})
+	// ch1 is buffered so server can send error on close
+	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
 	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
-	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
-	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}, nil}}
+
+	cctx, ccancel := context.WithCancel(ctx)
+	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
+	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}}
+
+	sctx, scancel := context.WithCancel(ctx)
+	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
+	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
 	go func() {
-		s.wserv.Watch(wserver)
-		// close the server side sender
-		close(ch1)
+		if err := s.wserv.Watch(wserver); err != nil {
+			select {
+			case srv.sendc <- err:
+			case <-sctx.Done():
+			case <-cctx.Done():
+			}
+		}
+		scancel()
+		ccancel()
 	}()
 	return wclient, nil
 }
@@ -145,9 +158,10 @@ func (s *chanClientStream) CloseSend() error {
 
 // chanStream implements grpc.Stream using channels
 type chanStream struct {
-	recvc <-chan interface{}
-	sendc chan<- interface{}
-	ctx   context.Context
+	recvc  <-chan interface{}
+	sendc  chan<- interface{}
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 func (s *chanStream) Context() context.Context { return s.ctx }
@@ -155,6 +169,9 @@ func (s *chanStream) Context() context.Context { return s.ctx }
 func (s *chanStream) SendMsg(m interface{}) error {
 	select {
 	case s.sendc <- m:
+		if err, ok := m.(error); ok {
+			return err
+		}
 		return nil
 	case <-s.ctx.Done():
 	}
@@ -168,6 +185,9 @@ func (s *chanStream) RecvMsg(m interface{}) error {
 		if !ok {
 			return grpc.ErrClientConnClosing
 		}
+		if err, ok := msg.(error); ok {
+			return err
+		}
 		*v = msg
 		return nil
 	case <-s.ctx.Done():

+ 70 - 0
proxy/grpcproxy/watch_ranges.go

@@ -0,0 +1,70 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+)
+
+// watchRanges tracks all open watches for the proxy.
+type watchRanges struct {
+	wp *watchProxy
+
+	mu     sync.Mutex
+	bcasts map[watchRange]*watchBroadcasts
+}
+
+func newWatchRanges(wp *watchProxy) *watchRanges {
+	return &watchRanges{
+		wp:     wp,
+		bcasts: make(map[watchRange]*watchBroadcasts),
+	}
+}
+
+func (wrs *watchRanges) add(w *watcher) {
+	wrs.mu.Lock()
+	defer wrs.mu.Unlock()
+
+	if wbs := wrs.bcasts[w.wr]; wbs != nil {
+		wbs.add(w)
+		return
+	}
+	wbs := newWatchBroadcasts(wrs.wp)
+	wrs.bcasts[w.wr] = wbs
+	wbs.add(w)
+}
+
+func (wrs *watchRanges) delete(w *watcher) {
+	wrs.mu.Lock()
+	defer wrs.mu.Unlock()
+	wbs, ok := wrs.bcasts[w.wr]
+	if !ok {
+		panic("deleting missing range")
+	}
+	wbs.delete(w)
+	if wbs.empty() {
+		wbs.stop()
+		delete(wrs.bcasts, w.wr)
+	}
+}
+
+func (wrs *watchRanges) stop() {
+	wrs.mu.Lock()
+	defer wrs.mu.Unlock()
+	for _, wb := range wrs.bcasts {
+		wb.stop()
+	}
+	wrs.bcasts = nil
+}

+ 36 - 12
proxy/grpcproxy/watcher.go

@@ -28,31 +28,48 @@ type watchRange struct {
 }
 
 type watcher struct {
-	id  int64
-	wr  watchRange
-	sws *serverWatchStream
+	// user configuration
 
-	rev      int64
+	wr       watchRange
 	filters  []mvcc.FilterFunc
 	progress bool
+
+	// id is the id returned to the client on its watch stream.
+	id int64
+	// nextrev is the minimum expected next event revision.
+	nextrev int64
+	// lastHeader has the last header sent over the stream.
+	lastHeader pb.ResponseHeader
+
+	// wps is the parent.
+	wps *watchProxyStream
 }
 
+// send filters out repeated events by discarding revisions older
+// than the last one sent over the watch channel.
 func (w *watcher) send(wr clientv3.WatchResponse) {
 	if wr.IsProgressNotify() && !w.progress {
 		return
 	}
+	if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {
+		return
+	}
+	if w.nextrev == 0 {
+		// current watch; expect updates following this revision
+		w.nextrev = wr.Header.Revision + 1
+	}
 
 	events := make([]*mvccpb.Event, 0, len(wr.Events))
 
 	var lastRev int64
 	for i := range wr.Events {
 		ev := (*mvccpb.Event)(wr.Events[i])
-		if ev.Kv.ModRevision <= w.rev {
+		if ev.Kv.ModRevision < w.nextrev {
 			continue
 		} else {
 			// We cannot update w.rev here.
 			// txn can have multiple events with the same rev.
-			// If we update w.rev here, we would skip some events in the same txn.
+			// If w.nextrev updates here, it would skip events in the same txn.
 			lastRev = ev.Kv.ModRevision
 		}
 
@@ -71,8 +88,8 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 		}
 	}
 
-	if lastRev > w.rev {
-		w.rev = lastRev
+	if lastRev >= w.nextrev {
+		w.nextrev = lastRev + 1
 	}
 
 	// all events are filtered out?
@@ -80,15 +97,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 		return
 	}
 
-	pbwr := &pb.WatchResponse{
+	w.lastHeader = wr.Header
+	w.post(&pb.WatchResponse{
 		Header:  &wr.Header,
 		Created: wr.Created,
 		WatchId: w.id,
 		Events:  events,
-	}
+	})
+}
+
+// post puts a watch response on the watcher's proxy stream channel
+func (w *watcher) post(wr *pb.WatchResponse) bool {
 	select {
-	case w.sws.watchCh <- pbwr:
+	case w.wps.watchCh <- wr:
 	case <-time.After(50 * time.Millisecond):
-		w.sws.cancel()
+		w.wps.cancel()
+		return false
 	}
+	return true
 }

+ 0 - 106
proxy/grpcproxy/watcher_group.go

@@ -1,106 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package grpcproxy
-
-import (
-	"sync"
-
-	"golang.org/x/net/context"
-
-	"github.com/coreos/etcd/clientv3"
-)
-
-type watcherGroup struct {
-	// ch delievers events received from the etcd server
-	ch clientv3.WatchChan
-	// cancel is used to cancel the underlying etcd server watcher
-	// It should also close the ch.
-	cancel context.CancelFunc
-
-	mu        sync.Mutex
-	rev       int64 // current revision of the watchergroup
-	receivers map[receiverID]watcher
-
-	donec chan struct{}
-}
-
-type receiverID struct {
-	streamID, watcherID int64
-}
-
-func newWatchergroup(wch clientv3.WatchChan, c context.CancelFunc) *watcherGroup {
-	return &watcherGroup{
-		ch:     wch,
-		cancel: c,
-
-		receivers: make(map[receiverID]watcher),
-		donec:     make(chan struct{}),
-	}
-}
-
-func (wg *watcherGroup) run() {
-	defer close(wg.donec)
-	for wr := range wg.ch {
-		wg.broadcast(wr)
-	}
-}
-
-func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
-	wg.mu.Lock()
-	defer wg.mu.Unlock()
-
-	wg.rev = wr.Header.Revision
-	for _, r := range wg.receivers {
-		r.send(wr)
-	}
-}
-
-// add adds the watcher into the group with given ID.
-// The current revision of the watcherGroup is returned or -1
-// if the watcher is at a revision prior to the watcher group.
-func (wg *watcherGroup) add(rid receiverID, w watcher) int64 {
-	wg.mu.Lock()
-	defer wg.mu.Unlock()
-	if wg.rev > w.rev {
-		return -1
-	}
-	wg.receivers[rid] = w
-	return wg.rev
-}
-
-func (wg *watcherGroup) delete(rid receiverID) {
-	wg.mu.Lock()
-	defer wg.mu.Unlock()
-
-	delete(wg.receivers, rid)
-}
-
-func (wg *watcherGroup) isEmpty() bool {
-	wg.mu.Lock()
-	defer wg.mu.Unlock()
-
-	return len(wg.receivers) == 0
-}
-
-func (wg *watcherGroup) stop() {
-	wg.cancel()
-	<-wg.donec
-}
-
-func (wg *watcherGroup) revision() int64 {
-	wg.mu.Lock()
-	defer wg.mu.Unlock()
-	return wg.rev
-}

+ 0 - 50
proxy/grpcproxy/watcher_group_test.go

@@ -1,50 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package grpcproxy
-
-import (
-	"testing"
-
-	"golang.org/x/net/context"
-
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-)
-
-func TestWatchgroupBroadcast(t *testing.T) {
-	wch := make(chan clientv3.WatchResponse, 0)
-	wg := newWatchergroup(wch, nil)
-	go wg.run()
-
-	chs := make([]chan *pb.WatchResponse, 10)
-	for i := range chs {
-		chs[i] = make(chan *pb.WatchResponse, 1)
-		w := watcher{
-			id:  int64(i),
-			sws: &serverWatchStream{watchCh: chs[i], ctx: context.TODO()},
-
-			progress: true,
-		}
-		rid := receiverID{streamID: 1, watcherID: w.id}
-		wg.add(rid, w)
-	}
-
-	// send a progress response
-	wch <- clientv3.WatchResponse{Header: pb.ResponseHeader{Revision: 1}}
-
-	for _, ch := range chs {
-		<-ch
-	}
-}

+ 0 - 128
proxy/grpcproxy/watcher_groups.go

@@ -1,128 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package grpcproxy
-
-import (
-	"sync"
-
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
-	"golang.org/x/net/context"
-)
-
-type watchergroups struct {
-	cw clientv3.Watcher
-
-	mu        sync.Mutex
-	groups    map[watchRange]*watcherGroup
-	idToGroup map[receiverID]*watcherGroup
-
-	proxyCtx context.Context
-}
-
-func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
-	wgs.mu.Lock()
-	defer wgs.mu.Unlock()
-
-	groups := wgs.groups
-
-	if wg, ok := groups[w.wr]; ok {
-		rev := wg.add(rid, w)
-		wgs.idToGroup[rid] = wg
-
-		if rev == 0 {
-			// The group is newly created, the create event has not been delivered
-			// to this group yet.
-			// We can rely on etcd server to deliver the create event.
-			// Or we might end up sending created event twice.
-			return
-		}
-
-		resp := &pb.WatchResponse{
-			Header: &pb.ResponseHeader{
-				// todo: fill in ClusterId
-				// todo: fill in MemberId:
-				Revision: rev,
-				// todo: fill in RaftTerm:
-			},
-			WatchId: rid.watcherID,
-			Created: true,
-		}
-		select {
-		case w.sws.watchCh <- resp:
-		case <-w.sws.ctx.Done():
-		}
-		return
-	}
-
-	ctx, cancel := context.WithCancel(wgs.proxyCtx)
-
-	wch := wgs.cw.Watch(ctx, w.wr.key,
-		clientv3.WithRange(w.wr.end),
-		clientv3.WithProgressNotify(),
-		clientv3.WithCreatedNotify(),
-	)
-
-	watchg := newWatchergroup(wch, cancel)
-	watchg.add(rid, w)
-	go watchg.run()
-	groups[w.wr] = watchg
-	wgs.idToGroup[rid] = watchg
-}
-
-func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) {
-	wgs.mu.Lock()
-	defer wgs.mu.Unlock()
-
-	if g, ok := wgs.idToGroup[rid]; ok {
-		g.delete(rid)
-		delete(wgs.idToGroup, rid)
-		if g.isEmpty() {
-			g.stop()
-		}
-		return g.revision(), true
-	}
-	return -1, false
-}
-
-func (wgs *watchergroups) maybeJoinWatcherSingle(ws watcherSingle) bool {
-	wgs.mu.Lock()
-	defer wgs.mu.Unlock()
-
-	rid := receiverID{streamID: ws.sws.id, watcherID: ws.w.id}
-	group, ok := wgs.groups[ws.w.wr]
-	if ok {
-		return group.add(rid, ws.w) != -1
-	}
-	if !ws.canPromote() {
-		return false
-	}
-	wg := newWatchergroup(ws.ch, ws.cancel)
-	wgs.groups[ws.w.wr] = wg
-	wgs.idToGroup[rid] = wg
-	wg.add(rid, ws.w)
-	go wg.run()
-	return true
-}
-
-func (wgs *watchergroups) stop() {
-	wgs.mu.Lock()
-	defer wgs.mu.Unlock()
-	for _, wg := range wgs.groups {
-		wg.stop()
-	}
-	wgs.groups = nil
-}

+ 0 - 73
proxy/grpcproxy/watcher_single.go

@@ -1,73 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package grpcproxy
-
-import (
-	"github.com/coreos/etcd/clientv3"
-	"golang.org/x/net/context"
-)
-
-type watcherSingle struct {
-	// ch delievers events received from the etcd server
-	ch clientv3.WatchChan
-	// cancel is used to cancel the underlying etcd server watcher
-	// It should also close the ch.
-	cancel context.CancelFunc
-
-	// sws is the stream this watcherSingle attached to
-	sws *serverWatchStream
-
-	w watcher
-
-	lastStoreRev int64 // last seen revision of the remote mvcc store
-
-	donec chan struct{}
-}
-
-func newWatcherSingle(wch clientv3.WatchChan, c context.CancelFunc, w watcher, sws *serverWatchStream) *watcherSingle {
-	return &watcherSingle{
-		sws: sws,
-
-		ch:     wch,
-		cancel: c,
-
-		w:     w,
-		donec: make(chan struct{}),
-	}
-}
-
-func (ws watcherSingle) run() {
-	defer close(ws.donec)
-
-	for wr := range ws.ch {
-		ws.lastStoreRev = wr.Header.Revision
-		ws.w.send(wr)
-		if ws.sws.maybeCoalesceWatcher(ws) {
-			return
-		}
-	}
-}
-
-// canPromote returns true if a watcherSingle can promote itself to a watchergroup
-// when it already caught up with the last seen revision from the response header
-// of an etcd server.
-func (ws watcherSingle) canPromote() bool {
-	return ws.w.rev == ws.lastStoreRev
-}
-
-func (ws watcherSingle) stop() {
-	ws.cancel()
-	<-ws.donec
-}