Sfoglia il codice sorgente

Allow sending arbitrary messages from producer

And a surprising amount of refactor needed to support that.
Evan Huus 12 anni fa
parent
commit
b6d96975b7
9 ha cambiato i file con 91 aggiunte e 40 eliminazioni
  1. 7 15
      broker.go
  2. 4 4
      broker_manager.go
  3. 14 3
      message.go
  4. 20 0
      packet_encoder.go
  5. 4 4
      partitionChooser.go
  6. 4 0
      prep_encoder.go
  7. 30 12
      producer.go
  8. 6 2
      real_encoder.go
  9. 2 0
      request.go

+ 7 - 15
broker.go

@@ -165,26 +165,18 @@ func (b *broker) rcvResponseLoop() {
 }
 
 func (b *broker) sendRequest(clientID *string, body requestEncoder) (*responsePromise, error) {
-	var prepEnc prepEncoder
-	var realEnc realEncoder
-
 	req := request{b.correlation_id, clientID, body}
-
-	req.encode(&prepEnc)
-	if prepEnc.err != nil {
-		return nil, prepEnc.err
+	packet, err := buildBytes(&req)
+	if err != nil {
+		return nil, err
 	}
 
-	realEnc.raw = make([]byte, prepEnc.length+4)
-	realEnc.putInt32(int32(prepEnc.length))
-	req.encode(&realEnc)
-
-	request := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, body.expectResponse()}
+	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, body.expectResponse()}
 
-	b.requests <- request
-	request.response.packets <- realEnc.raw // we cheat to avoid poofing up more channels than necessary
+	b.requests <- sendRequest
+	sendRequest.response.packets <- *packet // we cheat to avoid poofing up more channels than necessary
 	b.correlation_id++
-	return &request.response, nil
+	return &sendRequest.response, nil
 }
 
 // returns true if there was a response, even if there was an error decoding it (in

+ 4 - 4
broker_manager.go

@@ -78,20 +78,20 @@ func (bm *brokerManager) getValidLeader(topic string, partition_id int32) (*brok
 	return leader, nil
 }
 
-func (bm *brokerManager) choosePartition(topic string, p partitionChooser) (int32, error) {
+func (bm *brokerManager) partitionsForTopic(topic string) ([]int32, error) {
 	bm.lock.RLock()
 	id_map := bm.partitions[topic]
 	if id_map == nil {
 		bm.lock.RUnlock()
 		err := bm.refreshTopic(topic)
 		if err != nil {
-			return -1, err
+			return nil, err
 		}
 		bm.lock.RLock()
 		id_map = bm.partitions[topic]
 		if id_map == nil {
 			bm.lock.RUnlock()
-			return -1, UNKNOWN_TOPIC_OR_PARTITION
+			return nil, UNKNOWN_TOPIC_OR_PARTITION
 		}
 	}
 	partitions := make([]int32, len(id_map))
@@ -101,7 +101,7 @@ func (bm *brokerManager) choosePartition(topic string, p partitionChooser) (int3
 		i++
 	}
 	bm.lock.RUnlock()
-	return p.choosePartition(partitions), nil
+	return partitions, nil
 }
 
 func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) (bool, error) {

+ 14 - 3
message.go

@@ -116,7 +116,18 @@ func (m *message) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func newMessageFromString(in string) *message {
-	buf := make([]byte, len(in))
-	return &message{value: &buf}
+func newMessage(key, value encoder) (msg *message, err error) {
+	msg = new(message)
+
+	msg.key, err = buildBytes(key)
+	if err != nil {
+		return nil, err
+	}
+
+	msg.value, err = buildBytes(value)
+	if err != nil {
+		return nil, err
+	}
+
+	return msg, nil
 }

+ 20 - 0
packet_encoder.go

@@ -15,6 +15,7 @@ type packetEncoder interface {
 	putError(in KError)
 	putString(in *string)
 	putBytes(in *[]byte)
+	putRaw(in []byte)
 
 	// stackable
 	push(in pushEncoder)
@@ -28,3 +29,22 @@ type pushEncoder interface {
 	reserveLength() int
 	run(curOffset int, buf []byte)
 }
+
+func buildBytes(in encoder) (*[]byte, error) {
+	if in == nil {
+		return nil, nil
+	}
+
+	var prepEnc prepEncoder
+	var realEnc realEncoder
+
+	in.encode(&prepEnc)
+	if prepEnc.err != nil {
+		return nil, prepEnc.err
+	}
+
+	realEnc.raw = make([]byte, prepEnc.length)
+	in.encode(&realEnc)
+
+	return &(realEnc.raw), nil
+}

+ 4 - 4
partitionChooser.go

@@ -3,12 +3,12 @@ package kafka
 import "math/rand"
 
 type partitionChooser interface {
-	choosePartition(options []int32) int32
+	choosePartition(key encoder, partitions []int32) int32
 }
 
-type randomPartitioner struct {
+type RandomPartitioner struct {
 }
 
-func (p randomPartitioner) choosePartition(options []int32) int32 {
-	return options[rand.Intn(len(options))]
+func (p RandomPartitioner) choosePartition(key encoder, partitions []int32) int32 {
+	return partitions[rand.Intn(len(partitions))]
 }

+ 4 - 0
prep_encoder.go

@@ -66,6 +66,10 @@ func (pe *prepEncoder) putBytes(in *[]byte) {
 	}
 }
 
+func (pe *prepEncoder) putRaw(in []byte) {
+	pe.length += len(in)
+}
+
 // stackable
 
 func (pe *prepEncoder) push(in pushEncoder) {

+ 30 - 12
producer.go

@@ -1,28 +1,46 @@
 package kafka
 
 type Producer struct {
-	client *Client
-	topic  string
+	client      *Client
+	topic       string
+	partitioner partitionChooser
 }
 
-func NewProducer(client *Client, topic string) *Producer {
-	return &Producer{client, topic}
+func NewSimpleProducer(client *Client, topic string) *Producer {
+	return &Producer{client, topic, RandomPartitioner{}}
 }
 
-func (p *Producer) SendSimpleMessage(in string) error {
-	partition, err := p.client.brokers.choosePartition(p.topic, randomPartitioner{})
+func (p *Producer) SendMessage(key, value encoder) error {
+	partitions, err := p.client.brokers.partitionsForTopic(p.topic)
 	if err != nil {
 		return err
 	}
 
-	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(newMessageFromString(in)))
-	request.requiredAcks = WAIT_FOR_LOCAL
-
-	response := produceResponse{}
+	partitioner := p.partitioner
+	if key == nil {
+		partitioner = RandomPartitioner{}
+	}
+	partition := partitioner.choosePartition(nil, partitions)
 
-	_, err = p.client.brokers.sendToPartition(p.topic, partition, request, &response)
+	msg, err := newMessage(key, value)
 	if err != nil {
 		return err
 	}
-	return nil
+
+	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(msg))
+	request.requiredAcks = WAIT_FOR_LOCAL
+
+	_, err = p.client.brokers.sendToPartition(p.topic, partition, request, &produceResponse{})
+
+	return err
+}
+
+type encodableString string
+
+func (s encodableString) encode(pe packetEncoder) {
+	pe.putRaw([]byte(s))
+}
+
+func (p *Producer) SendSimpleMessage(in string) error {
+	return p.SendMessage(nil, encodableString(in))
 }

+ 6 - 2
real_encoder.go

@@ -65,8 +65,12 @@ func (re *realEncoder) putBytes(in *[]byte) {
 		return
 	}
 	re.putInt32(int32(len(*in)))
-	copy(re.raw[re.off:], *in)
-	re.off += len(*in)
+	re.putRaw(*in)
+}
+
+func (re *realEncoder) putRaw(in []byte) {
+	copy(re.raw[re.off:], in)
+	re.off += len(in)
 }
 
 // stackable

+ 2 - 0
request.go

@@ -18,9 +18,11 @@ type request struct {
 }
 
 func (r *request) encode(pe packetEncoder) {
+	pe.pushLength32()
 	pe.putInt16(r.body.key())
 	pe.putInt16(r.body.version())
 	pe.putInt32(r.correlation_id)
 	pe.putString(r.id)
 	r.body.encode(pe)
+	pe.pop()
 }