Просмотр исходного кода

feat: protocol support for V11 fetch w/ rackID

These are the initial protocol beginnings to add support for follower
fetching (KIP-392). This commit simply adds configuration support for a
consumer to specify its rack identifier and to flow it in a V11
FetchRequest and correctly decode the "preferredReplica" field in the
response. Currently no further action is taken. A future PR will be
required to act upon this response.

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 4 лет назад
Родитель
Сommit
48ba0ca84f
6 измененных файлов с 169 добавлено и 12 удалено
  1. 4 0
      config.go
  2. 4 1
      consumer.go
  3. 14 3
      fetch_request.go
  4. 30 0
      fetch_request_test.go
  5. 20 8
      fetch_response.go
  6. 97 0
      fetch_response_test.go

+ 4 - 0
config.go

@@ -396,6 +396,10 @@ type Config struct {
 	// debugging, and auditing purposes. Defaults to "sarama", but you should
 	// probably set it to something specific to your application.
 	ClientID string
+	// A rack identifier for this client. This can be any string value which
+	// indicates where this client is physically located.
+	// It corresponds with the broker config 'broker.rack'
+	RackID string
 	// The number of events to buffer in internal and external channels. This
 	// permits the producer and consumer to continue processing some messages
 	// in the background while user code is working, greatly improving throughput.

+ 4 - 1
consumer.go

@@ -887,10 +887,13 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		request.Version = 4
 		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
 	}
-
 	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
 		request.Version = 10
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
+		request.Version = 11
+		request.RackID = bc.consumer.conf.RackID
+	}
 
 	for child := range bc.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

+ 14 - 3
fetch_request.go

@@ -55,6 +55,7 @@ type FetchRequest struct {
 	SessionEpoch int32
 	blocks       map[string]map[int32]*fetchRequestBlock
 	forgotten    map[string][]int32
+	RackID       string
 }
 
 type IsolationLevel int8
@@ -118,6 +119,12 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 			}
 		}
 	}
+	if r.Version >= 11 {
+		err = pe.putString(r.RackID)
+		if err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
@@ -192,9 +199,6 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 		if err != nil {
 			return err
 		}
-		if forgottenCount == 0 {
-			return nil
-		}
 		r.forgotten = make(map[string][]int32)
 		for i := 0; i < forgottenCount; i++ {
 			topic, err := pd.getString()
@@ -217,6 +221,13 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 		}
 	}
 
+	if r.Version >= 11 {
+		r.RackID, err = pd.getString()
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 

+ 30 - 0
fetch_request_test.go

@@ -26,6 +26,24 @@ var (
 		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}
+
+	fetchRequestOneBlockV11 = []byte{
+		0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0xFF,
+		0x01,
+		0x00, 0x00, 0x00, 0xAA, // sessionID
+		0x00, 0x00, 0x00, 0xEE, // sessionEpoch
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x12, // partitionID
+		0xFF, 0xFF, 0xFF, 0xFF, // currentLeaderEpoch
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
+		0x00, 0x00, 0x00, 0x56, // maxBytes
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x06, 'r', 'a', 'c', 'k', '0', '1', // rackID
+	}
 )
 
 func TestFetchRequest(t *testing.T) {
@@ -57,4 +75,16 @@ func TestFetchRequest(t *testing.T) {
 		request.AddBlock("topic", 0x12, 0x34, 0x56)
 		testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
 	})
+
+	t.Run("one block v11 rackid", func(t *testing.T) {
+		request := new(FetchRequest)
+		request.Version = 11
+		request.MaxBytes = 0xFF
+		request.Isolation = ReadCommitted
+		request.SessionID = 0xAA
+		request.SessionEpoch = 0xEE
+		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		request.RackID = "rack01"
+		testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
+	})
 }

+ 20 - 8
fetch_response.go

