|
@@ -5,16 +5,29 @@ import (
|
|
|
"time"
|
|
"time"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+// ProducerConfig is used to pass multiple configuration options to NewProducer.
|
|
|
type ProducerConfig struct {
|
|
type ProducerConfig struct {
|
|
|
- Partitioner Partitioner
|
|
|
|
|
- RequiredAcks RequiredAcks
|
|
|
|
|
- Timeout int32
|
|
|
|
|
- Compression CompressionCodec
|
|
|
|
|
- MaxBufferBytes uint32
|
|
|
|
|
- MaxBufferTime uint32
|
|
|
|
|
- MaxDeliveryRetries uint32
|
|
|
|
|
|
|
+ Partitioner Partitioner // Chooses the partition to send messages to, or randomly if this is nil.
|
|
|
|
|
+ RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
|
|
|
|
|
+ Timeout int32 // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
|
|
|
|
|
+ Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
|
|
|
|
|
+ MaxBufferBytes uint32 // The maximum number of bytes to buffer per-broker before sending to Kafka.
|
|
|
|
|
+ MaxBufferTime uint32 // The maximum number of milliseconds to buffer messages before sending to a broker.
|
|
|
|
|
+ MaxDeliveryRetries uint32 // The number of times to retry a failed message. You should always specify at least 1.
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// Producer publishes Kafka messages. It routes messages to the correct broker
|
|
|
|
|
+// for the provided topic-partition, refreshing metadata as appropriate, and
|
|
|
|
|
+// parses responses for errors. You must call Close() on a producer to avoid
|
|
|
|
|
+// leaks: it may not be garbage-collected automatically when it passes out of
|
|
|
|
|
+// scope (this is in addition to calling Close on the underlying client, which
|
|
|
|
|
+// is still necessary).
|
|
|
|
|
+//
|
|
|
|
|
+// If MaxBufferBytes=0 and MaxBufferTime=0, the Producer is considered to be
|
|
|
|
|
+// operating in "synchronous" mode. This means that errors will be returned
|
|
|
|
|
+// directly from calls to SendMessage. If either value is greater than zero, the
|
|
|
|
|
+// Producer is operating in "asynchronous" mode, and you must read these return
|
|
|
|
|
+// values back from the channel returned by Errors().
|
|
|
type Producer struct {
|
|
type Producer struct {
|
|
|
client *Client
|
|
client *Client
|
|
|
config ProducerConfig
|
|
config ProducerConfig
|
|
@@ -47,6 +60,7 @@ type topicPartition struct {
|
|
|
partition int32
|
|
partition int32
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// NewProducer creates a new Producer using the given client.
|
|
|
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
|
if config == nil {
|
|
if config == nil {
|
|
|
config = new(ProducerConfig)
|
|
config = new(ProducerConfig)
|
|
@@ -60,6 +74,10 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
|
return nil, ConfigurationError("Invalid Timeout")
|
|
return nil, ConfigurationError("Invalid Timeout")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if config.MaxDeliveryRetries < 1 {
|
|
|
|
|
+ Logger.Println("Warning: config.MaxDeliveryRetries is set dangerously low. This will lead to occasional data loss.")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if config.Partitioner == nil {
|
|
if config.Partitioner == nil {
|
|
|
config.Partitioner = NewRandomPartitioner()
|
|
config.Partitioner = NewRandomPartitioner()
|
|
|
}
|
|
}
|
|
@@ -77,6 +95,9 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
|
}, nil
|
|
}, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// When operating in asynchronous mode, provides access to errors generated
|
|
|
|
|
+// while parsing ProduceResponses from kafka. Should never be called in
|
|
|
|
|
+// synchronous mode.
|
|
|
func (p *Producer) Errors() chan error {
|
|
func (p *Producer) Errors() chan error {
|
|
|
if p.isSynchronous() {
|
|
if p.isSynchronous() {
|
|
|
panic("use of Errors() is not permitted in synchronous mode.")
|
|
panic("use of Errors() is not permitted in synchronous mode.")
|
|
@@ -85,10 +106,23 @@ func (p *Producer) Errors() chan error {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// Close shuts down the producer and flushes any messages it may have buffered.
|
|
|
|
|
+// You must call this function before a producer object passes out of scope, as
|
|
|
|
|
+// it may otherwise leak memory. You must call this before calling Close on the
|
|
|
|
|
+// underlying client.
|
|
|
func (p *Producer) Close() error {
|
|
func (p *Producer) Close() error {
|
|
|
|
|
+ // TODO
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// SendMessage sends a message with the given key and value to the given topic.
|
|
|
|
|
+// The partition to send to is selected by the Producer's Partitioner. To send
|
|
|
|
|
+// strings as either key or value, see the StringEncoder type.
|
|
|
|
|
+//
|
|
|
|
|
+// If operating in synchronous mode, a nil return indicates everything happened
|
|
|
|
|
+// successfully. In asynchronous mode, a nil return only means that the data was
|
|
|
|
|
+// successfully sent to kafka, and you must listen to the channel returned by
|
|
|
|
|
+// Errors() for any errors generated later when the response is received.
|
|
|
func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
|
|
func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
|
|
|
var keyBytes, valBytes []byte
|
|
var keyBytes, valBytes []byte
|
|
|
|
|
|