Browse Source

Expose Records union fields

Maxim Vladimirskiy 7 years ago
parent
commit
5468405d94
8 changed files with 53 additions and 53 deletions
  1. 2 2
      consumer.go
  2. 3 3
      fetch_response.go
  3. 3 3
      fetch_response_test.go
  4. 3 3
      produce_request.go
  5. 9 9
      produce_set.go
  6. 2 2
      produce_set_test.go
  7. 27 27
      records.go
  8. 4 4
      records_test.go

+ 2 - 2
consumer.go

@@ -583,14 +583,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 		switch records.recordsType {
 		case legacyRecords:
-			messageSetMessages, err := child.parseMessages(records.msgSet)
+			messageSetMessages, err := child.parseMessages(records.MsgSet)
 			if err != nil {
 				return nil, err
 			}
 
 			messages = append(messages, messageSetMessages...)
 		case defaultRecords:
-			recordBatchMessages, err := child.parseRecords(records.recordBatch)
+			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
 			if err != nil {
 				return nil, err
 			}

+ 3 - 3
fetch_response.go

@@ -353,7 +353,7 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
 		records := newLegacyRecords(&MessageSet{})
 		frb.RecordsSet = []*Records{&records}
 	}
-	set := frb.RecordsSet[0].msgSet
+	set := frb.RecordsSet[0].MsgSet
 	set.Messages = append(set.Messages, msgBlock)
 }
 
@@ -365,7 +365,7 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
 		records := newDefaultRecords(&RecordBatch{Version: 2})
 		frb.RecordsSet = []*Records{&records}
 	}
-	batch := frb.RecordsSet[0].recordBatch
+	batch := frb.RecordsSet[0].RecordBatch
 	batch.addRecord(rec)
 }
 
@@ -375,7 +375,7 @@ func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset
 		records := newDefaultRecords(&RecordBatch{Version: 2})
 		frb.RecordsSet = []*Records{&records}
 	}
-	batch := frb.RecordsSet[0].recordBatch
+	batch := frb.RecordsSet[0].RecordBatch
 	batch.LastOffsetDelta = offset
 }
 

+ 3 - 3
fetch_response_test.go

@@ -132,7 +132,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	if n != 1 {
 		t.Fatal("Decoding produced incorrect number of messages.")
 	}
-	msgBlock := block.RecordsSet[0].msgSet.Messages[0]
+	msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
 	if msgBlock.Offset != 0x550000 {
 		t.Error("Decoding produced incorrect message offset.")
 	}
@@ -185,7 +185,7 @@ func TestOneRecordFetchResponse(t *testing.T) {
 	if n != 1 {
 		t.Fatal("Decoding produced incorrect number of records.")
 	}
-	rec := block.RecordsSet[0].recordBatch.Records[0]
+	rec := block.RecordsSet[0].RecordBatch.Records[0]
 	if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
 		t.Error("Decoding produced incorrect record key.")
 	}
@@ -231,7 +231,7 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
 	if n != 1 {
 		t.Fatal("Decoding produced incorrect number of records.")
 	}
-	msgBlock := block.RecordsSet[0].msgSet.Messages[0]
+	msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
 	if msgBlock.Offset != 0x550000 {
 		t.Error("Decoding produced incorrect message offset.")
 	}

+ 3 - 3
produce_request.go

@@ -113,9 +113,9 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 			}
 			if metricRegistry != nil {
 				if r.Version >= 3 {
-					topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
+					topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
 				} else {
-					topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
+					topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
 				}
 				batchSize := int64(pe.offset() - startOffset)
 				batchSizeMetric.Update(batchSize)
@@ -231,7 +231,7 @@ func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
 
 func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
 	r.ensureRecords(topic, partition)
-	set := r.records[topic][partition].msgSet
+	set := r.records[topic][partition].MsgSet
 
 	if set == nil {
 		set = new(MessageSet)

+ 9 - 9
produce_set.go

@@ -80,7 +80,7 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		rec := &Record{
 			Key:            key,
 			Value:          val,
-			TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp),
+			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
 		}
 		size += len(key) + len(val)
 		if len(msg.Headers) > 0 {
@@ -90,14 +90,14 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
 			}
 		}
