Browse Source

Merge pull request #2010 from yichengq/264

rafthttp: cleanup transport
Yicheng Qin 11 years ago
parent
commit
dc6ba914c8

+ 1 - 1
etcdserver/server.go

@@ -275,7 +275,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		RoundTripper: cfg.Transport,
 		RoundTripper: cfg.Transport,
 		ID:           id,
 		ID:           id,
 		ClusterID:    cfg.Cluster.ID(),
 		ClusterID:    cfg.Cluster.ID(),
-		Processor:    srv,
+		Raft:         srv,
 		ServerStats:  sstats,
 		ServerStats:  sstats,
 		LeaderStats:  lstats,
 		LeaderStats:  lstats,
 	}
 	}

+ 1 - 4
etcdserver/server_test.go

@@ -37,7 +37,6 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
 
 
@@ -536,8 +535,7 @@ type fakeTransporter struct {
 	ss []*EtcdServer
 	ss []*EtcdServer
 }
 }
 
 
-func (s *fakeTransporter) Handler() http.Handler              { return nil }
-func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
+func (s *fakeTransporter) Handler() http.Handler { return nil }
 func (s *fakeTransporter) Send(msgs []raftpb.Message) {
 func (s *fakeTransporter) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 	for _, m := range msgs {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
 		s.ss[m.To-1].node.Step(context.TODO(), m)
@@ -1632,7 +1630,6 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 type nopTransporter struct{}
 type nopTransporter struct{}
 
 
 func (s *nopTransporter) Handler() http.Handler               { return nil }
 func (s *nopTransporter) Handler() http.Handler               { return nil }
-func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender  { return nil }
 func (s *nopTransporter) Send(m []raftpb.Message)             {}
 func (s *nopTransporter) Send(m []raftpb.Message)             {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}

+ 14 - 19
rafthttp/http.go

@@ -40,30 +40,25 @@ var (
 	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 )
 )
 
 
-type SenderFinder interface {
-	// Sender returns the sender of the given id.
-	Sender(id types.ID) Sender
-}
-
-func NewHandler(p Processor, cid types.ID) http.Handler {
+func NewHandler(r Raft, cid types.ID) http.Handler {
 	return &handler{
 	return &handler{
-		p:   p,
+		r:   r,
 		cid: cid,
 		cid: cid,
 	}
 	}
 }
 }
 
 
 // NewStreamHandler returns a handler which initiates streamer when receiving
 // NewStreamHandler returns a handler which initiates streamer when receiving
 // stream request from follower.
 // stream request from follower.
-func NewStreamHandler(finder SenderFinder, id, cid types.ID) http.Handler {
+func NewStreamHandler(tr *Transport, id, cid types.ID) http.Handler {
 	return &streamHandler{
 	return &streamHandler{
-		finder: finder,
-		id:     id,
-		cid:    cid,
+		tr:  tr,
+		id:  id,
+		cid: cid,
 	}
 	}
 }
 }
 
 
 type handler struct {
 type handler struct {
-	p   Processor
+	r   Raft
 	cid types.ID
 	cid types.ID
 }
 }
 
 
@@ -99,7 +94,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
 		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
 		return
 		return
 	}
 	}
-	if err := h.p.Process(context.TODO(), m); err != nil {
+	if err := h.r.Process(context.TODO(), m); err != nil {
 		switch v := err.(type) {
 		switch v := err.(type) {
 		case writerToResponse:
 		case writerToResponse:
 			v.WriteTo(w)
 			v.WriteTo(w)
@@ -113,9 +108,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 }
 
 
 type streamHandler struct {
 type streamHandler struct {
-	finder SenderFinder
-	id     types.ID
-	cid    types.ID
+	tr  *Transport
+	id  types.ID
+	cid types.ID
 }
 }
 
 
 func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -132,8 +127,8 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, "invalid path", http.StatusNotFound)
 		http.Error(w, "invalid path", http.StatusNotFound)
 		return
 		return
 	}
 	}
