Selaa lähdekoodia

etcd: refactor transporter

Xiang Li 11 vuotta sitten
vanhempi
commit
d2a553f6c4
7 muutettua tiedostoa jossa 379 lisäystä ja 222 poistoa
  1. 52 31
      etcd/etcd.go
  2. 2 1
      etcd/etcd_test.go
  3. 129 0
      etcd/peer.go
  4. 101 0
      etcd/peer_hub.go
  5. 93 0
      etcd/raft_handler.go
  6. 0 188
      etcd/transporter.go
  7. 2 2
      etcd/v2_http.go

+ 52 - 31
etcd/etcd.go

@@ -55,12 +55,15 @@ type Server struct {
 
 	mode int
 
-	id           int64
-	pubAddr      string
-	raftPubAddr  string
+	id          int64
+	pubAddr     string
+	raftPubAddr string
+
+	nodes   map[string]bool
+	peerHub *peerHub
+
 	tickDuration time.Duration
 
-	nodes  map[string]bool
 	client *v2client
 	t      *transporter
 	node   *v2Raft
@@ -99,25 +102,36 @@ func New(c *config.Config, id int64) *Server {
 		}
 	}
 
+	tr := new(http.Transport)
+	tr.TLSClientConfig = tc
+	client := &http.Client{Transport: tr}
+
 	s := &Server{
-		config:       c,
-		id:           id,
-		pubAddr:      c.Addr,
-		raftPubAddr:  c.Peer.Addr,
+		config:      c,
+		id:          id,
+		pubAddr:     c.Addr,
+		raftPubAddr: c.Peer.Addr,
+
+		nodes:   make(map[string]bool),
+		peerHub: newPeerHub(client),
+
 		tickDuration: defaultTickDuration,
 
-		nodes:  make(map[string]bool),
-		client: newClient(tc),
-		t:      newTransporter(tc),
 		node: &v2Raft{
 			Node:   raft.New(id, defaultHeartbeat, defaultElection),
 			result: make(map[wait]chan interface{}),
 		},
+
+		addNodeC:    make(chan raft.Config),
+		removeNodeC: make(chan raft.Config),
+		client:      newClient(tc),
+
 		Store: store.New(),
 
 		modeC: make(chan int, 10),
 		stop:  make(chan struct{}),
 	}
+	s.t = newTransporter(s.peerHub)
 
 	for _, seed := range c.Peers {
 		s.nodes[seed] = true
@@ -169,8 +183,10 @@ func (s *Server) Stop() {
 		return
 	}
 	s.mode = stop
-	s.t.closeConnections()
+
+	s.t.stop()
 	s.client.CloseConnections()
+	s.peerHub.stop()
 	close(s.stop)
 }
 
@@ -446,10 +462,15 @@ func (s *Server) apply(ents []raft.Entry) {
 				log.Println(err)
 				break
 			}
-			if err := s.t.set(cfg.NodeId, cfg.Addr); err != nil {
+			if err := s.peerHub.add(cfg.NodeId, cfg.Addr); err != nil {
 				log.Println(err)
 				break
 			}
+			peer, err := s.peerHub.peer(cfg.NodeId)
+			if err != nil {
+				log.Fatal("cannot get the added peer:", err)
+			}
+			peer.participate()
 			log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			if _, err := s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent); err == nil {
@@ -463,6 +484,11 @@ func (s *Server) apply(ents []raft.Entry) {
 			}
 			log.Printf("Remove Node %x\n", cfg.NodeId)
 			delete(s.nodes, s.fetchAddrFromStore(cfg.NodeId))
+			peer, err := s.peerHub.peer(cfg.NodeId)
+			if err != nil {
+				log.Fatal("cannot get the added peer:", err)
+			}
+			peer.idle()
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			s.Store.Delete(p, false, false)
 		default:
@@ -478,23 +504,18 @@ func (s *Server) send(msgs []raft.Message) {
 			// todo(xiangli): error handling
 			log.Fatal(err)
 		}
-		// todo(xiangli): reuse routines and limit the number of sending routines
-		// sync.Pool?
-		go func(i int) {
-			var err error
-			if err = s.t.sendTo(msgs[i].To, data); err == nil {
-				return
-			}
-			if err == errUnknownNode {
-				err = s.fetchAddr(msgs[i].To)
-			}
-			if err == nil {
-				err = s.t.sendTo(msgs[i].To, data)
-			}
-			if err != nil {
-				log.Println(err)
-			}
-		}(i)
+		if err = s.peerHub.send(msgs[i].To, data); err == nil {
+			continue
+		}
+		if err == errUnknownNode {
+			err = s.fetchAddr(msgs[i].To)
+		}
+		if err == nil {
+			err = s.peerHub.send(msgs[i].To, data)
+		}
+		if err != nil {
+			log.Println(err)
+		}
 	}
 }
 
