|
|
@@ -1,7 +1,7 @@
|
|
|
package kafka
|
|
|
|
|
|
type Producer struct {
|
|
|
- *Client
|
|
|
+ client *Client
|
|
|
topic string
|
|
|
partitioner PartitionChooser
|
|
|
responseCondition int16
|
|
|
@@ -17,7 +17,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
|
|
|
}
|
|
|
|
|
|
func (p *Producer) choosePartition(key encoder) (int32, error) {
|
|
|
- partitions, err := p.partitions(p.topic)
|
|
|
+ partitions, err := p.client.partitions(p.topic)
|
|
|
if err != nil {
|
|
|
return -1, err
|
|
|
}
|
|
|
@@ -43,7 +43,7 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- broker, err := p.leader(p.topic, partition)
|
|
|
+ broker, err := p.client.leader(p.topic, partition)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -52,7 +52,7 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
|
|
|
request.requiredAcks = p.responseCondition
|
|
|
request.timeout = p.responseTimeout
|
|
|
|
|
|
- decoder, err := broker.Send(p.id, request)
|
|
|
+ decoder, err := broker.Send(p.client.id, request)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|