Browse Source

server: refactor add

Yicheng Qin 11 years ago
parent
commit
83e1fe77c8
2 changed files with 59 additions and 42 deletions
  1. 38 26
      etcd/etcd.go
  2. 21 16
      etcd/etcd_test.go

+ 38 - 26
etcd/etcd.go

@@ -42,7 +42,7 @@ const (
 )
 
 var (
-	removeTmpErr  = fmt.Errorf("remove: try again")
+	tmpErr        = fmt.Errorf("try again")
 	serverStopErr = fmt.Errorf("server is stopped")
 )
 
@@ -199,7 +199,6 @@ func (s *Server) Join() {
 
 func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
-	index := s.Index()
 
 	_, err := s.Get(p, false, false)
 	if err == nil {
@@ -208,26 +207,33 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 	if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
 		return err
 	}
-	for {
-		select {
-		case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
-		case <-s.stop:
-			return fmt.Errorf("server is stopped")
-		}
-		w, err := s.Watch(p, true, false, index+1)
-		if err != nil {
-			return err
-		}
-		select {
-		case v := <-w.EventChan:
-			if v.Action == store.Set {
-				return nil
-			}
-			index = v.Index()
-		case <-time.After(4 * defaultHeartbeat * s.tickDuration):
-		case <-s.stop:
-			return fmt.Errorf("server is stopped")
+
+	w, err := s.Watch(p, true, false, 0)
+	if err != nil {
+		log.Println("add error:", err)
+		return tmpErr
+	}
+
+	select {
+	case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
+	case <-s.stop:
+		return serverStopErr
+	}
+
+	select {
+	case v := <-w.EventChan:
+		if v.Action == store.Set {
+			return nil
 		}
+		log.Println("add error: action =", v.Action)
+		return tmpErr
+	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+		w.Remove()
+		log.Println("add error: wait timeout")
+		return tmpErr
+	case <-s.stop:
+		w.Remove()
+		return serverStopErr
 	}
 }
 
@@ -249,7 +255,8 @@ func (s *Server) Remove(id int64) error {
 	// removal target is self
 	w, err := s.Watch(p, true, false, v.Index()+1)
 	if err != nil {
-		return removeTmpErr
+		log.Println("remove error:", err)
+		return tmpErr
 	}
 
 	select {
@@ -257,10 +264,12 @@ func (s *Server) Remove(id int64) error {
 		if v.Action == store.Delete {
 			return nil
 		}
-		return removeTmpErr
+		log.Println("remove error: action =", v.Action)
+		return tmpErr
 	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
 		w.Remove()
-		return removeTmpErr
+		log.Println("remove error: wait timeout")
+		return tmpErr
 	case <-s.stop:
 		w.Remove()
 		return serverStopErr
@@ -284,18 +293,21 @@ func (s *Server) run() {
 
 func (s *Server) runParticipant() {
 	node := s.node
-	addNodeC := s.addNodeC
-	removeNodeC := s.removeNodeC
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	v2SyncTicker := time.NewTicker(time.Millisecond * 500)
 
 	var proposal chan v2Proposal
+	var addNodeC, removeNodeC chan raft.Config
 	for {
 		if node.HasLeader() {
 			proposal = s.proposal
+			addNodeC = s.addNodeC
+			removeNodeC = s.removeNodeC
 		} else {
 			proposal = nil
+			addNodeC = nil
+			removeNodeC = nil
 		}
 		select {
 		case p := <-proposal:

+ 21 - 16
etcd/etcd_test.go

@@ -5,7 +5,6 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
-	"runtime"
 	"testing"
 	"time"
 
@@ -101,31 +100,37 @@ func TestAdd(t *testing.T) {
 		go es[0].Bootstrap()
 
 		for i := 1; i < tt.size; i++ {
-			var index uint64
+			id := int64(i)
 			for {
 				lead := es[0].node.Leader()
-				if lead != -1 {
-					index = es[lead].Index()
-					ne := es[i]
-					if err := es[lead].Add(ne.id, ne.raftPubAddr, ne.pubAddr); err == nil {
-						break
-					}
+				if lead == -1 {
+					time.Sleep(defaultElection * es[0].tickDuration)
+					continue
+				}
+
+				err := es[lead].Add(id, es[id].raftPubAddr, es[id].pubAddr)
+				if err == nil {
+					break
+				}
+				switch err {
+				case tmpErr:
+					time.Sleep(defaultElection * es[0].tickDuration)
+				case serverStopErr:
+					t.Fatalf("#%d on %d: unexpected stop", i, lead)
+				default:
+					t.Fatal(err)
 				}
-				runtime.Gosched()
 			}
 			go es[i].run()
 
 			for j := 0; j <= i; j++ {
-				w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1)
+				p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id)
+				w, err := es[j].Watch(p, false, false, 1)
 				if err != nil {
 					t.Errorf("#%d on %d: %v", i, j, err)
 					break
 				}
-				v := <-w.EventChan
-				ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i)
-				if v.Node.Key != ww {
-					t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww)
-				}
+				<-w.EventChan
 			}
 		}
 
@@ -169,7 +174,7 @@ func TestRemove(t *testing.T) {
 					break
 				}
 				switch err {
-				case removeTmpErr:
+				case tmpErr:
 					time.Sleep(defaultElection * 5 * time.Millisecond)
 				case serverStopErr:
 					if lead == id {