|
@@ -1,7 +1,9 @@
|
|
|
package sarama
|
|
package sarama
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
- "fmt"
|
|
|
|
|
|
|
+ "log"
|
|
|
|
|
+ "os"
|
|
|
|
|
+ "os/signal"
|
|
|
"sync"
|
|
"sync"
|
|
|
"testing"
|
|
"testing"
|
|
|
"time"
|
|
"time"
|
|
@@ -243,54 +245,55 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
seedBroker.Close()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// This example shows how to use a consumer with a select statement
|
|
|
|
|
+// dealing with the different channels.
|
|
|
func ExampleConsumer_select() {
|
|
func ExampleConsumer_select() {
|
|
|
master, err := NewConsumer([]string{"localhost:9092"}, nil)
|
|
master, err := NewConsumer([]string{"localhost:9092"}, nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
|
|
- } else {
|
|
|
|
|
- fmt.Println("> master consumer ready")
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if err := master.Close(); err != nil {
|
|
if err := master.Close(); err != nil {
|
|
|
- panic(err)
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
consumer, err := master.ConsumePartition("my_topic", 0, 0)
|
|
consumer, err := master.ConsumePartition("my_topic", 0, 0)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
|
|
- } else {
|
|
|
|
|
- fmt.Println("> consumer ready")
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if err := consumer.Close(); err != nil {
|
|
if err := consumer.Close(); err != nil {
|
|
|
- panic(err)
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
msgCount := 0
|
|
msgCount := 0
|
|
|
|
|
|
|
|
|
|
+ signals := make(chan os.Signal, 1)
|
|
|
|
|
+ signal.Notify(signals, os.Interrupt)
|
|
|
|
|
+
|
|
|
consumerLoop:
|
|
consumerLoop:
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
case err := <-consumer.Errors():
|
|
case err := <-consumer.Errors():
|
|
|
- panic(err)
|
|
|
|
|
|
|
+ log.Println(err)
|
|
|
case <-consumer.Messages():
|
|
case <-consumer.Messages():
|
|
|
msgCount++
|
|
msgCount++
|
|
|
- case <-time.After(5 * time.Second):
|
|
|
|
|
- fmt.Println("> timed out")
|
|
|
|
|
|
|
+ case <-signals:
|
|
|
|
|
+ log.Println("Received interrupt")
|
|
|
break consumerLoop
|
|
break consumerLoop
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- fmt.Println("Got", msgCount, "messages.")
|
|
|
|
|
|
|
+ log.Println("Processed", msgCount, "messages.")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// This example shows how to use a consumer with different goroutines
|
|
|
|
|
+// to read from the Messages and Errors channels.
|
|
|
func ExampleConsumer_goroutines() {
|
|
func ExampleConsumer_goroutines() {
|
|
|
master, err := NewConsumer([]string{"localhost:9092"}, nil)
|
|
master, err := NewConsumer([]string{"localhost:9092"}, nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
|
|
- } else {
|
|
|
|
|
- fmt.Println("> master consumer ready")
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if err := master.Close(); err != nil {
|
|
if err := master.Close(); err != nil {
|
|
@@ -300,15 +303,8 @@ func ExampleConsumer_goroutines() {
|
|
|
|
|
|
|
|
consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
|
|
consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
|
|
- } else {
|
|
|
|
|
- fmt.Println("> consumer ready")
|
|
|
|
|
|
|
+ log.Fatalln(err)
|
|
|
}
|
|
}
|
|
|
- defer func() {
|
|
|
|
|
- if err := consumer.Close(); err != nil {
|
|
|
|
|
- panic(err)
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
|
|
|
|
|
var (
|
|
var (
|
|
|
wg sync.WaitGroup
|
|
wg sync.WaitGroup
|
|
@@ -319,7 +315,7 @@ func ExampleConsumer_goroutines() {
|
|
|
go func() {
|
|
go func() {
|
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
|
for message := range consumer.Messages() {
|
|
for message := range consumer.Messages() {
|
|
|
- fmt.Printf("Consumed message with offset %d", message.Offset)
|
|
|
|
|
|
|
+ log.Printf("Consumed message with offset %d", message.Offset)
|
|
|
msgCount++
|
|
msgCount++
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
@@ -328,10 +324,17 @@ func ExampleConsumer_goroutines() {
|
|
|
go func() {
|
|
go func() {
|
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
|
for err := range consumer.Errors() {
|
|
for err := range consumer.Errors() {
|
|
|
- fmt.Println(err)
|
|
|
|
|
|
|
+ log.Println(err)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
|
|
+ // Wait for an interrupt signal to trigger the shutdown
|
|
|
|
|
+ signals := make(chan os.Signal, 1)
|
|
|
|
|
+ signal.Notify(signals, os.Interrupt)
|
|
|
|
|
+ <-signals
|
|
|
|
|
+ consumer.AsyncClose()
|
|
|
|
|
+
|
|
|
|
|
+ // Wait for the Messages and Errors channel to be fully drained.
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
- fmt.Println("Got", msgCount, "messages.")
|
|
|
|
|
|
|
+ log.Println("Processed", msgCount, "messages.")
|
|
|
}
|
|
}
|