broker_test.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. )
  11. func ExampleBroker() {
  12. broker := NewBroker("localhost:9092")
  13. err := broker.Open(nil)
  14. if err != nil {
  15. panic(err)
  16. }
  17. request := MetadataRequest{Topics: []string{"myTopic"}}
  18. response, err := broker.GetMetadata(&request)
  19. if err != nil {
  20. _ = broker.Close()
  21. panic(err)
  22. }
  23. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  24. if err = broker.Close(); err != nil {
  25. panic(err)
  26. }
  27. }
  28. type mockEncoder struct {
  29. bytes []byte
  30. }
  31. func (m mockEncoder) encode(pe packetEncoder) error {
  32. return pe.putRawBytes(m.bytes)
  33. }
  34. type brokerMetrics struct {
  35. bytesRead int
  36. bytesWritten int
  37. }
  38. func TestBrokerAccessors(t *testing.T) {
  39. broker := NewBroker("abc:123")
  40. if broker.ID() != -1 {
  41. t.Error("New broker didn't have an ID of -1.")
  42. }
  43. if broker.Addr() != "abc:123" {
  44. t.Error("New broker didn't have the correct address")
  45. }
  46. if broker.Rack() != "" {
  47. t.Error("New broker didn't have an unknown rack.")
  48. }
  49. broker.id = 34
  50. if broker.ID() != 34 {
  51. t.Error("Manually setting broker ID did not take effect.")
  52. }
  53. rack := "dc1"
  54. broker.rack = &rack
  55. if broker.Rack() != rack {
  56. t.Error("Manually setting broker rack did not take effect.")
  57. }
  58. }
  59. func TestSimpleBrokerCommunication(t *testing.T) {
  60. for _, tt := range brokerTestTable {
  61. Logger.Printf("Testing broker communication for %s", tt.name)
  62. mb := NewMockBroker(t, 0)
  63. mb.Returns(&mockEncoder{tt.response})
  64. pendingNotify := make(chan brokerMetrics)
  65. // Register a callback to be notified about successful requests
  66. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  67. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  68. })
  69. broker := NewBroker(mb.Addr())
  70. // Set the broker id in order to validate local broker metrics
  71. broker.id = 0
  72. conf := NewConfig()
  73. conf.Version = tt.version
  74. err := broker.Open(conf)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. tt.runner(t, broker)
  79. // Wait up to 500 ms for the remote broker to process the request and
  80. // notify us about the metrics
  81. timeout := 500 * time.Millisecond
  82. select {
  83. case mockBrokerMetrics := <-pendingNotify:
  84. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  85. case <-time.After(timeout):
  86. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  87. }
  88. mb.Close()
  89. err = broker.Close()
  90. if err != nil {
  91. t.Error(err)
  92. }
  93. }
  94. }
