Browse Source

etcdserver: add put request steps.
mvcc: add put request steps; add trace to KV.Write() as input parameter.

yoyinzyc 6 years ago
parent
commit
401df4bb8e

+ 2 - 1
clientv3/snapshot/v3_snapshot.go

@@ -39,6 +39,7 @@ import (
 	"go.etcd.io/etcd/mvcc"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/pkg/fileutil"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/pkg/types"
 	"go.etcd.io/etcd/raft"
 	"go.etcd.io/etcd/raft/raftpb"
@@ -384,7 +385,7 @@ func (s *v3Manager) saveDB() error {
 	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 
 	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
-	txn := mvs.Write()
+	txn := mvs.Write(traceutil.TODO())
 	btx := be.BatchTx()
 	del := func(k, v []byte) error {
 		txn.DeleteRange(k, nil)

+ 13 - 4
etcdserver/apply.go

@@ -179,7 +179,14 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
 	resp = &pb.PutResponse{}
 	resp.Header = &pb.ResponseHeader{}
-
+	trace := traceutil.New("put",
+		a.s.getLogger(),
+		traceutil.Field{Key: "key", Value: string(p.Key)},
+		traceutil.Field{Key: "value", Value: string(p.Value)},
+	)
+	defer func() {
+		trace.LogIfLong(warnApplyDuration)
+	}()
 	val, leaseID := p.Value, lease.LeaseID(p.Lease)
 	if txn == nil {
 		if leaseID != lease.NoLease {
@@ -187,16 +194,18 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
 				return nil, lease.ErrLeaseNotFound
 			}
 		}
-		txn = a.s.KV().Write()
+		txn = a.s.KV().Write(trace)
 		defer txn.End()
 	}
 
 	var rr *mvcc.RangeResult
 	if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
+		trace.StepBegin()
 		rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
 		if err != nil {
 			return nil, err
 		}
+		trace.StepEnd("get previous kv pair")
 	}
 	if p.IgnoreValue || p.IgnoreLease {
 		if rr == nil || len(rr.KVs) == 0 {
@@ -226,7 +235,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
 	end := mkGteRange(dr.RangeEnd)
 
 	if txn == nil {
-		txn = a.s.kv.Write()
+		txn = a.s.kv.Write(traceutil.TODO())
 		defer txn.End()
 	}
 
@@ -369,7 +378,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
 	// be the revision of the write txn.
 	if isWrite {
 		txn.End()
-		txn = a.s.KV().Write()
+		txn = a.s.KV().Write(traceutil.TODO())
 	}
 	a.applyTxn(txn, rt, txnPath, txnResp)
 	rev := txn.Rev()

+ 2 - 1
etcdserver/server.go

@@ -50,6 +50,7 @@ import (
 	"go.etcd.io/etcd/pkg/pbutil"
 	"go.etcd.io/etcd/pkg/runtime"
 	"go.etcd.io/etcd/pkg/schedule"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.etcd.io/etcd/pkg/types"
 	"go.etcd.io/etcd/pkg/wait"
 	"go.etcd.io/etcd/raft"
@@ -1178,7 +1179,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			plog.Info("recovering lessor...")
 		}
 
-		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
+		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
 
 		if lg != nil {
 			lg.Info("restored lease store")

+ 1 - 1
mvcc/kv.go

@@ -106,7 +106,7 @@ type KV interface {
 	Read(trace *traceutil.Trace) TxnRead
 
 	// Write creates a write transaction.
-	Write() TxnWrite
+	Write(trace *traceutil.Trace) TxnWrite
 
 	// Hash computes the hash of the KV's backend.
 	Hash() (hash uint32, revision int64, err error)

+ 5 - 5
mvcc/kv_test.go

@@ -57,7 +57,7 @@ var (
 		return kv.Put(key, value, lease)
 	}
 	txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
-		txn := kv.Write()
+		txn := kv.Write(traceutil.TODO())
 		defer txn.End()
 		return txn.Put(key, value, lease)
 	}
@@ -66,7 +66,7 @@ var (
 		return kv.DeleteRange(key, end)
 	}
 	txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
-		txn := kv.Write()
+		txn := kv.Write(traceutil.TODO())
 		defer txn.End()
 		return txn.DeleteRange(key, end)
 	}
@@ -410,7 +410,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
 		func() { s.DeleteRange([]byte("foo"), nil) },
 	}
 	for i, tt := range tests {
-		txn := s.Write()
+		txn := s.Write(traceutil.TODO())
 		done := make(chan struct{}, 1)
 		go func() {
 			tt()
@@ -439,7 +439,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
 	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
-	txn := s.Write()
+	txn := s.Write(traceutil.TODO())
 	defer txn.End()
 
 	donec := make(chan struct{})
@@ -461,7 +461,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
-		txn := s.Write()
+		txn := s.Write(traceutil.TODO())
 		base := int64(i + 1)
 
 		// put foo

+ 2 - 2
mvcc/kv_view.go

@@ -42,13 +42,13 @@ func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err
 type writeView struct{ kv KV }
 
 func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
-	tw := wv.kv.Write()
+	tw := wv.kv.Write(traceutil.TODO())
 	defer tw.End()
 	return tw.DeleteRange(key, end)
 }
 
 func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
-	tw := wv.kv.Write()
+	tw := wv.kv.Write(traceutil.TODO())
 	defer tw.End()
 	return tw.Put(key, value, lease)
 }

+ 2 - 1
mvcc/kvstore.go

@@ -29,6 +29,7 @@ import (
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.etcd.io/etcd/pkg/schedule"
+	"go.etcd.io/etcd/pkg/traceutil"
 
 	"github.com/coreos/pkg/capnslog"
 	"go.uber.org/zap"
@@ -140,7 +141,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
 	s.ReadView = &readView{s}
 	s.WriteView = &writeView{s}
 	if s.le != nil {
-		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
 	}
 
 	tx := s.b.BatchTx()

+ 3 - 2
mvcc/kvstore_bench_test.go

@@ -20,6 +20,7 @@ import (
 
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/pkg/traceutil"
 
 	"go.uber.org/zap"
 )
@@ -130,7 +131,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		txn := s.Write()
+		txn := s.Write(traceutil.TODO())
 		txn.Put(keys[i], vals[i], lease.NoLease)
 		txn.End()
 	}
@@ -151,7 +152,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < revsPerKey; j++ {
-			txn := s.Write()
+			txn := s.Write(traceutil.TODO())
 			txn.Put(keys[i], vals[i], lease.NoLease)
 			txn.End()
 		}

+ 2 - 2
mvcc/kvstore_test.go

@@ -640,7 +640,7 @@ func TestTxnPut(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
-		txn := s.Write()
+		txn := s.Write(traceutil.TODO())
 		base := int64(i + 2)
 		if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
@@ -731,7 +731,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
 			defer wg.Done()
 			time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
 
-			tx := s.Write()
+			tx := s.Write(traceutil.TODO())
 			numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
 			var pendingKvs kvs
 			for j := 0; j < numOfPuts; j++ {

+ 6 - 3
mvcc/kvstore_txn.go

@@ -64,12 +64,12 @@ type storeTxnWrite struct {
 	changes  []mvccpb.KeyValue
 }
 
-func (s *store) Write() TxnWrite {
+func (s *store) Write(trace *traceutil.Trace) TxnWrite {
 	s.mu.RLock()
 	tx := s.b.BatchTx()
 	tx.Lock()
 	tw := &storeTxnWrite{
-		storeTxnRead: storeTxnRead{s, tx, 0, 0, traceutil.TODO()},
+		storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
 		tx:           tx,
 		beginRev:     s.currentRev,
 		changes:      make([]mvccpb.KeyValue, 0, 4),
@@ -183,7 +183,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 		c = created.main
 		oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
 	}
-
+	tw.trace.Step("get key's previous created_revision and leaseID")
 	ibytes := newRevBytes()
 	idxRev := revision{main: rev, sub: int64(len(tw.changes))}
 	revToBytes(idxRev, ibytes)
@@ -210,9 +210,11 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 		}
 	}
 
+	tw.trace.Step("marshal mvccpb.KeyValue")
 	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 	tw.s.kvindex.Put(key, idxRev)
 	tw.changes = append(tw.changes, kv)
+	tw.trace.Step("store kv pair into bolt db")
 
 	if oldLease != lease.NoLease {
 		if tw.s.le == nil {
@@ -239,6 +241,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 			panic("unexpected error from lease Attach")
 		}
 	}
+	tw.trace.Step("attach lease to kv pair")
 }
 
 func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {

+ 2 - 1
mvcc/watchable_store.go

@@ -21,6 +21,7 @@ import (
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/traceutil"
 	"go.uber.org/zap"
 )
 
@@ -84,7 +85,7 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
 	s.store.WriteView = &writeView{s}
 	if s.le != nil {
 		// use this store as the deleter so revokes trigger watch events
-		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
 	}
 	s.wg.Add(2)
 	go s.syncWatchersLoop()

+ 2 - 1
mvcc/watchable_store_bench_test.go

@@ -21,6 +21,7 @@ import (
 
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/pkg/traceutil"
 
 	"go.uber.org/zap"
 )
@@ -59,7 +60,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		txn := s.Write()
+		txn := s.Write(traceutil.TODO())
 		txn.Put(keys[i], vals[i], lease.NoLease)
 		txn.End()
 	}

+ 7 - 2
mvcc/watchable_store_txn.go

@@ -14,7 +14,10 @@
 
 package mvcc
 
-import "go.etcd.io/etcd/mvcc/mvccpb"
+import (
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/traceutil"
+)
 
 func (tw *watchableStoreTxnWrite) End() {
 	changes := tw.Changes()
@@ -48,4 +51,6 @@ type watchableStoreTxnWrite struct {
 	s *watchableStore
 }
 
-func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }
+func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
+	return &watchableStoreTxnWrite{s.store.Write(trace), s}
+}

+ 13 - 1
pkg/traceutil/trace.go

@@ -56,6 +56,7 @@ type Trace struct {
 	fields    []Field
 	startTime time.Time
 	steps     []step
+	inStep    bool
 }
 
 type step struct {
@@ -81,7 +82,18 @@ func Get(ctx context.Context) *Trace {
 }
 
 func (t *Trace) Step(msg string, fields ...Field) {
-	t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})
+	if !t.inStep {
+		t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})
+	}
+}
+
+func (t *Trace) StepBegin() {
+	t.inStep = true
+}
+
+func (t *Trace) StepEnd(msg string, fields ...Field) {
+	t.inStep = false
+	t.Step(msg, fields...)
 }
 
 func (t *Trace) AddField(fields ...Field) {