Browse Source

pkg: create package traceutil for tracing. mvcc: add tracing
steps: range from the in-memory index tree; range from boltdb.
etcdserver: add tracing steps: agreement among raft nodes before
linearized reading; authentication; filter and sort kv pairs; assemble
the response.

yoyinzyc 6 years ago
parent
commit
f4e7fc56a7

+ 16 - 7
etcdserver/apply.go

@@ -26,6 +26,7 @@ import (
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc"
 	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/pkg/types"
 
 	"github.com/gogo/protobuf/proto"
@@ -50,7 +51,7 @@ type applierV3 interface {
 	Apply(r *pb.InternalRaftRequest) *applyResult
 
 	Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
-	Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
+	Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
 	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
 	Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
 	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
@@ -119,7 +120,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
 	switch {
 	case r.Range != nil:
-		ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
+		ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
 	case r.Put != nil:
 		ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
 	case r.DeleteRange != nil:
@@ -245,12 +246,18 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
 	return resp, nil
 }
 
-func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	trace, ok := ctx.Value("trace").(*traceutil.Trace)
+	if !ok || trace == nil {
+		trace = traceutil.New("Apply Range")
+		ctx = context.WithValue(ctx, "trace", trace)
+	}
+
 	resp := &pb.RangeResponse{}
 	resp.Header = &pb.ResponseHeader{}
 
 	if txn == nil {
-		txn = a.s.kv.Read()
+		txn = a.s.kv.Read(trace)
 		defer txn.End()
 	}
 
@@ -327,7 +334,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
 		rr.KVs = rr.KVs[:r.Limit]
 		resp.More = true
 	}
-
+	trace.Step("Filter and sort the key-value pairs.")
 	resp.Header.Revision = rr.Rev
 	resp.Count = int64(rr.Count)
 	resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
@@ -337,12 +344,14 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
 		}
 		resp.Kvs[i] = &rr.KVs[i]
 	}
+	trace.Step("Assemble the response.")
 	return resp, nil
 }
 
 func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
 	isWrite := !isTxnReadonly(rt)
-	txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
+	trace := traceutil.New("ReadOnlyTxn")
+	txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))
 
 	txnPath := compareToPath(txn, rt)
 	if isWrite {
@@ -516,7 +525,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
 		respi := tresp.Responses[i].Response
 		switch tv := req.Request.(type) {
 		case *pb.RequestOp_RequestRange:
-			resp, err := a.Range(txn, tv.RequestRange)
+			resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
 			if err != nil {
 				if lg != nil {
 					lg.Panic("unexpected error during txn", zap.Error(err))

+ 3 - 2
etcdserver/apply_auth.go

@@ -15,6 +15,7 @@
 package etcdserver
 
 import (
+	"context"
 	"sync"
 
 	"go.etcd.io/etcd/auth"
@@ -83,11 +84,11 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon
 	return aa.applierV3.Put(txn, r)
 }
 
-func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
 		return nil, err
 	}
-	return aa.applierV3.Range(txn, r)
+	return aa.applierV3.Range(ctx, txn, r)
 }
 
 func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {

+ 1 - 1
etcdserver/corrupt.go

@@ -386,7 +386,7 @@ func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResp
 	return nil, ErrCorrupt
 }
 
-func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
 	return nil, ErrCorrupt
 }
 

+ 9 - 5
etcdserver/util.go

@@ -24,6 +24,7 @@ import (
 	"go.etcd.io/etcd/etcdserver/api/membership"
 	"go.etcd.io/etcd/etcdserver/api/rafthttp"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/pkg/types"
 
 	"go.uber.org/zap"
@@ -108,7 +109,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin
 	if !isNil(respMsg) {
 		resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
 	}
-	warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
+	warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err)
 }
 
 func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
@@ -126,18 +127,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
 		}
 		resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
 	}
-	warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
+	warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err)
 }
 
-func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
+func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
 	var resp string
 	if !isNil(rangeResponse) {
 		resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
 	}
-	warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
+	warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err)
 }
 
-func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
+func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
 	d := time.Since(now)
 	if d > warnApplyDuration {
 		if lg != nil {
@@ -159,6 +160,9 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fm
 			}
 			plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
 		}
+		if trace != nil {
+			trace.Log(lg)
+		}
 		slowApplies.Inc()
 	}
 }

+ 11 - 2
etcdserver/v3_server.go

@@ -26,6 +26,7 @@ import (
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/lease/leasehttp"
 	"go.etcd.io/etcd/mvcc"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/raft"
 
 	"github.com/gogo/protobuf/proto"
@@ -85,14 +86,18 @@ type Authenticator interface {
 }
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	trace := traceutil.New("Range")
+	ctx = context.WithValue(ctx, "trace", trace)
+
 	var resp *pb.RangeResponse
 	var err error
 	defer func(start time.Time) {
-		warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
+		warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err)
 	}(time.Now())
 
 	if !r.Serializable {
 		err = s.linearizableReadNotify(ctx)
+		trace.Step("Agreement among raft nodes before linearized reading.")
 		if err != nil {
 			return nil, err
 		}
@@ -101,7 +106,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
 		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
 	}
 
-	get := func() { resp, err = s.applyV3Base.Range(nil, r) }
+	get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
 	if serr := s.doSerialize(ctx, chk, get); serr != nil {
 		err = serr
 		return nil, err
@@ -558,6 +563,10 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
 	if err = chk(ai); err != nil {
 		return err
 	}
+
+	if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil {
+		trace.Step("Authentication.")
+	}
 	// fetch response for serialized request
 	get()
 	// check for stale token revision in case the auth store was updated while

+ 2 - 1
mvcc/kv.go

@@ -18,6 +18,7 @@ import (
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/traceutil"
 )
 
 type RangeOptions struct {
@@ -102,7 +103,7 @@ type KV interface {
 	WriteView
 
 	// Read creates a read transaction.
-	Read() TxnRead
+	Read(trace *traceutil.Trace) TxnRead
 
 	// Write creates a write transaction.
 	Write() TxnWrite

+ 1 - 1
mvcc/kv_test.go

@@ -47,7 +47,7 @@ var (
 		return kv.Range(key, end, ro)
 	}
 	txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
-		txn := kv.Read()
+		txn := kv.Read(nil)
 		defer txn.End()
 		return txn.Range(key, end, ro)
 	}

+ 3 - 3
mvcc/kv_view.go

@@ -19,19 +19,19 @@ import "go.etcd.io/etcd/lease"
 type readView struct{ kv KV }
 
 func (rv *readView) FirstRev() int64 {
-	tr := rv.kv.Read()
+	tr := rv.kv.Read(nil)
 	defer tr.End()
 	return tr.FirstRev()
 }
 
 func (rv *readView) Rev() int64 {
-	tr := rv.kv.Read()
+	tr := rv.kv.Read(nil)
 	defer tr.End()
 	return tr.Rev()
 }
 
 func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
-	tr := rv.kv.Read()
+	tr := rv.kv.Read(nil)
 	defer tr.End()
 	return tr.Range(key, end, ro)
 }

+ 3 - 3
mvcc/kvstore_test.go

@@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 
 	// readTx simulates a long read request
-	readTx1 := s.Read()
+	readTx1 := s.Read(nil)
 
 	// write should not be blocked by reads
 	done := make(chan struct{})
@@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 	}
 
 	// readTx2 simulates a short read request
-	readTx2 := s.Read()
+	readTx2 := s.Read(nil)
 	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
 	ret, err := readTx2.Range([]byte("foo"), nil, ro)
 	if err != nil {
@@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
 			mu.Lock()
 			wKVs := make(kvs, len(committedKVs))
 			copy(wKVs, committedKVs)
-			tx := s.Read()
+			tx := s.Read(nil)
 			mu.Unlock()
 			// get all keys in backend store, and compare with wKVs
 			ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})

+ 12 - 3
mvcc/kvstore_txn.go

@@ -18,6 +18,7 @@ import (
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.uber.org/zap"
 )
 
