Browse Source

Merge pull request #2688 from xiang90/versioning

etcdserver: internal request union
Xiang Li 10 years ago
parent
commit
ff0b8723c7

+ 142 - 0
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -12,6 +12,7 @@
 	It has these top-level messages:
 	It has these top-level messages:
 		Request
 		Request
 		Metadata
 		Metadata
+		InternalRaftRequest
 */
 */
 package etcdserverpb
 package etcdserverpb
 
 
@@ -61,6 +62,17 @@ func (m *Metadata) Reset()         { *m = Metadata{} }
 func (m *Metadata) String() string { return proto.CompactTextString(m) }
 func (m *Metadata) String() string { return proto.CompactTextString(m) }
 func (*Metadata) ProtoMessage()    {}
 func (*Metadata) ProtoMessage()    {}
 
 
+// An InternalRaftRequest is the union of all requests which can be
+// sent via raft.
+type InternalRaftRequest struct {
+	V2               *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"`
+	XXX_unrecognized []byte   `json:"-"`
+}
+
+func (m *InternalRaftRequest) Reset()         { *m = InternalRaftRequest{} }
+func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) }
+func (*InternalRaftRequest) ProtoMessage()    {}
+
 func init() {
 func init() {
 }
 }
 func (m *Request) Unmarshal(data []byte) error {
 func (m *Request) Unmarshal(data []byte) error {
@@ -462,6 +474,76 @@ func (m *Metadata) Unmarshal(data []byte) error {
 
 
 	return nil
 	return nil
 }
 }
+func (m *InternalRaftRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.V2 == nil {
+				m.V2 = &Request{}
+			}
+			if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			iNdEx -= sizeOfWire
+			skippy, err := skipEtcdserver(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...)
+			iNdEx += skippy
+		}
+	}
+
+	return nil
+}
 func skipEtcdserver(data []byte) (n int, err error) {
 func skipEtcdserver(data []byte) (n int, err error) {
 	l := len(data)
 	l := len(data)
 	iNdEx := 0
 	iNdEx := 0
@@ -546,6 +628,22 @@ func skipEtcdserver(data []byte) (n int, err error) {
 	}
 	}
 	panic("unreachable")
 	panic("unreachable")
 }
 }
+func (this *InternalRaftRequest) GetValue() interface{} {
+	if this.V2 != nil {
+		return this.V2
+	}
+	return nil
+}
+
+func (this *InternalRaftRequest) SetValue(value interface{}) bool {
+	switch vt := value.(type) {
+	case *Request:
+		this.V2 = vt
+	default:
+		return false
+	}
+	return true
+}
 func (m *Request) Size() (n int) {
 func (m *Request) Size() (n int) {
 	var l int
 	var l int
 	_ = l
 	_ = l
@@ -588,6 +686,19 @@ func (m *Metadata) Size() (n int) {
 	return n
 	return n
 }
 }
 
 
+func (m *InternalRaftRequest) Size() (n int) {
+	var l int
+	_ = l
+	if m.V2 != nil {
+		l = m.V2.Size()
+		n += 1 + l + sovEtcdserver(uint64(l))
+	}
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
 func sovEtcdserver(x uint64) (n int) {
 func sovEtcdserver(x uint64) (n int) {
 	for {
 	for {
 		n++
 		n++
@@ -740,6 +851,37 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) {
 	return i, nil
 	return i, nil
 }
 }
 
 
+func (m *InternalRaftRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.V2 != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintEtcdserver(data, i, uint64(m.V2.Size()))
+		n1, err := m.V2.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n1
+	}
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 	data[offset] = uint8(v)
 	data[offset] = uint8(v)
 	data[offset+1] = uint8(v >> 8)
 	data[offset+1] = uint8(v >> 8)

+ 9 - 0
etcdserver/etcdserverpb/etcdserver.proto

@@ -31,3 +31,12 @@ message Metadata {
 	optional uint64 NodeID    = 1 [(gogoproto.nullable) = false];
 	optional uint64 NodeID    = 1 [(gogoproto.nullable) = false];
 	optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
 	optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
 }
 }
+
+// An InternalRaftRequest is the union of all requests which can be
+// sent via raft.
+message InternalRaftRequest {
+  option (gogoproto.onlyone) = true;
+  oneof value {
+    Request v2 = 1;
+  }
+}

+ 16 - 4
etcdserver/server.go

@@ -521,7 +521,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 	}
 	switch r.Method {
 	switch r.Method {
 	case "POST", "PUT", "DELETE", "QGET":
 	case "POST", "PUT", "DELETE", "QGET":
-		data, err := r.Marshal()
+		var raftReq pb.InternalRaftRequest
+		raftReq.V2 = &r
+		data, err := raftReq.Marshal()
 		if err != nil {
 		if err != nil {
 			return Response{}, err
 			return Response{}, err
 		}
 		}
@@ -741,9 +743,19 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 				}
 				}
 				break
 				break
 			}
 			}
