broker_test.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  6. "net"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/rcrowley/go-metrics"
  11. )
  12. func ExampleBroker() {
  13. broker := NewBroker("localhost:9092")
  14. err := broker.Open(nil)
  15. if err != nil {
  16. panic(err)
  17. }
  18. request := MetadataRequest{Topics: []string{"myTopic"}}
  19. response, err := broker.GetMetadata(&request)
  20. if err != nil {
  21. _ = broker.Close()
  22. panic(err)
  23. }
  24. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  25. if err = broker.Close(); err != nil {
  26. panic(err)
  27. }
  28. }
  29. type mockEncoder struct {
  30. bytes []byte
  31. }
  32. func (m mockEncoder) encode(pe packetEncoder) error {
  33. return pe.putRawBytes(m.bytes)
  34. }
  35. type brokerMetrics struct {
  36. bytesRead int
  37. bytesWritten int
  38. }
  39. func TestBrokerAccessors(t *testing.T) {
  40. broker := NewBroker("abc:123")
  41. if broker.ID() != -1 {
  42. t.Error("New broker didn't have an ID of -1.")
  43. }
  44. if broker.Addr() != "abc:123" {
  45. t.Error("New broker didn't have the correct address")
  46. }
  47. if broker.Rack() != "" {
  48. t.Error("New broker didn't have an unknown rack.")
  49. }
  50. broker.id = 34
  51. if broker.ID() != 34 {
  52. t.Error("Manually setting broker ID did not take effect.")
  53. }
  54. rack := "dc1"
  55. broker.rack = &rack
  56. if broker.Rack() != rack {
  57. t.Error("Manually setting broker rack did not take effect.")
  58. }
  59. }
  60. func TestSimpleBrokerCommunication(t *testing.T) {
  61. for _, tt := range brokerTestTable {
  62. Logger.Printf("Testing broker communication for %s", tt.name)
  63. mb := NewMockBroker(t, 0)
  64. mb.Returns(&mockEncoder{tt.response})
  65. pendingNotify := make(chan brokerMetrics)
  66. // Register a callback to be notified about successful requests
  67. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  68. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  69. })
  70. broker := NewBroker(mb.Addr())
  71. // Set the broker id in order to validate local broker metrics
  72. broker.id = 0
  73. conf := NewConfig()
  74. conf.Version = tt.version
  75. err := broker.Open(conf)
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. tt.runner(t, broker)
  80. // Wait up to 500 ms for the remote broker to process the request and
  81. // notify us about the metrics
  82. timeout := 500 * time.Millisecond
  83. select {
  84. case mockBrokerMetrics := <-pendingNotify:
  85. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  86. case <-time.After(timeout):
  87. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  88. }
  89. mb.Close()
  90. err = broker.Close()
  91. if err != nil {
  92. t.Error(err)
  93. }
  94. }
  95. }
