broker_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func ExampleBroker() {
  8. broker := NewBroker("localhost:9092")
  9. err := broker.Open(nil)
  10. if err != nil {
  11. panic(err)
  12. }
  13. request := MetadataRequest{Topics: []string{"myTopic"}}
  14. response, err := broker.GetMetadata(&request)
  15. if err != nil {
  16. _ = broker.Close()
  17. panic(err)
  18. }
  19. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  20. if err = broker.Close(); err != nil {
  21. panic(err)
  22. }
  23. }
  24. type mockEncoder struct {
  25. bytes []byte
  26. }
  27. func (m mockEncoder) encode(pe packetEncoder) error {
  28. return pe.putRawBytes(m.bytes)
  29. }
  30. type brokerMetrics struct {
  31. bytesRead int
  32. bytesWritten int
  33. }
  34. func TestBrokerAccessors(t *testing.T) {
  35. broker := NewBroker("abc:123")
  36. if broker.ID() != -1 {
  37. t.Error("New broker didn't have an ID of -1.")
  38. }
  39. if broker.Addr() != "abc:123" {
  40. t.Error("New broker didn't have the correct address")
  41. }
  42. if broker.Rack() != "" {
  43. t.Error("New broker didn't have an unknown rack.")
  44. }
  45. broker.id = 34
  46. if broker.ID() != 34 {
  47. t.Error("Manually setting broker ID did not take effect.")
  48. }
  49. rack := "dc1"
  50. broker.rack = &rack
  51. if broker.Rack() != rack {
  52. t.Error("Manually setting broker rack did not take effect.")
  53. }
  54. }
  55. func TestSimpleBrokerCommunication(t *testing.T) {
  56. for _, tt := range brokerTestTable {
  57. Logger.Printf("Testing broker communication for %s", tt.name)
  58. mb := NewMockBroker(t, 0)
  59. mb.Returns(&mockEncoder{tt.response})
  60. pendingNotify := make(chan brokerMetrics)
  61. // Register a callback to be notified about successful requests
  62. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  63. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  64. })
  65. broker := NewBroker(mb.Addr())
  66. // Set the broker id in order to validate local broker metrics
  67. broker.id = 0
  68. conf := NewConfig()
  69. conf.Version = tt.version
  70. err := broker.Open(conf)
  71. if err != nil {
  72. t.Fatal(err)
  73. }
  74. tt.runner(t, broker)
  75. // Wait up to 500 ms for the remote broker to process the request and
  76. // notify us about the metrics
  77. timeout := 500 * time.Millisecond
  78. select {
  79. case mockBrokerMetrics := <-pendingNotify:
  80. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  81. case <-time.After(timeout):
  82. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  83. }
  84. mb.Close()
  85. err = broker.Close()
  86. if err != nil {
  87. t.Error(err)
  88. }
  89. }
  90. }
  91. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  92. var brokerTestTable = []struct {
  93. version KafkaVersion
  94. name string
  95. response []byte
  96. runner func(*testing.T, *Broker)
  97. }{
  98. {V0_10_0_0,
  99. "MetadataRequest",
  100. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  101. func(t *testing.T, broker *Broker) {
  102. request := MetadataRequest{}
  103. response, err := broker.GetMetadata(&request)
  104. if err != nil {
  105. t.Error(err)
  106. }
  107. if response == nil {
  108. t.Error("Metadata request got no response!")
  109. }
  110. }},
  111. {V0_10_0_0,
  112. "ConsumerMetadataRequest",
  113. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  114. func(t *testing.T, broker *Broker) {
  115. request := ConsumerMetadataRequest{}
  116. response, err := broker.GetConsumerMetadata(&request)
  117. if err != nil {
  118. t.Error(err)
  119. }
  120. if response == nil {
  121. t.Error("Consumer Metadata request got no response!")
  122. }
  123. }},
  124. {V0_10_0_0,
  125. "ProduceRequest (NoResponse)",
  126. []byte{},
  127. func(t *testing.T, broker *Broker) {
  128. request := ProduceRequest{}
  129. request.RequiredAcks = NoResponse
  130. response, err := broker.Produce(&request)
  131. if err != nil {
  132. t.Error(err)
  133. }
  134. if response != nil {
  135. t.Error("Produce request with NoResponse got a response!")
  136. }
  137. }},
  138. {V0_10_0_0,
  139. "ProduceRequest (WaitForLocal)",
  140. []byte{0x00, 0x00, 0x00, 0x00},
  141. func(t *testing.T, broker *Broker) {
  142. request := ProduceRequest{}
  143. request.RequiredAcks = WaitForLocal
  144. response, err := broker.Produce(&request)
  145. if err != nil {
  146. t.Error(err)
  147. }
  148. if response == nil {
  149. t.Error("Produce request without NoResponse got no response!")
  150. }
  151. }},
  152. {V0_10_0_0,
  153. "FetchRequest",
  154. []byte{0x00, 0x00, 0x00, 0x00},
  155. func(t *testing.T, broker *Broker) {
  156. request := FetchRequest{}
  157. response, err := broker.Fetch(&request)
  158. if err != nil {
  159. t.Error(err)
  160. }
  161. if response == nil {
  162. t.Error("Fetch request got no response!")
  163. }
  164. }},
  165. {V0_10_0_0,
  166. "OffsetFetchRequest",
  167. []byte{0x00, 0x00, 0x00, 0x00},
  168. func(t *testing.T, broker *Broker) {
  169. request := OffsetFetchRequest{}
  170. response, err := broker.FetchOffset(&request)
  171. if err != nil {
  172. t.Error(err)
  173. }
  174. if response == nil {
  175. t.Error("OffsetFetch request got no response!")
  176. }
  177. }},
  178. {V0_10_0_0,
  179. "OffsetCommitRequest",
  180. []byte{0x00, 0x00, 0x00, 0x00},
  181. func(t *testing.T, broker *Broker) {
  182. request := OffsetCommitRequest{}
  183. response, err := broker.CommitOffset(&request)
  184. if err != nil {
  185. t.Error(err)
  186. }
  187. if response == nil {
  188. t.Error("OffsetCommit request got no response!")
  189. }
  190. }},
  191. {V0_10_0_0,
  192. "OffsetRequest",
  193. []byte{0x00, 0x00, 0x00, 0x00},
  194. func(t *testing.T, broker *Broker) {
  195. request := OffsetRequest{}
  196. response, err := broker.GetAvailableOffsets(&request)
  197. if err != nil {
  198. t.Error(err)
  199. }
  200. if response == nil {
  201. t.Error("Offset request got no response!")
  202. }
  203. }},
  204. {V0_10_0_0,
  205. "JoinGroupRequest",
  206. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  207. func(t *testing.T, broker *Broker) {
  208. request := JoinGroupRequest{}
  209. response, err := broker.JoinGroup(&request)
  210. if err != nil {
  211. t.Error(err)
  212. }
  213. if response == nil {
  214. t.Error("JoinGroup request got no response!")
  215. }
  216. }},
  217. {V0_10_0_0,
  218. "SyncGroupRequest",
  219. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  220. func(t *testing.T, broker *Broker) {
  221. request := SyncGroupRequest{}
  222. response, err := broker.SyncGroup(&request)
  223. if err != nil {
  224. t.Error(err)
  225. }
  226. if response == nil {
  227. t.Error("SyncGroup request got no response!")
  228. }
  229. }},
  230. {V0_10_0_0,
  231. "LeaveGroupRequest",
  232. []byte{0x00, 0x00},
  233. func(t *testing.T, broker *Broker) {
  234. request := LeaveGroupRequest{}
  235. response, err := broker.LeaveGroup(&request)
  236. if err != nil {
  237. t.Error(err)
  238. }
  239. if response == nil {
  240. t.Error("LeaveGroup request got no response!")
  241. }
  242. }},
  243. {V0_10_0_0,
  244. "HeartbeatRequest",
  245. []byte{0x00, 0x00},
  246. func(t *testing.T, broker *Broker) {
  247. request := HeartbeatRequest{}
  248. response, err := broker.Heartbeat(&request)
  249. if err != nil {
  250. t.Error(err)
  251. }
  252. if response == nil {
  253. t.Error("Heartbeat request got no response!")
  254. }
  255. }},
  256. {V0_10_0_0,
  257. "ListGroupsRequest",
  258. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  259. func(t *testing.T, broker *Broker) {
  260. request := ListGroupsRequest{}
  261. response, err := broker.ListGroups(&request)
  262. if err != nil {
  263. t.Error(err)
  264. }
  265. if response == nil {
  266. t.Error("ListGroups request got no response!")
  267. }
  268. }},
  269. {V0_10_0_0,
  270. "DescribeGroupsRequest",
  271. []byte{0x00, 0x00, 0x00, 0x00},
  272. func(t *testing.T, broker *Broker) {
  273. request := DescribeGroupsRequest{}
  274. response, err := broker.DescribeGroups(&request)
  275. if err != nil {
  276. t.Error(err)
  277. }
  278. if response == nil {
  279. t.Error("DescribeGroups request got no response!")
  280. }
  281. }},
  282. {V0_10_0_0,
  283. "ApiVersionsRequest",
  284. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  285. func(t *testing.T, broker *Broker) {
  286. request := ApiVersionsRequest{}
  287. response, err := broker.ApiVersions(&request)
  288. if err != nil {
  289. t.Error(err)
  290. }
  291. if response == nil {
  292. t.Error("ApiVersions request got no response!")
  293. }
  294. }},
  295. {V1_1_0_0,
  296. "DeleteGroupsRequest",
  297. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  298. func(t *testing.T, broker *Broker) {
  299. request := DeleteGroupsRequest{}
  300. response, err := broker.DeleteGroups(&request)
  301. if err != nil {
  302. t.Error(err)
  303. }
  304. if response == nil {
  305. t.Error("DeleteGroups request got no response!")
  306. }
  307. }},
  308. }
  309. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  310. metricValidators := newMetricValidators()
  311. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  312. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  313. // Check that the number of bytes sent corresponds to what the mock broker received
  314. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  315. if mockBrokerBytesWritten == 0 {
  316. // This a ProduceRequest with NoResponse
  317. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  318. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  319. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  320. } else {
  321. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  322. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  323. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  324. }
  325. // Check that the number of bytes received corresponds to what the mock broker sent
  326. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  327. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  328. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  329. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  330. // Run the validators
  331. metricValidators.run(t, broker.conf.MetricRegistry)
  332. }