@@ -27,9 +28,11 @@ type storeTxnRead struct {
 
 	firstRev int64
 	rev      int64
+
+	trace *traceutil.Trace
 }
 
-func (s *store) Read() TxnRead {
+func (s *store) Read(trace *traceutil.Trace) TxnRead {
 	s.mu.RLock()
 	s.revMu.RLock()
 	// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
@@ -38,7 +41,7 @@ func (s *store) Read() TxnRead {
 	tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
 	firstRev, rev := s.compactMainRev, s.currentRev
 	s.revMu.RUnlock()
-	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
+	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
 }
 
 func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
@@ -66,7 +69,7 @@ func (s *store) Write() TxnWrite {
 	tx := s.b.BatchTx()
 	tx.Lock()
 	tw := &storeTxnWrite{
-		storeTxnRead: storeTxnRead{s, tx, 0, 0},
+		storeTxnRead: storeTxnRead{s, tx, 0, 0, nil},
 		tx:           tx,
 		beginRev:     s.currentRev,
 		changes:      make([]mvccpb.KeyValue, 0, 4),
@@ -124,6 +127,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
 	}
 
 	revpairs := tr.s.kvindex.Revisions(key, end, rev)
+	if tr.trace != nil {
+		tr.trace.Step("Range keys from in-memory index tree.")
+	}
 	if len(revpairs) == 0 {
 		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
 	}
@@ -163,6 +169,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
 			}
 		}
 	}
+	if tr.trace != nil {
+		tr.trace.Step("Range keys from bolt db.")
+	}
 	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
 }
 

+ 60 - 0
pkg/traceutil/trace.go

@@ -0,0 +1,60 @@
+package traceutil
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	"github.com/coreos/pkg/capnslog"
+	"go.uber.org/zap"
+)
+
+var (
+	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace")
+)
+
+type Trace struct {
+	operation string
+	startTime time.Time
+	steps     []step
+}
+
+type step struct {
+	time time.Time
+	msg  string
+}
+
+func New(op string) *Trace {
+	return &Trace{operation: op, startTime: time.Now()}
+}
+
+func (t *Trace) Step(msg string) {
+	t.steps = append(t.steps, step{time: time.Now(), msg: msg})
+}
+
+// Dump all steps in the Trace
+func (t *Trace) Log(lg *zap.Logger) {
+
+	var buf bytes.Buffer
+
+	buf.WriteString(fmt.Sprintf("The tracing of %v request:\n", t.operation))
+
+	buf.WriteString("Request started at:")
+	buf.WriteString(t.startTime.Format("2006-01-02 15:04:05"))
+	buf.WriteString(fmt.Sprintf(".%06d", t.startTime.Nanosecond()/1000))
+	buf.WriteString("\n")
+	lastStepTime := t.startTime
+	for i, step := range t.steps {
+		buf.WriteString(fmt.Sprintf("Step %d: %v Time cost: %v\n", i, step.msg, step.time.Sub(lastStepTime)))
+		//fmt.Println(step.msg, " costs: ", step.time.Sub(lastStepTime))
+		lastStepTime = step.time
+	}
+	buf.WriteString("Trace End\n")
+
+	s := buf.String()
+	if lg != nil {
+		lg.Info(s)
+	} else {
+		plog.Info(s)
+	}
+}

+ 28 - 0
pkg/traceutil/trace_test.go

@@ -0,0 +1,28 @@
+package traceutil
+
+import (
+	"testing"
+)
+
+func TestTrace(t *testing.T) {
+	var (
+		op    = "Test"
+		steps = []string{"Step1, Step2"}
+	)
+
+	trace := New(op)
+	if trace.operation != op {
+		t.Errorf("Expected %v, got %v\n", op, trace.operation)
+	}
+
+	for _, v := range steps {
+		trace.Step(v)
+		trace.Step(v)
+	}
+
+	for i, v := range steps {
+		if v != trace.steps[i].msg {
+			t.Errorf("Expected %v, got %v\n.", v, trace.steps[i].msg)
+		}
+	}
+}