Преглед на файлове

Merge pull request #5012 from heyitsanthony/snap-api

*: snapshot RPC
Anthony Romano преди 9 години
родител
ревизия
be822b05d2

+ 31 - 0
clientv3/maintenance.go

@@ -15,6 +15,7 @@
 package clientv3
 
 import (
+	"io"
 	"sync"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -47,6 +48,9 @@ type Maintenance interface {
 
 	// Status gets the status of the member.
 	Status(ctx context.Context, endpoint string) (*StatusResponse, error)
+
+	// Snapshot provides a reader for a snapshot of a backend.
+	Snapshot(ctx context.Context) (io.ReadCloser, error)
 }
 
 type maintenance struct {
@@ -145,6 +149,33 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
 	return (*StatusResponse)(resp), nil
 }
 
+func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
+	ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{})
+	if err != nil {
+		return nil, err
+	}
+
+	pr, pw := io.Pipe()
+	go func() {
+		for {
+			resp, err := ss.Recv()
+			if err != nil {
+				pw.CloseWithError(err)
+				return
+			}
+			if resp == nil && err == nil {
+				break
+			}
+			if _, werr := pw.Write(resp.Blob); werr != nil {
+				pw.CloseWithError(werr)
+				return
+			}
+		}
+		pw.Close()
+	}()
+	return pr, nil
+}
+
 func (m *maintenance) getRemote() pb.MaintenanceClient {
 	m.mu.Lock()
 	defer m.mu.Unlock()

+ 206 - 65
etcdctl/ctlv3/command/snapshot_command.go

@@ -15,56 +15,81 @@
 package command
 
 import (
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
+	"path"
+	"strings"
 
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/clientv3/mirror"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/wal"
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 )
 
+const (
+	defaultName                     = "default"
+	defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001"
+)
+
+var (
+	restoreCluster      string
+	restoreClusterToken string
+	restoreDataDir      string
+	restorePeerURLs     string
+	restoreName         string
+)
+
 // NewSnapshotCommand returns the cobra command for "snapshot".
 func NewSnapshotCommand() *cobra.Command {
-	return &cobra.Command{
-		Use:   "snapshot [filename]",
-		Short: "Snapshot streams a point-in-time snapshot of the store",
-		Run:   snapshotCommandFunc,
+	cmd := &cobra.Command{
+		Use:   "snapshot",
+		Short: "snapshot manages etcd node snapshots.",
 	}
+	cmd.AddCommand(NewSnapshotSaveCommand())
+	cmd.AddCommand(NewSnapshotRestoreCommand())
+	return cmd
 }
 
-// snapshotCommandFunc watches for the length of the entire store and records
-// to a file.
-func snapshotCommandFunc(cmd *cobra.Command, args []string) {
-	switch {
-	case len(args) == 0:
-		snapshotToStdout(mustClientFromCmd(cmd))
-	case len(args) == 1:
-		snapshotToFile(mustClientFromCmd(cmd), args[0])
-	default:
-		err := fmt.Errorf("snapshot takes at most one argument")
-		ExitWithError(ExitBadArgs, err)
+func NewSnapshotSaveCommand() *cobra.Command {
+	return &cobra.Command{
+		Use:   "save <filename>",
+		Short: "save stores an etcd node backend snapshot to a given file.",
+		Run:   snapshotSaveCommandFunc,
 	}
 }
 
-// snapshotToStdout streams a snapshot over stdout
-func snapshotToStdout(c *clientv3.Client) {
-	// must explicitly fetch first revision since no retry on stdout
-	wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
-	if wr.Err() == nil {
-		wr.CompactRevision = 1
+func NewSnapshotRestoreCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "restore <filename>",
+		Short: "restore an etcd node snapshot to an etcd directory",
+		Run:   snapshotRestoreCommandFunc,
 	}
-	if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
-		err := fmt.Errorf("snapshot interrupted by compaction %v", rev)
-		ExitWithError(ExitInterrupted, err)
-	}
-	os.Stdout.Sync()
+	cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
+	cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
+	cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
+	cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
+	cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
+
+	return cmd
 }
 
-// snapshotToFile atomically writes a snapshot to a file
-func snapshotToFile(c *clientv3.Client, path string) {
+func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot save expects one argument")
+		ExitWithError(ExitBadArgs, err)
+	}
+
+	path := args[0]
+
 	partpath := path + ".part"
 	f, err := os.Create(partpath)
 	defer f.Close()
@@ -72,56 +97,172 @@ func snapshotToFile(c *clientv3.Client, path string) {
 		exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
 		ExitWithError(ExitBadArgs, exiterr)
 	}
-	rev := int64(1)
-	for rev != 0 {
-		f.Seek(0, 0)
-		f.Truncate(0)
-		rev = snapshot(f, c, rev)
+
+	c := mustClientFromCmd(cmd)
+	r, serr := c.Snapshot(context.TODO())
+	if serr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, serr)
+	}
+	if _, rerr := io.Copy(f, r); rerr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, rerr)
 	}
