|
|
@@ -96,6 +96,22 @@ func (c *Consumer) Partitions(topic string) ([]int32, error) {
|
|
|
return c.metadata[topic], nil
|
|
|
}
|
|
|
|
|
|
+func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
|
|
|
+ c.l.Lock()
|
|
|
+ defer c.l.Unlock()
|
|
|
+
|
|
|
+ hwms := make(map[string]map[int32]int64, len(c.partitionConsumers))
|
|
|
+ for topic, partitionConsumers := range c.partitionConsumers {
|
|
|
+ hwm := make(map[int32]int64, len(partitionConsumers))
|
|
|
+ for partition, pc := range partitionConsumers {
|
|
|
+ hwm[partition] = pc.HighWaterMarkOffset()
|
|
|
+ }
|
|
|
+ hwms[topic] = hwm
|
|
|
+ }
|
|
|
+
|
|
|
+ return hwms
|
|
|
+}
|
|
|
+
|
|
|
// Close implements the Close method from the sarama.Consumer interface. It will close
|
|
|
// all registered PartitionConsumer instances.
|
|
|
func (c *Consumer) Close() error {
|