broker_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func ExampleBroker() {
  8. broker := NewBroker("localhost:9092")
  9. err := broker.Open(nil)
  10. if err != nil {
  11. panic(err)
  12. }
  13. request := MetadataRequest{Topics: []string{"myTopic"}}
  14. response, err := broker.GetMetadata(&request)
  15. if err != nil {
  16. _ = broker.Close()
  17. panic(err)
  18. }
  19. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  20. if err = broker.Close(); err != nil {
  21. panic(err)
  22. }
  23. }
  24. type mockEncoder struct {
  25. bytes []byte
  26. }
  27. func (m mockEncoder) encode(pe packetEncoder) error {
  28. return pe.putRawBytes(m.bytes)
  29. }
  30. type brokerMetrics struct {
  31. bytesRead int
  32. bytesWritten int
  33. }
  34. func TestBrokerAccessors(t *testing.T) {
  35. broker := NewBroker("abc:123")
  36. if broker.ID() != -1 {
  37. t.Error("New broker didn't have an ID of -1.")
  38. }
  39. if broker.Addr() != "abc:123" {
  40. t.Error("New broker didn't have the correct address")
  41. }
  42. broker.id = 34
  43. if broker.ID() != 34 {
  44. t.Error("Manually setting broker ID did not take effect.")
  45. }
  46. }
  47. func TestSimpleBrokerCommunication(t *testing.T) {
  48. for _, tt := range brokerTestTable {
  49. Logger.Printf("Testing broker communication for %s", tt.name)
  50. mb := NewMockBroker(t, 0)
  51. mb.Returns(&mockEncoder{tt.response})
  52. pendingNotify := make(chan brokerMetrics)
  53. // Register a callback to be notified about successful requests
  54. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  55. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  56. })
  57. broker := NewBroker(mb.Addr())
  58. // Set the broker id in order to validate local broker metrics
  59. broker.id = 0
  60. conf := NewConfig()
  61. conf.Version = V0_10_0_0
  62. err := broker.Open(conf)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. tt.runner(t, broker)
  67. // Wait up to 500 ms for the remote broker to process the request and
  68. // notify us about the metrics
  69. timeout := 500 * time.Millisecond
  70. select {
  71. case mockBrokerMetrics := <-pendingNotify:
  72. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  73. case <-time.After(timeout):
  74. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  75. }
  76. mb.Close()
  77. err = broker.Close()
  78. if err != nil {
  79. t.Error(err)
  80. }
  81. }
  82. }
  83. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  84. var brokerTestTable = []struct {
  85. name string
  86. response []byte
  87. runner func(*testing.T, *Broker)
  88. }{
  89. {"MetadataRequest",
  90. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  91. func(t *testing.T, broker *Broker) {
  92. request := MetadataRequest{}
  93. response, err := broker.GetMetadata(&request)
  94. if err != nil {
  95. t.Error(err)
  96. }
  97. if response == nil {
  98. t.Error("Metadata request got no response!")
  99. }
  100. }},
  101. {"ConsumerMetadataRequest",
  102. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  103. func(t *testing.T, broker *Broker) {
  104. request := ConsumerMetadataRequest{}
  105. response, err := broker.GetConsumerMetadata(&request)
  106. if err != nil {
  107. t.Error(err)
  108. }
  109. if response == nil {
  110. t.Error("Consumer Metadata request got no response!")
  111. }
  112. }},
  113. {"ProduceRequest (NoResponse)",
  114. []byte{},
  115. func(t *testing.T, broker *Broker) {
  116. request := ProduceRequest{}
  117. request.RequiredAcks = NoResponse
  118. response, err := broker.Produce(&request)
  119. if err != nil {
  120. t.Error(err)
  121. }
  122. if response != nil {
  123. t.Error("Produce request with NoResponse got a response!")
  124. }
  125. }},
  126. {"ProduceRequest (WaitForLocal)",
  127. []byte{0x00, 0x00, 0x00, 0x00},
  128. func(t *testing.T, broker *Broker) {
  129. request := ProduceRequest{}
  130. request.RequiredAcks = WaitForLocal
  131. response, err := broker.Produce(&request)
  132. if err != nil {
  133. t.Error(err)
  134. }
  135. if response == nil {
  136. t.Error("Produce request without NoResponse got no response!")
  137. }
  138. }},
  139. {"FetchRequest",
  140. []byte{0x00, 0x00, 0x00, 0x00},
  141. func(t *testing.T, broker *Broker) {
  142. request := FetchRequest{}
  143. response, err := broker.Fetch(&request)
  144. if err != nil {
  145. t.Error(err)
  146. }
  147. if response == nil {
  148. t.Error("Fetch request got no response!")
  149. }
  150. }},
  151. {"OffsetFetchRequest",
  152. []byte{0x00, 0x00, 0x00, 0x00},
  153. func(t *testing.T, broker *Broker) {
  154. request := OffsetFetchRequest{}
  155. response, err := broker.FetchOffset(&request)
  156. if err != nil {
  157. t.Error(err)
  158. }
  159. if response == nil {
  160. t.Error("OffsetFetch request got no response!")
  161. }
  162. }},
  163. {"OffsetCommitRequest",
  164. []byte{0x00, 0x00, 0x00, 0x00},
  165. func(t *testing.T, broker *Broker) {
  166. request := OffsetCommitRequest{}
  167. response, err := broker.CommitOffset(&request)
  168. if err != nil {
  169. t.Error(err)
  170. }
  171. if response == nil {
  172. t.Error("OffsetCommit request got no response!")
  173. }
  174. }},
  175. {"OffsetRequest",
  176. []byte{0x00, 0x00, 0x00, 0x00},
  177. func(t *testing.T, broker *Broker) {
  178. request := OffsetRequest{}
  179. response, err := broker.GetAvailableOffsets(&request)
  180. if err != nil {
  181. t.Error(err)
  182. }
  183. if response == nil {
  184. t.Error("Offset request got no response!")
  185. }
  186. }},
  187. {"JoinGroupRequest",
  188. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  189. func(t *testing.T, broker *Broker) {
  190. request := JoinGroupRequest{}
  191. response, err := broker.JoinGroup(&request)
  192. if err != nil {
  193. t.Error(err)
  194. }
  195. if response == nil {
  196. t.Error("JoinGroup request got no response!")
  197. }
  198. }},
  199. {"SyncGroupRequest",
  200. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  201. func(t *testing.T, broker *Broker) {
  202. request := SyncGroupRequest{}
  203. response, err := broker.SyncGroup(&request)
  204. if err != nil {
  205. t.Error(err)
  206. }
  207. if response == nil {
  208. t.Error("SyncGroup request got no response!")
  209. }
  210. }},
  211. {"LeaveGroupRequest",
  212. []byte{0x00, 0x00},
  213. func(t *testing.T, broker *Broker) {
  214. request := LeaveGroupRequest{}
  215. response, err := broker.LeaveGroup(&request)
  216. if err != nil {
  217. t.Error(err)
  218. }
  219. if response == nil {
  220. t.Error("LeaveGroup request got no response!")
  221. }
  222. }},
  223. {"HeartbeatRequest",
  224. []byte{0x00, 0x00},
  225. func(t *testing.T, broker *Broker) {
  226. request := HeartbeatRequest{}
  227. response, err := broker.Heartbeat(&request)
  228. if err != nil {
  229. t.Error(err)
  230. }
  231. if response == nil {
  232. t.Error("Heartbeat request got no response!")
  233. }
  234. }},
  235. {"ListGroupsRequest",
  236. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  237. func(t *testing.T, broker *Broker) {
  238. request := ListGroupsRequest{}
  239. response, err := broker.ListGroups(&request)
  240. if err != nil {
  241. t.Error(err)
  242. }
  243. if response == nil {
  244. t.Error("ListGroups request got no response!")
  245. }
  246. }},
  247. {"DescribeGroupsRequest",
  248. []byte{0x00, 0x00, 0x00, 0x00},
  249. func(t *testing.T, broker *Broker) {
  250. request := DescribeGroupsRequest{}
  251. response, err := broker.DescribeGroups(&request)
  252. if err != nil {
  253. t.Error(err)
  254. }
  255. if response == nil {
  256. t.Error("DescribeGroups request got no response!")
  257. }
  258. }},
  259. {"ApiVersionsRequest",
  260. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  261. func(t *testing.T, broker *Broker) {
  262. request := ApiVersionsRequest{}
  263. response, err := broker.ApiVersions(&request)
  264. if err != nil {
  265. t.Error(err)
  266. }
  267. if response == nil {
  268. t.Error("ApiVersions request got no response!")
  269. }
  270. }},
  271. {"DeleteGroupsRequest",
  272. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  273. func(t *testing.T, broker *Broker) {
  274. request := DeleteGroupsRequest{}
  275. response, err := broker.DeleteGroups(&request)
  276. if err != nil {
  277. t.Error(err)
  278. }
  279. if response == nil {
  280. t.Error("DeleteGroups request got no response!")
  281. }
  282. }},
  283. }
  284. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  285. metricValidators := newMetricValidators()
  286. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  287. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  288. // Check that the number of bytes sent corresponds to what the mock broker received
  289. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  290. if mockBrokerBytesWritten == 0 {
  291. // This a ProduceRequest with NoResponse
  292. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  293. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  294. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  295. } else {
  296. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  297. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  298. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  299. }
  300. // Check that the number of bytes received corresponds to what the mock broker sent
  301. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  302. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  303. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  304. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  305. // Run the validators
  306. metricValidators.run(t, broker.conf.MetricRegistry)
  307. }