Browse Source

participant: stop http serving when stopped

Yicheng Qin 11 years ago
parent
commit
6059db1f4b
4 changed files with 26 additions and 9 deletions
  1. 0 5
      etcd/etcd.go
  2. 2 2
      etcd/etcd_test.go
  3. 16 1
      etcd/participant.go
  4. 8 1
      etcd/v2_store.go

+ 0 - 5
etcd/etcd.go

@@ -2,7 +2,6 @@ package etcd
 
 import (
 	"crypto/tls"
-	"errors"
 	"log"
 	"net/http"
 	"time"
@@ -18,10 +17,6 @@ const (
 	stopMode
 )
 
-var (
-	stopErr = errors.New("stopped")
-)
-
 type Server struct {
 	config       *config.Config
 	id           int64

+ 2 - 2
etcd/etcd_test.go

@@ -110,7 +110,7 @@ func TestAdd(t *testing.T) {
 				switch err {
 				case tmpErr:
 					time.Sleep(defaultElection * es[0].tickDuration)
-				case raftStopErr:
+				case raftStopErr, stopErr:
 					t.Fatalf("#%d on %d: unexpected stop", i, lead)
 				default:
 					t.Fatal(err)
@@ -179,7 +179,7 @@ func TestRemove(t *testing.T) {
 				switch err {
 				case tmpErr:
 					time.Sleep(defaultElection * 5 * time.Millisecond)
-				case raftStopErr:
+				case raftStopErr, stopErr:
 					if lead == id {
 						break
 					}

+ 16 - 1
etcd/participant.go

@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"sync"
 	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
@@ -36,6 +37,7 @@ const (
 
 var (
 	tmpErr            = fmt.Errorf("try again")
+	stopErr           = fmt.Errorf("server is stopped")
 	raftStopErr       = fmt.Errorf("raft is stopped")
 	noneId      int64 = -1
 )
@@ -57,7 +59,9 @@ type participant struct {
 	store.Store
 	rh *raftHandler
 
-	stopc chan struct{}
+	stopped bool
+	mu      sync.Mutex
+	stopc   chan struct{}
 
 	*http.ServeMux
 }
@@ -152,12 +156,19 @@ func (p *participant) run() int64 {
 		p.send(node.Msgs())
 		if node.IsRemoved() {
 			log.Printf("Participant %d return\n", p.id)
+			p.stop()
 			return standbyMode
 		}
 	}
 }
 
 func (p *participant) stop() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.stopped {
+		return
+	}
+	p.stopped = true
 	close(p.stopc)
 }
 
@@ -201,6 +212,8 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 		w.Remove()
 		log.Println("add error: wait timeout")
 		return tmpErr
+	case <-p.stopc:
+		return stopErr
 	}
 }
 
@@ -238,6 +251,8 @@ func (p *participant) remove(id int64) error {
 		w.Remove()
 		log.Println("remove error: wait timeout")
 		return tmpErr
+	case <-p.stopc:
+		return stopErr
 	}
 }
 

+ 8 - 1
etcd/v2_store.go

@@ -67,7 +67,14 @@ func (p *participant) do(c *cmd) (*store.Event, error) {
 		return nil, fmt.Errorf("unable to send out the proposal")
 	}
 
-	switch t := (<-pp.ret).(type) {
+	var ret interface{}
+	select {
+	case ret = <-pp.ret:
+	case <-p.stopc:
+		return nil, fmt.Errorf("stop serving")
+	}
+
+	switch t := ret.(type) {
 	case *store.Event:
 		return t, nil
 	case error: