Bladeren bron

More options in producer

Evan Huus 12 jaren geleden
bovenliggende
commit
20a1a98e2f
2 gewijzigde bestanden met toevoegingen van 19 en 10 verwijderingen
  1. 3 3
      partitionChooser.go
  2. 16 7
      producer.go

+ 3 - 3
partitionChooser.go

@@ -2,13 +2,13 @@ package kafka
 
 import "math/rand"
 
-type partitionChooser interface {
-	choosePartition(key encoder, partitions []int32) int32
+type PartitionChooser interface {
+	ChoosePartition(key encoder, partitions []int32) int32
 }
 
 type RandomPartitioner struct {
 }
 
-func (p RandomPartitioner) choosePartition(key encoder, partitions []int32) int32 {
+func (p RandomPartitioner) ChoosePartition(key encoder, partitions []int32) int32 {
 	return partitions[rand.Intn(len(partitions))]
 }

+ 16 - 7
producer.go

@@ -1,13 +1,19 @@
 package kafka
 
 type Producer struct {
-	client      *Client
-	topic       string
-	partitioner partitionChooser
+	client            *Client
+	topic             string
+	partitioner       PartitionChooser
+	responseCondition int16
+	responseTimeout   int32
+}
+
+func NewProducer(client *Client, topic string, partitioner PartitionChooser, responseCondition int16, responseTimeout int32) *Producer {
+	return &Producer{client, topic, partitioner, responseCondition, responseTimeout}
 }
 
 func NewSimpleProducer(client *Client, topic string) *Producer {
-	return &Producer{client, topic, RandomPartitioner{}}
+	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
 }
 
 func (p *Producer) SendMessage(key, value encoder) error {
@@ -16,11 +22,13 @@ func (p *Producer) SendMessage(key, value encoder) error {
 		return err
 	}
 
-	partitioner := p.partitioner
+	var partitioner PartitionChooser
 	if key == nil {
 		partitioner = RandomPartitioner{}
+	} else {
+		partitioner = p.partitioner
 	}
-	partition := partitioner.choosePartition(nil, partitions)
+	partition := partitioner.ChoosePartition(key, partitions)
 
 	msg, err := newMessage(key, value)
 	if err != nil {
@@ -28,7 +36,8 @@ func (p *Producer) SendMessage(key, value encoder) error {
 	}
 
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(msg))
-	request.requiredAcks = WAIT_FOR_LOCAL
+	request.requiredAcks = p.responseCondition
+	request.timeout = p.responseTimeout
 
 	_, err = p.client.brokers.sendToPartition(p.topic, partition, request, &produceResponse{})