Jelajahi Sumber

Infrastructure for not expecting a response.

Use it when sending a produceRequest with RequiredAcks==0
Evan Huus 12 tahun lalu
induk
melakukan
863b58681b
3 mengubah file dengan 36 tambahan dan 20 penghapusan
  1. 20 9
      broker.go
  2. 15 9
      broker_manager.go
  3. 1 2
      producer.go

+ 20 - 9
broker.go

@@ -16,7 +16,7 @@ type broker struct {
 	conn net.Conn
 	addr net.TCPAddr
 
-	requests  chan responsePromise
+	requests  chan requestToSend
 	responses chan responsePromise
 }
 
@@ -26,6 +26,12 @@ type responsePromise struct {
 	errors         chan error
 }
 
+type requestToSend struct {
+	// we cheat and use the responsePromise channels to avoid creating more than necessary
+	response       responsePromise
+	expectResponse bool
+}
+
 func newBroker(host string, port int32) (b *broker, err error) {
 	b = new(broker)
 	b.id = -1 // don't know it yet
@@ -53,7 +59,7 @@ func (b *broker) connect() (err error) {
 		return err
 	}
 
-	b.requests = make(chan responsePromise)
+	b.requests = make(chan requestToSend)
 	b.responses = make(chan responsePromise)
 
 	go b.sendRequestLoop()
@@ -108,13 +114,18 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 
 func (b *broker) sendRequestLoop() {
 	for request := range b.requests {
-		buf := <-request.packets
+		buf := <-request.response.packets
 		_, err := b.conn.Write(buf)
 		if err != nil {
-			b.forceDisconnect(&request, err)
+			b.forceDisconnect(&request.response, err)
 			return
 		}
-		b.responses <- request
+		if request.expectResponse {
+			b.responses <- request.response
+		} else {
+			close(request.response.packets)
+			close(request.response.errors)
+		}
 	}
 }
 
@@ -153,7 +164,7 @@ func (b *broker) rcvResponseLoop() {
 	}
 }
 
-func (b *broker) sendRequest(clientID *string, body encoder) (*responsePromise, error) {
+func (b *broker) sendRequest(clientID *string, body encoder, expectResponse bool) (*responsePromise, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 	var api API
@@ -178,10 +189,10 @@ func (b *broker) sendRequest(clientID *string, body encoder) (*responsePromise,
 	realEnc.putInt32(int32(prepEnc.length))
 	req.encode(&realEnc)
 
-	request := responsePromise{b.correlation_id, make(chan []byte), make(chan error)}
+	request := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, expectResponse}
 
 	b.requests <- request
-	request.packets <- realEnc.raw
+	request.response.packets <- realEnc.raw // we cheat to avoid poofing up more channels than necessary
 	b.correlation_id++
-	return &request, nil
+	return &request.response, nil
 }

+ 15 - 9
broker_manager.go

@@ -110,7 +110,7 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req enco
 		return err
 	}
 
-	responseChan, err := b.sendRequest(bm.client.id, req)
+	responseChan, err := b.sendRequest(bm.client.id, req, res != nil)
 	if err != nil {
 		// errors that would make us refresh the broker metadata don't get returned here,
 		// they'd come through responseChan.errors, so it's safe to just return here
@@ -119,8 +119,10 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req enco
 
 	select {
 	case buf := <-responseChan.packets:
-		decoder := realDecoder{raw: buf}
-		err = res.decode(&decoder)
+		if res != nil {
+			decoder := realDecoder{raw: buf}
+			err = res.decode(&decoder)
+		}
 	case err = <-responseChan.errors:
 	}
 
@@ -142,15 +144,17 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req enco
 		return err
 	}
 
-	responseChan, err = b.sendRequest(bm.client.id, req)
+	responseChan, err = b.sendRequest(bm.client.id, req, res != nil)
 	if err != nil {
 		return err
 	}
 
 	select {
 	case buf := <-responseChan.packets:
-		decoder := realDecoder{raw: buf}
-		err = res.decode(&decoder)
+		if res != nil {
+			decoder := realDecoder{raw: buf}
+			err = res.decode(&decoder)
+		}
 	case err = <-responseChan.errors:
 	}
 
@@ -174,15 +178,17 @@ func (bm *brokerManager) getDefault() *broker {
 
 func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
-		responseChan, err := b.sendRequest(bm.client.id, req)
+		responseChan, err := b.sendRequest(bm.client.id, req, res != nil)
 		if err != nil {
 			return err
 		}
 
 		select {
 		case buf := <-responseChan.packets:
-			decoder := realDecoder{raw: buf}
-			err = res.decode(&decoder)
+			if res != nil {
+				decoder := realDecoder{raw: buf}
+				err = res.decode(&decoder)
+			}
 			return err
 		case <-responseChan.errors:
 			bm.defaultBroker = nil

+ 1 - 2
producer.go

@@ -16,7 +16,6 @@ func (p *Producer) SendSimpleMessage(in string) error {
 	}
 
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(newMessageFromString(in)))
-	response := new(produceResponse)
 
-	return p.client.brokers.sendToPartition(p.topic, partition, request, response)
+	return p.client.brokers.sendToPartition(p.topic, partition, request, nil)
 }