소스 검색

etcd: move fetch logic into peerhub

Xiang Li 11 년 전
부모
커밋
e0e8495ace
2개의 변경된 파일52개의 추가작업 그리고 48개의 파일을 삭제
  1. 5 22
      etcd/etcd.go
  2. 47 26
      etcd/peer_hub.go

+ 5 - 22
etcd/etcd.go

@@ -105,7 +105,7 @@ func New(c *config.Config, id int64) *Server {
 	tr := new(http.Transport)
 	tr.TLSClientConfig = tc
 	client := &http.Client{Transport: tr}
-	peerHub := newPeerHub(client)
+	peerHub := newPeerHub(c.Peers, client)
 
 	s := &Server{
 		config:      c,
@@ -113,7 +113,8 @@ func New(c *config.Config, id int64) *Server {
 		pubAddr:     c.Addr,
 		raftPubAddr: c.Peer.Addr,
 
-		nodes:   make(map[string]bool),
+		nodes: make(map[string]bool),
+
 		peerHub: peerHub,
 
 		tickDuration: defaultTickDuration,
@@ -487,28 +488,10 @@ func (s *Server) send(msgs []raft.Message) {
 			// todo(xiangli): error handling
 			log.Fatal(err)
 		}
-		if err = s.peerHub.send(msgs[i].To, data); err == nil {
-			continue
-		}
-		if err == errUnknownNode {
-			err = s.fetchAddr(msgs[i].To)
-		}
-		if err == nil {
-			err = s.peerHub.send(msgs[i].To, data)
-		}
-		if err != nil {
-			log.Println(err)
-		}
-	}
-}
-
-func (s *Server) fetchAddr(nodeId int64) error {
-	for seed := range s.nodes {
-		if err := s.peerHub.fetch(seed, nodeId); err == nil {
-			return nil
+		if err = s.peerHub.send(msgs[i].To, data); err != nil {
+			log.Println("send:", err)
 		}
 	}
-	return fmt.Errorf("cannot fetch the address of node %d", nodeId)
 }
 
 func (s *Server) fetchAddrFromStore(nodeId int64) string {

+ 47 - 26
etcd/peer_hub.go

@@ -15,15 +15,20 @@ type peerGetter interface {
 
 type peerHub struct {
 	mu    sync.RWMutex
+	seeds map[string]bool
 	peers map[int64]*peer
 	c     *http.Client
 }
 
-func newPeerHub(c *http.Client) *peerHub {
+func newPeerHub(seeds []string, c *http.Client) *peerHub {
 	h := &peerHub{
 		peers: make(map[int64]*peer),
+		seeds: make(map[string]bool),
 		c:     c,
 	}
+	for _, seed := range seeds {
+		h.seeds[seed] = true
+	}
 	return h
 }
 
@@ -44,7 +49,47 @@ func (h *peerHub) peer(id int64) (*peer, error) {
 	return nil, fmt.Errorf("peer %d not found", id)
 }
 
-func (h *peerHub) fetch(seedurl string, id int64) error {
+func (h *peerHub) add(id int64, rawurl string) error {
+	u, err := url.Parse(rawurl)
+	if err != nil {
+		return err
+	}
+	u.Path = raftPrefix
+
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.peers[id] = newPeer(u.String(), h.c)
+	return nil
+}
+
+func (h *peerHub) send(nodeId int64, data []byte) error {
+	h.mu.RLock()
+	p := h.peers[nodeId]
+	h.mu.RUnlock()
+
+	if p == nil {
+		err := h.fetch(nodeId)
+		if err != nil {
+			return err
+		}
+	}
+
+	h.mu.RLock()
+	p = h.peers[nodeId]
+	h.mu.RUnlock()
+	return p.send(data)
+}
+
+func (h *peerHub) fetch(nodeId int64) error {
+	for seed := range h.seeds {
+		if err := h.seedFetch(seed, nodeId); err == nil {
+			return nil
+		}
+	}
+	return fmt.Errorf("cannot fetch the address of node %d", nodeId)
+}
+
+func (h *peerHub) seedFetch(seedurl string, id int64) error {
 	if _, err := h.peer(id); err == nil {
 		return nil
 	}
@@ -75,27 +120,3 @@ func (h *peerHub) fetch(seedurl string, id int64) error {
 	}
 	return nil
 }
-
-func (h *peerHub) add(id int64, rawurl string) error {
-	u, err := url.Parse(rawurl)
-	if err != nil {
-		return err
-	}
-	u.Path = raftPrefix
-
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.peers[id] = newPeer(u.String(), h.c)
-	return nil
-}
-
-func (h *peerHub) send(nodeId int64, data []byte) error {
-	h.mu.RLock()
-	p := h.peers[nodeId]
-	h.mu.RUnlock()
-
-	if p == nil {
-		return errUnknownNode
-	}
-	return p.send(data)
-}