offset_request.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package kafka
  2. // Special values accepted by Kafka for the 'time' parameter of OffsetRequest.AddBlock().
  3. const (
  4. // Ask for the latest offsets.
  5. LATEST_OFFSETS int64 = -1
  6. // Ask for the earliest available offset. Note that because offsets are pulled in descending order,
  7. // asking for the earliest offset will always return you a single element.
  8. EARLIEST_OFFSET int64 = -2
  9. )
  10. type offsetRequestBlock struct {
  11. time int64
  12. maxOffsets int32
  13. }
  14. func (r *offsetRequestBlock) encode(pe packetEncoder) {
  15. pe.putInt64(r.time)
  16. pe.putInt32(r.maxOffsets)
  17. }
  18. type OffsetRequest struct {
  19. blocks map[*string]map[int32]*offsetRequestBlock
  20. }
  21. func (r *OffsetRequest) encode(pe packetEncoder) {
  22. pe.putInt32(-1) // replica ID is always -1 for clients
  23. pe.putArrayCount(len(r.blocks))
  24. for topic, partitions := range r.blocks {
  25. pe.putString(topic)
  26. pe.putArrayCount(len(partitions))
  27. for partition, block := range partitions {
  28. pe.putInt32(partition)
  29. block.encode(pe)
  30. }
  31. }
  32. }
  33. func (r *OffsetRequest) key() int16 {
  34. return 2
  35. }
  36. func (r *OffsetRequest) version() int16 {
  37. return 0
  38. }
  39. func (r *OffsetRequest) AddBlock(topic *string, partition_id int32, time int64, maxOffsets int32) {
  40. if r.blocks == nil {
  41. r.blocks = make(map[*string]map[int32]*offsetRequestBlock)
  42. }
  43. if r.blocks[topic] == nil {
  44. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  45. }
  46. tmp := new(offsetRequestBlock)
  47. tmp.time = time
  48. tmp.maxOffsets = maxOffsets
  49. r.blocks[topic][partition_id] = tmp
  50. }