Browse Source

Merge pull request #3800 from xiang90/watch_server

*: serve watch service
Xiang Li 10 năm trước cách đây
mục cha
commit
1f1d8e9282
4 tập tin đã thay đổi với 83 bổ sung0 xóa
  1. 1 0
      etcdmain/etcd.go
  2. 73 0
      etcdserver/api/v3rpc/watch.go
  3. 5 0
      etcdserver/v3demo_server.go
  4. 4 0
      storage/kv.go

+ 1 - 0
etcdmain/etcd.go

@@ -322,6 +322,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		// set up v3 demo rpc
 		grpcServer := grpc.NewServer()
 		etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s))
+		etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
 		go plog.Fatal(grpcServer.Serve(v3l))
 	}
 

+ 73 - 0
etcdserver/api/v3rpc/watch.go

@@ -0,0 +1,73 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+	"io"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage"
+)
+
+type watchServer struct {
+	watchable storage.Watchable
+}
+
+func NewWatchServer(w storage.Watchable) pb.WatchServer {
+	return &watchServer{w}
+}
+
+func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
+	closec := make(chan struct{})
+	defer close(closec)
+
+	watcher := ws.watchable.NewWatcher()
+	defer watcher.Close()
+
+	go sendLoop(stream, watcher, closec)
+
+	for {
+		req, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		var prefix bool
+		toWatch := req.Key
+		if len(req.Key) == 0 {
+			toWatch = req.Prefix
+			prefix = true
+		}
+		// TODO: support cancellation
+		watcher.Watch(toWatch, prefix, req.StartRevision)
+	}
+}
+
+func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
+	for {
+		select {
+		case e := <-watcher.Chan():
+			err := stream.Send(&pb.WatchResponse{Event: &e})
+			if err != nil {
+				return
+			}
+		case <-closec:
+			return
+		}
+	}
+}

+ 5 - 0
etcdserver/v3demo_server.go

@@ -55,6 +55,11 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr
 	}
 }
 
+// Watcable returns a watchable interface attached to the etcdserver.
+func (s *EtcdServer) Watchable() dstorage.Watchable {
+	return s.kv
+}
+
 func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
 	ar := &applyResult{}
 

+ 4 - 0
storage/kv.go

@@ -79,7 +79,11 @@ type KV interface {
 // WatchableKV is a KV that can be watched.
 type WatchableKV interface {
 	KV
+	Watchable
+}
 
+// Watchable is the interface that wraps the NewWatcher function.
+type Watchable interface {
 	// NewWatcher returns a Watcher that can be used to
 	// watch events happened or happending on the KV.
 	NewWatcher() Watcher