// ErrTokenFailure is the error handed to the mock TokenProvider to simulate a
// failure while generating an access token.
var ErrTokenFailure = errors.New("Failure generating token")
  97. type TokenProvider struct {
  98. accessToken *AccessToken
  99. err error
  100. }
  101. func (t *TokenProvider) Token() (*AccessToken, error) {
  102. return t.accessToken, t.err
  103. }
  104. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  105. return &TokenProvider{
  106. accessToken: token,
  107. err: err,
  108. }
  109. }
  110. func TestSASLOAuthBearer(t *testing.T) {
  111. testTable := []struct {
  112. name string
  113. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  114. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  115. expectClientErr bool // Expect an internal client-side error
  116. tokProvider *TokenProvider
  117. }{
  118. {
  119. name: "SASL/OAUTHBEARER OK server response",
  120. mockAuthErr: ErrNoError,
  121. mockHandshakeErr: ErrNoError,
  122. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  123. },
  124. {
  125. name: "SASL/OAUTHBEARER authentication failure response",
  126. mockAuthErr: ErrSASLAuthenticationFailed,
  127. mockHandshakeErr: ErrNoError,
  128. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  129. },
  130. {
  131. name: "SASL/OAUTHBEARER handshake failure response",
  132. mockAuthErr: ErrNoError,
  133. mockHandshakeErr: ErrSASLAuthenticationFailed,
  134. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  135. },
  136. {
  137. name: "SASL/OAUTHBEARER token generation error",
  138. mockAuthErr: ErrNoError,
  139. mockHandshakeErr: ErrNoError,
  140. expectClientErr: true,
  141. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  142. },
  143. {
  144. name: "SASL/OAUTHBEARER invalid extension",
  145. mockAuthErr: ErrNoError,
  146. mockHandshakeErr: ErrNoError,
  147. expectClientErr: true,
  148. tokProvider: newTokenProvider(&AccessToken{
  149. Token: "access-token-123",
  150. Extensions: map[string]string{"auth": "auth-value"},
  151. }, nil),
  152. },
  153. }
  154. for i, test := range testTable {
  155. // mockBroker mocks underlying network logic and broker responses
  156. mockBroker := NewMockBroker(t, 0)
  157. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte("response_payload"))
  158. if test.mockAuthErr != ErrNoError {
  159. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  160. }
  161. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeOAuth})
  162. if test.mockHandshakeErr != ErrNoError {
  163. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  164. }
  165. mockBroker.SetHandlerByMap(map[string]MockResponse{
  166. "SaslAuthenticateRequest": mockSASLAuthResponse,
  167. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  168. })
  169. // broker executes SASL requests against mockBroker
  170. broker := NewBroker(mockBroker.Addr())
  171. broker.requestRate = metrics.NilMeter{}
  172. broker.outgoingByteRate = metrics.NilMeter{}
  173. broker.incomingByteRate = metrics.NilMeter{}
  174. broker.requestSize = metrics.NilHistogram{}
  175. broker.responseSize = metrics.NilHistogram{}
  176. broker.responseRate = metrics.NilMeter{}
  177. broker.requestLatency = metrics.NilHistogram{}
  178. conf := NewConfig()
  179. conf.Net.SASL.Mechanism = SASLTypeOAuth
  180. conf.Net.SASL.TokenProvider = test.tokProvider
  181. broker.conf = conf
  182. dialer := net.Dialer{
  183. Timeout: conf.Net.DialTimeout,
  184. KeepAlive: conf.Net.KeepAlive,
  185. LocalAddr: conf.Net.LocalAddr,
  186. }
  187. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  188. if err != nil {
  189. t.Fatal(err)
  190. }
  191. broker.conn = conn
  192. err = broker.authenticateViaSASL()
  193. if test.mockAuthErr != ErrNoError {
  194. if test.mockAuthErr != err {
  195. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  196. }
  197. } else if test.mockHandshakeErr != ErrNoError {
  198. if test.mockHandshakeErr != err {
  199. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  200. }
  201. } else if test.expectClientErr && err == nil {
  202. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  203. } else if !test.expectClientErr && err != nil {
  204. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  205. }
  206. mockBroker.Close()
  207. }
  208. }
  209. // A mock scram client.
  210. type MockSCRAMClient struct {
  211. done bool
  212. }
  213. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  214. return nil
  215. }
  216. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  217. if challenge == "" {
  218. return "ping", nil
  219. }
  220. if challenge == "pong" {
  221. m.done = true
  222. return "", nil
  223. }
  224. return "", errors.New("failed to authenticate :(")
  225. }
  226. func (m *MockSCRAMClient) Done() bool {
  227. return m.done
  228. }
  229. var _ SCRAMClient = &MockSCRAMClient{}
  230. func TestSASLSCRAMSHAXXX(t *testing.T) {
  231. testTable := []struct {
  232. name string
  233. mockHandshakeErr KError
  234. mockSASLAuthErr KError
  235. expectClientErr bool
  236. scramClient *MockSCRAMClient
  237. scramChallengeResp string
  238. }{
  239. {
  240. name: "SASL/SCRAMSHAXXX successfull authentication",
  241. mockHandshakeErr: ErrNoError,
  242. scramClient: &MockSCRAMClient{},
  243. scramChallengeResp: "pong",
  244. },
  245. {
  246. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  247. mockHandshakeErr: ErrNoError,
  248. mockSASLAuthErr: ErrNoError,
  249. scramClient: &MockSCRAMClient{},
  250. scramChallengeResp: "gong",
  251. expectClientErr: true,
  252. },
  253. {
  254. name: "SASL/SCRAMSHAXXX server authentication error",
  255. mockHandshakeErr: ErrNoError,
  256. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  257. scramClient: &MockSCRAMClient{},
  258. scramChallengeResp: "pong",
  259. },
  260. {
  261. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  262. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  263. mockSASLAuthErr: ErrNoError,
  264. scramClient: &MockSCRAMClient{},
  265. scramChallengeResp: "pong",
  266. },
  267. }
  268. for i, test := range testTable {
  269. // mockBroker mocks underlying network logic and broker responses
  270. mockBroker := NewMockBroker(t, 0)
  271. broker := NewBroker(mockBroker.Addr())
  272. // broker executes SASL requests against mockBroker
  273. broker.requestRate = metrics.NilMeter{}
  274. broker.outgoingByteRate = metrics.NilMeter{}
  275. broker.incomingByteRate = metrics.NilMeter{}
  276. broker.requestSize = metrics.NilHistogram{}
  277. broker.responseSize = metrics.NilHistogram{}
  278. broker.responseRate = metrics.NilMeter{}
  279. broker.requestLatency = metrics.NilHistogram{}
  280. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  281. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  282. if test.mockSASLAuthErr != ErrNoError {
  283. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  284. }
  285. if test.mockHandshakeErr != ErrNoError {
  286. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  287. }
  288. mockBroker.SetHandlerByMap(map[string]MockResponse{
  289. "SaslAuthenticateRequest": mockSASLAuthResponse,
  290. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  291. })
  292. conf := NewConfig()
  293. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  294. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  295. broker.conf = conf
  296. dialer := net.Dialer{
  297. Timeout: conf.Net.DialTimeout,
  298. KeepAlive: conf.Net.KeepAlive,
  299. LocalAddr: conf.Net.LocalAddr,
  300. }
  301. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. broker.conn = conn
  306. err = broker.authenticateViaSASL()
  307. if test.mockSASLAuthErr != ErrNoError {
  308. if test.mockSASLAuthErr != err {
  309. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  310. }
  311. } else if test.mockHandshakeErr != ErrNoError {
  312. if test.mockHandshakeErr != err {
  313. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  314. }
  315. } else if test.expectClientErr && err == nil {
  316. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  317. } else if !test.expectClientErr && err != nil {
  318. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  319. }
  320. mockBroker.Close()
  321. }
  322. }
  323. func TestSASLPlainAuth(t *testing.T) {
  324. testTable := []struct {
  325. name string
  326. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  327. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  328. expectClientErr bool // Expect an internal client-side error
  329. }{
  330. {
  331. name: "SASL Plain OK server response",
  332. mockAuthErr: ErrNoError,
  333. mockHandshakeErr: ErrNoError,
  334. },
  335. {
  336. name: "SASL Plain authentication failure response",
  337. mockAuthErr: ErrSASLAuthenticationFailed,
  338. mockHandshakeErr: ErrNoError,
  339. },
  340. {
  341. name: "SASL Plain handshake failure response",
  342. mockAuthErr: ErrNoError,
  343. mockHandshakeErr: ErrSASLAuthenticationFailed,
  344. },
  345. }
  346. for i, test := range testTable {
  347. // mockBroker mocks underlying network logic and broker responses
  348. mockBroker := NewMockBroker(t, 0)
  349. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  350. SetAuthBytes([]byte(`response_payload`))
  351. if test.mockAuthErr != ErrNoError {
  352. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  353. }
  354. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  355. SetEnabledMechanisms([]string{SASLTypePlaintext})
  356. if test.mockHandshakeErr != ErrNoError {
  357. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  358. }
  359. mockBroker.SetHandlerByMap(map[string]MockResponse{
  360. "SaslAuthenticateRequest": mockSASLAuthResponse,
  361. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  362. })
  363. // broker executes SASL requests against mockBroker
  364. broker := NewBroker(mockBroker.Addr())
  365. broker.requestRate = metrics.NilMeter{}
  366. broker.outgoingByteRate = metrics.NilMeter{}
  367. broker.incomingByteRate = metrics.NilMeter{}
  368. broker.requestSize = metrics.NilHistogram{}
  369. broker.responseSize = metrics.NilHistogram{}
  370. broker.responseRate = metrics.NilMeter{}
  371. broker.requestLatency = metrics.NilHistogram{}
  372. conf := NewConfig()
  373. conf.Net.SASL.Mechanism = SASLTypePlaintext
  374. conf.Net.SASL.User = "token"
  375. conf.Net.SASL.Password = "password"
  376. broker.conf = conf
  377. broker.conf.Version = V1_0_0_0
  378. dialer := net.Dialer{
  379. Timeout: conf.Net.DialTimeout,
  380. KeepAlive: conf.Net.KeepAlive,
  381. LocalAddr: conf.Net.LocalAddr,
  382. }
  383. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. broker.conn = conn
  388. err = broker.authenticateViaSASL()
  389. if test.mockAuthErr != ErrNoError {
  390. if test.mockAuthErr != err {
  391. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  392. }
  393. } else if test.mockHandshakeErr != ErrNoError {
  394. if test.mockHandshakeErr != err {
  395. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  396. }
  397. } else if test.expectClientErr && err == nil {
  398. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  399. } else if !test.expectClientErr && err != nil {
  400. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  401. }
  402. mockBroker.Close()
  403. }
  404. }
  405. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  406. testTable := []struct {
  407. name string
  408. error error
  409. mockKerberosClient bool
  410. errorStage string
  411. badResponse bool
  412. badKeyChecksum bool
  413. }{
  414. {
  415. name: "Kerberos authentication success",
  416. error: nil,
  417. mockKerberosClient: true,
  418. },
  419. {
  420. name: "Kerberos login fails",
  421. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  422. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  423. "cation information was invalid - PREAUTH_FAILED"),
  424. mockKerberosClient: true,
  425. errorStage: "login",
  426. },
  427. {
  428. name: "Kerberos service ticket fails",
  429. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  430. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  431. "cation information was invalid - PREAUTH_FAILED"),
  432. mockKerberosClient: true,
  433. errorStage: "service_ticket",
  434. },
  435. {
  436. name: "Kerberos client creation fails",
  437. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  438. },
  439. {
  440. name: "Bad server response, unmarshall key error",
  441. error: errors.New("bytes shorter than header length"),
  442. badResponse: true,
  443. mockKerberosClient: true,
  444. },
  445. {
  446. name: "Bad token checksum",
  447. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  448. badResponse: false,
  449. badKeyChecksum: true,
  450. mockKerberosClient: true,
  451. },
  452. }
  453. for i, test := range testTable {
  454. mockBroker := NewMockBroker(t, 0)
  455. // broker executes SASL requests against mockBroker
  456. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  457. return nil
  458. })
  459. broker := NewBroker(mockBroker.Addr())
  460. broker.requestRate = metrics.NilMeter{}
  461. broker.outgoingByteRate = metrics.NilMeter{}
  462. broker.incomingByteRate = metrics.NilMeter{}
  463. broker.requestSize = metrics.NilHistogram{}
  464. broker.responseSize = metrics.NilHistogram{}
  465. broker.responseRate = metrics.NilMeter{}
  466. broker.requestLatency = metrics.NilHistogram{}
  467. conf := NewConfig()
  468. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  469. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  470. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  471. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  472. conf.Net.SASL.GSSAPI.Username = "kafka"
  473. conf.Net.SASL.GSSAPI.Password = "kafka"
  474. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  475. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  476. broker.conf = conf
  477. broker.conf.Version = V1_0_0_0
  478. dialer := net.Dialer{
  479. Timeout: conf.Net.DialTimeout,
  480. KeepAlive: conf.Net.KeepAlive,
  481. LocalAddr: conf.Net.LocalAddr,
  482. }
  483. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. gssapiHandler := KafkaGSSAPIHandler{
  488. client: &MockKerberosClient{},
  489. badResponse: test.badResponse,
  490. badKeyChecksum: test.badKeyChecksum,
  491. }
  492. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  493. broker.conn = conn
  494. if test.mockKerberosClient {
  495. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  496. return &MockKerberosClient{
  497. mockError: test.error,
  498. errorStage: test.errorStage,
  499. }, nil
  500. }
  501. } else {
  502. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  503. }
  504. err = broker.authenticateViaSASL()
  505. if err != nil && test.error != nil {
  506. if test.error.Error() != err.Error() {
  507. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  508. }
  509. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  510. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  511. }
  512. mockBroker.Close()
  513. }
  514. }
  515. func TestBuildClientInitialResponse(t *testing.T) {
  516. testTable := []struct {
  517. name string
  518. token *AccessToken
  519. expected []byte
  520. expectError bool
  521. }{
  522. {
  523. name: "Build SASL client initial response with two extensions",
  524. token: &AccessToken{
  525. Token: "the-token",
  526. Extensions: map[string]string{
  527. "x": "1",
  528. "y": "2",
  529. },
  530. },
  531. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  532. },
  533. {
  534. name: "Build SASL client initial response with no extensions",
  535. token: &AccessToken{Token: "the-token"},
  536. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  537. },
  538. {
  539. name: "Build SASL client initial response using reserved extension",
  540. token: &AccessToken{
  541. Token: "the-token",
  542. Extensions: map[string]string{
  543. "auth": "auth-value",
  544. },
  545. },
  546. expected: []byte(""),
  547. expectError: true,
  548. },
  549. }
  550. for i, test := range testTable {
  551. actual, err := buildClientInitialResponse(test.token)
  552. if !reflect.DeepEqual(test.expected, actual) {
  553. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  554. }
  555. if test.expectError && err == nil {
  556. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  557. }
  558. if !test.expectError && err != nil {
  559. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  560. }
  561. }
  562. }
  563. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  564. var brokerTestTable = []struct {
  565. version KafkaVersion
  566. name string
  567. response []byte
  568. runner func(*testing.T, *Broker)
  569. }{
  570. {V0_10_0_0,
  571. "MetadataRequest",
  572. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  573. func(t *testing.T, broker *Broker) {
  574. request := MetadataRequest{}
  575. response, err := broker.GetMetadata(&request)
  576. if err != nil {
  577. t.Error(err)
  578. }
  579. if response == nil {
  580. t.Error("Metadata request got no response!")
  581. }
  582. }},
  583. {V0_10_0_0,
  584. "ConsumerMetadataRequest",
  585. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  586. func(t *testing.T, broker *Broker) {
  587. request := ConsumerMetadataRequest{}
  588. response, err := broker.GetConsumerMetadata(&request)
  589. if err != nil {
  590. t.Error(err)
  591. }
  592. if response == nil {
  593. t.Error("Consumer Metadata request got no response!")
  594. }
  595. }},
  596. {V0_10_0_0,
  597. "ProduceRequest (NoResponse)",
  598. []byte{},
  599. func(t *testing.T, broker *Broker) {
  600. request := ProduceRequest{}
  601. request.RequiredAcks = NoResponse
  602. response, err := broker.Produce(&request)
  603. if err != nil {
  604. t.Error(err)
  605. }
  606. if response != nil {
  607. t.Error("Produce request with NoResponse got a response!")
  608. }
  609. }},
  610. {V0_10_0_0,
  611. "ProduceRequest (WaitForLocal)",
  612. []byte{0x00, 0x00, 0x00, 0x00},
  613. func(t *testing.T, broker *Broker) {
  614. request := ProduceRequest{}
  615. request.RequiredAcks = WaitForLocal
  616. response, err := broker.Produce(&request)
  617. if err != nil {
  618. t.Error(err)
  619. }
  620. if response == nil {
  621. t.Error("Produce request without NoResponse got no response!")
  622. }
  623. }},
  624. {V0_10_0_0,
  625. "FetchRequest",
  626. []byte{0x00, 0x00, 0x00, 0x00},
  627. func(t *testing.T, broker *Broker) {
  628. request := FetchRequest{}
  629. response, err := broker.Fetch(&request)
  630. if err != nil {
  631. t.Error(err)
  632. }
  633. if response == nil {
  634. t.Error("Fetch request got no response!")
  635. }
  636. }},
  637. {V0_10_0_0,
  638. "OffsetFetchRequest",
  639. []byte{0x00, 0x00, 0x00, 0x00},
  640. func(t *testing.T, broker *Broker) {
  641. request := OffsetFetchRequest{}
  642. response, err := broker.FetchOffset(&request)
  643. if err != nil {
  644. t.Error(err)
  645. }
  646. if response == nil {
  647. t.Error("OffsetFetch request got no response!")
  648. }
  649. }},
  650. {V0_10_0_0,
  651. "OffsetCommitRequest",
  652. []byte{0x00, 0x00, 0x00, 0x00},
  653. func(t *testing.T, broker *Broker) {
  654. request := OffsetCommitRequest{}
  655. response, err := broker.CommitOffset(&request)
  656. if err != nil {
  657. t.Error(err)
  658. }
  659. if response == nil {
  660. t.Error("OffsetCommit request got no response!")
  661. }
  662. }},
  663. {V0_10_0_0,
  664. "OffsetRequest",
  665. []byte{0x00, 0x00, 0x00, 0x00},
  666. func(t *testing.T, broker *Broker) {
  667. request := OffsetRequest{}
  668. response, err := broker.GetAvailableOffsets(&request)
  669. if err != nil {
  670. t.Error(err)
  671. }
  672. if response == nil {
  673. t.Error("Offset request got no response!")
  674. }
  675. }},
  676. {V0_10_0_0,
  677. "JoinGroupRequest",
  678. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  679. func(t *testing.T, broker *Broker) {
  680. request := JoinGroupRequest{}
  681. response, err := broker.JoinGroup(&request)
  682. if err != nil {
  683. t.Error(err)
  684. }
  685. if response == nil {
  686. t.Error("JoinGroup request got no response!")
  687. }
  688. }},
  689. {V0_10_0_0,
  690. "SyncGroupRequest",
  691. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  692. func(t *testing.T, broker *Broker) {
  693. request := SyncGroupRequest{}
  694. response, err := broker.SyncGroup(&request)
  695. if err != nil {
  696. t.Error(err)
  697. }
  698. if response == nil {
  699. t.Error("SyncGroup request got no response!")
  700. }
  701. }},
  702. {V0_10_0_0,
  703. "LeaveGroupRequest",
  704. []byte{0x00, 0x00},
  705. func(t *testing.T, broker *Broker) {
  706. request := LeaveGroupRequest{}
  707. response, err := broker.LeaveGroup(&request)
  708. if err != nil {
  709. t.Error(err)
  710. }
  711. if response == nil {
  712. t.Error("LeaveGroup request got no response!")
  713. }
  714. }},
  715. {V0_10_0_0,
  716. "HeartbeatRequest",
  717. []byte{0x00, 0x00},
  718. func(t *testing.T, broker *Broker) {
  719. request := HeartbeatRequest{}
  720. response, err := broker.Heartbeat(&request)
  721. if err != nil {
  722. t.Error(err)
  723. }
  724. if response == nil {
  725. t.Error("Heartbeat request got no response!")
  726. }
  727. }},
  728. {V0_10_0_0,
  729. "ListGroupsRequest",
  730. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  731. func(t *testing.T, broker *Broker) {
  732. request := ListGroupsRequest{}
  733. response, err := broker.ListGroups(&request)
  734. if err != nil {
  735. t.Error(err)
  736. }
  737. if response == nil {
  738. t.Error("ListGroups request got no response!")
  739. }
  740. }},
  741. {V0_10_0_0,
  742. "DescribeGroupsRequest",
  743. []byte{0x00, 0x00, 0x00, 0x00},
  744. func(t *testing.T, broker *Broker) {
  745. request := DescribeGroupsRequest{}
  746. response, err := broker.DescribeGroups(&request)
  747. if err != nil {
  748. t.Error(err)
  749. }
  750. if response == nil {
  751. t.Error("DescribeGroups request got no response!")
  752. }
  753. }},
  754. {V0_10_0_0,
  755. "ApiVersionsRequest",
  756. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  757. func(t *testing.T, broker *Broker) {
  758. request := ApiVersionsRequest{}
  759. response, err := broker.ApiVersions(&request)
  760. if err != nil {
  761. t.Error(err)
  762. }
  763. if response == nil {
  764. t.Error("ApiVersions request got no response!")
  765. }
  766. }},
  767. {V1_1_0_0,
  768. "DeleteGroupsRequest",
  769. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  770. func(t *testing.T, broker *Broker) {
  771. request := DeleteGroupsRequest{}
  772. response, err := broker.DeleteGroups(&request)
  773. if err != nil {
  774. t.Error(err)
  775. }
  776. if response == nil {
  777. t.Error("DeleteGroups request got no response!")
  778. }
  779. }},
  780. }
  781. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  782. metricValidators := newMetricValidators()
  783. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  784. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  785. // Check that the number of bytes sent corresponds to what the mock broker received
  786. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  787. if mockBrokerBytesWritten == 0 {
  788. // This a ProduceRequest with NoResponse
  789. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  790. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  791. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  792. } else {
  793. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  794. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  795. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  796. }
  797. // Check that the number of bytes received corresponds to what the mock broker sent
  798. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  799. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  800. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  801. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  802. // Run the validators
  803. metricValidators.run(t, broker.conf.MetricRegistry)
  804. }