123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- package sarama
- import (
- "bytes"
- "testing"
- )
// Wire-format FetchResponse fixtures decoded by the tests in this file.
// Each byte group is annotated with the protocol field it encodes for the
// response version the fixture is decoded with.
var (
	// emptyFetchResponse is a version 0 response that lists no topics.
	emptyFetchResponse = []byte{
		0x00, 0x00, 0x00, 0x00, // number of topics
	}

	// oneMessageFetchResponse is a version 0 response holding one topic with
	// one partition whose message set contains a single key-less message.
	oneMessageFetchResponse = []byte{
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x01,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x1C,                         // message set size (28 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // messageSet: message offset
		0x00, 0x00, 0x00, 0x10,                         // messageSet: message size (16 bytes)
		0x23, 0x96, 0x4a, 0xf7,                         // message: CRC
		0x00,                                           // message: magic byte
		0x00,                                           // message: attributes
		0xFF, 0xFF, 0xFF, 0xFF,                         // message: key length (-1, no key)
		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,             // message: value length (2) and value
	}

	// overflowMessageFetchResponse is the version 0 single-message response
	// followed by a second message-set entry whose declared size (255) is
	// larger than the eight bytes that actually follow it.
	overflowMessageFetchResponse = []byte{
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x01,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x30,                         // message set size (48 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // messageSet: message offset
		0x00, 0x00, 0x00, 0x10,                         // messageSet: message size (16 bytes)
		0x23, 0x96, 0x4a, 0xf7,                         // message: CRC
		0x00,                                           // message: magic byte
		0x00,                                           // message: attributes
		0xFF, 0xFF, 0xFF, 0xFF,                         // message: key length (-1, no key)
		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,             // message: value length (2) and value
		0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // overflow messageSet: offset
		0x00, 0x00, 0x00, 0xFF,                         // overflow messageSet: declared size
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // overflow bytes
	}

	// oneRecordFetchResponse is a version 4 response whose partition carries
	// one record batch (magic 2) containing a single record with one header.
	oneRecordFetchResponse = []byte{
		0x00, 0x00, 0x00, 0x00,                         // throttle time
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x01,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // last stable offset
		0x00, 0x00, 0x00, 0x00,                         // number of aborted transactions
		0x00, 0x00, 0x00, 0x52,                         // records length (82 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: first offset
		0x00, 0x00, 0x00, 0x46,                         // recordBatch: batch length (70 bytes)
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: partition leader epoch
		0x02,                                           // recordBatch: magic byte
		0xDB, 0x47, 0x14, 0xC9,                         // recordBatch: CRC
		0x00, 0x00,                                     // recordBatch: attributes
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: last offset delta
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // recordBatch: first timestamp
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: max timestamp
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: producer ID
		0x00, 0x00,                                     // recordBatch: producer epoch
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: first sequence
		0x00, 0x00, 0x00, 0x01,                         // recordBatch: number of records
		0x28,                                           // record: length (varint)
		0x00,                                           // record: attributes
		0x0A,                                           // record: timestamp delta (varint)
		0x00,                                           // record: offset delta (varint)
		0x08, 0x01, 0x02, 0x03, 0x04,                   // record: key length (varint) and key
		0x06, 0x05, 0x06, 0x07,                         // record: value length (varint) and value
		0x02,                                           // record: number of headers (varint)
		0x06, 0x08, 0x09, 0x0A,                         // record: header key length and key
		0x04, 0x0B, 0x0C,                               // record: header value length and value
	}

	// partialFetchResponse is a version 4 response whose record batch
	// declares 70 bytes but is cut off three bytes into its first record.
	partialFetchResponse = []byte{
		0x00, 0x00, 0x00, 0x00,                         // throttle time
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x00,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // last stable offset
		0x00, 0x00, 0x00, 0x00,                         // number of aborted transactions
		0x00, 0x00, 0x00, 0x40,                         // records length (64 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: first offset
		0x00, 0x00, 0x00, 0x46,                         // recordBatch: batch length (70 bytes)
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: partition leader epoch
		0x02,                                           // recordBatch: magic byte
		0xDB, 0x47, 0x14, 0xC9,                         // recordBatch: CRC
		0x00, 0x00,                                     // recordBatch: attributes
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: last offset delta
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // recordBatch: first timestamp
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: max timestamp
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // recordBatch: producer ID
		0x00, 0x00,                                     // recordBatch: producer epoch
		0x00, 0x00, 0x00, 0x00,                         // recordBatch: first sequence
		0x00, 0x00, 0x00, 0x01,                         // recordBatch: number of records
		0x28, 0x00, 0x00,                               // record: first bytes only, rest is missing
	}

	// oneMessageFetchResponseV4 is a version 4 response that still carries a
	// legacy message set with a single key-less message.
	oneMessageFetchResponseV4 = []byte{
		0x00, 0x00, 0x00, 0x00,                         // throttle time
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x01,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // last stable offset
		0x00, 0x00, 0x00, 0x00,                         // number of aborted transactions
		0x00, 0x00, 0x00, 0x1C,                         // message set size (28 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // messageSet: message offset
		0x00, 0x00, 0x00, 0x10,                         // messageSet: message size (16 bytes)
		0x23, 0x96, 0x4a, 0xf7,                         // message: CRC
		0x00,                                           // message: magic byte
		0x00,                                           // message: attributes
		0xFF, 0xFF, 0xFF, 0xFF,                         // message: key length (-1, no key)
		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,             // message: value length (2) and value
	}

	// preferredReplicaFetchResponseV11 is a version 11 response with a
	// top-level error code and session ID, plus per-partition log start
	// offset and preferred read replica, wrapping a single legacy message.
	preferredReplicaFetchResponseV11 = []byte{
		0x00, 0x00, 0x00, 0x00,                         // throttle time
		0x00, 0x02,                                     // error code
		0x00, 0x00, 0x00, 0xAC,                         // session ID
		0x00, 0x00, 0x00, 0x01,                         // number of topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',            // topic
		0x00, 0x00, 0x00, 0x01,                         // number of partitions
		0x00, 0x00, 0x00, 0x05,                         // partition
		0x00, 0x01,                                     // error
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high watermark offset
		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x09, // last stable offset
		0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, // log start offset
		0x00, 0x00, 0x00, 0x00,                         // number of aborted transactions
		0x00, 0x00, 0x00, 0x03,                         // preferred read replica
		0x00, 0x00, 0x00, 0x1C,                         // message set size (28 bytes)
		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // messageSet: message offset
		0x00, 0x00, 0x00, 0x10,                         // messageSet: message size (16 bytes)
		0x23, 0x96, 0x4a, 0xf7,                         // message: CRC
		0x00,                                           // message: magic byte
		0x00,                                           // message: attributes
		0xFF, 0xFF, 0xFF, 0xFF,                         // message: key length (-1, no key)
		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,             // message: value length (2) and value
	}
)
- func TestEmptyFetchResponse(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
- if len(response.Blocks) != 0 {
- t.Error("Decoding produced topic blocks where there were none.")
- }
- }
- func TestOneMessageFetchResponse(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrOffsetOutOfRange {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- partial, err := block.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if partial {
- t.Error("Decoding detected a partial trailing message where there wasn't one.")
- }
- n, err := block.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 1 {
- t.Fatal("Decoding produced incorrect number of messages.")
- }
- msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
- if msgBlock.Offset != 0x550000 {
- t.Error("Decoding produced incorrect message offset.")
- }
- msg := msgBlock.Msg
- if msg.Codec != CompressionNone {
- t.Error("Decoding produced incorrect message compression.")
- }
- if msg.Key != nil {
- t.Error("Decoding produced message key where there was none.")
- }
- if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
- t.Error("Decoding produced incorrect message value.")
- }
- }
- func TestOverflowMessageFetchResponse(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrOffsetOutOfRange {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- partial, err := block.Records.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if partial {
- t.Error("Decoding detected a partial trailing message where there wasn't one.")
- }
- overflow, err := block.Records.isOverflow()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if !overflow {
- t.Error("Decoding detected a partial trailing message where there wasn't one.")
- }
- n, err := block.Records.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 1 {
- t.Fatal("Decoding produced incorrect number of messages.")
- }
- msgBlock := block.Records.MsgSet.Messages[0]
- if msgBlock.Offset != 0x550000 {
- t.Error("Decoding produced incorrect message offset.")
- }
- msg := msgBlock.Msg
- if msg.Codec != CompressionNone {
- t.Error("Decoding produced incorrect message compression.")
- }
- if msg.Key != nil {
- t.Error("Decoding produced message key where there was none.")
- }
- if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
- t.Error("Decoding produced incorrect message value.")
- }
- }
- func TestOneRecordFetchResponse(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrOffsetOutOfRange {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- partial, err := block.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if partial {
- t.Error("Decoding detected a partial trailing record where there wasn't one.")
- }
- n, err := block.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 1 {
- t.Fatal("Decoding produced incorrect number of records.")
- }
- rec := block.RecordsSet[0].RecordBatch.Records[0]
- if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
- t.Error("Decoding produced incorrect record key.")
- }
- if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
- t.Error("Decoding produced incorrect record value.")
- }
- }
- func TestPartailFetchResponse(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4)
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrNoError {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- partial, err := block.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if !partial {
- t.Error("Decoding not a partial trailing record")
- }
- n, err := block.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 0 {
- t.Fatal("Decoding produced incorrect number of records.")
- }
- }
- func TestOneMessageFetchResponseV4(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrOffsetOutOfRange {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- partial, err := block.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if partial {
- t.Error("Decoding detected a partial trailing record where there wasn't one.")
- }
- n, err := block.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 1 {
- t.Fatal("Decoding produced incorrect number of records.")
- }
- msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
- if msgBlock.Offset != 0x550000 {
- t.Error("Decoding produced incorrect message offset.")
- }
- msg := msgBlock.Msg
- if msg.Codec != CompressionNone {
- t.Error("Decoding produced incorrect message compression.")
- }
- if msg.Key != nil {
- t.Error("Decoding produced message key where there was none.")
- }
- if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
- t.Error("Decoding produced incorrect message value.")
- }
- }
- func TestPreferredReplicaFetchResponseV11(t *testing.T) {
- response := FetchResponse{}
- testVersionDecodable(
- t, "preferred replica fetch response v11", &response,
- preferredReplicaFetchResponseV11, 11)
- if response.ErrorCode != 0x0002 {
- t.Fatal("Decoding produced incorrect error code.")
- }
- if response.SessionID != 0x000000AC {
- t.Fatal("Decoding produced incorrect session ID.")
- }
- if len(response.Blocks) != 1 {
- t.Fatal("Decoding produced incorrect number of topic blocks.")
- }
- if len(response.Blocks["topic"]) != 1 {
- t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
- }
- block := response.GetBlock("topic", 5)
- if block == nil {
- t.Fatal("GetBlock didn't return block.")
- }
- if block.Err != ErrOffsetOutOfRange {
- t.Error("Decoding didn't produce correct error code.")
- }
- if block.HighWaterMarkOffset != 0x10101010 {
- t.Error("Decoding didn't produce correct high water mark offset.")
- }
- if block.LastStableOffset != 0x10101009 {
- t.Error("Decoding didn't produce correct last stable offset.")
- }
- if block.LogStartOffset != 0x01010101 {
- t.Error("Decoding didn't produce correct log start offset.")
- }
- if block.PreferredReadReplica != 0x0003 {
- t.Error("Decoding didn't produce correct preferred read replica.")
- }
- partial, err := block.isPartial()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if partial {
- t.Error("Decoding detected a partial trailing record where there wasn't one.")
- }
- n, err := block.numRecords()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- if n != 1 {
- t.Fatal("Decoding produced incorrect number of records.")
- }
- msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
- if msgBlock.Offset != 0x550000 {
- t.Error("Decoding produced incorrect message offset.")
- }
- msg := msgBlock.Msg
- if msg.Codec != CompressionNone {
- t.Error("Decoding produced incorrect message compression.")
- }
- if msg.Key != nil {
- t.Error("Decoding produced message key where there was none.")
- }
- if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
- t.Error("Decoding produced incorrect message value.")
- }
- }
|