
Merge pull request #7640 from heyitsanthony/etcdserver-ctx

etcdserver: ctx-ize server initiated requests
Anthony Romano · 8 years ago
commit 24e4c94d98
5 changed files with 62 additions and 8 deletions
  1. etcdserver/apply.go (+1, -1)
  2. etcdserver/server.go (+13, -6)
  3. etcdserver/server_test.go (+21, -0)
  4. integration/cluster.go (+7, -1)
  5. integration/cluster_test.go (+20, -0)
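
In short, the server now owns a context that is created when it starts and canceled when it shuts down, and every request etcd initiates on its own behalf (lease revocation, SYNC proposals, publish, alarm requests, cluster-version updates) derives from that context instead of context.TODO() or context.Background(). A minimal sketch of that shape, using made-up names rather than the real EtcdServer API:

package main

import (
	"context"
	"fmt"
	"time"
)

// server, start, and Stop are illustrative stand-ins, not the etcdserver API.
type server struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (s *server) start() {
	// One context for the whole server lifetime; server-initiated requests
	// derive from it instead of context.TODO()/context.Background().
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *server) Stop() {
	s.cancel() // every context derived from s.ctx reports context.Canceled
}

// revokeLease stands in for any server-initiated request with a timeout.
func (s *server) revokeLease(id int64) error {
	ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
	defer cancel()
	<-ctx.Done() // placeholder for a blocking raft proposal
	return ctx.Err()
}

func main() {
	s := &server{}
	s.start()
	go func() {
		time.Sleep(50 * time.Millisecond)
		s.Stop()
	}()
	// Unblocks after ~50ms with context.Canceled instead of waiting out 5s.
	fmt.Println(s.revokeLease(1))
}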

etcdserver/apply.go (+1, -1)

@@ -584,7 +584,7 @@ func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
 }
 
 func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
-	ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
+	ctx := context.WithValue(context.WithValue(a.s.ctx, "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
 	return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
 }
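
The only functional change in this hunk is the parent context: the authenticate context now descends from s.ctx rather than context.TODO(), so a pending Authenticate call observes server shutdown. A standard-library-only sketch (the keys mirror the patch, the values are dummies) of how cancellation flows through the WithValue chain while the attached values stay readable:

package main

import (
	"context"
	"fmt"
)

func main() {
	// parent stands in for s.ctx; string keys mirror the patch above.
	parent, cancel := context.WithCancel(context.Background())
	ctx := context.WithValue(
		context.WithValue(parent, "index", uint64(42)),
		"simpleToken", "dummy-token")

	cancel() // simulate server shutdown

	fmt.Println(ctx.Err())                // context.Canceled, inherited from parent
	fmt.Println(ctx.Value("index"))       // 42: values are unaffected by cancellation
	fmt.Println(ctx.Value("simpleToken")) // dummy-token
}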
 
 

etcdserver/server.go (+13, -6)

@@ -238,6 +238,11 @@ type EtcdServer struct {
 	// wg is used to wait for the go routines that depends on the server state
 	// to exit when stopping the server.
 	wg sync.WaitGroup
+
+	// ctx is used for etcd-initiated requests that may need to be canceled
+	// on etcd server shutdown.
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -536,6 +541,7 @@ func (s *EtcdServer) start() {
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
 	s.stopping = make(chan struct{})
+	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
 	if s.ClusterVersion() != nil {
@@ -686,6 +692,7 @@ func (s *EtcdServer) run() {
 		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
 		close(s.stopping)
 		s.wgMu.Unlock()
+		s.cancel()
 
 		sched.Stop()
 
@@ -740,7 +747,7 @@ func (s *EtcdServer) run() {
 					}
 					lid := lease.ID
 					s.goAttach(func() {
-						s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(lid)})
+						s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
 						<-c
 					})
 				}
@@ -967,7 +974,7 @@ func (s *EtcdServer) TransferLeadership() error {
 	}
 
 	tm := s.Cfg.ReqTimeout()
-	ctx, cancel := context.WithTimeout(context.TODO(), tm)
+	ctx, cancel := context.WithTimeout(s.ctx, tm)
 	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
 	cancel()
 	return err
@@ -1181,7 +1188,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 // This makes no guarantee that the request will be proposed or performed.
 // The request will be canceled after the given timeout.
 func (s *EtcdServer) sync(timeout time.Duration) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	req := pb.Request{
 		Method: "SYNC",
 		ID:     s.reqIDGen.Next(),
@@ -1190,6 +1196,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	data := pbutil.MustMarshal(&req)
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
+	ctx, cancel := context.WithTimeout(s.ctx, timeout)
 	s.goAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
@@ -1214,7 +1221,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 	}
 
 	for {
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		ctx, cancel := context.WithTimeout(s.ctx, timeout)
 		_, err := s.Do(ctx, req)
 		cancel()
 		switch err {
@@ -1355,7 +1362,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 			Alarm:    pb.AlarmType_NOSPACE,
 		}
 		r := pb.InternalRaftRequest{Alarm: a}
-		s.processInternalRaftRequest(context.TODO(), r)
+		s.processInternalRaftRequest(s.ctx, r)
 		s.w.Trigger(id, ar)
 	})
 }
@@ -1551,7 +1558,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path:   membership.StoreClusterVersionKey(),
 		Val:    ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {
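
One practical effect of these hunks, sketched below with stand-in types: a retry loop such as publish used to wait out a full request timeout per attempt even while the server was stopping, because each attempt's context descended from context.Background(). With attempts derived from the server context, every pending one fails as soon as cancel() runs in the shutdown path. The loop body and error handling here are simplified placeholders, not the real publish logic:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doOnce stands in for s.Do(ctx, req); it just blocks until the context ends.
func doOnce(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// publishLoop mirrors the retry shape in EtcdServer.publish: a fresh
// per-attempt timeout derived from the server context on every iteration.
func publishLoop(serverCtx context.Context, timeout time.Duration) {
	for {
		ctx, cancel := context.WithTimeout(serverCtx, timeout)
		err := doOnce(ctx)
		cancel()
		switch {
		case errors.Is(err, context.Canceled): // server is shutting down
			fmt.Println("publish stopped:", err)
			return
		case errors.Is(err, context.DeadlineExceeded):
			continue // this attempt timed out; retry as before
		}
	}
}

func main() {
	serverCtx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		publishLoop(serverCtx, time.Hour)
	}()
	time.Sleep(100 * time.Millisecond)
	cancel() // the shutdown path in run() calls s.cancel() much like this
	<-done
}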

etcdserver/server_test.go (+21, -0)

@@ -721,9 +721,12 @@ func TestDoProposalStopped(t *testing.T) {
 // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
 func TestSync(t *testing.T) {
 	n := newNodeRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx:      ctx,
+		cancel:   cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
@@ -761,9 +764,12 @@ func TestSync(t *testing.T) {
 // after timeout
 func TestSyncTimeout(t *testing.T) {
 	n := newProposalBlockerRecorder()
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
+		ctx:      ctx,
+		cancel:   cancel,
 	}
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
@@ -1185,6 +1191,7 @@ func TestPublish(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		readych:    make(chan struct{}),
 		Cfg:        &ServerConfig{TickMs: 1},
@@ -1195,6 +1202,9 @@ func TestPublish(t *testing.T) {
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	srv.publish(time.Hour)
 
@@ -1228,6 +1238,7 @@ func TestPublish(t *testing.T) {
 
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		Cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
@@ -1242,6 +1253,9 @@ func TestPublishStopped(t *testing.T) {
 		stop:       make(chan struct{}),
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	close(srv.stopping)
 	srv.publish(time.Hour)
@@ -1249,6 +1263,7 @@ func TestPublishStopped(t *testing.T) {
 
 // TestPublishRetry tests that publish will keep retry until success.
 func TestPublishRetry(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
 	n := newNodeRecorderStream()
 	srv := &EtcdServer{
 		Cfg:        &ServerConfig{TickMs: 1},
@@ -1257,6 +1272,8 @@ func TestPublishRetry(t *testing.T) {
 		stopping:   make(chan struct{}),
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+		ctx:        ctx,
+		cancel:     cancel,
 	}
 	// expect multiple proposals from retrying
 	ch := make(chan struct{})
@@ -1287,6 +1304,7 @@ func TestUpdateVersion(t *testing.T) {
 	// simulate that request has gone through consensus
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
+	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
 		id:         1,
 		Cfg:        &ServerConfig{TickMs: 1},
@@ -1296,6 +1314,9 @@ func TestUpdateVersion(t *testing.T) {
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	srv.updateClusterVersion("2.0.0")
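
These fixtures build EtcdServer values by hand instead of going through start(), so each now carries its own ctx/cancel pair; without it, publish and sync would pass a nil parent to context.WithTimeout, which panics.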
 
 

integration/cluster.go (+7, -1)

@@ -313,9 +313,15 @@ func (c *cluster) removeMember(t *testing.T, id uint64) error {
 }
 
 func (c *cluster) Terminate(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(len(c.Members))
 	for _, m := range c.Members {
-		m.Terminate(t)
+		go func(mm *member) {
+			defer wg.Done()
+			mm.Terminate(t)
+		}(m)
 	}
+	wg.Wait()
 }
 
 func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
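
Terminating members concurrently makes cluster teardown cost roughly one member's shutdown time instead of the sum of all of them. A self-contained sketch of the same WaitGroup shape; member and Terminate here are dummies, not the integration package's types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type member struct{ name string }

func (m *member) Terminate() {
	time.Sleep(50 * time.Millisecond) // stand-in for a slow member shutdown
	fmt.Println(m.name, "terminated")
}

// terminateAll stops every member in its own goroutine and waits for the
// slowest one, mirroring the cluster.Terminate change above.
func terminateAll(members []*member) {
	var wg sync.WaitGroup
	wg.Add(len(members))
	for _, m := range members {
		go func(mm *member) {
			defer wg.Done()
			mm.Terminate()
		}(m) // pass m as an argument so each goroutine gets its own copy
	}
	wg.Wait()
}

func main() {
	terminateAll([]*member{{"m0"}, {"m1"}, {"m2"}})
}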

integration/cluster_test.go (+20, -0)

@@ -568,3 +568,23 @@ func TestTransferLeader(t *testing.T) {
 		t.Fatalf("expected old leader %d != new leader %d", oldLeadID, newLeadIDs[0])
 	}
 }
+
+func TestSpeedyTerminate(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	// Stop/Restart so requests will time out on lost leaders
+	for i := 0; i < 3; i++ {
+		clus.Members[i].Stop(t)
+		clus.Members[i].Restart(t)
+	}
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		clus.Terminate(t)
+	}()
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatalf("cluster took too long to terminate")
+	case <-donec:
+	}
+}
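
Taken together with the cluster.go change, this test is the regression guard for the PR: the Stop/Restart cycles leave members without a settled leader, so their server-initiated requests (publish, sync, cluster-version updates) would previously block shutdown for full request timeouts, and terminating three members sequentially multiplied that cost. With requests derived from the server context and members torn down in parallel, Terminate is expected to return well inside the 10-second guard.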