// produce_request.go

package kafka
  2. type produceRequestPartitionBlock struct {
  3. partition int32
  4. msgSet *messageSet
  5. }
  6. func (p *produceRequestPartitionBlock) encode(pe packetEncoder) {
  7. pe.putInt32(p.partition)
  8. pe.pushLength32()
  9. p.msgSet.encode(pe)
  10. pe.pop()
  11. }
  12. type produceRequestTopicBlock struct {
  13. topic *string
  14. partitions []produceRequestPartitionBlock
  15. }
  16. func (p *produceRequestTopicBlock) encode(pe packetEncoder) {
  17. pe.putString(p.topic)
  18. pe.putArrayCount(len(p.partitions))
  19. for i := range p.partitions {
  20. (&p.partitions[i]).encode(pe)
  21. }
  22. }
  23. const (
  24. NO_RESPONSE int16 = 0
  25. WAIT_FOR_LOCAL int16 = 1
  26. WAIT_FOR_ALL int16 = -1
  27. )
  28. type produceRequest struct {
  29. requiredAcks int16
  30. timeout int32
  31. topics []produceRequestTopicBlock
  32. }
  33. func (p *produceRequest) encode(pe packetEncoder) {
  34. pe.putInt16(p.requiredAcks)
  35. pe.putInt32(p.timeout)
  36. pe.putArrayCount(len(p.topics))
  37. for i := range p.topics {
  38. (&p.topics[i]).encode(pe)
  39. }
  40. }
  41. func (p *produceRequest) key() int16 {
  42. return 0
  43. }
  44. func (p *produceRequest) version() int16 {
  45. return 0
  46. }
  47. func (p *produceRequest) expectResponse() bool {
  48. return p.requiredAcks != NO_RESPONSE
  49. }
  50. func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
  51. req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
  52. req.topics[0].topic = &topic
  53. req.topics[0].partitions = make([]produceRequestPartitionBlock, 1)
  54. req.topics[0].partitions[0].partition = partition
  55. req.topics[0].partitions[0].msgSet = set
  56. return req
  57. }