Browse Source

etcd: cleanup peerhub

Xiang Li 11 years ago
parent
commit
d198173fd7
2 changed files with 21 additions and 39 deletions
  1. 2 5
      etcd/etcd.go
  2. 19 34
      etcd/peer_hub.go

+ 2 - 5
etcd/etcd.go

@@ -446,14 +446,11 @@ func (s *Server) apply(ents []raft.Entry) {
 				log.Println(err)
 				break
 			}
-			if err := s.peerHub.add(cfg.NodeId, cfg.Addr); err != nil {
+			peer, err := s.peerHub.add(cfg.NodeId, cfg.Addr)
+			if err != nil {
 				log.Println(err)
 				break
 			}
-			peer, err := s.peerHub.peer(cfg.NodeId)
-			if err != nil {
-				log.Fatal("cannot get the added peer:", err)
-			}
 			peer.participate()
 			log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))

+ 19 - 34
etcd/peer_hub.go

@@ -54,74 +54,59 @@ func (h *peerHub) peer(id int64) (*peer, error) {
 	return nil, fmt.Errorf("peer %d not found", id)
 }
 
-func (h *peerHub) add(id int64, rawurl string) error {
+func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
 	u, err := url.Parse(rawurl)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	u.Path = raftPrefix
 
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	h.peers[id] = newPeer(u.String(), h.c)
-	return nil
+	return h.peers[id], nil
 }
 
 func (h *peerHub) send(nodeId int64, data []byte) error {
-	h.mu.RLock()
-	p := h.peers[nodeId]
-	h.mu.RUnlock()
-
-	if p == nil {
-		err := h.fetch(nodeId)
-		if err != nil {
-			return errUnknownPeer
-		}
+	if p, err := h.fetch(nodeId); err == nil {
+		return p.send(data)
 	}
-
-	h.mu.RLock()
-	p = h.peers[nodeId]
-	h.mu.RUnlock()
-	return p.send(data)
+	return errUnknownPeer
 }
 
-func (h *peerHub) fetch(nodeId int64) error {
+func (h *peerHub) fetch(nodeId int64) (*peer, error) {
+	if p, err := h.peer(nodeId); err == nil {
+		return p, nil
+	}
 	for seed := range h.seeds {
-		if err := h.seedFetch(seed, nodeId); err == nil {
-			return nil
+		if p, err := h.seedFetch(seed, nodeId); err == nil {
+			return p, nil
 		}
 	}
-	return fmt.Errorf("cannot fetch the address of node %d", nodeId)
+	return nil, fmt.Errorf("cannot fetch the address of node %d", nodeId)
 }
 
-func (h *peerHub) seedFetch(seedurl string, id int64) error {
-	if _, err := h.peer(id); err == nil {
-		return nil
-	}
-
+func (h *peerHub) seedFetch(seedurl string, id int64) (*peer, error) {
 	u, err := url.Parse(seedurl)
 	if err != nil {
-		return fmt.Errorf("cannot parse the url of the given seed")
+		return nil, fmt.Errorf("cannot parse the url of the given seed")
 	}
 
 	u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
 	resp, err := h.c.Get(u.String())
 	if err != nil {
-		return fmt.Errorf("cannot reach %v", u)
+		return nil, fmt.Errorf("cannot reach %v", u)
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		return fmt.Errorf("cannot find node %d via %s", id, seedurl)
+		return nil, fmt.Errorf("cannot find node %d via %s", id, seedurl)
 	}
 
 	b, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
-		return fmt.Errorf("cannot reach %v", u)
+		return nil, fmt.Errorf("cannot reach %v", u)
 	}
 
-	if err := h.add(id, string(b)); err != nil {
-		return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
-	}
-	return nil
+	return h.add(id, string(b))
 }