소스 검색

rafthttp: extract pipeline from peer

Yicheng Qin 11 년 전
부모
커밋
d8a9e11e22
3개의 변경된 파일214개의 추가작업 그리고 158개의 파일을 삭제
  1. 16 129
      rafthttp/peer.go
  2. 169 0
      rafthttp/pipeline.go
  3. 29 29
      rafthttp/pipeline_test.go

+ 16 - 129
rafthttp/peer.go

@@ -15,28 +15,17 @@
 package rafthttp
 
 import (
-	"bytes"
 	"errors"
-	"fmt"
-	"log"
 	"net/http"
 	"sync"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/stats"
-	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
 const (
-	connPerSender = 4
-	// senderBufSize is the size of sender buffer, which helps hold the
-	// temporary network latency.
-	// The size ensures that sender does not drop messages when the network
-	// is out of work for less than 1 second in good path.
-	senderBufSize = 64
-
 	appRespBatchMs = 50
 	propBatchMs    = 10
 
@@ -50,50 +39,35 @@ type peer struct {
 	id  types.ID
 	cid types.ID
 
-	tr     http.RoundTripper
-	r      Raft
-	fs     *stats.FollowerStats
-	errorc chan error
+	tr http.RoundTripper
+	// the url this sender post to
+	u  string
+	r  Raft
+	fs *stats.FollowerStats
 
 	batcher     *Batcher
 	propBatcher *ProposalBatcher
-	q           chan *raftpb.Message
 
-	stream *stream
+	pipeline *pipeline
+	stream   *stream
 
-	// wait for the handling routines
-	wg sync.WaitGroup
-
-	// the url this sender post to
-	u string
-	// if the last send was successful, the sender is active.
-	// Or it is inactive
-	active  bool
-	errored error
 	paused  bool
 	stopped bool
 }
 
 func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
-	p := &peer{
+	return &peer{
 		id:          id,
-		active:      true,
+		cid:         cid,
 		tr:          tr,
 		u:           u,
-		cid:         cid,
 		r:           r,
 		fs:          fs,
+		pipeline:    newPipeline(tr, u, id, cid, fs, errorc),
 		stream:      &stream{},
-		errorc:      errorc,
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
-		q:           make(chan *raftpb.Message, senderBufSize),
 	}
-	p.wg.Add(connPerSender)
-	for i := 0; i < connPerSender; i++ {
-		go p.handle()
-	}
-	return p
 }
 
 func (p *peer) Update(u string) {
@@ -104,6 +78,7 @@ func (p *peer) Update(u string) {
 		panic("peer: update a stopped peer")
 	}
 	p.u = u
+	p.pipeline.update(u)
 }
 
 // Send sends the data to the remote node. It is always non-blocking.
@@ -134,14 +109,14 @@ func (p *peer) Send(m raftpb.Message) error {
 		p.propBatcher.Batch(m)
 	case canBatch(m) && p.stream.isOpen():
 		if !p.batcher.ShouldBatch(time.Now()) {
-			err = p.send(m)
+			err = p.pipeline.send(m)
 		}
 	case canUseStream(m):
 		if ok := p.stream.write(m); !ok {
-			err = p.send(m)
+			err = p.pipeline.send(m)
 		}
 	default:
-		err = p.send(m)
+		err = p.pipeline.send(m)
 	}
 	// send out batched MsgProp if needed
 	// TODO: it is triggered by all outcoming send now, and it needs
@@ -150,111 +125,23 @@ func (p *peer) Send(m raftpb.Message) error {
 	if !p.propBatcher.IsEmpty() {
 		t := time.Now()
 		if !p.propBatcher.ShouldBatch(t) {
-			p.send(p.propBatcher.Message)
+			p.pipeline.send(p.propBatcher.Message)
 			p.propBatcher.Reset(t)
 		}
 	}
 	return err
 }
 
-func (p *peer) send(m raftpb.Message) error {
-	// TODO: don't block. we should be able to have 1000s
-	// of messages out at a time.
-	select {
-	case p.q <- &m:
-		return nil
-	default:
-		log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
-			m.Type, senderBufSize, p.u)
-		return fmt.Errorf("reach maximal serving")
-	}
-}
-
 // Stop performs any necessary finalization and terminates the peer
 // elegantly.
 func (p *peer) Stop() {
-	close(p.q)
-	p.wg.Wait()
-
 	p.Lock()
 	defer p.Unlock()
+	p.pipeline.stop()
 	p.stream.stop()
 	p.stopped = true
 }
 
-func (p *peer) handle() {
-	defer p.wg.Done()
-	for m := range p.q {
-		start := time.Now()
-		err := p.post(pbutil.MustMarshal(m))
-		end := time.Now()
-
-		p.Lock()
-		if err != nil {
-			if p.errored == nil || p.errored.Error() != err.Error() {
-				log.Printf("sender: error posting to %s: %v", p.id, err)
-				p.errored = err
-			}
-			if p.active {
-				log.Printf("sender: the connection with %s became inactive", p.id)
-				p.active = false
-			}
-			if m.Type == raftpb.MsgApp {
-				p.fs.Fail()
-			}
-		} else {
-			if !p.active {
-				log.Printf("sender: the connection with %s became active", p.id)
-				p.active = true
-				p.errored = nil
-			}
-			if m.Type == raftpb.MsgApp {
-				p.fs.Succ(end.Sub(start))
-			}
-		}
-		p.Unlock()
-	}
-}
-
-// post POSTs a data payload to a url. Returns nil if the POST succeeds,
-// error on any failure.
-func (p *peer) post(data []byte) error {
-	p.Lock()
-	req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data))
-	p.Unlock()
-	if err != nil {
-		return err
-	}
-	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
-	resp, err := p.tr.RoundTrip(req)
-	if err != nil {
-		return err
-	}
-	resp.Body.Close()
-
-	switch resp.StatusCode {
-	case http.StatusPreconditionFailed:
-		err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
-		select {
-		case p.errorc <- err:
-		default:
-		}
-		return nil
-	case http.StatusForbidden:
-		err := fmt.Errorf("the member has been permanently removed from the cluster")
-		select {
-		case p.errorc <- err:
-		default:
-		}
-		return nil
-	case http.StatusNoContent:
-		return nil
-	default:
-		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
-	}
-}
-
 // attachStream attaches a streamSever to the peer.
 func (p *peer) attachStream(sw *streamWriter) error {
 	p.Lock()

+ 169 - 0
rafthttp/pipeline.go

@@ -0,0 +1,169 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+const (
+	connPerPipeline = 4
+	// pipelineBufSize is the size of pipeline buffer, which helps hold the
+	// temporary network latency.
+	// The size ensures that pipeline does not drop messages when the network
+	// is out of work for less than 1 second in good path.
+	pipelineBufSize = 64
+)
+
+type pipeline struct {
+	id  types.ID
+	cid types.ID
+
+	tr http.RoundTripper
+	// the url this pipeline sends to
+	u      string
+	fs     *stats.FollowerStats
+	errorc chan error
+
+	q chan *raftpb.Message
+	// wait for the handling routines
+	wg sync.WaitGroup
+	sync.Mutex
+	// if the last send was successful, the pipeline is active.
+	// Or it is inactive
+	active  bool
+	errored error
+}
+
+func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, errorc chan error) *pipeline {
+	p := &pipeline{
+		id:     id,
+		cid:    cid,
+		tr:     tr,
+		u:      u,
+		fs:     fs,
+		errorc: errorc,
+		q:      make(chan *raftpb.Message, pipelineBufSize),
+		active: true,
+	}
+	p.wg.Add(connPerPipeline)
+	for i := 0; i < connPerPipeline; i++ {
+		go p.handle()
+	}
+	return p
+}
+
+func (p *pipeline) update(u string) { p.u = u }
+
+func (p *pipeline) send(m raftpb.Message) error {
+	// TODO: don't block. we should be able to have 1000s
+	// of messages out at a time.
+	select {
+	case p.q <- &m:
+		return nil
+	default:
+		log.Printf("pipeline: dropping %s because maximal number %d of pipeline buffer entries to %s has been reached",
+			m.Type, pipelineBufSize, p.u)
+		return fmt.Errorf("reach maximal serving")
+	}
+}
+
+func (p *pipeline) stop() {
+	close(p.q)
+	p.wg.Wait()
+}
+
+func (p *pipeline) handle() {
+	defer p.wg.Done()
+	for m := range p.q {
+		start := time.Now()
+		err := p.pipeline(pbutil.MustMarshal(m))
+		end := time.Now()
+
+		p.Lock()
+		if err != nil {
+			if p.errored == nil || p.errored.Error() != err.Error() {
+				log.Printf("pipeline: error posting to %s: %v", p.id, err)
+				p.errored = err
+			}
+			if p.active {
+				log.Printf("pipeline: the connection with %s became inactive", p.id)
+				p.active = false
+			}
+			if m.Type == raftpb.MsgApp {
+				p.fs.Fail()
+			}
+		} else {
+			if !p.active {
+				log.Printf("pipeline: the connection with %s became active", p.id)
+				p.active = true
+				p.errored = nil
+			}
+			if m.Type == raftpb.MsgApp {
+				p.fs.Succ(end.Sub(start))
+			}
+		}
+		p.Unlock()
+	}
+}
+
+// post POSTs a data payload to a url. Returns nil if the POST succeeds,
+// error on any failure.
+func (p *pipeline) pipeline(data []byte) error {
+	p.Lock()
+	req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data))
+	p.Unlock()
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
+	resp, err := p.tr.RoundTrip(req)
+	if err != nil {
+		return err
+	}
+	resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusPreconditionFailed:
+		err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
+		select {
+		case p.errorc <- err:
+		default:
+		}
+		return nil
+	case http.StatusForbidden:
+		err := fmt.Errorf("the member has been permanently removed from the cluster")
+		select {
+		case p.errorc <- err:
+		default:
+		}
+		return nil
+	case http.StatusNoContent:
+		return nil
+	default:
+		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
+	}
+}

