123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- package sarama
- import (
- "fmt"
- "sync"
- "time"
- )
- type ProducerConfig struct {
- Partitioner Partitioner
- RequiredAcks RequiredAcks
- Timeout int32
- Compression CompressionCodec
- MaxBufferedBytes uint32
- MaxBufferTime uint32
- }
- type Producer struct {
- client *Client
- config ProducerConfig
- brokerProducers map[*Broker]*brokerProducer
- m sync.RWMutex
- errors chan error
- deliveryLocks map[topicPartition]chan bool
- dm sync.RWMutex
- }
- type brokerProducer struct {
- mapM sync.Mutex
- messages map[topicPartition][]*produceMessage
- bufferedBytes uint32
- flushNow chan bool
- broker *Broker
- stopper chan bool
- done chan bool
- hasMessages chan bool
- }
// topicPartition identifies one partition of one topic. It is comparable and
// is used as a map key for message queues and delivery locks.
type topicPartition struct {
	// topic is the topic name.
	topic string

	// partition is the partition id within topic.
	partition int32
}
- func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
- if config == nil {
- config = NewProducerConfig()
- }
- if err := config.Validate(); err != nil {
- return nil, err
- }
- return &Producer{
- client: client,
- config: *config,
- errors: make(chan error, 16),
- deliveryLocks: make(map[topicPartition]chan bool),
- brokerProducers: make(map[*Broker]*brokerProducer),
- }, nil
- }
- func (p *Producer) Errors() chan error {
- return p.errors
- }
- func (p *Producer) Close() error {
- for _, bp := range p.brokerProducers {
- bp.Close()
- }
- return nil
- }
- func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
- return p.genericSendMessage(topic, key, value, false)
- }
- func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
- return p.genericSendMessage(topic, key, value, true)
- }
- func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
- var keyBytes, valBytes []byte
- if key != nil {
- if keyBytes, err = key.Encode(); err != nil {
- return err
- }
- }
- if value != nil {
- if valBytes, err = value.Encode(); err != nil {
- return err
- }
- }
- partition, err := p.choosePartition(topic, key)
- if err != nil {
- return err
- }
-
- msg := &produceMessage{
- tp: topicPartition{topic, partition},
- key: keyBytes,
- value: valBytes,
- sync: synchronous,
- }
-
- return msg.enqueue(p)
- }
- func (p *Producer) addMessage(msg *produceMessage) error {
- bp, err := p.brokerProducerFor(msg.tp)
- if err != nil {
- return err
- }
- bp.addMessage(msg, p.config.MaxBufferedBytes)
- return nil
- }
- func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
- broker, err := p.client.Leader(tp.topic, tp.partition)
- if err != nil {
- return nil, err
- }
- p.m.RLock()
- bp, ok := p.brokerProducers[broker]
- p.m.RUnlock()
- if !ok {
- p.m.Lock()
- bp, ok = p.brokerProducers[broker]
- if !ok {
- bp = p.newBrokerProducer(broker)
- p.brokerProducers[broker] = bp
- }
- p.m.Unlock()
- }
- return bp, nil
- }
- func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
- bp := &brokerProducer{
- messages: make(map[topicPartition][]*produceMessage),
- flushNow: make(chan bool, 1),
- broker: broker,
- stopper: make(chan bool),
- done: make(chan bool),
- hasMessages: make(chan bool, 1),
- }
- maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- timer := time.NewTimer(maxBufferTime)
- var shutdownRequired bool
- wg.Done()
- for {
- select {
- case <-bp.flushNow:
- if shutdownRequired = bp.flush(p); shutdownRequired {
- goto shutdown
- }
- case <-timer.C:
- if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
- goto shutdown
- }
- case <-bp.stopper:
- goto shutdown
- }
- timer.Reset(maxBufferTime)
- }
- shutdown:
- delete(p.brokerProducers, bp.broker)
- bp.flushIfAnyMessages(p)
- p.client.disconnectBroker(bp.broker)
- close(bp.flushNow)
- close(bp.hasMessages)
- close(bp.done)
- }()
- wg.Wait()
- return bp
- }
- func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
- bp.mapM.Lock()
- if msg.retried {
-
- bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
- } else {
-
- bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
- }
- bp.bufferedBytes += msg.byteSize()
- select {
- case bp.hasMessages <- true:
- default:
- }
- bp.mapM.Unlock()
- bp.flushIfOverCapacity(maxBufferBytes)
- }
- func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
- if bp.bufferedBytes > maxBufferBytes {
- select {
- case bp.flushNow <- true:
- default:
- }
- }
- }
- func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
- select {
- case <-bp.hasMessages:
- select {
- case bp.hasMessages <- true:
- default:
- }
- return bp.flush(p)
- default:
- }
- return false
- }
- func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
- var prb produceRequestBuilder
-
- bp.mapM.Lock()
- for tp, messages := range bp.messages {
- if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
- prb = append(prb, messages...)
- delete(bp.messages, tp)
- p.releaseDeliveryLock(tp)
- }
- }
- bp.mapM.Unlock()
- if len(prb) > 0 {
- bp.mapM.Lock()
- bp.bufferedBytes -= prb.byteSize()
- bp.mapM.Unlock()
- return bp.flushRequest(p, prb, func(err error) {
- if err != nil {
- Logger.Println(err)
- }
- p.errors <- err
- })
- }
- return false
- }
- func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
-
- req := prb.toRequest(&p.config)
- response, err := bp.broker.Produce(p.client.id, req)
- switch err {
- case nil:
- break
- case EncodingError:
-
-
-
- errorCb(err)
- return false
- default:
- overlimit := 0
- prb.reverseEach(func(msg *produceMessage) {
- if err := msg.reenqueue(p); err != nil {
- overlimit++
- }
- })
- if overlimit > 0 {
- errorCb(DroppedMessagesError{overlimit, nil})
- }
- return true
- }
-
-
- if response == nil {
- errorCb(nil)
- return false
- }
- for topic, d := range response.Blocks {
- for partition, block := range d {
- if block == nil {
-
-
- errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
- }
- switch block.Err {
- case NoError:
-
-
- errorCb(nil)
- case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
- p.client.RefreshTopicMetadata(topic)
- overlimit := 0
- prb.reverseEach(func(msg *produceMessage) {
- if msg.hasTopicPartition(topic, partition) {
- if err := msg.reenqueue(p); err != nil {
- overlimit++
- }
- }
- })
- if overlimit > 0 {
- errorCb(DroppedMessagesError{overlimit, nil})
- }
- default:
- errorCb(DroppedMessagesError{len(prb), err})
- }
- }
- }
- return false
- }
- func (bp *brokerProducer) Close() error {
- select {
- case <-bp.stopper:
- return fmt.Errorf("already closed or closing")
- default:
- close(bp.stopper)
- <-bp.done
- }
- return nil
- }
- func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
- p.dm.RLock()
- ch, ok := p.deliveryLocks[tp]
- p.dm.RUnlock()
- if !ok {
- p.dm.Lock()
- ch, ok = p.deliveryLocks[tp]
- if !ok {
- ch = make(chan bool, 1)
- p.deliveryLocks[tp] = ch
- }
- p.dm.Unlock()
- }
- select {
- case ch <- true:
- return true
- default:
- return false
- }
- }
- func (p *Producer) releaseDeliveryLock(tp topicPartition) {
- p.dm.RLock()
- ch := p.deliveryLocks[tp]
- p.dm.RUnlock()
- select {
- case <-ch:
- default:
- panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
- }
- }
- func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
- partitions, err := p.client.Partitions(topic)
- if err != nil {
- return -1, err
- }
- numPartitions := int32(len(partitions))
- if numPartitions == 0 {
- return -1, LeaderNotAvailable
- }
- choice := p.config.Partitioner.Partition(key, numPartitions)
- if choice < 0 || choice >= numPartitions {
- return -1, InvalidPartition
- }
- return partitions[choice], nil
- }
- func NewProducerConfig() *ProducerConfig {
- return &ProducerConfig{
- Partitioner: NewRandomPartitioner(),
- RequiredAcks: WaitForLocal,
- }
- }
- func (config *ProducerConfig) Validate() error {
- if config.RequiredAcks < -1 {
- return ConfigurationError("Invalid RequiredAcks")
- }
- if config.Timeout < 0 {
- return ConfigurationError("Invalid Timeout")
- }
- if config.MaxBufferedBytes == 0 {
- return ConfigurationError("Invalid MaxBufferedBytes")
- }
- if config.MaxBufferTime == 0 {
- return ConfigurationError("Invalid MaxBufferTime")
- }
- if config.Partitioner == nil {
- return ConfigurationError("No partitioner set")
- }
- return nil
- }
|