msgappv2.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/coreos/etcd/etcdserver/stats"
  21. "github.com/coreos/etcd/pkg/pbutil"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. )
  // Type bytes that prefix every message on a msgappv2 stream. The values
  // are part of the wire format and are spelled out explicitly on purpose.
  const (
  	msgTypeLinkHeartbeat uint8 = 0
  	msgTypeAppEntries    uint8 = 1
  	msgTypeApp           uint8 = 2
  )

  // msgAppV2BufSize is the size in bytes (1 MiB) of the scratch buffer owned
  // by each encoder and decoder. Entries whose encoded size is below it are
  // marshalled into / read through that buffer; larger ones get a freshly
  // allocated slice.
  const msgAppV2BufSize = 1 << 20
  31. // msgappv2 stream sends three types of message: linkHeartbeatMessage,
  32. // AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
  33. // replicate state in raft, whose index and term are fully predictable.
  34. //
  35. // Data format of linkHeartbeatMessage:
  36. // | offset | bytes | description |
  37. // +--------+-------+-------------+
  38. // | 0 | 1 | \x00 |
  39. //
  40. // Data format of AppEntries:
  41. // | offset | bytes | description |
  42. // +--------+-------+-------------+
  43. // | 0 | 1 | \x01 |
  44. // | 1 | 8 | number of entries |
  45. // | 9 | 8 | length of first entry |
  46. // | 17 | n1 | first entry |
  47. // ...
  48. // | x | 8 | length of k-th entry data |
  49. // | x+8 | nk | k-th entry data |
  50. // | x+8+nk | 8 | commit index |
  51. //
  52. // Data format of MsgApp:
  53. // | offset | bytes | description |
  54. // +--------+-------+-------------+
  55. // | 0 | 1 | \x02 |
  56. // | 1 | 8 | length of encoded message |
  57. // | 9 | n | encoded message |
  58. type msgAppV2Encoder struct {
  59. w io.Writer
  60. fs *stats.FollowerStats
  61. term uint64
  62. index uint64
  63. buf []byte
  64. uint64buf []byte
  65. uint8buf []byte
  66. }
  67. func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
  68. return &msgAppV2Encoder{
  69. w: w,
  70. fs: fs,
  71. buf: make([]byte, msgAppV2BufSize),
  72. uint64buf: make([]byte, 8),
  73. uint8buf: make([]byte, 1),
  74. }
  75. }
  76. func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
  77. start := time.Now()
  78. switch {
  79. case isLinkHeartbeatMessage(m):
  80. enc.uint8buf[0] = byte(msgTypeLinkHeartbeat)
  81. if _, err := enc.w.Write(enc.uint8buf); err != nil {
  82. return err
  83. }
  84. case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
  85. enc.uint8buf[0] = byte(msgTypeAppEntries)
  86. if _, err := enc.w.Write(enc.uint8buf); err != nil {
  87. return err
  88. }
  89. // write length of entries
  90. binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
  91. if _, err := enc.w.Write(enc.uint64buf); err != nil {
  92. return err
  93. }
  94. for i := 0; i < len(m.Entries); i++ {
  95. // write length of entry
  96. binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
  97. if _, err := enc.w.Write(enc.uint64buf); err != nil {
  98. return err
  99. }
  100. if n := m.Entries[i].Size(); n < msgAppV2BufSize {
  101. if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
  102. return err
  103. }
  104. if _, err := enc.w.Write(enc.buf[:n]); err != nil {
  105. return err
  106. }
  107. } else {
  108. if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
  109. return err
  110. }
  111. }
  112. enc.index++
  113. }
  114. // write commit index
  115. binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
  116. if _, err := enc.w.Write(enc.uint64buf); err != nil {
  117. return err
  118. }
  119. enc.fs.Succ(time.Since(start))
  120. default:
  121. if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
  122. return err
  123. }
  124. // write size of message
  125. if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
  126. return err
  127. }
  128. // write message
  129. if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
  130. return err
  131. }
  132. enc.term = m.Term
  133. enc.index = m.Index
  134. if l := len(m.Entries); l > 0 {
  135. enc.index = m.Entries[l-1].Index
  136. }
  137. enc.fs.Succ(time.Since(start))
  138. }
  139. return nil
  140. }
  141. type msgAppV2Decoder struct {
  142. r io.Reader
  143. local, remote types.ID
  144. term uint64
  145. index uint64
  146. buf []byte
  147. uint64buf []byte
  148. uint8buf []byte
  149. }
  150. func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
  151. return &msgAppV2Decoder{
  152. r: r,
  153. local: local,
  154. remote: remote,
  155. buf: make([]byte, msgAppV2BufSize),
  156. uint64buf: make([]byte, 8),
  157. uint8buf: make([]byte, 1),
  158. }
  159. }
  160. func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
  161. var (
  162. m raftpb.Message
  163. typ uint8
  164. )
  165. if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
  166. return m, err
  167. }
  168. typ = uint8(dec.uint8buf[0])
  169. switch typ {
  170. case msgTypeLinkHeartbeat:
  171. return linkHeartbeatMessage, nil
  172. case msgTypeAppEntries:
  173. m = raftpb.Message{
  174. Type: raftpb.MsgApp,
  175. From: uint64(dec.remote),
  176. To: uint64(dec.local),
  177. Term: dec.term,
  178. LogTerm: dec.term,
  179. Index: dec.index,
  180. }
  181. // decode entries
  182. if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
  183. return m, err
  184. }
  185. l := binary.BigEndian.Uint64(dec.uint64buf)
  186. m.Entries = make([]raftpb.Entry, int(l))
  187. for i := 0; i < int(l); i++ {
  188. if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
  189. return m, err
  190. }
  191. size := binary.BigEndian.Uint64(dec.uint64buf)
  192. var buf []byte
  193. if size < msgAppV2BufSize {
  194. buf = dec.buf[:size]
  195. if _, err := io.ReadFull(dec.r, buf); err != nil {
  196. return m, err
  197. }
  198. } else {
  199. buf = make([]byte, int(size))
  200. if _, err := io.ReadFull(dec.r, buf); err != nil {
  201. return m, err
  202. }
  203. }
  204. dec.index++
  205. // 1 alloc
  206. pbutil.MustUnmarshal(&m.Entries[i], buf)
  207. }
  208. // decode commit index
  209. if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
  210. return m, err
  211. }
  212. m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
  213. case msgTypeApp:
  214. var size uint64
  215. if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
  216. return m, err
  217. }
  218. buf := make([]byte, int(size))
  219. if _, err := io.ReadFull(dec.r, buf); err != nil {
  220. return m, err
  221. }
  222. pbutil.MustUnmarshal(&m, buf)
  223. dec.term = m.Term
  224. dec.index = m.Index
  225. if l := len(m.Entries); l > 0 {
  226. dec.index = m.Entries[l-1].Index
  227. }
  228. default:
  229. return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
  230. }
  231. return m, nil
  232. }