Browse Source

etcdserver: make WaitGroup.Add sync with Wait

Gyu-Ho Lee 9 years ago
parent
commit
0c61d8804a
2 changed files with 15 additions and 1 deletions
  1. 14 0
      etcdserver/server.go
  2. 1 1
      etcdserver/server_test.go

+ 14 - 0
etcdserver/server.go

@@ -235,6 +235,8 @@ type EtcdServer struct {
 
 
 	msgSnapC chan raftpb.Message
 	msgSnapC chan raftpb.Message
 
 
+	// wgMu blocks concurrent waitgroup mutation while server stopping
+	wgMu sync.RWMutex
 	// wg is used to wait for the go routines that depends on the server state
 	// wg is used to wait for the go routines that depends on the server state
 	// to exit when stopping the server.
 	// to exit when stopping the server.
 	wg sync.WaitGroup
 	wg sync.WaitGroup
@@ -644,7 +646,9 @@ func (s *EtcdServer) run() {
 	}
 	}
 
 
 	defer func() {
 	defer func() {
+		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
 		close(s.stopping)
 		close(s.stopping)
+		s.wgMu.Unlock()
 
 
 		sched.Stop()
 		sched.Stop()
 
 
@@ -1609,6 +1613,16 @@ func (s *EtcdServer) setCommittedIndex(v uint64) {
 // goAttach creates a goroutine on a given function and tracks it using
 // goAttach creates a goroutine on a given function and tracks it using
 // the etcdserver waitgroup.
 // the etcdserver waitgroup.
 func (s *EtcdServer) goAttach(f func()) {
 func (s *EtcdServer) goAttach(f func()) {
+	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
+	defer s.wgMu.RUnlock()
+	select {
+	case <-s.stopping:
+		plog.Warning("server has stopped (skipping goAttach)")
+		return
+	default:
+	}
+
+	// now safe to add since waitgroup wait has not started yet
 	s.wg.Add(1)
 	s.wg.Add(1)
 	go func() {
 	go func() {
 		defer s.wg.Done()
 		defer s.wg.Done()

+ 1 - 1
etcdserver/server_test.go

@@ -939,8 +939,8 @@ func TestTriggerSnap(t *testing.T) {
 		srv.Do(context.Background(), pb.Request{Method: "PUT"})
 		srv.Do(context.Background(), pb.Request{Method: "PUT"})
 	}
 	}
 
 
-	srv.Stop()
 	<-donec
 	<-donec
+	srv.Stop()
 }
 }
 
 
 // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
 // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with