Browse Source

Merge pull request #4118 from xiang90/ctrl_w

v3api: add support for sending watcher control response
Xiang Li 10 years ago
parent
commit
94ac9ae2da
2 changed files with 70 additions and 16 deletions
  1. 9 3
      etcdctlv3/command/watch_command.go
  2. 61 13
      etcdserver/api/v3rpc/watch.go

+ 9 - 3
etcdctlv3/command/watch_command.go

@@ -98,9 +98,15 @@ func recvLoop(wStream pb.Watch_WatchClient) {
 		if err != nil {
 			ExitWithError(ExitError, err)
 		}
-		evs := resp.Events
-		for _, ev := range evs {
-			fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
+
+		switch {
+		// TODO: handle canceled/compacted and other control response types
+		case resp.Created:
+			fmt.Printf("watcher created: id %08x\n", resp.WatchId)
+		default:
+			for _, ev := range resp.Events {
+				fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
+			}
 		}
 	}
 }

+ 61 - 13
etcdserver/api/v3rpc/watch.go

@@ -30,17 +30,45 @@ func NewWatchServer(w storage.Watchable) pb.WatchServer {
 	return &watchServer{w}
 }
 
-func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
-	closec := make(chan struct{})
-	defer close(closec)
+const (
+	// We send ctrl response inside the read loop. We do not want
+	// send to block read, but we still want ctrl response we sent to
+	// be serialized. Thus we use a buffered chan to solve the problem.
+	// A small buffer should be OK for most cases, since we expect the
+	// ctrl requests are infrequent.
+	ctrlStreamBufLen = 16
+)
 
-	watchStream := ws.watchable.NewWatchStream()
-	defer watchStream.Close()
+// serverWatchStream is an etcd server side stream. It receives requests
+// from client side gRPC stream. It receives watch events from storage.WatchStream,
+// and creates responses that forwarded to gRPC stream.
+// It also forwards control message like watch created and canceled.
+type serverWatchStream struct {
+	gRPCStream  pb.Watch_WatchServer
+	watchStream storage.WatchStream
+	ctrlStream  chan *pb.WatchResponse
 
-	go sendLoop(stream, watchStream, closec)
+	// closec indicates the stream is closed.
+	closec chan struct{}
+}
 
+func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
+	sws := serverWatchStream{
+		gRPCStream:  stream,
+		watchStream: ws.watchable.NewWatchStream(),
+		// chan for sending control response like watcher created and canceled.
+		ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
+		closec:     make(chan struct{}),
+	}
+	defer sws.close()
+
+	go sws.sendLoop()
+	return sws.recvLoop()
+}
+
+func (sws *serverWatchStream) recvLoop() error {
 	for {
-		req, err := stream.Recv()
+		req, err := sws.gRPCStream.Recv()
 		if err == io.EOF {
 			return nil
 		}
@@ -57,7 +85,12 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 				toWatch = creq.Prefix
 				prefix = true
 			}
-			watchStream.Watch(toWatch, prefix, creq.StartRevision)
+			id, _ := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
+			sws.ctrlStream <- &pb.WatchResponse{
+				// TODO: fill in response header.
+				WatchId: id,
+				Created: true,
+			}
 		default:
 			// TODO: support cancellation
 			panic("not implemented")
@@ -65,10 +98,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 	}
 }
 
-func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, closec chan struct{}) {
+func (sws *serverWatchStream) sendLoop() {
 	for {
 		select {
-		case evs, ok := <-watchStream.Chan():
+		case evs, ok := <-sws.watchStream.Chan():
 			if !ok {
 				return
 			}
@@ -81,16 +114,25 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos
 				events[i] = &evs[i]
 			}
 
-			err := stream.Send(&pb.WatchResponse{Events: events})
+			err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events})
 			storage.ReportEventReceived()
 			if err != nil {
 				return
 			}
 
-		case <-closec:
+		case c, ok := <-sws.ctrlStream:
+			if !ok {
+				return
+			}
+
+			if err := sws.gRPCStream.Send(c); err != nil {
+				return
+			}
+
+		case <-sws.closec:
 			// drain the chan to clean up pending events
 			for {
-				_, ok := <-watchStream.Chan()
+				_, ok := <-sws.watchStream.Chan()
 				if !ok {
 					return
 				}
@@ -99,3 +141,9 @@ func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, clos
 		}
 	}
 }
+
+func (sws *serverWatchStream) close() {
+	sws.watchStream.Close()
+	close(sws.closec)
+	close(sws.ctrlStream)
+}