-			var r pb.Request
-			pbutil.MustUnmarshal(&r, e.Data)
-			s.w.Trigger(r.ID, s.applyRequest(r))
+
+			var raftReq pb.InternalRaftRequest
+			if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
+				var r pb.Request
+				pbutil.MustUnmarshal(&r, e.Data)
+				s.w.Trigger(r.ID, s.applyRequest(r))
+			} else {
+				switch {
+				case raftReq.V2 != nil:
+					req := raftReq.V2
+					s.w.Trigger(req.ID, s.applyRequest(*req))
+				}
+			}
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			pbutil.MustUnmarshal(&cc, e.Data)

+ 6 - 4
etcdserver/server_test.go

@@ -982,10 +982,11 @@ func TestPublish(t *testing.T) {
 		t.Fatalf("action = %s, want Propose", action[0].Name)
 		t.Fatalf("action = %s, want Propose", action[0].Name)
 	}
 	}
 	data := action[0].Params[0].([]byte)
 	data := action[0].Params[0].([]byte)
-	var r pb.Request
-	if err := r.Unmarshal(data); err != nil {
+	var rr pb.InternalRaftRequest
+	if err := rr.Unmarshal(data); err != nil {
 		t.Fatalf("unmarshal request error: %v", err)
 		t.Fatalf("unmarshal request error: %v", err)
 	}
 	}
+	r := rr.V2
 	if r.Method != "PUT" {
 	if r.Method != "PUT" {
 		t.Errorf("method = %s, want PUT", r.Method)
 		t.Errorf("method = %s, want PUT", r.Method)
 	}
 	}
@@ -1062,10 +1063,11 @@ func TestUpdateVersion(t *testing.T) {
 		t.Fatalf("action = %s, want Propose", action[0].Name)
 		t.Fatalf("action = %s, want Propose", action[0].Name)
 	}
 	}
 	data := action[0].Params[0].([]byte)
 	data := action[0].Params[0].([]byte)
-	var r pb.Request
-	if err := r.Unmarshal(data); err != nil {
+	var rr pb.InternalRaftRequest
+	if err := rr.Unmarshal(data); err != nil {
 		t.Fatalf("unmarshal request error: %v", err)
 		t.Fatalf("unmarshal request error: %v", err)
 	}
 	}
+	r := rr.V2
 	if r.Method != "PUT" {
 	if r.Method != "PUT" {
 		t.Errorf("method = %s, want PUT", r.Method)
 		t.Errorf("method = %s, want PUT", r.Method)
 	}
 	}

+ 7 - 0
pkg/pbutil/pbutil.go

@@ -42,6 +42,13 @@ func MustUnmarshal(um Unmarshaler, data []byte) {
 	}
 	}
 }
 }
 
 
+func MaybeUnmarshal(um Unmarshaler, data []byte) bool {
+	if err := um.Unmarshal(data); err != nil {
+		return false
+	}
+	return true
+}
+
 func GetBool(v *bool) (vv bool, set bool) {
 func GetBool(v *bool) (vv bool, set bool) {
 	if v == nil {
 	if v == nil {
 		return false, false
 		return false, false