Browse Source

etcd: no more mutex

Xiang Li 11 years ago
parent
commit
cc1b4b7ef0
3 changed files with 22 additions and 57 deletions
  1. 18 39
      etcd/etcd.go
  2. 2 8
      etcd/participant.go
  3. 2 10
      etcd/standby.go

+ 18 - 39
etcd/etcd.go

@@ -24,7 +24,6 @@ import (
 	"net/http"
 	"net/url"
 	"os"
-	"sync"
 	"time"
 
 	"github.com/coreos/etcd/config"
@@ -50,10 +49,9 @@ type Server struct {
 	client  *v2client
 	peerHub *peerHub
 
-	stopped bool
-	mu      sync.Mutex
-	stopc   chan struct{}
-	log     *log.Logger
+	exited      chan error
+	stopNotifyc chan struct{}
+	log         *log.Logger
 	http.Handler
 }
 
@@ -92,7 +90,8 @@ func New(c *config.Config) (*Server, error) {
 		client:  newClient(tc),
 		peerHub: newPeerHub(client),
 
-		stopc: make(chan struct{}, 1),
+		exited:      make(chan error, 1),
+		stopNotifyc: make(chan struct{}),
 	}
 	m := http.NewServeMux()
 	m.HandleFunc("/", s.requestHandler)
@@ -114,21 +113,14 @@ func (s *Server) SetTick(tick time.Duration) {
 }
 
 // Stop stops the server elegently.
-func (s *Server) Stop() {
-	s.mu.Lock()
-	s.stopped = true
-	switch s.mode.Get() {
-	case participantMode:
-		s.p.stop()
-	case standbyMode:
-		s.s.stop()
-	}
+func (s *Server) Stop() error {
 	s.mode.Set(stopMode)
-	s.mu.Unlock()
-	<-s.stopc
+	close(s.stopNotifyc)
+	err := <-s.exited
 	s.client.CloseConnections()
 	s.peerHub.stop()
 	log.Printf("id=%x server.stop\n", s.id)
+	return err
 }
 
 func (s *Server) requestHandler(w http.ResponseWriter, r *http.Request) {
@@ -158,14 +150,19 @@ func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
 func (s *Server) Run() error {
 	var d *discoverer
 	var seeds []string
+	var exit error
+	defer func() { s.exited <- exit }()
+
 	durl := s.config.Discovery
 	if durl != "" {
 		u, err := url.Parse(durl)
 		if err != nil {
+			exit = err
 			return fmt.Errorf("bad discovery URL error: %v", err)
 		}
 		d = newDiscoverer(u, fmt.Sprint(s.id), s.raftPubAddr)
 		if seeds, err = d.discover(); err != nil {
+			exit = err
 			return err
 		}
 		log.Printf("id=%x server.run source=-discovery seeds=\"%v\"\n", s.id, seeds)
@@ -175,7 +172,6 @@ func (s *Server) Run() error {
 	}
 	s.peerHub.setSeeds(seeds)
 
-	defer func() { s.stopc <- struct{}{} }()
 	next := participantMode
 	for {
 		switch next {
@@ -183,48 +179,31 @@ func (s *Server) Run() error {
 			p, err := newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.config.DataDir, s.client, s.peerHub, s.tickDuration)
 			if err != nil {
 				log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err)
+				exit = err
 				return err
 			}
-
-			s.mu.Lock()
-			if s.stopped {
-				s.mu.Unlock()
-				return nil
-			}
 			s.p = p
 			s.mode.Set(participantMode)
 			log.Printf("id=%x server.run mode=participantMode\n", s.id)
-			s.mu.Unlock()
-
 			dStopc := make(chan struct{})
 			if d != nil {
 				go d.heartbeat(dStopc)
 			}
-			s.p.run()
+			s.p.run(s.stopNotifyc)
 			if d != nil {
 				close(dStopc)
 			}
 			next = standbyMode
 		case standbyMode:
-			s.mu.Lock()
-			if s.stopped {
-				s.mu.Unlock()
-				return nil
-			}
 			s.s = newStandby(s.client, s.peerHub)
 			s.mode.Set(standbyMode)
 			log.Printf("id=%x server.run mode=standbyMode\n", s.id)
-			s.mu.Unlock()
-
-			s.s.run()
+			s.s.run(s.stopNotifyc)
 			next = participantMode
 		default:
 			panic("unsupport mode")
 		}
-		s.mu.Lock()
-		stopped := s.stopped
-		s.mu.Unlock()
-		if stopped {
+		if s.mode.Get() == stopMode {
 			return nil
 		}
 		s.id = genId()

+ 2 - 8
etcd/participant.go

@@ -78,7 +78,6 @@ type participant struct {
 	rh *raftHandler
 	w  *wal.WAL
 
-	stopc       chan struct{}
 	stopNotifyc chan struct{}
 
 	*http.ServeMux
@@ -100,7 +99,6 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 		},
 		Store: store.New(),
 
-		stopc:       make(chan struct{}),
 		stopNotifyc: make(chan struct{}),
 
 		ServeMux: http.NewServeMux(),
@@ -151,7 +149,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 	return p, nil
 }
 
-func (p *participant) run() {
+func (p *participant) run(stop chan struct{}) {
 	defer p.cleanup()
 
 	if p.node.IsEmpty() {
@@ -204,7 +202,7 @@ func (p *participant) run() {
 			node.Tick()
 		case <-v2SyncTicker.C:
 			node.Sync()
-		case <-p.stopc:
+		case <-stop:
 			log.Printf("id=%x participant.stop\n", p.id)
 			return
 		}
@@ -233,10 +231,6 @@ func (p *participant) run() {
 	}
 }
 
-func (p *participant) stop() {
-	close(p.stopc)
-}
-
 func (p *participant) cleanup() {
 	p.w.Close()
 	close(p.stopNotifyc)

+ 2 - 10
etcd/standby.go

@@ -40,8 +40,6 @@ type standby struct {
 	mu          sync.RWMutex
 	clusterConf *config.ClusterConfig
 
-	stopc chan struct{}
-
 	*http.ServeMux
 }
 
@@ -54,21 +52,19 @@ func newStandby(client *v2client, peerHub *peerHub) *standby {
 		leaderAddr:  "",
 		clusterConf: config.NewClusterConfig(),
 
-		stopc: make(chan struct{}),
-
 		ServeMux: http.NewServeMux(),
 	}
 	s.Handle("/", handlerErr(s.serveRedirect))
 	return s
 }
 
-func (s *standby) run() {
+func (s *standby) run(stop chan struct{}) {
 	syncDuration := time.Millisecond * 100
 	nodes := s.peerHub.getSeeds()
 	for {
 		select {
 		case <-time.After(syncDuration):
-		case <-s.stopc:
+		case <-stop:
 			log.Printf("standby.stop\n")
 			return
 		}
@@ -88,10 +84,6 @@ func (s *standby) run() {
 	}
 }
 
-func (s *standby) stop() {
-	close(s.stopc)
-}
-
 func (s *standby) leaderInfo() (int64, string) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()