broker_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. "github.com/rcrowley/go-metrics"
  10. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  11. )
  12. func ExampleBroker() {
  13. broker := NewBroker("localhost:9092")
  14. err := broker.Open(nil)
  15. if err != nil {
  16. panic(err)
  17. }
  18. request := MetadataRequest{Topics: []string{"myTopic"}}
  19. response, err := broker.GetMetadata(&request)
  20. if err != nil {
  21. _ = broker.Close()
  22. panic(err)
  23. }
  24. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  25. if err = broker.Close(); err != nil {
  26. panic(err)
  27. }
  28. }
  29. type mockEncoder struct {
  30. bytes []byte
  31. }
  32. func (m mockEncoder) encode(pe packetEncoder) error {
  33. return pe.putRawBytes(m.bytes)
  34. }
  35. type brokerMetrics struct {
  36. bytesRead int
  37. bytesWritten int
  38. }
  39. func TestBrokerAccessors(t *testing.T) {
  40. broker := NewBroker("abc:123")
  41. if broker.ID() != -1 {
  42. t.Error("New broker didn't have an ID of -1.")
  43. }
  44. if broker.Addr() != "abc:123" {
  45. t.Error("New broker didn't have the correct address")
  46. }
  47. if broker.Rack() != "" {
  48. t.Error("New broker didn't have an unknown rack.")
  49. }
  50. broker.id = 34
  51. if broker.ID() != 34 {
  52. t.Error("Manually setting broker ID did not take effect.")
  53. }
  54. rack := "dc1"
  55. broker.rack = &rack
  56. if broker.Rack() != rack {
  57. t.Error("Manually setting broker rack did not take effect.")
  58. }
  59. }
  60. func TestSimpleBrokerCommunication(t *testing.T) {
  61. for _, tt := range brokerTestTable {
  62. Logger.Printf("Testing broker communication for %s", tt.name)
  63. mb := NewMockBroker(t, 0)
  64. mb.Returns(&mockEncoder{tt.response})
  65. pendingNotify := make(chan brokerMetrics)
  66. // Register a callback to be notified about successful requests
  67. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  68. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  69. })
  70. broker := NewBroker(mb.Addr())
  71. // Set the broker id in order to validate local broker metrics
  72. broker.id = 0
  73. conf := NewConfig()
  74. conf.Version = tt.version
  75. err := broker.Open(conf)
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. tt.runner(t, broker)
  80. // Wait up to 500 ms for the remote broker to process the request and
  81. // notify us about the metrics
  82. timeout := 500 * time.Millisecond
  83. select {
  84. case mockBrokerMetrics := <-pendingNotify:
  85. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  86. case <-time.After(timeout):
  87. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  88. }
  89. mb.Close()
  90. err = broker.Close()
  91. if err != nil {
  92. t.Error(err)
  93. }
  94. }
  95. }
  96. var ErrTokenFailure = errors.New("Failure generating token")
  97. type TokenProvider struct {
  98. accessToken *AccessToken
  99. err error
  100. }
  101. func (t *TokenProvider) Token() (*AccessToken, error) {
  102. return t.accessToken, t.err
  103. }
  104. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  105. return &TokenProvider{
  106. accessToken: token,
  107. err: err,
  108. }
  109. }
  110. func TestSASLOAuthBearer(t *testing.T) {
  111. testTable := []struct {
  112. name string
  113. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  114. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  115. expectClientErr bool // Expect an internal client-side error
  116. expectedBrokerError KError // Expected Kafka error returned by client
  117. tokProvider *TokenProvider
  118. }{
  119. {
  120. name: "SASL/OAUTHBEARER OK server response",
  121. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  122. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  123. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  124. expectClientErr: false,
  125. expectedBrokerError: ErrNoError,
  126. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  127. },
  128. {
  129. name: "SASL/OAUTHBEARER authentication failure response",
  130. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  131. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  132. mockSASLAuthResponse: NewMockSequence(
  133. // First, the broker response with a challenge
  134. NewMockSaslAuthenticateResponse(t).
  135. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  136. // Next, the client terminates the token exchange. Finally, the
  137. // broker responds with an error message.
  138. NewMockSaslAuthenticateResponse(t).
  139. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  140. SetError(ErrSASLAuthenticationFailed),
  141. ),
  142. expectClientErr: true,
  143. expectedBrokerError: ErrSASLAuthenticationFailed,
  144. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  145. },
  146. {
  147. name: "SASL/OAUTHBEARER handshake failure response",
  148. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  149. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  150. SetError(ErrSASLAuthenticationFailed),
  151. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  152. expectClientErr: true,
  153. expectedBrokerError: ErrSASLAuthenticationFailed,
  154. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  155. },
  156. {
  157. name: "SASL/OAUTHBEARER token generation error",
  158. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  159. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  160. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  161. expectClientErr: true,
  162. expectedBrokerError: ErrNoError,
  163. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  164. },
  165. {
  166. name: "SASL/OAUTHBEARER invalid extension",
  167. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  168. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  169. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  170. expectClientErr: true,
  171. expectedBrokerError: ErrNoError,
  172. tokProvider: newTokenProvider(&AccessToken{
  173. Token: "access-token-123",
  174. Extensions: map[string]string{"auth": "auth-value"},
  175. }, nil),
  176. },
  177. }
  178. for i, test := range testTable {
  179. // mockBroker mocks underlying network logic and broker responses
  180. mockBroker := NewMockBroker(t, 0)
  181. mockBroker.SetHandlerByMap(map[string]MockResponse{
  182. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  183. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  184. })
  185. // broker executes SASL requests against mockBroker
  186. broker := NewBroker(mockBroker.Addr())
  187. broker.requestRate = metrics.NilMeter{}
  188. broker.outgoingByteRate = metrics.NilMeter{}
  189. broker.incomingByteRate = metrics.NilMeter{}
  190. broker.requestSize = metrics.NilHistogram{}
  191. broker.responseSize = metrics.NilHistogram{}
  192. broker.responseRate = metrics.NilMeter{}
  193. broker.requestLatency = metrics.NilHistogram{}
  194. conf := NewConfig()
  195. conf.Net.SASL.Mechanism = SASLTypeOAuth
  196. conf.Net.SASL.TokenProvider = test.tokProvider
  197. broker.conf = conf
  198. dialer := net.Dialer{
  199. Timeout: conf.Net.DialTimeout,
  200. KeepAlive: conf.Net.KeepAlive,
  201. LocalAddr: conf.Net.LocalAddr,
  202. }
  203. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. broker.conn = conn
  208. err = broker.authenticateViaSASL()
  209. if test.expectedBrokerError != ErrNoError {
  210. if test.expectedBrokerError != err {
  211. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  212. }
  213. } else if test.expectedBrokerError != ErrNoError {
  214. if test.expectedBrokerError != err {
  215. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.expectedBrokerError, err)
  216. }
  217. } else if test.expectClientErr && err == nil {
  218. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  219. } else if !test.expectClientErr && err != nil {
  220. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  221. }
  222. mockBroker.Close()
  223. }
  224. }
  225. // A mock scram client.
  226. type MockSCRAMClient struct {
  227. done bool
  228. }
  229. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  230. return nil
  231. }
  232. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  233. if challenge == "" {
  234. return "ping", nil
  235. }
  236. if challenge == "pong" {
  237. m.done = true
  238. return "", nil
  239. }
  240. return "", errors.New("failed to authenticate :(")
  241. }
  242. func (m *MockSCRAMClient) Done() bool {
  243. return m.done
  244. }
  245. var _ SCRAMClient = &MockSCRAMClient{}
  246. func TestSASLSCRAMSHAXXX(t *testing.T) {
  247. testTable := []struct {
  248. name string
  249. mockHandshakeErr KError
  250. mockSASLAuthErr KError
  251. expectClientErr bool
  252. scramClient *MockSCRAMClient
  253. scramChallengeResp string
  254. }{
  255. {
  256. name: "SASL/SCRAMSHAXXX successfull authentication",
  257. mockHandshakeErr: ErrNoError,
  258. scramClient: &MockSCRAMClient{},
  259. scramChallengeResp: "pong",
  260. },
  261. {
  262. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  263. mockHandshakeErr: ErrNoError,
  264. mockSASLAuthErr: ErrNoError,
  265. scramClient: &MockSCRAMClient{},
  266. scramChallengeResp: "gong",
  267. expectClientErr: true,
  268. },
  269. {
  270. name: "SASL/SCRAMSHAXXX server authentication error",
  271. mockHandshakeErr: ErrNoError,
  272. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  273. scramClient: &MockSCRAMClient{},
  274. scramChallengeResp: "pong",
  275. },
  276. {
  277. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  278. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  279. mockSASLAuthErr: ErrNoError,
  280. scramClient: &MockSCRAMClient{},
  281. scramChallengeResp: "pong",
  282. },
  283. }
  284. for i, test := range testTable {
  285. // mockBroker mocks underlying network logic and broker responses
  286. mockBroker := NewMockBroker(t, 0)
  287. broker := NewBroker(mockBroker.Addr())
  288. // broker executes SASL requests against mockBroker
  289. broker.requestRate = metrics.NilMeter{}
  290. broker.outgoingByteRate = metrics.NilMeter{}
  291. broker.incomingByteRate = metrics.NilMeter{}
  292. broker.requestSize = metrics.NilHistogram{}
  293. broker.responseSize = metrics.NilHistogram{}
  294. broker.responseRate = metrics.NilMeter{}
  295. broker.requestLatency = metrics.NilHistogram{}
  296. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  297. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  298. if test.mockSASLAuthErr != ErrNoError {
  299. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  300. }
  301. if test.mockHandshakeErr != ErrNoError {
  302. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  303. }
  304. mockBroker.SetHandlerByMap(map[string]MockResponse{
  305. "SaslAuthenticateRequest": mockSASLAuthResponse,
  306. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  307. })
  308. conf := NewConfig()
  309. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  310. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  311. broker.conf = conf
  312. dialer := net.Dialer{
  313. Timeout: conf.Net.DialTimeout,
  314. KeepAlive: conf.Net.KeepAlive,
  315. LocalAddr: conf.Net.LocalAddr,
  316. }
  317. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. broker.conn = conn
  322. err = broker.authenticateViaSASL()
  323. if test.mockSASLAuthErr != ErrNoError {
  324. if test.mockSASLAuthErr != err {
  325. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  326. }
  327. } else if test.mockHandshakeErr != ErrNoError {
  328. if test.mockHandshakeErr != err {
  329. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  330. }
  331. } else if test.expectClientErr && err == nil {
  332. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  333. } else if !test.expectClientErr && err != nil {
  334. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  335. }
  336. mockBroker.Close()
  337. }
  338. }
  339. func TestSASLPlainAuth(t *testing.T) {
  340. testTable := []struct {
  341. name string
  342. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  343. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  344. expectClientErr bool // Expect an internal client-side error
  345. }{
  346. {
  347. name: "SASL Plain OK server response",
  348. mockAuthErr: ErrNoError,
  349. mockHandshakeErr: ErrNoError,
  350. },
  351. {
  352. name: "SASL Plain authentication failure response",
  353. mockAuthErr: ErrSASLAuthenticationFailed,
  354. mockHandshakeErr: ErrNoError,
  355. },
  356. {
  357. name: "SASL Plain handshake failure response",
  358. mockAuthErr: ErrNoError,
  359. mockHandshakeErr: ErrSASLAuthenticationFailed,
  360. },
  361. }
  362. for i, test := range testTable {
  363. // mockBroker mocks underlying network logic and broker responses
  364. mockBroker := NewMockBroker(t, 0)
  365. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  366. SetAuthBytes([]byte(`response_payload`))
  367. if test.mockAuthErr != ErrNoError {
  368. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  369. }
  370. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  371. SetEnabledMechanisms([]string{SASLTypePlaintext})
  372. if test.mockHandshakeErr != ErrNoError {
  373. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  374. }
  375. mockBroker.SetHandlerByMap(map[string]MockResponse{
  376. "SaslAuthenticateRequest": mockSASLAuthResponse,
  377. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  378. })
  379. // broker executes SASL requests against mockBroker
  380. broker := NewBroker(mockBroker.Addr())
  381. broker.requestRate = metrics.NilMeter{}
  382. broker.outgoingByteRate = metrics.NilMeter{}
  383. broker.incomingByteRate = metrics.NilMeter{}
  384. broker.requestSize = metrics.NilHistogram{}
  385. broker.responseSize = metrics.NilHistogram{}
  386. broker.responseRate = metrics.NilMeter{}
  387. broker.requestLatency = metrics.NilHistogram{}
  388. conf := NewConfig()
  389. conf.Net.SASL.Mechanism = SASLTypePlaintext
  390. conf.Net.SASL.User = "token"
  391. conf.Net.SASL.Password = "password"
  392. conf.Net.SASL.Version = SASLHandshakeV1
  393. broker.conf = conf
  394. broker.conf.Version = V1_0_0_0
  395. dialer := net.Dialer{
  396. Timeout: conf.Net.DialTimeout,
  397. KeepAlive: conf.Net.KeepAlive,
  398. LocalAddr: conf.Net.LocalAddr,
  399. }
  400. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. broker.conn = conn
  405. err = broker.authenticateViaSASL()
  406. if test.mockAuthErr != ErrNoError {
  407. if test.mockAuthErr != err {
  408. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  409. }
  410. } else if test.mockHandshakeErr != ErrNoError {
  411. if test.mockHandshakeErr != err {
  412. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  413. }
  414. } else if test.expectClientErr && err == nil {
  415. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  416. } else if !test.expectClientErr && err != nil {
  417. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  418. }
  419. mockBroker.Close()
  420. }
  421. }
  422. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  423. testTable := []struct {
  424. name string
  425. error error
  426. mockKerberosClient bool
  427. errorStage string
  428. badResponse bool
  429. badKeyChecksum bool
  430. }{
  431. {
  432. name: "Kerberos authentication success",
  433. error: nil,
  434. mockKerberosClient: true,
  435. },
  436. {
  437. name: "Kerberos login fails",
  438. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  439. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  440. "cation information was invalid - PREAUTH_FAILED"),
  441. mockKerberosClient: true,
  442. errorStage: "login",
  443. },
  444. {
  445. name: "Kerberos service ticket fails",
  446. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  447. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  448. "cation information was invalid - PREAUTH_FAILED"),
  449. mockKerberosClient: true,
  450. errorStage: "service_ticket",
  451. },
  452. {
  453. name: "Kerberos client creation fails",
  454. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  455. },
  456. {
  457. name: "Bad server response, unmarshall key error",
  458. error: errors.New("bytes shorter than header length"),
  459. badResponse: true,
  460. mockKerberosClient: true,
  461. },
  462. {
  463. name: "Bad token checksum",
  464. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  465. badResponse: false,
  466. badKeyChecksum: true,
  467. mockKerberosClient: true,
  468. },
  469. }
  470. for i, test := range testTable {
  471. mockBroker := NewMockBroker(t, 0)
  472. // broker executes SASL requests against mockBroker
  473. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  474. return nil
  475. })
  476. broker := NewBroker(mockBroker.Addr())
  477. broker.requestRate = metrics.NilMeter{}
  478. broker.outgoingByteRate = metrics.NilMeter{}
  479. broker.incomingByteRate = metrics.NilMeter{}
  480. broker.requestSize = metrics.NilHistogram{}
  481. broker.responseSize = metrics.NilHistogram{}
  482. broker.responseRate = metrics.NilMeter{}
  483. broker.requestLatency = metrics.NilHistogram{}
  484. conf := NewConfig()
  485. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  486. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  487. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  488. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  489. conf.Net.SASL.GSSAPI.Username = "kafka"
  490. conf.Net.SASL.GSSAPI.Password = "kafka"
  491. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  492. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  493. broker.conf = conf
  494. broker.conf.Version = V1_0_0_0
  495. dialer := net.Dialer{
  496. Timeout: conf.Net.DialTimeout,
  497. KeepAlive: conf.Net.KeepAlive,
  498. LocalAddr: conf.Net.LocalAddr,
  499. }
  500. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  501. if err != nil {
  502. t.Fatal(err)
  503. }
  504. gssapiHandler := KafkaGSSAPIHandler{
  505. client: &MockKerberosClient{},
  506. badResponse: test.badResponse,
  507. badKeyChecksum: test.badKeyChecksum,
  508. }
  509. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  510. broker.conn = conn
  511. if test.mockKerberosClient {
  512. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  513. return &MockKerberosClient{
  514. mockError: test.error,
  515. errorStage: test.errorStage,
  516. }, nil
  517. }
  518. } else {
  519. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  520. }
  521. err = broker.authenticateViaSASL()
  522. if err != nil && test.error != nil {
  523. if test.error.Error() != err.Error() {
  524. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  525. }
  526. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  527. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  528. }
  529. mockBroker.Close()
  530. }
  531. }
  532. func TestBuildClientFirstMessage(t *testing.T) {
  533. testTable := []struct {
  534. name string
  535. token *AccessToken
  536. expected []byte
  537. expectError bool
  538. }{
  539. {
  540. name: "Build SASL client initial response with two extensions",
  541. token: &AccessToken{
  542. Token: "the-token",
  543. Extensions: map[string]string{
  544. "x": "1",
  545. "y": "2",
  546. },
  547. },
  548. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  549. },
  550. {
  551. name: "Build SASL client initial response with no extensions",
  552. token: &AccessToken{Token: "the-token"},
  553. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  554. },
  555. {
  556. name: "Build SASL client initial response using reserved extension",
  557. token: &AccessToken{
  558. Token: "the-token",
  559. Extensions: map[string]string{
  560. "auth": "auth-value",
  561. },
  562. },
  563. expected: []byte(""),
  564. expectError: true,
  565. },
  566. }
  567. for i, test := range testTable {
  568. actual, err := buildClientFirstMessage(test.token)
  569. if !reflect.DeepEqual(test.expected, actual) {
  570. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  571. }
  572. if test.expectError && err == nil {
  573. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  574. }
  575. if !test.expectError && err != nil {
  576. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  577. }
  578. }
  579. }
  580. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  581. var brokerTestTable = []struct {
  582. version KafkaVersion
  583. name string
  584. response []byte
  585. runner func(*testing.T, *Broker)
  586. }{
  587. {V0_10_0_0,
  588. "MetadataRequest",
  589. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  590. func(t *testing.T, broker *Broker) {
  591. request := MetadataRequest{}
  592. response, err := broker.GetMetadata(&request)
  593. if err != nil {
  594. t.Error(err)
  595. }
  596. if response == nil {
  597. t.Error("Metadata request got no response!")
  598. }
  599. }},
  600. {V0_10_0_0,
  601. "ConsumerMetadataRequest",
  602. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  603. func(t *testing.T, broker *Broker) {
  604. request := ConsumerMetadataRequest{}
  605. response, err := broker.GetConsumerMetadata(&request)
  606. if err != nil {
  607. t.Error(err)
  608. }
  609. if response == nil {
  610. t.Error("Consumer Metadata request got no response!")
  611. }
  612. }},
  613. {V0_10_0_0,
  614. "ProduceRequest (NoResponse)",
  615. []byte{},
  616. func(t *testing.T, broker *Broker) {
  617. request := ProduceRequest{}
  618. request.RequiredAcks = NoResponse
  619. response, err := broker.Produce(&request)
  620. if err != nil {
  621. t.Error(err)
  622. }
  623. if response != nil {
  624. t.Error("Produce request with NoResponse got a response!")
  625. }
  626. }},
  627. {V0_10_0_0,
  628. "ProduceRequest (WaitForLocal)",
  629. []byte{0x00, 0x00, 0x00, 0x00},
  630. func(t *testing.T, broker *Broker) {
  631. request := ProduceRequest{}
  632. request.RequiredAcks = WaitForLocal
  633. response, err := broker.Produce(&request)
  634. if err != nil {
  635. t.Error(err)
  636. }
  637. if response == nil {
  638. t.Error("Produce request without NoResponse got no response!")
  639. }
  640. }},
  641. {V0_10_0_0,
  642. "FetchRequest",
  643. []byte{0x00, 0x00, 0x00, 0x00},
  644. func(t *testing.T, broker *Broker) {
  645. request := FetchRequest{}
  646. response, err := broker.Fetch(&request)
  647. if err != nil {
  648. t.Error(err)
  649. }
  650. if response == nil {
  651. t.Error("Fetch request got no response!")
  652. }
  653. }},
  654. {V0_10_0_0,
  655. "OffsetFetchRequest",
  656. []byte{0x00, 0x00, 0x00, 0x00},
  657. func(t *testing.T, broker *Broker) {
  658. request := OffsetFetchRequest{}
  659. response, err := broker.FetchOffset(&request)
  660. if err != nil {
  661. t.Error(err)
  662. }
  663. if response == nil {
  664. t.Error("OffsetFetch request got no response!")
  665. }
  666. }},
  667. {V0_10_0_0,
  668. "OffsetCommitRequest",
  669. []byte{0x00, 0x00, 0x00, 0x00},
  670. func(t *testing.T, broker *Broker) {
  671. request := OffsetCommitRequest{}
  672. response, err := broker.CommitOffset(&request)
  673. if err != nil {
  674. t.Error(err)
  675. }
  676. if response == nil {
  677. t.Error("OffsetCommit request got no response!")
  678. }
  679. }},
  680. {V0_10_0_0,
  681. "OffsetRequest",
  682. []byte{0x00, 0x00, 0x00, 0x00},
  683. func(t *testing.T, broker *Broker) {
  684. request := OffsetRequest{}
  685. response, err := broker.GetAvailableOffsets(&request)
  686. if err != nil {
  687. t.Error(err)
  688. }
  689. if response == nil {
  690. t.Error("Offset request got no response!")
  691. }
  692. }},
  693. {V0_10_0_0,
  694. "JoinGroupRequest",
  695. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  696. func(t *testing.T, broker *Broker) {
  697. request := JoinGroupRequest{}
  698. response, err := broker.JoinGroup(&request)
  699. if err != nil {
  700. t.Error(err)
  701. }
  702. if response == nil {
  703. t.Error("JoinGroup request got no response!")
  704. }
  705. }},
  706. {V0_10_0_0,
  707. "SyncGroupRequest",
  708. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  709. func(t *testing.T, broker *Broker) {
  710. request := SyncGroupRequest{}
  711. response, err := broker.SyncGroup(&request)
  712. if err != nil {
  713. t.Error(err)
  714. }
  715. if response == nil {
  716. t.Error("SyncGroup request got no response!")
  717. }
  718. }},
  719. {V0_10_0_0,
  720. "LeaveGroupRequest",
  721. []byte{0x00, 0x00},
  722. func(t *testing.T, broker *Broker) {
  723. request := LeaveGroupRequest{}
  724. response, err := broker.LeaveGroup(&request)
  725. if err != nil {
  726. t.Error(err)
  727. }
  728. if response == nil {
  729. t.Error("LeaveGroup request got no response!")
  730. }
  731. }},
  732. {V0_10_0_0,
  733. "HeartbeatRequest",
  734. []byte{0x00, 0x00},
  735. func(t *testing.T, broker *Broker) {
  736. request := HeartbeatRequest{}
  737. response, err := broker.Heartbeat(&request)
  738. if err != nil {
  739. t.Error(err)
  740. }
  741. if response == nil {
  742. t.Error("Heartbeat request got no response!")
  743. }
  744. }},
  745. {V0_10_0_0,
  746. "ListGroupsRequest",
  747. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  748. func(t *testing.T, broker *Broker) {
  749. request := ListGroupsRequest{}
  750. response, err := broker.ListGroups(&request)
  751. if err != nil {
  752. t.Error(err)
  753. }
  754. if response == nil {
  755. t.Error("ListGroups request got no response!")
  756. }
  757. }},
  758. {V0_10_0_0,
  759. "DescribeGroupsRequest",
  760. []byte{0x00, 0x00, 0x00, 0x00},
  761. func(t *testing.T, broker *Broker) {
  762. request := DescribeGroupsRequest{}
  763. response, err := broker.DescribeGroups(&request)
  764. if err != nil {
  765. t.Error(err)
  766. }
  767. if response == nil {
  768. t.Error("DescribeGroups request got no response!")
  769. }
  770. }},
  771. {V0_10_0_0,
  772. "ApiVersionsRequest",
  773. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  774. func(t *testing.T, broker *Broker) {
  775. request := ApiVersionsRequest{}
  776. response, err := broker.ApiVersions(&request)
  777. if err != nil {
  778. t.Error(err)
  779. }
  780. if response == nil {
  781. t.Error("ApiVersions request got no response!")
  782. }
  783. }},
  784. {V1_1_0_0,
  785. "DeleteGroupsRequest",
  786. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  787. func(t *testing.T, broker *Broker) {
  788. request := DeleteGroupsRequest{}
  789. response, err := broker.DeleteGroups(&request)
  790. if err != nil {
  791. t.Error(err)
  792. }
  793. if response == nil {
  794. t.Error("DeleteGroups request got no response!")
  795. }
  796. }},
  797. }
  798. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  799. metricValidators := newMetricValidators()
  800. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  801. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  802. // Check that the number of bytes sent corresponds to what the mock broker received
  803. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  804. if mockBrokerBytesWritten == 0 {
  805. // This a ProduceRequest with NoResponse
  806. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  807. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  808. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  809. } else {
  810. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  811. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  812. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  813. }
  814. // Check that the number of bytes received corresponds to what the mock broker sent
  815. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  816. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  817. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  818. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  819. // Run the validators
  820. metricValidators.run(t, broker.conf.MetricRegistry)
  821. }