broker_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. func ExampleBroker() error {
  7. broker := NewBroker("localhost:9092")
  8. err := broker.Open(nil)
  9. if err != nil {
  10. return err
  11. }
  12. request := MetadataRequest{Topics: []string{"myTopic"}}
  13. response, err := broker.GetMetadata(&request)
  14. if err != nil {
  15. _ = broker.Close()
  16. return err
  17. }
  18. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  19. return broker.Close()
  20. }
  21. type mockEncoder struct {
  22. bytes []byte
  23. }
  24. func (m mockEncoder) encode(pe packetEncoder) error {
  25. return pe.putRawBytes(m.bytes)
  26. }
  27. func TestBrokerAccessors(t *testing.T) {
  28. broker := NewBroker("abc:123")
  29. if broker.ID() != -1 {
  30. t.Error("New broker didn't have an ID of -1.")
  31. }
  32. if broker.Addr() != "abc:123" {
  33. t.Error("New broker didn't have the correct address")
  34. }
  35. broker.id = 34
  36. if broker.ID() != 34 {
  37. t.Error("Manually setting broker ID did not take effect.")
  38. }
  39. }
  40. func TestSimpleBrokerCommunication(t *testing.T) {
  41. mb := newMockBroker(t, 0)
  42. defer mb.Close()
  43. broker := NewBroker(mb.Addr())
  44. err := broker.Open(nil)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. for _, tt := range brokerTestTable {
  49. mb.Returns(&mockEncoder{tt.response})
  50. }
  51. for _, tt := range brokerTestTable {
  52. tt.runner(t, broker)
  53. }
  54. err = broker.Close()
  55. if err != nil {
  56. t.Error(err)
  57. }
  58. }
  59. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  60. var brokerTestTable = []struct {
  61. response []byte
  62. runner func(*testing.T, *Broker)
  63. }{
  64. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  65. func(t *testing.T, broker *Broker) {
  66. request := MetadataRequest{}
  67. response, err := broker.GetMetadata(&request)
  68. if err != nil {
  69. t.Error(err)
  70. }
  71. if response == nil {
  72. t.Error("Metadata request got no response!")
  73. }
  74. }},
  75. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  76. func(t *testing.T, broker *Broker) {
  77. request := ConsumerMetadataRequest{}
  78. response, err := broker.GetConsumerMetadata(&request)
  79. if err != nil {
  80. t.Error(err)
  81. }
  82. if response == nil {
  83. t.Error("Consumer Metadata request got no response!")
  84. }
  85. }},
  86. {[]byte{},
  87. func(t *testing.T, broker *Broker) {
  88. request := ProduceRequest{}
  89. request.RequiredAcks = NoResponse
  90. response, err := broker.Produce(&request)
  91. if err != nil {
  92. t.Error(err)
  93. }
  94. if response != nil {
  95. t.Error("Produce request with NoResponse got a response!")
  96. }
  97. }},
  98. {[]byte{0x00, 0x00, 0x00, 0x00},
  99. func(t *testing.T, broker *Broker) {
  100. request := ProduceRequest{}
  101. request.RequiredAcks = WaitForLocal
  102. response, err := broker.Produce(&request)
  103. if err != nil {
  104. t.Error(err)
  105. }
  106. if response == nil {
  107. t.Error("Produce request without NoResponse got no response!")
  108. }
  109. }},
  110. {[]byte{0x00, 0x00, 0x00, 0x00},
  111. func(t *testing.T, broker *Broker) {
  112. request := FetchRequest{}
  113. response, err := broker.Fetch(&request)
  114. if err != nil {
  115. t.Error(err)
  116. }
  117. if response == nil {
  118. t.Error("Fetch request got no response!")
  119. }
  120. }},
  121. {[]byte{0x00, 0x00, 0x00, 0x00},
  122. func(t *testing.T, broker *Broker) {
  123. request := OffsetFetchRequest{}
  124. response, err := broker.FetchOffset(&request)
  125. if err != nil {
  126. t.Error(err)
  127. }
  128. if response == nil {
  129. t.Error("OffsetFetch request got no response!")
  130. }
  131. }},
  132. {[]byte{0x00, 0x00, 0x00, 0x00},
  133. func(t *testing.T, broker *Broker) {
  134. request := OffsetCommitRequest{}
  135. response, err := broker.CommitOffset(&request)
  136. if err != nil {
  137. t.Error(err)
  138. }
  139. if response == nil {
  140. t.Error("OffsetCommit request got no response!")
  141. }
  142. }},
  143. {[]byte{0x00, 0x00, 0x00, 0x00},
  144. func(t *testing.T, broker *Broker) {
  145. request := OffsetRequest{}
  146. response, err := broker.GetAvailableOffsets(&request)
  147. if err != nil {
  148. t.Error(err)
  149. }
  150. if response == nil {
  151. t.Error("Offset request got no response!")
  152. }
  153. }},
  154. }