소스 검색

Merge pull request #9897 from gyuho/read-index

etcdserver: clarify read index warnings, add metrics
Gyuho Lee 7 년 전
부모
커밋
495eb2f230
2개의 변경된 파일22개의 추가작업 그리고 9개의 파일을 삭제
  1. 7 0
      etcdserver/metrics.go
  2. 15 9
      etcdserver/v3_server.go

+ 7 - 0
etcdserver/metrics.go

@@ -78,6 +78,12 @@ var (
 		Name:      "proposals_failed_total",
 		Help:      "The total number of failed proposals seen.",
 	})
+	slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "slow_read_indexes_total",
+		Help:      "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
+	})
 	leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{
 		Namespace: "etcd_debugging",
 		Subsystem: "server",
@@ -109,6 +115,7 @@ func init() {
 	prometheus.MustRegister(proposalsApplied)
 	prometheus.MustRegister(proposalsPending)
 	prometheus.MustRegister(proposalsFailed)
+	prometheus.MustRegister(slowReadIndex)
 	prometheus.MustRegister(leaseExpired)
 	prometheus.MustRegister(quotaBackendBytes)
 	prometheus.MustRegister(currentVersion)

+ 15 - 9
etcdserver/v3_server.go

@@ -18,7 +18,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
-	"fmt"
 	"time"
 
 	"github.com/coreos/etcd/auth"
@@ -632,8 +631,9 @@ func (s *EtcdServer) linearizableReadLoop() {
 	var rs raft.ReadState
 
 	for {
-		ctx := make([]byte, 8)
-		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
+		ctxToSend := make([]byte, 8)
+		id1 := s.reqIDGen.Next()
+		binary.BigEndian.PutUint64(ctxToSend, id1)
 
 		select {
 		case <-s.readwaitc:
@@ -650,7 +650,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 
 		lg := s.getLogger()
 		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-		if err := s.r.ReadIndex(cctx, ctx); err != nil {
+		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
 			cancel()
 			if err == raft.ErrStopped {
 				return
@@ -672,19 +672,24 @@ func (s *EtcdServer) linearizableReadLoop() {
 		for !timeout && !done {
 			select {
 			case rs = <-s.r.readStateC:
-				done = bytes.Equal(rs.RequestCtx, ctx)
+				done = bytes.Equal(rs.RequestCtx, ctxToSend)
 				if !done {
 					// a previous request might time out. now we should ignore the response of it and
 					// continue waiting for the response of the current requests.
+					id2 := uint64(0)
+					if len(rs.RequestCtx) == 8 {
+						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
+					}
 					if lg != nil {
 						lg.Warn(
-							"ignored out-of-date read index response",
-							zap.String("ctx-expected", fmt.Sprintf("%+v", string(rs.RequestCtx))),
-							zap.String("ctx-got", fmt.Sprintf("%+v", string(ctx))),
+							"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
+							zap.Uint64("sent-request-id", id1),
+							zap.Uint64("received-request-id", id2),
 						)
 					} else {
-						plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
+						plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
 					}
+					slowReadIndex.Inc()
 				}
 			case <-time.After(s.Cfg.ReqTimeout()):
 				if lg != nil {
@@ -694,6 +699,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 				}
 				nr.notify(ErrTimeout)
 				timeout = true
+				slowReadIndex.Inc()
 			case <-s.stopping:
 				return
 			}