Browse Source

Merge pull request #4508 from xiang90/l

*: support local range request
Xiang Li 9 years ago
parent
commit
00f89941a9

+ 41 - 1
etcdserver/etcdserverpb/rpc.pb.go

@@ -143,7 +143,8 @@ func (*ResponseHeader) ProtoMessage()    {}
 type RangeRequest struct {
 type RangeRequest struct {
 	// if the range_end is not given, the request returns the key.
 	// if the range_end is not given, the request returns the key.
 	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
 	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
-	// if the range_end is given, it gets the keys in range [key, range_end).
+	// if the range_end is given, it gets the keys in range [key, range_end)
+	// if range_end is nonempty, otherwise it returns all keys >= key.
 	RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"`
 	RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"`
 	// limit the number of keys returned.
 	// limit the number of keys returned.
 	Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"`
 	Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"`
@@ -156,6 +157,12 @@ type RangeRequest struct {
 	SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
 	SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
 	// sort_target is the kv field to use for sorting
 	// sort_target is the kv field to use for sorting
 	SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
 	SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
+	// range request is linearizable by default. Linearizable requests has a higher
+	// latency and lower throughput than serializable request.
+	// To reduce latency, serializable can be set. If serializable is set, range request
+	// will be serializable, but not linearizable with other requests.
+	// Serializable range can be served locally without waiting for other nodes in the cluster.
+	Serializable bool `protobuf:"varint,7,opt,name=serializable,proto3" json:"serializable,omitempty"`
 }
 }
 
 
 func (m *RangeRequest) Reset()         { *m = RangeRequest{} }
 func (m *RangeRequest) Reset()         { *m = RangeRequest{} }
@@ -1883,6 +1890,16 @@ func (m *RangeRequest) MarshalTo(data []byte) (int, error) {
 		i++
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.SortTarget))
 		i = encodeVarintRpc(data, i, uint64(m.SortTarget))
 	}
 	}
+	if m.Serializable {
+		data[i] = 0x38
+		i++
+		if m.Serializable {
+			data[i] = 1
+		} else {
+			data[i] = 0
+		}
+		i++
+	}
 	return i, nil
 	return i, nil
 }
 }
 
 
@@ -3230,6 +3247,9 @@ func (m *RangeRequest) Size() (n int) {
 	if m.SortTarget != 0 {
 	if m.SortTarget != 0 {
 		n += 1 + sovRpc(uint64(m.SortTarget))
 		n += 1 + sovRpc(uint64(m.SortTarget))
 	}
 	}
+	if m.Serializable {
+		n += 2
+	}
 	return n
 	return n
 }
 }
 
 
@@ -4095,6 +4115,26 @@ func (m *RangeRequest) Unmarshal(data []byte) error {
 					break
 					break
 				}
 				}
 			}
 			}
+		case 7:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Serializable", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Serializable = bool(v != 0)
 		default:
 		default:
 			iNdEx = preIndex
 			iNdEx = preIndex
 			skippy, err := skipRpc(data[iNdEx:])
 			skippy, err := skipRpc(data[iNdEx:])

+ 7 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -117,6 +117,13 @@ message RangeRequest {
 
 
   // sort_target is the kv field to use for sorting
   // sort_target is the kv field to use for sorting
   SortTarget sort_target = 6;
   SortTarget sort_target = 6;
+
+  // range request is linearizable by default. Linearizable requests has a higher
+  // latency and lower throughput than serializable request.
+  // To reduce latency, serializable can be set. If serializable is set, range request 
+  // will be serializable, but not linearizable with other requests.
+  // Serializable range can be served locally without waiting for other nodes in the cluster.
+  bool serializable = 7;
 }
 }
 
 
 message RangeResponse {
 message RangeResponse {

+ 4 - 0
etcdserver/v3demo_server.go

@@ -58,6 +58,10 @@ type Lessor interface {
 }
 }
 
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	if r.Serializable {
+		return applyRange(noTxn, s.kv, r)
+	}
+
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 4 - 3
storage/storagepb/kv.pb.go

@@ -53,9 +53,10 @@ func (x Event_EventType) String() string {
 }
 }
 
 
 type KeyValue struct {
 type KeyValue struct {
-	Key            []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
-	CreateRevision int64  `protobuf:"varint,2,opt,name=create_revision,proto3" json:"create_revision,omitempty"`
-	// mod_revision is the last modified revision of the key.
+	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+	// create_revision is the revision of last creation on this key.
+	CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,proto3" json:"create_revision,omitempty"`
+	// mod_revision is the revision of last modification on this key.
 	ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,proto3" json:"mod_revision,omitempty"`
 	ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,proto3" json:"mod_revision,omitempty"`
 	// version is the version of the key. A deletion resets
 	// version is the version of the key. A deletion resets
 	// the version to zero and any modification of the key
 	// the version to zero and any modification of the key