fetch_request.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package sarama
  // FetchPartition describes a single partition entry inside a fetch request.
  type FetchPartition struct {
  	// Version is the protocol version of this entry. encode and decode
  	// overwrite it with the version they are invoked with.
  	Version int16

  	// PartitionIndex is the index of the partition to fetch from.
  	PartitionIndex int32

  	// CurrentLeaderEpoch is the current leader epoch of the partition.
  	// It is only serialized for version 9 and above.
  	CurrentLeaderEpoch int32

  	// FetchOffset is the message offset to fetch from.
  	FetchOffset int64

  	// LogStartOffset is the earliest available offset of the follower
  	// replica. It is only meaningful when the request is sent by a follower
  	// and is only serialized for version 5 and above.
  	LogStartOffset int64

  	// MaxBytes is the maximum number of bytes to fetch from this partition.
  	// See KIP-74 for cases where this limit may not be honored.
  	MaxBytes int32
  }
  17. func (f *FetchPartition) encode(pe packetEncoder, version int16) (err error) {
  18. f.Version = version
  19. pe.putInt32(f.PartitionIndex)
  20. if f.Version >= 9 {
  21. pe.putInt32(f.CurrentLeaderEpoch)
  22. }
  23. pe.putInt64(f.FetchOffset)
  24. if f.Version >= 5 {
  25. pe.putInt64(f.LogStartOffset)
  26. }
  27. pe.putInt32(f.MaxBytes)
  28. return nil
  29. }
  30. func (f *FetchPartition) decode(pd packetDecoder, version int16) (err error) {
  31. f.Version = version
  32. if f.PartitionIndex, err = pd.getInt32(); err != nil {
  33. return err
  34. }
  35. if f.Version >= 9 {
  36. if f.CurrentLeaderEpoch, err = pd.getInt32(); err != nil {
  37. return err
  38. }
  39. }
  40. if f.FetchOffset, err = pd.getInt64(); err != nil {
  41. return err
  42. }
  43. if f.Version >= 5 {
  44. if f.LogStartOffset, err = pd.getInt64(); err != nil {
  45. return err
  46. }
  47. }
  48. if f.MaxBytes, err = pd.getInt32(); err != nil {
  49. return err
  50. }
  51. return nil
  52. }
  53. // FetchableTopic contains the topics to fetch.
  54. type FetchableTopic struct {
  55. // Version defines the protocol version to use for encode and decode
  56. Version int16
  57. // Name contains the name of the topic to fetch.
  58. Name string
  59. // FetchPartitions contains the partitions to fetch.
  60. FetchPartitions []FetchPartition
  61. }
  62. func (t *FetchableTopic) encode(pe packetEncoder, version int16) (err error) {
  63. t.Version = version
  64. if err := pe.putString(t.Name); err != nil {
  65. return err
  66. }
  67. if err := pe.putArrayLength(len(t.FetchPartitions)); err != nil {
  68. return err
  69. }
  70. for _, block := range t.FetchPartitions {
  71. if err := block.encode(pe, t.Version); err != nil {
  72. return err
  73. }
  74. }
  75. return nil
  76. }
  77. func (t *FetchableTopic) decode(pd packetDecoder, version int16) (err error) {
  78. t.Version = version
  79. if t.Name, err = pd.getString(); err != nil {
  80. return err
  81. }
  82. if numFetchPartitions, err := pd.getArrayLength(); err != nil {
  83. return err
  84. } else {
  85. t.FetchPartitions = make([]FetchPartition, numFetchPartitions)
  86. for i := 0; i < numFetchPartitions; i++ {
  87. var block FetchPartition
  88. if err := block.decode(pd, t.Version); err != nil {
  89. return err
  90. }
  91. t.FetchPartitions[i] = block
  92. }
  93. }
  94. return nil
  95. }
  // ForgottenTopic names, for an incremental fetch request, the partitions of
  // one topic that should be removed.
  type ForgottenTopic struct {
  	// Version is the protocol version of this entry. encode and decode
  	// overwrite it with the version they are invoked with.
  	Version int16

  	// Name is the name of the topic whose partitions are forgotten.
  	// It is only serialized for version 7 and above.
  	Name string

  	// ForgottenPartitionIndexes lists the indexes of the partitions to
  	// forget. It is only serialized for version 7 and above.
  	ForgottenPartitionIndexes []int32
  }
  105. func (f *ForgottenTopic) encode(pe packetEncoder, version int16) (err error) {
  106. f.Version = version
  107. if f.Version >= 7 {
  108. if err := pe.putString(f.Name); err != nil {
  109. return err
  110. }
  111. }
  112. if f.Version >= 7 {
  113. if err := pe.putInt32Array(f.ForgottenPartitionIndexes); err != nil {
  114. return err
  115. }
  116. }
  117. return nil
  118. }
  119. func (f *ForgottenTopic) decode(pd packetDecoder, version int16) (err error) {
  120. f.Version = version
  121. if f.Version >= 7 {
  122. if f.Name, err = pd.getString(); err != nil {
  123. return err
  124. }
  125. }
  126. if f.Version >= 7 {
  127. if f.ForgottenPartitionIndexes, err = pd.getInt32Array(); err != nil {
  128. return err
  129. }
  130. }
  131. return nil
  132. }
  // Isolation levels for FetchRequest.IsolationLevel.
  const (
  	// ReadUncommitted (0) makes all records visible.
  	ReadUncommitted int8 = 0
  	// ReadCommitted (1) makes non-transactional and committed transactional
  	// records visible.
  	ReadCommitted int8 = 1
  )
  137. type FetchRequest struct {
  138. // Version defines the protocol version to use for encode and decode
  139. Version int16
  140. // ReplicaID contains the broker ID of the follower, of -1 if this request is from a consumer.
  141. ReplicaID int32
  142. // MaxWait contains the maximum time in milliseconds to wait for the response.
  143. MaxWait int32
  144. // MinBytes contains the minimum bytes to accumulate in the response.
  145. MinBytes int32
  146. // MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored.
  147. MaxBytes int32
  148. // IsolationLevel contains a This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
  149. IsolationLevel int8
  150. // SessionID contains the fetch session ID.
  151. SessionID int32
  152. // Epoch contains the epoch of the partition leader as known to the follower replica or a consumer.
  153. Epoch int32
  154. // Topics contains the topics to fetch.
  155. Topics []FetchableTopic
  156. // Forgotten contains in an incremental fetch request, the partitions to remove.
  157. Forgotten []ForgottenTopic
  158. // RackID contains a Rack ID of the consumer making this request
  159. RackID string
  160. }
  161. func (r *FetchRequest) encode(pe packetEncoder) (err error) {
  162. pe.putInt32(r.ReplicaID)
  163. pe.putInt32(r.MaxWait)
  164. pe.putInt32(r.MinBytes)
  165. if r.Version >= 3 {
  166. pe.putInt32(r.MaxBytes)
  167. }
  168. if r.Version >= 4 {
  169. pe.putInt8(r.IsolationLevel)
  170. }
  171. if r.Version >= 7 {
  172. pe.putInt32(r.SessionID)
  173. }
  174. if r.Version >= 7 {
  175. pe.putInt32(r.Epoch)
  176. }
  177. if err := pe.putArrayLength(len(r.Topics)); err != nil {
  178. return err
  179. }
  180. for _, block := range r.Topics {
  181. if err := block.encode(pe, r.Version); err != nil {
  182. return err
  183. }
  184. }
  185. if r.Version >= 7 {
  186. if err := pe.putArrayLength(len(r.Forgotten)); err != nil {
  187. return err
  188. }
  189. for _, block := range r.Forgotten {
  190. if err := block.encode(pe, r.Version); err != nil {
  191. return err
  192. }
  193. }
  194. }
  195. if r.Version >= 11 {
  196. if err := pe.putString(r.RackID); err != nil {
  197. return err
  198. }
  199. }
  200. return nil
  201. }
  202. func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
  203. r.Version = version
  204. if r.ReplicaID, err = pd.getInt32(); err != nil {
  205. return err
  206. }
  207. if r.MaxWait, err = pd.getInt32(); err != nil {
  208. return err
  209. }
  210. if r.MinBytes, err = pd.getInt32(); err != nil {
  211. return err
  212. }
  213. if r.Version >= 3 {
  214. if r.MaxBytes, err = pd.getInt32(); err != nil {
  215. return err
  216. }
  217. }
  218. if r.Version >= 4 {
  219. if r.IsolationLevel, err = pd.getInt8(); err != nil {
  220. return err
  221. }
  222. }
  223. if r.Version >= 7 {
  224. if r.SessionID, err = pd.getInt32(); err != nil {
  225. return err
  226. }
  227. }
  228. if r.Version >= 7 {
  229. if r.Epoch, err = pd.getInt32(); err != nil {
  230. return err
  231. }
  232. }
  233. if numTopics, err := pd.getArrayLength(); err != nil {
  234. return err
  235. } else {
  236. r.Topics = make([]FetchableTopic, numTopics)
  237. for i := 0; i < numTopics; i++ {
  238. var block FetchableTopic
  239. if err := block.decode(pd, r.Version); err != nil {
  240. return err
  241. }
  242. r.Topics[i] = block
  243. }
  244. }
  245. if r.Version >= 7 {
  246. if numForgotten, err := pd.getArrayLength(); err != nil {
  247. return err
  248. } else {
  249. r.Forgotten = make([]ForgottenTopic, numForgotten)
  250. for i := 0; i < numForgotten; i++ {
  251. var block ForgottenTopic
  252. if err := block.decode(pd, r.Version); err != nil {
  253. return err
  254. }
  255. r.Forgotten[i] = block
  256. }
  257. }
  258. }
  259. if r.Version >= 11 {
  260. if r.RackID, err = pd.getString(); err != nil {
  261. return err
  262. }
  263. }
  264. return nil
  265. }
  266. func (r *FetchRequest) key() int16 {
  267. return 1
  268. }
  269. func (r *FetchRequest) version() int16 {
  270. return r.Version
  271. }
  272. func (r *FetchRequest) headerVersion() int16 {
  273. return 1
  274. }
  275. func (r *FetchRequest) requiredVersion() KafkaVersion {
  276. switch r.Version {
  277. case 0:
  278. return MinVersion
  279. case 1:
  280. return V0_9_0_0
  281. case 2:
  282. return V0_10_0_0
  283. case 3:
  284. return V0_10_1_0
  285. case 4, 5:
  286. return V0_11_0_0
  287. case 6:
  288. return V1_0_0_0
  289. case 7:
  290. return V1_1_0_0
  291. case 8:
  292. return V2_0_0_0
  293. case 9, 10:
  294. return V2_1_0_0
  295. case 11:
  296. return V2_3_0_0
  297. default:
  298. return MaxVersion
  299. }
  300. }