소스 검색

Merge pull request #9821 from jpbetz/automated-cherry-pick-of-#9288-origin-release-3.2

Automated cherry pick of detailed "took too long" warnings to release-3.2
Gyuho Lee 7 년 전
부모
커밋
ba233791e1
6개의 변경된 파일261개의 추가작업 그리고 16개의 파일을 삭제
  1. 3 0
      etcdserver/apply.go
  2. 4 2
      etcdserver/apply_v2.go
  3. 179 0
      etcdserver/etcdserverpb/raft_internal_stringer.go
  4. 1 7
      etcdserver/server.go
  5. 57 0
      etcdserver/util.go
  6. 17 7
      etcdserver/v3_server.go

+ 3 - 0
etcdserver/apply.go

@@ -89,6 +89,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
 
 func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	ar := &applyResult{}
+	defer func(start time.Time) {
+		warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
+	}(time.Now())
 
 	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
 	switch {

+ 4 - 2
etcdserver/apply_v2.go

@@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
 	return Response{}
 }
 
-// applyV2Request interprets r as a call to store.X and returns a Response interpreted
-// from store.Event
+// applyV2Request interprets r as a call to v2store.X
+// and returns a Response interpreted from v2store.Event
 func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
+	defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
 	toTTLOptions(r)
+
 	switch r.Method {
 	case "POST":
 		return s.applyV2.Post(r)

+ 179 - 0
etcdserver/etcdserverpb/raft_internal_stringer.go

@@ -0,0 +1,179 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserverpb
+
+import (
+	"fmt"
+	"strings"
+
+	proto "github.com/golang/protobuf/proto"
+)
+
+// InternalRaftStringer implements custom proto Stringer:
+// redact password, replace value fields with value_size fields.
+type InternalRaftStringer struct {
+	Request *InternalRaftRequest
+}
+
+func (as *InternalRaftStringer) String() string {
+	switch {
+	case as.Request.LeaseGrant != nil:
+		return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
+			as.Request.Header.String(),
+			as.Request.LeaseGrant.TTL,
+			as.Request.LeaseGrant.ID,
+		)
+	case as.Request.LeaseRevoke != nil:
+		return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
+			as.Request.Header.String(),
+			as.Request.LeaseRevoke.ID,
+		)
+	case as.Request.Authenticate != nil:
+		return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
+			as.Request.Header.String(),
+			as.Request.Authenticate.Name,
+			as.Request.Authenticate.SimpleToken,
+		)
+	case as.Request.AuthUserAdd != nil:
+		return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
+			as.Request.Header.String(),
+			as.Request.AuthUserAdd.Name,
+		)
+	case as.Request.AuthUserChangePassword != nil:
+		return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
+			as.Request.Header.String(),
+			as.Request.AuthUserChangePassword.Name,
+		)
+	case as.Request.Put != nil:
+		return fmt.Sprintf("header:<%s> put:<%s>",
+			as.Request.Header.String(),
+			newLoggablePutRequest(as.Request.Put).String(),
+		)
+	case as.Request.Txn != nil:
+		return fmt.Sprintf("header:<%s> txn:<%s>",
+			as.Request.Header.String(),
+			NewLoggableTxnRequest(as.Request.Txn).String(),
+		)
+	default:
+		// nothing to redact
+	}
+	return as.Request.String()
+}
+
+// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
+// fields in any nested txn and put operations.
+type txnRequestStringer struct {
+	Request *TxnRequest
+}
+
+func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
+	return &txnRequestStringer{request}
+}
+
+func (as *txnRequestStringer) String() string {
+	var compare []string
+	for _, c := range as.Request.Compare {
+		switch cv := c.TargetUnion.(type) {
+		case *Compare_Value:
+			compare = append(compare, newLoggableValueCompare(c, cv).String())
+		default:
+			// nothing to redact
+			compare = append(compare, c.String())
+		}
+	}
+	var success []string
+	for _, s := range as.Request.Success {
+		success = append(success, newLoggableRequestOp(s).String())
+	}
+	var failure []string
+	for _, f := range as.Request.Failure {
+		failure = append(failure, newLoggableRequestOp(f).String())
+	}
+	return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
+		strings.Join(compare, " "),
+		strings.Join(success, " "),
+		strings.Join(failure, " "),
+	)
+}
+
+// requestOpStringer implements a custom proto String to replace value bytes fields with value
+// size fields in any nested txn and put operations.
+type requestOpStringer struct {
+	Op *RequestOp
+}
+
+func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
+	return &requestOpStringer{op}
+}
+
+func (as *requestOpStringer) String() string {
+	switch op := as.Op.Request.(type) {
+	case *RequestOp_RequestPut:
+		return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
+	default:
+		// nothing to redact
+	}
+	return as.Op.String()
+}
+
+// loggableValueCompare implements a custom proto String for Compare.Value union member types to
+// replace the value bytes field with a value size field.
+// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
+type loggableValueCompare struct {
+	Result    Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
+	Target    Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
+	Key       []byte                `protobuf:"bytes,3,opt,name=key,proto3"`
+	ValueSize int                   `protobuf:"bytes,7,opt,name=value_size,proto3"`
+}
+
+func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
+	return &loggableValueCompare{
+		c.Result,
+		c.Target,
+		c.Key,
+		len(cv.Value),
+	}
+}
+
+func (m *loggableValueCompare) Reset()         { *m = loggableValueCompare{} }
+func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
+func (*loggableValueCompare) ProtoMessage()    {}
+
+// loggablePutRequest implements a custom proto String to replace value bytes field with a value
+// size field.
+// To preserve proto encoding of the key bytes, a faked out proto type is used here.
+type loggablePutRequest struct {
+	Key         []byte `protobuf:"bytes,1,opt,name=key,proto3"`
+	ValueSize   int    `protobuf:"varint,2,opt,name=value_size,proto3"`
+	Lease       int64  `protobuf:"varint,3,opt,name=lease,proto3"`
+	PrevKv      bool   `protobuf:"varint,4,opt,name=prev_kv,proto3"`
+	IgnoreValue bool   `protobuf:"varint,5,opt,name=ignore_value,proto3"`
+	IgnoreLease bool   `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
+}
+
+func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
+	return &loggablePutRequest{
+		request.Key,
+		len(request.Value),
+		request.Lease,
+		request.PrevKv,
+		request.IgnoreValue,
+		request.IgnoreLease,
+	}
+}
+
+func (m *loggablePutRequest) Reset()         { *m = loggablePutRequest{} }
+func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
+func (*loggablePutRequest) ProtoMessage()    {}

+ 1 - 7
etcdserver/server.go

@@ -818,14 +818,8 @@ func (s *EtcdServer) run() {
 
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	s.applySnapshot(ep, apply)
-	st := time.Now()
 	s.applyEntries(ep, apply)
-	d := time.Since(st)
-	entriesNum := len(apply.entries)
-	if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
-		plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
-		plog.Warningf("avoid queries with large range/delete range!")
-	}
+
 	proposalsApplied.Set(float64(ep.appliedi))
 	s.applyWait.Trigger(ep.appliedi)
 	// wait for the raft routine to finish the disk writes before triggering a

+ 57 - 0
etcdserver/util.go

@@ -15,11 +15,16 @@
 package etcdserver
 
 import (
+	"fmt"
+	"reflect"
+	"strings"
 	"time"
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/golang/protobuf/proto"
 )
 
 // isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +100,55 @@ func (nc *notifier) notify(err error) {
 	nc.err = err
 	close(nc.c)
 }
+
+func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
+	var resp string
+	if !isNil(respMsg) {
+		resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
+	}
+	warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
+}
+
+func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
+	reqStringer := pb.NewLoggableTxnRequest(r)
+	var resp string
+	if !isNil(txnResponse) {
+		var resps []string
+		for _, r := range txnResponse.Responses {
+			switch op := r.Response.(type) {
+			case *pb.ResponseOp_ResponseRange:
+				resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
+			default:
+				// only range responses should be in a read only txn request
+			}
+		}
+		resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
+	}
+	warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
+}
+
+func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
+	var resp string
+	if !isNil(rangeResponse) {
+		resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
+	}
+	warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
+}
+
+func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
+	// TODO: add metrics
+	d := time.Since(now)
+	if d > warnApplyDuration {
+		var result string
+		if err != nil {
+			result = fmt.Sprintf("error:%v", err)
+		} else {
+			result = resp
+		}
+		plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
+	}
+}
+
+func isNil(msg proto.Message) bool {
+	return msg == nil || reflect.ValueOf(msg).IsNil()
+}

+ 17 - 7
etcdserver/v3_server.go

@@ -19,8 +19,6 @@ import (
 	"encoding/binary"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-
 	"github.com/coreos/etcd/auth"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
@@ -28,7 +26,7 @@ import (
 	"github.com/coreos/etcd/lease/leasehttp"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/raft"
-
+	"github.com/gogo/protobuf/proto"
 	"golang.org/x/net/context"
 )
 
@@ -82,20 +80,26 @@ type Authenticator interface {
 }
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	var resp *pb.RangeResponse
+	var err error
+	defer func(start time.Time) {
+		warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
+	}(time.Now())
+
 	if !r.Serializable {
-		err := s.linearizableReadNotify(ctx)
+		err = s.linearizableReadNotify(ctx)
 		if err != nil {
 			return nil, err
 		}
 	}
-	var resp *pb.RangeResponse
-	var err error
 	chk := func(ai *auth.AuthInfo) error {
 		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
 	}
+
 	get := func() { resp, err = s.applyV3Base.Range(nil, r) }
 	if serr := s.doSerialize(ctx, chk, get); serr != nil {
-		return nil, serr
+		err = serr
+		return nil, err
 	}
 	return resp, err
 }
@@ -129,12 +133,18 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
 		chk := func(ai *auth.AuthInfo) error {
 			return checkTxnAuth(s.authStore, ai, r)
 		}
+
+		defer func(start time.Time) {
+			warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
+		}(time.Now())
+
 		get := func() { resp, err = s.applyV3Base.Txn(r) }
 		if serr := s.doSerialize(ctx, chk, get); serr != nil {
 			return nil, serr
 		}
 		return resp, err
 	}
+
 	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
 	if err != nil {
 		return nil, err