소스 검색

etcdserver: add --max-txn-ops flag

--max-txn-ops allows users to define the maximum transaction operations
for each txn request. it defaults at 128.

Fixes #7826
fanmin shi 8 년 전
부모
커밋
ae7ddfb483
8개의 변경된 파일20개의 추가작업 그리고 10개의 파일을 삭제
  1. 1 1
      clientv3/integration/txn_test.go
  2. 3 0
      embed/config.go
  3. 1 0
      embed/etcd.go
  4. 1 0
      etcdmain/config.go
  5. 2 0
      etcdmain/help.go
  6. 10 8
      etcdserver/api/v3rpc/key.go
  7. 1 0
      etcdserver/config.go
  8. 1 1
      integration/v3_grpc_test.go

+ 1 - 1
clientv3/integration/txn_test.go

@@ -41,7 +41,7 @@ func TestTxnError(t *testing.T) {
 		t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err)
 		t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err)
 	}
 	}
 
 
-	ops := make([]clientv3.Op, v3rpc.MaxOpsPerTxn+10)
+	ops := make([]clientv3.Op, v3rpc.MaxTxnOps+10)
 	for i := range ops {
 	for i := range ops {
 		ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "")
 		ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "")
 	}
 	}

+ 3 - 0
embed/config.go

@@ -40,6 +40,7 @@ const (
 	DefaultName         = "default"
 	DefaultName         = "default"
 	DefaultMaxSnapshots = 5
 	DefaultMaxSnapshots = 5
 	DefaultMaxWALs      = 5
 	DefaultMaxWALs      = 5
+	DefaultMaxTxnOps    = uint(128)
 
 
 	DefaultListenPeerURLs   = "http://localhost:2380"
 	DefaultListenPeerURLs   = "http://localhost:2380"
 	DefaultListenClientURLs = "http://localhost:2379"
 	DefaultListenClientURLs = "http://localhost:2379"
@@ -85,6 +86,7 @@ type Config struct {
 	TickMs            uint  `json:"heartbeat-interval"`
 	TickMs            uint  `json:"heartbeat-interval"`
 	ElectionMs        uint  `json:"election-timeout"`
 	ElectionMs        uint  `json:"election-timeout"`
 	QuotaBackendBytes int64 `json:"quota-backend-bytes"`
 	QuotaBackendBytes int64 `json:"quota-backend-bytes"`
+	MaxTxnOps         uint  `json:"max-txn-ops"`
 
 
 	// clustering
 	// clustering
 
 
@@ -172,6 +174,7 @@ func NewConfig() *Config {
 		MaxWalFiles:         DefaultMaxWALs,
 		MaxWalFiles:         DefaultMaxWALs,
 		Name:                DefaultName,
 		Name:                DefaultName,
 		SnapCount:           etcdserver.DefaultSnapCount,
 		SnapCount:           etcdserver.DefaultSnapCount,
+		MaxTxnOps:           DefaultMaxTxnOps,
 		TickMs:              100,
 		TickMs:              100,
 		ElectionMs:          1000,
 		ElectionMs:          1000,
 		LPUrls:              []url.URL{*lpurl},
 		LPUrls:              []url.URL{*lpurl},

+ 1 - 0
embed/etcd.go

@@ -139,6 +139,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ElectionTicks:           cfg.ElectionTicks(),
 		ElectionTicks:           cfg.ElectionTicks(),
 		AutoCompactionRetention: cfg.AutoCompactionRetention,
 		AutoCompactionRetention: cfg.AutoCompactionRetention,
 		QuotaBackendBytes:       cfg.QuotaBackendBytes,
 		QuotaBackendBytes:       cfg.QuotaBackendBytes,
+		MaxTxnOps:               cfg.MaxTxnOps,
 		StrictReconfigCheck:     cfg.StrictReconfigCheck,
 		StrictReconfigCheck:     cfg.StrictReconfigCheck,
 		ClientCertAuthEnabled:   cfg.ClientTLSInfo.ClientCertAuth,
 		ClientCertAuthEnabled:   cfg.ClientTLSInfo.ClientCertAuth,
 		AuthToken:               cfg.AuthToken,
 		AuthToken:               cfg.AuthToken,

+ 1 - 0
etcdmain/config.go

@@ -138,6 +138,7 @@ func newConfig() *config {
 	fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
+	fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum operations per txn that etcd server allows; defaults to 128.")
 
 
 	// clustering
 	// clustering
 	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
 	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")

+ 2 - 0
etcdmain/help.go

@@ -66,6 +66,8 @@ member flags:
 		comma-separated whitelist of origins for CORS (cross-origin resource sharing).
 		comma-separated whitelist of origins for CORS (cross-origin resource sharing).
 	--quota-backend-bytes '0'
 	--quota-backend-bytes '0'
 		raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
 		raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
+	--max-txn-ops '128' 
+		maximum operations per txn that etcd server allows; defaults to 128.
 
 
 clustering flags:
 clustering flags:
 
 

+ 10 - 8
etcdserver/api/v3rpc/key.go

@@ -27,19 +27,20 @@ import (
 
 
 var (
 var (
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc")
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc")
-
-	// Max operations per txn list. For example, Txn.Success can have at most 128 operations,
-	// and Txn.Failure can have at most 128 operations.
-	MaxOpsPerTxn = 128
 )
 )
 
 
 type kvServer struct {
 type kvServer struct {
 	hdr header
 	hdr header
 	kv  etcdserver.RaftKV
 	kv  etcdserver.RaftKV
+	// maxTxnOps is the max operations per txn.
+	// e.g suppose maxTxnOps = 128.
+	// Txn.Success can have at most 128 operations,
+	// and Txn.Failure can have at most 128 operations.
+	maxTxnOps uint
 }
 }
 
 
 func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
 func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
-	return &kvServer{hdr: newHeader(s), kv: s}
+	return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
 }
 }
 
 
 func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
@@ -94,7 +95,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*
 }
 }
 
 
 func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
-	if err := checkTxnRequest(r); err != nil {
+	if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -150,8 +151,9 @@ func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
 	return nil
 	return nil
 }
 }
 
 
-func checkTxnRequest(r *pb.TxnRequest) error {
-	if len(r.Compare) > MaxOpsPerTxn || len(r.Success) > MaxOpsPerTxn || len(r.Failure) > MaxOpsPerTxn {
+func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
+	plog.Infof("maxTxnOps %v", maxTxnOps)
+	if len(r.Compare) > maxTxnOps || len(r.Success) > maxTxnOps || len(r.Failure) > maxTxnOps {
 		return rpctypes.ErrGRPCTooManyOps
 		return rpctypes.ErrGRPCTooManyOps
 	}
 	}
 
 

+ 1 - 0
etcdserver/config.go

@@ -54,6 +54,7 @@ type ServerConfig struct {
 
 
 	AutoCompactionRetention int
 	AutoCompactionRetention int
 	QuotaBackendBytes       int64
 	QuotaBackendBytes       int64
+	MaxTxnOps               uint
 
 
 	StrictReconfigCheck bool
 	StrictReconfigCheck bool
 
 

+ 1 - 1
integration/v3_grpc_test.go

@@ -201,7 +201,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
 		txn := &pb.TxnRequest{}
 		txn := &pb.TxnRequest{}
-		for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
+		for j := 0; j < v3rpc.MaxTxnOps+1; j++ {
 			tt(txn)
 			tt(txn)
 		}
 		}