Browse Source

Merge pull request #1720 from xiang90/sender_stop

*: gracefully stop etcdserver
Xiang Li 11 years ago
parent
commit
ac5a282003
5 changed files with 190 additions and 71 deletions
  1. 16 16
      etcdmain/etcd.go
  2. 44 30
      etcdserver/sender.go
  3. 56 5
      etcdserver/sender_test.go
  4. 26 10
      etcdserver/server.go
  5. 48 10
      etcdserver/server_test.go

+ 16 - 16
etcdmain/etcd.go

@@ -170,8 +170,9 @@ func Main() {
 	}
 	}
 
 
 	shouldProxy := proxyFlag.String() != proxyFlagOff
 	shouldProxy := proxyFlag.String() != proxyFlagOff
+	var stopped <-chan struct{}
 	if !shouldProxy {
 	if !shouldProxy {
-		err = startEtcd()
+		stopped, err = startEtcd()
 		if err == discovery.ErrFullCluster && fallbackFlag.String() == fallbackFlagProxy {
 		if err == discovery.ErrFullCluster && fallbackFlag.String() == fallbackFlagProxy {
 			log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy)
 			log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy)
 			shouldProxy = true
 			shouldProxy = true
@@ -183,19 +184,18 @@ func Main() {
 	if err != nil {
 	if err != nil {
 		log.Fatalf("etcd: %v", err)
 		log.Fatalf("etcd: %v", err)
 	}
 	}
-	// Block indefinitely
-	<-make(chan struct{})
+	<-stopped
 }
 }
 
 
 // startEtcd launches the etcd server and HTTP handlers for client/server communication.
 // startEtcd launches the etcd server and HTTP handlers for client/server communication.
-func startEtcd() error {
+func startEtcd() (<-chan struct{}, error) {
 	apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo)
 	apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 	cls, err := setupCluster(apurls)
 	cls, err := setupCluster(apurls)
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf("error setting up initial cluster: %v", err)
+		return nil, fmt.Errorf("error setting up initial cluster: %v", err)
 	}
 	}
 
 
 	if *dir == "" {
 	if *dir == "" {
@@ -203,25 +203,25 @@ func startEtcd() error {
 		log.Printf("no data-dir provided, using default data-dir ./%s", *dir)
 		log.Printf("no data-dir provided, using default data-dir ./%s", *dir)
 	}
 	}
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
-		return fmt.Errorf("cannot create data directory: %v", err)
+		return nil, fmt.Errorf("cannot create data directory: %v", err)
 	}
 	}
 	if err := fileutil.IsDirWriteable(*dir); err != nil {
 	if err := fileutil.IsDirWriteable(*dir); err != nil {
-		return fmt.Errorf("cannot write to data directory: %v", err)
+		return nil, fmt.Errorf("cannot write to data directory: %v", err)
 	}
 	}
 
 
 	pt, err := transport.NewTransport(peerTLSInfo)
 	pt, err := transport.NewTransport(peerTLSInfo)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 
 
 	acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
 	acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 
 
 	lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
 	lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 
 
 	if !peerTLSInfo.Empty() {
 	if !peerTLSInfo.Empty() {
@@ -232,7 +232,7 @@ func startEtcd() error {
 		var l net.Listener
 		var l net.Listener
 		l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo)
 		l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo)
 		if err != nil {
 		if err != nil {
-			return err
+			return nil, err
 		}
 		}
 
 
 		urlStr := u.String()
 		urlStr := u.String()
@@ -248,7 +248,7 @@ func startEtcd() error {
 
 
 	lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
 	lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 
 
 	if !clientTLSInfo.Empty() {
 	if !clientTLSInfo.Empty() {
@@ -259,7 +259,7 @@ func startEtcd() error {
 		var l net.Listener
 		var l net.Listener
 		l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo)
 		l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo)
 		if err != nil {
 		if err != nil {
-			return err
+			return nil, err
 		}
 		}
 
 
 		urlStr := u.String()
 		urlStr := u.String()
@@ -289,7 +289,7 @@ func startEtcd() error {
 	var s *etcdserver.EtcdServer
 	var s *etcdserver.EtcdServer
 	s, err = etcdserver.NewServer(cfg)
 	s, err = etcdserver.NewServer(cfg)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
 	s.Start()
 	s.Start()
 
 
@@ -313,7 +313,7 @@ func startEtcd() error {
 			log.Fatal(http.Serve(l, ch))
 			log.Fatal(http.Serve(l, ch))
 		}(l)
 		}(l)
 	}
 	}
-	return nil
+	return s.StopNotify(), nil
 }
 }
 
 
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.

+ 44 - 30
etcdserver/sender.go

@@ -37,23 +37,25 @@ const (
 )
 )
 
 
 type sendHub struct {
 type sendHub struct {
-	tr      *http.Transport
-	cl      ClusterInfo
-	ss      *stats.ServerStats
-	ls      *stats.LeaderStats
-	senders map[types.ID]*sender
+	tr         http.RoundTripper
+	cl         ClusterInfo
+	ss         *stats.ServerStats
+	ls         *stats.LeaderStats
+	senders    map[types.ID]*sender
+	shouldstop chan struct{}
 }
 }
 
 
 // newSendHub creates the default send hub used to transport raft messages
 // newSendHub creates the default send hub used to transport raft messages
 // to other members. The returned sendHub will update the given ServerStats and
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
 // LeaderStats appropriately.
-func newSendHub(t *http.Transport, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
 	h := &sendHub{
 	h := &sendHub{
-		tr:      t,
-		cl:      cl,
-		ss:      ss,
-		ls:      ls,
-		senders: make(map[types.ID]*sender),
+		tr:         t,
+		cl:         cl,
+		ss:         ss,
+		ls:         ls,
+		senders:    make(map[types.ID]*sender),
+		shouldstop: make(chan struct{}, 1),
 	}
 	}
 	for _, m := range cl.Members() {
 	for _, m := range cl.Members() {
 		h.Add(m)
 		h.Add(m)
@@ -94,6 +96,10 @@ func (h *sendHub) Stop() {
 	}
 	}
 }
 }
 
 
