Browse Source

etcdserver: add/remove sender in sendhub explicitly

Yicheng Qin 11 years ago
parent
commit
457b30e585
3 changed files with 29 additions and 24 deletions
  1. 20 23
      etcdserver/sender.go
  2. 4 0
      etcdserver/server.go
  3. 5 1
      etcdserver/server_test.go

+ 20 - 23
etcdserver/sender.go

@@ -45,19 +45,27 @@ type sendHub struct {
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
 func newSendHub(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
-	return &sendHub{
+	h := &sendHub{
 		tr:      t,
 		cl:      cl,
 		ss:      ss,
 		ls:      ls,
 		senders: make(map[types.ID]*sender),
 	}
+	for _, m := range cl.Members() {
+		h.Add(m)
+	}
+	return h
 }
 
 func (h *sendHub) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
-		s := h.sender(types.ID(m.To))
-		if s == nil {
+		to := types.ID(m.To)
+		s, ok := h.senders[to]
+		if !ok {
+			if !h.cl.IsIDRemoved(to) {
+				log.Printf("etcdserver: send message to unknown receiver %s", to)
+			}
 			continue
 		}
 
@@ -83,29 +91,18 @@ func (h *sendHub) Stop() {
 	}
 }
 
-func (h *sendHub) sender(id types.ID) *sender {
-	if s, ok := h.senders[id]; ok {
-		return s
-	}
-	return h.add(id)
-}
-
-func (h *sendHub) add(id types.ID) *sender {
-	memb := h.cl.Member(id)
-	if memb == nil {
-		if !h.cl.IsIDRemoved(id) {
-			log.Printf("etcdserver: add unknown receiver %s", id)
-		}
-		return nil
-	}
+func (h *sendHub) Add(m *Member) {
 	// TODO: considering how to switch between all available peer urls
-	u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
+	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
 	c := &http.Client{Transport: h.tr}
-	fs := h.ls.Follower(id.String())
+	fs := h.ls.Follower(m.ID.String())
 	s := newSender(u, h.cl.ID(), c, fs)
-	// TODO: recycle sender during long running
-	h.senders[id] = s
-	return s
+	h.senders[m.ID] = s
+}
+
+func (h *sendHub) Remove(id types.ID) {
+	h.senders[id].stop()
+	delete(h.senders, id)
 }
 
 type sender struct {

+ 4 - 0
etcdserver/server.go

@@ -86,6 +86,8 @@ type Response struct {
 
 type Sender interface {
 	Send(m []raftpb.Message)
+	Add(m *Member)
+	Remove(id types.ID)
 	Stop()
 }
 
@@ -652,10 +654,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
 		s.Cluster.AddMember(m)
+		s.sender.Add(m)
 		log.Printf("etcdserver: added node %s to cluster", types.ID(cc.NodeID))
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
 		s.Cluster.RemoveMember(id)
+		s.sender.Remove(id)
 		log.Printf("etcdserver: removed node %s from cluster", id)
 	}
 	return nil

+ 5 - 1
etcdserver/server_test.go

@@ -496,7 +496,9 @@ func (s *fakeSender) Send(msgs []raftpb.Message) {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
 	}
 }
-func (s *fakeSender) Stop() {}
+func (s *fakeSender) Add(m *Member)      {}
+func (s *fakeSender) Remove(id types.ID) {}
+func (s *fakeSender) Stop()              {}
 
 func testServer(t *testing.T, ns uint64) {
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1385,6 +1387,8 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 type nopSender struct{}
 
 func (s *nopSender) Send(m []raftpb.Message) {}
+func (s *nopSender) Add(m *Member)           {}
+func (s *nopSender) Remove(id types.ID)      {}
 func (s *nopSender) Stop()                   {}
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {