broker_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. package sarama
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/rcrowley/go-metrics"
  11. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  12. )
  13. func ExampleBroker() {
  14. broker := NewBroker("localhost:9092")
  15. err := broker.Open(nil)
  16. if err != nil {
  17. panic(err)
  18. }
  19. request := MetadataRequest{Topics: []string{"myTopic"}}
  20. response, err := broker.GetMetadata(&request)
  21. if err != nil {
  22. _ = broker.Close()
  23. panic(err)
  24. }
  25. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  26. if err = broker.Close(); err != nil {
  27. panic(err)
  28. }
  29. }
  30. type mockEncoder struct {
  31. bytes []byte
  32. }
  33. func (m mockEncoder) encode(pe packetEncoder) error {
  34. return pe.putRawBytes(m.bytes)
  35. }
  36. type brokerMetrics struct {
  37. bytesRead int
  38. bytesWritten int
  39. }
  40. func TestBrokerAccessors(t *testing.T) {
  41. broker := NewBroker("abc:123")
  42. if broker.ID() != -1 {
  43. t.Error("New broker didn't have an ID of -1.")
  44. }
  45. if broker.Addr() != "abc:123" {
  46. t.Error("New broker didn't have the correct address")
  47. }
  48. if broker.Rack() != "" {
  49. t.Error("New broker didn't have an unknown rack.")
  50. }
  51. broker.id = 34
  52. if broker.ID() != 34 {
  53. t.Error("Manually setting broker ID did not take effect.")
  54. }
  55. rack := "dc1"
  56. broker.rack = &rack
  57. if broker.Rack() != rack {
  58. t.Error("Manually setting broker rack did not take effect.")
  59. }
  60. }
  61. func TestSimpleBrokerCommunication(t *testing.T) {
  62. for _, tt := range brokerTestTable {
  63. Logger.Printf("Testing broker communication for %s", tt.name)
  64. mb := NewMockBroker(t, 0)
  65. mb.Returns(&mockEncoder{tt.response})
  66. pendingNotify := make(chan brokerMetrics)
  67. // Register a callback to be notified about successful requests
  68. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  69. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  70. })
  71. broker := NewBroker(mb.Addr())
  72. // Set the broker id in order to validate local broker metrics
  73. broker.id = 0
  74. conf := NewConfig()
  75. conf.Version = tt.version
  76. err := broker.Open(conf)
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. tt.runner(t, broker)
  81. // Wait up to 500 ms for the remote broker to process the request and
  82. // notify us about the metrics
  83. timeout := 500 * time.Millisecond
  84. select {
  85. case mockBrokerMetrics := <-pendingNotify:
  86. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  87. case <-time.After(timeout):
  88. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  89. }
  90. mb.Close()
  91. err = broker.Close()
  92. if err != nil {
  93. t.Error(err)
  94. }
  95. }
  96. }
  97. var ErrTokenFailure = errors.New("Failure generating token")
  98. type TokenProvider struct {
  99. accessToken *AccessToken
  100. err error
  101. }
  102. func (t *TokenProvider) Token() (*AccessToken, error) {
  103. return t.accessToken, t.err
  104. }
  105. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  106. return &TokenProvider{
  107. accessToken: token,
  108. err: err,
  109. }
  110. }
  111. func TestSASLOAuthBearer(t *testing.T) {
  112. testTable := []struct {
  113. name string
  114. authidentity string
  115. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  116. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  117. expectClientErr bool // Expect an internal client-side error
  118. expectedBrokerError KError // Expected Kafka error returned by client
  119. tokProvider *TokenProvider
  120. }{
  121. {
  122. name: "SASL/OAUTHBEARER OK server response",
  123. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  124. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  125. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  126. expectClientErr: false,
  127. expectedBrokerError: ErrNoError,
  128. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  129. },
  130. {
  131. name: "SASL/OAUTHBEARER authentication failure response",
  132. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  133. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  134. mockSASLAuthResponse: NewMockSequence(
  135. // First, the broker response with a challenge
  136. NewMockSaslAuthenticateResponse(t).
  137. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  138. // Next, the client terminates the token exchange. Finally, the
  139. // broker responds with an error message.
  140. NewMockSaslAuthenticateResponse(t).
  141. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  142. SetError(ErrSASLAuthenticationFailed),
  143. ),
  144. expectClientErr: true,
  145. expectedBrokerError: ErrSASLAuthenticationFailed,
  146. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  147. },
  148. {
  149. name: "SASL/OAUTHBEARER handshake failure response",
  150. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  151. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  152. SetError(ErrSASLAuthenticationFailed),
  153. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  154. expectClientErr: true,
  155. expectedBrokerError: ErrSASLAuthenticationFailed,
  156. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  157. },
  158. {
  159. name: "SASL/OAUTHBEARER token generation error",
  160. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  161. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  162. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  163. expectClientErr: true,
  164. expectedBrokerError: ErrNoError,
  165. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  166. },
  167. {
  168. name: "SASL/OAUTHBEARER invalid extension",
  169. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  170. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  171. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  172. expectClientErr: true,
  173. expectedBrokerError: ErrNoError,
  174. tokProvider: newTokenProvider(&AccessToken{
  175. Token: "access-token-123",
  176. Extensions: map[string]string{"auth": "auth-value"},
  177. }, nil),
  178. },
  179. }
  180. for i, test := range testTable {
  181. // mockBroker mocks underlying network logic and broker responses
  182. mockBroker := NewMockBroker(t, 0)
  183. mockBroker.SetHandlerByMap(map[string]MockResponse{
  184. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  185. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  186. })
  187. // broker executes SASL requests against mockBroker
  188. broker := NewBroker(mockBroker.Addr())
  189. broker.requestRate = metrics.NilMeter{}
  190. broker.outgoingByteRate = metrics.NilMeter{}
  191. broker.incomingByteRate = metrics.NilMeter{}
  192. broker.requestSize = metrics.NilHistogram{}
  193. broker.responseSize = metrics.NilHistogram{}
  194. broker.responseRate = metrics.NilMeter{}
  195. broker.requestLatency = metrics.NilHistogram{}
  196. conf := NewConfig()
  197. conf.Net.SASL.Mechanism = SASLTypeOAuth
  198. conf.Net.SASL.TokenProvider = test.tokProvider
  199. broker.conf = conf
  200. dialer := net.Dialer{
  201. Timeout: conf.Net.DialTimeout,
  202. KeepAlive: conf.Net.KeepAlive,
  203. LocalAddr: conf.Net.LocalAddr,
  204. }
  205. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. broker.conn = conn
  210. err = broker.authenticateViaSASL()
  211. if test.expectedBrokerError != ErrNoError {
  212. if test.expectedBrokerError != err {
  213. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  214. }
  215. } else if test.expectedBrokerError != ErrNoError {
  216. if test.expectedBrokerError != err {
  217. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.expectedBrokerError, err)
  218. }
  219. } else if test.expectClientErr && err == nil {
  220. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  221. } else if !test.expectClientErr && err != nil {
  222. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  223. }
  224. mockBroker.Close()
  225. }
  226. }
  227. // A mock scram client.
  228. type MockSCRAMClient struct {
  229. done bool
  230. }
  231. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  232. return nil
  233. }
  234. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  235. if challenge == "" {
  236. return "ping", nil
  237. }
  238. if challenge == "pong" {
  239. m.done = true
  240. return "", nil
  241. }
  242. return "", errors.New("failed to authenticate :(")
  243. }
  244. func (m *MockSCRAMClient) Done() bool {
  245. return m.done
  246. }
  247. var _ SCRAMClient = &MockSCRAMClient{}
  248. func TestSASLSCRAMSHAXXX(t *testing.T) {
  249. testTable := []struct {
  250. name string
  251. mockHandshakeErr KError
  252. mockSASLAuthErr KError
  253. expectClientErr bool
  254. scramClient *MockSCRAMClient
  255. scramChallengeResp string
  256. }{
  257. {
  258. name: "SASL/SCRAMSHAXXX successfull authentication",
  259. mockHandshakeErr: ErrNoError,
  260. scramClient: &MockSCRAMClient{},
  261. scramChallengeResp: "pong",
  262. },
  263. {
  264. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  265. mockHandshakeErr: ErrNoError,
  266. mockSASLAuthErr: ErrNoError,
  267. scramClient: &MockSCRAMClient{},
  268. scramChallengeResp: "gong",
  269. expectClientErr: true,
  270. },
  271. {
  272. name: "SASL/SCRAMSHAXXX server authentication error",
  273. mockHandshakeErr: ErrNoError,
  274. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  275. scramClient: &MockSCRAMClient{},
  276. scramChallengeResp: "pong",
  277. },
  278. {
  279. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  280. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  281. mockSASLAuthErr: ErrNoError,
  282. scramClient: &MockSCRAMClient{},
  283. scramChallengeResp: "pong",
  284. },
  285. }
  286. for i, test := range testTable {
  287. // mockBroker mocks underlying network logic and broker responses
  288. mockBroker := NewMockBroker(t, 0)
  289. broker := NewBroker(mockBroker.Addr())
  290. // broker executes SASL requests against mockBroker
  291. broker.requestRate = metrics.NilMeter{}
  292. broker.outgoingByteRate = metrics.NilMeter{}
  293. broker.incomingByteRate = metrics.NilMeter{}
  294. broker.requestSize = metrics.NilHistogram{}
  295. broker.responseSize = metrics.NilHistogram{}
  296. broker.responseRate = metrics.NilMeter{}
  297. broker.requestLatency = metrics.NilHistogram{}
  298. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  299. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  300. if test.mockSASLAuthErr != ErrNoError {
  301. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  302. }
  303. if test.mockHandshakeErr != ErrNoError {
  304. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  305. }
  306. mockBroker.SetHandlerByMap(map[string]MockResponse{
  307. "SaslAuthenticateRequest": mockSASLAuthResponse,
  308. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  309. })
  310. conf := NewConfig()
  311. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  312. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  313. broker.conf = conf
  314. dialer := net.Dialer{
  315. Timeout: conf.Net.DialTimeout,
  316. KeepAlive: conf.Net.KeepAlive,
  317. LocalAddr: conf.Net.LocalAddr,
  318. }
  319. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. broker.conn = conn
  324. err = broker.authenticateViaSASL()
  325. if test.mockSASLAuthErr != ErrNoError {
  326. if test.mockSASLAuthErr != err {
  327. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  328. }
  329. } else if test.mockHandshakeErr != ErrNoError {
  330. if test.mockHandshakeErr != err {
  331. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  332. }
  333. } else if test.expectClientErr && err == nil {
  334. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  335. } else if !test.expectClientErr && err != nil {
  336. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  337. }
  338. mockBroker.Close()
  339. }
  340. }
  341. func TestSASLPlainAuth(t *testing.T) {
  342. testTable := []struct {
  343. name string
  344. authidentity string
  345. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  346. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  347. expectClientErr bool // Expect an internal client-side error
  348. }{
  349. {
  350. name: "SASL Plain OK server response",
  351. mockAuthErr: ErrNoError,
  352. mockHandshakeErr: ErrNoError,
  353. },
  354. {
  355. name: "SASL Plain OK server response with authidentity",
  356. authidentity: "authid",
  357. mockAuthErr: ErrNoError,
  358. mockHandshakeErr: ErrNoError,
  359. },
  360. {
  361. name: "SASL Plain authentication failure response",
  362. mockAuthErr: ErrSASLAuthenticationFailed,
  363. mockHandshakeErr: ErrNoError,
  364. },
  365. {
  366. name: "SASL Plain handshake failure response",
  367. mockAuthErr: ErrNoError,
  368. mockHandshakeErr: ErrSASLAuthenticationFailed,
  369. },
  370. }
  371. for i, test := range testTable {
  372. // mockBroker mocks underlying network logic and broker responses
  373. mockBroker := NewMockBroker(t, 0)
  374. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  375. SetAuthBytes([]byte(`response_payload`))
  376. if test.mockAuthErr != ErrNoError {
  377. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  378. }
  379. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  380. SetEnabledMechanisms([]string{SASLTypePlaintext})
  381. if test.mockHandshakeErr != ErrNoError {
  382. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  383. }
  384. mockBroker.SetHandlerByMap(map[string]MockResponse{
  385. "SaslAuthenticateRequest": mockSASLAuthResponse,
  386. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  387. })
  388. // broker executes SASL requests against mockBroker
  389. broker := NewBroker(mockBroker.Addr())
  390. broker.requestRate = metrics.NilMeter{}
  391. broker.outgoingByteRate = metrics.NilMeter{}
  392. broker.incomingByteRate = metrics.NilMeter{}
  393. broker.requestSize = metrics.NilHistogram{}
  394. broker.responseSize = metrics.NilHistogram{}
  395. broker.responseRate = metrics.NilMeter{}
  396. broker.requestLatency = metrics.NilHistogram{}
  397. conf := NewConfig()
  398. conf.Net.SASL.Mechanism = SASLTypePlaintext
  399. conf.Net.SASL.AuthIdentity = test.authidentity
  400. conf.Net.SASL.User = "token"
  401. conf.Net.SASL.Password = "password"
  402. conf.Net.SASL.Version = SASLHandshakeV1
  403. broker.conf = conf
  404. broker.conf.Version = V1_0_0_0
  405. dialer := net.Dialer{
  406. Timeout: conf.Net.DialTimeout,
  407. KeepAlive: conf.Net.KeepAlive,
  408. LocalAddr: conf.Net.LocalAddr,
  409. }
  410. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  411. if err != nil {
  412. t.Fatal(err)
  413. }
  414. broker.conn = conn
  415. err = broker.authenticateViaSASL()
  416. if err == nil {
  417. for _, rr := range mockBroker.History() {
  418. switch r := rr.Request.(type) {
  419. case *SaslAuthenticateRequest:
  420. x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
  421. if string(x[0]) != conf.Net.SASL.AuthIdentity {
  422. t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
  423. }
  424. if string(x[1]) != conf.Net.SASL.User {
  425. t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
  426. }
  427. if string(x[2]) != conf.Net.SASL.Password {
  428. t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
  429. }
  430. }
  431. }
  432. }
  433. if test.mockAuthErr != ErrNoError {
  434. if test.mockAuthErr != err {
  435. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  436. }
  437. } else if test.mockHandshakeErr != ErrNoError {
  438. if test.mockHandshakeErr != err {
  439. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  440. }
  441. } else if test.expectClientErr && err == nil {
  442. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  443. } else if !test.expectClientErr && err != nil {
  444. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  445. }
  446. mockBroker.Close()
  447. }
  448. }
  449. // TestSASLReadTimeout ensures that the broker connection won't block forever
  450. // if the remote end never responds after the handshake
  451. func TestSASLReadTimeout(t *testing.T) {
  452. mockBroker := NewMockBroker(t, 0)
  453. defer mockBroker.Close()
  454. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  455. SetAuthBytes([]byte(`response_payload`))
  456. mockBroker.SetHandlerByMap(map[string]MockResponse{
  457. "SaslAuthenticateRequest": mockSASLAuthResponse,
  458. })
  459. broker := NewBroker(mockBroker.Addr())
  460. {
  461. broker.requestRate = metrics.NilMeter{}
  462. broker.outgoingByteRate = metrics.NilMeter{}
  463. broker.incomingByteRate = metrics.NilMeter{}
  464. broker.requestSize = metrics.NilHistogram{}
  465. broker.responseSize = metrics.NilHistogram{}
  466. broker.responseRate = metrics.NilMeter{}
  467. broker.requestLatency = metrics.NilHistogram{}
  468. }
  469. conf := NewConfig()
  470. {
  471. conf.Net.ReadTimeout = time.Millisecond
  472. conf.Net.SASL.Mechanism = SASLTypePlaintext
  473. conf.Net.SASL.User = "token"
  474. conf.Net.SASL.Password = "password"
  475. conf.Net.SASL.Version = SASLHandshakeV1
  476. }
  477. broker.conf = conf
  478. broker.conf.Version = V1_0_0_0
  479. dialer := net.Dialer{}
  480. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  481. if err != nil {
  482. t.Fatal(err)
  483. }
  484. broker.conn = conn
  485. err = broker.authenticateViaSASL()
  486. if err == nil {
  487. t.Errorf("should never happen - expected read timeout")
  488. }
  489. }
  490. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  491. testTable := []struct {
  492. name string
  493. error error
  494. mockKerberosClient bool
  495. errorStage string
  496. badResponse bool
  497. badKeyChecksum bool
  498. }{
  499. {
  500. name: "Kerberos authentication success",
  501. error: nil,
  502. mockKerberosClient: true,
  503. },
  504. {
  505. name: "Kerberos login fails",
  506. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  507. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  508. "cation information was invalid - PREAUTH_FAILED"),
  509. mockKerberosClient: true,
  510. errorStage: "login",
  511. },
  512. {
  513. name: "Kerberos service ticket fails",
  514. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  515. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  516. "cation information was invalid - PREAUTH_FAILED"),
  517. mockKerberosClient: true,
  518. errorStage: "service_ticket",
  519. },
  520. {
  521. name: "Kerberos client creation fails",
  522. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  523. },
  524. {
  525. name: "Bad server response, unmarshall key error",
  526. error: errors.New("bytes shorter than header length"),
  527. badResponse: true,
  528. mockKerberosClient: true,
  529. },
  530. {
  531. name: "Bad token checksum",
  532. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  533. badResponse: false,
  534. badKeyChecksum: true,
  535. mockKerberosClient: true,
  536. },
  537. }
  538. for i, test := range testTable {
  539. mockBroker := NewMockBroker(t, 0)
  540. // broker executes SASL requests against mockBroker
  541. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  542. return nil
  543. })
  544. broker := NewBroker(mockBroker.Addr())
  545. broker.requestRate = metrics.NilMeter{}
  546. broker.outgoingByteRate = metrics.NilMeter{}
  547. broker.incomingByteRate = metrics.NilMeter{}
  548. broker.requestSize = metrics.NilHistogram{}
  549. broker.responseSize = metrics.NilHistogram{}
  550. broker.responseRate = metrics.NilMeter{}
  551. broker.requestLatency = metrics.NilHistogram{}
  552. conf := NewConfig()
  553. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  554. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  555. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  556. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  557. conf.Net.SASL.GSSAPI.Username = "kafka"
  558. conf.Net.SASL.GSSAPI.Password = "kafka"
  559. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  560. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  561. broker.conf = conf
  562. broker.conf.Version = V1_0_0_0
  563. dialer := net.Dialer{
  564. Timeout: conf.Net.DialTimeout,
  565. KeepAlive: conf.Net.KeepAlive,
  566. LocalAddr: conf.Net.LocalAddr,
  567. }
  568. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  569. if err != nil {
  570. t.Fatal(err)
  571. }
  572. gssapiHandler := KafkaGSSAPIHandler{
  573. client: &MockKerberosClient{},
  574. badResponse: test.badResponse,
  575. badKeyChecksum: test.badKeyChecksum,
  576. }
  577. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  578. broker.conn = conn
  579. if test.mockKerberosClient {
  580. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  581. return &MockKerberosClient{
  582. mockError: test.error,
  583. errorStage: test.errorStage,
  584. }, nil
  585. }
  586. } else {
  587. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  588. }
  589. err = broker.authenticateViaSASL()
  590. if err != nil && test.error != nil {
  591. if test.error.Error() != err.Error() {
  592. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  593. }
  594. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  595. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  596. }
  597. mockBroker.Close()
  598. }
  599. }
  600. func TestBuildClientFirstMessage(t *testing.T) {
  601. testTable := []struct {
  602. name string
  603. token *AccessToken
  604. expected []byte
  605. expectError bool
  606. }{
  607. {
  608. name: "Build SASL client initial response with two extensions",
  609. token: &AccessToken{
  610. Token: "the-token",
  611. Extensions: map[string]string{
  612. "x": "1",
  613. "y": "2",
  614. },
  615. },
  616. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  617. },
  618. {
  619. name: "Build SASL client initial response with no extensions",
  620. token: &AccessToken{Token: "the-token"},
  621. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  622. },
  623. {
  624. name: "Build SASL client initial response using reserved extension",
  625. token: &AccessToken{
  626. Token: "the-token",
  627. Extensions: map[string]string{
  628. "auth": "auth-value",
  629. },
  630. },
  631. expected: []byte(""),
  632. expectError: true,
  633. },
  634. }
  635. for i, test := range testTable {
  636. actual, err := buildClientFirstMessage(test.token)
  637. if !reflect.DeepEqual(test.expected, actual) {
  638. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  639. }
  640. if test.expectError && err == nil {
  641. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  642. }
  643. if !test.expectError && err != nil {
  644. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  645. }
  646. }
  647. }
  648. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  649. var brokerTestTable = []struct {
  650. version KafkaVersion
  651. name string
  652. response []byte
  653. runner func(*testing.T, *Broker)
  654. }{
  655. {V0_10_0_0,
  656. "MetadataRequest",
  657. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  658. func(t *testing.T, broker *Broker) {
  659. request := MetadataRequest{}
  660. response, err := broker.GetMetadata(&request)
  661. if err != nil {
  662. t.Error(err)
  663. }
  664. if response == nil {
  665. t.Error("Metadata request got no response!")
  666. }
  667. }},
  668. {V0_10_0_0,
  669. "ConsumerMetadataRequest",
  670. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  671. func(t *testing.T, broker *Broker) {
  672. request := ConsumerMetadataRequest{}
  673. response, err := broker.GetConsumerMetadata(&request)
  674. if err != nil {
  675. t.Error(err)
  676. }
  677. if response == nil {
  678. t.Error("Consumer Metadata request got no response!")
  679. }
  680. }},
  681. {V0_10_0_0,
  682. "ProduceRequest (NoResponse)",
  683. []byte{},
  684. func(t *testing.T, broker *Broker) {
  685. request := ProduceRequest{}
  686. request.RequiredAcks = NoResponse
  687. response, err := broker.Produce(&request)
  688. if err != nil {
  689. t.Error(err)
  690. }
  691. if response != nil {
  692. t.Error("Produce request with NoResponse got a response!")
  693. }
  694. }},
  695. {V0_10_0_0,
  696. "ProduceRequest (WaitForLocal)",
  697. []byte{0x00, 0x00, 0x00, 0x00},
  698. func(t *testing.T, broker *Broker) {
  699. request := ProduceRequest{}
  700. request.RequiredAcks = WaitForLocal
  701. response, err := broker.Produce(&request)
  702. if err != nil {
  703. t.Error(err)
  704. }
  705. if response == nil {
  706. t.Error("Produce request without NoResponse got no response!")
  707. }
  708. }},
  709. {V0_10_0_0,
  710. "FetchRequest",
  711. []byte{0x00, 0x00, 0x00, 0x00},
  712. func(t *testing.T, broker *Broker) {
  713. request := FetchRequest{}
  714. response, err := broker.Fetch(&request)
  715. if err != nil {
  716. t.Error(err)
  717. }
  718. if response == nil {
  719. t.Error("Fetch request got no response!")
  720. }
  721. }},
  722. {V0_10_0_0,
  723. "OffsetFetchRequest",
  724. []byte{0x00, 0x00, 0x00, 0x00},
  725. func(t *testing.T, broker *Broker) {
  726. request := OffsetFetchRequest{}
  727. response, err := broker.FetchOffset(&request)
  728. if err != nil {
  729. t.Error(err)
  730. }
  731. if response == nil {
  732. t.Error("OffsetFetch request got no response!")
  733. }
  734. }},
  735. {V0_10_0_0,
  736. "OffsetCommitRequest",
  737. []byte{0x00, 0x00, 0x00, 0x00},
  738. func(t *testing.T, broker *Broker) {
  739. request := OffsetCommitRequest{}
  740. response, err := broker.CommitOffset(&request)
  741. if err != nil {
  742. t.Error(err)
  743. }
  744. if response == nil {
  745. t.Error("OffsetCommit request got no response!")
  746. }
  747. }},
  748. {V0_10_0_0,
  749. "OffsetRequest",
  750. []byte{0x00, 0x00, 0x00, 0x00},
  751. func(t *testing.T, broker *Broker) {
  752. request := OffsetRequest{}
  753. response, err := broker.GetAvailableOffsets(&request)
  754. if err != nil {
  755. t.Error(err)
  756. }
  757. if response == nil {
  758. t.Error("Offset request got no response!")
  759. }
  760. }},
  761. {V0_10_0_0,
  762. "JoinGroupRequest",
  763. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  764. func(t *testing.T, broker *Broker) {
  765. request := JoinGroupRequest{}
  766. response, err := broker.JoinGroup(&request)
  767. if err != nil {
  768. t.Error(err)
  769. }
  770. if response == nil {
  771. t.Error("JoinGroup request got no response!")
  772. }
  773. }},
  774. {V0_10_0_0,
  775. "SyncGroupRequest",
  776. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  777. func(t *testing.T, broker *Broker) {
  778. request := SyncGroupRequest{}
  779. response, err := broker.SyncGroup(&request)
  780. if err != nil {
  781. t.Error(err)
  782. }
  783. if response == nil {
  784. t.Error("SyncGroup request got no response!")
  785. }
  786. }},
  787. {V0_10_0_0,
  788. "LeaveGroupRequest",
  789. []byte{0x00, 0x00},
  790. func(t *testing.T, broker *Broker) {
  791. request := LeaveGroupRequest{}
  792. response, err := broker.LeaveGroup(&request)
  793. if err != nil {
  794. t.Error(err)
  795. }
  796. if response == nil {
  797. t.Error("LeaveGroup request got no response!")
  798. }
  799. }},
  800. {V0_10_0_0,
  801. "HeartbeatRequest",
  802. []byte{0x00, 0x00},
  803. func(t *testing.T, broker *Broker) {
  804. request := HeartbeatRequest{}
  805. response, err := broker.Heartbeat(&request)
  806. if err != nil {
  807. t.Error(err)
  808. }
  809. if response == nil {
  810. t.Error("Heartbeat request got no response!")
  811. }
  812. }},
  813. {V0_10_0_0,
  814. "ListGroupsRequest",
  815. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  816. func(t *testing.T, broker *Broker) {
  817. request := ListGroupsRequest{}
  818. response, err := broker.ListGroups(&request)
  819. if err != nil {
  820. t.Error(err)
  821. }
  822. if response == nil {
  823. t.Error("ListGroups request got no response!")
  824. }
  825. }},
  826. {V0_10_0_0,
  827. "DescribeGroupsRequest",
  828. []byte{0x00, 0x00, 0x00, 0x00},
  829. func(t *testing.T, broker *Broker) {
  830. request := DescribeGroupsRequest{}
  831. response, err := broker.DescribeGroups(&request)
  832. if err != nil {
  833. t.Error(err)
  834. }
  835. if response == nil {
  836. t.Error("DescribeGroups request got no response!")
  837. }
  838. }},
  839. {V0_10_0_0,
  840. "ApiVersionsRequest",
  841. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  842. func(t *testing.T, broker *Broker) {
  843. request := ApiVersionsRequest{}
  844. response, err := broker.ApiVersions(&request)
  845. if err != nil {
  846. t.Error(err)
  847. }
  848. if response == nil {
  849. t.Error("ApiVersions request got no response!")
  850. }
  851. }},
  852. {V1_1_0_0,
  853. "DeleteGroupsRequest",
  854. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  855. func(t *testing.T, broker *Broker) {
  856. request := DeleteGroupsRequest{}
  857. response, err := broker.DeleteGroups(&request)
  858. if err != nil {
  859. t.Error(err)
  860. }
  861. if response == nil {
  862. t.Error("DeleteGroups request got no response!")
  863. }
  864. }},
  865. }
  866. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  867. metricValidators := newMetricValidators()
  868. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  869. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  870. // Check that the number of bytes sent corresponds to what the mock broker received
  871. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  872. if mockBrokerBytesWritten == 0 {
  873. // This a ProduceRequest with NoResponse
  874. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  875. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  876. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  877. } else {
  878. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  879. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  880. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  881. }
  882. // Check that the number of bytes received corresponds to what the mock broker sent
  883. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  884. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  885. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  886. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  887. // Run the validators
  888. metricValidators.run(t, broker.conf.MetricRegistry)
  889. }