
Publish the Message type. Separate encoders.

We were requiring that anything sent as the body of a message be encodable not
just to bytes, but to bytes via Kafka's own encoding rules, which is not really
useful. Create a much simpler (and public) interface for things that can turn
themselves into bytes somehow, and use that instead.
Evan Huus · 12 years ago · commit 2fa436eb05
6 changed files with 59 additions and 54 deletions
  1. encoder_decoder.go (+12 -5)
  2. message.go (+19 -35)
  3. message_set.go (+2 -2)
  4. partition_chooser.go (+2 -2)
  5. producer.go (+13 -10)
  6. utils.go (+11 -0)

encoder_decoder.go (+12 -5)

@@ -1,5 +1,15 @@
 package kafka
 
+// Public Encoding
+
+// A simple interface for any type that can be encoded as an array of bytes
+// in order to be sent as the key or value of a Kafka message.
+type Encoder interface {
+	Encode() ([]byte, error)
+}
+
+// Kafka Encoding
+
 type encoder interface {
 	encode(pe packetEncoder)
 }
@@ -23,6 +33,8 @@ func encode(in encoder) ([]byte, error) {
 	return realEnc.raw, nil
 }
 
+// Kafka Decoding
+
 type decoder interface {
 	decode(pd packetDecoder) error
 }
@@ -35,8 +47,3 @@ func decode(buf []byte, in decoder) error {
 	helper := realDecoder{raw: buf}
 	return in.decode(&helper)
 }
-
-type encoderDecoder interface {
-	encoder
-	decoder
-}
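
For illustration, here is a minimal sketch of a caller-side type that satisfies the new public Encoder interface. The jsonEncoder type is hypothetical (not part of this commit), and the interface is repeated locally only so the snippet compiles on its own:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the public Encoder interface added in encoder_decoder.go.
type Encoder interface {
	Encode() ([]byte, error)
}

// jsonEncoder is a hypothetical example: any value that can be marshalled
// to JSON becomes usable as a Kafka message key or value.
type jsonEncoder struct {
	v interface{}
}

func (j jsonEncoder) Encode() ([]byte, error) {
	return json.Marshal(j.v)
}

func main() {
	var e Encoder = jsonEncoder{v: map[string]int{"answer": 42}}
	b, err := e.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", b) // prints {"answer":42}
}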

message.go (+19 -35)

@@ -18,32 +18,32 @@ const (
 // binary format." but it doesn't say what the current value is, so presumably 0...
 const message_format int8 = 0
 
-type message struct {
-	codec compressionCodec
-	key   []byte
-	value []byte
+type Message struct {
+	Codec compressionCodec // how to compress the contents of the message
+	Key   []byte           // the message key, may be nil
+	Value []byte           // the message contents
 }
 
-func (m *message) encode(pe packetEncoder) {
+func (m *Message) encode(pe packetEncoder) {
 	pe.pushCRC32()
 
 	pe.putInt8(message_format)
 
 	var attributes int8 = 0
-	attributes |= int8(m.codec & 0x07)
+	attributes |= int8(m.Codec & 0x07)
 	pe.putInt8(attributes)
 
-	pe.putBytes(m.key)
+	pe.putBytes(m.Key)
 
 	var body []byte
-	switch m.codec {
+	switch m.Codec {
 	case COMPRESSION_NONE:
-		body = m.value
+		body = m.Value
 	case COMPRESSION_GZIP:
-		if m.value != nil {
+		if m.Value != nil {
 			var buf bytes.Buffer
 			writer := gzip.NewWriter(&buf)
-			writer.Write(m.value)
+			writer.Write(m.Value)
 			writer.Close()
 			body = buf.Bytes()
 		}
@@ -55,7 +55,7 @@ func (m *message) encode(pe packetEncoder) {
 	pe.pop()
 }
 
-func (m *message) decode(pd packetDecoder) (err error) {
+func (m *Message) decode(pd packetDecoder) (err error) {
 	err = pd.pushCRC32()
 	if err != nil {
 		return err
@@ -73,30 +73,30 @@ func (m *message) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	m.codec = compressionCodec(attribute & 0x07)
+	m.Codec = compressionCodec(attribute & 0x07)
 
-	m.key, err = pd.getBytes()
+	m.Key, err = pd.getBytes()
 	if err != nil {
 		return err
 	}
 
-	m.value, err = pd.getBytes()
+	m.Value, err = pd.getBytes()
 	if err != nil {
 		return err
 	}
 
-	switch m.codec {
+	switch m.Codec {
 	case COMPRESSION_NONE:
 		// nothing to do
 	case COMPRESSION_GZIP:
-		if m.value == nil {
+		if m.Value == nil {
 			return DecodingError("Nil contents cannot be compressed.")
 		}
-		reader, err := gzip.NewReader(bytes.NewReader(m.value))
+		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
 			return err
 		}
-		m.value, err = ioutil.ReadAll(reader)
+		m.Value, err = ioutil.ReadAll(reader)
 		if err != nil {
 			return err
 		}
@@ -113,19 +113,3 @@ func (m *message) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
-
-func newMessage(key, value encoder) (msg *message, err error) {
-	msg = new(message)
-
-	msg.key, err = encode(key)
-	if err != nil {
-		return nil, err
-	}
-
-	msg.value, err = encode(value)
-	if err != nil {
-		return nil, err
-	}
-
-	return msg, nil
-}
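
The COMPRESSION_GZIP branches above just gzip Value on encode and un-gzip it on decode. A standalone sketch of that roundtrip, independent of the internal packetEncoder/packetDecoder machinery (the function names are made up for the example, and it checks the writer errors that the diff ignores):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

// compressValue mirrors the COMPRESSION_GZIP path of Message.encode.
func compressValue(value []byte) ([]byte, error) {
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	if _, err := writer.Write(value); err != nil {
		return nil, err
	}
	if err := writer.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressValue mirrors the COMPRESSION_GZIP path of Message.decode.
func decompressValue(body []byte) ([]byte, error) {
	reader, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	return ioutil.ReadAll(reader)
}

func main() {
	body, err := compressValue([]byte("hello kafka"))
	if err != nil {
		panic(err)
	}
	value, err := decompressValue(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value)) // hello kafka
}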

message_set.go (+2 -2)

@@ -2,7 +2,7 @@ package kafka
 
 type messageSetBlock struct {
 	offset int64
-	msg    message
+	msg    Message
 }
 
 func (msb *messageSetBlock) encode(pe packetEncoder) {
@@ -61,7 +61,7 @@ func (ms *messageSet) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func newSingletonMessageSet(msg *message) *messageSet {
+func newSingletonMessageSet(msg *Message) *messageSet {
 	tmp := make([]*messageSetBlock, 1)
 	tmp[0] = &messageSetBlock{msg: *msg}
 	return &messageSet{tmp}

partition_chooser.go (+2 -2)

@@ -3,12 +3,12 @@ package kafka
 import "math/rand"
 
 type PartitionChooser interface {
-	ChoosePartition(key encoder, partitions int) int
+	ChoosePartition(key Encoder, partitions int) int
 }
 
 type RandomPartitioner struct {
 }
 
-func (p RandomPartitioner) ChoosePartition(key encoder, partitions int) int {
+func (p RandomPartitioner) ChoosePartition(key Encoder, partitions int) int {
 	return rand.Intn(partitions)
 }
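
Because ChoosePartition now receives the public Encoder, callers can plug in their own strategies. A hypothetical hash-based partitioner, written as if it lived alongside partition_chooser.go in package kafka (it is not part of this commit), might look like:

package kafka

import "hash/fnv"

// hashPartitioner sends equal keys to the same partition by hashing the
// encoded key. Nil keys and encoding failures fall back to partition 0;
// like RandomPartitioner, it assumes partitions > 0.
type hashPartitioner struct{}

func (p hashPartitioner) ChoosePartition(key Encoder, partitions int) int {
	if key == nil {
		return 0
	}
	keyBytes, err := key.Encode()
	if err != nil {
		return 0
	}
	hasher := fnv.New32a()
	hasher.Write(keyBytes)
	return int(hasher.Sum32() % uint32(partitions))
}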

producer.go (+13 -10)

@@ -16,7 +16,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
 }
 
-func (p *Producer) choosePartition(key encoder) (int32, error) {
+func (p *Producer) choosePartition(key Encoder) (int32, error) {
 	partitions, err := p.client.partitions(p.topic)
 	if err != nil {
 		return -1, err
@@ -32,13 +32,22 @@ func (p *Producer) choosePartition(key encoder) (int32, error) {
 	return partitions[partitioner.ChoosePartition(key, len(partitions))], nil
 }
 
-func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
+func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
 	partition, err := p.choosePartition(key)
 	if err != nil {
 		return nil, err
 	}
 
-	msg, err := newMessage(key, value)
+	var keyBytes []byte
+	var valBytes []byte
+
+	if key != nil {
+		keyBytes, err = key.Encode()
+		if err != nil {
+			return nil, err
+		}
+	}
+	valBytes, err = value.Encode()
 	if err != nil {
 		return nil, err
 	}
@@ -48,7 +57,7 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 		return nil, err
 	}
 
-	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(msg))
+	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(&Message{Key: keyBytes, Value: valBytes}))
 	request.requiredAcks = p.responseCondition
 	request.timeout = p.responseTimeout
 
@@ -63,12 +72,6 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 	return nil, nil
 }
 
-type encodableString string
-
-func (s encodableString) encode(pe packetEncoder) {
-	pe.putRaw([]byte(s))
-}
-
 func (p *Producer) SendSimpleMessage(in string) (*ProduceResponse, error) {
 	return p.SendMessage(nil, encodableString(in))
 }
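
With the new signature, callers hand SendMessage a pair of Encoders and the producer does the byte conversion itself. A hypothetical usage sketch, written as if inside package kafka (the topic name and the stringEncoder helper are made up; obtaining a connected *Client is not shown in this diff):

package kafka

// stringEncoder is a caller-defined Encoder for plain strings, mirroring
// the package's unexported encodableString.
type stringEncoder string

func (s stringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

// sendExample sends one message with a nil key; SendMessage calls
// Encode on the value (and on the key, when present) internally.
func sendExample(client *Client) error {
	producer := NewSimpleProducer(client, "my_topic")
	_, err := producer.SendMessage(nil, stringEncoder("hello world"))
	return err
}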

utils.go (+11 -0)

@@ -1,5 +1,7 @@
 package kafka
 
+// make []int32 sortable so we can sort partition numbers
+
 type int32Slice []int32
 
 func (slice int32Slice) Len() int {
@@ -13,3 +15,12 @@ func (slice int32Slice) Less(i, j int) bool {
 func (slice int32Slice) Swap(i, j int) {
 	slice[i], slice[j] = slice[j], slice[i]
 }
+
+// make strings encodable for convenience so they can be used as keys
+// and/or values in kafka messages
+
+type encodableString string
+
+func (s encodableString) Encode() ([]byte, error) {
+	return []byte(s), nil
+}
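
Both additions are small conveniences; a sketch exercising them, again written as if inside package kafka, with made-up partition IDs:

package kafka

import (
	"fmt"
	"sort"
)

// utilsExample sorts partition IDs via int32Slice and encodes a string
// key via encodableString.
func utilsExample() {
	partitions := []int32{3, 1, 2}
	sort.Sort(int32Slice(partitions))
	fmt.Println(partitions) // [1 2 3]

	keyBytes, _ := encodableString("user-42").Encode() // never errors
	fmt.Printf("%s\n", keyBytes) // user-42
}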