produce_request.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package kafka
  2. type produceRequestPartitionBlock struct {
  3. partition int32
  4. msgSet *messageSet
  5. }
  6. func (p *produceRequestPartitionBlock) encode(pe packetEncoder) {
  7. pe.putInt32(p.partition)
  8. pe.pushLength32()
  9. p.msgSet.encode(pe)
  10. pe.pop()
  11. }
  12. type produceRequestTopicBlock struct {
  13. topic *string
  14. partitions []produceRequestPartitionBlock
  15. }
  16. func (p *produceRequestTopicBlock) encode(pe packetEncoder) {
  17. pe.putString(p.topic)
  18. pe.putArrayCount(len(p.partitions))
  19. for i := range p.partitions {
  20. (&p.partitions[i]).encode(pe)
  21. }
  22. }
  23. type produceRequest struct {
  24. requiredAcks int16
  25. timeout int32
  26. topics []produceRequestTopicBlock
  27. }
  28. func (p *produceRequest) encode(pe packetEncoder) {
  29. pe.putInt16(p.requiredAcks)
  30. pe.putInt32(p.timeout)
  31. pe.putArrayCount(len(p.topics))
  32. for i := range p.topics {
  33. (&p.topics[i]).encode(pe)
  34. }
  35. }
  36. func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
  37. req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
  38. req.topics[0].topic = &topic
  39. req.topics[0].partitions = make([]produceRequestPartitionBlock, 1)
  40. req.topics[0].partitions[0].partition = partition
  41. req.topics[0].partitions[0].msgSet = set
  42. return req
  43. }