Browse Source

Merge pull request #3704 from xiang90/rafthttp

clean up rafthttp pkg: round1
Xiang Li 10 years ago
parent
commit
5372e11727

+ 40 - 29
rafthttp/http.go

@@ -29,7 +29,13 @@ import (
 )
 
 const (
-	ConnReadLimitByte = 64 * 1024
+	// connReadLimitByte limits the number of bytes
+	// a single read can read out.
+	//
+	// 64KB should be large enough to avoid becoming
+	// a throughput bottleneck, yet small enough to
+	// avoid causing a read timeout.
+	connReadLimitByte = 64 * 1024
 )
 
 var (
@@ -42,44 +48,32 @@ var (
 	errClusterIDMismatch   = errors.New("cluster ID mismatch")
 )
 
-func NewHandler(r Raft, cid types.ID) http.Handler {
-	return &handler{
-		r:   r,
-		cid: cid,
-	}
-}
-
-func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
-	return &snapshotHandler{
-		r:         r,
-		snapSaver: snapSaver,
-		cid:       cid,
-	}
-}
-
 type peerGetter interface {
 	Get(id types.ID) Peer
 }
 
-func newStreamHandler(peerGetter peerGetter, r Raft, id, cid types.ID) http.Handler {
-	return &streamHandler{
-		peerGetter: peerGetter,
-		r:          r,
-		id:         id,
-		cid:        cid,
-	}
-}
-
 type writerToResponse interface {
 	WriteTo(w http.ResponseWriter)
 }
 
