Browse Source

etcdserver: refactor sender

1. restrict the number of inflight connections to remote member
2. support stop
Yicheng Qin 11 years ago
parent
commit
1e05cd75c7
5 changed files with 149 additions and 75 deletions
  1. 1 0
      etcdserver/cluster.go
  2. 1 0
      etcdserver/etcdhttp/http_test.go
  3. 110 53
      etcdserver/sender.go
  4. 11 6
      etcdserver/server.go
  5. 26 16
      etcdserver/server_test.go

+ 1 - 0
etcdserver/cluster.go

@@ -46,6 +46,7 @@ type ClusterInfo interface {
 	// Members returns a slice of members sorted by their ID
 	Members() []*Member
 	Member(id types.ID) *Member
+	IsIDRemoved(id types.ID) bool
 }
 
 // Cluster is a list of Members that belong to the same raft cluster

+ 1 - 0
etcdserver/etcdhttp/http_test.go

@@ -57,6 +57,7 @@ func (c *fakeCluster) Members() []*etcdserver.Member {
 	return []*etcdserver.Member(sms)
 }
 func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] }
+func (c *fakeCluster) IsIDRemoved(id types.ID) bool          { return false }
 
 // errServer implements the etcd.Server interface for testing.
 // It returns the given error from any Do/Process/AddMember/RemoveMember calls.

+ 110 - 53
etcdserver/sender.go

@@ -28,42 +28,38 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-const raftPrefix = "/raft"
+const (
+	raftPrefix        = "/raft"
+	maxConnsPerSender = 4
+)
+
+type sendHub struct {
+	tr      *http.Transport
+	cl      ClusterInfo
+	ss      *stats.ServerStats
+	ls      *stats.LeaderStats
+	senders map[types.ID]*sender
+}
 
-// Sender creates the default production sender used to transport raft messages
-// in the cluster. The returned sender will update the given ServerStats and
+// newSendHub creates the default send hub used to transport raft messages
+// to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
-	c := &http.Client{Transport: t}
-
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(c, cl, m, ss, ls)
-		}
+func newSendHub(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+	return &sendHub{
+		tr:      t,
+		cl:      cl,
+		ss:      ss,
+		ls:      ls,
+		senders: make(map[types.ID]*sender),
 	}
 }
 
-// send uses the given client to send a message to a member in the given
-// ClusterStore, retrying up to 3 times for each message. The given
-// ServerStats and LeaderStats are updated appropriately
-func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
-	to := types.ID(m.To)
-	cid := cl.ID()
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		memb := cl.Member(to)
-		if memb == nil {
-			if !cl.IsIDRemoved(to) {
-				// TODO: unknown peer id.. what do we do? I
-				// don't think his should ever happen, need to
-				// look into this further.
-				log.Printf("etcdserver: error sending message to unknown receiver %s", to.String())
-			}
-			return
+func (h *sendHub) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		s := h.sender(types.ID(m.To))
+		if s == nil {
+			continue
 		}
-		u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
 
 		// TODO: don't block. we should be able to have 1000s
 		// of messages out at a time.
@@ -73,52 +69,113 @@ func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats,
 			return // drop bad message
 		}
 		if m.Type == raftpb.MsgApp {
-			ss.SendAppendReq(len(data))
+			h.ss.SendAppendReq(len(data))
 		}
-		fs := ls.Follower(to.String())
 
