Prechádzať zdrojové kódy

etcdserver: check invalid range in txn

Xiang Li 10 rokov pred
rodič
commit
128b5e7387

+ 33 - 18
etcdserver/v3demo_server.go

@@ -380,26 +380,33 @@ func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error {
 	return nil
 }
 
-func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
-	var revision int64
+func checkRequestRange(kv dstorage.KV, reqs []*pb.RequestUnion) error {
+	for _, requ := range reqs {
+		greq := requ.RequestRange
+		if greq == nil || greq.Revision == 0 {
+			continue
+		}
 
-	txnID := kv.TxnBegin()
-	defer func() {
-		err := kv.TxnEnd(txnID)
-		if err != nil {
-			panic(fmt.Sprint("unexpected error when closing txn", txnID))
+		if greq.Revision > kv.Rev() {
+			return dstorage.ErrFutureRev
 		}
-	}()
+		if greq.Revision < kv.FirstRev() {
+			return dstorage.ErrCompacted
+		}
+	}
+	return nil
+}
+
+func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+	var revision int64
 
 	ok := true
 	for _, c := range rt.Compare {
-		if revision, ok = applyCompare(txnID, kv, c); !ok {
+		if revision, ok = applyCompare(kv, c); !ok {
 			break
 		}
 	}
 
-	// TODO: check potential errors before actually applying anything
-
 	var reqs []*pb.RequestUnion
 	if ok {
 		reqs = rt.Success
@@ -410,6 +417,19 @@ func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnRespon
 	if err := checkRequestLeases(le, reqs); err != nil {
 		return nil, err
 	}
+	if err := checkRequestRange(kv, reqs); err != nil {
+		return nil, err
+	}
+
+	// When executing the operations of txn, we need to hold the txn lock.
+	// So the reader will not see any intermediate results.
+	txnID := kv.TxnBegin()
+	defer func() {
+		err := kv.TxnEnd(txnID)
+		if err != nil {
+			panic(fmt.Sprint("unexpected error when closing txn", txnID))
+		}
+	}()
 
 	resps := make([]*pb.ResponseUnion, len(reqs))
 	for i := range reqs {
@@ -467,15 +487,10 @@ func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.Respons
 }
 
 // applyCompare applies the compare request.
-// applyCompare should only be called within a txn request and an valid txn ID must
-// be presented. Or applyCompare panics.
 // It returns the revision at which the comparison happens. If the comparison
 // succeeds, the it returns true. Otherwise it returns false.
-func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
-	if txnID == noTxn {
-		panic("applyCompare called with noTxn")
-	}
-	ckvs, rev, err := kv.TxnRange(txnID, c.Key, nil, 1, 0)
+func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) {
+	ckvs, rev, err := kv.Range(c.Key, nil, 1, 0)
 	if err != nil {
 		if err == dstorage.ErrTxnIDMismatch {
 			panic("unexpected txn ID mismatch error")

+ 39 - 0
integration/v3_grpc_test.go

@@ -25,6 +25,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/storagepb"
@@ -263,6 +264,44 @@ func TestV3DeleteRange(t *testing.T) {
 	}
 }
 
+// TestV3TxnInvaildRange tests txn
+func TestV3TxnInvaildRange(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	kvc := pb.NewKVClient(clus.RandConn())
+	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
+
+	for i := 0; i < 3; i++ {
+		_, err := kvc.Put(context.Background(), preq)
+		if err != nil {
+			t.Fatalf("couldn't put key (%v)", err)
+		}
+	}
+
+	_, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
+	if err != nil {
+		t.Fatalf("couldn't compact kv space (%v)", err)
+	}
+
+	// future rev
+	txn := &pb.TxnRequest{}
+	txn.Success = append(txn.Success, &pb.RequestUnion{RequestPut: preq})
+
+	rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
+	txn.Success = append(txn.Success, &pb.RequestUnion{RequestRange: rreq})
+
+	if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrFutureRev {
+		t.Errorf("err = %v, want %v", err, v3rpc.ErrFutureRev)
+	}
+
+	// compacted rev
+	txn.Success[1].RequestRange.Revision = 1
+	if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrCompacted {
+		t.Errorf("err = %v, want %v", err, v3rpc.ErrCompacted)
+	}
+}
+
 // TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
 func TestV3WatchFromCurrentRevision(t *testing.T) {
 	tests := []struct {

+ 5 - 0
storage/kv.go

@@ -24,6 +24,11 @@ type KV interface {
 	// Rev returns the current revision of the KV.
 	Rev() int64
 
+	// FirstRev returns the first revision of the KV.
+	// After a compaction, the first revision increases to the compaction
+	// revision.
+	FirstRev() int64
+
 	// Range gets the keys in the range at rangeRev.
 	// If rangeRev <=0, range gets the keys at currentRev.
 	// If `end` is nil, the request returns the key.

+ 7 - 0
storage/kvstore.go

@@ -106,6 +106,13 @@ func (s *store) Rev() int64 {
 	return s.currentRev.main
 }
 
+func (s *store) FirstRev() int64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.compactMainRev
+}
+
 func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
 	id := s.TxnBegin()
 	s.put(key, value, lease)