+
 	f.Sync()
-	if err := os.Rename(partpath, path); err != nil {
-		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, err)
+
+	if rerr := os.Rename(partpath, path); rerr != nil {
+		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
 		ExitWithError(ExitIO, exiterr)
 	}
 }
 
-// snapshot reads all of a watcher; returns compaction revision if incomplete
-// TODO: stabilize snapshot format
-func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
-	s := mirror.NewSyncer(c, "", rev)
+func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot restore exactly one argument")
+		ExitWithError(ExitBadArgs, err)
+	}
 
-	rc, errc := s.SyncBase(context.TODO())
+	urlmap, uerr := types.NewURLsMap(restoreCluster)
+	if uerr != nil {
+		ExitWithError(ExitBadArgs, uerr)
+	}
 
-	for r := range rc {
-		for _, kv := range r.Kvs {
-			fmt.Fprintln(w, kv)
-		}
+	cfg := etcdserver.ServerConfig{
+		InitialClusterToken: restoreClusterToken,
+		InitialPeerURLsMap:  urlmap,
+		PeerURLs:            types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
+		Name:                restoreName,
+	}
+	if err := cfg.VerifyBootstrap(); err != nil {
+		ExitWithError(ExitBadArgs, err)
 	}
 
-	err := <-errc
-	if err != nil {
-		if err == rpctypes.ErrCompacted {
-			// will get correct compact revision on retry
-			return rev + 1
-		}
-		// failed for some unknown reason, retry on same revision
-		return rev
+	cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
+	if cerr != nil {
+		ExitWithError(ExitBadArgs, cerr)
 	}
 
-	wc := s.SyncUpdates(context.TODO())
+	basedir := restoreDataDir
+	if basedir == "" {
+		basedir = restoreName + ".etcd"
+	}
+
+	waldir := path.Join(basedir, "member", "wal")
+	snapdir := path.Join(basedir, "member", "snap")
 
-	for wr := range wc {
-		if wr.Err() != nil {
-			return wr.CompactRevision
+	if _, err := os.Stat(basedir); err == nil {
+		ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
+	}
+
+	makeDB(snapdir, args[0])
+	makeWAL(waldir, cl)
+}
+
+func initialClusterFromName(name string) string {
+	n := name
+	if name == "" {
+		n = defaultName
+	}
+	return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
+}
+
+// makeWAL creates a WAL for the initial cluster
+func makeWAL(waldir string, cl *membership.RaftCluster) {
+	if err := os.MkdirAll(waldir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	m := cl.MemberByName(restoreName)
+	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
+	metadata, merr := md.Marshal()
+	if merr != nil {
+		ExitWithError(ExitInvalidInput, merr)
+	}
+
+	w, walerr := wal.Create(waldir, metadata)
+	if walerr != nil {
+		ExitWithError(ExitIO, walerr)
+	}
+	defer w.Close()
+
+	peers := make([]raft.Peer, len(cl.MemberIDs()))
+	for i, id := range cl.MemberIDs() {
+		ctx, err := json.Marshal((*cl).Member(id))
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
 		}
-		for _, ev := range wr.Events {
-			fmt.Fprintln(w, ev)
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+
+	ents := make([]raftpb.Entry, len(peers))
+	for i, p := range peers {
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeAddNode,
+			NodeID:  p.ID,
+			Context: p.Context}
+		d, err := cc.Marshal()
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
 		}
-		rev := wr.Events[len(wr.Events)-1].Kv.ModRevision
-		if rev >= wr.Header.Revision {
-			break
+		e := raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Term:  1,
+			Index: uint64(i + 1),
+			Data:  d,
 		}
+		ents[i] = e
 	}
 
-	return 0
+	w.Save(raftpb.HardState{
+		Term:   1,
+		Vote:   peers[0].ID,
+		Commit: uint64(len(ents))}, ents)
+}
+
+// initIndex implements ConsistentIndexGetter so the snapshot won't block
+// the new raft instance by waiting for a future raft index.
+type initIndex struct{}
+
+func (*initIndex) ConsistentIndex() uint64 { return 1 }
+
+// makeDB copies the database snapshot to the snapshot directory
+func makeDB(snapdir, dbfile string) {
+	f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
+	if ferr != nil {
+		ExitWithError(ExitInvalidInput, ferr)
+	}
+	defer f.Close()
+
+	if err := os.MkdirAll(snapdir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	dbpath := path.Join(snapdir, "db")
+	db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600)
+	if dberr != nil {
+		ExitWithError(ExitIO, dberr)
+	}
+	if _, err := io.Copy(db, f); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+	db.Close()
+
+	// update consistentIndex so applies go through on etcdserver despite
+	// having a new raft instance
+	be := backend.NewDefaultBackend(dbpath)
+	s := storage.NewStore(be, nil, &initIndex{})
+	id := s.TxnBegin()
+	btx := be.BatchTx()
+	del := func(k, v []byte) error {
+		_, _, err := s.TxnDeleteRange(id, k, nil)
+		return err
+	}
+	// delete stored members from old cluster since using new members
+	btx.UnsafeForEach([]byte("members"), del)
+	btx.UnsafeForEach([]byte("members_removed"), del)
+	// trigger write-out of new consistent index
+	s.TxnEnd(id)
+	s.Commit()
+	s.Close()
 }

+ 37 - 0
etcdserver/api/v3rpc/maintenance.go

@@ -15,6 +15,8 @@
 package v3rpc
 
 import (
+	"io"
+
 	"github.com/coreos/etcd/etcdserver"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/storage/backend"
@@ -51,6 +53,41 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
 	return &pb.DefragmentResponse{}, nil
 }
 
+func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
+	snap := ms.bg.Backend().Snapshot()
+	pr, pw := io.Pipe()
+
+	defer pr.Close()
+
+	go func() {
+		snap.WriteTo(pw)
+		if err := snap.Close(); err != nil {
+			plog.Errorf("error closing snapshot (%v)", err)
+		}
+		pw.Close()
+	}()
+
+	br := int64(0)
+	buf := make([]byte, 32*1024)
+	sz := snap.Size()
+	for br < sz {
+		n, err := io.ReadFull(pr, buf)
+		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+			return togRPCError(err)
+		}
+		br += int64(n)
+		resp := &pb.SnapshotResponse{
+			RemainingBytes: uint64(sz - br),
+			Blob:           buf[:n],
+		}
+		if err = srv.Send(resp); err != nil {
+			return togRPCError(err)
+		}
+	}
+
+	return nil
+}
+
 func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
 	h, err := ms.bg.Backend().Hash()
 	if err != nil {

+ 4 - 2
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -31,6 +31,8 @@
 		CompactionResponse
 		HashRequest
 		HashResponse
+		SnapshotRequest
+		SnapshotResponse
 		WatchRequest
 		WatchCreateRequest
 		WatchCancelRequest
@@ -92,10 +94,10 @@ import (
 	"fmt"
 
 	proto "github.com/gogo/protobuf/proto"
-
-	math "math"
 )
 
+import math "math"
+
 import io "io"
 
 // Reference imports to suppress errors if they are not otherwise used.

+ 2 - 2
etcdserver/etcdserverpb/raft_internal.pb.go

@@ -8,10 +8,10 @@ import (
 	"fmt"
 
 	proto "github.com/gogo/protobuf/proto"
-
-	math "math"
 )
 
+import math "math"
+
 import io "io"
 
 // Reference imports to suppress errors if they are not otherwise used.

+ 426 - 67
etcdserver/etcdserverpb/rpc.pb.go

@@ -8,21 +8,20 @@ import (
 	"fmt"
 
 	proto "github.com/gogo/protobuf/proto"
-
-	math "math"
-
-	authpb "github.com/coreos/etcd/auth/authpb"
-
-	io "io"
 )
 
+import math "math"
+
 import storagepb "github.com/coreos/etcd/storage/storagepb"
+import authpb "github.com/coreos/etcd/auth/authpb"
 
 import (
 	context "golang.org/x/net/context"
 	grpc "google.golang.org/grpc"
 )
 
+import io "io"
+
 // Reference imports to suppress errors if they are not otherwise used.
 var _ = proto.Marshal
 var _ = fmt.Errorf
@@ -815,6 +814,34 @@ func (m *HashResponse) GetHeader() *ResponseHeader {
 	return nil
 }
 
+type SnapshotRequest struct {
+}
+
+func (m *SnapshotRequest) Reset()         { *m = SnapshotRequest{} }
+func (m *SnapshotRequest) String() string { return proto.CompactTextString(m) }
+func (*SnapshotRequest) ProtoMessage()    {}
+
+type SnapshotResponse struct {
+	// header has the current store information. The first header in the snapshot
+	// stream indicates the point in time of the snapshot.
+	Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
+	// remaining_bytes is the number of blob bytes to be sent after this message
+	RemainingBytes uint64 `protobuf:"varint,2,opt,name=remaining_bytes,proto3" json:"remaining_bytes,omitempty"`
+	// blob has the next chunk of the snapshot in the snapshot stream.
+	Blob []byte `protobuf:"bytes,3,opt,name=blob,proto3" json:"blob,omitempty"`
+}
+
+func (m *SnapshotResponse) Reset()         { *m = SnapshotResponse{} }
+func (m *SnapshotResponse) String() string { return proto.CompactTextString(m) }
+func (*SnapshotResponse) ProtoMessage()    {}
+
+func (m *SnapshotResponse) GetHeader() *ResponseHeader {
+	if m != nil {
+		return m.Header
+	}
+	return nil
+}
+
 type WatchRequest struct {
 	// Types that are valid to be assigned to RequestUnion:
 	//	*WatchRequest_CreateRequest
@@ -1616,6 +1643,8 @@ func init() {
 	proto.RegisterType((*CompactionResponse)(nil), "etcdserverpb.CompactionResponse")
 	proto.RegisterType((*HashRequest)(nil), "etcdserverpb.HashRequest")
 	proto.RegisterType((*HashResponse)(nil), "etcdserverpb.HashResponse")
+	proto.RegisterType((*SnapshotRequest)(nil), "etcdserverpb.SnapshotRequest")
+	proto.RegisterType((*SnapshotResponse)(nil), "etcdserverpb.SnapshotResponse")
 	proto.RegisterType((*WatchRequest)(nil), "etcdserverpb.WatchRequest")
 	proto.RegisterType((*WatchCreateRequest)(nil), "etcdserverpb.WatchCreateRequest")
 	proto.RegisterType((*WatchCancelRequest)(nil), "etcdserverpb.WatchCancelRequest")
@@ -2296,6 +2325,8 @@ type MaintenanceClient interface {
 	// This is designed for testing; do not use this in production when there
 	// are ongoing transactions.
 	Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
+	// Snapshot sends a snapshot of the entire backend
+	Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error)
 }
 
 type maintenanceClient struct {
@@ -2342,6 +2373,38 @@ func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...g
 	return out, nil
 }
 
+func (c *maintenanceClient) Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_Maintenance_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Maintenance/Snapshot", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &maintenanceSnapshotClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type Maintenance_SnapshotClient interface {
+	Recv() (*SnapshotResponse, error)
+	grpc.ClientStream
+}
+
+type maintenanceSnapshotClient struct {
+	grpc.ClientStream
+}
+
+func (x *maintenanceSnapshotClient) Recv() (*SnapshotResponse, error) {
+	m := new(SnapshotResponse)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
 // Server API for Maintenance service
 
 type MaintenanceServer interface {
@@ -2354,6 +2417,8 @@ type MaintenanceServer interface {
 	// This is designed for testing; do not use this in production when there
 	// are ongoing transactions.
 	Hash(context.Context, *HashRequest) (*HashResponse, error)
+	// Snapshot sends a snapshot of the entire backend
+	Snapshot(*SnapshotRequest, Maintenance_SnapshotServer) error
 }
 
 func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
@@ -2408,6 +2473,27 @@ func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(in
 	return out, nil
 }
 
+func _Maintenance_Snapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(SnapshotRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(MaintenanceServer).Snapshot(m, &maintenanceSnapshotServer{stream})
+}
+
+type Maintenance_SnapshotServer interface {
+	Send(*SnapshotResponse) error
+	grpc.ServerStream
+}
+
+type maintenanceSnapshotServer struct {
+	grpc.ServerStream
+}
+
+func (x *maintenanceSnapshotServer) Send(m *SnapshotResponse) error {
+	return x.ServerStream.SendMsg(m)
+}
+
 var _Maintenance_serviceDesc = grpc.ServiceDesc{
 	ServiceName: "etcdserverpb.Maintenance",
 	HandlerType: (*MaintenanceServer)(nil),
@@ -2429,7 +2515,13 @@ var _Maintenance_serviceDesc = grpc.ServiceDesc{
 			Handler:    _Maintenance_Hash_Handler,
 		},
 	},
-	Streams: []grpc.StreamDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Snapshot",
+			Handler:       _Maintenance_Snapshot_Handler,
+			ServerStreams: true,
+		},
+	},
 }
 
 // Client API for Auth service
@@ -3578,6 +3670,65 @@ func (m *HashResponse) MarshalTo(data []byte) (int, error) {
 	return i, nil
 }
 
+func (m *SnapshotRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *SnapshotRequest) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	return i, nil
+}
+
+func (m *SnapshotResponse) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *SnapshotResponse) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Header != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
+		n16, err := m.Header.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n16
+	}
+	if m.RemainingBytes != 0 {
+		data[i] = 0x10
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.RemainingBytes))
+	}
+	if m.Blob != nil {
+		if len(m.Blob) > 0 {
+			data[i] = 0x1a
+			i++
+			i = encodeVarintRpc(data, i, uint64(len(m.Blob)))
+			i += copy(data[i:], m.Blob)
+		}
+	}
+	return i, nil
+}
+
 func (m *WatchRequest) Marshal() (data []byte, err error) {
 	size := m.Size()
 	data = make([]byte, size)
@@ -3594,11 +3745,11 @@ func (m *WatchRequest) MarshalTo(data []byte) (int, error) {
 	var l int
 	_ = l
 	if m.RequestUnion != nil {
-		nn16, err := m.RequestUnion.MarshalTo(data[i:])
+		nn17, err := m.RequestUnion.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += nn16
+		i += nn17
 	}
 	return i, nil
 }
@@ -3609,11 +3760,11 @@ func (m *WatchRequest_CreateRequest) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.CreateRequest.Size()))
-		n17, err := m.CreateRequest.MarshalTo(data[i:])
+		n18, err := m.CreateRequest.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n17
+		i += n18
 	}
 	return i, nil
 }
