Browse Source

clientv3: support watch events fragmentation

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
d2c8408216
1 changed files with 46 additions and 3 deletions
  1. 46 3
      clientv3/watch.go

+ 46 - 3
clientv3/watch.go

@@ -174,10 +174,16 @@ type watchRequest struct {
 	key string
 	end string
 	rev int64
+
 	// send created notification event if this field is true
 	createdNotify bool
 	// progressNotify is for progress updates
 	progressNotify bool
+	// fragmentation should be disabled by default
+	// if true, split watch events when total exceeds
+	// "--max-request-bytes" flag value + 512-byte
+	fragment bool
+
 	// filters is the list of events to filter out
 	filters []pb.WatchCreateRequest_FilterType
 	// get the previous key-value pair before the event happens
@@ -272,6 +278,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 		end:            string(ow.end),
 		rev:            ow.rev,
 		progressNotify: ow.progressNotify,
+		fragment:       ow.fragment,
 		filters:        filters,
 		prevKV:         ow.prevKV,
 		retc:           make(chan chan WatchResponse, 1),
@@ -451,6 +458,7 @@ func (w *watchGrpcStream) run() {
 
 	cancelSet := make(map[int64]struct{})
 
+	var cur *pb.WatchResponse
 	for {
 		select {
 		// Watch() requested
@@ -475,8 +483,18 @@ func (w *watchGrpcStream) run() {
 				// head of resume queue, can register a new watcher
 				wc.Send(ws.initReq.toPB())
 			}
-		// New events from the watch client
+
+		// new events from the watch client
 		case pbresp := <-w.respc:
+			if cur == nil || pbresp.Created || pbresp.Canceled {
+				cur = pbresp
+			} else if cur != nil && cur.WatchId == pbresp.WatchId {
+				// merge new events
+				cur.Events = append(cur.Events, pbresp.Events...)
+				// update "Fragment" field; last response with "Fragment" == false
+				cur.Fragment = pbresp.Fragment
+			}
+
 			switch {
 			case pbresp.Created:
 				// response to head of queue creation
@@ -485,9 +503,14 @@ func (w *watchGrpcStream) run() {
 					w.dispatchEvent(pbresp)
 					w.resuming[0] = nil
 				}
+
 				if ws := w.nextResume(); ws != nil {
 					wc.Send(ws.initReq.toPB())
 				}
+
+				// reset for next iteration
+				cur = nil
+
 			case pbresp.Canceled && pbresp.CompactRevision == 0:
 				delete(cancelSet, pbresp.WatchId)
 				if ws, ok := w.substreams[pbresp.WatchId]; ok {
@@ -495,15 +518,31 @@ func (w *watchGrpcStream) run() {
 					close(ws.recvc)
 					closing[ws] = struct{}{}
 				}
+
+				// reset for next iteration
+				cur = nil
+
+			case cur.Fragment:
+				// watch response events are still fragmented
+				// continue to fetch next fragmented event arrival
+				continue
+
 			default:
 				// dispatch to appropriate watch stream
-				if ok := w.dispatchEvent(pbresp); ok {
+				ok := w.dispatchEvent(cur)
+
+				// reset for next iteration
+				cur = nil
+
+				if ok {
 					break
 				}
+
 				// watch response on unexpected watch id; cancel id
 				if _, ok := cancelSet[pbresp.WatchId]; ok {
 					break
 				}
+
 				cancelSet[pbresp.WatchId] = struct{}{}
 				cr := &pb.WatchRequest_CancelRequest{
 					CancelRequest: &pb.WatchCancelRequest{
@@ -513,6 +552,7 @@ func (w *watchGrpcStream) run() {
 				req := &pb.WatchRequest{RequestUnion: cr}
 				wc.Send(req)
 			}
+
 		// watch client failed on Recv; spawn another if possible
 		case err := <-w.errc:
 			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
@@ -526,13 +566,15 @@ func (w *watchGrpcStream) run() {
 				wc.Send(ws.initReq.toPB())
 			}
 			cancelSet = make(map[int64]struct{})
+
 		case <-w.ctx.Done():
 			return
+
 		case ws := <-w.closingc:
 			w.closeSubstream(ws)
 			delete(closing, ws)
+			// no more watchers on this stream, shutdown
 			if len(w.substreams)+len(w.resuming) == 0 {
-				// no more watchers on this stream, shutdown
 				return
 			}
 		}
@@ -820,6 +862,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
 		ProgressNotify: wr.progressNotify,
 		Filters:        wr.filters,
 		PrevKv:         wr.prevKV,
+		Fragment:       wr.fragment,
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	return &pb.WatchRequest{RequestUnion: cr}