|
@@ -208,6 +208,10 @@ type EtcdServer struct {
|
|
|
forceVersionC chan struct{}
|
|
forceVersionC chan struct{}
|
|
|
|
|
|
|
|
msgSnapC chan raftpb.Message
|
|
msgSnapC chan raftpb.Message
|
|
|
|
|
+
|
|
|
|
|
+ // wg is used to wait for the go routines that depends on the server state
|
|
|
|
|
+ // to exit when stopping the server.
|
|
|
|
|
+ wg sync.WaitGroup
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewServer creates a new EtcdServer from the supplied configuration. The
|
|
// NewServer creates a new EtcdServer from the supplied configuration. The
|
|
@@ -536,6 +540,8 @@ func (s *EtcdServer) run() {
|
|
|
s.r.stop()
|
|
s.r.stop()
|
|
|
sched.Stop()
|
|
sched.Stop()
|
|
|
|
|
|
|
|
|
|
+ s.wg.Wait()
|
|
|
|
|
+
|
|
|
// kv, lessor and backend can be nil if running without v3 enabled
|
|
// kv, lessor and backend can be nil if running without v3 enabled
|
|
|
// or running unit tests.
|
|
// or running unit tests.
|
|
|
if s.lessor != nil {
|
|
if s.lessor != nil {
|
|
@@ -1089,7 +1095,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
|
|
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|
|
clone := s.store.Clone()
|
|
clone := s.store.Clone()
|
|
|
|
|
|
|
|
|
|
+ s.wg.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
|
|
|
+ defer s.wg.Done()
|
|
|
|
|
+
|
|
|
d, err := clone.SaveNoCopy()
|
|
d, err := clone.SaveNoCopy()
|
|
|
// TODO: current store will never fail to do a snapshot
|
|
// TODO: current store will never fail to do a snapshot
|
|
|
// what should we do if the store might fail?
|
|
// what should we do if the store might fail?
|