Преглед на файлове

server: add add function

Yicheng Qin преди 11 години
родител
ревизия
f34b77216f
променени са 2 файла, в които са добавени 120 реда и са изтрити 15 реда
  1. 37 0
      etcd/etcd.go
  2. 83 15
      etcd/etcd_test.go

+ 37 - 0
etcd/etcd.go

@@ -10,6 +10,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/config"
 	"github.com/coreos/etcd/config"
+	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
@@ -53,6 +54,7 @@ type Server struct {
 
 
 	proposal    chan v2Proposal
 	proposal    chan v2Proposal
 	node        *v2Raft
 	node        *v2Raft
+	addNodeC    chan raft.Config
 	removeNodeC chan raft.Config
 	removeNodeC chan raft.Config
 	t           *transporter
 	t           *transporter
 
 
@@ -91,6 +93,7 @@ func New(c *config.Config, id int64) *Server {
 			Node:   raft.New(id, defaultHeartbeat, defaultElection),
 			Node:   raft.New(id, defaultHeartbeat, defaultElection),
 			result: make(map[wait]chan interface{}),
 			result: make(map[wait]chan interface{}),
 		},
 		},
+		addNodeC:    make(chan raft.Config),
 		removeNodeC: make(chan raft.Config),
 		removeNodeC: make(chan raft.Config),
 		t:           newTransporter(tc),
 		t:           newTransporter(tc),
 
 
@@ -177,6 +180,37 @@ func (s *Server) Join() {
 	s.run()
 	s.run()
 }
 }
 
 
+func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
+	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
+	index := s.Index()
+
+	_, err := s.Get(p, false, false)
+	if err == nil {
+		return fmt.Errorf("existed node")
+	}
+	if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
+		return err
+	}
+	for {
+		if s.mode == stop {
+			return fmt.Errorf("server is stopped")
+		}
+		s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}
+		w, err := s.Watch(p, true, false, index+1)
+		if err != nil {
+			return err
+		}
+		select {
+		case v := <-w.EventChan:
+			if v.Action == store.Set {
+				return nil
+			}
+			index = v.Index()
+		case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+		}
+	}
+}
+
 func (s *Server) Remove(id int64) error {
 func (s *Server) Remove(id int64) error {
 	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
 	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
 	index := s.Index()
 	index := s.Index()
@@ -221,6 +255,7 @@ func (s *Server) run() {
 
 
 func (s *Server) runParticipant() {
 func (s *Server) runParticipant() {
 	node := s.node
 	node := s.node
+	addNodeC := s.addNodeC
 	removeNodeC := s.removeNodeC
 	removeNodeC := s.removeNodeC
 	recv := s.t.recv
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	ticker := time.NewTicker(s.tickDuration)
@@ -236,6 +271,8 @@ func (s *Server) runParticipant() {
 		select {
 		select {
 		case p := <-proposal:
 		case p := <-proposal:
 			node.Propose(p)
 			node.Propose(p)
+		case c := <-addNodeC:
+			node.UpdateConf(raft.AddNode, &c)
 		case c := <-removeNodeC:
 		case c := <-removeNodeC:
 			node.UpdateConf(raft.RemoveNode, &c)
 			node.UpdateConf(raft.RemoveNode, &c)
 		case msg := <-recv:
 		case msg := <-recv:

+ 83 - 15
etcd/etcd_test.go

@@ -76,6 +76,69 @@ func TestV2Redirect(t *testing.T) {
 	afterTest(t)
 	afterTest(t)
 }
 }
 
 
+func TestAdd(t *testing.T) {
+	tests := []struct {
+		size  int
+		round int
+	}{
+		{3, 5},
+		{4, 5},
+		{5, 5},
+		{6, 5},
+	}
+
+	for _, tt := range tests {
+		es := make([]*Server, tt.size)
+		hs := make([]*httptest.Server, tt.size)
+		for i := 0; i < tt.size; i++ {
+			c := config.New()
+			if i > 0 {
+				c.Peers = []string{hs[0].URL}
+			}
+			es[i], hs[i] = initTestServer(c, int64(i), false)
+		}
+
+		go es[0].Bootstrap()
+
+		for i := 1; i < tt.size; i++ {
+			var index uint64
+			for {
+				lead := es[0].node.Leader()
+				if lead != -1 {
+					index = es[lead].Index()
+					ne := es[i]
+					if err := es[lead].Add(ne.id, ne.raftPubAddr, ne.pubAddr); err == nil {
+						break
+					}
+				}
+				runtime.Gosched()
+			}
+			go es[i].run()
+
+			for j := 0; j <= i; j++ {
+				w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1)
+				if err != nil {
+					t.Errorf("#%d on %d: %v", i, j, err)
+					break
+				}
+				v := <-w.EventChan
+				ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i)
+				if v.Node.Key != ww {
+					t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww)
+				}
+			}
+		}
+
+		for i := range hs {
+			es[len(hs)-i-1].Stop()
+		}
+		for i := range hs {
+			hs[len(hs)-i-1].Close()
+		}
+		afterTest(t)
+	}
+}
+
 func TestRemove(t *testing.T) {
 func TestRemove(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		size  int
 		size  int
@@ -155,21 +218,7 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	for i := range es {
 	for i := range es {
 		c := config.New()
 		c := config.New()
 		c.Peers = []string{seed}
 		c.Peers = []string{seed}
-		es[i] = New(c, int64(i))
-		es[i].SetTick(time.Millisecond * 5)
-		m := http.NewServeMux()
-		m.Handle("/", es[i])
-		m.Handle("/raft", es[i].t)
-		m.Handle("/raft/", es[i].t)
-
-		if tls {
-			hs[i] = httptest.NewTLSServer(m)
-		} else {
-			hs[i] = httptest.NewServer(m)
-		}
-
-		es[i].raftPubAddr = hs[i].URL
-		es[i].pubAddr = hs[i].URL
+		es[i], hs[i] = initTestServer(c, int64(i), tls)
 
 
 		if i == bootstrapper {
 		if i == bootstrapper {
 			seed = hs[i].URL
 			seed = hs[i].URL
@@ -188,6 +237,25 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	return es, hs
 	return es, hs
 }
 }
 
 
+func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptest.Server) {
+	e = New(c, id)
+	e.SetTick(time.Millisecond * 5)
+	m := http.NewServeMux()
+	m.Handle("/", e)
+	m.Handle("/raft", e.t)
+	m.Handle("/raft/", e.t)
+
+	if tls {
+		h = httptest.NewTLSServer(m)
+	} else {
+		h = httptest.NewServer(m)
+	}
+
+	e.raftPubAddr = h.URL
+	e.pubAddr = h.URL
+	return
+}
+
 func waitCluster(t *testing.T, es []*Server) {
 func waitCluster(t *testing.T, es []*Server) {
 	n := len(es)
 	n := len(es)
 	for i, e := range es {
 	for i, e := range es {