Browse Source

Fewer producer helper functions, just make StringEncoder public

Evan Huus 12 years ago
parent
commit
9c71ad5039
2 changed files with 11 additions and 19 deletions
  1. 7 17
      kafka/producer.go
  2. 4 2
      kafka/utils.go

+ 7 - 17
kafka/producer.go

@@ -29,19 +29,9 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 }
 
 // SendMessage sends a message with the given key and value. If key is nil, the partition to send to is selected randomly, otherwise it
-// is selected by the Producer's Partitioner.
+// is selected by the Producer's Partitioner. To send strings as either key or value, see the StringEncoder type.
 func (p *Producer) SendMessage(key, value Encoder) error {
-	return p.safeSendMessage(key, value, 1)
-}
-
-// SendStringMessage is a helper for wrapping a string in an Encoder and calling SendMessage with the wrapped string (and a nil key).
-func (p *Producer) SendStringMessage(msg string) error {
-	return p.SendMessage(nil, encodableString(msg))
-}
-
-// SendKeyedStringMessage is a helper for wrapping both key and value strings in an Encoder and calling SendMessage with the wrapped strings.
-func (p *Producer) SendKeyedStringMessage(key, value string) error {
-	return p.SendMessage(encodableString(key), encodableString(value))
+	return p.safeSendMessage(key, value, true)
 }
 
 func (p *Producer) choosePartition(key Encoder) (int32, error) {
@@ -60,7 +50,7 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 	return partitions[partitioner.Partition(key, len(partitions))], nil
 }
 
-func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
+func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	partition, err := p.choosePartition(key)
 	if err != nil {
 		return err
@@ -106,7 +96,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
 	case k.NO_ERROR:
 		return nil
 	case k.LEADER_NOT_AVAILABLE:
-		if retries <= 0 {
+		if !retry {
 			return block.Err
 		}
 		// wait for leader election to finish
@@ -115,16 +105,16 @@ func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
 		if err != nil {
 			return err
 		}
-		return p.safeSendMessage(key, value, retries-1)
+		return p.safeSendMessage(key, value, false)
 	case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION:
-		if retries <= 0 {
+		if !retry {
 			return block.Err
 		}
 		err = p.client.cache.refreshTopic(p.topic)
 		if err != nil {
 			return err
 		}
-		return p.safeSendMessage(key, value, retries-1)
+		return p.safeSendMessage(key, value, false)
 	default:
 		return block.Err
 	}

+ 4 - 2
kafka/utils.go

@@ -19,9 +19,11 @@ func (slice int32Slice) Swap(i, j int) {
 // make strings encodable for convenience so they can be used as keys
 // and/or values in kafka messages
 
-type encodableString string
+// StringEncoder implements the Encoder interface for Go strings so that you can do things like
+//	producer.SendMessage(nil, kafka.StringEncoder("hello world"))
+type StringEncoder string
 
-func (s encodableString) Encode() ([]byte, error) {
+func (s StringEncoder) Encode() ([]byte, error) {
 	return []byte(s), nil
 }