Explorar el Código

Make the simple producer API more powerful

Take a full ProducerConfig for more flexibility (covers #242) and permit
producing messages with different topics. Breaks the API, but we're not frozen
quite yet.
Evan Huus hace 11 años
padre
commit
bc6752428e
Se han modificado 2 ficheros con 13 adiciones y 22 borrados
  1. 6 6
      producer_test.go
  2. 7 16
      simple_producer.go

+ 6 - 6
producer_test.go

@@ -35,13 +35,13 @@ func TestSimpleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewSimpleProducer(client, "my_topic", nil)
+	producer, err := NewSimpleProducer(client, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	for i := 0; i < 10; i++ {
-		err = producer.SendMessage(nil, StringEncoder(TestMessage))
+		err = producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
 		if err != nil {
 			t.Error(err)
 		}
@@ -72,7 +72,7 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewSimpleProducer(client, "my_topic", nil)
+	producer, err := NewSimpleProducer(client, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -82,7 +82,7 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		wg.Add(1)
 		go func() {
-			err := producer.SendMessage(nil, StringEncoder(TestMessage))
+			err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
 			if err != nil {
 				t.Error(err)
 			}
@@ -365,14 +365,14 @@ func ExampleSimpleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewSimpleProducer(client, "my_topic", nil)
+	producer, err := NewSimpleProducer(client, nil)
 	if err != nil {
 		panic(err)
 	}
 	defer producer.Close()
 
 	for {
-		err = producer.SendMessage(nil, StringEncoder("testing 123"))
+		err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
 		if err != nil {
 			panic(err)
 		} else {

+ 7 - 16
simple_producer.go

@@ -5,7 +5,6 @@ package sarama
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SimpleProducer struct {
 	producer        *Producer
-	topic           string
 	newExpectations chan *spExpect
 }
 
@@ -14,19 +13,12 @@ type spExpect struct {
 	result chan error
 }
 
-// NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
-// partitioner is nil, messages are partitioned by the hash of the key
-// (or randomly if there is no key).
-func NewSimpleProducer(client *Client, topic string, partitioner PartitionerConstructor) (*SimpleProducer, error) {
-	if topic == "" {
-		return nil, ConfigurationError("Empty topic")
+// NewSimpleProducer creates a new SimpleProducer using the given client  and configuration.
+func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer, error) {
+	if config == nil {
+		config = NewProducerConfig()
 	}
-
-	config := NewProducerConfig()
 	config.AckSuccesses = true
-	if partitioner != nil {
-		config.Partitioner = partitioner
-	}
 
 	prod, err := NewProducer(client, config)
 
@@ -36,7 +28,6 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 
 	sp := &SimpleProducer{
 		producer:        prod,
-		topic:           topic,
 		newExpectations: make(chan *spExpect), // this must be unbuffered
 	}
 
@@ -45,9 +36,9 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 	return sp, nil
 }
 
-// SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
-func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
-	msg := &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+// SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
+func (sp *SimpleProducer) SendMessage(topic string, key, value Encoder) error {
+	msg := &MessageToSend{Topic: topic, Key: key, Value: value}
 	expectation := &spExpect{msg: msg, result: make(chan error)}
 	sp.newExpectations <- expectation
 	sp.producer.Input() <- msg