package sarama import "log" type produceMessage struct { tp topicPartition key, value []byte retried bool sync bool } type produceRequestBuilder []*produceMessage // If the message is synchronous, we manually send it and wait for a return. // Otherwise, we just hand it back to the producer to enqueue using the normal // method. func (msg *produceMessage) enqueue(p *Producer) error { if !msg.sync { return p.addMessage(msg) } var prb produceRequestBuilder = []*produceMessage{msg} bp, err := p.brokerProducerFor(msg.tp) if err != nil { return err } errs := make(chan error, 1) bp.flushRequest(p, prb, func(err error) { errs <- err }) return <-errs } func (msg *produceMessage) reenqueue(p *Producer) error { if !msg.retried { msg.retried = true return msg.enqueue(p) } return nil } func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool { return msg.tp.partition == partition && msg.tp.topic == topic } func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest { req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout} // If compression is enabled, we need to group messages by topic-partition and // wrap them in MessageSets. We already discarded that grouping, so we // inefficiently re-sort them. This could be optimized (ie. pass a hash around // rather than an array. Not sure what the best way is. if config.Compression != CompressionNone { msgSets := make(map[topicPartition]*MessageSet) for _, pmsg := range b { msgSet, ok := msgSets[pmsg.tp] if !ok { msgSet = new(MessageSet) msgSets[pmsg.tp] = msgSet } msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value}) } for tp, msgSet := range msgSets { valBytes, err := encode(msgSet) if err != nil { log.Fatal(err) // if this happens, it's basically our fault. } msg := Message{Codec: config.Compression, Key: nil, Value: valBytes} req.AddMessage(tp.topic, tp.partition, &msg) } return req } // Compression is not enabled. Dumb-ly append each request directly to the // request, with no MessageSet wrapper. for _, pmsg := range b { msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value} req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg) } return req } func (msg *produceMessage) byteSize() uint32 { return uint32(len(msg.key) + len(msg.value)) } func (b produceRequestBuilder) byteSize() uint32 { var size uint32 for _, m := range b { size += m.byteSize() } return size } func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) { for i := len(b) - 1; i >= 0; i-- { fn(b[i]) } }