+func (h *sendHub) ShouldStopNotify() <-chan struct{} {
+	return h.shouldstop
+}
+
 func (h *sendHub) Add(m *Member) {
 func (h *sendHub) Add(m *Member) {
 	if _, ok := h.senders[m.ID]; ok {
 	if _, ok := h.senders[m.ID]; ok {
 		return
 		return
@@ -101,7 +107,7 @@ func (h *sendHub) Add(m *Member) {
 	// TODO: considering how to switch between all available peer urls
 	// TODO: considering how to switch between all available peer urls
 	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
 	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
 	fs := h.ls.Follower(m.ID.String())
-	s := newSender(h.tr, u, h.cl.ID(), fs)
+	s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop)
 	h.senders[m.ID] = s
 	h.senders[m.ID] = s
 }
 }
 
 
@@ -128,22 +134,24 @@ func (h *sendHub) Update(m *Member) {
 }
 }
 
 
 type sender struct {
 type sender struct {
-	tr  http.RoundTripper
-	u   string
-	cid types.ID
-	fs  *stats.FollowerStats
-	q   chan []byte
-	mu  sync.RWMutex
-	wg  sync.WaitGroup
+	tr         http.RoundTripper
+	u          string
+	cid        types.ID
+	fs         *stats.FollowerStats
+	q          chan []byte
+	mu         sync.RWMutex
+	wg         sync.WaitGroup
+	shouldstop chan struct{}
 }
 }
 
 
-func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender {
+func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
 	s := &sender{
-		tr:  tr,
-		u:   u,
-		cid: cid,
-		fs:  fs,
-		q:   make(chan []byte),
+		tr:         tr,
+		u:          u,
+		cid:        cid,
+		fs:         fs,
+		q:          make(chan []byte),
+		shouldstop: shouldstop,
 	}
 	}
 	s.wg.Add(connPerSender)
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
 	for i := 0; i < connPerSender; i++ {
@@ -201,13 +209,19 @@ func (s *sender) post(data []byte) error {
 
 
 	switch resp.StatusCode {
 	switch resp.StatusCode {
 	case http.StatusPreconditionFailed:
 	case http.StatusPreconditionFailed:
-		// TODO: shutdown the etcdserver gracefully?
-		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		select {
+		case s.shouldstop <- struct{}{}:
+		default:
+		}
+		log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
 		return nil
 		return nil
 	case http.StatusForbidden:
 	case http.StatusForbidden:
-		// TODO: stop the server
-		log.Println("etcd: this member has been permanently removed from the cluster")
-		log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+		select {
+		case s.shouldstop <- struct{}{}:
+		default:
+		}
+		log.Println("etcdserver: this member has been permanently removed from the cluster")
+		log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 		return nil
 		return nil
 	case http.StatusNoContent:
 	case http.StatusNoContent:
 		return nil
 		return nil

+ 56 - 5
etcdserver/sender_test.go

@@ -89,12 +89,40 @@ func TestSendHubRemove(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestSendHubShouldStop(t *testing.T) {
+	membs := []*Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+	}
+	tr := newRespRoundTripper(http.StatusForbidden, nil)
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(tr, cl, nil, ls)
+	// wait for handle goroutines start
+	// TODO: wait for goroutines ready before return newSender
+	time.Sleep(10 * time.Millisecond)
+
+	shouldstop := h.ShouldStopNotify()
+	select {
+	case <-shouldstop:
+		t.Fatalf("received unexpected shouldstop notification")
+	case <-time.After(10 * time.Millisecond):
+	}
+	h.senders[1].send([]byte("somedata"))
+
+	testutil.ForceGosched()
+	select {
+	case <-shouldstop:
+	default:
+		t.Fatalf("cannot receive stop notification")
+	}
+}
+
 // TestSenderSend tests that send func could post data using roundtripper
 // TestSenderSend tests that send func could post data using roundtripper
 // and increase success count in stats.
 // and increase success count in stats.
 func TestSenderSend(t *testing.T) {
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs)
+	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
 	// wait for handle goroutines start
 	// wait for handle goroutines start
 	// TODO: wait for goroutines ready before return newSender
 	// TODO: wait for goroutines ready before return newSender
 	time.Sleep(10 * time.Millisecond)
 	time.Sleep(10 * time.Millisecond)
@@ -116,7 +144,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs)
+	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
 	// wait for handle goroutines start
 	// wait for handle goroutines start
 	// TODO: wait for goroutines ready before return newSender
 	// TODO: wait for goroutines ready before return newSender
 	time.Sleep(10 * time.Millisecond)
 	time.Sleep(10 * time.Millisecond)
@@ -144,7 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs)
+	s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
 	// wait for handle goroutines start
 	// wait for handle goroutines start
 	// TODO: wait for goroutines ready before return newSender
 	// TODO: wait for goroutines ready before return newSender
 	time.Sleep(10 * time.Millisecond)
 	time.Sleep(10 * time.Millisecond)
@@ -162,7 +190,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 
 func TestSenderPost(t *testing.T) {
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), nil)
+	s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 		t.Fatalf("unexpect post error: %v", err)
 	}
 	}
@@ -204,7 +232,8 @@ func TestSenderPostBad(t *testing.T) {
 		{"http://10.0.0.1", http.StatusCreated, nil},
 		{"http://10.0.0.1", http.StatusCreated, nil},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil)
+		shouldstop := make(chan struct{})
+		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
 		err := s.post([]byte("some data"))
 		err := s.post([]byte("some data"))
 		s.stop()
 		s.stop()
 
 
@@ -214,6 +243,28 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestSenderPostShouldStop(t *testing.T) {
+	tests := []struct {
+		u    string
+		code int
+		err  error
+	}{
+		{"http://10.0.0.1", http.StatusForbidden, nil},
+		{"http://10.0.0.1", http.StatusPreconditionFailed, nil},
+	}
+	for i, tt := range tests {
+		shouldstop := make(chan struct{}, 1)
+		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s.post([]byte("some data"))
+		s.stop()
+		select {
+		case <-shouldstop:
+		default:
+			t.Fatalf("#%d: cannot receive shouldstop notification", i)
+		}
+	}
+}
+
 type roundTripperBlocker struct {
 type roundTripperBlocker struct {
 	c chan struct{}
 	c chan struct{}
 }
 }

+ 26 - 10
etcdserver/server.go

@@ -91,6 +91,7 @@ type Sender interface {
 	Remove(id types.ID)
 	Remove(id types.ID)
 	Update(m *Member)
 	Update(m *Member)
 	Stop()
 	Stop()
+	ShouldStopNotify() <-chan struct{}
 }
 }
 
 
 type Storage interface {
 type Storage interface {
@@ -327,6 +328,14 @@ func (s *EtcdServer) run() {
 	// snapi indicates the index of the last submitted snapshot request
 	// snapi indicates the index of the last submitted snapshot request
 	var snapi, appliedi uint64
 	var snapi, appliedi uint64
 	var nodes []uint64
 	var nodes []uint64
+	var shouldstop bool
+	shouldstopC := s.sender.ShouldStopNotify()
+
+	defer func() {
+		s.node.Stop()
+		s.sender.Stop()
+		close(s.done)
+	}()
 	for {
 	for {
 		select {
 		select {
 		case <-s.Ticker:
 		case <-s.Ticker:
@@ -372,7 +381,9 @@ func (s *EtcdServer) run() {
 				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
 				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
 					ents = rd.CommittedEntries[appliedi+1-firsti:]
 					ents = rd.CommittedEntries[appliedi+1-firsti:]
 				}
 				}
-				appliedi = s.apply(ents)
+				if appliedi, shouldstop = s.apply(ents); shouldstop {
+					return
+				}
 			}
 			}
 
 
 			s.node.Advance()
 			s.node.Advance()
@@ -386,10 +397,9 @@ func (s *EtcdServer) run() {
 			}
 			}
 		case <-syncC:
 		case <-syncC:
 			s.sync(defaultSyncTimeout)
 			s.sync(defaultSyncTimeout)
+		case <-shouldstopC:
+			return
 		case <-s.stop:
 		case <-s.stop:
-			s.node.Stop()
-			s.sender.Stop()
-			close(s.done)
 			return
 			return
 		}
 		}
 	}
 	}
@@ -612,7 +622,7 @@ func getExpirationTime(r *pb.Request) time.Time {
 
 
 // apply takes an Entry received from Raft (after it has been committed) and
 // apply takes an Entry received from Raft (after it has been committed) and
 // applies it to the current state of the EtcdServer
 // applies it to the current state of the EtcdServer
-func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
+func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
 	var applied uint64
 	var applied uint64
 	for i := range es {
 	for i := range es {
 		e := es[i]
 		e := es[i]
@@ -624,7 +634,11 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			pbutil.MustUnmarshal(&cc, e.Data)
-			s.w.Trigger(cc.ID, s.applyConfChange(cc))
+			shouldstop, err := s.applyConfChange(cc)
+			s.w.Trigger(cc.ID, err)
+			if shouldstop {
+				return applied, true
+			}
 		default:
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
 		}
@@ -632,7 +646,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
 		atomic.StoreUint64(&s.raftTerm, e.Term)
 		atomic.StoreUint64(&s.raftTerm, e.Term)
 		applied = e.Index
 		applied = e.Index
 	}
 	}
-	return applied
+	return applied, false
 }
 }
 
 
 // applyRequest interprets r as a call to store.X and returns a Response interpreted
 // applyRequest interprets r as a call to store.X and returns a Response interpreted
@@ -686,11 +700,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 
 
 // applyConfChange applies a ConfChange to the server. It is only
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
 // invoked with a ConfChange that has already passed through Raft
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		cc.NodeID = raft.None
 		s.node.ApplyConfChange(cc)
 		s.node.ApplyConfChange(cc)
