package sarama

import (
	"fmt"
	"sync"
	"time"
)

// ProducerConfig is used to pass multiple configuration options to NewProducer.
//
// With MaxBufferTime and MaxBufferedBytes at their minimum legal values (both
// must be greater than zero; see Validate), messages are delivered immediately
// and continuously, but if multiple messages are received while a roundtrip to
// Kafka is in progress, they will all be combined into the next request. In
// this mode, errors are not returned from QueueMessage, but over the Errors()
// channel.
//
// With larger values of MaxBufferTime and/or MaxBufferedBytes, sarama will
// buffer messages before sending, to reduce traffic.
type ProducerConfig struct {
	Partitioner      Partitioner      // Chooses the partition to send messages to. Must not be nil (NewProducerConfig defaults to random).
	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (the zero value means no acknowledgement; NewProducerConfig defaults to WaitForLocal).
	Timeout          int32            // The maximum time in ms the broker will wait for the receipt of the number of RequiredAcks.
	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
}

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must call Close() on a producer to avoid
// leaks: it may not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
//
// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
// deliver messages immediately, but to buffer subsequent messages while a
// previous request is in-flight. This is often the correct behaviour.
//
// If synchronous operation is desired, you can use SendMessage. This will cause
// sarama to block until the broker has returned a value. Normally, you will
// want to use QueueMessage instead, and read the error back from the Errors()
// channel. Note that when using QueueMessage, you *must* read the values from
// the Errors() channel, or sarama will block indefinitely after a few requests.
type Producer struct {
	client          *Client
	config          ProducerConfig
	brokerProducers map[*Broker]*brokerProducer
	m               sync.RWMutex
	errors          chan error
	deliveryLocks   map[topicPartition]chan bool
	dm              sync.RWMutex
}

type brokerProducer struct {
	mapM          sync.Mutex
	messages      map[topicPartition][]*produceMessage
	bufferedBytes uint32
	flushNow      chan bool
	broker        *Broker
	stopper       chan bool
	done          chan bool
	hasMessages   chan bool
}

type topicPartition struct {
	topic     string
	partition int32
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
	if config == nil {
		config = NewProducerConfig()
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	return &Producer{
		client:          client,
		config:          *config,
		errors:          make(chan error, 16),
		deliveryLocks:   make(map[topicPartition]chan bool),
		brokerProducers: make(map[*Broker]*brokerProducer),
	}, nil
}

// Errors provides access, when operating in asynchronous mode, to errors
// generated while parsing ProduceResponses from Kafka. It should never be
// called in synchronous mode.
func (p *Producer) Errors() chan error {
	return p.errors
}
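// newProducerSketch is a usage sketch, not part of the producer API: it shows
// the construction and shutdown order the documentation above requires (the
// producer must be closed before the underlying client). The topic name and
// message value are illustrative only; StringEncoder is assumed from elsewhere
// in this package.
func newProducerSketch(client *Client) error {
	producer, err := NewProducer(client, nil) // nil selects NewProducerConfig() defaults
	if err != nil {
		return err
	}
	// Close the producer before calling Close on the client.
	defer producer.Close()

	// Synchronous delivery: blocks until the broker has responded.
	return producer.SendMessage("example-topic", nil, StringEncoder("hello world"))
}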
// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
	// Snapshot under the read lock: each brokerProducer removes itself from
	// the map (under the write lock) as it shuts down.
	p.m.RLock()
	bps := make([]*brokerProducer, 0, len(p.brokerProducers))
	for _, bp := range p.brokerProducers {
		bps = append(bps, bp)
	}
	p.m.RUnlock()

	for _, bp := range bps {
		bp.Close()
	}
	return nil
}

// QueueMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// QueueMessage uses buffering semantics to reduce the number of requests to the
// broker. The buffering logic is tunable with config.MaxBufferedBytes and
// config.MaxBufferTime.
//
// QueueMessage will return an error if it is unable to construct the message
// (unlikely), but network and response errors must be read from Errors(), since
// QueueMessage uses asynchronous delivery. Note that you MUST read back from
// Errors(), otherwise the producer will stall after some number of errors.
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer. Either one, used alone, preserves ordering.
func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
	return p.genericSendMessage(topic, key, value, false)
}
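// queueMessagesSketch is a usage sketch, not part of the producer API: it
// shows the asynchronous flow QueueMessage requires, with one goroutine
// draining Errors() so the producer cannot stall. The topic name and values
// are illustrative; StringEncoder is assumed from elsewhere in this package.
func queueMessagesSketch(p *Producer, values []string) {
	// Drain delivery results for the life of the producer. Each flushed
	// topic-partition batch yields one value on Errors(): nil on success, or
	// an error. The channel is never closed, so this goroutine runs until the
	// process exits.
	go func() {
		for err := range p.Errors() {
			if err != nil {
				Logger.Println("delivery failed:", err)
			}
		}
	}()

	for _, v := range values {
		if err := p.QueueMessage("example-topic", nil, StringEncoder(v)); err != nil {
			// Only construction errors (e.g. no leader available) are returned
			// here; broker and network errors arrive on Errors().
			Logger.Println("could not queue:", err)
		}
	}
}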
// SendMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// Unlike QueueMessage, SendMessage operates synchronously, and will block until
// the response is received from the broker, returning any error generated in
// the process. Reading from Errors() may interfere with the operation of
// SendMessage().
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer.
func (p *Producer) SendMessage(topic string, key, value Encoder) error {
	return p.genericSendMessage(topic, key, value, true)
}

func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
	var keyBytes, valBytes []byte

	if key != nil {
		if keyBytes, err = key.Encode(); err != nil {
			return err
		}
	}
	if value != nil {
		if valBytes, err = value.Encode(); err != nil {
			return err
		}
	}

	partition, err := p.choosePartition(topic, key)
	if err != nil {
		return err
	}

	// produceMessage and its enqueue method are defined in produce_message.go.
	msg := &produceMessage{
		tp:    topicPartition{topic, partition},
		key:   keyBytes,
		value: valBytes,
		sync:  synchronous,
	}

	return msg.enqueue(p)
}

func (p *Producer) addMessage(msg *produceMessage) error {
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}
	bp.addMessage(msg, p.config.MaxBufferedBytes)
	return nil
}

func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
	broker, err := p.client.Leader(tp.topic, tp.partition)
	if err != nil {
		return nil, err
	}

	p.m.RLock()
	bp, ok := p.brokerProducers[broker]
	p.m.RUnlock()
	if !ok {
		p.m.Lock()
		bp, ok = p.brokerProducers[broker]
		if !ok {
			bp = p.newBrokerProducer(broker)
			p.brokerProducers[broker] = bp
		}
		p.m.Unlock()
	}
	return bp, nil
}

func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
	bp := &brokerProducer{
		messages:    make(map[topicPartition][]*produceMessage),
		flushNow:    make(chan bool, 1),
		broker:      broker,
		stopper:     make(chan bool),
		done:        make(chan bool),
		hasMessages: make(chan bool, 1),
	}

	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		timer := time.NewTimer(maxBufferTime)
		var shutdownRequired bool
		wg.Done()
		for {
			select {
			case <-bp.flushNow:
				if shutdownRequired = bp.flush(p); shutdownRequired {
					goto shutdown
				}
			case <-timer.C:
				if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
					goto shutdown
				}
			case <-bp.stopper:
				goto shutdown
			}
			timer.Reset(maxBufferTime)
		}
	shutdown:
		// Remove ourselves under the producer's lock; brokerProducerFor may be
		// reading the map concurrently.
		p.m.Lock()
		delete(p.brokerProducers, bp.broker)
		p.m.Unlock()
		bp.flushIfAnyMessages(p)
		p.client.disconnectBroker(bp.broker)
		close(bp.flushNow)
		close(bp.hasMessages)
		close(bp.done)
	}()
	wg.Wait() // don't return until the goroutine has started

	return bp
}

func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
	bp.mapM.Lock()
	if msg.retried {
		// Prepend: deliver first, before any more recently-added messages.
		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
	} else {
		// Append.
		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
	}
	bp.bufferedBytes += msg.byteSize()

	// Signal, without blocking, that messages are available.
	select {
	case bp.hasMessages <- true:
	default:
	}
	bp.mapM.Unlock()

	bp.flushIfOverCapacity(maxBufferBytes)
}

func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
	if bp.bufferedBytes > maxBufferBytes {
		// Request a flush without blocking; one pending request is enough.
		select {
		case bp.flushNow <- true:
		default:
		}
	}
}

func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
	select {
	case <-bp.hasMessages:
		// Put the token back so that other callers still see the messages.
		select {
		case bp.hasMessages <- true:
		default:
		}
		return bp.flush(p)
	default:
	}
	return false
}
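// bufferedConfigSketch illustrates, with made-up values, how the two flush
// triggers above interact: flushIfOverCapacity fires once MaxBufferedBytes is
// exceeded, and the timer in newBrokerProducer fires every MaxBufferTime
// milliseconds. Larger values trade latency for fewer, larger requests.
func bufferedConfigSketch() *ProducerConfig {
	config := NewProducerConfig()
	config.MaxBufferedBytes = 64 * 1024 // flush a broker's buffer once it holds ~64KiB...
	config.MaxBufferTime = 100          // ...or after 100ms, whichever comes first
	return config
}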
func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
	var prb produceRequestBuilder

	// Only deliver messages for topic-partitions that are not currently being
	// delivered. The lock is held (via defer) until this flush completes, so
	// that concurrent flushes skip in-flight topic-partitions rather than
	// delivering them twice.
	bp.mapM.Lock()
	for tp, messages := range bp.messages {
		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
			prb = append(prb, messages...)
			delete(bp.messages, tp)
			defer p.releaseDeliveryLock(tp)
		}
	}
	bp.mapM.Unlock()

	if len(prb) > 0 {
		bp.mapM.Lock()
		bp.bufferedBytes -= prb.byteSize()
		bp.mapM.Unlock()

		return bp.flushRequest(p, prb, func(err error) {
			if err != nil {
				Logger.Println(err)
			}
			p.errors <- err
		})
	}

	return false
}

func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
	// toRequest is defined in produce_message.go.
	req := prb.toRequest(&p.config)
	response, err := bp.broker.Produce(p.client.id, req)

	switch err {
	case nil:
		break
	case EncodingError:
		// No sense in retrying; it'll just fail again. But what about all the
		// other messages that weren't invalid? Really, this is a "shit's broke
		// real good" scenario, so logging it and moving on is probably
		// acceptable.
		errorCb(err)
		return false
	default:
		// The broker is unreachable; re-enqueue all messages (in reverse, so
		// that prepending preserves their order) and ask the caller to shut
		// this brokerProducer down.
		overlimit := 0
		prb.reverseEach(func(msg *produceMessage) {
			if err := msg.reenqueue(p); err != nil {
				overlimit++
			}
		})
		if overlimit > 0 {
			errorCb(DroppedMessagesError{overlimit, nil})
		}
		return true
	}

	// When does this ever actually happen, and why don't we explode when it
	// does? This seems bad.
	if response == nil {
		errorCb(nil)
		return false
	}

	for topic, d := range response.Blocks {
		for partition, block := range d {
			if block == nil {
				// IncompleteResponse. Here we just drop all the messages; we
				// don't know whether they were successfully sent or not.
				// Non-ideal, but how often does it happen?
				errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
				continue // block is nil; nothing further to inspect.
			}
			switch block.Err {
			case NoError:
				// All the messages for this topic-partition were delivered
				// successfully! Unlock delivery for this topic-partition and
				// discard the produceMessage objects.
				errorCb(nil)
			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
				p.client.RefreshTopicMetadata(topic)
				overlimit := 0
				prb.reverseEach(func(msg *produceMessage) {
					if msg.hasTopicPartition(topic, partition) {
						if err := msg.reenqueue(p); err != nil {
							overlimit++
						}
					}
				})
				if overlimit > 0 {
					errorCb(DroppedMessagesError{overlimit, nil})
				}
			default:
				errorCb(DroppedMessagesError{len(prb), block.Err})
			}
		}
	}

	return false
}

func (bp *brokerProducer) Close() error {
	select {
	case <-bp.stopper:
		return fmt.Errorf("already closed or closing")
	default:
		close(bp.stopper)
		<-bp.done
	}
	return nil
}

// tryAcquireDeliveryLock attempts, without blocking, to take the delivery lock
// for a topic-partition. The lock is a channel with a buffer of one: a
// successful non-blocking send acquires it.
func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
	p.dm.RLock()
	ch, ok := p.deliveryLocks[tp]
	p.dm.RUnlock()
	if !ok {
		p.dm.Lock()
		ch, ok = p.deliveryLocks[tp]
		if !ok {
			ch = make(chan bool, 1)
			p.deliveryLocks[tp] = ch
		}
		p.dm.Unlock()
	}

	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

// releaseDeliveryLock releases a lock previously acquired by
// tryAcquireDeliveryLock.
func (p *Producer) releaseDeliveryLock(tp topicPartition) {
	p.dm.RLock()
	ch := p.deliveryLocks[tp]
	p.dm.RUnlock()
	select {
	case <-ch:
	default:
		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
	}
}

func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
	partitions, err := p.client.Partitions(topic)
	if err != nil {
		return -1, err
	}

	numPartitions := int32(len(partitions))
	if numPartitions == 0 {
		return -1, LeaderNotAvailable
	}

	choice := p.config.Partitioner.Partition(key, numPartitions)
	if choice < 0 || choice >= numPartitions {
		return -1, InvalidPartition
	}

	return partitions[choice], nil
}

// NewProducerConfig creates a new ProducerConfig instance with sensible
// defaults.
func NewProducerConfig() *ProducerConfig {
	return &ProducerConfig{
		Partitioner:  NewRandomPartitioner(),
		RequiredAcks: WaitForLocal,
		// The minimum values accepted by Validate: deliver messages
		// immediately, buffering only while a request is already in flight.
		MaxBufferedBytes: 1,
		MaxBufferTime:    1,
	}
}
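// customConfigSketch is an illustrative configuration, not a canonical one: it
// sets every field Validate below checks to a legal value. NewHashPartitioner
// and WaitForAll are assumed from elsewhere in this package; the numbers are
// examples only.
func customConfigSketch() (*ProducerConfig, error) {
	config := &ProducerConfig{
		Partitioner:      NewHashPartitioner(), // same key always maps to the same partition
		RequiredAcks:     WaitForAll,           // wait for all in-sync replicas to acknowledge
		Timeout:          1000,                 // broker-side ack timeout, in ms
		Compression:      CompressionNone,
		MaxBufferedBytes: 1024,
		MaxBufferTime:    500,
	}
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return config, nil
}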
// Validate checks a ProducerConfig instance. It will return a
// ConfigurationError if the specified values don't make sense.
func (config *ProducerConfig) Validate() error {
	if config.RequiredAcks < -1 {
		return ConfigurationError("Invalid RequiredAcks")
	}

	if config.Timeout < 0 {
		return ConfigurationError("Invalid Timeout")
	}

	if config.MaxBufferedBytes == 0 {
		return ConfigurationError("Invalid MaxBufferedBytes")
	}

	if config.MaxBufferTime == 0 {
		return ConfigurationError("Invalid MaxBufferTime")
	}

	if config.Partitioner == nil {
		return ConfigurationError("No partitioner set")
	}

	return nil
}
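// validateSketch is an illustrative check, not part of the producer API: it
// shows Validate rejecting a config whose buffering fields were left at zero,
// which is why NewProducerConfig sets them to their minimum legal values.
func validateSketch() error {
	config := &ProducerConfig{
		Partitioner:  NewRandomPartitioner(),
		RequiredAcks: WaitForLocal,
		// MaxBufferedBytes and MaxBufferTime left at their zero values...
	}
	// ...so this returns ConfigurationError("Invalid MaxBufferedBytes").
	return config.Validate()
}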