Browse Source

Added support for FetchRequest protocol version 3.

This commit will add the 'MaxBytes' field to FetchRequests when the Kafka version is
0.10.1 or better. On send, it will be set to MaxResponseSize. No tests, yet.
Michael Herstine 8 years ago
parent
commit
4d1eceb890
2 changed files with 16 additions and 0 deletions
  1. 4 0
      consumer.go
  2. 12 0
      fetch_request.go

+ 4 - 0
consumer.go

@@ -726,6 +726,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
 		request.Version = 2
 		request.Version = 2
 	}
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
+		request.Version = 3
+		request.MaxBytes = MaxResponseSize - 47 // TODO(mherstin): WTF?!
+	}
 
 
 	for child := range bc.subscriptions {
 	for child := range bc.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

+ 12 - 0
fetch_request.go

@@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
 	return nil
 	return nil
 }
 }
 
 
+// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
+// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that.  The KIP is at
+// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
 type FetchRequest struct {
 type FetchRequest struct {
 	MaxWaitTime int32
 	MaxWaitTime int32
 	MinBytes    int32
 	MinBytes    int32
+	MaxBytes    int32
 	Version     int16
 	Version     int16
 	blocks      map[string]map[int32]*fetchRequestBlock
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
 }
@@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
 	pe.putInt32(-1) // replica ID is always -1 for clients
 	pe.putInt32(r.MaxWaitTime)
 	pe.putInt32(r.MaxWaitTime)
 	pe.putInt32(r.MinBytes)
 	pe.putInt32(r.MinBytes)
+	if 3 == r.Version {
+		pe.putInt32(r.MaxBytes)
+	}
 	err = pe.putArrayLength(len(r.blocks))
 	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 		return err
 		return err
 	}
 	}
+	if r.Version == 3 {
+		if r.MaxBytes, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
 	topicCount, err := pd.getArrayLength()
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 	if err != nil {
 		return err
 		return err