-	s := h.finder.Sender(from)
-	if s == nil {
+	p := h.tr.Peer(from)
+	if p == nil {
 		log.Printf("rafthttp: fail to find sender %s", from)
 		log.Printf("rafthttp: fail to find sender %s", from)
 		http.Error(w, "error sender not found", http.StatusNotFound)
 		http.Error(w, "error sender not found", http.StatusNotFound)
 		return
 		return
@@ -164,7 +159,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.(http.Flusher).Flush()
 	w.(http.Flusher).Flush()
 
 
-	done, err := s.StartStreaming(w.(WriteFlusher), from, term)
+	done, err := p.StartStreaming(w.(WriteFlusher), from, term)
 	if err != nil {
 	if err != nil {
 		log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
 		log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
 		// TODO: consider http status and info here
 		// TODO: consider http status and info here

+ 1 - 1
rafthttp/http_test.go

@@ -36,7 +36,7 @@ func TestServeRaft(t *testing.T) {
 	testCases := []struct {
 	testCases := []struct {
 		method    string
 		method    string
 		body      io.Reader
 		body      io.Reader
-		p         Processor
+		p         Raft
 		clusterID string
 		clusterID string
 
 
 		wcode int
 		wcode int

+ 123 - 134
rafthttp/sender.go → rafthttp/peer.go

@@ -45,53 +45,33 @@ const (
 	ConnWriteTimeout = 5 * time.Second
 	ConnWriteTimeout = 5 * time.Second
 )
 )
 
 
-type Sender interface {
-	// StartStreaming enables streaming in the sender using the given writer,
-	// which provides a fast and efficient way to send appendEntry messages.
-	StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
-	Update(u string)
-	// Send sends the data to the remote node. It is always non-blocking.
-	// It may be fail to send data if it returns nil error.
-	Send(m raftpb.Message) error
-	// Stop performs any necessary finalization and terminates the Sender
-	// elegantly.
-	Stop()
-
-	// Pause pauses the sender. The sender will simply drops all incoming
-	// messages without retruning an error.
-	Pause()
-
-	// Resume resumes a paused sender.
-	Resume()
-}
-
-func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
-	s := &sender{
+func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer {
+	p := &peer{
 		id:          id,
 		id:          id,
 		active:      true,
 		active:      true,
 		tr:          tr,
 		tr:          tr,
 		u:           u,
 		u:           u,
 		cid:         cid,
 		cid:         cid,
-		p:           p,
+		r:           r,
 		fs:          fs,
 		fs:          fs,
 		shouldstop:  shouldstop,
 		shouldstop:  shouldstop,
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
 		q:           make(chan *raftpb.Message, senderBufSize),
 		q:           make(chan *raftpb.Message, senderBufSize),
 	}
 	}
-	s.wg.Add(connPerSender)
+	p.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
 	for i := 0; i < connPerSender; i++ {
-		go s.handle()
+		go p.handle()
 	}
 	}
-	return s
+	return p
 }
 }
 
 
-type sender struct {
+type peer struct {
 	id  types.ID
 	id  types.ID
 	cid types.ID
 	cid types.ID
 
 
 	tr         http.RoundTripper
 	tr         http.RoundTripper
-	p          Processor
+	r          Raft
 	fs         *stats.FollowerStats
 	fs         *stats.FollowerStats
 	shouldstop chan struct{}
 	shouldstop chan struct{}
 
 
@@ -115,201 +95,210 @@ type sender struct {
 	paused  bool
 	paused  bool
 }
 }
 
 
-func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
-	s.strmSrvMu.Lock()
-	defer s.strmSrvMu.Unlock()
-	if s.strmSrv != nil {
+// StartStreaming enables streaming in the peer using the given writer,
+// which provides a fast and efficient way to send appendEntry messages.
+func (p *peer) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
+	p.strmSrvMu.Lock()
+	defer p.strmSrvMu.Unlock()
+	if p.strmSrv != nil {
 		// ignore lower-term streaming request
 		// ignore lower-term streaming request
-		if term < s.strmSrv.term {
-			return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
+		if term < p.strmSrv.term {
+			return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, p.strmSrv.term)
 		}
 		}
 		// stop the existing one
 		// stop the existing one
-		s.strmSrv.stop()
-		s.strmSrv = nil
+		p.strmSrv.stop()
+		p.strmSrv = nil
 	}
 	}
-	s.strmSrv = startStreamServer(w, to, term, s.fs)
-	return s.strmSrv.stopNotify(), nil
+	p.strmSrv = startStreamServer(w, to, term, p.fs)
+	return p.strmSrv.stopNotify(), nil
 }
 }
 
 
-func (s *sender) Update(u string) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.u = u
+func (p *peer) Update(u string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.u = u
 }
 }
 
 
+// Send sends the data to the remote node. It is always non-blocking.
+// It may be fail to send data if it returns nil error.
 // TODO (xiangli): reasonable retry logic
 // TODO (xiangli): reasonable retry logic
-func (s *sender) Send(m raftpb.Message) error {
-	s.mu.RLock()
-	pause := s.paused
-	s.mu.RUnlock()
+func (p *peer) Send(m raftpb.Message) error {
+	p.mu.RLock()
+	pause := p.paused
+	p.mu.RUnlock()
 	if pause {
 	if pause {
 		return nil
 		return nil
 	}
 	}
 
 
-	s.maybeStopStream(m.Term)
-	if shouldInitStream(m) && !s.hasStreamClient() {
-		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
-		s.batcher.Reset(time.Now())
+	p.maybeStopStream(m.Term)
+	if shouldInitStream(m) && !p.hasStreamClient() {
+		p.initStream(types.ID(m.From), types.ID(m.To), m.Term)
+		p.batcher.Reset(time.Now())
 	}
 	}
 
 
 	var err error
 	var err error
 	switch {
 	switch {
 	case isProposal(m):
 	case isProposal(m):
-		s.propBatcher.Batch(m)
-	case canBatch(m) && s.hasStreamClient():
-		if !s.batcher.ShouldBatch(time.Now()) {
-			err = s.send(m)
+		p.propBatcher.Batch(m)
+	case canBatch(m) && p.hasStreamClient():
+		if !p.batcher.ShouldBatch(time.Now()) {
+			err = p.send(m)
 		}
 		}
 	case canUseStream(m):
 	case canUseStream(m):
-		if ok := s.tryStream(m); !ok {
-			err = s.send(m)
+		if ok := p.tryStream(m); !ok {
+			err = p.send(m)
 		}
 		}
 	default:
 	default:
-		err = s.send(m)
+		err = p.send(m)
 	}
 	}
 	// send out batched MsgProp if needed
 	// send out batched MsgProp if needed
 	// TODO: it is triggered by all outcoming send now, and it needs
 	// TODO: it is triggered by all outcoming send now, and it needs
 	// more clear solution. Either use separate goroutine to trigger it
 	// more clear solution. Either use separate goroutine to trigger it
 	// or use streaming.
 	// or use streaming.
-	if !s.propBatcher.IsEmpty() {
+	if !p.propBatcher.IsEmpty() {
 		t := time.Now()
 		t := time.Now()
-		if !s.propBatcher.ShouldBatch(t) {
-			s.send(s.propBatcher.Message)
-			s.propBatcher.Reset(t)
+		if !p.propBatcher.ShouldBatch(t) {
+			p.send(p.propBatcher.Message)
+			p.propBatcher.Reset(t)
 		}
 		}
 	}
 	}
 	return err
 	return err
 }
 }
 
 
-func (s *sender) send(m raftpb.Message) error {
+func (p *peer) send(m raftpb.Message) error {
 	// TODO: don't block. we should be able to have 1000s
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
 	// of messages out at a time.
 	select {
 	select {
-	case s.q <- &m:
+	case p.q <- &m:
 		return nil
 		return nil
 	default:
 	default:
 		log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
 		log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
-			m.Type, senderBufSize, s.u)
+			m.Type, senderBufSize, p.u)
 		return fmt.Errorf("reach maximal serving")
 		return fmt.Errorf("reach maximal serving")
 	}
 	}
 }
 }
 
 
-func (s *sender) Stop() {
-	close(s.q)
-	s.wg.Wait()
-	s.strmSrvMu.Lock()
-	if s.strmSrv != nil {
-		s.strmSrv.stop()
-		s.strmSrv = nil
+// Stop performs any necessary finalization and terminates the peer
+// elegantly.
+func (p *peer) Stop() {
+	close(p.q)
+	p.wg.Wait()
+	p.strmSrvMu.Lock()
+	if p.strmSrv != nil {
+		p.strmSrv.stop()
+		p.strmSrv = nil
 	}
 	}
-	s.strmSrvMu.Unlock()
-	if s.strmCln != nil {
-		s.strmCln.stop()
+	p.strmSrvMu.Unlock()
+	if p.strmCln != nil {
+		p.strmCln.stop()
 	}
 	}
 }
 }
 
 
-func (s *sender) Pause() {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.paused = true
+// Pause pauses the peer. The peer will simply drops all incoming
+// messages without retruning an error.
+func (p *peer) Pause() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = true
 }
 }
 
 
-func (s *sender) Resume() {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.paused = false
+// Resume resumes a paused peer.
+func (p *peer) Resume() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = false
 }
 }
 
 
-func (s *sender) maybeStopStream(term uint64) {
-	if s.strmCln != nil && term > s.strmCln.term {
-		s.strmCln.stop()
-		s.strmCln = nil
+func (p *peer) maybeStopStream(term uint64) {
+	if p.strmCln != nil && term > p.strmCln.term {
+		p.strmCln.stop()
+		p.strmCln = nil
 	}
 	}
-	s.strmSrvMu.Lock()
-	defer s.strmSrvMu.Unlock()
-	if s.strmSrv != nil && term > s.strmSrv.term {
-		s.strmSrv.stop()
-		s.strmSrv = nil
+	p.strmSrvMu.Lock()
+	defer p.strmSrvMu.Unlock()
+	if p.strmSrv != nil && term > p.strmSrv.term {
+		p.strmSrv.stop()
+		p.strmSrv = nil
 	}
 	}
 }
 }
 
 
