Browse Source

Merge pull request #310 from Shopify/sync-producer

Rename SimpleProducer to SyncProducer
Evan Huus 10 years ago
parent
commit
e5e8ace555
2 changed files with 35 additions and 22 deletions
  1. 21 12
      producer_test.go
  2. 14 10
      sync_producer.go

+ 21 - 12
producer_test.go

@@ -35,13 +35,13 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 	}
 }
 
-func TestSimpleProducer(t *testing.T) {
+func TestSyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -55,13 +55,19 @@ func TestSimpleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewSimpleProducer(client, nil)
+	producer, err := NewSyncProducer(client, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	for i := 0; i < 10; i++ {
-		err = producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
+		partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
+		if partition != 0 {
+			t.Error("Unexpected partition")
+		}
+		if offset != 0 {
+			t.Error("Unexpected offset")
+		}
 		if err != nil {
 			t.Error(err)
 		}
@@ -73,13 +79,13 @@ func TestSimpleProducer(t *testing.T) {
 	seedBroker.Close()
 }
 
-func TestConcurrentSimpleProducer(t *testing.T) {
+func TestConcurrentSyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -93,7 +99,7 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 	config := NewProducerConfig()
 	config.FlushMsgCount = 100
-	producer, err := NewSimpleProducer(client, config)
+	producer, err := NewSyncProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -103,7 +109,10 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		wg.Add(1)
 		go func() {
-			err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
+			partition, _, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
+			if partition != 0 {
+				t.Error("Unexpected partition")
+			}
 			if err != nil {
 				t.Error(err)
 			}
@@ -584,7 +593,7 @@ func ExampleProducer() {
 	}
 }
 
-func ExampleSimpleProducer() {
+func ExampleSyncProducer() {
 	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
 	if err != nil {
 		panic(err)
@@ -593,18 +602,18 @@ func ExampleSimpleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewSimpleProducer(client, nil)
+	producer, err := NewSyncProducer(client, nil)
 	if err != nil {
 		panic(err)
 	}
 	defer producer.Close()
 
 	for {
-		err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
+		partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
 		if err != nil {
 			panic(err)
 		} else {
-			fmt.Println("> message sent")
+			fmt.Printf("> message sent to partition %d at offset %d\n", partition, offset)
 		}
 	}
 }

+ 14 - 10
simple_producer.go → sync_producer.go

@@ -2,16 +2,16 @@ package sarama
 
 import "sync"
 
-// SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
+// SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
-type SimpleProducer struct {
+type SyncProducer struct {
 	producer *Producer
 	wg       sync.WaitGroup
 }
 
-// NewSimpleProducer creates a new SimpleProducer using the given client  and configuration.
-func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer, error) {
+// NewSyncProducer creates a new SyncProducer using the given client  and configuration.
+func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error) {
 	if config == nil {
 		config = NewProducerConfig()
 	}
@@ -23,7 +23,7 @@ func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer,
 		return nil, err
 	}
 
-	sp := &SimpleProducer{producer: prod}
+	sp := &SyncProducer{producer: prod}
 
 	sp.wg.Add(2)
 	go withRecover(sp.handleSuccesses)
@@ -33,14 +33,18 @@ func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer,
 }
 
 // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
-func (sp *SimpleProducer) SendMessage(topic string, key, value Encoder) error {
+// It returns the partition and offset of the successfully-produced message, or the error (if any).
+func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
 	expectation := make(chan error, 1)
 	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
 	sp.producer.Input() <- msg
-	return <-expectation
+	err = <-expectation
+	partition = msg.Partition()
+	offset = msg.Offset()
+	return
 }
 
-func (sp *SimpleProducer) handleSuccesses() {
+func (sp *SyncProducer) handleSuccesses() {
 	defer sp.wg.Done()
 	for msg := range sp.producer.Successes() {
 		expectation := msg.Metadata.(chan error)
@@ -48,7 +52,7 @@ func (sp *SimpleProducer) handleSuccesses() {
 	}
 }
 
-func (sp *SimpleProducer) handleErrors() {
+func (sp *SyncProducer) handleErrors() {
 	defer sp.wg.Done()
 	for err := range sp.producer.Errors() {
 		expectation := err.Msg.Metadata.(chan error)
@@ -59,7 +63,7 @@ func (sp *SimpleProducer) handleErrors() {
 // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
 // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
 // on the underlying client.
-func (sp *SimpleProducer) Close() error {
+func (sp *SyncProducer) Close() error {
 	sp.producer.AsyncClose()
 	sp.wg.Wait()
 	return nil