Browse Source

rafthttp: add streaming server and client

Yicheng Qin 11 years ago
parent
commit
9d53b94546

+ 3 - 2
etcdserver/etcdhttp/peer.go

@@ -26,20 +26,21 @@ import (
 )
 
 const (
-	raftPrefix        = "/raft"
 	peerMembersPrefix = "/members"
 )
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	rh := rafthttp.NewHandler(server, server.Cluster.ID())
+	rsh := rafthttp.NewStreamHandler(server.SenderFinder(), server.ID(), server.Cluster.ID())
 	mh := &peerMembersHandler{
 		clusterInfo: server.Cluster,
 	}
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", http.NotFound)
-	mux.Handle(raftPrefix, rh)
+	mux.Handle(rafthttp.RaftPrefix, rh)
+	mux.Handle(rafthttp.RaftStreamPrefix+"/", rsh)
 	mux.Handle(peerMembersPrefix, mh)
 	return mux
 }

+ 6 - 2
etcdserver/sendhub.go

@@ -35,6 +35,7 @@ const (
 type sendHub struct {
 	tr         http.RoundTripper
 	cl         ClusterInfo
+	p          rafthttp.Processor
 	ss         *stats.ServerStats
 	ls         *stats.LeaderStats
 	senders    map[types.ID]rafthttp.Sender
@@ -44,10 +45,11 @@ type sendHub struct {
 // newSendHub creates the default send hub used to transport raft messages
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
 	h := &sendHub{
 		tr:         t,
 		cl:         cl,
+		p:          p,
 		ss:         ss,
 		ls:         ls,
 		senders:    make(map[types.ID]rafthttp.Sender),
@@ -59,6 +61,8 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *
 	return h
 }
 
+func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
+
 func (h *sendHub) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		to := types.ID(m.To)
@@ -100,7 +104,7 @@ func (h *sendHub) Add(m *Member) {
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
-	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop)
+	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop)
 	h.senders[m.ID] = s
 }
 

+ 4 - 4
etcdserver/sendhub_test.go

@@ -35,7 +35,7 @@ func TestSendHubInitSenders(t *testing.T) {
 	}
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 
 	ids := cl.MemberIDs()
 	if len(h.senders) != len(ids) {
@@ -51,7 +51,7 @@ func TestSendHubInitSenders(t *testing.T) {
 func TestSendHubAdd(t *testing.T) {
 	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 	m := newTestMember(1, []string{"http://a"}, "", nil)
 	h.Add(m)
 
@@ -76,7 +76,7 @@ func TestSendHubRemove(t *testing.T) {
 	}
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 	h.Remove(types.ID(1))
 
 	if _, ok := h.senders[types.ID(1)]; ok {
@@ -91,7 +91,7 @@ func TestSendHubShouldStop(t *testing.T) {
 	tr := newRespRoundTripper(http.StatusForbidden, nil)
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(tr, cl, nil, ls)
+	h := newSendHub(tr, cl, nil, nil, ls)
 
 	shouldstop := h.ShouldStopNotify()
 	select {

+ 13 - 10
etcdserver/server.go

@@ -41,6 +41,7 @@ import (
 	"github.com/coreos/etcd/pkg/wait"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
@@ -85,7 +86,8 @@ type Response struct {
 	err     error
 }
 
-type Sender interface {
+type SendHub interface {
+	rafthttp.SenderFinder
 	Send(m []raftpb.Message)
 	Add(m *Member)
 	Remove(id types.ID)
@@ -172,7 +174,7 @@ type EtcdServer struct {
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If send is nil, server will
 	// panic.
-	sender Sender
+	sendhub SendHub
 
 	storage Storage
 
@@ -268,7 +270,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 	lstats := stats.NewLeaderStats(id.String())
 
-	shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
 	s := &EtcdServer{
 		store:      st,
 		node:       n,
@@ -281,11 +282,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		}{w, ss},
 		stats:      sstats,
 		lstats:     lstats,
-		sender:     shub,
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
 		snapCount:  cfg.SnapCount,
 	}
+	s.sendhub = newSendHub(cfg.Transport, cfg.Cluster, s, sstats, lstats)
 	return s, nil
 }
 
@@ -316,6 +317,8 @@ func (s *EtcdServer) start() {
 
 func (s *EtcdServer) ID() types.ID { return s.id }
 
+func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }
+
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
 		log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
@@ -333,11 +336,11 @@ func (s *EtcdServer) run() {
 	var snapi, appliedi uint64
 	var nodes []uint64
 	var shouldstop bool
-	shouldstopC := s.sender.ShouldStopNotify()
+	shouldstopC := s.sendhub.ShouldStopNotify()
 
 	defer func() {
 		s.node.Stop()
-		s.sender.Stop()
+		s.sendhub.Stop()
 		close(s.done)
 	}()
 	for {
@@ -361,7 +364,7 @@ func (s *EtcdServer) run() {
 			if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
 				log.Fatalf("etcdserver: create snapshot error: %v", err)
 			}
-			s.sender.Send(rd.Messages)
+			s.sendhub.Send(rd.Messages)
 
 			// recover from snapshot if it is more updated than current applied
 			if rd.Snapshot.Index > appliedi {
@@ -726,7 +729,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sender.Add(m)
+			s.sendhub.Add(m)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
@@ -737,7 +740,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 			log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 			return true, nil
 		} else {
-			s.sender.Remove(id)
+			s.sendhub.Remove(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -752,7 +755,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sender.Update(m)
+			s.sendhub.Update(m)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}

+ 22 - 19
etcdserver/server_test.go

@@ -36,6 +36,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/store"
 )
 
@@ -501,7 +502,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		id:      1,
 		node:    &nodeRecorder{},
 		Cluster: cl,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 	}
 	cc := raftpb.ConfChange{
 		Type:   raftpb.ConfChangeRemoveNode,
@@ -534,6 +535,7 @@ type fakeSender struct {
 	ss []*EtcdServer
 }
 
+func (s *fakeSender) Sender(id types.ID) rafthttp.Sender { return nil }
 func (s *fakeSender) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
@@ -567,7 +569,7 @@ func testServer(t *testing.T, ns uint64) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			sender:  &fakeSender{ss},
+			sendhub: &fakeSender{ss},
 			storage: &storageRecorder{},
 			Ticker:  tk.C,
 			Cluster: cl,
@@ -636,7 +638,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			sender:  &nopSender{},
+			sendhub: &nopSender{},
 			storage: &storageRecorder{},
 			Ticker:  tk,
 			Cluster: cl,
@@ -721,7 +723,7 @@ func TestDoProposalStopped(t *testing.T) {
 		// TODO: use fake node for better testability
 		node:    n,
 		store:   st,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		Ticker:  tk,
 		Cluster: cl,
@@ -832,7 +834,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		node:       n,
 		store:      &storeRecorder{},
-		sender:     &nopSender{},
+		sendhub:    &nopSender{},
 		storage:    &storageRecorder{},
 		SyncTicker: st,
 	}
@@ -906,7 +908,7 @@ func TestTriggerSnap(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:     st,
-		sender:    &nopSender{},
+		sendhub:   &nopSender{},
 		storage:   p,
 		node:      n,
 		snapCount: 10,
@@ -942,7 +944,7 @@ func TestRecvSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:   st,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: p,
 		node:    n,
 		Cluster: cl,
@@ -974,7 +976,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:   st,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		node:    n,
 		Cluster: cl,
@@ -1005,7 +1007,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:   st,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		node:    n,
 		Cluster: cl,
@@ -1049,7 +1051,7 @@ func TestAddMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -1086,7 +1088,7 @@ func TestRemoveMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -1122,7 +1124,7 @@ func TestUpdateMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -1191,7 +1193,7 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		node:    &nodeRecorder{},
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		Cluster: &Cluster{},
 		w:       &waitRecorder{},
 		done:    make(chan struct{}),
@@ -1593,12 +1595,13 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
 type nopSender struct{}
 
-func (s *nopSender) Send(m []raftpb.Message)           {}
-func (s *nopSender) Add(m *Member)                     {}
-func (s *nopSender) Remove(id types.ID)                {}
-func (s *nopSender) Update(m *Member)                  {}
-func (s *nopSender) Stop()                             {}
-func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil }
+func (s *nopSender) Sender(id types.ID) rafthttp.Sender { return nil }
+func (s *nopSender) Send(m []raftpb.Message)            {}
+func (s *nopSender) Add(m *Member)                      {}
+func (s *nopSender) Remove(id types.ID)                 {}
+func (s *nopSender) Update(m *Member)                   {}
+func (s *nopSender) Stop()                              {}
+func (s *nopSender) ShouldStopNotify() <-chan struct{}  { return nil }
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))

+ 3 - 6
integration/cluster_test.go

@@ -100,11 +100,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
 }
 
 func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
-func TestDecreaseClusterSizeOf5(t *testing.T) {
-	t.Skip("enable after reducing the election collision rate")
-	// election collision rate is too high when enabling --race
-	testDecreaseClusterSize(t, 5)
-}
+func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
 
 func testDecreaseClusterSize(t *testing.T, size int) {
 	defer afterTest(t)
@@ -112,7 +108,8 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 	c.Launch(t)
 	defer c.Terminate(t)
 
-	for i := 0; i < size-1; i++ {
+	// TODO: remove the last but one member
+	for i := 0; i < size-2; i++ {
 		id := c.Members[len(c.Members)-1].s.ID()
 		c.RemoveMember(t, uint64(id))
 		c.waitLeader(t)

+ 54 - 0
rafthttp/entry_reader.go

@@ -0,0 +1,54 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"io"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type entryReader struct {
+	r io.Reader
+}
+
+func (er *entryReader) readEntries() ([]raftpb.Entry, error) {
+	var l uint64
+	if err := binary.Read(er.r, binary.BigEndian, &l); err != nil {
+		return nil, err
+	}
+	ents := make([]raftpb.Entry, int(l))
+	for i := 0; i < int(l); i++ {
+		if err := er.readEntry(&ents[i]); err != nil {
+			return nil, err
+		}
+	}
+	return ents, nil
+}
+
+func (er *entryReader) readEntry(ent *raftpb.Entry) error {
+	var l uint64
+	if err := binary.Read(er.r, binary.BigEndian, &l); err != nil {
+		return err
+	}
+	buf := make([]byte, int(l))
+	if _, err := io.ReadFull(er.r, buf); err != nil {
+		return err
+	}
+	return ent.Unmarshal(buf)
+}

+ 63 - 0
rafthttp/entry_test.go

@@ -0,0 +1,63 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func TestEntsWriteAndRead(t *testing.T) {
+	tests := [][]raftpb.Entry{
+		{
+			{},
+		},
+		{
+			{Term: 1, Index: 1},
+		},
+		{
+			{Term: 1, Index: 1},
+			{Term: 1, Index: 2},
+			{Term: 1, Index: 3},
+		},
+		{
+			{Term: 1, Index: 1, Data: []byte("some data")},
+			{Term: 1, Index: 2, Data: []byte("some data")},
+			{Term: 1, Index: 3, Data: []byte("some data")},
+		},
+	}
+	for i, tt := range tests {
+		b := &bytes.Buffer{}
+		ew := &entryWriter{w: b}
+		if err := ew.writeEntries(tt); err != nil {
+			t.Errorf("#%d: unexpected write ents error: %v", i, err)
+			continue
+		}
+		er := &entryReader{r: b}
+		ents, err := er.readEntries()
+		if err != nil {
+			t.Errorf("#%d: unexpected read ents error: %v", i, err)
+			continue
+		}
+		if !reflect.DeepEqual(ents, tt) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt)
+		}
+	}
+}

+ 54 - 0
rafthttp/entry_writer.go

@@ -0,0 +1,54 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"io"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type entryWriter struct {
+	w io.Writer
+}
+
+func (ew *entryWriter) writeEntries(ents []raftpb.Entry) error {
+	l := len(ents)
+	if err := binary.Write(ew.w, binary.BigEndian, uint64(l)); err != nil {
+		return err
+	}
+	for i := 0; i < l; i++ {
+		if err := ew.writeEntry(&ents[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (ew *entryWriter) writeEntry(ent *raftpb.Entry) error {
+	size := ent.Size()
+	if err := binary.Write(ew.w, binary.BigEndian, uint64(size)); err != nil {
+		return err
+	}
+	b, err := ent.Marshal()
+	if err != nil {
+		return err
+	}
+	_, err = ew.w.Write(b)
+	return err
+}

+ 85 - 0
rafthttp/http.go

@@ -20,6 +20,9 @@ import (
 	"io/ioutil"
 	"log"
 	"net/http"
+	"path"
+	"strconv"
+	"strings"
 
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -27,10 +30,20 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
+var (
+	RaftPrefix       = "/raft"
+	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
+)
+
 type Processor interface {
 	Process(ctx context.Context, m raftpb.Message) error
 }
 
+type SenderFinder interface {
+	// Sender returns the sender of the given id.
+	Sender(id types.ID) Sender
+}
+
 func NewHandler(p Processor, cid types.ID) http.Handler {
 	return &handler{
 		p:   p,
@@ -38,6 +51,16 @@ func NewHandler(p Processor, cid types.ID) http.Handler {
 	}
 }
 
+// NewStreamHandler returns a handler which initiates streamer when receiving
+// stream request from follower.
+func NewStreamHandler(finder SenderFinder, id, cid types.ID) http.Handler {
+	return &streamHandler{
+		finder: finder,
+		id:     id,
+		cid:    cid,
+	}
+}
+
 type handler struct {
 	p   Processor
 	cid types.ID
@@ -85,6 +108,68 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
+type streamHandler struct {
+	finder SenderFinder
+	id     types.ID
+	cid    types.ID
+}
+
+func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "GET" {
+		w.Header().Set("Allow", "GET")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
+	from, err := types.IDFromString(fromStr)
+	if err != nil {
+		log.Printf("rafthttp: path %s cannot be parsed", fromStr)
+		http.Error(w, "invalid path", http.StatusNotFound)
+		return
+	}
+	s := h.finder.Sender(from)
+	if s == nil {
+		log.Printf("rafthttp: fail to find sender %s", from)
+		http.Error(w, "error sender not found", http.StatusNotFound)
+		return
+	}
+
+	wcid := h.cid.String()
+	if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
+		log.Printf("rafthttp: streaming request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
+		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	wto := h.id.String()
+	if gto := r.Header.Get("X-Raft-To"); gto != wto {
+		log.Printf("rafthttp: streaming request ignored due to ID mismatch got %s want %s", gto, wto)
+		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	termStr := r.Header.Get("X-Raft-Term")
+	term, err := strconv.ParseUint(termStr, 10, 64)
+	if err != nil {
+		log.Printf("rafthttp: streaming request ignored due to parse term %s error: %v", termStr, err)
+		http.Error(w, "invalid term field", http.StatusBadRequest)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	w.(http.Flusher).Flush()
+
+	done, err := s.StartStreaming(w.(WriteFlusher), from, term)
+	if err != nil {
+		log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
+		// TODO: consider http status and info here
+		http.Error(w, "error enable streaming", http.StatusInternalServerError)
+		return
+	}
+	<-done
+}
+
 type writerToResponse interface {
 	WriteTo(w http.ResponseWriter)
 }

+ 92 - 5
rafthttp/sender.go

@@ -36,6 +36,9 @@ const (
 )
 
 type Sender interface {
+	// StartStreaming enables streaming in the sender using the given writer,
+	// which provides a fast and effecient way to send appendEntry messages.
+	StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
 	Update(u string)
 	// Send sends the data to the remote node. It is always non-blocking.
 	// It may be fail to send data if it returns nil error.
@@ -45,14 +48,15 @@ type Sender interface {
 	Stop()
 }
 
-func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
 		tr:         tr,
 		u:          u,
 		cid:        cid,
+		p:          p,
 		fs:         fs,
-		q:          make(chan []byte, senderBufSize),
 		shouldstop: shouldstop,
+		q:          make(chan []byte, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
@@ -65,11 +69,32 @@ type sender struct {
 	tr         http.RoundTripper
 	u          string
 	cid        types.ID
+	p          Processor
 	fs         *stats.FollowerStats
-	q          chan []byte
-	mu         sync.RWMutex
-	wg         sync.WaitGroup
 	shouldstop chan struct{}
+
+	strmCln   *streamClient
+	strmSrv   *streamServer
+	strmSrvMu sync.Mutex
+	q         chan []byte
+
+	mu sync.RWMutex
+	wg sync.WaitGroup
+}
+
+func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv != nil {
+		// ignore lower-term streaming request
+		if term < s.strmSrv.term {
+			return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
+		}
+		// stop the existing one
+		s.strmSrv.stop()
+	}
+	s.strmSrv = startStreamServer(w, to, term, s.fs)
+	return s.strmSrv.stopNotify(), nil
 }
 
 func (s *sender) Update(u string) {
@@ -80,6 +105,15 @@ func (s *sender) Update(u string) {
 
 // TODO (xiangli): reasonable retry logic
 func (s *sender) Send(m raftpb.Message) error {
+	s.maybeStopStream(m.Term)
+	if !s.hasStreamClient() && shouldInitStream(m) {
+		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
+	}
+	if canUseStream(m) {
+		if ok := s.tryStream(m); ok {
+			return nil
+		}
+	}
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
 	data := pbutil.MustMarshal(&m)
@@ -95,6 +129,59 @@ func (s *sender) Send(m raftpb.Message) error {
 func (s *sender) Stop() {
 	close(s.q)
 	s.wg.Wait()
+	s.strmSrvMu.Lock()
+	if s.strmSrv != nil {
+		s.strmSrv.stop()
+	}
+	s.strmSrvMu.Unlock()
+	if s.strmCln != nil {
+		s.strmCln.stop()
+	}
+}
+
+func (s *sender) maybeStopStream(term uint64) {
+	if s.strmCln != nil && term > s.strmCln.term {
+		s.strmCln.stop()
+		s.strmCln = nil
+	}
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv != nil && term > s.strmSrv.term {
+		s.strmSrv.stop()
+		s.strmSrv = nil
+	}
+}
+
+func (s *sender) hasStreamClient() bool {
+	return s.strmCln != nil && !s.strmCln.isStopped()
+}
+
+func (s *sender) initStream(from, to types.ID, term uint64) {
+	strmCln := newStreamClient(from, to, term, s.p)
+	s.mu.Lock()
+	u := s.u
+	s.mu.Unlock()
+	if err := strmCln.start(s.tr, u, s.cid); err != nil {
+		log.Printf("rafthttp: start stream client error: %v", err)
+		return
+	}
+	s.strmCln = strmCln
+	log.Printf("rafthttp: start stream client with %s in term %d", to, term)
+}
+
+func (s *sender) tryStream(m raftpb.Message) bool {
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv == nil || m.Term != s.strmSrv.term {
+		return false
+	}
+	if err := s.strmSrv.send(m.Entries); err != nil {
+		log.Printf("rafthttp: send stream message error: %v", err)
+		s.strmSrv.stop()
+		s.strmSrv = nil
+		return false
+	}
+	return true
 }
 
 func (s *sender) handle() {

+ 6 - 6
rafthttp/sender_test.go

@@ -34,7 +34,7 @@ import (
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
@@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -86,7 +86,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
@@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
 		err := s.post([]byte("some data"))
 		s.Stop()
 
@@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
 		s.post([]byte("some data"))
 		s.Stop()
 		select {

+ 207 - 0
rafthttp/streamer.go

@@ -0,0 +1,207 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+const (
+	streamBufSize = 1024
+)
+
+type WriteFlusher interface {
+	io.Writer
+	http.Flusher
+}
+
+type streamServer struct {
+	to   types.ID
+	term uint64
+	fs   *stats.FollowerStats
+	q    chan []raftpb.Entry
+	done chan struct{}
+}
+
+func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
+	s := &streamServer{
+		to:   to,
+		term: term,
+		fs:   fs,
+		q:    make(chan []raftpb.Entry, streamBufSize),
+		done: make(chan struct{}),
+	}
+	go s.handle(w)
+	return s
+}
+
+func (s *streamServer) send(ents []raftpb.Entry) error {
+	select {
+	case <-s.done:
+		return fmt.Errorf("stopped")
+	default:
+	}
+	select {
+	case s.q <- ents:
+		return nil
+	default:
+		log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
+		return fmt.Errorf("reach maximal serving")
+	}
+}
+
+func (s *streamServer) stop() {
+	close(s.q)
+	<-s.done
+}
+
+func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
+
+func (s *streamServer) handle(w WriteFlusher) {
+	defer close(s.done)
+
+	ew := &entryWriter{w: w}
+	for ents := range s.q {
+		start := time.Now()
+		if err := ew.writeEntries(ents); err != nil {
+			log.Printf("rafthttp: write ents error: %v", err)
+			return
+		}
+		w.Flush()
+		s.fs.Succ(time.Since(start))
+	}
+}
+
+type streamClient struct {
+	id   types.ID
+	to   types.ID
+	term uint64
+	p    Processor
+
+	closer io.Closer
+	done   chan struct{}
+}
+
+func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
+	return &streamClient{
+		id:   id,
+		to:   to,
+		term: term,
+		p:    p,
+		done: make(chan struct{}),
+	}
+}
+
+// Dial dials to the remote url, and sends streaming request. If it succeeds,
+// it returns nil error, and the caller should call Handle function to keep
+// receiving appendEntry messages.
+func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
+	uu, err := url.Parse(u)
+	if err != nil {
+		return fmt.Errorf("parse url %s error: %v", u, err)
+	}
+	uu.Path = path.Join(RaftStreamPrefix, s.id.String())
+	req, err := http.NewRequest("GET", uu.String(), nil)
+	if err != nil {
+		return fmt.Errorf("new request to %s error: %v", u, err)
+	}
+	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
+	req.Header.Set("X-Raft-To", s.to.String())
+	req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
+	resp, err := tr.RoundTrip(req)
+	if err != nil {
+		return fmt.Errorf("error posting to %q: %v", u, err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return fmt.Errorf("unhandled http status %d", resp.StatusCode)
+	}
+	s.closer = resp.Body
+	go s.handle(resp.Body)
+	return nil
+}
+
+func (s *streamClient) stop() {
+	s.closer.Close()
+	<-s.done
+}
+
+func (s *streamClient) isStopped() bool {
+	select {
+	case <-s.done:
+		return true
+	default:
+		return false
+	}
+}
+
+func (s *streamClient) handle(r io.Reader) {
+	defer close(s.done)
+
+	er := &entryReader{r: r}
+	for {
+		ents, err := er.readEntries()
+		if err != nil {
+			if err != io.EOF {
+				log.Printf("rafthttp: read ents error: %v", err)
+			}
+			return
+		}
+		// Considering Commit in MsgApp is not recovered, zero-entry appendEntry
+		// messages have no use to raft state machine. Drop it here because
+		// we don't have easy way to recover its Index easily.
+		if len(ents) == 0 {
+			continue
+		}
+		// The commit index field in appendEntry message is not recovered.
+		// The follower updates its commit index through heartbeat.
+		msg := raftpb.Message{
+			Type:    raftpb.MsgApp,
+			From:    uint64(s.to),
+			To:      uint64(s.id),
+			Term:    s.term,
+			LogTerm: s.term,
+			Index:   ents[0].Index - 1,
+			Entries: ents,
+		}
+		if err := s.p.Process(context.TODO(), msg); err != nil {
+			log.Printf("rafthttp: process raft message error: %v", err)
+			return
+		}
+	}
+}
+
+func shouldInitStream(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgAppResp && m.Reject == false
+}
+
+func canUseStream(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
+}