|
|
@@ -3,6 +3,9 @@ package kafka
|
|
|
import k "sarama/protocol"
|
|
|
import "time"
|
|
|
|
|
|
+// Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
|
|
|
+// and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
|
|
|
+// its underlying Client.
|
|
|
type Producer struct {
|
|
|
client *Client
|
|
|
topic string
|
|
|
@@ -11,20 +14,34 @@ type Producer struct {
|
|
|
responseTimeout int32
|
|
|
}
|
|
|
|
|
|
+// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
|
|
|
+// and partition messages using the given partitioner. The responseCondition is the number of replicas Kafka will wait for acknowledgement
|
|
|
+// from before reporting success (TODO: special values). The responseTimeout is how long (TODO: in ms I guess, the spec doesn't say???)
|
|
|
+// Kafka will wait for those acknowledgements before reporting a timeout error.
|
|
|
func NewProducer(client *Client, topic string, partitioner Partitioner, responseCondition int16, responseTimeout int32) *Producer {
|
|
|
return &Producer{client, topic, partitioner, responseCondition, responseTimeout}
|
|
|
}
|
|
|
|
|
|
+// NewSimpleProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
|
|
|
+// and partition messages randomly. Kafka will wait for local acknowledgement before reporting success.
|
|
|
func NewSimpleProducer(client *Client, topic string) *Producer {
|
|
|
return NewProducer(client, topic, RandomPartitioner{}, k.WAIT_FOR_LOCAL, 0)
|
|
|
}
|
|
|
|
|
|
+// SendMessage sends a message with the given key and value. If key is nil, the partition to send to is selected randomly, otherwise it
|
|
|
+// is selected by the Producer's Partitioner.
|
|
|
func (p *Producer) SendMessage(key, value Encoder) error {
|
|
|
return p.safeSendMessage(key, value, 1)
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) SendSimpleMessage(msg string) error {
|
|
|
- return p.safeSendMessage(nil, encodableString(msg), 1)
|
|
|
+// SendStringMessage is a helper for wrapping a string in an Encoder and calling SendMessage with the wrapped string (and a nil key).
|
|
|
+func (p *Producer) SendStringMessage(msg string) error {
|
|
|
+ return p.SendMessage(nil, encodableString(msg))
|
|
|
+}
|
|
|
+
|
|
|
+// SendKeyedStringMessage is a helper for wrapping both key and value strings in an Encoder and calling SendMessage with the wrapped strings.
|
|
|
+func (p *Producer) SendKeyedStringMessage(key, value string) error {
|
|
|
+ return p.SendMessage(encodableString(key), encodableString(value))
|
|
|
}
|
|
|
|
|
|
func (p *Producer) choosePartition(key Encoder) (int32, error) {
|