浏览代码

Merge pull request #1002 from emfree/assign_relative_offsets

Assign relative offsets in compressed message sets
Evan Huus 8 年之前
父节点
当前提交
f144d1183f
共有 2 个文件被更改,包括 15 次插入1 次删除
  1. 11 0
      produce_set.go
  2. 4 1
      produce_set_test.go

+ 11 - 0
produce_set.go

@@ -132,6 +132,17 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 				// and sent as the payload of a single fake "message" with the appropriate codec
 				// set and no key. When the server sees a message with a compression codec, it
 				// decompresses the payload and treats the result as its message set.
+
+				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+					// If our version is 0.10 or later, assign relative offsets
+					// to the inner messages. This lets the broker avoid
+					// recompressing the message set.
+					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
+					// for details on relative offsets.)
+					for i, msg := range set.recordsToSend.msgSet.Messages {
+						msg.Offset = int64(i)
+					}
+				}
 				payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
 				if err != nil {
 					Logger.Println(err) // if this happens, it's basically our fault.

+ 4 - 1
produce_set_test.go

@@ -173,11 +173,14 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 		if err != nil {
 			t.Error("Failed to decode set from payload")
 		}
-		for _, compMsgBlock := range msg.Set.Messages {
+		for i, compMsgBlock := range msg.Set.Messages {
 			compMsg := compMsgBlock.Msg
 			if compMsg.Version != 1 {
 				t.Error("Wrong compressed message version")
 			}
+			if compMsgBlock.Offset != int64(i) {
+				t.Error("Wrong relative inner offset")
+			}
 		}
 		if msg.Version != 1 {
 			t.Error("Wrong compressed parent message version")