-		set.recordsToSend.recordBatch.addRecord(rec)
+		set.recordsToSend.RecordBatch.addRecord(rec)
 	} else {
 		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
 		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 			msgToSend.Timestamp = timestamp
 			msgToSend.Version = 1
 		}
-		set.recordsToSend.msgSet.addMessage(msgToSend)
+		set.recordsToSend.MsgSet.addMessage(msgToSend)
 		size = producerMessageOverhead + len(key) + len(val)
 	}
 
@@ -123,7 +123,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
 			if req.Version >= 3 {
-				rb := set.recordsToSend.recordBatch
+				rb := set.recordsToSend.RecordBatch
 				if len(rb.Records) > 0 {
 					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
 					for i, record := range rb.Records {
@@ -135,7 +135,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 				continue
 			}
 			if ps.parent.conf.Producer.Compression == CompressionNone {
-				req.AddSet(topic, partition, set.recordsToSend.msgSet)
+				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
 			} else {
 				// When compression is enabled, the entire set for each partition is compressed
 				// and sent as the payload of a single fake "message" with the appropriate codec
@@ -148,11 +148,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					// recompressing the message set.
 					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
 					// for details on relative offsets.)
-					for i, msg := range set.recordsToSend.msgSet.Messages {
+					for i, msg := range set.recordsToSend.MsgSet.Messages {
 						msg.Offset = int64(i)
 					}
 				}
-				payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
+				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
 				if err != nil {
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
@@ -162,11 +162,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
 					Key:              nil,
 					Value:            payload,
-					Set:              set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
+					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
 				}
 				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 					compMsg.Version = 1
-					compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp
+					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
 				}
 				req.AddMessage(topic, partition, compMsg)
 			}

+ 2 - 2
produce_set_test.go

@@ -167,7 +167,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 		t.Error("Wrong request version")
 	}
 
-	for _, msgBlock := range req.records["t1"][0].msgSet.Messages {
+	for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
 		msg := msgBlock.Msg
 		err := msg.decodeSet()
 		if err != nil {
@@ -227,7 +227,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 		t.Error("Wrong request version")
 	}
 
-	batch := req.records["t1"][0].recordBatch
+	batch := req.records["t1"][0].RecordBatch
 	if batch.FirstTimestamp != now {
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 	}

+ 27 - 27
records.go

@@ -14,30 +14,30 @@ const (
 // Records implements a union type containing either a RecordBatch or a legacy MessageSet.
 type Records struct {
 	recordsType int
-	msgSet      *MessageSet
-	recordBatch *RecordBatch
+	MsgSet      *MessageSet
+	RecordBatch *RecordBatch
 }
 
 func newLegacyRecords(msgSet *MessageSet) Records {
-	return Records{recordsType: legacyRecords, msgSet: msgSet}
+	return Records{recordsType: legacyRecords, MsgSet: msgSet}
 }
 
 func newDefaultRecords(batch *RecordBatch) Records {
-	return Records{recordsType: defaultRecords, recordBatch: batch}
+	return Records{recordsType: defaultRecords, RecordBatch: batch}
 }
 
-// setTypeFromFields sets type of Records depending on which of msgSet or recordBatch is not nil.
+// setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
 // The first return value indicates whether both fields are nil (and the type is not set).
 // If both fields are not nil, it returns an error.
 func (r *Records) setTypeFromFields() (bool, error) {
-	if r.msgSet == nil && r.recordBatch == nil {
+	if r.MsgSet == nil && r.RecordBatch == nil {
 		return true, nil
 	}
-	if r.msgSet != nil && r.recordBatch != nil {
-		return false, fmt.Errorf("both msgSet and recordBatch are set, but record type is unknown")
+	if r.MsgSet != nil && r.RecordBatch != nil {
+		return false, fmt.Errorf("both MsgSet and RecordBatch are set, but record type is unknown")
 	}
 	r.recordsType = defaultRecords
-	if r.msgSet != nil {
+	if r.MsgSet != nil {
 		r.recordsType = legacyRecords
 	}
 	return false, nil
@@ -52,15 +52,15 @@ func (r *Records) encode(pe packetEncoder) error {
 
 	switch r.recordsType {
 	case legacyRecords:
-		if r.msgSet == nil {
+		if r.MsgSet == nil {
 			return nil
 		}
-		return r.msgSet.encode(pe)
+		return r.MsgSet.encode(pe)
 	case defaultRecords:
-		if r.recordBatch == nil {
+		if r.RecordBatch == nil {
 			return nil
 		}
-		return r.recordBatch.encode(pe)
+		return r.RecordBatch.encode(pe)
 	}
 
 	return fmt.Errorf("unknown records type: %v", r.recordsType)
@@ -89,11 +89,11 @@ func (r *Records) decode(pd packetDecoder) error {
 
 	switch r.recordsType {
 	case legacyRecords:
-		r.msgSet = &MessageSet{}
-		return r.msgSet.decode(pd)
+		r.MsgSet = &MessageSet{}
+		return r.MsgSet.decode(pd)
 	case defaultRecords:
-		r.recordBatch = &RecordBatch{}
-		return r.recordBatch.decode(pd)
+		r.RecordBatch = &RecordBatch{}
+		return r.RecordBatch.decode(pd)
 	}
 	return fmt.Errorf("unknown records type: %v", r.recordsType)
 }
@@ -107,15 +107,15 @@ func (r *Records) numRecords() (int, error) {
 
 	switch r.recordsType {
 	case legacyRecords:
-		if r.msgSet == nil {
+		if r.MsgSet == nil {
 			return 0, nil
 		}
-		return len(r.msgSet.Messages), nil
+		return len(r.MsgSet.Messages), nil
 	case defaultRecords:
-		if r.recordBatch == nil {
+		if r.RecordBatch == nil {
 			return 0, nil
 		}
-		return len(r.recordBatch.Records), nil
+		return len(r.RecordBatch.Records), nil
 	}
 	return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
 }
@@ -131,15 +131,15 @@ func (r *Records) isPartial() (bool, error) {
 	case unknownRecords:
 		return false, nil
 	case legacyRecords:
-		if r.msgSet == nil {
+		if r.MsgSet == nil {
 			return false, nil
 		}
-		return r.msgSet.PartialTrailingMessage, nil
+		return r.MsgSet.PartialTrailingMessage, nil
 	case defaultRecords:
-		if r.recordBatch == nil {
+		if r.RecordBatch == nil {
 			return false, nil
 		}
-		return r.recordBatch.PartialTrailingRecord, nil
+		return r.RecordBatch.PartialTrailingRecord, nil
 	}
 	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
 }
@@ -155,10 +155,10 @@ func (r *Records) isControl() (bool, error) {
 	case legacyRecords:
 		return false, nil
 	case defaultRecords:
-		if r.recordBatch == nil {
+		if r.RecordBatch == nil {
 			return false, nil
 		}
-		return r.recordBatch.Control, nil
+		return r.RecordBatch.Control, nil
 	}
 	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
 }

+ 4 - 4
records_test.go

@@ -45,8 +45,8 @@ func TestLegacyRecords(t *testing.T) {
 	if r.recordsType != legacyRecords {
 		t.Fatalf("Wrong records type %v, expected %v", r.recordsType, legacyRecords)
 	}
-	if !reflect.DeepEqual(set, r.msgSet) {
-		t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.msgSet)
+	if !reflect.DeepEqual(set, r.MsgSet) {
+		t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.MsgSet)
 	}
 
 	n, err := r.numRecords()
@@ -113,8 +113,8 @@ func TestDefaultRecords(t *testing.T) {
 	if r.recordsType != defaultRecords {
 		t.Fatalf("Wrong records type %v, expected %v", r.recordsType, defaultRecords)
 	}
-	if !reflect.DeepEqual(batch, r.recordBatch) {
-		t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.recordBatch)
+	if !reflect.DeepEqual(batch, r.RecordBatch) {
+		t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.RecordBatch)
 	}
 
 	n, err := r.numRecords()