فهرست منبع

Merge pull request #3376 from yichengq/connection-down

etcdserver: specify request timeout error due to connection down
Yicheng Qin 10 سال پیش
والد
کامیت
f38778160d

+ 10 - 9
etcdserver/errors.go

@@ -21,15 +21,16 @@ import (
 )
 
 var (
-	ErrUnknownMethod          = errors.New("etcdserver: unknown method")
-	ErrStopped                = errors.New("etcdserver: server stopped")
-	ErrIDRemoved              = errors.New("etcdserver: ID removed")
-	ErrIDExists               = errors.New("etcdserver: ID exists")
-	ErrIDNotFound             = errors.New("etcdserver: ID not found")
-	ErrPeerURLexists          = errors.New("etcdserver: peerURL exists")
-	ErrCanceled               = errors.New("etcdserver: request cancelled")
-	ErrTimeout                = errors.New("etcdserver: request timed out")
-	ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
+	ErrUnknownMethod              = errors.New("etcdserver: unknown method")
+	ErrStopped                    = errors.New("etcdserver: server stopped")
+	ErrIDRemoved                  = errors.New("etcdserver: ID removed")
+	ErrIDExists                   = errors.New("etcdserver: ID exists")
+	ErrIDNotFound                 = errors.New("etcdserver: ID not found")
+	ErrPeerURLexists              = errors.New("etcdserver: peerURL exists")
+	ErrCanceled                   = errors.New("etcdserver: request cancelled")
+	ErrTimeout                    = errors.New("etcdserver: request timed out")
+	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
+	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 )
 
 func isKeyNotFound(err error) bool {

+ 3 - 2
etcdserver/etcdhttp/client.go

@@ -593,9 +593,10 @@ func writeKeyError(w http.ResponseWriter, err error) {
 	case *etcdErr.Error:
 		e.WriteTo(w)
 	default:
-		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+		switch err {
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
 			plog.Error(err)
-		} else {
+		default:
 			plog.Errorf("got unexpected response error (%v)", err)
 		}
 		ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)

+ 3 - 2
etcdserver/etcdhttp/http.go

@@ -54,9 +54,10 @@ func writeError(w http.ResponseWriter, err error) {
 		herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
 		herr.WriteTo(w)
 	default:
-		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+		switch err {
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
 			plog.Error(err)
-		} else {
+		default:
 			plog.Errorf("got unexpected response error (%v)", err)
 		}
 		herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")

+ 34 - 0
etcdserver/server.go

@@ -1024,8 +1024,42 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 		if start.After(prevLeadLost) && start.Before(curLeadElected) {
 			return ErrTimeoutDueToLeaderFail
 		}
+
+		lead := types.ID(atomic.LoadUint64(&s.r.lead))
+		switch lead {
+		case types.ID(raft.None):
+			// TODO: return error to specify it happens because the cluster does not have leader now
+		case s.ID():
+			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
+				return ErrTimeoutDueToConnectionLost
+			}
+		default:
+			if !isConnectedSince(s.r.transport, start, lead) {
+				return ErrTimeoutDueToConnectionLost
+			}
+		}
+
 		return ErrTimeout
 	default:
 		return err
 	}
 }
+
+// isConnectedToQuorumSince checks whether the local member is connected to the
+// quorum of the cluster since the given time.
+func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {
+	var connectedNum int
+	for _, m := range members {
+		if m.ID == self || isConnectedSince(transport, since, m.ID) {
+			connectedNum++
+		}
+	}
+	return connectedNum >= (len(members)+1)/2
+}
+
+// isConnectedSince checks whether the local member is connected to the
+// remote member since the given time.
+func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
+	t := transport.ActiveSince(remote)
+	return !t.IsZero() && t.Before(since)
+}

+ 1 - 0
etcdserver/server_test.go

@@ -1417,6 +1417,7 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemoveAllPeers()                     {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
+func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Resume()                             {}

+ 1 - 0
rafthttp/http_test.go

@@ -366,4 +366,5 @@ func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr
 func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
 func (pr *fakePeer) setTerm(term uint64)                   { pr.term = term }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
+func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
 func (pr *fakePeer) Stop()                                 {}

+ 8 - 0
rafthttp/peer.go

@@ -66,6 +66,9 @@ type Peer interface {
 	// connection hands over to the peer. The peer will close the connection
 	// when it is no longer used.
 	attachOutgoingConn(conn *outgoingConn)
+	// activeSince returns the time that the connection with the
+	// peer becomes active.
+	activeSince() time.Time
 	// Stop performs any necessary finalization and terminates the peer
 	// elegantly.
 	Stop()
@@ -87,6 +90,8 @@ type peer struct {
 	id types.ID
 	r  Raft
 
+	status *peerStatus
+
 	msgAppWriter *streamWriter
 	writer       *streamWriter
 	pipeline     *pipeline
@@ -112,6 +117,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 	p := &peer{
 		id:           to,
 		r:            r,
+		status:       status,
 		msgAppWriter: startStreamWriter(to, status, fs, r),
 		writer:       startStreamWriter(to, status, fs, r),
 		pipeline:     newPipeline(tr, picker, local, to, cid, status, fs, r, errorc),
@@ -223,6 +229,8 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	}
 }
 
+func (p *peer) activeSince() time.Time { return p.status.activeSince }
+
 // Pause pauses the peer. The peer will simply drops all incoming
 // messages without retruning an error.
 func (p *peer) Pause() {

+ 8 - 4
rafthttp/peer_status.go

@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -27,10 +28,11 @@ type failureType struct {
 }
 
 type peerStatus struct {
-	id         types.ID
-	mu         sync.Mutex // protect active and failureMap
-	active     bool
-	failureMap map[failureType]string
+	id          types.ID
+	mu          sync.Mutex // protect variables below
+	active      bool
+	failureMap  map[failureType]string
+	activeSince time.Time
 }
 
 func newPeerStatus(id types.ID) *peerStatus {
@@ -46,6 +48,7 @@ func (s *peerStatus) activate() {
 	if !s.active {
 		plog.Infof("the connection with %s became active", s.id)
 		s.active = true
+		s.activeSince = time.Now()
 		s.failureMap = make(map[failureType]string)
 	}
 }
@@ -56,6 +59,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	if s.active {
 		plog.Infof("the connection with %s became inactive", s.id)
 		s.active = false
+		s.activeSince = time.Time{}
 	}
 	logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if r, ok := s.failureMap[failure]; ok && r == reason {

+ 15 - 0
rafthttp/transport.go

@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"net/http"
 	"sync"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
@@ -68,6 +69,11 @@ type Transporter interface {
 	// It is the caller's responsibility to ensure the urls are all valid,
 	// or it panics.
 	UpdatePeer(id types.ID, urls []string)
+	// ActiveSince returns the time that the connection with the peer
+	// of the given id becomes active.
+	// If the connection is active since peer was added, it returns the adding time.
+	// If the connection is currently inactive, it returns zero time.
+	ActiveSince(id types.ID) time.Time
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -248,6 +254,15 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
 	addPeerToProber(t.prober, id.String(), us)
 }
 
+func (t *transport) ActiveSince(id types.ID) time.Time {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if p, ok := t.peers[id]; ok {
+		return p.activeSince()
+	}
+	return time.Time{}
+}
+
 type Pausable interface {
 	Pause()
 	Resume()