Browse Source

Merge pull request #1714 from xiang90/stop

StopNotify
Xiang Li 11 years ago
parent
commit
e66bda957b
2 changed files with 35 additions and 12 deletions
  1. 8 12
      etcdserver/server.go
  2. 27 0
      etcdserver/server_test.go

+ 8 - 12
etcdserver/server.go

@@ -396,6 +396,10 @@ func (s *EtcdServer) Stop() {
 	<-s.done
 }
 
+// StopNotify returns a channel that receives a empty struct
+// when the server is stopped.
+func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
+
 // Do interprets r and performs an operation on s.store according to r.Method
 // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
 // Quorum == true, r will be sent through consensus before performing its
@@ -452,18 +456,14 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 }
 
-func (s *EtcdServer) SelfStats() []byte {
-	return s.stats.JSON()
-}
+func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
 
 func (s *EtcdServer) LeaderStats() []byte {
 	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
 	return s.lstats.JSON()
 }
 
-func (s *EtcdServer) StoreStats() []byte {
-	return s.store.JsonStats()
-}
+func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 
 func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) {
 	s.stats.RecvAppendReq(from.String(), int(length))
@@ -508,13 +508,9 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
 }
 
 // Implement the RaftTimer interface
-func (s *EtcdServer) Index() uint64 {
-	return atomic.LoadUint64(&s.raftIndex)
-}
+func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) }
 
-func (s *EtcdServer) Term() uint64 {
-	return atomic.LoadUint64(&s.raftTerm)
-}
+func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) }
 
 // configure sends a configuration change through consensus and
 // then waits for it to be applied to the server. It

+ 27 - 0
etcdserver/server_test.go

@@ -677,6 +677,8 @@ func TestDoProposalStopped(t *testing.T) {
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelarates internal clock
 	close(tk)
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
 	srv := &EtcdServer{
 		// TODO: use fake node for better testability
 		node:    n,
@@ -684,6 +686,7 @@ func TestDoProposalStopped(t *testing.T) {
 		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Ticker:  tk,
+		Cluster: cl,
 	}
 	srv.start()
 
@@ -1132,6 +1135,30 @@ func TestPublishRetry(t *testing.T) {
 	}
 }
 
+func TestStopNotify(t *testing.T) {
+	s := &EtcdServer{
+		stop: make(chan struct{}),
+		done: make(chan struct{}),
+	}
+	go func() {
+		<-s.stop
+		close(s.done)
+	}()
+
+	notifier := s.StopNotify()
+	select {
+	case <-notifier:
+		t.Fatalf("received unexpected stop notification")
+	default:
+	}
+	s.Stop()
+	select {
+	case <-notifier:
+	default:
+		t.Fatalf("cannot receive stop notification")
+	}
+}
+
 func TestGetOtherPeerURLs(t *testing.T) {
 	tests := []struct {
 		membs []*Member