@@ -511,7 +532,7 @@ func (s *Server) setClusterConfig(c *config.ClusterConfig) error {
 
 func (s *Server) fetchAddr(nodeId int64) error {
 	for seed := range s.nodes {
-		if err := s.t.fetchAddr(seed, nodeId); err == nil {
+		if err := s.peerHub.fetch(seed, nodeId); err == nil {
 			return nil
 		}
 	}

+ 2 - 1
etcd/etcd_test.go

@@ -14,7 +14,7 @@ import (
 )
 
 func TestMultipleNodes(t *testing.T) {
-	tests := []int{1, 3, 5, 9, 11}
+	tests := []int{1, 3, 5}
 
 	for _, tt := range tests {
 		es, hs := buildCluster(tt, false)
@@ -195,6 +195,7 @@ func TestRemove(t *testing.T) {
 		}
 	}
 	afterTest(t)
+	TestGoroutinesRunning(t)
 }
 
 func TestBecomeStandby(t *testing.T) {

+ 129 - 0
etcd/peer.go

@@ -0,0 +1,129 @@
+package etcd
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"log"
+	"net/http"
+	"sync"
+	"sync/atomic"
+)
+
+const (
+	maxInflight = 4
+)
+
+const (
+	// participant is defined in etcd.go
+	idle = iota + 1
+	stopped
+)
+
+var (
+	errUnknownNode = errors.New("unknown node")
+)
+
+type peer struct {
+	url      string
+	queue    chan []byte
+	status   int
+	inflight atomicInt
+	c        *http.Client
+	mu       sync.RWMutex
+	wg       sync.WaitGroup
+}
+
+func newPeer(url string, c *http.Client) *peer {
+	return &peer{
+		url:    url,
+		status: idle,
+		c:      c,
+	}
+}
+
+func (p *peer) participate() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.queue = make(chan []byte)
+	p.status = participant
+	for i := 0; i < maxInflight; i++ {
+		go p.handle(p.queue)
+	}
+}
+
+func (p *peer) idle() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.status == participant {
+		close(p.queue)
+	}
+	p.status = idle
+}
+
+func (p *peer) stop() {
+	p.mu.Lock()
+	if p.status == participant {
+		close(p.queue)
+	}
+	p.status = stopped
+	p.mu.Unlock()
+	p.wg.Wait()
+}
+
+func (p *peer) handle(queue chan []byte) {
+	p.wg.Add(1)
+	for d := range queue {
+		p.post(d)
+	}
+	p.wg.Done()
+}
+
+func (p *peer) send(d []byte) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	switch p.status {
+	case participant:
+		select {
+		case p.queue <- d:
+		default:
+			return fmt.Errorf("reach max serving")
+		}
+	case idle:
+		if p.inflight.Get() > maxInflight {
+			return fmt.Errorf("reach max idle")
+		}
+		go func() {
+			p.wg.Add(1)
+			p.post(d)
+			p.wg.Done()
+		}()
+	case stopped:
+		return fmt.Errorf("sender stopped")
+	}
+	return nil
+}
+
+func (p *peer) post(d []byte) {
+	p.inflight.Add(1)
+	defer p.inflight.Add(-1)
+	buf := bytes.NewBuffer(d)
+	resp, err := p.c.Post(p.url, "application/octet-stream", buf)
+	if err != nil {
+		log.Println("post:", err)
+		return
+	}
+	resp.Body.Close()
+}
+
+// An AtomicInt is an int64 to be accessed atomically.
+type atomicInt int64
+
+func (i *atomicInt) Add(d int64) {
+	atomic.AddInt64((*int64)(i), d)
+}
+
+func (i *atomicInt) Get() int64 {
+	return atomic.LoadInt64((*int64)(i))
+}

+ 101 - 0
etcd/peer_hub.go