-		return err
+		return false, err
 	}
 	}
 	s.node.ApplyConfChange(cc)
 	s.node.ApplyConfChange(cc)
 	switch cc.Type {
 	switch cc.Type {
@@ -714,6 +728,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
 		s.Cluster.RemoveMember(id)
 		s.Cluster.RemoveMember(id)
 		if id == s.id {
 		if id == s.id {
 			log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID())
 			log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID())
+			log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+			return true, nil
 		} else {
 		} else {
 			s.sender.Remove(id)
 			s.sender.Remove(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
@@ -734,7 +750,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 		}
 	}
 	}
-	return nil
+	return false, nil
 }
 }
 
 
 // TODO: non-blocking snapshot
 // TODO: non-blocking snapshot

+ 48 - 10
etcdserver/server_test.go

@@ -474,7 +474,7 @@ func TestApplyConfChangeError(t *testing.T) {
 			node:    n,
 			node:    n,
 			Cluster: cl,
 			Cluster: cl,
 		}
 		}
-		err := srv.applyConfChange(tt.cc)
+		_, err := srv.applyConfChange(tt.cc)
 		if err != tt.werr {
 		if err != tt.werr {
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 		}
 		}
@@ -491,6 +491,42 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestApplyConfChangeShouldStop(t *testing.T) {
+	cl := newCluster("")
+	cl.SetStore(store.New())
+	for i := 1; i <= 3; i++ {
+		cl.AddMember(&Member{ID: types.ID(i)})
+	}
+	srv := &EtcdServer{
+		id:      1,
+		node:    &nodeRecorder{},
+		Cluster: cl,
+		sender:  &nopSender{},
+	}
+	cc := raftpb.ConfChange{
+		Type:   raftpb.ConfChangeRemoveNode,
+		NodeID: 2,
+	}
+	// remove non-local member
+	shouldStop, err := srv.applyConfChange(cc)
+	if err != nil {
+		t.Fatalf("unexpected error %v", err)
+	}
+	if shouldStop != false {
+		t.Errorf("shouldStop = %t, want %t", shouldStop, false)
+	}
+
+	// remove local member
+	cc.NodeID = 1
+	shouldStop, err = srv.applyConfChange(cc)
+	if err != nil {
+		t.Fatalf("unexpected error %v", err)
+	}
+	if shouldStop != true {
+		t.Errorf("shouldStop = %t, want %t", shouldStop, true)
+	}
+}
+
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
 
@@ -503,10 +539,11 @@ func (s *fakeSender) Send(msgs []raftpb.Message) {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
 		s.ss[m.To-1].node.Step(context.TODO(), m)
 	}
 	}
 }
 }
-func (s *fakeSender) Add(m *Member)      {}
-func (s *fakeSender) Update(m *Member)   {}
-func (s *fakeSender) Remove(id types.ID) {}
-func (s *fakeSender) Stop()              {}
+func (s *fakeSender) Add(m *Member)                     {}
+func (s *fakeSender) Update(m *Member)                  {}
+func (s *fakeSender) Remove(id types.ID)                {}
+func (s *fakeSender) Stop()                             {}
+func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil }
 
 
 func testServer(t *testing.T, ns uint64) {
 func testServer(t *testing.T, ns uint64) {
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1556,11 +1593,12 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
 
 type nopSender struct{}
 type nopSender struct{}
 
 
-func (s *nopSender) Send(m []raftpb.Message) {}
-func (s *nopSender) Add(m *Member)           {}
-func (s *nopSender) Remove(id types.ID)      {}
-func (s *nopSender) Update(m *Member)        {}
-func (s *nopSender) Stop()                   {}
+func (s *nopSender) Send(m []raftpb.Message)           {}
+func (s *nopSender) Add(m *Member)                     {}
+func (s *nopSender) Remove(id types.ID)                {}
+func (s *nopSender) Update(m *Member)                  {}
+func (s *nopSender) Stop()                             {}
+func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil }
 
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))
 	peers := make([]raft.Peer, len(ids))