Bläddra i källkod

Support and provide producer message timestamps

- Support v0/v1/v2 of Produce requests in the protocol layer.
- Enforce the correct message version for Kafka version at the Broker.
- Use v2 messages in the producer if the Kafka version supports it, and pass the
  resulting timestamp back to the user in the ProducerMessage.
Evan Huus 9 år sedan
förälder
incheckning
90b61cbf50
5 ändrade filer med 54 tillägg och 7 borttagningar
  1. 11 0
      async_producer.go
  2. 15 0
      broker.go
  3. 2 1
      produce_request.go
  4. 23 6
      produce_response.go
  5. 3 0
      produce_set.go

+ 11 - 0
async_producer.go

@@ -135,6 +135,11 @@ type ProducerMessage struct {
 	// Partition is the partition that the message was sent to. This is only
 	// guaranteed to be defined if the message was successfully delivered.
 	Partition int32
+	// Timestamp is the timestamp assigned to the message by the broker. This
+	// is only guaranteed to be defined if the message was successfully
+	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
+	// least version 0.10.0.
+	Timestamp time.Time
 
 	retries int
 	flags   flagSet
@@ -722,6 +727,12 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		switch block.Err {
 		// Success
 		case ErrNoError:
+			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+				timestamp := time.Unix(block.Timestamp, 0)
+				for _, msg := range msgs {
+					msg.Timestamp = timestamp
+				}
+			}
 			for i, msg := range msgs {
 				msg.Offset = block.Offset + int64(i)
 			}

+ 15 - 0
broker.go

@@ -198,6 +198,21 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
 }
 
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
+	switch request.version() {
+	case 0:
+		break
+	case 1:
+		if !b.conf.Version.IsAtLeast(V0_9_0_0) {
+			return nil, ErrUnsupportedVersion
+		}
+	case 2:
+		if !b.conf.Version.IsAtLeast(V0_10_0_0) {
+			return nil, ErrUnsupportedVersion
+		}
+	default:
+		return nil, ErrUnsupportedVersion
+	}
+
 	var response *ProduceResponse
 	var err error
 

+ 2 - 1
produce_request.go

@@ -19,6 +19,7 @@ const (
 type ProduceRequest struct {
 	RequiredAcks RequiredAcks
 	Timeout      int32
+	Version      int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
 	msgSets      map[string]map[int32]*MessageSet
 }
 
@@ -110,7 +111,7 @@ func (p *ProduceRequest) key() int16 {
 }
 
 func (p *ProduceRequest) version() int16 {
-	return 0
+	return p.Version
 }
 
 func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {

+ 23 - 6
produce_response.go

@@ -1,11 +1,12 @@
 package sarama
 
 type ProduceResponseBlock struct {
-	Err    KError
-	Offset int64
+	Err       KError
+	Offset    int64
+	Timestamp int64 // only provided if Version >= 2
 }
 
-func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
+func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -17,14 +18,24 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	if version >= 2 {
+		if pr.Timestamp, err = pd.getInt64(); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
 type ProduceResponse struct {
-	Blocks map[string]map[int32]*ProduceResponseBlock
+	Blocks       map[string]map[int32]*ProduceResponseBlock
+	Version      int16
+	ThrottleTime int32 // only provided if Version >= 1
 }
 
 func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
+	pr.Version = version
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -51,7 +62,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 			}
 
 			block := new(ProduceResponseBlock)
-			err = block.decode(pd)
+			err = block.decode(pd, version)
 			if err != nil {
 				return err
 			}
@@ -59,6 +70,12 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 		}
 	}
 
+	if pr.Version >= 1 {
+		if pr.ThrottleTime, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -90,7 +107,7 @@ func (r *ProduceResponse) key() int16 {
 }
 
 func (r *ProduceResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {

+ 3 - 0
produce_set.go

@@ -67,6 +67,9 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
 		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
 	}
+	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+		req.Version = 2
+	}
 
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {