produce_response.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package sarama
  2. type ProduceResponseBlock struct {
  3. Err KError
  4. Offset int64
  5. }
  6. func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
  7. tmp, err := pd.getInt16()
  8. if err != nil {
  9. return err
  10. }
  11. pr.Err = KError(tmp)
  12. pr.Offset, err = pd.getInt64()
  13. if err != nil {
  14. return err
  15. }
  16. return nil
  17. }
  18. type ProduceResponse struct {
  19. Blocks map[string]map[int32]*ProduceResponseBlock
  20. }
  21. func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
  22. numTopics, err := pd.getArrayLength()
  23. if err != nil {
  24. return err
  25. }
  26. pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
  27. for i := 0; i < numTopics; i++ {
  28. name, err := pd.getString()
  29. if err != nil {
  30. return err
  31. }
  32. numBlocks, err := pd.getArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
  37. for j := 0; j < numBlocks; j++ {
  38. id, err := pd.getInt32()
  39. if err != nil {
  40. return err
  41. }
  42. block := new(ProduceResponseBlock)
  43. err = block.decode(pd)
  44. if err != nil {
  45. return err
  46. }
  47. pr.Blocks[name][id] = block
  48. }
  49. }
  50. return nil
  51. }
  52. func (pr *ProduceResponse) encode(pe packetEncoder) error {
  53. err := pe.putArrayLength(len(pr.Blocks))
  54. if err != nil {
  55. return err
  56. }
  57. for topic, partitions := range pr.Blocks {
  58. err = pe.putString(topic)
  59. if err != nil {
  60. return err
  61. }
  62. err = pe.putArrayLength(len(partitions))
  63. if err != nil {
  64. return err
  65. }
  66. for id, prb := range partitions {
  67. pe.putInt32(id)
  68. pe.putInt16(int16(prb.Err))
  69. pe.putInt64(prb.Offset)
  70. }
  71. }
  72. return nil
  73. }
  74. func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
  75. if pr.Blocks == nil {
  76. return nil
  77. }
  78. if pr.Blocks[topic] == nil {
  79. return nil
  80. }
  81. return pr.Blocks[topic][partition]
  82. }
  83. // Testing API
  84. func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
  85. if pr.Blocks == nil {
  86. pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  87. }
  88. byTopic, ok := pr.Blocks[topic]
  89. if !ok {
  90. byTopic = make(map[int32]*ProduceResponseBlock)
  91. pr.Blocks[topic] = byTopic
  92. }
  93. byTopic[partition] = &ProduceResponseBlock{Err: err}
  94. }