-func (s *sender) hasStreamClient() bool {
-	return s.strmCln != nil && !s.strmCln.isStopped()
+func (p *peer) hasStreamClient() bool {
+	return p.strmCln != nil && !p.strmCln.isStopped()
 }
 }
 
 
-func (s *sender) initStream(from, to types.ID, term uint64) {
-	strmCln := newStreamClient(from, to, term, s.p)
-	s.mu.Lock()
-	u := s.u
-	s.mu.Unlock()
-	if err := strmCln.start(s.tr, u, s.cid); err != nil {
+func (p *peer) initStream(from, to types.ID, term uint64) {
+	strmCln := newStreamClient(from, to, term, p.r)
+	p.mu.Lock()
+	u := p.u
+	p.mu.Unlock()
+	if err := strmCln.start(p.tr, u, p.cid); err != nil {
 		log.Printf("rafthttp: start stream client error: %v", err)
 		log.Printf("rafthttp: start stream client error: %v", err)
 		return
 		return
 	}
 	}
-	s.strmCln = strmCln
+	p.strmCln = strmCln
 }
 }
 
 
-func (s *sender) tryStream(m raftpb.Message) bool {
-	s.strmSrvMu.Lock()
-	defer s.strmSrvMu.Unlock()
-	if s.strmSrv == nil || m.Term != s.strmSrv.term {
+func (p *peer) tryStream(m raftpb.Message) bool {
+	p.strmSrvMu.Lock()
+	defer p.strmSrvMu.Unlock()
+	if p.strmSrv == nil || m.Term != p.strmSrv.term {
 		return false
 		return false
 	}
 	}
-	if err := s.strmSrv.send(m.Entries); err != nil {
+	if err := p.strmSrv.send(m.Entries); err != nil {
 		log.Printf("rafthttp: send stream message error: %v", err)
 		log.Printf("rafthttp: send stream message error: %v", err)
-		s.strmSrv.stop()
-		s.strmSrv = nil
+		p.strmSrv.stop()
+		p.strmSrv = nil
 		return false
 		return false
 	}
 	}
 	return true
 	return true
 }
 }
 
 
-func (s *sender) handle() {
-	defer s.wg.Done()
-	for m := range s.q {
+func (p *peer) handle() {
+	defer p.wg.Done()
+	for m := range p.q {
 		start := time.Now()
 		start := time.Now()
-		err := s.post(pbutil.MustMarshal(m))
+		err := p.post(pbutil.MustMarshal(m))
 		end := time.Now()
 		end := time.Now()
 
 
-		s.mu.Lock()
+		p.mu.Lock()
 		if err != nil {
 		if err != nil {
-			if s.errored == nil || s.errored.Error() != err.Error() {
-				log.Printf("sender: error posting to %s: %v", s.id, err)
-				s.errored = err
+			if p.errored == nil || p.errored.Error() != err.Error() {
+				log.Printf("sender: error posting to %s: %v", p.id, err)
+				p.errored = err
 			}
 			}
-			if s.active {
-				log.Printf("sender: the connection with %s becomes inactive", s.id)
-				s.active = false
+			if p.active {
+				log.Printf("sender: the connection with %s becomes inactive", p.id)
+				p.active = false
 			}
 			}
 			if m.Type == raftpb.MsgApp {
 			if m.Type == raftpb.MsgApp {
-				s.fs.Fail()
+				p.fs.Fail()
 			}
 			}
 		} else {
 		} else {
-			if !s.active {
-				log.Printf("sender: the connection with %s becomes active", s.id)
-				s.active = true
-				s.errored = nil
+			if !p.active {
+				log.Printf("sender: the connection with %s becomes active", p.id)
+				p.active = true
+				p.errored = nil
 			}
 			}
 			if m.Type == raftpb.MsgApp {
 			if m.Type == raftpb.MsgApp {
-				s.fs.Succ(end.Sub(start))
+				p.fs.Succ(end.Sub(start))
 			}
 			}
 		}
 		}
-		s.mu.Unlock()
+		p.mu.Unlock()
 	}
 	}
 }
 }
 
 
 // post POSTs a data payload to a url. Returns nil if the POST succeeds,
 // post POSTs a data payload to a url. Returns nil if the POST succeeds,
 // error on any failure.
 // error on any failure.