+ 29 - 29
rafthttp/peer_test.go → rafthttp/pipeline_test.go

@@ -27,17 +27,17 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-// TestSenderSend tests that send func could post data using roundtripper
+// TestPipelineSend tests that pipeline could send data using roundtripper
 // and increase success count in stats.
-func TestSenderSend(t *testing.T) {
+func TestPipelineSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
 
-	if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
+	if err := p.send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}
-	p.Stop()
+	p.stop()
 
 	if tr.Request() == nil {
 		t.Errorf("sender fails to post the data")
@@ -49,15 +49,15 @@ func TestSenderSend(t *testing.T) {
 	}
 }
 
-func TestSenderExceedMaximalServing(t *testing.T) {
+func TestPipelineExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
-	for i := 0; i < connPerSender+senderBufSize; i++ {
-		if err := p.Send(raftpb.Message{}); err != nil {
+	for i := 0; i < connPerPipeline+pipelineBufSize; i++ {
+		if err := p.send(raftpb.Message{}); err != nil {
 			t.Errorf("send err = %v, want nil", err)
 		}
 		// force the sender to grab data
@@ -65,7 +65,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	}
 
 	// try to send a data when we are sure the buffer is full
-	if err := p.Send(raftpb.Message{}); err == nil {
+	if err := p.send(raftpb.Message{}); err == nil {
 		t.Errorf("unexpect send success")
 	}
 
@@ -74,22 +74,22 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	testutil.ForceGosched()
 
 	// It could send new data after previous ones succeed
-	if err := p.Send(raftpb.Message{}); err != nil {
+	if err := p.send(raftpb.Message{}); err != nil {
 		t.Errorf("send err = %v, want nil", err)
 	}
-	p.Stop()
+	p.stop()
 }
 
-// TestSenderSendFailed tests that when send func meets the post error,
+// TestPipelineSendFailed tests that when send func meets the post error,
 // it increases fail count in stats.
-func TestSenderSendFailed(t *testing.T) {
+func TestPipelineSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
+	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
 
-	if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
+	if err := p.send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
 	}
-	p.Stop()
+	p.stop()
 
 	fs.Lock()
 	defer fs.Unlock()
@@ -98,13 +98,13 @@ func TestSenderSendFailed(t *testing.T) {
 	}
 }
 
-func TestSenderPost(t *testing.T) {
+func TestPipelinePost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
-	if err := p.post([]byte("some data")); err != nil {
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, nil)
+	if err := p.pipeline([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
-	p.Stop()
+	p.stop()
 
 	if g := tr.Request().Method; g != "POST" {
 		t.Errorf("method = %s, want %s", g, "POST")
@@ -127,7 +127,7 @@ func TestSenderPost(t *testing.T) {
 	}
 }
 
-func TestSenderPostBad(t *testing.T) {
+func TestPipelinePostBad(t *testing.T) {
 	tests := []struct {
 		u    string
 		code int
@@ -142,9 +142,9 @@ func TestSenderPostBad(t *testing.T) {
 		{"http://10.0.0.1", http.StatusCreated, nil},
 	}
 	for i, tt := range tests {
-		p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, make(chan error))
-		err := p.post([]byte("some data"))
-		p.Stop()
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, make(chan error))
+		err := p.pipeline([]byte("some data"))
+		p.stop()
 
 		if err == nil {
 			t.Errorf("#%d: err = nil, want not nil", i)
@@ -152,7 +152,7 @@ func TestSenderPostBad(t *testing.T) {
 	}
 }
 
-func TestPeerPostErrorc(t *testing.T) {
+func TestPipelinePostErrorc(t *testing.T) {
 	tests := []struct {
 		u    string
 		code int
@@ -163,9 +163,9 @@ func TestPeerPostErrorc(t *testing.T) {
 	}
 	for i, tt := range tests {
 		errorc := make(chan error, 1)
-		p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, errorc)
-		p.post([]byte("some data"))
-		p.Stop()
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, errorc)
+		p.pipeline([]byte("some data"))
+		p.stop()
 		select {
 		case <-errorc:
 		default: