Browse Source

Merge pull request #1869 from yichengq/251

etcdserver: not add self into sendhub when new server
Yicheng Qin 11 years ago
parent
commit
15aed05071
3 changed files with 20 additions and 34 deletions
  1. 1 5
      etcdserver/sendhub.go
  2. 6 29
      etcdserver/sendhub_test.go
  3. 13 0
      etcdserver/server.go

+ 1 - 5
etcdserver/sendhub.go

@@ -56,7 +56,7 @@ type sendHub struct {
 // to other members. The returned sendHub will update the given ServerStats and
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
 // LeaderStats appropriately.
 func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
 func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
-	h := &sendHub{
+	return &sendHub{
 		tr:         t,
 		tr:         t,
 		cl:         cl,
 		cl:         cl,
 		p:          p,
 		p:          p,
@@ -65,10 +65,6 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *s
 		senders:    make(map[types.ID]rafthttp.Sender),
 		senders:    make(map[types.ID]rafthttp.Sender),
 		shouldstop: make(chan struct{}, 1),
 		shouldstop: make(chan struct{}, 1),
 	}
 	}
-	for _, m := range cl.Members() {
-		h.Add(m)
-	}
-	return h
 }
 }
 
 
 func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
 func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }

+ 6 - 29
etcdserver/sendhub_test.go

@@ -27,27 +27,6 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
-func TestSendHubInitSenders(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-		newTestMember(2, []string{"http://b"}, "", nil),
-		newTestMember(3, []string{"http://c"}, "", nil),
-	}
-	cl := newTestCluster(membs)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, nil, ls)
-
-	ids := cl.MemberIDs()
-	if len(h.senders) != len(ids) {
-		t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids))
-	}
-	for _, id := range ids {
-		if _, ok := h.senders[id]; !ok {
-			t.Errorf("senders[%s] is nil, want exists", id)
-		}
-	}
-}
-
 func TestSendHubAdd(t *testing.T) {
 func TestSendHubAdd(t *testing.T) {
 	cl := newTestCluster(nil)
 	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
 	ls := stats.NewLeaderStats("")
@@ -71,12 +50,11 @@ func TestSendHubAdd(t *testing.T) {
 }
 }
 
 
 func TestSendHubRemove(t *testing.T) {
 func TestSendHubRemove(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-	}
-	cl := newTestCluster(membs)
+	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
 	ls := stats.NewLeaderStats("")
 	h := newSendHub(nil, cl, nil, nil, ls)
 	h := newSendHub(nil, cl, nil, nil, ls)
+	m := newTestMember(1, []string{"http://a"}, "", nil)
+	h.Add(m)
 	h.Remove(types.ID(1))
 	h.Remove(types.ID(1))
 
 
 	if _, ok := h.senders[types.ID(1)]; ok {
 	if _, ok := h.senders[types.ID(1)]; ok {
@@ -85,13 +63,12 @@ func TestSendHubRemove(t *testing.T) {
 }
 }
 
 
 func TestSendHubShouldStop(t *testing.T) {
 func TestSendHubShouldStop(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-	}
 	tr := newRespRoundTripper(http.StatusForbidden, nil)
 	tr := newRespRoundTripper(http.StatusForbidden, nil)
-	cl := newTestCluster(membs)
+	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
 	ls := stats.NewLeaderStats("")
 	h := newSendHub(tr, cl, nil, nil, ls)
 	h := newSendHub(tr, cl, nil, nil, ls)
+	m := newTestMember(1, []string{"http://a"}, "", nil)
+	h.Add(m)
 
 
 	shouldstop := h.ShouldStopNotify()
 	shouldstop := h.ShouldStopNotify()
 	select {
 	select {

+ 13 - 0
etcdserver/server.go

@@ -315,6 +315,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		snapCount:  cfg.SnapCount,
 		snapCount:  cfg.SnapCount,
 	}
 	}
 	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
 	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
+	for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) {
+		srv.sendhub.Add(m)
+	}
 	return srv, nil
 	return srv, nil
 }
 }
 
 
@@ -964,6 +967,16 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
 	return
 	return
 }
 }
 
 
+func getOtherMembers(cl ClusterInfo, self string) []*Member {
+	var ms []*Member
+	for _, m := range cl.Members() {
+		if m.Name != self {
+			ms = append(ms, m)
+		}
+	}
+	return ms
+}
+
 // getOtherPeerURLs returns peer urls of other members in the cluster. The
 // getOtherPeerURLs returns peer urls of other members in the cluster. The
 // returned list is sorted in ascending lexicographical order.
 // returned list is sorted in ascending lexicographical order.
 func getOtherPeerURLs(cl ClusterInfo, self string) []string {
 func getOtherPeerURLs(cl ClusterInfo, self string) []string {