offset_commit_request.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package sarama
  // ReceiveTime is the sentinel to pass as the timestamp of an Offset Commit
  // Request when the broker should stamp the commit with the time at which it
  // received the request. The timestamp is only used by version 1 of the
  // message, which requires kafka 0.8.2.
  const ReceiveTime = int64(-1)
  // offsetCommitRequestBlock is the per-partition payload of an offset commit
  // request: the offset being committed, a timestamp, and a metadata string.
  type offsetCommitRequestBlock struct {
  	// offset and timestamp are both written as 64-bit integers; timestamp is
  	// only written when the request version is 1 or higher.
  	offset, timestamp int64
  	// metadata is written as a string after the numeric fields.
  	metadata string
  }
  11. func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
  12. pe.putInt64(r.offset)
  13. if version >= 1 {
  14. pe.putInt64(r.timestamp)
  15. }
  16. return pe.putString(r.metadata)
  17. }
  18. type OffsetCommitRequest struct {
  19. ConsumerGroup string
  20. Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
  21. blocks map[string]map[int32]*offsetCommitRequestBlock
  22. }
  23. func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
  24. if r.Version < 0 || r.Version > 1 {
  25. return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
  26. }
  27. err := pe.putString(r.ConsumerGroup)
  28. if err != nil {
  29. return err
  30. }
  31. err = pe.putArrayLength(len(r.blocks))
  32. if err != nil {
  33. return err
  34. }
  35. for topic, partitions := range r.blocks {
  36. err = pe.putString(topic)
  37. if err != nil {
  38. return err
  39. }
  40. err = pe.putArrayLength(len(partitions))
  41. if err != nil {
  42. return err
  43. }
  44. for partition, block := range partitions {
  45. pe.putInt32(partition)
  46. err = block.encode(pe, r.Version)
  47. if err != nil {
  48. return err
  49. }
  50. }
  51. }
  52. return nil
  53. }
  54. func (r *OffsetCommitRequest) key() int16 {
  55. return 8
  56. }
  57. func (r *OffsetCommitRequest) version() int16 {
  58. return r.Version
  59. }
  60. func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
  61. if r.blocks == nil {
  62. r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
  63. }
  64. if r.blocks[topic] == nil {
  65. r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
  66. }
  67. if r.Version == 0 && timestamp != 0 {
  68. Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
  69. }
  70. r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
  71. }