Bläddra i källkod

etcdserver: use linearizableReadNotify for txn

Xiang Li 9 år sedan
förälder
incheckning
ea0c65797a
1 ändrade filer med 49 tillägg och 1 borttagningar
  1. 49 1
      etcdserver/v3_server.go

+ 49 - 1
etcdserver/v3_server.go

@@ -165,6 +165,41 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
 }
 
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	// TODO: remove this checking when we release etcd 3.2
+	if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
+		return s.legacyTxn(ctx, r)
+	}
+
+	if isTxnReadonly(r) {
+		if !isTxnSerializable(r) {
+			err := s.linearizableReadNotify(ctx)
+			if err != nil {
+				return nil, err
+			}
+		}
+		var resp *pb.TxnResponse
+		var err error
+		chk := func(ai *auth.AuthInfo) error {
+			return checkTxnAuth(s.authStore, ai, r)
+		}
+		get := func() { resp, err = s.applyV3Base.Txn(r) }
+		if serr := s.doSerialize(ctx, chk, get); serr != nil {
+			return nil, serr
+		}
+		return resp, err
+	}
+	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
+	if err != nil {
+		return nil, err
+	}
+	if result.err != nil {
+		return nil, result.err
+	}
+	return result.resp.(*pb.TxnResponse), nil
+}
+
+// TODO: remove this func when we release etcd 3.2
+func (s *EtcdServer) legacyTxn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 	if isTxnSerializable(r) {
 		var resp *pb.TxnResponse
 		var err error
@@ -177,7 +212,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
 		}
 		return resp, err
 	}
-	// TODO: readonly Txn do not need to go through raft
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
 	if err != nil {
 		return nil, err
@@ -202,6 +236,20 @@ func isTxnSerializable(r *pb.TxnRequest) bool {
 	return true
 }
 
+func isTxnReadonly(r *pb.TxnRequest) bool {
+	for _, u := range r.Success {
+		if r := u.GetRequestRange(); r == nil {
+			return false
+		}
+	}
+	for _, u := range r.Failure {
+		if r := u.GetRequestRange(); r == nil {
+			return false
+		}
+	}
+	return true
+}
+
 func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
 	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
 	if r.Physical && result != nil && result.physc != nil {