|
|
@@ -7,25 +7,19 @@ import "time"
|
|
|
// and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
|
|
|
// its underlying Client.
|
|
|
type Producer struct {
|
|
|
- client *Client
|
|
|
- topic string
|
|
|
- partitioner Partitioner
|
|
|
- responseCondition int16
|
|
|
- responseTimeout int32
|
|
|
+ client *Client
|
|
|
+ topic string
|
|
|
+ partitioner Partitioner
|
|
|
}
|
|
|
|
|
|
// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
|
|
|
-// and partition messages using the given partitioner. The responseCondition is the number of replicas Kafka will wait for acknowledgement
|
|
|
-// from before reporting success (TODO: special values). The responseTimeout is how long (TODO: in ms I guess, the spec doesn't say???)
|
|
|
-// Kafka will wait for those acknowledgements before reporting a timeout error.
|
|
|
-func NewProducer(client *Client, topic string, partitioner Partitioner, responseCondition int16, responseTimeout int32) *Producer {
|
|
|
- return &Producer{client, topic, partitioner, responseCondition, responseTimeout}
|
|
|
-}
|
|
|
-
|
|
|
-// NewSimpleProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
|
|
|
-// and partition messages randomly. Kafka will wait for local acknowledgement before reporting success.
|
|
|
-func NewSimpleProducer(client *Client, topic string) *Producer {
|
|
|
- return NewProducer(client, topic, RandomPartitioner{}, k.WAIT_FOR_LOCAL, 0)
|
|
|
+// and partition messages using the given partitioner.
|
|
|
+func NewProducer(client *Client, topic string, partitioner Partitioner) *Producer {
|
|
|
+ p := new(Producer)
|
|
|
+ p.client = client
|
|
|
+ p.topic = topic
|
|
|
+ p.partitioner = partitioner
|
|
|
+ return p
|
|
|
}
|
|
|
|
|
|
// SendMessage sends a message with the given key and value. If key is nil, the partition to send to is selected randomly, otherwise it
|
|
|
@@ -75,7 +69,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
|
|
|
+ request := &k.ProduceRequest{ResponseCondition: k.WAIT_FOR_LOCAL, Timeout: 0}
|
|
|
request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
|
|
|
|
|
|
response, err := broker.Produce(p.client.id, request)
|