Преглед на файлове

Merge pull request #1100 from urso/offset-req-replica-id

Optional replica id in offset request
Vlad Gorodetsky преди 7 години
родител
ревизия
49e0aa492d
променени са 2 файла, в които са добавени 45 реда и са изтрити 5 реда
  1. 29 5
      offset_request.go
  2. 16 0
      offset_request_test.go

+ 29 - 5
offset_request.go

@@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
 }
 }
 
 
 type OffsetRequest struct {
 type OffsetRequest struct {
-	Version int16
-	blocks  map[string]map[int32]*offsetRequestBlock
+	Version        int16
+	replicaID      int32
+	isReplicaIDSet bool
+	blocks         map[string]map[int32]*offsetRequestBlock
 }
 }
 
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
 func (r *OffsetRequest) encode(pe packetEncoder) error {
-	pe.putInt32(-1) // replica ID is always -1 for clients
+	if r.isReplicaIDSet {
+		pe.putInt32(r.replicaID)
+	} else {
+		// default replica ID is always -1 for clients
+		pe.putInt32(-1)
+	}
+
 	err := pe.putArrayLength(len(r.blocks))
 	err := pe.putArrayLength(len(r.blocks))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -59,10 +67,14 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 	r.Version = version
 	r.Version = version
 
 
-	// Ignore replica ID
-	if _, err := pd.getInt32(); err != nil {
+	replicaID, err := pd.getInt32()
+	if err != nil {
 		return err
 		return err
 	}
 	}
+	if replicaID >= 0 {
+		r.SetReplicaID(replicaID)
+	}
+
 	blockCount, err := pd.getArrayLength()
 	blockCount, err := pd.getArrayLength()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -113,6 +125,18 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	}
 	}
 }
 }
 
 
+func (r *OffsetRequest) SetReplicaID(id int32) {
+	r.replicaID = id
+	r.isReplicaIDSet = true
+}
+
+func (r *OffsetRequest) ReplicaID() int32 {
+	if r.isReplicaIDSet {
+		return r.replicaID
+	}
+	return -1
+}
+
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)

+ 16 - 0
offset_request_test.go

@@ -23,6 +23,10 @@ var (
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x04,
 		0x00, 0x00, 0x00, 0x04,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
+
+	offsetRequestReplicaID = []byte{
+		0x00, 0x00, 0x00, 0x2a,
+		0x00, 0x00, 0x00, 0x00}
 )
 )
 
 
 func TestOffsetRequest(t *testing.T) {
 func TestOffsetRequest(t *testing.T) {
@@ -41,3 +45,15 @@ func TestOffsetRequestV1(t *testing.T) {
 	request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
 	request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
 	testRequest(t, "one block", request, offsetRequestOneBlockV1)
 	testRequest(t, "one block", request, offsetRequestOneBlockV1)
 }
 }
+
+func TestOffsetRequestReplicaID(t *testing.T) {
+	request := new(OffsetRequest)
+	replicaID := int32(42)
+	request.SetReplicaID(replicaID)
+
+	if found := request.ReplicaID(); found != replicaID {
+		t.Errorf("replicaID: expected %v, found %v", replicaID, found)
+	}
+
+	testRequest(t, "with replica ID", request, offsetRequestReplicaID)
+}