@@ -3623,11 +3774,11 @@ func (m *WatchRequest_CancelRequest) MarshalTo(data []byte) (int, error) {
 		data[i] = 0x12
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.CancelRequest.Size()))
-		n18, err := m.CancelRequest.MarshalTo(data[i:])
+		n19, err := m.CancelRequest.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n18
+		i += n19
 	}
 	return i, nil
 }
@@ -3722,11 +3873,11 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n19, err := m.Header.MarshalTo(data[i:])
+		n20, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n19
+		i += n20
 	}
 	if m.WatchId != 0 {
 		data[i] = 0x10
@@ -3820,11 +3971,11 @@ func (m *LeaseGrantResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n20, err := m.Header.MarshalTo(data[i:])
+		n21, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n20
+		i += n21
 	}
 	if m.ID != 0 {
 		data[i] = 0x10
@@ -3887,11 +4038,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n21, err := m.Header.MarshalTo(data[i:])
+		n22, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n21
+		i += n22
 	}
 	return i, nil
 }
@@ -3938,11 +4089,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n22, err := m.Header.MarshalTo(data[i:])
+		n23, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n22
+		i += n23
 	}
 	if m.ID != 0 {
 		data[i] = 0x10
@@ -4078,21 +4229,21 @@ func (m *MemberAddResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n23, err := m.Header.MarshalTo(data[i:])
+		n24, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n23
+		i += n24
 	}
 	if m.Member != nil {
 		data[i] = 0x12
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Member.Size()))
-		n24, err := m.Member.MarshalTo(data[i:])
+		n25, err := m.Member.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n24
+		i += n25
 	}
 	return i, nil
 }
