broker_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. func ExampleBroker() {
  7. broker := NewBroker("localhost:9092")
  8. err := broker.Open(nil)
  9. if err != nil {
  10. panic(err)
  11. }
  12. request := MetadataRequest{Topics: []string{"myTopic"}}
  13. response, err := broker.GetMetadata(&request)
  14. if err != nil {
  15. _ = broker.Close()
  16. panic(err)
  17. }
  18. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  19. if err = broker.Close(); err != nil {
  20. panic(err)
  21. }
  22. }
  23. type mockEncoder struct {
  24. bytes []byte
  25. }
  26. func (m mockEncoder) encode(pe packetEncoder) error {
  27. return pe.putRawBytes(m.bytes)
  28. }
  29. func TestBrokerAccessors(t *testing.T) {
  30. broker := NewBroker("abc:123")
  31. if broker.ID() != -1 {
  32. t.Error("New broker didn't have an ID of -1.")
  33. }
  34. if broker.Addr() != "abc:123" {
  35. t.Error("New broker didn't have the correct address")
  36. }
  37. broker.id = 34
  38. if broker.ID() != 34 {
  39. t.Error("Manually setting broker ID did not take effect.")
  40. }
  41. }
  42. func TestSimpleBrokerCommunication(t *testing.T) {
  43. mb := NewMockBroker(t, 0)
  44. defer mb.Close()
  45. broker := NewBroker(mb.Addr())
  46. conf := NewConfig()
  47. conf.Version = V0_10_0_0
  48. err := broker.Open(conf)
  49. if err != nil {
  50. t.Fatal(err)
  51. }
  52. for _, tt := range brokerTestTable {
  53. mb.Returns(&mockEncoder{tt.response})
  54. }
  55. for _, tt := range brokerTestTable {
  56. tt.runner(t, broker)
  57. }
  58. err = broker.Close()
  59. if err != nil {
  60. t.Error(err)
  61. }
  62. }
  63. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  64. var brokerTestTable = []struct {
  65. response []byte
  66. runner func(*testing.T, *Broker)
  67. }{
  68. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  69. func(t *testing.T, broker *Broker) {
  70. request := MetadataRequest{}
  71. response, err := broker.GetMetadata(&request)
  72. if err != nil {
  73. t.Error(err)
  74. }
  75. if response == nil {
  76. t.Error("Metadata request got no response!")
  77. }
  78. }},
  79. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  80. func(t *testing.T, broker *Broker) {
  81. request := ConsumerMetadataRequest{}
  82. response, err := broker.GetConsumerMetadata(&request)
  83. if err != nil {
  84. t.Error(err)
  85. }
  86. if response == nil {
  87. t.Error("Consumer Metadata request got no response!")
  88. }
  89. }},
  90. {[]byte{},
  91. func(t *testing.T, broker *Broker) {
  92. request := ProduceRequest{}
  93. request.RequiredAcks = NoResponse
  94. response, err := broker.Produce(&request)
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. if response != nil {
  99. t.Error("Produce request with NoResponse got a response!")
  100. }
  101. }},
  102. {[]byte{0x00, 0x00, 0x00, 0x00},
  103. func(t *testing.T, broker *Broker) {
  104. request := ProduceRequest{}
  105. request.RequiredAcks = WaitForLocal
  106. response, err := broker.Produce(&request)
  107. if err != nil {
  108. t.Error(err)
  109. }
  110. if response == nil {
  111. t.Error("Produce request without NoResponse got no response!")
  112. }
  113. }},
  114. {[]byte{0x00, 0x00, 0x00, 0x00},
  115. func(t *testing.T, broker *Broker) {
  116. request := FetchRequest{}
  117. response, err := broker.Fetch(&request)
  118. if err != nil {
  119. t.Error(err)
  120. }
  121. if response == nil {
  122. t.Error("Fetch request got no response!")
  123. }
  124. }},
  125. {[]byte{0x00, 0x00, 0x00, 0x00},
  126. func(t *testing.T, broker *Broker) {
  127. request := OffsetFetchRequest{}
  128. response, err := broker.FetchOffset(&request)
  129. if err != nil {
  130. t.Error(err)
  131. }
  132. if response == nil {
  133. t.Error("OffsetFetch request got no response!")
  134. }
  135. }},
  136. {[]byte{0x00, 0x00, 0x00, 0x00},
  137. func(t *testing.T, broker *Broker) {
  138. request := OffsetCommitRequest{}
  139. response, err := broker.CommitOffset(&request)
  140. if err != nil {
  141. t.Error(err)
  142. }
  143. if response == nil {
  144. t.Error("OffsetCommit request got no response!")
  145. }
  146. }},
  147. {[]byte{0x00, 0x00, 0x00, 0x00},
  148. func(t *testing.T, broker *Broker) {
  149. request := OffsetRequest{}
  150. response, err := broker.GetAvailableOffsets(&request)
  151. if err != nil {
  152. t.Error(err)
  153. }
  154. if response == nil {
  155. t.Error("Offset request got no response!")
  156. }
  157. }},
  158. {[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  159. func(t *testing.T, broker *Broker) {
  160. request := JoinGroupRequest{}
  161. response, err := broker.JoinGroup(&request)
  162. if err != nil {
  163. t.Error(err)
  164. }
  165. if response == nil {
  166. t.Error("JoinGroup request got no response!")
  167. }
  168. }},
  169. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  170. func(t *testing.T, broker *Broker) {
  171. request := SyncGroupRequest{}
  172. response, err := broker.SyncGroup(&request)
  173. if err != nil {
  174. t.Error(err)
  175. }
  176. if response == nil {
  177. t.Error("SyncGroup request got no response!")
  178. }
  179. }},
  180. {[]byte{0x00, 0x00},
  181. func(t *testing.T, broker *Broker) {
  182. request := LeaveGroupRequest{}
  183. response, err := broker.LeaveGroup(&request)
  184. if err != nil {
  185. t.Error(err)
  186. }
  187. if response == nil {
  188. t.Error("LeaveGroup request got no response!")
  189. }
  190. }},
  191. {[]byte{0x00, 0x00},
  192. func(t *testing.T, broker *Broker) {
  193. request := HeartbeatRequest{}
  194. response, err := broker.Heartbeat(&request)
  195. if err != nil {
  196. t.Error(err)
  197. }
  198. if response == nil {
  199. t.Error("Heartbeat request got no response!")
  200. }
  201. }},
  202. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  203. func(t *testing.T, broker *Broker) {
  204. request := ListGroupsRequest{}
  205. response, err := broker.ListGroups(&request)
  206. if err != nil {
  207. t.Error(err)
  208. }
  209. if response == nil {
  210. t.Error("ListGroups request got no response!")
  211. }
  212. }},
  213. {[]byte{0x00, 0x00, 0x00, 0x00},
  214. func(t *testing.T, broker *Broker) {
  215. request := DescribeGroupsRequest{}
  216. response, err := broker.DescribeGroups(&request)
  217. if err != nil {
  218. t.Error(err)
  219. }
  220. if response == nil {
  221. t.Error("DescribeGroups request got no response!")
  222. }
  223. }},
  224. }