broker_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package sarama
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "testing"
  7. "time"
  8. )
  9. func ExampleBroker() {
  10. broker := NewBroker("localhost:9092")
  11. err := broker.Open(nil)
  12. if err != nil {
  13. panic(err)
  14. }
  15. request := MetadataRequest{Topics: []string{"myTopic"}}
  16. response, err := broker.GetMetadata(&request)
  17. if err != nil {
  18. _ = broker.Close()
  19. panic(err)
  20. }
  21. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  22. if err = broker.Close(); err != nil {
  23. panic(err)
  24. }
  25. }
  26. type mockEncoder struct {
  27. bytes []byte
  28. }
  29. func (m mockEncoder) encode(pe packetEncoder) error {
  30. return pe.putRawBytes(m.bytes)
  31. }
  32. type brokerMetrics struct {
  33. bytesRead int
  34. bytesWritten int
  35. }
  36. func TestBrokerAccessors(t *testing.T) {
  37. broker := NewBroker("abc:123")
  38. if broker.ID() != -1 {
  39. t.Error("New broker didn't have an ID of -1.")
  40. }
  41. if broker.Addr() != "abc:123" {
  42. t.Error("New broker didn't have the correct address")
  43. }
  44. if broker.Rack() != "" {
  45. t.Error("New broker didn't have an unknown rack.")
  46. }
  47. broker.id = 34
  48. if broker.ID() != 34 {
  49. t.Error("Manually setting broker ID did not take effect.")
  50. }
  51. rack := "dc1"
  52. broker.rack = &rack
  53. if broker.Rack() != rack {
  54. t.Error("Manually setting broker rack did not take effect.")
  55. }
  56. }
  57. func TestSimpleBrokerCommunication(t *testing.T) {
  58. for _, tt := range brokerTestTable {
  59. Logger.Printf("Testing broker communication for %s", tt.name)
  60. mb := NewMockBroker(t, 0)
  61. mb.Returns(&mockEncoder{tt.response})
  62. pendingNotify := make(chan brokerMetrics)
  63. // Register a callback to be notified about successful requests
  64. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  65. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  66. })
  67. broker := NewBroker(mb.Addr())
  68. // Set the broker id in order to validate local broker metrics
  69. broker.id = 0
  70. conf := NewConfig()
  71. conf.Version = tt.version
  72. err := broker.Open(conf)
  73. if err != nil {
  74. t.Fatal(err)
  75. }
  76. tt.runner(t, broker)
  77. // Wait up to 500 ms for the remote broker to process the request and
  78. // notify us about the metrics
  79. timeout := 500 * time.Millisecond
  80. select {
  81. case mockBrokerMetrics := <-pendingNotify:
  82. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  83. case <-time.After(timeout):
  84. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  85. }
  86. mb.Close()
  87. err = broker.Close()
  88. if err != nil {
  89. t.Error(err)
  90. }
  91. }
  92. }
  93. func TestReceiveSASLOAuthBearerServerResponse(t *testing.T) {
  94. testTable := []struct {
  95. name string
  96. buf []byte
  97. err error
  98. }{
  99. {"OK server response",
  100. []byte{
  101. 0, 0, 0, 14,
  102. 0, 0, 0, 0,
  103. 0, 0,
  104. 255, 255, // no error message
  105. 0, 0, 0, 2, 'o', 'k',
  106. },
  107. nil},
  108. {"SASL authentication failure response",
  109. []byte{
  110. 0, 0, 0, 19,
  111. 0, 0, 0, 0,
  112. 0, 58,
  113. 0, 3, 'e', 'r', 'r',
  114. 0, 0, 0, 4, 'f', 'a', 'i', 'l',
  115. },
  116. ErrSASLAuthenticationFailed},
  117. {"Truncated header",
  118. []byte{
  119. 0, 0, 0, 12,
  120. },
  121. io.ErrUnexpectedEOF},
  122. {"Truncated response message",
  123. []byte{
  124. 0, 0, 0, 12,
  125. 0, 0, 0, 0,
  126. 0, 0,
  127. },
  128. io.ErrUnexpectedEOF},
  129. }
  130. for _, test := range testTable {
  131. in, out := net.Pipe()
  132. defer out.Close()
  133. b := &Broker{conn: out}
  134. go func() {
  135. defer in.Close()
  136. if _, err := in.Write(test.buf); err != nil {
  137. t.Error(err)
  138. }
  139. }()
  140. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(0)
  141. if len(test.buf) != bytesRead {
  142. t.Errorf("[%s] Expected %d bytes read, got %d", test.name, len(test.buf), bytesRead)
  143. }
  144. if test.err != err {
  145. t.Errorf("[%s] Expected error %s, got %s", test.name, test.err, err)
  146. }
  147. }
  148. }
  149. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  150. var brokerTestTable = []struct {
  151. version KafkaVersion
  152. name string
  153. response []byte
  154. runner func(*testing.T, *Broker)
  155. }{
  156. {V0_10_0_0,
  157. "MetadataRequest",
  158. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  159. func(t *testing.T, broker *Broker) {
  160. request := MetadataRequest{}
  161. response, err := broker.GetMetadata(&request)
  162. if err != nil {
  163. t.Error(err)
  164. }
  165. if response == nil {
  166. t.Error("Metadata request got no response!")
  167. }
  168. }},
  169. {V0_10_0_0,
  170. "ConsumerMetadataRequest",
  171. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  172. func(t *testing.T, broker *Broker) {
  173. request := ConsumerMetadataRequest{}
  174. response, err := broker.GetConsumerMetadata(&request)
  175. if err != nil {
  176. t.Error(err)
  177. }
  178. if response == nil {
  179. t.Error("Consumer Metadata request got no response!")
  180. }
  181. }},
  182. {V0_10_0_0,
  183. "ProduceRequest (NoResponse)",
  184. []byte{},
  185. func(t *testing.T, broker *Broker) {
  186. request := ProduceRequest{}
  187. request.RequiredAcks = NoResponse
  188. response, err := broker.Produce(&request)
  189. if err != nil {
  190. t.Error(err)
  191. }
  192. if response != nil {
  193. t.Error("Produce request with NoResponse got a response!")
  194. }
  195. }},
  196. {V0_10_0_0,
  197. "ProduceRequest (WaitForLocal)",
  198. []byte{0x00, 0x00, 0x00, 0x00},
  199. func(t *testing.T, broker *Broker) {
  200. request := ProduceRequest{}
  201. request.RequiredAcks = WaitForLocal
  202. response, err := broker.Produce(&request)
  203. if err != nil {
  204. t.Error(err)
  205. }
  206. if response == nil {
  207. t.Error("Produce request without NoResponse got no response!")
  208. }
  209. }},
  210. {V0_10_0_0,
  211. "FetchRequest",
  212. []byte{0x00, 0x00, 0x00, 0x00},
  213. func(t *testing.T, broker *Broker) {
  214. request := FetchRequest{}
  215. response, err := broker.Fetch(&request)
  216. if err != nil {
  217. t.Error(err)
  218. }
  219. if response == nil {
  220. t.Error("Fetch request got no response!")
  221. }
  222. }},
  223. {V0_10_0_0,
  224. "OffsetFetchRequest",
  225. []byte{0x00, 0x00, 0x00, 0x00},
  226. func(t *testing.T, broker *Broker) {
  227. request := OffsetFetchRequest{}
  228. response, err := broker.FetchOffset(&request)
  229. if err != nil {
  230. t.Error(err)
  231. }
  232. if response == nil {
  233. t.Error("OffsetFetch request got no response!")
  234. }
  235. }},
  236. {V0_10_0_0,
  237. "OffsetCommitRequest",
  238. []byte{0x00, 0x00, 0x00, 0x00},
  239. func(t *testing.T, broker *Broker) {
  240. request := OffsetCommitRequest{}
  241. response, err := broker.CommitOffset(&request)
  242. if err != nil {
  243. t.Error(err)
  244. }
  245. if response == nil {
  246. t.Error("OffsetCommit request got no response!")
  247. }
  248. }},
  249. {V0_10_0_0,
  250. "OffsetRequest",
  251. []byte{0x00, 0x00, 0x00, 0x00},
  252. func(t *testing.T, broker *Broker) {
  253. request := OffsetRequest{}
  254. response, err := broker.GetAvailableOffsets(&request)
  255. if err != nil {
  256. t.Error(err)
  257. }
  258. if response == nil {
  259. t.Error("Offset request got no response!")
  260. }
  261. }},
  262. {V0_10_0_0,
  263. "JoinGroupRequest",
  264. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  265. func(t *testing.T, broker *Broker) {
  266. request := JoinGroupRequest{}
  267. response, err := broker.JoinGroup(&request)
  268. if err != nil {
  269. t.Error(err)
  270. }
  271. if response == nil {
  272. t.Error("JoinGroup request got no response!")
  273. }
  274. }},
  275. {V0_10_0_0,
  276. "SyncGroupRequest",
  277. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  278. func(t *testing.T, broker *Broker) {
  279. request := SyncGroupRequest{}
  280. response, err := broker.SyncGroup(&request)
  281. if err != nil {
  282. t.Error(err)
  283. }
  284. if response == nil {
  285. t.Error("SyncGroup request got no response!")
  286. }
  287. }},
  288. {V0_10_0_0,
  289. "LeaveGroupRequest",
  290. []byte{0x00, 0x00},
  291. func(t *testing.T, broker *Broker) {
  292. request := LeaveGroupRequest{}
  293. response, err := broker.LeaveGroup(&request)
  294. if err != nil {
  295. t.Error(err)
  296. }
  297. if response == nil {
  298. t.Error("LeaveGroup request got no response!")
  299. }
  300. }},
  301. {V0_10_0_0,
  302. "HeartbeatRequest",
  303. []byte{0x00, 0x00},
  304. func(t *testing.T, broker *Broker) {
  305. request := HeartbeatRequest{}
  306. response, err := broker.Heartbeat(&request)
  307. if err != nil {
  308. t.Error(err)
  309. }
  310. if response == nil {
  311. t.Error("Heartbeat request got no response!")
  312. }
  313. }},
  314. {V0_10_0_0,
  315. "ListGroupsRequest",
  316. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  317. func(t *testing.T, broker *Broker) {
  318. request := ListGroupsRequest{}
  319. response, err := broker.ListGroups(&request)
  320. if err != nil {
  321. t.Error(err)
  322. }
  323. if response == nil {
  324. t.Error("ListGroups request got no response!")
  325. }
  326. }},
  327. {V0_10_0_0,
  328. "DescribeGroupsRequest",
  329. []byte{0x00, 0x00, 0x00, 0x00},
  330. func(t *testing.T, broker *Broker) {
  331. request := DescribeGroupsRequest{}
  332. response, err := broker.DescribeGroups(&request)
  333. if err != nil {
  334. t.Error(err)
  335. }
  336. if response == nil {
  337. t.Error("DescribeGroups request got no response!")
  338. }
  339. }},
  340. {V0_10_0_0,
  341. "ApiVersionsRequest",
  342. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  343. func(t *testing.T, broker *Broker) {
  344. request := ApiVersionsRequest{}
  345. response, err := broker.ApiVersions(&request)
  346. if err != nil {
  347. t.Error(err)
  348. }
  349. if response == nil {
  350. t.Error("ApiVersions request got no response!")
  351. }
  352. }},
  353. {V1_1_0_0,
  354. "DeleteGroupsRequest",
  355. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  356. func(t *testing.T, broker *Broker) {
  357. request := DeleteGroupsRequest{}
  358. response, err := broker.DeleteGroups(&request)
  359. if err != nil {
  360. t.Error(err)
  361. }
  362. if response == nil {
  363. t.Error("DeleteGroups request got no response!")
  364. }
  365. }},
  366. }
  367. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  368. metricValidators := newMetricValidators()
  369. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  370. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  371. // Check that the number of bytes sent corresponds to what the mock broker received
  372. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  373. if mockBrokerBytesWritten == 0 {
  374. // This a ProduceRequest with NoResponse
  375. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  376. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  377. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  378. } else {
  379. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  380. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  381. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  382. }
  383. // Check that the number of bytes received corresponds to what the mock broker sent
  384. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  385. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  386. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  387. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  388. // Run the validators
  389. metricValidators.run(t, broker.conf.MetricRegistry)
  390. }