@@ -4139,11 +4290,11 @@ func (m *MemberRemoveResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n25, err := m.Header.MarshalTo(data[i:])
+		n26, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n25
+		i += n26
 	}
 	return i, nil
 }
@@ -4205,11 +4356,11 @@ func (m *MemberUpdateResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n26, err := m.Header.MarshalTo(data[i:])
+		n27, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n26
+		i += n27
 	}
 	return i, nil
 }
@@ -4251,11 +4402,11 @@ func (m *MemberListResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n27, err := m.Header.MarshalTo(data[i:])
+		n28, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n27
+		i += n28
 	}
 	if len(m.Members) > 0 {
 		for _, msg := range m.Members {
@@ -4309,11 +4460,11 @@ func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n28, err := m.Header.MarshalTo(data[i:])
+		n29, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n28
+		i += n29
 	}
 	return i, nil
 }
@@ -4398,11 +4549,11 @@ func (m *AlarmResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n29, err := m.Header.MarshalTo(data[i:])
+		n30, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n29
+		i += n30
 	}
 	if len(m.Alarms) > 0 {
 		for _, msg := range m.Alarms {
@@ -4456,11 +4607,11 @@ func (m *StatusResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n30, err := m.Header.MarshalTo(data[i:])
+		n31, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n30
+		i += n31
 	}
 	if len(m.Version) > 0 {
 		data[i] = 0x12
@@ -4760,11 +4911,11 @@ func (m *AuthRoleGrantRequest) MarshalTo(data []byte) (int, error) {
 		data[i] = 0x12
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Perm.Size()))
-		n31, err := m.Perm.MarshalTo(data[i:])
+		n32, err := m.Perm.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n31
+		i += n32
 	}
 	return i, nil
 }
