12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924 |
- package sarama
- import (
- "fmt"
- "sync"
- "time"
- "github.com/eapache/go-resiliency/breaker"
- "github.com/eapache/queue"
- )
// AsyncProducer publishes Kafka messages using a non-blocking API. Messages
// are written to Input(); delivery outcomes come back on Successes() and
// Errors() (each gated by the corresponding Producer.Return config flag).
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer in the background.
	// Shutdown has completed when both the Errors and Successes channels
	// have been closed.
	AsyncClose()

	// Close shuts down the producer and blocks until all in-flight messages
	// have been resolved, returning any accumulated delivery errors as a
	// ProducerErrors value.
	Close() error

	// Input is the channel users write messages to in order to send them.
	Input() chan<- *ProducerMessage

	// Successes is the channel on which acknowledged messages are returned
	// when Producer.Return.Successes is enabled.
	Successes() <-chan *ProducerMessage

	// Errors is the channel on which failed deliveries are returned when
	// Producer.Return.Errors is enabled.
	Errors() <-chan *ProducerError
}
// asyncProducer is the concrete AsyncProducer implementation. The pipeline is
// a chain of goroutines: dispatcher -> topicProducer -> partitionProducer ->
// (aggregator + flusher) per broker, with retryHandler feeding failed
// messages back into input.
type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool // true when we created the client and must close it at shutdown

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup // counts unresolved messages plus shutdown/chaser sentinels

	// Broker-producer channels are shared between partitions and reference
	// counted; brokerLock guards both maps.
	brokers    map[*Broker]chan<- *ProducerMessage
	brokerRefs map[chan<- *ProducerMessage]int
	brokerLock sync.Mutex
}
- func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
- client, err := NewClient(addrs, conf)
- if err != nil {
- return nil, err
- }
- p, err := NewAsyncProducerFromClient(client)
- if err != nil {
- return nil, err
- }
- p.(*asyncProducer).ownClient = true
- return p, nil
- }
- func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
-
- if client.Closed() {
- return nil, ErrClosedClient
- }
- p := &asyncProducer{
- client: client,
- conf: client.Config(),
- errors: make(chan *ProducerError),
- input: make(chan *ProducerMessage),
- successes: make(chan *ProducerMessage),
- retries: make(chan *ProducerMessage),
- brokers: make(map[*Broker]chan<- *ProducerMessage),
- brokerRefs: make(map[chan<- *ProducerMessage]int),
- }
-
- go withRecover(p.dispatcher)
- go withRecover(p.retryHandler)
- return p, nil
- }
// flagSet carries internal bookkeeping flags on a ProducerMessage.
type flagSet int8

const (
	// chaser marks a sentinel message injected behind a retried stream so
	// the partition producer can tell when the old stream has drained.
	chaser flagSet = 1 << iota
	// shutdown marks the sentinel message that begins producer shutdown.
	shutdown
)
// ProducerMessage is the collection of elements passed to the producer in
// order to send a message, plus the fields the pipeline fills in on the way
// back.
type ProducerMessage struct {
	Topic string  // the Kafka topic this message is produced to
	Key   Encoder // optional partitioning key
	Value Encoder // optional payload

	// Offset is filled in for acknowledged messages: the base offset of the
	// produced set plus the message's index within it.
	Offset    int64
	Partition int32 // assigned by the topic's partitioner on first attempt
	// Metadata is passed through the pipeline untouched, for the caller's
	// own bookkeeping.
	Metadata interface{}

	retries int     // number of retry attempts so far
	flags   flagSet // internal chaser/shutdown markers
}
- func (m *ProducerMessage) byteSize() int {
- size := 26
- if m.Key != nil {
- size += m.Key.Length()
- }
- if m.Value != nil {
- size += m.Value.Length()
- }
- return size
- }
- func (m *ProducerMessage) clear() {
- m.flags = 0
- m.retries = 0
- }
// ProducerError is the type of error generated when the producer fails to
// deliver a message. It bundles the original message with the cause.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}
// Error implements the error interface.
func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}
// ProducerErrors is a batch of ProducerError values; it is what Close returns
// when deliveries failed during shutdown.
type ProducerErrors []*ProducerError

// Error implements the error interface, summarizing the batch by count.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
// Errors implements AsyncProducer.
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}
// Successes implements AsyncProducer.
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}
// Input implements AsyncProducer.
func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}
- func (p *asyncProducer) Close() error {
- p.AsyncClose()
- if p.conf.Producer.Return.Successes {
- go withRecover(func() {
- for _ = range p.successes {
- }
- })
- }
- var errors ProducerErrors
- if p.conf.Producer.Return.Errors {
- for event := range p.errors {
- errors = append(errors, event)
- }
- }
- if len(errors) > 0 {
- return errors
- }
- return nil
- }
// AsyncClose triggers the shutdown sequence in a background goroutine;
// shutdown is complete once the errors and successes channels close.
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
// dispatcher is the pipeline's entry goroutine: it validates each incoming
// message, handles the shutdown sentinel, and routes messages to per-topic
// handlers, which are created lazily.
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// Shutdown sentinel from shutdown(): stop accepting new user
			// messages, but keep processing retries already in flight.
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// A brand-new message arriving after shutdown started is
				// rejected immediately rather than entering the pipeline.
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			// First attempt: account for it in the in-flight wait group.
			p.inFlight.Add(1)
		}

		// Reject messages that exceed MaxMessageBytes. Without compression
		// the raw value length is also checked on its own; with compression
		// only the total byteSize is checked — presumably because the
		// compressed size is not yet known (TODO confirm intent).
		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
			(msg.byteSize() > p.conf.Producer.MaxMessageBytes) {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// Input closed: propagate shutdown downstream to every topic handler.
	for _, handler := range handlers {
		close(handler)
	}
}
// topicProducer fans one topic's messages out to per-partition producers.
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker // circuit breaker around partition metadata lookups
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}
// newTopicProducer starts a topicProducer goroutine for the given topic and
// returns the buffered channel that feeds it.
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}
// dispatch assigns a partition to each first-attempt message and forwards it
// to the per-partition handler, created lazily. Retried messages keep their
// previously assigned Partition. When the input closes, the close is
// propagated to every partition handler.
func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		// Only partition on the first attempt; retries are not re-partitioned.
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}
// partitionMessage sets msg.Partition using the topic's partitioner. The
// partition-list lookup runs inside the circuit breaker so repeated metadata
// failures trip it. Partitioners requiring consistency see all partitions;
// others only the currently writable ones.
func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		if tp.partitioner.RequiresConsistency() {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))
	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	// The partitioner returns an index into the partition slice; validate it
	// before translating to an actual partition ID.
	choice, err := tp.partitioner.Partition(msg, numPartitions)
	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]
	return nil
}
// partitionProducer routes one topic/partition's messages to the broker
// producer of the partition's current leader, and runs the retry state
// machine for that partition.
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker // circuit breaker around leader re-election lookups
	output  chan<- *ProducerMessage

	// highWatermark is the highest retry level currently in progress.
	// retryState holds, per retry level, the messages buffered while waiting
	// for that level's chaser and whether the chaser is still outstanding.
	highWatermark int
	retryState    []partitionRetryState
}

// partitionRetryState tracks a single retry level for one partition.
type partitionRetryState struct {
	buf          []*ProducerMessage // messages held back until the level drains
	expectChaser bool               // true while the level's chaser is outstanding
}
// newPartitionProducer starts a partitionProducer goroutine for the given
// topic/partition and returns the buffered channel that feeds it. retryState
// is sized Retry.Max+1 so each possible retry level has a slot.
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:     p,
		topic:      topic,
		partition:  partition,
		input:      input,
		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}
// dispatch forwards messages to the broker producer for the partition's
// current leader and implements the retry state machine: when a message
// arrives with a retry level above the current high-watermark, the partition
// abandons its broker and injects a chaser; until the matching chaser comes
// back, lower-level messages are buffered to preserve ordering.
func (pp *partitionProducer) dispatch() {
	// Try the cached leader first. Errors are deliberately ignored: a nil
	// leader just defers broker selection to the first message.
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.output = pp.parent.getBrokerProducer(pp.leader)
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// New, higher retry level: abandon the broker, inject a chaser,
			// then back off before continuing.
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// We are currently retrying something.
			if msg.retries < pp.highWatermark {
				// Message belongs to an older level.
				if msg.flags&chaser == chaser {
					// That older level has fully drained.
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done()
				} else {
					// Hold it back until its level's turn to flush.
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&chaser == chaser {
				// Chaser for the current level: the pre-retry stream has
				// drained, so flush the buffered levels in order.
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done()
				continue
			}
		}

		// No usable broker (startup, or abandoned during retry): elect one
		// now; on failure, fail this message and back off.
		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.output <- msg
	}

	if pp.output != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	}
}
// newHighWatermark transitions the partition into the retrying state at level
// hwm: it sends a chaser (tagged with the previous level) into the current
// broker stream so we can detect when that stream has drained, then abandons
// the broker.
func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// The chaser counts toward inFlight until it comes back around.
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1)
	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}

	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	pp.output = nil
}
// flushRetryBuffers re-sends the buffered messages for each completed retry
// level, working downward from the high-watermark. It stops at a level that
// is still awaiting its chaser, or at level zero (back to normal operation).
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				// No leader: fail this level's buffer, but still run the
				// cleanup/termination logic below.
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.output <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			// This level's chaser hasn't arrived yet; stay in retrying state.
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
// updateLeader refreshes topic metadata, re-resolves the partition leader,
// and acquires a broker producer for it. It runs inside the circuit breaker
// so a persistently leaderless partition trips the breaker.
func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}
		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}
		pp.output = pp.parent.getBrokerProducer(pp.leader)
		return nil
	})
}
// newBrokerProducer wires up the aggregator/flusher goroutine pair for a
// broker, connected by an internal bridge channel, and returns the channel
// that feeds the aggregator.
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage)
	bridge := make(chan []*ProducerMessage)

	a := &aggregator{
		parent: p,
		broker: broker,
		input:  input,
		output: bridge,
	}
	go withRecover(a.run)

	f := &flusher{
		parent:         p,
		broker:         broker,
		input:          bridge,
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(f.run)

	return input
}
// aggregator batches messages bound for one broker until a flush condition
// fires, then hands the batch to the flusher over output.
type aggregator struct {
	parent *asyncProducer
	broker *Broker
	input  <-chan *ProducerMessage
	output chan<- []*ProducerMessage

	buffer      []*ProducerMessage
	bufferBytes int              // running byteSize total of buffer
	timer       <-chan time.Time // flush-frequency timer, armed lazily
}
// run accumulates messages and flushes the batch to the output channel when a
// threshold is met, the flush timer fires, or a chaser forces it. The local
// `output` variable is nil while the batch is not ready, which disables the
// send case of the select (sends on a nil channel block forever).
func (a *aggregator) run() {
	var output chan<- []*ProducerMessage

	for {
		select {
		case msg := <-a.input:
			if msg == nil {
				// nil means the input channel was closed upstream.
				goto shutdown
			}

			if a.wouldOverflow(msg) {
				// Buffering msg would break a hard limit: flush the current
				// batch synchronously before accepting it.
				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
				a.output <- a.buffer
				a.reset()
				output = nil
			}

			a.buffer = append(a.buffer, msg)
			a.bufferBytes += msg.byteSize()

			if a.readyToFlush(msg) {
				output = a.output
			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
				// Arm the flush timer when the first unflushed message lands.
				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
			}
		case <-a.timer:
			output = a.output
		case output <- a.buffer:
			a.reset()
			output = nil
		}
	}

shutdown:
	if len(a.buffer) > 0 {
		a.output <- a.buffer
	}
	close(a.output)
}
- func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
- switch {
-
- case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
- return true
-
- case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
- return true
-
- case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
- return true
- default:
- return false
- }
- }
- func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
- switch {
-
- case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
- return true
-
- case msg.flags&chaser == chaser:
- return true
-
- case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
- return true
-
- case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
- return true
- default:
- return false
- }
- }
- func (a *aggregator) reset() {
- a.timer = nil
- a.buffer = nil
- a.bufferBytes = 0
- }
// flusher turns batches from the aggregator into produce requests against one
// broker and interprets the responses.
type flusher struct {
	parent *asyncProducer
	broker *Broker
	input  <-chan []*ProducerMessage

	// currentRetries[topic][partition] is non-nil while that partition is in
	// the retrying state on this broker; its messages are bounced back to the
	// retry path until the chaser arrives.
	currentRetries map[string]map[int32]error
}
- func (f *flusher) run() {
- var closing error
- Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
- for batch := range f.input {
- if closing != nil {
- f.parent.retryMessages(batch, closing)
- continue
- }
- msgSets := f.groupAndFilter(batch)
- request := f.parent.buildRequest(msgSets)
- if request == nil {
- continue
- }
- response, err := f.broker.Produce(request)
- switch err.(type) {
- case nil:
- break
- case PacketEncodingError:
- f.parent.returnErrors(batch, err)
- continue
- default:
- Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
- f.parent.abandonBrokerConnection(f.broker)
- _ = f.broker.Close()
- closing = err
- f.parent.retryMessages(batch, err)
- continue
- }
- if response == nil {
-
- f.parent.returnSuccesses(batch)
- continue
- }
- f.parseResponse(msgSets, response)
- }
- Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
- }
// groupAndFilter splits a batch into topic/partition message sets for the
// produce request. Messages for a partition currently in the retrying state
// are diverted back to the retry path and nil'd out of the batch (so later
// bookkeeping skips them); a chaser for such a partition instead clears its
// retrying state.
func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
	msgSets := make(map[string]map[int32][]*ProducerMessage)

	for i, msg := range batch {
		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
			// Partition is retrying: bounce this message back instead of
			// sending it out of order.
			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
			batch[i] = nil

			if msg.flags&chaser == chaser {
				// The chaser marks the end of the retried stream; the
				// partition returns to normal on this broker.
				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
					f.broker.ID(), msg.Topic, msg.Partition)
				delete(f.currentRetries[msg.Topic], msg.Partition)
			}

			continue
		}

		partitionSet := msgSets[msg.Topic]
		if partitionSet == nil {
			partitionSet = make(map[int32][]*ProducerMessage)
			msgSets[msg.Topic] = partitionSet
		}

		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
	}

	return msgSets
}
// parseResponse matches each sent topic/partition set against the broker's
// response blocks: successes get their Offset filled in, a known set of
// retriable errors flips the partition into the retrying state, and anything
// else is returned to the user as a hard error.
func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
	for topic, partitionSet := range msgSets {
		for partition, msgs := range partitionSet {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// The broker's response omitted a partition we sent to.
				f.parent.returnErrors(msgs, ErrIncompleteResponse)
				continue
			}

			switch block.Err {
			case ErrNoError:
				// Offsets are consecutive from the block's base offset.
				for i := range msgs {
					msgs[i].Offset = block.Offset + int64(i)
				}
				f.parent.returnSuccesses(msgs)

			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				// Retriable broker-side errors: mark the partition as
				// retrying on this broker and send the messages back around.
				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
					f.broker.ID(), topic, partition, block.Err)
				if f.currentRetries[topic] == nil {
					f.currentRetries[topic] = make(map[int32]error)
				}
				f.currentRetries[topic][partition] = block.Err
				f.parent.retryMessages(msgs, block.Err)

			default:
				// Non-retriable error: fail the messages.
				f.parent.returnErrors(msgs, block.Err)
			}
		}
	}
}
// retryHandler bridges the retries channel back into the input channel,
// buffering messages in an unbounded queue so the pipeline cannot deadlock on
// retries. Receiving nil (the retries channel was closed) ends the goroutine.
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			// Nothing queued: block until a retry arrives.
			msg = <-p.retries
		} else {
			// Either accept another retry or drain the queue head into the
			// input channel, whichever is ready first.
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}
// shutdown performs the orderly shutdown sequence: inject the shutdown
// sentinel, wait for every in-flight message to resolve, close the client if
// we own it, then close all channels.
func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")

	// The sentinel itself is tracked in inFlight so Wait() cannot pass
	// before the dispatcher has observed it.
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}
// buildRequest assembles a ProduceRequest from grouped message sets. Without
// compression each set is added directly; with compression each set is
// encoded and wrapped into a single compressed message, splitting the set
// whenever it would exceed MaxMessageBytes. Returns nil if nothing encodable
// remains (e.g. every message failed key/value encoding).
func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
	empty := true

	for topic, partitionSet := range batch {
		for partition, msgSet := range partitionSet {
			setToSend := new(MessageSet)
			setSize := 0
			for _, msg := range msgSet {
				var keyBytes, valBytes []byte
				var err error
				// Encode key/value; a failure fails only this message and
				// the loop keeps going.
				if msg.Key != nil {
					if keyBytes, err = msg.Key.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}
				if msg.Value != nil {
					if valBytes, err = msg.Value.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}

				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
					// The compressed wrapper must stay under MaxMessageBytes,
					// so flush the current set as one compressed message and
					// start a fresh set.
					valBytes, err := encode(setToSend)
					if err != nil {
						// Encoding our own in-memory set should never fail.
						Logger.Println(err)
						panic(err)
					}
					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
					setToSend = new(MessageSet)
					setSize = 0
				}
				setSize += msg.byteSize()

				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
				empty = false
			}

			if p.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, setToSend)
			} else {
				valBytes, err := encode(setToSend)
				if err != nil {
					// Encoding our own in-memory set should never fail.
					Logger.Println(err)
					panic(err)
				}
				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
			}
		}
	}

	if empty {
		return nil
	}
	return req
}
- func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
- msg.clear()
- pErr := &ProducerError{Msg: msg, Err: err}
- if p.conf.Producer.Return.Errors {
- p.errors <- pErr
- } else {
- Logger.Println(pErr)
- }
- p.inFlight.Done()
- }
- func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
- for _, msg := range batch {
- if msg != nil {
- p.returnError(msg, err)
- }
- }
- }
- func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
- for _, msg := range batch {
- if msg == nil {
- continue
- }
- if p.conf.Producer.Return.Successes {
- msg.clear()
- p.successes <- msg
- }
- p.inFlight.Done()
- }
- }
- func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
- for _, msg := range batch {
- if msg == nil {
- continue
- }
- if msg.retries >= p.conf.Producer.Retry.Max {
- p.returnError(msg, err)
- } else {
- msg.retries++
- p.retries <- msg
- }
- }
- }
// getBrokerProducer returns the shared input channel for the given broker,
// creating the aggregator/flusher pair on first use, and bumps its reference
// count. Callers must release it via unrefBrokerProducer.
func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}
// unrefBrokerProducer drops one reference to a broker producer channel,
// closing and forgetting it when the last reference is released. The
// p.brokers entry is only removed if it still points at this channel — the
// broker may have been abandoned and recreated in the meantime.
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}
// abandonBrokerConnection removes the broker from the lookup map so the next
// getBrokerProducer call creates a fresh producer; existing references keep
// feeding the old channel until they are unref'd.
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}
|