|
|
@@ -57,9 +57,13 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- event := <-consumer.Messages()
|
|
|
- if event.Offset != int64(i+1234) {
|
|
|
- t.Error("Incorrect message offset!")
|
|
|
+ select {
|
|
|
+ case message := <-consumer.Messages():
|
|
|
+ if message.Offset != int64(i+1234) {
|
|
|
+ t.Error("Incorrect message offset!")
|
|
|
+ }
|
|
|
+ case err := <-consumer.Errors():
|
|
|
+ t.Error(err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -195,6 +199,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
+
|
|
|
+ go func(c *PartitionConsumer) {
|
|
|
+ for err := range c.Errors() {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ }(consumer)
|
|
|
+
|
|
|
wg.Add(1)
|
|
|
go func(partition int32, c *PartitionConsumer) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
@@ -283,7 +294,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
-func ExampleConsumer() {
|
|
|
+func ExampleConsumerWithSelect() {
|
|
|
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
|
|
|
if err != nil {
|
|
|
panic(err)
|
|
|
@@ -323,3 +334,51 @@ consumerLoop:
|
|
|
}
|
|
|
fmt.Println("Got", msgCount, "messages.")
|
|
|
}
|
|
|
+
|
|
|
+func ExampleConsumerWithGoroutines() {
|
|
|
+ client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ } else {
|
|
|
+ fmt.Println("> connected")
|
|
|
+ }
|
|
|
+ defer client.Close()
|
|
|
+
|
|
|
+ master, err := NewConsumer(client, nil)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ } else {
|
|
|
+ fmt.Println("> master consumer ready")
|
|
|
+ }
|
|
|
+
|
|
|
+ consumer, err := master.ConsumePartition("my_topic", 0, nil)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ } else {
|
|
|
+ fmt.Println("> consumer ready")
|
|
|
+ }
|
|
|
+ defer consumer.Close()
|
|
|
+
|
|
|
+ var (
|
|
|
+ wg sync.WaitGroup
|
|
|
+ msgCount int
|
|
|
+ )
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ for message := range consumer.Messages() {
|
|
|
+ fmt.Printf("Consumed message with offset %d", message.Offset)
|
|
|
+ msgCount++
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ for err := range consumer.Errors() {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ fmt.Println("Got", msgCount, "messages.")
|
|
|
+}
|