|
|
@@ -63,6 +63,9 @@ type Consumer interface {
|
|
|
// or OffsetOldest
|
|
|
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
|
|
|
|
|
|
+ // HighWaterMarks returns the current high water marks for each topic and partitions
|
|
|
+ HighWaterMarks() map[string]map[int32]int64
|
|
|
+
|
|
|
// Close shuts down the consumer. It must be called after all child
|
|
|
// PartitionConsumers have already been closed.
|
|
|
Close() error
|
|
|
@@ -163,6 +166,22 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
|
|
|
return child, nil
|
|
|
}
|
|
|
|
|
|
+func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
|
|
|
+ c.lock.Lock()
|
|
|
+ defer c.lock.Unlock()
|
|
|
+
|
|
|
+ hwms := make(map[string]map[int32]int64)
|
|
|
+ for topic, p := range c.children {
|
|
|
+ hwm := make(map[int32]int64, len(p))
|
|
|
+ for partition, pc := range p {
|
|
|
+ hwm[partition] = pc.HighWaterMarkOffset()
|
|
|
+ }
|
|
|
+ hwms[topic] = hwm
|
|
|
+ }
|
|
|
+
|
|
|
+ return hwms
|
|
|
+}
|
|
|
+
|
|
|
func (c *consumer) addChild(child *partitionConsumer) error {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|