Browse Source

Merge pull request #3248 from xiang90/v3

initial v3 demo
Xiang Li 10 years ago
parent
commit
a718329ad3

+ 6 - 0
Documentation/configuration.md

@@ -193,6 +193,12 @@ Follow the instructions when using these flags.
 + Force to create a new one-member cluster. It commits configuration changes in force to remove all existing members in the cluster and add itself. It needs to be set to [restore a backup][restore].
 + default: false
 
+### Experimental Flags
+
+##### -experimental-v3demo
++ Enable experimental [v3 demo API](rfc/v3api.proto).
++ default: false
+
 ### Miscellaneous Flags
 
 ##### -version

+ 61 - 0
etcdctlv3/command/delete_range_command.go

@@ -0,0 +1,61 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"fmt"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// NewDeleteRangeCommand returns the CLI command for "deleteRange".
+func NewDeleteRangeCommand() cli.Command {
+	return cli.Command{
+		Name: "delete-range",
+		Action: func(c *cli.Context) {
+			deleteRangeCommandFunc(c)
+		},
+	}
+}
+
+// deleteRangeCommandFunc executes the "delegeRange" command.
+func deleteRangeCommandFunc(c *cli.Context) {
+	if len(c.Args()) == 0 {
+		panic("bad arg")
+	}
+
+	var rangeEnd []byte
+	key := []byte(c.Args()[0])
+	if len(c.Args()) > 1 {
+		rangeEnd = []byte(c.Args()[1])
+	}
+	conn, err := grpc.Dial("127.0.0.1:12379")
+	if err != nil {
+		panic(err)
+	}
+	etcd := pb.NewEtcdClient(conn)
+	req := &pb.DeleteRangeRequest{Key: key, RangeEnd: rangeEnd}
+
+	etcd.DeleteRange(context.Background(), req)
+
+	if rangeEnd != nil {
+		fmt.Printf("range [%s, %s) is deleted\n", string(key), string(rangeEnd))
+	} else {
+		fmt.Printf("key %s is deleted\n", string(key))
+	}
+}

+ 53 - 0
etcdctlv3/command/put_command.go

@@ -0,0 +1,53 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"fmt"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// NewPutCommand returns the CLI command for "put".
+func NewPutCommand() cli.Command {
+	return cli.Command{
+		Name: "put",
+		Action: func(c *cli.Context) {
+			putCommandFunc(c)
+		},
+	}
+}
+
+// putCommandFunc executes the "put" command.
+func putCommandFunc(c *cli.Context) {
+	if len(c.Args()) != 2 {
+		panic("bad arg")
+	}
+
+	key := []byte(c.Args()[0])
+	value := []byte(c.Args()[1])
+	conn, err := grpc.Dial("127.0.0.1:12379")
+	if err != nil {
+		panic(err)
+	}
+	etcd := pb.NewEtcdClient(conn)
+	req := &pb.PutRequest{Key: key, Value: value}
+
+	etcd.Put(context.Background(), req)
+	fmt.Printf("%s %s\n", key, value)
+}

+ 58 - 0
etcdctlv3/command/range_command.go

@@ -0,0 +1,58 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"fmt"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// NewRangeCommand returns the CLI command for "range".
+func NewRangeCommand() cli.Command {
+	return cli.Command{
+		Name: "range",
+		Action: func(c *cli.Context) {
+			rangeCommandFunc(c)
+		},
+	}
+}
+
+// rangeCommandFunc executes the "range" command.
+func rangeCommandFunc(c *cli.Context) {
+	if len(c.Args()) == 0 {
+		panic("bad arg")
+	}
+
+	var rangeEnd []byte
+	key := []byte(c.Args()[0])
+	if len(c.Args()) > 1 {
+		rangeEnd = []byte(c.Args()[1])
+	}
+	conn, err := grpc.Dial("127.0.0.1:12379")
+	if err != nil {
+		panic(err)
+	}
+	etcd := pb.NewEtcdClient(conn)
+	req := &pb.RangeRequest{Key: key, RangeEnd: rangeEnd}
+
+	resp, err := etcd.Range(context.Background(), req)
+	for _, kv := range resp.Kvs {
+		fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value))
+	}
+}

+ 37 - 0
etcdctlv3/main.go

