server: drop read requests if the leader changed

nolouch · 7 years ago · commit 4de27039cb
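
Previously, a linearizable read that was in flight when leadership moved could hang until the request timeout fired. This change adds a buffered leaderChanged channel to EtcdServer: run() signals it whenever a new leader is elected, and linearizableReadLoop() uses it both to skip queued read-index requests and to fail in-flight ones early with a new, retryable ErrLeaderChanged error. A readIndexFailed counter is added alongside slowReadIndex, and an integration test covers the behavior under a network partition.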

+ 0 - 1
clientv3/integration/leasing_test.go

@@ -1790,7 +1790,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-
 	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
 	testutil.AssertNil(t, err)
 	defer closeLKV()

+ 52 - 0
clientv3/integration/network_partition_test.go

@@ -24,6 +24,7 @@ import (
 
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/integration"
 	"go.etcd.io/etcd/pkg/testutil"
 	"google.golang.org/grpc"
@@ -265,3 +266,54 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 		t.Fatal("took too long to detect leader lost")
 	}
 }
+
+func TestDropReadUnderNetworkPartition(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
+		Size:               3,
+		SkipCreatingClient: true,
+	})
+	defer clus.Terminate(t)
+	leaderIndex := clus.WaitLeader(t)
+	// get a follower endpoint
+	eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()}
+	ccfg := clientv3.Config{
+		Endpoints:   eps,
+		DialTimeout: 10 * time.Second,
+		DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	}
+	cli, err := clientv3.New(ccfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+
+	// wait for eps[0] to be pinned
+	mustWaitPinReady(t, cli)
+
+	// refresh the endpoint list and give the balancer time to settle before dialing the follower directly
+	cli.SetEndpoints(eps...)
+	time.Sleep(time.Second * 2)
+	conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer conn.Close()
+
+	clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3])
+	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	_, err = kvc.Get(ctx, "a")
+	cancel()
+	if err != rpctypes.ErrLeaderChanged {
+		t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
+	}
+
+	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+	_, err = kvc.Get(ctx, "a")
+	cancel()
+	if err != nil {
+		t.Fatalf("expected nil, got %v", err)
+	}
+}
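
Since ErrLeaderChanged is surfaced as a retryable error, a caller can simply retry the read once a new leader is elected. A minimal sketch of such a helper (getWithRetry and its parameters are illustrative, not part of this commit; the error equality check works because clientv3 maps gRPC statuses back to the canonical rpctypes values, as the test above relies on):

package example

import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// getWithRetry retries a linearizable read while the server reports
// rpctypes.ErrLeaderChanged, i.e. while leadership is moving.
func getWithRetry(cli *clientv3.Client, key string, attempts int) (*clientv3.GetResponse, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		resp, err := cli.Get(ctx, key)
		cancel()
		if err != rpctypes.ErrLeaderChanged {
			return resp, err // success, or an error this helper should not retry
		}
		lastErr = err
	}
	return nil, lastErr
}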

+ 3 - 0
etcdserver/api/v3rpc/rpctypes/error.go

@@ -61,6 +61,7 @@ var (
 
 	ErrGRPCNoLeader                   = status.New(codes.Unavailable, "etcdserver: no leader").Err()
 	ErrGRPCNotLeader                  = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
+	ErrGRPCLeaderChanged              = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
 	ErrGRPCNotCapable                 = status.New(codes.Unavailable, "etcdserver: not capable").Err()
 	ErrGRPCStopped                    = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
 	ErrGRPCTimeout                    = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
@@ -111,6 +112,7 @@ var (
 
 		ErrorDesc(ErrGRPCNoLeader):                   ErrGRPCNoLeader,
 		ErrorDesc(ErrGRPCNotLeader):                  ErrGRPCNotLeader,
+		ErrorDesc(ErrGRPCLeaderChanged):              ErrGRPCLeaderChanged,
 		ErrorDesc(ErrGRPCNotCapable):                 ErrGRPCNotCapable,
 		ErrorDesc(ErrGRPCStopped):                    ErrGRPCStopped,
 		ErrorDesc(ErrGRPCTimeout):                    ErrGRPCTimeout,
@@ -163,6 +165,7 @@ var (
 
 	ErrNoLeader                   = Error(ErrGRPCNoLeader)
 	ErrNotLeader                  = Error(ErrGRPCNotLeader)
+	ErrLeaderChanged              = Error(ErrGRPCLeaderChanged)
 	ErrNotCapable                 = Error(ErrGRPCNotCapable)
 	ErrStopped                    = Error(ErrGRPCStopped)
 	ErrTimeout                    = Error(ErrGRPCTimeout)
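
Note that ErrGRPCLeaderChanged is registered under codes.Unavailable, the same retryable class as ErrGRPCNoLeader, so gRPC interceptors and clients that already retry Unavailable errors can treat a leader change the same way.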

+ 1 - 0
etcdserver/api/v3rpc/util.go

@@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{
 
 	etcdserver.ErrNoLeader:                   rpctypes.ErrGRPCNoLeader,
 	etcdserver.ErrNotLeader:                  rpctypes.ErrGRPCNotLeader,
+	etcdserver.ErrLeaderChanged:              rpctypes.ErrGRPCLeaderChanged,
 	etcdserver.ErrStopped:                    rpctypes.ErrGRPCStopped,
 	etcdserver.ErrTimeout:                    rpctypes.ErrGRPCTimeout,
 	etcdserver.ErrTimeoutDueToLeaderFail:     rpctypes.ErrGRPCTimeoutDueToLeaderFail,

+ 1 - 0
etcdserver/errors.go

@@ -27,6 +27,7 @@ var (
 	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 	ErrTimeoutLeaderTransfer      = errors.New("etcdserver: request timed out, leader transfer took too long")
+	ErrLeaderChanged              = errors.New("etcdserver: leader changed")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrNoLeader                   = errors.New("etcdserver: no leader")
 	ErrNotLeader                  = errors.New("etcdserver: not leader")

+ 7 - 0
etcdserver/metrics.go

@@ -86,6 +86,12 @@ var (
 		Name:      "slow_read_indexes_total",
 		Help:      "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
 	})
+	readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "read_indexes_failed_total",
+		Help:      "The total number of failed read indexes seen.",
+	})
 	leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{
 		Namespace: "etcd_debugging",
 		Subsystem: "server",
@@ -132,6 +138,7 @@ func init() {
 	prometheus.MustRegister(proposalsPending)
 	prometheus.MustRegister(proposalsFailed)
 	prometheus.MustRegister(slowReadIndex)
+	prometheus.MustRegister(readIndexFailed)
 	prometheus.MustRegister(leaseExpired)
 	prometheus.MustRegister(quotaBackendBytes)
 	prometheus.MustRegister(currentVersion)
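
With Prometheus namespace/subsystem joining, this counter is exported as etcd_server_read_indexes_failed_total; a query such as rate(etcd_server_read_indexes_failed_total[5m]) can flag clusters where leadership churn is failing linearizable reads.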

+ 6 - 1
etcdserver/server.go

@@ -212,6 +212,8 @@ type EtcdServer struct {
 	stopping chan struct{}
 	// done is closed when all goroutines from start() complete.
 	done chan struct{}
+	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
+	leaderChanged chan struct{}
 
 	errorc     chan error
 	id         types.ID
@@ -752,6 +754,7 @@ func (s *EtcdServer) start() {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
+	s.leaderChanged = make(chan struct{}, 1)
 	if s.ClusterVersion() != nil {
 		if lg != nil {
 			lg.Info(
@@ -938,7 +941,9 @@ func (s *EtcdServer) run() {
 					s.compactor.Resume()
 				}
 			}
-
+			if newLeader {
+				s.leaderChanged <- struct{}{}
+			}
 			// TODO: remove the nil checking
 			// current test utility does not provide the stats
 			if s.stats != nil {
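
One caveat with the send above: s.leaderChanged has a one-slot buffer, so a second leader change before the read loop drains the first notification would block the raft apply path here. A non-blocking variant (a sketch, not what this commit does; a pending notification already covers any later change) could look like:

// notifyLeaderChange is a hypothetical non-blocking variant of the
// send in run(); it drops the signal when one is already pending.
func (s *EtcdServer) notifyLeaderChange() {
	select {
	case s.leaderChanged <- struct{}{}:
	default: // notification already pending; nothing new to convey
	}
}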

+ 8 - 0
etcdserver/v3_server.go

@@ -636,6 +636,8 @@ func (s *EtcdServer) linearizableReadLoop() {
 		binary.BigEndian.PutUint64(ctxToSend, id1)
 
 		select {
+		case <-s.leaderChanged:
+			continue
 		case <-s.readwaitc:
 		case <-s.stopping:
 			return
@@ -660,6 +662,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 			} else {
 				plog.Errorf("failed to get read index from raft: %v", err)
 			}
+			readIndexFailed.Inc()
 			nr.notify(err)
 			continue
 		}
@@ -691,6 +694,11 @@ func (s *EtcdServer) linearizableReadLoop() {
 					}
 					slowReadIndex.Inc()
 				}
+			case <-s.leaderChanged:
+				timeout = true
+				readIndexFailed.Inc()
+				// return a retryable error.
+				nr.notify(ErrLeaderChanged)
 			case <-time.After(s.Cfg.ReqTimeout()):
 				if lg != nil {
 					lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))