broker_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. "github.com/rcrowley/go-metrics"
  10. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  11. )
  12. func ExampleBroker() {
  13. broker := NewBroker("localhost:9092")
  14. err := broker.Open(nil)
  15. if err != nil {
  16. panic(err)
  17. }
  18. request := MetadataRequest{Topics: []string{"myTopic"}}
  19. response, err := broker.GetMetadata(&request)
  20. if err != nil {
  21. _ = broker.Close()
  22. panic(err)
  23. }
  24. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  25. if err = broker.Close(); err != nil {
  26. panic(err)
  27. }
  28. }
  29. type mockEncoder struct {
  30. bytes []byte
  31. }
  32. func (m mockEncoder) encode(pe packetEncoder) error {
  33. return pe.putRawBytes(m.bytes)
  34. }
  35. type brokerMetrics struct {
  36. bytesRead int
  37. bytesWritten int
  38. }
  39. func TestBrokerAccessors(t *testing.T) {
  40. broker := NewBroker("abc:123")
  41. if broker.ID() != -1 {
  42. t.Error("New broker didn't have an ID of -1.")
  43. }
  44. if broker.Addr() != "abc:123" {
  45. t.Error("New broker didn't have the correct address")
  46. }
  47. if broker.Rack() != "" {
  48. t.Error("New broker didn't have an unknown rack.")
  49. }
  50. broker.id = 34
  51. if broker.ID() != 34 {
  52. t.Error("Manually setting broker ID did not take effect.")
  53. }
  54. rack := "dc1"
  55. broker.rack = &rack
  56. if broker.Rack() != rack {
  57. t.Error("Manually setting broker rack did not take effect.")
  58. }
  59. }
  60. func TestSimpleBrokerCommunication(t *testing.T) {
  61. for _, tt := range brokerTestTable {
  62. Logger.Printf("Testing broker communication for %s", tt.name)
  63. mb := NewMockBroker(t, 0)
  64. mb.Returns(&mockEncoder{tt.response})
  65. pendingNotify := make(chan brokerMetrics)
  66. // Register a callback to be notified about successful requests
  67. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  68. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  69. })
  70. broker := NewBroker(mb.Addr())
  71. // Set the broker id in order to validate local broker metrics
  72. broker.id = 0
  73. conf := NewConfig()
  74. conf.Version = tt.version
  75. err := broker.Open(conf)
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. tt.runner(t, broker)
  80. // Wait up to 500 ms for the remote broker to process the request and
  81. // notify us about the metrics
  82. timeout := 500 * time.Millisecond
  83. select {
  84. case mockBrokerMetrics := <-pendingNotify:
  85. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  86. case <-time.After(timeout):
  87. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  88. }
  89. mb.Close()
  90. err = broker.Close()
  91. if err != nil {
  92. t.Error(err)
  93. }
  94. }
  95. }
  96. var ErrTokenFailure = errors.New("Failure generating token")
  97. type TokenProvider struct {
  98. accessToken *AccessToken
  99. err error
  100. }
  101. func (t *TokenProvider) Token() (*AccessToken, error) {
  102. return t.accessToken, t.err
  103. }
  104. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  105. return &TokenProvider{
  106. accessToken: token,
  107. err: err,
  108. }
  109. }
  110. func TestSASLOAuthBearer(t *testing.T) {
  111. testTable := []struct {
  112. name string
  113. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  114. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  115. expectClientErr bool // Expect an internal client-side error
  116. expectedBrokerError KError // Expected Kafka error returned by client
  117. tokProvider *TokenProvider
  118. }{
  119. {
  120. name: "SASL/OAUTHBEARER OK server response",
  121. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  122. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  123. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  124. expectClientErr: false,
  125. expectedBrokerError: ErrNoError,
  126. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  127. },
  128. {
  129. name: "SASL/OAUTHBEARER authentication failure response",
  130. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  131. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  132. mockSASLAuthResponse: NewMockSequence(
  133. // First, the broker response with a challenge
  134. NewMockSaslAuthenticateResponse(t).
  135. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  136. // Next, the client terminates the token exchange. Finally, the
  137. // broker responds with an error message.
  138. NewMockSaslAuthenticateResponse(t).
  139. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  140. SetError(ErrSASLAuthenticationFailed),
  141. ),
  142. expectClientErr: true,
  143. expectedBrokerError: ErrSASLAuthenticationFailed,
  144. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  145. },
  146. {
  147. name: "SASL/OAUTHBEARER handshake failure response",
  148. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  149. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  150. SetError(ErrSASLAuthenticationFailed),
  151. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  152. expectClientErr: true,
  153. expectedBrokerError: ErrSASLAuthenticationFailed,
  154. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  155. },
  156. {
  157. name: "SASL/OAUTHBEARER token generation error",
  158. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  159. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  160. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  161. expectClientErr: true,
  162. expectedBrokerError: ErrNoError,
  163. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  164. },
  165. {
  166. name: "SASL/OAUTHBEARER invalid extension",
  167. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  168. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  169. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  170. expectClientErr: true,
  171. expectedBrokerError: ErrNoError,
  172. tokProvider: newTokenProvider(&AccessToken{
  173. Token: "access-token-123",
  174. Extensions: map[string]string{"auth": "auth-value"},
  175. }, nil),
  176. },
  177. }
  178. for i, test := range testTable {
  179. // mockBroker mocks underlying network logic and broker responses
  180. mockBroker := NewMockBroker(t, 0)
  181. mockBroker.SetHandlerByMap(map[string]MockResponse{
  182. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  183. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  184. })
  185. // broker executes SASL requests against mockBroker
  186. broker := NewBroker(mockBroker.Addr())
  187. broker.requestRate = metrics.NilMeter{}
  188. broker.outgoingByteRate = metrics.NilMeter{}
  189. broker.incomingByteRate = metrics.NilMeter{}
  190. broker.requestSize = metrics.NilHistogram{}
  191. broker.responseSize = metrics.NilHistogram{}
  192. broker.responseRate = metrics.NilMeter{}
  193. broker.requestLatency = metrics.NilHistogram{}
  194. conf := NewConfig()
  195. conf.Net.SASL.Mechanism = SASLTypeOAuth
  196. conf.Net.SASL.TokenProvider = test.tokProvider
  197. broker.conf = conf
  198. dialer := net.Dialer{
  199. Timeout: conf.Net.DialTimeout,
  200. KeepAlive: conf.Net.KeepAlive,
  201. LocalAddr: conf.Net.LocalAddr,
  202. }
  203. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. broker.conn = conn
  208. err = broker.authenticateViaSASL()
  209. if test.expectedBrokerError != ErrNoError {
  210. if test.expectedBrokerError != err {
  211. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  212. }
  213. } else if test.expectedBrokerError != ErrNoError {
  214. if test.expectedBrokerError != err {
  215. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.expectedBrokerError, err)
  216. }
  217. } else if test.expectClientErr && err == nil {
  218. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  219. } else if !test.expectClientErr && err != nil {
  220. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  221. }
  222. mockBroker.Close()
  223. }
  224. }
  225. // A mock scram client.
  226. type MockSCRAMClient struct {
  227. done bool
  228. }
  229. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  230. return nil
  231. }
  232. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  233. if challenge == "" {
  234. return "ping", nil
  235. }
  236. if challenge == "pong" {
  237. m.done = true
  238. return "", nil
  239. }
  240. return "", errors.New("failed to authenticate :(")
  241. }
  242. func (m *MockSCRAMClient) Done() bool {
  243. return m.done
  244. }
  245. var _ SCRAMClient = &MockSCRAMClient{}
  246. func TestSASLSCRAMSHAXXX(t *testing.T) {
  247. testTable := []struct {
  248. name string
  249. mockHandshakeErr KError
  250. mockSASLAuthErr KError
  251. expectClientErr bool
  252. scramClient *MockSCRAMClient
  253. scramChallengeResp string
  254. }{
  255. {
  256. name: "SASL/SCRAMSHAXXX successfull authentication",
  257. mockHandshakeErr: ErrNoError,
  258. scramClient: &MockSCRAMClient{},
  259. scramChallengeResp: "pong",
  260. },
  261. {
  262. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  263. mockHandshakeErr: ErrNoError,
  264. mockSASLAuthErr: ErrNoError,
  265. scramClient: &MockSCRAMClient{},
  266. scramChallengeResp: "gong",
  267. expectClientErr: true,
  268. },
  269. {
  270. name: "SASL/SCRAMSHAXXX server authentication error",
  271. mockHandshakeErr: ErrNoError,
  272. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  273. scramClient: &MockSCRAMClient{},
  274. scramChallengeResp: "pong",
  275. },
  276. {
  277. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  278. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  279. mockSASLAuthErr: ErrNoError,
  280. scramClient: &MockSCRAMClient{},
  281. scramChallengeResp: "pong",
  282. },
  283. }
  284. for i, test := range testTable {
  285. // mockBroker mocks underlying network logic and broker responses
  286. mockBroker := NewMockBroker(t, 0)
  287. broker := NewBroker(mockBroker.Addr())
  288. // broker executes SASL requests against mockBroker
  289. broker.requestRate = metrics.NilMeter{}
  290. broker.outgoingByteRate = metrics.NilMeter{}
  291. broker.incomingByteRate = metrics.NilMeter{}
  292. broker.requestSize = metrics.NilHistogram{}
  293. broker.responseSize = metrics.NilHistogram{}
  294. broker.responseRate = metrics.NilMeter{}
  295. broker.requestLatency = metrics.NilHistogram{}
  296. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  297. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  298. if test.mockSASLAuthErr != ErrNoError {
  299. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  300. }
  301. if test.mockHandshakeErr != ErrNoError {
  302. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  303. }
  304. mockBroker.SetHandlerByMap(map[string]MockResponse{
  305. "SaslAuthenticateRequest": mockSASLAuthResponse,
  306. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  307. })
  308. conf := NewConfig()
  309. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  310. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  311. broker.conf = conf
  312. dialer := net.Dialer{
  313. Timeout: conf.Net.DialTimeout,
  314. KeepAlive: conf.Net.KeepAlive,
  315. LocalAddr: conf.Net.LocalAddr,
  316. }
  317. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. broker.conn = conn
  322. err = broker.authenticateViaSASL()
  323. if test.mockSASLAuthErr != ErrNoError {
  324. if test.mockSASLAuthErr != err {
  325. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  326. }
  327. } else if test.mockHandshakeErr != ErrNoError {
  328. if test.mockHandshakeErr != err {
  329. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  330. }
  331. } else if test.expectClientErr && err == nil {
  332. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  333. } else if !test.expectClientErr && err != nil {
  334. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  335. }
  336. mockBroker.Close()
  337. }
  338. }
  339. func TestSASLPlainAuth(t *testing.T) {
  340. testTable := []struct {
  341. name string
  342. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  343. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  344. expectClientErr bool // Expect an internal client-side error
  345. }{
  346. {
  347. name: "SASL Plain OK server response",
  348. mockAuthErr: ErrNoError,
  349. mockHandshakeErr: ErrNoError,
  350. },
  351. {
  352. name: "SASL Plain authentication failure response",
  353. mockAuthErr: ErrSASLAuthenticationFailed,
  354. mockHandshakeErr: ErrNoError,
  355. },
  356. {
  357. name: "SASL Plain handshake failure response",
  358. mockAuthErr: ErrNoError,
  359. mockHandshakeErr: ErrSASLAuthenticationFailed,
  360. },
  361. }
  362. for i, test := range testTable {
  363. // mockBroker mocks underlying network logic and broker responses
  364. mockBroker := NewMockBroker(t, 0)
  365. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  366. SetAuthBytes([]byte(`response_payload`))
  367. if test.mockAuthErr != ErrNoError {
  368. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  369. }
  370. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  371. SetEnabledMechanisms([]string{SASLTypePlaintext})
  372. if test.mockHandshakeErr != ErrNoError {
  373. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  374. }
  375. mockBroker.SetHandlerByMap(map[string]MockResponse{
  376. "SaslAuthenticateRequest": mockSASLAuthResponse,
  377. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  378. })
  379. // broker executes SASL requests against mockBroker
  380. broker := NewBroker(mockBroker.Addr())
  381. broker.requestRate = metrics.NilMeter{}
  382. broker.outgoingByteRate = metrics.NilMeter{}
  383. broker.incomingByteRate = metrics.NilMeter{}
  384. broker.requestSize = metrics.NilHistogram{}
  385. broker.responseSize = metrics.NilHistogram{}
  386. broker.responseRate = metrics.NilMeter{}
  387. broker.requestLatency = metrics.NilHistogram{}
  388. conf := NewConfig()
  389. conf.Net.SASL.Mechanism = SASLTypePlaintext
  390. conf.Net.SASL.User = "token"
  391. conf.Net.SASL.Password = "password"
  392. conf.Net.SASL.Version = SASLHandshakeV1
  393. broker.conf = conf
  394. broker.conf.Version = V1_0_0_0
  395. dialer := net.Dialer{
  396. Timeout: conf.Net.DialTimeout,
  397. KeepAlive: conf.Net.KeepAlive,
  398. LocalAddr: conf.Net.LocalAddr,
  399. }
  400. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. broker.conn = conn
  405. err = broker.authenticateViaSASL()
  406. if test.mockAuthErr != ErrNoError {
  407. if test.mockAuthErr != err {
  408. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  409. }
  410. } else if test.mockHandshakeErr != ErrNoError {
  411. if test.mockHandshakeErr != err {
  412. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  413. }
  414. } else if test.expectClientErr && err == nil {
  415. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  416. } else if !test.expectClientErr && err != nil {
  417. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  418. }
  419. mockBroker.Close()
  420. }
  421. }
  422. // TestSASLReadTimeout ensures that the broker connection won't block forever
  423. // if the remote end never responds after the handshake
  424. func TestSASLReadTimeout(t *testing.T) {
  425. mockBroker := NewMockBroker(t, 0)
  426. defer mockBroker.Close()
  427. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  428. SetAuthBytes([]byte(`response_payload`))
  429. mockBroker.SetHandlerByMap(map[string]MockResponse{
  430. "SaslAuthenticateRequest": mockSASLAuthResponse,
  431. })
  432. broker := NewBroker(mockBroker.Addr())
  433. {
  434. broker.requestRate = metrics.NilMeter{}
  435. broker.outgoingByteRate = metrics.NilMeter{}
  436. broker.incomingByteRate = metrics.NilMeter{}
  437. broker.requestSize = metrics.NilHistogram{}
  438. broker.responseSize = metrics.NilHistogram{}
  439. broker.responseRate = metrics.NilMeter{}
  440. broker.requestLatency = metrics.NilHistogram{}
  441. }
  442. conf := NewConfig()
  443. {
  444. conf.Net.ReadTimeout = time.Millisecond
  445. conf.Net.SASL.Mechanism = SASLTypePlaintext
  446. conf.Net.SASL.User = "token"
  447. conf.Net.SASL.Password = "password"
  448. conf.Net.SASL.Version = SASLHandshakeV1
  449. }
  450. broker.conf = conf
  451. broker.conf.Version = V1_0_0_0
  452. dialer := net.Dialer{}
  453. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. broker.conn = conn
  458. err = broker.authenticateViaSASL()
  459. if err == nil {
  460. t.Errorf("should never happen - expected read timeout")
  461. }
  462. }
  463. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  464. testTable := []struct {
  465. name string
  466. error error
  467. mockKerberosClient bool
  468. errorStage string
  469. badResponse bool
  470. badKeyChecksum bool
  471. }{
  472. {
  473. name: "Kerberos authentication success",
  474. error: nil,
  475. mockKerberosClient: true,
  476. },
  477. {
  478. name: "Kerberos login fails",
  479. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  480. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  481. "cation information was invalid - PREAUTH_FAILED"),
  482. mockKerberosClient: true,
  483. errorStage: "login",
  484. },
  485. {
  486. name: "Kerberos service ticket fails",
  487. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  488. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  489. "cation information was invalid - PREAUTH_FAILED"),
  490. mockKerberosClient: true,
  491. errorStage: "service_ticket",
  492. },
  493. {
  494. name: "Kerberos client creation fails",
  495. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  496. },
  497. {
  498. name: "Bad server response, unmarshall key error",
  499. error: errors.New("bytes shorter than header length"),
  500. badResponse: true,
  501. mockKerberosClient: true,
  502. },
  503. {
  504. name: "Bad token checksum",
  505. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  506. badResponse: false,
  507. badKeyChecksum: true,
  508. mockKerberosClient: true,
  509. },
  510. }
  511. for i, test := range testTable {
  512. mockBroker := NewMockBroker(t, 0)
  513. // broker executes SASL requests against mockBroker
  514. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  515. return nil
  516. })
  517. broker := NewBroker(mockBroker.Addr())
  518. broker.requestRate = metrics.NilMeter{}
  519. broker.outgoingByteRate = metrics.NilMeter{}
  520. broker.incomingByteRate = metrics.NilMeter{}
  521. broker.requestSize = metrics.NilHistogram{}
  522. broker.responseSize = metrics.NilHistogram{}
  523. broker.responseRate = metrics.NilMeter{}
  524. broker.requestLatency = metrics.NilHistogram{}
  525. conf := NewConfig()
  526. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  527. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  528. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  529. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  530. conf.Net.SASL.GSSAPI.Username = "kafka"
  531. conf.Net.SASL.GSSAPI.Password = "kafka"
  532. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  533. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  534. broker.conf = conf
  535. broker.conf.Version = V1_0_0_0
  536. dialer := net.Dialer{
  537. Timeout: conf.Net.DialTimeout,
  538. KeepAlive: conf.Net.KeepAlive,
  539. LocalAddr: conf.Net.LocalAddr,
  540. }
  541. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. gssapiHandler := KafkaGSSAPIHandler{
  546. client: &MockKerberosClient{},
  547. badResponse: test.badResponse,
  548. badKeyChecksum: test.badKeyChecksum,
  549. }
  550. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  551. broker.conn = conn
  552. if test.mockKerberosClient {
  553. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  554. return &MockKerberosClient{
  555. mockError: test.error,
  556. errorStage: test.errorStage,
  557. }, nil
  558. }
  559. } else {
  560. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  561. }
  562. err = broker.authenticateViaSASL()
  563. if err != nil && test.error != nil {
  564. if test.error.Error() != err.Error() {
  565. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  566. }
  567. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  568. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  569. }
  570. mockBroker.Close()
  571. }
  572. }
  573. func TestBuildClientFirstMessage(t *testing.T) {
  574. testTable := []struct {
  575. name string
  576. token *AccessToken
  577. expected []byte
  578. expectError bool
  579. }{
  580. {
  581. name: "Build SASL client initial response with two extensions",
  582. token: &AccessToken{
  583. Token: "the-token",
  584. Extensions: map[string]string{
  585. "x": "1",
  586. "y": "2",
  587. },
  588. },
  589. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  590. },
  591. {
  592. name: "Build SASL client initial response with no extensions",
  593. token: &AccessToken{Token: "the-token"},
  594. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  595. },
  596. {
  597. name: "Build SASL client initial response using reserved extension",
  598. token: &AccessToken{
  599. Token: "the-token",
  600. Extensions: map[string]string{
  601. "auth": "auth-value",
  602. },
  603. },
  604. expected: []byte(""),
  605. expectError: true,
  606. },
  607. }
  608. for i, test := range testTable {
  609. actual, err := buildClientFirstMessage(test.token)
  610. if !reflect.DeepEqual(test.expected, actual) {
  611. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  612. }
  613. if test.expectError && err == nil {
  614. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  615. }
  616. if !test.expectError && err != nil {
  617. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  618. }
  619. }
  620. }
  621. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  622. var brokerTestTable = []struct {
  623. version KafkaVersion
  624. name string
  625. response []byte
  626. runner func(*testing.T, *Broker)
  627. }{
  628. {V0_10_0_0,
  629. "MetadataRequest",
  630. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  631. func(t *testing.T, broker *Broker) {
  632. request := MetadataRequest{}
  633. response, err := broker.GetMetadata(&request)
  634. if err != nil {
  635. t.Error(err)
  636. }
  637. if response == nil {
  638. t.Error("Metadata request got no response!")
  639. }
  640. }},
  641. {V0_10_0_0,
  642. "ConsumerMetadataRequest",
  643. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  644. func(t *testing.T, broker *Broker) {
  645. request := ConsumerMetadataRequest{}
  646. response, err := broker.GetConsumerMetadata(&request)
  647. if err != nil {
  648. t.Error(err)
  649. }
  650. if response == nil {
  651. t.Error("Consumer Metadata request got no response!")
  652. }
  653. }},
  654. {V0_10_0_0,
  655. "ProduceRequest (NoResponse)",
  656. []byte{},
  657. func(t *testing.T, broker *Broker) {
  658. request := ProduceRequest{}
  659. request.RequiredAcks = NoResponse
  660. response, err := broker.Produce(&request)
  661. if err != nil {
  662. t.Error(err)
  663. }
  664. if response != nil {
  665. t.Error("Produce request with NoResponse got a response!")
  666. }
  667. }},
  668. {V0_10_0_0,
  669. "ProduceRequest (WaitForLocal)",
  670. []byte{0x00, 0x00, 0x00, 0x00},
  671. func(t *testing.T, broker *Broker) {
  672. request := ProduceRequest{}
  673. request.RequiredAcks = WaitForLocal
  674. response, err := broker.Produce(&request)
  675. if err != nil {
  676. t.Error(err)
  677. }
  678. if response == nil {
  679. t.Error("Produce request without NoResponse got no response!")
  680. }
  681. }},
  682. {V0_10_0_0,
  683. "FetchRequest",
  684. []byte{0x00, 0x00, 0x00, 0x00},
  685. func(t *testing.T, broker *Broker) {
  686. request := FetchRequest{}
  687. response, err := broker.Fetch(&request)
  688. if err != nil {
  689. t.Error(err)
  690. }
  691. if response == nil {
  692. t.Error("Fetch request got no response!")
  693. }
  694. }},
  695. {V0_10_0_0,
  696. "OffsetFetchRequest",
  697. []byte{0x00, 0x00, 0x00, 0x00},
  698. func(t *testing.T, broker *Broker) {
  699. request := OffsetFetchRequest{}
  700. response, err := broker.FetchOffset(&request)
  701. if err != nil {
  702. t.Error(err)
  703. }
  704. if response == nil {
  705. t.Error("OffsetFetch request got no response!")
  706. }
  707. }},
  708. {V0_10_0_0,
  709. "OffsetCommitRequest",
  710. []byte{0x00, 0x00, 0x00, 0x00},
  711. func(t *testing.T, broker *Broker) {
  712. request := OffsetCommitRequest{}
  713. response, err := broker.CommitOffset(&request)
  714. if err != nil {
  715. t.Error(err)
  716. }
  717. if response == nil {
  718. t.Error("OffsetCommit request got no response!")
  719. }
  720. }},
  721. {V0_10_0_0,
  722. "OffsetRequest",
  723. []byte{0x00, 0x00, 0x00, 0x00},
  724. func(t *testing.T, broker *Broker) {
  725. request := OffsetRequest{}
  726. response, err := broker.GetAvailableOffsets(&request)
  727. if err != nil {
  728. t.Error(err)
  729. }
  730. if response == nil {
  731. t.Error("Offset request got no response!")
  732. }
  733. }},
  734. {V0_10_0_0,
  735. "JoinGroupRequest",
  736. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  737. func(t *testing.T, broker *Broker) {
  738. request := JoinGroupRequest{}
  739. response, err := broker.JoinGroup(&request)
  740. if err != nil {
  741. t.Error(err)
  742. }
  743. if response == nil {
  744. t.Error("JoinGroup request got no response!")
  745. }
  746. }},
  747. {V0_10_0_0,
  748. "SyncGroupRequest",
  749. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  750. func(t *testing.T, broker *Broker) {
  751. request := SyncGroupRequest{}
  752. response, err := broker.SyncGroup(&request)
  753. if err != nil {
  754. t.Error(err)
  755. }
  756. if response == nil {
  757. t.Error("SyncGroup request got no response!")
  758. }
  759. }},
  760. {V0_10_0_0,
  761. "LeaveGroupRequest",
  762. []byte{0x00, 0x00},
  763. func(t *testing.T, broker *Broker) {
  764. request := LeaveGroupRequest{}
  765. response, err := broker.LeaveGroup(&request)
  766. if err != nil {
  767. t.Error(err)
  768. }
  769. if response == nil {
  770. t.Error("LeaveGroup request got no response!")
  771. }
  772. }},
  773. {V0_10_0_0,
  774. "HeartbeatRequest",
  775. []byte{0x00, 0x00},
  776. func(t *testing.T, broker *Broker) {
  777. request := HeartbeatRequest{}
  778. response, err := broker.Heartbeat(&request)
  779. if err != nil {
  780. t.Error(err)
  781. }
  782. if response == nil {
  783. t.Error("Heartbeat request got no response!")
  784. }
  785. }},
  786. {V0_10_0_0,
  787. "ListGroupsRequest",
  788. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  789. func(t *testing.T, broker *Broker) {
  790. request := ListGroupsRequest{}
  791. response, err := broker.ListGroups(&request)
  792. if err != nil {
  793. t.Error(err)
  794. }
  795. if response == nil {
  796. t.Error("ListGroups request got no response!")
  797. }
  798. }},
  799. {V0_10_0_0,
  800. "DescribeGroupsRequest",
  801. []byte{0x00, 0x00, 0x00, 0x00},
  802. func(t *testing.T, broker *Broker) {
  803. request := DescribeGroupsRequest{}
  804. response, err := broker.DescribeGroups(&request)
  805. if err != nil {
  806. t.Error(err)
  807. }
  808. if response == nil {
  809. t.Error("DescribeGroups request got no response!")
  810. }
  811. }},
  812. {V0_10_0_0,
  813. "ApiVersionsRequest",
  814. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  815. func(t *testing.T, broker *Broker) {
  816. request := ApiVersionsRequest{}
  817. response, err := broker.ApiVersions(&request)
  818. if err != nil {
  819. t.Error(err)
  820. }
  821. if response == nil {
  822. t.Error("ApiVersions request got no response!")
  823. }
  824. }},
  825. {V1_1_0_0,
  826. "DeleteGroupsRequest",
  827. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  828. func(t *testing.T, broker *Broker) {
  829. request := DeleteGroupsRequest{}
  830. response, err := broker.DeleteGroups(&request)
  831. if err != nil {
  832. t.Error(err)
  833. }
  834. if response == nil {
  835. t.Error("DeleteGroups request got no response!")
  836. }
  837. }},
  838. }
  839. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  840. metricValidators := newMetricValidators()
  841. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  842. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  843. // Check that the number of bytes sent corresponds to what the mock broker received
  844. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  845. if mockBrokerBytesWritten == 0 {
  846. // This a ProduceRequest with NoResponse
  847. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  848. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  849. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  850. } else {
  851. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  852. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  853. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  854. }
  855. // Check that the number of bytes received corresponds to what the mock broker sent
  856. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  857. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  858. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  859. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  860. // Run the validators
  861. metricValidators.run(t, broker.conf.MetricRegistry)
  862. }