msgappv2.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/coreos/etcd/etcdserver/stats"
  21. "github.com/coreos/etcd/pkg/pbutil"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. )
  // Type tags carried in the first byte of every msgappv2 frame.
const (
	msgTypeLinkHeartbeat uint8 = iota // link heartbeat; the frame is just this byte
	msgTypeAppEntries                 // compact MsgApp: entries and commit index only
	msgTypeApp                        // full MsgApp: length-prefixed marshaled message
)
  30. // msgappv2 stream sends three types of message: linkHeartbeatMessage,
  31. // AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
  32. // replicate state in raft, whose index and term are fully predicatable.
  33. //
  34. // Data format of linkHeartbeatMessage:
  35. // | offset | bytes | description |
  36. // +--------+-------+-------------+
  37. // | 0 | 1 | \x00 |
  38. //
  39. // Data format of AppEntries:
  40. // | offset | bytes | description |
  41. // +--------+-------+-------------+
  42. // | 0 | 1 | \x01 |
  43. // | 1 | 8 | length of entries |
  44. // | 9 | 8 | length of first entry |
  45. // | 17 | n1 | first entry |
  46. // ...
  47. // | x | 8 | length of k-th entry data |
  48. // | x+8 | nk | k-th entry data |
  49. // | x+8+nk | 8 | commit index |
  50. //
  51. // Data format of MsgApp:
  52. // | offset | bytes | description |
  53. // +--------+-------+-------------+
  54. // | 0 | 1 | \x01 |
  55. // | 1 | 8 | length of encoded message |
  56. // | 9 | n | encoded message |
  57. type msgAppV2Encoder struct {
  58. w io.Writer
  59. fs *stats.FollowerStats
  60. term uint64
  61. index uint64
  62. }
  63. func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
  64. start := time.Now()
  65. switch {
  66. case isLinkHeartbeatMessage(m):
  67. return binary.Write(enc.w, binary.BigEndian, msgTypeLinkHeartbeat)
  68. case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
  69. if err := binary.Write(enc.w, binary.BigEndian, msgTypeAppEntries); err != nil {
  70. return err
  71. }
  72. // write length of entries
  73. l := len(m.Entries)
  74. if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil {
  75. return err
  76. }
  77. for i := 0; i < l; i++ {
  78. size := m.Entries[i].Size()
  79. if err := binary.Write(enc.w, binary.BigEndian, uint64(size)); err != nil {
  80. return err
  81. }
  82. if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
  83. return err
  84. }
  85. enc.index++
  86. }
  87. // write commit index
  88. if err := binary.Write(enc.w, binary.BigEndian, m.Commit); err != nil {
  89. return err
  90. }
  91. default:
  92. if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
  93. return err
  94. }
  95. // write size of message
  96. if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
  97. return err
  98. }
  99. // write message
  100. if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
  101. return err
  102. }
  103. enc.term = m.Term
  104. enc.index = m.Index
  105. if l := len(m.Entries); l > 0 {
  106. enc.index = m.Entries[l-1].Index
  107. }
  108. }
  109. enc.fs.Succ(time.Since(start))
  110. return nil
  111. }
  112. type msgAppV2Decoder struct {
  113. r io.Reader
  114. local, remote types.ID
  115. term uint64
  116. index uint64
  117. }
  118. func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
  119. var (
  120. m raftpb.Message
  121. typ uint8
  122. )
  123. if err := binary.Read(dec.r, binary.BigEndian, &typ); err != nil {
  124. return m, err
  125. }
  126. switch typ {
  127. case msgTypeLinkHeartbeat:
  128. return linkHeartbeatMessage, nil
  129. case msgTypeAppEntries:
  130. m = raftpb.Message{
  131. Type: raftpb.MsgApp,
  132. From: uint64(dec.remote),
  133. To: uint64(dec.local),
  134. Term: dec.term,
  135. LogTerm: dec.term,
  136. Index: dec.index,
  137. }
  138. // decode entries
  139. var l uint64
  140. if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
  141. return m, err
  142. }
  143. m.Entries = make([]raftpb.Entry, int(l))
  144. for i := 0; i < int(l); i++ {
  145. var size uint64
  146. if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
  147. return m, err
  148. }
  149. buf := make([]byte, int(size))
  150. if _, err := io.ReadFull(dec.r, buf); err != nil {
  151. return m, err
  152. }
  153. dec.index++
  154. pbutil.MustUnmarshal(&m.Entries[i], buf)
  155. }
  156. // decode commit index
  157. if err := binary.Read(dec.r, binary.BigEndian, &m.Commit); err != nil {
  158. return m, err
  159. }
  160. case msgTypeApp:
  161. var size uint64
  162. if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
  163. return m, err
  164. }
  165. buf := make([]byte, int(size))
  166. if _, err := io.ReadFull(dec.r, buf); err != nil {
  167. return m, err
  168. }
  169. pbutil.MustUnmarshal(&m, buf)
  170. dec.term = m.Term
  171. dec.index = m.Index
  172. if l := len(m.Entries); l > 0 {
  173. dec.index = m.Entries[l-1].Index
  174. }
  175. default:
  176. return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
  177. }
  178. return m, nil
  179. }