snapshot.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package raft
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "hash/crc32"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
  10. "github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
  11. )
  12. // Snapshot represents an in-memory representation of the current state of the system.
  13. type Snapshot struct {
  14. LastIndex uint64 `json:"lastIndex"`
  15. LastTerm uint64 `json:"lastTerm"`
  16. // Cluster configuration.
  17. Peers []*Peer `json:"peers"`
  18. State []byte `json:"state"`
  19. Path string `json:"path"`
  20. }
  21. // The request sent to a server to start from the snapshot.
  22. type SnapshotRecoveryRequest struct {
  23. LeaderName string
  24. LastIndex uint64
  25. LastTerm uint64
  26. Peers []*Peer
  27. State []byte
  28. }
  29. // The response returned from a server appending entries to the log.
  30. type SnapshotRecoveryResponse struct {
  31. Term uint64
  32. Success bool
  33. CommitIndex uint64
  34. }
  35. // The request sent to a server to start from the snapshot.
  36. type SnapshotRequest struct {
  37. LeaderName string
  38. LastIndex uint64
  39. LastTerm uint64
  40. }
  41. // The response returned if the follower entered snapshot state
  42. type SnapshotResponse struct {
  43. Success bool `json:"success"`
  44. }
  45. // save writes the snapshot to file.
  46. func (ss *Snapshot) save() error {
  47. // Open the file for writing.
  48. file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
  49. if err != nil {
  50. return err
  51. }
  52. defer file.Close()
  53. // Serialize to JSON.
  54. b, err := json.Marshal(ss)
  55. if err != nil {
  56. return err
  57. }
  58. // Generate checksum and write it to disk.
  59. checksum := crc32.ChecksumIEEE(b)
  60. if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
  61. return err
  62. }
  63. // Write the snapshot to disk.
  64. if _, err = file.Write(b); err != nil {
  65. return err
  66. }
  67. // Ensure that the snapshot has been flushed to disk before continuing.
  68. if err := file.Sync(); err != nil {
  69. return err
  70. }
  71. return nil
  72. }
  73. // remove deletes the snapshot file.
  74. func (ss *Snapshot) remove() error {
  75. if err := os.Remove(ss.Path); err != nil {
  76. return err
  77. }
  78. return nil
  79. }
  80. // Creates a new Snapshot request.
  81. func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
  82. return &SnapshotRecoveryRequest{
  83. LeaderName: leaderName,
  84. LastIndex: snapshot.LastIndex,
  85. LastTerm: snapshot.LastTerm,
  86. Peers: snapshot.Peers,
  87. State: snapshot.State,
  88. }
  89. }
  90. // Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
  91. // written and any error that may have occurred.
  92. func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
  93. protoPeers := make([]*protobuf.SnapshotRecoveryRequest_Peer, len(req.Peers))
  94. for i, peer := range req.Peers {
  95. protoPeers[i] = &protobuf.SnapshotRecoveryRequest_Peer{
  96. Name: proto.String(peer.Name),
  97. ConnectionString: proto.String(peer.ConnectionString),
  98. }
  99. }
  100. pb := &protobuf.SnapshotRecoveryRequest{
  101. LeaderName: proto.String(req.LeaderName),
  102. LastIndex: proto.Uint64(req.LastIndex),
  103. LastTerm: proto.Uint64(req.LastTerm),
  104. Peers: protoPeers,
  105. State: req.State,
  106. }
  107. p, err := proto.Marshal(pb)
  108. if err != nil {
  109. return -1, err
  110. }
  111. return w.Write(p)
  112. }
  113. // Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
  114. // any error that occurs.
  115. func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
  116. data, err := ioutil.ReadAll(r)
  117. if err != nil {
  118. return 0, err
  119. }
  120. totalBytes := len(data)
  121. pb := &protobuf.SnapshotRecoveryRequest{}
  122. if err = proto.Unmarshal(data, pb); err != nil {
  123. return -1, err
  124. }
  125. req.LeaderName = pb.GetLeaderName()
  126. req.LastIndex = pb.GetLastIndex()
  127. req.LastTerm = pb.GetLastTerm()
  128. req.State = pb.GetState()
  129. req.Peers = make([]*Peer, len(pb.Peers))
  130. for i, peer := range pb.Peers {
  131. req.Peers[i] = &Peer{
  132. Name: peer.GetName(),
  133. ConnectionString: peer.GetConnectionString(),
  134. }
  135. }
  136. return totalBytes, nil
  137. }
  138. // Creates a new Snapshot response.
  139. func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64) *SnapshotRecoveryResponse {
  140. return &SnapshotRecoveryResponse{
  141. Term: term,
  142. Success: success,
  143. CommitIndex: commitIndex,
  144. }
  145. }
  146. // Encode writes the response to a writer.
  147. // Returns the number of bytes written and any error that occurs.
  148. func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
  149. pb := &protobuf.SnapshotRecoveryResponse{
  150. Term: proto.Uint64(req.Term),
  151. Success: proto.Bool(req.Success),
  152. CommitIndex: proto.Uint64(req.CommitIndex),
  153. }
  154. p, err := proto.Marshal(pb)
  155. if err != nil {
  156. return -1, err
  157. }
  158. return w.Write(p)
  159. }
  160. // Decodes the SnapshotRecoveryResponse from a buffer.
  161. func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
  162. data, err := ioutil.ReadAll(r)
  163. if err != nil {
  164. return 0, err
  165. }
  166. totalBytes := len(data)
  167. pb := &protobuf.SnapshotRecoveryResponse{}
  168. if err := proto.Unmarshal(data, pb); err != nil {
  169. return -1, err
  170. }
  171. req.Term = pb.GetTerm()
  172. req.Success = pb.GetSuccess()
  173. req.CommitIndex = pb.GetCommitIndex()
  174. return totalBytes, nil
  175. }
  176. // Creates a new Snapshot request.
  177. func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest {
  178. return &SnapshotRequest{
  179. LeaderName: leaderName,
  180. LastIndex: snapshot.LastIndex,
  181. LastTerm: snapshot.LastTerm,
  182. }
  183. }
  184. // Encodes the SnapshotRequest to a buffer. Returns the number of bytes
  185. // written and any error that may have occurred.
  186. func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
  187. pb := &protobuf.SnapshotRequest{
  188. LeaderName: proto.String(req.LeaderName),
  189. LastIndex: proto.Uint64(req.LastIndex),
  190. LastTerm: proto.Uint64(req.LastTerm),
  191. }
  192. p, err := proto.Marshal(pb)
  193. if err != nil {
  194. return -1, err
  195. }
  196. return w.Write(p)
  197. }
  198. // Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
  199. // any error that occurs.
  200. func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
  201. data, err := ioutil.ReadAll(r)
  202. if err != nil {
  203. return 0, err
  204. }
  205. totalBytes := len(data)
  206. pb := &protobuf.SnapshotRequest{}
  207. if err := proto.Unmarshal(data, pb); err != nil {
  208. return -1, err
  209. }
  210. req.LeaderName = pb.GetLeaderName()
  211. req.LastIndex = pb.GetLastIndex()
  212. req.LastTerm = pb.GetLastTerm()
  213. return totalBytes, nil
  214. }
  215. // Creates a new Snapshot response.
  216. func newSnapshotResponse(success bool) *SnapshotResponse {
  217. return &SnapshotResponse{
  218. Success: success,
  219. }
  220. }
  221. // Encodes the SnapshotResponse to a buffer. Returns the number of bytes
  222. // written and any error that may have occurred.
  223. func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
  224. pb := &protobuf.SnapshotResponse{
  225. Success: proto.Bool(resp.Success),
  226. }
  227. p, err := proto.Marshal(pb)
  228. if err != nil {
  229. return -1, err
  230. }
  231. return w.Write(p)
  232. }
  233. // Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
  234. // any error that occurs.
  235. func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
  236. data, err := ioutil.ReadAll(r)
  237. if err != nil {
  238. return 0, err
  239. }
  240. totalBytes := len(data)
  241. pb := &protobuf.SnapshotResponse{}
  242. if err := proto.Unmarshal(data, pb); err != nil {
  243. return -1, err
  244. }
  245. resp.Success = pb.GetSuccess()
  246. return totalBytes, nil
  247. }