|
|
@@ -380,26 +380,33 @@ func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
- var revision int64
|
|
|
+func checkRequestRange(kv dstorage.KV, reqs []*pb.RequestUnion) error {
|
|
|
+ for _, requ := range reqs {
|
|
|
+ greq := requ.RequestRange
|
|
|
+ if greq == nil || greq.Revision == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
|
|
|
- txnID := kv.TxnBegin()
|
|
|
- defer func() {
|
|
|
- err := kv.TxnEnd(txnID)
|
|
|
- if err != nil {
|
|
|
- panic(fmt.Sprint("unexpected error when closing txn", txnID))
|
|
|
+ if greq.Revision > kv.Rev() {
|
|
|
+ return dstorage.ErrFutureRev
|
|
|
}
|
|
|
- }()
|
|
|
+ if greq.Revision < kv.FirstRev() {
|
|
|
+ return dstorage.ErrCompacted
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
+ var revision int64
|
|
|
|
|
|
ok := true
|
|
|
for _, c := range rt.Compare {
|
|
|
- if revision, ok = applyCompare(txnID, kv, c); !ok {
|
|
|
+ if revision, ok = applyCompare(kv, c); !ok {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // TODO: check potential errors before actually applying anything
|
|
|
-
|
|
|
var reqs []*pb.RequestUnion
|
|
|
if ok {
|
|
|
reqs = rt.Success
|
|
|
@@ -410,6 +417,19 @@ func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnRespon
|
|
|
if err := checkRequestLeases(le, reqs); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+ if err := checkRequestRange(kv, reqs); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // When executing the operations of txn, we need to hold the txn lock.
|
|
|
+ // So the reader will not see any intermediate results.
|
|
|
+ txnID := kv.TxnBegin()
|
|
|
+ defer func() {
|
|
|
+ err := kv.TxnEnd(txnID)
|
|
|
+ if err != nil {
|
|
|
+ panic(fmt.Sprint("unexpected error when closing txn", txnID))
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
resps := make([]*pb.ResponseUnion, len(reqs))
|
|
|
for i := range reqs {
|
|
|
@@ -467,15 +487,10 @@ func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.Respons
|
|
|
}
|
|
|
|
|
|
// applyCompare applies the compare request.
|
|
|
-// applyCompare should only be called within a txn request and an valid txn ID must
|
|
|
-// be presented. Or applyCompare panics.
|
|
|
// It returns the revision at which the comparison happens. If the comparison
|
|
|
// succeeds, the it returns true. Otherwise it returns false.
|
|
|
-func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
|
|
|
- if txnID == noTxn {
|
|
|
- panic("applyCompare called with noTxn")
|
|
|
- }
|
|
|
- ckvs, rev, err := kv.TxnRange(txnID, c.Key, nil, 1, 0)
|
|
|
+func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) {
|
|
|
+ ckvs, rev, err := kv.Range(c.Key, nil, 1, 0)
|
|
|
if err != nil {
|
|
|
if err == dstorage.ErrTxnIDMismatch {
|
|
|
panic("unexpected txn ID mismatch error")
|