瀏覽代碼

Plumb through v0.10 support for consumer/fetch

Evan Huus 9 年之前
父節點
當前提交
10d06c2fe7
共有 5 個文件被更改,包括 73 次插入21 次删除
  1. 5 0
      consumer.go
  2. 11 2
      fetch_request.go
  3. 28 3
      fetch_response.go
  4. 21 13
      message.go
  5. 8 3
      produce_response.go

+ 5 - 0
consumer.go

@@ -14,6 +14,7 @@ type ConsumerMessage struct {
 	Topic      string
 	Partition  int32
 	Offset     int64
+	Timestamp  time.Time // only set if kafka is version 0.10+
 }
 
 // ConsumerError is what is provided to the user when an error occurs.
@@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 					Key:       msg.Msg.Key,
 					Value:     msg.Msg.Value,
 					Offset:    msg.Offset,
+					Timestamp: msg.Msg.Timestamp,
 				})
 				child.offset = msg.Offset + 1
 			} else {
@@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
 		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
+		request.Version = 2
+	}
 
 	for child := range bc.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

+ 11 - 2
fetch_request.go

@@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
 type FetchRequest struct {
 	MaxWaitTime int32
 	MinBytes    int32
+	Version     int16
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
 
@@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
 }
 
 func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
+	f.Version = version
 	if _, err = pd.getInt32(); err != nil {
 		return err
 	}
@@ -103,11 +105,18 @@ func (f *FetchRequest) key() int16 {
 }
 
 func (f *FetchRequest) version() int16 {
-	return 0
+	return f.Version
 }
 
 func (r *FetchRequest) requiredVersion() KafkaVersion {
-	return minVersion
+	switch r.Version {
+	case 1:
+		return V0_9_0_0
+	case 2:
+		return V0_10_0_0
+	default:
+		return minVersion
+	}
 }
 
 func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {

+ 28 - 3
fetch_response.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "time"
+
 type FetchResponseBlock struct {
 	Err                 KError
 	HighWaterMarkOffset int64
@@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 }
 
 type FetchResponse struct {
-	Blocks map[string]map[int32]*FetchResponseBlock
+	Blocks       map[string]map[int32]*FetchResponseBlock
+	ThrottleTime time.Duration
+	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
 }
 
 func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
@@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
+	fr.Version = version
+
+	if fr.Version >= 1 {
+		throttle, err := pd.getInt32()
+		if err != nil {
+			return err
+		}
+		fr.ThrottleTime = time.Duration(throttle) * time.Millisecond
+	}
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 }
 
 func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
+	if fr.Version >= 1 {
+		pe.putInt32(int32(fr.ThrottleTime / time.Millisecond))
+	}
+
 	err = pe.putArrayLength(len(fr.Blocks))
 	if err != nil {
 		return err
@@ -121,11 +139,18 @@ func (r *FetchResponse) key() int16 {
 }
 
 func (r *FetchResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *FetchResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	switch r.Version {
+	case 1:
+		return V0_9_0_0
+	case 2:
+		return V0_10_0_0
+	default:
+		return minVersion
+	}
 }
 
 func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {

+ 21 - 13
message.go

@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"fmt"
 	"io/ioutil"
+	"time"
 
 	"github.com/eapache/go-xerial-snappy"
 )
@@ -21,15 +22,13 @@ const (
 	CompressionSnappy CompressionCodec = 2
 )
 
-// The spec just says: "This is a version id used to allow backwards compatible evolution of the message
-// binary format." but it doesn't say what the current value is, so presumably 0...
-const messageFormat int8 = 0
-
 type Message struct {
-	Codec CompressionCodec // codec used to compress the message contents
-	Key   []byte           // the message key, may be nil
-	Value []byte           // the message contents
-	Set   *MessageSet      // the message set a message might wrap
+	Codec     CompressionCodec // codec used to compress the message contents
+	Key       []byte           // the message key, may be nil
+	Value     []byte           // the message contents
+	Set       *MessageSet      // the message set a message might wrap
+	Version   int8             // v1 requires Kafka 0.10
+	Timestamp time.Time        // the timestamp of the message (version 1+ only)
 
 	compressedCache []byte
 }
@@ -37,11 +36,15 @@ type Message struct {
 func (m *Message) encode(pe packetEncoder) error {
 	pe.push(&crc32Field{})
 
-	pe.putInt8(messageFormat)
+	pe.putInt8(m.Version)
 
 	attributes := int8(m.Codec) & compressionCodecMask
 	pe.putInt8(attributes)
 
+	if m.Version >= 1 {
+		pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond))
+	}
+
 	err := pe.putBytes(m.Key)
 	if err != nil {
 		return err
@@ -89,13 +92,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	format, err := pd.getInt8()
+	m.Version, err = pd.getInt8()
 	if err != nil {
 		return err
 	}
-	if format != messageFormat {
-		return PacketDecodingError{"unexpected messageFormat"}
-	}
 
 	attribute, err := pd.getInt8()
 	if err != nil {
@@ -103,6 +103,14 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	}
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 
+	if m.Version >= 1 {
+		millis, err := pd.getInt64()
+		if err != nil {
+			return err
+		}
+		m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
+	}
+
 	m.Key, err = pd.getBytes()
 	if err != nil {
 		return err

+ 8 - 3
produce_response.go

@@ -24,7 +24,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
 		if millis, err := pd.getInt64(); err != nil {
 			return err
 		} else {
-			pr.Timestamp = time.Unix(millis/1000, millis%1000)
+			pr.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
 		}
 	}
 
@@ -34,7 +34,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
 type ProduceResponse struct {
 	Blocks       map[string]map[int32]*ProduceResponseBlock
 	Version      int16
-	ThrottleTime int32 // only provided if Version >= 1
+	ThrottleTime time.Duration // only provided if Version >= 1
 }
 
 func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -75,8 +75,10 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 	}
 
 	if pr.Version >= 1 {
-		if pr.ThrottleTime, err = pd.getInt32(); err != nil {
+		if millis, err := pd.getInt32(); err != nil {
 			return err
+		} else {
+			pr.ThrottleTime = time.Duration(millis) * time.Millisecond
 		}
 	}
 
@@ -103,6 +105,9 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error {
 			pe.putInt64(prb.Offset)
 		}
 	}
+	if pr.Version >= 1 {
+		pe.putInt32(int32(pr.ThrottleTime / time.Millisecond))
+	}
 	return nil
 }