Selaa lähdekoodia

v3api: send watch events only after sending watchid creation

If events show up before the watch id, the client won't be able
to match the event with the requested watcher.
Anthony Romano 10 vuotta sitten
vanhempi
commit
59e7be4a2a
1 muutettua tiedostoa jossa 39 lisäystä ja 8 poistoa
  1. 39 8
      etcdserver/api/v3rpc/watch.go

+ 39 - 8
etcdserver/api/v3rpc/watch.go

@@ -129,6 +129,11 @@ func (sws *serverWatchStream) recvLoop() error {
 }
 }
 
 
 func (sws *serverWatchStream) sendLoop() {
 func (sws *serverWatchStream) sendLoop() {
+	// watch ids that are currently active
+	ids := make(map[storage.WatchID]struct{})
+	// watch responses pending on a watch id creation message
+	pending := make(map[storage.WatchID][]*pb.WatchResponse)
+
 	for {
 	for {
 		select {
 		select {
 		case wresp, ok := <-sws.watchStream.Chan():
 		case wresp, ok := <-sws.watchStream.Chan():
@@ -145,14 +150,22 @@ func (sws *serverWatchStream) sendLoop() {
 				events[i] = &evs[i]
 				events[i] = &evs[i]
 			}
 			}
 
 
-			err := sws.gRPCStream.Send(&pb.WatchResponse{
+			wr := &pb.WatchResponse{
 				Header:          sws.newResponseHeader(wresp.Revision),
 				Header:          sws.newResponseHeader(wresp.Revision),
 				WatchId:         int64(wresp.WatchID),
 				WatchId:         int64(wresp.WatchID),
 				Events:          events,
 				Events:          events,
 				CompactRevision: wresp.CompactRevision,
 				CompactRevision: wresp.CompactRevision,
-			})
+			}
+
+			if _, hasId := ids[wresp.WatchID]; !hasId {
+				// buffer if id not yet announced
+				wrs := append(pending[wresp.WatchID], wr)
+				pending[wresp.WatchID] = wrs
+				continue
+			}
+
 			storage.ReportEventReceived()
 			storage.ReportEventReceived()
-			if err != nil {
+			if err := sws.gRPCStream.Send(wr); err != nil {
 				return
 				return
 			}
 			}
 
 
@@ -165,15 +178,33 @@ func (sws *serverWatchStream) sendLoop() {
 				return
 				return
 			}
 			}
 
 
+			// track id creation
+			wid := storage.WatchID(c.WatchId)
+			if c.Canceled {
+				delete(ids, wid)
+				continue
+			}
+			if c.Created {
+				// flush buffered events
+				ids[wid] = struct{}{}
+				for _, v := range pending[wid] {
+					storage.ReportEventReceived()
+					if err := sws.gRPCStream.Send(v); err != nil {
+						return
+					}
+				}
+				delete(pending, wid)
+			}
 		case <-sws.closec:
 		case <-sws.closec:
 			// drain the chan to clean up pending events
 			// drain the chan to clean up pending events
-			for {
-				_, ok := <-sws.watchStream.Chan()
-				if !ok {
-					return
-				}
+			for range sws.watchStream.Chan() {
 				storage.ReportEventReceived()
 				storage.ReportEventReceived()
 			}
 			}
+			for _, wrs := range pending {
+				for range wrs {
+					storage.ReportEventReceived()
+				}
+			}
 		}
 		}
 	}
 	}
 }
 }