fetch_response.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. package sarama
  import (
  	"errors"
  	"sort"
  	"time"
  )
  // AbortedTransaction is one entry of the aborted-transaction list carried by
  // a FetchResponseBlock. On the wire it is two consecutive int64 values: the
  // producer ID followed by the first offset.
  type AbortedTransaction struct {
  	// ProducerID is the first value read by decode and written by encode.
  	ProducerID int64
  	// FirstOffset is the second value on the wire and the key that
  	// getAbortedTransactions sorts on.
  	FirstOffset int64
  }
  10. func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
  11. if t.ProducerID, err = pd.getInt64(); err != nil {
  12. return err
  13. }
  14. if t.FirstOffset, err = pd.getInt64(); err != nil {
  15. return err
  16. }
  17. return nil
  18. }
  19. func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
  20. pe.putInt64(t.ProducerID)
  21. pe.putInt64(t.FirstOffset)
  22. return nil
  23. }
  24. type FetchResponseBlock struct {
  25. Err KError
  26. HighWaterMarkOffset int64
  27. LastStableOffset int64
  28. LogStartOffset int64
  29. AbortedTransactions []*AbortedTransaction
  30. PreferredReadReplica int32
  31. Records *Records // deprecated: use FetchResponseBlock.RecordsSet
  32. RecordsSet []*Records
  33. Partial bool
  34. }
  35. func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  36. tmp, err := pd.getInt16()
  37. if err != nil {
  38. return err
  39. }
  40. b.Err = KError(tmp)
  41. b.HighWaterMarkOffset, err = pd.getInt64()
  42. if err != nil {
  43. return err
  44. }
  45. if version >= 4 {
  46. b.LastStableOffset, err = pd.getInt64()
  47. if err != nil {
  48. return err
  49. }
  50. if version >= 5 {
  51. b.LogStartOffset, err = pd.getInt64()
  52. if err != nil {
  53. return err
  54. }
  55. }
  56. numTransact, err := pd.getArrayLength()
  57. if err != nil {
  58. return err
  59. }
  60. if numTransact >= 0 {
  61. b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
  62. }
  63. for i := 0; i < numTransact; i++ {
  64. transact := new(AbortedTransaction)
  65. if err = transact.decode(pd); err != nil {
  66. return err
  67. }
  68. b.AbortedTransactions[i] = transact
  69. }
  70. }
  71. if version >= 11 {
  72. b.PreferredReadReplica, err = pd.getInt32()
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. recordsSize, err := pd.getInt32()
  78. if err != nil {
  79. return err
  80. }
  81. recordsDecoder, err := pd.getSubset(int(recordsSize))
  82. if err != nil {
  83. return err
  84. }
  85. b.RecordsSet = []*Records{}
  86. for recordsDecoder.remaining() > 0 {
  87. records := &Records{}
  88. if err := records.decode(recordsDecoder); err != nil {
  89. // If we have at least one decoded records, this is not an error
  90. if err == ErrInsufficientData {
  91. if len(b.RecordsSet) == 0 {
  92. b.Partial = true
  93. }
  94. break
  95. }
  96. return err
  97. }
  98. partial, err := records.isPartial()
  99. if err != nil {
  100. return err
  101. }
  102. n, err := records.numRecords()
  103. if err != nil {
  104. return err
  105. }
  106. if n > 0 || (partial && len(b.RecordsSet) == 0) {
  107. b.RecordsSet = append(b.RecordsSet, records)
  108. if b.Records == nil {
  109. b.Records = records
  110. }
  111. }
  112. overflow, err := records.isOverflow()
  113. if err != nil {
  114. return err
  115. }
  116. if partial || overflow {
  117. break
  118. }
  119. }
  120. return nil
  121. }
  122. func (b *FetchResponseBlock) numRecords() (int, error) {
  123. sum := 0
  124. for _, records := range b.RecordsSet {
  125. count, err := records.numRecords()
  126. if err != nil {
  127. return 0, err
  128. }
  129. sum += count
  130. }
  131. return sum, nil
  132. }
  133. func (b *FetchResponseBlock) isPartial() (bool, error) {
  134. if b.Partial {
  135. return true, nil
  136. }
  137. if len(b.RecordsSet) == 1 {
  138. return b.RecordsSet[0].isPartial()
  139. }
  140. return false, nil
  141. }
  142. func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  143. pe.putInt16(int16(b.Err))
  144. pe.putInt64(b.HighWaterMarkOffset)
  145. if version >= 4 {
  146. pe.putInt64(b.LastStableOffset)
  147. if version >= 5 {
  148. pe.putInt64(b.LogStartOffset)
  149. }
  150. if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
  151. return err
  152. }
  153. for _, transact := range b.AbortedTransactions {
  154. if err = transact.encode(pe); err != nil {
  155. return err
  156. }
  157. }
  158. }
  159. if version >= 11 {
  160. pe.putInt32(b.PreferredReadReplica)
  161. }
  162. pe.push(&lengthField{})
  163. for _, records := range b.RecordsSet {
  164. err = records.encode(pe)
  165. if err != nil {
  166. return err
  167. }
  168. }
  169. return pe.pop()
  170. }
  171. func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
  172. // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
  173. // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
  174. at := b.AbortedTransactions
  175. sort.Slice(
  176. at,
  177. func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
  178. )
  179. return at
  180. }
  181. type FetchResponse struct {
  182. Blocks map[string]map[int32]*FetchResponseBlock
  183. ThrottleTime time.Duration
  184. ErrorCode int16
  185. SessionID int32
  186. Version int16
  187. LogAppendTime bool
  188. Timestamp time.Time
  189. }
  190. func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
  191. r.Version = version
  192. if r.Version >= 1 {
  193. throttle, err := pd.getInt32()
  194. if err != nil {
  195. return err
  196. }
  197. r.ThrottleTime = time.Duration(throttle) * time.Millisecond
  198. }
  199. if r.Version >= 7 {
  200. r.ErrorCode, err = pd.getInt16()
  201. if err != nil {
  202. return err
  203. }
  204. r.SessionID, err = pd.getInt32()
  205. if err != nil {
  206. return err
  207. }
  208. }
  209. numTopics, err := pd.getArrayLength()
  210. if err != nil {
  211. return err
  212. }
  213. r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  214. for i := 0; i < numTopics; i++ {
  215. name, err := pd.getString()
  216. if err != nil {
  217. return err
  218. }
  219. numBlocks, err := pd.getArrayLength()
  220. if err != nil {
  221. return err
  222. }
  223. r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  224. for j := 0; j < numBlocks; j++ {
  225. id, err := pd.getInt32()
  226. if err != nil {
  227. return err
  228. }
  229. block := new(FetchResponseBlock)
  230. err = block.decode(pd, version)
  231. if err != nil {
  232. return err
  233. }
  234. r.Blocks[name][id] = block
  235. }
  236. }
  237. return nil
  238. }
  239. func (r *FetchResponse) encode(pe packetEncoder) (err error) {
  240. if r.Version >= 1 {
  241. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  242. }
  243. if r.Version >= 7 {
  244. pe.putInt16(r.ErrorCode)
  245. pe.putInt32(r.SessionID)
  246. }
  247. err = pe.putArrayLength(len(r.Blocks))
  248. if err != nil {
  249. return err
  250. }
  251. for topic, partitions := range r.Blocks {
  252. err = pe.putString(topic)
  253. if err != nil {
  254. return err
  255. }
  256. err = pe.putArrayLength(len(partitions))
  257. if err != nil {
  258. return err
  259. }
  260. for id, block := range partitions {
  261. pe.putInt32(id)
  262. err = block.encode(pe, r.Version)
  263. if err != nil {
  264. return err
  265. }
  266. }
  267. }
  268. return nil
  269. }
  270. func (r *FetchResponse) key() int16 {
  271. return 1
  272. }
  273. func (r *FetchResponse) version() int16 {
  274. return r.Version
  275. }
  276. func (r *FetchResponse) headerVersion() int16 {
  277. return 0
  278. }
  279. func (r *FetchResponse) requiredVersion() KafkaVersion {
  280. switch r.Version {
  281. case 0:
  282. return MinVersion
  283. case 1:
  284. return V0_9_0_0
  285. case 2:
  286. return V0_10_0_0
  287. case 3:
  288. return V0_10_1_0
  289. case 4, 5:
  290. return V0_11_0_0
  291. case 6:
  292. return V1_0_0_0
  293. case 7:
  294. return V1_1_0_0
  295. case 8:
  296. return V2_0_0_0
  297. case 9, 10:
  298. return V2_1_0_0
  299. case 11:
  300. return V2_3_0_0
  301. default:
  302. return MaxVersion
  303. }
  304. }
  305. func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  306. if r.Blocks == nil {
  307. return nil
  308. }
  309. if r.Blocks[topic] == nil {
  310. return nil
  311. }
  312. return r.Blocks[topic][partition]
  313. }
  314. func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
  315. if r.Blocks == nil {
  316. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  317. }
  318. partitions, ok := r.Blocks[topic]
  319. if !ok {
  320. partitions = make(map[int32]*FetchResponseBlock)
  321. r.Blocks[topic] = partitions
  322. }
  323. frb, ok := partitions[partition]
  324. if !ok {
  325. frb = new(FetchResponseBlock)
  326. partitions[partition] = frb
  327. }
  328. frb.Err = err
  329. }
  330. func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
  331. if r.Blocks == nil {
  332. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  333. }
  334. partitions, ok := r.Blocks[topic]
  335. if !ok {
  336. partitions = make(map[int32]*FetchResponseBlock)
  337. r.Blocks[topic] = partitions
  338. }
  339. frb, ok := partitions[partition]
  340. if !ok {
  341. frb = new(FetchResponseBlock)
  342. partitions[partition] = frb
  343. }
  344. return frb
  345. }
  346. func encodeKV(key, value Encoder) ([]byte, []byte) {
  347. var kb []byte
  348. var vb []byte
  349. if key != nil {
  350. kb, _ = key.Encode()
  351. }
  352. if value != nil {
  353. vb, _ = value.Encode()
  354. }
  355. return kb, vb
  356. }
  357. func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
  358. frb := r.getOrCreateBlock(topic, partition)
  359. kb, vb := encodeKV(key, value)
  360. if r.LogAppendTime {
  361. timestamp = r.Timestamp
  362. }
  363. msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
  364. msgBlock := &MessageBlock{Msg: msg, Offset: offset}
  365. if len(frb.RecordsSet) == 0 {
  366. records := newLegacyRecords(&MessageSet{})
  367. frb.RecordsSet = []*Records{&records}
  368. }
  369. set := frb.RecordsSet[0].MsgSet
  370. set.Messages = append(set.Messages, msgBlock)
  371. }
  372. func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
  373. frb := r.getOrCreateBlock(topic, partition)
  374. kb, vb := encodeKV(key, value)
  375. if len(frb.RecordsSet) == 0 {
  376. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  377. frb.RecordsSet = []*Records{&records}
  378. }
  379. batch := frb.RecordsSet[0].RecordBatch
  380. rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  381. batch.addRecord(rec)
  382. }
  383. // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
  384. // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
  385. // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
  386. func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
  387. frb := r.getOrCreateBlock(topic, partition)
  388. kb, vb := encodeKV(key, value)
  389. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  390. batch := &RecordBatch{
  391. Version: 2,
  392. LogAppendTime: r.LogAppendTime,
  393. FirstTimestamp: timestamp,
  394. MaxTimestamp: r.Timestamp,
  395. FirstOffset: offset,
  396. LastOffsetDelta: 0,
  397. ProducerID: producerID,
  398. IsTransactional: isTransactional,
  399. }
  400. rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  401. batch.addRecord(rec)
  402. records.RecordBatch = batch
  403. frb.RecordsSet = append(frb.RecordsSet, &records)
  404. }
  405. func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
  406. frb := r.getOrCreateBlock(topic, partition)
  407. // batch
  408. batch := &RecordBatch{
  409. Version: 2,
  410. LogAppendTime: r.LogAppendTime,
  411. FirstTimestamp: timestamp,
  412. MaxTimestamp: r.Timestamp,
  413. FirstOffset: offset,
  414. LastOffsetDelta: 0,
  415. ProducerID: producerID,
  416. IsTransactional: true,
  417. Control: true,
  418. }
  419. // records
  420. records := newDefaultRecords(nil)
  421. records.RecordBatch = batch
  422. // record
  423. crAbort := ControlRecord{
  424. Version: 0,
  425. Type: recordType,
  426. }
  427. crKey := &realEncoder{raw: make([]byte, 4)}
  428. crValue := &realEncoder{raw: make([]byte, 6)}
  429. crAbort.encode(crKey, crValue)
  430. rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  431. batch.addRecord(rec)
  432. frb.RecordsSet = append(frb.RecordsSet, &records)
  433. }
  434. func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
  435. r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
  436. }
  437. func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
  438. r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
  439. }
  440. func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
  441. r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
  442. }
  443. func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
  444. // define controlRecord key and value
  445. r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
  446. }
  447. func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
  448. frb := r.getOrCreateBlock(topic, partition)
  449. if len(frb.RecordsSet) == 0 {
  450. records := newDefaultRecords(&RecordBatch{Version: 2})
  451. frb.RecordsSet = []*Records{&records}
  452. }
  453. batch := frb.RecordsSet[0].RecordBatch
  454. batch.LastOffsetDelta = offset
  455. }
  456. func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
  457. frb := r.getOrCreateBlock(topic, partition)
  458. frb.LastStableOffset = offset
  459. }