Browse Source

*: set read/write timeout for raft transport and listener

Yicheng Qin 11 years ago
parent
commit
3e55834c38
4 changed files with 63 additions and 2 deletions
  1. 3 2
      etcdmain/etcd.go
  2. 15 0
      pkg/transport/timeout_listener.go
  3. 42 0
      pkg/transport/timeout_transport.go
  4. 3 0
      rafthttp/sender.go

+ 3 - 2
etcdmain/etcd.go

@@ -35,6 +35,7 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/proxy"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/version"
 )
 
@@ -209,7 +210,7 @@ func startEtcd() (<-chan struct{}, error) {
 		return nil, fmt.Errorf("cannot write to data directory: %v", err)
 	}
 
-	pt, err := transport.NewTransport(peerTLSInfo)
+	pt, err := transport.NewTimeoutTransport(peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
 	if err != nil {
 		return nil, err
 	}
@@ -230,7 +231,7 @@ func startEtcd() (<-chan struct{}, error) {
 	plns := make([]net.Listener, 0)
 	for _, u := range lpurls {
 		var l net.Listener
-		l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo)
+		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
 		if err != nil {
 			return nil, err
 		}

+ 15 - 0
pkg/transport/timeout_listener.go

@@ -21,6 +21,21 @@ import (
 	"time"
 )
 
+// NewTimeoutListener returns a listener that listens on the given address.
+// If read/write on the accepted connection blocks longer than its time limit,
+// it will return timeout error.
+func NewTimeoutListener(addr string, scheme string, info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (net.Listener, error) {
+	ln, err := NewListener(addr, scheme, info)
+	if err != nil {
+		return nil, err
+	}
+	return &rwTimeoutListener{
+		Listener:   ln,
+		rdtimeoutd: rdtimeoutd,
+		wtimeoutd:  wtimeoutd,
+	}, nil
+}
+
 type rwTimeoutListener struct {
 	net.Listener
 	wtimeoutd  time.Duration

+ 42 - 0
pkg/transport/timeout_transport.go

@@ -0,0 +1,42 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package transport
+
+import (
+	"net"
+	"net/http"
+	"time"
+)
+
+// NewTimeoutTransport returns a transport created using the given TLS info.
+// If read/write on the created connection blocks longer than its time limit,
+// it will return timeout error.
+func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
+	tr, err := NewTransport(info)
+	if err != nil {
+		return nil, err
+	}
+	tr.Dial = (&rwTimeoutDialer{
+		Dialer: net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+		},
+		rdtimeoutd: rdtimeoutd,
+		wtimeoutd:  wtimeoutd,
+	}).Dial
+	return tr, nil
+}

+ 3 - 0
rafthttp/sender.go

@@ -35,6 +35,9 @@ const (
 	senderBufSize = connPerSender * 4
 
 	appRespBatchMs = 50
+
+	ConnReadTimeout  = 5 * time.Second
+	ConnWriteTimeout = 5 * time.Second
 )
 
 type Sender interface {