producer.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package kafka
  2. type Producer struct {
  3. client *Client
  4. topic string
  5. partitioner Partitioner
  6. responseCondition int16
  7. responseTimeout int32
  8. }
  9. func NewProducer(client *Client, topic string, partitioner Partitioner, responseCondition int16, responseTimeout int32) *Producer {
  10. return &Producer{client, topic, partitioner, responseCondition, responseTimeout}
  11. }
  12. func NewSimpleProducer(client *Client, topic string) *Producer {
  13. return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
  14. }
  15. func (p *Producer) choosePartition(key Encoder) (int32, error) {
  16. partitions, err := p.client.partitions(p.topic)
  17. if err != nil {
  18. return -1, err
  19. }
  20. var partitioner Partitioner
  21. if key == nil {
  22. partitioner = RandomPartitioner{}
  23. } else {
  24. partitioner = p.partitioner
  25. }
  26. return partitions[partitioner.Partition(key, len(partitions))], nil
  27. }
  28. func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
  29. partition, err := p.choosePartition(key)
  30. if err != nil {
  31. return nil, err
  32. }
  33. var keyBytes []byte
  34. var valBytes []byte
  35. if key != nil {
  36. keyBytes, err = key.Encode()
  37. if err != nil {
  38. return nil, err
  39. }
  40. }
  41. valBytes, err = value.Encode()
  42. if err != nil {
  43. return nil, err
  44. }
  45. broker, err := p.client.leader(p.topic, partition)
  46. if err != nil {
  47. return nil, err
  48. }
  49. request := &ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
  50. request.AddMessage(&p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
  51. response, err := broker.Produce(p.client.id, request)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return response, nil
  56. }
  57. func (p *Producer) SendSimpleMessage(in string) (*ProduceResponse, error) {
  58. return p.SendMessage(nil, encodableString(in))
  59. }