Bladeren bron

rafthttp: configurable stream reader retry timeout

rafthttp.Transport.DialRetryTimeout field alters the frequency of dial attempts
+ minor changes after code review
Vitaly Isaev 8 jaren geleden
bovenliggende
commit
4301f49988
4 gewijzigde bestanden met toevoegingen van 58 en 29 verwijderingen
  1. 4 0
      rafthttp/peer.go
  2. 32 27
      rafthttp/stream.go
  3. 8 0
      rafthttp/stream_test.go
  4. 14 2
      rafthttp/transport.go

+ 4 - 0
rafthttp/peer.go

@@ -24,6 +24,7 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/snap"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 )
 )
 
 
 const (
 const (
@@ -188,6 +189,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
 		status: status,
 		status: status,
 		recvc:  p.recvc,
 		recvc:  p.recvc,
 		propc:  p.propc,
 		propc:  p.propc,
+		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
 	}
 	}
 	p.msgAppReader = &streamReader{
 	p.msgAppReader = &streamReader{
 		peerID: peerID,
 		peerID: peerID,
@@ -197,7 +199,9 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
 		status: status,
 		status: status,
 		recvc:  p.recvc,
 		recvc:  p.recvc,
 		propc:  p.propc,
 		propc:  p.propc,
+		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
 	}
 	}
+
 	p.msgAppV2Reader.start()
 	p.msgAppV2Reader.start()
 	p.msgAppReader.start()
 	p.msgAppReader.start()
 
 

+ 32 - 27
rafthttp/stream.go

@@ -25,6 +25,8 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"golang.org/x/time/rate"
+
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/transport"
@@ -243,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool {
 	if !cw.working {
 	if !cw.working {
 		return false
 		return false
 	}
 	}
-	cw.closer.Close()
+	if err := cw.closer.Close(); err != nil {
+		plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
+	}
 	if len(cw.msgc) > 0 {
 	if len(cw.msgc) > 0 {
 		cw.r.ReportUnreachable(uint64(cw.peerID))
 		cw.r.ReportUnreachable(uint64(cw.peerID))
 	}
 	}
@@ -278,25 +282,28 @@ type streamReader struct {
 	recvc  chan<- raftpb.Message
 	recvc  chan<- raftpb.Message
 	propc  chan<- raftpb.Message
 	propc  chan<- raftpb.Message
 
 
+	rl *rate.Limiter // alters the frequency of dial retrial attempts
+
 	errorc chan<- error
 	errorc chan<- error
 
 
 	mu     sync.Mutex
 	mu     sync.Mutex
 	paused bool
 	paused bool
-	cancel func()
 	closer io.Closer
 	closer io.Closer
 
 
-	stopc chan struct{}
-	done  chan struct{}
+	ctx    context.Context
+	cancel context.CancelFunc
+	done   chan struct{}
 }
 }
 
 
-func (r *streamReader) start() {
-	r.stopc = make(chan struct{})
-	r.done = make(chan struct{})
-	if r.errorc == nil {
-		r.errorc = r.tr.ErrorC
+func (cr *streamReader) start() {
+	cr.done = make(chan struct{})
+	if cr.errorc == nil {
+		cr.errorc = cr.tr.ErrorC
 	}
 	}
-
-	go r.run()
+	if cr.ctx == nil {
+		cr.ctx, cr.cancel = context.WithCancel(context.Background())
+	}
+	go cr.run()
 }
 }
 
 
 func (cr *streamReader) run() {
 func (cr *streamReader) run() {
@@ -311,7 +318,7 @@ func (cr *streamReader) run() {
 		} else {
 		} else {
 			cr.status.activate()
 			cr.status.activate()
 			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
 			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
-			err := cr.decodeLoop(rc, t)
+			err = cr.decodeLoop(rc, t)
 			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
 			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
 			switch {
 			switch {
 			// all data is read out
 			// all data is read out
@@ -322,15 +329,16 @@ func (cr *streamReader) run() {
 				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
 				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
 			}
 			}
 		}
 		}
-		select {
-		// Wait 100ms to create a new stream, so it doesn't bring too much
-		// overhead when retry.
-		case <-time.After(100 * time.Millisecond):
-		case <-cr.stopc:
+		// Wait for a while before new dial attempt
+		err = cr.rl.Wait(cr.ctx)
+		if cr.ctx.Err() != nil {
 			plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
 			plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
 			close(cr.done)
 			close(cr.done)
 			return
 			return
 		}
 		}
+		if err != nil {
+			plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
+		}
 	}
 	}
 }
 }
 
 
