Browse Source

Merge pull request #2620 from yichengq/new-rafthttp-msgapp

rafthttp: introduce msgappv2 stream format
Yicheng Qin 10 years ago
parent
commit
c777516a5d

+ 4 - 4
rafthttp/http.go

@@ -125,13 +125,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	var t streamType
 	switch path.Dir(r.URL.Path) {
-	case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
-		t = streamTypeMsgApp
-	case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
-		t = streamTypeMessage
 	// backward compatibility
 	case RaftStreamPrefix:
 		t = streamTypeMsgApp
+	case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
+		t = streamTypeMsgAppV2
+	case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
+		t = streamTypeMessage
 	default:
 		log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path)
 		http.Error(w, "invalid path", http.StatusNotFound)

+ 1 - 1
rafthttp/http_test.go

@@ -165,7 +165,7 @@ func TestServeRaftStreamPrefix(t *testing.T) {
 		},
 		{
 			RaftStreamPrefix + "/msgapp/1",
-			streamTypeMsgApp,
+			streamTypeMsgAppV2,
 		},
 		// backward compatibility
 		{

+ 192 - 0
rafthttp/msgappv2.go

@@ -0,0 +1,192 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+const (
+	msgTypeLinkHeartbeat uint8 = 0
+	msgTypeAppEntries    uint8 = 1
+	msgTypeApp           uint8 = 2
+)
+
+// msgappv2 stream sends three types of message: linkHeartbeatMessage,
+// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
+// replicate state in raft, whose index and term are fully predictable.
+//
+// Data format of linkHeartbeatMessage:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x00        |
+//
+// Data format of AppEntries:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x01        |
+// | 1      | 8     | number of entries |
+// | 9      | 8     | length of first entry |
+// | 17     | n1    | first entry |
+// ...
+// | x      | 8     | length of k-th entry |
+// | x+8    | nk    | k-th entry |
+// | x+8+nk | 8     | commit index |
+//
+// Data format of MsgApp:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x02        |
+// | 1      | 8     | length of encoded message |
+// | 9      | n     | encoded message |
+type msgAppV2Encoder struct {
+	w  io.Writer
+	fs *stats.FollowerStats
+
+	term  uint64
+	index uint64
+}
+
+func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
+	start := time.Now()
+	switch {
+	case isLinkHeartbeatMessage(m):
+		return binary.Write(enc.w, binary.BigEndian, msgTypeLinkHeartbeat)
+	case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
+		if err := binary.Write(enc.w, binary.BigEndian, msgTypeAppEntries); err != nil {
+			return err
+		}
+		// write length of entries
+		l := len(m.Entries)
+		if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil {
+			return err
+		}
+		for i := 0; i < l; i++ {
+			size := m.Entries[i].Size()
+			if err := binary.Write(enc.w, binary.BigEndian, uint64(size)); err != nil {
+				return err
+			}
+			if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
+				return err
+			}
+			enc.index++
+		}
+		// write commit index
+		if err := binary.Write(enc.w, binary.BigEndian, m.Commit); err != nil {
+			return err
+		}
+	default:
+		if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
+			return err
+		}
+		// write size of message
+		if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
+			return err
+		}
+		// write message
+		if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
+			return err
+		}
+
+		enc.term = m.Term
+		enc.index = m.Index
+		if l := len(m.Entries); l > 0 {
+			enc.index = m.Entries[l-1].Index
+		}
+	}
+	enc.fs.Succ(time.Since(start))
+	return nil
+}
+
+type msgAppV2Decoder struct {
+	r             io.Reader
+	local, remote types.ID
+
+	term  uint64
+	index uint64
+}
+
+func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
+	var (
+		m   raftpb.Message
+		typ uint8
+	)
+	if err := binary.Read(dec.r, binary.BigEndian, &typ); err != nil {
+		return m, err
+	}
+	switch typ {
+	case msgTypeLinkHeartbeat:
+		return linkHeartbeatMessage, nil
+	case msgTypeAppEntries:
+		m = raftpb.Message{
+			Type:    raftpb.MsgApp,
+			From:    uint64(dec.remote),
+			To:      uint64(dec.local),
+			Term:    dec.term,
+			LogTerm: dec.term,
+			Index:   dec.index,
+		}
+
+		// decode entries
+		var l uint64
+		if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
+			return m, err
+		}
+		m.Entries = make([]raftpb.Entry, int(l))
+		for i := 0; i < int(l); i++ {
+			var size uint64
+			if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
+				return m, err
+			}
+			buf := make([]byte, int(size))
+			if _, err := io.ReadFull(dec.r, buf); err != nil {
+				return m, err
+			}
+			dec.index++
+			pbutil.MustUnmarshal(&m.Entries[i], buf)
+		}
+		// decode commit index
+		if err := binary.Read(dec.r, binary.BigEndian, &m.Commit); err != nil {
+			return m, err
+		}
+	case msgTypeApp:
+		var size uint64
+		if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
+			return m, err
+		}
+		buf := make([]byte, int(size))
+		if _, err := io.ReadFull(dec.r, buf); err != nil {
+			return m, err
+		}
+		pbutil.MustUnmarshal(&m, buf)
+
+		dec.term = m.Term
+		dec.index = m.Index
+		if l := len(m.Entries); l > 0 {
+			dec.index = m.Entries[l-1].Index
+		}
+	default:
+		return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
+	}
+	return m, nil
+}

