Browse Source

Merge pull request #4184 from heyitsanthony/v3-rangereq-sort

etcdserver: support sorted range requests in v3 api
Anthony Romano 10 years ago
parent
commit
efa9cd7e0c

+ 1 - 0
etcdctlv3/command/error.go

@@ -27,6 +27,7 @@ const (
 	ExitError
 	ExitBadConnection
 	ExitInvalidInput // for txn, watch command
+	ExitBadFeature   // provided a valid flag with an unsupported value
 	ExitBadArgs      = 128
 )
 

+ 52 - 3
etcdctlv3/command/range_command.go

@@ -16,6 +16,7 @@ package command
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -23,13 +24,23 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
+var (
+	rangeLimit      int
+	rangeSortOrder  string
+	rangeSortTarget string
+)
+
 // NewRangeCommand returns the cobra command for "range".
 func NewRangeCommand() *cobra.Command {
-	return &cobra.Command{
+	cmd := &cobra.Command{
 		Use:   "range",
 		Short: "Range gets the keys in the range from the store.",
 		Run:   rangeCommandFunc,
 	}
+	cmd.Flags().StringVar(&rangeSortOrder, "order", "", "order of results; ASCEND or DESCEND")
+	cmd.Flags().StringVar(&rangeSortTarget, "sort-by", "", "sort target; CREATE, KEY, MODIFY, VALUE, or VERSION")
+	cmd.Flags().IntVar(&rangeLimit, "limit", 0, "maximum number of results")
+	return cmd
 }
 
 // rangeCommandFunc executes the "range" command.
@@ -48,13 +59,51 @@ func rangeCommandFunc(cmd *cobra.Command, args []string) {
 	if err != nil {
 		ExitWithError(ExitError, err)
 	}
+
+	sortByOrder := pb.RangeRequest_NONE
+	sortOrder := strings.ToUpper(rangeSortOrder)
+	switch {
+	case sortOrder == "ASCEND":
+		sortByOrder = pb.RangeRequest_ASCEND
+	case sortOrder == "DESCEND":
+		sortByOrder = pb.RangeRequest_DESCEND
+	case sortOrder == "":
+		sortByOrder = pb.RangeRequest_NONE
+	default:
+		ExitWithError(ExitBadFeature, fmt.Errorf("bad sort order %v", rangeSortOrder))
+	}
+
+	sortByTarget := pb.RangeRequest_KEY
+	sortTarget := strings.ToUpper(rangeSortTarget)
+	switch {
+	case sortTarget == "CREATE":
+		sortByTarget = pb.RangeRequest_CREATE
+	case sortTarget == "KEY":
+		sortByTarget = pb.RangeRequest_KEY
+	case sortTarget == "MODIFY":
+		sortByTarget = pb.RangeRequest_MOD
+	case sortTarget == "VALUE":
+		sortByTarget = pb.RangeRequest_VALUE
+	case sortTarget == "VERSION":
+		sortByTarget = pb.RangeRequest_VERSION
+	case sortTarget == "":
+		sortByTarget = pb.RangeRequest_KEY
+	default:
+		ExitWithError(ExitBadFeature, fmt.Errorf("bad sort target %v", rangeSortTarget))
+	}
+
 	conn, err := grpc.Dial(endpoint)
 	if err != nil {
 		ExitWithError(ExitBadConnection, err)
 	}
 	kv := pb.NewKVClient(conn)
-	req := &pb.RangeRequest{Key: key, RangeEnd: rangeEnd}
-
+	req := &pb.RangeRequest{
+		Key:        key,
+		RangeEnd:   rangeEnd,
+		SortOrder:  sortByOrder,
+		SortTarget: sortByTarget,
+		Limit:      int64(rangeLimit),
+	}
 	resp, err := kv.Range(context.Background(), req)
 	for _, kv := range resp.Kvs {
 		fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value))

+ 106 - 0
etcdserver/etcdserverpb/rpc.pb.go

@@ -20,6 +20,58 @@ import fmt "fmt"
 // Reference imports to suppress errors if they are not otherwise used.
 var _ = proto.Marshal
 
+type RangeRequest_SortOrder int32
+
+const (
+	RangeRequest_NONE    RangeRequest_SortOrder = 0
+	RangeRequest_ASCEND  RangeRequest_SortOrder = 1
+	RangeRequest_DESCEND RangeRequest_SortOrder = 2
+)
+
+var RangeRequest_SortOrder_name = map[int32]string{
+	0: "NONE",
+	1: "ASCEND",
+	2: "DESCEND",
+}
+var RangeRequest_SortOrder_value = map[string]int32{
+	"NONE":    0,
+	"ASCEND":  1,
+	"DESCEND": 2,
+}
+
+func (x RangeRequest_SortOrder) String() string {
+	return proto.EnumName(RangeRequest_SortOrder_name, int32(x))
+}
+
+type RangeRequest_SortTarget int32
+
+const (
+	RangeRequest_KEY     RangeRequest_SortTarget = 0
+	RangeRequest_VERSION RangeRequest_SortTarget = 1
+	RangeRequest_CREATE  RangeRequest_SortTarget = 2
+	RangeRequest_MOD     RangeRequest_SortTarget = 3
+	RangeRequest_VALUE   RangeRequest_SortTarget = 4
+)
+
+var RangeRequest_SortTarget_name = map[int32]string{
+	0: "KEY",
+	1: "VERSION",
+	2: "CREATE",
+	3: "MOD",
+	4: "VALUE",
+}
+var RangeRequest_SortTarget_value = map[string]int32{
+	"KEY":     0,
+	"VERSION": 1,
+	"CREATE":  2,
+	"MOD":     3,
+	"VALUE":   4,
+}
+
+func (x RangeRequest_SortTarget) String() string {
+	return proto.EnumName(RangeRequest_SortTarget_name, int32(x))
+}
+
 type Compare_CompareResult int32
 
 const (
@@ -94,6 +146,10 @@ type RangeRequest struct {
 	// if the revision has been compacted, ErrCompaction will be returned in
 	// response.
 	Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"`
+	// sort_order is the requested order for returned the results
+	SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
+	// sort_target is the kv field to use for sorting
+	SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
 }
 
 func (m *RangeRequest) Reset()         { *m = RangeRequest{} }
@@ -516,6 +572,8 @@ func (m *LeaseKeepAliveResponse) GetHeader() *ResponseHeader {
 }
 
 func init() {
+	proto.RegisterEnum("etcdserverpb.RangeRequest_SortOrder", RangeRequest_SortOrder_name, RangeRequest_SortOrder_value)
+	proto.RegisterEnum("etcdserverpb.RangeRequest_SortTarget", RangeRequest_SortTarget_name, RangeRequest_SortTarget_value)
 	proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value)
 	proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value)
 }
