offset_commit_request.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package kafka
  2. type offsetCommitRequestBlock struct {
  3. offset int64
  4. metadata *string
  5. }
  6. func (r *offsetCommitRequestBlock) encode(pe packetEncoder) {
  7. pe.putInt64(r.offset)
  8. pe.putString(r.metadata)
  9. }
  10. type OffsetCommitRequest struct {
  11. ConsumerGroup *string
  12. blocks map[*string]map[int32]*offsetCommitRequestBlock
  13. }
  14. func (r *OffsetCommitRequest) encode(pe packetEncoder) {
  15. pe.putString(r.ConsumerGroup)
  16. pe.putArrayCount(len(r.blocks))
  17. for topic, partitions := range r.blocks {
  18. pe.putString(topic)
  19. pe.putArrayCount(len(partitions))
  20. for partition, block := range partitions {
  21. pe.putInt32(partition)
  22. block.encode(pe)
  23. }
  24. }
  25. }
  26. func (r *OffsetCommitRequest) key() int16 {
  27. return 6
  28. }
  29. func (r *OffsetCommitRequest) version() int16 {
  30. return 0
  31. }
  32. func (r *OffsetCommitRequest) AddBlock(topic *string, partition_id int32, offset int64, metadata *string) {
  33. if r.blocks == nil {
  34. r.blocks = make(map[*string]map[int32]*offsetCommitRequestBlock)
  35. }
  36. if r.blocks[topic] == nil {
  37. r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
  38. }
  39. tmp := new(offsetCommitRequestBlock)
  40. tmp.offset = offset
  41. tmp.metadata = metadata
  42. r.blocks[topic][partition_id] = tmp
  43. }