Browse Source

Merge pull request #6054 from heyitsanthony/serialize-refactor

etcdserver: apply serialized requests outside auth apply lock
Anthony Romano 9 years ago
parent
commit
4d309f0cb7
3 changed files with 76 additions and 84 deletions
  1. 18 13
      etcdserver/apply_auth.go
  2. 11 7
      etcdserver/server.go
  3. 47 64
      etcdserver/v3_server.go

+ 18 - 13
etcdserver/apply_auth.go

@@ -92,7 +92,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
 	return aa.applierV3.DeleteRange(txnID, r)
 	return aa.applierV3.DeleteRange(txnID, r)
 }
 }
 
 
-func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
+func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error {
 	for _, requ := range reqs {
 	for _, requ := range reqs {
 		switch tv := requ.Request.(type) {
 		switch tv := requ.Request.(type) {
 		case *pb.RequestOp_RequestRange:
 		case *pb.RequestOp_RequestRange:
@@ -100,7 +100,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
 				continue
 				continue
 			}
 			}
 
 
-			if err := aa.as.IsRangePermitted(&aa.authInfo, tv.RequestRange.Key, tv.RequestRange.RangeEnd); err != nil {
+			if err := as.IsRangePermitted(ai, tv.RequestRange.Key, tv.RequestRange.RangeEnd); err != nil {
 				return err
 				return err
 			}
 			}
 
 
@@ -109,7 +109,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
 				continue
 				continue
 			}
 			}
 
 
-			if err := aa.as.IsPutPermitted(&aa.authInfo, tv.RequestPut.Key); err != nil {
+			if err := as.IsPutPermitted(ai, tv.RequestPut.Key); err != nil {
 				return err
 				return err
 			}
 			}
 
 
@@ -119,13 +119,13 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
 			}
 			}
 
 
 			if tv.RequestDeleteRange.PrevKv {
 			if tv.RequestDeleteRange.PrevKv {
-				err := aa.as.IsRangePermitted(&aa.authInfo, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
+				err := as.IsRangePermitted(ai, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
 				if err != nil {
 				if err != nil {
 					return err
 					return err
 				}
 				}
 			}
 			}
 
 
-			err := aa.as.IsDeleteRangePermitted(&aa.authInfo, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
+			err := as.IsDeleteRangePermitted(ai, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
@@ -135,20 +135,25 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
 	return nil
 	return nil
 }
 }
 
 
-func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error {
 	for _, c := range rt.Compare {
 	for _, c := range rt.Compare {
-		if err := aa.as.IsRangePermitted(&aa.authInfo, c.Key, nil); err != nil {
-			return nil, err
+		if err := as.IsRangePermitted(ai, c.Key, nil); err != nil {
+			return err
 		}
 		}
 	}
 	}
-
-	if err := aa.checkTxnReqsPermission(rt.Success); err != nil {
-		return nil, err
+	if err := checkTxnReqsPermission(as, ai, rt.Success); err != nil {
+		return err
 	}
 	}
-	if err := aa.checkTxnReqsPermission(rt.Failure); err != nil {
-		return nil, err
+	if err := checkTxnReqsPermission(as, ai, rt.Failure); err != nil {
+		return err
 	}
 	}
+	return nil
+}
 
 
+func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+	if err := checkTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
+		return nil, err
+	}
 	return aa.applierV3.Txn(rt)
 	return aa.applierV3.Txn(rt)
 }
 }
 
 

+ 11 - 7
etcdserver/server.go

@@ -181,13 +181,16 @@ type EtcdServer struct {
 
 
 	applyV2 ApplierV2
 	applyV2 ApplierV2
 
 
-	applyV3    applierV3
-	kv         mvcc.ConsistentWatchableKV
-	lessor     lease.Lessor
-	bemu       sync.Mutex
-	be         backend.Backend
-	authStore  auth.AuthStore
-	alarmStore *alarm.AlarmStore
+	// applyV3 is the applier with auth and quotas
+	applyV3 applierV3
+	// applyV3Base is the core applier without auth or quotas
+	applyV3Base applierV3
+	kv          mvcc.ConsistentWatchableKV
+	lessor      lease.Lessor
+	bemu        sync.Mutex
+	be          backend.Backend
+	authStore   auth.AuthStore
+	alarmStore  *alarm.AlarmStore
 
 
 	stats  *stats.ServerStats
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 	lstats *stats.LeaderStats
@@ -405,6 +408,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		srv.compactor.Run()
 		srv.compactor.Run()
 	}
 	}
 
 
