offset_request.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package protocol
  2. type offsetRequestBlock struct {
  3. time int64
  4. maxOffsets int32
  5. }
  6. func (r *offsetRequestBlock) encode(pe packetEncoder) {
  7. pe.putInt64(r.time)
  8. pe.putInt32(r.maxOffsets)
  9. }
  10. type OffsetRequest struct {
  11. blocks map[string]map[int32]*offsetRequestBlock
  12. }
  13. func (r *OffsetRequest) encode(pe packetEncoder) {
  14. pe.putInt32(-1) // replica ID is always -1 for clients
  15. pe.putArrayCount(len(r.blocks))
  16. for topic, partitions := range r.blocks {
  17. pe.putString(topic)
  18. pe.putArrayCount(len(partitions))
  19. for partition, block := range partitions {
  20. pe.putInt32(partition)
  21. block.encode(pe)
  22. }
  23. }
  24. }
  25. func (r *OffsetRequest) key() int16 {
  26. return 2
  27. }
  28. func (r *OffsetRequest) version() int16 {
  29. return 0
  30. }
  31. func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time int64, maxOffsets int32) {
  32. if r.blocks == nil {
  33. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  34. }
  35. if r.blocks[topic] == nil {
  36. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  37. }
  38. tmp := new(offsetRequestBlock)
  39. tmp.time = time
  40. tmp.maxOffsets = maxOffsets
  41. r.blocks[topic][partition_id] = tmp
  42. }