Browse Source

etcd: stop peerhub

Xiang Li 11 years ago
parent
commit
447f6a16cc
2 changed files with 24 additions and 12 deletions
  1. 1 6
      etcd/etcd.go
  2. 23 6
      etcd/peer_hub.go

+ 1 - 6
etcd/etcd.go

@@ -480,12 +480,7 @@ func (s *Server) apply(ents []raft.Entry) {
 
 func (s *Server) send(msgs []raft.Message) {
 	for i := range msgs {
-		data, err := json.Marshal(msgs[i])
-		if err != nil {
-			// todo(xiangli): error handling
-			log.Fatal(err)
-		}
-		if err = s.peerHub.send(msgs[i].To, data); err != nil {
+		if err := s.peerHub.send(msgs[i]); err != nil {
 			log.Println("send:", err)
 		}
 	}

+ 23 - 6
etcd/peer_hub.go

@@ -1,6 +1,7 @@
 package etcd
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -8,6 +9,8 @@ import (
 	"net/url"
 	"path"
 	"sync"
+
+	"github.com/coreos/etcd/raft"
 )
 
 var (
@@ -19,10 +22,11 @@ type peerGetter interface {
 }
 
 type peerHub struct {
-	mu    sync.RWMutex
-	seeds map[string]bool
-	peers map[int64]*peer
-	c     *http.Client
+	mu      sync.RWMutex
+	stopped bool
+	seeds   map[string]bool
+	peers   map[int64]*peer
+	c       *http.Client
 }
 
 func newPeerHub(seeds []string, c *http.Client) *peerHub {
@@ -38,6 +42,9 @@ func newPeerHub(seeds []string, c *http.Client) *peerHub {
 }
 
 func (h *peerHub) stop() {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.stopped = true
 	for _, p := range h.peers {
 		p.stop()
 	}
@@ -48,6 +55,9 @@ func (h *peerHub) stop() {
 func (h *peerHub) peer(id int64) (*peer, error) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
+	if h.stopped {
+		return nil, fmt.Errorf("peerHub stopped")
+	}
 	if p, ok := h.peers[id]; ok {
 		return p, nil
 	}
@@ -63,12 +73,19 @@ func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
 
 	h.mu.Lock()
 	defer h.mu.Unlock()
+	if h.stopped {
+		return nil, fmt.Errorf("peerHub stopped")
+	}
 	h.peers[id] = newPeer(u.String(), h.c)
 	return h.peers[id], nil
 }
 
-func (h *peerHub) send(nodeId int64, data []byte) error {
-	if p, err := h.fetch(nodeId); err == nil {
+func (h *peerHub) send(msg raft.Message) error {
+	if p, err := h.fetch(msg.To); err == nil {
+		data, err := json.Marshal(msg)
+		if err != nil {
+			return err
+		}
 		return p.send(data)
 	}
 	return errUnknownPeer