Browse Source

rafthttp: add functions to create listener and roundTripper

This moves the code to create listener and roundTripper for raft communication
to the same place, and use explicit functions to build them. This prevents
possible development errors in the future.
Yicheng Qin 10 years ago
parent
commit
4ccbcb91c8
4 changed files with 31 additions and 11 deletions
  1. 1 1
      etcdmain/etcd.go
  2. 1 4
      etcdserver/server.go
  3. 2 6
      rafthttp/transport.go
  4. 27 0
      rafthttp/util.go

+ 1 - 1
etcdmain/etcd.go

@@ -208,7 +208,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 			plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
 			plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
 		}
 		}
 		var l net.Listener
 		var l net.Listener
-		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
+		l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}

+ 1 - 4
etcdserver/server.go

@@ -40,7 +40,6 @@ import (
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/runtime"
 	"github.com/coreos/etcd/pkg/runtime"
 	"github.com/coreos/etcd/pkg/timeutil"
 	"github.com/coreos/etcd/pkg/timeutil"
-	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/wait"
 	"github.com/coreos/etcd/pkg/wait"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
@@ -211,12 +210,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	haveWAL := wal.Exist(cfg.WALDir())
 	haveWAL := wal.Exist(cfg.WALDir())
 	ss := snap.New(cfg.SnapDir())
 	ss := snap.New(cfg.SnapDir())
 
 
-	// use timeout transport to pair with remote timeout listeners
-	pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout(), 0, 0)
+	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	prt := http.RoundTripper(pt)
 	var remotes []*Member
 	var remotes []*Member
 	switch {
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 	case !haveWAL && !cfg.NewCluster:

+ 2 - 6
rafthttp/transport.go

@@ -136,15 +136,11 @@ type Transport struct {
 
 
 func (t *Transport) Start() error {
 func (t *Transport) Start() error {
 	var err error
 	var err error
-	// Read/write timeout is set for stream roundTripper to promptly
-	// find out broken status, which minimizes the number of messages
-	// sent on broken connection.
-	t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout)
+	t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	// use timeout transport to pair with remote timeout listeners
-	t.pipelineRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, 0, 0)
+	t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 27 - 0
rafthttp/util.go

@@ -18,11 +18,14 @@ import (
 	"encoding/binary"
 	"encoding/binary"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"net"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"strings"
 	"strings"
+	"time"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
+	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/version"
@@ -30,6 +33,30 @@ import (
 
 
 var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
 var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
 
 
+// NewListener returns a listener for raft message transfer between peers.
+// It uses timeout listener to identify broken streams promptly.
+func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) {
+	return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout)
+}
+
+// NewRoundTripper returns a roundTripper used to send requests
+// to rafthttp listener of remote peers.
+func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+	// It uses timeout transport to pair with remote timeout listeners.
+	// It sets no read/write timeout, because message in requests may
+	// take long time to write out before reading out the response.
+	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
+}
+
+// newStreamRoundTripper returns a roundTripper used to send stream requests
+// to rafthttp listener of remote peers.
+// Read/write timeout is set for stream roundTripper to promptly
+// find out broken status, which minimizes the number of messages
+// sent on broken connection.
+func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
+}
+
 func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
 func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
 	size := ent.Size()
 	size := ent.Size()
 	if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
 	if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {