|
|
@@ -235,6 +235,8 @@ type EtcdServer struct {
|
|
|
|
|
|
msgSnapC chan raftpb.Message
|
|
|
|
|
|
+ // wgMu blocks concurrent waitgroup mutation while server stopping
|
|
|
+ wgMu sync.RWMutex
|
|
|
// wg is used to wait for the go routines that depends on the server state
|
|
|
// to exit when stopping the server.
|
|
|
wg sync.WaitGroup
|
|
|
@@ -644,7 +646,9 @@ func (s *EtcdServer) run() {
|
|
|
}
|
|
|
|
|
|
defer func() {
|
|
|
+ s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
|
|
|
close(s.stopping)
|
|
|
+ s.wgMu.Unlock()
|
|
|
|
|
|
sched.Stop()
|
|
|
|
|
|
@@ -1609,6 +1613,16 @@ func (s *EtcdServer) setCommittedIndex(v uint64) {
|
|
|
// goAttach creates a goroutine on a given function and tracks it using
|
|
|
// the etcdserver waitgroup.
|
|
|
func (s *EtcdServer) goAttach(f func()) {
|
|
|
+ s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
|
|
|
+ defer s.wgMu.RUnlock()
|
|
|
+ select {
|
|
|
+ case <-s.stopping:
|
|
|
+ plog.Warning("server has stopped (skipping goAttach)")
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
+ // now safe to add since waitgroup wait has not started yet
|
|
|
s.wg.Add(1)
|
|
|
go func() {
|
|
|
defer s.wg.Done()
|