瀏覽代碼

Merge pull request #769 from mathpl/feature/expose_high_water_mark

Add Consumer.HighWaterMarks()
Evan Huus 9 年之前
父節點
當前提交
4d11317f35
共有 2 個文件被更改,包括 36 次插入0 次删除
  1. 20 0
      consumer.go
  2. 16 0
      mocks/consumer.go

+ 20 - 0
consumer.go

@@ -63,6 +63,10 @@ type Consumer interface {
 	// or OffsetOldest
 	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
 
+	// HighWaterMarks returns the current high water marks for each topic and partition
+	// Consistency between partitions is not garanteed since high water marks are updated separately.
+	HighWaterMarks() map[string]map[int32]int64
+
 	// Close shuts down the consumer. It must be called after all child
 	// PartitionConsumers have already been closed.
 	Close() error
@@ -163,6 +167,22 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 	return child, nil
 }
 
+func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	hwms := make(map[string]map[int32]int64)
+	for topic, p := range c.children {
+		hwm := make(map[int32]int64, len(p))
+		for partition, pc := range p {
+			hwm[partition] = pc.HighWaterMarkOffset()
+		}
+		hwms[topic] = hwm
+	}
+
+	return hwms
+}
+
 func (c *consumer) addChild(child *partitionConsumer) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()

+ 16 - 0
mocks/consumer.go

@@ -96,6 +96,22 @@ func (c *Consumer) Partitions(topic string) ([]int32, error) {
 	return c.metadata[topic], nil
 }
 
+func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	hwms := make(map[string]map[int32]int64, len(c.partitionConsumers))
+	for topic, partitionConsumers := range c.partitionConsumers {
+		hwm := make(map[int32]int64, len(partitionConsumers))
+		for partition, pc := range partitionConsumers {
+			hwm[partition] = pc.HighWaterMarkOffset()
+		}
+		hwms[topic] = hwm
+	}
+
+	return hwms
+}
+
 // Close implements the Close method from the sarama.Consumer interface. It will close
 // all registered PartitionConsumer instances.
 func (c *Consumer) Close() error {