fetch_response.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. package sarama
  2. import (
  3. "sort"
  4. "time"
  5. )
  // AbortedTransaction is one entry of FetchResponseBlock.AbortedTransactions.
  // It carries a producer ID together with a first offset; both travel on the
  // wire as int64 values, ProducerID first.
  type AbortedTransaction struct {
  	// ProducerID is the first int64 of the encoded entry.
  	ProducerID int64
  	// FirstOffset is the second int64 of the encoded entry. It is the sort
  	// key used by FetchResponseBlock.getAbortedTransactions.
  	FirstOffset int64
  }
  10. func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
  11. if t.ProducerID, err = pd.getInt64(); err != nil {
  12. return err
  13. }
  14. if t.FirstOffset, err = pd.getInt64(); err != nil {
  15. return err
  16. }
  17. return nil
  18. }
  19. func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
  20. pe.putInt64(t.ProducerID)
  21. pe.putInt64(t.FirstOffset)
  22. return nil
  23. }
  24. type FetchResponseBlock struct {
  25. Err KError
  26. HighWaterMarkOffset int64
  27. LastStableOffset int64
  28. LogStartOffset int64
  29. AbortedTransactions []*AbortedTransaction
  30. PreferredReadReplica int32
  31. Records *Records // deprecated: use FetchResponseBlock.RecordsSet
  32. RecordsSet []*Records
  33. Partial bool
  34. }
  35. func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  36. tmp, err := pd.getInt16()
  37. if err != nil {
  38. return err
  39. }
  40. b.Err = KError(tmp)
  41. b.HighWaterMarkOffset, err = pd.getInt64()
  42. if err != nil {
  43. return err
  44. }
  45. if version >= 4 {
  46. b.LastStableOffset, err = pd.getInt64()
  47. if err != nil {
  48. return err
  49. }
  50. if version >= 5 {
  51. b.LogStartOffset, err = pd.getInt64()
  52. if err != nil {
  53. return err
  54. }
  55. }
  56. numTransact, err := pd.getArrayLength()
  57. if err != nil {
  58. return err
  59. }
  60. if numTransact >= 0 {
  61. b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
  62. }
  63. for i := 0; i < numTransact; i++ {
  64. transact := new(AbortedTransaction)
  65. if err = transact.decode(pd); err != nil {
  66. return err
  67. }
  68. b.AbortedTransactions[i] = transact
  69. }
  70. }
  71. if version >= 11 {
  72. b.PreferredReadReplica, err = pd.getInt32()
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. recordsSize, err := pd.getInt32()
  78. if err != nil {
  79. return err
  80. }
  81. recordsDecoder, err := pd.getSubset(int(recordsSize))
  82. if err != nil {
  83. return err
  84. }
  85. b.RecordsSet = []*Records{}
  86. for recordsDecoder.remaining() > 0 {
  87. records := &Records{}
  88. if err := records.decode(recordsDecoder); err != nil {
  89. // If we have at least one decoded records, this is not an error
  90. if err == ErrInsufficientData {
  91. if len(b.RecordsSet) == 0 {
  92. b.Partial = true
  93. }
  94. break
  95. }
  96. return err
  97. }
  98. partial, err := records.isPartial()
  99. if err != nil {
  100. return err
  101. }
  102. n, err := records.numRecords()
  103. if err != nil {
  104. return err
  105. }
  106. if n > 0 || (partial && len(b.RecordsSet) == 0) {
  107. b.RecordsSet = append(b.RecordsSet, records)
  108. if b.Records == nil {
  109. b.Records = records
  110. }
  111. }
  112. overflow, err := records.isOverflow()
  113. if err != nil {
  114. return err
  115. }
  116. if partial || overflow {
  117. break
  118. }
  119. }
  120. return nil
  121. }
  122. func (b *FetchResponseBlock) numRecords() (int, error) {
  123. sum := 0
  124. for _, records := range b.RecordsSet {
  125. count, err := records.numRecords()
  126. if err != nil {
  127. return 0, err
  128. }
  129. sum += count
  130. }
  131. return sum, nil
  132. }
  133. func (b *FetchResponseBlock) isPartial() (bool, error) {
  134. if b.Partial {
  135. return true, nil
  136. }
  137. if len(b.RecordsSet) == 1 {
  138. return b.RecordsSet[0].isPartial()
  139. }
  140. return false, nil
  141. }
  142. func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  143. pe.putInt16(int16(b.Err))
  144. pe.putInt64(b.HighWaterMarkOffset)
  145. if version >= 4 {
  146. pe.putInt64(b.LastStableOffset)
  147. if version >= 5 {
  148. pe.putInt64(b.LogStartOffset)
  149. }
  150. if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
  151. return err
  152. }
  153. for _, transact := range b.AbortedTransactions {
  154. if err = transact.encode(pe); err != nil {
  155. return err
  156. }
  157. }
  158. }
  159. if version >= 11 {
  160. pe.putInt32(b.PreferredReadReplica)
  161. }
  162. pe.push(&lengthField{})
  163. for _, records := range b.RecordsSet {
  164. err = records.encode(pe)
  165. if err != nil {
  166. return err
  167. }
  168. }
  169. return pe.pop()
  170. }
  171. func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
  172. // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
  173. // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
  174. at := b.AbortedTransactions
  175. sort.Slice(
  176. at,
  177. func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
  178. )
  179. return at
  180. }
  181. type FetchResponse struct {
  182. Blocks map[string]map[int32]*FetchResponseBlock
  183. ThrottleTime time.Duration
  184. ErrorCode int16
  185. SessionID int32
  186. Version int16
  187. LogAppendTime bool
  188. Timestamp time.Time
  189. }
  190. func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
  191. r.Version = version
  192. if r.Version >= 1 {
  193. throttle, err := pd.getInt32()
  194. if err != nil {
  195. return err
  196. }
  197. r.ThrottleTime = time.Duration(throttle) * time.Millisecond
  198. }
  199. if r.Version >= 7 {
  200. r.ErrorCode, err = pd.getInt16()
  201. if err != nil {
  202. return err
  203. }
  204. r.SessionID, err = pd.getInt32()
  205. if err != nil {
  206. return err
  207. }
  208. }
  209. numTopics, err := pd.getArrayLength()
  210. if err != nil {
  211. return err
  212. }
  213. r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  214. for i := 0; i < numTopics; i++ {
  215. name, err := pd.getString()
  216. if err != nil {
  217. return err
  218. }
  219. numBlocks, err := pd.getArrayLength()
  220. if err != nil {
  221. return err
  222. }
  223. r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  224. for j := 0; j < numBlocks; j++ {
  225. id, err := pd.getInt32()
  226. if err != nil {
  227. return err
  228. }
  229. block := new(FetchResponseBlock)
  230. err = block.decode(pd, version)
  231. if err != nil {
  232. return err
  233. }
  234. r.Blocks[name][id] = block
  235. }
  236. }
  237. return nil
  238. }
  239. func (r *FetchResponse) encode(pe packetEncoder) (err error) {
  240. if r.Version >= 1 {
  241. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  242. }
  243. if r.Version >= 7 {
  244. pe.putInt16(r.ErrorCode)
  245. pe.putInt32(r.SessionID)
  246. }
  247. err = pe.putArrayLength(len(r.Blocks))
  248. if err != nil {
  249. return err
  250. }
  251. for topic, partitions := range r.Blocks {
  252. err = pe.putString(topic)
  253. if err != nil {
  254. return err
  255. }
  256. err = pe.putArrayLength(len(partitions))
  257. if err != nil {
  258. return err
  259. }
  260. for id, block := range partitions {
  261. pe.putInt32(id)
  262. err = block.encode(pe, r.Version)
  263. if err != nil {
  264. return err
  265. }
  266. }
  267. }
  268. return nil
  269. }
  270. func (r *FetchResponse) key() int16 {
  271. return 1
  272. }
  273. func (r *FetchResponse) version() int16 {
  274. return r.Version
  275. }
  276. func (r *FetchResponse) requiredVersion() KafkaVersion {
  277. switch r.Version {
  278. case 0:
  279. return MinVersion
  280. case 1:
  281. return V0_9_0_0
  282. case 2:
  283. return V0_10_0_0
  284. case 3:
  285. return V0_10_1_0
  286. case 4, 5:
  287. return V0_11_0_0
  288. case 6:
  289. return V1_0_0_0
  290. case 7:
  291. return V1_1_0_0
  292. case 8:
  293. return V2_0_0_0
  294. case 9, 10:
  295. return V2_1_0_0
  296. case 11:
  297. return V2_3_0_0
  298. default:
  299. return MaxVersion
  300. }
  301. }
  302. func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  303. if r.Blocks == nil {
  304. return nil
  305. }
  306. if r.Blocks[topic] == nil {
  307. return nil
  308. }
  309. return r.Blocks[topic][partition]
  310. }
  311. func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
  312. if r.Blocks == nil {
  313. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  314. }
  315. partitions, ok := r.Blocks[topic]
  316. if !ok {
  317. partitions = make(map[int32]*FetchResponseBlock)
  318. r.Blocks[topic] = partitions
  319. }
  320. frb, ok := partitions[partition]
  321. if !ok {
  322. frb = new(FetchResponseBlock)
  323. partitions[partition] = frb
  324. }
  325. frb.Err = err
  326. }
  327. func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
  328. if r.Blocks == nil {
  329. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  330. }
  331. partitions, ok := r.Blocks[topic]
  332. if !ok {
  333. partitions = make(map[int32]*FetchResponseBlock)
  334. r.Blocks[topic] = partitions
  335. }
  336. frb, ok := partitions[partition]
  337. if !ok {
  338. frb = new(FetchResponseBlock)
  339. partitions[partition] = frb
  340. }
  341. return frb
  342. }
  343. func encodeKV(key, value Encoder) ([]byte, []byte) {
  344. var kb []byte
  345. var vb []byte
  346. if key != nil {
  347. kb, _ = key.Encode()
  348. }
  349. if value != nil {
  350. vb, _ = value.Encode()
  351. }
  352. return kb, vb
  353. }
  354. func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
  355. frb := r.getOrCreateBlock(topic, partition)
  356. kb, vb := encodeKV(key, value)
  357. if r.LogAppendTime {
  358. timestamp = r.Timestamp
  359. }
  360. msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
  361. msgBlock := &MessageBlock{Msg: msg, Offset: offset}
  362. if len(frb.RecordsSet) == 0 {
  363. records := newLegacyRecords(&MessageSet{})
  364. frb.RecordsSet = []*Records{&records}
  365. }
  366. set := frb.RecordsSet[0].MsgSet
  367. set.Messages = append(set.Messages, msgBlock)
  368. }
  369. func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
  370. frb := r.getOrCreateBlock(topic, partition)
  371. kb, vb := encodeKV(key, value)
  372. if len(frb.RecordsSet) == 0 {
  373. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  374. frb.RecordsSet = []*Records{&records}
  375. }
  376. batch := frb.RecordsSet[0].RecordBatch
  377. rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  378. batch.addRecord(rec)
  379. }
  380. // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
  381. // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
  382. // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
  383. func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
  384. frb := r.getOrCreateBlock(topic, partition)
  385. kb, vb := encodeKV(key, value)
  386. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  387. batch := &RecordBatch{
  388. Version: 2,
  389. LogAppendTime: r.LogAppendTime,
  390. FirstTimestamp: timestamp,
  391. MaxTimestamp: r.Timestamp,
  392. FirstOffset: offset,
  393. LastOffsetDelta: 0,
  394. ProducerID: producerID,
  395. IsTransactional: isTransactional,
  396. }
  397. rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  398. batch.addRecord(rec)
  399. records.RecordBatch = batch
  400. frb.RecordsSet = append(frb.RecordsSet, &records)
  401. }
  402. func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
  403. frb := r.getOrCreateBlock(topic, partition)
  404. // batch
  405. batch := &RecordBatch{
  406. Version: 2,
  407. LogAppendTime: r.LogAppendTime,
  408. FirstTimestamp: timestamp,
  409. MaxTimestamp: r.Timestamp,
  410. FirstOffset: offset,
  411. LastOffsetDelta: 0,
  412. ProducerID: producerID,
  413. IsTransactional: true,
  414. Control: true,
  415. }
  416. // records
  417. records := newDefaultRecords(nil)
  418. records.RecordBatch = batch
  419. // record
  420. crAbort := ControlRecord{
  421. Version: 0,
  422. Type: recordType,
  423. }
  424. crKey := &realEncoder{raw: make([]byte, 4)}
  425. crValue := &realEncoder{raw: make([]byte, 6)}
  426. crAbort.encode(crKey, crValue)
  427. rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  428. batch.addRecord(rec)
  429. frb.RecordsSet = append(frb.RecordsSet, &records)
  430. }
  431. func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
  432. r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
  433. }
  434. func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
  435. r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
  436. }
  437. func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
  438. r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
  439. }
  440. func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
  441. // define controlRecord key and value
  442. r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
  443. }
  444. func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
  445. frb := r.getOrCreateBlock(topic, partition)
  446. if len(frb.RecordsSet) == 0 {
  447. records := newDefaultRecords(&RecordBatch{Version: 2})
  448. frb.RecordsSet = []*Records{&records}
  449. }
  450. batch := frb.RecordsSet[0].RecordBatch
  451. batch.LastOffsetDelta = offset
  452. }
  453. func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
  454. frb := r.getOrCreateBlock(topic, partition)
  455. frb.LastStableOffset = offset
  456. }