Prechádzať zdrojové kódy

etcdserver: trace raft requests.

yoyinzyc 6 rokov pred
rodič
commit
3a3eb24c69

+ 16 - 19
etcdserver/apply.go

@@ -34,8 +34,7 @@ import (
 )
 
 const (
-	warnApplyDuration   = 100 * time.Millisecond
-	rangeTraceThreshold = 100 * time.Millisecond
+	warnApplyDuration = 100 * time.Millisecond
 )
 
 type applyResult struct {
@@ -45,13 +44,14 @@ type applyResult struct {
 	// to being logically reflected by the node. Currently only used for
 	// Compaction requests.
 	physc <-chan struct{}
+	trace *traceutil.Trace
 }
 
 // applierV3 is the interface for processing V3 raft messages
 type applierV3 interface {
 	Apply(r *pb.InternalRaftRequest) *applyResult
 
-	Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
+	Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
 	Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
 	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
 	Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
@@ -123,7 +123,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	case r.Range != nil:
 		ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
 	case r.Put != nil:
-		ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
+		ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put)
 	case r.DeleteRange != nil:
 		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
 	case r.Txn != nil:
@@ -176,22 +176,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	return ar
 }
 
-func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
+func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
 	resp = &pb.PutResponse{}
 	resp.Header = &pb.ResponseHeader{}
-	trace := traceutil.New("put",
+	trace = traceutil.New("put",
 		a.s.getLogger(),
 		traceutil.Field{Key: "key", Value: string(p.Key)},
 		traceutil.Field{Key: "value", Value: string(p.Value)},
 	)
-	defer func() {
-		trace.LogIfLong(warnApplyDuration)
-	}()
 	val, leaseID := p.Value, lease.LeaseID(p.Lease)
 	if txn == nil {
 		if leaseID != lease.NoLease {
 			if l := a.s.lessor.Lookup(leaseID); l == nil {
-				return nil, lease.ErrLeaseNotFound
+				return nil, nil, lease.ErrLeaseNotFound
 			}
 		}
 		txn = a.s.KV().Write(trace)
@@ -203,14 +200,14 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
 		trace.StepBegin()
 		rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		trace.StepEnd("get previous kv pair")
 	}
 	if p.IgnoreValue || p.IgnoreLease {
 		if rr == nil || len(rr.KVs) == 0 {
 			// ignore_{lease,value} flag expects previous key-value pair
-			return nil, ErrKeyNotFound
+			return nil, nil, ErrKeyNotFound
 		}
 	}
 	if p.IgnoreValue {
@@ -226,7 +223,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
 	}
 
 	resp.Header.Revision = txn.Put(p.Key, val, leaseID)
-	return resp, nil
+	return resp, trace, nil
 }
 
 func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
@@ -540,7 +537,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
 			}
 			respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
 		case *pb.RequestOp_RequestPut:
-			resp, err := a.Put(txn, tv.RequestPut)
+			resp, _, err := a.Put(txn, tv.RequestPut)
 			if err != nil {
 				if lg != nil {
 					lg.Panic("unexpected error during txn", zap.Error(err))
@@ -688,8 +685,8 @@ type applierV3Capped struct {
 // with Puts so that the number of keys in the store is capped.
 func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
 
-func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
-	return nil, ErrNoSpace
+func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
+	return nil, nil, ErrNoSpace
 }
 
 func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
@@ -838,13 +835,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
 	return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
 }
 
-func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
+func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
 	ok := a.q.Available(p)
-	resp, err := a.applierV3.Put(txn, p)
+	resp, trace, err := a.applierV3.Put(txn, p)
 	if err == nil && !ok {
 		err = ErrNoSpace
 	}
-	return resp, err
+	return resp, trace, err
 }
 
 func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {

+ 5 - 4
etcdserver/apply_auth.go

@@ -22,6 +22,7 @@ import (
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc"
+	"go.etcd.io/etcd/pkg/traceutil"
 )
 
 type authApplierV3 struct {
@@ -62,9 +63,9 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
 	return ret
 }
 
-func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
+func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
 	if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil {
@@ -72,13 +73,13 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon
 		// be written by this user. It means the user cannot revoke the
 		// lease so attaching the lease to the newly written key should
 		// be forbidden.
-		return nil, err
+		return nil, nil, err
 	}
 
 	if r.PrevKv {
 		err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 	}
 	return aa.applierV3.Put(txn, r)

+ 3 - 2
etcdserver/corrupt.go

@@ -23,6 +23,7 @@ import (
 	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/mvcc"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/pkg/types"
 
 	"go.uber.org/zap"
@@ -382,8 +383,8 @@ type applierV3Corrupt struct {
 
 func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
 
-func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
-	return nil, ErrCorrupt
+func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
+	return nil, nil, ErrCorrupt
 }
 
 func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {

+ 8 - 0
etcdserver/v3_server.go

@@ -39,6 +39,8 @@ const (
 	// However, if the committed entries are very heavy to apply, the gap might grow.
 	// We should stop accepting new proposals if the gap growing to a certain point.
 	maxGapBetweenApplyAndCommitIndex = 5000
+	rangeTraceThreshold              = 100 * time.Millisecond
+	putTraceThreshold                = 100 * time.Millisecond
 )
 
 type RaftKV interface {
@@ -126,6 +128,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
 }
 
 func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	ctx = context.WithValue(ctx, "time", time.Now())
 	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
 	if err != nil {
 		return nil, err
@@ -549,6 +552,11 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque
 	if result.err != nil {
 		return nil, result.err
 	}
+	if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil {
+		applyStart := result.trace.ResetStartTime(startTime)
+		result.trace.InsertStep(0, applyStart, "process raft request")
+		result.trace.LogIfLong(putTraceThreshold)
+	}
 	return result.resp, nil
 }
 

+ 15 - 0
pkg/traceutil/trace.go

@@ -81,6 +81,21 @@ func Get(ctx context.Context) *Trace {
 	return TODO()
 }
 
+func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) {
+	prev = t.startTime
+	t.startTime = time
+	return prev
+}
+
+func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) {
+	newStep := step{time, msg, fields}
+	if at < len(t.steps) {
+		t.steps = append(t.steps[:at+1], t.steps[at:]...)
+		t.steps[at] = newStep
+	} else {
+		t.steps = append(t.steps, newStep)
+	}
+}
 func (t *Trace) Step(msg string, fields ...Field) {
 	if !t.inStep {
 		t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})