|
|
@@ -196,11 +196,11 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
|
|
|
case r.Range != nil:
|
|
|
ar.resp, ar.err = applyRange(noTxn, kv, r.Range)
|
|
|
case r.Put != nil:
|
|
|
- ar.resp, ar.err = applyPut(noTxn, kv, r.Put)
|
|
|
+ ar.resp, ar.err = applyPut(noTxn, kv, le, r.Put)
|
|
|
case r.DeleteRange != nil:
|
|
|
ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange)
|
|
|
case r.Txn != nil:
|
|
|
- ar.resp, ar.err = applyTxn(kv, r.Txn)
|
|
|
+ ar.resp, ar.err = applyTxn(kv, le, r.Txn)
|
|
|
case r.Compaction != nil:
|
|
|
ar.resp, ar.err = applyCompaction(kv, r.Compaction)
|
|
|
case r.LeaseCreate != nil:
|
|
|
@@ -214,7 +214,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
|
|
|
return ar
|
|
|
}
|
|
|
|
|
|
-func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, error) {
|
|
|
+func applyPut(txnID int64, kv dstorage.KV, le lease.Lessor, p *pb.PutRequest) (*pb.PutResponse, error) {
|
|
|
resp := &pb.PutResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
var (
|
|
|
@@ -227,7 +227,13 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
|
|
|
return nil, err
|
|
|
}
|
|
|
} else {
|
|
|
- rev = kv.Put(p.Key, p.Value, lease.LeaseID(p.Lease))
|
|
|
+ leaseID := lease.LeaseID(p.Lease)
|
|
|
+ if leaseID != lease.NoLease {
|
|
|
+ if l := le.Lookup(leaseID); l == nil {
|
|
|
+ return nil, lease.ErrLeaseNotFound
|
|
|
+ }
|
|
|
+ }
|
|
|
+ rev = kv.Put(p.Key, p.Value, leaseID)
|
|
|
}
|
|
|
resp.Header.Revision = rev
|
|
|
return resp, nil
|
|
|
@@ -360,7 +366,20 @@ func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (*
|
|
|
return resp, nil
|
|
|
}
|
|
|
|
|
|
-func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
+func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error {
|
|
|
+ for _, requ := range reqs {
|
|
|
+ preq := requ.RequestPut
|
|
|
+ if preq == nil || lease.LeaseID(preq.Lease) == lease.NoLease {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if l := le.Lookup(lease.LeaseID(preq.Lease)); l == nil {
|
|
|
+ return lease.ErrLeaseNotFound
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
var revision int64
|
|
|
|
|
|
txnID := kv.TxnBegin()
|
|
|
@@ -387,6 +406,10 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
reqs = rt.Failure
|
|
|
}
|
|
|
|
|
|
+ if err := checkRequestLeases(le, reqs); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
resps := make([]*pb.ResponseUnion, len(reqs))
|
|
|
for i := range reqs {
|
|
|
resps[i] = applyUnion(txnID, kv, reqs[i])
|
|
|
@@ -425,7 +448,7 @@ func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.Respons
|
|
|
}
|
|
|
return &pb.ResponseUnion{ResponseRange: resp}
|
|
|
case union.RequestPut != nil:
|
|
|
- resp, err := applyPut(txnID, kv, union.RequestPut)
|
|
|
+ resp, err := applyPut(txnID, kv, nil, union.RequestPut)
|
|
|
if err != nil {
|
|
|
panic("unexpected error during txn")
|
|
|
}
|