فهرست منبع

Move successes to its own channel

Evan Huus 11 سال پیش
والد
کامیت
eb42926dd4
4فایلهای تغییر یافته به همراه73 افزوده شده و 46 حذف شده
  1. 7 9
      functional_test.go
  2. 38 18
      producer.go
  3. 21 16
      producer_test.go
  4. 7 3
      simple_producer.go

+ 7 - 9
functional_test.go

@@ -103,19 +103,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 		case producer.Input() <- msg:
 			i++
 		case ret := <-producer.Errors():
-			if ret.Err == nil {
-				expectedResponses--
-			} else {
-				t.Fatal(ret.Err)
-			}
+			t.Fatal(ret.Err)
+		case <-producer.Successes():
+			expectedResponses--
 		}
 	}
 	for expectedResponses > 0 {
-		ret := <-producer.Errors()
-		if ret.Err == nil {
-			expectedResponses--
-		} else {
+		select {
+		case ret := <-producer.Errors():
 			t.Fatal(ret.Err)
+		case <-producer.Successes():
+			expectedResponses--
 		}
 	}
 	err = producer.Close()

+ 38 - 18
producer.go

@@ -19,7 +19,7 @@ type ProducerConfig struct {
 	FlushMsgCount     int                    // The number of messages needed to trigger a flush.
 	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued.
 	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
-	AckSuccesses      bool                   // If enabled, successfully delivered messages will also be returned on the Errors channel, with a nil Err field
+	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000)
 	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
 }
@@ -84,8 +84,8 @@ type Producer struct {
 	client *Client
 	config ProducerConfig
 
-	errors         chan *ProduceError
-	input, retries chan *MessageToSend
+	errors                    chan *ProduceError
+	input, successes, retries chan *MessageToSend
 
 	brokers    map[*Broker]*brokerWorker
 	brokerLock sync.Mutex
@@ -108,12 +108,13 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	}
 
 	p := &Producer{
-		client:  client,
-		config:  *config,
-		errors:  make(chan *ProduceError),
-		input:   make(chan *MessageToSend),
-		retries: make(chan *MessageToSend),
-		brokers: make(map[*Broker]*brokerWorker),
+		client:    client,
+		config:    *config,
+		errors:    make(chan *ProduceError),
+		input:     make(chan *MessageToSend),
+		successes: make(chan *MessageToSend),
+		retries:   make(chan *MessageToSend),
+		brokers:   make(map[*Broker]*brokerWorker),
 	}
 
 	// launch our singleton dispatchers
@@ -184,12 +185,19 @@ func (pe ProduceErrors) Error() string {
 	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
 }
 
-// Errors is the output channel back to the user. You MUST read from this channel or the Producer will deadlock.
+// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
 // It is suggested that you send messages and read errors together in a single select statement.
 func (p *Producer) Errors() <-chan *ProduceError {
 	return p.errors
 }
 
+// Successes is the success output channel back to the user when AckSuccesses is configured.
+// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
+// It is suggested that you send and read messages together in a single select statement.
+func (p *Producer) Successes() <-chan *MessageToSend {
+	return p.successes
+}
+
 // Input is the input channel for the user to write messages to that they wish to send.
 func (p *Producer) Input() chan<- *MessageToSend {
 	return p.input
@@ -208,6 +216,9 @@ func (p *Producer) Close() error {
 	for event := range p.errors {
 		errors = append(errors, event)
 	}
+
+	close(p.successes)
+
 	if len(errors) > 0 {
 		return errors
 	}
@@ -331,14 +342,14 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			if output == nil {
 				err := p.client.RefreshTopicMetadata(topic)
 				if err != nil {
-					p.returnMessages(backlog, err)
+					p.returnErrors(backlog, err)
 					backlog = nil
 					continue
 				}
 
 				leader, err = p.client.Leader(topic, partition)
 				if err != nil {
-					p.returnMessages(backlog, err)
+					p.returnErrors(backlog, err)
 					backlog = nil
 					continue
 				}
@@ -483,7 +494,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 		case nil:
 			break
 		case EncodingError:
-			p.returnMessages(batch, err)
+			p.returnErrors(batch, err)
 			continue
 		default:
 			p.client.disconnectBroker(broker)
@@ -495,7 +506,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
 			if p.config.AckSuccesses {
-				p.returnMessages(batch, nil)
+				p.returnSuccesses(batch)
 			}
 			continue
 		}
@@ -507,7 +518,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 
 				block := response.GetBlock(topic, partition)
 				if block == nil {
-					p.returnMessages(msgs, IncompleteResponse)
+					p.returnErrors(msgs, IncompleteResponse)
 					continue
 				}
 
@@ -518,7 +529,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
-						p.returnMessages(msgs, nil)
+						p.returnSuccesses(msgs)
 					}
 				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
 					if currentRetries[topic] == nil {
@@ -527,7 +538,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 					currentRetries[topic][partition] = block.Err
 					p.retryMessages(msgs, block.Err)
 				default:
-					p.returnMessages(msgs, block.Err)
+					p.returnErrors(msgs, block.Err)
 				}
 			}
 		}
