123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package sarama
- import (
- "testing"
- "time"
- )
// Expected encodings that TestProduceRequest compares against. The leading
// fields of every fixture follow the layout annotated on
// produceRequestOneRecord: required acks, timeout, then the topic count.
var (
	// produceRequestEmpty is the expectation for a freshly allocated request.
	produceRequestEmpty = []byte{
		// required acks
		0x00, 0x00,
		// timeout
		0x00, 0x00, 0x00, 0x00,
		// number of topics
		0x00, 0x00, 0x00, 0x00,
	}

	// produceRequestHeader is the expectation once the test has set
	// RequiredAcks to 0x123 and Timeout to 0x444, with no topics added yet.
	produceRequestHeader = []byte{
		// required acks
		0x01, 0x23,
		// timeout
		0x00, 0x00, 0x04, 0x44,
		// number of topics
		0x00, 0x00, 0x00, 0x00,
	}

	// produceRequestOneMessage is the expectation after the test adds a single
	// message for topic "topic", partition 0xAD.
	produceRequestOneMessage = []byte{
		// required acks
		0x01, 0x23,
		// timeout
		0x00, 0x00, 0x04, 0x44,
		// number of topics
		0x00, 0x00, 0x00, 0x01,
		// topic
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
		// number of partitions
		0x00, 0x00, 0x00, 0x01,
		// partition
		0x00, 0x00, 0x00, 0xAD,
		// size of the message set below (0x1C = 28 bytes follow)
		0x00, 0x00, 0x00, 0x1C,
		// messageSet
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x10,
		// message, starting with its CRC
		0x23, 0x96, 0x4A, 0xF7,
		0x00,
		0x00,
		// presumably the nil key the test passes, encoded as length -1
		0xFF, 0xFF, 0xFF, 0xFF,
		// presumably the value length, then the value bytes 0x00, 0xEE from the test
		0x00, 0x00, 0x00, 0x02,
		0x00, 0xEE,
	}

	// produceRequestOneRecord is the expectation after the test switches the
	// request to Version 3 and adds a record batch holding one record.
	produceRequestOneRecord = []byte{
		// transaction ID
		0xFF, 0xFF,
		// required acks
		0x01, 0x23,
		// timeout
		0x00, 0x00, 0x04, 0x44,
		// number of topics
		0x00, 0x00, 0x00, 0x01,
		// topic
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
		// number of partitions
		0x00, 0x00, 0x00, 0x01,
		// partition
		0x00, 0x00, 0x00, 0xAD,
		// records length (0x52 = 82 bytes follow)
		0x00, 0x00, 0x00, 0x52,
		// recordBatch
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x46,
		0x00, 0x00, 0x00, 0x00,
		// matches Version: 2 on the batch built by the test
		0x02,
		0xCA, 0x33, 0xBC, 0x05,
		0x00, 0x00,
		// matches LastOffsetDelta: 1 on the batch built by the test
		0x00, 0x00, 0x00, 0x01,
		// 1479847795000, i.e. the test's FirstTimestamp in milliseconds
		0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00,
		0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x01,
		// record
		0x28,
		0x00,
		0x0A,
		0x00,
		// one prefix byte, then the record key 0x01..0x04 from the test
		0x08, 0x01, 0x02, 0x03, 0x04,
		// one prefix byte, then the record value 0x05..0x07 from the test
		0x06, 0x05, 0x06, 0x07,
		0x02,
		// one prefix byte, then the header key 0x08..0x0A from the test
		0x06, 0x08, 0x09, 0x0A,
		// one prefix byte, then the header value 0x0B, 0x0C from the test
		0x04, 0x0B, 0x0C,
	}
)
- func TestProduceRequest(t *testing.T) {
- request := new(ProduceRequest)
- testRequest(t, "empty", request, produceRequestEmpty)
- request.RequiredAcks = 0x123
- request.Timeout = 0x444
- testRequest(t, "header", request, produceRequestHeader)
- request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
- testRequest(t, "one message", request, produceRequestOneMessage)
- request.Version = 3
- batch := &RecordBatch{
- LastOffsetDelta: 1,
- Version: 2,
- FirstTimestamp: time.Unix(1479847795, 0),
- MaxTimestamp: time.Unix(0, 0),
- Records: []*Record{{
- TimestampDelta: 5 * time.Millisecond,
- Key: []byte{0x01, 0x02, 0x03, 0x04},
- Value: []byte{0x05, 0x06, 0x07},
- Headers: []*RecordHeader{{
- Key: []byte{0x08, 0x09, 0x0A},
- Value: []byte{0x0B, 0x0C},
- }},
- }},
- }
- request.AddBatch("topic", 0xAD, batch)
- packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
- // compressRecords field is not populated on decoding because consumers
- // are only interested in decoded records.
- batch.compressedRecords = nil
- testRequestDecode(t, "one record", request, packet)
- }
|