Jelajahi Sumber

Optional replica id in offset request

urso 8 tahun lalu
induk
melakukan
76f99f0b41
1 mengubah file dengan 16 tambahan dan 3 penghapusan
  1. 16 3
      offset_request.go

+ 16 - 3
offset_request.go

@@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
 }
 
 type OffsetRequest struct {
-	Version int16
-	blocks  map[string]map[int32]*offsetRequestBlock
+	Version        int16
+	replicaID      *int32
+	storeReplicaID int32
+	blocks         map[string]map[int32]*offsetRequestBlock
 }
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
-	pe.putInt32(-1) // replica ID is always -1 for clients
+	if r.replicaID == nil {
+		// default replica ID is always -1 for clients
+		pe.putInt32(-1)
+	} else {
+		pe.putInt32(*r.replicaID)
+	}
+
 	err := pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
@@ -113,6 +121,11 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	}
 }
 
+func (r *OffsetRequest) SetReplicaID(id int32) {
+	r.storeReplicaID = id
+	r.replicaID = &r.storeReplicaID
+}
+
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)