offset_commit_request.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package sarama
  2. // ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
  3. // tells the broker to set the timestamp to the time at which the request was received.
  4. const ReceiveTime int64 = -1
  5. type offsetCommitRequestBlock struct {
  6. offset int64
  7. timestamp int64
  8. metadata string
  9. }
  10. func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
  11. pe.putInt64(r.offset)
  12. pe.putInt64(r.timestamp)
  13. return pe.putString(r.metadata)
  14. }
  15. type OffsetCommitRequest struct {
  16. ConsumerGroup string
  17. blocks map[string]map[int32]*offsetCommitRequestBlock
  18. }
  19. func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
  20. err := pe.putString(r.ConsumerGroup)
  21. if err != nil {
  22. return err
  23. }
  24. err = pe.putArrayLength(len(r.blocks))
  25. if err != nil {
  26. return err
  27. }
  28. for topic, partitions := range r.blocks {
  29. err = pe.putString(topic)
  30. if err != nil {
  31. return err
  32. }
  33. err = pe.putArrayLength(len(partitions))
  34. if err != nil {
  35. return err
  36. }
  37. for partition, block := range partitions {
  38. pe.putInt32(partition)
  39. err = block.encode(pe)
  40. if err != nil {
  41. return err
  42. }
  43. }
  44. }
  45. return nil
  46. }
  47. func (r *OffsetCommitRequest) key() int16 {
  48. return 8
  49. }
  50. func (r *OffsetCommitRequest) version() int16 {
  51. return 0
  52. }
  53. func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
  54. if r.blocks == nil {
  55. r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
  56. }
  57. if r.blocks[topic] == nil {
  58. r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
  59. }
  60. r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
  61. }