Browse Source

Merge pull request #4138 from gyuho/watchresponse_header

*: fill in WatchResponse.Header
Gyu-Ho Lee 10 years ago
parent
commit
c70d533771

+ 1 - 1
etcdmain/etcd.go

@@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		// set up v3 demo rpc
 		grpcServer := grpc.NewServer()
 		etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
-		etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
+		etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
 		go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
 	}
 

+ 32 - 5
etcdserver/api/v3rpc/watch.go

@@ -17,17 +17,26 @@ package v3rpc
 import (
 	"io"
 
+	"github.com/coreos/etcd/etcdserver"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 type watchServer struct {
+	clusterID int64
+	memberID  int64
+	raftTimer etcdserver.RaftTimer
 	watchable storage.Watchable
 }
 
-func NewWatchServer(w storage.Watchable) pb.WatchServer {
-	return &watchServer{w}
+func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
+	return &watchServer{
+		clusterID: int64(s.Cluster().ID()),
+		memberID:  int64(s.ID()),
+		raftTimer: s,
+		watchable: s.Watchable(),
+	}
 }
 
 const (
@@ -44,6 +53,10 @@ const (
 // and creates responses that forwarded to gRPC stream.
 // It also forwards control message like watch created and canceled.
 type serverWatchStream struct {
+	clusterID int64
+	memberID  int64
+	raftTimer etcdserver.RaftTimer
+
 	gRPCStream  pb.Watch_WatchServer
 	watchStream storage.WatchStream
 	ctrlStream  chan *pb.WatchResponse
@@ -54,6 +67,9 @@ type serverWatchStream struct {
 
 func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 	sws := serverWatchStream{
+		clusterID:   ws.clusterID,
+		memberID:    ws.memberID,
+		raftTimer:   ws.raftTimer,
 		gRPCStream:  stream,
 		watchStream: ws.watchable.NewWatchStream(),
 		// chan for sending control response like watcher created and canceled.
@@ -87,7 +103,7 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 			id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
 			sws.ctrlStream <- &pb.WatchResponse{
-				// TODO: fill in response header.
+				Header:  sws.newResponseHeader(sws.watchStream.Rev()),
 				WatchId: int64(id),
 				Created: true,
 			}
@@ -96,7 +112,7 @@ func (sws *serverWatchStream) recvLoop() error {
 			err := sws.watchStream.Cancel(storage.WatchID(id))
 			if err == nil {
 				sws.ctrlStream <- &pb.WatchResponse{
-					// TODO: fill in response header.
+					Header:   sws.newResponseHeader(sws.watchStream.Rev()),
 					WatchId:  id,
 					Canceled: true,
 				}
@@ -126,8 +142,10 @@ func (sws *serverWatchStream) sendLoop() {
 			}
 
 			err := sws.gRPCStream.Send(&pb.WatchResponse{
+				Header:  sws.newResponseHeader(wresp.Revision),
 				WatchId: int64(wresp.WatchID),
-				Events:  events})
+				Events:  events,
+			})
 			storage.ReportEventReceived()
 			if err != nil {
 				return
@@ -160,3 +178,12 @@ func (sws *serverWatchStream) close() {
 	close(sws.closec)
 	close(sws.ctrlStream)
 }
+
+func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
+	return &pb.ResponseHeader{
+		ClusterId: uint64(sws.clusterID),
+		MemberId:  uint64(sws.memberID),
+		Revision:  rev,
+		RaftTerm:  sws.raftTimer.Term(),
+	}
+}

+ 1 - 1
etcdserver/etcdserverpb/rpc.proto

@@ -235,7 +235,7 @@ message WatchResponse {
   // catch up with the progress of the KV.
   //
   // Client should treat the watching as canceled and should not try to create any
-  // watching with same start_revision again. 
+  // watching with same start_revision again.
   bool compacted = 5;
 
   repeated storagepb.Event events = 11;

+ 6 - 4
storage/watchable_store.go

@@ -36,6 +36,7 @@ const (
 
 type watchable interface {
 	watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
+	rev() int64
 }
 
 type watchableStore struct {
@@ -346,9 +347,9 @@ func (s *watchableStore) syncWatchers() {
 	}
 
 	for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
-		wr := WatchResponse{WatchID: w.id, Events: es}
 		select {
-		case w.ch <- wr:
+		// s.store.Rev also uses Lock, so just return directly
+		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
 			pendingEventsGauge.Add(float64(len(es)))
 		default:
 			// TODO: handle the full unsynced watchers.
@@ -381,9 +382,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 			if !ok {
 				continue
 			}
-			wr := WatchResponse{WatchID: w.id, Events: es}
 			select {
-			case w.ch <- wr:
+			case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}:
 				pendingEventsGauge.Add(float64(len(es)))
 			default:
 				// move slow watcher to unsynced
@@ -396,6 +396,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 	}
 }
 
+func (s *watchableStore) rev() int64 { return s.store.Rev() }
+
 type ongoingTx struct {
 	// keys put/deleted in the ongoing txn
 	putm map[string]struct{}

+ 17 - 0
storage/watcher.go

@@ -49,13 +49,24 @@ type WatchStream interface {
 
 	// Close closes the WatchChan and release all related resources.
 	Close()
+
+	// Rev returns the current revision of the KV the stream watches on.
+	Rev() int64
 }
 
 type WatchResponse struct {
 	// WatchID is the WatchID of the watcher this response sent to.
 	WatchID WatchID
+
 	// Events contains all the events that needs to send.
 	Events []storagepb.Event
+
+	// Revision is the revision of the KV when the watchResponse is created.
+	// For a normal response, the revision should be the same as the last
+	// modified revision inside Events. For a delayed response to a unsynced
+	// watcher, the revision is greater than the last modified revision
+	// inside Events.
+	Revision int64
 }
 
 // watchStream contains a collection of watchers that share
@@ -113,3 +124,9 @@ func (ws *watchStream) Close() {
 	close(ws.ch)
 	watchStreamGauge.Dec()
 }
+
+func (ws *watchStream) Rev() int64 {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	return ws.watchable.rev()
+}