Merge pull request #974 from wladh/consumer

Add consumer support for Kafka 0.11 messages
Evan Huus · 8 years ago · commit 606f9ba821
13 changed files with 566 additions and 175 deletions
  1. consumer.go (+101 −38)
  2. consumer_test.go (+98 −66)
  3. fetch_request.go (+22 −2)
  4. fetch_request_test.go (+14 −0)
  5. fetch_response.go (+124 −12)
  6. fetch_response_test.go (+94 −3)
  7. message.go (+3 −19)
  8. mockresponses.go (+7 −1)
  9. record.go (+7 −3)
  10. record_batch.go (+14 −6)
  11. record_test.go (+41 −25)
  12. timestamp.go (+40 −0)
  13. utils.go (+1 −0)

+ 101 - 38
consumer.go

@@ -14,8 +14,9 @@ type ConsumerMessage struct {
 	Topic          string
 	Partition      int32
 	Offset         int64
-	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
-	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
+	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
+	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
+	Headers        []*RecordHeader // only set if kafka is version 0.11+
 }
 
 // ConsumerError is what is provided to the user when an error occurs.
@@ -478,44 +479,12 @@ feederLoop:
 	close(child.errors)
 }
 
-func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
-	block := response.GetBlock(child.topic, child.partition)
-	if block == nil {
-		return nil, ErrIncompleteResponse
-	}
-
-	if block.Err != ErrNoError {
-		return nil, block.Err
-	}
-
-	if len(block.MsgSet.Messages) == 0 {
-		// We got no messages. If we got a trailing one then we need to ask for more data.
-		// Otherwise we just poll again and wait for one to be produced...
-		if block.MsgSet.PartialTrailingMessage {
-			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
-				// we can't ask for more data, we've hit the configured limit
-				child.sendError(ErrMessageTooLarge)
-				child.offset++ // skip this one so we can keep processing future messages
-			} else {
-				child.fetchSize *= 2
-				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
-					child.fetchSize = child.conf.Consumer.Fetch.Max
-				}
-			}
-		}
-
-		return nil, nil
-	}
-
-	// we got messages, reset our fetch size in case it was increased for a previous request
-	child.fetchSize = child.conf.Consumer.Fetch.Default
-	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
-
-	incomplete := false
-	prelude := true
+func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
-	for _, msgBlock := range block.MsgSet.Messages {
+	var incomplete bool
+	prelude := true
 
+	for _, msgBlock := range msgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
 			offset := msg.Offset
 			if msg.Msg.Version >= 1 {
@@ -542,7 +511,46 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 				incomplete = true
 			}
 		}
+	}
 
+	if incomplete || len(messages) == 0 {
+		return nil, ErrIncompleteResponse
+	}
+	return messages, nil
+}
+
+func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*ConsumerMessage, error) {
+	var messages []*ConsumerMessage
+	var incomplete bool
+	prelude := true
+	batch := block.Records.recordBatch
+
+	for _, rec := range batch.Records {
+		offset := batch.FirstOffset + rec.OffsetDelta
+		if prelude && offset < child.offset {
+			continue
+		}
+		prelude = false
+
+		if offset >= child.offset {
+			messages = append(messages, &ConsumerMessage{
+				Topic:     child.topic,
+				Partition: child.partition,
+				Key:       rec.Key,
+				Value:     rec.Value,
+				Offset:    offset,
+				Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+				Headers:   rec.Headers,
+			})
+			child.offset = offset + 1
+		} else {
+			incomplete = true
+		}
+
+		if child.offset > block.LastStableOffset {
+			// We reached the end of closed transactions
+			break
+		}
 	}
 
 	if incomplete || len(messages) == 0 {
@@ -551,6 +559,57 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	return messages, nil
 }
 
+func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+	block := response.GetBlock(child.topic, child.partition)
+	if block == nil {
+		return nil, ErrIncompleteResponse
+	}
+
+	if block.Err != ErrNoError {
+		return nil, block.Err
+	}
+
+	nRecs, err := block.Records.numRecords()
+	if err != nil {
+		return nil, err
+	}
+	if nRecs == 0 {
+		partialTrailingMessage, err := block.Records.isPartial()
+		if err != nil {
+			return nil, err
+		}
+		// We got no messages. If we got a trailing one then we need to ask for more data.
+		// Otherwise we just poll again and wait for one to be produced...
+		if partialTrailingMessage {
+			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
+				// we can't ask for more data, we've hit the configured limit
+				child.sendError(ErrMessageTooLarge)
+				child.offset++ // skip this one so we can keep processing future messages
+			} else {
+				child.fetchSize *= 2
+				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
+					child.fetchSize = child.conf.Consumer.Fetch.Max
+				}
+			}
+		}
+
+		return nil, nil
+	}
+
+	// we got messages, reset our fetch size in case it was increased for a previous request
+	child.fetchSize = child.conf.Consumer.Fetch.Default
+	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
+
+	if control, err := block.Records.isControl(); err != nil || control {
+		return nil, err
+	}
+
+	if response.Version < 4 {
+		return child.parseMessages(block.Records.msgSet)
+	}
+	return child.parseRecords(block)
+}
+
 // brokerConsumer
 
 type brokerConsumer struct {
@@ -740,6 +799,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		request.Version = 3
 		request.MaxBytes = MaxResponseSize
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 4
+		request.Isolation = ReadUncommitted // We don't support transactions yet.
+	}
 
 	for child := range bc.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
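
Once a cluster is on 0.11 and the client sets Config.Version accordingly, the new Headers field is filled in from the v2 record batches. A minimal consumer sketch under those assumptions (broker address and topic are illustrative):

	package main

	import (
		"fmt"
		"log"

		"github.com/Shopify/sarama"
	)

	func main() {
		cfg := sarama.NewConfig()
		cfg.Version = sarama.V0_11_0_0 // opt in to v4 fetch requests / record batches

		consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
		if err != nil {
			log.Fatal(err)
		}
		defer consumer.Close()

		pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
		if err != nil {
			log.Fatal(err)
		}
		defer pc.Close()

		for msg := range pc.Messages() {
			// Headers stays nil on pre-0.11 message sets
			for _, h := range msg.Headers {
				fmt.Printf("header %s=%s\n", h.Key, h.Value)
			}
			fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
		}
	}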

+ 98 - 66
consumer_test.go

@@ -379,86 +379,118 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
 // requested, then such messages are ignored.
 func TestConsumerExtraOffsets(t *testing.T) {
 	// Given
-	broker0 := NewMockBroker(t, 0)
-	fetchResponse1 := &FetchResponse{}
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3)
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4)
-	fetchResponse2 := &FetchResponse{}
-	fetchResponse2.AddError("my_topic", 0, ErrNoError)
-	broker0.SetHandlerByMap(map[string]MockResponse{
-		"MetadataRequest": NewMockMetadataResponse(t).
-			SetBroker(broker0.Addr(), broker0.BrokerID()).
-			SetLeader("my_topic", 0, broker0.BrokerID()),
-		"OffsetRequest": NewMockOffsetResponse(t).
-			SetOffset("my_topic", 0, OffsetNewest, 1234).
-			SetOffset("my_topic", 0, OffsetOldest, 0),
-		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
-	})
+	legacyFetchResponse := &FetchResponse{}
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
+	newFetchResponse := &FetchResponse{Version: 4}
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
+	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
+	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
+		var offsetResponseVersion int16
+		cfg := NewConfig()
+		if fetchResponse1.Version >= 4 {
+			cfg.Version = V0_11_0_0
+			offsetResponseVersion = 1
+		}
 
-	master, err := NewConsumer([]string{broker0.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
+		broker0 := NewMockBroker(t, 0)
+		fetchResponse2 := &FetchResponse{}
+		fetchResponse2.Version = fetchResponse1.Version
+		fetchResponse2.AddError("my_topic", 0, ErrNoError)
+		broker0.SetHandlerByMap(map[string]MockResponse{
+			"MetadataRequest": NewMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()),
+			"OffsetRequest": NewMockOffsetResponse(t).
+				SetVersion(offsetResponseVersion).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 0),
+			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
+		})
+
+		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	// When
-	consumer, err := master.ConsumePartition("my_topic", 0, 3)
-	if err != nil {
-		t.Fatal(err)
-	}
+		// When
+		consumer, err := master.ConsumePartition("my_topic", 0, 3)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	// Then: messages with offsets 1 and 2 are not returned even though they
-	// are present in the response.
-	assertMessageOffset(t, <-consumer.Messages(), 3)
-	assertMessageOffset(t, <-consumer.Messages(), 4)
+		// Then: messages with offsets 1 and 2 are not returned even though they
+		// are present in the response.
+		assertMessageOffset(t, <-consumer.Messages(), 3)
+		assertMessageOffset(t, <-consumer.Messages(), 4)
 
-	safeClose(t, consumer)
-	safeClose(t, master)
-	broker0.Close()
+		safeClose(t, consumer)
+		safeClose(t, master)
+		broker0.Close()
+	}
 }
 
 // It is fine if offsets of fetched messages are not sequential (although
 // strictly increasing!).
 func TestConsumerNonSequentialOffsets(t *testing.T) {
 	// Given
-	broker0 := NewMockBroker(t, 0)
-	fetchResponse1 := &FetchResponse{}
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
-	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11)
-	fetchResponse2 := &FetchResponse{}
-	fetchResponse2.AddError("my_topic", 0, ErrNoError)
-	broker0.SetHandlerByMap(map[string]MockResponse{
-		"MetadataRequest": NewMockMetadataResponse(t).
-			SetBroker(broker0.Addr(), broker0.BrokerID()).
-			SetLeader("my_topic", 0, broker0.BrokerID()),
-		"OffsetRequest": NewMockOffsetResponse(t).
-			SetOffset("my_topic", 0, OffsetNewest, 1234).
-			SetOffset("my_topic", 0, OffsetOldest, 0),
-		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
-	})
+	legacyFetchResponse := &FetchResponse{}
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
+	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
+	newFetchResponse := &FetchResponse{Version: 4}
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
+	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
+	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
+	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
+		var offsetResponseVersion int16
+		cfg := NewConfig()
+		if fetchResponse1.Version >= 4 {
+			cfg.Version = V0_11_0_0
+			offsetResponseVersion = 1
+		}
 
-	master, err := NewConsumer([]string{broker0.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
+		broker0 := NewMockBroker(t, 0)
+		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
+		fetchResponse2.AddError("my_topic", 0, ErrNoError)
+		broker0.SetHandlerByMap(map[string]MockResponse{
+			"MetadataRequest": NewMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()),
+			"OffsetRequest": NewMockOffsetResponse(t).
+				SetVersion(offsetResponseVersion).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 0),
+			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
+		})
+
+		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	// When
-	consumer, err := master.ConsumePartition("my_topic", 0, 3)
-	if err != nil {
-		t.Fatal(err)
-	}
+		// When
+		consumer, err := master.ConsumePartition("my_topic", 0, 3)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	// Then: messages with offsets 1 and 2 are not returned even though they
-	// are present in the response.
-	assertMessageOffset(t, <-consumer.Messages(), 5)
-	assertMessageOffset(t, <-consumer.Messages(), 7)
-	assertMessageOffset(t, <-consumer.Messages(), 11)
+		// Then: messages with non-sequential offsets 5, 7 and 11 are all
+		// returned despite the gaps between them.
+		assertMessageOffset(t, <-consumer.Messages(), 5)
+		assertMessageOffset(t, <-consumer.Messages(), 7)
+		assertMessageOffset(t, <-consumer.Messages(), 11)
 
-	safeClose(t, consumer)
-	safeClose(t, master)
-	broker0.Close()
+		safeClose(t, consumer)
+		safeClose(t, master)
+		broker0.Close()
+	}
 }
 
 // If leadership for a partition is changing then consumer resolves the new

+ 22 - 2
fetch_request.go

@@ -29,16 +29,27 @@ type FetchRequest struct {
 	MinBytes    int32
 	MaxBytes    int32
 	Version     int16
+	Isolation   IsolationLevel
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
 
+type IsolationLevel int8
+
+const (
+	ReadUncommitted IsolationLevel = 0
+	ReadCommitted   IsolationLevel = 1
+)
+
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
 	pe.putInt32(r.MaxWaitTime)
 	pe.putInt32(r.MinBytes)
-	if r.Version == 3 {
+	if r.Version >= 3 {
 		pe.putInt32(r.MaxBytes)
 	}
+	if r.Version >= 4 {
+		pe.putInt8(int8(r.Isolation))
+	}
 	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
@@ -74,11 +85,18 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
-	if r.Version == 3 {
+	if r.Version >= 3 {
 		if r.MaxBytes, err = pd.getInt32(); err != nil {
 			return err
 		}
 	}
+	if r.Version >= 4 {
+		isolation, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		r.Isolation = IsolationLevel(isolation)
+	}
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -128,6 +146,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 		return V0_10_0_0
 	case 3:
 		return V0_10_1_0
+	case 4:
+		return V0_11_0_0
 	default:
 		return minVersion
 	}

+ 14 - 0
fetch_request_test.go

@@ -17,6 +17,15 @@ var (
 		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}
+
+	fetchRequestOneBlockV4 = []byte{
+		0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0xFF,
+		0x01,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}
 )
 )
 
 func TestFetchRequest(t *testing.T) {
@@ -31,4 +40,9 @@ func TestFetchRequest(t *testing.T) {
 	request.MinBytes = 0
 	request.AddBlock("topic", 0x12, 0x34, 0x56)
 	testRequest(t, "one block", request, fetchRequestOneBlock)
+
+	request.Version = 4
+	request.MaxBytes = 0xFF
+	request.Isolation = ReadCommitted
+	testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
 }

+ 124 - 12
fetch_response.go

@@ -2,13 +2,39 @@ package sarama
 
 import "time"
 
+type AbortedTransaction struct {
+	ProducerID  int64
+	FirstOffset int64
+}
+
+func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
+	if t.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+
+	if t.FirstOffset, err = pd.getInt64(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
+	pe.putInt64(t.ProducerID)
+	pe.putInt64(t.FirstOffset)
+
+	return nil
+}
+
 type FetchResponseBlock struct {
 	Err                 KError
 	HighWaterMarkOffset int64
-	MsgSet              MessageSet
+	LastStableOffset    int64
+	AbortedTransactions []*AbortedTransaction
+	Records             Records
 }
 
-func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -20,27 +46,75 @@ func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	msgSetSize, err := pd.getInt32()
+	if version >= 4 {
+		b.LastStableOffset, err = pd.getInt64()
+		if err != nil {
+			return err
+		}
+
+		numTransact, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+
+		if numTransact >= 0 {
+			b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
+		}
+
+		for i := 0; i < numTransact; i++ {
+			transact := new(AbortedTransaction)
+			if err = transact.decode(pd); err != nil {
+				return err
+			}
+			b.AbortedTransactions[i] = transact
+		}
+	}
+
+	recordsSize, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	msgSetDecoder, err := pd.getSubset(int(msgSetSize))
+	recordsDecoder, err := pd.getSubset(int(recordsSize))
 	if err != nil {
 		return err
 	}
-	err = (&b.MsgSet).decode(msgSetDecoder)
+	var records Records
+	if version >= 4 {
+		records = newDefaultRecords(nil)
+	} else {
+		records = newLegacyRecords(nil)
+	}
+	if recordsSize > 0 {
+		if err = records.decode(recordsDecoder); err != nil {
+			return err
+		}
+	}
+	b.Records = records
 
-	return err
+	return nil
 }
 
-func (b *FetchResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(int16(b.Err))
 
 	pe.putInt64(b.HighWaterMarkOffset)
 
+	if version >= 4 {
+		pe.putInt64(b.LastStableOffset)
+
+		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
+			return err
+		}
+		for _, transact := range b.AbortedTransactions {
+			if err = transact.encode(pe); err != nil {
+				return err
+			}
+		}
+	}
+
 	pe.push(&lengthField{})
-	err = b.MsgSet.encode(pe)
+	err = b.Records.encode(pe)
 	if err != nil {
 		return err
 	}
@@ -90,7 +164,7 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 			}
 
 			block := new(FetchResponseBlock)
-			err = block.decode(pd)
+			err = block.decode(pd, version)
 			if err != nil {
 				return err
 			}
@@ -124,7 +198,7 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
 
 		for id, block := range partitions {
 			pe.putInt32(id)
-			err = block.encode(pe)
+			err = block.encode(pe, r.Version)
 			if err != nil {
 				return err
 			}
@@ -148,6 +222,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
+	case 3:
+		return V0_10_1_0
+	case 4:
+		return V0_11_0_0
 	default:
 		return minVersion
 	}
@@ -182,7 +260,7 @@ func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
 	frb.Err = err
 }
 
-func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
 	if r.Blocks == nil {
 		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
 	}
@@ -196,6 +274,11 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
 		frb = new(FetchResponseBlock)
 		partitions[partition] = frb
 	}
+
+	return frb
+}
+
+func encodeKV(key, value Encoder) ([]byte, []byte) {
 	var kb []byte
 	var vb []byte
 	if key != nil {
@@ -204,7 +287,36 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
 	if value != nil {
 		vb, _ = value.Encode()
 	}
+
+	return kb, vb
+}
+
+func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+	frb := r.getOrCreateBlock(topic, partition)
+	kb, vb := encodeKV(key, value)
 	msg := &Message{Key: kb, Value: vb}
 	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
-	frb.MsgSet.Messages = append(frb.MsgSet.Messages, msgBlock)
+	set := frb.Records.msgSet
+	if set == nil {
+		set = &MessageSet{}
+		frb.Records = newLegacyRecords(set)
+	}
+	set.Messages = append(set.Messages, msgBlock)
+}
+
+func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
+	frb := r.getOrCreateBlock(topic, partition)
+	kb, vb := encodeKV(key, value)
+	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
+	batch := frb.Records.recordBatch
+	if batch == nil {
+		batch = &RecordBatch{Version: 2}
+		frb.Records = newDefaultRecords(batch)
+	}
+	batch.addRecord(rec)
+}
+
+func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
+	frb := r.getOrCreateBlock(topic, partition)
+	frb.LastStableOffset = offset
 }
 }
+ 94 - 3
fetch_response_test.go

@@ -26,6 +26,43 @@ var (
 		0x00,
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
+
+	oneRecordFetchResponse = []byte{
+		0x00, 0x00, 0x00, 0x00, // ThrottleTime
+		0x00, 0x00, 0x00, 0x01, // Number of Topics
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
+		0x00, 0x00, 0x00, 0x01, // Number of Partitions
+		0x00, 0x00, 0x00, 0x05, // Partition
+		0x00, 0x01, // Error
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
+		0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
+		0x00, 0x00, 0x00, 0x52, // Records length
+		// recordBatch
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x46,
+		0x00, 0x00, 0x00, 0x00,
+		0x02,
+		0xDB, 0x47, 0x14, 0xC9,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		// record
+		0x28,
+		0x00,
+		0x0A,
+		0x00,
+		0x08, 0x01, 0x02, 0x03, 0x04,
+		0x06, 0x05, 0x06, 0x07,
+		0x02,
+		0x06, 0x08, 0x09, 0x0A,
+		0x04, 0x0B, 0x0C,
+	}
 )
 )
 
 func TestEmptyFetchResponse(t *testing.T) {
@@ -60,14 +97,22 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	if block.HighWaterMarkOffset != 0x10101010 {
 		t.Error("Decoding didn't produce correct high water mark offset.")
 	}
-	if block.MsgSet.PartialTrailingMessage {
+	partial, err := block.Records.isPartial()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if partial {
 		t.Error("Decoding detected a partial trailing message where there wasn't one.")
 	}
 
-	if len(block.MsgSet.Messages) != 1 {
+	n, err := block.Records.numRecords()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 1 {
 		t.Fatal("Decoding produced incorrect number of messages.")
 	}
-	msgBlock := block.MsgSet.Messages[0]
+	msgBlock := block.Records.msgSet.Messages[0]
 	if msgBlock.Offset != 0x550000 {
 		t.Error("Decoding produced incorrect message offset.")
 	}
@@ -82,3 +127,49 @@ func TestOneMessageFetchResponse(t *testing.T) {
 		t.Error("Decoding produced incorrect message value.")
 	}
 }
+
+func TestOneRecordFetchResponse(t *testing.T) {
+	response := FetchResponse{}
+	testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
+
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != ErrOffsetOutOfRange {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	partial, err := block.Records.isPartial()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if partial {
+		t.Error("Decoding detected a partial trailing record where there wasn't one.")
+	}
+
+	n, err := block.Records.numRecords()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 1 {
+		t.Fatal("Decoding produced incorrect number of records.")
+	}
+	rec := block.Records.recordBatch.Records[0]
+	if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
+		t.Error("Decoding produced incorrect record key.")
+	}
+	if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
+		t.Error("Decoding produced incorrect record value.")
+	}
+}

+ 3 - 19
message.go

@@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error {
 	pe.putInt8(attributes)
 
 	if m.Version >= 1 {
-		timestamp := int64(-1)
-
-		if !m.Timestamp.Before(time.Unix(0, 0)) {
-			timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
-		} else if !m.Timestamp.IsZero() {
-			return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
+		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
+			return err
 		}
-
-		pe.putInt64(timestamp)
 	}
 
 	err := pe.putBytes(m.Key)
@@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 
 	if m.Version == 1 {
-		millis, err := pd.getInt64()
-		if err != nil {
+		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
 			return err
 		}
-
-		// negative timestamps are invalid, in these cases we should return
-		// a zero time
-		timestamp := time.Time{}
-		if millis >= 0 {
-			timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
-		}
-
-		m.Timestamp = timestamp
 	}
 
 	m.Key, err = pd.getBytes()

+ 7 - 1
mockresponses.go

@@ -122,6 +122,7 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 type MockOffsetResponse struct {
 	offsets map[string]map[int32]map[int64]int64
 	t       TestReporter
+	version int16
 }
 
 func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
@@ -131,6 +132,11 @@ func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
 	}
 }
 
+func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
+	mor.version = version
+	return mor
+}
+
 func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
 	partitions := mor.offsets[topic]
 	if partitions == nil {
@@ -148,7 +154,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of
 
 func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
 	offsetRequest := reqBody.(*OffsetRequest)
-	offsetResponse := &OffsetResponse{}
+	offsetResponse := &OffsetResponse{Version: mor.version}
 	for topic, partitions := range offsetRequest.blocks {
 		for partition, block := range partitions {
 			offset := mor.getOffset(topic, partition, block.time)

+ 7 - 3
record.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "time"
+
 const (
 	controlMask = 0x20
 )
@@ -29,7 +31,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) {
 
 type Record struct {
 	Attributes     int8
-	TimestampDelta int64
+	TimestampDelta time.Duration
 	OffsetDelta    int64
 	Key            []byte
 	Value          []byte
@@ -41,7 +43,7 @@ type Record struct {
 func (r *Record) encode(pe packetEncoder) error {
 	pe.push(&r.length)
 	pe.putInt8(r.Attributes)
-	pe.putVarint(r.TimestampDelta)
+	pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
 	pe.putVarint(r.OffsetDelta)
 	if err := pe.putVarintBytes(r.Key); err != nil {
 		return err
@@ -69,9 +71,11 @@ func (r *Record) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if r.TimestampDelta, err = pd.getVarint(); err != nil {
+	timestamp, err := pd.getVarint()
+	if err != nil {
 		return err
 	}
+	r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
 
 	if r.OffsetDelta, err = pd.getVarint(); err != nil {
 		return err
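
Note that the wire format still carries the delta as a millisecond varint, so any sub-millisecond component of TimestampDelta is truncated on encode. A sketch of the round trip (the value is illustrative):

	rec := &Record{TimestampDelta: 5700 * time.Microsecond}
	millis := int64(rec.TimestampDelta / time.Millisecond) // varint written: 5
	restored := time.Duration(millis) * time.Millisecond   // decodes as 5ms, not 5.7ms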

+ 14 - 6
record_batch.go

@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"fmt"
 	"io/ioutil"
+	"time"
 
 	"github.com/eapache/go-xerial-snappy"
 	"github.com/pierrec/lz4"
@@ -41,8 +42,8 @@ type RecordBatch struct {
 	Codec                 CompressionCodec
 	Control               bool
 	LastOffsetDelta       int32
-	FirstTimestamp        int64
-	MaxTimestamp          int64
+	FirstTimestamp        time.Time
+	MaxTimestamp          time.Time
 	ProducerID            int64
 	ProducerEpoch         int16
 	FirstSequence         int32
@@ -64,8 +65,15 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
 	pe.push(newCRC32Field(crcCastagnoli))
 	pe.putInt16(b.computeAttributes())
 	pe.putInt32(b.LastOffsetDelta)
-	pe.putInt64(b.FirstTimestamp)
-	pe.putInt64(b.MaxTimestamp)
+
+	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
+		return err
+	}
+
+	if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
+		return err
+	}
+
 	pe.putInt64(b.ProducerID)
 	pe.putInt16(b.ProducerEpoch)
 	pe.putInt32(b.FirstSequence)
@@ -122,11 +130,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if b.FirstTimestamp, err = pd.getInt64(); err != nil {
+	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
 		return err
 	}
 
-	if b.MaxTimestamp, err = pd.getInt64(); err != nil {
+	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
 		return err
 	}
 

+ 41 - 25
record_test.go

@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/davecgh/go-spew/spew"
 )
@@ -17,8 +18,13 @@ var recordBatchTestCases = []struct {
 	oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
 }{
 	{
-		name:  "empty record",
-		batch: RecordBatch{Version: 2, Records: []*Record{}},
+		name: "empty record",
+		batch: RecordBatch{
+			Version:        2,
+			FirstTimestamp: time.Unix(0, 0),
+			MaxTimestamp:   time.Unix(0, 0),
+			Records:        []*Record{},
+		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 49, // Length
@@ -36,8 +42,14 @@ var recordBatchTestCases = []struct {
 		},
 	},
 	{
-		name:  "control batch",
-		batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
+		name: "control batch",
+		batch: RecordBatch{
+			Version:        2,
+			Control:        true,
+			FirstTimestamp: time.Unix(0, 0),
+			MaxTimestamp:   time.Unix(0, 0),
+			Records:        []*Record{},
+		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 49, // Length
@@ -58,9 +70,10 @@ var recordBatchTestCases = []struct {
 		name: "uncompressed record",
 		batch: RecordBatch{
 			Version:        2,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -74,10 +87,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 70, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			2,                // Version
-			219, 71, 20, 201, // CRC
+			84, 121, 97, 253, // CRC
 			0, 0, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -103,9 +116,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionGZIP,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -118,11 +132,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			15, 156, 184, 78, // CRC
+			2,                  // Version
+			159, 236, 182, 189, // CRC
 			0, 1, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -136,10 +150,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			2,               // Version
-			144, 168, 0, 33, // CRC
+			0, 216, 14, 210, // CRC
 			0, 1, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -154,9 +168,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionSnappy,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -169,11 +184,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 72, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,               // Version
-			95, 173, 35, 17, // CRC
+			2,              // Version
+			21, 0, 159, 97, // CRC
 			0, 2, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -187,9 +202,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionLZ4,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -202,11 +218,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 89, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			129, 238, 43, 82, // CRC
+			2,                 // Version
+			169, 74, 119, 197, // CRC
 			0, 3, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch

+ 40 - 0
timestamp.go

@@ -0,0 +1,40 @@
+package sarama
+
+import (
+	"fmt"
+	"time"
+)
+
+type Timestamp struct {
+	*time.Time
+}
+
+func (t Timestamp) encode(pe packetEncoder) error {
+	timestamp := int64(-1)
+
+	if !t.Before(time.Unix(0, 0)) {
+		timestamp = t.UnixNano() / int64(time.Millisecond)
+	} else if !t.IsZero() {
+		return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)}
+	}
+
+	pe.putInt64(timestamp)
+	return nil
+}
+
+func (t Timestamp) decode(pd packetDecoder) error {
+	millis, err := pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	// negative timestamps are invalid, in these cases we should return
+	// a zero time
+	timestamp := time.Time{}
+	if millis >= 0 {
+		timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
+	}
+
+	*t.Time = timestamp
+	return nil
+}
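
This new helper centralizes the sentinel rules that message.go previously inlined: times at or after the Unix epoch encode as milliseconds, a zero time.Time encodes as -1, any other pre-epoch time is a PacketEncodingError, and on decode every negative value comes back as the zero time. A sketch of the arithmetic (the value is illustrative):

	ts := time.Unix(1479847795, 0)
	millis := ts.UnixNano() / int64(time.Millisecond) // written to the wire
	back := time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
	fmt.Println(back.Equal(ts)) // true; a -1 on the wire decodes to time.Time{}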

+ 1 - 0
utils.go

@@ -146,5 +146,6 @@ var (
 	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
 	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
 	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
+	V0_11_0_0  = newKafkaVersion(0, 11, 0, 0)
 	minVersion = V0_8_2_0
 )