Browse Source

etcdserver: specify timeout caused by leader election

Before this PR, the timeout caused by leader election returns:

```
14:45:37 etcd2 | 2015-08-12 14:45:37.786349 E | etcdhttp: got unexpected
response error (etcdserver: request timed out)
```

After this PR:

```
15:52:54 etcd1 | 2015-08-12 15:52:54.389523 E | etcdhttp: etcdserver:
request timed out, possibly due to leader down
```
Yicheng Qin 10 years ago
parent
commit
27170e67b9
4 changed files with 50 additions and 24 deletions
  1. 9 21
      etcdserver/errors.go
  2. 6 1
      etcdserver/etcdhttp/http.go
  3. 16 0
      etcdserver/raft.go
  4. 19 2
      etcdserver/server.go

+ 9 - 21
etcdserver/errors.go

@@ -18,32 +18,20 @@ import (
 	"errors"
 
 	etcdErr "github.com/coreos/etcd/error"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 var (
-	ErrUnknownMethod = errors.New("etcdserver: unknown method")
-	ErrStopped       = errors.New("etcdserver: server stopped")
-	ErrIDRemoved     = errors.New("etcdserver: ID removed")
-	ErrIDExists      = errors.New("etcdserver: ID exists")
-	ErrIDNotFound    = errors.New("etcdserver: ID not found")
-	ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
-	ErrCanceled      = errors.New("etcdserver: request cancelled")
-	ErrTimeout       = errors.New("etcdserver: request timed out")
+	ErrUnknownMethod          = errors.New("etcdserver: unknown method")
+	ErrStopped                = errors.New("etcdserver: server stopped")
+	ErrIDRemoved              = errors.New("etcdserver: ID removed")
+	ErrIDExists               = errors.New("etcdserver: ID exists")
+	ErrIDNotFound             = errors.New("etcdserver: ID not found")
+	ErrPeerURLexists          = errors.New("etcdserver: peerURL exists")
+	ErrCanceled               = errors.New("etcdserver: request cancelled")
+	ErrTimeout                = errors.New("etcdserver: request timed out")
+	ErrTimeoutDueToLeaderLost = errors.New("etcdserver: request timed out, possibly due to leader lost")
 )
 
-func parseCtxErr(err error) error {
-	switch err {
-	case context.Canceled:
-		return ErrCanceled
-	case context.DeadlineExceeded:
-		return ErrTimeout
-	default:
-		return err
-	}
-}
-
 func isKeyNotFound(err error) bool {
 	e, ok := err.(*etcdErr.Error)
 	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound

+ 6 - 1
etcdserver/etcdhttp/http.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/auth"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 )
@@ -53,7 +54,11 @@ func writeError(w http.ResponseWriter, err error) {
 		herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
 		herr.WriteTo(w)
 	default:
-		plog.Errorf("got unexpected response error (%v)", err)
+		if err == etcdserver.ErrTimeoutDueToLeaderLost {
+			plog.Error(err)
+		} else {
+			plog.Errorf("got unexpected response error (%v)", err)
+		}
 		herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
 		herr.WriteTo(w)
 	}

+ 16 - 0
etcdserver/raft.go

@@ -19,6 +19,7 @@ import (
 	"expvar"
 	"os"
 	"sort"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -86,6 +87,10 @@ type raftNode struct {
 	term  uint64
 	lead  uint64
 
+	mu sync.Mutex
+	// last lead elected time
+	lt time.Time
+
 	raft.Node
 
 	// a chan to send out apply
@@ -129,6 +134,11 @@ func (r *raftNode) start(s *EtcdServer) {
 				r.Tick()
 			case rd := <-r.Ready():
 				if rd.SoftState != nil {
+					if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
+						r.mu.Lock()
+						r.lt = time.Now()
+						r.mu.Unlock()
+					}
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					if rd.RaftState == raft.StateLeader {
 						syncC = r.s.SyncTicker
@@ -187,6 +197,12 @@ func (r *raftNode) apply() chan apply {
 	return r.applyc
 }
 
+func (r *raftNode) leadElectedTime() time.Time {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.lt
+}
+
 func (r *raftNode) stop() {
 	r.stopped <- struct{}{}
 	<-r.done

+ 19 - 2
etcdserver/server.go

@@ -553,7 +553,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 		case <-ctx.Done():
 			proposeFailed.Inc()
 			s.w.Trigger(r.ID, nil) // GC wait
-			return Response{}, parseCtxErr(ctx.Err())
+			return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
 		case <-s.done:
 			return Response{}, ErrStopped
 		}
@@ -648,6 +648,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = s.reqIDGen.Next()
 	ch := s.w.Register(cc.ID)
+	start := time.Now()
 	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
 		s.w.Trigger(cc.ID, nil)
 		return err
@@ -663,7 +664,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 		return nil
 	case <-ctx.Done():
 		s.w.Trigger(cc.ID, nil) // GC wait
-		return parseCtxErr(ctx.Err())
+		return s.parseProposeCtxErr(ctx.Err(), start)
 	case <-s.done:
 		return ErrStopped
 	}
@@ -1014,3 +1015,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		plog.Errorf("error updating cluster version (%v)", err)
 	}
 }
+
+func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
+	switch err {
+	case context.Canceled:
+		return ErrCanceled
+	case context.DeadlineExceeded:
+		curLeadElected := s.r.leadElectedTime()
+		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
+		if start.After(prevLeadLost) && start.Before(curLeadElected) {
+			return ErrTimeoutDueToLeaderLost
+		}
+		return ErrTimeout
+	default:
+		return err
+	}
+}