fetch_response.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. package sarama
  2. import (
  3. "sort"
  4. "time"
  5. )
  // AbortedTransaction is one entry of the aborted-transaction list carried by
  // a FetchResponseBlock. On the wire it is two consecutive int64 values:
  // ProducerID followed by FirstOffset.
  type AbortedTransaction struct {
  	// ProducerID is the first int64 of the entry.
  	ProducerID int64
  	// FirstOffset is the second int64 of the entry; getAbortedTransactions
  	// orders entries by this field.
  	FirstOffset int64
  }
  10. func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
  11. if t.ProducerID, err = pd.getInt64(); err != nil {
  12. return err
  13. }
  14. if t.FirstOffset, err = pd.getInt64(); err != nil {
  15. return err
  16. }
  17. return nil
  18. }
  19. func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
  20. pe.putInt64(t.ProducerID)
  21. pe.putInt64(t.FirstOffset)
  22. return nil
  23. }
  // FetchResponseBlock holds the fetch result for a single partition of a
  // FetchResponse.
  type FetchResponseBlock struct {
  	// Err is the error code, carried on the wire as an int16.
  	Err KError
  	// HighWaterMarkOffset is the int64 that follows the error code.
  	HighWaterMarkOffset int64
  	// LastStableOffset is only encoded and decoded for version >= 4.
  	LastStableOffset int64
  	// AbortedTransactions is only encoded and decoded for version >= 4.
  	AbortedTransactions []*AbortedTransaction
  	// Records is pointed by decode at the first record set it keeps, if the
  	// field is still nil at that time.
  	//
  	// Deprecated: use FetchResponseBlock.RecordsSet.
  	Records *Records
  	// RecordsSet lists the record sets of the block in the order decode
  	// encountered them (and in the order encode writes them).
  	RecordsSet []*Records
  	// Partial is set by decode when no record set had been kept yet and the
  	// next one failed to decode with ErrInsufficientData.
  	Partial bool
  }
  33. func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  34. tmp, err := pd.getInt16()
  35. if err != nil {
  36. return err
  37. }
  38. b.Err = KError(tmp)
  39. b.HighWaterMarkOffset, err = pd.getInt64()
  40. if err != nil {
  41. return err
  42. }
  43. if version >= 4 {
  44. b.LastStableOffset, err = pd.getInt64()
  45. if err != nil {
  46. return err
  47. }
  48. numTransact, err := pd.getArrayLength()
  49. if err != nil {
  50. return err
  51. }
  52. if numTransact >= 0 {
  53. b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
  54. }
  55. for i := 0; i < numTransact; i++ {
  56. transact := new(AbortedTransaction)
  57. if err = transact.decode(pd); err != nil {
  58. return err
  59. }
  60. b.AbortedTransactions[i] = transact
  61. }
  62. }
  63. recordsSize, err := pd.getInt32()
  64. if err != nil {
  65. return err
  66. }
  67. recordsDecoder, err := pd.getSubset(int(recordsSize))
  68. if err != nil {
  69. return err
  70. }
  71. b.RecordsSet = []*Records{}
  72. for recordsDecoder.remaining() > 0 {
  73. records := &Records{}
  74. if err := records.decode(recordsDecoder); err != nil {
  75. // If we have at least one decoded records, this is not an error
  76. if err == ErrInsufficientData {
  77. if len(b.RecordsSet) == 0 {
  78. b.Partial = true
  79. }
  80. break
  81. }
  82. return err
  83. }
  84. partial, err := records.isPartial()
  85. if err != nil {
  86. return err
  87. }
  88. n, err := records.numRecords()
  89. if err != nil {
  90. return err
  91. }
  92. if n > 0 || (partial && len(b.RecordsSet) == 0) {
  93. b.RecordsSet = append(b.RecordsSet, records)
  94. if b.Records == nil {
  95. b.Records = records
  96. }
  97. }
  98. overflow, err := records.isOverflow()
  99. if err != nil {
  100. return err
  101. }
  102. if partial || overflow {
  103. break
  104. }
  105. }
  106. return nil
  107. }
  108. func (b *FetchResponseBlock) numRecords() (int, error) {
  109. sum := 0
  110. for _, records := range b.RecordsSet {
  111. count, err := records.numRecords()
  112. if err != nil {
  113. return 0, err
  114. }
  115. sum += count
  116. }
  117. return sum, nil
  118. }
  119. func (b *FetchResponseBlock) isPartial() (bool, error) {
  120. if b.Partial {
  121. return true, nil
  122. }
  123. if len(b.RecordsSet) == 1 {
  124. return b.RecordsSet[0].isPartial()
  125. }
  126. return false, nil
  127. }
  128. func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  129. pe.putInt16(int16(b.Err))
  130. pe.putInt64(b.HighWaterMarkOffset)
  131. if version >= 4 {
  132. pe.putInt64(b.LastStableOffset)
  133. if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
  134. return err
  135. }
  136. for _, transact := range b.AbortedTransactions {
  137. if err = transact.encode(pe); err != nil {
  138. return err
  139. }
  140. }
  141. }
  142. pe.push(&lengthField{})
  143. for _, records := range b.RecordsSet {
  144. err = records.encode(pe)
  145. if err != nil {
  146. return err
  147. }
  148. }
  149. return pe.pop()
  150. }
  151. func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
  152. // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
  153. // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
  154. at := b.AbortedTransactions
  155. sort.Slice(
  156. at,
  157. func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
  158. )
  159. return at
  160. }
  // FetchResponse is the response to a fetch request (API key 1). Besides the
  // decoded data it carries settings used by the Add* helper methods when a
  // response is built by hand.
  type FetchResponse struct {
  	// Blocks maps topic name, then partition ID, to that partition's block.
  	Blocks map[string]map[int32]*FetchResponseBlock
  	// ThrottleTime is carried on the wire as int32 milliseconds, for
  	// Version >= 1 only.
  	ThrottleTime time.Duration
  	Version int16 // v1 requires 0.9+, v2 requires 0.10+
  	// LogAppendTime is copied into the messages and batches created by the
  	// Add* helpers; when true, AddMessageWithTimestamp uses Timestamp in place
  	// of the caller-supplied timestamp.
  	LogAppendTime bool
  	// Timestamp is used by the Add* helpers as the MaxTimestamp of new record
  	// batches and as the replacement timestamp described on LogAppendTime.
  	Timestamp time.Time
  }
  168. func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
  169. r.Version = version
  170. if r.Version >= 1 {
  171. throttle, err := pd.getInt32()
  172. if err != nil {
  173. return err
  174. }
  175. r.ThrottleTime = time.Duration(throttle) * time.Millisecond
  176. }
  177. numTopics, err := pd.getArrayLength()
  178. if err != nil {
  179. return err
  180. }
  181. r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  182. for i := 0; i < numTopics; i++ {
  183. name, err := pd.getString()
  184. if err != nil {
  185. return err
  186. }
  187. numBlocks, err := pd.getArrayLength()
  188. if err != nil {
  189. return err
  190. }
  191. r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  192. for j := 0; j < numBlocks; j++ {
  193. id, err := pd.getInt32()
  194. if err != nil {
  195. return err
  196. }
  197. block := new(FetchResponseBlock)
  198. err = block.decode(pd, version)
  199. if err != nil {
  200. return err
  201. }
  202. r.Blocks[name][id] = block
  203. }
  204. }
  205. return nil
  206. }
  207. func (r *FetchResponse) encode(pe packetEncoder) (err error) {
  208. if r.Version >= 1 {
  209. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  210. }
  211. err = pe.putArrayLength(len(r.Blocks))
  212. if err != nil {
  213. return err
  214. }
  215. for topic, partitions := range r.Blocks {
  216. err = pe.putString(topic)
  217. if err != nil {
  218. return err
  219. }
  220. err = pe.putArrayLength(len(partitions))
  221. if err != nil {
  222. return err
  223. }
  224. for id, block := range partitions {
  225. pe.putInt32(id)
  226. err = block.encode(pe, r.Version)
  227. if err != nil {
  228. return err
  229. }
  230. }
  231. }
  232. return nil
  233. }
  234. func (r *FetchResponse) key() int16 {
  235. return 1
  236. }
  237. func (r *FetchResponse) version() int16 {
  238. return r.Version
  239. }
  240. func (r *FetchResponse) requiredVersion() KafkaVersion {
  241. switch r.Version {
  242. case 1:
  243. return V0_9_0_0
  244. case 2:
  245. return V0_10_0_0
  246. case 3:
  247. return V0_10_1_0
  248. case 4:
  249. return V0_11_0_0
  250. default:
  251. return MinVersion
  252. }
  253. }
  254. func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  255. if r.Blocks == nil {
  256. return nil
  257. }
  258. if r.Blocks[topic] == nil {
  259. return nil
  260. }
  261. return r.Blocks[topic][partition]
  262. }
  263. func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
  264. if r.Blocks == nil {
  265. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  266. }
  267. partitions, ok := r.Blocks[topic]
  268. if !ok {
  269. partitions = make(map[int32]*FetchResponseBlock)
  270. r.Blocks[topic] = partitions
  271. }
  272. frb, ok := partitions[partition]
  273. if !ok {
  274. frb = new(FetchResponseBlock)
  275. partitions[partition] = frb
  276. }
  277. frb.Err = err
  278. }
  279. func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
  280. if r.Blocks == nil {
  281. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  282. }
  283. partitions, ok := r.Blocks[topic]
  284. if !ok {
  285. partitions = make(map[int32]*FetchResponseBlock)
  286. r.Blocks[topic] = partitions
  287. }
  288. frb, ok := partitions[partition]
  289. if !ok {
  290. frb = new(FetchResponseBlock)
  291. partitions[partition] = frb
  292. }
  293. return frb
  294. }
  295. func encodeKV(key, value Encoder) ([]byte, []byte) {
  296. var kb []byte
  297. var vb []byte
  298. if key != nil {
  299. kb, _ = key.Encode()
  300. }
  301. if value != nil {
  302. vb, _ = value.Encode()
  303. }
  304. return kb, vb
  305. }
  306. func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
  307. frb := r.getOrCreateBlock(topic, partition)
  308. kb, vb := encodeKV(key, value)
  309. if r.LogAppendTime {
  310. timestamp = r.Timestamp
  311. }
  312. msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
  313. msgBlock := &MessageBlock{Msg: msg, Offset: offset}
  314. if len(frb.RecordsSet) == 0 {
  315. records := newLegacyRecords(&MessageSet{})
  316. frb.RecordsSet = []*Records{&records}
  317. }
  318. set := frb.RecordsSet[0].MsgSet
  319. set.Messages = append(set.Messages, msgBlock)
  320. }
  321. func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
  322. frb := r.getOrCreateBlock(topic, partition)
  323. kb, vb := encodeKV(key, value)
  324. if len(frb.RecordsSet) == 0 {
  325. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  326. frb.RecordsSet = []*Records{&records}
  327. }
  328. batch := frb.RecordsSet[0].RecordBatch
  329. rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  330. batch.addRecord(rec)
  331. }
  332. // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
  333. // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
  334. // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
  335. func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
  336. frb := r.getOrCreateBlock(topic, partition)
  337. kb, vb := encodeKV(key, value)
  338. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  339. batch := &RecordBatch{
  340. Version: 2,
  341. LogAppendTime: r.LogAppendTime,
  342. FirstTimestamp: timestamp,
  343. MaxTimestamp: r.Timestamp,
  344. FirstOffset: offset,
  345. LastOffsetDelta: 0,
  346. ProducerID: producerID,
  347. IsTransactional: isTransactional,
  348. }
  349. rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  350. batch.addRecord(rec)
  351. records.RecordBatch = batch
  352. frb.RecordsSet = append(frb.RecordsSet, &records)
  353. }
  354. func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
  355. frb := r.getOrCreateBlock(topic, partition)
  356. // batch
  357. batch := &RecordBatch{
  358. Version: 2,
  359. LogAppendTime: r.LogAppendTime,
  360. FirstTimestamp: timestamp,
  361. MaxTimestamp: r.Timestamp,
  362. FirstOffset: offset,
  363. LastOffsetDelta: 0,
  364. ProducerID: producerID,
  365. IsTransactional: true,
  366. Control: true,
  367. }
  368. // records
  369. records := newDefaultRecords(nil)
  370. records.RecordBatch = batch
  371. // record
  372. crAbort := ControlRecord{
  373. Version: 0,
  374. Type: recordType,
  375. }
  376. crKey := &realEncoder{raw: make([]byte, 4)}
  377. crValue := &realEncoder{raw: make([]byte, 6)}
  378. crAbort.encode(crKey, crValue)
  379. rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  380. batch.addRecord(rec)
  381. frb.RecordsSet = append(frb.RecordsSet, &records)
  382. }
  383. func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
  384. r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
  385. }
  386. func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
  387. r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
  388. }
  389. func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
  390. r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
  391. }
  392. func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
  393. // define controlRecord key and value
  394. r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
  395. }
  396. func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
  397. frb := r.getOrCreateBlock(topic, partition)
  398. if len(frb.RecordsSet) == 0 {
  399. records := newDefaultRecords(&RecordBatch{Version: 2})
  400. frb.RecordsSet = []*Records{&records}
  401. }
  402. batch := frb.RecordsSet[0].RecordBatch
  403. batch.LastOffsetDelta = offset
  404. }
  405. func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
  406. frb := r.getOrCreateBlock(topic, partition)
  407. frb.LastStableOffset = offset
  408. }