offset_request.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package kafka
  // Sentinel values that Kafka accepts in place of a real timestamp for the
  // 'time' parameter of OffsetRequest.AddBlock().
  const (
  	// LATEST_OFFSET is the sentinel -1; going by its name, it selects the
  	// latest offset.
  	LATEST_OFFSET = int64(-1)
  	// EARLIEST_OFFSETS is the sentinel -2; going by its name, it selects the
  	// earliest offsets.
  	EARLIEST_OFFSETS = int64(-2)
  )
  // offsetRequestBlock holds the two values that an OffsetRequest encodes for a
  // single partition.
  type offsetRequestBlock struct {
  	// time is encoded first, as a 64-bit integer. LATEST_OFFSET and
  	// EARLIEST_OFFSETS are the special values declared for it.
  	time int64
  	// maxOffsets is encoded second, as a 32-bit integer.
  	maxOffsets int32
  }
  11. func (r *offsetRequestBlock) encode(pe packetEncoder) {
  12. pe.putInt64(r.time)
  13. pe.putInt32(r.maxOffsets)
  14. }
  15. type OffsetRequest struct {
  16. blocks map[*string]map[int32]*offsetRequestBlock
  17. }
  18. func (r *OffsetRequest) encode(pe packetEncoder) {
  19. pe.putInt32(-1) // replica ID is always -1 for clients
  20. pe.putArrayCount(len(r.blocks))
  21. for topic, partitions := range r.blocks {
  22. pe.putString(topic)
  23. pe.putArrayCount(len(partitions))
  24. for partition, block := range partitions {
  25. pe.putInt32(partition)
  26. block.encode(pe)
  27. }
  28. }
  29. }
  30. func (r *OffsetRequest) key() int16 {
  31. return 2
  32. }
  33. func (r *OffsetRequest) version() int16 {
  34. return 0
  35. }
  36. func (r *OffsetRequest) AddBlock(topic *string, partition_id int32, time int64, maxOffsets int32) {
  37. if r.blocks == nil {
  38. r.blocks = make(map[*string]map[int32]*offsetRequestBlock)
  39. }
  40. if r.blocks[topic] == nil {
  41. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  42. }
  43. tmp := new(offsetRequestBlock)
  44. tmp.time = time
  45. tmp.maxOffsets = maxOffsets
  46. r.blocks[topic][partition_id] = tmp
  47. }