123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package sarama
- import "log"
- type produceMessage struct {
- tp topicPartition
- key, value []byte
- retried bool
- sync bool
- }
- type produceRequestBuilder []*produceMessage
- func (msg *produceMessage) enqueue(p *Producer) error {
- if !msg.sync {
- return p.addMessage(msg)
- }
- var prb produceRequestBuilder = []*produceMessage{msg}
- bp, err := p.brokerProducerFor(msg.tp)
- if err != nil {
- return err
- }
- errs := make(chan error, 1)
- bp.flushRequest(p, prb, func(err error) {
- errs <- err
- })
- return <-errs
- }
- func (msg *produceMessage) reenqueue(p *Producer) error {
- if !msg.retried {
- msg.retried = true
- return msg.enqueue(p)
- }
- return nil
- }
- func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
- return msg.tp.partition == partition && msg.tp.topic == topic
- }
- func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
- req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}
-
-
-
-
- if config.Compression != CompressionNone {
- msgSets := make(map[topicPartition]*MessageSet)
- for _, pmsg := range b {
- msgSet, ok := msgSets[pmsg.tp]
- if !ok {
- msgSet = new(MessageSet)
- msgSets[pmsg.tp] = msgSet
- }
- msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
- }
- for tp, msgSet := range msgSets {
- valBytes, err := encode(msgSet)
- if err != nil {
- log.Fatal(err)
- }
- msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
- req.AddMessage(tp.topic, tp.partition, &msg)
- }
- return req
- }
-
-
- for _, pmsg := range b {
- msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
- req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
- }
- return req
- }
- func (msg *produceMessage) byteSize() uint32 {
- return uint32(len(msg.key) + len(msg.value))
- }
- func (b produceRequestBuilder) byteSize() uint32 {
- var size uint32
- for _, m := range b {
- size += m.byteSize()
- }
- return size
- }
- func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
- for i := len(b) - 1; i >= 0; i-- {
- fn(b[i])
- }
- }
|