Browse Source

Merge pull request #5897 from xiang90/lock

v3rpc: lock progress and prevKV map correctly
Xiang Li 9 years ago
parent
commit
4f2da16d82
1 changed files with 8 additions and 3 deletions
  1. 8 3
      etcdserver/api/v3rpc/watch.go

+ 8 - 3
etcdserver/api/v3rpc/watch.go

@@ -88,15 +88,14 @@ type serverWatchStream struct {
 	watchStream mvcc.WatchStream
 	watchStream mvcc.WatchStream
 	ctrlStream  chan *pb.WatchResponse
 	ctrlStream  chan *pb.WatchResponse
 
 
+	// mu protects progress, prevKV
+	mu sync.Mutex
 	// progress tracks the watchID that stream might need to send
 	// progress tracks the watchID that stream might need to send
 	// progress to.
 	// progress to.
 	// TOOD: combine progress and prevKV into a single struct?
 	// TOOD: combine progress and prevKV into a single struct?
 	progress map[mvcc.WatchID]bool
 	progress map[mvcc.WatchID]bool
 	prevKV   map[mvcc.WatchID]bool
 	prevKV   map[mvcc.WatchID]bool
 
 
-	// mu protects progress
-	mu sync.Mutex
-
 	// closec indicates the stream is closed.
 	// closec indicates the stream is closed.
 	closec chan struct{}
 	closec chan struct{}
 
 
@@ -191,12 +190,14 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 			}
 			id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
 			id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
 			if id != -1 {
 			if id != -1 {
+				sws.mu.Lock()
 				if creq.ProgressNotify {
 				if creq.ProgressNotify {
 					sws.progress[id] = true
 					sws.progress[id] = true
 				}
 				}
 				if creq.PrevKv {
 				if creq.PrevKv {
 					sws.prevKV[id] = true
 					sws.prevKV[id] = true
 				}
 				}
+				sws.mu.Unlock()
 			}
 			}
 			wr := &pb.WatchResponse{
 			wr := &pb.WatchResponse{
 				Header:   sws.newResponseHeader(wsrev),
 				Header:   sws.newResponseHeader(wsrev),
@@ -268,7 +269,9 @@ func (sws *serverWatchStream) sendLoop() {
 			// or define protocol buffer with []mvccpb.Event.
 			// or define protocol buffer with []mvccpb.Event.
 			evs := wresp.Events
 			evs := wresp.Events
 			events := make([]*mvccpb.Event, len(evs))
 			events := make([]*mvccpb.Event, len(evs))
+			sws.mu.Lock()
 			needPrevKV := sws.prevKV[wresp.WatchID]
 			needPrevKV := sws.prevKV[wresp.WatchID]
+			sws.mu.Unlock()
 			for i := range evs {
 			for i := range evs {
 				events[i] = &evs[i]
 				events[i] = &evs[i]
 
 
@@ -333,12 +336,14 @@ func (sws *serverWatchStream) sendLoop() {
 				delete(pending, wid)
 				delete(pending, wid)
 			}
 			}
 		case <-progressTicker.C:
 		case <-progressTicker.C:
+			sws.mu.Lock()
 			for id, ok := range sws.progress {
 			for id, ok := range sws.progress {
 				if ok {
 				if ok {
 					sws.watchStream.RequestProgress(id)
 					sws.watchStream.RequestProgress(id)
 				}
 				}
 				sws.progress[id] = true
 				sws.progress[id] = true
 			}
 			}
+			sws.mu.Unlock()
 		case <-sws.closec:
 		case <-sws.closec:
 			return
 			return
 		}
 		}