Browse Source

Add method to inspect OffsetCommitRequest offsets

Maxim Vladimirskiy 7 years ago
parent
commit
6e796b14dc
1 changed files with 14 additions and 0 deletions
  1. 14 0
      offset_commit_request.go

+ 14 - 0
offset_commit_request.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "errors"
+
 // ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
 // tells the broker to set the timestamp to the time at which the request was received.
 // The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
@@ -188,3 +190,15 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
 
 	r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
 }
+
+func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
+	partitions := r.blocks[topic]
+	if partitions == nil {
+		return 0, "", errors.New("No such offset")
+	}
+	block := partitions[partitionID]
+	if block == nil {
+		return 0, "", errors.New("No such offset")
+	}
+	return block.offset, block.metadata, nil
+}