-func (s *sender) post(data []byte) error {
-	s.mu.RLock()
-	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
-	s.mu.RUnlock()
+func (p *peer) post(data []byte) error {
+	p.mu.RLock()
+	req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data))
+	p.mu.RUnlock()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 	req.Header.Set("Content-Type", "application/protobuf")
 	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
-	resp, err := s.tr.RoundTrip(req)
+	req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
+	resp, err := p.tr.RoundTrip(req)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -318,14 +307,14 @@ func (s *sender) post(data []byte) error {
 	switch resp.StatusCode {
 	switch resp.StatusCode {
 	case http.StatusPreconditionFailed:
 	case http.StatusPreconditionFailed:
 		select {
 		select {
-		case s.shouldstop <- struct{}{}:
+		case p.shouldstop <- struct{}{}:
 		default:
 		default:
 		}
 		}
-		log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
 		return nil
 		return nil
 	case http.StatusForbidden:
 	case http.StatusForbidden:
 		select {
 		select {
-		case s.shouldstop <- struct{}{}:
+		case p.shouldstop <- struct{}{}:
 		default:
 		default:
 		}
 		}
 		log.Println("rafthttp: this member has been permanently removed from the cluster")
 		log.Println("rafthttp: this member has been permanently removed from the cluster")

+ 20 - 20
rafthttp/sender_test.go → rafthttp/peer_test.go

@@ -34,12 +34,12 @@ import (
 func TestSenderSend(t *testing.T) {
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 
-	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
+	if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 		t.Fatalf("unexpect send error: %v", err)
 	}
 	}
-	s.Stop()
+	p.Stop()
 
 
 	if tr.Request() == nil {
 	if tr.Request() == nil {
 		t.Errorf("sender fails to post the data")
 		t.Errorf("sender fails to post the data")
@@ -54,12 +54,12 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 
 	// keep the sender busy and make the buffer full
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
 	// nothing can go out as we block the sender
 	for i := 0; i < connPerSender+senderBufSize; i++ {
 	for i := 0; i < connPerSender+senderBufSize; i++ {
-		if err := s.Send(raftpb.Message{}); err != nil {
+		if err := p.Send(raftpb.Message{}); err != nil {
 			t.Errorf("send err = %v, want nil", err)
 			t.Errorf("send err = %v, want nil", err)
 		}
 		}
 		// force the sender to grab data
 		// force the sender to grab data
@@ -67,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	}
 	}
 
 
 	// try to send a data when we are sure the buffer is full
 	// try to send a data when we are sure the buffer is full
-	if err := s.Send(raftpb.Message{}); err == nil {
+	if err := p.Send(raftpb.Message{}); err == nil {
 		t.Errorf("unexpect send success")
 		t.Errorf("unexpect send success")
 	}
 	}
 
 
@@ -76,22 +76,22 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	testutil.ForceGosched()
 	testutil.ForceGosched()
 
 
 	// It could send new data after previous ones succeed
 	// It could send new data after previous ones succeed
-	if err := s.Send(raftpb.Message{}); err != nil {
+	if err := p.Send(raftpb.Message{}); err != nil {
 		t.Errorf("send err = %v, want nil", err)
 		t.Errorf("send err = %v, want nil", err)
 	}
 	}
-	s.Stop()
+	p.Stop()
 }
 }
 
 
 // TestSenderSendFailed tests that when send func meets the post error,
 // TestSenderSendFailed tests that when send func meets the post error,
 // it increases fail count in stats.
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 
-	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
+	if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
 		t.Fatalf("unexpect Send error: %v", err)
 	}
 	}
-	s.Stop()
+	p.Stop()
 
 
 	fs.Lock()
 	fs.Lock()
 	defer fs.Unlock()
 	defer fs.Unlock()
@@ -102,11 +102,11 @@ func TestSenderSendFailed(t *testing.T) {
 
 
 func TestSenderPost(t *testing.T) {
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
-	if err := s.post([]byte("some data")); err != nil {
+	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
+	if err := p.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 		t.Fatalf("unexpect post error: %v", err)
 	}
 	}
-	s.Stop()
+	p.Stop()
 
 
 	if g := tr.Request().Method; g != "POST" {
 	if g := tr.Request().Method; g != "POST" {
 		t.Errorf("method = %s, want %s", g, "POST")
 		t.Errorf("method = %s, want %s", g, "POST")
@@ -145,9 +145,9 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
 		shouldstop := make(chan struct{})
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
-		err := s.post([]byte("some data"))
-		s.Stop()
+		p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
+		err := p.post([]byte("some data"))
+		p.Stop()
 
 
 		if err == nil {
 		if err == nil {
 			t.Errorf("#%d: err = nil, want not nil", i)
 			t.Errorf("#%d: err = nil, want not nil", i)
@@ -166,9 +166,9 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
 		shouldstop := make(chan struct{}, 1)
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
-		s.post([]byte("some data"))
-		s.Stop()
+		p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
+		p.post([]byte("some data"))
+		p.Stop()
 		select {
 		select {
 		case <-shouldstop:
 		case <-shouldstop:
 		default:
 		default:

+ 0 - 153
rafthttp/sendhub.go

@@ -1,153 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package rafthttp
-
-import (
-	"log"
-	"net/http"
-	"net/url"
-	"path"
-	"sync"
-
-	"github.com/coreos/etcd/etcdserver/stats"
-	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/raft/raftpb"
-)
-
-const (
-	raftPrefix = "/raft"
-)
-
-type sendHub struct {
-	tr         http.RoundTripper
-	cid        types.ID
-	p          Processor
-	ss         *stats.ServerStats
-	ls         *stats.LeaderStats
-	mu         sync.RWMutex // protect the sender map
-	senders    map[types.ID]Sender
-	shouldstop chan struct{}
-}
-
-// newSendHub creates the default send hub used to transport raft messages
-// to other members. The returned sendHub will update the given ServerStats and
-// LeaderStats appropriately.
-func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
-	return &sendHub{
-		tr:         t,
-		cid:        cid,
-		p:          p,
-		ss:         ss,
-		ls:         ls,
-		senders:    make(map[types.ID]Sender),
-		shouldstop: make(chan struct{}, 1),
-	}
-}
-
-func (h *sendHub) Sender(id types.ID) Sender {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.senders[id]
-}
-
-func (h *sendHub) Send(msgs []raftpb.Message) {
-	for _, m := range msgs {
-		// intentionally dropped message
-		if m.To == 0 {
-			continue
-		}
-		to := types.ID(m.To)
-		s, ok := h.senders[to]
-		if !ok {
-			log.Printf("etcdserver: send message to unknown receiver %s", to)
-			continue
-		}
-
-		if m.Type == raftpb.MsgApp {
-			h.ss.SendAppendReq(m.Size())
-		}
-
-		s.Send(m)
-	}
-}
-
-func (h *sendHub) Stop() {
-	for _, s := range h.senders {
-		s.Stop()
-	}
-	if tr, ok := h.tr.(*http.Transport); ok {
-		tr.CloseIdleConnections()
-	}
-}
-
-func (h *sendHub) ShouldStopNotify() <-chan struct{} {
-	return h.shouldstop
-}
-
-func (h *sendHub) AddPeer(id types.ID, urls []string) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	if _, ok := h.senders[id]; ok {
-		return
-	}
-	// TODO: considering how to switch between all available peer urls
-	peerURL := urls[0]
-	u, err := url.Parse(peerURL)
-	if err != nil {
-		log.Panicf("unexpect peer url %s", peerURL)
-	}
-	u.Path = path.Join(u.Path, raftPrefix)
-	fs := h.ls.Follower(id.String())
-	s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop)
-	h.senders[id] = s
-}
-
-func (h *sendHub) RemovePeer(id types.ID) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.senders[id].Stop()
-	delete(h.senders, id)
-}
-
-func (h *sendHub) UpdatePeer(id types.ID, urls []string) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	// TODO: return error or just panic?
-	if _, ok := h.senders[id]; !ok {
-		return
-	}
-	peerURL := urls[0]
-	u, err := url.Parse(peerURL)
-	if err != nil {
-		log.Panicf("unexpect peer url %s", peerURL)
-	}
-	u.Path = path.Join(u.Path, raftPrefix)
-	h.senders[id].Update(u.String())
-}
-
-// for testing
-func (h *sendHub) Pause() {
-	for _, s := range h.senders {
-		s.Pause()
-	}
-}
-
-func (h *sendHub) Resume() {
-	for _, s := range h.senders {
-		s.Resume()
-	}
-}

+ 4 - 4
rafthttp/streamer.go

@@ -107,18 +107,18 @@ type streamClient struct {
 	id   types.ID
 	id   types.ID
 	to   types.ID
 	to   types.ID
 	term uint64
 	term uint64
-	p    Processor
+	r    Raft
 
 
 	closer io.Closer
 	closer io.Closer
 	done   chan struct{}
 	done   chan struct{}
 }
 }
 
 
-func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
+func newStreamClient(id, to types.ID, term uint64, r Raft) *streamClient {
 	return &streamClient{
 	return &streamClient{
 		id:   id,
 		id:   id,
 		to:   to,
 		to:   to,
 		term: term,
 		term: term,
-		p:    p,
+		r:    r,
 		done: make(chan struct{}),
 		done: make(chan struct{}),
 	}
 	}
 }
 }
@@ -199,7 +199,7 @@ func (s *streamClient) handle(r io.Reader) {
 			Index:   ents[0].Index - 1,
 			Index:   ents[0].Index - 1,
 			Entries: ents,
 			Entries: ents,
 		}
 		}
-		if err := s.p.Process(context.TODO(), msg); err != nil {
+		if err := s.r.Process(context.TODO(), msg); err != nil {
 			log.Printf("rafthttp: process raft message error: %v", err)
 			log.Printf("rafthttp: process raft message error: %v", err)
 			return
 			return
 		}
 		}

+ 114 - 9
rafthttp/transport.go

@@ -1,7 +1,11 @@
 package rafthttp
 package rafthttp
 
 
 import (
 import (
+	"log"
 	"net/http"
 	"net/http"
+	"net/url"
+	"path"
+	"sync"
 
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
@@ -10,7 +14,11 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 )
 
 
-type Processor interface {
+const (
+	raftPrefix = "/raft"
+)
+
+type Raft interface {
 	Process(ctx context.Context, m raftpb.Message) error
 	Process(ctx context.Context, m raftpb.Message) error
 }
 }
 
 
@@ -28,22 +36,119 @@ type Transport struct {
 	RoundTripper http.RoundTripper
 	RoundTripper http.RoundTripper
 	ID           types.ID
 	ID           types.ID
 	ClusterID    types.ID
 	ClusterID    types.ID
-	Processor    Processor
+	Raft         Raft
 	ServerStats  *stats.ServerStats
 	ServerStats  *stats.ServerStats
 	LeaderStats  *stats.LeaderStats
 	LeaderStats  *stats.LeaderStats
 
 
-	*sendHub
-	handler http.Handler
+	mu         sync.RWMutex       // protect the peer map
+	peers      map[types.ID]*peer // remote peers
+	shouldstop chan struct{}
 }
 }
 
 
 func (t *Transport) Start() {
 func (t *Transport) Start() {
-	t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats)
-	h := NewHandler(t.Processor, t.ClusterID)
-	sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID)
+	t.peers = make(map[types.ID]*peer)
+	t.shouldstop = make(chan struct{}, 1)
+}
+
+func (t *Transport) Handler() http.Handler {
+	h := NewHandler(t.Raft, t.ClusterID)
+	sh := NewStreamHandler(t, t.ID, t.ClusterID)
 	mux := http.NewServeMux()
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, h)
 	mux.Handle(RaftPrefix, h)
 	mux.Handle(RaftStreamPrefix+"/", sh)
 	mux.Handle(RaftStreamPrefix+"/", sh)
