Browse Source

server: cleanup participant stop

Yicheng Qin 11 years ago
parent
commit
55c4a3307d
2 changed files with 13 additions and 16 deletions
  1. 12 15
      etcd/participant.go
  2. 1 1
      etcd/v2_store.go

+ 12 - 15
etcd/participant.go

@@ -24,7 +24,6 @@ import (
 	"net/http"
 	"os"
 	"path"
-	"sync"
 	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
@@ -79,9 +78,8 @@ type participant struct {
 	rh *raftHandler
 	w  *wal.WAL
 
-	stopped bool
-	mu      sync.Mutex
-	stopc   chan struct{}
+	stopc       chan struct{}
+	stopNotifyc chan struct{}
 
 	*http.ServeMux
 }
@@ -102,7 +100,8 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 		},
 		Store: store.New(),
 
-		stopc: make(chan struct{}),
+		stopc:       make(chan struct{}),
+		stopNotifyc: make(chan struct{}),
 
 		ServeMux: http.NewServeMux(),
 	}
@@ -153,7 +152,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 }
 
 func (p *participant) run() {
-	defer p.w.Close()
+	defer p.cleanup()
 
 	if p.node.IsEmpty() {
 		seeds := p.peerHub.getSeeds()
@@ -220,7 +219,6 @@ func (p *participant) run() {
 		p.save(ents, node.UnstableState())
 		p.send(node.Msgs())
 		if node.IsRemoved() {
-			p.stop()
 			log.Printf("id=%x participant.end\n", p.id)
 			return
 		}
@@ -236,15 +234,14 @@ func (p *participant) run() {
 }
 
 func (p *participant) stop() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.stopped {
-		return
-	}
-	p.stopped = true
 	close(p.stopc)
 }
 
+func (p *participant) cleanup() {
+	p.w.Close()
+	close(p.stopNotifyc)
+}
+
 func (p *participant) raftHandler() http.Handler {
 	return p.rh
 }
@@ -287,7 +284,7 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 		w.Remove()
 		log.Printf("id=%x participant.add watchErr=timeout\n", p.id)
 		return tmpErr
-	case <-p.stopc:
+	case <-p.stopNotifyc:
 		return stopErr
 	}
 }
@@ -327,7 +324,7 @@ func (p *participant) remove(id int64) error {
 		w.Remove()
 		log.Printf("id=%x participant.remove watchErr=timeout\n", p.id)
 		return tmpErr
-	case <-p.stopc:
+	case <-p.stopNotifyc:
 		return stopErr
 	}
 }

+ 1 - 1
etcd/v2_store.go

@@ -88,7 +88,7 @@ func (p *participant) do(c *Cmd) (*store.Event, error) {
 	var ret interface{}
 	select {
 	case ret = <-pp.ret:
-	case <-p.stopc:
+	case <-p.stopNotifyc:
 		return nil, fmt.Errorf("stop serving")
 	}