소스 검색

etcdserver: support txn

Xiang Li 10 년 전
부모
커밋
9233fff48f
2개의 변경된 파일130개의 추가작업 그리고 28개의 파일을 삭제
  1. 2 2
      etcdserver/api/v3rpc/key.go
  2. 128 26
      etcdserver/v3demo_server.go

+ 2 - 2
etcdserver/api/v3rpc/key.go

@@ -44,8 +44,8 @@ func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p
 }
 
 func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
-	panic("not implemented")
-	return nil, nil
+	resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r})
+	return resp.(*pb.TxnResponse), nil
 }
 
 func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {

+ 128 - 26
etcdserver/v3demo_server.go

@@ -15,9 +15,12 @@
 package etcdserver
 
 import (
+	"bytes"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	dstorage "github.com/coreos/etcd/storage"
 )
 
 type V3DemoServer interface {
@@ -27,36 +30,135 @@ type V3DemoServer interface {
 func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message {
 	switch {
 	case r.Range != nil:
-		rr := r.Range
-		resp := &pb.RangeResponse{}
-		resp.Header = &pb.ResponseHeader{}
-		kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0)
-		if err != nil {
-			panic("not handled error")
-		}
-
-		resp.Header.Index = rev
-		for i := range kvs {
-			resp.Kvs = append(resp.Kvs, &kvs[i])
-		}
-		return resp
+		return doRange(s.kv, r.Range)
 	case r.Put != nil:
-		rp := r.Put
-		resp := &pb.PutResponse{}
-		resp.Header = &pb.ResponseHeader{}
-		rev := s.kv.Put(rp.Key, rp.Value)
-		resp.Header.Index = rev
-		return resp
+		return doPut(s.kv, r.Put)
 	case r.DeleteRange != nil:
-		rd := r.DeleteRange
-		resp := &pb.DeleteRangeResponse{}
-		resp.Header = &pb.ResponseHeader{}
-		_, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd)
-		resp.Header.Index = rev
-		return resp
+		return doDeleteRange(s.kv, r.DeleteRange)
 	case r.Txn != nil:
-		panic("not implemented")
+		var index int64
+		rt := r.Txn
+
+		ok := true
+		for _, c := range rt.Compare {
+			kvs, rev, err := s.kv.Range(c.Key, nil, 1, 0)
+			if err != nil {
+				ok = false
+				break
+			}
+			index = rev
+			kv := kvs[0]
+
+			// -1 is less, 0 is equal, 1 is greater
+			var result int
+			switch c.Target {
+			case pb.Compare_VALUE:
+				result = bytes.Compare(kv.Value, c.Value)
+			case pb.Compare_CREATE:
+				result = compareInt64(kv.CreateIndex, c.CreateIndex)
+			case pb.Compare_MOD:
+				result = compareInt64(kv.ModIndex, c.ModIndex)
+			case pb.Compare_VERSION:
+				result = compareInt64(kv.Version, c.Version)
+			}
+
+			switch c.Result {
+			case pb.Compare_EQUAL:
+				if result != 0 {
+					ok = false
+				}
+			case pb.Compare_GREATER:
+				if result != 1 {
+					ok = false
+				}
+			case pb.Compare_LESS:
+				if result != -1 {
+					ok = false
+				}
+			}
+
+			if !ok {
+				break
+			}
+		}
+
+		var reqs []*pb.RequestUnion
+		if ok {
+			reqs = rt.Success
+		} else {
+			reqs = rt.Failure
+		}
+		resps := make([]*pb.ResponseUnion, len(reqs))
+		for i := range reqs {
+			resps[i] = doUnion(s.kv, reqs[i])
+		}
+		if len(resps) != 0 {
+			index += 1
+		}
+
+		txnResp := &pb.TxnResponse{}
+		txnResp.Header = &pb.ResponseHeader{}
+		txnResp.Header.Index = index
+		txnResp.Responses = resps
+		txnResp.Succeeded = ok
+		return txnResp
 	default:
 		panic("not implemented")
 	}
 }
+
+func compareInt64(a, b int64) int {
+	switch {
+	case a < b:
+		return -1
+	case a > b:
+		return 1
+	default:
+		return 0
+	}
+}
+
+func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse {
+	resp := &pb.PutResponse{}
+	resp.Header = &pb.ResponseHeader{}
+	rev := kv.Put(p.Key, p.Value)
+	resp.Header.Index = rev
+	return resp
+}
+
+func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse {
+	resp := &pb.RangeResponse{}
+	resp.Header = &pb.ResponseHeader{}
+	kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0)
+	if err != nil {
+		panic("not handled error")
+	}
+
+	resp.Header.Index = rev
+	for i := range kvs {
+		resp.Kvs = append(resp.Kvs, &kvs[i])
+	}
+	return resp
+}
+
+func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse {
+	resp := &pb.DeleteRangeResponse{}
+	resp.Header = &pb.ResponseHeader{}
+	_, rev := kv.DeleteRange(dr.Key, dr.RangeEnd)
+	resp.Header.Index = rev
+	return resp
+}
+
+func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
+	switch {
+	case union.RequestRange != nil:
+		return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)}
+	case union.RequestPut != nil:
+		return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)}
+	case union.RequestDeleteRange != nil:
+		return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)}
+	default:
+		// empty union
+		return nil
+	}
+}