Browse Source

grpcproxy: initial watch proxy

Xiang Li 9 years ago
parent
commit
5f3aa43899

+ 2 - 0
etcdmain/grpc_proxy.go

@@ -88,9 +88,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	}
 
 	kvp := grpcproxy.NewKvProxy(client)
+	watchp := grpcproxy.NewWatchProxy(client)
 
 	server := grpc.NewServer()
 	pb.RegisterKVServer(server, kvp)
+	pb.RegisterWatchServer(server, watchp)
 
 	server.Serve(l)
 }

+ 206 - 0
proxy/grpcproxy/watch.go

@@ -0,0 +1,206 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"io"
+	"sync"
+
+	"golang.org/x/net/context"
+
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type watchProxy struct {
+	c   *clientv3.Client
+	wgs watchergroups
+
+	mu           sync.Mutex
+	nextStreamID int64
+}
+
+func NewWatchProxy(c *clientv3.Client) *watchProxy {
+	return &watchProxy{
+		c: c,
+		wgs: watchergroups{
+			c:      c,
+			groups: make(map[watchRange]*watcherGroup),
+		},
+	}
+}
+
+func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
+	wp.mu.Lock()
+	wp.nextStreamID++
+	wp.mu.Unlock()
+
+	sws := serverWatchStream{
+		c:      wp.c,
+		groups: wp.wgs,
+
+		id:         wp.nextStreamID,
+		gRPCStream: stream,
+
+		ctrlCh:  make(chan *pb.WatchResponse, 10),
+		watchCh: make(chan *pb.WatchResponse, 10),
+	}
+
+	go sws.recvLoop()
+
+	sws.sendLoop()
+
+	return nil
+}
+
+type serverWatchStream struct {
+	id int64
+	c  *clientv3.Client
+
+	mu      sync.Mutex // make sure any access of groups and singles is atomic
+	groups  watchergroups
+	singles map[int64]*watcherSingle
+
+	gRPCStream pb.Watch_WatchServer
+
+	ctrlCh  chan *pb.WatchResponse
+	watchCh chan *pb.WatchResponse
+
+	nextWatcherID int64
+}
+
+func (sws *serverWatchStream) recvLoop() error {
+	for {
+		req, err := sws.gRPCStream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		switch uv := req.RequestUnion.(type) {
+		case *pb.WatchRequest_CreateRequest:
+			cr := uv.CreateRequest
+
+			watcher := watcher{
+				wr: watchRange{
+					key: string(cr.Key),
+					end: string(cr.RangeEnd),
+				},
+				id: sws.nextWatcherID,
+				ch: sws.watchCh,
+
+				progress: cr.ProgressNotify,
+			}
+			if cr.StartRevision != 0 {
+				sws.addDedicatedWatcher(watcher, cr.StartRevision)
+			} else {
+				sws.addCoalescedWatcher(watcher)
+			}
+
+			wresp := &pb.WatchResponse{
+				Header:  &pb.ResponseHeader{}, // TODO: fill in header
+				WatchId: sws.nextWatcherID,
+				Created: true,
+			}
+
+			sws.nextWatcherID++
+			select {
+			case sws.ctrlCh <- wresp:
+			default:
+				panic("handle this")
+			}
+
+		case *pb.WatchRequest_CancelRequest:
+			sws.removeWatcher(uv.CancelRequest.WatchId)
+		default:
+			panic("not implemented")
+		}
+	}
+}
+
+func (sws *serverWatchStream) sendLoop() {
+	for {
+		select {
+		case wresp, ok := <-sws.watchCh:
+			if !ok {
+				return
+			}
+			if err := sws.gRPCStream.Send(wresp); err != nil {
+				return
+			}
+
+		case c, ok := <-sws.ctrlCh:
+			if !ok {
+				return
+			}
+			if err := sws.gRPCStream.Send(c); err != nil {
+				return
+			}
+		}
+	}
+}
+
+func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
+	sws.mu.Lock()
+	defer sws.mu.Unlock()
+
+	rid := receiverID{streamID: sws.id, watcherID: w.id}
+	sws.groups.addWatcher(rid, w)
+}
+
+func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
+	sws.mu.Lock()
+	defer sws.mu.Unlock()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	wch := sws.c.Watch(ctx,
+		w.wr.key, clientv3.WithRange(w.wr.end),
+		clientv3.WithRev(rev),
+		clientv3.WithProgressNotify(),
+	)
+
+	ws := newWatcherSingle(wch, cancel, w, sws)
+	sws.singles[w.id] = ws
+	go ws.run()
+}
+
+func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
+	sws.mu.Lock()
+	defer sws.mu.Unlock()
+
+	rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
+	if sws.groups.maybeJoinWatcherSingle(rid, ws) {
+		delete(sws.singles, ws.w.id)
+		return true
+	}
+	return false
+}
+
+func (sws *serverWatchStream) removeWatcher(id int64) {
+	sws.mu.Lock()
+	defer sws.mu.Unlock()
+
+	if sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) {
+		return
+	}
+
+	if ws, ok := sws.singles[id]; ok {
+		delete(sws.singles, id)
+		ws.stop()
+	}
+}

+ 57 - 0
proxy/grpcproxy/watcher.go

@@ -0,0 +1,57 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+type watchRange struct {
+	key, end string
+}
+
+type watcher struct {
+	id int64
+	wr watchRange
+	// TODO: support filter
+	progress bool
+	ch       chan<- *pb.WatchResponse
+}
+
+func (w *watcher) send(wr clientv3.WatchResponse) {
+	if wr.IsProgressNotify() && !w.progress {
+		return
+	}
+
+	// todo: filter out the events that this watcher already seen.
+
+	evs := wr.Events
+	events := make([]*mvccpb.Event, len(evs))
+	for i := range evs {
+		events[i] = (*mvccpb.Event)(evs[i])
+	}
+	pbwr := &pb.WatchResponse{
+		Header:  &wr.Header,
+		WatchId: w.id,
+		Events:  events,
+	}
+	select {
+	case w.ch <- pbwr:
+	default:
+		panic("handle this")
+	}
+}

+ 94 - 0
proxy/grpcproxy/watcher_group.go

@@ -0,0 +1,94 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+
+	"golang.org/x/net/context"
+
+	"github.com/coreos/etcd/clientv3"
+)
+
+type watcherGroup struct {
+	// ch delievers events received from the etcd server
+	ch clientv3.WatchChan
+	// cancel is used to cancel the underlying etcd server watcher
+	// It should also close the ch.
+	cancel context.CancelFunc
+
+	mu        sync.Mutex
+	rev       int64 // current revision of the watchergroup
+	receivers map[receiverID]watcher
+
+	donec chan struct{}
+}
+
+type receiverID struct {
+	streamID, watcherID int64
+}
+
+func newWatchergroup(wch clientv3.WatchChan, c context.CancelFunc) *watcherGroup {
+	return &watcherGroup{
+		ch:     wch,
+		cancel: c,
+
+		receivers: make(map[receiverID]watcher),
+		donec:     make(chan struct{}),
+	}
+}
+
+func (wg *watcherGroup) run() {
+	defer close(wg.donec)
+	for wr := range wg.ch {
+		wg.broadcast(wr)
+	}
+}
+
+func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+
+	wg.rev = wr.Header.Revision
+	for _, r := range wg.receivers {
+		r.send(wr)
+	}
+}
+
+func (wg *watcherGroup) add(rid receiverID, w watcher) {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+
+	wg.receivers[rid] = w
+}
+
+func (wg *watcherGroup) delete(rid receiverID) {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+
+	delete(wg.receivers, rid)
+}
+
+func (wg *watcherGroup) isEmpty() bool {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+
+	return len(wg.receivers) == 0
+}
+
+func (wg *watcherGroup) stop() {
+	wg.cancel()
+	<-wg.donec
+}

+ 48 - 0
proxy/grpcproxy/watcher_group_test.go