+		// TODO (xiangli): reasonable retry logic
+		s.send(data)
+	}
+}
+
+func (h *sendHub) Stop() {
+	for _, s := range h.senders {
+		s.stop()
+	}
+}
+
+func (h *sendHub) sender(id types.ID) *sender {
+	if s, ok := h.senders[id]; ok {
+		return s
+	}
+	return h.add(id)
+}
+
+func (h *sendHub) add(id types.ID) *sender {
+	memb := h.cl.Member(id)
+	if memb == nil {
+		if !h.cl.IsIDRemoved(id) {
+			log.Printf("etcdserver: error sending message to unknown receiver %s", id)
+		}
+		return nil
+	}
+	// TODO: considering how to switch between all available peer urls
+	u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
+	c := &http.Client{Transport: h.tr}
+	fs := h.ls.Follower(id.String())
+	s := newSender(u, h.cl.ID(), c, fs)
+	// TODO: recycle sender during long running
+	h.senders[id] = s
+	return s
+}
+
+type sender struct {
+	u   string
+	cid types.ID
+	c   *http.Client
+	fs  *stats.FollowerStats
+	q   chan []byte
+}
+
+func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
+	s := &sender{u: u, cid: cid, c: c, fs: fs, q: make(chan []byte)}
+	for i := 0; i < maxConnsPerSender; i++ {
+		go s.handle()
+	}
+	return s
+}
+
+func (s *sender) send(data []byte) {
+	// TODO: we cannot afford the miss of MsgProp, so we wait for some handler
+	// to take the data
+	s.q <- data
+}
+
+func (s *sender) stop() {
+	close(s.q)
+}
+
+func (s *sender) handle() {
+	for d := range s.q {
 		start := time.Now()
-		sent := httpPost(c, u, cid, data)
+		err := s.post(d)
 		end := time.Now()
-		if sent {
-			fs.Succ(end.Sub(start))
-			return
+		if err != nil {
+			s.fs.Fail()
+			log.Printf("sender: %v", err)
+			continue
 		}
-		fs.Fail()
-		// TODO: backoff
+		s.fs.Succ(end.Sub(start))
 	}
 }
 
-// httpPost POSTs a data payload to a url using the given client. Returns true
-// if the POST succeeds, false on any failure.
-func httpPost(c *http.Client, url string, cid types.ID, data []byte) bool {
-	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
+// post POSTs a data payload to a url. Returns nil if the POST succeeds,
+// error on any failure.
+func (s *sender) post(data []byte) error {
+	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
 	if err != nil {
-		// TODO: log the error?
-		return false
+		return fmt.Errorf("new request to %s error: %v", s.u, err)
 	}
 	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
-	resp, err := c.Do(req)
+	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
+	resp, err := s.c.Do(req)
 	if err != nil {
-		// TODO: log the error?
-		return false
+		return fmt.Errorf("do request %+v error: %v", req, err)
 	}
 	resp.Body.Close()
 
 	switch resp.StatusCode {
 	case http.StatusPreconditionFailed:
 		// TODO: shutdown the etcdserver gracefully?
-		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), cid.String())
-		return false
+		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		return nil
 	case http.StatusForbidden:
 		// TODO: stop the server
 		log.Println("etcd: this member has been permanently removed from the cluster")
 		log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
-		return false
+		return nil
 	case http.StatusNoContent:
-		return true
+		return nil
 	default:
-		return false
+		return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
 	}
 }

+ 11 - 6
etcdserver/server.go

@@ -78,14 +78,17 @@ func init() {
 	rand.Seed(time.Now().UnixNano())
 }
 
-type sendFunc func(m []raftpb.Message)
-
 type Response struct {
 	Event   *store.Event
 	Watcher store.Watcher
 	err     error
 }
 