@@ -668,7 +679,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
 	return req
 }
 
-func (p *Producer) returnMessages(batch []*MessageToSend, err error) {
+func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
 	for _, msg := range batch {
 		if msg == nil {
 			continue
@@ -677,6 +688,15 @@ func (p *Producer) returnMessages(batch []*MessageToSend, err error) {
 	}
 }
 
+func (p *Producer) returnSuccesses(batch []*MessageToSend) {
+	for _, msg := range batch {
+		if msg == nil {
+			continue
+		}
+		p.successes <- msg
+	}
+}
+
 func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 	Logger.Println("Producer requeueing batch of", len(batch), "messages due to error:", err)
 	for _, msg := range batch {

+ 21 - 16
producer_test.go

@@ -85,9 +85,10 @@ func TestProducer(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	for i := 0; i < 10; i++ {
-		msg := <-producer.Errors()
-		if msg.Err != nil {
-			t.Error(err)
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case <-producer.Successes():
 		}
 	}
 }
@@ -129,9 +130,10 @@ func TestProducerMultipleFlushes(t *testing.T) {
 			producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		}
 		for i := 0; i < 5; i++ {
-			msg := <-producer.Errors()
-			if msg.Err != nil {
-				t.Error(err)
+			select {
+			case msg := <-producer.Errors():
+				t.Error(msg.Err)
+			case <-producer.Successes():
 			}
 		}
 	}
@@ -180,9 +182,10 @@ func TestProducerMultipleBrokers(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	for i := 0; i < 10; i++ {
-		msg := <-producer.Errors()
-		if msg.Err != nil {
-			t.Error(err)
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case <-producer.Successes():
 		}
 	}
 }
@@ -231,9 +234,10 @@ func TestProducerFailureRetry(t *testing.T) {
 	response4.AddTopicPartition("my_topic", 0, NoError)
 	broker3.Returns(response4)
 	for i := 0; i < 10; i++ {
-		msg := <-producer.Errors()
-		if msg.Err != nil {
-			t.Error(err)
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case <-producer.Successes():
 		}
 	}
 
@@ -242,9 +246,10 @@ func TestProducerFailureRetry(t *testing.T) {
 	}
 	broker3.Returns(response4)
 	for i := 0; i < 10; i++ {
-		msg := <-producer.Errors()
-		if msg.Err != nil {
-			t.Error(err)
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case <-producer.Successes():
 		}
 	}
 }
@@ -269,7 +274,7 @@ func ExampleProducer() {
 		case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
 			fmt.Println("> message queued")
 		case err := <-producer.Errors():
-			panic(err)
+			panic(err.Err)
 		}
 	}
 }

+ 7 - 3
simple_producer.go

@@ -35,9 +35,13 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
 	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
 
-	result := <-sp.producer.Errors() // we always get something because AckSuccesses is true
-
-	return result.Err
+	// we always get one or the other because AckSuccesses is true
+	select {
+	case err := <-sp.producer.Errors():
+		return err.Err
+	case <-sp.producer.Successes():
+		return nil
+	}
 }
 
 // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before