@@ -346,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 		plog.Panicf("unhandled stream type %s", t)
 		plog.Panicf("unhandled stream type %s", t)
 	}
 	}
 	select {
 	select {
-	case <-cr.stopc:
+	case <-cr.ctx.Done():
 		cr.mu.Unlock()
 		cr.mu.Unlock()
 		if err := rc.Close(); err != nil {
 		if err := rc.Close(); err != nil {
 			return err
 			return err
@@ -401,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 }
 }
 
 
 func (cr *streamReader) stop() {
 func (cr *streamReader) stop() {
-	close(cr.stopc)
 	cr.mu.Lock()
 	cr.mu.Lock()
-	if cr.cancel != nil {
-		cr.cancel()
-	}
+	cr.cancel()
 	cr.close()
 	cr.close()
 	cr.mu.Unlock()
 	cr.mu.Unlock()
 	<-cr.done
 	<-cr.done
@@ -429,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 
 
 	setPeerURLsHeader(req, cr.tr.URLs)
 	setPeerURLsHeader(req, cr.tr.URLs)
 
 
-	ctx, cancel := context.WithCancel(context.Background())
-	req = req.WithContext(ctx)
+	req = req.WithContext(cr.ctx)
 
 
 	cr.mu.Lock()
 	cr.mu.Lock()
-	cr.cancel = cancel
 	select {
 	select {
-	case <-cr.stopc:
+	case <-cr.ctx.Done():
 		cr.mu.Unlock()
 		cr.mu.Unlock()
 		return nil, fmt.Errorf("stream reader is stopped")
 		return nil, fmt.Errorf("stream reader is stopped")
 	default:
 	default:
@@ -497,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 
 
 func (cr *streamReader) close() {
 func (cr *streamReader) close() {
 	if cr.closer != nil {
 	if cr.closer != nil {
-		cr.closer.Close()
+		if err := cr.closer.Close(); err != nil {
+			plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
+		}
 	}
 	}
 	cr.closer = nil
 	cr.closer = nil
 }
 }

+ 8 - 0
rafthttp/stream_test.go

@@ -15,6 +15,7 @@
 package rafthttp
 package rafthttp
 
 
 import (
 import (
+	"context"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
@@ -25,6 +26,8 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"golang.org/x/time/rate"
+
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
@@ -113,6 +116,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
 			peerID: types.ID(2),
 			peerID: types.ID(2),
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			ctx:    context.Background(),
 		}
 		}
 		sr.dial(tt)
 		sr.dial(tt)
 
 
@@ -167,6 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) {
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			errorc: make(chan error, 1),
 			errorc: make(chan error, 1),
+			ctx:    context.Background(),
 		}
 		}
 
 
 		_, err := sr.dial(streamTypeMessage)
 		_, err := sr.dial(streamTypeMessage)
@@ -192,6 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
 		errorc: make(chan error, 1),
 		errorc: make(chan error, 1),
 		typ:    streamTypeMessage,
 		typ:    streamTypeMessage,
 		status: newPeerStatus(types.ID(2)),
 		status: newPeerStatus(types.ID(2)),
+		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
 	}
 	}
 	tr.onResp = func() {
 	tr.onResp = func() {
 		// stop() waits for the run() goroutine to exit, but that exit
 		// stop() waits for the run() goroutine to exit, but that exit
@@ -246,6 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
 			peerID: types.ID(2),
 			peerID: types.ID(2),
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			ctx:    context.Background(),
 		}
 		}
 
 
 		_, err := sr.dial(typ)
 		_, err := sr.dial(typ)
@@ -311,6 +318,7 @@ func TestStream(t *testing.T) {
 			status: newPeerStatus(types.ID(2)),
 			status: newPeerStatus(types.ID(2)),
 			recvc:  recvc,
 			recvc:  recvc,
 			propc:  propc,
 			propc:  propc,
+			rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
 		}
 		}
 		sr.start()
 		sr.start()
 
 

+ 14 - 2
rafthttp/transport.go

@@ -29,6 +29,7 @@ import (
 	"github.com/coreos/pkg/capnslog"
 	"github.com/coreos/pkg/capnslog"
 	"github.com/xiang90/probing"
 	"github.com/xiang90/probing"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 )
 )
 
 
 var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
 var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
@@ -94,8 +95,12 @@ type Transporter interface {
 // User needs to call Start before calling other functions, and call
 // User needs to call Start before calling other functions, and call
 // Stop when the Transport is no longer used.
 // Stop when the Transport is no longer used.
 type Transport struct {
 type Transport struct {
-	DialTimeout time.Duration     // maximum duration before timing out dial of the request
-	TLSInfo     transport.TLSInfo // TLS information used when creating connection
+	DialTimeout time.Duration // maximum duration before timing out dial of the request
+	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
+	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
+	DialRetryFrequency rate.Limit
+
+	TLSInfo transport.TLSInfo // TLS information used when creating connection
 
 
 	ID          types.ID   // local member ID
 	ID          types.ID   // local member ID
 	URLs        types.URLs // local peer URLs
 	URLs        types.URLs // local peer URLs
@@ -135,6 +140,13 @@ func (t *Transport) Start() error {
 	t.remotes = make(map[types.ID]*remote)
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
 	t.peers = make(map[types.ID]Peer)
 	t.prober = probing.NewProber(t.pipelineRt)
 	t.prober = probing.NewProber(t.pipelineRt)
+
+	// If client didn't provide dial retry frequence, use the default
+	// (100ms backoff between attempts to create a new stream),
+	// so it doesn't bring too much overhead when retry.
+	if t.DialRetryFrequency == 0 {
+		t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
+	}
 	return nil
 	return nil
 }
 }