Преглед на файлове

Merge pull request #1802 from yichengq/235

rafthttp: set the API boundary of the package
Yicheng Qin преди 11 години
родител
ревизия
e1ee335c3a
променени са 7 файла, в които са добавени 167 реда и са изтрити 132 реда
  1. 1 4
      etcdserver/etcdhttp/peer.go
  2. 36 18
      etcdserver/server.go
  3. 45 38
      etcdserver/server_test.go
  4. 0 4
      rafthttp/http.go
  5. 26 35
      rafthttp/sendhub.go
  6. 10 33
      rafthttp/sendhub_test.go
  7. 49 0
      rafthttp/transport.go

+ 1 - 4
etcdserver/etcdhttp/peer.go

@@ -31,16 +31,13 @@ const (
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
-	rh := rafthttp.NewHandler(server, server.Cluster.ID())
-	rsh := rafthttp.NewStreamHandler(server.SenderFinder(), server.ID(), server.Cluster.ID())
 	mh := &peerMembersHandler{
 		clusterInfo: server.Cluster,
 	}
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", http.NotFound)
-	mux.Handle(rafthttp.RaftPrefix, rh)
-	mux.Handle(rafthttp.RaftStreamPrefix+"/", rsh)
+	mux.Handle(rafthttp.RaftPrefix, server.RaftHandler())
 	mux.Handle(peerMembersPrefix, mh)
 	return mux
 }

+ 36 - 18
etcdserver/server.go

@@ -144,11 +144,11 @@ type EtcdServer struct {
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 
-	// sender specifies the sender to send msgs to members. sending msgs
-	// MUST NOT block. It is okay to drop messages, since clients should
-	// timeout and reissue their messages.  If send is nil, server will
-	// panic.
-	sendhub SendHub
+	// transport specifies the transport to send and receive msgs to members.
+	// Sending messages MUST NOT block. It is okay to drop messages, since
+	// clients should timeout and reissue their messages.
+	// If transport is nil, server will panic.
+	transport rafthttp.Transporter
 
 	Ticker     <-chan time.Time
 	SyncTicker <-chan time.Time
@@ -271,13 +271,22 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		SyncTicker:  time.Tick(500 * time.Millisecond),
 		snapCount:   cfg.SnapCount,
 	}
-	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
+	tr := &rafthttp.Transport{
+		RoundTripper: cfg.Transport,
+		ID:           id,
+		ClusterID:    cfg.Cluster.ID(),
+		Processor:    srv,
+		ServerStats:  sstats,
+		LeaderStats:  lstats,
+	}
+	tr.Start()
 	// add all the remote members into sendhub
 	for _, m := range cfg.Cluster.Members() {
 		if m.Name != cfg.Name {
-			srv.sendhub.Add(m)
+			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}
+	srv.transport = tr
 	return srv, nil
 }
 
@@ -327,7 +336,7 @@ func (s *EtcdServer) purgeFile() {
 
 func (s *EtcdServer) ID() types.ID { return s.id }
 
-func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }
+func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() }
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
@@ -343,7 +352,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	var shouldstop bool
-	shouldstopC := s.sendhub.ShouldStopNotify()
+	shouldstopC := s.transport.ShouldStopNotify()
 
 	// load initial state from raft storage
 	snap, err := s.raftStorage.Snapshot()
@@ -357,7 +366,7 @@ func (s *EtcdServer) run() {
 
 	defer func() {
 		s.node.Stop()
-		s.sendhub.Stop()
+		s.transport.Stop()
 		if err := s.storage.Close(); err != nil {
 			log.Panicf("etcdserver: close storage error: %v", err)
 		}
@@ -397,7 +406,7 @@ func (s *EtcdServer) run() {
 			}
 			s.raftStorage.Append(rd.Entries)
 
-			s.sendhub.Send(rd.Messages)
+			s.send(rd.Messages)
 
 			// recover from snapshot if it is more updated than current applied
 			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
@@ -663,6 +672,15 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 }
 
+func (s *EtcdServer) send(ms []raftpb.Message) {
+	for _, m := range ms {
+		if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
+			m.To = 0
+		}
+	}
+	s.transport.Send(ms)
+}
+
 // apply takes entries received from Raft (after it has been committed) and
 // applies them to the current state of the EtcdServer.
 // The given entries should not be empty.
@@ -764,7 +782,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sendhub.Add(m)
+			s.transport.AddPeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
@@ -775,7 +793,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 			return true, nil
 		} else {
-			s.sendhub.Remove(id)
+			s.transport.RemovePeer(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -790,7 +808,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sendhub.Update(m)
+			s.transport.UpdatePeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}
@@ -831,13 +849,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 
 // for testing
 func (s *EtcdServer) PauseSending() {
-	hub := s.sendhub.(*sendHub)
-	hub.pause()
+	hub := s.transport.(*rafthttp.Transport)
+	hub.Pause()
 }
 
 func (s *EtcdServer) ResumeSending() {
-	hub := s.sendhub.(*sendHub)
-	hub.resume()
+	hub := s.transport.(*rafthttp.Transport)
+	hub.Resume()
 }
 
 func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {

+ 45 - 38
etcdserver/server_test.go

@@ -22,6 +22,7 @@ import (
 	"io/ioutil"
 	"log"
 	"math/rand"
+	"net/http"
 	"path"
 	"reflect"
 	"strconv"
@@ -499,10 +500,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		cl.AddMember(&Member{ID: types.ID(i)})
 	}
 	srv := &EtcdServer{
-		id:      1,
-		node:    &nodeRecorder{},
-		Cluster: cl,
-		sendhub: &nopSender{},
+		id:        1,
+		node:      &nodeRecorder{},
+		Cluster:   cl,
+		transport: &nopTransporter{},
 	}
 	cc := raftpb.ConfChange{
 		Type:   raftpb.ConfChangeRemoveNode,
@@ -531,21 +532,24 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
-type fakeSender struct {
+type fakeTransporter struct {
 	ss []*EtcdServer
 }
 
-func (s *fakeSender) Sender(id types.ID) rafthttp.Sender { return nil }
-func (s *fakeSender) Send(msgs []raftpb.Message) {
+func (s *fakeTransporter) Handler() http.Handler              { return nil }
+func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
+func (s *fakeTransporter) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
 	}
 }
-func (s *fakeSender) Add(m *Member)                     {}
-func (s *fakeSender) Update(m *Member)                  {}
-func (s *fakeSender) Remove(id types.ID)                {}
-func (s *fakeSender) Stop()                             {}
-func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil }
+func (s *fakeTransporter) AddPeer(id types.ID, us []string)    {}
+func (s *fakeTransporter) UpdatePeer(id types.ID, us []string) {}
+func (s *fakeTransporter) RemovePeer(id types.ID)              {}
+func (s *fakeTransporter) Stop()                               {}
+func (s *fakeTransporter) ShouldStopNotify() <-chan struct{}   { return nil }
+func (s *fakeTransporter) Pause()                              {}
+func (s *fakeTransporter) Resume()                             {}
 
 func testServer(t *testing.T, ns uint64) {
 	ctx, cancel := context.WithCancel(context.Background())
@@ -571,7 +575,7 @@ func testServer(t *testing.T, ns uint64) {
 			node:        n,
 			raftStorage: s,
 			store:       st,
-			sendhub:     &fakeSender{ss},
+			transport:   &fakeTransporter{ss},
 			storage:     &storageRecorder{},
 			Ticker:      tk.C,
 			Cluster:     cl,
@@ -646,7 +650,7 @@ func TestDoProposal(t *testing.T) {
 			node:        n,
 			raftStorage: s,
 			store:       st,
-			sendhub:     &nopSender{},
+			transport:   &nopTransporter{},
 			storage:     &storageRecorder{},
 			Ticker:      tk,
 			Cluster:     cl,
@@ -735,7 +739,7 @@ func TestDoProposalStopped(t *testing.T) {
 		node:        n,
 		raftStorage: s,
 		store:       st,
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		Ticker:      tk,
 		Cluster:     cl,
@@ -847,7 +851,7 @@ func TestSyncTrigger(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		SyncTicker:  st,
 	}
@@ -933,7 +937,7 @@ func TestTriggerSnap(t *testing.T) {
 	cl.SetStore(store.New())
 	srv := &EtcdServer{
 		store:       st,
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     p,
 		node:        n,
 		raftStorage: s,
@@ -973,7 +977,7 @@ func TestRecvSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:       st,
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     p,
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
@@ -1006,7 +1010,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:       st,
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
@@ -1039,7 +1043,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
 		store:       st,
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		node:        n,
 		raftStorage: storage,
@@ -1082,7 +1086,7 @@ func TestAddMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1117,7 +1121,7 @@ func TestRemoveMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1151,7 +1155,7 @@ func TestUpdateMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sendhub:     &nopSender{},
+		transport:   &nopTransporter{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1219,12 +1223,12 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
-		node:    &nodeRecorder{},
-		sendhub: &nopSender{},
-		Cluster: &Cluster{},
-		w:       &waitRecorder{},
-		done:    make(chan struct{}),
-		stop:    make(chan struct{}),
+		node:      &nodeRecorder{},
+		transport: &nopTransporter{},
+		Cluster:   &Cluster{},
+		w:         &waitRecorder{},
+		done:      make(chan struct{}),
+		stop:      make(chan struct{}),
 	}
 	close(srv.done)
 	srv.publish(time.Hour)
@@ -1625,15 +1629,18 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
-type nopSender struct{}
-
-func (s *nopSender) Sender(id types.ID) rafthttp.Sender { return nil }
-func (s *nopSender) Send(m []raftpb.Message)            {}
-func (s *nopSender) Add(m *Member)                      {}
-func (s *nopSender) Remove(id types.ID)                 {}
-func (s *nopSender) Update(m *Member)                   {}
-func (s *nopSender) Stop()                              {}
-func (s *nopSender) ShouldStopNotify() <-chan struct{}  { return nil }
+type nopTransporter struct{}
+
+func (s *nopTransporter) Handler() http.Handler               { return nil }
+func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender  { return nil }
+func (s *nopTransporter) Send(m []raftpb.Message)             {}
+func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
+func (s *nopTransporter) RemovePeer(id types.ID)              {}
+func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
+func (s *nopTransporter) Stop()                               {}
+func (s *nopTransporter) ShouldStopNotify() <-chan struct{}   { return nil }
+func (s *nopTransporter) Pause()                              {}
+func (s *nopTransporter) Resume()                             {}
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))

+ 0 - 4
rafthttp/http.go

@@ -40,10 +40,6 @@ var (
 	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 )
 
-type Processor interface {
-	Process(ctx context.Context, m raftpb.Message) error
-}
-
 type SenderFinder interface {
 	// Sender returns the sender of the given id.
 	Sender(id types.ID) Sender

+ 26 - 35
etcdserver/sendhub.go → rafthttp/sendhub.go

@@ -14,7 +14,7 @@
    limitations under the License.
 */
 
-package etcdserver
+package rafthttp
 
 import (
 	"log"
@@ -26,50 +26,39 @@ import (
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/rafthttp"
 )
 
 const (
 	raftPrefix = "/raft"
 )
 
-type SendHub interface {
-	rafthttp.SenderFinder
-	Send(m []raftpb.Message)
-	Add(m *Member)
-	Remove(id types.ID)
-	Update(m *Member)
-	Stop()
-	ShouldStopNotify() <-chan struct{}
-}
-
 type sendHub struct {
 	tr         http.RoundTripper
-	cl         ClusterInfo
-	p          rafthttp.Processor
+	cid        types.ID
+	p          Processor
 	ss         *stats.ServerStats
 	ls         *stats.LeaderStats
 	mu         sync.RWMutex // protect the sender map
-	senders    map[types.ID]rafthttp.Sender
+	senders    map[types.ID]Sender
 	shouldstop chan struct{}
 }
 
 // newSendHub creates the default send hub used to transport raft messages
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
 	return &sendHub{
 		tr:         t,
-		cl:         cl,
+		cid:        cid,
 		p:          p,
 		ss:         ss,
 		ls:         ls,
-		senders:    make(map[types.ID]rafthttp.Sender),
+		senders:    make(map[types.ID]Sender),
 		shouldstop: make(chan struct{}, 1),
 	}
 }
 
-func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
+func (h *sendHub) Sender(id types.ID) Sender {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
 	return h.senders[id]
@@ -77,12 +66,14 @@ func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
 
 func (h *sendHub) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
+		// intentionally dropped message
+		if m.To == 0 {
+			continue
+		}
 		to := types.ID(m.To)
 		s, ok := h.senders[to]
 		if !ok {
-			if !h.cl.IsIDRemoved(to) {
-				log.Printf("etcdserver: send message to unknown receiver %s", to)
-			}
+			log.Printf("etcdserver: send message to unknown receiver %s", to)
 			continue
 		}
 
@@ -107,55 +98,55 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
 	return h.shouldstop
 }
 
-func (h *sendHub) Add(m *Member) {
+func (h *sendHub) AddPeer(id types.ID, urls []string) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	if _, ok := h.senders[m.ID]; ok {
+	if _, ok := h.senders[id]; ok {
 		return
 	}
 	// TODO: considering how to switch between all available peer urls
-	peerURL := m.PickPeerURL()
+	peerURL := urls[0]
 	u, err := url.Parse(peerURL)
 	if err != nil {
 		log.Panicf("unexpect peer url %s", peerURL)
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
-	fs := h.ls.Follower(m.ID.String())
-	s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop)
-	h.senders[m.ID] = s
+	fs := h.ls.Follower(id.String())
+	s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop)
+	h.senders[id] = s
 }
 
-func (h *sendHub) Remove(id types.ID) {
+func (h *sendHub) RemovePeer(id types.ID) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	h.senders[id].Stop()
 	delete(h.senders, id)
 }
 
-func (h *sendHub) Update(m *Member) {
+func (h *sendHub) UpdatePeer(id types.ID, urls []string) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	// TODO: return error or just panic?
-	if _, ok := h.senders[m.ID]; !ok {
+	if _, ok := h.senders[id]; !ok {
 		return
 	}
-	peerURL := m.PickPeerURL()
+	peerURL := urls[0]
 	u, err := url.Parse(peerURL)
 	if err != nil {
 		log.Panicf("unexpect peer url %s", peerURL)
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
-	h.senders[m.ID].Update(u.String())
+	h.senders[id].Update(u.String())
 }
 
 // for testing
-func (h *sendHub) pause() {
+func (h *sendHub) Pause() {
 	for _, s := range h.senders {
 		s.Pause()
 	}
 }
 
-func (h *sendHub) resume() {
+func (h *sendHub) Resume() {
 	for _, s := range h.senders {
 		s.Resume()
 	}

+ 10 - 33
etcdserver/sendhub_test.go → rafthttp/sendhub_test.go

@@ -14,7 +14,7 @@
    limitations under the License.
 */
 
-package etcdserver
+package rafthttp
 
 import (
 	"net/http"
@@ -28,11 +28,9 @@ import (
 )
 
 func TestSendHubAdd(t *testing.T) {
-	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, nil, ls)
-	m := newTestMember(1, []string{"http://a"}, "", nil)
-	h.Add(m)
+	h := newSendHub(nil, 0, nil, nil, ls)
+	h.AddPeer(1, []string{"http://a"})
 
 	if _, ok := ls.Followers["1"]; !ok {
 		t.Errorf("FollowerStats[1] is nil, want exists")
@@ -42,20 +40,18 @@ func TestSendHubAdd(t *testing.T) {
 		t.Fatalf("senders[1] is nil, want exists")
 	}
 
-	h.Add(m)
+	h.AddPeer(1, []string{"http://a"})
 	ns := h.senders[types.ID(1)]
 	if s != ns {
-		t.Errorf("sender = %p, want %p", ns, s)
+		t.Errorf("sender = %v, want %v", ns, s)
 	}
 }
 
 func TestSendHubRemove(t *testing.T) {
-	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, nil, ls)
-	m := newTestMember(1, []string{"http://a"}, "", nil)
-	h.Add(m)
-	h.Remove(types.ID(1))
+	h := newSendHub(nil, 0, nil, nil, ls)
+	h.AddPeer(1, []string{"http://a"})
+	h.RemovePeer(types.ID(1))
 
 	if _, ok := h.senders[types.ID(1)]; ok {
 		t.Fatalf("senders[1] exists, want removed")
@@ -64,11 +60,9 @@ func TestSendHubRemove(t *testing.T) {
 
 func TestSendHubShouldStop(t *testing.T) {
 	tr := newRespRoundTripper(http.StatusForbidden, nil)
-	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(tr, cl, nil, nil, ls)
-	m := newTestMember(1, []string{"http://a"}, "", nil)
-	h.Add(m)
+	h := newSendHub(tr, 0, nil, nil, ls)
+	h.AddPeer(1, []string{"http://a"})
 
 	shouldstop := h.ShouldStopNotify()
 	select {
@@ -85,20 +79,3 @@ func TestSendHubShouldStop(t *testing.T) {
 		t.Fatalf("cannot receive stop notification")
 	}
 }
-
-type respRoundTripper struct {
-	code int
-	err  error
-}
-
-func newRespRoundTripper(code int, err error) *respRoundTripper {
-	return &respRoundTripper{code: code, err: err}
-}
-func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-	return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
-}
-
-type nopReadCloser struct{}
-
-func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
-func (n *nopReadCloser) Close() error               { return nil }

+ 49 - 0
rafthttp/transport.go

@@ -0,0 +1,49 @@
+package rafthttp
+
+import (
+	"net/http"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+type Processor interface {
+	Process(ctx context.Context, m raftpb.Message) error
+}
+
+type Transporter interface {
+	Handler() http.Handler
+	Send(m []raftpb.Message)
+	AddPeer(id types.ID, urls []string)
+	RemovePeer(id types.ID)
+	UpdatePeer(id types.ID, urls []string)
+	Stop()
+	ShouldStopNotify() <-chan struct{}
+}
+
+type Transport struct {
+	RoundTripper http.RoundTripper
+	ID           types.ID
+	ClusterID    types.ID
+	Processor    Processor
+	ServerStats  *stats.ServerStats
+	LeaderStats  *stats.LeaderStats
+
+	*sendHub
+	handler http.Handler
+}
+
+func (t *Transport) Start() {
+	t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats)
+	h := NewHandler(t.Processor, t.ClusterID)
+	sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID)
+	mux := http.NewServeMux()
+	mux.Handle(RaftPrefix, h)
+	mux.Handle(RaftStreamPrefix+"/", sh)
+	t.handler = mux
+}
+
+func (t *Transport) Handler() http.Handler { return t.handler }