
etcdserver: tighten up goroutine management

All outstanding goroutines now go into the etcdserver waitgroup. Goroutines are
shut down via a "stopping" channel, which is closed when the run() goroutine
shuts down. The done channel only closes once the waitgroup is fully cleared.
Anthony Romano committed 9 years ago · commit 3866e78c26
4 changed files with 51 additions and 36 deletions
  1. etcdserver/server.go (+43 -29)
  2. etcdserver/server_test.go (+6 -5)
  3. etcdserver/v2_server.go (+1 -1)
  4. etcdserver/v3_server.go (+1 -1)
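
In outline, the commit replaces bare go statements with a waitgroup-tracked helper and adds a dedicated stopping channel for shutdown signalling. Below is a minimal, self-contained sketch of that lifecycle under simplified assumptions: the toy server type and trivial run loop are illustrative only, not the real EtcdServer, though the goAttach helper mirrors the one added at the end of server.go.

// Minimal sketch of the shutdown lifecycle (simplified, assumed types;
// not the real EtcdServer).
package main

import (
	"fmt"
	"sync"
	"time"
)

type server struct {
	wg       sync.WaitGroup
	stop     chan struct{} // asks the run loop to exit
	stopping chan struct{} // closed by run on shutdown; fans out to workers
	done     chan struct{} // closed only after every attached goroutine returns
}

// goAttach tracks a background goroutine in the server waitgroup,
// mirroring the helper added to server.go below.
func (s *server) goAttach(f func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

func (s *server) run() {
	defer func() {
		close(s.stopping) // signal attached goroutines to stop
		s.wg.Wait()       // wait for all of them before tearing anything down
		close(s.done)     // shutdown is complete only now
	}()
	<-s.stop // stand-in for the real event loop
}

func main() {
	s := &server{
		stop:     make(chan struct{}),
		stopping: make(chan struct{}),
		done:     make(chan struct{}),
	}
	go s.run()
	s.goAttach(func() {
		select {
		case <-time.After(time.Minute): // pretend periodic work
		case <-s.stopping: // exits promptly once run begins shutting down
		}
	})
	close(s.stop)
	<-s.done
	fmt.Println("all attached goroutines drained")
}

The ordering is the point of the change: stopping tells background work to wind down while raft and the WAL are still open, wg.Wait() drains every attached goroutine, and only then does done close, which is what the commit message means by the done channel closing once the waitgroup is fully cleared.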

+ 43 - 29
etcdserver/server.go

@@ -173,8 +173,12 @@ type EtcdServer struct {
 
 
 	snapCount uint64
 
-	w          wait.Wait
-	stop       chan struct{}
+	w wait.Wait
+	// stop signals the run goroutine should shutdown.
+	stop chan struct{}
+	// stopping is closed by run goroutine on shutdown.
+	stopping chan struct{}
+	// done is closed when all goroutines from start() complete.
 	done       chan struct{}
 	errorc     chan error
 	id         types.ID
@@ -467,10 +471,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 // It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
 	s.start()
-	go s.publish(s.Cfg.ReqTimeout())
-	go s.purgeFile()
-	go monitorFileDescriptor(s.done)
-	go s.monitorVersions()
+	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
+	s.goAttach(s.purgeFile)
+	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
+	s.goAttach(s.monitorVersions)
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -485,6 +489,7 @@ func (s *EtcdServer) start() {
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
+	s.stopping = make(chan struct{})
 	if s.ClusterVersion() != nil {
 		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
 	} else {
@@ -508,7 +513,7 @@ func (s *EtcdServer) purgeFile() {
 		plog.Fatalf("failed to purge wal file %v", e)
 	case e := <-serrc:
 		plog.Fatalf("failed to purge snap file %v", e)
-	case <-s.done:
+	case <-s.stopping:
 		return
 	}
 }
@@ -564,9 +569,11 @@ func (s *EtcdServer) run() {
 	}
 
 	defer func() {
+		close(s.stopping)
+
 		sched.Stop()
 
-		// wait for snapshots before closing raft so wal stays open
+		// wait for goroutines before closing raft so wal stays open
 		s.wg.Wait()
 
 		// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
@@ -611,11 +618,11 @@ func (s *EtcdServer) run() {
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
-			go func() {
+			s.goAttach(func() {
 				for _, l := range leases {
 					s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(l.ID)})
 				}
-			}()
+			})
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -1007,7 +1014,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 	case <-ctx.Done():
 		s.w.Trigger(cc.ID, nil) // GC wait
 		return s.parseProposeCtxErr(ctx.Err(), start)
-	case <-s.done:
+	case <-s.stopping:
 		return ErrStopped
 	}
 }
@@ -1025,10 +1032,10 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	data := pbutil.MustMarshal(&req)
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
-	go func() {
+	s.goAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
-	}()
+	})
 }
 
 // publish registers server information into the cluster. The information
@@ -1111,7 +1118,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 	atomic.AddInt64(&s.inflightSnapshots, 1)
 
 	s.r.transport.SendSnapshot(merged)
-	go func() {
+	s.goAttach(func() {
 		select {
 		case ok := <-merged.CloseNotify():
 			// delay releasing inflight snapshot for another 30 seconds to
@@ -1121,14 +1128,14 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 			if ok {
 				select {
 				case <-time.After(releaseDelayAfterSnapshot):
-				case <-s.done:
+				case <-s.stopping:
 				}
 			}
 			atomic.AddInt64(&s.inflightSnapshots, -1)
-		case <-s.done:
+		case <-s.stopping:
 			return
 		}
-	}()
+	})
 }
 
 // apply takes entries received from Raft (after it has been committed) and
@@ -1220,7 +1227,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	}
 
 	plog.Errorf("applying raft message exceeded backend quota")
-	go func() {
+	s.goAttach(func() {
 		a := &pb.AlarmRequest{
 			MemberID: uint64(s.ID()),
 			Action:   pb.AlarmRequest_ACTIVATE,
@@ -1229,7 +1236,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		r := pb.InternalRaftRequest{Alarm: a}
 		s.processInternalRaftRequest(context.TODO(), r)
 		s.w.Trigger(id, ar)
-	}()
+	})
 }
 
 // applyConfChange applies a ConfChange to the server. It is only
@@ -1288,10 +1295,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	// the go routine created below.
 	s.KV().Commit()
 
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
-
+	s.goAttach(func() {
 		d, err := clone.SaveNoCopy()
 		// TODO: current store will never fail to do a snapshot
 		// what should we do if the store might fail?
@@ -1339,7 +1343,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			plog.Panicf("unexpected compaction error %v", err)
 		}
 		plog.Infof("compacted raft log at %d", compacti)
-	}()
+	})
 }
 
 // CutPeer drops messages to the specified peer.
@@ -1378,7 +1382,7 @@ func (s *EtcdServer) monitorVersions() {
 		select {
 		case <-s.forceVersionC:
 		case <-time.After(monitorVersionInterval):
-		case <-s.done:
+		case <-s.stopping:
 			return
 		}
 
 
@@ -1399,18 +1403,18 @@ func (s *EtcdServer) monitorVersions() {
 		// 1. use the decided version if possible
 		// 2. or use the min cluster version
 		if s.cluster.Version() == nil {
+			verStr := version.MinClusterVersion
 			if v != nil {
-				go s.updateClusterVersion(v.String())
-			} else {
-				go s.updateClusterVersion(version.MinClusterVersion)
+				verStr = v.String()
 			}
+			s.goAttach(func() { s.updateClusterVersion(verStr) })
 			continue
 		}
 
 		// update cluster version only if the decided version is greater than
 		// the current cluster version
 		if v != nil && s.cluster.Version().LessThan(*v) {
-			go s.updateClusterVersion(v.String())
+			s.goAttach(func() { s.updateClusterVersion(v.String()) })
 		}
 	}
 }
@@ -1508,3 +1512,13 @@ func (s *EtcdServer) getCommittedIndex() uint64 {
 func (s *EtcdServer) setCommittedIndex(v uint64) {
 	atomic.StoreUint64(&s.committedIndex, v)
 }
+
+// goAttach creates a goroutine on a given function and tracks it using
+// the etcdserver waitgroup.
+func (s *EtcdServer) goAttach(f func()) {
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		f()
+	}()
+}

+ 6 - 5
etcdserver/server_test.go

@@ -706,8 +706,8 @@ func TestDoProposalStopped(t *testing.T) {
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
-	srv.done = make(chan struct{})
-	close(srv.done)
+	srv.stopping = make(chan struct{})
+	close(srv.stopping)
 	_, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
 	if err != ErrStopped {
 		t.Errorf("err = %v, want %v", err, ErrStopped)
@@ -1217,10 +1217,11 @@ func TestPublishStopped(t *testing.T) {
 		cluster:  &membership.RaftCluster{},
 		w:        mockwait.NewNop(),
 		done:     make(chan struct{}),
+		stopping: make(chan struct{}),
 		stop:     make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	close(srv.done)
+	close(srv.stopping)
 	srv.publish(time.Hour)
 }
 
 
@@ -1231,11 +1232,11 @@ func TestPublishRetry(t *testing.T) {
 		Cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: n},
 		w:        mockwait.NewNop(),
-		done:     make(chan struct{}),
+		stopping: make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	// TODO: use fakeClockwork
-	time.AfterFunc(10*time.Millisecond, func() { close(srv.done) })
+	time.AfterFunc(10*time.Millisecond, func() { close(srv.stopping) })
 	srv.publish(10 * time.Nanosecond)
 
 	action := n.Action()

+ 1 - 1
etcdserver/v2_server.go

@@ -68,7 +68,7 @@ func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Res
 		proposalsFailed.Inc()
 		a.s.w.Trigger(r.ID, nil) // GC wait
 		return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
-	case <-a.s.done:
+	case <-a.s.stopping:
 	}
 	return Response{}, ErrStopped
 }

+ 1 - 1
etcdserver/v3_server.go

@@ -294,7 +294,7 @@ func (s *EtcdServer) waitLeader() (*membership.Member, error) {
 		select {
 		case <-time.After(dur):
 			leader = s.cluster.Member(s.Leader())
-		case <-s.done:
+		case <-s.stopping:
 			return nil, ErrStopped
 		}
 	}