|
@@ -15,9 +15,11 @@ import (
|
|
|
|
|
|
|
|
// Transporter layer for communication between raft nodes
|
|
// Transporter layer for communication between raft nodes
|
|
|
type transporter struct {
|
|
type transporter struct {
|
|
|
|
|
+ requestTimeout time.Duration
|
|
|
|
|
+
|
|
|
|
|
+ peerServer *PeerServer
|
|
|
client *http.Client
|
|
client *http.Client
|
|
|
transport *http.Transport
|
|
transport *http.Transport
|
|
|
- peerServer *PeerServer
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type dialer func(network, addr string) (net.Conn, error)
|
|
type dialer func(network, addr string) (net.Conn, error)
|
|
@@ -25,13 +27,7 @@ type dialer func(network, addr string) (net.Conn, error)
|
|
|
// Create transporter using by raft server
|
|
// Create transporter using by raft server
|
|
|
// Create http or https transporter based on
|
|
// Create http or https transporter based on
|
|
|
// whether the user give the server cert and key
|
|
// whether the user give the server cert and key
|
|
|
-func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
|
|
|
|
|
- // names for each type of timeout, for the sake of clarity
|
|
|
|
|
- dialTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
|
|
|
|
|
- responseHeaderTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
|
|
|
|
|
-
|
|
|
|
|
- t := transporter{}
|
|
|
|
|
-
|
|
|
|
|
|
|
+func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
|
|
|
tr := &http.Transport{
|
|
tr := &http.Transport{
|
|
|
Dial: func(network, addr string) (net.Conn, error) {
|
|
Dial: func(network, addr string) (net.Conn, error) {
|
|
|
return net.DialTimeout(network, addr, dialTimeout)
|
|
return net.DialTimeout(network, addr, dialTimeout)
|
|
@@ -44,9 +40,12 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *
|
|
|
tr.DisableCompression = true
|
|
tr.DisableCompression = true
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- t.client = &http.Client{Transport: tr}
|
|
|
|
|
- t.transport = tr
|
|
|
|
|
- t.peerServer = peerServer
|
|
|
|
|
|
|
+ t := transporter{
|
|
|
|
|
+ client: &http.Client{Transport: tr},
|
|
|
|
|
+ transport: tr,
|
|
|
|
|
+ peerServer: peerServer,
|
|
|
|
|
+ requestTimeout: requestTimeout,
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
return &t
|
|
return &t
|
|
|
}
|
|
}
|
|
@@ -227,7 +226,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
|
|
|
// Cancel the on fly HTTP transaction when timeout happens.
|
|
// Cancel the on fly HTTP transaction when timeout happens.
|
|
|
func (t *transporter) CancelWhenTimeout(req *http.Request) {
|
|
func (t *transporter) CancelWhenTimeout(req *http.Request) {
|
|
|
go func() {
|
|
go func() {
|
|
|
- time.Sleep(t.peerServer.Config.HeartbeatTimeout)
|
|
|
|
|
|
|
+ time.Sleep(t.requestTimeout)
|
|
|
t.transport.CancelRequest(req)
|
|
t.transport.CancelRequest(req)
|
|
|
}()
|
|
}()
|
|
|
}
|
|
}
|