@@ -0,0 +1,37 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"os"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/etcdctlv3/command"
+	"github.com/coreos/etcd/version"
+)
+
+func main() {
+	app := cli.NewApp()
+	app.Name = "etcdctlv3"
+	app.Version = version.Version
+	app.Usage = "A simple command line client for etcd3."
+	app.Commands = []cli.Command{
+		command.NewRangeCommand(),
+		command.NewPutCommand(),
+		command.NewDeleteRangeCommand(),
+	}
+
+	app.Run(os.Args)
+}

+ 5 - 0
etcdmain/config.go

@@ -115,6 +115,8 @@ type config struct {
 
 	printVersion bool
 
+	v3demo bool
+
 	ignored []string
 }
 
@@ -208,6 +210,9 @@ func NewConfig() *config {
 	// version
 	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit")
 
+	// demo flag
+	fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API")
+
 	// backwards-compatibility with v0.4.6
 	fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.")
 	fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.")

+ 21 - 0
etcdmain/etcd.go

@@ -31,9 +31,12 @@ import (
 	systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/cors"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/osutil"
@@ -232,6 +235,15 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		clns = append(clns, l)
 	}
 
+	var v3l net.Listener
+	if cfg.v3demo {
+		v3l, err = net.Listen("tcp", "127.0.0.1:12379")
+		if err != nil {
+			plog.Fatal(err)
+		}
+		plog.Infof("listening for client rpc on 127.0.0.1:12379")
+	}
+
 	srvcfg := &etcdserver.ServerConfig{
 		Name:                cfg.name,
 		ClientURLs:          cfg.acurls,
@@ -249,6 +261,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Transport:           pt,
 		TickMs:              cfg.TickMs,
 		ElectionTicks:       cfg.electionTicks(),
+		V3demo:              cfg.v3demo,
 	}
 	var s *etcdserver.EtcdServer
 	s, err = etcdserver.NewServer(srvcfg)
@@ -280,6 +293,14 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 			plog.Fatal(serveHTTP(l, ch, 0))
 		}(l)
 	}
+
+	if cfg.v3demo {
+		// set up v3 demo rpc
+		grpcServer := grpc.NewServer()
+		etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s))
+		go plog.Fatal(grpcServer.Serve(v3l))
+	}
+
 	return s.StopNotify(), nil
 }
 

+ 6 - 0
etcdmain/help.go

@@ -121,5 +121,11 @@ given by the consensus protocol.
 
 	--force-new-cluster 'false'
 		force to create a new one-member cluster.
+
+
+experimental flags:
+
+	--experimental-v3demo 'false'
+		enable experimental v3 demo API
 `
 )

+ 54 - 0
etcdserver/api/v3rpc/key.go

@@ -0,0 +1,54 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/etcdserver"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type handler struct {
+	server etcdserver.V3DemoServer
+}
+
+func New(s etcdserver.V3DemoServer) pb.EtcdServer {
+	return &handler{s}
+}
+
+func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r})
+	return resp.(*pb.RangeResponse), nil
+}
+
+func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r})
+	return resp.(*pb.PutResponse), nil
+}
+
+func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r})
+	return resp.(*pb.DeleteRangeResponse), nil
+}
+
+func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	panic("not implemented")
+	return nil, nil
+}
+
+func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	panic("not implemented")
+	return nil, nil
+}

+ 2 - 0
etcdserver/config.go

@@ -43,6 +43,8 @@ type ServerConfig struct {
 
 	TickMs        uint
 	ElectionTicks int
+
+	V3demo bool
 }
 
 // VerifyBootstrapConfig sanity-checks the initial config for bootstrap case

+ 1 - 142
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -7,12 +7,12 @@
 
 	It is generated from these files:
 		etcdserver.proto
+		raft_internal.proto
 		rpc.proto
 
 	It has these top-level messages:
 		Request
 		Metadata
-		InternalRaftRequest
 */
 package etcdserverpb
 
@@ -62,17 +62,6 @@ func (m *Metadata) Reset()         { *m = Metadata{} }
 func (m *Metadata) String() string { return proto.CompactTextString(m) }
 func (*Metadata) ProtoMessage()    {}
 
-// An InternalRaftRequest is the union of all requests which can be
-// sent via raft.
-type InternalRaftRequest struct {
-	V2               *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"`
-	XXX_unrecognized []byte   `json:"-"`
-}
-
-func (m *InternalRaftRequest) Reset()         { *m = InternalRaftRequest{} }
-func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) }
-func (*InternalRaftRequest) ProtoMessage()    {}
-
 func init() {
 }
 func (m *Request) Unmarshal(data []byte) error {
@@ -474,76 +463,6 @@ func (m *Metadata) Unmarshal(data []byte) error {
 
 	return nil
 }
-func (m *InternalRaftRequest) Unmarshal(data []byte) error {
-	l := len(data)
-	iNdEx := 0
-	for iNdEx < l {
-		var wire uint64
-		for shift := uint(0); ; shift += 7 {
-			if iNdEx >= l {
-				return io.ErrUnexpectedEOF
-			}
-			b := data[iNdEx]
-			iNdEx++
-			wire |= (uint64(b) & 0x7F) << shift
-			if b < 0x80 {
-				break
-			}
-		}
-		fieldNum := int32(wire >> 3)
-		wireType := int(wire & 0x7)
-		switch fieldNum {
-		case 1:
-			if wireType != 2 {
-				return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType)
-			}
-			var msglen int
-			for shift := uint(0); ; shift += 7 {
-				if iNdEx >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[iNdEx]
-				iNdEx++
-				msglen |= (int(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-			postIndex := iNdEx + msglen
-			if postIndex > l {
-				return io.ErrUnexpectedEOF
-			}
-			if m.V2 == nil {
-				m.V2 = &Request{}
-			}
-			if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil {
-				return err
-			}
-			iNdEx = postIndex
-		default:
-			var sizeOfWire int
-			for {
-				sizeOfWire++
-				wire >>= 7
-				if wire == 0 {
-					break
-				}
-			}
-			iNdEx -= sizeOfWire
-			skippy, err := skipEtcdserver(data[iNdEx:])
-			if err != nil {
-				return err
-			}
-			if (iNdEx + skippy) > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...)
-			iNdEx += skippy
-		}
-	}
-
-	return nil
-}
 func skipEtcdserver(data []byte) (n int, err error) {
 	l := len(data)
 	iNdEx := 0
@@ -628,22 +547,6 @@ func skipEtcdserver(data []byte) (n int, err error) {
 	}
 	panic("unreachable")
 }
-func (this *InternalRaftRequest) GetValue() interface{} {
-	if this.V2 != nil {
-		return this.V2
-	}
-	return nil
-}
-
-func (this *InternalRaftRequest) SetValue(value interface{}) bool {
-	switch vt := value.(type) {
-	case *Request:
-		this.V2 = vt
-	default:
-		return false
-	}
-	return true
-}
 func (m *Request) Size() (n int) {
 	var l int
 	_ = l
@@ -686,19 +589,6 @@ func (m *Metadata) Size() (n int) {
 	return n
 }
 
-func (m *InternalRaftRequest) Size() (n int) {
-	var l int
-	_ = l
-	if m.V2 != nil {
-		l = m.V2.Size()
-		n += 1 + l + sovEtcdserver(uint64(l))
-	}
-	if m.XXX_unrecognized != nil {
-		n += len(m.XXX_unrecognized)
-	}
-	return n
-}
-
 func sovEtcdserver(x uint64) (n int) {
 	for {
 		n++
@@ -851,37 +741,6 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) {
 	return i, nil
 }
 
-func (m *InternalRaftRequest) Marshal() (data []byte, err error) {
-	size := m.Size()
-	data = make([]byte, size)
-	n, err := m.MarshalTo(data)
-	if err != nil {
-		return nil, err
-	}
-	return data[:n], nil
-}
-
-func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) {
-	var i int
-	_ = i
-	var l int
-	_ = l
-	if m.V2 != nil {
-		data[i] = 0xa
-		i++
-		i = encodeVarintEtcdserver(data, i, uint64(m.V2.Size()))
-		n1, err := m.V2.MarshalTo(data[i:])
-		if err != nil {
-			return 0, err
-		}
-		i += n1
-	}
-	if m.XXX_unrecognized != nil {
-		i += copy(data[i:], m.XXX_unrecognized)
-	}
-	return i, nil
-}
-
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 	data[offset] = uint8(v)
 	data[offset+1] = uint8(v >> 8)

+ 0 - 9
etcdserver/etcdserverpb/etcdserver.proto

@@ -31,12 +31,3 @@ message Metadata {
 	optional uint64 NodeID    = 1 [(gogoproto.nullable) = false];
 	optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
 }
-
-// An InternalRaftRequest is the union of all requests which can be
-// sent via raft.
-message InternalRaftRequest {
-  option (gogoproto.onlyone) = true;
-  oneof value {
-    Request v2 = 1;
-  }
-}

+ 463 - 0
etcdserver/etcdserverpb/raft_internal.pb.go

@@ -0,0 +1,463 @@
+// Code generated by protoc-gen-gogo.
+// source: raft_internal.proto
+// DO NOT EDIT!
+
+package etcdserverpb
+
+import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
+
+// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb"
+
+import io "io"
+import fmt "fmt"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+
+// An InternalRaftRequest is the union of all requests which can be
+// sent via raft.
+type InternalRaftRequest struct {
+	V2          *Request            `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"`
+	Range       *RangeRequest       `protobuf:"bytes,2,opt,name=range" json:"range,omitempty"`
+	Put         *PutRequest         `protobuf:"bytes,3,opt,name=put" json:"put,omitempty"`
+	DeleteRange *DeleteRangeRequest `protobuf:"bytes,4,opt,name=delete_range" json:"delete_range,omitempty"`
+	Txn         *TxnRequest         `protobuf:"bytes,5,opt,name=txn" json:"txn,omitempty"`
+}
+
+func (m *InternalRaftRequest) Reset()         { *m = InternalRaftRequest{} }
+func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) }
+func (*InternalRaftRequest) ProtoMessage()    {}
+
+func init() {
+}
+func (m *InternalRaftRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.V2 == nil {
+				m.V2 = &Request{}
+			}
+			if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Range == nil {
+				m.Range = &RangeRequest{}
+			}
+			if err := m.Range.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Put", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Put == nil {
+				m.Put = &PutRequest{}
+			}
+			if err := m.Put.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 4:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field DeleteRange", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.DeleteRange == nil {
+				m.DeleteRange = &DeleteRangeRequest{}
+			}
+			if err := m.DeleteRange.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 5:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Txn == nil {
+				m.Txn = &TxnRequest{}
+			}
+			if err := m.Txn.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			iNdEx -= sizeOfWire
+			skippy, err := skipRaftInternal(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	return nil
+}
+func skipRaftInternal(data []byte) (n int, err error) {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if iNdEx >= l {
+				return 0, io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		wireType := int(wire & 0x7)
+		switch wireType {
+		case 0:
+			for {
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				iNdEx++
+				if data[iNdEx-1] < 0x80 {
+					break
+				}
+			}
+			return iNdEx, nil
+		case 1:
+			iNdEx += 8
+			return iNdEx, nil
+		case 2:
+			var length int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				length |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			iNdEx += length
+			return iNdEx, nil
+		case 3:
+			for {
+				var innerWire uint64
+				var start int = iNdEx
+				for shift := uint(0); ; shift += 7 {
+					if iNdEx >= l {
+						return 0, io.ErrUnexpectedEOF
+					}
+					b := data[iNdEx]
+					iNdEx++
+					innerWire |= (uint64(b) & 0x7F) << shift
+					if b < 0x80 {
+						break
+					}
+				}
+				innerWireType := int(innerWire & 0x7)
+				if innerWireType == 4 {
+					break
+				}
+				next, err := skipRaftInternal(data[start:])
+				if err != nil {
+					return 0, err
+				}
+				iNdEx = start + next
+			}
+			return iNdEx, nil
+		case 4:
+			return iNdEx, nil
+		case 5:
+			iNdEx += 4
+			return iNdEx, nil
+		default:
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+		}
+	}
+	panic("unreachable")
+}
+func (this *InternalRaftRequest) GetValue() interface{} {
+	if this.V2 != nil {
+		return this.V2
+	}
+	if this.Range != nil {
+		return this.Range
+	}
+	if this.Put != nil {
+		return this.Put
+	}
+	if this.DeleteRange != nil {
+		return this.DeleteRange
+	}
+	if this.Txn != nil {
+		return this.Txn
+	}
+	return nil
+}
+
+func (this *InternalRaftRequest) SetValue(value interface{}) bool {
+	switch vt := value.(type) {
+	case *Request:
+		this.V2 = vt
+	case *RangeRequest:
+		this.Range = vt
+	case *PutRequest:
+		this.Put = vt
+	case *DeleteRangeRequest:
+		this.DeleteRange = vt
+	case *TxnRequest:
+		this.Txn = vt
+	default:
+		return false
+	}
+	return true
+}
+func (m *InternalRaftRequest) Size() (n int) {
+	var l int
+	_ = l
+	if m.V2 != nil {
+		l = m.V2.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
+	if m.Range != nil {
+		l = m.Range.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
+	if m.Put != nil {
+		l = m.Put.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
+	if m.DeleteRange != nil {
+		l = m.DeleteRange.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
+	if m.Txn != nil {
+		l = m.Txn.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
+	return n
+}
+
+func sovRaftInternal(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozRaftInternal(x uint64) (n int) {
+	return sovRaftInternal(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *InternalRaftRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.V2 != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.V2.Size()))
+		n1, err := m.V2.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n1
+	}
+	if m.Range != nil {
+		data[i] = 0x12
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.Range.Size()))
+		n2, err := m.Range.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n2
+	}
+	if m.Put != nil {
+		data[i] = 0x1a
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.Put.Size()))
+		n3, err := m.Put.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n3
+	}
+	if m.DeleteRange != nil {
+		data[i] = 0x22
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.DeleteRange.Size()))
+		n4, err := m.DeleteRange.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n4
+	}
+	if m.Txn != nil {
+		data[i] = 0x2a
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.Txn.Size()))
+		n5, err := m.Txn.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n5
+	}
+	return i, nil
+}
+
+func encodeFixed64RaftInternal(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32RaftInternal(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintRaftInternal(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}

+ 24 - 0
etcdserver/etcdserverpb/raft_internal.proto

@@ -0,0 +1,24 @@
+syntax = "proto3";
+package etcdserverpb;
+
+import "github.com/gogo/protobuf/gogoproto/gogo.proto";
+import "etcdserver.proto";
+import "rpc.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+
+// An InternalRaftRequest is the union of all requests which can be
+// sent via raft.
+message InternalRaftRequest {
+  option (gogoproto.onlyone) = true;
+  oneof value {
+    Request v2 = 1;
+    RangeRequest range = 2;
+    PutRequest put = 3;
+    DeleteRangeRequest delete_range = 4;
+    TxnRequest txn = 5;
+  }
+}

+ 10 - 0
etcdserver/server.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/http"
+	"os"
 	"path"
 	"regexp"
 	"sync/atomic"
@@ -43,6 +44,7 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/snap"
+	dstorage "github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/wal"
@@ -158,6 +160,7 @@ type EtcdServer struct {
 	cluster *cluster
 
 	store store.Store
+	kv    dstorage.KV
 
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
@@ -315,6 +318,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		forceVersionC: make(chan struct{}),
 	}
 
+	if cfg.V3demo {
+		srv.kv = dstorage.New(path.Join(cfg.DataDir, "member", "v3demo"))
+	} else {
+		// we do not care about the error of the removal
+		os.RemoveAll(path.Join(cfg.DataDir, "member", "v3demo"))
+	}
+
 	// TODO: move transport initialization near the definition of remote
 	tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats)
 	// add all remotes into transport

+ 62 - 0
etcdserver/v3demo_server.go

@@ -0,0 +1,62 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type V3DemoServer interface {
+	V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message
+}
+
+func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message {
+	switch {
+	case r.Range != nil:
+		rr := r.Range
+		resp := &pb.RangeResponse{}
+		resp.Header = &pb.ResponseHeader{}
+		kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0)
+		if err != nil {
+			panic("not handled error")
+		}
+
+		resp.Header.Index = rev
+		for i := range kvs {
+			resp.Kvs = append(resp.Kvs, &kvs[i])
+		}
+		return resp
+	case r.Put != nil:
+		rp := r.Put
+		resp := &pb.PutResponse{}
+		resp.Header = &pb.ResponseHeader{}
+		rev := s.kv.Put(rp.Key, rp.Value)
+		resp.Header.Index = rev
+		return resp
+	case r.DeleteRange != nil:
+		rd := r.DeleteRange
+		resp := &pb.DeleteRangeResponse{}
+		resp.Header = &pb.ResponseHeader{}
+		_, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd)
+		resp.Header.Index = rev
+		return resp
+	case r.Txn != nil:
+		panic("not implemented")
+	default:
+		panic("not implemented")
+	}
+}