|
@@ -28,17 +28,17 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
|
|
|
|
|
|
type OffsetRequest struct {
|
|
|
Version int16
|
|
|
- replicaID *int32
|
|
|
- storeReplicaID int32
|
|
|
+ replicaID int32
|
|
|
+ isReplicaIDSet bool
|
|
|
blocks map[string]map[int32]*offsetRequestBlock
|
|
|
}
|
|
|
|
|
|
func (r *OffsetRequest) encode(pe packetEncoder) error {
|
|
|
- if r.replicaID == nil {
|
|
|
+ if r.isReplicaIDSet {
|
|
|
+ pe.putInt32(r.replicaID)
|
|
|
+ } else {
|
|
|
|
|
|
pe.putInt32(-1)
|
|
|
- } else {
|
|
|
- pe.putInt32(*r.replicaID)
|
|
|
}
|
|
|
|
|
|
err := pe.putArrayLength(len(r.blocks))
|
|
@@ -126,15 +126,15 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
|
|
|
}
|
|
|
|
|
|
func (r *OffsetRequest) SetReplicaID(id int32) {
|
|
|
- r.storeReplicaID = id
|
|
|
- r.replicaID = &r.storeReplicaID
|
|
|
+ r.replicaID = id
|
|
|
+ r.isReplicaIDSet = true
|
|
|
}
|
|
|
|
|
|
func (r *OffsetRequest) ReplicaID() int32 {
|
|
|
- if r.replicaID == nil {
|
|
|
- return -1
|
|
|
+ if r.isReplicaIDSet {
|
|
|
+ return r.replicaID
|
|
|
}
|
|
|
- return r.storeReplicaID
|
|
|
+ return -1
|
|
|
}
|
|
|
|
|
|
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
|