-type handler struct {
+type pipelineHandler struct {
 	r   Raft
 	cid types.ID
 }
 
-func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+// newPipelineHandler returns a handler for handling raft messages
+// from pipeline for RaftPrefix.
+//
+// The handler reads out the raft message from request body,
+// and forwards it to the given raft state machine for processing.
+func newPipelineHandler(r Raft, cid types.ID) http.Handler {
+	return &pipelineHandler{
+		r:   r,
+		cid: cid,
+	}
+}
+
+func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "POST" {
 		w.Header().Set("Allow", "POST")
 		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
@@ -94,8 +88,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// Limit the data size that could be read from the request body, which ensures that read from
-	// connection will not time out accidentally due to possible block in underlying implementation.
-	limitedr := pioutil.NewLimitedBufferReader(r.Body, ConnReadLimitByte)
+	// connection will not time out accidentally due to possible blocking in underlying implementation.
+	limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
 	b, err := ioutil.ReadAll(limitedr)
 	if err != nil {
 		plog.Errorf("failed to read raft message (%v)", err)
@@ -129,6 +123,14 @@ type snapshotHandler struct {
 	cid       types.ID
 }
 
+func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
+	return &snapshotHandler{
+		r:         r,
+		snapSaver: snapSaver,
+		cid:       cid,
+	}
+}
+
 // ServeHTTP serves HTTP request to receive and process snapshot message.
 //
 // If request sender dies without closing underlying TCP connection,
@@ -200,6 +202,15 @@ type streamHandler struct {
 	cid        types.ID
 }
 
+func newStreamHandler(peerGetter peerGetter, r Raft, id, cid types.ID) http.Handler {
+	return &streamHandler{
+		peerGetter: peerGetter,
+		r:          r,
+		id:         id,
+		cid:        cid,
+	}
+}
+
 func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "GET" {
 		w.Header().Set("Allow", "GET")

+ 4 - 4
rafthttp/http_test.go

@@ -149,7 +149,7 @@ func TestServeRaftPrefix(t *testing.T) {
 		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
 		req.Header.Set("X-Server-Version", version.Version)
 		rw := httptest.NewRecorder()
-		h := NewHandler(tt.p, types.ID(0))
+		h := newPipelineHandler(tt.p, types.ID(0))
 		h.ServeHTTP(rw, req)
 		if rw.Code != tt.wcode {
 			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
@@ -362,9 +362,9 @@ func newFakePeer() *fakePeer {
 	}
 }
 
-func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
-func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
+func (pr *fakePeer) send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
+func (pr *fakePeer) update(urls types.URLs)                { pr.urls = urls }
 func (pr *fakePeer) setTerm(term uint64)                   { pr.term = term }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
-func (pr *fakePeer) Stop()                                 {}
+func (pr *fakePeer) stop()                                 {}

+ 0 - 0
rafthttp/message.go → rafthttp/msg_codec.go


+ 0 - 0
rafthttp/message_test.go → rafthttp/msg_codec_test.go


+ 0 - 0
rafthttp/msgapp.go → rafthttp/msgapp_codec.go


+ 0 - 0
rafthttp/msgapp_test.go → rafthttp/msgapp_codec_test.go


+ 0 - 0
rafthttp/msgappv2.go → rafthttp/msgappv2_codec.go


+ 0 - 0
rafthttp/msgappv2_test.go → rafthttp/msgappv2_codec_test.go


+ 9 - 9
rafthttp/peer.go

@@ -53,13 +53,13 @@ const (
 )
 
 type Peer interface {
-	// Send sends the message to the remote peer. The function is non-blocking
+	// send sends the message to the remote peer. The function is non-blocking
 	// and has no promise that the message will be received by the remote.
 	// When it fails to send message out, it will report the status to underlying
 	// raft.
-	Send(m raftpb.Message)
-	// Update updates the urls of remote peer.
-	Update(urls types.URLs)
+	send(m raftpb.Message)
+	// update updates the urls of remote peer.
+	update(urls types.URLs)
 	// setTerm sets the term of ongoing communication.
 	setTerm(term uint64)
 	// attachOutgoingConn attaches the outgoing connection to the peer for
@@ -70,9 +70,9 @@ type Peer interface {
 	// activeSince returns the time that the connection with the
 	// peer becomes active.
 	activeSince() time.Time
-	// Stop performs any necessary finalization and terminates the peer
+	// stop performs any necessary finalization and terminates the peer
 	// gracefully.
-	Stop()
+	stop()
 }
 
 // peer is the representative of a remote raft node. Local raft node sends
@@ -208,14 +208,14 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 	return p
 }
 
-func (p *peer) Send(m raftpb.Message) {
+func (p *peer) send(m raftpb.Message) {
 	select {
 	case p.sendc <- m:
 	case <-p.done:
 	}
 }
 
-func (p *peer) Update(urls types.URLs) {
+func (p *peer) update(urls types.URLs) {
 	select {
 	case p.newURLsC <- urls:
 	case <-p.done:
@@ -258,7 +258,7 @@ func (p *peer) Resume() {
 	}
 }
 
-func (p *peer) Stop() {
+func (p *peer) stop() {
 	close(p.stopc)
 	<-p.done
 }

+ 12 - 10
rafthttp/pipeline.go

@@ -96,8 +96,9 @@ func (p *pipeline) handle() {
 		end := time.Now()
 
 		if err != nil {
-			reportSentFailure(pipelineMsg, m)
 			p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
+
+			reportSentFailure(pipelineMsg, m)
 			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Fail()
 			}
@@ -105,16 +106,17 @@ func (p *pipeline) handle() {
 			if isMsgSnap(m) {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 			}
-		} else {
-			p.status.activate()
-			if m.Type == raftpb.MsgApp && p.fs != nil {
-				p.fs.Succ(end.Sub(start))
-			}
-			if isMsgSnap(m) {
-				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
-			}
-			reportSentDuration(pipelineMsg, m, time.Since(start))
+			return
+		}
+
+		p.status.activate()
+		if m.Type == raftpb.MsgApp && p.fs != nil {
+			p.fs.Succ(end.Sub(start))
+		}
+		if isMsgSnap(m) {
+			p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 		}
+		reportSentDuration(pipelineMsg, m, time.Since(start))
 	}
 }
 

+ 2 - 2
rafthttp/remote.go

@@ -37,7 +37,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID,
 	}
 }
 
-func (g *remote) Send(m raftpb.Message) {
+func (g *remote) send(m raftpb.Message) {
 	select {
 	case g.pipeline.msgc <- m:
 	default:
@@ -49,6 +49,6 @@ func (g *remote) Send(m raftpb.Message) {
 	}
 }
 
-func (g *remote) Stop() {
+func (g *remote) stop() {
 	g.pipeline.stop()
 }

+ 45 - 0
rafthttp/snapshot_store.go

@@ -0,0 +1,45 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"io"
+)
+
+// snapshotStore is the store of snapshot. Caller could put one
+// snapshot into the store, and get it later.
+// snapshotStore stores at most one snapshot at a time, or it panics.
+type snapshotStore struct {
+	rc io.ReadCloser
+	// index of the stored snapshot
+	// index is 0 if and only if there is no snapshot stored.
+	index uint64
+}
+
+func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
+	if s.index != 0 {
+		plog.Panicf("unexpected put when there is one snapshot stored")
+	}
+	s.rc, s.index = rc, index
+}
+
+func (s *snapshotStore) get(index uint64) io.ReadCloser {
+	if s.index == index {
+		// set index to 0 to indicate no snapshot stored
+		s.index = 0
+		return s.rc
+	}
+	return nil
+}

+ 21 - 16
rafthttp/stream.go

@@ -346,27 +346,32 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 
 	for {
 		m, err := dec.decode()
-		switch {
-		case err != nil:
+		if err != nil {
 			cr.mu.Lock()
 			cr.close()
 			cr.mu.Unlock()
 			return err
-		case isLinkHeartbeatMessage(m):
-			// do nothing for linkHeartbeatMessage
+		}
+
+		if isLinkHeartbeatMessage(m) {
+			// raft is not interested in link-layer
+			// heartbeat messages, so we ignore
+			// them.
+			continue
+		}
+
+		recvc := cr.recvc
+		if m.Type == raftpb.MsgProp {
+			recvc = cr.propc
+		}
+
+		select {
+		case recvc <- m:
 		default:
-			recvc := cr.recvc
-			if m.Type == raftpb.MsgProp {
-				recvc = cr.propc
-			}
-			select {
-			case recvc <- m:
-			default:
-				if cr.status.isActive() {
-					plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
-				} else {
-					plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
-				}
+			if cr.status.isActive() {
+				plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+			} else {
+				plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
 			}
 		}
 	}

+ 11 - 35
rafthttp/transport.go

@@ -152,7 +152,7 @@ func (t *Transport) Start() error {
 }
 
 func (t *Transport) Handler() http.Handler {
-	pipelineHandler := NewHandler(t.Raft, t.ClusterID)
+	pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID)
 	streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
 	snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID)
 	mux := http.NewServeMux()
@@ -183,13 +183,15 @@ func (t *Transport) maybeUpdatePeersTerm(term uint64) {
 
 func (t *Transport) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
-		// intentionally dropped message
 		if m.To == 0 {
+			// ignore intentionally dropped message
 			continue
 		}
 		to := types.ID(m.To)
 
-		if m.Type != raftpb.MsgProp { // proposal message does not have a valid term
+		// update terms for all the peers
+		// ignore MsgProp since it does not have a valid term
+		if m.Type != raftpb.MsgProp {
 			t.maybeUpdatePeersTerm(m.Term)
 		}
 
@@ -198,13 +200,13 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 			if m.Type == raftpb.MsgApp {
 				t.ServerStats.SendAppendReq(m.Size())
 			}
-			p.Send(m)
+			p.send(m)
 			continue
 		}
 
 		g, ok := t.remotes[to]
 		if ok {
-			g.Send(m)
+			g.send(m)
 			continue
 		}
 
@@ -214,10 +216,10 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 
 func (t *Transport) Stop() {
 	for _, r := range t.remotes {
-		r.Stop()
+		r.stop()
 	}
 	for _, p := range t.peers {
-		p.Stop()
+		p.stop()
 	}
 	t.prober.RemoveAll()
 	if tr, ok := t.streamRt.(*http.Transport); ok {
@@ -273,7 +275,7 @@ func (t *Transport) RemoveAllPeers() {
 // the caller of this function must have the peers mutex.
 func (t *Transport) removePeer(id types.ID) {
 	if peer, ok := t.peers[id]; ok {
-		peer.Stop()
+		peer.stop()
 	} else {
 		plog.Panicf("unexpected removal of unknown peer '%d'", id)
 	}
@@ -293,7 +295,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	if err != nil {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
-	t.peers[id].Update(urls)
+	t.peers[id].update(urls)
 
 	t.prober.Remove(id.String())
 	addPeerToProber(t.prober, id.String(), us)
@@ -329,29 +331,3 @@ func (t *Transport) Resume() {
 		p.(Pausable).Resume()
 	}
 }
-
-// snapshotStore is the store of snapshot. Caller could put one
-// snapshot into the store, and get it later.
-// snapshotStore stores at most one snapshot at a time, or it panics.
-type snapshotStore struct {
-	rc io.ReadCloser
-	// index of the stored snapshot
-	// index is 0 if and only if there is no snapshot stored.
-	index uint64
-}
-
-func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
-	if s.index != 0 {
-		plog.Panicf("unexpected put when there is one snapshot stored")
-	}
-	s.rc, s.index = rc, index
-}
-
-func (s *snapshotStore) get(index uint64) io.ReadCloser {
-	if s.index == index {
-		// set index to 0 to indicate no snapshot stored
-		s.index = 0
-		return s.rc
-	}
-	return nil
-}

+ 1 - 1
rafthttp/transport_test.go

@@ -149,7 +149,7 @@ func TestTransportErrorc(t *testing.T) {
 		t.Fatalf("received unexpected from errorc")
 	case <-time.After(10 * time.Millisecond):
 	}
-	tr.peers[1].Send(raftpb.Message{})
+	tr.peers[1].send(raftpb.Message{})
 
 	testutil.WaitSchedule()
 	select {