Browse Source

etcdserver: learner return Unavailable for unsupported RPC

Make learner return code.Unavailable when the request is not supported
by learner. Client balancer will retry a different endpoint.
Jingyi Hu 6 years ago
parent
commit
d0c1b3fa38

+ 46 - 0
clientv3/integration/kv_test.go

@@ -1051,3 +1051,49 @@ func TestKVForLearner(t *testing.T) {
 		}
 	}
 }
+
+// TestBalancerSupportLearner verifies that balancer's retry and failover mechanism supports cluster with learner member
+func TestBalancerSupportLearner(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	// we have to add and launch learner member after initial cluster was created, because
+	// bootstrapping a cluster with learner member is not supported.
+	clus.AddAndLaunchLearnerMember(t)
+
+	learners, err := clus.GetLearnerMembers()
+	if err != nil {
+		t.Fatalf("failed to get the learner members in cluster: %v", err)
+	}
+	if len(learners) != 1 {
+		t.Fatalf("added 1 learner to cluster, got %d", len(learners))
+	}
+
+	// clus.Members[3] is the newly added learner member, which was appended to clus.Members
+	learnerEp := clus.Members[3].GRPCAddr()
+	cfg := clientv3.Config{
+		Endpoints:   []string{learnerEp},
+		DialTimeout: 5 * time.Second,
+		DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	}
+	cli, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatalf("failed to create clientv3: %v", err)
+	}
+	defer cli.Close()
+
+	// wait until learner member is ready
+	<-clus.Members[3].ReadyNotify()
+
+	if _, err := cli.Get(context.Background(), "foo"); err == nil {
+		t.Fatalf("expect Get request to learner to fail, got no error")
+	}
+
+	eps := []string{learnerEp, clus.Members[0].GRPCAddr()}
+	cli.SetEndpoints(eps...)
+	if _, err := cli.Get(context.Background(), "foo"); err != nil {
+		t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err)
+	}
+}

+ 0 - 1
etcdserver/api/v3rpc/interceptor.go

@@ -48,7 +48,6 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 			return nil, rpctypes.ErrGRPCNotCapable
 		}
 
-		// TODO: add test in clientv3/integration to verify behavior
 		if s.IsLearner() && !isRPCSupportedForLearner(req) {
 			return nil, rpctypes.ErrGPRCNotSupportedForLearner
 		}

+ 2 - 1
etcdserver/api/v3rpc/rpctypes/error.go

@@ -71,7 +71,7 @@ var (
 	ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
 	ErrGRPCUnhealthy                  = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
 	ErrGRPCCorrupt                    = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
-	ErrGPRCNotSupportedForLearner     = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
+	ErrGPRCNotSupportedForLearner     = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
 	ErrGRPCBadLeaderTransferee        = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
 
 	errStringToError = map[string]error{
@@ -126,6 +126,7 @@ var (
 		ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
 		ErrorDesc(ErrGRPCUnhealthy):                  ErrGRPCUnhealthy,
 		ErrorDesc(ErrGRPCCorrupt):                    ErrGRPCCorrupt,
+		ErrorDesc(ErrGPRCNotSupportedForLearner):     ErrGPRCNotSupportedForLearner,
 		ErrorDesc(ErrGRPCBadLeaderTransferee):        ErrGRPCBadLeaderTransferee,
 	}
 )

+ 4 - 0
integration/cluster.go

@@ -1166,6 +1166,10 @@ func (m *member) RecoverPartition(t testing.TB, others ...*member) {
 	}
 }
 
+func (m *member) ReadyNotify() <-chan struct{} {
+	return m.s.ReadyNotify()
+}
+
 func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
 	cfgtls := transport.TLSInfo{}
 	if tls != nil {