+ 123 - 0
rafthttp/msgappv2_test.go

@@ -0,0 +1,123 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func TestMsgAppV2(t *testing.T) {
+	tests := []raftpb.Message{
+		linkHeartbeatMessage,
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    1,
+			LogTerm: 1,
+			Index:   0,
+			Entries: []raftpb.Entry{
+				{Term: 1, Index: 1, Data: []byte("some data")},
+				{Term: 1, Index: 2, Data: []byte("some data")},
+				{Term: 1, Index: 3, Data: []byte("some data")},
+			},
+		},
+		// consecutive MsgApp
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    1,
+			LogTerm: 1,
+			Index:   3,
+			Entries: []raftpb.Entry{
+				{Term: 1, Index: 4, Data: []byte("some data")},
+			},
+		},
+		linkHeartbeatMessage,
+		// consecutive MsgApp after linkHeartbeatMessage
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    1,
+			LogTerm: 1,
+			Index:   4,
+			Entries: []raftpb.Entry{
+				{Term: 1, Index: 5, Data: []byte("some data")},
+			},
+		},
+		// MsgApp with higher term
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    3,
+			LogTerm: 1,
+			Index:   5,
+			Entries: []raftpb.Entry{
+				{Term: 3, Index: 6, Data: []byte("some data")},
+			},
+		},
+		linkHeartbeatMessage,
+		// consecutive MsgApp
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    3,
+			LogTerm: 2,
+			Index:   6,
+			Entries: []raftpb.Entry{
+				{Term: 3, Index: 7, Data: []byte("some data")},
+			},
+		},
+		// consecutive empty MsgApp
+		{
+			Type:    raftpb.MsgApp,
+			From:    1,
+			To:      2,
+			Term:    3,
+			LogTerm: 2,
+			Index:   7,
+			Entries: nil,
+		},
+		linkHeartbeatMessage,
+	}
+	b := &bytes.Buffer{}
+	enc := &msgAppV2Encoder{w: b, fs: &stats.FollowerStats{}}
+	dec := &msgAppV2Decoder{r: b, local: types.ID(2), remote: types.ID(1)}
+
+	for i, tt := range tests {
+		if err := enc.encode(tt); err != nil {
+			t.Errorf("#%d: unexpected encode message error: %v", i, err)
+			continue
+		}
+		m, err := dec.decode()
+		if err != nil {
+			t.Errorf("#%d: unexpected decode message error: %v", i, err)
+			continue
+		}
+		if !reflect.DeepEqual(m, tt) {
+			t.Errorf("#%d: message = %+v, want %+v", i, m, tt)
+		}
+	}
+}

+ 4 - 2
rafthttp/peer.go

@@ -48,6 +48,7 @@ const (
 	maxPendingProposals = 4096
 
 	streamApp   = "streamMsgApp"
+	streamAppV2 = "streamMsgAppV2"
 	streamMsg   = "streamMsg"
 	pipelineMsg = "pipeline"
 )
@@ -55,6 +56,7 @@ const (
 var (
 	bufSizeMap = map[string]int{
 		streamApp:   streamBufSize,
+		streamAppV2: streamBufSize,
 		streamMsg:   streamBufSize,
 		pipelineMsg: pipelineBufSize,
 	}
@@ -147,7 +149,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 
 	go func() {
 		var paused bool
-		msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc)
+		msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc)
 		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc)
 		for {
 			select {
@@ -212,7 +214,7 @@ func (p *peer) Update(urls types.URLs) {
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	var ok bool
 	switch conn.t {
-	case streamTypeMsgApp:
+	case streamTypeMsgApp, streamTypeMsgAppV2:
 		ok = p.msgAppWriter.attach(conn)
 	case streamTypeMessage:
 		ok = p.writer.attach(conn)

+ 24 - 13
rafthttp/stream.go

@@ -30,15 +30,30 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-type streamType string
-
 const (
-	streamTypeMessage streamType = "message"
-	streamTypeMsgApp  streamType = "msgapp"
+	streamTypeMessage  streamType = "message"
+	streamTypeMsgAppV2 streamType = "msgappv2"
+	streamTypeMsgApp   streamType = "msgapp"
 
 	streamBufSize = 4096
 )
 
+type streamType string
+
+func (t streamType) endpoint() string {
+	switch t {
+	case streamTypeMsgApp: // for backward compatibility of v2.0
+		return RaftStreamPrefix
+	case streamTypeMsgAppV2:
+		return path.Join(RaftStreamPrefix, "msgapp")
+	case streamTypeMessage:
+		return path.Join(RaftStreamPrefix, "message")
+	default:
+		log.Panicf("rafthttp: unhandled stream type %v", t)
+		return ""
+	}
+}
+
 var (
 	// linkHeartbeatMessage is a special message used as heartbeat message in
 	// link layer. It never conflicts with messages from raft because raft
@@ -146,6 +161,8 @@ func (cw *streamWriter) run() {
 					log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err)
 				}
 				enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
+			case streamTypeMsgAppV2:
+				enc = &msgAppV2Encoder{w: conn.Writer, fs: cw.fs}
 			case streamTypeMessage:
 				enc = &messageEncoder{w: conn.Writer}
 			default:
@@ -263,6 +280,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
 	switch cr.t {
 	case streamTypeMsgApp:
 		dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
+	case streamTypeMsgAppV2:
+		dec = &msgAppV2Decoder{r: rc, local: cr.from, remote: cr.to}
 	case streamTypeMessage:
 		dec = &messageDecoder{r: rc}
 	default:
@@ -329,15 +348,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
 	cr.mu.Unlock()
 
 	uu := u
-	switch cr.t {
-	case streamTypeMsgApp:
-		// for backward compatibility of v2.0
-		uu.Path = path.Join(RaftStreamPrefix, cr.from.String())
-	case streamTypeMessage:
-		uu.Path = path.Join(RaftStreamPrefix, string(streamTypeMessage), cr.from.String())
-	default:
-		log.Panicf("rafthttp: unhandled stream type %v", cr.t)
-	}
+	uu.Path = path.Join(cr.t.endpoint(), cr.from.String())
 	req, err := http.NewRequest("GET", uu.String(), nil)
 	if err != nil {
 		cr.picker.unreachable(u)

+ 9 - 8
rafthttp/stream_test.go

@@ -2,6 +2,7 @@ package rafthttp
 
 import (
 	"errors"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
@@ -81,7 +82,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
 }
 
 func TestStreamReaderDialRequest(t *testing.T) {
-	for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage} {
+	for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} {
 		tr := &roundTripperRecorder{}
 		sr := &streamReader{
 			tr:         tr,
@@ -95,13 +96,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
 		sr.dial()
 
 		req := tr.Request()
-		var wurl string
-		switch tt {
-		case streamTypeMsgApp:
-			wurl = "http://localhost:7001/raft/stream/1"
-		case streamTypeMessage:
-			wurl = "http://localhost:7001/raft/stream/message/1"
-		}
+		wurl := fmt.Sprintf("http://localhost:7001" + tt.endpoint() + "/1")
 		if req.URL.String() != wurl {
 			t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
 		}
@@ -191,6 +186,12 @@ func TestStream(t *testing.T) {
 			msgapp,
 			recvc,
 		},
+		{
+			streamTypeMsgAppV2,
+			0,
+			msgapp,
+			recvc,
+		},
 	}
 	for i, tt := range tests {
 		h := &fakeStreamHandler{t: tt.t}

+ 29 - 8
rafthttp/transport_bench_test.go

@@ -29,16 +29,24 @@ import (
 )
 
 func BenchmarkSendingMsgApp(b *testing.B) {
-	r := &countRaft{}
-	ss := &stats.ServerStats{}
-	ss.Initialize()
-	tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), r, nil, ss, stats.NewLeaderStats("1"))
+	// member 1
+	tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), &fakeRaft{}, nil, newServerStats(), stats.NewLeaderStats("1"))
 	srv := httptest.NewServer(tr.Handler())
 	defer srv.Close()
-	tr.AddPeer(types.ID(1), []string{srv.URL})
+
+	// member 2
+	r := &countRaft{}
+	tr2 := NewTransporter(&http.Transport{}, types.ID(2), types.ID(1), r, nil, newServerStats(), stats.NewLeaderStats("2"))
+	srv2 := httptest.NewServer(tr2.Handler())
+	defer srv2.Close()
+
+	tr.AddPeer(types.ID(2), []string{srv2.URL})
 	defer tr.Stop()
-	// wait for underlying stream created
-	time.Sleep(time.Second)
+	tr2.AddPeer(types.ID(1), []string{srv.URL})
+	defer tr2.Stop()
+	if !waitStreamWorking(tr.(*transport).Get(types.ID(2)).(*peer)) {
+		b.Fatalf("stream from 1 to 2 is not in work as expected")
+	}
 
 	b.ReportAllocs()
 	b.SetBytes(64)
@@ -46,7 +54,20 @@ func BenchmarkSendingMsgApp(b *testing.B) {
 	b.ResetTimer()
 	data := make([]byte, 64)
 	for i := 0; i < b.N; i++ {
-		tr.Send([]raftpb.Message{{Type: raftpb.MsgApp, To: 1, Entries: []raftpb.Entry{{Data: data}}}})
+		tr.Send([]raftpb.Message{
+			{
+				Type:  raftpb.MsgApp,
+				From:  1,
+				To:    2,
+				Index: uint64(i),
+				Entries: []raftpb.Entry{
+					{
+						Index: uint64(i + 1),
+						Data:  data,
+					},
+				},
+			},
+		})
 	}
 	// wait until all messages are received by the target raft
 	for r.count() != b.N {