|
@@ -33,7 +33,8 @@ type FetchResponseBlock struct {
|
|
|
HighWaterMarkOffset int64
|
|
HighWaterMarkOffset int64
|
|
|
LastStableOffset int64
|
|
LastStableOffset int64
|
|
|
AbortedTransactions []*AbortedTransaction
|
|
AbortedTransactions []*AbortedTransaction
|
|
|
- Records Records
|
|
|
|
|
|
|
+ RecordsSet []*Records
|
|
|
|
|
+ Partial bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
|
|
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
|
|
@@ -81,15 +82,51 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- if recordsSize > 0 {
|
|
|
|
|
- if err = b.Records.decode(recordsDecoder); err != nil {
|
|
|
|
|
|
|
+
|
|
|
|
|
+ b.RecordsSet = []*Records{}
|
|
|
|
|
+
|
|
|
|
|
+ for {
|
|
|
|
|
+ if recordsDecoder.remaining() == 0 {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ chunk := &Records{}
|
|
|
|
|
+ if err := chunk.decode(recordsDecoder); err != nil {
|
|
|
|
|
+ // If we have at least one decoded record chunk, this is not an error
|
|
|
|
|
+ if err == ErrInsufficientData {
|
|
|
|
|
+ if len(b.RecordsSet) == 0 {
|
|
|
|
|
+ b.Partial = true
|
|
|
|
|
+ }
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // If we have at least one full record chunk, we skip incomplete ones
|
|
|
|
|
+ if chunk.isPartial() && len(b.RecordsSet) > 0 {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ b.RecordsSet = append(b.RecordsSet, chunk)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (b *FetchResponseBlock) numRecords() int {
|
|
|
|
|
+ s := 0
|
|
|
|
|
+
|
|
|
|
|
+ for _, chunk := range b.RecordsSet {
|
|
|
|
|
+ s += chunk.numRecords()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return s
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (b *FetchResponseBlock) isPartial() bool {
|
|
|
|
|
+ return b.Partial || len(b.RecordsSet) == 1 && b.RecordsSet[0].isPartial()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
|
|
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
|
|
|
pe.putInt16(int16(b.Err))
|
|
pe.putInt16(int16(b.Err))
|
|
|
|
|
|
|
@@ -109,9 +146,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pe.push(&lengthField{})
|
|
pe.push(&lengthField{})
|
|
|
- err = b.Records.encode(pe)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+ for _, chunk := range b.RecordsSet {
|
|
|
|
|
+ err = chunk.encode(pe)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
return pe.pop()
|
|
return pe.pop()
|
|
|
}
|
|
}
|
|
@@ -291,11 +330,10 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
|
|
|
kb, vb := encodeKV(key, value)
|
|
kb, vb := encodeKV(key, value)
|
|
|
msg := &Message{Key: kb, Value: vb}
|
|
msg := &Message{Key: kb, Value: vb}
|
|
|
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
|
|
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
|
|
|
- set := frb.Records.msgSet
|
|
|
|
|
- if set == nil {
|
|
|
|
|
- set = &MessageSet{}
|
|
|
|
|
- frb.Records = newLegacyRecords(set)
|
|
|
|
|
|
|
+ if len(frb.RecordsSet) == 0 {
|
|
|
|
|
+ frb.RecordsSet = []*Records{&Records{msgSet: &MessageSet{}}}
|
|
|
}
|
|
}
|
|
|
|
|
+ set := frb.RecordsSet[0].msgSet
|
|
|
set.Messages = append(set.Messages, msgBlock)
|
|
set.Messages = append(set.Messages, msgBlock)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -303,18 +341,20 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
|
|
|
frb := r.getOrCreateBlock(topic, partition)
|
|
frb := r.getOrCreateBlock(topic, partition)
|
|
|
kb, vb := encodeKV(key, value)
|
|
kb, vb := encodeKV(key, value)
|
|
|
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
|
|
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
|
|
|
- if frb.Records.recordBatchSet == nil {
|
|
|
|
|
- frb.Records = newDefaultRecords([]*RecordBatch{&RecordBatch{Version: 2}})
|
|
|
|
|
|
|
+ if len(frb.RecordsSet) == 0 {
|
|
|
|
|
+ frb.RecordsSet = []*Records{&Records{recordBatch: &RecordBatch{Version: 2}}}
|
|
|
}
|
|
}
|
|
|
- frb.Records.recordBatchSet.batches[0].addRecord(rec)
|
|
|
|
|
|
|
+ batch := frb.RecordsSet[0].recordBatch
|
|
|
|
|
+ batch.addRecord(rec)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
|
|
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
|
|
|
frb := r.getOrCreateBlock(topic, partition)
|
|
frb := r.getOrCreateBlock(topic, partition)
|
|
|
- if frb.Records.recordBatchSet == nil {
|
|
|
|
|
- frb.Records = newDefaultRecords([]*RecordBatch{&RecordBatch{Version: 2}})
|
|
|
|
|
|
|
+ if len(frb.RecordsSet) == 0 {
|
|
|
|
|
+ frb.RecordsSet = []*Records{&Records{recordBatch: &RecordBatch{Version: 2}}}
|
|
|
}
|
|
}
|
|
|
- frb.Records.recordBatchSet.batches[0].LastOffsetDelta = offset
|
|
|
|
|
|
|
+ batch := frb.RecordsSet[0].recordBatch
|
|
|
|
|
+ batch.LastOffsetDelta = offset
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
|
|
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
|