append_entries.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package raft
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
  6. "github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
  7. )
  8. // The request sent to a server to append entries to the log.
  9. type AppendEntriesRequest struct {
  10. Term uint64
  11. PrevLogIndex uint64
  12. PrevLogTerm uint64
  13. CommitIndex uint64
  14. LeaderName string
  15. Entries []*protobuf.LogEntry
  16. }
  17. // The response returned from a server appending entries to the log.
  18. type AppendEntriesResponse struct {
  19. pb *protobuf.AppendEntriesResponse
  20. peer string
  21. append bool
  22. }
  23. // Creates a new AppendEntries request.
  24. func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64,
  25. commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
  26. pbEntries := make([]*protobuf.LogEntry, len(entries))
  27. for i := range entries {
  28. pbEntries[i] = entries[i].pb
  29. }
  30. return &AppendEntriesRequest{
  31. Term: term,
  32. PrevLogIndex: prevLogIndex,
  33. PrevLogTerm: prevLogTerm,
  34. CommitIndex: commitIndex,
  35. LeaderName: leaderName,
  36. Entries: pbEntries,
  37. }
  38. }
  39. // Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
  40. // written and any error that may have occurred.
  41. func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
  42. pb := &protobuf.AppendEntriesRequest{
  43. Term: proto.Uint64(req.Term),
  44. PrevLogIndex: proto.Uint64(req.PrevLogIndex),
  45. PrevLogTerm: proto.Uint64(req.PrevLogTerm),
  46. CommitIndex: proto.Uint64(req.CommitIndex),
  47. LeaderName: proto.String(req.LeaderName),
  48. Entries: req.Entries,
  49. }
  50. p, err := proto.Marshal(pb)
  51. if err != nil {
  52. return -1, err
  53. }
  54. return w.Write(p)
  55. }
  56. // Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
  57. // any error that occurs.
  58. func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
  59. data, err := ioutil.ReadAll(r)
  60. if err != nil {
  61. return -1, err
  62. }
  63. pb := new(protobuf.AppendEntriesRequest)
  64. if err := proto.Unmarshal(data, pb); err != nil {
  65. return -1, err
  66. }
  67. req.Term = pb.GetTerm()
  68. req.PrevLogIndex = pb.GetPrevLogIndex()
  69. req.PrevLogTerm = pb.GetPrevLogTerm()
  70. req.CommitIndex = pb.GetCommitIndex()
  71. req.LeaderName = pb.GetLeaderName()
  72. req.Entries = pb.GetEntries()
  73. return len(data), nil
  74. }
  75. // Creates a new AppendEntries response.
  76. func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse {
  77. pb := &protobuf.AppendEntriesResponse{
  78. Term: proto.Uint64(term),
  79. Index: proto.Uint64(index),
  80. Success: proto.Bool(success),
  81. CommitIndex: proto.Uint64(commitIndex),
  82. }
  83. return &AppendEntriesResponse{
  84. pb: pb,
  85. }
  86. }
  87. func (aer *AppendEntriesResponse) Index() uint64 {
  88. return aer.pb.GetIndex()
  89. }
  90. func (aer *AppendEntriesResponse) CommitIndex() uint64 {
  91. return aer.pb.GetCommitIndex()
  92. }
  93. func (aer *AppendEntriesResponse) Term() uint64 {
  94. return aer.pb.GetTerm()
  95. }
  96. func (aer *AppendEntriesResponse) Success() bool {
  97. return aer.pb.GetSuccess()
  98. }
  99. // Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
  100. // written and any error that may have occurred.
  101. func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
  102. b, err := proto.Marshal(resp.pb)
  103. if err != nil {
  104. return -1, err
  105. }
  106. return w.Write(b)
  107. }
  108. // Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
  109. // any error that occurs.
  110. func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
  111. data, err := ioutil.ReadAll(r)
  112. if err != nil {
  113. return -1, err
  114. }
  115. resp.pb = new(protobuf.AppendEntriesResponse)
  116. if err := proto.Unmarshal(data, resp.pb); err != nil {
  117. return -1, err
  118. }
  119. return len(data), nil
  120. }