Browse Source

etcd: separate raft and client port

Xiang Li 11 years ago
parent
commit
c3f8eabac3
8 changed files with 82 additions and 40 deletions
  1. 13 8
      etcd/etcd.go
  2. 9 2
      etcd/etcd_test.go
  3. 28 16
      etcd/transporter.go
  4. 14 5
      etcd/v2_http.go
  5. 6 0
      main.go
  6. 1 1
      raft/cluster_test.go
  7. 7 4
      raft/node.go
  8. 4 4
      raft/node_test.go

+ 13 - 8
etcd/etcd.go

@@ -32,8 +32,9 @@ const (
 type Server struct {
 	config *config.Config
 
-	id           int
+	id           int64
 	pubAddr      string
+	raftPubAddr  string
 	nodes        map[string]bool
 	tickDuration time.Duration
 
@@ -48,7 +49,7 @@ type Server struct {
 	http.Handler
 }
 
-func New(c *config.Config, id int) *Server {
+func New(c *config.Config, id int64) *Server {
 	if err := c.Sanitize(); err != nil {
 		log.Fatalf("failed sanitizing configuration: %v", err)
 	}
@@ -57,6 +58,7 @@ func New(c *config.Config, id int) *Server {
 		config:       c,
 		id:           id,
 		pubAddr:      c.Addr,
+		raftPubAddr:  c.Peer.Addr,
 		nodes:        make(map[string]bool),
 		tickDuration: defaultTickDuration,
 		proposal:     make(chan v2Proposal),
@@ -78,7 +80,6 @@ func New(c *config.Config, id int) *Server {
 	m := http.NewServeMux()
 	//m.Handle("/HEAD", handlerErr(s.serveHead))
 	m.Handle(v2Prefix+"/", handlerErr(s.serveValue))
-	m.Handle("/raft", s.t)
 	m.Handle(v2machinePrefix, handlerErr(s.serveMachines))
 	m.Handle(v2peersPrefix, handlerErr(s.serveMachines))
 	m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader))
@@ -91,6 +92,10 @@ func (s *Server) SetTick(d time.Duration) {
 	s.tickDuration = d
 }
 
+func (s *Server) RaftHandler() http.Handler {
+	return s.t
+}
+
 func (s *Server) Run() {
 	if len(s.config.Peers) == 0 {
 		s.Bootstrap()
@@ -107,14 +112,14 @@ func (s *Server) Stop() {
 func (s *Server) Bootstrap() {
 	log.Println("starting a bootstrap node")
 	s.node.Campaign()
-	s.node.Add(s.id, s.pubAddr)
+	s.node.Add(s.id, s.raftPubAddr, []byte(s.pubAddr))
 	s.apply(s.node.Next())
 	s.run()
 }
 
 func (s *Server) Join() {
 	log.Println("joining cluster via peers", s.config.Peers)
-	d, err := json.Marshal(&raft.Config{s.id, s.pubAddr})
+	d, err := json.Marshal(&raft.Config{s.id, s.raftPubAddr, []byte(s.pubAddr)})
 	if err != nil {
 		panic(err)
 	}
@@ -186,10 +191,10 @@ func (s *Server) apply(ents []raft.Entry) {
 				log.Println(err)
 				break
 			}
-			log.Printf("Add Node %x %v\n", cfg.NodeId, cfg.Addr)
+			log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
 			s.nodes[cfg.Addr] = true
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
-			s.Store.Set(p, false, cfg.Addr, store.Permanent)
+			s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
 		default:
 			panic("unimplemented")
 		}
@@ -223,7 +228,7 @@ func (s *Server) send(msgs []raft.Message) {
 	}
 }
 
-func (s *Server) fetchAddr(nodeId int) error {
+func (s *Server) fetchAddr(nodeId int64) error {
 	for seed := range s.nodes {
 		if err := s.t.fetchAddr(seed, nodeId); err == nil {
 			return nil

+ 9 - 2
etcd/etcd_test.go

@@ -2,6 +2,7 @@ package etcd
 
 import (
 	"fmt"
+	"net/http"
 	"net/http/httptest"
 	"testing"
 	"time"
@@ -34,9 +35,15 @@ func buildCluster(number int) ([]*Server, []*httptest.Server) {
 	for i := range es {
 		c := config.New()
 		c.Peers = []string{seed}
-		es[i] = New(c, i)
+		es[i] = New(c, int64(i))
 		es[i].SetTick(time.Millisecond * 5)
-		hs[i] = httptest.NewServer(es[i])
+		m := http.NewServeMux()
+		m.Handle("/", es[i])
+		m.Handle("/raft", es[i].t)
+		m.Handle("/raft/", es[i].t)
+
+		hs[i] = httptest.NewServer(m)
+		es[i].raftPubAddr = hs[i].URL
 		es[i].pubAddr = hs[i].URL
 
 		if i == bootstrapper {

+ 28 - 16
etcd/transporter.go

@@ -10,10 +10,10 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"strconv"
 	"sync"
 
 	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/store"
 )
 
 var (
@@ -23,22 +23,27 @@ var (
 type transporter struct {
 	mu      sync.RWMutex
 	stopped bool
-	urls    map[int]string
+	urls    map[int64]string
 
 	recv   chan *raft.Message
 	client *http.Client
 	wg     sync.WaitGroup
+	*http.ServeMux
 }
 
 func newTransporter() *transporter {
 	tr := new(http.Transport)
 	c := &http.Client{Transport: tr}
 
-	return &transporter{
-		urls:   make(map[int]string),
+	t := &transporter{
+		urls:   make(map[int64]string),
 		recv:   make(chan *raft.Message, 512),
 		client: c,
 	}
+	t.ServeMux = http.NewServeMux()
+	t.ServeMux.HandleFunc("/raft/cfg/", t.serveCfg)
+	t.ServeMux.HandleFunc("/raft", t.serveRaft)
+	return t
 }
 
 func (t *transporter) stop() {
@@ -51,7 +56,7 @@ func (t *transporter) stop() {
 	tr.CloseIdleConnections()
 }
 
-func (t *transporter) set(nodeId int, rawurl string) error {
+func (t *transporter) set(nodeId int64, rawurl string) error {
 	u, err := url.Parse(rawurl)
 	if err != nil {
 		return err
@@ -63,7 +68,7 @@ func (t *transporter) set(nodeId int, rawurl string) error {
 	return nil
 }
 
-func (t *transporter) sendTo(nodeId int, data []byte) error {
+func (t *transporter) sendTo(nodeId int64, data []byte) error {
 	t.mu.RLock()
 	url := t.urls[nodeId]
 	t.mu.RUnlock()
@@ -93,13 +98,13 @@ func (t *transporter) send(addr string, data []byte) error {
 	return nil
 }
 
-func (t *transporter) fetchAddr(seedurl string, id int) error {
+func (t *transporter) fetchAddr(seedurl string, id int64) error {
 	u, err := url.Parse(seedurl)
 	if err != nil {
 		return fmt.Errorf("cannot parse the url of the given seed")
 	}
 
-	u.Path = path.Join(v2Prefix, v2machineKVPrefix, fmt.Sprint(id))
+	u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
 	resp, err := t.client.Get(u.String())
 	if err != nil {
 		return fmt.Errorf("cannot reach %v", u)
@@ -111,19 +116,13 @@ func (t *transporter) fetchAddr(seedurl string, id int) error {
 		return fmt.Errorf("cannot reach %v", u)
 	}
 
-	event := new(store.Event)
-	err = json.Unmarshal(b, event)
-	if err != nil {
-		panic(fmt.Sprintf("fetchAddr: ", err))
-	}
-
-	if err := t.set(id, *event.Node.Value); err != nil {
+	if err := t.set(id, string(b)); err != nil {
 		return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
 	}
 	return nil
 }
 
-func (t *transporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
 	msg := new(raft.Message)
 	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
 		log.Println(err)
@@ -140,3 +139,16 @@ func (t *transporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	return
 }
+
+func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
+	id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+	if u, ok := t.urls[id]; ok {
+		w.Write([]byte(u))
+		return
+	}
+	http.Error(w, "Not Found", http.StatusNotFound)
+}

+ 14 - 5
etcd/v2_http.go

@@ -37,7 +37,11 @@ func (s *Server) serveMachines(w http.ResponseWriter, r *http.Request) error {
 	}
 	ns := make([]string, len(v.Node.Nodes))
 	for i, n := range v.Node.Nodes {
-		ns[i] = *n.Value
+		m, err := url.ParseQuery(*n.Value)
+		if err != nil {
+			continue
+		}
+		ns[i] = m["etcd"][0]
 	}
 	w.Write([]byte(strings.Join(ns, ",")))
 	return nil
@@ -95,15 +99,20 @@ func (w *HEADResponseWriter) Write([]byte) (int, error) {
 	return 0, nil
 }
 
-func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int) error {
-	baseURL := s.t.urls[id]
-	if len(baseURL) == 0 {
+func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) error {
+	e, err := s.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, s.node.Leader()), false, false)
+	if err != nil {
 		log.Println("redirect cannot find node", id)
 		return fmt.Errorf("redirect cannot find node %d", id)
 	}
 
+	m, err := url.ParseQuery(*e.Node.Value)
+	if err != nil {
+		return fmt.Errorf("failed to parse node entry: %s", *e.Node.Value)
+	}
+
 	originalURL := r.URL
-	redirectURL, err := url.Parse(baseURL)
+	redirectURL, err := url.Parse(m["etcd"][0])
 	if err != nil {
 		log.Println("redirect cannot parse url:", err)
 		return fmt.Errorf("redirect cannot parse url: %v", err)

+ 6 - 0
main.go

@@ -27,6 +27,12 @@ func main() {
 	e := etcd.New(config, genId())
 	go e.Run()
 
+	go func() {
+		if err := http.ListenAndServe(config.Peer.BindAddr, e.RaftHandler()); err != nil {
+			log.Fatal("system", err)
+		}
+	}()
+
 	if err := http.ListenAndServe(config.BindAddr, e); err != nil {
 		log.Fatal("system", err)
 	}

+ 1 - 1
raft/cluster_test.go

@@ -124,7 +124,7 @@ func buildCluster(size int, ids []int64) (nt *network, nodes []*Node) {
 	lead := dictate(nodes[0])
 	lead.Next()
 	for i := 1; i < size; i++ {
-		lead.Add(ids[i], "")
+		lead.Add(ids[i], "", nil)
 		nt.send(lead.Msgs()...)
 		for j := 0; j < i; j++ {
 			nodes[j].Next()

+ 7 - 4
raft/node.go

@@ -13,8 +13,9 @@ type Interface interface {
 type tick int
 
 type Config struct {
-	NodeId int64
-	Addr   string
+	NodeId  int64
+	Addr    string
+	Context []byte
 }
 
 type Node struct {
@@ -51,7 +52,7 @@ func (n *Node) HasLeader() bool { return n.sm.lead != none }
 
 func (n *Node) IsLeader() bool { return n.sm.lead == n.Id() }
 
-func (n *Node) Leader() int { return n.sm.lead }
+func (n *Node) Leader() int64 { return n.sm.lead }
 
 // Propose asynchronously proposes data be applied to the underlying state machine.
 func (n *Node) Propose(data []byte) { n.propose(Normal, data) }
@@ -62,7 +63,9 @@ func (n *Node) propose(t int, data []byte) {
 
 func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) }
 
-func (n *Node) Add(id int64, addr string) { n.updateConf(AddNode, &Config{NodeId: id, Addr: addr}) }
+func (n *Node) Add(id int64, addr string, context []byte) {
+	n.updateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
+}
 
 func (n *Node) Remove(id int64) { n.updateConf(RemoveNode, &Config{NodeId: id}) }
 

+ 4 - 4
raft/node_test.go

@@ -36,7 +36,7 @@ func TestTickMsgBeat(t *testing.T) {
 	n := dictate(New(0, defaultHeartbeat, defaultElection))
 	n.Next()
 	for i := 1; i < k; i++ {
-		n.Add(int64(i), "")
+		n.Add(int64(i), "", nil)
 		for _, m := range n.Msgs() {
 			if m.Type == msgApp {
 				n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)})
@@ -112,7 +112,7 @@ func TestStartCluster(t *testing.T) {
 func TestAdd(t *testing.T) {
 	n := dictate(New(0, defaultHeartbeat, defaultElection))
 	n.Next()
-	n.Add(1, "")
+	n.Add(1, "", nil)
 	n.Next()
 
 	if len(n.sm.ins) != 2 {
@@ -126,7 +126,7 @@ func TestAdd(t *testing.T) {
 func TestRemove(t *testing.T) {
 	n := dictate(New(0, defaultHeartbeat, defaultElection))
 	n.Next()
-	n.Add(1, "")
+	n.Add(1, "", nil)
 	n.Next()
 	n.Remove(0)
 	n.Step(Message{Type: msgAppResp, From: 1, Term: 1, Index: 4})
@@ -142,6 +142,6 @@ func TestRemove(t *testing.T) {
 
 func dictate(n *Node) *Node {
 	n.Step(Message{Type: msgHup})
-	n.Add(n.Id(), "")
+	n.Add(n.Id(), "", nil)
 	return n
 }