@@ -4806,11 +4957,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n32, err := m.Header.MarshalTo(data[i:])
+		n33, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n32
+		i += n33
 	}
 	return i, nil
 }
@@ -4834,11 +4985,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n33, err := m.Header.MarshalTo(data[i:])
+		n34, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n33
+		i += n34
 	}
 	return i, nil
 }
@@ -4862,11 +5013,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n34, err := m.Header.MarshalTo(data[i:])
+		n35, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n34
+		i += n35
 	}
 	return i, nil
 }
@@ -4890,11 +5041,11 @@ func (m *AuthUserAddResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n35, err := m.Header.MarshalTo(data[i:])
+		n36, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n35
+		i += n36
 	}
 	return i, nil
 }
@@ -4918,11 +5069,11 @@ func (m *AuthUserGetResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n36, err := m.Header.MarshalTo(data[i:])
+		n37, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n36
+		i += n37
 	}
 	return i, nil
 }
@@ -4946,11 +5097,11 @@ func (m *AuthUserDeleteResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n37, err := m.Header.MarshalTo(data[i:])
+		n38, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n37
+		i += n38
 	}
 	return i, nil
 }
@@ -4974,11 +5125,11 @@ func (m *AuthUserChangePasswordResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n38, err := m.Header.MarshalTo(data[i:])
+		n39, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n38
+		i += n39
 	}
 	return i, nil
 }
