@@ -173,8 +173,12 @@ type EtcdServer struct {
 	snapCount uint64
 
-	w wait.Wait
-	stop chan struct{}
+	w wait.Wait
+	// stop signals the run goroutine should shut down.
+	stop chan struct{}
+	// stopping is closed by run goroutine on shutdown.
+	stopping chan struct{}
+	// done is closed when all goroutines from start() complete.
 	done chan struct{}
 	errorc chan error
 	id types.ID
@@ -467,10 +471,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 // It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
 	s.start()
-	go s.publish(s.Cfg.ReqTimeout())
-	go s.purgeFile()
-	go monitorFileDescriptor(s.done)
-	go s.monitorVersions()
+	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
+	s.goAttach(s.purgeFile)
+	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
+	s.goAttach(s.monitorVersions)
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -485,6 +489,7 @@ func (s *EtcdServer) start() {
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
+	s.stopping = make(chan struct{})
 	if s.ClusterVersion() != nil {
 		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
 	} else {
@@ -508,7 +513,7 @@ func (s *EtcdServer) purgeFile() {
 		plog.Fatalf("failed to purge wal file %v", e)
 	case e := <-serrc:
 		plog.Fatalf("failed to purge snap file %v", e)
-	case <-s.done:
+	case <-s.stopping:
 		return
 	}
 }
@@ -564,9 +569,11 @@ func (s *EtcdServer) run() {
 	}
 
 	defer func() {
+		close(s.stopping)
+
 		sched.Stop()
 
-		// wait for snapshots before closing raft so wal stays open
+		// wait for goroutines before closing raft so wal stays open
 		s.wg.Wait()
 
 		// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
@@ -611,11 +618,11 @@ func (s *EtcdServer) run() {
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
-			go func() {
+			s.goAttach(func() {
 				for _, l := range leases {
 					s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(l.ID)})
 				}
-			}()
+			})
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -1007,7 +1014,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 	case <-ctx.Done():
 		s.w.Trigger(cc.ID, nil) // GC wait
 		return s.parseProposeCtxErr(ctx.Err(), start)
-	case <-s.done:
+	case <-s.stopping:
 		return ErrStopped
 	}
 }
@@ -1025,10 +1032,10 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	data := pbutil.MustMarshal(&req)
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
-	go func() {
+	s.goAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
-	}()
+	})
 }
 
 // publish registers server information into the cluster. The information
@@ -1111,7 +1118,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 	atomic.AddInt64(&s.inflightSnapshots, 1)
 
 	s.r.transport.SendSnapshot(merged)
-	go func() {
+	s.goAttach(func() {
 		select {
 		case ok := <-merged.CloseNotify():
 			// delay releasing inflight snapshot for another 30 seconds to
@@ -1121,14 +1128,14 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 			if ok {
 				select {
 				case <-time.After(releaseDelayAfterSnapshot):
-				case <-s.done:
+				case <-s.stopping:
 				}
 			}
 			atomic.AddInt64(&s.inflightSnapshots, -1)
-		case <-s.done:
+		case <-s.stopping:
 			return
 		}
-	}()
+	})
 }
 
 // apply takes entries received from Raft (after it has been committed) and
@@ -1220,7 +1227,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	}
 
 	plog.Errorf("applying raft message exceeded backend quota")
-	go func() {
+	s.goAttach(func() {
 		a := &pb.AlarmRequest{
 			MemberID: uint64(s.ID()),
 			Action: pb.AlarmRequest_ACTIVATE,
@@ -1229,7 +1236,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		r := pb.InternalRaftRequest{Alarm: a}
 		s.processInternalRaftRequest(context.TODO(), r)
 		s.w.Trigger(id, ar)
-	}()
+	})
 }
 
 // applyConfChange applies a ConfChange to the server. It is only
@@ -1288,10 +1295,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	// the go routine created below.
 	s.KV().Commit()
 
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
-
+	s.goAttach(func() {
 		d, err := clone.SaveNoCopy()
 		// TODO: current store will never fail to do a snapshot
 		// what should we do if the store might fail?
@@ -1339,7 +1343,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			plog.Panicf("unexpected compaction error %v", err)
 		}
 		plog.Infof("compacted raft log at %d", compacti)
-	}()
+	})
 }
 
 // CutPeer drops messages to the specified peer.
@@ -1378,7 +1382,7 @@ func (s *EtcdServer) monitorVersions() {
 		select {
 		case <-s.forceVersionC:
 		case <-time.After(monitorVersionInterval):
-		case <-s.done:
+		case <-s.stopping:
 			return
 		}
 
@@ -1399,18 +1403,18 @@ func (s *EtcdServer) monitorVersions() {
 		// 1. use the decided version if possible
 		// 2. or use the min cluster version
 		if s.cluster.Version() == nil {
+			verStr := version.MinClusterVersion
 			if v != nil {
-				go s.updateClusterVersion(v.String())
-			} else {
-				go s.updateClusterVersion(version.MinClusterVersion)
+				verStr = v.String()
 			}
+			s.goAttach(func() { s.updateClusterVersion(verStr) })
 			continue
 		}
 
 		// update cluster version only if the decided version is greater than
 		// the current cluster version
 		if v != nil && s.cluster.Version().LessThan(*v) {
-			go s.updateClusterVersion(v.String())
+			s.goAttach(func() { s.updateClusterVersion(v.String()) })
 		}
 	}
 }
@@ -1508,3 +1512,13 @@ func (s *EtcdServer) getCommittedIndex() uint64 {
 func (s *EtcdServer) setCommittedIndex(v uint64) {
 	atomic.StoreUint64(&s.committedIndex, v)
 }
+
+// goAttach creates a goroutine that runs the given function and tracks it
+// with the etcdserver waitgroup.
+func (s *EtcdServer) goAttach(f func()) {
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		f()
+	}()
+}
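
To make the lifecycle in this patch easier to follow in isolation, here is a minimal standalone sketch, not etcd code: the miniServer type, worker loop, and timings are invented for illustration. It shows the same pattern the diff introduces: goAttach registers every helper goroutine in a waitgroup, goroutines watch a stopping channel rather than done, and shutdown closes stopping, waits for the waitgroup, then closes done.

package main

import (
	"fmt"
	"sync"
	"time"
)

type miniServer struct {
	wg       sync.WaitGroup
	stopping chan struct{} // closed to tell attached goroutines to exit
	done     chan struct{} // closed once every attached goroutine has returned
}

// goAttach runs f in a goroutine tracked by the waitgroup, mirroring the
// helper added in this patch.
func (s *miniServer) goAttach(f func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

// stop closes stopping, waits for attached goroutines, then closes done.
func (s *miniServer) stop() {
	close(s.stopping)
	s.wg.Wait()
	close(s.done)
}

func main() {
	s := &miniServer{stopping: make(chan struct{}), done: make(chan struct{})}

	// A periodic task that exits promptly once stopping is closed, in the
	// same way purgeFile or monitorVersions do after this change.
	s.goAttach(func() {
		for {
			select {
			case <-time.After(10 * time.Millisecond):
				fmt.Println("tick")
			case <-s.stopping:
				return
			}
		}
	})

	time.Sleep(25 * time.Millisecond)
	s.stop()
	<-s.done
	fmt.Println("all attached goroutines finished")
}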