broker_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func ExampleBroker() {
  8. broker := NewBroker("localhost:9092")
  9. err := broker.Open(nil)
  10. if err != nil {
  11. panic(err)
  12. }
  13. request := MetadataRequest{Topics: []string{"myTopic"}}
  14. response, err := broker.GetMetadata(&request)
  15. if err != nil {
  16. _ = broker.Close()
  17. panic(err)
  18. }
  19. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  20. if err = broker.Close(); err != nil {
  21. panic(err)
  22. }
  23. }
  24. type mockEncoder struct {
  25. bytes []byte
  26. }
  27. func (m mockEncoder) encode(pe packetEncoder) error {
  28. return pe.putRawBytes(m.bytes)
  29. }
  // brokerMetrics carries the pair of byte counts that the mock broker's
  // notifier callback reports: first the bytes it read, then the bytes it
  // wrote. The field order matters because values are built positionally.
  type brokerMetrics struct {
  	bytesRead, bytesWritten int
  }
  34. func TestBrokerAccessors(t *testing.T) {
  35. broker := NewBroker("abc:123")
  36. if broker.ID() != -1 {
  37. t.Error("New broker didn't have an ID of -1.")
  38. }
  39. if broker.Addr() != "abc:123" {
  40. t.Error("New broker didn't have the correct address")
  41. }
  42. broker.id = 34
  43. if broker.ID() != 34 {
  44. t.Error("Manually setting broker ID did not take effect.")
  45. }
  46. }
  47. func TestSimpleBrokerCommunication(t *testing.T) {
  48. for _, tt := range brokerTestTable {
  49. Logger.Printf("Testing broker communication for %s", tt.name)
  50. mb := NewMockBroker(t, 0)
  51. mb.Returns(&mockEncoder{tt.response})
  52. pendingNotify := make(chan brokerMetrics)
  53. // Register a callback to be notified about successful requests
  54. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  55. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  56. })
  57. broker := NewBroker(mb.Addr())
  58. // Set the broker id in order to validate local broker metrics
  59. broker.id = 0
  60. conf := NewConfig()
  61. conf.Version = tt.version
  62. err := broker.Open(conf)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. tt.runner(t, broker)
  67. // Wait up to 500 ms for the remote broker to process the request and
  68. // notify us about the metrics
  69. timeout := 500 * time.Millisecond
  70. select {
  71. case mockBrokerMetrics := <-pendingNotify:
  72. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  73. case <-time.After(timeout):
  74. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  75. }
  76. mb.Close()
  77. err = broker.Close()
  78. if err != nil {
  79. t.Error(err)
  80. }
  81. }
  82. }
  83. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  84. var brokerTestTable = []struct {
  85. version KafkaVersion
  86. name string
  87. response []byte
  88. runner func(*testing.T, *Broker)
  89. }{
  90. {V0_10_0_0,
  91. "MetadataRequest",
  92. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  93. func(t *testing.T, broker *Broker) {
  94. request := MetadataRequest{}
  95. response, err := broker.GetMetadata(&request)
  96. if err != nil {
  97. t.Error(err)
  98. }
  99. if response == nil {
  100. t.Error("Metadata request got no response!")
  101. }
  102. }},
  103. {V0_10_0_0,
  104. "ConsumerMetadataRequest",
  105. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  106. func(t *testing.T, broker *Broker) {
  107. request := ConsumerMetadataRequest{}
  108. response, err := broker.GetConsumerMetadata(&request)
  109. if err != nil {
  110. t.Error(err)
  111. }
  112. if response == nil {
  113. t.Error("Consumer Metadata request got no response!")
  114. }
  115. }},
  116. {V0_10_0_0,
  117. "ProduceRequest (NoResponse)",
  118. []byte{},
  119. func(t *testing.T, broker *Broker) {
  120. request := ProduceRequest{}
  121. request.RequiredAcks = NoResponse
  122. response, err := broker.Produce(&request)
  123. if err != nil {
  124. t.Error(err)
  125. }
  126. if response != nil {
  127. t.Error("Produce request with NoResponse got a response!")
  128. }
  129. }},
  130. {V0_10_0_0,
  131. "ProduceRequest (WaitForLocal)",
  132. []byte{0x00, 0x00, 0x00, 0x00},
  133. func(t *testing.T, broker *Broker) {
  134. request := ProduceRequest{}
  135. request.RequiredAcks = WaitForLocal
  136. response, err := broker.Produce(&request)
  137. if err != nil {
  138. t.Error(err)
  139. }
  140. if response == nil {
  141. t.Error("Produce request without NoResponse got no response!")
  142. }
  143. }},
  144. {V0_10_0_0,
  145. "FetchRequest",
  146. []byte{0x00, 0x00, 0x00, 0x00},
  147. func(t *testing.T, broker *Broker) {
  148. request := FetchRequest{}
  149. response, err := broker.Fetch(&request)
  150. if err != nil {
  151. t.Error(err)
  152. }
  153. if response == nil {
  154. t.Error("Fetch request got no response!")
  155. }
  156. }},
  157. {V0_10_0_0,
  158. "OffsetFetchRequest",
  159. []byte{0x00, 0x00, 0x00, 0x00},
  160. func(t *testing.T, broker *Broker) {
  161. request := OffsetFetchRequest{}
  162. response, err := broker.FetchOffset(&request)
  163. if err != nil {
  164. t.Error(err)
  165. }
  166. if response == nil {
  167. t.Error("OffsetFetch request got no response!")
  168. }
  169. }},
  170. {V0_10_0_0,
  171. "OffsetCommitRequest",
  172. []byte{0x00, 0x00, 0x00, 0x00},
  173. func(t *testing.T, broker *Broker) {
  174. request := OffsetCommitRequest{}
  175. response, err := broker.CommitOffset(&request)
  176. if err != nil {
  177. t.Error(err)
  178. }
  179. if response == nil {
  180. t.Error("OffsetCommit request got no response!")
  181. }
  182. }},
  183. {V0_10_0_0,
  184. "OffsetRequest",
  185. []byte{0x00, 0x00, 0x00, 0x00},
  186. func(t *testing.T, broker *Broker) {
  187. request := OffsetRequest{}
  188. response, err := broker.GetAvailableOffsets(&request)
  189. if err != nil {
  190. t.Error(err)
  191. }
  192. if response == nil {
  193. t.Error("Offset request got no response!")
  194. }
  195. }},
  196. {V0_10_0_0,
  197. "JoinGroupRequest",
  198. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  199. func(t *testing.T, broker *Broker) {
  200. request := JoinGroupRequest{}
  201. response, err := broker.JoinGroup(&request)
  202. if err != nil {
  203. t.Error(err)
  204. }
  205. if response == nil {
  206. t.Error("JoinGroup request got no response!")
  207. }
  208. }},
  209. {V0_10_0_0,
  210. "SyncGroupRequest",
  211. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  212. func(t *testing.T, broker *Broker) {
  213. request := SyncGroupRequest{}
  214. response, err := broker.SyncGroup(&request)
  215. if err != nil {
  216. t.Error(err)
  217. }
  218. if response == nil {
  219. t.Error("SyncGroup request got no response!")
  220. }
  221. }},
  222. {V0_10_0_0,
  223. "LeaveGroupRequest",
  224. []byte{0x00, 0x00},
  225. func(t *testing.T, broker *Broker) {
  226. request := LeaveGroupRequest{}
  227. response, err := broker.LeaveGroup(&request)
  228. if err != nil {
  229. t.Error(err)
  230. }
  231. if response == nil {
  232. t.Error("LeaveGroup request got no response!")
  233. }
  234. }},
  235. {V0_10_0_0,
  236. "HeartbeatRequest",
  237. []byte{0x00, 0x00},
  238. func(t *testing.T, broker *Broker) {
  239. request := HeartbeatRequest{}
  240. response, err := broker.Heartbeat(&request)
  241. if err != nil {
  242. t.Error(err)
  243. }
  244. if response == nil {
  245. t.Error("Heartbeat request got no response!")
  246. }
  247. }},
  248. {V0_10_0_0,
  249. "ListGroupsRequest",
  250. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  251. func(t *testing.T, broker *Broker) {
  252. request := ListGroupsRequest{}
  253. response, err := broker.ListGroups(&request)
  254. if err != nil {
  255. t.Error(err)
  256. }
  257. if response == nil {
  258. t.Error("ListGroups request got no response!")
  259. }
  260. }},
  261. {V0_10_0_0,
  262. "DescribeGroupsRequest",
  263. []byte{0x00, 0x00, 0x00, 0x00},
  264. func(t *testing.T, broker *Broker) {
  265. request := DescribeGroupsRequest{}
  266. response, err := broker.DescribeGroups(&request)
  267. if err != nil {
  268. t.Error(err)
  269. }
  270. if response == nil {
  271. t.Error("DescribeGroups request got no response!")
  272. }
  273. }},
  274. {V0_10_0_0,
  275. "ApiVersionsRequest",
  276. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  277. func(t *testing.T, broker *Broker) {
  278. request := ApiVersionsRequest{}
  279. response, err := broker.ApiVersions(&request)
  280. if err != nil {
  281. t.Error(err)
  282. }
  283. if response == nil {
  284. t.Error("ApiVersions request got no response!")
  285. }
  286. }},
  287. {V1_1_0_0,
  288. "DeleteGroupsRequest",
  289. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  290. func(t *testing.T, broker *Broker) {
  291. request := DeleteGroupsRequest{}
  292. response, err := broker.DeleteGroups(&request)
  293. if err != nil {
  294. t.Error(err)
  295. }
  296. if response == nil {
  297. t.Error("DeleteGroups request got no response!")
  298. }
  299. }},
  300. }
  301. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  302. metricValidators := newMetricValidators()
  303. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  304. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  305. // Check that the number of bytes sent corresponds to what the mock broker received
  306. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  307. if mockBrokerBytesWritten == 0 {
  308. // This a ProduceRequest with NoResponse
  309. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  310. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  311. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  312. } else {
  313. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  314. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  315. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  316. }
  317. // Check that the number of bytes received corresponds to what the mock broker sent
  318. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  319. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  320. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  321. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  322. // Run the validators
  323. metricValidators.run(t, broker.conf.MetricRegistry)
  324. }