@@ -5002,11 +5153,11 @@ func (m *AuthUserGrantResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n39, err := m.Header.MarshalTo(data[i:])
+		n40, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n39
+		i += n40
 	}
 	return i, nil
 }
@@ -5030,11 +5181,11 @@ func (m *AuthUserRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n40, err := m.Header.MarshalTo(data[i:])
+		n41, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n40
+		i += n41
 	}
 	return i, nil
 }
@@ -5058,11 +5209,11 @@ func (m *AuthRoleAddResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n41, err := m.Header.MarshalTo(data[i:])
+		n42, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n41
+		i += n42
 	}
 	return i, nil
 }
@@ -5086,11 +5237,11 @@ func (m *AuthRoleGetResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n42, err := m.Header.MarshalTo(data[i:])
+		n43, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n42
+		i += n43
 	}
 	return i, nil
 }
@@ -5114,11 +5265,11 @@ func (m *AuthRoleDeleteResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n43, err := m.Header.MarshalTo(data[i:])
+		n44, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n43
+		i += n44
 	}
 	return i, nil
 }
@@ -5142,11 +5293,11 @@ func (m *AuthRoleGrantResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n44, err := m.Header.MarshalTo(data[i:])
+		n45, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n44
+		i += n45
 	}
 	return i, nil
 }
