consumer_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "testing"
  6. "time"
  7. )
  8. func TestSimpleConsumer(t *testing.T) {
  9. masterResponses := make(chan []byte, 1)
  10. extraResponses := make(chan []byte)
  11. mockBroker := NewMockBroker(t, masterResponses)
  12. mockExtra := NewMockBroker(t, extraResponses)
  13. defer mockBroker.Close()
  14. defer mockExtra.Close()
  15. // return the extra mock as another available broker
  16. response := []byte{
  17. 0x00, 0x00, 0x00, 0x01,
  18. 0x00, 0x00, 0x00, 0x01,
  19. 0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
  20. 0x00, 0x00, 0x00, 0x00,
  21. 0x00, 0x00, 0x00, 0x01,
  22. 0x00, 0x00,
  23. 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
  24. 0x00, 0x00, 0x00, 0x01,
  25. 0x00, 0x00,
  26. 0x00, 0x00, 0x00, 0x00,
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x00, 0x00, 0x00,
  29. 0x00, 0x00, 0x00, 0x00}
  30. binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
  31. masterResponses <- response
  32. go func() {
  33. for i := 0; i < 10; i++ {
  34. msg := []byte{
  35. 0x00, 0x00, 0x00, 0x01,
  36. 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
  37. 0x00, 0x00, 0x00, 0x01,
  38. 0x00, 0x00, 0x00, 0x00,
  39. 0x00, 0x00,
  40. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  41. 0x00, 0x00, 0x00, 0x1C,
  42. // messageSet
  43. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  44. 0x00, 0x00, 0x00, 0x10,
  45. // message
  46. 0x23, 0x96, 0x4a, 0xf7, // CRC
  47. 0x00,
  48. 0x00,
  49. 0xFF, 0xFF, 0xFF, 0xFF,
  50. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  51. binary.BigEndian.PutUint64(msg[35:], uint64(i))
  52. extraResponses <- msg
  53. }
  54. extraResponses <- []byte{
  55. 0x00, 0x00, 0x00, 0x01,
  56. 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
  57. 0x00, 0x00, 0x00, 0x01,
  58. 0x00, 0x00, 0x00, 0x00,
  59. 0x00, 0x00,
  60. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  61. 0x00, 0x00, 0x00, 0x00}
  62. }()
  63. client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
  64. if err != nil {
  65. t.Fatal(err)
  66. }
  67. defer client.Close()
  68. consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. defer consumer.Close()
  73. for i := 0; i < 10; i++ {
  74. event := <-consumer.Events()
  75. if event.Err != nil {
  76. t.Error(err)
  77. }
  78. if event.Offset != int64(i) {
  79. t.Error("Incorrect message offset!")
  80. }
  81. }
  82. }
  83. func ExampleConsumer() {
  84. client, err := NewClient("myClient", []string{"localhost:9092"}, nil)
  85. if err != nil {
  86. panic(err)
  87. } else {
  88. fmt.Println("> connected")
  89. }
  90. defer client.Close()
  91. consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
  92. if err != nil {
  93. panic(err)
  94. } else {
  95. fmt.Println("> consumer ready")
  96. }
  97. defer consumer.Close()
  98. msgCount := 0
  99. consumerLoop:
  100. for {
  101. select {
  102. case event := <-consumer.Events():
  103. if event.Err != nil {
  104. panic(event.Err)
  105. }
  106. msgCount += 1
  107. case <-time.After(5 * time.Second):
  108. fmt.Println("> timed out")
  109. break consumerLoop
  110. }
  111. }
  112. fmt.Println("Got", msgCount, "messages.")
  113. }