Browse Source

server: add remove function

Yicheng Qin 11 years ago
parent
commit
98fdbaaae0
2 changed files with 120 additions and 0 deletions
  1. 32 0
      etcd/etcd.go
  2. 88 0
      etcd/etcd_test.go

+ 32 - 0
etcd/etcd.go

@@ -175,6 +175,23 @@ func (s *Server) Join() {
 	s.run()
 	s.run()
 }
 }
 
 
+// Remove asks the cluster to remove the node identified by id.
+//
+// It JSON-encodes a raft.Config naming that node, wraps it as the single
+// entry of a raft.Message sent from this server, and posts the message to
+// this server's own raft endpoint (s.raftPubAddr + raftPrefix).
+//
+// A failure to encode either value panics. A failure to send is only logged,
+// and the call returns without waiting for the removal to be committed, so
+// callers must not assume the node is gone when Remove returns.
+func (s *Server) Remove(id int) {
+	// Encode the node the caller asked to remove. Using s.id here would
+	// silently ignore the argument and always target this server itself.
+	d, err := json.Marshal(&raft.Config{NodeId: int64(id)})
+	if err != nil {
+		panic(err)
+	}
+
+	// NOTE(review): the literal 2 is used for both the message type and the
+	// entry type; presumably a proposal message carrying a RemoveNode entry.
+	// Confirm against the raft package and prefer its named constants.
+	b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 2, Data: d}}})
+	if err != nil {
+		panic(err)
+	}
+
+	// Best effort: a failed send is reported but not retried.
+	if err := s.t.send(s.raftPubAddr+raftPrefix, b); err != nil {
+		log.Println(err)
+	}
+	// TODO(xiangli): wait for the removal to be committed, or retry.
+}
+
 func (s *Server) run() {
 func (s *Server) run() {
 	for {
 	for {
 		switch s.mode {
 		switch s.mode {
@@ -219,6 +236,12 @@ func (s *Server) runParticipant() {
 		}
 		}
 		s.apply(node.Next())
 		s.apply(node.Next())
 		s.send(node.Msgs())
 		s.send(node.Msgs())
+		if node.IsRemoved() {
+			// TODO: delete it after standby is implemented
+			s.mode = stop
+			log.Printf("Node: %d removed from participants\n", s.id)
+			return
+		}
 	}
 	}
 }
 }
 
 
@@ -250,6 +273,15 @@ func (s *Server) apply(ents []raft.Entry) {
 			s.nodes[cfg.Addr] = true
 			s.nodes[cfg.Addr] = true
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
 			s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
+		case raft.RemoveNode:
+			cfg := new(raft.Config)
+			if err := json.Unmarshal(ent.Data, cfg); err != nil {
+				log.Println(err)
+				break
+			}
+			log.Printf("Remove Node %x\n", cfg.NodeId)
+			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
+			s.Store.Delete(p, false, false)
 		default:
 		default:
 			panic("unimplemented")
 			panic("unimplemented")
 		}
 		}

+ 88 - 0
etcd/etcd_test.go

@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"net/http"
 	"net/http/httptest"
 	"net/http/httptest"
 	"net/url"
 	"net/url"
+	"runtime"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -75,6 +76,93 @@ func TestV2Redirect(t *testing.T) {
 	afterTest(t)
 	afterTest(t)
 }
 }
 
 
+// TestRemove exercises Server.Remove on clusters of 3, 4, 5 and 6 servers.
+//
+// For each cluster it removes servers one at a time, in index order, by
+// having server i call Remove(i) on itself, and stops once two servers are
+// left. After each removal it checks that every higher-indexed server
+// delivers a watch event whose key is "<v2machineKVPrefix>/<i>", and that
+// server i ends up in stop mode. The cluster is then shut down and afterTest
+// is run before moving on to the next size.
+//
+// The round field of each test case is not read anywhere in this function.
+func TestRemove(t *testing.T) {
+	tests := []struct {
+		size  int
+		round int
+	}{
+		{3, 5},
+		{4, 5},
+		{5, 5},
+		{6, 5},
+	}
+
+	for _, tt := range tests {
+		es, hs := buildCluster(tt.size, false)
+		waitCluster(t, es)
+
+		// we don't remove the machine from 2-node cluster because it is
+		// not 100 percent safe in our raft.
+		// TODO(yichengq): improve it later.
+		for i := 0; i < tt.size-2; i++ {
+			// wait for leader to be stable for all live machines
+			// TODO(yichengq): change it later
+			//
+			// This is a busy-wait: servers i..size-1 are scanned in order,
+			// and whenever one does not fit, the scan restarts from server i
+			// after yielding the processor.
+			var prevLead int64
+			var prevTerm int64
+			for j := i; j < tt.size; j++ {
+				id := int64(i)
+				lead := es[j].node.Leader()
+				term := es[j].node.Term()
+				fit := true
+				if j == i {
+					// The first server only needs to report a leader id that
+					// is not below i (presumably: not an already-removed
+					// server -- confirm how ids are assigned in buildCluster).
+					if lead < id {
+						fit = false
+					}
+				} else {
+					// Every later server must agree with the previous one on
+					// both leader and term.
+					if lead != prevLead || term != prevTerm {
+						fit = false
+					}
+				}
+				if !fit {
+					// Rewind so that the loop's j++ lands back on i.
+					j = i - 1
+					runtime.Gosched()
+					continue
+				}
+				prevLead = lead
+				prevTerm = term
+			}
+
+			// Record the index before the removal; the watches below start
+			// at index+1.
+			index := es[i].Index()
+			es[i].Remove(i)
+
+			// i-th machine cannot be promised to apply the removal command of
+			// its own due to our non-optimized raft.
+			// TODO(yichengq): it should work when
+			// https://github.com/etcd-team/etcd/pull/7 is merged.
+			for j := i + 1; j < tt.size; j++ {
+				w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1)
+				if err != nil {
+					t.Errorf("#%d on %d: %v", i, j, err)
+					// Give up on the remaining servers for this removal.
+					break
+				}
+				// Blocks until server j delivers an event on this watch.
+				v := <-w.EventChan
+				ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i)
+				if v.Node.Key != ww {
+					t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww)
+				}
+			}
+
+			// may need to wait for msgDenial
+			// TODO(yichengq): no need to sleep here when previous issue is merged.
+			if es[i].mode == stop {
+				continue
+			}
+			// Not stopped yet: sleep for defaultElection * defaultTickDuration
+			// and check the mode once more.
+			time.Sleep(defaultElection * defaultTickDuration)
+			if g := es[i].mode; g != stop {
+				t.Errorf("#%d: mode = %d, want stop", i, g)
+			}
+		}
+
+		// Tear down in reverse index order: stop every etcd server first,
+		// then close every HTTP test server.
+		for i := range hs {
+			es[len(hs)-i-1].Stop()
+		}
+		for i := range hs {
+			hs[len(hs)-i-1].Close()
+		}
+		afterTest(t)
+	}
+}
+
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	bootstrapper := 0
 	bootstrapper := 0
 	es := make([]*Server, number)
 	es := make([]*Server, number)