@@ -5170,11 +5321,11 @@ func (m *AuthRoleRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n45, err := m.Header.MarshalTo(data[i:])
+		n46, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n45
+		i += n46
 	}
 	return i, nil
 }
@@ -5542,6 +5693,31 @@ func (m *HashResponse) Size() (n int) {
 	return n
 }
 
+func (m *SnapshotRequest) Size() (n int) {
+	var l int
+	_ = l
+	return n
+}
+
+func (m *SnapshotResponse) Size() (n int) {
+	var l int
+	_ = l
+	if m.Header != nil {
+		l = m.Header.Size()
+		n += 1 + l + sovRpc(uint64(l))
+	}
+	if m.RemainingBytes != 0 {
+		n += 1 + sovRpc(uint64(m.RemainingBytes))
+	}
+	if m.Blob != nil {
+		l = len(m.Blob)
+		if l > 0 {
+			n += 1 + l + sovRpc(uint64(l))
+		}
+	}
+	return n
+}
+
 func (m *WatchRequest) Size() (n int) {
 	var l int
 	_ = l
@@ -8180,6 +8356,189 @@ func (m *HashResponse) Unmarshal(data []byte) error {
 	}
 	return nil
 }
+func (m *SnapshotRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowRpc
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: SnapshotRequest: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: SnapshotRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		default:
+			iNdEx = preIndex
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *SnapshotResponse) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowRpc
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: SnapshotResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: SnapshotResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthRpc
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Header == nil {
+				m.Header = &ResponseHeader{}
+			}
+			if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field RemainingBytes", wireType)
+			}
+			m.RemainingBytes = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.RemainingBytes |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Blob", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if byteLen < 0 {
+				return ErrInvalidLengthRpc
+			}
+			postIndex := iNdEx + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Blob = append(m.Blob[:0], data[iNdEx:postIndex]...)
+			if m.Blob == nil {
+				m.Blob = []byte{}
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
 func (m *WatchRequest) Unmarshal(data []byte) error {
 	l := len(data)
 	iNdEx := 0

+ 18 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -85,6 +85,9 @@ service Maintenance {
   // This is designed for testing; do not use this in production when there
   // are ongoing transactions.
   rpc Hash(HashRequest) returns (HashResponse) {}
+
+  // Snapshot sends a snapshot of the entire backend
+  rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {}
 }
 
 service Auth {
@@ -311,6 +314,21 @@ message HashResponse {
   uint32 hash = 2;
 }
 
+message SnapshotRequest {
+}
+
+message SnapshotResponse {
+  // header has the current store information. The first header in the snapshot
+  // stream indicates the point in time of the snapshot.
+  ResponseHeader header = 1;
+
+  // remaining_bytes is the number of blob bytes to be sent after this message
+  uint64 remaining_bytes = 2;
+
+  // blob has the next chunk of the snapshot in the snapshot stream.
+  bytes blob = 3;
+}
+
 message WatchRequest {
   oneof request_union {
     WatchCreateRequest create_request = 1;

+ 6 - 1
storage/backend/batch_tx.go

@@ -125,7 +125,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	return t.tx.Bucket(bucketName).ForEach(visitor)
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		// bucket does not exist
+		return nil
+	}
+	return b.ForEach(visitor)
 }
 
 // Commit commits a previous tx and begins a new writable one.