فهرست منبع

Make brokers request/response-type aware.

Rather than calling Broker.Send and casting the response yourself, just call
Broker.Produce or Broker.RequestMetadata or...
Evan Huus 12 سال پیش
والد
کامیت
ac861788b4
5فایلهای تغییر یافته به همراه67 افزوده شده و 73 حذف شده
  1. 33 7
      broker.go
  2. 1 2
      metadata_cache.go
  3. 27 51
      produce_request.go
  4. 4 8
      producer.go
  5. 2 5
      request.go

+ 33 - 7
broker.go

@@ -70,15 +70,41 @@ func (b *Broker) Close() error {
 	return b.conn.Close()
 }
 
-func (b *Broker) Send(clientID *string, req RequestEncoder) (interface{}, error) {
+func (b *Broker) RequestMetadata(clientID *string, request *MetadataRequest) (*MetadataResponse, error) {
+	response := new(MetadataResponse)
+
+	err := b.sendAndReceive(clientID, request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceResponse, error) {
+	var response *ProduceResponse
+	if request.ResponseCondition != NO_RESPONSE {
+		response = new(ProduceResponse)
+	}
+
+	err := b.sendAndReceive(clientID, request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
 	fullRequest := request{b.correlation_id, clientID, req}
 	packet, err := encode(&fullRequest)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
-	response := req.responseDecoder()
-	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, response != nil}
+	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, res != nil}
 
 	b.requests <- sendRequest
 	sendRequest.response.packets <- packet // we cheat to avoid poofing up more channels than necessary
@@ -86,15 +112,15 @@ func (b *Broker) Send(clientID *string, req RequestEncoder) (interface{}, error)
 
 	select {
 	case buf := <-sendRequest.response.packets:
-		err = decode(buf, response)
+		err = decode(buf, res)
 	case err = <-sendRequest.response.errors:
 	}
 
 	if err != nil {
-		return nil, err
+		return err
 	}
 
-	return response, nil
+	return nil
 }
 
 func (b *Broker) encode(pe packetEncoder) {

+ 1 - 2
metadata_cache.go

@@ -87,11 +87,10 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 		return OutOfBrokers{}
 	}
 
-	decoder, err := broker.Send(mc.client.id, &MetadataRequest{topics})
+	response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
 	if err != nil {
 		return err
 	}
-	response := decoder.(*MetadataResponse)
 
 	mc.lock.Lock()
 	defer mc.lock.Unlock()

+ 27 - 51
produce_request.go

@@ -1,71 +1,47 @@
 package kafka
 
-type produceRequestPartitionBlock struct {
-	partition int32
-	msgSet    *messageSet
-}
-
-func (p *produceRequestPartitionBlock) encode(pe packetEncoder) {
-	pe.putInt32(p.partition)
-	pe.pushLength32()
-	p.msgSet.encode(pe)
-	pe.pop()
-}
-
-type produceRequestTopicBlock struct {
-	topic      *string
-	partitions []produceRequestPartitionBlock
-}
-
-func (p *produceRequestTopicBlock) encode(pe packetEncoder) {
-	pe.putString(p.topic)
-	pe.putArrayCount(len(p.partitions))
-	for i := range p.partitions {
-		(&p.partitions[i]).encode(pe)
-	}
-}
-
 const (
 	NO_RESPONSE    int16 = 0
 	WAIT_FOR_LOCAL int16 = 1
 	WAIT_FOR_ALL   int16 = -1
 )
 
-type produceRequest struct {
-	requiredAcks int16
-	timeout      int32
-	topics       []produceRequestTopicBlock
-}
-
-func (p *produceRequest) encode(pe packetEncoder) {
-	pe.putInt16(p.requiredAcks)
-	pe.putInt32(p.timeout)
-	pe.putArrayCount(len(p.topics))
-	for i := range p.topics {
-		(&p.topics[i]).encode(pe)
+type ProduceRequest struct {
+	ResponseCondition int16
+	Timeout           int32
+	MsgSets           map[*string]map[int32]*messageSet
+}
+
+func (p *ProduceRequest) encode(pe packetEncoder) {
+	pe.putInt16(p.ResponseCondition)
+	pe.putInt32(p.Timeout)
+	pe.putArrayCount(len(p.MsgSets))
+	for topic, partitions := range p.MsgSets {
+		pe.putString(topic)
+		pe.putArrayCount(len(partitions))
+		for id, msgSet := range partitions {
+			pe.putInt32(id)
+			msgSet.encode(pe)
+		}
 	}
 }
 
-func (p *produceRequest) key() int16 {
+func (p *ProduceRequest) key() int16 {
 	return 0
 }
 
-func (p *produceRequest) version() int16 {
+func (p *ProduceRequest) version() int16 {
 	return 0
 }
 
-func (p *produceRequest) responseDecoder() decoder {
-	if p.requiredAcks == NO_RESPONSE {
-		return nil
+func (p *ProduceRequest) AddMessageSet(topic *string, partition int32, set *messageSet) {
+	if p.MsgSets == nil {
+		p.MsgSets = make(map[*string]map[int32]*messageSet)
+	}
+
+	if p.MsgSets[topic] == nil {
+		p.MsgSets[topic] = make(map[int32]*messageSet)
 	}
-	return new(ProduceResponse)
-}
 
-func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
-	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
-	req.topics[0].topic = &topic
-	req.topics[0].partitions = make([]produceRequestPartitionBlock, 1)
-	req.topics[0].partitions[0].partition = partition
-	req.topics[0].partitions[0].msgSet = set
-	return req
+	p.MsgSets[topic][partition] = set
 }

+ 4 - 8
producer.go

@@ -57,19 +57,15 @@ func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
 		return nil, err
 	}
 
-	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(&Message{Key: keyBytes, Value: valBytes}))
-	request.requiredAcks = p.responseCondition
-	request.timeout = p.responseTimeout
+	request := &ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
+	request.AddMessageSet(&p.topic, partition, newSingletonMessageSet(&Message{Key: keyBytes, Value: valBytes}))
 
-	decoder, err := broker.Send(p.client.id, request)
+	response, err := broker.Produce(p.client.id, request)
 	if err != nil {
 		return nil, err
 	}
-	if decoder != nil {
-		return decoder.(*ProduceResponse), nil
-	}
 
-	return nil, nil
+	return response, nil
 }
 
 func (p *Producer) SendSimpleMessage(in string) (*ProduceResponse, error) {

+ 2 - 5
request.go

@@ -1,18 +1,15 @@
 package kafka
 
-// An internal interface satisfied by all of the Request structures
-// (MetadataRequest, ProduceRequest, etc).
-type RequestEncoder interface {
+type requestEncoder interface {
 	encoder
 	key() int16
 	version() int16
-	responseDecoder() decoder
 }
 
 type request struct {
 	correlation_id int32
 	id             *string
-	body           RequestEncoder
+	body           requestEncoder
 }
 
 func (r *request) encode(pe packetEncoder) {