offset_commit_response.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package sarama
  2. type OffsetCommitResponse struct {
  3. Errors map[string]map[int32]KError
  4. }
  5. func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
  6. if r.Errors == nil {
  7. r.Errors = make(map[string]map[int32]KError)
  8. }
  9. partitions := r.Errors[topic]
  10. if partitions == nil {
  11. partitions = make(map[int32]KError)
  12. r.Errors[topic] = partitions
  13. }
  14. partitions[partition] = kerror
  15. }
  16. func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
  17. if err := pe.putArrayLength(len(r.Errors)); err != nil {
  18. return err
  19. }
  20. for topic, partitions := range r.Errors {
  21. if err := pe.putString(topic); err != nil {
  22. return err
  23. }
  24. if err := pe.putArrayLength(len(partitions)); err != nil {
  25. return err
  26. }
  27. for partition, kerror := range partitions {
  28. pe.putInt32(partition)
  29. pe.putInt16(int16(kerror))
  30. }
  31. }
  32. return nil
  33. }
  34. func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
  35. numTopics, err := pd.getArrayLength()
  36. if err != nil || numTopics == 0 {
  37. return err
  38. }
  39. r.Errors = make(map[string]map[int32]KError, numTopics)
  40. for i := 0; i < numTopics; i++ {
  41. name, err := pd.getString()
  42. if err != nil {
  43. return err
  44. }
  45. numErrors, err := pd.getArrayLength()
  46. if err != nil {
  47. return err
  48. }
  49. r.Errors[name] = make(map[int32]KError, numErrors)
  50. for j := 0; j < numErrors; j++ {
  51. id, err := pd.getInt32()
  52. if err != nil {
  53. return err
  54. }
  55. tmp, err := pd.getInt16()
  56. if err != nil {
  57. return err
  58. }
  59. r.Errors[name][id] = KError(tmp)
  60. }
  61. }
  62. return nil
  63. }
  64. func (r *OffsetCommitResponse) key() int16 {
  65. return 8
  66. }
  67. func (r *OffsetCommitResponse) version() int16 {
  68. return 0
  69. }
  70. func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
  71. return MinVersion
  72. }