broker_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package sarama
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "testing"
  7. "time"
  8. )
  9. func ExampleBroker() {
  10. broker := NewBroker("localhost:9092")
  11. err := broker.Open(nil)
  12. if err != nil {
  13. panic(err)
  14. }
  15. request := MetadataRequest{Topics: []string{"myTopic"}}
  16. response, err := broker.GetMetadata(&request)
  17. if err != nil {
  18. _ = broker.Close()
  19. panic(err)
  20. }
  21. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  22. if err = broker.Close(); err != nil {
  23. panic(err)
  24. }
  25. }
  26. type mockEncoder struct {
  27. bytes []byte
  28. }
  29. func (m mockEncoder) encode(pe packetEncoder) error {
  30. return pe.putRawBytes(m.bytes)
  31. }
  32. type brokerMetrics struct {
  33. bytesRead int
  34. bytesWritten int
  35. }
  36. func TestBrokerAccessors(t *testing.T) {
  37. broker := NewBroker("abc:123")
  38. if broker.ID() != -1 {
  39. t.Error("New broker didn't have an ID of -1.")
  40. }
  41. if broker.Addr() != "abc:123" {
  42. t.Error("New broker didn't have the correct address")
  43. }
  44. if broker.Rack() != "" {
  45. t.Error("New broker didn't have an unknown rack.")
  46. }
  47. broker.id = 34
  48. if broker.ID() != 34 {
  49. t.Error("Manually setting broker ID did not take effect.")
  50. }
  51. rack := "dc1"
  52. broker.rack = &rack
  53. if broker.Rack() != rack {
  54. t.Error("Manually setting broker rack did not take effect.")
  55. }
  56. }
  57. func TestSimpleBrokerCommunication(t *testing.T) {
  58. for _, tt := range brokerTestTable {
  59. Logger.Printf("Testing broker communication for %s", tt.name)
  60. mb := NewMockBroker(t, 0)
  61. mb.Returns(&mockEncoder{tt.response})
  62. pendingNotify := make(chan brokerMetrics)
  63. // Register a callback to be notified about successful requests
  64. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  65. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  66. })
  67. broker := NewBroker(mb.Addr())
  68. // Set the broker id in order to validate local broker metrics
  69. broker.id = 0
  70. conf := NewConfig()
  71. conf.Version = tt.version
  72. err := broker.Open(conf)
  73. if err != nil {
  74. t.Fatal(err)
  75. }
  76. tt.runner(t, broker)
  77. // Wait up to 500 ms for the remote broker to process the request and
  78. // notify us about the metrics
  79. timeout := 500 * time.Millisecond
  80. select {
  81. case mockBrokerMetrics := <-pendingNotify:
  82. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  83. case <-time.After(timeout):
  84. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  85. }
  86. mb.Close()
  87. err = broker.Close()
  88. if err != nil {
  89. t.Error(err)
  90. }
  91. }
  92. }
  93. func TestReceiveSASLOAuthBearerServerResponse(t *testing.T) {
  94. testTable := []struct {
  95. name string
  96. buf []byte
  97. err error
  98. }{
  99. {"OK server response",
  100. []byte{
  101. 0, 0, 0, 14,
  102. 0, 0, 0, 0,
  103. 0, 0,
  104. 255, 255, // no error message
  105. 0, 0, 0, 2, 'o', 'k',
  106. },
  107. nil},
  108. {"SASL authentication failure response",
  109. []byte{
  110. 0, 0, 0, 19,
  111. 0, 0, 0, 0,
  112. 0, 58,
  113. 0, 3, 'e', 'r', 'r',
  114. 0, 0, 0, 4, 'f', 'a', 'i', 'l',
  115. },
  116. ErrSASLAuthenticationFailed},
  117. {"Truncated header",
  118. []byte{
  119. 0, 0, 0, 12,
  120. },
  121. io.ErrUnexpectedEOF},
  122. {"Truncated response message",
  123. []byte{
  124. 0, 0, 0, 12,
  125. 0, 0, 0, 0,
  126. 0, 0,
  127. },
  128. io.ErrUnexpectedEOF},
  129. }
  130. for i, test := range testTable {
  131. in, out := net.Pipe()
  132. defer func() {
  133. if err := out.Close(); err != nil {
  134. t.Error(err)
  135. }
  136. }()
  137. b := &Broker{conn: out}
  138. go func() {
  139. defer func() {
  140. if err := in.Close(); err != nil {
  141. t.Error(err)
  142. }
  143. }()
  144. if _, err := in.Write(test.buf); err != nil {
  145. t.Error(err)
  146. }
  147. }()
  148. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(0)
  149. if len(test.buf) != bytesRead {
  150. t.Errorf("[%d]:[%s] Expected %d bytes read, got %d\n", i, test.name, len(test.buf), bytesRead)
  151. }
  152. if test.err != err {
  153. t.Errorf("[%d]:[%s] Expected %s error, got %s\n", i, test.name, test.err, err)
  154. }
  155. }
  156. }
  157. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  158. var brokerTestTable = []struct {
  159. version KafkaVersion
  160. name string
  161. response []byte
  162. runner func(*testing.T, *Broker)
  163. }{
  164. {V0_10_0_0,
  165. "MetadataRequest",
  166. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  167. func(t *testing.T, broker *Broker) {
  168. request := MetadataRequest{}
  169. response, err := broker.GetMetadata(&request)
  170. if err != nil {
  171. t.Error(err)
  172. }
  173. if response == nil {
  174. t.Error("Metadata request got no response!")
  175. }
  176. }},
  177. {V0_10_0_0,
  178. "ConsumerMetadataRequest",
  179. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  180. func(t *testing.T, broker *Broker) {
  181. request := ConsumerMetadataRequest{}
  182. response, err := broker.GetConsumerMetadata(&request)
  183. if err != nil {
  184. t.Error(err)
  185. }
  186. if response == nil {
  187. t.Error("Consumer Metadata request got no response!")
  188. }
  189. }},
  190. {V0_10_0_0,
  191. "ProduceRequest (NoResponse)",
  192. []byte{},
  193. func(t *testing.T, broker *Broker) {
  194. request := ProduceRequest{}
  195. request.RequiredAcks = NoResponse
  196. response, err := broker.Produce(&request)
  197. if err != nil {
  198. t.Error(err)
  199. }
  200. if response != nil {
  201. t.Error("Produce request with NoResponse got a response!")
  202. }
  203. }},
  204. {V0_10_0_0,
  205. "ProduceRequest (WaitForLocal)",
  206. []byte{0x00, 0x00, 0x00, 0x00},
  207. func(t *testing.T, broker *Broker) {
  208. request := ProduceRequest{}
  209. request.RequiredAcks = WaitForLocal
  210. response, err := broker.Produce(&request)
  211. if err != nil {
  212. t.Error(err)
  213. }
  214. if response == nil {
  215. t.Error("Produce request without NoResponse got no response!")
  216. }
  217. }},
  218. {V0_10_0_0,
  219. "FetchRequest",
  220. []byte{0x00, 0x00, 0x00, 0x00},
  221. func(t *testing.T, broker *Broker) {
  222. request := FetchRequest{}
  223. response, err := broker.Fetch(&request)
  224. if err != nil {
  225. t.Error(err)
  226. }
  227. if response == nil {
  228. t.Error("Fetch request got no response!")
  229. }
  230. }},
  231. {V0_10_0_0,
  232. "OffsetFetchRequest",
  233. []byte{0x00, 0x00, 0x00, 0x00},
  234. func(t *testing.T, broker *Broker) {
  235. request := OffsetFetchRequest{}
  236. response, err := broker.FetchOffset(&request)
  237. if err != nil {
  238. t.Error(err)
  239. }
  240. if response == nil {
  241. t.Error("OffsetFetch request got no response!")
  242. }
  243. }},
  244. {V0_10_0_0,
  245. "OffsetCommitRequest",
  246. []byte{0x00, 0x00, 0x00, 0x00},
  247. func(t *testing.T, broker *Broker) {
  248. request := OffsetCommitRequest{}
  249. response, err := broker.CommitOffset(&request)
  250. if err != nil {
  251. t.Error(err)
  252. }
  253. if response == nil {
  254. t.Error("OffsetCommit request got no response!")
  255. }
  256. }},
  257. {V0_10_0_0,
  258. "OffsetRequest",
  259. []byte{0x00, 0x00, 0x00, 0x00},
  260. func(t *testing.T, broker *Broker) {
  261. request := OffsetRequest{}
  262. response, err := broker.GetAvailableOffsets(&request)
  263. if err != nil {
  264. t.Error(err)
  265. }
  266. if response == nil {
  267. t.Error("Offset request got no response!")
  268. }
  269. }},
  270. {V0_10_0_0,
  271. "JoinGroupRequest",
  272. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  273. func(t *testing.T, broker *Broker) {
  274. request := JoinGroupRequest{}
  275. response, err := broker.JoinGroup(&request)
  276. if err != nil {
  277. t.Error(err)
  278. }
  279. if response == nil {
  280. t.Error("JoinGroup request got no response!")
  281. }
  282. }},
  283. {V0_10_0_0,
  284. "SyncGroupRequest",
  285. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  286. func(t *testing.T, broker *Broker) {
  287. request := SyncGroupRequest{}
  288. response, err := broker.SyncGroup(&request)
  289. if err != nil {
  290. t.Error(err)
  291. }
  292. if response == nil {
  293. t.Error("SyncGroup request got no response!")
  294. }
  295. }},
  296. {V0_10_0_0,
  297. "LeaveGroupRequest",
  298. []byte{0x00, 0x00},
  299. func(t *testing.T, broker *Broker) {
  300. request := LeaveGroupRequest{}
  301. response, err := broker.LeaveGroup(&request)
  302. if err != nil {
  303. t.Error(err)
  304. }
  305. if response == nil {
  306. t.Error("LeaveGroup request got no response!")
  307. }
  308. }},
  309. {V0_10_0_0,
  310. "HeartbeatRequest",
  311. []byte{0x00, 0x00},
  312. func(t *testing.T, broker *Broker) {
  313. request := HeartbeatRequest{}
  314. response, err := broker.Heartbeat(&request)
  315. if err != nil {
  316. t.Error(err)
  317. }
  318. if response == nil {
  319. t.Error("Heartbeat request got no response!")
  320. }
  321. }},
  322. {V0_10_0_0,
  323. "ListGroupsRequest",
  324. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  325. func(t *testing.T, broker *Broker) {
  326. request := ListGroupsRequest{}
  327. response, err := broker.ListGroups(&request)
  328. if err != nil {
  329. t.Error(err)
  330. }
  331. if response == nil {
  332. t.Error("ListGroups request got no response!")
  333. }
  334. }},
  335. {V0_10_0_0,
  336. "DescribeGroupsRequest",
  337. []byte{0x00, 0x00, 0x00, 0x00},
  338. func(t *testing.T, broker *Broker) {
  339. request := DescribeGroupsRequest{}
  340. response, err := broker.DescribeGroups(&request)
  341. if err != nil {
  342. t.Error(err)
  343. }
  344. if response == nil {
  345. t.Error("DescribeGroups request got no response!")
  346. }
  347. }},
  348. {V0_10_0_0,
  349. "ApiVersionsRequest",
  350. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  351. func(t *testing.T, broker *Broker) {
  352. request := ApiVersionsRequest{}
  353. response, err := broker.ApiVersions(&request)
  354. if err != nil {
  355. t.Error(err)
  356. }
  357. if response == nil {
  358. t.Error("ApiVersions request got no response!")
  359. }
  360. }},
  361. {V1_1_0_0,
  362. "DeleteGroupsRequest",
  363. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  364. func(t *testing.T, broker *Broker) {
  365. request := DeleteGroupsRequest{}
  366. response, err := broker.DeleteGroups(&request)
  367. if err != nil {
  368. t.Error(err)
  369. }
  370. if response == nil {
  371. t.Error("DeleteGroups request got no response!")
  372. }
  373. }},
  374. }
  375. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  376. metricValidators := newMetricValidators()
  377. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  378. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  379. // Check that the number of bytes sent corresponds to what the mock broker received
  380. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  381. if mockBrokerBytesWritten == 0 {
  382. // This a ProduceRequest with NoResponse
  383. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  384. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  385. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  386. } else {
  387. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  388. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  389. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  390. }
  391. // Check that the number of bytes received corresponds to what the mock broker sent
  392. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  393. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  394. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  395. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  396. // Run the validators
  397. metricValidators.run(t, broker.conf.MetricRegistry)
  398. }