// ErrTokenFailure is the error handed to newTokenProvider by the
// "token generation error" case of TestSASLOAuthBearer.
var ErrTokenFailure = errors.New("Failure generating token")
  96. type TokenProvider struct {
  97. accessToken *AccessToken
  98. err error
  99. }
  100. func (t *TokenProvider) Token() (*AccessToken, error) {
  101. return t.accessToken, t.err
  102. }
  103. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  104. return &TokenProvider{
  105. accessToken: token,
  106. err: err,
  107. }
  108. }
  109. func TestSASLOAuthBearer(t *testing.T) {
  110. testTable := []struct {
  111. name string
  112. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  113. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  114. expectClientErr bool // Expect an internal client-side error
  115. tokProvider *TokenProvider
  116. }{
  117. {
  118. name: "SASL/OAUTHBEARER OK server response",
  119. mockAuthErr: ErrNoError,
  120. mockHandshakeErr: ErrNoError,
  121. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  122. },
  123. {
  124. name: "SASL/OAUTHBEARER authentication failure response",
  125. mockAuthErr: ErrSASLAuthenticationFailed,
  126. mockHandshakeErr: ErrNoError,
  127. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  128. },
  129. {
  130. name: "SASL/OAUTHBEARER handshake failure response",
  131. mockAuthErr: ErrNoError,
  132. mockHandshakeErr: ErrSASLAuthenticationFailed,
  133. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  134. },
  135. {
  136. name: "SASL/OAUTHBEARER token generation error",
  137. mockAuthErr: ErrNoError,
  138. mockHandshakeErr: ErrNoError,
  139. expectClientErr: true,
  140. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  141. },
  142. {
  143. name: "SASL/OAUTHBEARER invalid extension",
  144. mockAuthErr: ErrNoError,
  145. mockHandshakeErr: ErrNoError,
  146. expectClientErr: true,
  147. tokProvider: newTokenProvider(&AccessToken{
  148. Token: "access-token-123",
  149. Extensions: map[string]string{"auth": "auth-value"},
  150. }, nil),
  151. },
  152. }
  153. for i, test := range testTable {
  154. // mockBroker mocks underlying network logic and broker responses
  155. mockBroker := NewMockBroker(t, 0)
  156. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte("response_payload"))
  157. if test.mockAuthErr != ErrNoError {
  158. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  159. }
  160. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeOAuth})
  161. if test.mockHandshakeErr != ErrNoError {
  162. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  163. }
  164. mockBroker.SetHandlerByMap(map[string]MockResponse{
  165. "SaslAuthenticateRequest": mockSASLAuthResponse,
  166. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  167. })
  168. // broker executes SASL requests against mockBroker
  169. broker := NewBroker(mockBroker.Addr())
  170. broker.requestRate = metrics.NilMeter{}
  171. broker.outgoingByteRate = metrics.NilMeter{}
  172. broker.incomingByteRate = metrics.NilMeter{}
  173. broker.requestSize = metrics.NilHistogram{}
  174. broker.responseSize = metrics.NilHistogram{}
  175. broker.responseRate = metrics.NilMeter{}
  176. broker.requestLatency = metrics.NilHistogram{}
  177. conf := NewConfig()
  178. conf.Net.SASL.Mechanism = SASLTypeOAuth
  179. conf.Net.SASL.TokenProvider = test.tokProvider
  180. broker.conf = conf
  181. dialer := net.Dialer{
  182. Timeout: conf.Net.DialTimeout,
  183. KeepAlive: conf.Net.KeepAlive,
  184. LocalAddr: conf.Net.LocalAddr,
  185. }
  186. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  187. if err != nil {
  188. t.Fatal(err)
  189. }
  190. broker.conn = conn
  191. err = broker.authenticateViaSASL()
  192. if test.mockAuthErr != ErrNoError {
  193. if test.mockAuthErr != err {
  194. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  195. }
  196. } else if test.mockHandshakeErr != ErrNoError {
  197. if test.mockHandshakeErr != err {
  198. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  199. }
  200. } else if test.expectClientErr && err == nil {
  201. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  202. } else if !test.expectClientErr && err != nil {
  203. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  204. }
  205. mockBroker.Close()
  206. }
  207. }
  208. // A mock scram client.
  209. type MockSCRAMClient struct {
  210. done bool
  211. }
  212. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  213. return nil
  214. }
  215. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  216. if challenge == "" {
  217. return "ping", nil
  218. }
  219. if challenge == "pong" {
  220. m.done = true
  221. return "", nil
  222. }
  223. return "", errors.New("failed to authenticate :(")
  224. }
  225. func (m *MockSCRAMClient) Done() bool {
  226. return m.done
  227. }
  228. var _ SCRAMClient = &MockSCRAMClient{}
  229. func TestSASLSCRAMSHAXXX(t *testing.T) {
  230. testTable := []struct {
  231. name string
  232. mockHandshakeErr KError
  233. mockSASLAuthErr KError
  234. expectClientErr bool
  235. scramClient *MockSCRAMClient
  236. scramChallengeResp string
  237. }{
  238. {
  239. name: "SASL/SCRAMSHAXXX successfull authentication",
  240. mockHandshakeErr: ErrNoError,
  241. scramClient: &MockSCRAMClient{},
  242. scramChallengeResp: "pong",
  243. },
  244. {
  245. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  246. mockHandshakeErr: ErrNoError,
  247. mockSASLAuthErr: ErrNoError,
  248. scramClient: &MockSCRAMClient{},
  249. scramChallengeResp: "gong",
  250. expectClientErr: true,
  251. },
  252. {
  253. name: "SASL/SCRAMSHAXXX server authentication error",
  254. mockHandshakeErr: ErrNoError,
  255. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  256. scramClient: &MockSCRAMClient{},
  257. scramChallengeResp: "pong",
  258. },
  259. {
  260. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  261. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  262. mockSASLAuthErr: ErrNoError,
  263. scramClient: &MockSCRAMClient{},
  264. scramChallengeResp: "pong",
  265. },
  266. }
  267. for i, test := range testTable {
  268. // mockBroker mocks underlying network logic and broker responses
  269. mockBroker := NewMockBroker(t, 0)
  270. broker := NewBroker(mockBroker.Addr())
  271. // broker executes SASL requests against mockBroker
  272. broker.requestRate = metrics.NilMeter{}
  273. broker.outgoingByteRate = metrics.NilMeter{}
  274. broker.incomingByteRate = metrics.NilMeter{}
  275. broker.requestSize = metrics.NilHistogram{}
  276. broker.responseSize = metrics.NilHistogram{}
  277. broker.responseRate = metrics.NilMeter{}
  278. broker.requestLatency = metrics.NilHistogram{}
  279. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  280. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  281. if test.mockSASLAuthErr != ErrNoError {
  282. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  283. }
  284. if test.mockHandshakeErr != ErrNoError {
  285. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  286. }
  287. mockBroker.SetHandlerByMap(map[string]MockResponse{
  288. "SaslAuthenticateRequest": mockSASLAuthResponse,
  289. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  290. })
  291. conf := NewConfig()
  292. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  293. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  294. broker.conf = conf
  295. dialer := net.Dialer{
  296. Timeout: conf.Net.DialTimeout,
  297. KeepAlive: conf.Net.KeepAlive,
  298. LocalAddr: conf.Net.LocalAddr,
  299. }
  300. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. broker.conn = conn
  305. err = broker.authenticateViaSASL()
  306. if test.mockSASLAuthErr != ErrNoError {
  307. if test.mockSASLAuthErr != err {
  308. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  309. }
  310. } else if test.mockHandshakeErr != ErrNoError {
  311. if test.mockHandshakeErr != err {
  312. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  313. }
  314. } else if test.expectClientErr && err == nil {
  315. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  316. } else if !test.expectClientErr && err != nil {
  317. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  318. }
  319. mockBroker.Close()
  320. }
  321. }
  322. func TestSASLPlainAuth(t *testing.T) {
  323. testTable := []struct {
  324. name string
  325. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  326. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  327. expectClientErr bool // Expect an internal client-side error
  328. }{
  329. {
  330. name: "SASL Plain OK server response",
  331. mockAuthErr: ErrNoError,
  332. mockHandshakeErr: ErrNoError,
  333. },
  334. {
  335. name: "SASL Plain authentication failure response",
  336. mockAuthErr: ErrSASLAuthenticationFailed,
  337. mockHandshakeErr: ErrNoError,
  338. },
  339. {
  340. name: "SASL Plain handshake failure response",
  341. mockAuthErr: ErrNoError,
  342. mockHandshakeErr: ErrSASLAuthenticationFailed,
  343. },
  344. }
  345. for i, test := range testTable {
  346. // mockBroker mocks underlying network logic and broker responses
  347. mockBroker := NewMockBroker(t, 0)
  348. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  349. SetAuthBytes([]byte(`response_payload`))
  350. if test.mockAuthErr != ErrNoError {
  351. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  352. }
  353. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  354. SetEnabledMechanisms([]string{SASLTypePlaintext})
  355. if test.mockHandshakeErr != ErrNoError {
  356. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  357. }
  358. mockBroker.SetHandlerByMap(map[string]MockResponse{
  359. "SaslAuthenticateRequest": mockSASLAuthResponse,
  360. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  361. })
  362. // broker executes SASL requests against mockBroker
  363. broker := NewBroker(mockBroker.Addr())
  364. broker.requestRate = metrics.NilMeter{}
  365. broker.outgoingByteRate = metrics.NilMeter{}
  366. broker.incomingByteRate = metrics.NilMeter{}
  367. broker.requestSize = metrics.NilHistogram{}
  368. broker.responseSize = metrics.NilHistogram{}
  369. broker.responseRate = metrics.NilMeter{}
  370. broker.requestLatency = metrics.NilHistogram{}
  371. conf := NewConfig()
  372. conf.Net.SASL.Mechanism = SASLTypePlaintext
  373. conf.Net.SASL.User = "token"
  374. conf.Net.SASL.Password = "password"
  375. broker.conf = conf
  376. broker.conf.Version = V1_0_0_0
  377. dialer := net.Dialer{
  378. Timeout: conf.Net.DialTimeout,
  379. KeepAlive: conf.Net.KeepAlive,
  380. LocalAddr: conf.Net.LocalAddr,
  381. }
  382. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. broker.conn = conn
  387. err = broker.authenticateViaSASL()
  388. if test.mockAuthErr != ErrNoError {
  389. if test.mockAuthErr != err {
  390. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  391. }
  392. } else if test.mockHandshakeErr != ErrNoError {
  393. if test.mockHandshakeErr != err {
  394. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  395. }
  396. } else if test.expectClientErr && err == nil {
  397. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  398. } else if !test.expectClientErr && err != nil {
  399. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  400. }
  401. mockBroker.Close()
  402. }
  403. }
  404. func TestBuildClientInitialResponse(t *testing.T) {
  405. testTable := []struct {
  406. name string
  407. token *AccessToken
  408. expected []byte
  409. expectError bool
  410. }{
  411. {
  412. name: "Build SASL client initial response with two extensions",
  413. token: &AccessToken{
  414. Token: "the-token",
  415. Extensions: map[string]string{
  416. "x": "1",
  417. "y": "2",
  418. },
  419. },
  420. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  421. },
  422. {
  423. name: "Build SASL client initial response with no extensions",
  424. token: &AccessToken{Token: "the-token"},
  425. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  426. },
  427. {
  428. name: "Build SASL client initial response using reserved extension",
  429. token: &AccessToken{
  430. Token: "the-token",
  431. Extensions: map[string]string{
  432. "auth": "auth-value",
  433. },
  434. },
  435. expected: []byte(""),
  436. expectError: true,
  437. },
  438. }
  439. for i, test := range testTable {
  440. actual, err := buildClientInitialResponse(test.token)
  441. if !reflect.DeepEqual(test.expected, actual) {
  442. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  443. }
  444. if test.expectError && err == nil {
  445. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  446. }
  447. if !test.expectError && err != nil {
  448. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  449. }
  450. }
  451. }
  452. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  453. var brokerTestTable = []struct {
  454. version KafkaVersion
  455. name string
  456. response []byte
  457. runner func(*testing.T, *Broker)
  458. }{
  459. {V0_10_0_0,
  460. "MetadataRequest",
  461. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  462. func(t *testing.T, broker *Broker) {
  463. request := MetadataRequest{}
  464. response, err := broker.GetMetadata(&request)
  465. if err != nil {
  466. t.Error(err)
  467. }
  468. if response == nil {
  469. t.Error("Metadata request got no response!")
  470. }
  471. }},
  472. {V0_10_0_0,
  473. "ConsumerMetadataRequest",
  474. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  475. func(t *testing.T, broker *Broker) {
  476. request := ConsumerMetadataRequest{}
  477. response, err := broker.GetConsumerMetadata(&request)
  478. if err != nil {
  479. t.Error(err)
  480. }
  481. if response == nil {
  482. t.Error("Consumer Metadata request got no response!")
  483. }
  484. }},
  485. {V0_10_0_0,
  486. "ProduceRequest (NoResponse)",
  487. []byte{},
  488. func(t *testing.T, broker *Broker) {
  489. request := ProduceRequest{}
  490. request.RequiredAcks = NoResponse
  491. response, err := broker.Produce(&request)
  492. if err != nil {
  493. t.Error(err)
  494. }
  495. if response != nil {
  496. t.Error("Produce request with NoResponse got a response!")
  497. }
  498. }},
  499. {V0_10_0_0,
  500. "ProduceRequest (WaitForLocal)",
  501. []byte{0x00, 0x00, 0x00, 0x00},
  502. func(t *testing.T, broker *Broker) {
  503. request := ProduceRequest{}
  504. request.RequiredAcks = WaitForLocal
  505. response, err := broker.Produce(&request)
  506. if err != nil {
  507. t.Error(err)
  508. }
  509. if response == nil {
  510. t.Error("Produce request without NoResponse got no response!")
  511. }
  512. }},
  513. {V0_10_0_0,
  514. "FetchRequest",
  515. []byte{0x00, 0x00, 0x00, 0x00},
  516. func(t *testing.T, broker *Broker) {
  517. request := FetchRequest{}
  518. response, err := broker.Fetch(&request)
  519. if err != nil {
  520. t.Error(err)
  521. }
  522. if response == nil {
  523. t.Error("Fetch request got no response!")
  524. }
  525. }},
  526. {V0_10_0_0,
  527. "OffsetFetchRequest",
  528. []byte{0x00, 0x00, 0x00, 0x00},
  529. func(t *testing.T, broker *Broker) {
  530. request := OffsetFetchRequest{}
  531. response, err := broker.FetchOffset(&request)
  532. if err != nil {
  533. t.Error(err)
  534. }
  535. if response == nil {
  536. t.Error("OffsetFetch request got no response!")
  537. }
  538. }},
  539. {V0_10_0_0,
  540. "OffsetCommitRequest",
  541. []byte{0x00, 0x00, 0x00, 0x00},
  542. func(t *testing.T, broker *Broker) {
  543. request := OffsetCommitRequest{}
  544. response, err := broker.CommitOffset(&request)
  545. if err != nil {
  546. t.Error(err)
  547. }
  548. if response == nil {
  549. t.Error("OffsetCommit request got no response!")
  550. }
  551. }},
  552. {V0_10_0_0,
  553. "OffsetRequest",
  554. []byte{0x00, 0x00, 0x00, 0x00},
  555. func(t *testing.T, broker *Broker) {
  556. request := OffsetRequest{}
  557. response, err := broker.GetAvailableOffsets(&request)
  558. if err != nil {
  559. t.Error(err)
  560. }
  561. if response == nil {
  562. t.Error("Offset request got no response!")
  563. }
  564. }},
  565. {V0_10_0_0,
  566. "JoinGroupRequest",
  567. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  568. func(t *testing.T, broker *Broker) {
  569. request := JoinGroupRequest{}
  570. response, err := broker.JoinGroup(&request)
  571. if err != nil {
  572. t.Error(err)
  573. }
  574. if response == nil {
  575. t.Error("JoinGroup request got no response!")
  576. }
  577. }},
  578. {V0_10_0_0,
  579. "SyncGroupRequest",
  580. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  581. func(t *testing.T, broker *Broker) {
  582. request := SyncGroupRequest{}
  583. response, err := broker.SyncGroup(&request)
  584. if err != nil {
  585. t.Error(err)
  586. }
  587. if response == nil {
  588. t.Error("SyncGroup request got no response!")
  589. }
  590. }},
  591. {V0_10_0_0,
  592. "LeaveGroupRequest",
  593. []byte{0x00, 0x00},
  594. func(t *testing.T, broker *Broker) {
  595. request := LeaveGroupRequest{}
  596. response, err := broker.LeaveGroup(&request)
  597. if err != nil {
  598. t.Error(err)
  599. }
  600. if response == nil {
  601. t.Error("LeaveGroup request got no response!")
  602. }
  603. }},
  604. {V0_10_0_0,
  605. "HeartbeatRequest",
  606. []byte{0x00, 0x00},
  607. func(t *testing.T, broker *Broker) {
  608. request := HeartbeatRequest{}
  609. response, err := broker.Heartbeat(&request)
  610. if err != nil {
  611. t.Error(err)
  612. }
  613. if response == nil {
  614. t.Error("Heartbeat request got no response!")
  615. }
  616. }},
  617. {V0_10_0_0,
  618. "ListGroupsRequest",
  619. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  620. func(t *testing.T, broker *Broker) {
  621. request := ListGroupsRequest{}
  622. response, err := broker.ListGroups(&request)
  623. if err != nil {
  624. t.Error(err)
  625. }
  626. if response == nil {
  627. t.Error("ListGroups request got no response!")
  628. }
  629. }},
  630. {V0_10_0_0,
  631. "DescribeGroupsRequest",
  632. []byte{0x00, 0x00, 0x00, 0x00},
  633. func(t *testing.T, broker *Broker) {
  634. request := DescribeGroupsRequest{}
  635. response, err := broker.DescribeGroups(&request)
  636. if err != nil {
  637. t.Error(err)
  638. }
  639. if response == nil {
  640. t.Error("DescribeGroups request got no response!")
  641. }
  642. }},
  643. {V0_10_0_0,
  644. "ApiVersionsRequest",
  645. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  646. func(t *testing.T, broker *Broker) {
  647. request := ApiVersionsRequest{}
  648. response, err := broker.ApiVersions(&request)
  649. if err != nil {
  650. t.Error(err)
  651. }
  652. if response == nil {
  653. t.Error("ApiVersions request got no response!")
  654. }
  655. }},
  656. {V1_1_0_0,
  657. "DeleteGroupsRequest",
  658. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  659. func(t *testing.T, broker *Broker) {
  660. request := DeleteGroupsRequest{}
  661. response, err := broker.DeleteGroups(&request)
  662. if err != nil {
  663. t.Error(err)
  664. }
  665. if response == nil {
  666. t.Error("DeleteGroups request got no response!")
  667. }
  668. }},
  669. }
  670. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  671. metricValidators := newMetricValidators()
  672. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  673. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  674. // Check that the number of bytes sent corresponds to what the mock broker received
  675. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  676. if mockBrokerBytesWritten == 0 {
  677. // This a ProduceRequest with NoResponse
  678. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  679. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  680. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  681. } else {
  682. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  683. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  684. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  685. }
  686. // Check that the number of bytes received corresponds to what the mock broker sent
  687. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  688. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  689. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  690. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  691. // Run the validators
  692. metricValidators.run(t, broker.conf.MetricRegistry)
  693. }