@@ -0,0 +1,101 @@
+package etcd
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+	"sync"
+)
+
+type peerGetter interface {
+	peer(id int64) (*peer, error)
+}
+
+type peerHub struct {
+	mu    sync.RWMutex
+	peers map[int64]*peer
+	c     *http.Client
+}
+
+func newPeerHub(c *http.Client) *peerHub {
+	h := &peerHub{
+		peers: make(map[int64]*peer),
+		c:     c,
+	}
+	return h
+}
+
+func (h *peerHub) stop() {
+	for _, p := range h.peers {
+		p.stop()
+	}
+	tr := h.c.Transport.(*http.Transport)
+	tr.CloseIdleConnections()
+}
+
+func (h *peerHub) peer(id int64) (*peer, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if p, ok := h.peers[id]; ok {
+		return p, nil
+	}
+	return nil, fmt.Errorf("peer %d not found", id)
+}
+
+func (h *peerHub) fetch(seedurl string, id int64) error {
+	if _, err := h.peer(id); err == nil {
+		return nil
+	}
+
+	u, err := url.Parse(seedurl)
+	if err != nil {
+		return fmt.Errorf("cannot parse the url of the given seed")
+	}
+
+	u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
+	resp, err := h.c.Get(u.String())
+	if err != nil {
+		return fmt.Errorf("cannot reach %v", u)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("cannot find node %d via %s", id, seedurl)
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return fmt.Errorf("cannot reach %v", u)
+	}
+
+	if err := h.add(id, string(b)); err != nil {
+		return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
+	}
+	return nil
+}
+
+func (h *peerHub) add(id int64, rawurl string) error {
+	u, err := url.Parse(rawurl)
+	if err != nil {
+		return err
+	}
+	u.Path = raftPrefix
+
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.peers[id] = newPeer(u.String(), h.c)
+	return nil
+}
+
+func (h *peerHub) send(nodeId int64, data []byte) error {
+	h.mu.RLock()
+	p := h.peers[nodeId]
+	h.mu.RUnlock()
+
+	if p == nil {
+		return errUnknownNode
+	}
+	return p.send(data)
+}

+ 93 - 0
etcd/raft_handler.go

@@ -0,0 +1,93 @@
+package etcd
+
+import (
+	"encoding/json"
+
+	"log"
+	"net/http"
+	"strconv"
+	"sync"
+
+	"github.com/coreos/etcd/raft"
+)
+
+type transporter struct {
+	mu      sync.RWMutex
+	serving bool
+
+	peerGetter peerGetter
+
+	recv chan *raft.Message
+	*http.ServeMux
+}
+
+func newTransporter(p peerGetter) *transporter {
+	t := &transporter{
+		recv:       make(chan *raft.Message, 512),
+		peerGetter: p,
+	}
+	t.ServeMux = http.NewServeMux()
+	t.ServeMux.HandleFunc("/raft/cfg/", t.serveCfg)
+	t.ServeMux.HandleFunc("/raft", t.serveRaft)
+	return t
+}
+
+func (t *transporter) start() {
+	t.mu.Lock()
+	t.serving = true
+	t.mu.Unlock()
+}
+
+func (t *transporter) stop() {
+	t.mu.Lock()
+	t.serving = false
+	t.mu.Unlock()
+}
+
+func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
+	t.mu.RLock()
+	serving := t.serving
+	t.mu.RUnlock()
+	if !serving {
+		http.Error(w, "404 page not found", http.StatusNotFound)
+		return
+	}
+
+	msg := new(raft.Message)
+	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
+		log.Println(err)
+		return
+	}
+
+	select {
+	case t.recv <- msg:
+	default:
+		log.Println("drop")
+		// drop the incoming package at network layer if the upper layer
+		// cannot consume them in time.
+		// TODO(xiangli): not return 200.
+	}
+	return
+}
+
+func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
+	t.mu.RLock()
+	serving := t.serving
+	t.mu.RUnlock()
+	if !serving {
+		http.Error(w, "404 page not found", http.StatusNotFound)
+		return
+	}
+
+	id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+	p, err := t.peerGetter.peer(id)
+	if err == nil {
+		w.Write([]byte(p.url))
+		return
+	}
+	http.Error(w, err.Error(), http.StatusNotFound)
+}

+ 0 - 188
etcd/transporter.go

