
server: add standby

Yicheng Qin · 11 years ago · commit a2c5c844a0
8 changed files with 356 additions and 57 deletions
  1. etcd/etcd.go (+170 -39)
  2. etcd/etcd_functional_test.go (+12 -2)
  3. etcd/etcd_test.go (+108 -4)
  4. etcd/v2_admin.go (+1 -5)
  5. etcd/v2_http.go (+14 -6)
  6. etcd/v2_raft.go (+1 -1)
  7. etcd/v2_standby.go (+47 -0)
  8. etcd/v2_store.go (+3 -0)

+ 170 - 39
etcd/etcd.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log"
 	"net/http"
+	"net/url"
 	"path"
 	"time"
 
@@ -44,8 +45,9 @@ const (
 )
 
 var (
-	tmpErr        = fmt.Errorf("try again")
-	serverStopErr = fmt.Errorf("server is stopped")
+	tmpErr            = fmt.Errorf("try again")
+	raftStopErr       = fmt.Errorf("raft is stopped")
+	noneId      int64 = -1
 )
 
 type Server struct {
@@ -56,21 +58,30 @@ type Server struct {
 	id           int64
 	pubAddr      string
 	raftPubAddr  string
-	nodes        map[string]bool
 	tickDuration time.Duration
 
+	nodes  map[string]bool
+	client *v2client
+
+	// participant mode vars
 	proposal    chan v2Proposal
 	node        *v2Raft
 	addNodeC    chan raft.Config
 	removeNodeC chan raft.Config
 	t           *transporter
-	client      *v2client
+
+	// standby mode vars
+	leader      int64
+	leaderAddr  string
+	clusterConf *config.ClusterConfig
 
 	store.Store
 
-	stop chan struct{}
+	modeC chan int
+	stop  chan struct{}
 
-	http.Handler
+	participantHandler http.Handler
+	standbyHandler     http.Handler
 }
 
 func New(c *config.Config, id int64) *Server {
@@ -95,21 +106,20 @@ func New(c *config.Config, id int64) *Server {
 		pubAddr:      c.Addr,
 		raftPubAddr:  c.Peer.Addr,
 		nodes:        make(map[string]bool),
+		client:       newClient(tc),
 		tickDuration: defaultTickDuration,
-		proposal:     make(chan v2Proposal, maxBufferedProposal),
-		node: &v2Raft{
-			Node:   raft.New(id, defaultHeartbeat, defaultElection),
-			result: make(map[wait]chan interface{}),
-		},
-		addNodeC:    make(chan raft.Config),
-		removeNodeC: make(chan raft.Config),
-		t:           newTransporter(tc),
-		client:      newClient(tc),
 
 		Store: store.New(),
 
-		stop: make(chan struct{}),
+		modeC: make(chan int, 10),
+		stop:  make(chan struct{}),
 	}
+	node := &v2Raft{
+		Node:   raft.New(id, defaultHeartbeat, defaultElection),
+		result: make(map[wait]chan interface{}),
+	}
+	t := newTransporter(tc)
+	s.initParticipant(node, t)
 
 	for _, seed := range c.Peers {
 		s.nodes[seed] = true
@@ -123,7 +133,10 @@ func New(c *config.Config, id int64) *Server {
 	m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats))
 	m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig))
 	m.Handle(v2adminMachinesPrefix, handlerErr(s.serveAdminMachines))
-	s.Handler = m
+	s.participantHandler = m
+	m = http.NewServeMux()
+	m.Handle("/", handlerErr(s.serveRedirect))
+	s.standbyHandler = m
 	return s
 }
 
@@ -132,7 +145,7 @@ func (s *Server) SetTick(d time.Duration) {
 }
 
 func (s *Server) RaftHandler() http.Handler {
-	return s.t
+	return http.HandlerFunc(s.ServeHTTPRaft)
 }
 
 func (s *Server) ClusterConfig() *config.ClusterConfig {
@@ -216,10 +229,15 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 		return tmpErr
 	}
 
+	if s.mode != participant {
+		return raftStopErr
+	}
 	select {
 	case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
-	case <-s.stop:
-		return serverStopErr
+	default:
+		w.Remove()
+		log.Println("unable to send out addNode proposal")
+		return tmpErr
 	}
 
 	select {
@@ -229,13 +247,10 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 		}
 		log.Println("add error: action =", v.Action)
 		return tmpErr
-	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+	case <-time.After(6 * defaultHeartbeat * s.tickDuration):
 		w.Remove()
 		log.Println("add error: wait timeout")
 		return tmpErr
-	case <-s.stop:
-		w.Remove()
-		return serverStopErr
 	}
 }
 
@@ -247,10 +262,14 @@ func (s *Server) Remove(id int64) error {
 		return nil
 	}
 
+	if s.mode != participant {
+		return raftStopErr
+	}
 	select {
 	case s.removeNodeC <- raft.Config{NodeId: id}:
-	case <-s.stop:
-		return serverStopErr
+	default:
+		log.Println("unable to send out removeNode proposal")
+		return tmpErr
 	}
 
 	// TODO(xiangli): do not need to watch if the
@@ -268,18 +287,56 @@ func (s *Server) Remove(id int64) error {
 		}
 		log.Println("remove error: action =", v.Action)
 		return tmpErr
-	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+	case <-time.After(6 * defaultHeartbeat * s.tickDuration):
 		w.Remove()
 		log.Println("remove error: wait timeout")
 		return tmpErr
-	case <-s.stop:
-		w.Remove()
-		return serverStopErr
 	}
 }
 
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	switch s.mode {
+	case participant:
+		s.participantHandler.ServeHTTP(w, r)
+	case standby:
+		s.standbyHandler.ServeHTTP(w, r)
+	case stop:
+		http.Error(w, "server is stopped", http.StatusInternalServerError)
+	}
+}
+
+func (s *Server) ServeHTTPRaft(w http.ResponseWriter, r *http.Request) {
+	switch s.mode {
+	case participant:
+		s.t.ServeHTTP(w, r)
+	case standby:
+		http.NotFound(w, r)
+	case stop:
+		http.Error(w, "server is stopped", http.StatusInternalServerError)
+	}
+}
+
+func (s *Server) initParticipant(node *v2Raft, t *transporter) {
+	s.proposal = make(chan v2Proposal, maxBufferedProposal)
+	s.node = node
+	s.addNodeC = make(chan raft.Config, 1)
+	s.removeNodeC = make(chan raft.Config, 1)
+	s.t = t
+}
+
+func (s *Server) initStandby(leader int64, leaderAddr string, conf *config.ClusterConfig) {
+	s.leader = leader
+	s.leaderAddr = leaderAddr
+	s.clusterConf = conf
+}
+
 func (s *Server) run() {
 	for {
+		select {
+		case s.modeC <- s.mode:
+		default:
+		}
+
 		switch s.mode {
 		case participant:
 			s.runParticipant()
@@ -298,7 +355,7 @@ func (s *Server) runParticipant() {
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	v2SyncTicker := time.NewTicker(time.Millisecond * 500)
-	defer node.StopProposalWaiters()
+	defer s.node.StopProposalWaiters()
 
 	var proposal chan v2Proposal
 	var addNodeC, removeNodeC chan raft.Config
@@ -332,16 +389,54 @@ func (s *Server) runParticipant() {
 		s.apply(node.Next())
 		s.send(node.Msgs())
 		if node.IsRemoved() {
-			// TODO: delete it after standby is implemented
-			log.Printf("Node: %d removed from participants\n", s.id)
-			s.Stop()
-			return
+			break
 		}
 	}
+
+	log.Printf("Node: %d removed to standby mode\n", s.id)
+	leader := noneId
+	leaderAddr := ""
+	if s.node.HasLeader() && !s.node.IsLeader() {
+		leader = s.node.Leader()
+		leaderAddr = s.fetchAddrFromStore(leader)
+	}
+	conf := s.ClusterConfig()
+	s.initStandby(leader, leaderAddr, conf)
+	s.mode = standby
+	return
 }
 
 func (s *Server) runStandby() {
-	panic("unimplemented")
+	syncDuration := time.Duration(int64(s.clusterConf.SyncInterval * float64(time.Second)))
+	for {
+		select {
+		case <-time.After(syncDuration):
+		case <-s.stop:
+			log.Printf("Node: %d stopped\n", s.id)
+			return
+		}
+
+		if err := s.syncCluster(); err != nil {
+			continue
+		}
+		if err := s.standbyJoin(s.leaderAddr); err != nil {
+			continue
+		}
+		break
+	}
+
+	log.Printf("Node: %d removed to participant mode\n", s.id)
+	// TODO(yichengq): use old v2Raft
+	// 1. reject proposal in leader state when sm is removed
+	// 2. record removeIndex in node to ignore msgDenial and old removal
+	s.Store = store.New()
+	node := &v2Raft{
+		Node:   raft.New(s.id, defaultHeartbeat, defaultElection),
+		result: make(map[wait]chan interface{}),
+	}
+	s.initParticipant(node, s.t)
+	s.mode = participant
+	return
 }
 
 func (s *Server) apply(ents []raft.Entry) {
@@ -376,10 +471,9 @@ func (s *Server) apply(ents []raft.Entry) {
 				break
 			}
 			log.Printf("Remove Node %x\n", cfg.NodeId)
+			delete(s.nodes, s.fetchAddrFromStore(cfg.NodeId))
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
-			if _, err := s.Store.Delete(p, false, false); err == nil {
-				delete(s.nodes, cfg.Addr)
-			}
+			s.Store.Delete(p, false, false)
 		default:
 			panic("unimplemented")
 		}
@@ -413,6 +507,17 @@ func (s *Server) send(msgs []raft.Message) {
 	}
 }
 
+func (s *Server) setClusterConfig(c *config.ClusterConfig) error {
+	b, err := json.Marshal(c)
+	if err != nil {
+		return err
+	}
+	if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (s *Server) fetchAddr(nodeId int64) error {
 	for seed := range s.nodes {
 		if err := s.t.fetchAddr(seed, nodeId); err == nil {
@@ -421,3 +526,29 @@ func (s *Server) fetchAddr(nodeId int64) error {
 	}
 	return fmt.Errorf("cannot fetch the address of node %d", nodeId)
 }
+
+func (s *Server) fetchAddrFromStore(nodeId int64) string {
+	p := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId))
+	if ev, err := s.Get(p, false, false); err == nil {
+		if m, err := url.ParseQuery(*ev.Node.Value); err == nil {
+			return m["raft"][0]
+		}
+	}
+	return ""
+}
+
+func (s *Server) standbyJoin(addr string) error {
+	if s.clusterConf.ActiveSize <= len(s.nodes) {
+		return fmt.Errorf("full cluster")
+	}
+	info := &context{
+		MinVersion: store.MinVersion(),
+		MaxVersion: store.MaxVersion(),
+		ClientURL:  s.pubAddr,
+		PeerURL:    s.raftPubAddr,
+	}
+	if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
+		return err
+	}
+	return nil
+}

+ 12 - 2
etcd/etcd_functional_test.go

@@ -104,7 +104,17 @@ type leadterm struct {
 	term int64
 }
 
-func waitLeader(es []*Server) {
+func waitActiveLeader(es []*Server) (lead, term int64) {
+	for {
+		if l, t := waitLeader(es); l >= 0 && es[l].mode == participant {
+			return l, t
+		}
+	}
+}
+
+// waitLeader waits until all alive servers are checked to have the same leader.
+// WARNING: The lead returned is not guaranteed to be the actual leader.
+func waitLeader(es []*Server) (lead, term int64) {
 	for {
 		ls := make([]leadterm, 0, len(es))
 		for i := range es {
@@ -117,7 +127,7 @@ func waitLeader(es []*Server) {
 			}
 		}
 		if isSameLead(ls) {
-			return
+			return ls[0].lead, ls[0].term
 		}
 		time.Sleep(es[0].tickDuration * defaultElection)
 	}

+ 108 - 4
etcd/etcd_test.go

@@ -2,6 +2,7 @@ package etcd
 
 import (
 	"fmt"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -9,6 +10,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/config"
+	"github.com/coreos/etcd/store"
 )
 
 func TestMultipleNodes(t *testing.T) {
@@ -107,7 +109,7 @@ func TestAdd(t *testing.T) {
 				switch err {
 				case tmpErr:
 					time.Sleep(defaultElection * es[0].tickDuration)
-				case serverStopErr:
+				case raftStopErr:
 					t.Fatalf("#%d on %d: unexpected stop", i, lead)
 				default:
 					t.Fatal(err)
@@ -147,6 +149,8 @@ func TestRemove(t *testing.T) {
 		// not 100 percent safe in our raft.
 		// TODO(yichengq): improve it later.
 		for i := 0; i < tt-2; i++ {
+			<-es[i].modeC
+
 			id := int64(i)
 			send := id
 			for {
@@ -168,15 +172,19 @@ func TestRemove(t *testing.T) {
 				switch err {
 				case tmpErr:
 					time.Sleep(defaultElection * 5 * time.Millisecond)
-				case serverStopErr:
+				case raftStopErr:
 					if lead == id {
 						break
 					}
 				default:
 					t.Fatal(err)
 				}
+
+			}
+
+			if g := <-es[i].modeC; g != standby {
+				t.Errorf("#%d: mode = %d, want standby", i, g)
 			}
-			<-es[i].stop
 		}
 
 		for i := range es {
@@ -189,6 +197,79 @@ func TestRemove(t *testing.T) {
 	afterTest(t)
 }
 
+// TODO(yichengq): cannot handle previous msgDenial correctly now
+func TestModeSwitch(t *testing.T) {
+	size := 5
+	round := 1
+
+	for i := 0; i < size; i++ {
+		es, hs := buildCluster(size, false)
+		waitCluster(t, es)
+
+		if g := <-es[i].modeC; g != participant {
+			t.Fatalf("#%d: mode = %d, want participant", i, g)
+		}
+
+		config := config.NewClusterConfig()
+		config.SyncInterval = 0
+		id := int64(i)
+		for j := 0; j < round; j++ {
+			lead, _ := waitActiveLeader(es)
+			// cluster only demotes follower
+			if lead == id {
+				continue
+			}
+
+			config.ActiveSize = size - 1
+			if err := es[lead].setClusterConfig(config); err != nil {
+				t.Fatalf("#%d: setClusterConfig err = %v", i, err)
+			}
+			if err := es[lead].Remove(id); err != nil {
+				t.Fatalf("#%d: remove err = %v", i, err)
+			}
+
+			if g := <-es[i].modeC; g != standby {
+				t.Fatalf("#%d: mode = %d, want standby", i, g)
+			}
+			if g := len(es[i].modeC); g != 0 {
+				t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+			}
+
+			if g := es[i].leader; g != lead {
+				t.Errorf("#%d: lead = %d, want %d", i, g, lead)
+			}
+
+			config.ActiveSize = size
+			if err := es[lead].setClusterConfig(config); err != nil {
+				t.Fatalf("#%d: setClusterConfig err = %v", i, err)
+			}
+
+			if g := <-es[i].modeC; g != participant {
+				t.Fatalf("#%d: mode = %d, want participant", i, g)
+			}
+			//			if g := len(es[i].modeC); g != 0 {
+			//				t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+			//			}
+
+			//			if err := checkParticipant(i, es); err != nil {
+			//				t.Errorf("#%d: check alive err = %v", i, err)
+			//			}
+		}
+
+		//		if g := len(es[i].modeC); g != 0 {
+		//			t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+		//		}
+
+		for i := range hs {
+			es[len(hs)-i-1].Stop()
+		}
+		for i := range hs {
+			hs[len(hs)-i-1].Close()
+		}
+	}
+	afterTest(t)
+}
+
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	bootstrapper := 0
 	es := make([]*Server, number)
@@ -197,7 +278,9 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 
 	for i := range es {
 		c := config.New()
-		c.Peers = []string{seed}
+		if seed != "" {
+			c.Peers = []string{seed}
+		}
 		es[i], hs[i] = initTestServer(c, int64(i), tls)
 
 		if i == bootstrapper {
@@ -262,3 +345,24 @@ func waitCluster(t *testing.T, es []*Server) {
 		}
 	}
 }
+
+// checkParticipant checks that the i-th server works well as a participant.
+func checkParticipant(i int, es []*Server) error {
+	lead, _ := waitActiveLeader(es)
+	key := fmt.Sprintf("/%d", rand.Int31())
+	ev, err := es[lead].Set(key, false, "bar", store.Permanent)
+	if err != nil {
+		return err
+	}
+
+	w, err := es[i].Watch(key, false, false, ev.Index())
+	if err != nil {
+		return err
+	}
+	select {
+	case <-w.EventChan:
+	case <-time.After(8 * defaultHeartbeat * es[i].tickDuration):
+		return fmt.Errorf("watch timeout")
+	}
+	return nil
+}

+ 1 - 5
etcd/v2_admin.go

@@ -45,11 +45,7 @@ func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error
 			return err
 		}
 		c.Sanitize()
-		b, err := json.Marshal(c)
-		if err != nil {
-			return err
-		}
-		if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
+		if err := s.setClusterConfig(c); err != nil {
 			return err
 		}
 	default:

+ 14 - 6
etcd/v2_http.go

@@ -111,16 +111,24 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro
 		return fmt.Errorf("failed to parse node entry: %s", *e.Node.Value)
 	}
 
-	originalURL := r.URL
-	redirectURL, err := url.Parse(m["etcd"][0])
+	redirectAddr, err := s.buildRedirectURL(m["etcd"][0], r.URL)
 	if err != nil {
-		log.Println("redirect cannot parse url:", err)
-		return fmt.Errorf("redirect cannot parse url: %v", err)
+		log.Println("redirect cannot build new url:", err)
+		return err
+	}
+
+	http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
+	return nil
+}
+
+func (s *Server) buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
+	redirectURL, err := url.Parse(redirectAddr)
+	if err != nil {
+		return "", fmt.Errorf("redirect cannot parse url: %v", err)
 	}
 
 	redirectURL.Path = originalURL.Path
 	redirectURL.RawQuery = originalURL.RawQuery
 	redirectURL.Fragment = originalURL.Fragment
-	http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
-	return nil
+	return redirectURL.String(), nil
 }
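
The redirect logic above is split out so that the new standby handler in etcd/v2_standby.go can reuse it. For illustration, here is a standalone, hypothetical rewrite of the helper outside of *Server, showing the intended behavior: the redirect target takes its scheme and host from the redirect address and keeps the original request's path, query, and fragment.

package main

import (
	"fmt"
	"net/url"
)

func buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
	redirectURL, err := url.Parse(redirectAddr)
	if err != nil {
		return "", fmt.Errorf("redirect cannot parse url: %v", err)
	}
	// Preserve everything after the host from the original request.
	redirectURL.Path = originalURL.Path
	redirectURL.RawQuery = originalURL.RawQuery
	redirectURL.Fragment = originalURL.Fragment
	return redirectURL.String(), nil
}

func main() {
	orig, _ := url.Parse("http://127.0.0.1:4002/v2/keys/foo?wait=true")
	target, _ := buildRedirectURL("http://10.0.0.1:4001", orig)
	fmt.Println(target) // http://10.0.0.1:4001/v2/keys/foo?wait=true
}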

+ 1 - 1
etcd/v2_raft.go

@@ -48,7 +48,7 @@ func (r *v2Raft) Sync() {
 
 func (r *v2Raft) StopProposalWaiters() {
 	for k, ch := range r.result {
-		ch <- fmt.Errorf("server is stopped or removed from participant")
+		ch <- raftStopErr
 		delete(r.result, k)
 	}
 }

+ 47 - 0
etcd/v2_standby.go

@@ -0,0 +1,47 @@
+package etcd
+
+import (
+	"fmt"
+	"net/http"
+	"strconv"
+)
+
+func (s *Server) serveRedirect(w http.ResponseWriter, r *http.Request) error {
+	if s.leader == noneId {
+		return fmt.Errorf("no leader in the cluster")
+	}
+	redirectAddr, err := s.buildRedirectURL(s.leaderAddr, r.URL)
+	if err != nil {
+		return err
+	}
+	http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
+	return nil
+}
+
+func (s *Server) syncCluster() error {
+	for node := range s.nodes {
+		machines, err := s.client.GetMachines(node)
+		if err != nil {
+			continue
+		}
+		config, err := s.client.GetClusterConfig(node)
+		if err != nil {
+			continue
+		}
+		s.nodes = make(map[string]bool)
+		for _, machine := range machines {
+			s.nodes[machine.PeerURL] = true
+			if machine.State == stateLeader {
+				id, err := strconv.ParseInt(machine.Name, 0, 64)
+				if err != nil {
+					return err
+				}
+				s.leader = id
+				s.leaderAddr = machine.PeerURL
+			}
+		}
+		s.clusterConf = config
+		return nil
+	}
+	return fmt.Errorf("unreachable cluster")
+}

+ 3 - 0
etcd/v2_store.go

@@ -61,6 +61,9 @@ func (s *Server) do(c *cmd) (*store.Event, error) {
 		ret:  make(chan interface{}, 1),
 	}
 
+	if s.mode != participant {
+		return nil, raftStopErr
+	}
 	select {
 	case s.proposal <- p:
 	default: