Browse Source

Merge pull request #5169 from xiang90/ready

etcdserver: do not serve requests before finish the first internal proposal
Xiang Li 9 years ago
parent
commit
ca83793876
4 changed files with 15 additions and 1 deletions
  1. 3 0
      etcdmain/serve.go
  2. 9 1
      etcdserver/server.go
  3. 1 0
      etcdserver/server_test.go
  4. 2 0
      integration/member_test.go

+ 3 - 0
etcdmain/serve.go

@@ -42,6 +42,9 @@ type serveCtx struct {
 func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 
+	<-s.ReadyNotify()
+	plog.Info("ready to serve client requests")
+
 	m := cmux.New(sctx.l)
 
 	if sctx.insecure {

+ 9 - 1
etcdserver/server.go

@@ -162,7 +162,9 @@ type EtcdServer struct {
 	// count the number of inflight snapshots.
 	// MUST use atomic operation to access this field.
 	inflightSnapshots int64
-	r                 raftNode
+
+	readych chan struct{}
+	r       raftNode
 
 	cfg       *ServerConfig
 	snapCount uint64
@@ -366,6 +368,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	lstats := stats.NewLeaderStats(id.String())
 
 	srv = &EtcdServer{
+		readych:   make(chan struct{}),
 		cfg:       cfg,
 		snapCount: cfg.SnapCount,
 		errorc:    make(chan error, 1),
@@ -729,6 +732,10 @@ func (s *EtcdServer) Stop() {
 	<-s.done
 }
 
+// ReadyNotify returns a channel that will be closed when the server
+// is ready to serve client requests
+func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
+
 func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
 	select {
 	case <-time.After(d):
@@ -888,6 +895,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 		cancel()
 		switch err {
 		case nil:
+			close(s.readych)
 			plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
 			return
 		case ErrStopped:

+ 1 - 0
etcdserver/server_test.go

@@ -1155,6 +1155,7 @@ func TestPublish(t *testing.T) {
 	ch <- Response{}
 	w := wait.NewWithResponse(ch)
 	srv := &EtcdServer{
+		readych:    make(chan struct{}),
 		cfg:        &ServerConfig{TickMs: 1},
 		id:         1,
 		r:          raftNode{Node: n},

+ 2 - 0
integration/member_test.go

@@ -61,6 +61,7 @@ func TestRestartMember(t *testing.T) {
 			t.Fatal(err)
 		}
 	}
+
 	clusterMustProgress(t, c.Members)
 }
 
@@ -105,6 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 	m.Stop(t)
 	m.Restart(t)
 
+	m.WaitOK(t)
 	for i := 0; i < 120; i++ {
 		cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
 		kapi := client.NewKeysAPI(cc)