|
@@ -2,8 +2,10 @@ package kafka
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
|
|
|
+ "fmt"
|
|
|
"sarama/mock"
|
|
"sarama/mock"
|
|
|
"testing"
|
|
"testing"
|
|
|
|
|
+ "time"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
func TestSimpleConsumer(t *testing.T) {
|
|
func TestSimpleConsumer(t *testing.T) {
|
|
@@ -87,3 +89,35 @@ func TestSimpleConsumer(t *testing.T) {
|
|
|
consumer.Close()
|
|
consumer.Close()
|
|
|
client.Close()
|
|
client.Close()
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+func ExampleConsumer() {
|
|
|
|
|
+ client, err := NewClient("myClient", "localhost", 9092)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ panic(err)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ fmt.Println("> connected")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ panic(err)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ fmt.Println("> consumer ready")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+consumerLoop:
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case msg := <-consumer.Messages():
|
|
|
|
|
+ fmt.Println(msg)
|
|
|
|
|
+ case err := <-consumer.Errors():
|
|
|
|
|
+ panic(err)
|
|
|
|
|
+ case <-time.After(5 * time.Second):
|
|
|
|
|
+ fmt.Println("> timed out")
|
|
|
|
|
+ break consumerLoop
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ consumer.Close()
|
|
|
|
|
+ client.Close()
|
|
|
|
|
+}
|