Browse Source

Requests produce decoders for their response now

gives a bit nicer api for broker.SendAndReceive
Evan Huus 11 years ago
parent
commit
f77808b6db
6 changed files with 24 additions and 18 deletions
  1. 7 6
      broker.go
  2. 2 2
      metadata_cache.go
  3. 2 2
      metadata_request.go
  4. 5 2
      produce_request.go
  5. 7 5
      producer.go
  6. 1 1
      request.go

+ 7 - 6
broker.go

@@ -164,14 +164,15 @@ func (b *broker) rcvResponseLoop() {
 	}
 }
 
-func (b *broker) SendAndReceive(clientID *string, req requestEncoder, res decoder) error {
+func (b *broker) SendAndReceive(clientID *string, req requestEncoder) (decoder, error) {
 	fullRequest := request{b.correlation_id, clientID, req}
 	packet, err := buildBytes(&fullRequest)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, req.expectResponse()}
+	response := req.responseDecoder()
+	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, response != nil}
 
 	b.requests <- sendRequest
 	sendRequest.response.packets <- *packet // we cheat to avoid poofing up more channels than necessary
@@ -182,11 +183,11 @@ func (b *broker) SendAndReceive(clientID *string, req requestEncoder, res decode
 		// Only try to decode if we got a response.
 		if buf != nil {
 			decoder := realDecoder{raw: buf}
-			err = res.decode(&decoder)
-			return err
+			err = response.decode(&decoder)
+			return response, err
 		}
 	case err = <-sendRequest.response.errors:
 	}
 
-	return err
+	return nil, err
 }

+ 2 - 2
metadata_cache.go

@@ -87,11 +87,11 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 		return OutOfBrokers{}
 	}
 
-	response := new(metadataResponse)
-	err := broker.SendAndReceive(mc.client.id, &metadataRequest{topics}, response)
+	decoder, err := broker.SendAndReceive(mc.client.id, &metadataRequest{topics})
 	if err != nil {
 		return err
 	}
+	response := decoder.(*metadataResponse)
 
 	mc.lock.Lock()
 	defer mc.lock.Unlock()

+ 2 - 2
metadata_request.go

@@ -36,6 +36,6 @@ func (mr *metadataRequest) version() int16 {
 	return 0
 }
 
-func (mr *metadataRequest) expectResponse() bool {
-	return true
+func (mr *metadataRequest) responseDecoder() decoder {
+	return new(metadataResponse)
 }

+ 5 - 2
produce_request.go

@@ -54,8 +54,11 @@ func (p *produceRequest) version() int16 {
 	return 0
 }
 
-func (p *produceRequest) expectResponse() bool {
-	return p.requiredAcks != NO_RESPONSE
+func (p *produceRequest) responseDecoder() decoder {
+	if p.requiredAcks == NO_RESPONSE {
+		return nil
+	}
+	return new(ProduceResponse)
 }
 
 func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {

+ 7 - 5
producer.go

@@ -52,13 +52,15 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 	request.requiredAcks = p.responseCondition
 	request.timeout = p.responseTimeout
 
-	var response *ProduceResponse
-	if request.expectResponse() {
-		response = new(ProduceResponse)
+	decoder, err := broker.SendAndReceive(p.id, request)
+	if err != nil {
+		return nil, err
+	}
+	if decoder != nil {
+		return decoder.(*ProduceResponse), nil
 	}
-	err = broker.SendAndReceive(p.id, request, response)
 
-	return response, err
+	return nil, nil
 }
 
 type encodableString string

+ 1 - 1
request.go

@@ -4,7 +4,7 @@ type requestEncoder interface {
 	encoder
 	key() int16
 	version() int16
-	expectResponse() bool
+	responseDecoder() decoder
 }
 
 type request struct {