Explorar o código

Take PartitionerConstructors in the producer

and generate a new partitioner for each topic
Evan Huus %!s(int64=11) %!d(string=hai) anos
pai
achega
dab3358e26
Modificáronse 4 ficheiros con 27 adicións e 19 borrados
  1. 7 3
      partitioner.go
  2. 18 14
      producer.go
  3. 1 1
      producer_test.go
  4. 1 1
      simple_producer.go

+ 7 - 3
partitioner.go

@@ -21,7 +21,7 @@ type RandomPartitioner struct {
 	m         sync.Mutex
 }
 
-func NewRandomPartitioner() *RandomPartitioner {
+func NewRandomPartitioner() Partitioner {
 	p := new(RandomPartitioner)
 	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
 	return p
@@ -39,6 +39,10 @@ type RoundRobinPartitioner struct {
 	m         sync.Mutex
 }
 
+func NewRoundRobinPartitioner() Partitioner {
+	return &RoundRobinPartitioner{}
+}
+
 func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 	p.m.Lock()
 	defer p.m.Unlock()
@@ -54,12 +58,12 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int3
 // is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
 // with the same key always end up on the same partition.
 type HashPartitioner struct {
-	random *RandomPartitioner
+	random Partitioner
 	hasher hash.Hash32
 	m      sync.Mutex
 }
 
-func NewHashPartitioner() *HashPartitioner {
+func NewHashPartitioner() Partitioner {
 	p := new(HashPartitioner)
 	p.random = NewRandomPartitioner()
 	p.hasher = fnv.New32a()

+ 18 - 14
producer.go

@@ -10,24 +10,27 @@ func forceFlushThreshold() int {
 	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
 }
 
+// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
+type PartitionerConstructor func() Partitioner
+
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
 type ProducerConfig struct {
-	Partitioner       Partitioner      // Chooses the partition to send messages to (defaults to random).
-	RequiredAcks      RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
-	Timeout           time.Duration    // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
-	Compression       CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	FlushMsgCount     int              // The number of messages needed to trigger a flush.
-	FlushFrequency    time.Duration    // If this amount of time elapses without a flush, one will be queued.
-	FlushByteCount    int              // If this many bytes of messages are accumulated, a flush will be triggered.
-	AckSuccesses      bool             // If enabled, successfully delivered messages will also be returned on the Errors channel, with a nil Err field
-	MaxMessageBytes   int              // The maximum permitted size of a message (defaults to 1000000)
-	ChannelBufferSize int              // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
+	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to random).
+	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+	Timeout           time.Duration          // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
+	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression).
+	FlushMsgCount     int                    // The number of messages needed to trigger a flush.
+	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued.
+	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
+	AckSuccesses      bool                   // If enabled, successfully delivered messages will also be returned on the Errors channel, with a nil Err field
+	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000)
+	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
 }
 
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
 func NewProducerConfig() *ProducerConfig {
 	return &ProducerConfig{
-		Partitioner:     NewRandomPartitioner(),
+		Partitioner:     NewRandomPartitioner,
 		RequiredAcks:    WaitForLocal,
 		MaxMessageBytes: 1000000,
 	}
@@ -267,10 +270,11 @@ func (p *Producer) topicDispatcher() {
 // one per topic
 func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
 	handlers := make(map[int32]chan *MessageToSend)
+	partitioner := p.config.Partitioner()
 
 	for msg := range input {
 		if msg.flags&retried == 0 {
-			err := p.assignPartition(msg)
+			err := p.assignPartition(partitioner, msg)
 			if err != nil {
 				p.errors <- &ProduceError{Msg: msg, Err: err}
 				continue
@@ -575,7 +579,7 @@ func (p *Producer) retryHandler() {
 
 // utility functions
 
-func (p *Producer) assignPartition(msg *MessageToSend) error {
+func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
 	partitions, err := p.client.Partitions(msg.Topic)
 	if err != nil {
 		return err
@@ -587,7 +591,7 @@ func (p *Producer) assignPartition(msg *MessageToSend) error {
 		return LeaderNotAvailable
 	}
 
-	choice := p.config.Partitioner.Partition(msg.Key, numPartitions)
+	choice := partitioner.Partition(msg.Key, numPartitions)
 
 	if choice < 0 || choice >= numPartitions {
 		return InvalidPartition

+ 1 - 1
producer_test.go

@@ -165,7 +165,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	config := NewProducerConfig()
 	config.FlushMsgCount = 5
 	config.AckSuccesses = true
-	config.Partitioner = &RoundRobinPartitioner{}
+	config.Partitioner = NewRoundRobinPartitioner
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)

+ 1 - 1
simple_producer.go

@@ -10,7 +10,7 @@ type SimpleProducer struct {
 
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
 // partitioner is nil, messages are partitioned randomly.
-func NewSimpleProducer(client *Client, topic string, partitioner Partitioner) (*SimpleProducer, error) {
+func NewSimpleProducer(client *Client, topic string, partitioner PartitionerConstructor) (*SimpleProducer, error) {
 	if topic == "" {
 		return nil, ConfigurationError("Empty topic")
 	}