offset_commit_request.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package protocol
  2. import enc "sarama/encoding"
  3. type offsetCommitRequestBlock struct {
  4. offset int64
  5. metadata string
  6. }
  7. func (r *offsetCommitRequestBlock) Encode(pe enc.PacketEncoder) error {
  8. pe.PutInt64(r.offset)
  9. return pe.PutString(r.metadata)
  10. }
  11. type OffsetCommitRequest struct {
  12. ConsumerGroup string
  13. blocks map[string]map[int32]*offsetCommitRequestBlock
  14. }
  15. func (r *OffsetCommitRequest) Encode(pe enc.PacketEncoder) error {
  16. err := pe.PutString(r.ConsumerGroup)
  17. if err != nil {
  18. return err
  19. }
  20. err = pe.PutArrayLength(len(r.blocks))
  21. if err != nil {
  22. return err
  23. }
  24. for topic, partitions := range r.blocks {
  25. err = pe.PutString(topic)
  26. if err != nil {
  27. return err
  28. }
  29. err = pe.PutArrayLength(len(partitions))
  30. if err != nil {
  31. return err
  32. }
  33. for partition, block := range partitions {
  34. pe.PutInt32(partition)
  35. err = block.Encode(pe)
  36. if err != nil {
  37. return err
  38. }
  39. }
  40. }
  41. }
  42. func (r *OffsetCommitRequest) key() int16 {
  43. return 6
  44. }
  45. func (r *OffsetCommitRequest) version() int16 {
  46. return 0
  47. }
  48. func (r *OffsetCommitRequest) AddBlock(topic string, partition_id int32, offset int64, metadata string) {
  49. if r.blocks == nil {
  50. r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
  51. }
  52. if r.blocks[topic] == nil {
  53. r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
  54. }
  55. tmp := new(offsetCommitRequestBlock)
  56. tmp.offset = offset
  57. tmp.metadata = metadata
  58. r.blocks[topic][partition_id] = tmp
  59. }