Browse Source

rafthttp: send takes raft message instead of bytes

This gives streaming mechanism the chance to assemble and disassemble
raft messages.
Yicheng Qin 11 years ago
parent
commit
1a72143ecb
4 changed files with 17 additions and 17 deletions
  1. 2 9
      etcdserver/sendhub.go
  2. 2 1
      etcdserver/sendhub_test.go
  3. 7 2
      rafthttp/sender.go
  4. 6 5
      rafthttp/sender_test.go

+ 2 - 9
etcdserver/sendhub.go

@@ -70,18 +70,11 @@ func (h *sendHub) Send(msgs []raftpb.Message) {
 			continue
 		}
 
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("sender: dropping message:", err)
-			return // drop bad message
-		}
 		if m.Type == raftpb.MsgApp {
-			h.ss.SendAppendReq(len(data))
+			h.ss.SendAppendReq(m.Size())
 		}
 
-		s.Send(data)
+		s.Send(m)
 	}
 }
 

+ 2 - 1
etcdserver/sendhub_test.go

@@ -24,6 +24,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 )
 
 func TestSendHubInitSenders(t *testing.T) {
@@ -98,7 +99,7 @@ func TestSendHubShouldStop(t *testing.T) {
 		t.Fatalf("received unexpected shouldstop notification")
 	case <-time.After(10 * time.Millisecond):
 	}
-	h.senders[1].Send([]byte("somedata"))
+	h.senders[1].Send(raftpb.Message{})
 
 	testutil.ForceGosched()
 	select {

+ 7 - 2
rafthttp/sender.go

@@ -25,7 +25,9 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 )
 
 const (
@@ -37,7 +39,7 @@ type Sender interface {
 	Update(u string)
 	// Send sends the data to the remote node. It is always non-blocking.
 	// It may be fail to send data if it returns nil error.
-	Send(data []byte) error
+	Send(m raftpb.Message) error
 	// Stop performs any necessary finalization and terminates the Sender
 	// elegantly.
 	Stop()
@@ -77,7 +79,10 @@ func (s *sender) Update(u string) {
 }
 
 // TODO (xiangli): reasonable retry logic
-func (s *sender) Send(data []byte) error {
+func (s *sender) Send(m raftpb.Message) error {
+	// TODO: don't block. we should be able to have 1000s
+	// of messages out at a time.
+	data := pbutil.MustMarshal(&m)
 	select {
 	case s.q <- data:
 		return nil

+ 6 - 5
rafthttp/sender_test.go

@@ -26,6 +26,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 )
 
 // TestSenderSend tests that send func could post data using roundtripper
@@ -35,7 +36,7 @@ func TestSenderSend(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
 
-	if err := s.Send([]byte("some data")); err != nil {
+	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}
 	s.Stop()
@@ -58,7 +59,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
 	for i := 0; i < connPerSender+senderBufSize; i++ {
-		if err := s.Send([]byte("some data")); err != nil {
+		if err := s.Send(raftpb.Message{}); err != nil {
 			t.Errorf("send err = %v, want nil", err)
 		}
 		// force the sender to grab data
@@ -66,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	}
 
 	// try to send a data when we are sure the buffer is full
-	if err := s.Send([]byte("some data")); err == nil {
+	if err := s.Send(raftpb.Message{}); err == nil {
 		t.Errorf("unexpect send success")
 	}
 
@@ -75,7 +76,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	testutil.ForceGosched()
 
 	// It could send new data after previous ones succeed
-	if err := s.Send([]byte("some data")); err != nil {
+	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Errorf("send err = %v, want nil", err)
 	}
 	s.Stop()
@@ -87,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
 
-	if err := s.Send([]byte("some data")); err != nil {
+	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
 	}
 	s.Stop()