broker_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. )
  11. func ExampleBroker() {
  12. broker := NewBroker("localhost:9092")
  13. err := broker.Open(nil)
  14. if err != nil {
  15. panic(err)
  16. }
  17. request := MetadataRequest{Topics: []string{"myTopic"}}
  18. response, err := broker.GetMetadata(&request)
  19. if err != nil {
  20. _ = broker.Close()
  21. panic(err)
  22. }
  23. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  24. if err = broker.Close(); err != nil {
  25. panic(err)
  26. }
  27. }
  28. type mockEncoder struct {
  29. bytes []byte
  30. }
  31. func (m mockEncoder) encode(pe packetEncoder) error {
  32. return pe.putRawBytes(m.bytes)
  33. }
  34. type brokerMetrics struct {
  35. bytesRead int
  36. bytesWritten int
  37. }
  38. func TestBrokerAccessors(t *testing.T) {
  39. broker := NewBroker("abc:123")
  40. if broker.ID() != -1 {
  41. t.Error("New broker didn't have an ID of -1.")
  42. }
  43. if broker.Addr() != "abc:123" {
  44. t.Error("New broker didn't have the correct address")
  45. }
  46. if broker.Rack() != "" {
  47. t.Error("New broker didn't have an unknown rack.")
  48. }
  49. broker.id = 34
  50. if broker.ID() != 34 {
  51. t.Error("Manually setting broker ID did not take effect.")
  52. }
  53. rack := "dc1"
  54. broker.rack = &rack
  55. if broker.Rack() != rack {
  56. t.Error("Manually setting broker rack did not take effect.")
  57. }
  58. }
  59. func TestSimpleBrokerCommunication(t *testing.T) {
  60. for _, tt := range brokerTestTable {
  61. Logger.Printf("Testing broker communication for %s", tt.name)
  62. mb := NewMockBroker(t, 0)
  63. mb.Returns(&mockEncoder{tt.response})
  64. pendingNotify := make(chan brokerMetrics)
  65. // Register a callback to be notified about successful requests
  66. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  67. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  68. })
  69. broker := NewBroker(mb.Addr())
  70. // Set the broker id in order to validate local broker metrics
  71. broker.id = 0
  72. conf := NewConfig()
  73. conf.Version = tt.version
  74. err := broker.Open(conf)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. tt.runner(t, broker)
  79. // Wait up to 500 ms for the remote broker to process the request and
  80. // notify us about the metrics
  81. timeout := 500 * time.Millisecond
  82. select {
  83. case mockBrokerMetrics := <-pendingNotify:
  84. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  85. case <-time.After(timeout):
  86. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  87. }
  88. mb.Close()
  89. err = broker.Close()
  90. if err != nil {
  91. t.Error(err)
  92. }
  93. }
  94. }
  95. var ErrTokenFailure = errors.New("Failure generating token")
  96. type TokenProvider struct {
  97. accessToken *AccessToken
  98. err error
  99. }
  100. func (t *TokenProvider) Token() (*AccessToken, error) {
  101. return t.accessToken, t.err
  102. }
  103. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  104. return &TokenProvider{
  105. accessToken: token,
  106. err: err,
  107. }
  108. }
  109. func TestSASLOAuthBearer(t *testing.T) {
  110. testTable := []struct {
  111. name string
  112. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  113. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  114. expectClientErr bool // Expect an internal client-side error
  115. tokProvider *TokenProvider
  116. }{
  117. {
  118. name: "SASL/OAUTHBEARER OK server response",
  119. mockAuthErr: ErrNoError,
  120. mockHandshakeErr: ErrNoError,
  121. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  122. },
  123. {
  124. name: "SASL/OAUTHBEARER authentication failure response",
  125. mockAuthErr: ErrSASLAuthenticationFailed,
  126. mockHandshakeErr: ErrNoError,
  127. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  128. },
  129. {
  130. name: "SASL/OAUTHBEARER handshake failure response",
  131. mockAuthErr: ErrNoError,
  132. mockHandshakeErr: ErrSASLAuthenticationFailed,
  133. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  134. },
  135. {
  136. name: "SASL/OAUTHBEARER token generation error",
  137. mockAuthErr: ErrNoError,
  138. mockHandshakeErr: ErrNoError,
  139. expectClientErr: true,
  140. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  141. },
  142. {
  143. name: "SASL/OAUTHBEARER invalid extension",
  144. mockAuthErr: ErrNoError,
  145. mockHandshakeErr: ErrNoError,
  146. expectClientErr: true,
  147. tokProvider: newTokenProvider(&AccessToken{
  148. Token: "access-token-123",
  149. Extensions: map[string]string{"auth": "auth-value"},
  150. }, nil),
  151. },
  152. }
  153. for i, test := range testTable {
  154. // mockBroker mocks underlying network logic and broker responses
  155. mockBroker := NewMockBroker(t, 0)
  156. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  157. SetAuthBytes([]byte(`response_payload`))
  158. if test.mockAuthErr != ErrNoError {
  159. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  160. }
  161. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  162. SetEnabledMechanisms([]string{SASLTypeOAuth})
  163. if test.mockHandshakeErr != ErrNoError {
  164. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  165. }
  166. mockBroker.SetHandlerByMap(map[string]MockResponse{
  167. "SaslAuthenticateRequest": mockSASLAuthResponse,
  168. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  169. })
  170. // broker executes SASL requests against mockBroker
  171. broker := NewBroker(mockBroker.Addr())
  172. broker.requestRate = metrics.NilMeter{}
  173. broker.outgoingByteRate = metrics.NilMeter{}
  174. broker.incomingByteRate = metrics.NilMeter{}
  175. broker.requestSize = metrics.NilHistogram{}
  176. broker.responseSize = metrics.NilHistogram{}
  177. broker.responseRate = metrics.NilMeter{}
  178. broker.requestLatency = metrics.NilHistogram{}
  179. conf := NewConfig()
  180. conf.Net.SASL.Mechanism = SASLTypeOAuth
  181. conf.Net.SASL.TokenProvider = test.tokProvider
  182. broker.conf = conf
  183. dialer := net.Dialer{
  184. Timeout: conf.Net.DialTimeout,
  185. KeepAlive: conf.Net.KeepAlive,
  186. LocalAddr: conf.Net.LocalAddr,
  187. }
  188. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  189. if err != nil {
  190. t.Fatal(err)
  191. }
  192. broker.conn = conn
  193. err = broker.authenticateViaSASL()
  194. if test.mockAuthErr != ErrNoError {
  195. if test.mockAuthErr != err {
  196. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  197. }
  198. } else if test.mockHandshakeErr != ErrNoError {
  199. if test.mockHandshakeErr != err {
  200. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  201. }
  202. } else if test.expectClientErr && err == nil {
  203. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  204. } else if !test.expectClientErr && err != nil {
  205. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  206. }
  207. mockBroker.Close()
  208. }
  209. }
  210. func TestBuildClientInitialResponse(t *testing.T) {
  211. testTable := []struct {
  212. name string
  213. token *AccessToken
  214. expected []byte
  215. expectError bool
  216. }{
  217. {
  218. name: "Build SASL client initial response with two extensions",
  219. token: &AccessToken{
  220. Token: "the-token",
  221. Extensions: map[string]string{
  222. "x": "1",
  223. "y": "2",
  224. },
  225. },
  226. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  227. },
  228. {
  229. name: "Build SASL client initial response with no extensions",
  230. token: &AccessToken{Token: "the-token"},
  231. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  232. },
  233. {
  234. name: "Build SASL client initial response using reserved extension",
  235. token: &AccessToken{
  236. Token: "the-token",
  237. Extensions: map[string]string{
  238. "auth": "auth-value",
  239. },
  240. },
  241. expected: []byte(""),
  242. expectError: true,
  243. },
  244. }
  245. for i, test := range testTable {
  246. actual, err := buildClientInitialResponse(test.token)
  247. if !reflect.DeepEqual(test.expected, actual) {
  248. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  249. }
  250. if test.expectError && err == nil {
  251. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  252. }
  253. if !test.expectError && err != nil {
  254. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  255. }
  256. }
  257. }
  258. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  259. var brokerTestTable = []struct {
  260. version KafkaVersion
  261. name string
  262. response []byte
  263. runner func(*testing.T, *Broker)
  264. }{
  265. {V0_10_0_0,
  266. "MetadataRequest",
  267. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  268. func(t *testing.T, broker *Broker) {
  269. request := MetadataRequest{}
  270. response, err := broker.GetMetadata(&request)
  271. if err != nil {
  272. t.Error(err)
  273. }
  274. if response == nil {
  275. t.Error("Metadata request got no response!")
  276. }
  277. }},
  278. {V0_10_0_0,
  279. "ConsumerMetadataRequest",
  280. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  281. func(t *testing.T, broker *Broker) {
  282. request := ConsumerMetadataRequest{}
  283. response, err := broker.GetConsumerMetadata(&request)
  284. if err != nil {
  285. t.Error(err)
  286. }
  287. if response == nil {
  288. t.Error("Consumer Metadata request got no response!")
  289. }
  290. }},
  291. {V0_10_0_0,
  292. "ProduceRequest (NoResponse)",
  293. []byte{},
  294. func(t *testing.T, broker *Broker) {
  295. request := ProduceRequest{}
  296. request.RequiredAcks = NoResponse
  297. response, err := broker.Produce(&request)
  298. if err != nil {
  299. t.Error(err)
  300. }
  301. if response != nil {
  302. t.Error("Produce request with NoResponse got a response!")
  303. }
  304. }},
  305. {V0_10_0_0,
  306. "ProduceRequest (WaitForLocal)",
  307. []byte{0x00, 0x00, 0x00, 0x00},
  308. func(t *testing.T, broker *Broker) {
  309. request := ProduceRequest{}
  310. request.RequiredAcks = WaitForLocal
  311. response, err := broker.Produce(&request)
  312. if err != nil {
  313. t.Error(err)
  314. }
  315. if response == nil {
  316. t.Error("Produce request without NoResponse got no response!")
  317. }
  318. }},
  319. {V0_10_0_0,
  320. "FetchRequest",
  321. []byte{0x00, 0x00, 0x00, 0x00},
  322. func(t *testing.T, broker *Broker) {
  323. request := FetchRequest{}
  324. response, err := broker.Fetch(&request)
  325. if err != nil {
  326. t.Error(err)
  327. }
  328. if response == nil {
  329. t.Error("Fetch request got no response!")
  330. }
  331. }},
  332. {V0_10_0_0,
  333. "OffsetFetchRequest",
  334. []byte{0x00, 0x00, 0x00, 0x00},
  335. func(t *testing.T, broker *Broker) {
  336. request := OffsetFetchRequest{}
  337. response, err := broker.FetchOffset(&request)
  338. if err != nil {
  339. t.Error(err)
  340. }
  341. if response == nil {
  342. t.Error("OffsetFetch request got no response!")
  343. }
  344. }},
  345. {V0_10_0_0,
  346. "OffsetCommitRequest",
  347. []byte{0x00, 0x00, 0x00, 0x00},
  348. func(t *testing.T, broker *Broker) {
  349. request := OffsetCommitRequest{}
  350. response, err := broker.CommitOffset(&request)
  351. if err != nil {
  352. t.Error(err)
  353. }
  354. if response == nil {
  355. t.Error("OffsetCommit request got no response!")
  356. }
  357. }},
  358. {V0_10_0_0,
  359. "OffsetRequest",
  360. []byte{0x00, 0x00, 0x00, 0x00},
  361. func(t *testing.T, broker *Broker) {
  362. request := OffsetRequest{}
  363. response, err := broker.GetAvailableOffsets(&request)
  364. if err != nil {
  365. t.Error(err)
  366. }
  367. if response == nil {
  368. t.Error("Offset request got no response!")
  369. }
  370. }},
  371. {V0_10_0_0,
  372. "JoinGroupRequest",
  373. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  374. func(t *testing.T, broker *Broker) {
  375. request := JoinGroupRequest{}
  376. response, err := broker.JoinGroup(&request)
  377. if err != nil {
  378. t.Error(err)
  379. }
  380. if response == nil {
  381. t.Error("JoinGroup request got no response!")
  382. }
  383. }},
  384. {V0_10_0_0,
  385. "SyncGroupRequest",
  386. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  387. func(t *testing.T, broker *Broker) {
  388. request := SyncGroupRequest{}
  389. response, err := broker.SyncGroup(&request)
  390. if err != nil {
  391. t.Error(err)
  392. }
  393. if response == nil {
  394. t.Error("SyncGroup request got no response!")
  395. }
  396. }},
  397. {V0_10_0_0,
  398. "LeaveGroupRequest",
  399. []byte{0x00, 0x00},
  400. func(t *testing.T, broker *Broker) {
  401. request := LeaveGroupRequest{}
  402. response, err := broker.LeaveGroup(&request)
  403. if err != nil {
  404. t.Error(err)
  405. }
  406. if response == nil {
  407. t.Error("LeaveGroup request got no response!")
  408. }
  409. }},
  410. {V0_10_0_0,
  411. "HeartbeatRequest",
  412. []byte{0x00, 0x00},
  413. func(t *testing.T, broker *Broker) {
  414. request := HeartbeatRequest{}
  415. response, err := broker.Heartbeat(&request)
  416. if err != nil {
  417. t.Error(err)
  418. }
  419. if response == nil {
  420. t.Error("Heartbeat request got no response!")
  421. }
  422. }},
  423. {V0_10_0_0,
  424. "ListGroupsRequest",
  425. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  426. func(t *testing.T, broker *Broker) {
  427. request := ListGroupsRequest{}
  428. response, err := broker.ListGroups(&request)
  429. if err != nil {
  430. t.Error(err)
  431. }
  432. if response == nil {
  433. t.Error("ListGroups request got no response!")
  434. }
  435. }},
  436. {V0_10_0_0,
  437. "DescribeGroupsRequest",
  438. []byte{0x00, 0x00, 0x00, 0x00},
  439. func(t *testing.T, broker *Broker) {
  440. request := DescribeGroupsRequest{}
  441. response, err := broker.DescribeGroups(&request)
  442. if err != nil {
  443. t.Error(err)
  444. }
  445. if response == nil {
  446. t.Error("DescribeGroups request got no response!")
  447. }
  448. }},
  449. {V0_10_0_0,
  450. "ApiVersionsRequest",
  451. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  452. func(t *testing.T, broker *Broker) {
  453. request := ApiVersionsRequest{}
  454. response, err := broker.ApiVersions(&request)
  455. if err != nil {
  456. t.Error(err)
  457. }
  458. if response == nil {
  459. t.Error("ApiVersions request got no response!")
  460. }
  461. }},
  462. {V1_1_0_0,
  463. "DeleteGroupsRequest",
  464. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  465. func(t *testing.T, broker *Broker) {
  466. request := DeleteGroupsRequest{}
  467. response, err := broker.DeleteGroups(&request)
  468. if err != nil {
  469. t.Error(err)
  470. }
  471. if response == nil {
  472. t.Error("DeleteGroups request got no response!")
  473. }
  474. }},
  475. }
  476. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  477. metricValidators := newMetricValidators()
  478. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  479. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  480. // Check that the number of bytes sent corresponds to what the mock broker received
  481. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  482. if mockBrokerBytesWritten == 0 {
  483. // This a ProduceRequest with NoResponse
  484. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  485. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  486. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  487. } else {
  488. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  489. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  490. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  491. }
  492. // Check that the number of bytes received corresponds to what the mock broker sent
  493. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  494. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  495. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  496. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  497. // Run the validators
  498. metricValidators.run(t, broker.conf.MetricRegistry)
  499. }