broker_test.go 19 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. )
  11. func ExampleBroker() {
  12. broker := NewBroker("localhost:9092")
  13. err := broker.Open(nil)
  14. if err != nil {
  15. panic(err)
  16. }
  17. request := MetadataRequest{Topics: []string{"myTopic"}}
  18. response, err := broker.GetMetadata(&request)
  19. if err != nil {
  20. _ = broker.Close()
  21. panic(err)
  22. }
  23. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  24. if err = broker.Close(); err != nil {
  25. panic(err)
  26. }
  27. }
  28. type mockEncoder struct {
  29. bytes []byte
  30. }
  31. func (m mockEncoder) encode(pe packetEncoder) error {
  32. return pe.putRawBytes(m.bytes)
  33. }
  // brokerMetrics holds the pair of byte counts that the mock broker's notifier
  // callback reports (bytes read, bytes written), in that order.
  type brokerMetrics struct {
  	bytesRead, bytesWritten int
  }
  38. func TestBrokerAccessors(t *testing.T) {
  39. broker := NewBroker("abc:123")
  40. if broker.ID() != -1 {
  41. t.Error("New broker didn't have an ID of -1.")
  42. }
  43. if broker.Addr() != "abc:123" {
  44. t.Error("New broker didn't have the correct address")
  45. }
  46. if broker.Rack() != "" {
  47. t.Error("New broker didn't have an unknown rack.")
  48. }
  49. broker.id = 34
  50. if broker.ID() != 34 {
  51. t.Error("Manually setting broker ID did not take effect.")
  52. }
  53. rack := "dc1"
  54. broker.rack = &rack
  55. if broker.Rack() != rack {
  56. t.Error("Manually setting broker rack did not take effect.")
  57. }
  58. }
  59. func TestSimpleBrokerCommunication(t *testing.T) {
  60. for _, tt := range brokerTestTable {
  61. Logger.Printf("Testing broker communication for %s", tt.name)
  62. mb := NewMockBroker(t, 0)
  63. mb.Returns(&mockEncoder{tt.response})
  64. pendingNotify := make(chan brokerMetrics)
  65. // Register a callback to be notified about successful requests
  66. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  67. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  68. })
  69. broker := NewBroker(mb.Addr())
  70. // Set the broker id in order to validate local broker metrics
  71. broker.id = 0
  72. conf := NewConfig()
  73. conf.Version = tt.version
  74. err := broker.Open(conf)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. tt.runner(t, broker)
  79. // Wait up to 500 ms for the remote broker to process the request and
  80. // notify us about the metrics
  81. timeout := 500 * time.Millisecond
  82. select {
  83. case mockBrokerMetrics := <-pendingNotify:
  84. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  85. case <-time.After(timeout):
  86. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  87. }
  88. mb.Close()
  89. err = broker.Close()
  90. if err != nil {
  91. t.Error(err)
  92. }
  93. }
  94. }
  // ErrTokenFailure is the sentinel error handed to the test TokenProvider to
  // simulate a failure while generating an access token.
  var ErrTokenFailure = errors.New("Failure generating token")
  96. type TokenProvider struct {
  97. accessToken *AccessToken
  98. err error
  99. }
  100. func (t *TokenProvider) Token() (*AccessToken, error) {
  101. return t.accessToken, t.err
  102. }
  103. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  104. return &TokenProvider{
  105. accessToken: token,
  106. err: err,
  107. }
  108. }
  109. func TestSASLOAuthBearer(t *testing.T) {
  110. testTable := []struct {
  111. name string
  112. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  113. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  114. expectClientErr bool // Expect an internal client-side error
  115. tokProvider *TokenProvider
  116. }{
  117. {
  118. name: "SASL/OAUTHBEARER OK server response",
  119. mockAuthErr: ErrNoError,
  120. mockHandshakeErr: ErrNoError,
  121. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  122. },
  123. {
  124. name: "SASL/OAUTHBEARER authentication failure response",
  125. mockAuthErr: ErrSASLAuthenticationFailed,
  126. mockHandshakeErr: ErrNoError,
  127. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  128. },
  129. {
  130. name: "SASL/OAUTHBEARER handshake failure response",
  131. mockAuthErr: ErrNoError,
  132. mockHandshakeErr: ErrSASLAuthenticationFailed,
  133. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  134. },
  135. {
  136. name: "SASL/OAUTHBEARER token generation error",
  137. mockAuthErr: ErrNoError,
  138. mockHandshakeErr: ErrNoError,
  139. expectClientErr: true,
  140. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  141. },
  142. {
  143. name: "SASL/OAUTHBEARER invalid extension",
  144. mockAuthErr: ErrNoError,
  145. mockHandshakeErr: ErrNoError,
  146. expectClientErr: true,
  147. tokProvider: newTokenProvider(&AccessToken{
  148. Token: "access-token-123",
  149. Extensions: map[string]string{"auth": "auth-value"},
  150. }, nil),
  151. },
  152. }
  153. for i, test := range testTable {
  154. // mockBroker mocks underlying network logic and broker responses
  155. mockBroker := NewMockBroker(t, 0)
  156. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte("response_payload"))
  157. if test.mockAuthErr != ErrNoError {
  158. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  159. }
  160. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeOAuth})
  161. if test.mockHandshakeErr != ErrNoError {
  162. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  163. }
  164. mockBroker.SetHandlerByMap(map[string]MockResponse{
  165. "SaslAuthenticateRequest": mockSASLAuthResponse,
  166. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  167. })
  168. // broker executes SASL requests against mockBroker
  169. broker := NewBroker(mockBroker.Addr())
  170. broker.requestRate = metrics.NilMeter{}
  171. broker.outgoingByteRate = metrics.NilMeter{}
  172. broker.incomingByteRate = metrics.NilMeter{}
  173. broker.requestSize = metrics.NilHistogram{}
  174. broker.responseSize = metrics.NilHistogram{}
  175. broker.responseRate = metrics.NilMeter{}
  176. broker.requestLatency = metrics.NilHistogram{}
  177. conf := NewConfig()
  178. conf.Net.SASL.Mechanism = SASLTypeOAuth
  179. conf.Net.SASL.TokenProvider = test.tokProvider
  180. broker.conf = conf
  181. dialer := net.Dialer{
  182. Timeout: conf.Net.DialTimeout,
  183. KeepAlive: conf.Net.KeepAlive,
  184. LocalAddr: conf.Net.LocalAddr,
  185. }
  186. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  187. if err != nil {
  188. t.Fatal(err)
  189. }
  190. broker.conn = conn
  191. err = broker.authenticateViaSASL()
  192. if test.mockAuthErr != ErrNoError {
  193. if test.mockAuthErr != err {
  194. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  195. }
  196. } else if test.mockHandshakeErr != ErrNoError {
  197. if test.mockHandshakeErr != err {
  198. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  199. }
  200. } else if test.expectClientErr && err == nil {
  201. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  202. } else if !test.expectClientErr && err != nil {
  203. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  204. }
  205. mockBroker.Close()
  206. }
  207. }
  // MockSCRAMClient is a scripted SCRAM client for tests. It answers the empty
  // challenge with "ping", treats "pong" as the final server message, and
  // rejects any other challenge.
  type MockSCRAMClient struct {
  	done bool
  }

  // Begin accepts any credentials and always succeeds.
  func (c *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  	return nil
  }

  // Step advances the scripted exchange:
  //   - an empty challenge yields the response "ping";
  //   - the challenge "pong" marks the conversation as done and yields an
  //     empty response;
  //   - any other challenge yields an error.
  func (c *MockSCRAMClient) Step(challenge string) (response string, err error) {
  	switch challenge {
  	case "":
  		return "ping", nil
  	case "pong":
  		c.done = true
  		return "", nil
  	default:
  		return "", errors.New("failed to authenticate :(")
  	}
  }

  // Done reports whether Step has already seen the "pong" challenge.
  func (c *MockSCRAMClient) Done() bool {
  	return c.done
  }
  228. var _ SCRAMClient = &MockSCRAMClient{}
  229. func TestSASLSCRAMSHAXXX(t *testing.T) {
  230. testTable := []struct {
  231. name string
  232. mockHandshakeErr KError
  233. mockSASLAuthErr KError
  234. expectClientErr bool
  235. scramClient *MockSCRAMClient
  236. scramChallengeResp string
  237. }{
  238. {
  239. name: "SASL/SCRAMSHAXXX successfull authentication",
  240. mockHandshakeErr: ErrNoError,
  241. scramClient: &MockSCRAMClient{},
  242. scramChallengeResp: "pong",
  243. },
  244. {
  245. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  246. mockHandshakeErr: ErrNoError,
  247. mockSASLAuthErr: ErrNoError,
  248. scramClient: &MockSCRAMClient{},
  249. scramChallengeResp: "gong",
  250. expectClientErr: true,
  251. },
  252. {
  253. name: "SASL/SCRAMSHAXXX server authentication error",
  254. mockHandshakeErr: ErrNoError,
  255. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  256. scramClient: &MockSCRAMClient{},
  257. scramChallengeResp: "pong",
  258. },
  259. {
  260. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  261. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  262. mockSASLAuthErr: ErrNoError,
  263. scramClient: &MockSCRAMClient{},
  264. scramChallengeResp: "pong",
  265. },
  266. }
  267. for i, test := range testTable {
  268. // mockBroker mocks underlying network logic and broker responses
  269. mockBroker := NewMockBroker(t, 0)
  270. broker := NewBroker(mockBroker.Addr())
  271. // broker executes SASL requests against mockBroker
  272. broker.requestRate = metrics.NilMeter{}
  273. broker.outgoingByteRate = metrics.NilMeter{}
  274. broker.incomingByteRate = metrics.NilMeter{}
  275. broker.requestSize = metrics.NilHistogram{}
  276. broker.responseSize = metrics.NilHistogram{}
  277. broker.responseRate = metrics.NilMeter{}
  278. broker.requestLatency = metrics.NilHistogram{}
  279. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  280. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  281. if test.mockSASLAuthErr != ErrNoError {
  282. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  283. }
  284. if test.mockHandshakeErr != ErrNoError {
  285. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  286. }
  287. mockBroker.SetHandlerByMap(map[string]MockResponse{
  288. "SaslAuthenticateRequest": mockSASLAuthResponse,
  289. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  290. })
  291. conf := NewConfig()
  292. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  293. conf.Net.SASL.SCRAMClient = test.scramClient
  294. broker.conf = conf
  295. dialer := net.Dialer{
  296. Timeout: conf.Net.DialTimeout,
  297. KeepAlive: conf.Net.KeepAlive,
  298. LocalAddr: conf.Net.LocalAddr,
  299. }
  300. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. broker.conn = conn
  305. err = broker.authenticateViaSASL()
  306. if test.mockSASLAuthErr != ErrNoError {
  307. if test.mockSASLAuthErr != err {
  308. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  309. }
  310. } else if test.mockHandshakeErr != ErrNoError {
  311. if test.mockHandshakeErr != err {
  312. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  313. }
  314. } else if test.expectClientErr && err == nil {
  315. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  316. } else if !test.expectClientErr && err != nil {
  317. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  318. }
  319. mockBroker.Close()
  320. }
  321. }
  322. func TestBuildClientInitialResponse(t *testing.T) {
  323. testTable := []struct {
  324. name string
  325. token *AccessToken
  326. expected []byte
  327. expectError bool
  328. }{
  329. {
  330. name: "Build SASL client initial response with two extensions",
  331. token: &AccessToken{
  332. Token: "the-token",
  333. Extensions: map[string]string{
  334. "x": "1",
  335. "y": "2",
  336. },
  337. },
  338. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  339. },
  340. {
  341. name: "Build SASL client initial response with no extensions",
  342. token: &AccessToken{Token: "the-token"},
  343. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  344. },
  345. {
  346. name: "Build SASL client initial response using reserved extension",
  347. token: &AccessToken{
  348. Token: "the-token",
  349. Extensions: map[string]string{
  350. "auth": "auth-value",
  351. },
  352. },
  353. expected: []byte(""),
  354. expectError: true,
  355. },
  356. }
  357. for i, test := range testTable {
  358. actual, err := buildClientInitialResponse(test.token)
  359. if !reflect.DeepEqual(test.expected, actual) {
  360. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  361. }
  362. if test.expectError && err == nil {
  363. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  364. }
  365. if !test.expectError && err != nil {
  366. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  367. }
  368. }
  369. }
  370. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  371. var brokerTestTable = []struct {
  372. version KafkaVersion
  373. name string
  374. response []byte
  375. runner func(*testing.T, *Broker)
  376. }{
  377. {V0_10_0_0,
  378. "MetadataRequest",
  379. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  380. func(t *testing.T, broker *Broker) {
  381. request := MetadataRequest{}
  382. response, err := broker.GetMetadata(&request)
  383. if err != nil {
  384. t.Error(err)
  385. }
  386. if response == nil {
  387. t.Error("Metadata request got no response!")
  388. }
  389. }},
  390. {V0_10_0_0,
  391. "ConsumerMetadataRequest",
  392. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  393. func(t *testing.T, broker *Broker) {
  394. request := ConsumerMetadataRequest{}
  395. response, err := broker.GetConsumerMetadata(&request)
  396. if err != nil {
  397. t.Error(err)
  398. }
  399. if response == nil {
  400. t.Error("Consumer Metadata request got no response!")
  401. }
  402. }},
  403. {V0_10_0_0,
  404. "ProduceRequest (NoResponse)",
  405. []byte{},
  406. func(t *testing.T, broker *Broker) {
  407. request := ProduceRequest{}
  408. request.RequiredAcks = NoResponse
  409. response, err := broker.Produce(&request)
  410. if err != nil {
  411. t.Error(err)
  412. }
  413. if response != nil {
  414. t.Error("Produce request with NoResponse got a response!")
  415. }
  416. }},
  417. {V0_10_0_0,
  418. "ProduceRequest (WaitForLocal)",
  419. []byte{0x00, 0x00, 0x00, 0x00},
  420. func(t *testing.T, broker *Broker) {
  421. request := ProduceRequest{}
  422. request.RequiredAcks = WaitForLocal
  423. response, err := broker.Produce(&request)
  424. if err != nil {
  425. t.Error(err)
  426. }
  427. if response == nil {
  428. t.Error("Produce request without NoResponse got no response!")
  429. }
  430. }},
  431. {V0_10_0_0,
  432. "FetchRequest",
  433. []byte{0x00, 0x00, 0x00, 0x00},
  434. func(t *testing.T, broker *Broker) {
  435. request := FetchRequest{}
  436. response, err := broker.Fetch(&request)
  437. if err != nil {
  438. t.Error(err)
  439. }
  440. if response == nil {
  441. t.Error("Fetch request got no response!")
  442. }
  443. }},
  444. {V0_10_0_0,
  445. "OffsetFetchRequest",
  446. []byte{0x00, 0x00, 0x00, 0x00},
  447. func(t *testing.T, broker *Broker) {
  448. request := OffsetFetchRequest{}
  449. response, err := broker.FetchOffset(&request)
  450. if err != nil {
  451. t.Error(err)
  452. }
  453. if response == nil {
  454. t.Error("OffsetFetch request got no response!")
  455. }
  456. }},
  457. {V0_10_0_0,
  458. "OffsetCommitRequest",
  459. []byte{0x00, 0x00, 0x00, 0x00},
  460. func(t *testing.T, broker *Broker) {
  461. request := OffsetCommitRequest{}
  462. response, err := broker.CommitOffset(&request)
  463. if err != nil {
  464. t.Error(err)
  465. }
  466. if response == nil {
  467. t.Error("OffsetCommit request got no response!")
  468. }
  469. }},
  470. {V0_10_0_0,
  471. "OffsetRequest",
  472. []byte{0x00, 0x00, 0x00, 0x00},
  473. func(t *testing.T, broker *Broker) {
  474. request := OffsetRequest{}
  475. response, err := broker.GetAvailableOffsets(&request)
  476. if err != nil {
  477. t.Error(err)
  478. }
  479. if response == nil {
  480. t.Error("Offset request got no response!")
  481. }
  482. }},
  483. {V0_10_0_0,
  484. "JoinGroupRequest",
  485. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  486. func(t *testing.T, broker *Broker) {
  487. request := JoinGroupRequest{}
  488. response, err := broker.JoinGroup(&request)
  489. if err != nil {
  490. t.Error(err)
  491. }
  492. if response == nil {
  493. t.Error("JoinGroup request got no response!")
  494. }
  495. }},
  496. {V0_10_0_0,
  497. "SyncGroupRequest",
  498. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  499. func(t *testing.T, broker *Broker) {
  500. request := SyncGroupRequest{}
  501. response, err := broker.SyncGroup(&request)
  502. if err != nil {
  503. t.Error(err)
  504. }
  505. if response == nil {
  506. t.Error("SyncGroup request got no response!")
  507. }
  508. }},
  509. {V0_10_0_0,
  510. "LeaveGroupRequest",
  511. []byte{0x00, 0x00},
  512. func(t *testing.T, broker *Broker) {
  513. request := LeaveGroupRequest{}
  514. response, err := broker.LeaveGroup(&request)
  515. if err != nil {
  516. t.Error(err)
  517. }
  518. if response == nil {
  519. t.Error("LeaveGroup request got no response!")
  520. }
  521. }},
  522. {V0_10_0_0,
  523. "HeartbeatRequest",
  524. []byte{0x00, 0x00},
  525. func(t *testing.T, broker *Broker) {
  526. request := HeartbeatRequest{}
  527. response, err := broker.Heartbeat(&request)
  528. if err != nil {
  529. t.Error(err)
  530. }
  531. if response == nil {
  532. t.Error("Heartbeat request got no response!")
  533. }
  534. }},
  535. {V0_10_0_0,
  536. "ListGroupsRequest",
  537. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  538. func(t *testing.T, broker *Broker) {
  539. request := ListGroupsRequest{}
  540. response, err := broker.ListGroups(&request)
  541. if err != nil {
  542. t.Error(err)
  543. }
  544. if response == nil {
  545. t.Error("ListGroups request got no response!")
  546. }
  547. }},
  548. {V0_10_0_0,
  549. "DescribeGroupsRequest",
  550. []byte{0x00, 0x00, 0x00, 0x00},
  551. func(t *testing.T, broker *Broker) {
  552. request := DescribeGroupsRequest{}
  553. response, err := broker.DescribeGroups(&request)
  554. if err != nil {
  555. t.Error(err)
  556. }
  557. if response == nil {
  558. t.Error("DescribeGroups request got no response!")
  559. }
  560. }},
  561. {V0_10_0_0,
  562. "ApiVersionsRequest",
  563. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  564. func(t *testing.T, broker *Broker) {
  565. request := ApiVersionsRequest{}
  566. response, err := broker.ApiVersions(&request)
  567. if err != nil {
  568. t.Error(err)
  569. }
  570. if response == nil {
  571. t.Error("ApiVersions request got no response!")
  572. }
  573. }},
  574. {V1_1_0_0,
  575. "DeleteGroupsRequest",
  576. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  577. func(t *testing.T, broker *Broker) {
  578. request := DeleteGroupsRequest{}
  579. response, err := broker.DeleteGroups(&request)
  580. if err != nil {
  581. t.Error(err)
  582. }
  583. if response == nil {
  584. t.Error("DeleteGroups request got no response!")
  585. }
  586. }},
  587. }
  588. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  589. metricValidators := newMetricValidators()
  590. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  591. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  592. // Check that the number of bytes sent corresponds to what the mock broker received
  593. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  594. if mockBrokerBytesWritten == 0 {
  595. // This a ProduceRequest with NoResponse
  596. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  597. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  598. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  599. } else {
  600. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  601. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  602. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  603. }
  604. // Check that the number of bytes received corresponds to what the mock broker sent
  605. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  606. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  607. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  608. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  609. // Run the validators
  610. metricValidators.run(t, broker.conf.MetricRegistry)
  611. }