snapshot_merge.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "io"
  17. "go.etcd.io/etcd/etcdserver/api/snap"
  18. "go.etcd.io/etcd/mvcc/backend"
  19. "go.etcd.io/etcd/raft/raftpb"
  20. humanize "github.com/dustin/go-humanize"
  21. "go.uber.org/zap"
  22. )
  23. // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
  24. // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
  25. // as ReadCloser.
  26. func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
  27. // get a snapshot of v2 store as []byte
  28. clone := s.v2store.Clone()
  29. d, err := clone.SaveNoCopy()
  30. if err != nil {
  31. if lg := s.getLogger(); lg != nil {
  32. lg.Panic("failed to save v2 store data", zap.Error(err))
  33. } else {
  34. plog.Panicf("store save should never fail: %v", err)
  35. }
  36. }
  37. // commit kv to write metadata(for example: consistent index).
  38. s.KV().Commit()
  39. dbsnap := s.be.Snapshot()
  40. // get a snapshot of v3 KV as readCloser
  41. rc := newSnapshotReaderCloser(s.getLogger(), dbsnap)
  42. // put the []byte snapshot of store into raft snapshot and return the merged snapshot with
  43. // KV readCloser snapshot.
  44. snapshot := raftpb.Snapshot{
  45. Metadata: raftpb.SnapshotMetadata{
  46. Index: snapi,
  47. Term: snapt,
  48. ConfState: confState,
  49. },
  50. Data: d,
  51. }
  52. m.Snapshot = snapshot
  53. return *snap.NewMessage(m, rc, dbsnap.Size())
  54. }
  55. func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadCloser {
  56. pr, pw := io.Pipe()
  57. go func() {
  58. n, err := snapshot.WriteTo(pw)
  59. if err == nil {
  60. if lg != nil {
  61. lg.Info(
  62. "sent database snapshot to writer",
  63. zap.Int64("bytes", n),
  64. zap.String("size", humanize.Bytes(uint64(n))),
  65. )
  66. } else {
  67. plog.Infof("wrote database snapshot out [total bytes: %d]", n)
  68. }
  69. } else {
  70. if lg != nil {
  71. lg.Warn(
  72. "failed to send database snapshot to writer",
  73. zap.String("size", humanize.Bytes(uint64(n))),
  74. zap.Error(err),
  75. )
  76. } else {
  77. plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
  78. }
  79. }
  80. pw.CloseWithError(err)
  81. err = snapshot.Close()
  82. if err != nil {
  83. if lg != nil {
  84. lg.Panic("failed to close database snapshot", zap.Error(err))
  85. } else {
  86. plog.Panicf("failed to close database snapshot: %v", err)
  87. }
  88. }
  89. }()
  90. return pr
  91. }