Browse Source

etcdserver: detect leader change on reads

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee 6 years ago
parent
commit
e5c2dff346

+ 2 - 0
etcdserver/api/v3rpc/rpctypes/error.go

@@ -61,6 +61,7 @@ var (
 
 	ErrGRPCNoLeader                   = status.New(codes.Unavailable, "etcdserver: no leader").Err()
 	ErrGRPCNotLeader                  = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
+	ErrGRPCLeaderChanged              = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
 	ErrGRPCNotCapable                 = status.New(codes.Unavailable, "etcdserver: not capable").Err()
 	ErrGRPCStopped                    = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
 	ErrGRPCTimeout                    = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
@@ -163,6 +164,7 @@ var (
 
 	ErrNoLeader                   = Error(ErrGRPCNoLeader)
 	ErrNotLeader                  = Error(ErrGRPCNotLeader)
+	ErrLeaderChanged              = Error(ErrGRPCLeaderChanged)
 	ErrNotCapable                 = Error(ErrGRPCNotCapable)
 	ErrStopped                    = Error(ErrGRPCStopped)
 	ErrTimeout                    = Error(ErrGRPCTimeout)

+ 1 - 0
etcdserver/api/v3rpc/util.go

@@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{
 
 	etcdserver.ErrNoLeader:                   rpctypes.ErrGRPCNoLeader,
 	etcdserver.ErrNotLeader:                  rpctypes.ErrGRPCNotLeader,
+	etcdserver.ErrLeaderChanged:              rpctypes.ErrGRPCLeaderChanged,
 	etcdserver.ErrStopped:                    rpctypes.ErrGRPCStopped,
 	etcdserver.ErrTimeout:                    rpctypes.ErrGRPCTimeout,
 	etcdserver.ErrTimeoutDueToLeaderFail:     rpctypes.ErrGRPCTimeoutDueToLeaderFail,

+ 1 - 0
etcdserver/errors.go

@@ -27,6 +27,7 @@ var (
 	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 	ErrTimeoutLeaderTransfer      = errors.New("etcdserver: request timed out, leader transfer took too long")
+	ErrLeaderChanged              = errors.New("etcdserver: leader changed")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrNoLeader                   = errors.New("etcdserver: no leader")
 	ErrNotLeader                  = errors.New("etcdserver: not leader")

+ 21 - 1
etcdserver/server.go

@@ -198,7 +198,9 @@ type EtcdServer struct {
 	// stopping is closed by run goroutine on shutdown.
 	stopping chan struct{}
 	// done is closed when all goroutines from start() complete.
-	done chan struct{}
+	done            chan struct{}
+	leaderChanged   chan struct{}
+	leaderChangedMu sync.RWMutex
 
 	errorc     chan error
 	id         types.ID
@@ -597,6 +599,7 @@ func (s *EtcdServer) start() {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
+	s.leaderChanged = make(chan struct{})
 	if s.ClusterVersion() != nil {
 		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
 	} else {
@@ -733,6 +736,17 @@ func (s *EtcdServer) run() {
 					s.compactor.Resume()
 				}
 			}
+			if newLeader {
+				select {
+				case s.leaderChanged <- struct{}{}:
+				default:
+				}
+				s.leaderChangedMu.Lock()
+				lc := s.leaderChanged
+				s.leaderChanged = make(chan struct{})
+				s.leaderChangedMu.Unlock()
+				close(lc)
+			}
 
 			// TODO: remove the nil checking
 			// current test utility does not provide the stats
@@ -841,6 +855,12 @@ func (s *EtcdServer) run() {
 	}
 }
 
+func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
+	s.leaderChangedMu.RLock()
+	defer s.leaderChangedMu.RUnlock()
+	return s.leaderChanged
+}
+
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	s.applySnapshot(ep, apply)
 	s.applyEntries(ep, apply)

+ 9 - 0
etcdserver/v3_server.go

@@ -614,7 +614,10 @@ func (s *EtcdServer) linearizableReadLoop() {
 		id1 := s.reqIDGen.Next()
 		binary.BigEndian.PutUint64(ctxToSend, id1)
 
+		leaderChangedNotifier := s.leaderChangedNotify()
 		select {
+		case <-leaderChangedNotifier:
+			continue
 		case <-s.readwaitc:
 		case <-s.stopping:
 			return
@@ -659,6 +662,12 @@ func (s *EtcdServer) linearizableReadLoop() {
 					slowReadIndex.Inc()
 				}
 
+			case <-leaderChangedNotifier:
+				timeout = true
+				readIndexFailed.Inc()
+				// return a retryable error.
+				nr.notify(ErrLeaderChanged)
+
 			case <-time.After(s.Cfg.ReqTimeout()):
 				plog.Warningf("timed out waiting for read index response (local node might have slow network)")
 				nr.notify(ErrTimeout)