Bläddra i källkod

Fix parsing of producer timestamp

Move it to the protocol layer for stronger typing, and parse it properly (it is
in milliseconds, not seconds).

Thanks again to @kchaliki.
Evan Huus 9 år sedan
förälder
incheckning
5246fed60b
2 ändrade filer med 7 tillägg och 4 borttagningar
  1. 1 2
      async_producer.go
  2. 6 2
      produce_response.go

+ 1 - 2
async_producer.go

@@ -728,9 +728,8 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		// Success
 		case ErrNoError:
 			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) {
-				timestamp := time.Unix(block.Timestamp, 0)
 				for _, msg := range msgs {
-					msg.Timestamp = timestamp
+					msg.Timestamp = block.Timestamp
 				}
 			}
 			for i, msg := range msgs {

+ 6 - 2
produce_response.go

@@ -1,9 +1,11 @@
 package sarama
 
+import "time"
+
 type ProduceResponseBlock struct {
 	Err       KError
 	Offset    int64
-	Timestamp int64 // only provided if Version >= 2
+	Timestamp time.Time // only provided if Version >= 2
 }
 
 func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -19,8 +21,10 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
 	}
 
 	if version >= 2 {
-		if pr.Timestamp, err = pd.getInt64(); err != nil {
+		if millis, err := pd.getInt64(); err != nil {
 			return err
+		} else {
+			pr.Timestamp = time.Unix(millis/1000, millis%1000)
 		}
 	}