@@ -30,14 +30,15 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
 }
 
 type FetchResponseBlock struct {
-	Err                 KError
-	HighWaterMarkOffset int64
-	LastStableOffset    int64
-	LogStartOffset      int64
-	AbortedTransactions []*AbortedTransaction
-	Records             *Records // deprecated: use FetchResponseBlock.RecordsSet
-	RecordsSet          []*Records
-	Partial             bool
+	Err                  KError
+	HighWaterMarkOffset  int64
+	LastStableOffset     int64
+	LogStartOffset       int64
+	AbortedTransactions  []*AbortedTransaction
+	PreferredReadReplica int32
+	Records              *Records // deprecated: use FetchResponseBlock.RecordsSet
+	RecordsSet           []*Records
+	Partial              bool
 }
 
 func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -83,6 +84,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
 		}
 	}
 
+	if version >= 11 {
+		b.PreferredReadReplica, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	recordsSize, err := pd.getInt32()
 	if err != nil {
 		return err
@@ -188,6 +196,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 		}
 	}
 
+	if version >= 11 {
+		pe.putInt32(b.PreferredReadReplica)
+	}
+
 	pe.push(&lengthField{})
 	for _, records := range b.RecordsSet {
 		err = records.encode(pe)

+ 97 - 0
fetch_response_test.go

@@ -137,6 +137,31 @@ var (
 		0x00,
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
+
+	preferredReplicaFetchResponseV11 = []byte{
+		0x00, 0x00, 0x00, 0x00, // ThrottleTime
+		0x00, 0x02, // ErrorCode
+		0x00, 0x00, 0x00, 0xAC, // SessionID
+		0x00, 0x00, 0x00, 0x01, // Number of Topics
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
+		0x00, 0x00, 0x00, 0x01, // Number of Partitions
+		0x00, 0x00, 0x00, 0x05, // Partition
+		0x00, 0x01, // Error
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x09, // Last Stable Offset
+		0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, // Log Start Offset
+		0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
+		0x00, 0x00, 0x00, 0x03, // Preferred Read Replica
+		0x00, 0x00, 0x00, 0x1C,
+		// messageSet
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x10,
+		// message
+		0x23, 0x96, 0x4a, 0xf7, // CRC
+		0x00,
+		0x00,
+		0xFF, 0xFF, 0xFF, 0xFF,
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
 )
 
 func TestEmptyFetchResponse(t *testing.T) {
@@ -398,3 +423,75 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
 		t.Error("Decoding produced incorrect message value.")
 	}
 }
+
+func TestPreferredReplicaFetchResponseV11(t *testing.T) {
+	response := FetchResponse{}
+	testVersionDecodable(
+		t, "preferred replica fetch response v11", &response,
+		preferredReplicaFetchResponseV11, 11)
+
+	if response.ErrorCode != 0x0002 {
+		t.Fatal("Decoding produced incorrect error code.")
+	}
+
+	if response.SessionID != 0x000000AC {
+		t.Fatal("Decoding produced incorrect session ID.")
+	}
+
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != ErrOffsetOutOfRange {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	if block.LastStableOffset != 0x10101009 {
+		t.Error("Decoding didn't produce correct last stable offset.")
+	}
+	if block.LogStartOffset != 0x01010101 {
+		t.Error("Decoding didn't produce correct log start offset.")
+	}
+	if block.PreferredReadReplica != 0x0003 {
+		t.Error("Decoding didn't produce correct preferred read replica.")
+	}
+	partial, err := block.isPartial()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if partial {
+		t.Error("Decoding detected a partial trailing record where there wasn't one.")
+	}
+
+	n, err := block.numRecords()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 1 {
+		t.Fatal("Decoding produced incorrect number of records.")
+	}
+	msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
+	if msgBlock.Offset != 0x550000 {
+		t.Error("Decoding produced incorrect message offset.")
+	}
+	msg := msgBlock.Msg
+	if msg.Codec != CompressionNone {
+		t.Error("Decoding produced incorrect message compression.")
+	}
+	if msg.Key != nil {
+		t.Error("Decoding produced message key where there was none.")
+	}
+	if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
+		t.Error("Decoding produced incorrect message value.")
+	}
+}