Xiang Li 11 лет назад
Родитель
Сommit
4c324fe3a4
3 измененных файлов с 24 добавлено и 41 удалено
  1. 18 35
      etcd/etcd.go
  2. 3 3
      etcd/participant.go
  3. 3 3
      etcd/standby.go

+ 18 - 35
etcd/etcd.go

@@ -93,17 +93,22 @@ func (s *Server) Stop() {
 	if s.mode.Get() == stopMode {
 		return
 	}
-	s.stopc <- struct{}{}
+	m := s.mode.Get()
+	s.mode.Set(stopMode)
+	switch m {
+	case participantMode:
+		s.p.stop()
+	case standbyMode:
+		s.s.stop()
+	}
 	<-s.stopc
 }
 
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	switch s.mode.Get() {
-	case participantMode:
+	case participantMode, standbyMode:
 		s.p.ServeHTTP(w, r)
-	case standbyMode:
-		s.s.ServeHTTP(w, r)
-	case stopMode:
+	default:
 		http.NotFound(w, r)
 	}
 }
@@ -116,56 +121,34 @@ func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
 	switch s.mode.Get() {
 	case participantMode:
 		s.p.raftHandler().ServeHTTP(w, r)
-	case standbyMode:
-		http.NotFound(w, r)
-	case stopMode:
+	default:
 		http.NotFound(w, r)
 	}
 }
 
 func (s *Server) Run() {
-	runc := make(chan struct{})
 	next := participantMode
 	for {
 		switch next {
 		case participantMode:
 			s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration)
 			s.mode.Set(participantMode)
-			// TODO: it may block here. remove modeC later.
 			s.modeC <- s.mode.Get()
-			next = standbyMode
-			go func() {
-				s.p.run()
-				runc <- struct{}{}
-			}()
+			// TODO: it may block here. move modeC later.
+			next = s.p.run()
 		case standbyMode:
 			s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
 			s.mode.Set(standbyMode)
 			s.modeC <- s.mode.Get()
-			next = participantMode
-			go func() {
-				s.s.run()
-				runc <- struct{}{}
-			}()
-		default:
-			panic("unsupport mode")
-		}
-		select {
-		case <-runc:
-		case <-s.stopc:
-			switch s.mode.Get() {
-			case participantMode:
-				s.p.stop()
-			case standbyMode:
-				s.s.stop()
-			}
-			<-runc
-			s.mode.Set(stopMode)
-			s.modeC <- s.mode.Get()
+			next = s.s.run()
+		case stopMode:
 			s.client.CloseConnections()
 			s.peerHub.stop()
+			s.modeC <- s.mode.Get()
 			s.stopc <- struct{}{}
 			return
+		default:
+			panic("unsupport mode")
 		}
 	}
 }

+ 3 - 3
etcd/participant.go

@@ -98,7 +98,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, seeds map[stri
 	return p
 }
 
-func (p *participant) run() {
+func (p *participant) run() int64 {
 	if len(p.seeds) == 0 {
 		log.Println("starting a bootstrap node")
 		p.node.Campaign()
@@ -146,13 +146,13 @@ func (p *participant) run() {
 			node.Sync()
 		case <-p.stopc:
 			log.Printf("Participant %d stopped\n", p.id)
-			return
+			return stopMode
 		}
 		p.apply(node.Next())
 		p.send(node.Msgs())
 		if node.IsRemoved() {
 			log.Printf("Participant %d return\n", p.id)
-			return
+			return standbyMode
 		}
 	}
 }

+ 3 - 3
etcd/standby.go

@@ -53,14 +53,14 @@ func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]b
 	return s
 }
 
-func (s *standby) run() {
+func (s *standby) run() int64 {
 	var syncDuration time.Duration
 	for {
 		select {
 		case <-time.After(syncDuration):
 		case <-s.stopc:
 			log.Printf("Standby %d stopped\n", s.id)
-			return
+			return stopMode
 		}
 
 		if err := s.syncCluster(); err != nil {
@@ -75,7 +75,7 @@ func (s *standby) run() {
 			log.Println("standby join:", err)
 			continue
 		}
-		return
+		return participantMode
 	}
 }