|
@@ -1,6 +1,9 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
+ "log"
|
|
|
+ "os"
|
|
|
+ "os/signal"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
"time"
|
|
@@ -842,3 +845,47 @@ func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int6
|
|
|
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func ExampleConsumer() {
|
|
|
+ consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ if err := consumer.Close(); err != nil {
|
|
|
+ log.Fatalln(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ if err := partitionConsumer.Close(); err != nil {
|
|
|
+ log.Fatalln(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+
|
|
|
+ signals := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(signals, os.Interrupt)
|
|
|
+
|
|
|
+ consumed := 0
|
|
|
+ConsumerLoop:
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case msg := <-partitionConsumer.Messages():
|
|
|
+ log.Printf("Consumed message offset %d\n", msg.Offset)
|
|
|
+ consumed++
|
|
|
+ case <-signals:
|
|
|
+ break ConsumerLoop
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("Consumed: %d\n", consumed)
|
|
|
+}
|