@@ -0,0 +1,48 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"testing"
+
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+func TestWatchgroupBroadcast(t *testing.T) {
+	wch := make(chan clientv3.WatchResponse, 0)
+	wg := newWatchergroup(wch, nil)
+	go wg.run()
+
+	chs := make([]chan *pb.WatchResponse, 10)
+	for i := range chs {
+		chs[i] = make(chan *pb.WatchResponse, 1)
+		w := watcher{
+			id: int64(i),
+			ch: chs[i],
+
+			progress: true,
+		}
+		rid := receiverID{streamID: 1, watcherID: w.id}
+		wg.add(rid, w)
+	}
+
+	// send a progress response
+	wch <- clientv3.WatchResponse{}
+
+	for _, ch := range chs {
+		<-ch
+	}
+}

+ 88 - 0
proxy/grpcproxy/watcher_groups.go

@@ -0,0 +1,88 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/clientv3"
+	"golang.org/x/net/context"
+)
+
+type watchergroups struct {
+	c *clientv3.Client
+
+	mu        sync.Mutex
+	groups    map[watchRange]*watcherGroup
+	idToGroup map[receiverID]*watcherGroup
+}
+
+func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
+	wgs.mu.Lock()
+	defer wgs.mu.Unlock()
+
+	groups := wgs.groups
+
+	if wg, ok := groups[w.wr]; ok {
+		wg.add(rid, w)
+		return
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	wch := wgs.c.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
+	watchg := newWatchergroup(wch, cancel)
+	watchg.add(rid, w)
+	go watchg.run()
+	groups[w.wr] = watchg
+}
+
+func (wgs *watchergroups) removeWatcher(rid receiverID) bool {
+	wgs.mu.Lock()
+	defer wgs.mu.Unlock()
+
+	if g, ok := wgs.idToGroup[rid]; ok {
+		g.delete(rid)
+		if g.isEmpty() {
+			g.stop()
+		}
+		return true
+	}
+	return false
+}
+
+func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {
+	wgs.mu.Lock()
+	defer wgs.mu.Unlock()
+
+	gropu, ok := wgs.groups[ws.w.wr]
+	if ok {
+		if ws.rev >= gropu.rev {
+			gropu.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w)
+			return true
+		}
+		return false
+	}
+
+	if ws.canPromote() {
+		wg := newWatchergroup(ws.ch, ws.cancel)
+		wgs.groups[ws.w.wr] = wg
+		wg.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w)
+		go wg.run()
+		return true
+	}
+
+	return false
+}

+ 75 - 0
proxy/grpcproxy/watcher_single.go

@@ -0,0 +1,75 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	"golang.org/x/net/context"
+)
+
+type watcherSingle struct {
+	// ch delievers events received from the etcd server
+	ch clientv3.WatchChan
+	// cancel is used to cancel the underlying etcd server watcher
+	// It should also close the ch.
+	cancel context.CancelFunc
+
+	// sws is the stream this watcherSingle attached to
+	sws *serverWatchStream
+
+	w watcher
+
+	rev         int64 // current revision
+	lastSeenRev int64
+
+	donec chan struct{}
+}
+
+func newWatcherSingle(wch clientv3.WatchChan, c context.CancelFunc, w watcher, sws *serverWatchStream) *watcherSingle {
+	return &watcherSingle{
+		sws: sws,
+
+		ch:     wch,
+		cancel: c,
+
+		w:     w,
+		donec: make(chan struct{}),
+	}
+}
+
+func (ws watcherSingle) run() {
+	defer close(ws.donec)
+
+	for wr := range ws.ch {
+		ws.rev = wr.Header.Revision
+		ws.w.send(wr)
+		ws.lastSeenRev = wr.Events[len(wr.Events)-1].Kv.ModRevision
+
+		if ws.sws.maybeCoalesceWatcher(ws) {
+			return
+		}
+	}
+}
+
+// canPromote returns true if a watcherSingle can promote itself to a watchergroup
+// when it already caught up with the current revision.
+func (ws watcherSingle) canPromote() bool {
+	return ws.rev == ws.lastSeenRev
+}
+
+func (ws watcherSingle) stop() {
+	ws.cancel()
+	<-ws.donec
+}