@@ -1,188 +0,0 @@
-package etcd
-
-import (
-	"bytes"
-	"crypto/tls"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io/ioutil"
-	"log"
-	"net/http"
-	"net/url"
-	"path"
-	"strconv"
-	"sync"
-
-	"github.com/coreos/etcd/raft"
-)
-
-const (
-	serving int = iota
-	stopped
-)
-
-var (
-	errUnknownNode = errors.New("unknown node")
-)
-
-type transporter struct {
-	mu     sync.RWMutex
-	status int
-	urls   map[int64]string
-
-	recv   chan *raft.Message
-	client *http.Client
-	wg     sync.WaitGroup
-	*http.ServeMux
-}
-
-func newTransporter(tc *tls.Config) *transporter {
-	tr := new(http.Transport)
-	tr.TLSClientConfig = tc
-	c := &http.Client{Transport: tr}
-
-	t := &transporter{
-		urls:   make(map[int64]string),
-		recv:   make(chan *raft.Message, 512),
-		client: c,
-	}
-	t.ServeMux = http.NewServeMux()
-	t.ServeMux.HandleFunc("/raft/cfg/", t.serveCfg)
-	t.ServeMux.HandleFunc("/raft", t.serveRaft)
-	return t
-}
-
-func (t *transporter) start() {
-	t.mu.Lock()
-	t.status = serving
-	t.mu.Unlock()
-}
-
-func (t *transporter) stop() {
-	t.mu.Lock()
-	t.status = stopped
-	t.mu.Unlock()
-}
-
-func (t *transporter) closeConnections() {
-	t.wg.Wait()
-	tr := t.client.Transport.(*http.Transport)
-	tr.CloseIdleConnections()
-}
-
-func (t *transporter) set(nodeId int64, rawurl string) error {
-	u, err := url.Parse(rawurl)
-	if err != nil {
-		return err
-	}
-	u.Path = raftPrefix
-	t.mu.Lock()
-	t.urls[nodeId] = u.String()
-	t.mu.Unlock()
-	return nil
-}
-
-func (t *transporter) sendTo(nodeId int64, data []byte) error {
-	t.mu.RLock()
-	url := t.urls[nodeId]
-	t.mu.RUnlock()
-
-	if len(url) == 0 {
-		return errUnknownNode
-	}
-	return t.send(url, data)
-}
-
-func (t *transporter) send(addr string, data []byte) error {
-	t.mu.RLock()
-	if t.status == stopped {
-		t.mu.RUnlock()
-		return fmt.Errorf("transporter stopped")
-	}
-	t.wg.Add(1)
-	defer t.wg.Done()
-	t.mu.RUnlock()
-
-	buf := bytes.NewBuffer(data)
-	resp, err := t.client.Post(addr, "application/octet-stream", buf)
-	if err != nil {
-		return err
-	}
-	resp.Body.Close()
-	return nil
-}
-
-func (t *transporter) fetchAddr(seedurl string, id int64) error {
-	u, err := url.Parse(seedurl)
-	if err != nil {
-		return fmt.Errorf("cannot parse the url of the given seed")
-	}
-
-	u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
-	resp, err := t.client.Get(u.String())
-	if err != nil {
-		return fmt.Errorf("cannot reach %v", u)
-	}
-	defer resp.Body.Close()
-
-	b, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return fmt.Errorf("cannot reach %v", u)
-	}
-
-	if err := t.set(id, string(b)); err != nil {
-		return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
-	}
-	return nil
-}
-
-func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
-	t.mu.RLock()
-	status := t.status
-	t.mu.RUnlock()
-	if status == stopped {
-		http.Error(w, "404 page not found", http.StatusNotFound)
-		return
-	}
-
-	msg := new(raft.Message)
-	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
-		log.Println(err)
-		return
-	}
-
-	select {
-	case t.recv <- msg:
-	default:
-		log.Println("drop")
-		// drop the incoming package at network layer if the upper layer
-		// cannot consume them in time.
-		// TODO(xiangli): not return 200.
-	}
-	return
-}
-
-func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
-	t.mu.RLock()
-	status := t.status
-	t.mu.RUnlock()
-	if status == stopped {
-		http.Error(w, "404 page not found", http.StatusNotFound)
-		return
-	}
-
-	id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusBadRequest)
-		return
-	}
-	t.mu.RLock()
-	u, ok := t.urls[id]
-	t.mu.RUnlock()
-	if ok {
-		w.Write([]byte(u))
-		return
-	}
-	http.Error(w, "Not Found", http.StatusNotFound)
-}

+ 2 - 2
etcd/v2_http.go

@@ -51,8 +51,8 @@ func (s *Server) serveLeader(w http.ResponseWriter, r *http.Request) error {
 	if r.Method != "GET" {
 		return allow(w, "GET")
 	}
-	if laddr, ok := s.t.urls[s.node.Leader()]; ok {
-		w.Write([]byte(laddr))
+	if p, ok := s.peerHub.peers[s.node.Leader()]; ok {
+		w.Write([]byte(p.url))
 		return nil
 	}
 	return fmt.Errorf("no leader")