offset_commit_request.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package sarama
  2. type offsetCommitRequestBlock struct {
  3. offset int64
  4. metadata string
  5. }
  6. func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
  7. pe.putInt64(r.offset)
  8. return pe.putString(r.metadata)
  9. }
  10. type OffsetCommitRequest struct {
  11. ConsumerGroup string
  12. blocks map[string]map[int32]*offsetCommitRequestBlock
  13. }
  14. func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
  15. err := pe.putString(r.ConsumerGroup)
  16. if err != nil {
  17. return err
  18. }
  19. err = pe.putArrayLength(len(r.blocks))
  20. if err != nil {
  21. return err
  22. }
  23. for topic, partitions := range r.blocks {
  24. err = pe.putString(topic)
  25. if err != nil {
  26. return err
  27. }
  28. err = pe.putArrayLength(len(partitions))
  29. if err != nil {
  30. return err
  31. }
  32. for partition, block := range partitions {
  33. pe.putInt32(partition)
  34. err = block.encode(pe)
  35. if err != nil {
  36. return err
  37. }
  38. }
  39. }
  40. return nil
  41. }
  42. func (r *OffsetCommitRequest) key() int16 {
  43. return 8
  44. }
  45. func (r *OffsetCommitRequest) version() int16 {
  46. return 0
  47. }
  48. func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, metadata string) {
  49. if r.blocks == nil {
  50. r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
  51. }
  52. if r.blocks[topic] == nil {
  53. r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
  54. }
  55. tmp := new(offsetCommitRequestBlock)
  56. tmp.offset = offset
  57. tmp.metadata = metadata
  58. r.blocks[topic][partitionID] = tmp
  59. }