// append_entries_request.go

  1. package raft
  2. import (
  3. "code.google.com/p/goprotobuf/proto"
  4. "github.com/benbjohnson/go-raft/protobuf"
  5. "io"
  6. "io/ioutil"
  7. )
  8. // The request sent to a server to append entries to the log.
  9. type AppendEntriesRequest struct {
  10. Term uint64
  11. PrevLogIndex uint64
  12. PrevLogTerm uint64
  13. CommitIndex uint64
  14. LeaderName string
  15. Entries []*LogEntry
  16. }
  17. // Creates a new AppendEntries request.
  18. func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64, commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
  19. return &AppendEntriesRequest{
  20. Term: term,
  21. PrevLogIndex: prevLogIndex,
  22. PrevLogTerm: prevLogTerm,
  23. CommitIndex: commitIndex,
  24. LeaderName: leaderName,
  25. Entries: entries,
  26. }
  27. }
  28. // Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
  29. // written and any error that may have occurred.
  30. func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
  31. protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries))
  32. for i, entry := range req.Entries {
  33. protoEntries[i] = &protobuf.ProtoAppendEntriesRequest_ProtoLogEntry{
  34. Index: proto.Uint64(entry.Index),
  35. Term: proto.Uint64(entry.Term),
  36. CommandName: proto.String(entry.CommandName),
  37. Command: entry.Command,
  38. }
  39. }
  40. pb := &protobuf.ProtoAppendEntriesRequest{
  41. Term: proto.Uint64(req.Term),
  42. PrevLogIndex: proto.Uint64(req.PrevLogIndex),
  43. PrevLogTerm: proto.Uint64(req.PrevLogTerm),
  44. CommitIndex: proto.Uint64(req.CommitIndex),
  45. LeaderName: proto.String(req.LeaderName),
  46. Entries: protoEntries,
  47. }
  48. p, err := proto.Marshal(pb)
  49. if err != nil {
  50. return -1, err
  51. }
  52. return w.Write(p)
  53. }
  54. // Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
  55. // any error that occurs.
  56. func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) {
  57. data, err := ioutil.ReadAll(r)
  58. if err != nil {
  59. return -1, err
  60. }
  61. totalBytes := len(data)
  62. pb := &protobuf.ProtoAppendEntriesRequest{}
  63. if err := proto.Unmarshal(data, pb); err != nil {
  64. return -1, err
  65. }
  66. req.Term = pb.GetTerm()
  67. req.PrevLogIndex = pb.GetPrevLogIndex()
  68. req.PrevLogTerm = pb.GetPrevLogTerm()
  69. req.CommitIndex = pb.GetCommitIndex()
  70. req.LeaderName = pb.GetLeaderName()
  71. req.Entries = make([]*LogEntry, len(pb.Entries))
  72. for i, entry := range pb.Entries {
  73. req.Entries[i] = &LogEntry{
  74. Index: entry.GetIndex(),
  75. Term: entry.GetTerm(),
  76. CommandName: entry.GetCommandName(),
  77. Command: entry.Command,
  78. }
  79. }
  80. return totalBytes, nil
  81. }