-	t.handler = mux
+	return mux
+}
+
+func (t *Transport) Peer(id types.ID) *peer {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	return t.peers[id]
+}
+
+func (t *Transport) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		// intentionally dropped message
+		if m.To == 0 {
+			continue
+		}
+		to := types.ID(m.To)
+		p, ok := t.peers[to]
+		if !ok {
+			log.Printf("etcdserver: send message to unknown receiver %s", to)
+			continue
+		}
+
+		if m.Type == raftpb.MsgApp {
+			t.ServerStats.SendAppendReq(m.Size())
+		}
+
+		p.Send(m)
+	}
+}
+
+func (t *Transport) Stop() {
+	for _, p := range t.peers {
+		p.Stop()
+	}
+	if tr, ok := t.RoundTripper.(*http.Transport); ok {
+		tr.CloseIdleConnections()
+	}
 }
 }
 
 
-func (t *Transport) Handler() http.Handler { return t.handler }
+func (t *Transport) ShouldStopNotify() <-chan struct{} {
+	return t.shouldstop
+}
+
+func (t *Transport) AddPeer(id types.ID, urls []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if _, ok := t.peers[id]; ok {
+		return
+	}
+	// TODO: considering how to switch between all available peer urls
+	peerURL := urls[0]
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
+	fs := t.LeaderStats.Follower(id.String())
+	t.peers[id] = NewPeer(t.RoundTripper, u.String(), id, t.ClusterID,
+		t.Raft, fs, t.shouldstop)
+}
+
+func (t *Transport) RemovePeer(id types.ID) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.peers[id].Stop()
+	delete(t.peers, id)
+}
+
+func (t *Transport) UpdatePeer(id types.ID, urls []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	// TODO: return error or just panic?
+	if _, ok := t.peers[id]; !ok {
+		return
+	}
+	peerURL := urls[0]
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
+	t.peers[id].Update(u.String())
+}
+
+// for testing
+func (t *Transport) Pause() {
+	for _, p := range t.peers {
+		p.Pause()
+	}
+}
+
+func (t *Transport) Resume() {
+	for _, p := range t.peers {
+		p.Resume()
+	}
+}

+ 27 - 19
rafthttp/sendhub_test.go → rafthttp/transport_test.go

@@ -27,50 +27,58 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
-func TestSendHubAdd(t *testing.T) {
+func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, 0, nil, nil, ls)
-	h.AddPeer(1, []string{"http://a"})
+	tr := &Transport{
+		LeaderStats: ls,
+	}
+	tr.Start()
+	tr.AddPeer(1, []string{"http://a"})
 
 
 	if _, ok := ls.Followers["1"]; !ok {
 	if _, ok := ls.Followers["1"]; !ok {
 		t.Errorf("FollowerStats[1] is nil, want exists")
 		t.Errorf("FollowerStats[1] is nil, want exists")
 	}
 	}
-	s, ok := h.senders[types.ID(1)]
+	s, ok := tr.peers[types.ID(1)]
 	if !ok {
 	if !ok {
 		t.Fatalf("senders[1] is nil, want exists")
 		t.Fatalf("senders[1] is nil, want exists")
 	}
 	}
 
 
-	h.AddPeer(1, []string{"http://a"})
-	ns := h.senders[types.ID(1)]
+	// duplicate AddPeer is ignored
+	tr.AddPeer(1, []string{"http://a"})
+	ns := tr.peers[types.ID(1)]
 	if s != ns {
 	if s != ns {
 		t.Errorf("sender = %v, want %v", ns, s)
 		t.Errorf("sender = %v, want %v", ns, s)
 	}
 	}
 }
 }
 
 
-func TestSendHubRemove(t *testing.T) {
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, 0, nil, nil, ls)
-	h.AddPeer(1, []string{"http://a"})
-	h.RemovePeer(types.ID(1))
+func TestTransportRemove(t *testing.T) {
+	tr := &Transport{
+		LeaderStats: stats.NewLeaderStats(""),
+	}
+	tr.Start()
+	tr.AddPeer(1, []string{"http://a"})
+	tr.RemovePeer(types.ID(1))
 
 
-	if _, ok := h.senders[types.ID(1)]; ok {
+	if _, ok := tr.peers[types.ID(1)]; ok {
 		t.Fatalf("senders[1] exists, want removed")
 		t.Fatalf("senders[1] exists, want removed")
 	}
 	}
 }
 }
 
 
-func TestSendHubShouldStop(t *testing.T) {
-	tr := newRespRoundTripper(http.StatusForbidden, nil)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(tr, 0, nil, nil, ls)
-	h.AddPeer(1, []string{"http://a"})
+func TestTransportShouldStop(t *testing.T) {
+	tr := &Transport{
+		RoundTripper: newRespRoundTripper(http.StatusForbidden, nil),
+		LeaderStats:  stats.NewLeaderStats(""),
+	}
+	tr.Start()
+	tr.AddPeer(1, []string{"http://a"})
 
 
-	shouldstop := h.ShouldStopNotify()
+	shouldstop := tr.ShouldStopNotify()
 	select {
 	select {
 	case <-shouldstop:
 	case <-shouldstop:
 		t.Fatalf("received unexpected shouldstop notification")
 		t.Fatalf("received unexpected shouldstop notification")
 	case <-time.After(10 * time.Millisecond):
 	case <-time.After(10 * time.Millisecond):
 	}
 	}
-	h.senders[1].Send(raftpb.Message{})
+	tr.peers[1].Send(raftpb.Message{})
 
 
 	testutil.ForceGosched()
 	testutil.ForceGosched()
 	select {
 	select {