Kaynağa Gözat

Merge pull request #8306 from heyitsanthony/v3server-raftreq

etcdserver: consolidate error checking for v3_server functions
Anthony Romano 8 yıl önce
ebeveyn
işleme
8b1177194e
2 değiştirilmiş dosya ile 69 ekleme ve 132 silme
  1. 1 2
      etcdserver/server.go
  2. 68 130
      etcdserver/v3_server.go

+ 1 - 2
etcdserver/server.go

@@ -1366,8 +1366,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 			Action:   pb.AlarmRequest_ACTIVATE,
 			Action:   pb.AlarmRequest_ACTIVATE,
 			Alarm:    pb.AlarmType_NOSPACE,
 			Alarm:    pb.AlarmType_NOSPACE,
 		}
 		}
-		r := pb.InternalRaftRequest{Alarm: a}
-		s.processInternalRaftRequest(s.ctx, r)
+		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
 		s.w.Trigger(id, ar)
 		s.w.Trigger(id, ar)
 	})
 	})
 }
 }

+ 68 - 130
etcdserver/v3_server.go

@@ -19,6 +19,8 @@ import (
 	"encoding/binary"
 	"encoding/binary"
 	"time"
 	"time"
 
 
+	"github.com/gogo/protobuf/proto"
+
 	"github.com/coreos/etcd/auth"
 	"github.com/coreos/etcd/auth"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/etcdserver/membership"
@@ -99,25 +101,19 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
 }
 }
 
 
 func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.PutResponse), nil
+	return resp.(*pb.PutResponse), nil
 }
 }
 
 
 func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.DeleteRangeResponse), nil
+	return resp.(*pb.DeleteRangeResponse), nil
 }
 }
 
 
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
@@ -139,14 +135,11 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
 		}
 		}
 		return resp, err
 		return resp, err
 	}
 	}
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.TxnResponse), nil
+	return resp.(*pb.TxnResponse), nil
 }
 }
 
 
 func isTxnSerializable(r *pb.TxnRequest) bool {
 func isTxnSerializable(r *pb.TxnRequest) bool {
@@ -211,25 +204,19 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
 		// only use positive int64 id's
 		// only use positive int64 id's
 		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
 		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
 	}
 	}
-	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
+	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.LeaseGrantResponse), nil
+	return resp.(*pb.LeaseGrantResponse), nil
 }
 }
 
 
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
-	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
+	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.LeaseRevokeResponse), nil
+	return resp.(*pb.LeaseRevokeResponse), nil
 }
 }
 
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
@@ -325,46 +312,35 @@ func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error)
 }
 }
 
 
 func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
 func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
-	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
+	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AlarmResponse), nil
+	return resp.(*pb.AlarmResponse), nil
 }
 }
 
 
 func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
 func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
-	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
+	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthEnableResponse), nil
+	return resp.(*pb.AuthEnableResponse), nil
 }
 }
 
 
 func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
 func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthDisableResponse), nil
+	return resp.(*pb.AuthDisableResponse), nil
 }
 }
 
 
 func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
 func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
-	var result *applyResult
-
-	err := s.linearizableReadNotify(ctx)
-	if err != nil {
+	if err := s.linearizableReadNotify(ctx); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	var resp proto.Message
 	for {
 	for {
 		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
 		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
 		if err != nil {
 		if err != nil {
@@ -385,166 +361,141 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
 			SimpleToken: st,
 			SimpleToken: st,
 		}
 		}
 
 
-		result, err = s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
+		resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		if result.err != nil {
-			return nil, result.err
-		}
-
-		if checkedRevision != s.AuthStore().Revision() {
-			plog.Infof("revision when password checked is obsolete, retrying")
-			continue
+		if checkedRevision == s.AuthStore().Revision() {
+			break
 		}
 		}
-
-		break
+		plog.Infof("revision when password checked is obsolete, retrying")
 	}
 	}
 
 
-	return result.resp.(*pb.AuthenticateResponse), nil
+	return resp.(*pb.AuthenticateResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
 func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserAddResponse), nil
+	return resp.(*pb.AuthUserAddResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
 func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserDeleteResponse), nil
+	return resp.(*pb.AuthUserDeleteResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
 func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserChangePasswordResponse), nil
+	return resp.(*pb.AuthUserChangePasswordResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
 func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserGrantRoleResponse), nil
+	return resp.(*pb.AuthUserGrantRoleResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
 func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserGetResponse), nil
+	return resp.(*pb.AuthUserGetResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
 func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserListResponse), nil
+	return resp.(*pb.AuthUserListResponse), nil
 }
 }
 
 
 func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
 func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthUserRevokeRoleResponse), nil
+	return resp.(*pb.AuthUserRevokeRoleResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
 func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthRoleAddResponse), nil
+	return resp.(*pb.AuthRoleAddResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
 func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthRoleGrantPermissionResponse), nil
+	return resp.(*pb.AuthRoleGrantPermissionResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
 func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthRoleGetResponse), nil
+	return resp.(*pb.AuthRoleGetResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
 func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthRoleListResponse), nil
+	return resp.(*pb.AuthRoleListResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
 func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if result.err != nil {
-		return nil, result.err
-	}
-	return result.resp.(*pb.AuthRoleRevokePermissionResponse), nil
+	return resp.(*pb.AuthRoleRevokePermissionResponse), nil
 }
 }
 
 
 func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
 func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
-	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
+	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
+	if err != nil {
+		return nil, err
+	}
+	return resp.(*pb.AuthRoleDeleteResponse), nil
+}
+
+func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
+	result, err := s.processInternalRaftRequestOnce(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 	if result.err != nil {
 	if result.err != nil {
 		return nil, result.err
 		return nil, result.err
 	}
 	}
-	return result.resp.(*pb.AuthRoleDeleteResponse), nil
+	return result.resp, nil
+}
+
+func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
+	for {
+		resp, err := s.raftRequestOnce(ctx, r)
+		if err != auth.ErrAuthOldRevision {
+			return resp, err
+		}
+	}
 }
 }
 
 
 // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
 // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
@@ -629,19 +580,6 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
 	}
 	}
 }
 }
 
 
-func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
-	var result *applyResult
-	var err error
-	for {
-		result, err = s.processInternalRaftRequestOnce(ctx, r)
-		if err != auth.ErrAuthOldRevision {
-			break
-		}
-	}
-
-	return result, err
-}
-
 // Watchable returns a watchable interface attached to the etcdserver.
 // Watchable returns a watchable interface attached to the etcdserver.
 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }