|
|
@@ -3,12 +3,12 @@ package kafka
|
|
|
type Producer struct {
|
|
|
client *Client
|
|
|
topic string
|
|
|
- partitioner PartitionChooser
|
|
|
+ partitioner Partitioner
|
|
|
responseCondition int16
|
|
|
responseTimeout int32
|
|
|
}
|
|
|
|
|
|
-func NewProducer(client *Client, topic string, partitioner PartitionChooser, responseCondition int16, responseTimeout int32) *Producer {
|
|
|
+func NewProducer(client *Client, topic string, partitioner Partitioner, responseCondition int16, responseTimeout int32) *Producer {
|
|
|
return &Producer{client, topic, partitioner, responseCondition, responseTimeout}
|
|
|
}
|
|
|
|
|
|
@@ -22,14 +22,14 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
|
|
|
return -1, err
|
|
|
}
|
|
|
|
|
|
- var partitioner PartitionChooser
|
|
|
+ var partitioner Partitioner
|
|
|
if key == nil {
|
|
|
partitioner = RandomPartitioner{}
|
|
|
} else {
|
|
|
partitioner = p.partitioner
|
|
|
}
|
|
|
|
|
|
- return partitions[partitioner.ChoosePartition(key, len(partitions))], nil
|
|
|
+ return partitions[partitioner.Partition(key, len(partitions))], nil
|
|
|
}
|
|
|
|
|
|
func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
|