Browse Source

Fix compression timestamps (#759)

Fix message version/timestamp for compressed messages on kafka >= 0.10.0.0

Use default timestamp of time.Now and message version 1 if possible.

The first messages timestamp of a set is a proxy for the actual
timestamp of the group in many cases.
Rene Treffer 8 years ago
parent
commit
133322fbfe
2 changed files with 55 additions and 4 deletions
  1. 13 4
      produce_set.go
  2. 42 0
      produce_set_test.go

+ 13 - 4
produce_set.go

@@ -53,8 +53,12 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 
 	set.msgs = append(set.msgs, msg)
 	msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
-	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) && !msg.Timestamp.IsZero() {
-		msgToSend.Timestamp = msg.Timestamp
+	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+		if msg.Timestamp.IsZero() {
+			msgToSend.Timestamp = time.Now()
+		} else {
+			msgToSend.Timestamp = msg.Timestamp
+		}
 		msgToSend.Version = 1
 	}
 	set.setToSend.addMessage(msgToSend)
@@ -90,11 +94,16 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
 				}
-				req.AddMessage(topic, partition, &Message{
+				compMsg := &Message{
 					Codec: ps.parent.conf.Producer.Compression,
 					Key:   nil,
 					Value: payload,
-				})
+				}
+				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+					compMsg.Version = 1
+					compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp
+				}
+				req.AddMessage(topic, partition, compMsg)
 			}
 		}
 	}

+ 42 - 0
produce_set_test.go

@@ -141,3 +141,45 @@ func TestProduceSetRequestBuilding(t *testing.T) {
 		t.Error("Wrong number of topics in request")
 	}
 }
+
+func TestProduceSetCompressedRequestBuilding(t *testing.T) {
+	parent, ps := makeProduceSet()
+	parent.conf.Producer.RequiredAcks = WaitForAll
+	parent.conf.Producer.Timeout = 10 * time.Second
+	parent.conf.Producer.Compression = CompressionGZIP
+	parent.conf.Version = V0_10_0_0
+
+	msg := &ProducerMessage{
+		Topic:     "t1",
+		Partition: 0,
+		Key:       StringEncoder(TestMessage),
+		Value:     StringEncoder(TestMessage),
+		Timestamp: time.Now(),
+	}
+	for i := 0; i < 10; i++ {
+		safeAddMessage(t, ps, msg)
+	}
+
+	req := ps.buildRequest()
+
+	if req.Version != 2 {
+		t.Error("Wrong request version")
+	}
+
+	for _, msgBlock := range req.msgSets["t1"][0].Messages {
+		msg := msgBlock.Msg
+		err := msg.decodeSet()
+		if err != nil {
+			t.Error("Failed to decode set from payload")
+		}
+		for _, compMsgBlock := range msg.Set.Messages {
+			compMsg := compMsgBlock.Msg
+			if compMsg.Version != 1 {
+				t.Error("Wrong compressed message version")
+			}
+		}
+		if msg.Version != 1 {
+			t.Error("Wrong compressed parent message version")
+		}
+	}
+}