+	srv.applyV3Base = &applierV3backend{srv}
 	if err = srv.restoreAlarms(); err != nil {
 	if err = srv.restoreAlarms(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 47 - 64
etcdserver/v3_server.go

@@ -84,41 +84,19 @@ type Authenticator interface {
 }
 }
 
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
-	var result *applyResult
-	var err error
-
 	if r.Serializable {
 	if r.Serializable {
-		for {
-			authInfo, err := s.authInfoFromCtx(ctx)
-			if err != nil {
-				return nil, err
-			}
-
-			hdr := &pb.RequestHeader{}
-			if authInfo != nil {
-				hdr.Username = authInfo.Username
-				hdr.AuthRevision = authInfo.Revision
-			}
-
-			result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Range: r})
-
-			if result.err != nil {
-				if result.err == auth.ErrAuthOldRevision {
-					continue
-				}
-				break
-			}
-
-			if authInfo == nil || authInfo.Revision == s.authStore.Revision() {
-				break
-			}
-
-			// The revision that authorized this request is obsolete.
-			// For avoiding TOCTOU problem, retry of the request is required.
+		var resp *pb.RangeResponse
+		var err error
+		chk := func(ai *auth.AuthInfo) error {
+			return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
 		}
 		}
-	} else {
-		result, err = s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
+		get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+		if serr := s.doSerialize(ctx, chk, get); serr != nil {
+			return nil, serr
+		}
+		return resp, err
 	}
 	}
+	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -151,41 +129,19 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
 }
 }
 
 
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
-	var result *applyResult
-	var err error
-
 	if isTxnSerializable(r) {
 	if isTxnSerializable(r) {
-		for {
-			authInfo, err := s.authInfoFromCtx(ctx)
-			if err != nil {
-				return nil, err
-			}
-
-			hdr := &pb.RequestHeader{}
-			if authInfo != nil {
-				hdr.Username = authInfo.Username
-				hdr.AuthRevision = authInfo.Revision
-			}
-
-			result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Txn: r})
-
-			if result.err != nil {
-				if result.err == auth.ErrAuthOldRevision {
-					continue
-				}
-				break
-			}
-
-			if authInfo == nil || authInfo.Revision == s.authStore.Revision() {
-				break
-			}
-
-			// The revision that authorized this request is obsolete.
-			// For avoiding TOCTOU problem, retry of this request is required.
+		var resp *pb.TxnResponse
+		var err error
+		chk := func(ai *auth.AuthInfo) error {
+			return checkTxnAuth(s.authStore, ai, r)
 		}
 		}
-	} else {
-		result, err = s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
+		get := func() { resp, err = s.applyV3Base.Txn(r) }
+		if serr := s.doSerialize(ctx, chk, get); serr != nil {
+			return nil, serr
+		}
+		return resp, err
 	}
 	}
+	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -551,6 +507,33 @@ func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
 	return authInfo, nil
 	return authInfo, nil
 }
 }
 
 
+// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
+func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
+	for {
+		ai, err := s.authInfoFromCtx(ctx)
+		if err != nil {
+			return err
+		}
+		if ai == nil {
+			// chk expects non-nil AuthInfo; use empty credentials
+			ai = &auth.AuthInfo{}
+		}
+		if err = chk(ai); err != nil {
+			if err == auth.ErrAuthOldRevision {
+				continue
+			}
+			return err
+		}
+		// fetch response for serialized request
+		get()
+		//  empty credentials or current auth info means no need to retry
+		if ai.Revision == 0 || ai.Revision == s.authStore.Revision() {
+			return nil
+		}
+		// avoid TOCTOU error, retry of the request is required.
+	}
+}
+
 func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
 func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
 	ai := s.getAppliedIndex()
 	ai := s.getAppliedIndex()
 	ci := s.getCommittedIndex()
 	ci := s.getCommittedIndex()