@@ -1059,6 +1117,16 @@ func (m *RangeRequest) MarshalTo(data []byte) (int, error) {
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Revision))
 	}
+	if m.SortOrder != 0 {
+		data[i] = 0x28
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.SortOrder))
+	}
+	if m.SortTarget != 0 {
+		data[i] = 0x30
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.SortTarget))
+	}
 	return i, nil
 }
 
@@ -1971,6 +2039,12 @@ func (m *RangeRequest) Size() (n int) {
 	if m.Revision != 0 {
 		n += 1 + sovRpc(uint64(m.Revision))
 	}
+	if m.SortOrder != 0 {
+		n += 1 + sovRpc(uint64(m.SortOrder))
+	}
+	if m.SortTarget != 0 {
+		n += 1 + sovRpc(uint64(m.SortTarget))
+	}
 	return n
 }
 
@@ -2551,6 +2625,38 @@ func (m *RangeRequest) Unmarshal(data []byte) error {
 					break
 				}
 			}
+		case 5:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field SortOrder", wireType)
+			}
+			m.SortOrder = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.SortOrder |= (RangeRequest_SortOrder(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 6:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field SortTarget", wireType)
+			}
+			m.SortTarget = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.SortTarget |= (RangeRequest_SortTarget(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		default:
 			var sizeOfWire int
 			for {

+ 19 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -67,6 +67,19 @@ message ResponseHeader {
 }
 
 message RangeRequest {
+  enum SortOrder {
+	NONE = 0; // default, no sorting
+	ASCEND = 1; // lowest target value first
+	DESCEND = 2; // highest target value first
+  }
+  enum SortTarget {
+	KEY = 0;
+	VERSION = 1;
+	CREATE = 2;
+	MOD = 3;
+	VALUE = 4;
+  }
+
   // if the range_end is not given, the request returns the key.
   bytes key = 1;
   // if the range_end is given, it gets the keys in range [key, range_end).
@@ -78,6 +91,12 @@ message RangeRequest {
   // if the revision has been compacted, ErrCompaction will be returned in
   // response.
   int64 revision = 4;
+
+  // sort_order is the requested order for returned the results
+  SortOrder sort_order = 5;
+
+  // sort_target is the kv field to use for sorting
+  SortTarget sort_target = 6;
 }
 
 message RangeResponse {

+ 73 - 2
etcdserver/v3demo_server.go

@@ -17,6 +17,7 @@ package etcdserver
 import (
 	"bytes"
 	"fmt"
+	"sort"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -191,6 +192,45 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
 	return resp, nil
 }
 
+type kvSort struct{ kvs []storagepb.KeyValue }
+
+func (s *kvSort) Swap(i, j int) {
+	t := s.kvs[i]
+	s.kvs[i] = s.kvs[j]
+	s.kvs[j] = t
+}
+func (s *kvSort) Len() int { return len(s.kvs) }
+
+type kvSortByKey struct{ *kvSort }
+
+func (s *kvSortByKey) Less(i, j int) bool {
+	return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
+}
+
+type kvSortByVersion struct{ *kvSort }
+
+func (s *kvSortByVersion) Less(i, j int) bool {
+	return (s.kvs[i].Version - s.kvs[j].Version) < 0
+}
+
+type kvSortByCreate struct{ *kvSort }
+
+func (s *kvSortByCreate) Less(i, j int) bool {
+	return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
+}
+
+type kvSortByMod struct{ *kvSort }
+
+func (s *kvSortByMod) Less(i, j int) bool {
+	return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
+}
+
+type kvSortByValue struct{ *kvSort }
+
+func (s *kvSortByValue) Less(i, j int) bool {
+	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
+}
+
 func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	resp := &pb.RangeResponse{}
 	resp.Header = &pb.ResponseHeader{}
@@ -201,18 +241,49 @@ func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeRespo
 		err error
 	)
 
+	limit := r.Limit
+	if r.SortOrder != pb.RangeRequest_NONE {
+		// fetch everything; sort and truncate afterwards
+		limit = 0
+	}
+
 	if txnID != noTxn {
-		kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, r.Limit, 0)
+		kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, limit, 0)
 		if err != nil {
 			return nil, err
 		}
 	} else {
-		kvs, rev, err = kv.Range(r.Key, r.RangeEnd, r.Limit, 0)
+		kvs, rev, err = kv.Range(r.Key, r.RangeEnd, limit, 0)
 		if err != nil {
 			return nil, err
 		}
 	}
 
+	if r.SortOrder != pb.RangeRequest_NONE {
+		var sorter sort.Interface
+		switch {
+		case r.SortTarget == pb.RangeRequest_KEY:
+			sorter = &kvSortByKey{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_VERSION:
+			sorter = &kvSortByVersion{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_CREATE:
+			sorter = &kvSortByCreate{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_MOD:
+			sorter = &kvSortByMod{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_VALUE:
+			sorter = &kvSortByValue{&kvSort{kvs}}
+		}
+		switch {
+		case r.SortOrder == pb.RangeRequest_ASCEND:
+			sort.Sort(sorter)
+		case r.SortOrder == pb.RangeRequest_DESCEND:
+			sort.Sort(sort.Reverse(sorter))
+		}
+		if r.Limit > 0 && len(kvs) > int(r.Limit) {
+			kvs = kvs[:r.Limit]
+		}
+	}
+
 	resp.Header.Revision = rev
 	for i := range kvs {
 		resp.Kvs = append(resp.Kvs, &kvs[i])