Browse Source

server: make removal go through run loop

Removal should not be sent to the raft endpoint directly.
Yicheng Qin 11 years ago
parent
commit
5fc5681cb4
3 changed files with 43 additions and 45 deletions
  1. 31 16
      etcd/etcd.go
  2. 9 26
      etcd/etcd_test.go
  3. 3 3
      raft/node.go

+ 31 - 16
etcd/etcd.go

@@ -51,9 +51,10 @@ type Server struct {
 	nodes        map[string]bool
 	tickDuration time.Duration
 
-	proposal chan v2Proposal
-	node     *v2Raft
-	t        *transporter
+	proposal    chan v2Proposal
+	node        *v2Raft
+	removeNodeC chan raft.Config
+	t           *transporter
 
 	store.Store
 
@@ -90,7 +91,8 @@ func New(c *config.Config, id int64) *Server {
 			Node:   raft.New(id, defaultHeartbeat, defaultElection),
 			result: make(map[wait]chan interface{}),
 		},
-		t: newTransporter(tc),
+		removeNodeC: make(chan raft.Config),
+		t:           newTransporter(tc),
 
 		Store: store.New(),
 
@@ -175,21 +177,31 @@ func (s *Server) Join() {
 	s.run()
 }
 
-func (s *Server) Remove(id int) {
-	d, err := json.Marshal(&raft.Config{NodeId: s.id})
-	if err != nil {
-		panic(err)
-	}
+func (s *Server) Remove(id int64) error {
+	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
+	index := s.Index()
 
-	b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 2, Data: d}}})
-	if err != nil {
-		panic(err)
+	if _, err := s.Get(p, false, false); err != nil {
+		return err
 	}
-
-	if err := s.t.send(s.raftPubAddr+raftPrefix, b); err != nil {
-		log.Println(err)
+	for {
+		if s.mode == stop {
+			return fmt.Errorf("server is stopped")
+		}
+		s.removeNodeC <- raft.Config{NodeId: id}
+		w, err := s.Watch(p, true, false, index+1)
+		if err != nil {
+			return err
+		}
+		select {
+		case v := <-w.EventChan:
+			if v.Action == store.Delete {
+				return nil
+			}
+			index = v.Index()
+		case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+		}
 	}
-	// todo(xiangli) WAIT for remove to be committed or retry...
 }
 
 func (s *Server) run() {
@@ -209,6 +221,7 @@ func (s *Server) run() {
 
 func (s *Server) runParticipant() {
 	node := s.node
+	removeNodeC := s.removeNodeC
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	v2SyncTicker := time.NewTicker(time.Millisecond * 500)
@@ -223,6 +236,8 @@ func (s *Server) runParticipant() {
 		select {
 		case p := <-proposal:
 			node.Propose(p)
+		case c := <-removeNodeC:
+			node.UpdateConf(raft.RemoveNode, &c)
 		case msg := <-recv:
 			node.Step(*msg)
 		case <-ticker.C:

+ 9 - 26
etcd/etcd_test.go

@@ -95,36 +95,19 @@ func TestRemove(t *testing.T) {
 		// not 100 percent safe in our raft.
 		// TODO(yichengq): improve it later.
 		for i := 0; i < tt.size-2; i++ {
-			// wait for leader to be stable for all live machines
-			// TODO(yichengq): change it later
-			var prevLead int64
-			var prevTerm int64
-			for j := i; j < tt.size; j++ {
-				id := int64(i)
-				lead := es[j].node.Leader()
-				term := es[j].node.Term()
-				fit := true
-				if j == i {
-					if lead < id {
-						fit = false
+			id := int64(i)
+			var index uint64
+			for {
+				lead := es[id].node.Leader()
+				if lead != -1 {
+					index = es[lead].Index()
+					if err := es[lead].Remove(id); err == nil {
+						break
 					}
-				} else {
-					if lead != prevLead || term != prevTerm {
-						fit = false
-					}
-				}
-				if !fit {
-					j = i - 1
-					runtime.Gosched()
-					continue
 				}
-				prevLead = lead
-				prevTerm = term
+				runtime.Gosched()
 			}
 
-			index := es[i].Index()
-			es[i].Remove(i)
-
 			// i-th machine cannot be promised to apply the removal command of
 			// its own due to our non-optimized raft.
 			// TODO(yichengq): it should work when

+ 3 - 3
raft/node.go

@@ -74,10 +74,10 @@ func (n *Node) propose(t int64, data []byte) {
 func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) }
 
 func (n *Node) Add(id int64, addr string, context []byte) {
-	n.updateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
+	n.UpdateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
 }
 
-func (n *Node) Remove(id int64) { n.updateConf(RemoveNode, &Config{NodeId: id}) }
+func (n *Node) Remove(id int64) { n.UpdateConf(RemoveNode, &Config{NodeId: id}) }
 
 func (n *Node) Msgs() []Message { return n.sm.Msgs() }
 
@@ -164,7 +164,7 @@ func (n *Node) Tick() {
 	}
 }
 
-func (n *Node) updateConf(t int64, c *Config) {
+func (n *Node) UpdateConf(t int64, c *Config) {
 	data, err := json.Marshal(c)
 	if err != nil {
 		panic(err)