offset_response.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package sarama
  2. type OffsetResponseBlock struct {
  3. Err KError
  4. Offsets []int64
  5. }
  6. func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
  7. tmp, err := pd.getInt16()
  8. if err != nil {
  9. return err
  10. }
  11. r.Err = KError(tmp)
  12. r.Offsets, err = pd.getInt64Array()
  13. return err
  14. }
  15. func (r *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
  16. pe.putInt16(int16(r.Err))
  17. return pe.putInt64Array(r.Offsets)
  18. }
  19. type OffsetResponse struct {
  20. Blocks map[string]map[int32]*OffsetResponseBlock
  21. }
  22. func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
  23. numTopics, err := pd.getArrayLength()
  24. if err != nil {
  25. return err
  26. }
  27. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  28. for i := 0; i < numTopics; i++ {
  29. name, err := pd.getString()
  30. if err != nil {
  31. return err
  32. }
  33. numBlocks, err := pd.getArrayLength()
  34. if err != nil {
  35. return err
  36. }
  37. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  38. for j := 0; j < numBlocks; j++ {
  39. id, err := pd.getInt32()
  40. if err != nil {
  41. return err
  42. }
  43. block := new(OffsetResponseBlock)
  44. err = block.decode(pd)
  45. if err != nil {
  46. return err
  47. }
  48. r.Blocks[name][id] = block
  49. }
  50. }
  51. return nil
  52. }
  53. func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
  54. if r.Blocks == nil {
  55. return nil
  56. }
  57. if r.Blocks[topic] == nil {
  58. return nil
  59. }
  60. return r.Blocks[topic][partition]
  61. }
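/*
Sample encoded OffsetResponse, apparently printed together with a nil error.
The labelled rows describe one topic (the bytes 109 121 95 116 111 112 105 99
spell "my_topic") with one partition (id 0). The unlabelled bytes after the
partition id are the rest of that partition's block (error code, then the
offsets array); after that the dump appears to contain a second copy of the
same message.

[0 0 0 1 ntopics
0 8 109 121 95 116 111 112 105 99 topic
0 0 0 1 npartitions
0 0 0 0 id
0 0
0 0 0 1 0 0 0 0
0 1 1 1 0 0 0 1
0 8 109 121 95 116 111 112
105 99 0 0 0 1 0 0
0 0 0 0 0 0 0 1
0 0 0 0 0 1 1 1] <nil>
*/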
  75. func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
  76. if err = pe.putArrayLength(len(r.Blocks)); err != nil {
  77. return err
  78. }
  79. for topic, partitions := range r.Blocks {
  80. if err = pe.putString(topic); err != nil {
  81. return err
  82. }
  83. if err = pe.putArrayLength(len(partitions)); err != nil {
  84. return err
  85. }
  86. for partition, block := range partitions {
  87. pe.putInt32(partition)
  88. if err = block.encode(pe); err != nil {
  89. return err
  90. }
  91. }
  92. }
  93. return nil
  94. }
  95. // testing API
  96. func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
  97. if r.Blocks == nil {
  98. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
  99. }
  100. byTopic, ok := r.Blocks[topic]
  101. if !ok {
  102. byTopic = make(map[int32]*OffsetResponseBlock)
  103. r.Blocks[topic] = byTopic
  104. }
  105. byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
  106. }