Forráskód Böngészése

Merge pull request #905 from sp1ff/master

Added support for FetchRequest protocol version 3.
Evan Huus 8 éve
szülő
commit
2fd980e23b
2 módosított fájl, 18 hozzáadás és 0 törlés
  1. 4 0
      consumer.go
  2. 14 0
      fetch_request.go

+ 4 - 0
consumer.go

@@ -728,6 +728,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
 		request.Version = 2
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
+		request.Version = 3
+		request.MaxBytes = MaxResponseSize
+	}
 
 	for child := range bc.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

+ 14 - 0
fetch_request.go

@@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
+// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that.  The KIP is at
+// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
 type FetchRequest struct {
 	MaxWaitTime int32
 	MinBytes    int32
+	MaxBytes    int32
 	Version     int16
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
@@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
 	pe.putInt32(r.MaxWaitTime)
 	pe.putInt32(r.MinBytes)
+	if r.Version == 3 {
+		pe.putInt32(r.MaxBytes)
+	}
 	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
@@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
+	if r.Version == 3 {
+		if r.MaxBytes, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -114,6 +126,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
+	case 3:
+		return V0_10_1_0
 	default:
 		return minVersion
 	}