123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- package raft
- import (
- "encoding/json"
- "fmt"
- "hash/crc32"
- "io"
- "io/ioutil"
- "os"
- "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
- "github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
- )
- // Snapshot represents an in-memory representation of the current state of the system.
- type Snapshot struct {
- LastIndex uint64 `json:"lastIndex"`
- LastTerm uint64 `json:"lastTerm"`
- // Cluster configuration.
- Peers []*Peer `json:"peers"`
- State []byte `json:"state"`
- Path string `json:"path"`
- }
- // The request sent to a server to start from the snapshot.
- type SnapshotRecoveryRequest struct {
- LeaderName string
- LastIndex uint64
- LastTerm uint64
- Peers []*Peer
- State []byte
- }
- // The response returned from a server appending entries to the log.
- type SnapshotRecoveryResponse struct {
- Term uint64
- Success bool
- CommitIndex uint64
- }
- // The request sent to a server to start from the snapshot.
- type SnapshotRequest struct {
- LeaderName string
- LastIndex uint64
- LastTerm uint64
- }
- // The response returned if the follower entered snapshot state
- type SnapshotResponse struct {
- Success bool `json:"success"`
- }
- // save writes the snapshot to file.
- func (ss *Snapshot) save() error {
- // Open the file for writing.
- file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
- if err != nil {
- return err
- }
- defer file.Close()
- // Serialize to JSON.
- b, err := json.Marshal(ss)
- if err != nil {
- return err
- }
- // Generate checksum and write it to disk.
- checksum := crc32.ChecksumIEEE(b)
- if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
- return err
- }
- // Write the snapshot to disk.
- if _, err = file.Write(b); err != nil {
- return err
- }
- // Ensure that the snapshot has been flushed to disk before continuing.
- if err := file.Sync(); err != nil {
- return err
- }
- return nil
- }
- // remove deletes the snapshot file.
- func (ss *Snapshot) remove() error {
- if err := os.Remove(ss.Path); err != nil {
- return err
- }
- return nil
- }
- // Creates a new Snapshot request.
- func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
- return &SnapshotRecoveryRequest{
- LeaderName: leaderName,
- LastIndex: snapshot.LastIndex,
- LastTerm: snapshot.LastTerm,
- Peers: snapshot.Peers,
- State: snapshot.State,
- }
- }
- // Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
- // written and any error that may have occurred.
- func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
- protoPeers := make([]*protobuf.SnapshotRecoveryRequest_Peer, len(req.Peers))
- for i, peer := range req.Peers {
- protoPeers[i] = &protobuf.SnapshotRecoveryRequest_Peer{
- Name: proto.String(peer.Name),
- ConnectionString: proto.String(peer.ConnectionString),
- }
- }
- pb := &protobuf.SnapshotRecoveryRequest{
- LeaderName: proto.String(req.LeaderName),
- LastIndex: proto.Uint64(req.LastIndex),
- LastTerm: proto.Uint64(req.LastTerm),
- Peers: protoPeers,
- State: req.State,
- }
- p, err := proto.Marshal(pb)
- if err != nil {
- return -1, err
- }
- return w.Write(p)
- }
- // Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
- // any error that occurs.
- func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
- data, err := ioutil.ReadAll(r)
- if err != nil {
- return 0, err
- }
- totalBytes := len(data)
- pb := &protobuf.SnapshotRecoveryRequest{}
- if err = proto.Unmarshal(data, pb); err != nil {
- return -1, err
- }
- req.LeaderName = pb.GetLeaderName()
- req.LastIndex = pb.GetLastIndex()
- req.LastTerm = pb.GetLastTerm()
- req.State = pb.GetState()
- req.Peers = make([]*Peer, len(pb.Peers))
- for i, peer := range pb.Peers {
- req.Peers[i] = &Peer{
- Name: peer.GetName(),
- ConnectionString: peer.GetConnectionString(),
- }
- }
- return totalBytes, nil
- }
- // Creates a new Snapshot response.
- func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64) *SnapshotRecoveryResponse {
- return &SnapshotRecoveryResponse{
- Term: term,
- Success: success,
- CommitIndex: commitIndex,
- }
- }
- // Encode writes the response to a writer.
- // Returns the number of bytes written and any error that occurs.
- func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
- pb := &protobuf.SnapshotRecoveryResponse{
- Term: proto.Uint64(req.Term),
- Success: proto.Bool(req.Success),
- CommitIndex: proto.Uint64(req.CommitIndex),
- }
- p, err := proto.Marshal(pb)
- if err != nil {
- return -1, err
- }
- return w.Write(p)
- }
- // Decodes the SnapshotRecoveryResponse from a buffer.
- func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
- data, err := ioutil.ReadAll(r)
- if err != nil {
- return 0, err
- }
- totalBytes := len(data)
- pb := &protobuf.SnapshotRecoveryResponse{}
- if err := proto.Unmarshal(data, pb); err != nil {
- return -1, err
- }
- req.Term = pb.GetTerm()
- req.Success = pb.GetSuccess()
- req.CommitIndex = pb.GetCommitIndex()
- return totalBytes, nil
- }
- // Creates a new Snapshot request.
- func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest {
- return &SnapshotRequest{
- LeaderName: leaderName,
- LastIndex: snapshot.LastIndex,
- LastTerm: snapshot.LastTerm,
- }
- }
- // Encodes the SnapshotRequest to a buffer. Returns the number of bytes
- // written and any error that may have occurred.
- func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
- pb := &protobuf.SnapshotRequest{
- LeaderName: proto.String(req.LeaderName),
- LastIndex: proto.Uint64(req.LastIndex),
- LastTerm: proto.Uint64(req.LastTerm),
- }
- p, err := proto.Marshal(pb)
- if err != nil {
- return -1, err
- }
- return w.Write(p)
- }
- // Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
- // any error that occurs.
- func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
- data, err := ioutil.ReadAll(r)
- if err != nil {
- return 0, err
- }
- totalBytes := len(data)
- pb := &protobuf.SnapshotRequest{}
- if err := proto.Unmarshal(data, pb); err != nil {
- return -1, err
- }
- req.LeaderName = pb.GetLeaderName()
- req.LastIndex = pb.GetLastIndex()
- req.LastTerm = pb.GetLastTerm()
- return totalBytes, nil
- }
- // Creates a new Snapshot response.
- func newSnapshotResponse(success bool) *SnapshotResponse {
- return &SnapshotResponse{
- Success: success,
- }
- }
- // Encodes the SnapshotResponse to a buffer. Returns the number of bytes
- // written and any error that may have occurred.
- func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
- pb := &protobuf.SnapshotResponse{
- Success: proto.Bool(resp.Success),
- }
- p, err := proto.Marshal(pb)
- if err != nil {
- return -1, err
- }
- return w.Write(p)
- }
- // Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
- // any error that occurs.
- func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
- data, err := ioutil.ReadAll(r)
- if err != nil {
- return 0, err
- }
- totalBytes := len(data)
- pb := &protobuf.SnapshotResponse{}
- if err := proto.Unmarshal(data, pb); err != nil {
- return -1, err
- }
- resp.Success = pb.GetSuccess()
- return totalBytes, nil
- }
|