+type Sender interface {
+	Send(m []raftpb.Message)
+	Stop()
+}
+
 type Storage interface {
 	// Save function saves ents and state to the underlying stable storage.
 	// Save MUST block until st and ents are on stable storage.
@@ -156,11 +159,11 @@ type EtcdServer struct {
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 
-	// send specifies the send function for sending msgs to members. send
+	// sender specifies the sender to send msgs to members. sending msgs
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If send is nil, server will
 	// panic.
-	send sendFunc
+	sender Sender
 
 	storage Storage
 
@@ -241,6 +244,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 	lstats := stats.NewLeaderStats(id.String())
 
+	shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
 	s := &EtcdServer{
 		store:      st,
 		node:       n,
@@ -253,7 +257,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		}{w, ss},
 		stats:      sstats,
 		lstats:     lstats,
-		send:       Sender(cfg.Transport, cfg.Cluster, sstats, lstats),
+		sender:     shub,
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
 		snapCount:  cfg.SnapCount,
@@ -318,7 +322,7 @@ func (s *EtcdServer) run() {
 			if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
 				log.Fatalf("etcdserver: create snapshot error: %v", err)
 			}
-			s.send(rd.Messages)
+			s.sender.Send(rd.Messages)
 
 			// TODO(bmizerany): do this in the background, but take
 			// care to apply entries in a single goroutine, and not
@@ -361,6 +365,7 @@ func (s *EtcdServer) Stop() {
 	s.node.Stop()
 	close(s.done)
 	<-s.stopped
+	s.sender.Stop()
 }
 
 // Do interprets r and performs an operation on s.store according to r.Method

+ 26 - 16
etcdserver/server_test.go

@@ -487,19 +487,23 @@ func TestApplyConfChangeError(t *testing.T) {
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
+type fakeSender struct {
+	ss []*EtcdServer
+}
+
+func (s *fakeSender) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		s.ss[m.To-1].node.Step(context.TODO(), m)
+	}
+}
+func (s *fakeSender) Stop() {}
+
 func testServer(t *testing.T, ns uint64) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	ss := make([]*EtcdServer, ns)
 
-	send := func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			t.Logf("m = %+v\n", m)
-			ss[m.To-1].node.Step(ctx, m)
-		}
-	}
-
 	ids := make([]uint64, ns)
 	for i := uint64(0); i < ns; i++ {
 		ids[i] = i + 1
@@ -516,7 +520,7 @@ func testServer(t *testing.T, ns uint64) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			send:    send,
+			sender:  &fakeSender{ss},
 			storage: &storageRecorder{},
 			Ticker:  tk.C,
 			Cluster: cl,
@@ -585,7 +589,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			send:    func(_ []raftpb.Message) {},
+			sender:  &nopSender{},
 			storage: &storageRecorder{},
 			Ticker:  tk,
 			Cluster: cl,
@@ -668,7 +672,7 @@ func TestDoProposalStopped(t *testing.T) {
 		// TODO: use fake node for better testability
 		node:    n,
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Ticker:  tk,
 	}
@@ -768,7 +772,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		node:       n,
 		store:      &storeRecorder{},
-		send:       func(_ []raftpb.Message) {},
+		sender:     &nopSender{},
 		storage:    &storageRecorder{},
 		SyncTicker: st,
 	}
@@ -842,7 +846,7 @@ func TestTriggerSnap(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:     st,
-		send:      func(_ []raftpb.Message) {},
+		sender:    &nopSender{},
 		storage:   p,
 		node:      n,
 		snapCount: 10,
@@ -876,7 +880,7 @@ func TestRecvSnapshot(t *testing.T) {
 	p := &storageRecorder{}
 	s := &EtcdServer{
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: p,
 		node:    n,
 	}
@@ -904,7 +908,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	s := &EtcdServer{
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		node:    n,
 	}
@@ -939,7 +943,7 @@ func TestAddMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -974,7 +978,7 @@ func TestRemoveMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -1042,6 +1046,7 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		node:    &nodeRecorder{},
+		sender:  &nopSender{},
 		Cluster: &Cluster{},
 		w:       &waitRecorder{},
 		done:    make(chan struct{}),
@@ -1402,6 +1407,11 @@ func (cs *removedClusterStore) Get() Cluster             { return Cluster{} }
 func (cs *removedClusterStore) Remove(id uint64)         {}
 func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] }
 
+type nopSender struct{}
+
+func (s *nopSender) Send(m []raftpb.Message) {}
+func (s *nopSender) Stop()                   {}
+
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {