Browse Source

rafthttp: add stream http tests

Yicheng Qin 10 years ago
parent
commit
399e3cdf81
4 changed files with 214 additions and 11 deletions
  1. 12 8
      rafthttp/http.go
  2. 183 0
      rafthttp/http_test.go
  3. 18 2
      rafthttp/peer.go
  4. 1 1
      rafthttp/transport.go

+ 12 - 8
rafthttp/http.go

@@ -42,11 +42,15 @@ func NewHandler(r Raft, cid types.ID) http.Handler {
 	}
 }
 
-func newStreamHandler(tr *transport, id, cid types.ID) http.Handler {
+type peerGetter interface {
+	Get(id types.ID) Peer
+}
+
+func newStreamHandler(peerGetter peerGetter, id, cid types.ID) http.Handler {
 	return &streamHandler{
-		tr:  tr,
-		id:  id,
-		cid: cid,
+		peerGetter: peerGetter,
+		id:         id,
+		cid:        cid,
 	}
 }
 
@@ -107,9 +111,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type streamHandler struct {
-	tr  *transport
-	id  types.ID
-	cid types.ID
+	peerGetter peerGetter
+	id         types.ID
+	cid        types.ID
 }
 
 func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -141,7 +145,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, "invalid from", http.StatusNotFound)
 		return
 	}
-	p := h.tr.Peer(from)
+	p := h.peerGetter.Get(from)
 	if p == nil {
 		log.Printf("rafthttp: fail to find sender %s", from)
 		http.Error(w, "error sender not found", http.StatusNotFound)

+ 183 - 0
rafthttp/http_test.go

@@ -22,6 +22,7 @@ import (
 	"net/http/httptest"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -155,6 +156,165 @@ func TestServeRaftPrefix(t *testing.T) {
 	}
 }
 
+func TestServeRaftStreamPrefix(t *testing.T) {
+	tests := []struct {
+		path  string
+		wtype streamType
+	}{
+		{
+			RaftStreamPrefix + "/message/1",
+			streamTypeMessage,
+		},
+		{
+			RaftStreamPrefix + "/msgapp/1",
+			streamTypeMsgApp,
+		},
+		// backward compatibility
+		{
+			RaftStreamPrefix + "/1",
+			streamTypeMsgApp,
+		},
+	}
+	for i, tt := range tests {
+		req, err := http.NewRequest("GET", "http://localhost:7001"+tt.path, nil)
+		if err != nil {
+			t.Fatalf("#%d: could not create request: %#v", i, err)
+		}
+		req.Header.Set("X-Etcd-Cluster-ID", "1")
+		req.Header.Set("X-Raft-To", "2")
+		wterm := "1"
+		req.Header.Set("X-Raft-Term", wterm)
+
+		peer := newFakePeer()
+		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
+		h := newStreamHandler(peerGetter, types.ID(2), types.ID(1))
+
+		rw := httptest.NewRecorder()
+		go h.ServeHTTP(rw, req)
+
+		var conn *outgoingConn
+		select {
+		case conn = <-peer.connc:
+		case <-time.After(time.Second):
+			t.Fatalf("#%d: failed to attach outgoingConn", i)
+		}
+		if conn.t != tt.wtype {
+			t.Errorf("$%d: type = %s, want %s", i, conn.t, tt.wtype)
+		}
+		if conn.termStr != wterm {
+			t.Errorf("$%d: term = %s, want %s", i, conn.termStr, wterm)
+		}
+		conn.Close()
+	}
+}
+
+func TestServeRaftStreamPrefixBad(t *testing.T) {
+	tests := []struct {
+		method    string
+		path      string
+		clusterID string
+		remote    string
+
+		wcode int
+	}{
+		// bad method
+		{
+			"PUT",
+			RaftStreamPrefix + "/message/1",
+			"1",
+			"1",
+			http.StatusMethodNotAllowed,
+		},
+		// bad method
+		{
+			"POST",
+			RaftStreamPrefix + "/message/1",
+			"1",
+			"1",
+			http.StatusMethodNotAllowed,
+		},
+		// bad method
+		{
+			"DELETE",
+			RaftStreamPrefix + "/message/1",
+			"1",
+			"1",
+			http.StatusMethodNotAllowed,
+		},
+		// bad path
+		{
+			"GET",
+			RaftStreamPrefix + "/strange/1",
+			"1",
+			"1",
+			http.StatusNotFound,
+		},
+		// bad path
+		{
+			"GET",
+			RaftStreamPrefix + "/strange",
+			"1",
+			"1",
+			http.StatusNotFound,
+		},
+		// non-existant peer
+		{
+			"GET",
+			RaftStreamPrefix + "/message/2",
+			"1",
+			"1",
+			http.StatusNotFound,
+		},
+		// wrong cluster ID
+		{
+			"GET",
+			RaftStreamPrefix + "/message/1",
+			"2",
+			"1",
+			http.StatusPreconditionFailed,
+		},
+		// wrong remote id
+		{
+			"GET",
+			RaftStreamPrefix + "/message/1",
+			"1",
+			"2",
+			http.StatusPreconditionFailed,
+		},
+	}
+	for i, tt := range tests {
+		req, err := http.NewRequest(tt.method, "http://localhost:7001"+tt.path, nil)
+		if err != nil {
+			t.Fatalf("#%d: could not create request: %#v", i, err)
+		}
+		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
+		req.Header.Set("X-Raft-To", tt.remote)
+		rw := httptest.NewRecorder()
+		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
+		h := newStreamHandler(peerGetter, types.ID(1), types.ID(1))
+		h.ServeHTTP(rw, req)
+
+		if rw.Code != tt.wcode {
+			t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
+		}
+	}
+}
+
+func TestCloseNotifier(t *testing.T) {
+	c := newCloseNotifier()
+	select {
+	case <-c.closeNotify():
+		t.Fatalf("received unexpected close notification")
+	default:
+	}
+	c.Close()
+	select {
+	case <-c.closeNotify():
+	default:
+		t.Fatalf("failed to get close notification")
+	}
+}
+
 // errReader implements io.Reader to facilitate a broken request.
 type errReader struct{}
 
@@ -180,3 +340,26 @@ type resWriterToError struct {
 
 func (e *resWriterToError) Error() string                 { return "" }
 func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) }
+
+type fakePeerGetter struct {
+	peers map[types.ID]Peer
+}
+
+func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
+
+type fakePeer struct {
+	msgs  []raftpb.Message
+	u     string
+	connc chan *outgoingConn
+}
+
+func newFakePeer() *fakePeer {
+	return &fakePeer{
+		connc: make(chan *outgoingConn, 1),
+	}
+}
+
+func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
+func (pr *fakePeer) Update(u string)                       { pr.u = u }
+func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
+func (pr *fakePeer) Stop()                                 {}

+ 18 - 2
rafthttp/peer.go

@@ -33,6 +33,24 @@ const (
 	recvBufSize = 4096
 )
 
+type Peer interface {
+	// Send sends the message to the remote peer. The function is non-blocking
+	// and has no promise that the message will be received by the remote.
+	// When it fails to send message out, it will report the status to underlying
+	// raft.
+	Send(m raftpb.Message)
+	// Update updates the urls of remote peer.
+	Update(u string)
+	// attachOutgoingConn attachs the outgoing connection to the peer for
+	// stream usage. After the call, the ownership of the outgoing
+	// connection hands over to the peer. The peer will close the connection
+	// when it is no longer used.
+	attachOutgoingConn(conn *outgoingConn)
+	// Stop performs any necessary finalization and terminates the peer
+	// elegantly.
+	Stop()
+}
+
 // peer is the representative of a remote raft node. Local raft node sends
 // messages to the remote through peer.
 // Each peer has two underlying mechanisms to send out a message: stream and
@@ -171,8 +189,6 @@ func (p *peer) Resume() {
 	}
 }
 
-// Stop performs any necessary finalization and terminates the peer
-// elegantly.
 func (p *peer) Stop() {
 	close(p.stopc)
 	<-p.done

+ 1 - 1
rafthttp/transport.go

@@ -79,7 +79,7 @@ func (t *transport) Handler() http.Handler {
 	return mux
 }
 
-func (t *transport) Peer(id types.ID) *peer {
+func (t *transport) Get(id types.ID) Peer {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
 	return t.peers[id]