|
@@ -26,6 +26,7 @@ import (
|
|
|
"go.etcd.io/etcd/lease"
|
|
"go.etcd.io/etcd/lease"
|
|
|
"go.etcd.io/etcd/mvcc"
|
|
"go.etcd.io/etcd/mvcc"
|
|
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
|
|
|
|
+ "go.etcd.io/etcd/pkg/traceutil"
|
|
|
"go.etcd.io/etcd/pkg/types"
|
|
"go.etcd.io/etcd/pkg/types"
|
|
|
|
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/gogo/protobuf/proto"
|
|
@@ -43,17 +44,18 @@ type applyResult struct {
|
|
|
// to being logically reflected by the node. Currently only used for
|
|
// to being logically reflected by the node. Currently only used for
|
|
|
// Compaction requests.
|
|
// Compaction requests.
|
|
|
physc <-chan struct{}
|
|
physc <-chan struct{}
|
|
|
|
|
+ trace *traceutil.Trace
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// applierV3 is the interface for processing V3 raft messages
|
|
// applierV3 is the interface for processing V3 raft messages
|
|
|
type applierV3 interface {
|
|
type applierV3 interface {
|
|
|
Apply(r *pb.InternalRaftRequest) *applyResult
|
|
Apply(r *pb.InternalRaftRequest) *applyResult
|
|
|
|
|
|
|
|
- Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
|
|
|
|
|
- Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
|
|
|
|
|
|
+ Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
|
|
|
|
+ Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
|
|
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
|
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
|
|
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
|
|
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
|
|
|
- Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
|
|
|
|
|
|
|
+ Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
|
|
|
|
|
|
|
|
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
|
|
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
|
|
|
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
|
|
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
|
|
@@ -119,15 +121,15 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|
|
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
|
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
|
|
switch {
|
|
switch {
|
|
|
case r.Range != nil:
|
|
case r.Range != nil:
|
|
|
- ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
|
|
|
|
|
|
|
+ ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
|
|
|
case r.Put != nil:
|
|
case r.Put != nil:
|
|
|
- ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
|
|
|
|
|
|
|
+ ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put)
|
|
|
case r.DeleteRange != nil:
|
|
case r.DeleteRange != nil:
|
|
|
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
|
|
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
|
|
|
case r.Txn != nil:
|
|
case r.Txn != nil:
|
|
|
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
|
|
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
|
|
|
case r.Compaction != nil:
|
|
case r.Compaction != nil:
|
|
|
- ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
|
|
|
|
|
|
|
+ ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
|
|
|
case r.LeaseGrant != nil:
|
|
case r.LeaseGrant != nil:
|
|
|
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
|
|
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
|
|
|
case r.LeaseRevoke != nil:
|
|
case r.LeaseRevoke != nil:
|
|
@@ -174,32 +176,39 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|
|
return ar
|
|
return ar
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
|
|
|
|
|
|
|
+func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
|
|
resp = &pb.PutResponse{}
|
|
resp = &pb.PutResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
-
|
|
|
|
|
|
|
+ trace = traceutil.New("put",
|
|
|
|
|
+ a.s.getLogger(),
|
|
|
|
|
+ traceutil.Field{Key: "key", Value: string(p.Key)},
|
|
|
|
|
+ traceutil.Field{Key: "req_size", Value: proto.Size(p)},
|
|
|
|
|
+ )
|
|
|
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
|
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
|
|
if txn == nil {
|
|
if txn == nil {
|
|
|
if leaseID != lease.NoLease {
|
|
if leaseID != lease.NoLease {
|
|
|
if l := a.s.lessor.Lookup(leaseID); l == nil {
|
|
if l := a.s.lessor.Lookup(leaseID); l == nil {
|
|
|
- return nil, lease.ErrLeaseNotFound
|
|
|
|
|
|
|
+ return nil, nil, lease.ErrLeaseNotFound
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- txn = a.s.KV().Write()
|
|
|
|
|
|
|
+ txn = a.s.KV().Write(trace)
|
|
|
defer txn.End()
|
|
defer txn.End()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var rr *mvcc.RangeResult
|
|
var rr *mvcc.RangeResult
|
|
|
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
|
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
|
|
|
|
+ trace.DisableStep()
|
|
|
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
|
|
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ return nil, nil, err
|
|
|
}
|
|
}
|
|
|
|
|
+ trace.EnableStep()
|
|
|
|
|
+ trace.Step("get previous kv pair")
|
|
|
}
|
|
}
|
|
|
if p.IgnoreValue || p.IgnoreLease {
|
|
if p.IgnoreValue || p.IgnoreLease {
|
|
|
if rr == nil || len(rr.KVs) == 0 {
|
|
if rr == nil || len(rr.KVs) == 0 {
|
|
|
// ignore_{lease,value} flag expects previous key-value pair
|
|
// ignore_{lease,value} flag expects previous key-value pair
|
|
|
- return nil, ErrKeyNotFound
|
|
|
|
|
|
|
+ return nil, nil, ErrKeyNotFound
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if p.IgnoreValue {
|
|
if p.IgnoreValue {
|
|
@@ -215,7 +224,8 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
|
|
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
|
|
|
- return resp, nil
|
|
|
|
|
|
|
+ trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
|
|
|
|
|
+ return resp, trace, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
|
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
|
@@ -224,7 +234,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
|
|
end := mkGteRange(dr.RangeEnd)
|
|
end := mkGteRange(dr.RangeEnd)
|
|
|
|
|
|
|
|
if txn == nil {
|
|
if txn == nil {
|
|
|
- txn = a.s.kv.Write()
|
|
|
|
|
|
|
+ txn = a.s.kv.Write(traceutil.TODO())
|
|
|
defer txn.End()
|
|
defer txn.End()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -245,12 +255,14 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
|
|
return resp, nil
|
|
return resp, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
|
|
|
|
|
+func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
|
|
|
+ trace := traceutil.Get(ctx)
|
|
|
|
|
+
|
|
|
resp := &pb.RangeResponse{}
|
|
resp := &pb.RangeResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
|
|
|
|
|
if txn == nil {
|
|
if txn == nil {
|
|
|
- txn = a.s.kv.Read()
|
|
|
|
|
|
|
+ txn = a.s.kv.Read(trace)
|
|
|
defer txn.End()
|
|
defer txn.End()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -327,7 +339,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
|
|
|
rr.KVs = rr.KVs[:r.Limit]
|
|
rr.KVs = rr.KVs[:r.Limit]
|
|
|
resp.More = true
|
|
resp.More = true
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ trace.Step("filter and sort the key-value pairs")
|
|
|
resp.Header.Revision = rr.Rev
|
|
resp.Header.Revision = rr.Rev
|
|
|
resp.Count = int64(rr.Count)
|
|
resp.Count = int64(rr.Count)
|
|
|
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
|
|
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
|
|
@@ -337,12 +349,13 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
|
|
|
}
|
|
}
|
|
|
resp.Kvs[i] = &rr.KVs[i]
|
|
resp.Kvs[i] = &rr.KVs[i]
|
|
|
}
|
|
}
|
|
|
|
|
+ trace.Step("assemble the response")
|
|
|
return resp, nil
|
|
return resp, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
isWrite := !isTxnReadonly(rt)
|
|
isWrite := !isTxnReadonly(rt)
|
|
|
- txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
|
|
|
|
|
|
|
+ txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO()))
|
|
|
|
|
|
|
|
txnPath := compareToPath(txn, rt)
|
|
txnPath := compareToPath(txn, rt)
|
|
|
if isWrite {
|
|
if isWrite {
|
|
@@ -364,7 +377,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
// be the revision of the write txn.
|
|
// be the revision of the write txn.
|
|
|
if isWrite {
|
|
if isWrite {
|
|
|
txn.End()
|
|
txn.End()
|
|
|
- txn = a.s.KV().Write()
|
|
|
|
|
|
|
+ txn = a.s.KV().Write(traceutil.TODO())
|
|
|
}
|
|
}
|
|
|
a.applyTxn(txn, rt, txnPath, txnResp)
|
|
a.applyTxn(txn, rt, txnPath, txnResp)
|
|
|
rev := txn.Rev()
|
|
rev := txn.Rev()
|
|
@@ -516,7 +529,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|
|
respi := tresp.Responses[i].Response
|
|
respi := tresp.Responses[i].Response
|
|
|
switch tv := req.Request.(type) {
|
|
switch tv := req.Request.(type) {
|
|
|
case *pb.RequestOp_RequestRange:
|
|
case *pb.RequestOp_RequestRange:
|
|
|
- resp, err := a.Range(txn, tv.RequestRange)
|
|
|
|
|
|
|
+ resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
if lg != nil {
|
|
if lg != nil {
|
|
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
|
@@ -526,7 +539,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|
|
}
|
|
}
|
|
|
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
|
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
|
|
case *pb.RequestOp_RequestPut:
|
|
case *pb.RequestOp_RequestPut:
|
|
|
- resp, err := a.Put(txn, tv.RequestPut)
|
|
|
|
|
|
|
+ resp, _, err := a.Put(txn, tv.RequestPut)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
if lg != nil {
|
|
if lg != nil {
|
|
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
|
lg.Panic("unexpected error during txn", zap.Error(err))
|
|
@@ -557,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
|
|
|
return txns
|
|
return txns
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
|
|
|
|
|
|
|
+func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
|
|
|
resp := &pb.CompactionResponse{}
|
|
resp := &pb.CompactionResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
- ch, err := a.s.KV().Compact(compaction.Revision)
|
|
|
|
|
|
|
+ trace := traceutil.New("compact",
|
|
|
|
|
+ a.s.getLogger(),
|
|
|
|
|
+ traceutil.Field{Key: "revision", Value: compaction.Revision},
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ ch, err := a.s.KV().Compact(trace, compaction.Revision)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, ch, err
|
|
|
|
|
|
|
+ return nil, ch, nil, err
|
|
|
}
|
|
}
|
|
|
// get the current revision. which key to get is not important.
|
|
// get the current revision. which key to get is not important.
|
|
|
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
|
|
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
|
|
|
resp.Header.Revision = rr.Rev
|
|
resp.Header.Revision = rr.Rev
|
|
|
- return resp, ch, err
|
|
|
|
|
|
|
+ return resp, ch, trace, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
|
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
|
@@ -674,8 +692,8 @@ type applierV3Capped struct {
|
|
|
// with Puts so that the number of keys in the store is capped.
|
|
// with Puts so that the number of keys in the store is capped.
|
|
|
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
|
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
|
|
|
|
|
|
|
-func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
|
|
|
|
|
- return nil, ErrNoSpace
|
|
|
|
|
|
|
+func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
|
|
|
|
+ return nil, nil, ErrNoSpace
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
@@ -824,13 +842,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
|
|
|
return "aApplierV3{app, NewBackendQuota(s, "v3-applier")}
|
|
return "aApplierV3{app, NewBackendQuota(s, "v3-applier")}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
|
|
|
|
|
|
|
+func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
|
|
ok := a.q.Available(p)
|
|
ok := a.q.Available(p)
|
|
|
- resp, err := a.applierV3.Put(txn, p)
|
|
|
|
|
|
|
+ resp, trace, err := a.applierV3.Put(txn, p)
|
|
|
if err == nil && !ok {
|
|
if err == nil && !ok {
|
|
|
err = ErrNoSpace
|
|
err = ErrNoSpace
|
|
|
}
|
|
}
|
|
|
- return resp, err
|
|
|
|
|
|
|
+ return resp, trace, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|