Browse files

Make Partition and Offset normal struct fields instead of functions.

Willem van Bergen 10 years ago
parent
commit
a468f3ae10
3 changed files with 25 additions and 32 deletions
  1. + 20 - 30
      async_producer.go
  2. + 3 - 0
      mocks/async_producer.go
  3. + 2 - 2
      sync_producer.go
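
The caller-facing effect of the change, as a minimal hedged sketch: only ProducerMessage and its fields come from the diffs below; the helper function, package name, and import path are illustrative.

package example

import "github.com/Shopify/sarama"

// Illustrative helper, not part of the commit: before this change the values were
// read through accessor methods (msg.Partition(), msg.Offset()); after it they are
// plain exported struct fields on ProducerMessage.
func deliveryInfo(msg *sarama.ProducerMessage) (partition int32, offset int64) {
	return msg.Partition, msg.Offset
}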

+ 20 - 30
async_producer.go

@@ -109,28 +109,18 @@ const (
 
 // ProducerMessage is the collection of elements passed to the Producer in order to send a message.
 type ProducerMessage struct {
-	Topic    string      // The Kafka topic for this message.
-	Key      Encoder     // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-	Value    Encoder     // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels.  Sarama completely ignores this field and is only to be used for pass-through data.
+	Topic string  // The Kafka topic for this message.
+	Key   Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
+	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
 
-	// these are filled in by the producer as the message is processed
-	offset    int64
-	partition int32
-	retries   int
-	flags     flagSet
-}
+	// These are filled in by the producer as the message is processed
+	Offset    int64 // Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
+	Partition int32 // Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.
 
-// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
-// the message was successfully delivered and RequiredAcks is not NoResponse.
-func (m *ProducerMessage) Offset() int64 {
-	return m.offset
-}
+	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels.  Sarama completely ignores this field and is only to be used for pass-through data.
 
-// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
-// the message was successfully delivered.
-func (m *ProducerMessage) Partition() int32 {
-	return m.partition
+	retries int
+	flags   flagSet
 }
 
 func (m *ProducerMessage) byteSize() int {
@@ -283,15 +273,15 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
 			}
 		}
 
-		handler := handlers[msg.partition]
+		handler := handlers[msg.Partition]
 		if handler == nil {
 			p.retries <- &ProducerMessage{flags: ref}
 			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic         // block local because go's closure semantics suck
-			partition := msg.partition // block local because go's closure semantics suck
+			partition := msg.Partition // block local because go's closure semantics suck
 			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
 			handler = newHandler
-			handlers[msg.partition] = handler
+			handlers[msg.Partition] = handler
 		}
 
 		handler <- msg
@@ -348,7 +338,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			highWatermark = msg.retries
 			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
 			retryState[msg.retries].expectChaser = true
-			output <- &ProducerMessage{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
+			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
 			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
 			p.unrefBrokerProducer(leader)
 			output = nil
@@ -497,14 +487,14 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 		// group messages by topic/partition
 		msgSets := make(map[string]map[int32][]*ProducerMessage)
 		for i, msg := range batch {
-			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
+			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.Partition] != nil {
 				if msg.flags&chaser == chaser {
 					// we can start processing this topic/partition again
 					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						broker.ID(), msg.Topic, msg.partition)
-					currentRetries[msg.Topic][msg.partition] = nil
+						broker.ID(), msg.Topic, msg.Partition)
+					currentRetries[msg.Topic][msg.Partition] = nil
 				}
-				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.partition])
+				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
 				batch[i] = nil // to prevent it being returned/retried twice
 				continue
 			}
@@ -515,7 +505,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				msgSets[msg.Topic] = partitionSet
 			}
 
-			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
+			partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
 		}
 
 		request := p.buildRequest(msgSets)
@@ -563,7 +553,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 					// All the messages for this topic-partition were delivered successfully!
 					if p.conf.Producer.Return.Successes {
 						for i := range msgs {
-							msgs[i].offset = block.Offset + int64(i)
+							msgs[i].Offset = block.Offset + int64(i)
 						}
 						p.returnSuccesses(msgs)
 					}
@@ -664,7 +654,7 @@ func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMe
 		return ErrInvalidPartition
 	}
 
-	msg.partition = partitions[choice]
+	msg.Partition = partitions[choice]
 
 	return nil
 }
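
Since Offset and Partition are now exported, a consumer of the Successes channel can read them directly. A hedged sketch: it assumes the exported AsyncProducer interface with a Successes() channel (as the mock below implements) and a producer constructed elsewhere with Producer.Return.Successes enabled, which the Offset field comment requires; everything except the field and channel names is illustrative.

package example

import (
	"log"

	"github.com/Shopify/sarama"
)

// logDeliveries drains delivery reports and prints where each message landed.
func logDeliveries(p sarama.AsyncProducer) {
	for msg := range p.Successes() {
		// Partition and Offset are filled in by the producer before the message
		// is returned on the Successes channel.
		log.Printf("delivered %s -> partition %d, offset %d", msg.Topic, msg.Partition, msg.Offset)
	}
}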

+ 3 - 0
mocks/async_producer.go

@@ -18,6 +18,7 @@ type AsyncProducer struct {
 	input        chan *sarama.ProducerMessage
 	successes    chan *sarama.ProducerMessage
 	errors       chan *sarama.ProducerError
+	lastOffset   int64
 }
 
 // NewAsyncProducer instantiates a new Producer mock. The t argument should
@@ -52,7 +53,9 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
 				if expectation.Result == errProduceSuccess {
+					mp.lastOffset++
 					if config.Producer.Return.Successes {
+						msg.Offset = mp.lastOffset
 						mp.successes <- msg
 					}
 				} else {
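
With the lastOffset counter above, consecutive successful expectations should hand back messages carrying increasing Offset values. A hedged test sketch; it assumes the mock's ExpectInputAndSucceed expectation helper and sarama.NewConfig, neither of which appears in this diff, and the test body itself is illustrative.

package example_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestMockAssignsIncreasingOffsets(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required for the mock to publish successes

	mp := mocks.NewAsyncProducer(t, config)
	mp.ExpectInputAndSucceed()
	mp.ExpectInputAndSucceed()

	// Send and read back one message at a time so the test does not depend on
	// how the mock buffers its channels.
	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("a")}
	first := <-mp.Successes()

	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("b")}
	second := <-mp.Successes()

	if second.Offset != first.Offset+1 {
		t.Errorf("expected consecutive offsets, got %d then %d", first.Offset, second.Offset)
	}

	if err := mp.Close(); err != nil {
		t.Error(err)
	}
}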

+ 2 - 2
sync_producer.go

@@ -56,8 +56,8 @@ func (sp *syncProducer) SendMessage(topic string, key, value Encoder) (partition
 	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
 	sp.producer.Input() <- msg
 	err = <-expectation
-	partition = msg.Partition()
-	offset = msg.Offset()
+	partition = msg.Partition
+	offset = msg.Offset
 	return
 }
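
SendMessage's exported signature is untouched; only its body now reads the exported fields, so existing callers keep working. A hedged usage sketch, assuming the exported SyncProducer interface and a producer constructed elsewhere; the topic and payload are illustrative.

package example

import (
	"log"

	"github.com/Shopify/sarama"
)

// produceOne sends a single message synchronously and reports where it was stored.
func produceOne(sp sarama.SyncProducer) error {
	partition, offset, err := sp.SendMessage("my-topic", nil, sarama.StringEncoder("hello"))
	if err != nil {
		return err
	}
	log.Printf("stored at partition %d, offset %d", partition, offset)
	return nil
}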
 }